知识广场

按学科筛选:计算机科学 / 人工智能
清除筛选

«计算机科学 / 人工智能» 分类下共 30 篇帖子

polars lazy 模式处理 50GB CSV:分块、流式、out-of-core

## 起因 业务给了一个 50GB 用户行为日志 CSV,让我做个简单的"每个城市 PV / UV / 转化率"统计。pandas 直接 `read_csv` 把机器爆了(机器只有 32GB 内存)。改成 chunksize=100000 一块块读、自己累加?写起来又难维护 (要管 partial state)。 polars 的 `scan_csv` + lazy + streaming 是这种"out-of-core"分析的 标准解法。 ## 解决方案 ### 装 ```bash uv add polars ``` ### 一行做完聚合 ```python import polars as pl result = ( pl.scan_csv('events_50gb.csv') # 不读,只 plan .filter(pl.col('event_type') == 'pageview') .group_by(['city', 'date']) .agg([ pl.col('user_id').n_unique().alias('uv'), pl.len().alias('pv'), ]) .sort('pv', descending=True) .collect(streaming=True) # 流式执行 ) print(result.head(20)) ``` 关键点: 1. **`scan_csv` 而不是 `read_csv`**:只构建 query plan,不读文件 2. **链式 lazy 操作**:filter → group_by → agg → sort 都在 plan 里 3. **`collect(streaming=True)`**:让 polars 流式跑(边读边算, 不把全部数据放内存) 50GB 文件在我 32GB 机器上跑了 8 分钟,内存峰值 ~4 GB。 ### 看 query plan ```python q = ( pl.scan_csv('events.csv') .filter(pl.col('event_type') == 'pageview') .group_by('city') .agg(pl.col('user_id').n_unique()) ) print(q.explain()) # AGGREGATE # [col("user_id").n_unique()] # BY [col("city")] # FROM # Csv SCAN events.csv # PROJECT 3/15 COLUMNS # SELECTION: [col("event_type") == "pageview"] ``` polars 自动做: - **column pruning**:只读用到的 3 列(不读 15 列里的其它 12 列) - **predicate pushdown**:filter 下推到 scan 层(不读不匹配的行) - **operator fusion**:能合并的操作合并 这些都是数据库 query optimizer 的标准手法,polars 把它带到 DataFrame 里。 ### 转 Parquet:以后查询快 10x ```python # 一次性把 CSV 转 Parquet(列存 + zstd 压缩) pl.scan_csv('events_50gb.csv').sink_parquet('events.parquet', compression='zstd') # 50GB CSV → ~12GB Parquet # 之后所有查询用 parquet result = pl.scan_parquet('events.parquet').filter(...).group_by(...).agg(...).collect() # 同样查询 8 分钟 → 30 秒 ``` Parquet 列存让"只读需要的列"在 IO 层就生效,比 CSV 快得多。 ### 按字段分区写入 ```python # 数据按 date 分区写 pl.scan_csv('events.csv').sink_parquet( 'events/', partition_by=['date'], ) # 生成:events/date=2024-01-01/data_0.parquet ... ``` ```python # 之后查询某一天的数据自动 prune 到该分区 pl.scan_parquet('events/').filter(pl.col('date') == '2024-01-01') ``` 巨大效率提升 —— 比扫整个 50GB 高几个数量级。 ### join 大表 ```python # 50GB 主表 join 100MB 维度表 result = ( pl.scan_parquet('events.parquet') .join(pl.scan_csv('cities.csv'), on='city_id', how='left') .group_by('city_name') .agg(pl.len().alias('events')) .collect(streaming=True) ) ``` polars 自动选 hash join(小表 build hash,大表 stream probe), 内存友好。 ### 性能 vs pandas / dask 50GB CSV → `count by city`: | 工具 | 时间 | 内存峰值 | 写代码量 | |---|---|---|---| | pandas chunked | 25 min | ~6 GB | 30 行(手动累加) | | dask | 12 min | ~8 GB | 8 行 | | **polars lazy + streaming** | 8 min | ~4 GB | 6 行 | | **polars on parquet** | 30 s | ~2 GB | 6 行 | 转 Parquet 一次后所有后续查询都飞快。 ## 效果 - 50GB CSV 单机能跑通(之前要上集群) - 探索性分析迭代速度 25 倍快 - 代码 30 行 → 6 行,可维护性大幅提升 - 后续团队类似分析都按"CSV → Parquet → polars query"模板做 ## 高级 tip ### 流式写入 巨大计算结果直接落盘,不全部放内存: ```python (pl.scan_csv('events.csv') .filter(pl.col('amount') > 100) .sink_parquet('high_value.parquet')) ``` `sink_*` 系列函数都是流式写。 ### profile 查询 ```python import time t0 = time.time() result = q.collect(streaming=True) print(f'took {time.time()-t0:.1f}s') # 更细:每个 node 时间 result, profile = q.profile() print(profile) ``` ### 字符串列消耗内存大 ```python df = df.with_columns( pl.col('country').cast(pl.Categorical), # 像 pandas categorical ) ``` categorical 把字符串映射到 int,对低基数字段省 80% 内存。 ## 踩过的坑 1. **`streaming=True` 不是所有操作都支持**:极个别 window 操作目前 还回落到 in-memory。运行时如果没真的流式,看 explain 输出会有 "STREAMING" 字样标记。 2. **多文件 scan 时 schema 不一致**:`scan_csv('logs/*.csv')` 假设所有 文件 schema 一样。第 N 个文件多一列就出错。`infer_schema_length=10000` 或显式指定 schema。 3. **`group_by(['col1', 'col2'])` 在 streaming 模式不一定流式**: 分组键基数极高(如 user_id)时内存爆。改成"先按 user_id hash 分桶 再聚合"的策略。 4. **写 Parquet 时 dtype 不对**:polars `null` 列写 Parquet 是 null type, 其它工具读不出来。`cast` 到具体类型再写。 5. **collect 后切回 eager 操作**:`collect()` 返回 `DataFrame`, 不再是 lazy。后续操作如果想再 lazy 就 `.lazy()`。

PyTorch 模型 INT8 量化:模型小 4x、推理快 2-4x、精度损失 < 1%

## 起因 要把一个 30M 参数的 ResNet 部署到 ARM 手机上。FP32 模型 120 MB + 推理慢得卡顿。INT8 量化把模型缩到 30 MB + 推理快 3 倍,精度只掉 0.5%。 深度学习模型 INT8 量化已成熟,几行代码搞定。 ## 三种量化策略 ### A. Dynamic quantization:仅 weight 量化,最简单 ```python import torch from torchvision.models import resnet50 model = resnet50(pretrained=True).eval() # 一行:把所有 Linear 层 weight 量化到 INT8 quantized = torch.quantization.quantize_dynamic( model, {torch.nn.Linear, torch.nn.LSTM, torch.nn.RNN}, dtype=torch.qint8, ) torch.save(quantized.state_dict(), 'resnet50_int8.pt') ``` **适合**:BERT / transformer / RNN 类(Linear 主导)。 **不适合**:CNN(Conv2d 占比大,dynamic 不量化它)。 ### B. Static quantization:weight + activation 全量化 需要 calibration(用代表性数据跑一遍找 activation 范围): ```python import torch import torch.ao.quantization as Q model = MyModel().eval() # 1. 准备:插入 observer 收 activation 统计 model.qconfig = Q.get_default_qconfig('fbgemm') # x86; 'qnnpack' for ARM model_prepared = Q.prepare(model, inplace=False) # 2. Calibration: 跑 ~100-1000 张代表性图片 with torch.no_grad(): for img in calibration_loader: model_prepared(img) # 3. Convert: observer → 实际 quant op model_int8 = Q.convert(model_prepared, inplace=False) torch.save(model_int8.state_dict(), 'resnet50_static_int8.pt') ``` 效果通常 比 dynamic 更激进,**全模型 INT8**。 代价:要 calibration data + 模型架构必须支持(含 Conv-BN fuse 等)。 ### C. Quantization-aware training (QAT):训练时模拟量化 精度损失最小(< 0.5%)但要重训: ```python model.qconfig = Q.get_default_qat_qconfig('fbgemm') model_prepared = Q.prepare_qat(model, inplace=False) # 训练(模型在前向时模拟 INT8 round 噪声) for epoch in range(5): for x, y in train_loader: loss = criterion(model_prepared(x), y) loss.backward() optimizer.step() model_int8 = Q.convert(model_prepared.eval()) ``` QAT 适合:精度对生产关键的模型,可承受 5-20 epoch 重训。 ## 实测对比(ResNet50 + ImageNet val) | | 大小 | CPU 推理(ms/img) | top-1 acc | |---|---|---|---| | FP32 | 98 MB | 65 | 76.13% | | Dynamic INT8 | ~95 MB | 65 | 76.13% (Linear 没主导) | | Static INT8 | 25 MB | 28 | 75.84% | | QAT INT8 | 25 MB | 28 | 76.02% | CNN 类 static / QAT 显著有效。BERT 类 dynamic 也能 4x 小 + 2-3x 快。 ## ONNX Runtime + INT8(生产推荐) PyTorch 量化导出 ONNX 后用 ONNX Runtime 跑,性能 / 跨平台都更好: ```python import torch from torch.ao.quantization import quantize_dynamic q_model = quantize_dynamic(model, {torch.nn.Linear}, dtype=torch.qint8) dummy = torch.randn(1, 3, 224, 224) torch.onnx.export(q_model, dummy, 'model_int8.onnx', opset_version=13) ``` 或者直接 ONNX Runtime 的量化工具(更稳): ```python from onnxruntime.quantization import quantize_dynamic, QuantType quantize_dynamic( model_input='model_fp32.onnx', model_output='model_int8.onnx', weight_type=QuantType.QInt8, ) ``` ONNX Runtime 在 ARM / x86 / Apple Silicon 都有 INT8 优化 kernel。 ## bitsandbytes:LLM 用 4-bit / 8-bit quantization ```bash uv add bitsandbytes accelerate ``` ```python from transformers import AutoModelForCausalLM, BitsAndBytesConfig bnb_config = BitsAndBytesConfig( load_in_4bit=True, bnb_4bit_quant_type='nf4', bnb_4bit_compute_dtype=torch.bfloat16, bnb_4bit_use_double_quant=True, ) model = AutoModelForCausalLM.from_pretrained( 'meta-llama/Llama-3.1-70B-Instruct', quantization_config=bnb_config, device_map='auto', ) ``` 70B 模型从 140 GB → 35 GB。单 A100 80GB 或 4090 + offload 跑得动。 精度损失:相对 FP16 通常 < 1% benchmark(NF4 比 INT4 更稳)。 ## GPTQ / AWQ:post-training quantization for LLM 针对 LLM 优化的 4-bit 量化算法(比 bitsandbytes NF4 更好): ```python # GPTQ from auto_gptq import AutoGPTQForCausalLM, BaseQuantizeConfig quantize_config = BaseQuantizeConfig(bits=4, group_size=128) model = AutoGPTQForCausalLM.from_pretrained( 'meta-llama/Llama-3.1-8B', quantize_config=quantize_config, ) model.quantize(calibration_dataset) model.save_quantized('llama-3.1-8b-4bit-gptq') ``` ```python # AWQ from awq import AutoAWQForCausalLM model = AutoAWQForCausalLM.from_pretrained('llama-3.1-8b') model.quantize(tokenizer, quant_config={...}) ``` 社区已有大量预量化的 GPTQ / AWQ model 在 HuggingFace(搜 `-GPTQ-4bit` / `-AWQ` 后缀)。直接下载用,省得自己量化。 ## 部署侧:vLLM / llama.cpp 用量化模型 ```bash # vLLM vllm serve TheBloke/Llama-3.1-70B-AWQ --quantization awq # llama.cpp(CPU / Apple Silicon Metal 极快) llama-cli -m llama-3.1-8b.Q4_K_M.gguf -p 'hello' ``` llama.cpp GGUF 格式包含量化(Q4_K_M / Q5_K_M / Q8_0 等), Mac M 系列上 8B 模型 30+ tokens/s。 ## 效果 我们的几个生产模型量化后: | 模型 | 之前 | 量化后 | 精度损失 | |---|---|---|---| | ResNet50 (移动 app) | 98 MB / 65ms | 25 MB / 22ms | -0.3% | | BERT-base (后台) | 440 MB / 80ms | 110 MB / 30ms | -0.5% | | Llama 7B (RAG) | 14 GB / 100 token/s | 4 GB / 230 token/s | < 1% | 移动 / 边缘 / CPU 推理场景量化几乎是必做。 ## 几个陷阱 1. **量化前 fuse 模块**: ```python torch.ao.quantization.fuse_modules( model, [['conv', 'bn', 'relu']], inplace=True) ``` Conv-BN-ReLU 合成一个 op 后量化效果更好。漏 fuse 精度可能掉 2-5%。 2. **observer 范围错**:calibration 数据不代表 production → activation 范围估计错 → 量化 clip 严重。calibration 一定用真实分布数据。 3. **某些 layer 不能量化**:softmax / layernorm 等保留 FP32。 `model.qconfig = ...` 全局设后,对这些 layer 显式 `qconfig=None`。 4. **不同硬件 backend**:`fbgemm` 是 x86 优化,`qnnpack` 是 ARM 优化。 部署目标错了性能差 2-5 倍。 5. **量化后调试难**:bug 是模型本身的还是量化引入的?保留 FP32 reference 模型对比每层 activation 找漂移最大的 layer。 ## 总结 | 场景 | 推荐 | |---|---| | BERT / transformer post-hoc | dynamic INT8 | | CNN 上 ARM / edge | static INT8 + QAT | | LLM 推理 | bitsandbytes NF4 / AWQ / GGUF | | 跨平台部署 | ONNX Runtime + INT8 | | 极致精度要求 | QAT | | 不想自己折腾 | 用社区预量化模型 | 量化是 ML 生产工程的标准动作,不做白扔 70% 推理性能。

