知识广场

按学科筛选:计算机科学 / 人工智能 / 机器学习
清除筛选

«计算机科学 / 人工智能 / 机器学习» 分类下共 23 篇帖子

本地跑 Stable Diffusion:ComfyUI + 模型管理 + 工作流复用

## 起因 想生成产品配图,又不想把数据传 Midjourney / DALL-E。Stable Diffusion 是 开源文生图模型,本地一张 8GB+ 显存的卡能跑。 WebUI 老牌但 UI 笨重;ComfyUI 用 node-based 工作流(像 Blender 的节点编辑器), 更适合复杂 pipeline + 跨实验复用。 ## 解决方案 ### 装 ComfyUI ```bash git clone https://github.com/comfyanonymous/ComfyUI cd ComfyUI uv venv --python 3.12 source .venv/bin/activate # CUDA 12.x uv pip install torch torchvision --index https://download.pytorch.org/whl/cu124 uv pip install -r requirements.txt # 起服务 python main.py --listen 0.0.0.0 --port 8188 # 浏览器打开 http://localhost:8188 ``` 第一次进去是空白画布。 ### 装模型 ComfyUI 的目录结构: ``` ComfyUI/models/ ├── checkpoints/ # 基础模型 (.safetensors) ├── loras/ # LoRA 微调(风格 / 角色) ├── vae/ # VAE 解码器 ├── controlnet/ # ControlNet 模型 ├── upscale_models/ # 超分模型 └── embeddings/ # textual inversion ``` 从 [civitai.com](https://civitai.com) 或 HuggingFace 下: ```bash cd models/checkpoints # SDXL 1.0 base(6.5 GB) wget https://huggingface.co/stabilityai/stable-diffusion-xl-base-1.0/resolve/main/sd_xl_base_1.0.safetensors # 流行的写实风:Juggernaut XL(7 GB) # 流行的二次元:Pony Diffusion XL(7 GB) # 流行的快出图:SDXL Turbo(lightning 版,4 步出图) ``` ### 第一个工作流 ComfyUI 默认工作流:Load Checkpoint → CLIP Text Encode (Prompt) → CLIP Text Encode (Negative) → Empty Latent Image → KSampler → VAE Decode → Save Image. 右键画布 → Add Node → 选模块拖入。Connect 各节点 output → input。 prompt 例: ``` positive: a serene Japanese garden with cherry blossoms, golden hour, ultra detailed, 8k negative: ugly, blurry, watermark, low quality ``` KSampler 参数: - `steps`: 20-30(普通模型)/ 4-8(Turbo / Lightning) - `cfg`: 7(普通) / 1-2(Turbo) - `sampler_name`: dpmpp_2m_sde / euler_ancestral - `scheduler`: karras / normal `Queue Prompt` 跑一次。 ### 工作流保存 / 分享 `Save (JSON)` 把整个 graph 存成 `.json`。 团队里"哪个工作流出图最好" → JSON 文件共享,对方 `Load` 即可复现。 ComfyUI 还把生成的 PNG 嵌入了 metadata:图片本身包含生成它的完整 workflow。 拖任意 PNG 到 ComfyUI 画布 → 自动还原工作流。神奇。 ### LoRA 风格切换 ``` Load Checkpoint → Load LoRA → CLIP Text Encode → ... ``` `Load LoRA` 节点选 `pony_anime_v3.safetensors`,strength 0.7, 风格立刻切到该 LoRA 训练的风格。多个 LoRA 可以串联。 ### ControlNet(按草图 / 边缘 / 姿态约束) ``` Load Image → Canny Edge → Apply ControlNet (Advanced) ↓ Load Checkpoint → ... → KSampler (with controlnet conditioning) → ... ``` 可以从一张参考图提取边缘 / 姿态 / 深度,让生成图保持同样构图。 广告 / 产品设计常用:用线稿生成 100 种渲染风格。 ### 批量生成 `KSampler` 的 `batch_size` 设 4 → 一次跑出 4 张。 `Save Image` 节点自动命名 `00001.png`、`00002.png`。 `Queue Prompt` 多次连续跑: ``` Queue → 队列 5 → 跑完 5 批 = 20 张图 ``` 跑久后图片在 `ComfyUI/output/`。 ## 效果 - 本地 RTX 4090:SDXL 1024×1024 单图 ~5s(30 steps),batch 4 ~15s - SDXL Lightning:~1.5s 单图 - 月生成量 1k+ 张零成本 - 工作流文件版本化,"上次客户喜欢的那个风格" 一键复现 - 同组设计师共享 lora + 工作流 json,统一风格 ## 性能 tips - **--lowvram** flag 启动:8GB 卡能跑 SDXL(变慢但能跑) - **--use-pytorch-cross-attention**:xformers 不兼容时的替代 - **VAE FP16**:`--fp16-vae` 减半解码显存 - **TAESD**:`models/vae_approx/` 预览快但质量低,调试用 ## 与 WebUI / API 对比 | | A1111 WebUI | ComfyUI | 商业 API | |---|---|---|---| | UI | 表单 | Node graph | Prompt only | | 工作流复用 | 弱 | 极强 | 无 | | ControlNet / Inpaint | ✅ | ✅ | 部分 | | 复杂 pipeline | 难 | 极易 | 不行 | | 学习曲线 | 低 | 中 | 极低 | | 性能 | 好 | 更好(无 Gradio overhead) | N/A | 简单文生图选 WebUI;任何"图 → 处理 → 再处理 → 多阶段"的工作流选 ComfyUI。 ## 踩过的坑 1. **CUDA OOM 但 nvidia-smi 显示有显存**:PyTorch caching allocator 碎片。`--gpu-only` 全部组件放 GPU,或者 `--cpu-vae` 把 VAE 移 CPU (VAE decode 慢但省 2GB)。 2. **首次加载某 model 极慢**:safetensors 文件几 GB,第一次读盘 + 装入 GPU。后续 cache 命中秒级。 3. **不同 model 的最佳 prompt 不一样**:SDXL 跟 SD 1.5 prompt 风格完全 不同。每个 model 看 civitai 上的"showcase prompt"参考。 4. **negative prompt 误用**:写过多 negative(比如 "bad anatomy, ugly, blurry, low quality, worst quality, ...")反而效果差。3-5 个核心 negative 就够。 5. **生成的图有水印 / artifact**:训练数据里有 watermark 的 model 常产出"假水印"。换 model 或在 prompt 加 `(watermark:1.5)` 进 negative。

pandas 内存优化:dtype 收缩 / categorical / sparse 让 5GB → 800MB

