知识广场

按学科筛选:计算机科学 / 人工智能
清除筛选

«计算机科学 / 人工智能» 分类下共 30 篇帖子

PyTorch Lightning 把训练循环写成 50 行(多卡 / 混合精度 / checkpoint 全免费)

## 起因 裸 PyTorch 写一个像样的训练循环要处理:device 切换 / `.train()/.eval()` / gradient zero / loss accumulation / lr scheduler step / checkpoint 保存 / early stopping / 多 GPU DDP 启动 / 混合精度 / logging。 一个研究 notebook 反复抄这些代码很烦,还容易写错(忘 `optimizer.zero_grad()` 或者 `.eval()` 是经典的)。 PyTorch Lightning 把"工程脚手架"和"模型逻辑"分开:你只写 `training_step` / `validation_step` / `configure_optimizers`,其它由 framework 处理。 ## 解决方案 ```bash uv add lightning torchvision ``` ```python import lightning as L import torch import torch.nn.functional as F from torch.utils.data import DataLoader from torchvision import datasets, transforms class MNIST(L.LightningModule): def __init__(self, lr=1e-3): super().__init__() self.save_hyperparameters() self.conv = torch.nn.Sequential( torch.nn.Conv2d(1, 32, 3, padding=1), torch.nn.ReLU(), torch.nn.MaxPool2d(2), torch.nn.Conv2d(32, 64, 3, padding=1), torch.nn.ReLU(), torch.nn.MaxPool2d(2), torch.nn.Flatten(), torch.nn.Linear(64*7*7, 128), torch.nn.ReLU(), torch.nn.Linear(128, 10), ) def forward(self, x): return self.conv(x) def training_step(self, batch, batch_idx): x, y = batch logits = self(x) loss = F.cross_entropy(logits, y) acc = (logits.argmax(1) == y).float().mean() self.log_dict({'train/loss': loss, 'train/acc': acc}, prog_bar=True) return loss def validation_step(self, batch, batch_idx): x, y = batch logits = self(x) loss = F.cross_entropy(logits, y) acc = (logits.argmax(1) == y).float().mean() self.log_dict({'val/loss': loss, 'val/acc': acc}, prog_bar=True) def configure_optimizers(self): opt = torch.optim.Adam(self.parameters(), lr=self.hparams.lr) sched = torch.optim.lr_scheduler.CosineAnnealingLR(opt, T_max=10) return [opt], [sched] def main(): transform = transforms.Compose([ transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]) train = DataLoader(datasets.MNIST('./data', train=True, download=True, transform=transform), batch_size=128, num_workers=4, shuffle=True) val = DataLoader(datasets.MNIST('./data', train=False, transform=transform), batch_size=512, num_workers=4) trainer = L.Trainer( max_epochs=5, accelerator='auto', # cuda / mps / cpu 自动选 devices='auto', # 多卡时自动用全部 precision='16-mixed', # 混合精度 callbacks=[ L.pytorch.callbacks.EarlyStopping(monitor='val/loss', patience=3), L.pytorch.callbacks.ModelCheckpoint(monitor='val/acc', mode='max', save_top_k=2), ], logger=L.pytorch.loggers.TensorBoardLogger('logs', name='mnist'), ) trainer.fit(MNIST(), train, val) if __name__ == '__main__': main() ``` 50 行包含训练 + 验证 + 多卡 + 混合精度 + 早停 + checkpoint + TensorBoard。 ## 效果 - 单 GPU vs 4 GPU DDP 只改 `devices=4`,无需写 init_process_group 之类 - 混合精度只改 `precision='16-mixed'`,速度 ~1.5-2x,显存减 30% - TensorBoard `tensorboard --logdir logs` 看 loss / metric 曲线 - ModelCheckpoint 自动保存 val/acc 最高的 2 个 checkpoint - 中断后 `Trainer(resume_from_checkpoint=...)` 恢复 - 切换 wandb logger 一行:`L.pytorch.loggers.WandbLogger(project=...)` ## 踩过的坑 1. **`self.log` 不能放在 `forward` 里**:只能在 step 方法里。否则 batch_size 信息不对。 2. **DDP 时 `validation_step` 写了 print** → 多进程刷屏。用 `self.log` 或 `rank_zero_only` 装饰。 3. **`num_workers > 0` 时 macOS / Windows 死锁**:DataLoader 用 fork 策略。Mac 上设 `num_workers=0` 或 `persistent_workers=True`。 4. **混合精度 NaN**:loss scale 不对。`precision='bf16-mixed'`(A100+) 比 `16-mixed` 更稳,不需要 grad scaler。 5. **保存的 checkpoint 太大**:默认保存 optimizer state。要 inference-only: `ModelCheckpoint(save_weights_only=True)`。

用 uv 装 PyTorch + 正确 CUDA 版本(一条命令搞定)

PyTorch 装错 CUDA 版本(CPU-only / 不匹配显卡驱动)是新手最常踩的 第一个坑。`uv` + PyTorch 官方提供的 index URL 能在一行命令里搞定。 ## 1. 看清你需要的版本 ```bash nvidia-smi # 关注右上角 "CUDA Version: 12.4" —— 这是 driver 支持的最高 CUDA 版本 # PyTorch 实际使用的 runtime 可以等于或更低,但不能更高 ``` PyTorch 现在主要 ship 三种轮子: - **cu124** (CUDA 12.4 runtime) - **cu121** (CUDA 12.1) - **cu118** (CUDA 11.8) — 老显卡 / 老驱动 - **cpu** (无 GPU) 你的驱动支持 12.x 就装 cu121 或 cu124;不确定就 cu121(兼容性最好)。 ## 2. 一条命令装 ```bash # uv 新项目 uv init mlproject --python 3.12 cd mlproject # 直接装最新稳定版 uv add torch torchvision torchaudio --index https://download.pytorch.org/whl/cu124 # 或指定版本 uv add 'torch==2.4.1' 'torchvision==0.19.1' \ --index https://download.pytorch.org/whl/cu124 ``` `--index` 让 uv 从 PyTorch 自己的 CDN 拉轮子(PyPI 上的轮子默认是 CPU 版的)。 ## 3. 校验 ```python # check.py import torch print(f'torch: {torch.__version__}') print(f'cuda built: {torch.version.cuda}') print(f'cuda avail: {torch.cuda.is_available()}') print(f'device count: {torch.cuda.device_count()}') if torch.cuda.is_available(): print(f'device name: {torch.cuda.get_device_name(0)}') print(f'capability: {torch.cuda.get_device_capability(0)}') # 做一次实际计算 x = torch.randn(1000, 1000) if torch.cuda.is_available(): x = x.cuda() print(f'matmul on GPU: {(x @ x).sum().item():.2f}') ``` ```bash uv run python check.py ``` 期望输出: ``` torch: 2.4.1+cu124 cuda built: 12.4 cuda avail: True device count: 1 device name: NVIDIA GeForce RTX 4090 capability: (8, 9) matmul on GPU: ... ``` `+cu124` 后缀确认装的是 CUDA wheel;`is_available()` False 说明 wheel 匹配了但驱动 / NVIDIA 库有问题。 ## 4. 锁版本 ```bash ls uv.lock # 已经写入 ``` CI / 同事直接: ```bash uv sync --frozen --index https://download.pytorch.org/whl/cu124 ``` 得到完全一致的环境。 ## 5. 配置 pyproject.toml 让 index 持久化 每次都加 `--index` 很烦。`pyproject.toml`: ```toml [project] name = "mlproject" version = "0.1.0" requires-python = ">=3.12" dependencies = [ "torch>=2.4", "torchvision>=0.19", ] [[tool.uv.index]] name = "pytorch-cu124" url = "https://download.pytorch.org/whl/cu124" explicit = true [tool.uv.sources] torch = { index = "pytorch-cu124" } torchvision = { index = "pytorch-cu124" } ``` 之后 `uv add ...` 不再需要 `--index`。 ## 6. Apple Silicon (M1/M2/M3) 用 MPS ```bash uv add torch torchvision torchaudio # Mac 上 PyPI 默认轮子已经带 MPS(Metal Performance Shaders)后端 ``` ```python device = 'mps' if torch.backends.mps.is_available() else 'cpu' x = torch.randn(1000, 1000, device=device) ``` MPS 不如 CUDA 快但比 CPU 快几倍,免费的不错。 ## 7. CPU-only(开发 / 测试 / CI) ```bash uv add torch --index https://download.pytorch.org/whl/cpu ``` CI 跑测试一般用 CPU wheel,节省 wheel 大小 + 启动时间。 ## 8. 多 GPU 看见 ```python torch.cuda.device_count() # 几张卡 # 显式选择 device = torch.device('cuda:0') model = model.to(device) ``` 多卡训练用 `torch.nn.DataParallel` 简单粗暴 / `DistributedDataParallel` (DDP)才是生产选择。 ## 9. PyTorch 升级 ```bash uv lock --upgrade-package torch --index https://download.pytorch.org/whl/cu124 ``` 不同 CUDA 版本的 PyTorch 是不同 package(torch+cu121 vs torch+cu124), 直接 lock 升级最稳。 ## 10. Docker 镜像 ```dockerfile FROM nvidia/cuda:12.4.1-cudnn-devel-ubuntu22.04 # uv 二进制 COPY --from=ghcr.io/astral-sh/uv:latest /uv /usr/local/bin/uv # Python 3.12 RUN apt update && apt install -y python3.12 python3-pip git WORKDIR /app COPY pyproject.toml uv.lock ./ RUN uv sync --frozen --index https://download.pytorch.org/whl/cu124 COPY . . CMD ["uv", "run", "python", "train.py"] ``` 镜像底层用 NVIDIA 官方 `cuda:*-devel`,宿主装 `nvidia-container-toolkit`: ```bash docker run --gpus all -v $(pwd):/app mlproject ``` ## 踩过的坑 - 装了 CPU wheel 但代码里 `.cuda()`:报 "no CUDA-capable device"。 pip / uv 默认从 PyPI 拉 = CPU wheel。必须 `--index pytorch`。 - 驱动 CUDA 12.0 但装了 cu124:装得上但 `is_available()=False` 或运行 时 segfault。升驱动或者降 wheel。 - 多版本 Python 共存时 wheel 不匹配(cp310 wheel 装到 cp312 venv): uv 会自动挑对的,但手动 pip 时容易选错。 - 想用 `torch.compile()` 但 wheel 没带:cu118 上没 compile 支持。 cu121+ / cu124 才有。 - jupyter notebook 启动时不见 CUDA:notebook 是另一个进程, 确认是用 `uv run jupyter` 启动(继承了 venv 的 path)。

训练一个 MNIST 分类器(PyTorch from scratch,60 行)