DVC 给数据集做版本控制:数 GB 文件不进 git 也能 reproduce

## 起因 ML 项目的代码可以 git 管,但数据集(几百 MB / 几 GB / 几十 GB)不能进 git。 结果是"我两个月前那个 SOTA 实验用的是哪份数据?" 完全说不清。 git 只能 commit 一个 `data.csv` 指针 / README 描述,没法保证可复现。 `DVC` 把这个问题解了:在 git 里只 commit 一个"数据元信息文件"(指针 + hash),实际数据存对象存储 / S3 / SSH 服务器。`dvc pull` 拉对应 hash 的 数据,整套实验完全可复现。 ## 解决方案 ### 装 ```bash uv add 'dvc[s3]' # 后端用 S3;其它有 [gs] [azure] [ssh] [gdrive] # 或 brew install dvc dvc version ``` ### 初始化 在已有 git 仓库里: ```bash dvc init git commit -m 'init dvc' ``` DVC 在 `.dvc/` 建配置目录。 ### 配远程存储 ```bash dvc remote add -d myremote s3://my-bucket/dvc-storage dvc remote modify myremote endpointurl https://s3.example.com # 兼容 minio # AWS S3 凭据走 ~/.aws/credentials 或环境变量 git commit .dvc/config -m 'add s3 remote' ``` ### 添加数据 ```bash dvc add data/train.parquet # 输出:data/train.parquet.dvc + .gitignore 更新 git add data/train.parquet.dvc data/.gitignore git commit -m 'add train dataset v1' # 推送数据到 S3 dvc push ``` `.dvc/cache/` 是本地缓存。`data/train.parquet.dvc` 文件长这样: ```yaml outs: - md5: 6c7a3b8d9e2f4a1c5b8e7d6f3a2c4b5e size: 234567890 path: train.parquet ``` 只 28 行 YAML 进 git,原始几百 MB 数据进 S3。 ### 别人 / 别机器拉 ```bash git clone my-project cd my-project dvc pull # 自动拉所有 .dvc 文件对应的数据 ``` 切某个 git commit → `dvc pull` 自动拉那个版本的数据。 ### Pipeline(DAG) DVC 的核心还在于 pipeline 定义 + 自动 cache。`dvc.yaml`: ```yaml stages: prep: cmd: python src/prep.py deps: - src/prep.py - data/raw.parquet outs: - data/clean.parquet train: cmd: python src/train.py --data data/clean.parquet --out models/model.pt deps: - src/train.py - data/clean.parquet params: - lr - epochs outs: - models/model.pt metrics: - metrics.json eval: cmd: python src/eval.py deps: - models/model.pt - data/test.parquet metrics: - eval.json ``` 跑: ```bash dvc repro # 跑全 pipeline,按依赖关系 + 自动跳过未变 stage dvc repro train # 只跑 train + 下游 dvc dag # 看 DAG ``` `params.yaml`: ```yaml lr: 0.001 epochs: 10 ``` 改 `params.yaml` 里某个值 → `dvc repro` 只重跑受影响 stage。 没改的 stage 命中 cache 秒级返回。 ### 实验对比 ```bash dvc exp run -S lr=0.005 -S epochs=20 dvc exp run -S lr=0.001 -S epochs=30 dvc exp show # 表格对比所有实验的 params + metrics ``` `-S` 临时改参数。每次 exp 产生独立分支,不污染 main。 ## 效果 - 数据从 git 中消失(仓库从 5 GB 降到 12 MB) - 切换数据版本 = git checkout + dvc pull,分钟级 - 多人同时改不同 stage 不冲突(每人本地 cache 各自命中) - "上次 SOTA 用的是哪份数据" 永远答得清 - CI 里 `dvc pull` + `dvc repro` 复现实验 ## 与 git-lfs / DataLad 对比 | | git-lfs | DVC | DataLad | |---|---|---|---| | 数据大文件 | ✅ | ✅ | ✅ | | Pipeline | ❌ | ✅ | ❌ | | 实验跟踪 | ❌ | ✅ | ❌ | | 多 backend | 仅 GitHub LFS | S3 / GCS / SSH / 多 | 多 | | 学习曲线 | 低 | 中 | 高 | 只存数据用 git-lfs;要做 ML pipeline 用 DVC。 ## 踩过的坑 1. **第一次 `dvc add` 大文件慢**:要算 md5 + 复制到 .dvc/cache/。 `dvc config cache.type symlink` 用软链不复制,省时省空间但 `.dvc/cache/` 不能跨文件系统。 2. **没 push 就 commit + push git**:别人 `git pull` + `dvc pull` 拉不到 数据。养成 `dvc push` 在 `git push` 前的习惯。 3. **.dvc/cache 占满磁盘**:本地保留所有版本 + 多分支切换累积。 `dvc gc --workspace` 清掉工作区当前 commit 用不到的; `dvc gc --all-commits` 极致清。 4. **multiple users 写同一 stage**:`dvc lock` 防止并发 repro 撞车。 或者 stage outputs 必须确定,random_seed 要固定。 5. **大数据集多人 train**:每人都 `dvc pull` 几十 GB 浪费带宽。可以 配 dvc 远程在共享 NFS 上,所有人挂载到本地 `.dvc/cache/` 共享, 配 cache.type=symlink。

LLM function calling:让模型可靠地调你的工具(不是字符串解析)

## 起因 要做一个"自然语言查数据库"的功能。用户问"上周日北京下单的用户有几个?" → LLM 生成 SQL → 后端执行 → 返结果。 最原始做法是让 LLM 生成 SQL 字符串然后 regex 提取。痛点: - 模型有时输出 ```sql ... ``` markdown 包裹 - 有时多输出一段"分析这个 SQL..." 散文 - 有时 SQL 语法错(缺逗号、wrong table) - parse 失败要 try/except 重试 `function calling`(OpenAI 起的名,Anthropic 叫 tool use)让模型 **直接结构化输出"调用什么函数 + 什么参数"**,零解析。 ## 解决方案:tool calling ### 定义工具 ```python from openai import OpenAI client = OpenAI() tools = [ { 'type': 'function', 'function': { 'name': 'run_sql', 'description': '在分析数据库上执行 SELECT SQL,返回最多 50 行。' '只允许 SELECT;DELETE/UPDATE/INSERT 会被拒绝。', 'parameters': { 'type': 'object', 'properties': { 'sql': { 'type': 'string', 'description': 'PostgreSQL 标准 SQL,必须以 SELECT 开头', }, 'explanation': { 'type': 'string', 'description': '一句话解释这个 SQL 在做什么', }, }, 'required': ['sql', 'explanation'], }, }, }, { 'type': 'function', 'function': { 'name': 'list_tables', 'description': '列出可用表名', 'parameters': {'type': 'object', 'properties': {}, 'required': []}, }, }, ] system_prompt = """ 你是一个数据分析助手。回答用户问题前,可能需要: 1. 用 list_tables 看有哪些表 2. 用 run_sql 查数据 数据 schema: - users(id, email, country, created_at, plan) - orders(id, user_id, amount, city, ordered_at, status) - products(id, name, category, price) 回答用户用中文。 """ def chat(messages): return client.chat.completions.create( model='gpt-4o', messages=messages, tools=tools, ) ``` ### 调用循环 ```python def run(user_question: str): messages = [ {'role': 'system', 'content': system_prompt}, {'role': 'user', 'content': user_question}, ] while True: resp = chat(messages) msg = resp.choices[0].message # 模型决定调函数 if msg.tool_calls: messages.append(msg) # assistant turn for tc in msg.tool_calls: result = dispatch(tc.function.name, json.loads(tc.function.arguments)) messages.append({ 'role': 'tool', 'tool_call_id': tc.id, 'content': json.dumps(result), }) # 继续下一轮,让模型看 tool 结果再决定 continue # 模型给最终答案 return msg.content ``` ### 实现 dispatch(真的执行 SQL) ```python import psycopg def dispatch(name, args): if name == 'list_tables': return list_tables() if name == 'run_sql': return run_sql(args['sql']) return {'error': f'unknown function: {name}'} def list_tables(): with psycopg.connect(DB_URL) as conn: cur = conn.execute(""" SELECT table_name, obj_description(...) FROM information_schema.tables WHERE table_schema='public' """) return [{'table': r[0], 'desc': r[1]} for r in cur.fetchall()] def run_sql(sql: str): if not sql.strip().upper().startswith('SELECT'): return {'error': 'only SELECT allowed'} try: with psycopg.connect(DB_URL, autocommit=False) as conn: conn.execute('SET statement_timeout=10000') # 10s 限时 cur = conn.execute(sql) rows = [dict(zip([c[0] for c in cur.description], r)) for r in cur.fetchmany(50)] return {'rows': rows, 'count': len(rows)} except Exception as e: return {'error': str(e)} ``` ### 跑一下 ```python print(run('上周日北京下单的用户有几个?')) # 输出: # 上周日(2024-05-19)在北京下单的用户共 47 个。 ``` 模型自动: 1. 调 `list_tables()` 看有哪些 2. 调 `run_sql('SELECT COUNT(DISTINCT user_id) FROM orders WHERE city=...')` 3. 拿到结果后用自然语言回答 整套流程**无字符串解析**——arguments 已经是 typed JSON。 ## 几个重要细节 ### 1. 多 tool 同时调 ```python if msg.tool_calls: # 可能一次调 2-3 个函数 for tc in msg.tool_calls: ... ``` 模型可能并行调 list_tables + run_sql。要 loop 处理所有。 ### 2. 防恶意 SQL 工具签名再严格也挡不住"DROP TABLE users; --" 写在 SQL 字符串里。 在 dispatch 层做实际验证: - 只允许 SELECT 开头 - 用只读 DB 用户(无 DDL / DML 权限) - `SET statement_timeout=N` 限时长 - ACL 限 schema / table 访问 - 用 SQLAlchemy `text(sql)` + 参数化(更难做) 或者更激进:sandbox 跑(DuckDB on read-only data copy)。 ### 3. 限制循环次数 模型可能死循环调工具。限步数: ```python for step in range(10): resp = chat(messages) if not msg.tool_calls: return msg.content # ... raise RuntimeError('exceeded 10 tool-use steps') ``` ### 4. parallel tool call OpenAI 默认开 parallel;要禁用: ```python resp = client.chat.completions.create(..., parallel_tool_calls=False) ``` 复杂任务有 dependency 时禁用更稳。 ### 5. tool_choice 强制 ```python client.chat.completions.create( ..., tool_choice={'type': 'function', 'function': {'name': 'run_sql'}}, ) ``` 强制本轮调某个函数(不让模型 freestyle 直接答)。 ## Anthropic / Gemini / Ollama 也支持 API 风格略不同但概念一致。 ```python # Anthropic import anthropic client = anthropic.Anthropic() resp = client.messages.create( model='claude-sonnet-4-5', max_tokens=1024, tools=[{'name': 'run_sql', 'description': '...', 'input_schema': {...}}], messages=[{'role': 'user', 'content': '...'}], ) # stop_reason='tool_use' 时遍历 content blocks ``` ```python # Ollama (qwen2.5 等支持 function calling) import ollama resp = ollama.chat( model='qwen2.5:7b', messages=[...], tools=[{'type': 'function', 'function': {...}}], ) ``` 跨家 LLM 工具调用接口已经形成事实标准(OpenAI 格式被大部分模仿)。 ## 实际应用场景 1. **数据库 query agent**(上面例子) 2. **代码 review bot**:tool 是 read_file / list_files / run_tests 3. **客服 agent**:lookup_order / refund / escalate 4. **DevOps agent**:check_deployment / rollback / fetch_logs 5. **RAG with citations**:search_docs / fetch_doc tool 任何"模型需要查外部信息再回答" 都适合。 ## 与 LangChain / LlamaIndex 的关系 LangChain `create_react_agent` / `create_tool_calling_agent` 是上面 循环的封装: ```python from langchain.agents import create_tool_calling_agent, AgentExecutor from langchain_openai import ChatOpenAI from langchain.tools import tool @tool def run_sql(sql: str) -> str: """Execute SELECT SQL, return rows.""" return run_sql_impl(sql) llm = ChatOpenAI(model='gpt-4o') agent = create_tool_calling_agent(llm, [run_sql], prompt) executor = AgentExecutor(agent=agent, tools=[run_sql], max_iterations=10) executor.invoke({'input': '上周日北京...'}) ``` 封装方便但藏了细节。简单场景手写循环更可控;复杂 agent 用 LangChain 省事。 ## 效果 我们的 SQL agent 上线后: - 业务团队不再用 Metabase 拖拽,直接问中文 - "为什么这个客户流失了" 类自由查询能 90% 准确给出 SQL + 结果 - function calling 解析失败率:0%(结构化 output) - vs 之前用 regex 提取 SQL 字符串:~12% 失败要重试 ## 踩过的坑 1. **参数 schema 错**:模型按 schema 生成参数,schema 不严会乱传。 `required` / `enum` / `type` 都明确写。 2. **大 tool 数 → 模型 confused**:超过 ~10 个 tool 后模型选错率 上升。分层:top-level "router" → 选 sub-agent → sub-agent 有 3-5 个 tool。 3. **tool 实现 crash 抛异常**:返 `{'error': str(e)}` 让模型看见, 模型会 retry / 换策略。直接 raise 就只能上层 catch。 4. **token 成本**:tool description + schema 占 system prompt 不少 token。每次请求都付。优化:精简 description / 用 short tool name。 5. **流式(streaming)+ tool**:streaming response 中 tool_call chunks 是分段的,要 buffer 后再 parse。复杂场景非流式更稳。