## 起因 加载一份"用户行为日志" CSV,10M 行 × 30 列, `pd.read_csv('events.csv')` → 内存 5 GB 后 OOM。 机器只有 8 GB RAM。 查 `df.info(memory_usage='deep')`: ``` RangeIndex: 10000000 entries, 0 to 9999999 Data columns (total 30 columns): # Column Dtype MemUsage (MB) --- ------ ----- ------------- 0 event_type object 620 1 user_id int64 76 2 country object 580 3 device_type object 610 4 amount float64 76 ... dtypes: float64(8), int64(7), object(15) Memory: 5102 MB ``` 绝大多数内存被 string column 占用(pandas object dtype = Python str 对象数组,每个 string 40+ bytes 开销)。 5 个优化让同样数据降到 800 MB: ## 1. categorical:低基数字符串 ```python df['event_type'] = df['event_type'].astype('category') df['country'] = df['country'].astype('category') df['device_type'] = df['device_type'].astype('category') ``` categorical 把 string 映射到 int8/int16 + 一份 unique values 字典。 event_type 5 个值 + 10M 行 → 5 bytes / row 而非 60 bytes / row。 效果: ``` 0 event_type category 10 (从 620 MB) 2 country category 11 (从 580 MB) 3 device_type category 12 (从 610 MB) ``` 3 个 column 从 1.8 GB → 33 MB。 判断"该不该 categorical": ```python # 列的 unique 数 / 总行数 比例 df['country'].nunique() / len(df) # 0.00002 = 200 / 10M → 强烈 categorical # 0.5 = 5M unique / 10M → 不该 categorical(开销可能更大) # 一般 ratio < 0.05 时 categorical 受益 ``` `> 50%` 基数的 string column(如 URL / session_id)反而保 object 或 string[pyarrow]。 ## 2. 数值列收缩 dtype ```python df['user_id'].max() # 200_000_000 → int32 装得下 (max 2.1B) df['amount'].max() # 9999.99 → float32 够(默认 float64) df['hour_of_day'].max() # 23 → int8 (max 127) df = df.astype({ 'user_id': 'int32', 'amount': 'float32', 'hour_of_day': 'int8', 'minute_of_day': 'int8', }) ``` int64 → int32 减半;int8 减 8 倍。 float64 → float32 减半;某些数据(ML 特征)甚至 float16 也 OK。 辅助: ```python def shrink_ints(df): for col in df.select_dtypes(include=['int64']).columns: c = df[col] mx, mn = c.max(), c.min() if mn >= 0: if mx < 256: df[col] = c.astype('uint8') elif mx < 65536: df[col] = c.astype('uint16') elif mx < 4294967296: df[col] = c.astype('uint32') else: if -128 <= mn and mx < 128: df[col] = c.astype('int8') elif -32768 <= mn and mx < 32768: df[col] = c.astype('int16') elif -2147483648 <= mn and mx < 2147483648: df[col] = c.astype('int32') return df ``` 跑一次自动 downcast 所有 int。 ## 3. sparse for mostly-zero columns ```python # 90% 是 0 的 dummy variable column df['premium'] = df['premium'].astype('Sparse[int8, 0]') ``` 只存 non-zero 值 + 索引。 0.9M 个 0 + 0.1M 个 1 → 之前 10MB → 1MB。 适合 one-hot encoding 后的稀疏矩阵。 ## 4. parquet 替代 CSV 读 / 写 / 存都快得多: ```python df.to_parquet('events.parquet', compression='zstd') # 5GB CSV → 800MB Parquet(列存 + 压缩) df = pd.read_parquet('events.parquet') # 比 read_csv 快 5-10 倍 # 自动保留 dtype(包括 categorical) ``` read_csv 时也指定 dtype 避免 pandas 猜: ```python df = pd.read_csv('events.csv', dtype={ 'event_type': 'category', 'country': 'category', 'amount': 'float32', 'user_id': 'int32', }) ``` 避免 pandas 默认 int64 / float64 / object 后再转换的中间峰值。 ## 5. chunked read:分块处理 如果转完仍装不下,分块流式处理: ```python chunks = [] for chunk in pd.read_csv('events.csv', chunksize=100_000): chunk = optimize_dtypes(chunk) # 业务处理 / 聚合 result = chunk.groupby('user_id').agg(...) chunks.append(result) # 最后合并 final = pd.concat(chunks).groupby(...).sum() ``` 或者直接用 polars(前面有文章),原生支持流式 + 多核。 ## 6. PyArrow string 替代 object pandas 2.0+ 加了 `string[pyarrow]` 类型,比 object 省内存 + 快: ```python df['session_id'] = df['session_id'].astype('string[pyarrow]') ``` 适合"高基数 string" —— categorical 不划算的场景。 比 object 省 40-60% 内存 + groupby / sort 快 2-3x。 `pd.options.future.infer_string = True` 让 read_csv 默认用 string[pyarrow](pandas 3.0 将成默认)。 ## 实战脚本 ```python import pandas as pd def shrink(df): """通用 dtype 收缩 + categorical 自动检测""" df = df.copy() # 1. integer downcast for col in df.select_dtypes(include=['int64']).columns: df[col] = pd.to_numeric(df[col], downcast='integer') # 2. float downcast for col in df.select_dtypes(include=['float64']).columns: df[col] = pd.to_numeric(df[col], downcast='float') # 3. low-cardinality string → category for col in df.select_dtypes(include=['object']).columns: if df[col].nunique() / len(df) < 0.05: df[col] = df[col].astype('category') return df df = pd.read_csv('events.csv') print('before:', df.memory_usage(deep=True).sum() / 1e9, 'GB') df = shrink(df) print('after: ', df.memory_usage(deep=True).sum() / 1e9, 'GB') ``` 典型场景 5x 内存压缩。 ## 效果 我的 10M × 30 dataset: | | 内存 | |---|---| | 默认 read_csv | 5.1 GB | | + int / float downcast | 3.2 GB | | + categorical (3 cols) | 1.4 GB | | + string[pyarrow] (1 col) | 1.1 GB | | + sparse (2 cols) | 800 MB | 整套操作 < 30 秒 + 完全保留语义。后续 groupby / merge 也快得多 (小数据 = 快 cache 命中)。 ## 何时考虑换工具 pandas 优化到底后还是不够 → 切: - **polars**:原生流式 + 多核,比 pandas 快 5-30x - **DuckDB**:SQL 跑 Parquet / CSV,省内存 - **Dask**:pandas 类似 API + out-of-core + 集群 - **Spark**:超大数据集群 但单机 100GB 内 pandas + 这些技巧通常够。 ## 踩过的坑 1. **categorical 后 join 慢**:两个 df join 的 categorical 列要 "相同 categories" 否则 pandas 转回 object。 `df['x'].cat.set_categories(combined_cats)`。 2. **`pd.read_csv` 不带 dtype**:pandas 先全读 object / int64 / float64 占大量内存,read 完才转。**指定 dtype**:内存峰值降 2-3x。 3. **categorical 不能做某些操作**:`.str` accessor 在 categorical 上慢 / 不工作。需要时 `.astype('object').str.lower()`。 4. **sparse 与 numpy/sklearn 兼容性差**:很多 sklearn estimator 不 接受 SparseArray,要 `.to_dense()`。trade-off。 5. **string[pyarrow] 仍在演进**:pandas 2.x 部分功能(如 groupby) 仍回退到 object。看 changelog 跟进。

scikit-learn Pipeline + ColumnTransformer 把"训练泄漏"杀掉

