知识广场
按学科筛选:计算机科学 / 人工智能 / 机器学习
«计算机科学 / 人工智能 / 机器学习» 分类下共 23 篇帖子
## 起因 裸 PyTorch 写一个像样的训练循环要处理:device 切换 / `.train()/.eval()` / gradient zero / loss accumulation / lr scheduler step / checkpoint 保存 / early stopping / 多 GPU DDP 启动 / 混合精度 / logging。 一个研究 notebook 反复抄这些代码很烦,还容易写错(忘 `optimizer.zero_grad()` 或者 `.eval()` 是经典的)。 PyTorch Lightning 把"工程脚手架"和"模型逻辑"分开:你只写 `training_step` / `validation_step` / `configure_optimizers`,其它由 framework 处理。 ## 解决方案 ```bash uv add lightning torchvision ``` ```python import lightning as L import torch import torch.nn.functional as F from torch.utils.data import DataLoader from torchvision import datasets, transforms class MNIST(L.LightningModule): def __init__(self, lr=1e-3): super().__init__() self.save_hyperparameters() self.conv = torch.nn.Sequential( torch.nn.Conv2d(1, 32, 3, padding=1), torch.nn.ReLU(), torch.nn.MaxPool2d(2), torch.nn.Conv2d(32, 64, 3, padding=1), torch.nn.ReLU(), torch.nn.MaxPool2d(2), torch.nn.Flatten(), torch.nn.Linear(64*7*7, 128), torch.nn.ReLU(), torch.nn.Linear(128, 10), ) def forward(self, x): return self.conv(x) def training_step(self, batch, batch_idx): x, y = batch logits = self(x) loss = F.cross_entropy(logits, y) acc = (logits.argmax(1) == y).float().mean() self.log_dict({'train/loss': loss, 'train/acc': acc}, prog_bar=True) return loss def validation_step(self, batch, batch_idx): x, y = batch logits = self(x) loss = F.cross_entropy(logits, y) acc = (logits.argmax(1) == y).float().mean() self.log_dict({'val/loss': loss, 'val/acc': acc}, prog_bar=True) def configure_optimizers(self): opt = torch.optim.Adam(self.parameters(), lr=self.hparams.lr) sched = torch.optim.lr_scheduler.CosineAnnealingLR(opt, T_max=10) return [opt], [sched] def main(): transform = transforms.Compose([ transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]) train = DataLoader(datasets.MNIST('./data', train=True, download=True, transform=transform), batch_size=128, num_workers=4, shuffle=True) val = DataLoader(datasets.MNIST('./data', train=False, transform=transform), batch_size=512, num_workers=4) trainer = L.Trainer( max_epochs=5, accelerator='auto', # cuda / mps / cpu 自动选 devices='auto', # 多卡时自动用全部 precision='16-mixed', # 混合精度 callbacks=[ L.pytorch.callbacks.EarlyStopping(monitor='val/loss', patience=3), L.pytorch.callbacks.ModelCheckpoint(monitor='val/acc', mode='max', save_top_k=2), ], logger=L.pytorch.loggers.TensorBoardLogger('logs', name='mnist'), ) trainer.fit(MNIST(), train, val) if __name__ == '__main__': main() ``` 50 行包含训练 + 验证 + 多卡 + 混合精度 + 早停 + checkpoint + TensorBoard。 ## 效果 - 单 GPU vs 4 GPU DDP 只改 `devices=4`,无需写 init_process_group 之类 - 混合精度只改 `precision='16-mixed'`,速度 ~1.5-2x,显存减 30% - TensorBoard `tensorboard --logdir logs` 看 loss / metric 曲线 - ModelCheckpoint 自动保存 val/acc 最高的 2 个 checkpoint - 中断后 `Trainer(resume_from_checkpoint=...)` 恢复 - 切换 wandb logger 一行:`L.pytorch.loggers.WandbLogger(project=...)` ## 踩过的坑 1. **`self.log` 不能放在 `forward` 里**:只能在 step 方法里。否则 batch_size 信息不对。 2. **DDP 时 `validation_step` 写了 print** → 多进程刷屏。用 `self.log` 或 `rank_zero_only` 装饰。 3. **`num_workers > 0` 时 macOS / Windows 死锁**:DataLoader 用 fork 策略。Mac 上设 `num_workers=0` 或 `persistent_workers=True`。 4. **混合精度 NaN**:loss scale 不对。`precision='bf16-mixed'`(A100+) 比 `16-mixed` 更稳,不需要 grad scaler。 5. **保存的 checkpoint 太大**:默认保存 optimizer state。要 inference-only: `ModelCheckpoint(save_weights_only=True)`。
PyTorch 装错 CUDA 版本(CPU-only / 不匹配显卡驱动)是新手最常踩的 第一个坑。`uv` + PyTorch 官方提供的 index URL 能在一行命令里搞定。 ## 1. 看清你需要的版本 ```bash nvidia-smi # 关注右上角 "CUDA Version: 12.4" —— 这是 driver 支持的最高 CUDA 版本 # PyTorch 实际使用的 runtime 可以等于或更低,但不能更高 ``` PyTorch 现在主要 ship 三种轮子: - **cu124** (CUDA 12.4 runtime) - **cu121** (CUDA 12.1) - **cu118** (CUDA 11.8) — 老显卡 / 老驱动 - **cpu** (无 GPU) 你的驱动支持 12.x 就装 cu121 或 cu124;不确定就 cu121(兼容性最好)。 ## 2. 一条命令装 ```bash # uv 新项目 uv init mlproject --python 3.12 cd mlproject # 直接装最新稳定版 uv add torch torchvision torchaudio --index https://download.pytorch.org/whl/cu124 # 或指定版本 uv add 'torch==2.4.1' 'torchvision==0.19.1' \ --index https://download.pytorch.org/whl/cu124 ``` `--index` 让 uv 从 PyTorch 自己的 CDN 拉轮子(PyPI 上的轮子默认是 CPU 版的)。 ## 3. 校验 ```python # check.py import torch print(f'torch: {torch.__version__}') print(f'cuda built: {torch.version.cuda}') print(f'cuda avail: {torch.cuda.is_available()}') print(f'device count: {torch.cuda.device_count()}') if torch.cuda.is_available(): print(f'device name: {torch.cuda.get_device_name(0)}') print(f'capability: {torch.cuda.get_device_capability(0)}') # 做一次实际计算 x = torch.randn(1000, 1000) if torch.cuda.is_available(): x = x.cuda() print(f'matmul on GPU: {(x @ x).sum().item():.2f}') ``` ```bash uv run python check.py ``` 期望输出: ``` torch: 2.4.1+cu124 cuda built: 12.4 cuda avail: True device count: 1 device name: NVIDIA GeForce RTX 4090 capability: (8, 9) matmul on GPU: ... ``` `+cu124` 后缀确认装的是 CUDA wheel;`is_available()` False 说明 wheel 匹配了但驱动 / NVIDIA 库有问题。 ## 4. 锁版本 ```bash ls uv.lock # 已经写入 ``` CI / 同事直接: ```bash uv sync --frozen --index https://download.pytorch.org/whl/cu124 ``` 得到完全一致的环境。 ## 5. 配置 pyproject.toml 让 index 持久化 每次都加 `--index` 很烦。`pyproject.toml`: ```toml [project] name = "mlproject" version = "0.1.0" requires-python = ">=3.12" dependencies = [ "torch>=2.4", "torchvision>=0.19", ] [[tool.uv.index]] name = "pytorch-cu124" url = "https://download.pytorch.org/whl/cu124" explicit = true [tool.uv.sources] torch = { index = "pytorch-cu124" } torchvision = { index = "pytorch-cu124" } ``` 之后 `uv add ...` 不再需要 `--index`。 ## 6. Apple Silicon (M1/M2/M3) 用 MPS ```bash uv add torch torchvision torchaudio # Mac 上 PyPI 默认轮子已经带 MPS(Metal Performance Shaders)后端 ``` ```python device = 'mps' if torch.backends.mps.is_available() else 'cpu' x = torch.randn(1000, 1000, device=device) ``` MPS 不如 CUDA 快但比 CPU 快几倍,免费的不错。 ## 7. CPU-only(开发 / 测试 / CI) ```bash uv add torch --index https://download.pytorch.org/whl/cpu ``` CI 跑测试一般用 CPU wheel,节省 wheel 大小 + 启动时间。 ## 8. 多 GPU 看见 ```python torch.cuda.device_count() # 几张卡 # 显式选择 device = torch.device('cuda:0') model = model.to(device) ``` 多卡训练用 `torch.nn.DataParallel` 简单粗暴 / `DistributedDataParallel` (DDP)才是生产选择。 ## 9. PyTorch 升级 ```bash uv lock --upgrade-package torch --index https://download.pytorch.org/whl/cu124 ``` 不同 CUDA 版本的 PyTorch 是不同 package(torch+cu121 vs torch+cu124), 直接 lock 升级最稳。 ## 10. Docker 镜像 ```dockerfile FROM nvidia/cuda:12.4.1-cudnn-devel-ubuntu22.04 # uv 二进制 COPY --from=ghcr.io/astral-sh/uv:latest /uv /usr/local/bin/uv # Python 3.12 RUN apt update && apt install -y python3.12 python3-pip git WORKDIR /app COPY pyproject.toml uv.lock ./ RUN uv sync --frozen --index https://download.pytorch.org/whl/cu124 COPY . . CMD ["uv", "run", "python", "train.py"] ``` 镜像底层用 NVIDIA 官方 `cuda:*-devel`,宿主装 `nvidia-container-toolkit`: ```bash docker run --gpus all -v $(pwd):/app mlproject ``` ## 踩过的坑 - 装了 CPU wheel 但代码里 `.cuda()`:报 "no CUDA-capable device"。 pip / uv 默认从 PyPI 拉 = CPU wheel。必须 `--index pytorch`。 - 驱动 CUDA 12.0 但装了 cu124:装得上但 `is_available()=False` 或运行 时 segfault。升驱动或者降 wheel。 - 多版本 Python 共存时 wheel 不匹配(cp310 wheel 装到 cp312 venv): uv 会自动挑对的,但手动 pip 时容易选错。 - 想用 `torch.compile()` 但 wheel 没带:cu118 上没 compile 支持。 cu121+ / cu124 才有。 - jupyter notebook 启动时不见 CUDA:notebook 是另一个进程, 确认是用 `uv run jupyter` 启动(继承了 venv 的 path)。
MNIST 手写数字识别是 ML 的 "hello world"。下面用 PyTorch 写一个从 数据加载到训练 / 评估的完整 pipeline。60 行可读代码,能跑到 99%+ 准确率。 ## 完整代码 ```python import torch import torch.nn as nn import torch.nn.functional as F from torch.utils.data import DataLoader from torchvision import datasets, transforms device = 'cuda' if torch.cuda.is_available() else 'cpu' print(f'using {device}') # 1. 数据 transform = transforms.Compose([ transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,)), ]) train_set = datasets.MNIST('./data', train=True, download=True, transform=transform) test_set = datasets.MNIST('./data', train=False, download=True, transform=transform) train_loader = DataLoader(train_set, batch_size=128, shuffle=True, num_workers=4) test_loader = DataLoader(test_set, batch_size=512, shuffle=False, num_workers=4) # 2. 模型 class Net(nn.Module): def __init__(self): super().__init__() self.conv1 = nn.Conv2d(1, 32, 3, padding=1) self.conv2 = nn.Conv2d(32, 64, 3, padding=1) self.fc1 = nn.Linear(64 * 7 * 7, 128) self.fc2 = nn.Linear(128, 10) self.drop = nn.Dropout(0.25) def forward(self, x): x = F.relu(F.max_pool2d(self.conv1(x), 2)) # 28x28 -> 14x14 x = F.relu(F.max_pool2d(self.conv2(x), 2)) # 14x14 -> 7x7 x = x.flatten(1) x = F.relu(self.fc1(x)) x = self.drop(x) return self.fc2(x) model = Net().to(device) opt = torch.optim.Adam(model.parameters(), lr=1e-3) loss_fn = nn.CrossEntropyLoss() # 3. 训练 def train_epoch(epoch): model.train() total, correct, total_loss = 0, 0, 0.0 for x, y in train_loader: x, y = x.to(device), y.to(device) opt.zero_grad() logits = model(x) loss = loss_fn(logits, y) loss.backward() opt.step() total_loss += loss.item() * x.size(0) correct += (logits.argmax(1) == y).sum().item() total += x.size(0) print(f'epoch {epoch} train loss {total_loss/total:.4f} acc {correct/total:.4f}') # 4. 评估 def evaluate(): model.eval() total, correct = 0, 0 with torch.no_grad(): for x, y in test_loader: x, y = x.to(device), y.to(device) correct += (model(x).argmax(1) == y).sum().item() total += x.size(0) print(f'test acc {correct/total:.4f}') for ep in range(5): train_epoch(ep) evaluate() torch.save(model.state_dict(), 'mnist.pt') ``` 5 个 epoch 后测试集准确率应 ≥ 99%。GPU 上每 epoch < 10 秒; CPU 大约 30-60 秒。 ## 解释几个关键点 ### `Normalize((0.1307,), (0.3081,))` MNIST 训练集统计出的均值和标准差。归一化让输入分布更均匀, 训练更稳。 ### `num_workers=4` DataLoader 用 4 个子进程并行 prefetch 数据。GPU 训练时 IO 是瓶颈, 设大点(4-8)能让 GPU 持续吃满。CPU 训练时设 0 反而更快(避免进程切换)。 ### `opt.zero_grad()` PyTorch 默认 gradient 累加。每次 backward 前清零。 忘了清零 → loss 一直涨。 ### `with torch.no_grad():` 评估时不需要梯度,省内存 + 快。等价的还有装饰器 `@torch.no_grad()` 或 `torch.inference_mode()`(更激进,PyTorch 1.9+)。 ### `.to(device)` 每个 batch 显式搬到 GPU;model 一次性 `.to(device)`。 ## 加几个常见优化 ### 1. 学习率调度 ```python sched = torch.optim.lr_scheduler.CosineAnnealingLR(opt, T_max=5) # 每个 epoch 结束后调用 sched.step() ``` cosine annealing 在前期保持高 lr 探索,后期降低做精调。 ### 2. 混合精度(AMP) ```python scaler = torch.cuda.amp.GradScaler() for x, y in train_loader: x, y = x.to(device), y.to(device) opt.zero_grad() with torch.cuda.amp.autocast(): logits = model(x) loss = loss_fn(logits, y) scaler.scale(loss).backward() scaler.step(opt) scaler.update() ``` 混合精度让大部分计算用 fp16,需要精度的(loss / 权重更新)用 fp32。 RTX 30 系以上 ~2x 加速,显存降一半。MNIST 这种小模型差别不大, 但对 transformer / 大网络明显。 ### 3. 用 `torch.compile()`(PyTorch 2.0+) ```python model = torch.compile(model) ``` 编译期内联 + 算子融合,训练 / 推理 1.3-2x 加速。第一次 batch 慢 (编译),之后变快。 ### 4. 早停 ```python best_acc = 0 patience = 3 no_improve = 0 for ep in range(50): train_epoch(ep) acc = evaluate() if acc > best_acc: best_acc = acc no_improve = 0 torch.save(model.state_dict(), 'best.pt') else: no_improve += 1 if no_improve >= patience: print('early stop') break ``` ## 推理 ```python model = Net().to(device) model.load_state_dict(torch.load('mnist.pt')) model.eval() # 单张图片 from PIL import Image img = Image.open('test.png').convert('L').resize((28, 28)) x = transform(img).unsqueeze(0).to(device) with torch.no_grad(): pred = model(x).argmax(1).item() print(f'predicted: {pred}') ``` ## 可视化训练曲线 ```python import matplotlib.pyplot as plt losses = [] accs = [] # ... 在 train_epoch / evaluate 里 append fig, ax = plt.subplots(1, 2, figsize=(10, 4)) ax[0].plot(losses); ax[0].set_title('train loss') ax[1].plot(accs); ax[1].set_title('test acc') plt.savefig('curves.png') ``` 或者用 `tensorboard` / `wandb`(参见专题文章)。 ## 踩过的坑 - 第一次跑会下载 MNIST 数据(~10MB),需要网络。`download=True` 给 你这个能力。 - forward 里 `x.view(-1, 784)` vs `x.flatten(1)`:view 要求内存连续, flatten 不要求。前者偶尔报 "view size is not compatible..." 错。 - 不调用 `model.train()` / `model.eval()` → Dropout / BatchNorm 行为错。 eval 期间 Dropout 应该关(输出固定);BatchNorm 应该用 running stats。 - `torch.cuda.empty_cache()` 是给 PyTorch 内部 caching allocator 用的, 通常没必要手动调;调了也不会真的还给 OS。担心 OOM 应该减小 batch。
## 起因 存了 200 小时的会议录音 + 课程视频,想搜里面"我哪节课讲过 X 主题"。 传统方案:手工转录(贵 / 慢)、剪映等付费云转录(隐私 + 钱)。 Whisper 是 OpenAI 开源的 ASR 模型,本地跑能转 90+ 种语言, 中文识别质量直接打过国内大多数云服务。 ## 解决方案 最快的方式是 `whisper.cpp`(C++ 重新实现,CPU/GPU 都很快,无需 PyTorch)。 ### 装 ```bash git clone https://github.com/ggerganov/whisper.cpp cd whisper.cpp # Apple Silicon 用 Metal;NVIDIA 用 CUDA;纯 CPU 也可以 make -j # 下模型(large-v3 中文最准;medium 速度+准确度平衡;base 最快但中文一般) bash ./models/download-ggml-model.sh large-v3 # 1.5 GB 左右 ``` ### 转录单个文件 ```bash # 输入要是 16kHz mono WAV ffmpeg -i lecture.mp4 -vn -ar 16000 -ac 1 -c:a pcm_s16le tmp.wav ./build/bin/whisper-cli -m models/ggml-large-v3.bin \ -l zh -f tmp.wav -osrt -otxt # 生成 tmp.wav.srt + tmp.wav.txt ``` 参数解释: - `-l zh`:源语言中文(不指定的话自动检测) - `-osrt`:输出 SubRip 字幕 - `-otxt`:输出纯文本 ### 批量脚本 ```bash #!/usr/bin/env bash for f in *.mp4; do base="${f%.mp4}" [ -f "$base.srt" ] && { echo "skip $f (already done)"; continue; } ffmpeg -y -i "$f" -vn -ar 16000 -ac 1 -c:a pcm_s16le /tmp/_audio.wav ./whisper-cli -m models/ggml-large-v3.bin -l zh -f /tmp/_audio.wav \ -osrt -of "$base" done ``` `[ -f "$base.srt" ] && continue` 做断点续传:跑了一半中断重启不重做。 ### Python 版(openai-whisper) 如果要嵌入到 pipeline: ```bash uv add openai-whisper ``` ```python import whisper model = whisper.load_model('large-v3') # 第一次自动下载 result = model.transcribe('lecture.mp4', language='zh', verbose=True) print(result['text']) # 带时间戳 for seg in result['segments']: print(f"[{seg['start']:.1f}s] {seg['text']}") ``` GPU 大模型一篇 1 小时音频约 5-10 分钟。CPU 慢 5-10 倍。 ## 性能 / 选型 我笔记本 M1 Pro + whisper.cpp + large-v3: | 模型 | 中文 WER | 速度 | 显存 | |---|---|---|---| | tiny | ~25% | 30x realtime | 1 GB | | base | ~18% | 16x | 1 GB | | small | ~13% | 6x | 2 GB | | medium | ~9% | 2x | 5 GB | | **large-v3** | ~6% | 1x | 10 GB | "realtime" = 处理 1 分钟音频要多少秒。`large-v3` 比 medium 准确率明显 但速度慢 2 倍,中文场景值得。 ## 效果 - 200 小时录音转完一晚上跑完 - `.srt` 字幕直接拖进 VLC / 剪映对应视频 - 搜索 `grep -l "知识点关键词" *.srt` 一秒定位"哪一节哪一段说过" - 转录质量手工抽查:科技 / 普通话清晰场景准确率 > 95%,方言 / 嘈杂 环境降到 80% 左右 ## 踩过的坑 1. **GPU 装错版本**:whisper-py 默认装 CPU torch。要手动装 CUDA 版: `uv add torch --index https://download.pytorch.org/whl/cu124`。 2. **音频不是 16kHz mono**:whisper.cpp 严格要求格式;自动失败但 error message 不明显。永远先 `ffmpeg -ar 16000 -ac 1`。 3. **长视频中间 hallucination**:超过 30 秒的安静段落 whisper 会"幻想" 一些重复的"谢谢观看"之类的话。`--no-speech-threshold 0.6` 加严格点 过滤,或者 `--temperature 0` 减少创造性。 4. **断句不准**:whisper 自动给句号但有时一长串没标点。后处理用 `ja-tokenize` 或者再过一遍 LLM 加标点: ```python client.chat.completions.create(messages=[{ 'role': 'user', 'content': f'给下面文本加合理标点,不要改字:\n{raw_text}' }]) ``` 5. **GPU 显存不够**:large-v3 需要 10 GB+。降级到 medium(5 GB)+ `--word-timestamps` 也能拿很好结果。
## 起因 我们的推荐模型在训练时算"用户过去 30 天点击次数"用 pandas, 线上推理时用 Redis lookup。两套代码必然漂移:一次 pandas 用错时区 窗口对不上 → 训练效果在线上拉胯。 "训练 / 在线特征不一致" 是 ML 生产最常见的痛点。 Feast 是开源 feature store,把特征定义统一在一处,训练 / 在线都从 同一份代码读。 ## 解决方案 ### 装 ```bash uv add feast feast version ``` ### 项目初始化 ```bash feast init my_store cd my_store/feature_repo ``` 生成示例 `example_repo.py`。 ### 定义特征 ```python # feature_repo/user_features.py from datetime import timedelta from feast import Entity, Feature, FeatureView, FileSource, ValueType from feast.types import Float32, Int64 user = Entity(name='user_id', value_type=ValueType.INT64) # 数据源:parquet 文件(生产用 BigQuery / Snowflake / S3 等) user_clicks_source = FileSource( path='data/user_clicks.parquet', timestamp_field='event_time', ) user_clicks_30d = FeatureView( name='user_clicks_30d', entities=[user], ttl=timedelta(days=30), schema=[ Feature(name='click_count', dtype=Int64), Feature(name='avg_dwell_seconds', dtype=Float32), Feature(name='top_category', dtype='string'), ], source=user_clicks_source, online=True, ) ``` ```bash feast apply # 注册 entity / feature view 元数据 ``` ### 训练时拉历史特征 ```python from feast import FeatureStore import pandas as pd store = FeatureStore(repo_path='feature_repo') # 训练样本(user_id + label + event_time) training_df = pd.DataFrame({ 'user_id': [1, 2, 3], 'event_time': pd.to_datetime(['2024-01-01', '2024-01-02', '2024-01-03']), 'label': [1, 0, 1], }) # point-in-time 拼接特征(保证不用未来数据) training = store.get_historical_features( entity_df=training_df, features=[ 'user_clicks_30d:click_count', 'user_clicks_30d:avg_dwell_seconds', 'user_clicks_30d:top_category', ], ).to_df() # train X = training[['click_count', 'avg_dwell_seconds']] y = training['label'] model.fit(X, y) ``` Feast 自动按 entity_df 的 event_time 去 source 找当时点的特征值, **避免 data leakage**(不会用未来的 click_count 训练历史样本)。 ### 在线 materialize(把最近特征推到 Redis) ```bash feast materialize-incremental $(date -u +%Y-%m-%dT%H:%M:%S) # 把 last_materialization → now 的特征写到 online store (Redis) ``` 定时跑(每小时 / 每天): ```bash # /etc/systemd/system/feast-materialize.timer [Timer] OnCalendar=*-*-* */1:00:00 ``` ### 在线推理时读特征 ```python features = store.get_online_features( features=[ 'user_clicks_30d:click_count', 'user_clicks_30d:avg_dwell_seconds', ], entity_rows=[{'user_id': 42}], ).to_dict() x = [[features['click_count'][0], features['avg_dwell_seconds'][0]]] prediction = model.predict(x) ``` **同一份 FeatureView 定义** 决定了训练和在线都拿"click_count"的相同 语义。漂移消失。 ### Online store backend Feast 支持多种 online store: ```yaml # feature_repo/feature_store.yaml online_store: type: redis connection_string: 'localhost:6379' # 或: online_store: type: dynamodb region: us-east-1 table_name: feast_online # 或: online_store: type: sqlite # dev 用 ``` ### 数据源支持 ```yaml offline_store: type: file # parquet # 或 bigquery / snowflake / redshift / spark / trino ``` 不同公司栈用不同 source,Feast 抽象统一。 ## 实战流程 ``` ┌─────────────────┐ │ FeatureView 定义 │ │ (Python 代码) │ └────────┬────────┘ │ ┌──────────────────┼──────────────────┐ │ │ │ ▼ ▼ ▼ ┌──────────┐ ┌────────────┐ ┌──────────┐ │ 训练 pipeline │ │ materialize │ │ 在线推理 │ │ get_historical│ │ → Redis │ │ get_online│ └──────────┘ └────────────┘ └──────────┘ ``` 特征定义是 single source of truth。 ## 何时该上 feature store - 多个模型共用相同特征(用户画像 / 商品画像) - 训练 / 在线漂移导致过线下效果不在线上复现 - 特征工程团队跟 ML 团队分工(特征 owner 模型) - 需要 point-in-time 一致性(防 data leakage) 何时**不**该上(杀鸡用牛刀): - 单模型 + 简单特征(计算一下 mean / sum 直接做) - 团队 < 5 人 ML 工程师 - 没有上线 / 用 batch 预测 ## 替代方案对比 | | Feast (OSS) | Hopsworks (商业) | Tecton (商业) | 自己拼 (Redis + DAG) | |---|---|---|---|---| | 复杂度 | 中 | 中 | 高 | 低-高 | | 价格 | 免费 | 付费 | 付费 | 0 | | 在线 store | 多选 | HBase 主 | 多 | 自选 | | 离线 store | 多选 | 自家 + S3 | 多 | 自选 | | 实时特征 | 较弱 | 强 | 强 | 看实现 | 中小团队 Feast;大企业 Tecton / Hopsworks(带运维支持); 极简团队拼 Redis + cron 也能 work。 ## 与训练 pipeline 集成 ```python # Kubeflow / Airflow / Dagster pipeline: @task def fetch_features(entity_df): store = FeatureStore(...) return store.get_historical_features(entity_df, ...).to_df() @task def train(df): model.fit(df[features], df['label']) return model @task def deploy(model): save_to_s3(model) feast materialize-incremental ... ``` 特征 fetch 是 pipeline 第一步,后续 train / deploy 都用 feast。 ## 效果 我们一个 churn 预测模型用 Feast 后: - 训练 / 在线特征一致性:100%(之前 ~95%,漂移 5% 是 bug) - 新模型上线时间从 2 周 → 3 天(特征不用重写,复用现成 view) - 多模型共享 user_features view:避免重复算 - "为什么这个用户预测分数低" 类调试:直接 feast 查在线特征 + 对照训练 分布 ## 踩过的坑 1. **`event_time` 时区**:feast 假设所有时间是 UTC。本地时间传进去 会算偏差。always to_datetime(...).tz_localize('UTC')。 2. **materialize 漏数据**:`materialize-incremental` 从 last_materialization 开始。如果之前没 materialize 过,从 ttl 之前开始,可能遗漏。 首次用 `materialize <start> <end>` 全量。 3. **online store 一致性**:Redis 单机时 materialize 期间挂掉 → 部分 特征写进去 + 部分没写 → 在线读到旧 + 新混合值。Redis cluster 有助稳定。 4. **特征 schema 变了**:加新字段简单(FeatureView 重新 apply); 删 / 改字段类型麻烦,需要重新 materialize 整 entity 群。 5. **point-in-time join 慢**:百万级 entity_df 跨多个 feature view join 几分钟到几十分钟。生产 source 用 BigQuery / Snowflake 让 join 推到 DB 端比较快。
pandas 是 2010 年代标准工具但有几个本质局限: - 单线程(GIL) - eager evaluation,复杂 pipeline 中间结果都实例化 - API 历史包袱(10 种 index、SettingWithCopyWarning、`inplace=True`) polars 是 Rust 写的列存数据框,多线程 + lazy 模式 + 干净的链式 API。 1GB+ 的 CSV 处理快 5-30 倍。 ## 安装 ```bash uv add polars pyarrow ``` `pyarrow` 不是必需但读 Parquet / Feather 时性能好。 ## 30 秒上手 ```python import polars as pl df = pl.read_csv('sales.csv') print(df.head()) print(df.schema) # 过滤 + 聚合 + 排序 result = ( df.filter(pl.col('country') == 'CN') .group_by('product') .agg( pl.col('amount').sum().alias('total'), pl.col('amount').count().alias('orders'), ) .sort('total', descending=True) .head(10) ) print(result) ``` 注意: - `pl.col('x')` 是表达式,可以组合(`pl.col('x') * 2`) - `group_by().agg()` 链式 - `.alias()` 重命名 - 不像 pandas 那样有索引;纯列数据 ## lazy 模式 ```python # 用 scan_csv 而不是 read_csv 进 lazy 模式 q = ( pl.scan_csv('sales.csv') .filter(pl.col('country') == 'CN') .group_by('product') .agg(pl.col('amount').sum()) .sort('amount', descending=True) ) # 这一步还没读文件!只构建了 query plan print(q.explain()) # 看到优化后的 plan(如 filter 下推) # 真正执行 + 取结果 result = q.collect() ``` lazy 让 polars 做查询优化:predicate pushdown、projection pushdown、 predicate fusion 等。处理 10GB CSV 时 collect 之前都不占内存。 ## Parquet:列存的好处 ```python # 写 Parquet df.write_parquet('sales.parquet', compression='zstd') # 读 df = pl.read_parquet('sales.parquet') # lazy + projection q = (pl.scan_parquet('sales.parquet') .select(['country', 'amount']) # 只读这两列 .filter(pl.col('country') == 'US') .group_by('country').agg(pl.col('amount').sum()) .collect()) ``` Parquet 列存意味着 `.select(['country', 'amount'])` 完全不读其它列。 10GB 表只读 2 列可能只 IO 1GB。 ## 性能对比 |任务(1GB CSV,5000 万行)| pandas | polars eager | polars lazy | |---|---|---|---| | 读文件 | 18s | 6s | 0.5s (scan) | | filter + groupby + agg | 25s | 4s | 3s | | 内存峰值 | 8GB | 3GB | 1.5GB | 数字会因数据 / 机器而异,但量级对。 ## 常用对照表 | pandas | polars | |---|---| | `df['col']` | `df['col']` 或 `df.get_column('col')` | | `df[df['x'] > 0]` | `df.filter(pl.col('x') > 0)` | | `df.groupby('a')['b'].sum()` | `df.group_by('a').agg(pl.col('b').sum())` | | `df.merge(other, on='id')` | `df.join(other, on='id')` | | `df['col'].str.lower()` | `df.with_columns(pl.col('col').str.to_lowercase())` | | `df.dropna()` | `df.drop_nulls()` | | `df.fillna(0)` | `df.fill_null(0)` | | `pd.concat([df1, df2])` | `pl.concat([df1, df2])` | | `df.pivot_table` | `df.pivot()` | ## 窗口函数 ```python # 按 user 排序后的 cumsum df = df.with_columns( pl.col('amount').cum_sum().over('user').alias('cum_amount') ) # 计算每行相对所在组的平均 df = df.with_columns( (pl.col('amount') / pl.col('amount').mean().over('country')).alias('rel') ) ``` `.over(...)` 是 partition by 的简洁写法。 ## 与 pandas 互转 ```python # polars → pandas pdf = df.to_pandas() # pandas → polars df2 = pl.from_pandas(pdf) ``` 适合渐进迁移:现有 pandas 流程改一段为 polars 跑性能瓶颈。 ## 与 Arrow / DuckDB 联用 polars 内部就是 Arrow 格式,零拷贝传给 DuckDB / Arrow Compute: ```python import duckdb df = pl.read_parquet('big.parquet') # 直接把 polars df 当 DuckDB 视图 result = duckdb.sql("SELECT country, SUM(amount) FROM df GROUP BY 1").pl() ``` DuckDB 跑 SQL,polars 拿结果,全程 Arrow buffer。 ## 写入数据库 ```python import polars as pl df = pl.read_csv('users.csv') df.write_database( table_name='users', connection='postgresql://localhost/mydb', if_table_exists='replace', ) ``` 底层用 ConnectorX / SQLAlchemy 写。 ## 何时仍用 pandas - ML 库强制:scikit-learn 接受 numpy / pandas,polars 要 `.to_numpy()` - 小数据 + 现有代码:pandas 足够时迁移没收益 - 复杂的 multi-level index 需求 不过 polars 团队明确目标是覆盖 pandas 90% 的用法,差距越来越小。 ## 踩过的坑 - 没有 index 的概念:以前 pandas `df.loc['2024-01-01']` 这种行为不存在。 polars 都用 column filter。 - `with_columns` 返回新 DataFrame(immutable),忘了赋值回去 `df = ...`: ```python df.with_columns(pl.col('x') * 2) # 没改 df! df = df.with_columns(pl.col('x') * 2) # 对 ``` - group_by 的 Python 列表语法 → 改成表达式: ```python # pandas df.groupby(['a', 'b']).agg({'c': 'sum'}) # polars df.group_by(['a', 'b']).agg(pl.col('c').sum()) ``` - 日期解析自动推断不总是对:`pl.read_csv(..., try_parse_dates=True)` 或者显式 `pl.col('date').str.to_datetime('%Y-%m-%d')`。
## 起因 公司数据不能上 wandb / Comet 这种 cloud SaaS。要本地自托管的实验 跟踪 + 模型版本控制 + 一键部署。 MLflow 是 Databricks 出的开源套件,4 个组件覆盖 ML lifecycle: - **Tracking**:记录实验(params / metrics / artifacts) - **Projects**:可复现的 ML 包格式 - **Models**:模型版本 + 多 framework 统一接口 - **Registry**:模型生命周期(staging / production / archived) ## 装 + 启服务 ```bash uv add mlflow # 启动 tracking server(默认 SQLite 后端 + 本地文件 artifact) mlflow server \ --host 0.0.0.0 --port 5000 \ --backend-store-uri sqlite:///mlflow.db \ --default-artifact-root ./mlruns ``` 或更"生产"配置(PostgreSQL + S3): ```bash mlflow server \ --host 0.0.0.0 --port 5000 \ --backend-store-uri postgresql://user:pass@db/mlflow \ --default-artifact-root s3://my-bucket/mlruns \ --workers 4 ``` systemd unit + nginx 套一下就是企业级服务。 ## Tracking:训练时记录 ```python import mlflow import mlflow.pytorch mlflow.set_tracking_uri('http://localhost:5000') mlflow.set_experiment('churn-prediction') with mlflow.start_run(run_name='lgbm-baseline'): mlflow.log_params({ 'model': 'lgbm', 'lr': 0.05, 'n_estimators': 200, 'max_depth': 7, }) mlflow.set_tag('dataset_version', 'v2024-05-01') # 训练 model = train(...) eval_metrics = evaluate(model, X_val, y_val) mlflow.log_metrics(eval_metrics) # {'auc': 0.84, 'precision': 0.71, 'recall': 0.66} # 多 step:每 epoch log for epoch in range(20): train_one_epoch() mlflow.log_metric('train/loss', loss, step=epoch) mlflow.log_metric('val/auc', auc, step=epoch) # 保存模型(mlflow 自动 capture 依赖 env) mlflow.lightgbm.log_model(model, 'model') # 任意 artifact mlflow.log_artifact('confusion_matrix.png') mlflow.log_artifact('feature_importance.csv') ``` 跑完 → MLflow UI 看到 run,有 params / metrics 表 + 曲线 + artifacts 下载。 ## 实验对比 UI 选多个 run → Compare → 表格 + parallel coordinates + scatter plot。 一眼看出"哪几个超参组合 auc 高"。 ## Model Registry ```python # 训练完成后注册到 registry mlflow.lightgbm.log_model( lgb_model=model, artifact_path='model', registered_model_name='ChurnPredictor', ) ``` UI 里看到 `ChurnPredictor v1`。 版本管理 + 状态机: ```python client = mlflow.MlflowClient() # 升级到 staging client.transition_model_version_stage( name='ChurnPredictor', version=1, stage='Staging', ) # 验证后升级到 production client.transition_model_version_stage( name='ChurnPredictor', version=1, stage='Production', archive_existing_versions=True, # 老 production 自动 archive ) ``` 线上代码总是拿 production 版本: ```python model = mlflow.pyfunc.load_model( model_uri='models:/ChurnPredictor/Production' ) prediction = model.predict(X) ``` 回滚?把老版本 transition 回 Production 即可。 ## Models:统一 framework 接口 ```python # sklearn mlflow.sklearn.log_model(model, 'm') # PyTorch mlflow.pytorch.log_model(model, 'm') # Transformers mlflow.transformers.log_model(pipeline, 'm') # 自定义(Pyfunc) class MyModel(mlflow.pyfunc.PythonModel): def load_context(self, context): self.artifact = load(context.artifacts['my_file']) def predict(self, context, input_df): return my_inference(input_df, self.artifact) mlflow.pyfunc.log_model('m', python_model=MyModel(), artifacts={'my_file': 'data.pkl'}) ``` 加载时**不需要知道 framework**: ```python model = mlflow.pyfunc.load_model('models:/X/Production') model.predict(df) ``` 业务代码不再 hardcode "import torch / sklearn / xgboost"。 ## 部署:一行 serve ```bash mlflow models serve -m models:/ChurnPredictor/Production -p 5001 # 起一个 HTTP API on :5001 ``` ```bash curl -X POST http://localhost:5001/invocations \ -H 'Content-Type: application/json' \ -d '{"dataframe_records": [{"age": 30, "balance": 1000}]}' # {"predictions": [0.72]} ``` 或 build Docker image: ```bash mlflow models build-docker -m models:/X/Production -n my-model:latest docker run -p 5001:8080 my-model:latest ``` 或部署到 Sagemaker / Azure ML / K8s: ```bash mlflow sagemaker deploy ... mlflow azureml deploy ... ``` framework 抽象一直延伸到部署。 ## Autologging ```python mlflow.sklearn.autolog() # 之后所有 sklearn fit() 自动 log model + params + metrics model = RandomForestClassifier(n_estimators=100) model.fit(X, y) # 自动 log: n_estimators / max_depth / mean_score / training_time / ... ``` 支持 sklearn / PyTorch / Lightning / TensorFlow / XGBoost / LightGBM。 适合"快速 baseline 跑 N 个 algorithm 选最好的"。 ## Projects:可复现 ML 包 `MLproject` 文件: ```yaml name: churn-prediction python_env: python_env.yaml entry_points: main: parameters: data_path: {type: string, default: 'data/train.parquet'} lr: {type: float, default: 0.05} n_estimators: {type: int, default: 200} command: 'python train.py --data {data_path} --lr {lr} --n {n_estimators}' ``` 跑: ```bash mlflow run . -P lr=0.1 -P n_estimators=500 # 或 git URL: mlflow run https://github.com/me/churn-prediction.git -P lr=0.1 ``` 自动建 virtualenv + 装依赖 + 跑训练。同事任何人能复现。 ## 与替代品对比 | | MLflow | Weights & Biases | Neptune | DVC | |---|---|---|---|---| | 自托管 | ✅ 简单 | 企业版 | ✅ | ✅ | | 实验跟踪 | ✅ | ✅ 最强 | ✅ | 较弱 | | 模型注册 | ✅ | ✅ | ✅ | ❌ | | 部署 | ✅ 内置 | ❌ | ❌ | ❌ | | Pipeline | ❌(外部 Airflow) | weave | ❌ | ✅ | | 价格 | 免费 | 付费(个人免费) | 付费 | 免费 | 数据合规要本地的 → MLflow。 体验最好 + 不担心数据外发 → wandb。 ## 我们的实战配置 `docker-compose.yml`: ```yaml services: mlflow: image: ghcr.io/mlflow/mlflow:v2.16.2 ports: ["5000:5000"] environment: MLFLOW_S3_ENDPOINT_URL: http://minio:9000 AWS_ACCESS_KEY_ID: minio AWS_SECRET_ACCESS_KEY: ... command: > mlflow server --host 0.0.0.0 --port 5000 --backend-store-uri postgresql://mlflow:pw@pg/mlflow --default-artifact-root s3://mlflow-artifacts/ pg: image: postgres:16 environment: POSTGRES_DB: mlflow POSTGRES_USER: mlflow POSTGRES_PASSWORD: pw minio: image: minio/minio command: server /data --console-address ":9001" ports: ["9000:9000", "9001:9001"] environment: MINIO_ROOT_USER: minio MINIO_ROOT_PASSWORD: ... ``` 三个 service:MLflow + Postgres (metadata) + Minio (S3 兼容 artifact)。 全本地,数据不出公司。 ## 效果 - 30+ 个实验跑下来"哪个 lr × dataset 最优"清晰 - 上线模型有版本号 + git commit 关联 + 训练数据版本 traceable - 回滚到上版本:UI 点 button + 30 秒 redeploy - 数据科学家 / ML 工程师 / 部署运维各看 UI 不同部分 ## 踩过的坑 1. **artifact 上传慢**:local file backend OK;S3 时大模型几 GB 上传 几分钟。`mlflow.log_artifact` 阻塞 train script。后台 thread 异步。 2. **autolog 误 log 整张 dataframe**:默认 sklearn autolog 会 log X_train shape + 部分 sample。私密数据可能进 MLflow → 安全隐患。 `mlflow.sklearn.autolog(log_input_examples=False)`。 3. **Model registry 没强制 stage gating**:任何人能把 dev model 推 Production。生产建 ACL + reviewer 流程。 4. **跨 Python 版本 model 加载失败**:在 3.10 训练的 sklearn model 在 3.12 load 时 unpickle 错。MLflow log model 时 capture 了 python_env.yaml,确认部署机器装对版本。 5. **UI 慢**:实验数量 > 几万后 list 慢。定期 archive 老 experiment 到 `s3://archived/`。
## 起因 要 fine-tune 一个 7B 模型,A100 40GB 显存,跑起来直接 CUDA OOM。 "换大卡"是简单解决但贵。理解几个技术能在同样显存里训更大模型 / 更大 batch。 ## 各项的显存占用拆解 训练时显存 ≈ 模型权重 + 梯度 + optimizer state + activations + 临时 buffer。以 7B FP16 模型 + AdamW 为例: | 项 | 公式 | 7B 模型 | |---|---|---| | 权重 | params × 2 bytes (fp16) | 14 GB | | 梯度 | params × 2 bytes | 14 GB | | optimizer state(AdamW) | params × 8 bytes (FP32 m+v) | 56 GB | | activations | 依 batch / seq | 几 GB-几十 GB | **总 = 84 GB + activations**。一张 A100 40GB 远不够。 ## 解决方案逐个上 ### 1. 混合精度(FP16/BF16)— 必选 ```python # pure PyTorch scaler = torch.cuda.amp.GradScaler() for batch in loader: with torch.cuda.amp.autocast(dtype=torch.bfloat16): loss = model(batch).loss scaler.scale(loss).backward() scaler.step(opt); scaler.update() ``` 权重 / 梯度从 FP32 4 bytes → FP16/BF16 2 bytes,对半省。 A100+ 推荐 BF16(无需 grad scaler,数值更稳)。 ### 2. Gradient Checkpointing — 用计算换显存 normal 前向把所有 activations 都存着(反向用)。checkpointing 只保存 某几层,其它 layer 反向时重新算前向: ```python model.gradient_checkpointing_enable() # transformers 一行 ``` 省 activations 50-80%,代价是训练慢 ~20-30%。LLM fine-tune 默认开。 ### 3. Gradient Accumulation — 模拟更大 batch 显存装不下 batch=32?跑 batch=8 累 4 次 = batch=32 等效: ```python accum_steps = 4 for i, batch in enumerate(loader): loss = model(batch).loss / accum_steps loss.backward() if (i + 1) % accum_steps == 0: opt.step(); opt.zero_grad() ``` 显存等同 batch=8,效果近似 batch=32。 ### 4. CPU offload(DeepSpeed / accelerate) 把 optimizer state 卸到 CPU 内存,反正它不参与每步前反向: ```python from accelerate import Accelerator acc = Accelerator( mixed_precision='bf16', gradient_accumulation_steps=4, ) model, opt, loader = acc.prepare(model, opt, loader) ``` 或用 DeepSpeed ZeRO-2 / ZeRO-3: ```python # accelerate config 选 DeepSpeed # 跑: accelerate launch --num_processes=1 \ --mixed_precision=bf16 \ --deepspeed_stage=2 \ train.py ``` ZeRO-2 把 optimizer state 分片(多卡时)/ offload 到 CPU(单卡时), 省 56 GB → 0 GB(cpu 接管)。代价:每 step 数据传输延迟。 ### 5. LoRA / QLoRA — 只训一小部分参数 ```python from peft import LoraConfig, get_peft_model config = LoraConfig( r=8, lora_alpha=16, lora_dropout=0.05, target_modules=['q_proj', 'v_proj'], task_type='CAUSAL_LM', ) model = get_peft_model(base_model, config) model.print_trainable_parameters() # trainable: 4.2M / 7B = 0.06% ``` 只有 LoRA 的小矩阵需要梯度 + optimizer state。7B 模型变成"7B 冻结 + 4M 可训",显存暴跌。 QLoRA 进一步把 base model 也量化到 4-bit: ```python from transformers import BitsAndBytesConfig bnb = BitsAndBytesConfig( load_in_4bit=True, bnb_4bit_quant_type='nf4', bnb_4bit_compute_dtype=torch.bfloat16, ) model = AutoModelForCausalLM.from_pretrained(name, quantization_config=bnb) model = get_peft_model(model, config) ``` 7B QLoRA 单 A100 40GB 训 batch=4 可以跑得动。 ## 效果(我的 case:A100 40GB fine-tune Qwen2 7B) | 配置 | 显存 | 训练速度 | 效果损失 | |---|---|---|---| | FP32 full ft | OOM | — | — | | BF16 full ft | OOM (~80 GB) | — | — | | BF16 + grad checkpoint | OOM (~50 GB) | — | — | | BF16 + checkpoint + ZeRO-2 cpu offload | 32 GB | 1x | 0 | | BF16 + LoRA | 24 GB | 1.3x | 微小 | | BF16 + QLoRA | 14 GB | 1.2x | 1-2% | 最终 QLoRA 跑通 fine-tune,loss 收敛、benchmark 比 base 提升 8%。 ## 调试技巧 ```python # 看每层显存 print(torch.cuda.memory_summary()) # 最大峰值 print(f'peak: {torch.cuda.max_memory_allocated() / 1e9:.2f} GB') torch.cuda.reset_peak_memory_stats() # 实时监控 nvidia-smi -l 1 # 或更细: nvtop ``` 跑 OOM 时立刻 `nvidia-smi` 看到底是 model load 时挂了还是 forward 时挂了, 对症下药。 ## 踩过的坑 1. **`del var` 不立刻释放**:PyTorch caching allocator 不还给 OS。 `torch.cuda.empty_cache()` 也只是把 cached block 让出来,**不会** 实际减少 OS 看到的进程显存。 2. **DataLoader pin_memory + num_workers 大**:每个 worker 一份 GPU 显存映射。OOM 时先减 `num_workers`。 3. **eval 不开 no_grad**:评估时没 `with torch.no_grad():`,accidentally build 完整 computation graph,显存翻倍。 4. **多个模型同时 load**:base model + LoRA + reward model 一起在 GPU 上时,DPO / RLHF 训练显存压力极大。把 reward model 量化或 freeze 后丢 CPU。 5. **使用 `compute_dtype=torch.float16` + Adam**:fp16 + Adam 数值 不稳定。一律 `bf16` 或者 fp32 master weight(mixed precision)。
训练用 PyTorch 灵活,部署到生产时通常希望: - 没有 PyTorch 100+ MB 依赖 - 跨语言(C++ / Go / JS / Java 都能加载) - CPU / GPU 都能跑 - 性能更好(融合算子) ONNX 是开放神经网络交换格式,ONNX Runtime 是 Microsoft 的高性能推理引擎。 PyTorch 训练完导出 ONNX,运行时用 ORT 加载。 ## 装 ```bash uv add torch onnx onnxruntime # GPU 推理: uv add onnxruntime-gpu ``` ## 1. 导出 PyTorch 模型为 ONNX ```python import torch from your_model import Net model = Net() model.load_state_dict(torch.load('mnist.pt')) model.eval() # 一个 dummy 输入用于 trace dummy = torch.randn(1, 1, 28, 28) torch.onnx.export( model, dummy, 'mnist.onnx', input_names=['input'], output_names=['logits'], dynamic_axes={ 'input': {0: 'batch'}, # batch 维度可变 'logits': {0: 'batch'}, }, opset_version=17, ) ``` `dynamic_axes` 让导出的模型支持任意 batch size,否则固定为 dummy 的形状。 ## 2. 校验导出正确 ```python import onnx m = onnx.load('mnist.onnx') onnx.checker.check_model(m) print(onnx.helper.printable_graph(m.graph)) ``` `check_model` 不报错就 OK。 ## 3. 推理(Python) ```python import onnxruntime as ort import numpy as np sess = ort.InferenceSession('mnist.onnx', providers=['CPUExecutionProvider']) # GPU: providers=['CUDAExecutionProvider'] # 看输入输出 for i in sess.get_inputs(): print(f'input {i.name}: {i.shape} {i.type}') for o in sess.get_outputs(): print(f'output {o.name}: {o.shape} {o.type}') # 跑推理 x = np.random.rand(4, 1, 28, 28).astype(np.float32) logits = sess.run(['logits'], {'input': x})[0] pred = logits.argmax(axis=1) print(pred) ``` ## 4. 性能基准 ```python import time, numpy as np x = np.random.rand(1, 1, 28, 28).astype(np.float32) # warm up for _ in range(10): sess.run(['logits'], {'input': x}) t0 = time.time() for _ in range(1000): sess.run(['logits'], {'input': x}) print(f'avg latency: {(time.time()-t0)/1000*1000:.2f} ms') ``` 对比 PyTorch: ```python import torch model.eval() x = torch.randn(1, 1, 28, 28) with torch.no_grad(): for _ in range(10): model(x) t0 = time.time() for _ in range(1000): model(x) print(f'pytorch avg: {(time.time()-t0)/1000*1000:.2f} ms') ``` CPU 上 ONNX Runtime 通常比 PyTorch 快 1.5-3x(算子融合 + 简化 graph)。 ## 5. 性能优化 ```python sess = ort.InferenceSession( 'mnist.onnx', providers=['CPUExecutionProvider'], sess_options=ort.SessionOptions(), ) opts = ort.SessionOptions() opts.intra_op_num_threads = 4 opts.inter_op_num_threads = 1 opts.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL opts.execution_mode = ort.ExecutionMode.ORT_SEQUENTIAL sess = ort.InferenceSession('mnist.onnx', sess_options=opts, providers=['CPUExecutionProvider']) ``` 对于 transformer / 大模型,`enable_profiling=True` 让 ORT 输出每个算子的耗时 帮助找瓶颈。 ## 6. 多 provider ```python # 优先 GPU,没有就 CPU sess = ort.InferenceSession( 'mnist.onnx', providers=['CUDAExecutionProvider', 'CPUExecutionProvider'] ) print(sess.get_providers()) ``` NVIDIA:CUDAExecutionProvider / TensorrtExecutionProvider Apple:CoreMLExecutionProvider AMD:ROCMExecutionProvider Intel:OpenVINOExecutionProvider ## 7. 量化(减小模型 + 加速 CPU 推理) ```python from onnxruntime.quantization import quantize_dynamic, QuantType quantize_dynamic('mnist.onnx', 'mnist.int8.onnx', weight_type=QuantType.QInt8) ``` INT8 量化通常 2-4x 推理加速 + 模型大小 1/4。精度损失对 ResNet / 简单 CNN 很小(< 1% 准确率),对 BERT 类需要 calibration 复杂些。 ## 8. C++ 推理 ```cpp #include "onnxruntime_cxx_api.h" Ort::Env env(ORT_LOGGING_LEVEL_WARNING, "test"); Ort::SessionOptions opts; Ort::Session session(env, "mnist.onnx", opts); // ... 准备输入 tensor ... auto output = session.Run(...); ``` 完整 C++ 例子在 ONNX Runtime 仓库。集成到 C++ 服务里完全摆脱 Python 依赖。 ## 9. 浏览器推理(onnxruntime-web) ```html <script src="https://cdn.jsdelivr.net/npm/onnxruntime-web/dist/ort.min.js"></script> <script> const session = await ort.InferenceSession.create('mnist.onnx'); const input = new ort.Tensor('float32', data, [1, 1, 28, 28]); const results = await session.run({ input: input }); console.log(results.logits.data); </script> ``` WebGPU / WebGL / wasm 多后端自动选。小模型直接跑浏览器,数据不离开 用户设备。 ## 10. 部署模式比较 | 工具 | 适合 | |---|---| | ONNX Runtime | 跨语言、跨平台、单进程 | | TorchServe | 多 PyTorch 模型微服务 | | Triton Inference Server | NVIDIA GPU 多模型高并发 | | BentoML | Python 服务封装(含监控 / 队列) | | vLLM | LLM 专用(PagedAttention) | ONNX Runtime 是"最通用最简单"那档;要更高级 ops(动态 batching、 gpu 调度)上 Triton。 ## 踩过的坑 - 导出失败 "Unsupported ONNX opset version":升 `opset_version` 或者 降 PyTorch 中用的 op(替换 custom op)。 - 导出后形状对但数值差:训练时 BatchNorm 等 running stats 没保存好, 确保 `model.eval()` 后再 export。 - Dynamic axes 没写:onnx 模型固定 batch=1,部署时 batch=N 直接报错。 - ONNX 模型文件很大(含权重):考虑用 `onnx.save_model(m, ..., save_as_external_data=True)` 把权重分离存储,便于 CDN / 分发。
跑 ML 实验最容易混乱的是"我那个 lr=0.001 + dropout=0.3 的 run 是哪天哪份代码?" wandb 自动记录:超参、loss / metric 曲线、代码 git hash、系统资源、模型权重。 免费层个人项目无限。 ## 安装 + 注册 ```bash uv add wandb wandb login # 浏览器打开,复制 API key 进来 # 或:export WANDB_API_KEY=... ``` ## 最小集成(5 行代码) ```python import wandb wandb.init( project='mnist-cnn', config={ 'lr': 0.001, 'batch_size': 128, 'epochs': 5, 'dropout': 0.25, 'model': 'cnn-v1', }, ) # 训练循环里 for epoch in range(5): train_loss, train_acc = train_epoch() val_loss, val_acc = evaluate() wandb.log({ 'train/loss': train_loss, 'train/acc': train_acc, 'val/loss': val_loss, 'val/acc': val_acc, 'epoch': epoch, }) wandb.finish() ``` 跑一下 `python train.py`,wandb 打印一个 URL,打开就能看到实时曲线。 ## config 优先级 ```python # 1. 默认在代码里 wandb.init(config={'lr': 0.001}) # 2. 命令行覆盖(用 argparse / typer / hydra) import argparse p = argparse.ArgumentParser() p.add_argument('--lr', type=float, default=0.001) args = p.parse_args() wandb.init(config=vars(args)) # 3. Sweep 时由 wandb 注入 config = wandb.config # 读,不能写 lr = config.lr ``` ## Sweep:自动超参搜索 ```yaml # sweep.yaml program: train.py method: bayes metric: name: val/acc goal: maximize parameters: lr: distribution: log_uniform_values min: 1e-5 max: 1e-2 dropout: values: [0.1, 0.25, 0.5] batch_size: values: [64, 128, 256] ``` ```bash wandb sweep sweep.yaml # Output: wandb agent yourname/project/sweep_id wandb agent yourname/project/sweep_id --count 20 ``` 跑 20 个实验,自动按 Bayesian / grid / random 选超参。 多台机器并行:每台跑 `wandb agent ...` 共享同一个 sweep。 ## 记录媒体 ```python # 图片 import matplotlib.pyplot as plt fig, ax = plt.subplots() ax.plot(losses) wandb.log({'curve': wandb.Image(fig)}) # 表格 wandb.log({ 'predictions': wandb.Table( columns=['image', 'pred', 'truth'], data=[[wandb.Image(x), p, y] for x, p, y in samples], ) }) # 直方图 wandb.log({'grad_norm': wandb.Histogram(grad_norms)}) # 视频 / 音频 wandb.log({'video': wandb.Video('output.mp4')}) ``` ## 保存模型(Artifact) ```python artifact = wandb.Artifact(name='mnist-cnn', type='model') artifact.add_file('checkpoints/best.pt') wandb.log_artifact(artifact) ``` 模型权重 + 元数据存在 wandb 服务端(免费层有配额)。后续加载: ```python api = wandb.Api() artifact = api.artifact('yourname/project/mnist-cnn:latest') artifact.download(root='./model') ``` ## 监控系统资源 `wandb` 自动记录: - GPU 利用率 / 显存 - CPU / 内存 - 磁盘 / 网络 - Python 进程 无需任何代码,看 dashboard 的 "System" 标签页。 ## 代码 + git 状态 `wandb` 自动 capture: - 当前 git commit hash - 未提交的 diff(不要在没干净 commit 时跑实验!) - 命令行参数 - Python 版本 + 包列表 回放某次 run:知道用了哪份代码 + 哪个数据。 ## 离线模式 无网时: ```bash wandb offline # 或:export WANDB_MODE=offline python train.py # 数据存在本地 ./wandb/ # 之后有网时 wandb sync ./wandb/offline-run-* ``` CI / 内网集群里非常有用。 ## 与 PyTorch Lightning / HuggingFace 集成 ```python # PyTorch Lightning from pytorch_lightning.loggers import WandbLogger logger = WandbLogger(project='myproj') trainer = Trainer(logger=logger) # HuggingFace Transformers training_args = TrainingArguments( report_to='wandb', run_name='bert-finetune-v3', ... ) ``` 不用手写 log,框架自动调 wandb。 ## 团队协作 - **Project** 是大目录(按代号 / 任务) - **Run** 是单次实验 - **Group** 让你把同一个 sweep / 同一组对比放一起 - **Tags** 给 run 打标签(baseline / experiment / final) ```python wandb.init( project='myproj', group='ablation-dropout', tags=['final', 'cnn-v2'], notes='increased lr to 5e-3 to test convergence speed', ) ``` ## 数据 dashboard dashboard 默认按时间线显示。常用功能: - **Reports**:把多个 run 拖到一份"报告"里,加文字 + 自动同步图表, 当作"项目周报"或 paper 草稿 - **Parallel Coordinates**:可视化超参 → metric 的关系, 找哪个超参影响最大 - **Compare runs**:勾几个 run 一起看曲线 / config diff ## 隐私 / 自托管 wandb cloud 免费个人无限,团队收费。如果数据敏感不能上 cloud: ```bash # 自托管 wandb server(社区版免费) docker run -p 8080:8080 wandb/local ``` Python 端: ```python wandb.init(project='...', host='https://wandb.your-company.com') ``` ## 替代方案 - **MLflow**:开源,自托管简单。tracking + model registry,UI 朴素 - **TensorBoard**:本地用,无云端 - **Comet / Neptune**:商业产品类似 - **Aim**:开源极简,无云 个人项目 wandb 最快;公司里数据合规要求高用 MLflow。 ## 踩过的坑 - 忘 `wandb.finish()` —— 长 run 退出后 wandb sync 一直挂着。 脚本最后必须 `finish()` 或者用 `with wandb.init(...) as run:`。 - 每次 `wandb.log` 都立刻发到云端 → 训练时大量请求可能拖慢。 设 `commit=False` 累积后批量发:`wandb.log({...}, commit=False)`, 最后 `wandb.log({}, commit=True)`。 - 在 jupyter 里 wandb.init 多次:会创建多个 run。每次重启 kernel 之前先 `wandb.finish()`。 - artifact 配额:免费个人 100GB,超了不能再传。定期清老 artifact。
## 起因 业务给了一个 50GB 用户行为日志 CSV,让我做个简单的"每个城市 PV / UV / 转化率"统计。pandas 直接 `read_csv` 把机器爆了(机器只有 32GB 内存)。改成 chunksize=100000 一块块读、自己累加?写起来又难维护 (要管 partial state)。 polars 的 `scan_csv` + lazy + streaming 是这种"out-of-core"分析的 标准解法。 ## 解决方案 ### 装 ```bash uv add polars ``` ### 一行做完聚合 ```python import polars as pl result = ( pl.scan_csv('events_50gb.csv') # 不读,只 plan .filter(pl.col('event_type') == 'pageview') .group_by(['city', 'date']) .agg([ pl.col('user_id').n_unique().alias('uv'), pl.len().alias('pv'), ]) .sort('pv', descending=True) .collect(streaming=True) # 流式执行 ) print(result.head(20)) ``` 关键点: 1. **`scan_csv` 而不是 `read_csv`**:只构建 query plan,不读文件 2. **链式 lazy 操作**:filter → group_by → agg → sort 都在 plan 里 3. **`collect(streaming=True)`**:让 polars 流式跑(边读边算, 不把全部数据放内存) 50GB 文件在我 32GB 机器上跑了 8 分钟,内存峰值 ~4 GB。 ### 看 query plan ```python q = ( pl.scan_csv('events.csv') .filter(pl.col('event_type') == 'pageview') .group_by('city') .agg(pl.col('user_id').n_unique()) ) print(q.explain()) # AGGREGATE # [col("user_id").n_unique()] # BY [col("city")] # FROM # Csv SCAN events.csv # PROJECT 3/15 COLUMNS # SELECTION: [col("event_type") == "pageview"] ``` polars 自动做: - **column pruning**:只读用到的 3 列(不读 15 列里的其它 12 列) - **predicate pushdown**:filter 下推到 scan 层(不读不匹配的行) - **operator fusion**:能合并的操作合并 这些都是数据库 query optimizer 的标准手法,polars 把它带到 DataFrame 里。 ### 转 Parquet:以后查询快 10x ```python # 一次性把 CSV 转 Parquet(列存 + zstd 压缩) pl.scan_csv('events_50gb.csv').sink_parquet('events.parquet', compression='zstd') # 50GB CSV → ~12GB Parquet # 之后所有查询用 parquet result = pl.scan_parquet('events.parquet').filter(...).group_by(...).agg(...).collect() # 同样查询 8 分钟 → 30 秒 ``` Parquet 列存让"只读需要的列"在 IO 层就生效,比 CSV 快得多。 ### 按字段分区写入 ```python # 数据按 date 分区写 pl.scan_csv('events.csv').sink_parquet( 'events/', partition_by=['date'], ) # 生成:events/date=2024-01-01/data_0.parquet ... ``` ```python # 之后查询某一天的数据自动 prune 到该分区 pl.scan_parquet('events/').filter(pl.col('date') == '2024-01-01') ``` 巨大效率提升 —— 比扫整个 50GB 高几个数量级。 ### join 大表 ```python # 50GB 主表 join 100MB 维度表 result = ( pl.scan_parquet('events.parquet') .join(pl.scan_csv('cities.csv'), on='city_id', how='left') .group_by('city_name') .agg(pl.len().alias('events')) .collect(streaming=True) ) ``` polars 自动选 hash join(小表 build hash,大表 stream probe), 内存友好。 ### 性能 vs pandas / dask 50GB CSV → `count by city`: | 工具 | 时间 | 内存峰值 | 写代码量 | |---|---|---|---| | pandas chunked | 25 min | ~6 GB | 30 行(手动累加) | | dask | 12 min | ~8 GB | 8 行 | | **polars lazy + streaming** | 8 min | ~4 GB | 6 行 | | **polars on parquet** | 30 s | ~2 GB | 6 行 | 转 Parquet 一次后所有后续查询都飞快。 ## 效果 - 50GB CSV 单机能跑通(之前要上集群) - 探索性分析迭代速度 25 倍快 - 代码 30 行 → 6 行,可维护性大幅提升 - 后续团队类似分析都按"CSV → Parquet → polars query"模板做 ## 高级 tip ### 流式写入 巨大计算结果直接落盘,不全部放内存: ```python (pl.scan_csv('events.csv') .filter(pl.col('amount') > 100) .sink_parquet('high_value.parquet')) ``` `sink_*` 系列函数都是流式写。 ### profile 查询 ```python import time t0 = time.time() result = q.collect(streaming=True) print(f'took {time.time()-t0:.1f}s') # 更细:每个 node 时间 result, profile = q.profile() print(profile) ``` ### 字符串列消耗内存大 ```python df = df.with_columns( pl.col('country').cast(pl.Categorical), # 像 pandas categorical ) ``` categorical 把字符串映射到 int,对低基数字段省 80% 内存。 ## 踩过的坑 1. **`streaming=True` 不是所有操作都支持**:极个别 window 操作目前 还回落到 in-memory。运行时如果没真的流式,看 explain 输出会有 "STREAMING" 字样标记。 2. **多文件 scan 时 schema 不一致**:`scan_csv('logs/*.csv')` 假设所有 文件 schema 一样。第 N 个文件多一列就出错。`infer_schema_length=10000` 或显式指定 schema。 3. **`group_by(['col1', 'col2'])` 在 streaming 模式不一定流式**: 分组键基数极高(如 user_id)时内存爆。改成"先按 user_id hash 分桶 再聚合"的策略。 4. **写 Parquet 时 dtype 不对**:polars `null` 列写 Parquet 是 null type, 其它工具读不出来。`cast` 到具体类型再写。 5. **collect 后切回 eager 操作**:`collect()` 返回 `DataFrame`, 不再是 lazy。后续操作如果想再 lazy 就 `.lazy()`。
## 起因 要把一个 30M 参数的 ResNet 部署到 ARM 手机上。FP32 模型 120 MB + 推理慢得卡顿。INT8 量化把模型缩到 30 MB + 推理快 3 倍,精度只掉 0.5%。 深度学习模型 INT8 量化已成熟,几行代码搞定。 ## 三种量化策略 ### A. Dynamic quantization:仅 weight 量化,最简单 ```python import torch from torchvision.models import resnet50 model = resnet50(pretrained=True).eval() # 一行:把所有 Linear 层 weight 量化到 INT8 quantized = torch.quantization.quantize_dynamic( model, {torch.nn.Linear, torch.nn.LSTM, torch.nn.RNN}, dtype=torch.qint8, ) torch.save(quantized.state_dict(), 'resnet50_int8.pt') ``` **适合**:BERT / transformer / RNN 类(Linear 主导)。 **不适合**:CNN(Conv2d 占比大,dynamic 不量化它)。 ### B. Static quantization:weight + activation 全量化 需要 calibration(用代表性数据跑一遍找 activation 范围): ```python import torch import torch.ao.quantization as Q model = MyModel().eval() # 1. 准备:插入 observer 收 activation 统计 model.qconfig = Q.get_default_qconfig('fbgemm') # x86; 'qnnpack' for ARM model_prepared = Q.prepare(model, inplace=False) # 2. Calibration: 跑 ~100-1000 张代表性图片 with torch.no_grad(): for img in calibration_loader: model_prepared(img) # 3. Convert: observer → 实际 quant op model_int8 = Q.convert(model_prepared, inplace=False) torch.save(model_int8.state_dict(), 'resnet50_static_int8.pt') ``` 效果通常 比 dynamic 更激进,**全模型 INT8**。 代价:要 calibration data + 模型架构必须支持(含 Conv-BN fuse 等)。 ### C. Quantization-aware training (QAT):训练时模拟量化 精度损失最小(< 0.5%)但要重训: ```python model.qconfig = Q.get_default_qat_qconfig('fbgemm') model_prepared = Q.prepare_qat(model, inplace=False) # 训练(模型在前向时模拟 INT8 round 噪声) for epoch in range(5): for x, y in train_loader: loss = criterion(model_prepared(x), y) loss.backward() optimizer.step() model_int8 = Q.convert(model_prepared.eval()) ``` QAT 适合:精度对生产关键的模型,可承受 5-20 epoch 重训。 ## 实测对比(ResNet50 + ImageNet val) | | 大小 | CPU 推理(ms/img) | top-1 acc | |---|---|---|---| | FP32 | 98 MB | 65 | 76.13% | | Dynamic INT8 | ~95 MB | 65 | 76.13% (Linear 没主导) | | Static INT8 | 25 MB | 28 | 75.84% | | QAT INT8 | 25 MB | 28 | 76.02% | CNN 类 static / QAT 显著有效。BERT 类 dynamic 也能 4x 小 + 2-3x 快。 ## ONNX Runtime + INT8(生产推荐) PyTorch 量化导出 ONNX 后用 ONNX Runtime 跑,性能 / 跨平台都更好: ```python import torch from torch.ao.quantization import quantize_dynamic q_model = quantize_dynamic(model, {torch.nn.Linear}, dtype=torch.qint8) dummy = torch.randn(1, 3, 224, 224) torch.onnx.export(q_model, dummy, 'model_int8.onnx', opset_version=13) ``` 或者直接 ONNX Runtime 的量化工具(更稳): ```python from onnxruntime.quantization import quantize_dynamic, QuantType quantize_dynamic( model_input='model_fp32.onnx', model_output='model_int8.onnx', weight_type=QuantType.QInt8, ) ``` ONNX Runtime 在 ARM / x86 / Apple Silicon 都有 INT8 优化 kernel。 ## bitsandbytes:LLM 用 4-bit / 8-bit quantization ```bash uv add bitsandbytes accelerate ``` ```python from transformers import AutoModelForCausalLM, BitsAndBytesConfig bnb_config = BitsAndBytesConfig( load_in_4bit=True, bnb_4bit_quant_type='nf4', bnb_4bit_compute_dtype=torch.bfloat16, bnb_4bit_use_double_quant=True, ) model = AutoModelForCausalLM.from_pretrained( 'meta-llama/Llama-3.1-70B-Instruct', quantization_config=bnb_config, device_map='auto', ) ``` 70B 模型从 140 GB → 35 GB。单 A100 80GB 或 4090 + offload 跑得动。 精度损失:相对 FP16 通常 < 1% benchmark(NF4 比 INT4 更稳)。 ## GPTQ / AWQ:post-training quantization for LLM 针对 LLM 优化的 4-bit 量化算法(比 bitsandbytes NF4 更好): ```python # GPTQ from auto_gptq import AutoGPTQForCausalLM, BaseQuantizeConfig quantize_config = BaseQuantizeConfig(bits=4, group_size=128) model = AutoGPTQForCausalLM.from_pretrained( 'meta-llama/Llama-3.1-8B', quantize_config=quantize_config, ) model.quantize(calibration_dataset) model.save_quantized('llama-3.1-8b-4bit-gptq') ``` ```python # AWQ from awq import AutoAWQForCausalLM model = AutoAWQForCausalLM.from_pretrained('llama-3.1-8b') model.quantize(tokenizer, quant_config={...}) ``` 社区已有大量预量化的 GPTQ / AWQ model 在 HuggingFace(搜 `-GPTQ-4bit` / `-AWQ` 后缀)。直接下载用,省得自己量化。 ## 部署侧:vLLM / llama.cpp 用量化模型 ```bash # vLLM vllm serve TheBloke/Llama-3.1-70B-AWQ --quantization awq # llama.cpp(CPU / Apple Silicon Metal 极快) llama-cli -m llama-3.1-8b.Q4_K_M.gguf -p 'hello' ``` llama.cpp GGUF 格式包含量化(Q4_K_M / Q5_K_M / Q8_0 等), Mac M 系列上 8B 模型 30+ tokens/s。 ## 效果 我们的几个生产模型量化后: | 模型 | 之前 | 量化后 | 精度损失 | |---|---|---|---| | ResNet50 (移动 app) | 98 MB / 65ms | 25 MB / 22ms | -0.3% | | BERT-base (后台) | 440 MB / 80ms | 110 MB / 30ms | -0.5% | | Llama 7B (RAG) | 14 GB / 100 token/s | 4 GB / 230 token/s | < 1% | 移动 / 边缘 / CPU 推理场景量化几乎是必做。 ## 几个陷阱 1. **量化前 fuse 模块**: ```python torch.ao.quantization.fuse_modules( model, [['conv', 'bn', 'relu']], inplace=True) ``` Conv-BN-ReLU 合成一个 op 后量化效果更好。漏 fuse 精度可能掉 2-5%。 2. **observer 范围错**:calibration 数据不代表 production → activation 范围估计错 → 量化 clip 严重。calibration 一定用真实分布数据。 3. **某些 layer 不能量化**:softmax / layernorm 等保留 FP32。 `model.qconfig = ...` 全局设后,对这些 layer 显式 `qconfig=None`。 4. **不同硬件 backend**:`fbgemm` 是 x86 优化,`qnnpack` 是 ARM 优化。 部署目标错了性能差 2-5 倍。 5. **量化后调试难**:bug 是模型本身的还是量化引入的?保留 FP32 reference 模型对比每层 activation 找漂移最大的 layer。 ## 总结 | 场景 | 推荐 | |---|---| | BERT / transformer post-hoc | dynamic INT8 | | CNN 上 ARM / edge | static INT8 + QAT | | LLM 推理 | bitsandbytes NF4 / AWQ / GGUF | | 跨平台部署 | ONNX Runtime + INT8 | | 极致精度要求 | QAT | | 不想自己折腾 | 用社区预量化模型 | 量化是 ML 生产工程的标准动作,不做白扔 70% 推理性能。
## 起因 ML 项目的代码可以 git 管,但数据集(几百 MB / 几 GB / 几十 GB)不能进 git。 结果是"我两个月前那个 SOTA 实验用的是哪份数据?" 完全说不清。 git 只能 commit 一个 `data.csv` 指针 / README 描述,没法保证可复现。 `DVC` 把这个问题解了:在 git 里只 commit 一个"数据元信息文件"(指针 + hash),实际数据存对象存储 / S3 / SSH 服务器。`dvc pull` 拉对应 hash 的 数据,整套实验完全可复现。 ## 解决方案 ### 装 ```bash uv add 'dvc[s3]' # 后端用 S3;其它有 [gs] [azure] [ssh] [gdrive] # 或 brew install dvc dvc version ``` ### 初始化 在已有 git 仓库里: ```bash dvc init git commit -m 'init dvc' ``` DVC 在 `.dvc/` 建配置目录。 ### 配远程存储 ```bash dvc remote add -d myremote s3://my-bucket/dvc-storage dvc remote modify myremote endpointurl https://s3.example.com # 兼容 minio # AWS S3 凭据走 ~/.aws/credentials 或环境变量 git commit .dvc/config -m 'add s3 remote' ``` ### 添加数据 ```bash dvc add data/train.parquet # 输出:data/train.parquet.dvc + .gitignore 更新 git add data/train.parquet.dvc data/.gitignore git commit -m 'add train dataset v1' # 推送数据到 S3 dvc push ``` `.dvc/cache/` 是本地缓存。`data/train.parquet.dvc` 文件长这样: ```yaml outs: - md5: 6c7a3b8d9e2f4a1c5b8e7d6f3a2c4b5e size: 234567890 path: train.parquet ``` 只 28 行 YAML 进 git,原始几百 MB 数据进 S3。 ### 别人 / 别机器拉 ```bash git clone my-project cd my-project dvc pull # 自动拉所有 .dvc 文件对应的数据 ``` 切某个 git commit → `dvc pull` 自动拉那个版本的数据。 ### Pipeline(DAG) DVC 的核心还在于 pipeline 定义 + 自动 cache。`dvc.yaml`: ```yaml stages: prep: cmd: python src/prep.py deps: - src/prep.py - data/raw.parquet outs: - data/clean.parquet train: cmd: python src/train.py --data data/clean.parquet --out models/model.pt deps: - src/train.py - data/clean.parquet params: - lr - epochs outs: - models/model.pt metrics: - metrics.json eval: cmd: python src/eval.py deps: - models/model.pt - data/test.parquet metrics: - eval.json ``` 跑: ```bash dvc repro # 跑全 pipeline,按依赖关系 + 自动跳过未变 stage dvc repro train # 只跑 train + 下游 dvc dag # 看 DAG ``` `params.yaml`: ```yaml lr: 0.001 epochs: 10 ``` 改 `params.yaml` 里某个值 → `dvc repro` 只重跑受影响 stage。 没改的 stage 命中 cache 秒级返回。 ### 实验对比 ```bash dvc exp run -S lr=0.005 -S epochs=20 dvc exp run -S lr=0.001 -S epochs=30 dvc exp show # 表格对比所有实验的 params + metrics ``` `-S` 临时改参数。每次 exp 产生独立分支,不污染 main。 ## 效果 - 数据从 git 中消失(仓库从 5 GB 降到 12 MB) - 切换数据版本 = git checkout + dvc pull,分钟级 - 多人同时改不同 stage 不冲突(每人本地 cache 各自命中) - "上次 SOTA 用的是哪份数据" 永远答得清 - CI 里 `dvc pull` + `dvc repro` 复现实验 ## 与 git-lfs / DataLad 对比 | | git-lfs | DVC | DataLad | |---|---|---|---| | 数据大文件 | ✅ | ✅ | ✅ | | Pipeline | ❌ | ✅ | ❌ | | 实验跟踪 | ❌ | ✅ | ❌ | | 多 backend | 仅 GitHub LFS | S3 / GCS / SSH / 多 | 多 | | 学习曲线 | 低 | 中 | 高 | 只存数据用 git-lfs;要做 ML pipeline 用 DVC。 ## 踩过的坑 1. **第一次 `dvc add` 大文件慢**:要算 md5 + 复制到 .dvc/cache/。 `dvc config cache.type symlink` 用软链不复制,省时省空间但 `.dvc/cache/` 不能跨文件系统。 2. **没 push 就 commit + push git**:别人 `git pull` + `dvc pull` 拉不到 数据。养成 `dvc push` 在 `git push` 前的习惯。 3. **.dvc/cache 占满磁盘**:本地保留所有版本 + 多分支切换累积。 `dvc gc --workspace` 清掉工作区当前 commit 用不到的; `dvc gc --all-commits` 极致清。 4. **multiple users 写同一 stage**:`dvc lock` 防止并发 repro 撞车。 或者 stage outputs 必须确定,random_seed 要固定。 5. **大数据集多人 train**:每人都 `dvc pull` 几十 GB 浪费带宽。可以 配 dvc 远程在共享 NFS 上,所有人挂载到本地 `.dvc/cache/` 共享, 配 cache.type=symlink。
预训练 + 微调是 NLP 的标准范式。Hugging Face `transformers` 把 模型 / tokenizer / 训练循环都封装好,从原始数据到训练好的分类器 只需 < 50 行代码。 下面用 IMDB 影评数据微调 BERT 做二分类(正面 / 负面)。 ## 装 ```bash uv add 'transformers[torch]' datasets accelerate evaluate ``` ## 数据 ```python from datasets import load_dataset ds = load_dataset('imdb') print(ds) # DatasetDict({ # train: Dataset({ features: ['text', 'label'], num_rows: 25000 }) # test: Dataset({ features: ['text', 'label'], num_rows: 25000 }) # unsupervised: ... # }) # label: 0 = neg, 1 = pos print(ds['train'][0]) ``` ## tokenizer ```python from transformers import AutoTokenizer MODEL = 'distilbert-base-uncased' # 比 bert-base 小 40%,效果差几个点但快 tok = AutoTokenizer.from_pretrained(MODEL) def tokenize(batch): return tok(batch['text'], truncation=True, max_length=256, padding='max_length') ds_tok = ds.map(tokenize, batched=True) ds_tok = ds_tok.remove_columns(['text']) ds_tok = ds_tok.rename_column('label', 'labels') ds_tok.set_format('torch') ``` `truncation=True` 截到 256 token;BERT 最大 512,但短点训练快得多。 ## 模型 ```python from transformers import AutoModelForSequenceClassification model = AutoModelForSequenceClassification.from_pretrained(MODEL, num_labels=2) ``` `from_pretrained` 自动下载预训练权重 + 加上一个新的 classification head。 ## Trainer:30 行训练循环 ```python from transformers import TrainingArguments, Trainer import evaluate import numpy as np metric = evaluate.load('accuracy') def compute_metrics(eval_pred): logits, labels = eval_pred preds = np.argmax(logits, axis=1) return metric.compute(predictions=preds, references=labels) args = TrainingArguments( output_dir='./out', num_train_epochs=2, per_device_train_batch_size=16, per_device_eval_batch_size=32, learning_rate=2e-5, weight_decay=0.01, warmup_steps=500, logging_steps=50, eval_strategy='epoch', save_strategy='epoch', load_best_model_at_end=True, metric_for_best_model='accuracy', fp16=True, # GPU 时开混合精度 report_to='wandb', # 可选:wandb 跟踪 ) trainer = Trainer( model=model, args=args, train_dataset=ds_tok['train'].shuffle(seed=42).select(range(10000)), eval_dataset=ds_tok['test'].select(range(2000)), tokenizer=tok, compute_metrics=compute_metrics, ) trainer.train() trainer.evaluate() ``` 10k 训练样本 / 2k 验证样本,DistilBERT,单个 RTX 3090 上 ~5 分钟训完, 准确率约 91-92%。 全量 25k 训 3 个 epoch 能到 93-94%。 ## 保存 + 加载 + 推理 ```python trainer.save_model('./imdb-distilbert') tok.save_pretrained('./imdb-distilbert') # 加载推理 from transformers import pipeline classifier = pipeline('sentiment-analysis', model='./imdb-distilbert', device=0) # device=0 = cuda:0; -1 = cpu print(classifier('This movie was absolutely fantastic')) # [{'label': 'LABEL_1', 'score': 0.998}] print(classifier('What a complete waste of time and money')) # [{'label': 'LABEL_0', 'score': 0.997}] ``` `label` 是模型内部的,可以在训练时设标签名: ```python model = AutoModelForSequenceClassification.from_pretrained( MODEL, num_labels=2, id2label={0: 'NEGATIVE', 1: 'POSITIVE'}, label2id={'NEGATIVE': 0, 'POSITIVE': 1}, ) ``` 之后 pipeline 直接返回 'POSITIVE' / 'NEGATIVE'。 ## 中文文本? ```python MODEL = 'bert-base-chinese' # 谷歌中文 BERT # 或: MODEL = 'hfl/chinese-roberta-wwm-ext' # 哈工大 RoBERTa 全词 mask # 或: MODEL = 'IDEA-CCNL/Erlangshen-Roberta-110M-Sentiment' # 已经是情感分类微调过的 ``` 中文 tokenizer 是字级 BPE(不像英文按 subword),输出结构一样。 ## 推到 Hugging Face Hub(可选) ```bash huggingface-cli login ``` ```python trainer.push_to_hub('your-username/imdb-distilbert') # 之后任何人能: # AutoModelForSequenceClassification.from_pretrained('your-username/imdb-distilbert') ``` ## 减小内存:LoRA 完整微调 DistilBERT 已经够轻;微调 7B+ 模型时显存装不下,用 LoRA: ```bash uv add peft ``` ```python from peft import LoraConfig, get_peft_model, TaskType lora_config = LoraConfig( task_type=TaskType.SEQ_CLS, r=8, lora_alpha=16, lora_dropout=0.1, target_modules=['q_lin', 'v_lin'], ) model = get_peft_model(model, lora_config) model.print_trainable_parameters() # trainable params: 0.6M / 67M = 0.9% ``` 之后照常 `Trainer`。LoRA 让显存 / 速度大幅降低,性能差距通常 < 1%。 ## 推理优化(生产) - **量化**:`bitsandbytes` int8 / int4,2-4x 速度,半精度差几个点 - **ONNX 导出 + ONNX Runtime**:CPU 推理 2-3x 加速 - **TGI** (Text Generation Inference) / **vLLM**:高并发 LLM 推理 - **TorchServe**:通用 PyTorch 推理服务 简单分类任务直接用 `pipeline`;高并发用 TGI / TorchServe。 ## 评估更精细 ```python import evaluate metrics = evaluate.combine(['accuracy', 'f1', 'precision', 'recall']) def compute_metrics(eval_pred): logits, labels = eval_pred preds = np.argmax(logits, axis=1) return metrics.compute(predictions=preds, references=labels) ``` ## 踩过的坑 - `padding='max_length'` 把所有样本 pad 到 256 → 浪费计算。改成 `padding=True`(动态 pad 到 batch 内最长),训练快 2-3x。 - learning rate 太大:BERT 微调通常 2e-5 ~ 5e-5。1e-4 已经偏大, 常导致 loss NaN。 - 数据集 label 顺序:HuggingFace `load_dataset('imdb')` 是按 label 排序的,不 shuffle 训练会先看完所有 negative 再看 positive, loss 曲线奇形怪状。`shuffle(seed=42)` 必加。 - `fp16=True` 在 BF16 友好的硬件(A100 / H100)改 `bf16=True` 更稳。 老 V100 / GTX 用 fp16。
## 起因 模型训完了 `.pt` 文件躺在硬盘上。要让前端 / 移动端 / 别的服务能用它, 需要包成 REST API。手写 Flask / FastAPI 包一遍是 100 行 boilerplate (加载模型 + parse 输入 + tensor 转 numpy + 错误处理 + batching)。 做几个模型这种工作就极乏味。 BentoML 把 ML 模型 → 生产 service 的过程标准化:写一个 service 文件, 自动生成 HTTP / gRPC / OpenAPI / Docker。 ## 解决方案 ### 装 ```bash uv add bentoml torch torchvision pillow ``` ### 模型仓库(model store) ```python # save_model.py import bentoml import torch from torchvision.models import resnet50, ResNet50_Weights model = resnet50(weights=ResNet50_Weights.DEFAULT).eval() bento_model = bentoml.pytorch.save_model('resnet50', model) print(bento_model) # Model(tag="resnet50:abc123...") ``` 模型存到本地 `~/bentoml/models/`,分 tag 版本化。 团队共享用 `bentoml push / pull` 配 BentoCloud 或自建 S3。 ### service.py(核心) ```python import bentoml from bentoml.io import Image, JSON from PIL import Image as PILImage import torch from torchvision import transforms resnet = bentoml.pytorch.get('resnet50:latest').to_runner() svc = bentoml.Service('image_classifier', runners=[resnet]) preprocess = transforms.Compose([ transforms.Resize(256), transforms.CenterCrop(224), transforms.ToTensor(), transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]), ]) with open('imagenet_classes.txt') as f: LABELS = [line.strip() for line in f] @svc.api(input=Image(), output=JSON()) async def predict(img: PILImage.Image) -> dict: x = preprocess(img).unsqueeze(0) logits = await resnet.async_run(x) probs = torch.softmax(logits, dim=1)[0] top5 = torch.topk(probs, k=5) return [ {'label': LABELS[idx], 'prob': prob.item()} for idx, prob in zip(top5.indices.tolist(), top5.values.tolist()) ] ``` 要点: - `to_runner()`:让 BentoML 管理 model 生命周期 + 并发 batch - `@svc.api(input=Image(), output=JSON())`:声明输入输出,自动 生成 OpenAPI doc + 验证 - `async run`:BentoML 在 worker 进程里跑 model,业务进程 async 协程 ### 本地启动 ```bash bentoml serve service:svc --reload # uvicorn 起来 http://localhost:3000 # http://localhost:3000/docs 自动 Swagger ``` 调用: ```bash curl -X POST http://localhost:3000/predict \ -H 'Content-Type: image/jpeg' \ --data-binary @cat.jpg # [{"label": "Egyptian cat", "prob": 0.84}, ...] ``` ### Adaptive batching(自动批处理) `runner` 默认开启 batching:单条请求进来时 hold ~10ms 等更多请求, 合并 batch 一次 forward,吞吐量直接翻几倍。 ```python runner = bentoml.pytorch.get('resnet50:latest').to_runner( method_configs={'__call__': { 'max_batch_size': 32, 'max_latency_ms': 100, }}, ) ``` 业务代码无感知。 ### 打包成 Bento + Docker ```bash # bentofile.yaml service: 'service:svc' include: - 'service.py' - 'imagenet_classes.txt' python: packages: - torch - torchvision - pillow models: - resnet50:latest ``` ```bash bentoml build # 生成 ~/bentoml/bentos/image_classifier/<tag> bentoml containerize image_classifier:latest # 生成 docker image image_classifier:latest docker run -p 3000:3000 image_classifier:latest ``` 镜像里包含:Python + 依赖 + service code + 模型权重。直接部署。 ### K8s 部署(BentoML Yatai) ```bash bentoml deployment create my-deploy \ --bento image_classifier:latest \ --cluster prod ``` 或者用 Yatai operator,K8s 原生 CRD 管理 Bento。 ## 效果 - 训完 model → 上生产 API 从 2 天 → 2 小时 - 自动 batch 让单 GPU 吞吐量翻 4 倍 - OpenAPI 文档自动生成,前端不再追着问 schema - 多版本管理 / canary deploy 都是 framework 原生支持 - 监控 metrics 自动暴露 /metrics endpoint 给 Prometheus ## 与替代品对比 | | BentoML | TorchServe | Triton | 自己写 FastAPI | |---|---|---|---|---| | 学习曲线 | 中 | 中 | 高 | 低 | | 多框架 | ✅ | 主 PyTorch | ✅ | N/A | | 自动 batching | ✅ | ✅ | ✅ | 需自写 | | Docker / K8s | ✅ | ✅ | ✅ | 需自写 | | 模型仓库 | ✅ | ✅ | ✅ | 需自建 | | 简单 API | 中 | 中 | 复杂 | 极简 | 复杂 ML 系统选 BentoML / Triton;单模型 < 100 QPS 自己写 FastAPI 更轻量。 ## 踩过的坑 1. **runner 进程模型加载慢**:cold start 几秒-几十秒。生产用 `bentoml serve --production --workers 4 --runners 2 ...` 提前 warmup。 2. **adaptive batching 引入延迟**:单条请求 P99 可能 > 100ms(等其它 请求凑 batch)。低 QPS 场景关 batching:`max_batch_size=1`。 3. **input/output schema 不严格**:默认 JSON 接受任意结构。生产用 pydantic `JSON(pydantic_model=MyInput)` 强校验。 4. **模型权重打包到镜像里**:镜像几 GB 推送慢。模型放 OSS / 启动时 下载更轻量;trade-off 是冷启动慢。 5. **GPU 不释放**:bentoml runner 进程退出时 GPU 偶尔被 PyTorch leaked。systemd 重启 service 时确保 `KillMode=control-group` 杀 所有子进程。