Hugging Face transformers 微调 BERT 做文本分类(最小流程)

预训练 + 微调是 NLP 的标准范式。Hugging Face `transformers` 把 模型 / tokenizer / 训练循环都封装好,从原始数据到训练好的分类器 只需 < 50 行代码。 下面用 IMDB 影评数据微调 BERT 做二分类(正面 / 负面)。 ## 装 ```bash uv add 'transformers[torch]' datasets accelerate evaluate ``` ## 数据 ```python from datasets import load_dataset ds = load_dataset('imdb') print(ds) # DatasetDict({ # train: Dataset({ features: ['text', 'label'], num_rows: 25000 }) # test: Dataset({ features: ['text', 'label'], num_rows: 25000 }) # unsupervised: ... # }) # label: 0 = neg, 1 = pos print(ds['train'][0]) ``` ## tokenizer ```python from transformers import AutoTokenizer MODEL = 'distilbert-base-uncased' # 比 bert-base 小 40%,效果差几个点但快 tok = AutoTokenizer.from_pretrained(MODEL) def tokenize(batch): return tok(batch['text'], truncation=True, max_length=256, padding='max_length') ds_tok = ds.map(tokenize, batched=True) ds_tok = ds_tok.remove_columns(['text']) ds_tok = ds_tok.rename_column('label', 'labels') ds_tok.set_format('torch') ``` `truncation=True` 截到 256 token;BERT 最大 512,但短点训练快得多。 ## 模型 ```python from transformers import AutoModelForSequenceClassification model = AutoModelForSequenceClassification.from_pretrained(MODEL, num_labels=2) ``` `from_pretrained` 自动下载预训练权重 + 加上一个新的 classification head。 ## Trainer:30 行训练循环 ```python from transformers import TrainingArguments, Trainer import evaluate import numpy as np metric = evaluate.load('accuracy') def compute_metrics(eval_pred): logits, labels = eval_pred preds = np.argmax(logits, axis=1) return metric.compute(predictions=preds, references=labels) args = TrainingArguments( output_dir='./out', num_train_epochs=2, per_device_train_batch_size=16, per_device_eval_batch_size=32, learning_rate=2e-5, weight_decay=0.01, warmup_steps=500, logging_steps=50, eval_strategy='epoch', save_strategy='epoch', load_best_model_at_end=True, metric_for_best_model='accuracy', fp16=True, # GPU 时开混合精度 report_to='wandb', # 可选:wandb 跟踪 ) trainer = Trainer( model=model, args=args, train_dataset=ds_tok['train'].shuffle(seed=42).select(range(10000)), eval_dataset=ds_tok['test'].select(range(2000)), tokenizer=tok, compute_metrics=compute_metrics, ) trainer.train() trainer.evaluate() ``` 10k 训练样本 / 2k 验证样本,DistilBERT,单个 RTX 3090 上 ~5 分钟训完, 准确率约 91-92%。 全量 25k 训 3 个 epoch 能到 93-94%。 ## 保存 + 加载 + 推理 ```python trainer.save_model('./imdb-distilbert') tok.save_pretrained('./imdb-distilbert') # 加载推理 from transformers import pipeline classifier = pipeline('sentiment-analysis', model='./imdb-distilbert', device=0) # device=0 = cuda:0; -1 = cpu print(classifier('This movie was absolutely fantastic')) # [{'label': 'LABEL_1', 'score': 0.998}] print(classifier('What a complete waste of time and money')) # [{'label': 'LABEL_0', 'score': 0.997}] ``` `label` 是模型内部的,可以在训练时设标签名: ```python model = AutoModelForSequenceClassification.from_pretrained( MODEL, num_labels=2, id2label={0: 'NEGATIVE', 1: 'POSITIVE'}, label2id={'NEGATIVE': 0, 'POSITIVE': 1}, ) ``` 之后 pipeline 直接返回 'POSITIVE' / 'NEGATIVE'。 ## 中文文本? ```python MODEL = 'bert-base-chinese' # 谷歌中文 BERT # 或: MODEL = 'hfl/chinese-roberta-wwm-ext' # 哈工大 RoBERTa 全词 mask # 或: MODEL = 'IDEA-CCNL/Erlangshen-Roberta-110M-Sentiment' # 已经是情感分类微调过的 ``` 中文 tokenizer 是字级 BPE(不像英文按 subword),输出结构一样。 ## 推到 Hugging Face Hub(可选) ```bash huggingface-cli login ``` ```python trainer.push_to_hub('your-username/imdb-distilbert') # 之后任何人能: # AutoModelForSequenceClassification.from_pretrained('your-username/imdb-distilbert') ``` ## 减小内存:LoRA 完整微调 DistilBERT 已经够轻;微调 7B+ 模型时显存装不下,用 LoRA: ```bash uv add peft ``` ```python from peft import LoraConfig, get_peft_model, TaskType lora_config = LoraConfig( task_type=TaskType.SEQ_CLS, r=8, lora_alpha=16, lora_dropout=0.1, target_modules=['q_lin', 'v_lin'], ) model = get_peft_model(model, lora_config) model.print_trainable_parameters() # trainable params: 0.6M / 67M = 0.9% ``` 之后照常 `Trainer`。LoRA 让显存 / 速度大幅降低,性能差距通常 < 1%。 ## 推理优化(生产) - **量化**:`bitsandbytes` int8 / int4,2-4x 速度,半精度差几个点 - **ONNX 导出 + ONNX Runtime**:CPU 推理 2-3x 加速 - **TGI** (Text Generation Inference) / **vLLM**:高并发 LLM 推理 - **TorchServe**:通用 PyTorch 推理服务 简单分类任务直接用 `pipeline`;高并发用 TGI / TorchServe。 ## 评估更精细 ```python import evaluate metrics = evaluate.combine(['accuracy', 'f1', 'precision', 'recall']) def compute_metrics(eval_pred): logits, labels = eval_pred preds = np.argmax(logits, axis=1) return metrics.compute(predictions=preds, references=labels) ``` ## 踩过的坑 - `padding='max_length'` 把所有样本 pad 到 256 → 浪费计算。改成 `padding=True`(动态 pad 到 batch 内最长),训练快 2-3x。 - learning rate 太大:BERT 微调通常 2e-5 ~ 5e-5。1e-4 已经偏大, 常导致 loss NaN。 - 数据集 label 顺序:HuggingFace `load_dataset('imdb')` 是按 label 排序的,不 shuffle 训练会先看完所有 negative 再看 positive, loss 曲线奇形怪状。`shuffle(seed=42)` 必加。 - `fp16=True` 在 BF16 友好的硬件(A100 / H100)改 `bf16=True` 更稳。 老 V100 / GTX 用 fp16。

用 BentoML 把训好的 PyTorch 模型变成可调用的 HTTP API

## 起因 模型训完了 `.pt` 文件躺在硬盘上。要让前端 / 移动端 / 别的服务能用它, 需要包成 REST API。手写 Flask / FastAPI 包一遍是 100 行 boilerplate (加载模型 + parse 输入 + tensor 转 numpy + 错误处理 + batching)。 做几个模型这种工作就极乏味。 BentoML 把 ML 模型 → 生产 service 的过程标准化:写一个 service 文件, 自动生成 HTTP / gRPC / OpenAPI / Docker。 ## 解决方案 ### 装 ```bash uv add bentoml torch torchvision pillow ``` ### 模型仓库(model store) ```python # save_model.py import bentoml import torch from torchvision.models import resnet50, ResNet50_Weights model = resnet50(weights=ResNet50_Weights.DEFAULT).eval() bento_model = bentoml.pytorch.save_model('resnet50', model) print(bento_model) # Model(tag="resnet50:abc123...") ``` 模型存到本地 `~/bentoml/models/`,分 tag 版本化。 团队共享用 `bentoml push / pull` 配 BentoCloud 或自建 S3。 ### service.py(核心) ```python import bentoml from bentoml.io import Image, JSON from PIL import Image as PILImage import torch from torchvision import transforms resnet = bentoml.pytorch.get('resnet50:latest').to_runner() svc = bentoml.Service('image_classifier', runners=[resnet]) preprocess = transforms.Compose([ transforms.Resize(256), transforms.CenterCrop(224), transforms.ToTensor(), transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]), ]) with open('imagenet_classes.txt') as f: LABELS = [line.strip() for line in f] @svc.api(input=Image(), output=JSON()) async def predict(img: PILImage.Image) -> dict: x = preprocess(img).unsqueeze(0) logits = await resnet.async_run(x) probs = torch.softmax(logits, dim=1)[0] top5 = torch.topk(probs, k=5) return [ {'label': LABELS[idx], 'prob': prob.item()} for idx, prob in zip(top5.indices.tolist(), top5.values.tolist()) ] ``` 要点: - `to_runner()`:让 BentoML 管理 model 生命周期 + 并发 batch - `@svc.api(input=Image(), output=JSON())`:声明输入输出,自动 生成 OpenAPI doc + 验证 - `async run`:BentoML 在 worker 进程里跑 model,业务进程 async 协程 ### 本地启动 ```bash bentoml serve service:svc --reload # uvicorn 起来 http://localhost:3000 # http://localhost:3000/docs 自动 Swagger ``` 调用: ```bash curl -X POST http://localhost:3000/predict \ -H 'Content-Type: image/jpeg' \ --data-binary @cat.jpg # [{"label": "Egyptian cat", "prob": 0.84}, ...] ``` ### Adaptive batching(自动批处理) `runner` 默认开启 batching:单条请求进来时 hold ~10ms 等更多请求, 合并 batch 一次 forward,吞吐量直接翻几倍。 ```python runner = bentoml.pytorch.get('resnet50:latest').to_runner( method_configs={'__call__': { 'max_batch_size': 32, 'max_latency_ms': 100, }}, ) ``` 业务代码无感知。 ### 打包成 Bento + Docker ```bash # bentofile.yaml service: 'service:svc' include: - 'service.py' - 'imagenet_classes.txt' python: packages: - torch - torchvision - pillow models: - resnet50:latest ``` ```bash bentoml build # 生成 ~/bentoml/bentos/image_classifier/<tag> bentoml containerize image_classifier:latest # 生成 docker image image_classifier:latest docker run -p 3000:3000 image_classifier:latest ``` 镜像里包含:Python + 依赖 + service code + 模型权重。直接部署。 ### K8s 部署(BentoML Yatai) ```bash bentoml deployment create my-deploy \ --bento image_classifier:latest \ --cluster prod ``` 或者用 Yatai operator,K8s 原生 CRD 管理 Bento。 ## 效果 - 训完 model → 上生产 API 从 2 天 → 2 小时 - 自动 batch 让单 GPU 吞吐量翻 4 倍 - OpenAPI 文档自动生成,前端不再追着问 schema - 多版本管理 / canary deploy 都是 framework 原生支持 - 监控 metrics 自动暴露 /metrics endpoint 给 Prometheus ## 与替代品对比 | | BentoML | TorchServe | Triton | 自己写 FastAPI | |---|---|---|---|---| | 学习曲线 | 中 | 中 | 高 | 低 | | 多框架 | ✅ | 主 PyTorch | ✅ | N/A | | 自动 batching | ✅ | ✅ | ✅ | 需自写 | | Docker / K8s | ✅ | ✅ | ✅ | 需自写 | | 模型仓库 | ✅ | ✅ | ✅ | 需自建 | | 简单 API | 中 | 中 | 复杂 | 极简 | 复杂 ML 系统选 BentoML / Triton;单模型 < 100 QPS 自己写 FastAPI 更轻量。 ## 踩过的坑 1. **runner 进程模型加载慢**:cold start 几秒-几十秒。生产用 `bentoml serve --production --workers 4 --runners 2 ...` 提前 warmup。 2. **adaptive batching 引入延迟**:单条请求 P99 可能 > 100ms(等其它 请求凑 batch)。低 QPS 场景关 batching:`max_batch_size=1`。 3. **input/output schema 不严格**:默认 JSON 接受任意结构。生产用 pydantic `JSON(pydantic_model=MyInput)` 强校验。 4. **模型权重打包到镜像里**:镜像几 GB 推送慢。模型放 OSS / 启动时 下载更轻量;trade-off 是冷启动慢。 5. **GPU 不释放**:bentoml runner 进程退出时 GPU 偶尔被 PyTorch leaked。systemd 重启 service 时确保 `KillMode=control-group` 杀 所有子进程。