## 起因 ML 新手最容易写出"data leakage"——把测试集的统计信息(均值 / 编码 / 缺失值填补)混到训练里。常见错法: ```python # ❌ 整个数据集上算均值标准差,然后切分 from sklearn.preprocessing import StandardScaler scaler = StandardScaler().fit(X) X = scaler.transform(X) X_train, X_test, y_train, y_test = train_test_split(X, y) ``` 测试集的均值"漏"进了 scaler,cross-validation 分数偏高,部署到真实 新数据立刻拉胯。 `Pipeline` + `ColumnTransformer` 强制把所有 preprocessing 关在 cv split 内部,自动避免泄漏。 ## 解决方案 ### 1. 简单 Pipeline ```python from sklearn.pipeline import Pipeline from sklearn.preprocessing import StandardScaler from sklearn.linear_model import LogisticRegression from sklearn.model_selection import cross_val_score pipe = Pipeline([ ('scaler', StandardScaler()), ('clf', LogisticRegression()), ]) scores = cross_val_score(pipe, X, y, cv=5) print(f'mean acc: {scores.mean():.3f} ± {scores.std():.3f}') ``` 每折 cv 时 pipeline 重新 `fit` scaler,只在那一折的训练数据上算 mean / std。没有泄漏。 ### 2. 混合类型特征:ColumnTransformer 数据集常常包含 numerical + categorical 两类: ```python import pandas as pd from sklearn.compose import ColumnTransformer from sklearn.preprocessing import StandardScaler, OneHotEncoder from sklearn.impute import SimpleImputer df = pd.read_csv('users.csv') y = df.pop('churn') X = df num_cols = ['age', 'days_active', 'monthly_revenue'] cat_cols = ['country', 'plan', 'referral_source'] preprocessor = ColumnTransformer([ ('num', Pipeline([ ('imp', SimpleImputer(strategy='median')), ('sc', StandardScaler()), ]), num_cols), ('cat', Pipeline([ ('imp', SimpleImputer(strategy='constant', fill_value='missing')), ('oh', OneHotEncoder(handle_unknown='ignore', sparse_output=False)), ]), cat_cols), ], remainder='drop') full_pipe = Pipeline([ ('prep', preprocessor), ('clf', GradientBoostingClassifier()), ]) scores = cross_val_score(full_pipe, X, y, cv=5, scoring='roc_auc') ``` 要点: - 每列类型独立配 imputer + encoder/scaler - `handle_unknown='ignore'`:测试集里出现训练集没见过的 category 值不报错 - `remainder='drop'` 显式:未声明的列扔掉(safer than `passthrough`) ### 3. 超参搜索:所有阶段一起搜 ```python from sklearn.model_selection import GridSearchCV param_grid = { 'prep__num__imp__strategy': ['mean', 'median'], 'clf__n_estimators': [100, 200, 500], 'clf__max_depth': [3, 5, 7], } search = GridSearchCV(full_pipe, param_grid, cv=5, scoring='roc_auc', n_jobs=-1, verbose=1) search.fit(X, y) print(search.best_params_, search.best_score_) ``` 注意命名:`stage_name__sub_stage__param`,双下划线层级。 `n_jobs=-1` 用所有 CPU 核并行 fit 各组合。 ### 4. 保存 / 加载整个 pipeline ```python import joblib joblib.dump(search.best_estimator_, 'model.pkl') # 部署侧 model = joblib.load('model.pkl') y_pred = model.predict(new_X) # 自动跑 preprocessor → estimator,新数据原始 DataFrame 进去 ``` 整套 preprocessing + 模型在一个文件里,不需要"先 scale 再 predict" 两步走,部署更安全。 ### 5. 自定义 transformer 业务相关的 feature engineering 包成 transformer: ```python from sklearn.base import BaseEstimator, TransformerMixin import numpy as np class LogRatio(BaseEstimator, TransformerMixin): """两列做 log(a/b) 派生""" def __init__(self, num_col, denom_col): self.num_col, self.denom_col = num_col, denom_col def fit(self, X, y=None): return self def transform(self, X): ratio = (X[self.num_col] + 1) / (X[self.denom_col] + 1) return np.log(ratio).to_frame('log_ratio') # 加进 ColumnTransformer preprocessor = ColumnTransformer([ ('lr', LogRatio('revenue', 'days_active'), ['revenue', 'days_active']), ('num', ..., num_cols), ('cat', ..., cat_cols), ]) ``` 业务知识沉淀进 pipeline,每次实验都用。 ## 效果 - cross-validation 分数与真实 holdout / 线上 A/B 偏差 < 1% (之前漏了泄漏时偏差 5-10%) - 部署只 `joblib.load + predict`,前后端不再重复实现 preprocessing - 改特征工程只动 pipeline 定义,下游所有实验自动跟着 - 团队新同事接手时一个 .pkl 文件即可,无须读 100 行 prep 脚本 ## 踩过的坑 1. **`drop='first'` 在 OneHotEncoder**:drop 第一个 category 用于 linear model 避免共线性,但 tree-based model 不需要。GBT / RF 用 `drop=None` 信息更全。 2. **categorical 列只出现在测试集的新值**:`handle_unknown='error'` 默认 会报错。生产用 `handle_unknown='ignore'` 当全 0 处理;或 `'infrequent_if_exist'` 归入 "rare" 桶。 3. **SimpleImputer 把整数列变浮点**:填了 median 后 int → float。 下游模型可能内存翻倍。pandas 2.0+ 用 nullable Int 类型避开。 4. **大 categorical 高基数(user_id)做 OneHot**:维度爆炸。 用 target encoding / hashing trick / embeddings。 5. **`fit_transform(X_train)` + `transform(X_test)` 写错顺序**: 测试集 fit_transform = 灾难。强迫自己用 Pipeline 就规避了 —— 你只 `pipe.fit(X_train)` + `pipe.predict(X_test)`,永远不会写 到 transform 这个层。

CLIP + Faiss 做"用文字搜图"的图片搜索引擎(自家相册版)