MNIST 手写数字识别是 ML 的 "hello world"。下面用 PyTorch 写一个从 数据加载到训练 / 评估的完整 pipeline。60 行可读代码,能跑到 99%+ 准确率。 ## 完整代码 ```python import torch import torch.nn as nn import torch.nn.functional as F from torch.utils.data import DataLoader from torchvision import datasets, transforms device = 'cuda' if torch.cuda.is_available() else 'cpu' print(f'using {device}') # 1. 数据 transform = transforms.Compose([ transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,)), ]) train_set = datasets.MNIST('./data', train=True, download=True, transform=transform) test_set = datasets.MNIST('./data', train=False, download=True, transform=transform) train_loader = DataLoader(train_set, batch_size=128, shuffle=True, num_workers=4) test_loader = DataLoader(test_set, batch_size=512, shuffle=False, num_workers=4) # 2. 模型 class Net(nn.Module): def __init__(self): super().__init__() self.conv1 = nn.Conv2d(1, 32, 3, padding=1) self.conv2 = nn.Conv2d(32, 64, 3, padding=1) self.fc1 = nn.Linear(64 * 7 * 7, 128) self.fc2 = nn.Linear(128, 10) self.drop = nn.Dropout(0.25) def forward(self, x): x = F.relu(F.max_pool2d(self.conv1(x), 2)) # 28x28 -> 14x14 x = F.relu(F.max_pool2d(self.conv2(x), 2)) # 14x14 -> 7x7 x = x.flatten(1) x = F.relu(self.fc1(x)) x = self.drop(x) return self.fc2(x) model = Net().to(device) opt = torch.optim.Adam(model.parameters(), lr=1e-3) loss_fn = nn.CrossEntropyLoss() # 3. 训练 def train_epoch(epoch): model.train() total, correct, total_loss = 0, 0, 0.0 for x, y in train_loader: x, y = x.to(device), y.to(device) opt.zero_grad() logits = model(x) loss = loss_fn(logits, y) loss.backward() opt.step() total_loss += loss.item() * x.size(0) correct += (logits.argmax(1) == y).sum().item() total += x.size(0) print(f'epoch {epoch} train loss {total_loss/total:.4f} acc {correct/total:.4f}') # 4. 评估 def evaluate(): model.eval() total, correct = 0, 0 with torch.no_grad(): for x, y in test_loader: x, y = x.to(device), y.to(device) correct += (model(x).argmax(1) == y).sum().item() total += x.size(0) print(f'test acc {correct/total:.4f}') for ep in range(5): train_epoch(ep) evaluate() torch.save(model.state_dict(), 'mnist.pt') ``` 5 个 epoch 后测试集准确率应 ≥ 99%。GPU 上每 epoch < 10 秒; CPU 大约 30-60 秒。 ## 解释几个关键点 ### `Normalize((0.1307,), (0.3081,))` MNIST 训练集统计出的均值和标准差。归一化让输入分布更均匀, 训练更稳。 ### `num_workers=4` DataLoader 用 4 个子进程并行 prefetch 数据。GPU 训练时 IO 是瓶颈, 设大点(4-8)能让 GPU 持续吃满。CPU 训练时设 0 反而更快(避免进程切换)。 ### `opt.zero_grad()` PyTorch 默认 gradient 累加。每次 backward 前清零。 忘了清零 → loss 一直涨。 ### `with torch.no_grad():` 评估时不需要梯度,省内存 + 快。等价的还有装饰器 `@torch.no_grad()` 或 `torch.inference_mode()`(更激进,PyTorch 1.9+)。 ### `.to(device)` 每个 batch 显式搬到 GPU;model 一次性 `.to(device)`。 ## 加几个常见优化 ### 1. 学习率调度 ```python sched = torch.optim.lr_scheduler.CosineAnnealingLR(opt, T_max=5) # 每个 epoch 结束后调用 sched.step() ``` cosine annealing 在前期保持高 lr 探索,后期降低做精调。 ### 2. 混合精度(AMP) ```python scaler = torch.cuda.amp.GradScaler() for x, y in train_loader: x, y = x.to(device), y.to(device) opt.zero_grad() with torch.cuda.amp.autocast(): logits = model(x) loss = loss_fn(logits, y) scaler.scale(loss).backward() scaler.step(opt) scaler.update() ``` 混合精度让大部分计算用 fp16,需要精度的(loss / 权重更新)用 fp32。 RTX 30 系以上 ~2x 加速,显存降一半。MNIST 这种小模型差别不大, 但对 transformer / 大网络明显。 ### 3. 用 `torch.compile()`(PyTorch 2.0+) ```python model = torch.compile(model) ``` 编译期内联 + 算子融合,训练 / 推理 1.3-2x 加速。第一次 batch 慢 (编译),之后变快。 ### 4. 早停 ```python best_acc = 0 patience = 3 no_improve = 0 for ep in range(50): train_epoch(ep) acc = evaluate() if acc > best_acc: best_acc = acc no_improve = 0 torch.save(model.state_dict(), 'best.pt') else: no_improve += 1 if no_improve >= patience: print('early stop') break ``` ## 推理 ```python model = Net().to(device) model.load_state_dict(torch.load('mnist.pt')) model.eval() # 单张图片 from PIL import Image img = Image.open('test.png').convert('L').resize((28, 28)) x = transform(img).unsqueeze(0).to(device) with torch.no_grad(): pred = model(x).argmax(1).item() print(f'predicted: {pred}') ``` ## 可视化训练曲线 ```python import matplotlib.pyplot as plt losses = [] accs = [] # ... 在 train_epoch / evaluate 里 append fig, ax = plt.subplots(1, 2, figsize=(10, 4)) ax[0].plot(losses); ax[0].set_title('train loss') ax[1].plot(accs); ax[1].set_title('test acc') plt.savefig('curves.png') ``` 或者用 `tensorboard` / `wandb`(参见专题文章)。 ## 踩过的坑 - 第一次跑会下载 MNIST 数据(~10MB),需要网络。`download=True` 给 你这个能力。 - forward 里 `x.view(-1, 784)` vs `x.flatten(1)`:view 要求内存连续, flatten 不要求。前者偶尔报 "view size is not compatible..." 错。 - 不调用 `model.train()` / `model.eval()` → Dropout / BatchNorm 行为错。 eval 期间 Dropout 应该关(输出固定);BatchNorm 应该用 running stats。 - `torch.cuda.empty_cache()` 是给 PyTorch 内部 caching allocator 用的, 通常没必要手动调;调了也不会真的还给 OS。担心 OOM 应该减小 batch。

用 Whisper 把视频 / 录音转中文字幕(本地、免费、断点续传)

## 起因 存了 200 小时的会议录音 + 课程视频,想搜里面"我哪节课讲过 X 主题"。 传统方案:手工转录(贵 / 慢)、剪映等付费云转录(隐私 + 钱)。 Whisper 是 OpenAI 开源的 ASR 模型,本地跑能转 90+ 种语言, 中文识别质量直接打过国内大多数云服务。 ## 解决方案 最快的方式是 `whisper.cpp`(C++ 重新实现,CPU/GPU 都很快,无需 PyTorch)。 ### 装 ```bash git clone https://github.com/ggerganov/whisper.cpp cd whisper.cpp # Apple Silicon 用 Metal;NVIDIA 用 CUDA;纯 CPU 也可以 make -j # 下模型(large-v3 中文最准;medium 速度+准确度平衡;base 最快但中文一般) bash ./models/download-ggml-model.sh large-v3 # 1.5 GB 左右 ``` ### 转录单个文件 ```bash # 输入要是 16kHz mono WAV ffmpeg -i lecture.mp4 -vn -ar 16000 -ac 1 -c:a pcm_s16le tmp.wav ./build/bin/whisper-cli -m models/ggml-large-v3.bin \ -l zh -f tmp.wav -osrt -otxt # 生成 tmp.wav.srt + tmp.wav.txt ``` 参数解释: - `-l zh`:源语言中文(不指定的话自动检测) - `-osrt`:输出 SubRip 字幕 - `-otxt`:输出纯文本 ### 批量脚本 ```bash #!/usr/bin/env bash for f in *.mp4; do base="${f%.mp4}" [ -f "$base.srt" ] && { echo "skip $f (already done)"; continue; } ffmpeg -y -i "$f" -vn -ar 16000 -ac 1 -c:a pcm_s16le /tmp/_audio.wav ./whisper-cli -m models/ggml-large-v3.bin -l zh -f /tmp/_audio.wav \ -osrt -of "$base" done ``` `[ -f "$base.srt" ] && continue` 做断点续传:跑了一半中断重启不重做。 ### Python 版(openai-whisper) 如果要嵌入到 pipeline: ```bash uv add openai-whisper ``` ```python import whisper model = whisper.load_model('large-v3') # 第一次自动下载 result = model.transcribe('lecture.mp4', language='zh', verbose=True) print(result['text']) # 带时间戳 for seg in result['segments']: print(f"[{seg['start']:.1f}s] {seg['text']}") ``` GPU 大模型一篇 1 小时音频约 5-10 分钟。CPU 慢 5-10 倍。 ## 性能 / 选型 我笔记本 M1 Pro + whisper.cpp + large-v3: | 模型 | 中文 WER | 速度 | 显存 | |---|---|---|---| | tiny | ~25% | 30x realtime | 1 GB | | base | ~18% | 16x | 1 GB | | small | ~13% | 6x | 2 GB | | medium | ~9% | 2x | 5 GB | | **large-v3** | ~6% | 1x | 10 GB | "realtime" = 处理 1 分钟音频要多少秒。`large-v3` 比 medium 准确率明显 但速度慢 2 倍,中文场景值得。 ## 效果 - 200 小时录音转完一晚上跑完 - `.srt` 字幕直接拖进 VLC / 剪映对应视频 - 搜索 `grep -l "知识点关键词" *.srt` 一秒定位"哪一节哪一段说过" - 转录质量手工抽查:科技 / 普通话清晰场景准确率 > 95%,方言 / 嘈杂 环境降到 80% 左右 ## 踩过的坑 1. **GPU 装错版本**:whisper-py 默认装 CPU torch。要手动装 CUDA 版: `uv add torch --index https://download.pytorch.org/whl/cu124`。 2. **音频不是 16kHz mono**:whisper.cpp 严格要求格式;自动失败但 error message 不明显。永远先 `ffmpeg -ar 16000 -ac 1`。 3. **长视频中间 hallucination**:超过 30 秒的安静段落 whisper 会"幻想" 一些重复的"谢谢观看"之类的话。`--no-speech-threshold 0.6` 加严格点 过滤,或者 `--temperature 0` 减少创造性。 4. **断句不准**:whisper 自动给句号但有时一长串没标点。后处理用 `ja-tokenize` 或者再过一遍 LLM 加标点: ```python client.chat.completions.create(messages=[{ 'role': 'user', 'content': f'给下面文本加合理标点,不要改字:\n{raw_text}' }]) ``` 5. **GPU 显存不够**:large-v3 需要 10 GB+。降级到 medium(5 GB)+ `--word-timestamps` 也能拿很好结果。

Feast feature store:把训练 / 在线推理用同一份特征代码