本地跑 Stable Diffusion:ComfyUI + 模型管理 + 工作流复用

## 起因 想生成产品配图,又不想把数据传 Midjourney / DALL-E。Stable Diffusion 是 开源文生图模型,本地一张 8GB+ 显存的卡能跑。 WebUI 老牌但 UI 笨重;ComfyUI 用 node-based 工作流(像 Blender 的节点编辑器), 更适合复杂 pipeline + 跨实验复用。 ## 解决方案 ### 装 ComfyUI ```bash git clone https://github.com/comfyanonymous/ComfyUI cd ComfyUI uv venv --python 3.12 source .venv/bin/activate # CUDA 12.x uv pip install torch torchvision --index https://download.pytorch.org/whl/cu124 uv pip install -r requirements.txt # 起服务 python main.py --listen 0.0.0.0 --port 8188 # 浏览器打开 http://localhost:8188 ``` 第一次进去是空白画布。 ### 装模型 ComfyUI 的目录结构: ``` ComfyUI/models/ ├── checkpoints/ # 基础模型 (.safetensors) ├── loras/ # LoRA 微调(风格 / 角色) ├── vae/ # VAE 解码器 ├── controlnet/ # ControlNet 模型 ├── upscale_models/ # 超分模型 └── embeddings/ # textual inversion ``` 从 [civitai.com](https://civitai.com) 或 HuggingFace 下: ```bash cd models/checkpoints # SDXL 1.0 base(6.5 GB) wget https://huggingface.co/stabilityai/stable-diffusion-xl-base-1.0/resolve/main/sd_xl_base_1.0.safetensors # 流行的写实风:Juggernaut XL(7 GB) # 流行的二次元:Pony Diffusion XL(7 GB) # 流行的快出图:SDXL Turbo(lightning 版,4 步出图) ``` ### 第一个工作流 ComfyUI 默认工作流:Load Checkpoint → CLIP Text Encode (Prompt) → CLIP Text Encode (Negative) → Empty Latent Image → KSampler → VAE Decode → Save Image. 右键画布 → Add Node → 选模块拖入。Connect 各节点 output → input。 prompt 例: ``` positive: a serene Japanese garden with cherry blossoms, golden hour, ultra detailed, 8k negative: ugly, blurry, watermark, low quality ``` KSampler 参数: - `steps`: 20-30(普通模型)/ 4-8(Turbo / Lightning) - `cfg`: 7(普通) / 1-2(Turbo) - `sampler_name`: dpmpp_2m_sde / euler_ancestral - `scheduler`: karras / normal `Queue Prompt` 跑一次。 ### 工作流保存 / 分享 `Save (JSON)` 把整个 graph 存成 `.json`。 团队里"哪个工作流出图最好" → JSON 文件共享,对方 `Load` 即可复现。 ComfyUI 还把生成的 PNG 嵌入了 metadata:图片本身包含生成它的完整 workflow。 拖任意 PNG 到 ComfyUI 画布 → 自动还原工作流。神奇。 ### LoRA 风格切换 ``` Load Checkpoint → Load LoRA → CLIP Text Encode → ... ``` `Load LoRA` 节点选 `pony_anime_v3.safetensors`,strength 0.7, 风格立刻切到该 LoRA 训练的风格。多个 LoRA 可以串联。 ### ControlNet(按草图 / 边缘 / 姿态约束) ``` Load Image → Canny Edge → Apply ControlNet (Advanced) ↓ Load Checkpoint → ... → KSampler (with controlnet conditioning) → ... ``` 可以从一张参考图提取边缘 / 姿态 / 深度,让生成图保持同样构图。 广告 / 产品设计常用:用线稿生成 100 种渲染风格。 ### 批量生成 `KSampler` 的 `batch_size` 设 4 → 一次跑出 4 张。 `Save Image` 节点自动命名 `00001.png`、`00002.png`。 `Queue Prompt` 多次连续跑: ``` Queue → 队列 5 → 跑完 5 批 = 20 张图 ``` 跑久后图片在 `ComfyUI/output/`。 ## 效果 - 本地 RTX 4090:SDXL 1024×1024 单图 ~5s(30 steps),batch 4 ~15s - SDXL Lightning:~1.5s 单图 - 月生成量 1k+ 张零成本 - 工作流文件版本化,"上次客户喜欢的那个风格" 一键复现 - 同组设计师共享 lora + 工作流 json,统一风格 ## 性能 tips - **--lowvram** flag 启动:8GB 卡能跑 SDXL(变慢但能跑) - **--use-pytorch-cross-attention**:xformers 不兼容时的替代 - **VAE FP16**:`--fp16-vae` 减半解码显存 - **TAESD**:`models/vae_approx/` 预览快但质量低,调试用 ## 与 WebUI / API 对比 | | A1111 WebUI | ComfyUI | 商业 API | |---|---|---|---| | UI | 表单 | Node graph | Prompt only | | 工作流复用 | 弱 | 极强 | 无 | | ControlNet / Inpaint | ✅ | ✅ | 部分 | | 复杂 pipeline | 难 | 极易 | 不行 | | 学习曲线 | 低 | 中 | 极低 | | 性能 | 好 | 更好(无 Gradio overhead) | N/A | 简单文生图选 WebUI;任何"图 → 处理 → 再处理 → 多阶段"的工作流选 ComfyUI。 ## 踩过的坑 1. **CUDA OOM 但 nvidia-smi 显示有显存**:PyTorch caching allocator 碎片。`--gpu-only` 全部组件放 GPU,或者 `--cpu-vae` 把 VAE 移 CPU (VAE decode 慢但省 2GB)。 2. **首次加载某 model 极慢**:safetensors 文件几 GB,第一次读盘 + 装入 GPU。后续 cache 命中秒级。 3. **不同 model 的最佳 prompt 不一样**:SDXL 跟 SD 1.5 prompt 风格完全 不同。每个 model 看 civitai 上的"showcase prompt"参考。 4. **negative prompt 误用**:写过多 negative(比如 "bad anatomy, ugly, blurry, low quality, worst quality, ...")反而效果差。3-5 个核心 negative 就够。 5. **生成的图有水印 / artifact**:训练数据里有 watermark 的 model 常产出"假水印"。换 model 或在 prompt 加 `(watermark:1.5)` 进 negative。

pandas 内存优化:dtype 收缩 / categorical / sparse 让 5GB → 800MB

## 起因 加载一份"用户行为日志" CSV,10M 行 × 30 列, `pd.read_csv('events.csv')` → 内存 5 GB 后 OOM。 机器只有 8 GB RAM。 查 `df.info(memory_usage='deep')`: ``` RangeIndex: 10000000 entries, 0 to 9999999 Data columns (total 30 columns): # Column Dtype MemUsage (MB) --- ------ ----- ------------- 0 event_type object 620 1 user_id int64 76 2 country object 580 3 device_type object 610 4 amount float64 76 ... dtypes: float64(8), int64(7), object(15) Memory: 5102 MB ``` 绝大多数内存被 string column 占用(pandas object dtype = Python str 对象数组,每个 string 40+ bytes 开销)。 5 个优化让同样数据降到 800 MB: ## 1. categorical:低基数字符串 ```python df['event_type'] = df['event_type'].astype('category') df['country'] = df['country'].astype('category') df['device_type'] = df['device_type'].astype('category') ``` categorical 把 string 映射到 int8/int16 + 一份 unique values 字典。 event_type 5 个值 + 10M 行 → 5 bytes / row 而非 60 bytes / row。 效果: ``` 0 event_type category 10 (从 620 MB) 2 country category 11 (从 580 MB) 3 device_type category 12 (从 610 MB) ``` 3 个 column 从 1.8 GB → 33 MB。 判断"该不该 categorical": ```python # 列的 unique 数 / 总行数 比例 df['country'].nunique() / len(df) # 0.00002 = 200 / 10M → 强烈 categorical # 0.5 = 5M unique / 10M → 不该 categorical(开销可能更大) # 一般 ratio < 0.05 时 categorical 受益 ``` `> 50%` 基数的 string column(如 URL / session_id)反而保 object 或 string[pyarrow]。 ## 2. 数值列收缩 dtype ```python df['user_id'].max() # 200_000_000 → int32 装得下 (max 2.1B) df['amount'].max() # 9999.99 → float32 够(默认 float64) df['hour_of_day'].max() # 23 → int8 (max 127) df = df.astype({ 'user_id': 'int32', 'amount': 'float32', 'hour_of_day': 'int8', 'minute_of_day': 'int8', }) ``` int64 → int32 减半;int8 减 8 倍。 float64 → float32 减半;某些数据(ML 特征)甚至 float16 也 OK。 辅助: ```python def shrink_ints(df): for col in df.select_dtypes(include=['int64']).columns: c = df[col] mx, mn = c.max(), c.min() if mn >= 0: if mx < 256: df[col] = c.astype('uint8') elif mx < 65536: df[col] = c.astype('uint16') elif mx < 4294967296: df[col] = c.astype('uint32') else: if -128 <= mn and mx < 128: df[col] = c.astype('int8') elif -32768 <= mn and mx < 32768: df[col] = c.astype('int16') elif -2147483648 <= mn and mx < 2147483648: df[col] = c.astype('int32') return df ``` 跑一次自动 downcast 所有 int。 ## 3. sparse for mostly-zero columns ```python # 90% 是 0 的 dummy variable column df['premium'] = df['premium'].astype('Sparse[int8, 0]') ``` 只存 non-zero 值 + 索引。 0.9M 个 0 + 0.1M 个 1 → 之前 10MB → 1MB。 适合 one-hot encoding 后的稀疏矩阵。 ## 4. parquet 替代 CSV 读 / 写 / 存都快得多: ```python df.to_parquet('events.parquet', compression='zstd') # 5GB CSV → 800MB Parquet(列存 + 压缩) df = pd.read_parquet('events.parquet') # 比 read_csv 快 5-10 倍 # 自动保留 dtype(包括 categorical) ``` read_csv 时也指定 dtype 避免 pandas 猜: ```python df = pd.read_csv('events.csv', dtype={ 'event_type': 'category', 'country': 'category', 'amount': 'float32', 'user_id': 'int32', }) ``` 避免 pandas 默认 int64 / float64 / object 后再转换的中间峰值。 ## 5. chunked read:分块处理 如果转完仍装不下,分块流式处理: ```python chunks = [] for chunk in pd.read_csv('events.csv', chunksize=100_000): chunk = optimize_dtypes(chunk) # 业务处理 / 聚合 result = chunk.groupby('user_id').agg(...) chunks.append(result) # 最后合并 final = pd.concat(chunks).groupby(...).sum() ``` 或者直接用 polars(前面有文章),原生支持流式 + 多核。 ## 6. PyArrow string 替代 object pandas 2.0+ 加了 `string[pyarrow]` 类型,比 object 省内存 + 快: ```python df['session_id'] = df['session_id'].astype('string[pyarrow]') ``` 适合"高基数 string" —— categorical 不划算的场景。 比 object 省 40-60% 内存 + groupby / sort 快 2-3x。 `pd.options.future.infer_string = True` 让 read_csv 默认用 string[pyarrow](pandas 3.0 将成默认)。 ## 实战脚本 ```python import pandas as pd def shrink(df): """通用 dtype 收缩 + categorical 自动检测""" df = df.copy() # 1. integer downcast for col in df.select_dtypes(include=['int64']).columns: df[col] = pd.to_numeric(df[col], downcast='integer') # 2. float downcast for col in df.select_dtypes(include=['float64']).columns: df[col] = pd.to_numeric(df[col], downcast='float') # 3. low-cardinality string → category for col in df.select_dtypes(include=['object']).columns: if df[col].nunique() / len(df) < 0.05: df[col] = df[col].astype('category') return df df = pd.read_csv('events.csv') print('before:', df.memory_usage(deep=True).sum() / 1e9, 'GB') df = shrink(df) print('after: ', df.memory_usage(deep=True).sum() / 1e9, 'GB') ``` 典型场景 5x 内存压缩。 ## 效果 我的 10M × 30 dataset: | | 内存 | |---|---| | 默认 read_csv | 5.1 GB | | + int / float downcast | 3.2 GB | | + categorical (3 cols) | 1.4 GB | | + string[pyarrow] (1 col) | 1.1 GB | | + sparse (2 cols) | 800 MB | 整套操作 < 30 秒 + 完全保留语义。后续 groupby / merge 也快得多 (小数据 = 快 cache 命中)。 ## 何时考虑换工具 pandas 优化到底后还是不够 → 切: - **polars**:原生流式 + 多核,比 pandas 快 5-30x - **DuckDB**:SQL 跑 Parquet / CSV,省内存 - **Dask**:pandas 类似 API + out-of-core + 集群 - **Spark**:超大数据集群 但单机 100GB 内 pandas + 这些技巧通常够。 ## 踩过的坑 1. **categorical 后 join 慢**:两个 df join 的 categorical 列要 "相同 categories" 否则 pandas 转回 object。 `df['x'].cat.set_categories(combined_cats)`。 2. **`pd.read_csv` 不带 dtype**:pandas 先全读 object / int64 / float64 占大量内存,read 完才转。**指定 dtype**:内存峰值降 2-3x。 3. **categorical 不能做某些操作**:`.str` accessor 在 categorical 上慢 / 不工作。需要时 `.astype('object').str.lower()`。 4. **sparse 与 numpy/sklearn 兼容性差**:很多 sklearn estimator 不 接受 SparseArray,要 `.to_dense()`。trade-off。 5. **string[pyarrow] 仍在演进**:pandas 2.x 部分功能(如 groupby) 仍回退到 object。看 changelog 跟进。