## 起因 手机里 10 万张照片,找"去年在日本拍的樱花" 要翻几天。 Google Photos 能做语义搜索但隐私 → 想本地。 OpenAI 的 CLIP 模型把图片和文字编码到同一个语义向量空间。 "樱花" 的文字向量 ≈ 樱花图片的视觉向量。 本地跑 CLIP + Faiss 索引 + 几行 Python = 自己的 Google Photos。 ## 解决方案 ### 装 ```bash uv add open-clip-torch faiss-cpu torch pillow tqdm # GPU 加速: uv add faiss-gpu torch --index https://download.pytorch.org/whl/cu124 ``` `open-clip` 是 LAION 训的 CLIP 系列(性能比官方 CLIP 好)。 ### Step 1: 提取图片特征 ```python import torch import open_clip from PIL import Image from pathlib import Path import numpy as np from tqdm import tqdm DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu' # 中等大小 + 性能 + 多语言:xlm-roberta-large 支持中文 model, _, preprocess = open_clip.create_model_and_transforms( 'xlm-roberta-large-ViT-H-14', pretrained='frozen_laion5b_s13b_b90k', ) model = model.to(DEVICE).eval() tokenizer = open_clip.get_tokenizer('xlm-roberta-large-ViT-H-14') def encode_image(path): img = Image.open(path).convert('RGB') x = preprocess(img).unsqueeze(0).to(DEVICE) with torch.no_grad(): feat = model.encode_image(x) feat = feat / feat.norm(dim=-1, keepdim=True) return feat.cpu().numpy().squeeze().astype('float32') # 跑全相册 photos_dir = Path('~/Pictures/Photos').expanduser() paths = list(photos_dir.rglob('*.jpg')) + list(photos_dir.rglob('*.heic')) features = [] valid_paths = [] for p in tqdm(paths): try: features.append(encode_image(p)) valid_paths.append(str(p)) except Exception as e: print(f'skip {p}: {e}') features = np.stack(features) # shape (N, 1024) np.save('image_features.npy', features) with open('image_paths.txt', 'w') as f: f.write('\n'.join(valid_paths)) ``` GPU 上 10 万张 ~ 2-4 小时。CPU 慢 5-10 倍。一次性事,后续只索引新增。 ### Step 2: Faiss 索引 ```python import faiss import numpy as np features = np.load('image_features.npy') N, D = features.shape # 100000, 1024 # 小数据集(< 1M)用 flat:精确 + 简单 index = faiss.IndexFlatIP(D) # IP = inner product = cosine(features 已 normalize) index.add(features) faiss.write_index(index, 'images.index') # 大数据集(百万级)用 IVF + PQ 压缩 # index = faiss.index_factory(D, 'IVF1024,PQ32', faiss.METRIC_INNER_PRODUCT) # index.train(features) # index.add(features) ``` 10 万 × 1024 维 flat 索引约 400 MB。搜一次 < 10ms。 ### Step 3: 文字 → 找图 ```python paths = open('image_paths.txt').read().splitlines() index = faiss.read_index('images.index') def search(query: str, k=12): tokens = tokenizer([query]).to(DEVICE) with torch.no_grad(): feat = model.encode_text(tokens) feat = feat / feat.norm(dim=-1, keepdim=True) feat = feat.cpu().numpy().astype('float32') scores, indices = index.search(feat, k) return [(paths[i], scores[0][rank]) for rank, i in enumerate(indices[0])] # 用! for path, score in search('cherry blossoms in Japan'): print(f'{score:.3f} {path}') # 中文 for path, score in search('一只在沙滩上奔跑的金毛狗'): print(f'{score:.3f} {path}') ``` 返回 top-12 最相似的图,按 cosine similarity 排序。 ### Step 4: Web UI(10 行 Streamlit) ```python # app.py import streamlit as st from PIL import Image # ... 上面的 search 函数 ... st.title('我的照片搜索') query = st.text_input('描述你要找的图:') if query: results = search(query, k=12) cols = st.columns(4) for i, (path, score) in enumerate(results): with cols[i % 4]: st.image(Image.open(path), caption=f'{score:.2f}') ``` ```bash streamlit run app.py # 浏览器自动打开 ``` 输入"日落沙滩"→ 3 秒内显示所有匹配照片。 ## 进阶 ### 1. 图搜图(以图找图) ```python def search_by_image(image_path, k=12): feat = encode_image(image_path) feat = feat.reshape(1, -1) scores, indices = index.search(feat, k) return [(paths[i], scores[0][rank]) for rank, i in enumerate(indices[0])] ``` "找跟这张图相似的所有照片"。适合"找出所有该旅行的照片"。 ### 2. 增量索引 新增图片时不要重建整个 index: ```python new_features = [] for path in new_paths: new_features.append(encode_image(path)) new_features = np.stack(new_features) index.add(new_features) faiss.write_index(index, 'images.index') # paths 文件追加 with open('image_paths.txt', 'a') as f: f.write('\n' + '\n'.join(new_paths)) ``` Flat index 支持 add;如果是 IVF + PQ 需要 reuse trained index + add。 ### 3. 过滤:按 metadata ```python # EXIF 信息读取拍摄时间 / GPS / 相机 from PIL.ExifTags import TAGS def get_exif(path): img = Image.open(path) exif = {TAGS.get(k, k): v for k, v in (img._getexif() or {}).items()} return exif # 搜结果加 metadata filter results = search('cherry blossoms') filtered = [(p, s) for p, s in results if get_year(p) == 2023] ``` 更高级:把 metadata 存 SQLite 一起 join。 ### 4. CLIP 模型选择 | 模型 | 大小 | 速度 | 质量 | 多语言 | |---|---|---|---|---| | ViT-B/32 | 150 MB | 快 | 中 | 仅英 | | ViT-L/14 | 430 MB | 中 | 高 | 仅英 | | ViT-H/14 | 1.1 GB | 慢 | 极高 | 仅英 | | xlm-roberta + ViT-H | 4 GB | 慢 | 极高 | **多语言** | | siglip-large | 1 GB | 中 | 极高 | 看版本 | 中文场景一定用支持多语言的(xlm-roberta CLIP 或 chinese-clip)。 ### 5. faiss 大数据集 百万 - 千万级照片: ```python # IVF: 把 vectors 分桶,搜时只查最近的 N 个桶 nlist = int(np.sqrt(N)) # 桶数 quantizer = faiss.IndexFlatIP(D) index = faiss.IndexIVFFlat(quantizer, D, nlist, faiss.METRIC_INNER_PRODUCT) index.train(features) index.add(features) index.nprobe = 8 # 搜时查 8 个桶(增加 recall) ``` 千万级用 IVF + PQ 压缩(牺牲一点精度换 50x 内存压缩)。 ### 6. 部署到手机 CLIP 模型导出 ONNX / CoreML 后能在手机端跑: ```python torch.onnx.export(model.visual, dummy_image, 'clip_vision.onnx', opset_version=14) ``` Apple CoreML 工具更直接。手机端单图 encode < 200ms。 ## 完整效果 我的真实相册(4 万张照片): - index 大小:160 MB - 文字搜索单 query:~30ms - "去年在京都的樱花" → 95% 召回率(漏的是被树枝挡住的) - "戴墨镜的人" → 90% - "一群人合影" → 85% - "蓝色的天空" → 100% 但太多匹配 - "我爸" → 0%(没人脸识别能力) CLIP 强在"语义概念",弱在"特定人物 / 文字 OCR"。 后两者需要专门的人脸识别 + OCR pipeline 配合。 ## 与替代品对比 | | 自托管 CLIP | Google Photos | Apple Photos | immich (开源) | |---|---|---|---|---| | 隐私 | ✅ | ❌ | 部分(设备端) | ✅ | | 自由度 | 高 | 低 | 中 | 中 | | 人脸识别 | 没(自己加) | ✅ | ✅ | ✅ | | 语义搜索 | ✅(CLIP) | ✅ | 一定 | ✅(CLIP) | | 多设备 | 自己搭 | ✅ | ✅ | ✅ | 如果不想从零搭:**immich** 是开源 Google Photos 替代, 内置 CLIP 搜索 + 人脸识别 + 多设备同步。 ## 踩过的坑 1. **HEIC 格式**:iPhone 拍的 .heic 默认 Pillow 读不了。 `pip install pillow-heif` + `register_heif_opener()`。 2. **GPU memory 不够**:H/14 model + 高分辨率图 batch=1 仍 OOM。 `feat = model.encode_image(x.half())` half precision 减半显存。 3. **路径有中文**:Windows 上 `Path` 偶尔编码乱。统一 UTF-8 + 转 绝对路径。 4. **加新图后忘 update**:每次 sync 跑增量 encode + add to index。 写个 cron。 5. **face matching is poor**:CLIP 不擅长"区分两张人脸是否同人"。 要加 face recognition 用 ArcFace / FaceNet 等专用模型。

precision / recall / F1 / ROC-AUC:分类指标什么时候用谁