## 起因 我们的推荐模型在训练时算"用户过去 30 天点击次数"用 pandas, 线上推理时用 Redis lookup。两套代码必然漂移:一次 pandas 用错时区 窗口对不上 → 训练效果在线上拉胯。 "训练 / 在线特征不一致" 是 ML 生产最常见的痛点。 Feast 是开源 feature store,把特征定义统一在一处,训练 / 在线都从 同一份代码读。 ## 解决方案 ### 装 ```bash uv add feast feast version ``` ### 项目初始化 ```bash feast init my_store cd my_store/feature_repo ``` 生成示例 `example_repo.py`。 ### 定义特征 ```python # feature_repo/user_features.py from datetime import timedelta from feast import Entity, Feature, FeatureView, FileSource, ValueType from feast.types import Float32, Int64 user = Entity(name='user_id', value_type=ValueType.INT64) # 数据源:parquet 文件(生产用 BigQuery / Snowflake / S3 等) user_clicks_source = FileSource( path='data/user_clicks.parquet', timestamp_field='event_time', ) user_clicks_30d = FeatureView( name='user_clicks_30d', entities=[user], ttl=timedelta(days=30), schema=[ Feature(name='click_count', dtype=Int64), Feature(name='avg_dwell_seconds', dtype=Float32), Feature(name='top_category', dtype='string'), ], source=user_clicks_source, online=True, ) ``` ```bash feast apply # 注册 entity / feature view 元数据 ``` ### 训练时拉历史特征 ```python from feast import FeatureStore import pandas as pd store = FeatureStore(repo_path='feature_repo') # 训练样本(user_id + label + event_time) training_df = pd.DataFrame({ 'user_id': [1, 2, 3], 'event_time': pd.to_datetime(['2024-01-01', '2024-01-02', '2024-01-03']), 'label': [1, 0, 1], }) # point-in-time 拼接特征(保证不用未来数据) training = store.get_historical_features( entity_df=training_df, features=[ 'user_clicks_30d:click_count', 'user_clicks_30d:avg_dwell_seconds', 'user_clicks_30d:top_category', ], ).to_df() # train X = training[['click_count', 'avg_dwell_seconds']] y = training['label'] model.fit(X, y) ``` Feast 自动按 entity_df 的 event_time 去 source 找当时点的特征值, **避免 data leakage**(不会用未来的 click_count 训练历史样本)。 ### 在线 materialize(把最近特征推到 Redis) ```bash feast materialize-incremental $(date -u +%Y-%m-%dT%H:%M:%S) # 把 last_materialization → now 的特征写到 online store (Redis) ``` 定时跑(每小时 / 每天): ```bash # /etc/systemd/system/feast-materialize.timer [Timer] OnCalendar=*-*-* */1:00:00 ``` ### 在线推理时读特征 ```python features = store.get_online_features( features=[ 'user_clicks_30d:click_count', 'user_clicks_30d:avg_dwell_seconds', ], entity_rows=[{'user_id': 42}], ).to_dict() x = [[features['click_count'][0], features['avg_dwell_seconds'][0]]] prediction = model.predict(x) ``` **同一份 FeatureView 定义** 决定了训练和在线都拿"click_count"的相同 语义。漂移消失。 ### Online store backend Feast 支持多种 online store: ```yaml # feature_repo/feature_store.yaml online_store: type: redis connection_string: 'localhost:6379' # 或: online_store: type: dynamodb region: us-east-1 table_name: feast_online # 或: online_store: type: sqlite # dev 用 ``` ### 数据源支持 ```yaml offline_store: type: file # parquet # 或 bigquery / snowflake / redshift / spark / trino ``` 不同公司栈用不同 source,Feast 抽象统一。 ## 实战流程 ``` ┌─────────────────┐ │ FeatureView 定义 │ │ (Python 代码) │ └────────┬────────┘ │ ┌──────────────────┼──────────────────┐ │ │ │ ▼ ▼ ▼ ┌──────────┐ ┌────────────┐ ┌──────────┐ │ 训练 pipeline │ │ materialize │ │ 在线推理 │ │ get_historical│ │ → Redis │ │ get_online│ └──────────┘ └────────────┘ └──────────┘ ``` 特征定义是 single source of truth。 ## 何时该上 feature store - 多个模型共用相同特征(用户画像 / 商品画像) - 训练 / 在线漂移导致过线下效果不在线上复现 - 特征工程团队跟 ML 团队分工(特征 owner 模型) - 需要 point-in-time 一致性(防 data leakage) 何时**不**该上(杀鸡用牛刀): - 单模型 + 简单特征(计算一下 mean / sum 直接做) - 团队 < 5 人 ML 工程师 - 没有上线 / 用 batch 预测 ## 替代方案对比 | | Feast (OSS) | Hopsworks (商业) | Tecton (商业) | 自己拼 (Redis + DAG) | |---|---|---|---|---| | 复杂度 | 中 | 中 | 高 | 低-高 | | 价格 | 免费 | 付费 | 付费 | 0 | | 在线 store | 多选 | HBase 主 | 多 | 自选 | | 离线 store | 多选 | 自家 + S3 | 多 | 自选 | | 实时特征 | 较弱 | 强 | 强 | 看实现 | 中小团队 Feast;大企业 Tecton / Hopsworks(带运维支持); 极简团队拼 Redis + cron 也能 work。 ## 与训练 pipeline 集成 ```python # Kubeflow / Airflow / Dagster pipeline: @task def fetch_features(entity_df): store = FeatureStore(...) return store.get_historical_features(entity_df, ...).to_df() @task def train(df): model.fit(df[features], df['label']) return model @task def deploy(model): save_to_s3(model) feast materialize-incremental ... ``` 特征 fetch 是 pipeline 第一步,后续 train / deploy 都用 feast。 ## 效果 我们一个 churn 预测模型用 Feast 后: - 训练 / 在线特征一致性:100%(之前 ~95%,漂移 5% 是 bug) - 新模型上线时间从 2 周 → 3 天(特征不用重写,复用现成 view) - 多模型共享 user_features view:避免重复算 - "为什么这个用户预测分数低" 类调试:直接 feast 查在线特征 + 对照训练 分布 ## 踩过的坑 1. **`event_time` 时区**:feast 假设所有时间是 UTC。本地时间传进去 会算偏差。always to_datetime(...).tz_localize('UTC')。 2. **materialize 漏数据**:`materialize-incremental` 从 last_materialization 开始。如果之前没 materialize 过,从 ttl 之前开始,可能遗漏。 首次用 `materialize <start> <end>` 全量。 3. **online store 一致性**:Redis 单机时 materialize 期间挂掉 → 部分 特征写进去 + 部分没写 → 在线读到旧 + 新混合值。Redis cluster 有助稳定。 4. **特征 schema 变了**:加新字段简单(FeatureView 重新 apply); 删 / 改字段类型麻烦,需要重新 materialize 整 entity 群。 5. **point-in-time join 慢**:百万级 entity_df 跨多个 feature view join 几分钟到几十分钟。生产 source 用 BigQuery / Snowflake 让 join 推到 DB 端比较快。

polars 替代 pandas 处理大型 CSV / Parquet(性能 + API 都更好)

pandas 是 2010 年代标准工具但有几个本质局限: - 单线程(GIL) - eager evaluation,复杂 pipeline 中间结果都实例化 - API 历史包袱(10 种 index、SettingWithCopyWarning、`inplace=True`) polars 是 Rust 写的列存数据框,多线程 + lazy 模式 + 干净的链式 API。 1GB+ 的 CSV 处理快 5-30 倍。 ## 安装 ```bash uv add polars pyarrow ``` `pyarrow` 不是必需但读 Parquet / Feather 时性能好。 ## 30 秒上手 ```python import polars as pl df = pl.read_csv('sales.csv') print(df.head()) print(df.schema) # 过滤 + 聚合 + 排序 result = ( df.filter(pl.col('country') == 'CN') .group_by('product') .agg( pl.col('amount').sum().alias('total'), pl.col('amount').count().alias('orders'), ) .sort('total', descending=True) .head(10) ) print(result) ``` 注意: - `pl.col('x')` 是表达式,可以组合(`pl.col('x') * 2`) - `group_by().agg()` 链式 - `.alias()` 重命名 - 不像 pandas 那样有索引;纯列数据 ## lazy 模式 ```python # 用 scan_csv 而不是 read_csv 进 lazy 模式 q = ( pl.scan_csv('sales.csv') .filter(pl.col('country') == 'CN') .group_by('product') .agg(pl.col('amount').sum()) .sort('amount', descending=True) ) # 这一步还没读文件!只构建了 query plan print(q.explain()) # 看到优化后的 plan(如 filter 下推) # 真正执行 + 取结果 result = q.collect() ``` lazy 让 polars 做查询优化:predicate pushdown、projection pushdown、 predicate fusion 等。处理 10GB CSV 时 collect 之前都不占内存。 ## Parquet:列存的好处 ```python # 写 Parquet df.write_parquet('sales.parquet', compression='zstd') # 读 df = pl.read_parquet('sales.parquet') # lazy + projection q = (pl.scan_parquet('sales.parquet') .select(['country', 'amount']) # 只读这两列 .filter(pl.col('country') == 'US') .group_by('country').agg(pl.col('amount').sum()) .collect()) ``` Parquet 列存意味着 `.select(['country', 'amount'])` 完全不读其它列。 10GB 表只读 2 列可能只 IO 1GB。 ## 性能对比 |任务(1GB CSV,5000 万行)| pandas | polars eager | polars lazy | |---|---|---|---| | 读文件 | 18s | 6s | 0.5s (scan) | | filter + groupby + agg | 25s | 4s | 3s | | 内存峰值 | 8GB | 3GB | 1.5GB | 数字会因数据 / 机器而异,但量级对。 ## 常用对照表 | pandas | polars | |---|---| | `df['col']` | `df['col']` 或 `df.get_column('col')` | | `df[df['x'] > 0]` | `df.filter(pl.col('x') > 0)` | | `df.groupby('a')['b'].sum()` | `df.group_by('a').agg(pl.col('b').sum())` | | `df.merge(other, on='id')` | `df.join(other, on='id')` | | `df['col'].str.lower()` | `df.with_columns(pl.col('col').str.to_lowercase())` | | `df.dropna()` | `df.drop_nulls()` | | `df.fillna(0)` | `df.fill_null(0)` | | `pd.concat([df1, df2])` | `pl.concat([df1, df2])` | | `df.pivot_table` | `df.pivot()` | ## 窗口函数 ```python # 按 user 排序后的 cumsum df = df.with_columns( pl.col('amount').cum_sum().over('user').alias('cum_amount') ) # 计算每行相对所在组的平均 df = df.with_columns( (pl.col('amount') / pl.col('amount').mean().over('country')).alias('rel') ) ``` `.over(...)` 是 partition by 的简洁写法。 ## 与 pandas 互转 ```python # polars → pandas pdf = df.to_pandas() # pandas → polars df2 = pl.from_pandas(pdf) ``` 适合渐进迁移:现有 pandas 流程改一段为 polars 跑性能瓶颈。 ## 与 Arrow / DuckDB 联用 polars 内部就是 Arrow 格式,零拷贝传给 DuckDB / Arrow Compute: ```python import duckdb df = pl.read_parquet('big.parquet') # 直接把 polars df 当 DuckDB 视图 result = duckdb.sql("SELECT country, SUM(amount) FROM df GROUP BY 1").pl() ``` DuckDB 跑 SQL,polars 拿结果,全程 Arrow buffer。 ## 写入数据库 ```python import polars as pl df = pl.read_csv('users.csv') df.write_database( table_name='users', connection='postgresql://localhost/mydb', if_table_exists='replace', ) ``` 底层用 ConnectorX / SQLAlchemy 写。 ## 何时仍用 pandas - ML 库强制:scikit-learn 接受 numpy / pandas,polars 要 `.to_numpy()` - 小数据 + 现有代码:pandas 足够时迁移没收益 - 复杂的 multi-level index 需求 不过 polars 团队明确目标是覆盖 pandas 90% 的用法,差距越来越小。 ## 踩过的坑 - 没有 index 的概念:以前 pandas `df.loc['2024-01-01']` 这种行为不存在。 polars 都用 column filter。 - `with_columns` 返回新 DataFrame(immutable),忘了赋值回去 `df = ...`: ```python df.with_columns(pl.col('x') * 2) # 没改 df! df = df.with_columns(pl.col('x') * 2) # 对 ``` - group_by 的 Python 列表语法 → 改成表达式: ```python # pandas df.groupby(['a', 'b']).agg({'c': 'sum'}) # polars df.group_by(['a', 'b']).agg(pl.col('c').sum()) ``` - 日期解析自动推断不总是对:`pl.read_csv(..., try_parse_dates=True)` 或者显式 `pl.col('date').str.to_datetime('%Y-%m-%d')`。

LangChain + Ollama 跑本地 LLM(隐私 + 零成本 + 可写入向量库)