vLLM 部署一个高吞吐量 LLM 推理服务(PagedAttention)

直接用 HuggingFace transformers 跑 LLM 推理性能很差: batch 1 时 GPU 利用率 30-50%,多并发请求时显存碎片化 OOM。 vLLM 是伯克利出的高性能 LLM 推理引擎,核心技术是 **PagedAttention** (像 OS 分页一样管理 KV cache),加上 continuous batching, 比 transformers 直接推理快 5-24 倍。 ## 安装 ```bash uv add vllm # 需要 CUDA 11.8+ 或 12.x,PyTorch 2.x ``` ## 命令行起服务 ```bash uv run vllm serve Qwen/Qwen2.5-7B-Instruct \ --tensor-parallel-size 1 \ --max-model-len 8192 \ --gpu-memory-utilization 0.85 \ --port 8000 ``` 第一次启动会从 HuggingFace 下载模型(~15GB)。 启动后默认 OpenAI 兼容 API。 ## 调用 ```bash curl http://localhost:8000/v1/chat/completions \ -H 'Content-Type: application/json' \ -d '{ "model": "Qwen/Qwen2.5-7B-Instruct", "messages": [{"role": "user", "content": "你好"}], "max_tokens": 200 }' ``` 或 Python: ```python from openai import OpenAI client = OpenAI(base_url='http://localhost:8000/v1', api_key='dummy') resp = client.chat.completions.create( model='Qwen/Qwen2.5-7B-Instruct', messages=[{'role': 'user', 'content': '你好'}], max_tokens=200, ) print(resp.choices[0].message.content) ``` ## 关键参数 - `--max-model-len`:上下文最大长度(影响 KV cache 大小) - `--gpu-memory-utilization`:用多少显存(0-1,默认 0.9) - `--tensor-parallel-size`:多 GPU 时拆 tensor 并行(4 卡设 4) - `--quantization awq` / `gptq` / `fp8`:量化加速 - `--enable-prefix-caching`:相同前缀的请求复用 KV cache(系统 prompt 共享场景大幅加速) ## Python 直接调用(不走 HTTP) ```python from vllm import LLM, SamplingParams llm = LLM(model='Qwen/Qwen2.5-7B-Instruct', gpu_memory_utilization=0.85) prompts = [ '介绍一下 RAG', '解释 PagedAttention', '写一个 Python 二分查找', ] params = SamplingParams(temperature=0.3, max_tokens=300) outputs = llm.generate(prompts, params) for out in outputs: print('---') print(out.outputs[0].text) ``` vLLM 自动 batch 这 3 个 prompt 一起跑,单次 forward 处理多个序列。 ## continuous batching 的含义 传统推理: ``` batch 1: [seq A 100 tokens, seq B 80 tokens, seq C 60 tokens] 等三个序列都完成才能下一个 batch ``` continuous batching: ``` 任意时刻一个请求结束就立刻让出位置给新请求 GPU 持续吃满,无 idle ``` 这是 vLLM 高吞吐的核心,比"动态 batch"更激进。 ## benchmark:vs 直接 transformers ```bash # 100 个并发请求,每个生成 200 token # vLLM ab -n 100 -c 16 -p body.json -T application/json \ http://localhost:8000/v1/chat/completions # 通常:3000-8000 tokens/s 吞吐 # transformers + 简单 FastAPI 包装 # 通常:300-800 tokens/s ``` 10x 量级的吞吐差距。 ## 多卡:tensor parallelism 70B 模型单卡装不下,4 张 A100 拆开: ```bash uv run vllm serve meta-llama/Llama-3.1-70B-Instruct \ --tensor-parallel-size 4 \ --max-model-len 8192 ``` vLLM 自动用 NCCL 在 4 卡间分配 attention head / FFN 矩阵。 ## 量化:让更大模型跑在更小显卡 ```bash # AWQ 4-bit uv run vllm serve TheBloke/Llama-3.1-70B-AWQ \ --quantization awq # 4 bit 量化的 70B 大约 40GB 显存(不量化要 140GB) ``` ```bash # FP8 (需要 H100) uv run vllm serve meta-llama/Llama-3.1-70B-Instruct \ --quantization fp8 ``` ## 长上下文 ```bash # 32k 上下文 uv run vllm serve Qwen/Qwen2.5-7B-Instruct \ --max-model-len 32768 ``` 但 KV cache 占显存 = batch_size × max_seq_len × 每层 KV size。 32k context + 100 batch ≈ 显存吃 50%+,要 trade off。 ## 与 Hugging Face 模型生态 vLLM 支持的 model:Llama / Mistral / Qwen / Mixtral / Gemma / Yi / DeepSeek / Phi / Baichuan / ChatGLM 等几乎全部主流开源 LLM。 官方维护清单看 vLLM docs。 ## 与 sglang / lmdeploy 对比 | 引擎 | 优势 | 劣势 | |---|---|---| | vLLM | 生态最大、模型最多 | 长上下文性能一般 | | TGI (HF) | HF 官方、生产稳 | 吞吐略低于 vLLM | | sglang | 结构化生成(JSON / regex)极快 | 模型支持稍少 | | lmdeploy | 国内(商汤)、TurboMind 后端快 | 文档不全 | 通用选 vLLM;要求 JSON 严格输出选 sglang。 ## prefix caching:相同系统 prompt 复用 ```bash uv run vllm serve qwen2.5:7b --enable-prefix-caching ``` 所有请求都用 "You are a helpful assistant..." 起头的话, prefix 这部分的 KV cache 只算一次,10k token 系统 prompt 几乎免费。 ## 生产部署清单 1. 用 systemd 起 vLLM service 2. 前面套 nginx 反代(限流 + auth) 3. Prometheus 抓 vLLM 内置的 `/metrics` 4. health check:`/health` 5. 多模型用多个 vLLM 进程,每个绑不同 GPU ## 踩过的坑 - 启动时 "out of memory":`--gpu-memory-utilization` 调小, 或减 `--max-model-len`。 - 模型权重 download 慢:用 HuggingFace mirror 或预先下载, `HF_HUB_OFFLINE=1` 让 vLLM 不再尝试下载。 - TP > 1 时 NCCL 卡死:检查机器内 GPU 互联(PCIe / NVLink); 设 `NCCL_P2P_DISABLE=1` 排查。 - vLLM 0.5+ 跟 PyTorch 2.4+ 紧耦合,旧 PyTorch 装不上。`uv` 自动解析 依赖一般没问题,手动 pip 时容易翻车。

scikit-learn Pipeline + ColumnTransformer 把"训练泄漏"杀掉

## 起因 ML 新手最容易写出"data leakage"——把测试集的统计信息(均值 / 编码 / 缺失值填补)混到训练里。常见错法: ```python # ❌ 整个数据集上算均值标准差,然后切分 from sklearn.preprocessing import StandardScaler scaler = StandardScaler().fit(X) X = scaler.transform(X) X_train, X_test, y_train, y_test = train_test_split(X, y) ``` 测试集的均值"漏"进了 scaler,cross-validation 分数偏高,部署到真实 新数据立刻拉胯。 `Pipeline` + `ColumnTransformer` 强制把所有 preprocessing 关在 cv split 内部,自动避免泄漏。 ## 解决方案 ### 1. 简单 Pipeline ```python from sklearn.pipeline import Pipeline from sklearn.preprocessing import StandardScaler from sklearn.linear_model import LogisticRegression from sklearn.model_selection import cross_val_score pipe = Pipeline([ ('scaler', StandardScaler()), ('clf', LogisticRegression()), ]) scores = cross_val_score(pipe, X, y, cv=5) print(f'mean acc: {scores.mean():.3f} ± {scores.std():.3f}') ``` 每折 cv 时 pipeline 重新 `fit` scaler,只在那一折的训练数据上算 mean / std。没有泄漏。 ### 2. 混合类型特征:ColumnTransformer 数据集常常包含 numerical + categorical 两类: ```python import pandas as pd from sklearn.compose import ColumnTransformer from sklearn.preprocessing import StandardScaler, OneHotEncoder from sklearn.impute import SimpleImputer df = pd.read_csv('users.csv') y = df.pop('churn') X = df num_cols = ['age', 'days_active', 'monthly_revenue'] cat_cols = ['country', 'plan', 'referral_source'] preprocessor = ColumnTransformer([ ('num', Pipeline([ ('imp', SimpleImputer(strategy='median')), ('sc', StandardScaler()), ]), num_cols), ('cat', Pipeline([ ('imp', SimpleImputer(strategy='constant', fill_value='missing')), ('oh', OneHotEncoder(handle_unknown='ignore', sparse_output=False)), ]), cat_cols), ], remainder='drop') full_pipe = Pipeline([ ('prep', preprocessor), ('clf', GradientBoostingClassifier()), ]) scores = cross_val_score(full_pipe, X, y, cv=5, scoring='roc_auc') ``` 要点: - 每列类型独立配 imputer + encoder/scaler - `handle_unknown='ignore'`:测试集里出现训练集没见过的 category 值不报错 - `remainder='drop'` 显式:未声明的列扔掉(safer than `passthrough`) ### 3. 超参搜索:所有阶段一起搜 ```python from sklearn.model_selection import GridSearchCV param_grid = { 'prep__num__imp__strategy': ['mean', 'median'], 'clf__n_estimators': [100, 200, 500], 'clf__max_depth': [3, 5, 7], } search = GridSearchCV(full_pipe, param_grid, cv=5, scoring='roc_auc', n_jobs=-1, verbose=1) search.fit(X, y) print(search.best_params_, search.best_score_) ``` 注意命名:`stage_name__sub_stage__param`,双下划线层级。 `n_jobs=-1` 用所有 CPU 核并行 fit 各组合。 ### 4. 保存 / 加载整个 pipeline ```python import joblib joblib.dump(search.best_estimator_, 'model.pkl') # 部署侧 model = joblib.load('model.pkl') y_pred = model.predict(new_X) # 自动跑 preprocessor → estimator,新数据原始 DataFrame 进去 ``` 整套 preprocessing + 模型在一个文件里,不需要"先 scale 再 predict" 两步走,部署更安全。 ### 5. 自定义 transformer 业务相关的 feature engineering 包成 transformer: ```python from sklearn.base import BaseEstimator, TransformerMixin import numpy as np class LogRatio(BaseEstimator, TransformerMixin): """两列做 log(a/b) 派生""" def __init__(self, num_col, denom_col): self.num_col, self.denom_col = num_col, denom_col def fit(self, X, y=None): return self def transform(self, X): ratio = (X[self.num_col] + 1) / (X[self.denom_col] + 1) return np.log(ratio).to_frame('log_ratio') # 加进 ColumnTransformer preprocessor = ColumnTransformer([ ('lr', LogRatio('revenue', 'days_active'), ['revenue', 'days_active']), ('num', ..., num_cols), ('cat', ..., cat_cols), ]) ``` 业务知识沉淀进 pipeline,每次实验都用。 ## 效果 - cross-validation 分数与真实 holdout / 线上 A/B 偏差 < 1% (之前漏了泄漏时偏差 5-10%) - 部署只 `joblib.load + predict`,前后端不再重复实现 preprocessing - 改特征工程只动 pipeline 定义,下游所有实验自动跟着 - 团队新同事接手时一个 .pkl 文件即可,无须读 100 行 prep 脚本 ## 踩过的坑 1. **`drop='first'` 在 OneHotEncoder**:drop 第一个 category 用于 linear model 避免共线性,但 tree-based model 不需要。GBT / RF 用 `drop=None` 信息更全。 2. **categorical 列只出现在测试集的新值**:`handle_unknown='error'` 默认 会报错。生产用 `handle_unknown='ignore'` 当全 0 处理;或 `'infrequent_if_exist'` 归入 "rare" 桶。 3. **SimpleImputer 把整数列变浮点**:填了 median 后 int → float。 下游模型可能内存翻倍。pandas 2.0+ 用 nullable Int 类型避开。 4. **大 categorical 高基数(user_id)做 OneHot**:维度爆炸。 用 target encoding / hashing trick / embeddings。 5. **`fit_transform(X_train)` + `transform(X_test)` 写错顺序**: 测试集 fit_transform = 灾难。强迫自己用 Pipeline 就规避了 —— 你只 `pipe.fit(X_train)` + `pipe.predict(X_test)`,永远不会写 到 transform 这个层。

CLIP + Faiss 做"用文字搜图"的图片搜索引擎(自家相册版)