## 起因 老板问我"模型准确率 95% 是不是很厉害"。我一翻数据集: 99.5% 是负类("用户不会购买"),全猜负 trivial baseline 也 99.5% 准确率。我们的 95% 反而是垃圾。这是 accuracy 在不平衡数据上的经典 陷阱。 下面把几个常见分类指标用一个具体例子串起来。 ## 例子:邮件垃圾识别 - 测试集 1000 封邮件 - 真实:900 normal + 100 spam - 模型预测出 80 封 spam,其中 70 真是 spam(10 假阳性 FP)+ 30 真 spam 被漏(30 假阴性 FN) 混淆矩阵: | | 预测 spam | 预测 normal | |---|---|---| | 真实 spam | TP=70 | FN=30 | | 真实 normal | FP=10 | TN=890 | ## 指标对照 ### Accuracy $$\text{acc} = \frac{TP + TN}{TP + TN + FP + FN} = \frac{960}{1000} = 96\%$$ 不平衡数据下骗人。这个例子 96% 看着很好。 ### Precision(精确率) $$\text{precision} = \frac{TP}{TP + FP} = \frac{70}{80} = 87.5\%$$ "被预测为 spam 的邮件里,多少真是 spam"。**FP 代价高时关注**(误判 正常邮件为垃圾很烦)。 ### Recall(召回率) / Sensitivity / TPR $$\text{recall} = \frac{TP}{TP + FN} = \frac{70}{100} = 70\%$$ "所有真 spam 里,我抓到多少"。**FN 代价高时关注**(漏报 spam)。 医疗筛查、欺诈检测、安全告警都更看重 recall——宁可多报点 false positive 让人复查,也不能漏 true positive。 ### F1 $$F1 = 2 \cdot \frac{precision \cdot recall}{precision + recall} = 0.778$$ precision 和 recall 的调和平均。两者都重要、又只能优化一个数时用 F1。 ### F-beta `beta` 控制 recall 相对 precision 的权重: - F0.5:precision 加权(更看重不误报) - F1:均衡 - F2:recall 加权(更看重不漏报) ### ROC-AUC 不依赖阈值,看模型"把正样本排在负样本前"的能力。1.0 = 完美, 0.5 = 瞎猜。 在 sklearn: ```python from sklearn.metrics import roc_auc_score, precision_recall_fscore_support y_proba = model.predict_proba(X_test)[:, 1] auc = roc_auc_score(y_test, y_proba) # 在某个阈值上 y_pred = (y_proba > 0.5).astype(int) p, r, f, _ = precision_recall_fscore_support(y_test, y_pred, average='binary') ``` ROC-AUC 在极不平衡数据上**会过于乐观**。建议同时看 PR-AUC(precision-recall 曲线下面积)。 ### PR-AUC ```python from sklearn.metrics import average_precision_score ap = average_precision_score(y_test, y_proba) ``` 不平衡数据(正类占比 < 10%)的标准评估。比 ROC-AUC 更敏感地反映模型 区分能力。 ## 选择指南 | 场景 | 主指标 | |---|---| | 类别平衡 + FP/FN 等代价 | accuracy / F1 | | 不平衡 + 关注少数类 | PR-AUC、F1、recall | | 排序质量(不定阈值) | ROC-AUC、PR-AUC | | FP 代价高(误报扰民) | precision、F0.5 | | FN 代价高(漏报致命) | recall、F2 | | 多分类 | macro-F1(每类均权)/ weighted-F1(按支持度) | ## confusion matrix 可视化 ```python from sklearn.metrics import ConfusionMatrixDisplay ConfusionMatrixDisplay.from_predictions(y_test, y_pred, normalize='true') # normalize='true' 按真实类归一化,看每类的召回率 ``` 行内归一化能直接读"真 spam 里有多少被分对"。 ## classification_report ```python from sklearn.metrics import classification_report print(classification_report(y_test, y_pred, digits=3, target_names=['normal', 'spam'])) ``` ``` precision recall f1-score support normal 0.967 0.989 0.978 900 spam 0.875 0.700 0.778 100 accuracy 0.960 1000 macro avg 0.921 0.844 0.878 1000 weighted avg 0.958 0.960 0.958 1000 ``` 一眼看到每类的三个指标 + 宏 / 加权平均。 ## 阈值调整 预测概率 > 0.5 是默认。但业务上可以调: - 关注 recall:降阈值(0.3)→ 更多预测为 spam → recall 升、precision 降 - 关注 precision:升阈值(0.7)→ 反之 用 PR 曲线选最优 trade-off: ```python from sklearn.metrics import precision_recall_curve prec, rec, thr = precision_recall_curve(y_test, y_proba) # 找 precision >= 0.95 的最高 recall 对应阈值 idx = (prec[:-1] >= 0.95).nonzero()[0] best_thr = thr[idx[rec[idx].argmax()]] ``` ## 效果(一个 churn 模型的真实改造) 我们的客户流失预测原本看 accuracy,0.91。但 base rate 流失率 8%—— 全猜不流失也 92%。换成 PR-AUC 评估: - 全猜不流失:PR-AUC = 0.08 - 我们的模型:PR-AUC = 0.34 明显能看出模型有价值。再按 PR 曲线选阈值:保留 recall >= 0.7 的最高 precision 点,把 marketing 干预投到这批"高风险"用户上。CAC 下降 30%。 ## 踩过的坑 1. **`average='binary'` 在多分类时报错**:多分类要 `'macro'`/`'micro'`/ `'weighted'`。`'micro'` 在不平衡时等于 accuracy,意义有限。 2. **正类标号搞反**:sklearn 默认 `pos_label=1`。如果你的"想检测的类" 是 0,得 `pos_label=0` 否则 precision/recall 算的是另一类。 3. **不看 support**:F1 macro = 0.55 看着不行,但其中 90% 是稀有类 的支撑,整体 weighted F1 0.88 还可以。两个都要看。 4. **直接信任 cross-validation 的 ROC-AUC**:不平衡数据 + KFold(不 stratify)→ 某些 fold 几乎没正样本 → AUC 失真。永远用 `StratifiedKFold`。 5. **阈值在训练集上选**:必须用验证集 / OOF 选阈值;测试集只用于最终 报告。

Prophet:5 行写一个还不错的时间序列预测(替代手动调 ARIMA)