不想把数据发到 OpenAI 但想用大模型?Ollama 让你本地跑 Llama 3 / Qwen / DeepSeek / Mistral 等开源模型,LangChain 提供统一封装做 RAG / agent。 整套零成本(GPU 电费除外),数据完全在本地。 ## 1. 装 Ollama ```bash curl -fsSL https://ollama.com/install.sh | sh # macOS: brew install ollama # Windows: 从 ollama.com 下安装包 ollama --version ``` Ollama 作为后台服务跑(端口 11434): ```bash systemctl start ollama # Linux brew services start ollama # macOS ``` ## 2. 拉个模型 ```bash ollama pull qwen2.5:7b # 或更小的:qwen2.5:3b (4 GB 显存就能跑) # 或更大的:qwen2.5:14b、llama3.1:70b(需要 24+ GB / 80 GB) ollama list ``` ## 3. 命令行直接聊 ```bash ollama run qwen2.5:7b >>> 用 Python 写一个二分查找 ``` `Ctrl-D` 退出。 ## 4. HTTP API(OpenAI 兼容) Ollama 默认在 `:11434` 暴露 OpenAI 兼容 endpoint: ```bash curl http://localhost:11434/v1/chat/completions \ -H "Content-Type: application/json" \ -d '{ "model": "qwen2.5:7b", "messages": [{"role": "user", "content": "你好"}] }' ``` 任何 OpenAI 库直接换 base URL 就能用: ```python from openai import OpenAI client = OpenAI(base_url='http://localhost:11434/v1', api_key='ollama') resp = client.chat.completions.create( model='qwen2.5:7b', messages=[{'role': 'user', 'content': '你好'}], ) print(resp.choices[0].message.content) ``` ## 5. LangChain 集成 ```bash uv add langchain langchain-community langchain-ollama ``` ```python from langchain_ollama import ChatOllama, OllamaEmbeddings llm = ChatOllama(model='qwen2.5:7b', temperature=0.3) emb = OllamaEmbeddings(model='nomic-embed-text') # 嵌入模型,需另拉 ``` `ollama pull nomic-embed-text` 拉一个 274MB 的嵌入模型。 ## 6. RAG:用本地 LLM + 本地知识库回答 ```python from langchain_chroma import Chroma from langchain_text_splitters import RecursiveCharacterTextSplitter from langchain.chains import RetrievalQA from langchain_community.document_loaders import TextLoader # 1. 加载文档(这里举例一个 txt,实际可能是 md / pdf / html) loader = TextLoader('knowledge.md', encoding='utf-8') docs = loader.load() # 2. 切块 splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50) chunks = splitter.split_documents(docs) # 3. 向量化 + 入库 vectorstore = Chroma.from_documents(chunks, embedding=emb, persist_directory='./chroma_db') # 4. 检索 + LLM 回答 qa = RetrievalQA.from_chain_type( llm=llm, chain_type='stuff', retriever=vectorstore.as_retriever(search_kwargs={'k': 4}), return_source_documents=True, ) result = qa.invoke({'query': '本文如何解释 X 概念?'}) print(result['result']) print('---sources---') for d in result['source_documents']: print(d.page_content[:80]) ``` 整个 pipeline 完全在本地。 ## 7. 持久化向量库 ```python # 写入后端 vectorstore.persist() # Chroma 自动 persist 到 directory # 之后加载已有库 vectorstore = Chroma(persist_directory='./chroma_db', embedding_function=emb) ``` Chroma 是文件型向量数据库,适合 < 100 万 chunk 的小规模 RAG。 大规模用 Qdrant / Milvus / Weaviate。 ## 8. Agent / tools ```python from langchain.agents import create_react_agent, AgentExecutor from langchain.tools import tool from langchain import hub @tool def get_weather(city: str) -> str: """获取指定城市的当前天气。""" # 假装调 API return f'{city} 今天晴,22°C' @tool def calculator(expr: str) -> str: """计算数学表达式,例如 '2 * (3 + 4)'。""" try: return str(eval(expr, {'__builtins__': {}})) except Exception as e: return f'error: {e}' prompt = hub.pull('hwchase17/react') agent = create_react_agent(llm, [get_weather, calculator], prompt) executor = AgentExecutor(agent=agent, tools=[get_weather, calculator], verbose=True, max_iterations=4) executor.invoke({'input': '北京天气怎么样?顺便算一下 47 * 12'}) ``` `verbose=True` 输出 agent 的思考过程(很好玩,但生产关掉)。 ## 9. 流式输出 ```python for chunk in llm.stream('用 100 字介绍 RAG'): print(chunk.content, end='', flush=True) ``` 或者在 FastAPI 里返回 SSE 流。 ## 10. 性能 tip - **量化**:`ollama pull qwen2.5:7b-instruct-q4_K_M` 4-bit 量化, 4 GB 显存 / 内存就能跑。精度降几个点 - **多模型切换**:`ollama list` + `ollama run`,自动 load / unload - **并发**:Ollama 默认串行处理,需要并发的话调 `OLLAMA_NUM_PARALLEL=4` - **GPU**:自动检测 CUDA / Metal;CPU 也能跑但慢 5-10x - **保持模型在内存**:`ollama keep-alive` 控制;默认 5 分钟没请求会卸载 ## 11. 模型选择 | 用途 | 推荐 | 显存 | |---|---|---| | 通用对话 / RAG | qwen2.5:7b / 14b | 8 / 16 GB | | 编程 | qwen2.5-coder:7b / deepseek-coder:6.7b | 8 GB | | 中文为主 | qwen2.5、yi、deepseek | - | | 极轻量 | qwen2.5:3b、phi3:mini | 4 GB | | 顶配 | llama3.1:70b、qwen2.5:72b | 48-80 GB | | 嵌入 | nomic-embed-text、bge-m3 | < 2 GB | ## 12. 数据安全 - Ollama 默认监听 `127.0.0.1:11434`,不暴露公网 - LangChain 调用的所有 API 都走本地 - 向量库(Chroma)默认本地文件 - 整个 pipeline 不发任何数据到云端 完美的 enterprise / 隐私敏感场景。 ## 踩过的坑 - 第一次拉大模型很慢(GB 级),可以预先 `ollama pull` 而不是等首次调用 超时。 - 7B 模型在 CPU 上跑非常慢(每 token ~1 秒),交互式不可用;GPU / Apple Silicon 必备。 - 上下文窗口默认很小(2048)。Ollama Modelfile 可以加大: ``` FROM qwen2.5:7b PARAMETER num_ctx 8192 ``` `ollama create my-qwen -f Modelfile`。 - LangChain 升级特别快,API 经常变。在 pyproject.toml lock 版本, 不要随便升。

RAG 文档切块的 3 种策略对比:固定长度 / 语义边界 / 父子层级

## 起因 第一次做 RAG 时按"每 500 字符一刀"切了 1 万份文档进向量库, 召回出来的 chunk 经常是半句话开头、半句话结尾。LLM 看着这种残缺片段 回答经常张冠李戴:"根据文档 ...(被截断)"。 解决回答质量问题,70% 在切块上下功夫,30% 在检索算法。 ## 三种切块策略 ### A. 固定长度(最简单) ```python def chunk_fixed(text, size=500, overlap=50): chunks = [] for i in range(0, len(text), size - overlap): chunks.append(text[i:i+size]) return chunks ``` `overlap` 让相邻 chunk 有 50 字符重叠,避免句子被切两半时只有半句进 任一 chunk。优点:实现简单,长度均匀;缺点:仍可能在标点 / 段落中间截断。 ### B. 语义边界(推荐) 按段落、句子、Markdown 标题切。LangChain 的 `RecursiveCharacterTextSplitter` 最常用: ```python from langchain_text_splitters import RecursiveCharacterTextSplitter splitter = RecursiveCharacterTextSplitter( chunk_size=500, chunk_overlap=50, separators=["\n\n", "\n", "。", "!", "?", ". ", " ", ""], ) chunks = splitter.split_text(text) ``` `separators` 顺序很重要:先按双换行切(段落),不够再按单换行(行), 还不够按句号,最后才硬切。中文要加 `"。"`、`"!"`、`"?"`, 英文版本默认有 `"."`。 代码 / 表格 / Markdown 文档: ```python from langchain_text_splitters import MarkdownHeaderTextSplitter splitter = MarkdownHeaderTextSplitter( headers_to_split_on=[("#", "h1"), ("##", "h2"), ("###", "h3")], ) chunks = splitter.split_text(md_text) # 每 chunk 自动带 metadata { h1: "...", h2: "..." } ``` 按 Markdown 标题切的好处:chunk 自带"它属于哪一节"的元数据,可以在 prompt 里附带给 LLM 当上下文。 ### C. 父子层级(parent-child retrieval) 切两份: - **小 chunk**(200-300 字符)用于 embedding + 检索(细颗粒,语义匹配准) - **大 chunk**(1000-2000 字符)用于喂给 LLM(提供完整上下文) 存储时小 chunk 通过 metadata 指向所属大 chunk: ```python from langchain.retrievers import ParentDocumentRetriever from langchain.storage import InMemoryStore parent_splitter = RecursiveCharacterTextSplitter(chunk_size=2000) child_splitter = RecursiveCharacterTextSplitter(chunk_size=400) retriever = ParentDocumentRetriever( vectorstore=Chroma(embedding=emb), docstore=InMemoryStore(), child_splitter=child_splitter, parent_splitter=parent_splitter, ) retriever.add_documents(docs) # 查询时:小 chunk 匹配 → 返回对应大 chunk chunks = retriever.invoke('问题') ``` ## 效果对比 我在一个内部知识库(5000 篇文档)做过 A/B: | 策略 | 检索准确率 | LLM 回答完整度 | 实现复杂度 | |---|---|---|---| | 固定长度 | 62% | 70%(多被截断) | 极简 | | 语义边界 | 78% | 88% | 简单 | | 父子层级 | 81% | 94% | 中 | **结论**:起步用语义边界(RecursiveCharacterTextSplitter),效果不够好 再升级到父子。固定长度只在快速 POC 时用。 ## 其它要诀 ### 1. chunk size 不是越小越好 300 字符的 chunk 召回时上下文太少;2000 字符的 chunk embedding 时 语义被"稀释",匹配不准。**400-800 字符是甜点**(英文)/ **200-400 中文字符** (中文每字承载语义比英文 token 多)。 ### 2. metadata 一起存 ```python chunk = { 'text': '...', 'metadata': { 'source': 'manual.md', 'section': 'Installation', 'date': '2024-01-15', } } ``` 检索时可以 filter(只查"最近 30 天的内容"),rerank 时可以加权。 ### 3. 长文档加 summary chunk 文档开头加一个"全文摘要" chunk(用 LLM 生成)作为 top-level entry。 检索时 summary chunk 匹配 → 暗示整篇文档相关 → 扩展取相邻 chunk。 ## 踩过的坑 1. **Tokenizer 不匹配**:embedding model 是按 token 计长,但我按字符切 500,可能实际是 800-1200 token,超过 model 上限被截。改成 `from_huggingface_tokenizer` 按真实 token 切: ```python splitter = RecursiveCharacterTextSplitter.from_huggingface_tokenizer( tokenizer, chunk_size=200, chunk_overlap=20) ``` 2. **PDF 提取文字时表格变乱**:纯文本提取(pdfplumber)保留不了表格结构。 表格密集的 PDF 用 `unstructured` 库专门处理表格,或者 PDF → HTML → 切。 3. **重复内容污染检索**:每篇文档都有相同的 "本文档版权所有…" 页脚 → embedding 集中在这个 footer,检索结果偏向有 footer 的文档。 切之前先 strip 这些 boilerplate。 4. **多语言文档**:中英文混排时 RecursiveCharacterTextSplitter 默认 separators 不包含中文标点。手动加 `"。"`、`"!"`、`"?"`、`";"`。