## 起因 手机里 10 万张照片,找"去年在日本拍的樱花" 要翻几天。 Google Photos 能做语义搜索但隐私 → 想本地。 OpenAI 的 CLIP 模型把图片和文字编码到同一个语义向量空间。 "樱花" 的文字向量 ≈ 樱花图片的视觉向量。 本地跑 CLIP + Faiss 索引 + 几行 Python = 自己的 Google Photos。 ## 解决方案 ### 装 ```bash uv add open-clip-torch faiss-cpu torch pillow tqdm # GPU 加速: uv add faiss-gpu torch --index https://download.pytorch.org/whl/cu124 ``` `open-clip` 是 LAION 训的 CLIP 系列(性能比官方 CLIP 好)。 ### Step 1: 提取图片特征 ```python import torch import open_clip from PIL import Image from pathlib import Path import numpy as np from tqdm import tqdm DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu' # 中等大小 + 性能 + 多语言:xlm-roberta-large 支持中文 model, _, preprocess = open_clip.create_model_and_transforms( 'xlm-roberta-large-ViT-H-14', pretrained='frozen_laion5b_s13b_b90k', ) model = model.to(DEVICE).eval() tokenizer = open_clip.get_tokenizer('xlm-roberta-large-ViT-H-14') def encode_image(path): img = Image.open(path).convert('RGB') x = preprocess(img).unsqueeze(0).to(DEVICE) with torch.no_grad(): feat = model.encode_image(x) feat = feat / feat.norm(dim=-1, keepdim=True) return feat.cpu().numpy().squeeze().astype('float32') # 跑全相册 photos_dir = Path('~/Pictures/Photos').expanduser() paths = list(photos_dir.rglob('*.jpg')) + list(photos_dir.rglob('*.heic')) features = [] valid_paths = [] for p in tqdm(paths): try: features.append(encode_image(p)) valid_paths.append(str(p)) except Exception as e: print(f'skip {p}: {e}') features = np.stack(features) # shape (N, 1024) np.save('image_features.npy', features) with open('image_paths.txt', 'w') as f: f.write('\n'.join(valid_paths)) ``` GPU 上 10 万张 ~ 2-4 小时。CPU 慢 5-10 倍。一次性事,后续只索引新增。 ### Step 2: Faiss 索引 ```python import faiss import numpy as np features = np.load('image_features.npy') N, D = features.shape # 100000, 1024 # 小数据集(< 1M)用 flat:精确 + 简单 index = faiss.IndexFlatIP(D) # IP = inner product = cosine(features 已 normalize) index.add(features) faiss.write_index(index, 'images.index') # 大数据集(百万级)用 IVF + PQ 压缩 # index = faiss.index_factory(D, 'IVF1024,PQ32', faiss.METRIC_INNER_PRODUCT) # index.train(features) # index.add(features) ``` 10 万 × 1024 维 flat 索引约 400 MB。搜一次 < 10ms。 ### Step 3: 文字 → 找图 ```python paths = open('image_paths.txt').read().splitlines() index = faiss.read_index('images.index') def search(query: str, k=12): tokens = tokenizer([query]).to(DEVICE) with torch.no_grad(): feat = model.encode_text(tokens) feat = feat / feat.norm(dim=-1, keepdim=True) feat = feat.cpu().numpy().astype('float32') scores, indices = index.search(feat, k) return [(paths[i], scores[0][rank]) for rank, i in enumerate(indices[0])] # 用! for path, score in search('cherry blossoms in Japan'): print(f'{score:.3f} {path}') # 中文 for path, score in search('一只在沙滩上奔跑的金毛狗'): print(f'{score:.3f} {path}') ``` 返回 top-12 最相似的图,按 cosine similarity 排序。 ### Step 4: Web UI(10 行 Streamlit) ```python # app.py import streamlit as st from PIL import Image # ... 上面的 search 函数 ... st.title('我的照片搜索') query = st.text_input('描述你要找的图:') if query: results = search(query, k=12) cols = st.columns(4) for i, (path, score) in enumerate(results): with cols[i % 4]: st.image(Image.open(path), caption=f'{score:.2f}') ``` ```bash streamlit run app.py # 浏览器自动打开 ``` 输入"日落沙滩"→ 3 秒内显示所有匹配照片。 ## 进阶 ### 1. 图搜图(以图找图) ```python def search_by_image(image_path, k=12): feat = encode_image(image_path) feat = feat.reshape(1, -1) scores, indices = index.search(feat, k) return [(paths[i], scores[0][rank]) for rank, i in enumerate(indices[0])] ``` "找跟这张图相似的所有照片"。适合"找出所有该旅行的照片"。 ### 2. 增量索引 新增图片时不要重建整个 index: ```python new_features = [] for path in new_paths: new_features.append(encode_image(path)) new_features = np.stack(new_features) index.add(new_features) faiss.write_index(index, 'images.index') # paths 文件追加 with open('image_paths.txt', 'a') as f: f.write('\n' + '\n'.join(new_paths)) ``` Flat index 支持 add;如果是 IVF + PQ 需要 reuse trained index + add。 ### 3. 过滤:按 metadata ```python # EXIF 信息读取拍摄时间 / GPS / 相机 from PIL.ExifTags import TAGS def get_exif(path): img = Image.open(path) exif = {TAGS.get(k, k): v for k, v in (img._getexif() or {}).items()} return exif # 搜结果加 metadata filter results = search('cherry blossoms') filtered = [(p, s) for p, s in results if get_year(p) == 2023] ``` 更高级:把 metadata 存 SQLite 一起 join。 ### 4. CLIP 模型选择 | 模型 | 大小 | 速度 | 质量 | 多语言 | |---|---|---|---|---| | ViT-B/32 | 150 MB | 快 | 中 | 仅英 | | ViT-L/14 | 430 MB | 中 | 高 | 仅英 | | ViT-H/14 | 1.1 GB | 慢 | 极高 | 仅英 | | xlm-roberta + ViT-H | 4 GB | 慢 | 极高 | **多语言** | | siglip-large | 1 GB | 中 | 极高 | 看版本 | 中文场景一定用支持多语言的(xlm-roberta CLIP 或 chinese-clip)。 ### 5. faiss 大数据集 百万 - 千万级照片: ```python # IVF: 把 vectors 分桶,搜时只查最近的 N 个桶 nlist = int(np.sqrt(N)) # 桶数 quantizer = faiss.IndexFlatIP(D) index = faiss.IndexIVFFlat(quantizer, D, nlist, faiss.METRIC_INNER_PRODUCT) index.train(features) index.add(features) index.nprobe = 8 # 搜时查 8 个桶(增加 recall) ``` 千万级用 IVF + PQ 压缩(牺牲一点精度换 50x 内存压缩)。 ### 6. 部署到手机 CLIP 模型导出 ONNX / CoreML 后能在手机端跑: ```python torch.onnx.export(model.visual, dummy_image, 'clip_vision.onnx', opset_version=14) ``` Apple CoreML 工具更直接。手机端单图 encode < 200ms。 ## 完整效果 我的真实相册(4 万张照片): - index 大小:160 MB - 文字搜索单 query:~30ms - "去年在京都的樱花" → 95% 召回率(漏的是被树枝挡住的) - "戴墨镜的人" → 90% - "一群人合影" → 85% - "蓝色的天空" → 100% 但太多匹配 - "我爸" → 0%(没人脸识别能力) CLIP 强在"语义概念",弱在"特定人物 / 文字 OCR"。 后两者需要专门的人脸识别 + OCR pipeline 配合。 ## 与替代品对比 | | 自托管 CLIP | Google Photos | Apple Photos | immich (开源) | |---|---|---|---|---| | 隐私 | ✅ | ❌ | 部分(设备端) | ✅ | | 自由度 | 高 | 低 | 中 | 中 | | 人脸识别 | 没(自己加) | ✅ | ✅ | ✅ | | 语义搜索 | ✅(CLIP) | ✅ | 一定 | ✅(CLIP) | | 多设备 | 自己搭 | ✅ | ✅ | ✅ | 如果不想从零搭:**immich** 是开源 Google Photos 替代, 内置 CLIP 搜索 + 人脸识别 + 多设备同步。 ## 踩过的坑 1. **HEIC 格式**:iPhone 拍的 .heic 默认 Pillow 读不了。 `pip install pillow-heif` + `register_heif_opener()`。 2. **GPU memory 不够**:H/14 model + 高分辨率图 batch=1 仍 OOM。 `feat = model.encode_image(x.half())` half precision 减半显存。 3. **路径有中文**:Windows 上 `Path` 偶尔编码乱。统一 UTF-8 + 转 绝对路径。 4. **加新图后忘 update**:每次 sync 跑增量 encode + add to index。 写个 cron。 5. **face matching is poor**:CLIP 不擅长"区分两张人脸是否同人"。 要加 face recognition 用 ArcFace / FaceNet 等专用模型。

precision / recall / F1 / ROC-AUC:分类指标什么时候用谁

## 起因 老板问我"模型准确率 95% 是不是很厉害"。我一翻数据集: 99.5% 是负类("用户不会购买"),全猜负 trivial baseline 也 99.5% 准确率。我们的 95% 反而是垃圾。这是 accuracy 在不平衡数据上的经典 陷阱。 下面把几个常见分类指标用一个具体例子串起来。 ## 例子:邮件垃圾识别 - 测试集 1000 封邮件 - 真实:900 normal + 100 spam - 模型预测出 80 封 spam,其中 70 真是 spam(10 假阳性 FP)+ 30 真 spam 被漏(30 假阴性 FN) 混淆矩阵: | | 预测 spam | 预测 normal | |---|---|---| | 真实 spam | TP=70 | FN=30 | | 真实 normal | FP=10 | TN=890 | ## 指标对照 ### Accuracy $$\text{acc} = \frac{TP + TN}{TP + TN + FP + FN} = \frac{960}{1000} = 96\%$$ 不平衡数据下骗人。这个例子 96% 看着很好。 ### Precision(精确率) $$\text{precision} = \frac{TP}{TP + FP} = \frac{70}{80} = 87.5\%$$ "被预测为 spam 的邮件里,多少真是 spam"。**FP 代价高时关注**(误判 正常邮件为垃圾很烦)。 ### Recall(召回率) / Sensitivity / TPR $$\text{recall} = \frac{TP}{TP + FN} = \frac{70}{100} = 70\%$$ "所有真 spam 里,我抓到多少"。**FN 代价高时关注**(漏报 spam)。 医疗筛查、欺诈检测、安全告警都更看重 recall——宁可多报点 false positive 让人复查,也不能漏 true positive。 ### F1 $$F1 = 2 \cdot \frac{precision \cdot recall}{precision + recall} = 0.778$$ precision 和 recall 的调和平均。两者都重要、又只能优化一个数时用 F1。 ### F-beta `beta` 控制 recall 相对 precision 的权重: - F0.5:precision 加权(更看重不误报) - F1:均衡 - F2:recall 加权(更看重不漏报) ### ROC-AUC 不依赖阈值,看模型"把正样本排在负样本前"的能力。1.0 = 完美, 0.5 = 瞎猜。 在 sklearn: ```python from sklearn.metrics import roc_auc_score, precision_recall_fscore_support y_proba = model.predict_proba(X_test)[:, 1] auc = roc_auc_score(y_test, y_proba) # 在某个阈值上 y_pred = (y_proba > 0.5).astype(int) p, r, f, _ = precision_recall_fscore_support(y_test, y_pred, average='binary') ``` ROC-AUC 在极不平衡数据上**会过于乐观**。建议同时看 PR-AUC(precision-recall 曲线下面积)。 ### PR-AUC ```python from sklearn.metrics import average_precision_score ap = average_precision_score(y_test, y_proba) ``` 不平衡数据(正类占比 < 10%)的标准评估。比 ROC-AUC 更敏感地反映模型 区分能力。 ## 选择指南 | 场景 | 主指标 | |---|---| | 类别平衡 + FP/FN 等代价 | accuracy / F1 | | 不平衡 + 关注少数类 | PR-AUC、F1、recall | | 排序质量(不定阈值) | ROC-AUC、PR-AUC | | FP 代价高(误报扰民) | precision、F0.5 | | FN 代价高(漏报致命) | recall、F2 | | 多分类 | macro-F1(每类均权)/ weighted-F1(按支持度) | ## confusion matrix 可视化 ```python from sklearn.metrics import ConfusionMatrixDisplay ConfusionMatrixDisplay.from_predictions(y_test, y_pred, normalize='true') # normalize='true' 按真实类归一化,看每类的召回率 ``` 行内归一化能直接读"真 spam 里有多少被分对"。 ## classification_report ```python from sklearn.metrics import classification_report print(classification_report(y_test, y_pred, digits=3, target_names=['normal', 'spam'])) ``` ``` precision recall f1-score support normal 0.967 0.989 0.978 900 spam 0.875 0.700 0.778 100 accuracy 0.960 1000 macro avg 0.921 0.844 0.878 1000 weighted avg 0.958 0.960 0.958 1000 ``` 一眼看到每类的三个指标 + 宏 / 加权平均。 ## 阈值调整 预测概率 > 0.5 是默认。但业务上可以调: - 关注 recall:降阈值(0.3)→ 更多预测为 spam → recall 升、precision 降 - 关注 precision:升阈值(0.7)→ 反之 用 PR 曲线选最优 trade-off: ```python from sklearn.metrics import precision_recall_curve prec, rec, thr = precision_recall_curve(y_test, y_proba) # 找 precision >= 0.95 的最高 recall 对应阈值 idx = (prec[:-1] >= 0.95).nonzero()[0] best_thr = thr[idx[rec[idx].argmax()]] ``` ## 效果(一个 churn 模型的真实改造) 我们的客户流失预测原本看 accuracy,0.91。但 base rate 流失率 8%—— 全猜不流失也 92%。换成 PR-AUC 评估: - 全猜不流失:PR-AUC = 0.08 - 我们的模型:PR-AUC = 0.34 明显能看出模型有价值。再按 PR 曲线选阈值:保留 recall >= 0.7 的最高 precision 点,把 marketing 干预投到这批"高风险"用户上。CAC 下降 30%。 ## 踩过的坑 1. **`average='binary'` 在多分类时报错**:多分类要 `'macro'`/`'micro'`/ `'weighted'`。`'micro'` 在不平衡时等于 accuracy,意义有限。 2. **正类标号搞反**:sklearn 默认 `pos_label=1`。如果你的"想检测的类" 是 0,得 `pos_label=0` 否则 precision/recall 算的是另一类。 3. **不看 support**:F1 macro = 0.55 看着不行,但其中 90% 是稀有类 的支撑,整体 weighted F1 0.88 还可以。两个都要看。 4. **直接信任 cross-validation 的 ROC-AUC**:不平衡数据 + KFold(不 stratify)→ 某些 fold 几乎没正样本 → AUC 失真。永远用 `StratifiedKFold`。 5. **阈值在训练集上选**:必须用验证集 / OOF 选阈值;测试集只用于最终 报告。