## 起因 老板让"预测下季度营收"。专业的 forecasting 上 ARIMA / SARIMA / state space model 调一周;我们要的是"现在马上有个基础数字 + 不太离谱"。 Facebook (Meta) 开源的 Prophet 是 GAM (Generalized Additive Model) 风格的 time series 工具,对"商业数据特征 (趋势 + 周期 + 节假日 + 异常点)" 极友好,几乎零调参就有不错效果。 ## 装 ```bash uv add prophet # Mac M 系列 / Win 装 prophet 偶尔遇 cmdstanpy 编译问题 # 解决:先 conda install cmdstanpy 再 pip install prophet ``` ## 5 分钟 demo ```python import pandas as pd from prophet import Prophet # 数据格式:必须两列 ds (datetime) + y (value) df = pd.read_csv('daily_revenue.csv') # date,revenue # 2023-01-01,1234.5 # 2023-01-02,1289.1 # ... df.columns = ['ds', 'y'] # 模型 m = Prophet() m.fit(df) # 预测未来 90 天 future = m.make_future_dataframe(periods=90) forecast = m.predict(future) # yhat = 预测值;yhat_lower/upper = 80% 置信区间 print(forecast[['ds', 'yhat', 'yhat_lower', 'yhat_upper']].tail()) # 可视化 m.plot(forecast).savefig('forecast.png') m.plot_components(forecast).savefig('components.png') ``` 输出: - `forecast.png`:历史 + 预测曲线 + 置信带 - `components.png`:拆解成 trend / weekly / yearly 三个成分 调参为 0。 ## 加节假日 中国春节 / 双 11 等是营业额异常点,加进 model 让预测更准: ```python holidays = pd.DataFrame({ 'holiday': 'major_sale', 'ds': pd.to_datetime([ '2023-11-11', '2024-11-11', # 双 11 '2023-06-18', '2024-06-18', # 618 ]), 'lower_window': -1, # 节前 1 天也算 'upper_window': 2, # 节后 2 天也算 }) m = Prophet(holidays=holidays) m.fit(df) ``` 或者内置节假日: ```python m.add_country_holidays(country_name='CN') # 自动加入春节 / 国庆 / 五一等 ``` ## 多季节性 默认 yearly + weekly + daily。要月度 / 自定义周期: ```python m = Prophet() m.add_seasonality(name='monthly', period=30.5, fourier_order=5) m.fit(df) ``` `fourier_order` 是季节性的"灵活度"(越大越能拟合复杂周期;过高 overfit)。 ## 外部回归量(exogenous) 业务量受外部因素影响: ```python df['ads_spend'] = ... # 广告投入 df['is_promotion'] = ... # 0/1 促销标志 m = Prophet() m.add_regressor('ads_spend') m.add_regressor('is_promotion') m.fit(df) # 预测时也要提供未来值 future['ads_spend'] = expected_ads_for_next_90d future['is_promotion'] = expected_promo_flag forecast = m.predict(future) ``` 广告 / 促销作为协变量进 model,预测更贴合业务现实。 ## 异常点 Prophet 对异常点(COVID 期间 / 单日大促)相对鲁棒(用 GAM + 平滑)。 不需要手工剔除。要进一步控制可以: ```python # 把已知异常段 mark 为 NaN 让 Prophet 忽略 df.loc[(df['ds'] >= '2020-02-01') & (df['ds'] <= '2020-05-01'), 'y'] = None m.fit(df) ``` 或者用 `add_seasonality` 处理"上下界" 让 trend 不被极端值影响。 ## cross-validation 评估 ```python from prophet.diagnostics import cross_validation, performance_metrics # initial 365 天训练,每 30 天滚动一次,预测 horizon=30 天 df_cv = cross_validation(m, initial='365 days', period='30 days', horizon='30 days') metrics = performance_metrics(df_cv) print(metrics[['horizon', 'mape', 'rmse']].head()) # horizon mape rmse # 0 1 days 0.08 102.3 # 1 2 days 0.09 105.1 # ... ``` MAPE (Mean Absolute Percentage Error) < 10% 在大多数商业数据是 "还行",5% 算优秀。 ## 与替代品对比 | | Prophet | ARIMA / SARIMA | statsmodels | NeuralProphet | NHITS / TimeGPT | |---|---|---|---|---|---| | 学习曲线 | 极低 | 高(要懂 ACF/PACF) | 高 | 中 | 低 | | 调参量 | 几乎 0 | 多 | 多 | 中 | 几乎 0 | | 自动季节性 | ✅ | 手动 | 手动 | ✅ | ✅ | | 节假日 | ✅ | 手动 | 手动 | ✅ | ✅ | | 多变量 | regressor | VAR | VAR | ✅ | ✅ | | 高频数据 | 中(小时级) | ✅ | ✅ | ✅ | ✅ | | 长期趋势 | 强 | 中 | 中 | 强 | 强 | 简单业务时间序列 → Prophet。 学术 / 严谨需求 → ARIMA / state space。 现代深度方法 → NeuralProphet / Darts / Nixtla statsforecast。 ## 真实业务案例 我们用 Prophet 做月度营收预测: - 训练数据:2 年日级营收 - 加节假日:春节 + 双 11 + 618 - 外部回归:广告预算 + 促销日 - horizon:90 天 vs 之前的"人拍" 预测: | | 人拍 | Prophet | Prophet + 节假日 + regressor | |---|---|---|---| | MAPE | 25% | 12% | 7% | | 工时 | 4 hour/月 | 10 min/月 | 30 min/月 | 不仅准、还省时。 ## 输出多 model + ensemble ```python # 跑 3 个不同 changepoint_prior_scale 取均值 forecasts = [] for cps in [0.001, 0.05, 0.5]: m = Prophet(changepoint_prior_scale=cps) m.fit(df) forecasts.append(m.predict(future)['yhat']) ensemble = pd.concat(forecasts, axis=1).mean(axis=1) ``` 不同超参对不同部分敏感;平均通常更稳。 ## 何时不该用 Prophet - **特别短**的时间序列(< 2 季):信号不够,model 拟合差 - **复杂多变量交互**:要用 LightGBM / 神经网络 - **极高频(毫秒级)实时**:Prophet 慢,用 specialized 工具 - **概率性预测要严格 calibrated**:bootstrap interval 仅近似 ## 踩过的坑 1. **列名必须 ds / y**:用了别的名 Prophet 不识别。`df.rename(...)` 后再 fit。 2. **datetime 时区**:Prophet 内部假设 naive datetime(无时区)。 带 tz 会报错。`df['ds'] = df['ds'].dt.tz_localize(None)`。 3. **`make_future_dataframe(periods=90, freq='D')`**:默认日级; 月级数据要 `freq='M'`。漏指定就把月数据当日数据预测。 4. **logistic growth 需要 cap**:用 `growth='logistic'` 时必须给 `cap` 列(上界),否则报错。 5. **changepoint 自动检测不靠谱**:业务有大转折(疫情 / 公司战略 变化)时,手动指定: ```python m = Prophet(changepoints=['2020-02-01', '2022-06-15']) ```

Dagster vs Airflow vs Prefect:现代 ETL pipeline 选哪个