MLflow:本地自托管的实验跟踪 + 模型注册 + 部署 4-in-1

## 起因 公司数据不能上 wandb / Comet 这种 cloud SaaS。要本地自托管的实验 跟踪 + 模型版本控制 + 一键部署。 MLflow 是 Databricks 出的开源套件,4 个组件覆盖 ML lifecycle: - **Tracking**:记录实验(params / metrics / artifacts) - **Projects**:可复现的 ML 包格式 - **Models**:模型版本 + 多 framework 统一接口 - **Registry**:模型生命周期(staging / production / archived) ## 装 + 启服务 ```bash uv add mlflow # 启动 tracking server(默认 SQLite 后端 + 本地文件 artifact) mlflow server \ --host 0.0.0.0 --port 5000 \ --backend-store-uri sqlite:///mlflow.db \ --default-artifact-root ./mlruns ``` 或更"生产"配置(PostgreSQL + S3): ```bash mlflow server \ --host 0.0.0.0 --port 5000 \ --backend-store-uri postgresql://user:pass@db/mlflow \ --default-artifact-root s3://my-bucket/mlruns \ --workers 4 ``` systemd unit + nginx 套一下就是企业级服务。 ## Tracking:训练时记录 ```python import mlflow import mlflow.pytorch mlflow.set_tracking_uri('http://localhost:5000') mlflow.set_experiment('churn-prediction') with mlflow.start_run(run_name='lgbm-baseline'): mlflow.log_params({ 'model': 'lgbm', 'lr': 0.05, 'n_estimators': 200, 'max_depth': 7, }) mlflow.set_tag('dataset_version', 'v2024-05-01') # 训练 model = train(...) eval_metrics = evaluate(model, X_val, y_val) mlflow.log_metrics(eval_metrics) # {'auc': 0.84, 'precision': 0.71, 'recall': 0.66} # 多 step:每 epoch log for epoch in range(20): train_one_epoch() mlflow.log_metric('train/loss', loss, step=epoch) mlflow.log_metric('val/auc', auc, step=epoch) # 保存模型(mlflow 自动 capture 依赖 env) mlflow.lightgbm.log_model(model, 'model') # 任意 artifact mlflow.log_artifact('confusion_matrix.png') mlflow.log_artifact('feature_importance.csv') ``` 跑完 → MLflow UI 看到 run,有 params / metrics 表 + 曲线 + artifacts 下载。 ## 实验对比 UI 选多个 run → Compare → 表格 + parallel coordinates + scatter plot。 一眼看出"哪几个超参组合 auc 高"。 ## Model Registry ```python # 训练完成后注册到 registry mlflow.lightgbm.log_model( lgb_model=model, artifact_path='model', registered_model_name='ChurnPredictor', ) ``` UI 里看到 `ChurnPredictor v1`。 版本管理 + 状态机: ```python client = mlflow.MlflowClient() # 升级到 staging client.transition_model_version_stage( name='ChurnPredictor', version=1, stage='Staging', ) # 验证后升级到 production client.transition_model_version_stage( name='ChurnPredictor', version=1, stage='Production', archive_existing_versions=True, # 老 production 自动 archive ) ``` 线上代码总是拿 production 版本: ```python model = mlflow.pyfunc.load_model( model_uri='models:/ChurnPredictor/Production' ) prediction = model.predict(X) ``` 回滚?把老版本 transition 回 Production 即可。 ## Models:统一 framework 接口 ```python # sklearn mlflow.sklearn.log_model(model, 'm') # PyTorch mlflow.pytorch.log_model(model, 'm') # Transformers mlflow.transformers.log_model(pipeline, 'm') # 自定义(Pyfunc) class MyModel(mlflow.pyfunc.PythonModel): def load_context(self, context): self.artifact = load(context.artifacts['my_file']) def predict(self, context, input_df): return my_inference(input_df, self.artifact) mlflow.pyfunc.log_model('m', python_model=MyModel(), artifacts={'my_file': 'data.pkl'}) ``` 加载时**不需要知道 framework**: ```python model = mlflow.pyfunc.load_model('models:/X/Production') model.predict(df) ``` 业务代码不再 hardcode "import torch / sklearn / xgboost"。 ## 部署:一行 serve ```bash mlflow models serve -m models:/ChurnPredictor/Production -p 5001 # 起一个 HTTP API on :5001 ``` ```bash curl -X POST http://localhost:5001/invocations \ -H 'Content-Type: application/json' \ -d '{"dataframe_records": [{"age": 30, "balance": 1000}]}' # {"predictions": [0.72]} ``` 或 build Docker image: ```bash mlflow models build-docker -m models:/X/Production -n my-model:latest docker run -p 5001:8080 my-model:latest ``` 或部署到 Sagemaker / Azure ML / K8s: ```bash mlflow sagemaker deploy ... mlflow azureml deploy ... ``` framework 抽象一直延伸到部署。 ## Autologging ```python mlflow.sklearn.autolog() # 之后所有 sklearn fit() 自动 log model + params + metrics model = RandomForestClassifier(n_estimators=100) model.fit(X, y) # 自动 log: n_estimators / max_depth / mean_score / training_time / ... ``` 支持 sklearn / PyTorch / Lightning / TensorFlow / XGBoost / LightGBM。 适合"快速 baseline 跑 N 个 algorithm 选最好的"。 ## Projects:可复现 ML 包 `MLproject` 文件: ```yaml name: churn-prediction python_env: python_env.yaml entry_points: main: parameters: data_path: {type: string, default: 'data/train.parquet'} lr: {type: float, default: 0.05} n_estimators: {type: int, default: 200} command: 'python train.py --data {data_path} --lr {lr} --n {n_estimators}' ``` 跑: ```bash mlflow run . -P lr=0.1 -P n_estimators=500 # 或 git URL: mlflow run https://github.com/me/churn-prediction.git -P lr=0.1 ``` 自动建 virtualenv + 装依赖 + 跑训练。同事任何人能复现。 ## 与替代品对比 | | MLflow | Weights & Biases | Neptune | DVC | |---|---|---|---|---| | 自托管 | ✅ 简单 | 企业版 | ✅ | ✅ | | 实验跟踪 | ✅ | ✅ 最强 | ✅ | 较弱 | | 模型注册 | ✅ | ✅ | ✅ | ❌ | | 部署 | ✅ 内置 | ❌ | ❌ | ❌ | | Pipeline | ❌(外部 Airflow) | weave | ❌ | ✅ | | 价格 | 免费 | 付费(个人免费) | 付费 | 免费 | 数据合规要本地的 → MLflow。 体验最好 + 不担心数据外发 → wandb。 ## 我们的实战配置 `docker-compose.yml`: ```yaml services: mlflow: image: ghcr.io/mlflow/mlflow:v2.16.2 ports: ["5000:5000"] environment: MLFLOW_S3_ENDPOINT_URL: http://minio:9000 AWS_ACCESS_KEY_ID: minio AWS_SECRET_ACCESS_KEY: ... command: > mlflow server --host 0.0.0.0 --port 5000 --backend-store-uri postgresql://mlflow:pw@pg/mlflow --default-artifact-root s3://mlflow-artifacts/ pg: image: postgres:16 environment: POSTGRES_DB: mlflow POSTGRES_USER: mlflow POSTGRES_PASSWORD: pw minio: image: minio/minio command: server /data --console-address ":9001" ports: ["9000:9000", "9001:9001"] environment: MINIO_ROOT_USER: minio MINIO_ROOT_PASSWORD: ... ``` 三个 service:MLflow + Postgres (metadata) + Minio (S3 兼容 artifact)。 全本地,数据不出公司。 ## 效果 - 30+ 个实验跑下来"哪个 lr × dataset 最优"清晰 - 上线模型有版本号 + git commit 关联 + 训练数据版本 traceable - 回滚到上版本:UI 点 button + 30 秒 redeploy - 数据科学家 / ML 工程师 / 部署运维各看 UI 不同部分 ## 踩过的坑 1. **artifact 上传慢**:local file backend OK;S3 时大模型几 GB 上传 几分钟。`mlflow.log_artifact` 阻塞 train script。后台 thread 异步。 2. **autolog 误 log 整张 dataframe**:默认 sklearn autolog 会 log X_train shape + 部分 sample。私密数据可能进 MLflow → 安全隐患。 `mlflow.sklearn.autolog(log_input_examples=False)`。 3. **Model registry 没强制 stage gating**:任何人能把 dev model 推 Production。生产建 ACL + reviewer 流程。 4. **跨 Python 版本 model 加载失败**:在 3.10 训练的 sklearn model 在 3.12 load 时 unpickle 错。MLflow log model 时 capture 了 python_env.yaml,确认部署机器装对版本。 5. **UI 慢**:实验数量 > 几万后 list 慢。定期 archive 老 experiment 到 `s3://archived/`。

LangSmith 调试 LLM agent:把每个 prompt / 工具调用都看清楚