Prophet:5 行写一个还不错的时间序列预测(替代手动调 ARIMA)

## 起因 老板让"预测下季度营收"。专业的 forecasting 上 ARIMA / SARIMA / state space model 调一周;我们要的是"现在马上有个基础数字 + 不太离谱"。 Facebook (Meta) 开源的 Prophet 是 GAM (Generalized Additive Model) 风格的 time series 工具,对"商业数据特征 (趋势 + 周期 + 节假日 + 异常点)" 极友好,几乎零调参就有不错效果。 ## 装 ```bash uv add prophet # Mac M 系列 / Win 装 prophet 偶尔遇 cmdstanpy 编译问题 # 解决:先 conda install cmdstanpy 再 pip install prophet ``` ## 5 分钟 demo ```python import pandas as pd from prophet import Prophet # 数据格式:必须两列 ds (datetime) + y (value) df = pd.read_csv('daily_revenue.csv') # date,revenue # 2023-01-01,1234.5 # 2023-01-02,1289.1 # ... df.columns = ['ds', 'y'] # 模型 m = Prophet() m.fit(df) # 预测未来 90 天 future = m.make_future_dataframe(periods=90) forecast = m.predict(future) # yhat = 预测值;yhat_lower/upper = 80% 置信区间 print(forecast[['ds', 'yhat', 'yhat_lower', 'yhat_upper']].tail()) # 可视化 m.plot(forecast).savefig('forecast.png') m.plot_components(forecast).savefig('components.png') ``` 输出: - `forecast.png`:历史 + 预测曲线 + 置信带 - `components.png`:拆解成 trend / weekly / yearly 三个成分 调参为 0。 ## 加节假日 中国春节 / 双 11 等是营业额异常点,加进 model 让预测更准: ```python holidays = pd.DataFrame({ 'holiday': 'major_sale', 'ds': pd.to_datetime([ '2023-11-11', '2024-11-11', # 双 11 '2023-06-18', '2024-06-18', # 618 ]), 'lower_window': -1, # 节前 1 天也算 'upper_window': 2, # 节后 2 天也算 }) m = Prophet(holidays=holidays) m.fit(df) ``` 或者内置节假日: ```python m.add_country_holidays(country_name='CN') # 自动加入春节 / 国庆 / 五一等 ``` ## 多季节性 默认 yearly + weekly + daily。要月度 / 自定义周期: ```python m = Prophet() m.add_seasonality(name='monthly', period=30.5, fourier_order=5) m.fit(df) ``` `fourier_order` 是季节性的"灵活度"(越大越能拟合复杂周期;过高 overfit)。 ## 外部回归量(exogenous) 业务量受外部因素影响: ```python df['ads_spend'] = ... # 广告投入 df['is_promotion'] = ... # 0/1 促销标志 m = Prophet() m.add_regressor('ads_spend') m.add_regressor('is_promotion') m.fit(df) # 预测时也要提供未来值 future['ads_spend'] = expected_ads_for_next_90d future['is_promotion'] = expected_promo_flag forecast = m.predict(future) ``` 广告 / 促销作为协变量进 model,预测更贴合业务现实。 ## 异常点 Prophet 对异常点(COVID 期间 / 单日大促)相对鲁棒(用 GAM + 平滑)。 不需要手工剔除。要进一步控制可以: ```python # 把已知异常段 mark 为 NaN 让 Prophet 忽略 df.loc[(df['ds'] >= '2020-02-01') & (df['ds'] <= '2020-05-01'), 'y'] = None m.fit(df) ``` 或者用 `add_seasonality` 处理"上下界" 让 trend 不被极端值影响。 ## cross-validation 评估 ```python from prophet.diagnostics import cross_validation, performance_metrics # initial 365 天训练,每 30 天滚动一次,预测 horizon=30 天 df_cv = cross_validation(m, initial='365 days', period='30 days', horizon='30 days') metrics = performance_metrics(df_cv) print(metrics[['horizon', 'mape', 'rmse']].head()) # horizon mape rmse # 0 1 days 0.08 102.3 # 1 2 days 0.09 105.1 # ... ``` MAPE (Mean Absolute Percentage Error) < 10% 在大多数商业数据是 "还行",5% 算优秀。 ## 与替代品对比 | | Prophet | ARIMA / SARIMA | statsmodels | NeuralProphet | NHITS / TimeGPT | |---|---|---|---|---|---| | 学习曲线 | 极低 | 高(要懂 ACF/PACF) | 高 | 中 | 低 | | 调参量 | 几乎 0 | 多 | 多 | 中 | 几乎 0 | | 自动季节性 | ✅ | 手动 | 手动 | ✅ | ✅ | | 节假日 | ✅ | 手动 | 手动 | ✅ | ✅ | | 多变量 | regressor | VAR | VAR | ✅ | ✅ | | 高频数据 | 中(小时级) | ✅ | ✅ | ✅ | ✅ | | 长期趋势 | 强 | 中 | 中 | 强 | 强 | 简单业务时间序列 → Prophet。 学术 / 严谨需求 → ARIMA / state space。 现代深度方法 → NeuralProphet / Darts / Nixtla statsforecast。 ## 真实业务案例 我们用 Prophet 做月度营收预测: - 训练数据:2 年日级营收 - 加节假日:春节 + 双 11 + 618 - 外部回归:广告预算 + 促销日 - horizon:90 天 vs 之前的"人拍" 预测: | | 人拍 | Prophet | Prophet + 节假日 + regressor | |---|---|---|---| | MAPE | 25% | 12% | 7% | | 工时 | 4 hour/月 | 10 min/月 | 30 min/月 | 不仅准、还省时。 ## 输出多 model + ensemble ```python # 跑 3 个不同 changepoint_prior_scale 取均值 forecasts = [] for cps in [0.001, 0.05, 0.5]: m = Prophet(changepoint_prior_scale=cps) m.fit(df) forecasts.append(m.predict(future)['yhat']) ensemble = pd.concat(forecasts, axis=1).mean(axis=1) ``` 不同超参对不同部分敏感;平均通常更稳。 ## 何时不该用 Prophet - **特别短**的时间序列(< 2 季):信号不够,model 拟合差 - **复杂多变量交互**:要用 LightGBM / 神经网络 - **极高频(毫秒级)实时**:Prophet 慢,用 specialized 工具 - **概率性预测要严格 calibrated**:bootstrap interval 仅近似 ## 踩过的坑 1. **列名必须 ds / y**:用了别的名 Prophet 不识别。`df.rename(...)` 后再 fit。 2. **datetime 时区**:Prophet 内部假设 naive datetime(无时区)。 带 tz 会报错。`df['ds'] = df['ds'].dt.tz_localize(None)`。 3. **`make_future_dataframe(periods=90, freq='D')`**:默认日级; 月级数据要 `freq='M'`。漏指定就把月数据当日数据预测。 4. **logistic growth 需要 cap**:用 `growth='logistic'` 时必须给 `cap` 列(上界),否则报错。 5. **changepoint 自动检测不靠谱**:业务有大转折(疫情 / 公司战略 变化)时,手动指定: ```python m = Prophet(changepoints=['2020-02-01', '2022-06-15']) ```

Dagster vs Airflow vs Prefect:现代 ETL pipeline 选哪个