## 起因 要搭一个数据 pipeline:每天凌晨 1 点从 S3 拉新数据 → 清洗 → 入 ClickHouse → 跑特征工程 → 训 ML model → push 到 staging。 失败要重试 + 告警 + 部分重跑。 三个主流编排工具: - **Airflow**:老牌,2014 出自 Airbnb,业界事实标准 - **Prefect**:Airflow 工程师出走重新设计,更轻量 - **Dagster**:从数据资产角度重新思考 pipeline,2024 风头最劲 试了一周下来记录。 ## Airflow 定义 DAG: ```python # airflow_dag.py from datetime import datetime, timedelta from airflow import DAG from airflow.operators.python import PythonOperator from airflow.operators.bash import BashOperator default_args = { 'owner': 'data-team', 'retries': 2, 'retry_delay': timedelta(minutes=5), } with DAG( 'daily_etl', default_args=default_args, schedule='0 1 * * *', start_date=datetime(2024, 1, 1), catchup=False, max_active_runs=1, ) as dag: def fetch_data(**ctx): ds = ctx['ds'] # '2024-05-24' download_from_s3(f's3://bucket/raw/{ds}/') def clean_data(**ctx): ds = ctx['ds'] clean(input_dir, output_dir) fetch = PythonOperator(task_id='fetch', python_callable=fetch_data) clean = PythonOperator(task_id='clean', python_callable=clean_data) load = PythonOperator(task_id='load', python_callable=load_to_ch) train = PythonOperator(task_id='train', python_callable=train_model) fetch >> clean >> load >> train ``` 跑: ```bash airflow webserver airflow scheduler # 浏览器 http://localhost:8080 看 DAG ``` **优点**: - 业界最广,会的人多 - operator 库巨大(Bash / Spark / EMR / Snowflake / Postgres / Slack / ...) - 成熟的 SLA / lineage / retry 机制 - K8s 部署成熟(KubernetesExecutor / KubernetesPodOperator) **缺点**: - DAG = 全局 Python module,导入慢(scheduler 反复 import 几千 DAG) - 任务间传数据麻烦(XCom 限大小;走 S3 / DB 才行) - 本地开发体验差(webserver + scheduler + executor 三件套) - DAG 语法重,3 个 task 也 50 行 ## Prefect ```python # prefect_flow.py from prefect import flow, task from datetime import datetime, timedelta @task(retries=2, retry_delay_seconds=300) def fetch_data(date: str) -> str: path = f's3://bucket/raw/{date}/' download_from_s3(path) return path @task def clean_data(raw_path: str) -> str: out = clean(raw_path) return out @task def load_to_ch(clean_path: str): load(clean_path) @task def train_model(): train() @flow(name='daily_etl', log_prints=True) def daily_etl(date: str = None): date = date or datetime.utcnow().date().isoformat() raw = fetch_data(date) clean = clean_data(raw) load_to_ch(clean) train_model() if __name__ == '__main__': daily_etl() ``` 跑: ```bash # 本地直接跑 python prefect_flow.py # 部署到 Prefect Cloud / 自托管 server prefect deploy prefect_flow.py:daily_etl --name nightly --cron '0 1 * * *' prefect worker start --pool default ``` **优点**: - 写 flow 像写普通 Python 函数,task 间传值像普通调用 - 本地开发体验好(直接 run flow) - Prefect Cloud 免费层够小团队用 - 动态 task / sub-flow 支持好 **缺点**: - 生态不如 Airflow(operator / connector 少) - 2.0 vs 3.0 几次大改 breaking - 自托管 server 还需要装 Postgres + Redis 等 ## Dagster ```python # dagster_pipeline.py from dagster import asset, AssetExecutionContext, Definitions, ScheduleDefinition, define_asset_job @asset def raw_data(context: AssetExecutionContext) -> str: ds = context.partition_key # '2024-05-24' return download_from_s3(f's3://bucket/raw/{ds}/') @asset(deps=[raw_data]) def cleaned_data(context, raw_data: str) -> str: return clean(raw_data) @asset(deps=[cleaned_data]) def clickhouse_table(cleaned_data: str): load_to_ch(cleaned_data) @asset(deps=[clickhouse_table]) def ml_model() -> str: return train() daily_job = define_asset_job('daily', selection='*') daily_schedule = ScheduleDefinition(job=daily_job, cron_schedule='0 1 * * *') defs = Definitions( assets=[raw_data, cleaned_data, clickhouse_table, ml_model], schedules=[daily_schedule], ) ``` 跑: ```bash dagster dev # http://localhost:3000 看 asset 图 ``` **优点**: - "asset-based" 思维:每个 task 产出一个数据资产 (table / file / model),blueprint = asset 图,自然 documentation - IDE 友好:type hint + 静态分析整套 pipeline - backfill / partition / lineage 一等公民 - 内置数据测试 / asset checks - 本地开发 = `dagster dev` 一条命令 **缺点**: - 最新(2019),生态比 Airflow 小 - 学习曲线略陡(asset / op / job / sensor 几个概念) - 团队上手时间长 ## 选型对比 | 场景 | 推荐 | |---|---| | 已经在用 Airflow + 团队熟 | 继续 Airflow | | 新项目 + 现代 Python + IDE 重度 | **Dagster** | | 极简 + 个人项目 + Cloud SaaS | Prefect | | 万千 DAG + 跨团队 + 老系统对接 | Airflow(生态最广) | | ML pipeline + 强 data lineage | Dagster | 我个人新项目首选 Dagster:asset 抽象比 task 更贴合"数据流"思维。 ## 共同 best practice 不管选哪个: ### 1. 任务幂等 ```python @task def load_to_ch(path): # 用 INSERT OR REPLACE / DELETE then INSERT 让重跑不重复 db.execute(f"DELETE FROM table WHERE date = '{date}'") db.execute(f"INSERT INTO table SELECT * FROM read_parquet('{path}')") ``` partition / date 当业务 key,重跑刷掉这个 partition 再插。 ### 2. 数据 contract 定义"这个 task 输出什么 schema / row count / 范围": ```python @asset def cleaned_data(...) -> Output: df = ... return Output( value=df, metadata={ 'rows': len(df), 'columns': list(df.columns), 'null_rate': df.isna().mean().to_dict(), } ) ``` 下游任务依赖 contract 而非 implicit 假设。下次改变 schema 显式报错。 ### 3. 分层 按 dbt 风格: ``` sources/ # 原始数据 staging/ # 轻清洗 / 类型转换 intermediate/ # 业务逻辑组合 marts/ # 业务可用的 final table ``` 每层独立可测试 + 故障 isolate。 ### 4. 告警 + 监控 - task 失败 → Slack / 邮件 - task 跑太久 → SLA miss 告警 - 数据指标异常(行数突变) → 检查告警 - Prometheus 暴露 task duration / success rate ### 5. backfill 历史日期重跑: ```bash # Airflow airflow dags backfill -s 2024-01-01 -e 2024-05-01 daily_etl # Dagster dagster job backfill --job daily --partitions '2024-01-01..2024-05-01' # Prefect prefect deployment run 'daily_etl/nightly' --param date=2024-01-15 ``` ### 6. data lineage / catalog 知道"这个表是哪些 task 生成的,哪些下游用了它"。 Dagster 内置;Airflow 接 OpenLineage / Marquez;Prefect 类似。 ## 整套部署(举例 Dagster + K8s) ``` - dagster-webserver (UI) - dagster-daemon (scheduler + sensor) - code locations (你的 pipeline Python) - PostgreSQL (metadata) - S3 / GCS (intermediate storage) - K8s(task 在 pod 跑) ``` helm chart 一键: ```bash helm install dagster dagster/dagster --set ... ``` ## 效果 我们的项目用 Dagster: - 100+ assets 自动算依赖图 - 单次 backfill 半年数据:UI 选时间范围 + run,asset 自动并行 - 数据 schema 改动 → asset check fail → CI 拦下 → 改下游 → 再合 - 新成员看 asset 图 30 秒理解整套 pipeline ## 踩过的坑 ### Airflow 类 1. **DAG 太多 scheduler 慢**:每 30s 扫所有 DAG 文件 import。 把 DAG 分文件 + 用 dynamic DAG generation 减少 scheduler IO。 2. **XCom 大数据**:传几 MB 会爆 backend DB。task 间传 path(S3 key) 而非数据本身。 ### Prefect 类 3. **2.x → 3.x breaking**:API 大改。生产 pin 大版本 + 升级提前测。 ### Dagster 类 4. **asset partition 配错**:同 asset 一个 daily 一个 hourly partition 会冲突。统一一 asset 一种 partition scheme。 5. **monorepo 多 code location**:每个 location 独立 Python 进程, 依赖隔离好但启动慢。开发期合并;生产分。

推荐系统第一步:用 implicit 库做协同过滤(不用任何深度模型)