## 起因 写了一个 LangChain agent 帮用户查数据库 + 写 SQL + 解释结果, 跑起来时不时给出乱七八糟的答案。问题可能出在: - 哪一步的 prompt 让 LLM 跑偏? - 调了什么 tool、tool 返回了什么? - 重试了几次? - 哪段花了最多 token / 最长时间? `print(intermediate_steps)` 看不出来。LangSmith 是 LangChain 团队的可观测平台, 自动把 chain / agent 的每一次执行都记下来,UI 时间线展开看。 ## 解决方案 ### 注册 + 装 注册 [smith.langchain.com](https://smith.langchain.com)(免费层个人项目够), 拿 API key。 ```bash uv add langsmith export LANGCHAIN_TRACING_V2=true export LANGCHAIN_API_KEY=ls__xxxxxxxx export LANGCHAIN_PROJECT=my-agent-debug ``` **就这样**。LangChain 自动把所有 chain / agent 执行 trace 上报到 LangSmith。 不需要改任何代码。 ### 看 trace 进 LangSmith UI → 选 project → 看 trace 列表: ``` Run name Duration Tokens Status sql_agent.invoke 5.3s 2,341 success sql_agent.invoke 12.1s 8,902 error ... ``` 点进任意 run: ``` └─ sql_agent (5.3s, 2341 tokens) ├─ planner (1.2s, 540 tokens) │ └─ ChatOpenAI (1.1s, 540 tokens) │ ▶ prompt: "You are a SQL planner. Decide..." │ ◀ output: {"action": "run_sql", "sql": "SELECT ..."} ├─ tool: run_sql (0.3s) │ ▶ input: SELECT count(*) FROM users WHERE ... │ ◀ output: [{"count": 142}] └─ responder (3.8s, 1801 tokens) └─ ChatOpenAI (3.7s, 1801 tokens) ▶ prompt: "Given the query result, answer..." ◀ output: "总共有 142 个符合条件的用户。" ``` 每一层展开看完整 prompt + completion + tokens + latency。 ### 找出"为什么这次出错" filter "status=error" 看所有失败 run。点进去: ``` └─ sql_agent (failed at responder) ├─ planner ✓ ├─ run_sql ✗ (ERROR: column "user_typ" does not exist) └─ ... ``` 清楚看到 SQL agent 拼错列名(应该是 user_type)。回头改 prompt 加 schema 提示。 ### A/B 对比 prompt LangSmith 有 "Datasets" + "Compare experiments": 1. 创建 dataset:10-20 个典型 query + 期望答案 2. 跑 prompt v1:`run_on_dataset(dataset_name, prompt_v1_chain)` 3. 跑 prompt v2:同上 4. UI 对比每个 input 上 v1 vs v2 的输出 + 自动 eval 分数 ```python from langsmith import Client client = Client() dataset = client.create_dataset('sql_agent_eval') client.create_example( inputs={'query': '过去 7 天注册的用户数'}, outputs={'expected': '~150'}, dataset_id=dataset.id, ) from langchain.smith import RunEvalConfig client.run_on_dataset( dataset_name='sql_agent_eval', llm_or_chain_factory=lambda: sql_agent_v2, evaluation=RunEvalConfig(evaluators=['qa', 'context_qa']), ) ``` LangSmith 自动用 GPT-4 当 judge 评分。 ### prompt hub LangChain Hub 集成在 LangSmith: ```python from langchain import hub prompt = hub.pull('rlm/rag-prompt') ``` 社区 prompt 模板,pull + 改自己版本 + push 回去(你的私有 namespace)。 ### 用于 production 监控 不止 dev 用: ```python import os os.environ['LANGCHAIN_TRACING_V2'] = 'true' os.environ['LANGCHAIN_PROJECT'] = 'prod' # 区分环境 ``` production 上跑的每个 trace 都收集。看: - 每天调用量 - 每个 chain 的 P50 / P95 延迟 - token 消耗趋势 - error 率 + 错误类型分布 可以接 alert:当 error rate > 5% 邮件通知。 ## 与替代品对比 | | LangSmith | Langfuse | Phoenix (Arize) | |---|---|---|---| | 开源 / 自托管 | ❌(cloud 为主,自托管要 enterprise) | ✅ 全开源 | ✅ | | 与 LangChain 集成 | 最原生 | 中 | 中 | | 与 LlamaIndex | 中 | 中 | 强 | | eval 框架 | ✅ | ✅ | ✅ | | 价格 | 免费 5k traces/月 | 完全免费(自托管) | 免费层 | LangChain 项目用 LangSmith 最方便;不想绑定平台用 Langfuse 自托管。 ## 效果 - agent 失败率 12% → 4%,靠看 trace 改 prompt 一周搞定 - "为什么这次跑出 X" 的问题从"猜 + 加 print"变成 "去 LangSmith 看一下" - 找到一个 prompt 让 token 消耗减半(trace 里看到 LLM 反复重复同样 上下文) - 团队 review prompt 改动有了客观依据(运行某 dataset 对比 v1/v2 分数) ## 踩过的坑 1. **traces 含敏感数据上 cloud**:用户邮箱 / 手机号都进 prompt 时 隐私问题。开 `LANGCHAIN_TRACING_V2=false` 临时关,或者 enterprise 自托管。 2. **大批量 run 上报慢**:默认同步上报,每个 run 加 50-100ms。设 `LANGCHAIN_CALLBACKS_BACKGROUND=true` 异步上报。 3. **trace 嵌套太深**:复杂 agent 数十层调用,UI 加载慢。用 tag / metadata 标记关键步骤再筛选。 4. **eval 用 GPT-4 评分**:成本可能比被评模型还高。先小 dataset 验证 eval 设置对,再扩规模。 5. **本地 LLM trace**:用 Ollama / vLLM 跑本地模型时,把 ChatOpenAI 的 base_url 改本地即可,trace 一样上报。注意 token 计数对本地 模型不准。

PyTorch 训练 OOM 排查:activation checkpoint / 梯度累积 / offload

## 起因 要 fine-tune 一个 7B 模型,A100 40GB 显存,跑起来直接 CUDA OOM。 "换大卡"是简单解决但贵。理解几个技术能在同样显存里训更大模型 / 更大 batch。 ## 各项的显存占用拆解 训练时显存 ≈ 模型权重 + 梯度 + optimizer state + activations + 临时 buffer。以 7B FP16 模型 + AdamW 为例: | 项 | 公式 | 7B 模型 | |---|---|---| | 权重 | params × 2 bytes (fp16) | 14 GB | | 梯度 | params × 2 bytes | 14 GB | | optimizer state(AdamW) | params × 8 bytes (FP32 m+v) | 56 GB | | activations | 依 batch / seq | 几 GB-几十 GB | **总 = 84 GB + activations**。一张 A100 40GB 远不够。 ## 解决方案逐个上 ### 1. 混合精度(FP16/BF16)— 必选 ```python # pure PyTorch scaler = torch.cuda.amp.GradScaler() for batch in loader: with torch.cuda.amp.autocast(dtype=torch.bfloat16): loss = model(batch).loss scaler.scale(loss).backward() scaler.step(opt); scaler.update() ``` 权重 / 梯度从 FP32 4 bytes → FP16/BF16 2 bytes,对半省。 A100+ 推荐 BF16(无需 grad scaler,数值更稳)。 ### 2. Gradient Checkpointing — 用计算换显存 normal 前向把所有 activations 都存着(反向用)。checkpointing 只保存 某几层,其它 layer 反向时重新算前向: ```python model.gradient_checkpointing_enable() # transformers 一行 ``` 省 activations 50-80%,代价是训练慢 ~20-30%。LLM fine-tune 默认开。 ### 3. Gradient Accumulation — 模拟更大 batch 显存装不下 batch=32?跑 batch=8 累 4 次 = batch=32 等效: ```python accum_steps = 4 for i, batch in enumerate(loader): loss = model(batch).loss / accum_steps loss.backward() if (i + 1) % accum_steps == 0: opt.step(); opt.zero_grad() ``` 显存等同 batch=8,效果近似 batch=32。 ### 4. CPU offload(DeepSpeed / accelerate) 把 optimizer state 卸到 CPU 内存,反正它不参与每步前反向: ```python from accelerate import Accelerator acc = Accelerator( mixed_precision='bf16', gradient_accumulation_steps=4, ) model, opt, loader = acc.prepare(model, opt, loader) ``` 或用 DeepSpeed ZeRO-2 / ZeRO-3: ```python # accelerate config 选 DeepSpeed # 跑: accelerate launch --num_processes=1 \ --mixed_precision=bf16 \ --deepspeed_stage=2 \ train.py ``` ZeRO-2 把 optimizer state 分片(多卡时)/ offload 到 CPU(单卡时), 省 56 GB → 0 GB(cpu 接管)。代价:每 step 数据传输延迟。 ### 5. LoRA / QLoRA — 只训一小部分参数 ```python from peft import LoraConfig, get_peft_model config = LoraConfig( r=8, lora_alpha=16, lora_dropout=0.05, target_modules=['q_proj', 'v_proj'], task_type='CAUSAL_LM', ) model = get_peft_model(base_model, config) model.print_trainable_parameters() # trainable: 4.2M / 7B = 0.06% ``` 只有 LoRA 的小矩阵需要梯度 + optimizer state。7B 模型变成"7B 冻结 + 4M 可训",显存暴跌。 QLoRA 进一步把 base model 也量化到 4-bit: ```python from transformers import BitsAndBytesConfig bnb = BitsAndBytesConfig( load_in_4bit=True, bnb_4bit_quant_type='nf4', bnb_4bit_compute_dtype=torch.bfloat16, ) model = AutoModelForCausalLM.from_pretrained(name, quantization_config=bnb) model = get_peft_model(model, config) ``` 7B QLoRA 单 A100 40GB 训 batch=4 可以跑得动。 ## 效果(我的 case:A100 40GB fine-tune Qwen2 7B) | 配置 | 显存 | 训练速度 | 效果损失 | |---|---|---|---| | FP32 full ft | OOM | — | — | | BF16 full ft | OOM (~80 GB) | — | — | | BF16 + grad checkpoint | OOM (~50 GB) | — | — | | BF16 + checkpoint + ZeRO-2 cpu offload | 32 GB | 1x | 0 | | BF16 + LoRA | 24 GB | 1.3x | 微小 | | BF16 + QLoRA | 14 GB | 1.2x | 1-2% | 最终 QLoRA 跑通 fine-tune,loss 收敛、benchmark 比 base 提升 8%。 ## 调试技巧 ```python # 看每层显存 print(torch.cuda.memory_summary()) # 最大峰值 print(f'peak: {torch.cuda.max_memory_allocated() / 1e9:.2f} GB') torch.cuda.reset_peak_memory_stats() # 实时监控 nvidia-smi -l 1 # 或更细: nvtop ``` 跑 OOM 时立刻 `nvidia-smi` 看到底是 model load 时挂了还是 forward 时挂了, 对症下药。 ## 踩过的坑 1. **`del var` 不立刻释放**:PyTorch caching allocator 不还给 OS。 `torch.cuda.empty_cache()` 也只是把 cached block 让出来,**不会** 实际减少 OS 看到的进程显存。 2. **DataLoader pin_memory + num_workers 大**:每个 worker 一份 GPU 显存映射。OOM 时先减 `num_workers`。 3. **eval 不开 no_grad**:评估时没 `with torch.no_grad():`,accidentally build 完整 computation graph,显存翻倍。 4. **多个模型同时 load**:base model + LoRA + reward model 一起在 GPU 上时,DPO / RLHF 训练显存压力极大。把 reward model 量化或 freeze 后丢 CPU。 5. **使用 `compute_dtype=torch.float16` + Adam**:fp16 + Adam 数值 不稳定。一律 `bf16` 或者 fp32 master weight(mixed precision)。

用 ONNX Runtime 部署 PyTorch 模型(CPU / GPU 通用、跨语言)

