知识广场
按学科筛选:计算机科学 / 机器学习
«计算机科学 / 机器学习» 分类下共 13 篇帖子
## 起因 LLM 应用上生产后: - 用户报"AI 回答错" → 哪 prompt?哪 model call?怎么调试? - prompt 改一版 → 怎么评估质量没退步? - 月 OpenAI bill $5000 → 哪些 endpoint 烧钱? 传统 web app observability 工具不适合: - 输入 / 输出是大段文本 - 嵌套 LLM call(agent / tool use) - 需要 evaluation(不只是 metric) **Langfuse**:LLM 应用专用 observability + prompt management + eval。 open source + self-host 友好。 ## 装 ```bash # self-host docker compose up -d -f langfuse-docker-compose.yml # UI on localhost:3000 ``` 或 SaaS:cloud.langfuse.com 免费 tier。 ## SDK 集成 ```python from langfuse import Langfuse from langfuse.decorators import observe lf = Langfuse(public_key='pk-...', secret_key='sk-...') @observe() def answer_question(q: str) -> str: # 自动 trace 这函数 + 嵌套 LLM call context = retrieve(q) prompt = f"Context: {context}\n\nQ: {q}" response = openai.chat.completions.create( model='gpt-4', messages=[{'role': 'user', 'content': prompt}], ) return response.choices[0].message.content ``` `@observe()` 装饰器自动 track 函数调用 + 输入 / 输出 + 子调用 trace。 OpenAI call 用 Langfuse 包装的 client → 自动捕获 prompt / response / token / cost。 ## langchain / llamaindex 集成 ```python from langfuse.callback import CallbackHandler handler = CallbackHandler() chain.invoke({'q': q}, config={'callbacks': [handler]}) ``` LangChain 整 chain / agent 自动 trace(每步骤 / 每 tool call / 每 LLM call)。 ## UI trace view: ``` trace: answer_question("How to deploy?") ├─ retrieve (5 docs found) ├─ format_prompt ├─ openai.chat.completions │ model: gpt-4 │ tokens: 1234 in / 234 out │ cost: $0.024 │ latency: 1.2s └─ output: "To deploy, run..." ``` 每 LLM call 看到: - input / output 全文 - model / params - token + cost - latency - error 嵌套 trace 直观看 agent 步骤。 ## sessions + users ```python @observe() def chat(user_id: str, session_id: str, message: str): ... lf.update_current_trace(user_id=user_id, session_id=session_id) ``` UI 按 user / session 聚合 → 看某用户对话历史 + cost。 ## prompt management prompt 不写代码里,存 Langfuse: ```python prompt = lf.get_prompt('answer-question', version='production') formatted = prompt.compile(context=ctx, question=q) ``` UI 改 prompt → 自动 versioned → 部署不用改 code。 ## evaluation 每 trace 加 score: ```python lf.score( trace_id=trace.id, name='accuracy', value=0.9, ) ``` 或者 user feedback: ```python @app.post('/feedback') async def feedback(trace_id: str, thumbs_up: bool): lf.score(trace_id=trace_id, name='user_feedback', value=1 if thumbs_up else 0) ``` UI 聚合:哪些 prompt 评分低?哪些 model 效果好? LLM-as-judge eval: ```python def llm_judge(trace): judgment = openai.chat.completions.create( model='gpt-4', messages=[{ 'role': 'user', 'content': f'Rate this answer 1-5: Q: {trace.input}, A: {trace.output}', }], ) lf.score(trace_id=trace.id, name='llm_judge', value=parse(judgment)) ``` LLM 评 LLM → 自动 quality monitoring。 ## dataset + experiment ```python # 上传 test dataset lf.create_dataset(name='qa-test-set', items=[ {'input': q1, 'expected': a1}, {'input': q2, 'expected': a2}, ]) # 跑实验 for item in lf.get_dataset('qa-test-set'): actual = answer_question(item.input) lf.create_dataset_item_score(item, 'exact_match', actual == item.expected) ``` 新 prompt / model → 跑 dataset → 对比 score。 回归测试 for LLM。 ## cost tracking dashboard 看: - 每 model 总 cost - 每 user 总 cost - 每 endpoint trace count + cost - 趋势 ``` Last 7 days: gpt-4: $234 (1234 calls) gpt-4o: $89 (5678 calls) embedding: $12 Top user: user-42 ($45 last week) ``` 提前发现"某 endpoint 烧钱" → 优化(用 cheaper model / cache)。 ## 与替代品 | | Langfuse | LangSmith | Helicone | Weights & Biases | |---|---|---|---|---| | 开源 | ✅ | ❌(SaaS) | ✅ | ❌ | | Self-host | ✅ | ❌ | ✅ | enterprise | | trace | ✅ | ✅ | ✅ | ✅ | | prompt mgmt | ✅ | ✅ | 弱 | ❌ | | eval | ✅ | ✅ | 弱 | ✅ | | price | free / 用量 | 付费 | free / 用量 | 付费 | 我用 Langfuse self-host(数据敏感 + 开源 + 功能全)。 ## 真实 case 我们一个客户 RAG 应用上线: - 1000+ QPS LLM call - 接入 Langfuse 一周 - 发现: - 某 endpoint 答错率 30%(prompt 有 bug,UI 看 trace 立刻发现) - 某 user 每天 $50 cost(abuse 检测) - retrieval 召回率低 → 改 chunking 策略 如果没 trace,全靠 user complain → 慢 + 漏 80%。 ## privacy LLM 内容可能含 PII。 Langfuse 配 PII redaction: ```python lf.flush() # 含 mask sensitive ``` 或者在客户端 mask 后再发: ```python def mask(text): return re.sub(r'\b\d{16}\b', '[CARD]', text) @observe() def chat(msg): msg_masked = mask(msg) ... ``` ## 部署 self-host 简单 docker compose(langfuse + postgres + clickhouse)。 clickhouse 存 trace(大量 string 数据 → columnar 高效)。 ## 与 OTEL 集成 Langfuse 1.0+ 支持 OTLP receiver: ```python # OpenTelemetry trace 自动转 Langfuse ``` 跟现有 observability stack 整合。 ## 踩过的坑 1. **flush 异步**:默认异步 send,应用退出前要 `lf.flush()` 否则 trace 丢。 2. **trace input/output 大**:长文本占 DB。配 truncate。 3. **cost 计算不准**:Langfuse 内置 cost map 可能滞后于 OpenAI 价格 调整。自己 update 或者验证。 4. **self-host clickhouse 重**:单 server 几 GB RAM 起。小项目用 SaaS 简单。 5. **prompt version 滥**:每改一字一版本 → UI 难看。考虑 staging / production tag。
## 起因 公司不想全部 LLM 调用都走 OpenAI / Anthropic API: - 成本:长上下文 + 高 QPS → API 月几万刀 - 数据敏感:医疗 / 金融数据不送 cloud - 自由度:fine-tune 自己 model 开源 LLM(Llama 3 / Qwen 2 / DeepSeek 等)质量足够替代 GPT-4 一些场景。 推理框架选 `vLLM`(Berkeley 出,paged attention,**事实标准**)。 ## 装 ```bash pip install vllm ``` 需要 NVIDIA GPU + CUDA。RTX 4090 / A100 / H100 / L40。 ## 跑 ```bash python -m vllm.entrypoints.openai.api_server \ --model meta-llama/Meta-Llama-3-8B-Instruct \ --port 8000 ``` 启动后兼容 OpenAI API 格式: ```python from openai import OpenAI client = OpenAI(base_url='http://localhost:8000/v1', api_key='x') res = client.chat.completions.create( model='meta-llama/Meta-Llama-3-8B-Instruct', messages=[{'role': 'user', 'content': '你好'}], ) print(res.choices[0].message.content) ``` 现有 OpenAI client 一行改 base_url 就能换成自己的 vLLM。 ## 性能 vs HF transformers 跑 Llama 3 8B / 单 A100: | | 推理速度 | |---|---| | HuggingFace transformers | 30 tok/s | | vLLM | 800 tok/s(continuous batching) | | TGI (huggingface) | 600 tok/s | vLLM 的 paged-attention + continuous batching 让 GPU 利用率拉满 → 20x HF transformers。 ## 跑大 model 70B 模型需 80GB+ VRAM。单 A100 80GB 跑 fp16 紧。 量化 + tensor parallel: ```bash # 4-bit AWQ 量化 → 40GB 装得下 single A100 python -m vllm.entrypoints.openai.api_server \ --model TheBloke/Llama-3-70B-Instruct-AWQ \ --quantization awq # 多 GPU tensor parallel python -m vllm.entrypoints.openai.api_server \ --model meta-llama/Meta-Llama-3-70B-Instruct \ --tensor-parallel-size 4 ``` 4 张 A100 跑 70B fp16 → 充足空间 + 速度更快。 ## paged attention 是什么 LLM 推理时每个 sequence 有 KV cache(每 token 的 attention key/value 存住)。 传统:每 sequence 预留最大 length 的 contiguous block → 浪费(多数 sequence 短)。 paged:把 KV cache 切成固定大小 page(类似 OS virtual memory)→ 按需分配 / 共享相同 prefix。 效果: - 内存利用率 90%+ vs 传统 30-50% - 同 GPU 能跑更多 concurrent request - prefix caching:相同 system prompt 的多个请求共享 prefix cache ## continuous batching 老办法 static batching:等够 N 个 request → 一起跑 → 等最慢的 → 返回。 有 request 早完成也要等。 continuous batching:每生成 1 token 就检查能不能塞新 request。 不浪费 GPU cycle。 ``` Time GPU 0 [A:gen, B:gen, C:gen, D:gen] # A B C D 同 batch 1 [A:gen, B:gen, C:gen, D:gen, E:start] # E 加入 2 [A:done, B:gen, C:gen, D:gen, E:gen] # A 完成腾出 slot 3 [F:start, B:gen, C:gen, D:gen, E:gen] # F 加入 ``` 吞吐量是传统 batching 2-5x。 ## prefix caching system prompt 经常一样: ``` You are a helpful assistant. Today is 2026-05-25. Answer concisely. [user message: ...] ``` vLLM 0.4+ 默认开 prefix cache:相同 prefix 的 KV cache 共享 → system prompt 不重新算 → 加速。 特别适合 chatbot / agent 多 turn 对话。 ## structured output GPT-4 / Claude 都支持 JSON mode / function calling。vLLM 0.4+ 也有: ```python res = client.chat.completions.create( model=..., messages=[...], extra_body={ 'guided_json': { 'type': 'object', 'properties': { 'name': {'type': 'string'}, 'age': {'type': 'integer'}, }, 'required': ['name', 'age'], }, }, ) ``` 底层用 outlines / xgrammar 强制 token 选择符合 schema → 100% 合规 JSON。 ## LoRA 多租户 部署一个 base model + 多个 LoRA fine-tune 一起 serve: ```bash python -m vllm.entrypoints.openai.api_server \ --model meta-llama/Meta-Llama-3-8B \ --enable-lora \ --lora-modules sql=./loras/sql-lora medical=./loras/medical-lora ``` ```python client.chat.completions.create(model='sql', ...) client.chat.completions.create(model='medical', ...) ``` 多个特化 model 共享同 base → 显存省 + 切换零开销。 ## 监控 vLLM 暴露 Prometheus metrics: ```bash --enable-metrics # /metrics endpoint ``` 关键指标: - `vllm:num_requests_running` - `vllm:gpu_cache_usage_perc` - `vllm:time_to_first_token_seconds` - `vllm:e2e_request_latency_seconds` Grafana dashboard 看吞吐 / latency / cache 利用。 ## 与替代方案对比 | | vLLM | TGI | llama.cpp | ollama | |---|---|---|---|---| | 目标 | 高吞吐 GPU server | 同 | CPU/GPU 单机 | 单用户简单 | | 性能 | 最高 | 中高 | 中(CPU 优秀) | 中 | | API | OpenAI-compat | 自家 + OAI | 自家 + OAI | 自家 | | 量化 | AWQ / GPTQ / FP8 | 同 | GGUF | GGUF | | 适合 | 生产 server | 生产 server | 笔记本 | 个人 | 生产用 vLLM。个人 / 笔记本用 ollama。 ## 实战 cost 对比 跑 Llama 3 70B 处理 100M tokens/月: | 方案 | 月成本 | |---|---| | OpenAI GPT-4 API | ~$8000 | | Anthropic Claude Sonnet | ~$6000 | | Together / Fireworks API(70B) | ~$900 | | 自托管 vLLM + 2× A100 cloud | ~$2500(GPU 租金) | | 自托管 + own H100 | ~$0(一次性买卡) | 自托管 break-even 大约 200M tokens/月(vs cloud API)。 量大 + 控制需求 → 自托管。量小 → API。 ## 容器化部署 ```dockerfile FROM vllm/vllm-openai:latest CMD ["--model", "meta-llama/Meta-Llama-3-8B-Instruct", "--port", "8000"] ``` ```yaml # k8s deployment resources: limits: nvidia.com/gpu: 1 ``` prod 跑 NVIDIA GPU operator 配 GPU node + vLLM container。 ## 踩过的坑 1. **OOM**:模型 + KV cache 超 VRAM → OOM。`--gpu-memory-utilization 0.9` 留 buffer;或者降 `--max-model-len`。 2. **量化精度损失**:AWQ 4-bit 比 fp16 任务上有 1-3% 精度损失。 测过再上。 3. **prefix cache 没用上**:system prompt 略有不同(如时间戳) → prefix 不匹配。把动态部分放后面。 4. **OpenAI client 兼容性**:某些参数(如 `tools`)vLLM 还不支持 → 用基础 chat completions。 5. **生产 reload model**:vLLM 不支持热重载。换 model 必须重启容器。 blue-green deploy。
## 起因 数据团队常见需求: - 给 PM / 业务做个交互式 dashboard - 给一个 ML model 一个 demo UI - 内部工具(查 user / 跑 cohort 分析) 完整前端(React + API)几天工作量。 **streamlit** / **gradio**:纯 Python 描述 UI,5 分钟出能用 app。 ## streamlit ```python # app.py import streamlit as st import pandas as pd st.title('Sales Dashboard') uploaded = st.file_uploader('上传 CSV', type='csv') if uploaded: df = pd.read_csv(uploaded) st.dataframe(df) country = st.selectbox('国家', df.country.unique()) filtered = df[df.country == country] st.bar_chart(filtered.groupby('product').amount.sum()) st.metric('总销售', f"${filtered.amount.sum():,.0f}") ``` ```bash streamlit run app.py # 浏览器自动开 localhost:8501 ``` 整个 app 一个 .py,自顶向下 imperative。 每次 widget 交互 → script 从头重跑(reactive)。 ## gradio ```python # app.py import gradio as gr def predict(image): # 跑 model return label, score demo = gr.Interface( fn=predict, inputs=gr.Image(), outputs=[gr.Label(), gr.Number()], title='Image Classifier', ) demo.launch() ``` gradio 更 function-centric:定义 input/output → 自动 wrap UI。 ## 两个的定位 - **streamlit**:通用 dashboard / 内部工具(多个 widget 交互) - **gradio**:ML model demo(input → 跑 → output) streamlit 是 layout + state-aware app;gradio 是 function demo wrapper。 ## streamlit 细节 ### session state ```python if 'count' not in st.session_state: st.session_state.count = 0 if st.button('+1'): st.session_state.count += 1 st.write(f'count: {st.session_state.count}') ``` 每次重跑 script,session_state 持久(同浏览器 session)。 ### cache ```python @st.cache_data def load_data(path): return pd.read_csv(path) # 慢操作 df = load_data('big.csv') # 重 invocation cached ``` `@st.cache_data` 内容缓存;`@st.cache_resource` 资源(model)缓存。 不 cache 的话每次按 widget 都重读 CSV → 慢。 ### 多页面 ``` my_app/ ├── app.py # 主页 ├── pages/ │ ├── 1_📊_Dashboard.py │ ├── 2_🔍_Search.py │ └── 3_⚙️_Settings.py ``` `streamlit run app.py` 左侧自动有 page 切换。 ### chart 库 streamlit 内置: - `st.line_chart`, `st.bar_chart`, `st.area_chart`(轻量) - `st.pyplot()` matplotlib - `st.plotly_chart()` plotly - `st.altair_chart()` altair - `st.vega_lite_chart()` 任意 chart 都能塞。 ## gradio 细节 ### 复合 input/output ```python demo = gr.Interface( fn=lambda txt, slider: txt * slider, inputs=[ gr.Textbox(label='文本'), gr.Slider(1, 10, step=1), ], outputs=gr.Textbox(), ) ``` ### Blocks (复杂布局) ```python with gr.Blocks() as demo: gr.Markdown('# My App') with gr.Row(): with gr.Column(): inp = gr.Textbox() btn = gr.Button('Run') with gr.Column(): out = gr.Textbox() btn.click(fn=process, inputs=inp, outputs=out) demo.launch() ``` Blocks 更接近 streamlit 灵活度。 ### chat interface ```python def respond(message, history): return f"echo: {message}" gr.ChatInterface(respond).launch() ``` 3 行起 LLM chat UI。huggingface space 上 90% chat demo 用 gradio。 ## 性能 / scale - streamlit:每个 user 连接独立 session,但跑同一进程;重 compute 会卡其它 user - gradio:queue 系统,多个 request 排队跑 LLM demo 用 gradio(queue 默认);多 user dashboard 用 streamlit。 ## 部署 ### streamlit cloud / hugging face streamlit cloud free tier: - 连 GitHub repo - 推 → 自动部署到 streamlit.app domain hugging face space: - 同样思路,免费 CPU - gradio / streamlit 都支持 ### 自托管 ```dockerfile FROM python:3.12-slim WORKDIR /app COPY . . RUN pip install -r requirements.txt EXPOSE 8501 CMD streamlit run app.py --server.address 0.0.0.0 ``` 放 nginx 反代 + auth → 内部工具。 ## 鉴权 两者都没原生用户系统。 方案: - nginx + basic auth - Cloudflare Access(zero-trust) - streamlit-authenticator package - OAuth proxy (oauth2-proxy) 内部工具我用 Cloudflare Access:5 分钟配,免维护。 ## 与 dash / panel 对比 - **Dash**(Plotly):基于 Flask + React,更灵活但写得多 - **Panel**(HoloViz):科学计算友好,多 backend - **Reflex**(前 Pynecone):写 Python 编 React,UI 强 streamlit / gradio 简单优先;dash / reflex 复杂应用。 ## 我的选择 - **数据 dashboard** → streamlit - **ML 模型 demo** → gradio - **内部 admin tool** → streamlit - **需要复杂前端** → 接 SPA + FastAPI ## case:客户演示工具 要给客户演示一个文本摘要 LLM: ```python import gradio as gr from transformers import pipeline summarizer = pipeline('summarization') def summarize(text): return summarizer(text, max_length=100)[0]['summary_text'] gr.Interface( fn=summarize, inputs=gr.Textbox(lines=10, placeholder='粘贴长文'), outputs=gr.Textbox(label='摘要'), title='LLM 摘要 Demo', examples=[['Long text 1...'], ['Long text 2...']], ).launch(share=True) # share=True 给临时公网 URL(gradio.live) ``` 30 秒部署 + URL 发给客户 + 客户能直接试。比 PPT 强 100 倍。 `share=True` 临时 tunnel 72 小时有效。 ## 内部 case:cohort 分析工具 ```python import streamlit as st import duckdb st.title('User Cohort Analysis') date_range = st.date_input('日期范围', value=(start, end)) group_by = st.selectbox('Group by', ['country', 'plan', 'source']) @st.cache_data def query(date_range, group_by): return duckdb.sql(f""" SELECT {group_by}, DATE_TRUNC('week', signup_date) AS cohort, COUNT(*) AS users FROM read_parquet('s3://.../users/*.parquet') WHERE signup_date BETWEEN '{date_range[0]}' AND '{date_range[1]}' GROUP BY 1, 2 """).df() df = query(date_range, group_by) st.plotly_chart(px.line(df, x='cohort', y='users', color=group_by)) st.dataframe(df) ``` 业务自己改 dropdown 看不同维度。 原本 BA 找数据团队跑 → 改成业务自助。 ## 踩过的坑 1. **state 重置**:streamlit 每次交互重跑 script。耗时 op 没 cache → 卡。 2. **gradio queue 默认关**:高 concurrent 时阻塞。`demo.queue()` 打开。 3. **streamlit 多 tab**:同 user 多 tab → state 不共享。 `st.session_state` 是单 session 单 tab。 4. **share=True 安全**:gradio share 链接公网,没 auth。给 demo 用, 不要放 secret data。 5. **upload size 限制**:streamlit 默认 200 MB;要更大改 `--server.maxUploadSize=1000`。
## 起因 数据 pipeline 跑出来的表: - 突然某天 row 数 -80%(上游断了) - 某列 null 比例飙到 30%(schema 改了没告知) - 重要 metric 暴增 100x(埋点 bug) 下游 BI 报表 / model 已经用上 → 业务 / model 出错。 **数据质量监控**是数据团队必须建的。 两个主流工具: - **dbt test**(轻量,pipeline 集成) - **Great Expectations / GX**(重量,schema + 历史 profile) ## dbt test 基础 dbt model 的 schema.yml 里加 test: ```yaml # models/orders/schema.yml models: - name: orders columns: - name: id tests: - unique - not_null - name: status tests: - accepted_values: values: ['pending', 'paid', 'cancelled', 'refunded'] - name: amount tests: - dbt_expectations.expect_column_values_to_be_between: min_value: 0 max_value: 1000000 tests: - dbt_utils.expression_is_true: expression: "amount = price * quantity" ``` ```bash dbt test --select orders ``` 每个 test → 一个 SQL `SELECT count(*) WHERE 失败条件`。 返回 > 0 → 失败。 ### 优势 - 跟 dbt run 同 workflow(test after run) - SQL-based 简单 - `dbt_utils` / `dbt_expectations` 几百 test - 失败定位明确(具体 model + column) ### 劣势 - 只能查"当前结果是否符合规则" - 没历史 baseline / trend - 不知道"昨天 100 行,今天 50 行"算 anomaly ## dbt 常用 test ```yaml tests: - dbt_utils.unique_combination_of_columns: combination_of_columns: [user_id, day] - dbt_utils.recency: datepart: day field: created_at interval: 1 # 最新 record 不能超过 1 天前 - dbt_expectations.expect_table_row_count_to_be_between: min_value: 1000 max_value: 100000 - dbt_expectations.expect_column_value_lengths_to_equal: column: phone value: 11 - dbt_expectations.expect_column_proportion_of_unique_values_to_be_between: column: email min_value: 0.9 ``` table_row_count 等 catch "今天数据突然少"。 ## Great Expectations (GX) ```python import great_expectations as gx context = gx.get_context() # 定义 expectation suite suite = context.suites.add(name='orders_suite') suite.add_expectation( gx.expectations.ExpectColumnValuesToBeBetween( column='amount', min_value=0, max_value=1000000)) suite.add_expectation( gx.expectations.ExpectColumnValuesToNotBeNull(column='user_id')) # 跑 validation validator = context.get_validator(...) results = validator.validate() ``` GX 比 dbt test 复杂但功能更多: - profile dataset → 自动生成 expectation - HTML 报告(哪 row 失败、% 等) - versioned expectation suite - data docs 生成 ## 自动 profile ```python profiler = gx.profile.UserConfigurableProfiler(profile_dataset=batch) suite = profiler.build_suite() ``` 让 GX 看现有数据,自动建 expectation(每列 min/max/null% etc)。 人审 + 调 → 提交。 第一次设 expectation 非常省时。 ## anomaly detection GX 0.18+ 支持 "expect column values stay close to historical mean": ```python gx.expectations.ExpectColumnMeanToBeBetween( column='daily_revenue', min_value=-3, max_value=3, strict_min=False, auto=True, # 自动 baseline last 30 day ) ``` 跟历史 baseline 比,3σ 之外报警。 catch "突然飙升 / 暴跌"。 ## dbt + GX 集成 `dbt-expectations` 包是 GX expectation 用 SQL 重写的 dbt test 版本。 所以两个工具的核心 expectation 高度重叠: ```yaml # dbt_expectations 在 dbt test 里 - dbt_expectations.expect_column_values_to_match_regex: column: email regex: '^[^@]+@[^@]+\.[^@]+$' ``` 简单 expectation → dbt_expectations 够,pipeline 内集成。 需要 profile / anomaly / 历史 → 用 GX。 ## 我们的 setup ``` 1. dbt run + dbt test(pipeline 内,failing test → 阻塞 pipeline) 2. GX 每天对核心表 daily check(独立 schedule) 3. anomaly detected → Slack 告警 + 人工调查 ``` dbt test 防"明显 bug"。GX 防"渐变趋势异常"。 ## test 严重级别 dbt 1.5+ test 有 severity: ```yaml - not_null: severity: error # 默认,阻塞 - not_null: severity: warn # 不阻塞,只 log ``` `warn` 给"偶尔不符合但还 acceptable"的规则。 ## 失败时怎么办 dbt test 默认 fail → 整 pipeline halt。 策略: 1. **block downstream**:dbt 默认行为,下游 model 不跑(避免坏数据传播) 2. **alert only**:`severity: warn`,下游照跑 + 通知人 3. **quarantine**:把坏 row 隔离到 errors table,好 row 继续 选哪个看业务容忍度。金融 → block。日志 → warn。 ## storing test results dbt test 默认结果不存。 `dbt-checkpoint` 或 自定义 macro 把结果写表: ```sql -- models/_test_results.sql SELECT current_timestamp AS run_at, '{{ this.name }}' AS test, {{ test_function() }} AS result ``` 历史化 → Grafana 看 test pass rate 趋势。 ## 真实 case:救命的 test 我们一个 ETL pipeline: ```yaml tests: - dbt_expectations.expect_table_row_count_to_be_between: min_value: 50000 # 历史日均 80k max_value: 200000 ``` 某天上游 partition 错 → 我们表只 catch 到 5k 行。 test 立刻失败 → pipeline halt → BI 没拿到坏数据 → 修 partition → re-run。 没这 test 的话,dashboard 显示 5k 行 = "今天业务下滑 95%", 高管 panic。 ## 与 monte-carlo / soda 对比 | | dbt test | GX | Monte Carlo | Soda | |---|---|---|---|---| | 部署 | 跟 dbt | self-host / cloud | SaaS | self/cloud | | 价格 | 0 | 0 | 贵 | mid | | anomaly | 基础 | 中 | 强 (ML) | 中 | | 集成 | dbt 原生 | API | data warehouse 联 | 类 GX | | 上手 | 极易 | 中 | easy(SaaS) | 中 | 预算紧 → dbt test + GX。 预算大 + 不想运维 → Monte Carlo。 ## 踩过的坑 1. **expectation 太严**:`amount > 0` 但实际有 refund 是负数 → 全部 alert false positive。expectation 必须 calibrate。 2. **suite 跟 schema 不同步**:表加列,suite 没改 → 没 cover。 review process。 3. **GX 版本升级**:0.x → 1.x breaking change 大。锁版本 / 小心升。 4. **test 跑慢**:每个 test 一条 query → 大表 N test 慢。dbt `--store-failures` 让结果存表 + 跑一次 query 多 test。 5. **silent broken**:test 跑了但通过(即使数据有问题)。覆盖度 review 重要。每次 incident 后加新 test,防同问题。
## 起因 Jupyter notebook 用了 10 年,痛点积累: - cell 乱序执行 → 隐藏状态 / 难复现 - JSON 文件 git diff 不友好 - 无 IDE 类型检查 - 不能直接 run as script - import 难(要把 notebook 转 .py) `marimo`(2023+)是新一代 Python notebook,**reactive** 设计: - 单 file = `.py`(git friendly + 可 import) - cell dependency 图 → 改一个 cell 自动重跑下游 - 无隐藏状态(重启 → 一致结果) - 内置 UI widget(slider / dropdown 等) ## 装 ```bash pip install marimo # 或者 uv add marimo ``` ```bash marimo new # 新建 notebook marimo edit my.py # 编辑 marimo run my.py # 当 app 跑(read-only) ``` ## 文件格式 ```python # my_notebook.py import marimo __generated_with = "0.6.0" app = marimo.App() @app.cell def __(): import polars as pl df = pl.read_csv('data.csv') df @app.cell def __(df): summary = df.group_by('country').agg(pl.col('amount').sum()) summary @app.cell def __(summary): summary.plot.bar(x='country', y='amount') ``` 每 cell 是函数:参数 = 它依赖的变量。 marimo 自动构建 DAG,运行时知道改 `df` → `summary` 重算 → `plot` 重算。 ## reactivity 改第一个 cell,比如 `pl.read_csv('other.csv')`: - marimo 自动识别 `df` 变了 - 下游 cell(用 `df` 的)自动跑 - 不用手动 Shift+Enter 每个 cell vs Jupyter:你必须记得"我改了 df,要 re-run 下游"。少 re-run 一个就 state 不一致。 ## 无隐藏状态 Jupyter 经典 bug: ```python # cell 1 x = 5 # cell 2 print(x) # 5 # cell 1 修改 x = 10 # 不重跑 cell 2 # cell 3 print(x) # 10 # cell 2 显示 5,cell 3 显示 10 → 矛盾状态 ``` marimo 不允许:cell 1 改了 → cell 2 / 3 都重跑。 restart kernel + run all 跟正常 run 一样。 ## git friendly ```bash $ git diff my_notebook.py - df = pl.read_csv('data.csv') + df = pl.read_csv('orders.csv') ``` 正常 Python diff。 vs Jupyter `.ipynb`: ```bash $ git diff my.ipynb - "execution_count": 12, + "execution_count": 13, - "metadata": {"id": "abc"}, + "metadata": {"id": "def"}, - ... 几百行 base64 image diff ``` `.ipynb` 是 JSON + base64 编码的 output,PR review 痛苦。 工具如 `nbdime` / `jupytext` 能缓解但 marimo 原生 .py 更纯。 ## 当 script 跑 ```bash python my_notebook.py # 跟 jupyter nbconvert 不一样,marimo .py 真的是 Python 文件,直接跑 ``` CI 跑 notebook 测试 → 一行命令。 ## 当 import 用 ```python # main.py from my_notebook import summary # marimo 让 cell 变量 import 出来 ``` marimo notebook = Python module。 prototype 完直接被生产 code 引用,不用"先把 cell 改 function 再 import"。 ## UI widget ```python @app.cell def __(mo): slider = mo.ui.slider(0, 100, value=50, label='Sample size') slider @app.cell def __(slider, df): sample = df.head(slider.value) sample ``` slider 调 → 自动重算 sample → 显新结果。 不用 ipywidgets 复杂 callback。 ## marimo as app ```bash marimo run my.py --host 0.0.0.0 --port 8000 ``` 变成 web app(read-only),URL 分享给同事。 UI widget 仍可交互。 替代 streamlit 简单场景。 ## vs Jupyter 优势汇总 | | Jupyter | marimo | |---|---|---| | 文件格式 | JSON .ipynb | Python .py | | 隐藏状态 | 是 | 否 | | Reactive | 否 | 是 | | Git diff | 烂 | 好 | | Run as script | 要 nbconvert | 直接 | | Import | 要 nbconvert | 直接 | | UI widget | ipywidgets | marimo.ui | | AI 友好度 | 弱(JSON) | 强(.py) | ## Jupyter 仍胜的场景 - 巨大社区(教程 / 大量 .ipynb 内容) - Google Colab / Kaggle / VS Code 内置 Jupyter - 自由 cell 顺序(marimo 强制 DAG 无环) - 显示 rich output 历史悠久 + 稳 新项目我用 marimo,老 .ipynb 继续 Jupyter。 ## 我的工作流 数据探索: 1. `marimo edit explore.py` 2. 改 cell → 即时看下游结果(reactive) 3. 完成的 cell 重构成 function 4. 把 explore.py 里关键 function `import` 到 production code production analysis dashboard: 1. `marimo edit dashboard.py` 加 widget 2. `marimo run dashboard.py --headless` 部署 替代 Jupyter + nbconvert + streamlit 三个工具。 ## 与 nbdev 对比 `nbdev`(fast.ai):Jupyter 当源码,notebook 直接 ship 为库。 marimo:notebook 就是 .py,本来就是源码。 nbdev 改进 Jupyter;marimo 重新设计。我选 marimo(少抽象层)。 ## 与 Pluto.jl 对比 Julia 的 Pluto 是 marimo 的灵感来源(reactive notebook)。 marimo 在 Python 把这模型实现。 ## 性能 reactive DAG 计算开销小(几 ms / 改动)。 大 cell 仍跟 Jupyter 一样跑。 增量执行有时还更快(只跑变化的 cell + 下游)。 ## 踩过的坑 1. **强制 DAG 无环**:写 `x = 1` in cell 1 + `x = 2` in cell 2 报错 (same variable defined twice)。需要 refactor 到 function 或者 rename。开始不适应。 2. **某些 imperative pattern 不支持**:`for i in range(5): print(i)` in cell → marimo 不 reactive。包 function。 3. **plot library 兼容**:matplotlib OK;plotly / altair OK; 极少 niche lib 可能不渲染。 4. **AI tab completion 弱于 Jupyter**:jupyter-ai / VS Code Copilot Jupyter 集成强。marimo 在追赶。 5. **大 dataframe display 慢**:reactive 每次跑都重 display。 `df` 大时显式 `df.head()`,或者 cache。
## 起因 每天跑一遍数据仓库 SQL transform: ```sql -- daily_user_metrics SELECT user_id, DATE(created_at) AS day, COUNT(*) AS events, SUM(value) AS total FROM events GROUP BY 1, 2; ``` 全量跑:扫几亿行 events → 20 min + Snowflake credit 几百刀。 但实际上**昨天之前的天数据是冻结的**,每天只需算今天的新增。 dbt 的 incremental model 模式解决这问题。 ## 全量 model ```sql -- models/daily_user_metrics.sql {{ config(materialized='table') }} SELECT user_id, DATE(created_at) AS day, COUNT(*) AS events FROM {{ ref('events') }} GROUP BY 1, 2 ``` `materialized='table'`:每次 dbt run 都 DROP + CREATE TABLE AS。 小数据 OK;大数据浪费。 ## incremental model ```sql -- models/daily_user_metrics.sql {{ config( materialized='incremental', unique_key=['user_id', 'day'], on_schema_change='append_new_columns' ) }} SELECT user_id, DATE(created_at) AS day, COUNT(*) AS events, SUM(value) AS total FROM {{ ref('events') }} {% if is_incremental() %} WHERE created_at >= (SELECT MAX(day) FROM {{ this }}) - INTERVAL '1 day' {% endif %} GROUP BY 1, 2 ``` 关键: - `{{ this }}` 引用当前 model 自己 - `is_incremental()` 在"已存在且不是 --full-refresh"时为 true - WHERE 只取新数据 - `unique_key` 让 dbt 知道按啥 merge 第一次 run:建 table + 全量。 之后每次 run:只算新数据 → MERGE 进现有 table。 ## 效果 我们一个 metric model: - 事件表 5 亿行 - 全量跑:22 分钟,$3 credit - incremental:1.5 分钟,$0.2 credit 12x 时间节省,15x 成本节省。 ## 几种 strategy dbt 支持几种 incremental 策略(按 warehouse): | strategy | 行为 | |---|---| | `append` | 直接 INSERT 新数据(不去重) | | `merge` | 默认。MERGE INTO with unique_key | | `delete+insert` | 删 unique_key 匹配的行 + INSERT | | `insert_overwrite` | partition-level overwrite(BigQuery / Spark) | ```sql {{ config( materialized='incremental', incremental_strategy='merge', unique_key='id', ) }} ``` PG / Snowflake / Redshift 用 merge。BigQuery 大 partition 用 insert_overwrite。 ## late-arriving data 事件 24 小时后才到(移动 SDK 离线缓存)。 window 要更宽: ```sql {% if is_incremental() %} WHERE created_at >= (SELECT MAX(day) FROM {{ this }}) - INTERVAL '7 days' {% endif %} ``` 回看 7 天,覆盖晚到的事件。`unique_key` 保证 merge 不重复。 trade-off:window 越宽 → 重算越多 → 增量收益减。 ## 全量 backfill 老数据要重算(如 metric 公式改了): ```bash dbt run --select daily_user_metrics --full-refresh ``` `--full-refresh` 让 incremental 当 table 处理 → DROP + 重建。 或者部分重算: ```bash # 重算最近 30 天 dbt run --select daily_user_metrics --vars '{"start_date": "2025-04-01"}' ``` model 里读 var: ```sql {% if var('start_date', false) %} WHERE created_at >= '{{ var("start_date") }}' {% endif %} ``` ## 跟 partitioned table 配 BigQuery / Snowflake partition: ```sql {{ config( materialized='incremental', incremental_strategy='insert_overwrite', partition_by={'field': 'day', 'data_type': 'date'}, cluster_by=['user_id'], ) }} ``` incremental insert overwrite 比 merge 更高效(partition 整块替换 vs row-level merge)。 ## test incremental dbt test 默认在 model run 后跑: ```yaml # models/schema.yml models: - name: daily_user_metrics columns: - name: user_id tests: - not_null - name: day tests: - not_null tests: - dbt_utils.unique_combination_of_columns: combination_of_columns: [user_id, day] ``` increment 后唯一性测试**关键**:merge 配错 → 重复数据。 ## 监控 `dbt run` 完看 timing: ``` 1 of 5 START incremental model daily_user_metrics ........ [RUN] 1 of 5 OK created incremental model daily_user_metrics ... [SUCCESS in 89.42s] ``` 如果 incremental 跑越来越慢(不应该): - WHERE 条件 partition / cluster key 利用不充分 - merge 行数变大(window 太宽) - new column 加了导致 `on_schema_change` 触发 ## dbt 1.6+ microbatch dbt 1.6 引入 `microbatch` incremental_strategy(更明确的 partition-by-time): ```sql {{ config( materialized='incremental', incremental_strategy='microbatch', event_time='created_at', batch_size='day', lookback=3, ) }} SELECT ... FROM {{ ref('events') }} ``` dbt 自动按 day 分批跑 → backfill 时按天 chunk → 单个 chunk 失败不 影响别天。比手写 `is_incremental()` 干净。 ## 我们的 pipeline ``` events (raw, 5 亿行) ↓ [hourly] events_hourly (incremental, 24h window) ↓ [daily] daily_metrics (incremental, 7d window) ↓ monthly_summary (incremental, 1m window) ↓ dashboard ``` 每层 incremental,各自 window 覆盖延迟。 全量 backfill 用 `--full-refresh` 串行跑全部。 ## 真实 vs 全量 我们一个 200 个 model 的 dbt 项目,把 80% 大 model 改 incremental 后: - 总跑时间从 4h → 35min - Snowflake 月成本从 $8000 → $1200 - 失败重试代价低很多 incremental 是 dbt 最重要的 production pattern。 ## 踩过的坑 1. **unique_key 不对**:merge 进重复 → 数据涨。每次改 unique_key 必须 `--full-refresh`。 2. **`is_incremental()` 没用**:写在 CTE 里 / 没读 `{{ this }}` → 每次都全量。看生成的 SQL(`compile`)确认。 3. **schema change**:表加列 → 默认 incremental 报错(schema mismatch)。 `on_schema_change='append_new_columns'` 或 `'sync_all_columns'`。 4. **late-arriving 漏数**:window 太窄 → 晚到事件没进表。监控 `events vs metrics` 一致性。 5. **partition 没 prune**:BigQuery 大 partition 表 WHERE 写 timestamp 比较,但 model 没 partition by → full scan。 `partition_by` 配 + WHERE 用 partition 列。
## 起因 ML model train 在 PyTorch / TF / sklearn,部署面对: - 想跑在 CPU 而不是 GPU - 想 deploy 到 mobile / web / 嵌入式 - 想要更小 / 更快的 runtime(PyTorch ~ 1 GB;ONNX Runtime ~ 50 MB) - 不想生产环境扛 PyTorch 依赖 **ONNX (Open Neural Network Exchange)** 是模型表示标准。 **ONNX Runtime** 是跑 ONNX 模型的 runtime(C++ 写,多 backend)。 train 用任何框架 → export ONNX → 用 ONNX Runtime 跑。 ## export PyTorch: ```python import torch model = MyModel() model.load_state_dict(torch.load('model.pt')) model.eval() dummy_input = torch.randn(1, 3, 224, 224) torch.onnx.export( model, dummy_input, 'model.onnx', input_names=['input'], output_names=['output'], dynamic_axes={'input': {0: 'batch'}, 'output': {0: 'batch'}}, opset_version=18, ) ``` `dynamic_axes` 让 batch dim 运行时可变。 sklearn: ```python from skl2onnx import to_onnx onnx_model = to_onnx(sk_model, X_sample[:1]) with open('model.onnx', 'wb') as f: f.write(onnx_model.SerializeToString()) ``` HuggingFace transformers: ```bash optimum-cli export onnx --model bert-base-uncased ./onnx_out/ ``` ## 验证 ```python import onnx onnx.checker.check_model(onnx.load('model.onnx')) import onnxruntime as ort sess = ort.InferenceSession('model.onnx', providers=['CPUExecutionProvider']) input_name = sess.get_inputs()[0].name result = sess.run(None, {input_name: dummy_input.numpy()}) print(result[0].shape) ``` 跑通 → ONNX 模型 ready。 跟 PyTorch 原 model 输出做 numerical 比较(tolerance 1e-5): ```python torch_out = model(dummy_input).detach().numpy() onnx_out = sess.run(None, {input_name: dummy_input.numpy()})[0] np.testing.assert_allclose(torch_out, onnx_out, rtol=1e-3, atol=1e-5) ``` ## 跑 ONNX Runtime CPU: ```python sess = ort.InferenceSession('model.onnx', providers=['CPUExecutionProvider']) ``` GPU (CUDA): ```python sess = ort.InferenceSession('model.onnx', providers=['CUDAExecutionProvider', 'CPUExecutionProvider']) ``` CoreML (mac M 系列): ```python providers=['CoreMLExecutionProvider', 'CPUExecutionProvider'] ``` WebAssembly (浏览器): ```js // onnxruntime-web import * as ort from 'onnxruntime-web'; const session = await ort.InferenceSession.create('model.onnx'); const results = await session.run({ input: tensor }); ``` 同一 .onnx 文件,多 platform 复用。 ## 性能 vs PyTorch ResNet50 inference / batch=1 / single thread: | Runtime | latency | |---|---| | PyTorch CPU | 80 ms | | ONNX Runtime CPU | 45 ms | | PyTorch (TorchScript) | 60 ms | | ONNX + OpenVINO backend | 28 ms | ONNX Runtime 普遍比 PyTorch CPU 快 1.5-2x(graph 优化 + inference 专用)。 GPU 上差距小,PyTorch 也很快。 ## 量化 ```python from onnxruntime.quantization import quantize_dynamic, QuantType quantize_dynamic('model.onnx', 'model_int8.onnx', weight_type=QuantType.QInt8) ``` INT8 量化 → model size 4x 小 + CPU 2x 快(精度损 1-3%)。 mobile / edge 部署关键。 ## 部署在 server ```dockerfile FROM python:3.12-slim RUN pip install onnxruntime fastapi uvicorn numpy COPY model.onnx app.py /app/ WORKDIR /app CMD ["uvicorn", "app:app", "--host", "0.0.0.0"] ``` ```python # app.py from fastapi import FastAPI import onnxruntime as ort import numpy as np app = FastAPI() sess = ort.InferenceSession('model.onnx') @app.post('/predict') async def predict(data: dict): x = np.array(data['input'], dtype=np.float32) result = sess.run(None, {'input': x})[0] return {'output': result.tolist()} ``` image 大小: - PyTorch image: ~3 GB - ONNX Runtime image: ~150 MB container 启动快 + 部署成本低。 ## 浏览器跑 model(onnxruntime-web) ```html <script src="https://cdn.jsdelivr.net/npm/onnxruntime-web/dist/ort.min.js"></script> <script> const session = await ort.InferenceSession.create('/static/model.onnx'); const feeds = { input: new ort.Tensor('float32', data, [1, 3, 224, 224]) }; const results = await session.run(feeds); console.log(results.output.data); </script> ``` 不需要 server,model 跑在用户浏览器。 - 数据不出端(隐私) - 0 server cost - 适合:图片分类 / 文本嵌入 / 小型模型 mobile 浏览器 4-bit 量化后跑 MobileNet 50 ms。 ## 与 TFLite / CoreML 对比 | | ONNX Runtime | TFLite | CoreML | TorchScript | |---|---|---|---|---| | 跨平台 | 强 | 强 | iOS only | 中 | | Train 框架 | 全 | TF | 全 | PyTorch | | 性能 | 高 | 高(mobile) | 极高(apple) | 中高 | | 工具链 | 复杂但灵 | 简单 | 简单 | PyTorch 内置 | iOS 跑:CoreML 最快。 Android:TFLite 或 ONNX Runtime。 跨平台 / server:ONNX。 ## transformers 适配 HuggingFace optimum 让 transformers 一键转 ONNX: ```python from optimum.onnxruntime import ORTModelForSequenceClassification model = ORTModelForSequenceClassification.from_pretrained( 'distilbert-base-uncased-finetuned-sst-2-english', export=True, ) # 内部自动 export ONNX + 用 ORT 跑 ``` API 跟 transformers 一样,performance 是 ORT 加成。 ## 真实 case:BERT 文本分类部署 train: HF transformers + GPU。 原计划: PyTorch serve 在 t3.medium (2vCPU, 4GB)。 ``` PyTorch model: 440 MB 推理 latency: 350 ms / request RAM: 1.2 GB ``` 转 ONNX + quantize INT8: ``` ONNX INT8 model: 110 MB 推理 latency: 80 ms / request RAM: 350 MB ``` 成本 / 性能都改善。同 instance 能跑 4x QPS。 ## 不适合 ONNX 的场景 - **dynamic graph 复杂**(控制流多):ONNX op 不全 cover,export 失败 - **custom op**:必须写 ONNX custom op(C++) - **需要 train**:ONNX Runtime 主要 inference(有 training 但弱) - **frequent model update**:ORT runtime load 慢,热更新麻烦 train 阶段不动 PyTorch;deploy 阶段转 ONNX。 ## 踩过的坑 1. **opset version**:老 export 用 opset 11,新 ORT 默认要 17+。 不匹配 unsupported op。统一 opset 18+。 2. **dynamic shape**:忘 `dynamic_axes` → batch=1 hardcoded。生产 variable batch 报错。 3. **数值不一致**:FP16 / FP32 mix 后 train 和 ONNX 差 1%。生产 numerical 严格场景小心。 4. **custom op**:用了 PyTorch 特有 op (如 grid_sample 某些 mode) → ONNX export 报错。改 model 或者手写 ONNX op。 5. **runtime version mismatch**:onnx 库版本 vs onnxruntime 版本不 匹配 → load model 报错。pip 同一时间装。
## 起因 RAG 场景需要向量检索: - embed 文档 chunk → 存向量 - query 时 embed query → 找最近的 K 个 chunk - chunk 喂 LLM 做答 向量 DB 选择多:pgvector / Qdrant / Weaviate / Milvus / Pinecone / LanceDB / Chroma。痛苦。 下面对比 + 我的选型建议。 ## 候选 ### pgvector(Postgres 扩展) ```sql CREATE EXTENSION vector; CREATE TABLE chunks ( id BIGSERIAL PRIMARY KEY, content TEXT, embedding vector(1536) ); CREATE INDEX ON chunks USING hnsw (embedding vector_cosine_ops); -- 查 SELECT content FROM chunks ORDER BY embedding <=> '[0.1, 0.2, ...]'::vector LIMIT 10; ``` - 已有 PG → 0 引入新组件 - 同事务支持 metadata + 向量 - 性能:HNSW index 几百万 vector OK,千万级别仍可(精度 vs 速度调) ### Qdrant(Rust) ```python from qdrant_client import QdrantClient from qdrant_client.models import Distance, VectorParams, PointStruct client = QdrantClient('localhost', port=6333) client.recreate_collection('docs', vectors_config=VectorParams(size=1536, distance=Distance.COSINE)) client.upsert('docs', points=[ PointStruct(id=1, vector=[0.1]*1536, payload={'text': '...'}), ]) results = client.search('docs', query_vector=[0.1]*1536, limit=10, query_filter={'must': [{'key': 'lang', 'match': {'value': 'zh'}}]}) ``` - Rust 写,单机性能强 - payload 支持 filter(带 metadata 过滤) - 部署简单(docker run 一行) ### LanceDB ```python import lancedb db = lancedb.connect('./lance_data') tbl = db.create_table('docs', data=[ {'vector': [0.1]*1536, 'text': '...'}, ]) results = tbl.search([0.1]*1536).limit(10).to_pandas() ``` - 嵌入式(无 server,类似 SQLite) - 单 binary,无依赖 - 数据存 Lance 列式格式 - 适合 < 1 亿 vector / 不要分布式 ### Milvus - 老牌(2019+) - 真正分布式 + 集群(cloud-native) - 适合数十亿 vector / 高 QPS - 部署复杂 ### Chroma / Weaviate - Chroma:embedded / server,开发体验最好 - Weaviate:Go 写,schema + GraphQL - 我用得少 ## 性能对比 10M vectors / 1536d / 测试: | DB | 索引时间 | P50 query | RAM | |---|---|---|---| | pgvector (HNSW) | 30 min | 12 ms | 25 GB | | Qdrant | 15 min | 5 ms | 18 GB | | LanceDB | 10 min | 8 ms | 8 GB (disk-based) | | Milvus | 12 min | 4 ms | 20 GB | Qdrant / Milvus 在 raw 性能最强。pgvector 略慢但 ergonomics 最好。 ## 选型建议 | 场景 | 推荐 | |---|---| | 已有 PG 应用 + < 1000w vector | pgvector | | 独立向量服务 + 中等规模(千万级) | Qdrant | | 嵌入应用 / 单机 / 不想跑 server | LanceDB | | 数十亿规模 / 集群 | Milvus | | 全托管不想运维 | Pinecone(贵)/ Qdrant Cloud / Pinecone serverless | 我个人项目 100% pgvector: - DB 已经在 - transaction 跟其它 data 一致 - 不想多维护一个组件 - 性能够(< 100w vector) ## pgvector 详细 + 优化 ```sql -- HNSW index(更适合高维) CREATE INDEX ON chunks USING hnsw (embedding vector_cosine_ops) WITH (m = 16, ef_construction = 64); -- 查询时 ef_search 控制精度/速度 SET hnsw.ef_search = 100; -- 半精度向量(节省一半空间,损失小) ALTER TABLE chunks ALTER COLUMN embedding TYPE halfvec(1536); CREATE INDEX ON chunks USING hnsw (embedding halfvec_cosine_ops); ``` `halfvec`(pgvector 0.7+)省内存 50%,精度损失 1-2%。 ### filter + 向量混合查 ```sql -- 找符合 metadata + 最近向量 SELECT content FROM chunks WHERE lang = 'zh' AND created_at > '2025-01-01' ORDER BY embedding <=> $query LIMIT 10; ``` PG 查询计划器自动选 index。带 filter 时 HNSW + B-tree 配合(pg17+ 更智能 prefilter)。 ## hybrid search 向量召回往往不如 keyword + 向量混合。 ```python def hybrid_search(query, k=10): # 1. BM25 (PG FTS) bm25_results = pg.execute(""" SELECT id, ts_rank(...) AS score FROM chunks WHERE chunk_tsv @@ to_tsquery(%s) ORDER BY score DESC LIMIT 50 """, query) # 2. 向量 vec_results = pg.execute(""" SELECT id, 1 - (embedding <=> %s::vector) AS score FROM chunks ORDER BY embedding <=> %s::vector LIMIT 50 """, [vec, vec]) # 3. RRF (Reciprocal Rank Fusion) return rrf_merge(bm25_results, vec_results, k=k) ``` 实际 RAG 效果显著提升。pgvector / Qdrant 都有内置 hybrid(不同程度)。 ## embedding model 选 - **OpenAI text-embedding-3-small / large**:好但要 API - **bge-m3** / **bge-large-zh**(BGE 系列):开源中英双语强 - **nomic-embed-text-v1.5**:开源,128 维 - 768 维可调 - **e5-mistral-7b**:高质量但贵 - **multilingual-e5-large**:100+ 语言 中文 RAG 我用 bge-m3,足够好 + 开源 + 本地 GPU 跑(~150 MB model)。 ## 真实 case:知识库 RAG 我们一个内部知识库: - 5 万文档 / 切 chunk 后 30 万段 - 用 bge-m3 embedding(1024 维) - 存 pgvector - query:BM25 + vector hybrid 部署: - Postgres 16 + pgvector 0.7 - 单 16 GB RAM 服务器 - query P50: 50 ms(含 embed 模型推理) 成本:$50/月 server,对比 Pinecone $200+。 功能足够,没必要专门 vector DB。 ## chunk 策略 向量质量第一影响 chunk: - 太大(> 1000 token):embedding 模糊,定位差 - 太小(< 100 token):上下文缺失 - 重叠(10-20%):边界 case 通用 500 token + 100 overlap。技术文档可能 800 + 150。 ## 踩过的坑 1. **pgvector index 建得慢**:百万 vector 建 HNSW 30 分钟。 `SET maintenance_work_mem = '2GB'` 加快。 2. **embedding dimension 改了**:换模型 → 维度变 → 老数据 schema 不 兼容。必须重新索引全部。 3. **filter selectivity 低 + 向量查**:query plan 选 seq scan vector field(不走 HNSW)。`SET enable_seqscan = off` 测试。 4. **cosine vs dot**:normalized vector 用 cosine OK;non-normalized 用 dot product。混 → 结果错。 5. **vector 类型 cast**:`'[0.1]'::vector` JSON-like 字符串。从 list `[0.1, 0.2]` 转字符串小心 numpy float 序列化精度丢失。
## 起因 ML 项目 6 个月后症状: - train code 跑 feature engineering 一遍 - predict serve code 跑 feature engineering **另一遍** - 两套实现微妙不一致 → "线上线下不一致 (train-serve skew)" - 跨 model 重复 feature 算 N 次 Feature store 解决: 1. **feature 定义统一**(train 跟 serve 同源) 2. **离线(batch)+ 在线(low-latency)双重 storage** 3. **point-in-time 正确性**(避免 future leakage) 工业级方案:**Feast** / **Tecton** / **Hopsworks**。重 + 学习曲线陡。 中小项目自己搭一个轻量的够用。 ## 轻量设计 ``` [raw data 表] ↓ [feature transformation SQL / Python] ← 唯一定义源 ↓ batch ──→ [feature 表(PG / Snowflake)] ← train 时读 realtime → [Redis] ← serve 时读 ``` 关键:**transformation 写一次**,batch 和 realtime 两边都跑同样逻辑。 ## 实现:transformation as dbt model ```sql -- models/features/user_features.sql {{ config(materialized='incremental') }} SELECT user_id, -- profile DATE_PART('year', CURRENT_DATE - birthdate) AS age, country, plan, -- recent activity (30 days) (SELECT COUNT(*) FROM events WHERE events.user_id = u.user_id AND created_at > CURRENT_DATE - INTERVAL '30 days') AS events_30d, (SELECT AVG(amount) FROM orders WHERE orders.user_id = u.user_id AND created_at > CURRENT_DATE - INTERVAL '30 days') AS avg_order_30d, CURRENT_TIMESTAMP AS computed_at FROM users u {% if is_incremental() %} WHERE updated_at > (SELECT MAX(computed_at) FROM {{ this }}) {% endif %} ``` dbt run 每小时跑一次 → `user_features` 表更新。 ## sync to Redis ```python # tasks/sync_features_to_redis.py import pandas as pd import redis import json r = redis.Redis() df = pd.read_sql("SELECT * FROM user_features WHERE computed_at > NOW() - INTERVAL '1 hour'", pg) with r.pipeline() as pipe: for _, row in df.iterrows(): key = f"features:user:{row.user_id}" pipe.hset(key, mapping=row.drop(['user_id', 'computed_at']).to_dict()) pipe.expire(key, 86400 * 2) # 2 day TTL pipe.execute() ``` 每小时跑 → Redis 永远有"最新批次"的 feature。 预测时 Redis HGETALL 一下 → 几 ms。 ## train 时用 ```python # 离线 train features_df = pd.read_sql(""" SELECT * FROM user_features WHERE computed_at <= '2025-04-30' -- 训练数据截止日 """, pg) labels_df = pd.read_sql(""" SELECT user_id, churned FROM user_outcomes WHERE outcome_date BETWEEN '2025-04-30' AND '2025-05-30' """, pg) df = features_df.merge(labels_df, on='user_id') X, y = df.drop('churned', axis=1), df.churned model.fit(X, y) ``` ## serve 时用 ```python # 在线 predict def predict_churn(user_id): features = r.hgetall(f'features:user:{user_id}') features = {k.decode(): float(v) for k, v in features.items()} X = pd.DataFrame([features])[FEATURE_COLUMNS] return model.predict(X)[0] ``` `FEATURE_COLUMNS` 是 train 时保存的 column 顺序。 顺序一致 + Redis 来的 feature 跟 train 同源 → 无 skew。 ## point-in-time train 时不要让 feature 包含 label 之后的数据(leakage): ```sql -- bad SELECT user_id, COUNT(*) FROM events WHERE user_id = u.user_id -- 没限时间 → 包含 label 期之后 -- good SELECT user_id, COUNT(*) FROM events WHERE user_id = u.user_id AND created_at < {{ label_date }} ``` 更严格的 point-in-time:feature value 用 label 时点的 snapshot: ```sql SELECT user_id, label, (SELECT events_30d FROM user_features WHERE user_id = labels.user_id AND computed_at < labels.label_date ORDER BY computed_at DESC LIMIT 1) AS events_30d FROM labels; ``` 每个 label 取它"那时" 的 feature。 ## monitoring 每天对比 train vs serve feature 分布: ```python def check_skew(): train_df = pd.read_sql("SELECT * FROM user_features TABLESAMPLE 1%", pg) serve_keys = r.scan_iter('features:user:*', count=10000) serve_df = pd.DataFrame([...]) # 拉 Redis 几千 sample for col in FEATURE_COLUMNS: train_mean = train_df[col].mean() serve_mean = serve_df[col].mean() skew = abs(train_mean - serve_mean) / train_mean if skew > 0.1: alert(f'{col} skew {skew:.2%}') ``` > 10% 飘移 → 告警。 ## 何时升级到 Feast 我们轻量方案撑到: - ~50 feature - 1 个 model in production - batch update 频率小时级 - 单团队 超过这规模: - 多 model 共享 feature - 多团队 - 需要 streaming feature(秒级更新) - 严格 point-in-time correctness 自动验证 → 上 Feast。但 Feast 学习曲线 + 运维成本,不是必要别强上。 ## 与 in-line 计算对比 | | 自己算 | feature store | |---|---|---| | 实现 | 简单 | 复杂 | | skew 风险 | 高 | 低 | | latency | 中(每次 query DB) | 低(Redis) | | 复用 | 0 | 高 | | 适合 | < 5 model | 5+ model | 第一个 model 直接 in-line 算。 第二个 model 用相同 feature → 抽出 user_features 表。 第三个 model → 上轻量 store。 N 个 → Feast。 ## 真实 case churn model 上线半年: - 一开始 in-line 算 feature - predict serve 跑 30 秒 / user(heavy SQL JOIN) - 改 feature 表(dbt)+ Redis cache - 跌到 5 ms / user - 同时 train script 用相同表 → 删除重复 code - 半年后第二个 model(LTV prediction)用同 feature 表 → 立即 reuse ## 踩过的坑 1. **dtype 不一致**:Redis 存 string,train 时 pandas 是 int → cast 错 → predict 0% 。serve code 严格 cast + 类型检查。 2. **feature drift 没监控**:上线半年后 model 慢慢失效,原因 feature distribution 变了没察觉。每周 skew check 必要。 3. **feature 名字大小写**:Redis HSET key 大小写敏感。train 大写 serve 小写 → KeyError。规范一律 snake_case。 4. **Redis TTL**:feature 旧但不 expire → 用陈旧 feature 预测 → 越 错越多。明确 TTL + sync 频率 > TTL 1/2。 5. **新 feature 加 + 老 model 还要跑**:feature 表加列,老 model 的 FEATURE_COLUMNS 不变(model 还按老 schema 输入)。 model artifact 跟 schema 绑定。
## 起因 数据分析常见情境: - 收到一堆 CSV / Parquet(几 GB - 几十 GB) - 想跑 SQL JOIN / 聚合 / 窗口函数分析 - 没 Snowflake / BigQuery(个人项目 / 本地探索) - pandas 慢 + groupby 写得难看 `DuckDB`:嵌入式 OLAP 数据库("SQLite for analytics"),单文件 binary,跑分析 SQL 跟 columnar 仓库一样快,**在你笔记本上**。 ## 装 ```bash pip install duckdb # Python brew install duckdb # CLI ``` ## CLI ```bash $ duckdb my.db D SELECT * FROM 'data.csv' LIMIT 5; -- 直接读 csv,无需 import D SELECT COUNT(*) FROM 'data.parquet'; D SELECT a.x, b.y FROM 'a.csv' a JOIN 'b.parquet' b ON a.id = b.id; ``` CSV / Parquet / JSON 直接当 table 查,无 import 步骤。 ## Python ```python import duckdb # in-memory con = duckdb.connect() # 直接查 CSV df = con.execute(""" SELECT country, SUM(amount) AS total FROM 'orders.csv' WHERE qty > 5 GROUP BY country ORDER BY total DESC """).df() # 返回 pandas df # 持久化 con = duckdb.connect('analysis.db') con.execute("CREATE TABLE orders AS SELECT * FROM 'orders.csv'") ``` ## 跟 pandas / DataFrame 互通 ```python import pandas as pd import duckdb # pandas df 直接当 table 用(DuckDB zero-copy 引用) df = pd.read_csv('big.csv') result = duckdb.sql(""" SELECT col1, AVG(col2) FROM df -- 直接引用 pandas df GROUP BY col1 """).df() ``` polars 同样: ```python import polars as pl pl_df = pl.read_csv('big.csv') result = duckdb.sql("SELECT * FROM pl_df WHERE col1 > 100").pl() ``` DuckDB 跟 pandas / polars / Arrow 数据**zero-copy 互转**(都用 Arrow columnar 内存格式)。 ## 性能 8 核 16 GB 笔记本,10 GB Parquet 文件: ```sql SELECT country, SUM(amount), COUNT(*) FROM 'orders.parquet' GROUP BY country ORDER BY 2 DESC LIMIT 10; ``` | 工具 | 时间 | |---|---| | pandas | 35s | | polars (eager) | 8s | | polars (lazy) | 4s | | DuckDB | 2.5s | DuckDB 列存 + vector 执行 + 多核 + 全局优化器,把分析查询打得很快。 ## 直接查远程 Parquet ```python duckdb.sql(""" SELECT * FROM read_parquet('s3://my-bucket/orders/*.parquet') WHERE date = '2025-03-01' """) ``` DuckDB 支持 S3 / GCS / Azure / HTTP 直读。 配合 partition + Parquet column pruning → 只读必要的 column 和 partition。 ## 数据湖直接查 跟 Iceberg / Delta lake 集成: ```sql INSTALL iceberg; LOAD iceberg; SELECT * FROM iceberg_scan('s3://bucket/table/'); ``` 不用 Spark 也能查 Iceberg。 ## window 函数 / 复杂 SQL ```sql SELECT user_id, date, amount, SUM(amount) OVER (PARTITION BY user_id ORDER BY date) AS cum_sum, RANK() OVER (PARTITION BY DATE_TRUNC('month', date) ORDER BY amount DESC) AS rank_in_month FROM orders; ``` 全 SQL 标准 + Postgres 兼容大量扩展 + DuckDB 特有的 ANTI/SEMI JOIN / QUALIFY 等。 ## EXPORT / IMPORT ```sql COPY (SELECT * FROM big_table) TO 'out.parquet' (FORMAT PARQUET); COPY (SELECT * FROM big_table) TO 'out.csv' (HEADER, DELIMITER ','); ``` 数据格式互转的瑞士军刀。 ## 真实 case:替代 pandas EDA 数据探索,原本: ```python df = pd.read_csv('events.csv') df_filtered = df[df.user_age > 18] grouped = df_filtered.groupby(['country', 'product']).agg({ 'amount': ['sum', 'mean'], 'qty': 'count', }).reset_index() sorted = grouped.sort_values(('amount', 'sum'), ascending=False) sorted.head(20) ``` DuckDB 等价: ```python duckdb.sql(""" SELECT country, product, SUM(amount) AS total, AVG(amount) AS avg_amount, COUNT(*) AS n FROM 'events.csv' WHERE user_age > 18 GROUP BY country, product ORDER BY total DESC LIMIT 20 """).df() ``` SQL 更直白 + 跑得快 + 不需要 import 完整 csv。 ## 跟 Snowflake / BigQuery 对比 | | DuckDB | Snowflake | BigQuery | |---|---|---|---| | 部署 | 单 binary | SaaS | SaaS | | 数据规模 | < 1 TB(单机) | PB | PB | | 成本 | 0 | 按 credit | 按扫描 GB | | 启动 | < 100ms | < 1s | < 5s | | SQL | Postgres-like | ANSI++ | ANSI+ | | 并发用户 | 单 | 多 | 多 | DuckDB 不替代 Snowflake(不是 multi-user / 不是无限 scale)。 但 90% 个人 / 团队分析(< 1 TB)DuckDB 够 + 免费 + 快。 ## motherduck(DuckDB cloud) DuckDB 团队也做了 motherduck.com → DuckDB + cloud sync: - 本地查 + 云端永久存 - 共享数据集 - 跨设备一致 按需用,但 DuckDB 本身完全离线可用。 ## 嵌入应用 ```python # Django / FastAPI 里嵌 DuckDB 做 analytics endpoint import duckdb @app.get('/analytics/top-products') def top_products(): return duckdb.sql(""" SELECT product, SUM(amount) AS total FROM read_parquet('s3://.../orders/*.parquet') WHERE date > current_date - 7 GROUP BY product ORDER BY total DESC LIMIT 10 """).df().to_dict('records') ``` 不用 separate analytics DB / 全部嵌进应用。 ## extension 生态 ```sql INSTALL httpfs; -- HTTP / S3 INSTALL spatial; -- 地理空间 INSTALL fts; -- 全文搜索 INSTALL postgres; -- 查 Postgres 表 INSTALL excel; -- 读写 .xlsx INSTALL sqlite; -- 读写 SQLite 文件 ``` `INSTALL postgres; LOAD postgres;` 后: ```sql ATTACH 'host=pg.example.com dbname=app user=...' AS pg (TYPE postgres); SELECT * FROM pg.public.orders LIMIT 10; ``` 把 Postgres 表当本地表查 + JOIN 本地 CSV → 异构数据查询。 ## 踩过的坑 1. **大数据 > RAM 时**:DuckDB 用 disk spilling 但仍可能慢。`SET memory_limit='10GB'`,留剩余给 OS。 2. **column type 自动推断错**:CSV 列 sometime "N/A",DuckDB 推断 string。`read_csv_auto(..., types={'col': 'INTEGER'})` 显式。 3. **更新慢**:DuckDB 是 OLAP,不适合频繁 UPDATE。点查 / 单行更新 → 用 SQLite。 4. **并发写不行**:单写者,多 reader。Web app 多 worker 同时写 DuckDB 会锁。 5. **extension 版本不匹配**:DuckDB 升级后 extension cache 旧版本 报错。`FORCE INSTALL <ext>` 强制更新。
## 起因 训练 ML model 时: - 改 hyperparameter / feature → 跑一次 - 比较哪次效果好?凭记忆?csv 抄结果? - model artifact 存哪?git LFS?S3 哪个 path? - 模型 deploy 时哪个版本? `MLflow` 是 Databricks 出的 open source,解决 ML 实验管理 4 件事: 1. **Tracking**:每次 run 记 params / metrics / artifact 2. **Projects**:reproducible run(conda env / docker) 3. **Models**:标准化打包 / 注册 / 部署 4. **Registry**:model version 管理 (staging / production) ## 装 ```bash pip install mlflow ``` ```bash mlflow server --host 0.0.0.0 --port 5000 \ --backend-store-uri sqlite:///mlflow.db \ --default-artifact-root file:./mlruns ``` UI:`http://localhost:5000`。 生产用 PG + S3 替代 SQLite + local file。 ## 追踪 run ```python import mlflow import mlflow.sklearn from sklearn.ensemble import RandomForestClassifier from sklearn.metrics import accuracy_score, f1_score mlflow.set_tracking_uri('http://localhost:5000') mlflow.set_experiment('user-churn-prediction') with mlflow.start_run(run_name='rf-baseline'): # log hyperparameter n_estimators = 100 max_depth = 10 mlflow.log_param('n_estimators', n_estimators) mlflow.log_param('max_depth', max_depth) # 训练 model = RandomForestClassifier(n_estimators=n_estimators, max_depth=max_depth) model.fit(X_train, y_train) # log metric pred = model.predict(X_val) mlflow.log_metric('val_accuracy', accuracy_score(y_val, pred)) mlflow.log_metric('val_f1', f1_score(y_val, pred)) # log model artifact mlflow.sklearn.log_model(model, 'model') # log 任意 artifact mlflow.log_artifact('feature_importance.png') ``` UI 里看:每次 run 一行,columns 是 params + metrics,能排序 / 过滤。 ## auto-log 不想手动 log 每个 param? ```python mlflow.sklearn.autolog() # sklearn 自动 log mlflow.pytorch.autolog() # pytorch mlflow.xgboost.autolog() mlflow.tensorflow.autolog() model = LogisticRegression(C=0.1) model.fit(X, y) # 自动 log C / penalty / accuracy / model ``` 90% 用例 autolog 够。 ## 比较 run UI 选多个 run → Compare → 表格 + parallel coordinates 看 hyperparam ↔ metric。 Programmatic: ```python from mlflow.tracking import MlflowClient client = MlflowClient() exp = client.get_experiment_by_name('user-churn-prediction') # 找最佳 run runs = client.search_runs( experiment_ids=[exp.experiment_id], order_by=['metrics.val_f1 DESC'], max_results=1, ) best = runs[0] print(best.data.params, best.data.metrics) ``` ## model registry train 完想正式 deploy: ```python result = mlflow.register_model( f"runs:/{best.info.run_id}/model", "user-churn-model", # 注册名 ) # version 自动 +1 ``` UI 里看 model registry → "user-churn-model" v1, v2, v3 ... mark stage: ```python client.transition_model_version_stage( name='user-churn-model', version=3, stage='Production', ) ``` production code 加载: ```python model = mlflow.pyfunc.load_model('models:/user-churn-model/Production') pred = model.predict(X_test) ``` 切换版本只改 stage 标记,code 不动。 ## hyperparameter sweep ```python from itertools import product for n, d in product([100, 200, 500], [5, 10, 20]): with mlflow.start_run(): mlflow.log_params({'n_estimators': n, 'max_depth': d}) model = RandomForestClassifier(n_estimators=n, max_depth=d) model.fit(X_train, y_train) score = model.score(X_val, y_val) mlflow.log_metric('val_score', score) ``` 9 run 自动跑 + 全部对比。 配 Optuna / Hyperopt 自动化 search 更强。 ## 跟 git 集成 mlflow 自动 log: - git commit hash - branch name - diff(未 commit 改动) 每个 run 知道是哪个 code 版本产生的 → 复现性。 ## 与替代品对比 | | MLflow | Weights & Biases | Neptune | TensorBoard | |---|---|---|---|---| | 自托管 | ✅ | ❌(cloud only OSS有限) | ❌ | ✅(无 server) | | metric tracking | ✅ | ✅+ | ✅+ | ✅ | | model registry | ✅ | ✅ | ✅ | ❌ | | collab | 弱 | 强 | 中 | 弱 | | 成本 | 0 | 团队收费 | 团队收费 | 0 | | 生态 | 大 | 大 | 中 | 大 | 我个人 / 小团队 → MLflow(自托管 + 免费 + 标准)。 大团队 / 跨公司 collab → W&B。 ## 部署:自托管设置 ```yaml # docker-compose.yml services: mlflow: image: ghcr.io/mlflow/mlflow:v2.13 ports: - 5000:5000 environment: - AWS_ACCESS_KEY_ID=... - AWS_SECRET_ACCESS_KEY=... command: > mlflow server --host 0.0.0.0 --backend-store-uri postgresql://user:pass@db/mlflow --default-artifact-root s3://my-bucket/mlflow depends_on: - db db: image: postgres:16 environment: POSTGRES_USER: user POSTGRES_PASSWORD: pass POSTGRES_DB: mlflow volumes: - mlflow-pg:/var/lib/postgresql/data ``` PG 存 metadata + S3 存 artifact → 可 scale。 ## 实战 lessons 我们一个客户 churn 项目用 MLflow: - 200+ experiment run(不同 feature set / model type / hyperparam) - 8 个最终 model 在 registry - Production model 每月更新(registry stage transition) - 任何时候能 diff 当前 production 跟 candidate 少了 mlflow 之前: - experiment 结果存团队 Notion / Slack message - model artifact 各种 S3 path 散 - 谁也不知道 production 现在跑的是 train_v3_final_FINAL2.pkl 还是 v4 接入 mlflow 后: - experiment 透明 / 可追溯 - model 版本明确 - 切回老版本 1 行命令 ## 跟 Airflow / Prefect 集成 ML pipeline DAG 每天跑: ```python @task def train_and_register(): with mlflow.start_run(): model = train(...) mlflow.sklearn.log_model(model, 'model') if score > threshold: mlflow.register_model(...) ``` 定期 retrain → log → 满足条件 promote 到 staging → 人手批准 → production。CD for ML。 ## 踩过的坑 1. **artifact path 写错**:local file mode 测试 OK,部署 server 后 `./mlruns` 在 server 找不到 → 必须 S3 / blob storage。 2. **大 model artifact**:log 大 model 几 GB 慢。考虑只 log 必要部分。 3. **run 不 close**:`mlflow.start_run()` 不在 with block + 没 `end_run()` → status="RUNNING" 一直挂。`with` 习惯保命。 4. **registry 名字冲突**:team 多人用同一 mlflow server,注册名建议 带 prefix 或 namespace。 5. **MLflow autolog 与 keras 不全兼容**:某些 callback / model subclassing 不 capture。complex case 手动 log。
## 起因 数据 pipeline 需要: - 定时 / 触发跑(cron / event) - 任务依赖 DAG - 重试 / 失败告警 - UI 看历史 / 调试 `Airflow` 是 2014 起的事实标准。`Prefect`(2018+)是现代挑战者。 最近从 Airflow 迁了一半 pipeline 到 Prefect,下面对比。 ## Airflow ```python from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime def extract(): print('extract') def transform(): print('transform') def load(): print('load') with DAG('etl', start_date=datetime(2025, 1, 1), schedule='@daily') as dag: e = PythonOperator(task_id='extract', python_callable=extract) t = PythonOperator(task_id='transform', python_callable=transform) l = PythonOperator(task_id='load', python_callable=load) e >> t >> l ``` DAG 是静态文件,scheduler 周期扫描 + render。 ### 优势 - 业界事实标准(10 年沉淀) - 巨大 operator 生态(数百个 connector) - 大规模成熟(Airbnb 几万 DAG) - Kubernetes / Celery executor 久经考验 ### 劣势 - DAG 必须静态(运行时 branch / 动态 task 难) - 调试本地难(要起 scheduler / webserver / DB / executor) - Python function 跟 task framework 耦合(PythonOperator wrapper) - 升级痛苦(2.x vs 1.x 大变;插件兼容差) - 巨型部署(PG + scheduler + worker + webserver 几组件) ## Prefect 2/3 ```python from prefect import flow, task @task(retries=3) def extract(): return [1, 2, 3] @task def transform(x): return x * 2 @task def load(items): print(f'loaded {items}') @flow def etl(): data = extract() transformed = [transform(x) for x in data] load(transformed) if __name__ == '__main__': etl() ``` `@flow` / `@task` 装饰器即编排。普通 Python 函数 + decorator。 ### 优势 - **DAG 动态生成**(运行时根据数据决定 task) - 本地 dev 简单:`python etl.py` 跑 - 装饰器轻,function 仍是 function - API + UI 现代 - Prefect Cloud free tier 个人 OK ### 劣势 - 生态比 Airflow 小(operator / connector 少) - 还在快速演化(Prefect 3 vs 2 部分 API 变) - 大规模 production 沉淀少 ## 调度模型对比 **Airflow**: ``` Scheduler 每分钟扫 DAG → 决定哪 task 该跑 → 推到 worker ``` 集中式,scheduler 是瓶颈。 **Prefect**: ``` Flow run 由 trigger(schedule / API / event)启动 Worker pull pending run 跑 ``` 更松散,worker 任意机器都能 pull。 ## 动态 task ```python # Airflow(静态 → 用 mapped task 模拟动态) @task def process(item): ... @task def get_items(): return [1, 2, 3, 4] @dag def my_dag(): items = get_items() process.expand(item=items) ``` Airflow 2.3+ 加了 dynamic task mapping,写起来扭。 ```python # Prefect:原生 @flow def my_flow(): items = get_items() for item in items: # 普通 Python process(item) ``` Prefect 直接用 Python loop,运行时决定 task 数。 ## subflow Prefect 鼓励 flow 嵌套: ```python @flow def daily_etl(): for region in ['us', 'eu', 'asia']: region_flow(region) # subflow 单独 run,独立 retry @flow def region_flow(region): extract(region) transform(region) load(region) ``` UI 里嵌套展开。复杂 pipeline 模块化。 ## 部署模型 **Airflow**: ``` - PG(metadata) - Redis / Celery(queue) - Webserver - Scheduler - N × Worker - 自动化 deploy DAG file → /dags 文件夹 ``` K8s 部署 6+ 容器。 **Prefect**: ``` - Server(API + UI)(Prefect Cloud 替代) - Worker(pull flow run) - Code 存哪都行(git / S3 / docker) ``` 简单很多。可以全 serverless(Cloud + ECS / Lambda worker)。 ## 本地开发 Airflow 本地: ```bash docker compose up # airflow-init + scheduler + webserver + worker # 改 DAG → 等 60s scheduler 扫 ``` Prefect 本地: ```bash prefect server start # 起 server python my_flow.py # 直接跑 ``` iter loop 快 5-10x。 ## 与 Dagster 对比 Dagster 是第三玩家,asset-based 编排(pipeline 是 "asset 之间的 graph")。 更声明式,更适合数据 platform 团队。 | | Airflow | Prefect | Dagster | |---|---|---|---| | 模型 | task DAG | flow + task | asset graph | | 上手 | 中 | 低 | 中高 | | 动态 | 弱 | 强 | 中 | | 生态 | 最大 | 中 | 中 | | 适合 | 大企业 / 复杂 ETL | 中小项目 / Pythonic | 数据 platform | ## 迁移 case 我们 30 个 Airflow DAG,三类: 1. 简单 ETL(SQL → SQL):保留 Airflow(operator 现成,省事) 2. 复杂 Python pipeline(动态 logic):迁 Prefect 3. ML 训练 pipeline:迁 Prefect(model artifact + dynamic 强) 混合架构:Airflow 跑标准 ETL,Prefect 跑 dynamic / Pythonic 流程。 ## 与 dbt 集成 两者都能跑 dbt: ```python # Prefect from prefect_dbt.cli.commands import DbtCoreOperation @flow def dbt_flow(): DbtCoreOperation(commands=['dbt run --select tag:hourly']).run() ``` ```python # Airflow from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator # 或者 BashOperator('dbt run ...') ``` 差不多。Prefect block 概念让 dbt cli 复用简单。 ## API trigger ```python # Prefect:HTTP trigger flow run from prefect.client.orchestration import get_client async with get_client() as client: await client.create_flow_run_from_deployment( deployment_id='abc', parameters={'date': '2025-01-01'}, ) ``` 外部系统(webhook / event)触发简单。Airflow 也有 REST API 但繁琐。 ## 监控 / 告警 两者都有 UI + email / Slack 通知 + Prometheus metrics。 Prefect Cloud 免费 tier 告警直接配。 Airflow self-host 要自己拼 alertmanager。 ## 选型决策 - **大企业 + 复杂 ETL + 团队熟 Airflow** → Airflow - **新项目 + Python pipeline** → Prefect - **数据 platform 团队 + asset 思维** → Dagster - **简单 cron + 几个 job** → 别上编排框架,systemd timer + GitHub Actions 够 ## 踩过的坑(迁移) 1. **Airflow XCom → Prefect return value**:XCom 传数据有限(< 48KB metadata)。Prefect 直接 return Python object,但 cluster 间要序 列化 → 用 storage block。 2. **schedule timezone**:Airflow `start_date` UTC;Prefect 默认 UTC 但 cron string 解释要明确。 3. **retries 默认 0**:忘配 → 失败不重试。生产 retries=3 + backoff 默认。 4. **flow concurrency limit**:同 flow 多 run 并行 → DB 锁。 `concurrency_limit` 控制。 5. **Prefect 3 升级**:从 2.x 迁 3.x 有 breaking change。读 migration guide。
## 起因 pandas 是 Python 数据界 15 年的事实标准。但: - 单线程(GIL),大数据慢 - 内存膨胀(同一列多份 copy) - API 设计累赘(SettingWithCopyWarning、index 烦) `polars` 是 Rust 写的 DataFrame,2020+ 起飞。Apache Arrow 内存格式 + 多线程 + lazy 执行。 2026 视角看,polars 在多个维度全面超越 pandas。 ## 装 ```bash pip install polars # 或者 uv add polars ``` ## 句法对比 ```python import polars as pl import pandas as pd # pandas df = pd.read_csv('orders.csv') result = ( df[df['country'] == 'US'] .groupby('product') .agg({'amount': 'sum', 'qty': 'count'}) .reset_index() .sort_values('amount', ascending=False) .head(10) ) # polars df = pl.read_csv('orders.csv') result = ( df.filter(pl.col('country') == 'US') .group_by('product') .agg([ pl.col('amount').sum(), pl.col('qty').count(), ]) .sort('amount', descending=True) .head(10) ) ``` polars 句法 method chain 顺。 明确的 `pl.col(...)` 比 pandas `df['x']` 在复杂 expression 里清晰。 ## 性能 我们一个 10 GB CSV / 80 列: | 操作 | pandas | polars | polars-lazy | |---|---|---|---| | read_csv | 95s | 22s | 22s | | filter + groupby + agg | 38s | 5s | 3s | | join 两 10 GB | 90s (OOM 风险) | 18s | 12s | | sort by 3 列 | 25s | 4s | 3s | 5-10x 快。32 核机器更明显(pandas 单核)。 内存:pandas 30 GB peak,polars 12 GB peak(Arrow columnar + zero-copy)。 ## lazy 执行 polars 杀手 feature: ```python # eager(每步实际跑) df = pl.read_csv('big.csv') result = df.filter(...).group_by(...).agg(...) # lazy(构建 query plan,scan 时才执行) result = ( pl.scan_csv('big.csv') # 注意 scan_ 而不是 read_ .filter(pl.col('x') > 0) .group_by('y') .agg(pl.col('z').sum()) .collect() # 触发执行 ) ``` lazy 优势: - **predicate pushdown**:filter 推到 CSV 读取阶段,只读符合行 - **projection pushdown**:只读用到的列 - **CSE**:重复 expression 算一次 - **streaming**:> 内存数据流式处理 ```python result = ( pl.scan_csv('100GB.csv') .filter(pl.col('date') > '2025-01-01') .select(['user_id', 'amount']) # 只读这俩列 .group_by('user_id') .agg(pl.col('amount').sum()) .collect(streaming=True) # 流式,不全加载 ) ``` 100 GB CSV 在 16 GB 机器跑得动。pandas 没 streaming 直接 OOM。 ## SQL interface ```python ctx = pl.SQLContext() ctx.register('orders', df) result = ctx.execute(""" SELECT country, SUM(amount) FROM orders WHERE qty > 5 GROUP BY country """).collect() ``` 熟 SQL 但不熟 polars expression → 写 SQL。 ## 跟 pandas 互转 ```python df_pd = pl.DataFrame(...).to_pandas() df_pl = pl.from_pandas(df_pd) ``` 零拷贝(用 Arrow buffer 共享)。混用方便。 ## 与 pandas 2.x(Arrow backend)对比 pandas 2.x 加了 pyarrow backend: ```python df = pd.read_csv('data.csv', dtype_backend='pyarrow') ``` 性能改善但**仍单线程**。 比 polars 还差一截(polars 多核 + lazy + native rust)。 ## 与 spark / dask 对比 | | pandas | polars | dask | spark | |---|---|---|---|---| | 内存模型 | row | columnar (Arrow) | partition | columnar | | 并行 | 单线程 | 多线程 | 多进程/集群 | 集群 | | 数据规模 | < RAM | > RAM (streaming) | TB | PB | | 学习曲线 | 低 | 中 | 中 | 高 | | 启动 | 0.1s | 0.1s | 1s | 30s+ | - < 10 GB → polars - 10 GB - 1 TB → polars streaming / dask - > 1 TB → spark / dask 集群 ## 实际项目迁移 我们 ETL pipeline 30 个 script,pandas → polars: ``` 1. read_csv → scan_csv:1 行换 2. df[df.x > 5] → df.filter(pl.col('x') > 5):手动改 3. groupby().agg({}) → group_by().agg([]):手动改 4. .reset_index() → 删(polars 无 index 概念) 5. lambda apply → 改成 polars expression ``` 大约 30 - 50% 行需要改。但跑速从 2 小时 → 12 分钟,值得。 LLM 辅助迁移很方便,pandas 到 polars 是 well-defined 转换。 ## API 缺点 / 注意 - 没 index(这是 feature 不是 bug,但 pandas 老用户要适应) - merge → join(语义稍不同,pandas merge 默认 inner,polars join 默认 inner,OK) - pivot / melt 等也有 + 语义略不同 - 没 multi-index column 90% workflow polars OK。某些特殊 transformation(时间序列 resample 加 multi-index)pandas 仍胜。 ## 用什么场景 - **新 ETL** → polars 默认 - **现有 pandas codebase** → 看痛点决定,不必全迁 - **notebook 探索性分析** → 二选一都行,polars 性能优势更大 - **DataFrame for ML 输入** → sklearn 仍 pandas 友好;polars 转 numpy 传 sklearn ## 我的工作流 - 数据 ingestion / heavy ETL:polars - ML feature engineering:polars - 给 sklearn / pytorch 时:`.to_numpy()` 或 `.to_pandas()` - 临时小数据:pandas(生态广) ## 踩过的坑 1. **expression 错位**:`pl.col('x') + 5 - pl.col('y')` vs `pl.col('x') + (5 - pl.col('y'))`。运算符优先级跟 Python 一致, 但容易看走眼。 2. **lazy collect 慢**:忘了 `.collect()` 一直 lazy。debug 时 `.head(10).collect()` 看数据。 3. **datetime 时区**:polars 严格 timezone aware / naive 区分。 pandas 经常混。从 pandas 来的 dataframe `pl.from_pandas` 时 timezone 信息可能丢。 4. **null 处理**:polars 用 Arrow null bit,跟 pandas NaN 不同。 `pl.col('x').is_null()` 不是 `x != x`。 5. **groupby 后默认按 key 排序**:pandas 默认排,polars 默认不排。 要 sort 显式 `.sort()`。