## 起因 老板说"加个'你可能也喜欢' 推荐"。我们的数据: - 用户 5 万 - 商品 1 万 - 用户-商品 浏览 / 购买记录 100 万条 不是 YouTube 级,深度学习 / DSSM / DeepFM 过于豪华。 传统协同过滤(implicit feedback ALS)几行代码就有 baseline,效果 比"按热度排序" 提升 30-50% CTR。下面是真实跑通流程。 ## 解决方案 ### 装 ```bash uv add implicit scipy pandas ``` `implicit` 库是 Ben Frederickson 写的 C++ ALS 实现,比 surprise / spotlight 快 100x。 ### 数据格式:sparse user-item matrix ```python import pandas as pd from scipy.sparse import csr_matrix events = pd.read_csv('user_events.csv') # user_id,item_id,event_type # 1234,567,view # 1234,567,view # 1234,789,purchase # 隐式反馈权重:购买 > 加购 > 浏览 weights = {'view': 1, 'cart': 3, 'purchase': 10} events['w'] = events['event_type'].map(weights) # 聚合 agg = events.groupby(['user_id', 'item_id'])['w'].sum().reset_index() # user_id,item_id,w # 1234,567,2 # 1234,789,10 # 编码成连续整数索引 user_ids = agg['user_id'].unique() item_ids = agg['item_id'].unique() user_idx = {u: i for i, u in enumerate(user_ids)} item_idx = {it: i for i, it in enumerate(item_ids)} agg['ui'] = agg['user_id'].map(user_idx) agg['ii'] = agg['item_id'].map(item_idx) # sparse matrix matrix = csr_matrix((agg['w'].values, (agg['ui'].values, agg['ii'].values)), shape=(len(user_ids), len(item_ids))) print(matrix.shape, matrix.nnz) # (50000, 10000) 1000000 ``` ### ALS 训练 ```python from implicit.als import AlternatingLeastSquares model = AlternatingLeastSquares( factors=64, # embedding 维度 regularization=0.01, iterations=20, alpha=40, # implicit feedback 信心权重 use_gpu=False, # GPU 可选 ) model.fit(matrix) # 几秒钟完成 ``` `alpha` 是 implicit ALS 的关键超参: - 表示"我们对'用户买了'比'用户没买' 的信心差多少 - 论文建议 alpha=40 是好起点 - 调参可以 GridSearch on 验证集 ### 给一个用户推荐 top-K ```python def recommend(user_id, k=10): if user_id not in user_idx: return [] # 冷启动 uid = user_idx[user_id] item_ids_arr, scores = model.recommend(uid, matrix[uid], N=k) return [(item_ids[i], float(s)) for i, s in zip(item_ids_arr, scores)] print(recommend(user_id=1234, k=5)) # [(item_id, score), ...] ``` `model.recommend()` 内部: 1. 拿用户 embedding (64 维) 2. 跟所有 item embedding 算 dot product 3. 排除已交互的 item 4. 返回 top-K 毫秒级返回。 ### 相似商品推荐("你看了这个,也可能看那个") ```python def similar_items(item_id, k=10): if item_id not in item_idx: return [] iid = item_idx[item_id] item_ids_arr, scores = model.similar_items(iid, N=k+1) # 第一个是它自己,跳过 return [(item_ids[i], float(s)) for i, s in zip(item_ids_arr[1:], scores[1:])] ``` 商品详情页用:"看了这个的人还看了..."。 ### 评估:split + 看 hit@K / MAP@K ```python import numpy as np def hit_at_k(model, train_matrix, test_dict, k=10): """test_dict: {user_idx: set(item_idx)} 是 holdout 的真 interaction""" hits = 0 total = 0 for uid, true_items in test_dict.items(): if uid >= train_matrix.shape[0]: continue recommended, _ = model.recommend(uid, train_matrix[uid], N=k) if any(it in true_items for it in recommended): hits += 1 total += 1 return hits / total ``` hit@10 = 0.20 意味着 20% 的用户 top-10 推荐里有真实喜欢的。 随机推荐 hit@10 ≈ k / num_items ≈ 0.001。 200x 提升 = 信号强 = 模型 work。 ### 冷启动用户 新用户没历史 → 用户 embedding 不存在 → ALS 推不了。 fallback: ```python def recommend_with_fallback(user_id, k=10): if user_id in user_idx: return recommend(user_id, k) # 冷启动:推热门 return popular_items_in_user_segment(user_id, k) ``` 或者用 sign-up 时收集的偏好 / 人口学信息做内容-based 起步。 ### 冷启动商品 新上架商品没 interaction → ALS 给不出 embedding。 解决: - 用商品 metadata(类别 / 标签 / 描述)映射到现有 embedding 空间 - "two-tower" 模型:让 item tower 用 metadata 做 embedding 简单做法:新商品借用同类商品的 embedding 均值。 ## 几个进阶 model ### BM25 weighting ```python from implicit.nearest_neighbours import bm25_weight weighted = bm25_weight(matrix, K1=100, B=0.8) model = AlternatingLeastSquares(factors=64) model.fit(weighted) ``` BM25 给罕见物品更高 weight,对长尾推荐更好。 ### Item-Item CF (用户少 + 商品多时) ```python from implicit.nearest_neighbours import CosineRecommender model = CosineRecommender(K=10) model.fit(matrix) ``` 直接算 item-item 相似度,无 embedding 训练。 适合 100k+ items + 用户少场景。 ### BPR (Bayesian Personalized Ranking) ```python from implicit.bpr import BayesianPersonalizedRanking model = BayesianPersonalizedRanking(factors=64, learning_rate=0.05) model.fit(matrix) ``` learn-to-rank 风格,对"排序质量" 优化(不是 reconstruction)。 小数据集上经常更好。 ## 部署到生产 ```python # 训练完保存 import pickle with open('als_model.pkl', 'wb') as f: pickle.dump({ 'model': model, 'user_idx': user_idx, 'item_idx': item_idx, 'item_ids': item_ids, }, f) # inference 服务 class RecAPI: def __init__(self, path): with open(path, 'rb') as f: d = pickle.load(f) self.model = d['model'] self.user_idx = d['user_idx'] self.item_ids = d['item_ids'] # ... def recommend(self, user_id, k=10): uid = self.user_idx.get(user_id) if uid is None: return [] item_arr, _ = self.model.recommend(uid, ...) return [self.item_ids[i] for i in item_arr] ``` FastAPI 包一下,端口暴露给业务。 ### 每日重训 cron / Airflow: ```python @task def retrain_als(): events = load_recent_events(days=90) matrix = build_matrix(events) model = AlternatingLeastSquares(factors=64) model.fit(matrix) save_model(model, ...) # 通知 inference 服务 reload requests.post('http://rec-api/reload') ``` 每天凌晨 1 点训。新 interaction 进入 embedding。 ## A/B test 验证 把 50% 用户走"推荐"接口,50% 走"按热度排"。 观察: - CTR(点击率) - conversion rate(转化率) - 用户 session 时长 通常协同过滤 baseline 比"按热度" CTR 提升 30-100%。 ## 与深度模型对比 | | implicit ALS | LightFM | DeepFM | YouTube DNN | |---|---|---|---|---| | 数据量需求 | 中(万级用户) | 中 | 大 | 极大 | | 训练时间 | 秒-分钟 | 分钟 | 小时 | 天 | | 冷启动支持 | 弱 | 中(hybrid) | 中 | 中 | | 实施成本 | 极低 | 低 | 高 | 极高 | | 效果上限 | 中等 | 中-高 | 高 | 极高 | 500 万以上交互 + 想榨干 5-10% CTR → 上深度模型。 否则 ALS 就够。 ## 效果 我们小型电商上线 implicit ALS: - 数据:5w 用户 + 1w SKU + 100w events - 训练:30 秒(CPU) - 推理:< 5ms / user - A/B test:CTR +47%,conversion +18% - vs 之前的"按销量排序"基线,**显著**改善 后续上 LightFM hybrid(加 metadata)再 +12%。 深度学习准备阶段,但 ALS 已经 cover 大头 ROI。 ## 踩过的坑 1. **`matrix[uid]` 而不是 `matrix.getrow(uid)`**:implicit 0.7 后 API 改了, `recommend(user, user_items)` 要传整行 sparse vector。 2. **数据严重 popularity bias**:只看热门,ALS embedding 也偏热门。 BM25 weight 缓解。 3. **训练 / 推理使用不同 user_idx 映射** → 错位。永远 pickle 整套 {model, user_idx, item_idx, item_ids}。 4. **新用户进来推不了**:冷启动 fallback 一定要有,不然推荐 API 返空。 5. **A/B test 早期看 metric 没差异**:用户没注意新推荐位 / 流量太小 = 0 statistical power。至少 1 周 + 万级用户。