训练用 PyTorch 灵活,部署到生产时通常希望: - 没有 PyTorch 100+ MB 依赖 - 跨语言(C++ / Go / JS / Java 都能加载) - CPU / GPU 都能跑 - 性能更好(融合算子) ONNX 是开放神经网络交换格式,ONNX Runtime 是 Microsoft 的高性能推理引擎。 PyTorch 训练完导出 ONNX,运行时用 ORT 加载。 ## 装 ```bash uv add torch onnx onnxruntime # GPU 推理: uv add onnxruntime-gpu ``` ## 1. 导出 PyTorch 模型为 ONNX ```python import torch from your_model import Net model = Net() model.load_state_dict(torch.load('mnist.pt')) model.eval() # 一个 dummy 输入用于 trace dummy = torch.randn(1, 1, 28, 28) torch.onnx.export( model, dummy, 'mnist.onnx', input_names=['input'], output_names=['logits'], dynamic_axes={ 'input': {0: 'batch'}, # batch 维度可变 'logits': {0: 'batch'}, }, opset_version=17, ) ``` `dynamic_axes` 让导出的模型支持任意 batch size,否则固定为 dummy 的形状。 ## 2. 校验导出正确 ```python import onnx m = onnx.load('mnist.onnx') onnx.checker.check_model(m) print(onnx.helper.printable_graph(m.graph)) ``` `check_model` 不报错就 OK。 ## 3. 推理(Python) ```python import onnxruntime as ort import numpy as np sess = ort.InferenceSession('mnist.onnx', providers=['CPUExecutionProvider']) # GPU: providers=['CUDAExecutionProvider'] # 看输入输出 for i in sess.get_inputs(): print(f'input {i.name}: {i.shape} {i.type}') for o in sess.get_outputs(): print(f'output {o.name}: {o.shape} {o.type}') # 跑推理 x = np.random.rand(4, 1, 28, 28).astype(np.float32) logits = sess.run(['logits'], {'input': x})[0] pred = logits.argmax(axis=1) print(pred) ``` ## 4. 性能基准 ```python import time, numpy as np x = np.random.rand(1, 1, 28, 28).astype(np.float32) # warm up for _ in range(10): sess.run(['logits'], {'input': x}) t0 = time.time() for _ in range(1000): sess.run(['logits'], {'input': x}) print(f'avg latency: {(time.time()-t0)/1000*1000:.2f} ms') ``` 对比 PyTorch: ```python import torch model.eval() x = torch.randn(1, 1, 28, 28) with torch.no_grad(): for _ in range(10): model(x) t0 = time.time() for _ in range(1000): model(x) print(f'pytorch avg: {(time.time()-t0)/1000*1000:.2f} ms') ``` CPU 上 ONNX Runtime 通常比 PyTorch 快 1.5-3x(算子融合 + 简化 graph)。 ## 5. 性能优化 ```python sess = ort.InferenceSession( 'mnist.onnx', providers=['CPUExecutionProvider'], sess_options=ort.SessionOptions(), ) opts = ort.SessionOptions() opts.intra_op_num_threads = 4 opts.inter_op_num_threads = 1 opts.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL opts.execution_mode = ort.ExecutionMode.ORT_SEQUENTIAL sess = ort.InferenceSession('mnist.onnx', sess_options=opts, providers=['CPUExecutionProvider']) ``` 对于 transformer / 大模型,`enable_profiling=True` 让 ORT 输出每个算子的耗时 帮助找瓶颈。 ## 6. 多 provider ```python # 优先 GPU,没有就 CPU sess = ort.InferenceSession( 'mnist.onnx', providers=['CUDAExecutionProvider', 'CPUExecutionProvider'] ) print(sess.get_providers()) ``` NVIDIA:CUDAExecutionProvider / TensorrtExecutionProvider Apple:CoreMLExecutionProvider AMD:ROCMExecutionProvider Intel:OpenVINOExecutionProvider ## 7. 量化(减小模型 + 加速 CPU 推理) ```python from onnxruntime.quantization import quantize_dynamic, QuantType quantize_dynamic('mnist.onnx', 'mnist.int8.onnx', weight_type=QuantType.QInt8) ``` INT8 量化通常 2-4x 推理加速 + 模型大小 1/4。精度损失对 ResNet / 简单 CNN 很小(< 1% 准确率),对 BERT 类需要 calibration 复杂些。 ## 8. C++ 推理 ```cpp #include "onnxruntime_cxx_api.h" Ort::Env env(ORT_LOGGING_LEVEL_WARNING, "test"); Ort::SessionOptions opts; Ort::Session session(env, "mnist.onnx", opts); // ... 准备输入 tensor ... auto output = session.Run(...); ``` 完整 C++ 例子在 ONNX Runtime 仓库。集成到 C++ 服务里完全摆脱 Python 依赖。 ## 9. 浏览器推理(onnxruntime-web) ```html <script src="https://cdn.jsdelivr.net/npm/onnxruntime-web/dist/ort.min.js"></script> <script> const session = await ort.InferenceSession.create('mnist.onnx'); const input = new ort.Tensor('float32', data, [1, 1, 28, 28]); const results = await session.run({ input: input }); console.log(results.logits.data); </script> ``` WebGPU / WebGL / wasm 多后端自动选。小模型直接跑浏览器,数据不离开 用户设备。 ## 10. 部署模式比较 | 工具 | 适合 | |---|---| | ONNX Runtime | 跨语言、跨平台、单进程 | | TorchServe | 多 PyTorch 模型微服务 | | Triton Inference Server | NVIDIA GPU 多模型高并发 | | BentoML | Python 服务封装(含监控 / 队列) | | vLLM | LLM 专用(PagedAttention) | ONNX Runtime 是"最通用最简单"那档;要更高级 ops(动态 batching、 gpu 调度)上 Triton。 ## 踩过的坑 - 导出失败 "Unsupported ONNX opset version":升 `opset_version` 或者 降 PyTorch 中用的 op(替换 custom op)。 - 导出后形状对但数值差:训练时 BatchNorm 等 running stats 没保存好, 确保 `model.eval()` 后再 export。 - Dynamic axes 没写:onnx 模型固定 batch=1,部署时 batch=N 直接报错。 - ONNX 模型文件很大(含权重):考虑用 `onnx.save_model(m, ..., save_as_external_data=True)` 把权重分离存储,便于 CDN / 分发。

LLM prompt engineering 实战 6 个 pattern(少花钱 + 多对题)

## 起因 接了一个"自动分类客服工单" 的任务。最 naive prompt: ``` 分类下面的客服请求到一个类别: {ticket} ``` 效果:60% 正确率 + 经常输出无关解释 + 偶尔编造类别。 调了几轮 prompt 后到 92%,token 使用减半。下面是几个真正提分的 pattern,不是"魔法咒语"。 ## Pattern 1: 明确角色 + 任务 + 输出格式 ``` 你是客服工单分类专家。 任务:把客户请求归到下面一个类别。 类别(严格选其中一个): - billing (付款 / 发票 / 退款) - bug (产品故障 / 异常报错) - feature_request (希望新增功能) - account (账号登录 / 权限 / 密码) - other (上面都不对) 输出格式: {"category": "<类别名>", "confidence": <0-1>} 不要输出任何额外解释。 客户请求: {ticket} ``` 为什么有效: - **角色**:把模型从"通用助手" 引导到"分类专家"语境 - **限制类别**:明确告诉它可选范围,减少幻觉 - **JSON 输出**:好解析 + 没散文废话 - **"不要输出额外解释"**:明确禁止前/后缀闲话 立刻从 60% → 80%。 ## Pattern 2: Few-shot 例子 光描述类别不够,给具体例子: ``` [上面的 prompt...] 例子: 输入:「我的卡被扣了两次,请退一次」 输出:{"category": "billing", "confidence": 0.95} 输入:「点击保存按钮后页面卡死」 输出:{"category": "bug", "confidence": 0.9} 输入:「能不能加个深色模式?」 输出:{"category": "feature_request", "confidence": 0.95} 输入:{ticket} 输出: ``` 3-5 个例子覆盖"边界 / 容易混淆" 的情况效果最好。 "为什么这个分到 billing 而不是 account" 类的边界 case 用例子讲清楚。 → 80% → 90%。 ## Pattern 3: Chain of thought(思考过程) 对推理类任务(不是简单分类)让模型先"想"再答: ``` 你是数学题解题专家。 题目:一个商店周一卖了 23 件商品,周二是周一的 1.5 倍, 周三是周一周二之和的一半。三天总共卖了多少? 要求: 1. 先逐步推理(标 [思考]) 2. 再给最终答案(标 [答案]) ``` 模型输出: ``` [思考] - 周一:23 件 - 周二:23 × 1.5 = 34.5 件 → 取整 35(实际 35 才合理) - 周三:(23 + 35) / 2 = 29 件 - 总:23 + 35 + 29 = 87 件 [答案] 87 ``` 直接问"答案是?"很多时候算错。让它写推理过程后准确率显著上升。 适合:数学 / 推理 / 多步逻辑。 不适合:简单 lookup / 分类(CoT 浪费 token)。 ## Pattern 4: Structured output (JSON mode / function call) 不要让 LLM 用 markdown 包 JSON 后让你正则提取——浪费 token + 不稳定。 直接用 API 的 structured output: ```python from openai import OpenAI import json client = OpenAI() resp = client.chat.completions.create( model='gpt-4o', messages=[{'role': 'user', 'content': prompt}], response_format={ 'type': 'json_schema', 'json_schema': { 'name': 'ticket_classification', 'schema': { 'type': 'object', 'properties': { 'category': { 'type': 'string', 'enum': ['billing', 'bug', 'feature_request', 'account', 'other'], }, 'confidence': {'type': 'number', 'minimum': 0, 'maximum': 1}, }, 'required': ['category', 'confidence'], 'additionalProperties': False, }, 'strict': True, } } ) result = json.loads(resp.choices[0].message.content) ``` `strict: true` 让模型在 decoding 时只产生 schema 合法 token。 **100% 合规 JSON**,从此不需要 try/except parse。 Anthropic 用 tool use / Gemini 用 response_schema 同理。 ## Pattern 5: 减少 token = 省钱 + 快 token 不仅是钱,还是延迟。每个 token 大模型 50-200ms。 技巧: 1. **删冗余形容词**:"请帮忙仔细认真地分析下面..." → "分析:" 2. **缩短例子**:3 个例子够时不放 10 个 3. **不重复 system + user**:system 里写完类别后 user 不重复 4. **truncate 长输入**:保留 ticket 前 1000 字符(多数信号在开头) 5. **batch 处理**:5 个工单一起喂("分析下面 5 个 ticket")省 system token ```python # 单条 vs batch single_cost = (system_tokens + ticket_tokens + output) * N batch_cost = (system_tokens + N * ticket_tokens + N * output) # batch 省的是 N 份 system tokens ``` 我们 batch=10 后 token 量降 30%。注意 batch 太大模型 attention 分散 精度反而降,5-15 是甜点。 ## Pattern 6: 自检 / verifier loop 对关键任务,跑两次让另一个 prompt 验证: ```python # Pass 1: 主分类 result = classify(ticket) # Pass 2: 验证 verify_prompt = f""" 有人把这个工单分类为 '{result['category']}'。是否合理? 工单:{ticket} 输出 JSON: {{"agree": true|false, "reason": "..."}} """ verify = llm(verify_prompt) if not verify['agree']: # 退到人工 review 队列 flag_for_human(ticket, result, verify['reason']) ``` cost 翻倍但低置信度 ticket 被人工接管,整体准确率上 96%+。 ## 一些反 pattern ### ❌ 写一堆"必须 / 不要 / 绝对" ``` 你必须按下面规则。 你绝对不能输出 X。 你一定要返回 JSON。 你不允许加任何解释。 你必须用中文。 ``` 模型对负面指令响应一般。改成正面: ``` 输出只包含 JSON。返回的语言是中文。 ``` ### ❌ 提供模糊定义 ``` 分类为: - 重要的问题 - 不重要的问题 ``` "重要" 没定义 → 模型自己猜 → 不稳定。 给具体标准 + 例子。 ### ❌ "请尽量准确" 这种废话 模型不会因为你 polite 就更努力。直接指令。 ### ❌ 给"超过 100k token" 的 context 大 context 时模型有"middle lost in the haystack" 效应——中间的信息 被忽略。能 chunk + RAG 的就别一次塞。 ## 调试 / 评估 写个 eval set(30-100 个 ground truth 例子): ```python test_cases = [ {'ticket': '...', 'expected': 'billing'}, {'ticket': '...', 'expected': 'bug'}, ... ] correct = 0 for tc in test_cases: pred = classify(tc['ticket']) if pred['category'] == tc['expected']: correct += 1 print(f'accuracy: {correct/len(test_cases):.2%}') ``` 每改 prompt 跑一遍 eval。比"感觉好像准了" 客观。 用 LangSmith / Promptfoo / weave 等工具系统化跑 A/B prompt 对比。 ## 模型选择 不同任务用不同模型: | 任务 | 推荐 | |---|---| | 简单分类 / 提取 | gpt-4o-mini / claude-haiku(便宜 + 快) | | 推理 / 代码 | gpt-4o / claude-sonnet-4-5 | | 极致难推理 | o1 / claude opus-4-7 / DeepSeek-R1 | | 本地隐私 | qwen2.5:14b / llama3.1:8b(Ollama) | 强行用大模型做简单分类 = 浪费钱。 ## 效果 工单分类项目最终: - 准确率 60% → 92% (+ verifier 后 96%) - 单 ticket 成本 $0.012 → $0.003(batch + 改 mini) - P95 延迟 5s → 1.2s(mini + structured output) - 总 month cost 从 $800 → $150 ## 踩过的坑 1. **改 prompt 不跑 eval**:"感觉变好了"是偏见。每次改后跑测试集 验证。 2. **prompt 越改越长**:加 patch 修 case → 长 prompt → 模型 confused。 定期 refactor 简化。 3. **生产环境模型版本变**:OpenAI / Anthropic 偶尔 silent 升级模型, prompt 行为变化。pin model name 包括 date suffix (`gpt-4o-2024-08-06`) + 监控 metrics。 4. **temperature**:分类 / 提取类 task 设 0 (deterministic)。 创意写作设 0.7-1.0。 5. **隐私数据**:不要把客户 PII 直接发 OpenAI。先 mask 邮箱 / 手机 / 信用卡再调 API。