## 起因 要搭一个数据 pipeline:每天凌晨 1 点从 S3 拉新数据 → 清洗 → 入 ClickHouse → 跑特征工程 → 训 ML model → push 到 staging。 失败要重试 + 告警 + 部分重跑。 三个主流编排工具: - **Airflow**:老牌,2014 出自 Airbnb,业界事实标准 - **Prefect**:Airflow 工程师出走重新设计,更轻量 - **Dagster**:从数据资产角度重新思考 pipeline,2024 风头最劲 试了一周下来记录。 ## Airflow 定义 DAG: ```python # airflow_dag.py from datetime import datetime, timedelta from airflow import DAG from airflow.operators.python import PythonOperator from airflow.operators.bash import BashOperator default_args = { 'owner': 'data-team', 'retries': 2, 'retry_delay': timedelta(minutes=5), } with DAG( 'daily_etl', default_args=default_args, schedule='0 1 * * *', start_date=datetime(2024, 1, 1), catchup=False, max_active_runs=1, ) as dag: def fetch_data(**ctx): ds = ctx['ds'] # '2024-05-24' download_from_s3(f's3://bucket/raw/{ds}/') def clean_data(**ctx): ds = ctx['ds'] clean(input_dir, output_dir) fetch = PythonOperator(task_id='fetch', python_callable=fetch_data) clean = PythonOperator(task_id='clean', python_callable=clean_data) load = PythonOperator(task_id='load', python_callable=load_to_ch) train = PythonOperator(task_id='train', python_callable=train_model) fetch >> clean >> load >> train ``` 跑: ```bash airflow webserver airflow scheduler # 浏览器 http://localhost:8080 看 DAG ``` **优点**: - 业界最广,会的人多 - operator 库巨大(Bash / Spark / EMR / Snowflake / Postgres / Slack / ...) - 成熟的 SLA / lineage / retry 机制 - K8s 部署成熟(KubernetesExecutor / KubernetesPodOperator) **缺点**: - DAG = 全局 Python module,导入慢(scheduler 反复 import 几千 DAG) - 任务间传数据麻烦(XCom 限大小;走 S3 / DB 才行) - 本地开发体验差(webserver + scheduler + executor 三件套) - DAG 语法重,3 个 task 也 50 行 ## Prefect ```python # prefect_flow.py from prefect import flow, task from datetime import datetime, timedelta @task(retries=2, retry_delay_seconds=300) def fetch_data(date: str) -> str: path = f's3://bucket/raw/{date}/' download_from_s3(path) return path @task def clean_data(raw_path: str) -> str: out = clean(raw_path) return out @task def load_to_ch(clean_path: str): load(clean_path) @task def train_model(): train() @flow(name='daily_etl', log_prints=True) def daily_etl(date: str = None): date = date or datetime.utcnow().date().isoformat() raw = fetch_data(date) clean = clean_data(raw) load_to_ch(clean) train_model() if __name__ == '__main__': daily_etl() ``` 跑: ```bash # 本地直接跑 python prefect_flow.py # 部署到 Prefect Cloud / 自托管 server prefect deploy prefect_flow.py:daily_etl --name nightly --cron '0 1 * * *' prefect worker start --pool default ``` **优点**: - 写 flow 像写普通 Python 函数,task 间传值像普通调用 - 本地开发体验好(直接 run flow) - Prefect Cloud 免费层够小团队用 - 动态 task / sub-flow 支持好 **缺点**: - 生态不如 Airflow(operator / connector 少) - 2.0 vs 3.0 几次大改 breaking - 自托管 server 还需要装 Postgres + Redis 等 ## Dagster ```python # dagster_pipeline.py from dagster import asset, AssetExecutionContext, Definitions, ScheduleDefinition, define_asset_job @asset def raw_data(context: AssetExecutionContext) -> str: ds = context.partition_key # '2024-05-24' return download_from_s3(f's3://bucket/raw/{ds}/') @asset(deps=[raw_data]) def cleaned_data(context, raw_data: str) -> str: return clean(raw_data) @asset(deps=[cleaned_data]) def clickhouse_table(cleaned_data: str): load_to_ch(cleaned_data) @asset(deps=[clickhouse_table]) def ml_model() -> str: return train() daily_job = define_asset_job('daily', selection='*') daily_schedule = ScheduleDefinition(job=daily_job, cron_schedule='0 1 * * *') defs = Definitions( assets=[raw_data, cleaned_data, clickhouse_table, ml_model], schedules=[daily_schedule], ) ``` 跑: ```bash dagster dev # http://localhost:3000 看 asset 图 ``` **优点**: - "asset-based" 思维:每个 task 产出一个数据资产 (table / file / model),blueprint = asset 图,自然 documentation - IDE 友好:type hint + 静态分析整套 pipeline - backfill / partition / lineage 一等公民 - 内置数据测试 / asset checks - 本地开发 = `dagster dev` 一条命令 **缺点**: - 最新(2019),生态比 Airflow 小 - 学习曲线略陡(asset / op / job / sensor 几个概念) - 团队上手时间长 ## 选型对比 | 场景 | 推荐 | |---|---| | 已经在用 Airflow + 团队熟 | 继续 Airflow | | 新项目 + 现代 Python + IDE 重度 | **Dagster** | | 极简 + 个人项目 + Cloud SaaS | Prefect | | 万千 DAG + 跨团队 + 老系统对接 | Airflow(生态最广) | | ML pipeline + 强 data lineage | Dagster | 我个人新项目首选 Dagster:asset 抽象比 task 更贴合"数据流"思维。 ## 共同 best practice 不管选哪个: ### 1. 任务幂等 ```python @task def load_to_ch(path): # 用 INSERT OR REPLACE / DELETE then INSERT 让重跑不重复 db.execute(f"DELETE FROM table WHERE date = '{date}'") db.execute(f"INSERT INTO table SELECT * FROM read_parquet('{path}')") ``` partition / date 当业务 key,重跑刷掉这个 partition 再插。 ### 2. 数据 contract 定义"这个 task 输出什么 schema / row count / 范围": ```python @asset def cleaned_data(...) -> Output: df = ... return Output( value=df, metadata={ 'rows': len(df), 'columns': list(df.columns), 'null_rate': df.isna().mean().to_dict(), } ) ``` 下游任务依赖 contract 而非 implicit 假设。下次改变 schema 显式报错。 ### 3. 分层 按 dbt 风格: ``` sources/ # 原始数据 staging/ # 轻清洗 / 类型转换 intermediate/ # 业务逻辑组合 marts/ # 业务可用的 final table ``` 每层独立可测试 + 故障 isolate。 ### 4. 告警 + 监控 - task 失败 → Slack / 邮件 - task 跑太久 → SLA miss 告警 - 数据指标异常(行数突变) → 检查告警 - Prometheus 暴露 task duration / success rate ### 5. backfill 历史日期重跑: ```bash # Airflow airflow dags backfill -s 2024-01-01 -e 2024-05-01 daily_etl # Dagster dagster job backfill --job daily --partitions '2024-01-01..2024-05-01' # Prefect prefect deployment run 'daily_etl/nightly' --param date=2024-01-15 ``` ### 6. data lineage / catalog 知道"这个表是哪些 task 生成的,哪些下游用了它"。 Dagster 内置;Airflow 接 OpenLineage / Marquez;Prefect 类似。 ## 整套部署(举例 Dagster + K8s) ``` - dagster-webserver (UI) - dagster-daemon (scheduler + sensor) - code locations (你的 pipeline Python) - PostgreSQL (metadata) - S3 / GCS (intermediate storage) - K8s(task 在 pod 跑) ``` helm chart 一键: ```bash helm install dagster dagster/dagster --set ... ``` ## 效果 我们的项目用 Dagster: - 100+ assets 自动算依赖图 - 单次 backfill 半年数据:UI 选时间范围 + run,asset 自动并行 - 数据 schema 改动 → asset check fail → CI 拦下 → 改下游 → 再合 - 新成员看 asset 图 30 秒理解整套 pipeline ## 踩过的坑 ### Airflow 类 1. **DAG 太多 scheduler 慢**:每 30s 扫所有 DAG 文件 import。 把 DAG 分文件 + 用 dynamic DAG generation 减少 scheduler IO。 2. **XCom 大数据**:传几 MB 会爆 backend DB。task 间传 path(S3 key) 而非数据本身。 ### Prefect 类 3. **2.x → 3.x breaking**:API 大改。生产 pin 大版本 + 升级提前测。 ### Dagster 类 4. **asset partition 配错**:同 asset 一个 daily 一个 hourly partition 会冲突。统一一 asset 一种 partition scheme。 5. **monorepo 多 code location**:每个 location 独立 Python 进程, 依赖隔离好但启动慢。开发期合并;生产分。

推荐系统第一步:用 implicit 库做协同过滤(不用任何深度模型)

## 起因 老板说"加个'你可能也喜欢' 推荐"。我们的数据: - 用户 5 万 - 商品 1 万 - 用户-商品 浏览 / 购买记录 100 万条 不是 YouTube 级,深度学习 / DSSM / DeepFM 过于豪华。 传统协同过滤(implicit feedback ALS)几行代码就有 baseline,效果 比"按热度排序" 提升 30-50% CTR。下面是真实跑通流程。 ## 解决方案 ### 装 ```bash uv add implicit scipy pandas ``` `implicit` 库是 Ben Frederickson 写的 C++ ALS 实现,比 surprise / spotlight 快 100x。 ### 数据格式:sparse user-item matrix ```python import pandas as pd from scipy.sparse import csr_matrix events = pd.read_csv('user_events.csv') # user_id,item_id,event_type # 1234,567,view # 1234,567,view # 1234,789,purchase # 隐式反馈权重:购买 > 加购 > 浏览 weights = {'view': 1, 'cart': 3, 'purchase': 10} events['w'] = events['event_type'].map(weights) # 聚合 agg = events.groupby(['user_id', 'item_id'])['w'].sum().reset_index() # user_id,item_id,w # 1234,567,2 # 1234,789,10 # 编码成连续整数索引 user_ids = agg['user_id'].unique() item_ids = agg['item_id'].unique() user_idx = {u: i for i, u in enumerate(user_ids)} item_idx = {it: i for i, it in enumerate(item_ids)} agg['ui'] = agg['user_id'].map(user_idx) agg['ii'] = agg['item_id'].map(item_idx) # sparse matrix matrix = csr_matrix((agg['w'].values, (agg['ui'].values, agg['ii'].values)), shape=(len(user_ids), len(item_ids))) print(matrix.shape, matrix.nnz) # (50000, 10000) 1000000 ``` ### ALS 训练 ```python from implicit.als import AlternatingLeastSquares model = AlternatingLeastSquares( factors=64, # embedding 维度 regularization=0.01, iterations=20, alpha=40, # implicit feedback 信心权重 use_gpu=False, # GPU 可选 ) model.fit(matrix) # 几秒钟完成 ``` `alpha` 是 implicit ALS 的关键超参: - 表示"我们对'用户买了'比'用户没买' 的信心差多少 - 论文建议 alpha=40 是好起点 - 调参可以 GridSearch on 验证集 ### 给一个用户推荐 top-K ```python def recommend(user_id, k=10): if user_id not in user_idx: return [] # 冷启动 uid = user_idx[user_id] item_ids_arr, scores = model.recommend(uid, matrix[uid], N=k) return [(item_ids[i], float(s)) for i, s in zip(item_ids_arr, scores)] print(recommend(user_id=1234, k=5)) # [(item_id, score), ...] ``` `model.recommend()` 内部: 1. 拿用户 embedding (64 维) 2. 跟所有 item embedding 算 dot product 3. 排除已交互的 item 4. 返回 top-K 毫秒级返回。 ### 相似商品推荐("你看了这个,也可能看那个") ```python def similar_items(item_id, k=10): if item_id not in item_idx: return [] iid = item_idx[item_id] item_ids_arr, scores = model.similar_items(iid, N=k+1) # 第一个是它自己,跳过 return [(item_ids[i], float(s)) for i, s in zip(item_ids_arr[1:], scores[1:])] ``` 商品详情页用:"看了这个的人还看了..."。 ### 评估:split + 看 hit@K / MAP@K ```python import numpy as np def hit_at_k(model, train_matrix, test_dict, k=10): """test_dict: {user_idx: set(item_idx)} 是 holdout 的真 interaction""" hits = 0 total = 0 for uid, true_items in test_dict.items(): if uid >= train_matrix.shape[0]: continue recommended, _ = model.recommend(uid, train_matrix[uid], N=k) if any(it in true_items for it in recommended): hits += 1 total += 1 return hits / total ``` hit@10 = 0.20 意味着 20% 的用户 top-10 推荐里有真实喜欢的。 随机推荐 hit@10 ≈ k / num_items ≈ 0.001。 200x 提升 = 信号强 = 模型 work。 ### 冷启动用户 新用户没历史 → 用户 embedding 不存在 → ALS 推不了。 fallback: ```python def recommend_with_fallback(user_id, k=10): if user_id in user_idx: return recommend(user_id, k) # 冷启动:推热门 return popular_items_in_user_segment(user_id, k) ``` 或者用 sign-up 时收集的偏好 / 人口学信息做内容-based 起步。 ### 冷启动商品 新上架商品没 interaction → ALS 给不出 embedding。 解决: - 用商品 metadata(类别 / 标签 / 描述)映射到现有 embedding 空间 - "two-tower" 模型:让 item tower 用 metadata 做 embedding 简单做法:新商品借用同类商品的 embedding 均值。 ## 几个进阶 model ### BM25 weighting ```python from implicit.nearest_neighbours import bm25_weight weighted = bm25_weight(matrix, K1=100, B=0.8) model = AlternatingLeastSquares(factors=64) model.fit(weighted) ``` BM25 给罕见物品更高 weight,对长尾推荐更好。 ### Item-Item CF (用户少 + 商品多时) ```python from implicit.nearest_neighbours import CosineRecommender model = CosineRecommender(K=10) model.fit(matrix) ``` 直接算 item-item 相似度,无 embedding 训练。 适合 100k+ items + 用户少场景。 ### BPR (Bayesian Personalized Ranking) ```python from implicit.bpr import BayesianPersonalizedRanking model = BayesianPersonalizedRanking(factors=64, learning_rate=0.05) model.fit(matrix) ``` learn-to-rank 风格,对"排序质量" 优化(不是 reconstruction)。 小数据集上经常更好。 ## 部署到生产 ```python # 训练完保存 import pickle with open('als_model.pkl', 'wb') as f: pickle.dump({ 'model': model, 'user_idx': user_idx, 'item_idx': item_idx, 'item_ids': item_ids, }, f) # inference 服务 class RecAPI: def __init__(self, path): with open(path, 'rb') as f: d = pickle.load(f) self.model = d['model'] self.user_idx = d['user_idx'] self.item_ids = d['item_ids'] # ... def recommend(self, user_id, k=10): uid = self.user_idx.get(user_id) if uid is None: return [] item_arr, _ = self.model.recommend(uid, ...) return [self.item_ids[i] for i in item_arr] ``` FastAPI 包一下,端口暴露给业务。 ### 每日重训 cron / Airflow: ```python @task def retrain_als(): events = load_recent_events(days=90) matrix = build_matrix(events) model = AlternatingLeastSquares(factors=64) model.fit(matrix) save_model(model, ...) # 通知 inference 服务 reload requests.post('http://rec-api/reload') ``` 每天凌晨 1 点训。新 interaction 进入 embedding。 ## A/B test 验证 把 50% 用户走"推荐"接口,50% 走"按热度排"。 观察: - CTR(点击率) - conversion rate(转化率) - 用户 session 时长 通常协同过滤 baseline 比"按热度" CTR 提升 30-100%。 ## 与深度模型对比 | | implicit ALS | LightFM | DeepFM | YouTube DNN | |---|---|---|---|---| | 数据量需求 | 中(万级用户) | 中 | 大 | 极大 | | 训练时间 | 秒-分钟 | 分钟 | 小时 | 天 | | 冷启动支持 | 弱 | 中(hybrid) | 中 | 中 | | 实施成本 | 极低 | 低 | 高 | 极高 | | 效果上限 | 中等 | 中-高 | 高 | 极高 | 500 万以上交互 + 想榨干 5-10% CTR → 上深度模型。 否则 ALS 就够。 ## 效果 我们小型电商上线 implicit ALS: - 数据:5w 用户 + 1w SKU + 100w events - 训练:30 秒(CPU) - 推理:< 5ms / user - A/B test:CTR +47%,conversion +18% - vs 之前的"按销量排序"基线,**显著**改善 后续上 LightFM hybrid(加 metadata)再 +12%。 深度学习准备阶段,但 ALS 已经 cover 大头 ROI。 ## 踩过的坑 1. **`matrix[uid]` 而不是 `matrix.getrow(uid)`**:implicit 0.7 后 API 改了, `recommend(user, user_items)` 要传整行 sparse vector。 2. **数据严重 popularity bias**:只看热门,ALS embedding 也偏热门。 BM25 weight 缓解。 3. **训练 / 推理使用不同 user_idx 映射** → 错位。永远 pickle 整套 {model, user_idx, item_idx, item_ids}。 4. **新用户进来推不了**:冷启动 fallback 一定要有,不然推荐 API 返空。 5. **A/B test 早期看 metric 没差异**:用户没注意新推荐位 / 流量太小 = 0 statistical power。至少 1 周 + 万级用户。