用 bge-m3 自托管 embedding 服务(替代 OpenAI text-embedding API)

## 起因 我们的 RAG 系统每天调 OpenAI `text-embedding-3-small` 几十万次,账单 $300+/月。而且内部知识库不能出公网。bge-m3 是智源开源的多语言 embedding 模型,质量在 MTEB 中文榜上经常排前 3,比 OpenAI 3-small 还好。 本地跑一张 4090 就够。 ## 解决方案:FastAPI 包一层 OpenAI 兼容接口 让现有用 OpenAI Python SDK 的代码改一行 base_url 就能切。 ### 装 ```bash uv add fastapi 'uvicorn[standard]' sentence-transformers # 第一次启动会下载 bge-m3 模型 (~1.2 GB) ``` ### service.py ```python import logging from contextlib import asynccontextmanager from typing import List import torch from FlagEmbedding import BGEM3FlagModel from fastapi import FastAPI, HTTPException from pydantic import BaseModel log = logging.getLogger('embedding') class EmbeddingRequest(BaseModel): input: str | List[str] model: str = 'bge-m3' encoding_format: str | None = 'float' class EmbeddingData(BaseModel): object: str = 'embedding' embedding: List[float] index: int class EmbeddingResponse(BaseModel): object: str = 'list' data: List[EmbeddingData] model: str usage: dict model: BGEM3FlagModel | None = None @asynccontextmanager async def lifespan(app: FastAPI): global model log.info('loading bge-m3 ...') model = BGEM3FlagModel('BAAI/bge-m3', use_fp16=torch.cuda.is_available()) log.info('ready') yield app = FastAPI(lifespan=lifespan) @app.post('/v1/embeddings', response_model=EmbeddingResponse) def embed(req: EmbeddingRequest): if model is None: raise HTTPException(503, 'model loading') texts = [req.input] if isinstance(req.input, str) else req.input if not texts: raise HTTPException(400, 'empty input') if len(texts) > 256: raise HTTPException(400, 'batch too large') out = model.encode(texts, batch_size=32, max_length=8192, return_dense=True)['dense_vecs'] data = [EmbeddingData(embedding=vec.tolist(), index=i) for i, vec in enumerate(out)] total_tokens = sum(len(t) for t in texts) // 4 # 粗估 return EmbeddingResponse( data=data, model='bge-m3', usage={'prompt_tokens': total_tokens, 'total_tokens': total_tokens}, ) @app.get('/health') def health(): return {'ok': model is not None} ``` ### 起服务 ```bash uv run uvicorn service:app --host 0.0.0.0 --port 8001 # 或生产: uv run gunicorn -k uvicorn.workers.UvicornWorker \ -w 1 -b 0.0.0.0:8001 service:app ``` 注意 `-w 1`:embedding 是 GPU 密集,多 worker 抢一张卡反而慢。 水平扩容用多机或者把模型放多张卡。 ### 客户端:OpenAI SDK 直接用 ```python from openai import OpenAI client = OpenAI( base_url='http://localhost:8001/v1', api_key='local', # 不校验,随便填 ) resp = client.embeddings.create( input=['你好世界', 'Hello world', '今日天气不错'], model='bge-m3', ) for i, d in enumerate(resp.data): print(f'{i}: dim={len(d.embedding)} first 5={d.embedding[:5]}') ``` ## 性能 我的 RTX 4090 上: - batch=32 文本(每条 ~200 字):~80ms(≈ 400 texts/s 持续) - batch=1:~20ms P95 延迟 < 100ms 在企业 RAG 场景完全够。OpenAI 3-small 平均 200-500ms (网络),自托管反而更快。 ## 效果 - 月度 embedding 账单 $300 → $0 - P95 延迟 320ms → 80ms(无公网往返) - 内部敏感文档不出公网 - bge-m3 中文 MTEB 评测比 text-embedding-3-small 高 5-8 个点 ## 踩过的坑 1. **第一次冷启动慢**:bge-m3 模型 1.2 GB 从 HuggingFace 拉。在国内 设 `HF_ENDPOINT=https://hf-mirror.com`。 2. **OpenAI SDK 严格校验维度**:bge-m3 输出 1024 维,OpenAI 3-small 是 1536。如果客户端代码硬编码 1536 校验,要么改客户端要么改 model 多输出 padding。 3. **`use_fp16=True` 在某些老 GPU 数值溢出**:T4 / 老 CUDA 上偶尔 NaN。 降到 `use_fp16=False` 慢 30% 但稳。 4. **batch 太大显存爆**:bge-m3 max_length=8192,batch=64 × 8192 token 可能 OOM。生产 `batch_size=32` + `max_length=512`(多数 chunk 这个 长度够用)。 5. **multi-process 共享 GPU 模型不共享内存**:每个 worker 各自 load 一份 1.2GB。`-w 1` 单 worker 是正确选择;要并发用 async + 内部 batch 合并请求。

用 Weights & Biases (wandb) 跟踪 ML 实验(替代手抄表格)

跑 ML 实验最容易混乱的是"我那个 lr=0.001 + dropout=0.3 的 run 是哪天哪份代码?" wandb 自动记录:超参、loss / metric 曲线、代码 git hash、系统资源、模型权重。 免费层个人项目无限。 ## 安装 + 注册 ```bash uv add wandb wandb login # 浏览器打开,复制 API key 进来 # 或:export WANDB_API_KEY=... ``` ## 最小集成(5 行代码) ```python import wandb wandb.init( project='mnist-cnn', config={ 'lr': 0.001, 'batch_size': 128, 'epochs': 5, 'dropout': 0.25, 'model': 'cnn-v1', }, ) # 训练循环里 for epoch in range(5): train_loss, train_acc = train_epoch() val_loss, val_acc = evaluate() wandb.log({ 'train/loss': train_loss, 'train/acc': train_acc, 'val/loss': val_loss, 'val/acc': val_acc, 'epoch': epoch, }) wandb.finish() ``` 跑一下 `python train.py`,wandb 打印一个 URL,打开就能看到实时曲线。 ## config 优先级 ```python # 1. 默认在代码里 wandb.init(config={'lr': 0.001}) # 2. 命令行覆盖(用 argparse / typer / hydra) import argparse p = argparse.ArgumentParser() p.add_argument('--lr', type=float, default=0.001) args = p.parse_args() wandb.init(config=vars(args)) # 3. Sweep 时由 wandb 注入 config = wandb.config # 读,不能写 lr = config.lr ``` ## Sweep:自动超参搜索 ```yaml # sweep.yaml program: train.py method: bayes metric: name: val/acc goal: maximize parameters: lr: distribution: log_uniform_values min: 1e-5 max: 1e-2 dropout: values: [0.1, 0.25, 0.5] batch_size: values: [64, 128, 256] ``` ```bash wandb sweep sweep.yaml # Output: wandb agent yourname/project/sweep_id wandb agent yourname/project/sweep_id --count 20 ``` 跑 20 个实验,自动按 Bayesian / grid / random 选超参。 多台机器并行:每台跑 `wandb agent ...` 共享同一个 sweep。 ## 记录媒体 ```python # 图片 import matplotlib.pyplot as plt fig, ax = plt.subplots() ax.plot(losses) wandb.log({'curve': wandb.Image(fig)}) # 表格 wandb.log({ 'predictions': wandb.Table( columns=['image', 'pred', 'truth'], data=[[wandb.Image(x), p, y] for x, p, y in samples], ) }) # 直方图 wandb.log({'grad_norm': wandb.Histogram(grad_norms)}) # 视频 / 音频 wandb.log({'video': wandb.Video('output.mp4')}) ``` ## 保存模型(Artifact) ```python artifact = wandb.Artifact(name='mnist-cnn', type='model') artifact.add_file('checkpoints/best.pt') wandb.log_artifact(artifact) ``` 模型权重 + 元数据存在 wandb 服务端(免费层有配额)。后续加载: ```python api = wandb.Api() artifact = api.artifact('yourname/project/mnist-cnn:latest') artifact.download(root='./model') ``` ## 监控系统资源 `wandb` 自动记录: - GPU 利用率 / 显存 - CPU / 内存 - 磁盘 / 网络 - Python 进程 无需任何代码,看 dashboard 的 "System" 标签页。 ## 代码 + git 状态 `wandb` 自动 capture: - 当前 git commit hash - 未提交的 diff(不要在没干净 commit 时跑实验!) - 命令行参数 - Python 版本 + 包列表 回放某次 run:知道用了哪份代码 + 哪个数据。 ## 离线模式 无网时: ```bash wandb offline # 或:export WANDB_MODE=offline python train.py # 数据存在本地 ./wandb/ # 之后有网时 wandb sync ./wandb/offline-run-* ``` CI / 内网集群里非常有用。 ## 与 PyTorch Lightning / HuggingFace 集成 ```python # PyTorch Lightning from pytorch_lightning.loggers import WandbLogger logger = WandbLogger(project='myproj') trainer = Trainer(logger=logger) # HuggingFace Transformers training_args = TrainingArguments( report_to='wandb', run_name='bert-finetune-v3', ... ) ``` 不用手写 log,框架自动调 wandb。 ## 团队协作 - **Project** 是大目录(按代号 / 任务) - **Run** 是单次实验 - **Group** 让你把同一个 sweep / 同一组对比放一起 - **Tags** 给 run 打标签(baseline / experiment / final) ```python wandb.init( project='myproj', group='ablation-dropout', tags=['final', 'cnn-v2'], notes='increased lr to 5e-3 to test convergence speed', ) ``` ## 数据 dashboard dashboard 默认按时间线显示。常用功能: - **Reports**:把多个 run 拖到一份"报告"里,加文字 + 自动同步图表, 当作"项目周报"或 paper 草稿 - **Parallel Coordinates**:可视化超参 → metric 的关系, 找哪个超参影响最大 - **Compare runs**:勾几个 run 一起看曲线 / config diff ## 隐私 / 自托管 wandb cloud 免费个人无限,团队收费。如果数据敏感不能上 cloud: ```bash # 自托管 wandb server(社区版免费) docker run -p 8080:8080 wandb/local ``` Python 端: ```python wandb.init(project='...', host='https://wandb.your-company.com') ``` ## 替代方案 - **MLflow**:开源,自托管简单。tracking + model registry,UI 朴素 - **TensorBoard**:本地用,无云端 - **Comet / Neptune**:商业产品类似 - **Aim**:开源极简,无云 个人项目 wandb 最快;公司里数据合规要求高用 MLflow。 ## 踩过的坑 - 忘 `wandb.finish()` —— 长 run 退出后 wandb sync 一直挂着。 脚本最后必须 `finish()` 或者用 `with wandb.init(...) as run:`。 - 每次 `wandb.log` 都立刻发到云端 → 训练时大量请求可能拖慢。 设 `commit=False` 累积后批量发:`wandb.log({...}, commit=False)`, 最后 `wandb.log({}, commit=True)`。 - 在 jupyter 里 wandb.init 多次:会创建多个 run。每次重启 kernel 之前先 `wandb.finish()`。 - artifact 配额:免费个人 100GB,超了不能再传。定期清老 artifact。