知识广场

按学科筛选:计算机科学 / 机器学习 / 数据工程
清除筛选

«计算机科学 / 机器学习 / 数据工程» 分类下共 3 篇帖子

Great Expectations + dbt test:数据质量门禁

## 起因 数据 pipeline 跑出来的表: - 突然某天 row 数 -80%(上游断了) - 某列 null 比例飙到 30%(schema 改了没告知) - 重要 metric 暴增 100x(埋点 bug) 下游 BI 报表 / model 已经用上 → 业务 / model 出错。 **数据质量监控**是数据团队必须建的。 两个主流工具: - **dbt test**(轻量,pipeline 集成) - **Great Expectations / GX**(重量,schema + 历史 profile) ## dbt test 基础 dbt model 的 schema.yml 里加 test: ```yaml # models/orders/schema.yml models: - name: orders columns: - name: id tests: - unique - not_null - name: status tests: - accepted_values: values: ['pending', 'paid', 'cancelled', 'refunded'] - name: amount tests: - dbt_expectations.expect_column_values_to_be_between: min_value: 0 max_value: 1000000 tests: - dbt_utils.expression_is_true: expression: "amount = price * quantity" ``` ```bash dbt test --select orders ``` 每个 test → 一个 SQL `SELECT count(*) WHERE 失败条件`。 返回 > 0 → 失败。 ### 优势 - 跟 dbt run 同 workflow(test after run) - SQL-based 简单 - `dbt_utils` / `dbt_expectations` 几百 test - 失败定位明确(具体 model + column) ### 劣势 - 只能查"当前结果是否符合规则" - 没历史 baseline / trend - 不知道"昨天 100 行,今天 50 行"算 anomaly ## dbt 常用 test ```yaml tests: - dbt_utils.unique_combination_of_columns: combination_of_columns: [user_id, day] - dbt_utils.recency: datepart: day field: created_at interval: 1 # 最新 record 不能超过 1 天前 - dbt_expectations.expect_table_row_count_to_be_between: min_value: 1000 max_value: 100000 - dbt_expectations.expect_column_value_lengths_to_equal: column: phone value: 11 - dbt_expectations.expect_column_proportion_of_unique_values_to_be_between: column: email min_value: 0.9 ``` table_row_count 等 catch "今天数据突然少"。 ## Great Expectations (GX) ```python import great_expectations as gx context = gx.get_context() # 定义 expectation suite suite = context.suites.add(name='orders_suite') suite.add_expectation( gx.expectations.ExpectColumnValuesToBeBetween( column='amount', min_value=0, max_value=1000000)) suite.add_expectation( gx.expectations.ExpectColumnValuesToNotBeNull(column='user_id')) # 跑 validation validator = context.get_validator(...) results = validator.validate() ``` GX 比 dbt test 复杂但功能更多: - profile dataset → 自动生成 expectation - HTML 报告(哪 row 失败、% 等) - versioned expectation suite - data docs 生成 ## 自动 profile ```python profiler = gx.profile.UserConfigurableProfiler(profile_dataset=batch) suite = profiler.build_suite() ``` 让 GX 看现有数据,自动建 expectation(每列 min/max/null% etc)。 人审 + 调 → 提交。 第一次设 expectation 非常省时。 ## anomaly detection GX 0.18+ 支持 "expect column values stay close to historical mean": ```python gx.expectations.ExpectColumnMeanToBeBetween( column='daily_revenue', min_value=-3, max_value=3, strict_min=False, auto=True, # 自动 baseline last 30 day ) ``` 跟历史 baseline 比,3σ 之外报警。 catch "突然飙升 / 暴跌"。 ## dbt + GX 集成 `dbt-expectations` 包是 GX expectation 用 SQL 重写的 dbt test 版本。 所以两个工具的核心 expectation 高度重叠: ```yaml # dbt_expectations 在 dbt test 里 - dbt_expectations.expect_column_values_to_match_regex: column: email regex: '^[^@]+@[^@]+\.[^@]+$' ``` 简单 expectation → dbt_expectations 够,pipeline 内集成。 需要 profile / anomaly / 历史 → 用 GX。 ## 我们的 setup ``` 1. dbt run + dbt test(pipeline 内,failing test → 阻塞 pipeline) 2. GX 每天对核心表 daily check(独立 schedule) 3. anomaly detected → Slack 告警 + 人工调查 ``` dbt test 防"明显 bug"。GX 防"渐变趋势异常"。 ## test 严重级别 dbt 1.5+ test 有 severity: ```yaml - not_null: severity: error # 默认,阻塞 - not_null: severity: warn # 不阻塞,只 log ``` `warn` 给"偶尔不符合但还 acceptable"的规则。 ## 失败时怎么办 dbt test 默认 fail → 整 pipeline halt。 策略: 1. **block downstream**:dbt 默认行为,下游 model 不跑(避免坏数据传播) 2. **alert only**:`severity: warn`,下游照跑 + 通知人 3. **quarantine**:把坏 row 隔离到 errors table,好 row 继续 选哪个看业务容忍度。金融 → block。日志 → warn。 ## storing test results dbt test 默认结果不存。 `dbt-checkpoint` 或 自定义 macro 把结果写表: ```sql -- models/_test_results.sql SELECT current_timestamp AS run_at, '{{ this.name }}' AS test, {{ test_function() }} AS result ``` 历史化 → Grafana 看 test pass rate 趋势。 ## 真实 case:救命的 test 我们一个 ETL pipeline: ```yaml tests: - dbt_expectations.expect_table_row_count_to_be_between: min_value: 50000 # 历史日均 80k max_value: 200000 ``` 某天上游 partition 错 → 我们表只 catch 到 5k 行。 test 立刻失败 → pipeline halt → BI 没拿到坏数据 → 修 partition → re-run。 没这 test 的话,dashboard 显示 5k 行 = "今天业务下滑 95%", 高管 panic。 ## 与 monte-carlo / soda 对比 | | dbt test | GX | Monte Carlo | Soda | |---|---|---|---|---| | 部署 | 跟 dbt | self-host / cloud | SaaS | self/cloud | | 价格 | 0 | 0 | 贵 | mid | | anomaly | 基础 | 中 | 强 (ML) | 中 | | 集成 | dbt 原生 | API | data warehouse 联 | 类 GX | | 上手 | 极易 | 中 | easy(SaaS) | 中 | 预算紧 → dbt test + GX。 预算大 + 不想运维 → Monte Carlo。 ## 踩过的坑 1. **expectation 太严**:`amount > 0` 但实际有 refund 是负数 → 全部 alert false positive。expectation 必须 calibrate。 2. **suite 跟 schema 不同步**:表加列,suite 没改 → 没 cover。 review process。 3. **GX 版本升级**:0.x → 1.x breaking change 大。锁版本 / 小心升。 4. **test 跑慢**:每个 test 一条 query → 大表 N test 慢。dbt `--store-failures` 让结果存表 + 跑一次 query 多 test。 5. **silent broken**:test 跑了但通过(即使数据有问题)。覆盖度 review 重要。每次 incident 后加新 test,防同问题。

dbt incremental models:每天增量跑 vs 全量重算

## 起因 每天跑一遍数据仓库 SQL transform: ```sql -- daily_user_metrics SELECT user_id, DATE(created_at) AS day, COUNT(*) AS events, SUM(value) AS total FROM events GROUP BY 1, 2; ``` 全量跑:扫几亿行 events → 20 min + Snowflake credit 几百刀。 但实际上**昨天之前的天数据是冻结的**,每天只需算今天的新增。 dbt 的 incremental model 模式解决这问题。 ## 全量 model ```sql -- models/daily_user_metrics.sql {{ config(materialized='table') }} SELECT user_id, DATE(created_at) AS day, COUNT(*) AS events FROM {{ ref('events') }} GROUP BY 1, 2 ``` `materialized='table'`:每次 dbt run 都 DROP + CREATE TABLE AS。 小数据 OK;大数据浪费。 ## incremental model ```sql -- models/daily_user_metrics.sql {{ config( materialized='incremental', unique_key=['user_id', 'day'], on_schema_change='append_new_columns' ) }} SELECT user_id, DATE(created_at) AS day, COUNT(*) AS events, SUM(value) AS total FROM {{ ref('events') }} {% if is_incremental() %} WHERE created_at >= (SELECT MAX(day) FROM {{ this }}) - INTERVAL '1 day' {% endif %} GROUP BY 1, 2 ``` 关键: - `{{ this }}` 引用当前 model 自己 - `is_incremental()` 在"已存在且不是 --full-refresh"时为 true - WHERE 只取新数据 - `unique_key` 让 dbt 知道按啥 merge 第一次 run:建 table + 全量。 之后每次 run:只算新数据 → MERGE 进现有 table。 ## 效果 我们一个 metric model: - 事件表 5 亿行 - 全量跑:22 分钟,$3 credit - incremental:1.5 分钟,$0.2 credit 12x 时间节省,15x 成本节省。 ## 几种 strategy dbt 支持几种 incremental 策略(按 warehouse): | strategy | 行为 | |---|---| | `append` | 直接 INSERT 新数据(不去重) | | `merge` | 默认。MERGE INTO with unique_key | | `delete+insert` | 删 unique_key 匹配的行 + INSERT | | `insert_overwrite` | partition-level overwrite(BigQuery / Spark) | ```sql {{ config( materialized='incremental', incremental_strategy='merge', unique_key='id', ) }} ``` PG / Snowflake / Redshift 用 merge。BigQuery 大 partition 用 insert_overwrite。 ## late-arriving data 事件 24 小时后才到(移动 SDK 离线缓存)。 window 要更宽: ```sql {% if is_incremental() %} WHERE created_at >= (SELECT MAX(day) FROM {{ this }}) - INTERVAL '7 days' {% endif %} ``` 回看 7 天,覆盖晚到的事件。`unique_key` 保证 merge 不重复。 trade-off:window 越宽 → 重算越多 → 增量收益减。 ## 全量 backfill 老数据要重算(如 metric 公式改了): ```bash dbt run --select daily_user_metrics --full-refresh ``` `--full-refresh` 让 incremental 当 table 处理 → DROP + 重建。 或者部分重算: ```bash # 重算最近 30 天 dbt run --select daily_user_metrics --vars '{"start_date": "2025-04-01"}' ``` model 里读 var: ```sql {% if var('start_date', false) %} WHERE created_at >= '{{ var("start_date") }}' {% endif %} ``` ## 跟 partitioned table 配 BigQuery / Snowflake partition: ```sql {{ config( materialized='incremental', incremental_strategy='insert_overwrite', partition_by={'field': 'day', 'data_type': 'date'}, cluster_by=['user_id'], ) }} ``` incremental insert overwrite 比 merge 更高效(partition 整块替换 vs row-level merge)。 ## test incremental dbt test 默认在 model run 后跑: ```yaml # models/schema.yml models: - name: daily_user_metrics columns: - name: user_id tests: - not_null - name: day tests: - not_null tests: - dbt_utils.unique_combination_of_columns: combination_of_columns: [user_id, day] ``` increment 后唯一性测试**关键**:merge 配错 → 重复数据。 ## 监控 `dbt run` 完看 timing: ``` 1 of 5 START incremental model daily_user_metrics ........ [RUN] 1 of 5 OK created incremental model daily_user_metrics ... [SUCCESS in 89.42s] ``` 如果 incremental 跑越来越慢(不应该): - WHERE 条件 partition / cluster key 利用不充分 - merge 行数变大(window 太宽) - new column 加了导致 `on_schema_change` 触发 ## dbt 1.6+ microbatch dbt 1.6 引入 `microbatch` incremental_strategy(更明确的 partition-by-time): ```sql {{ config( materialized='incremental', incremental_strategy='microbatch', event_time='created_at', batch_size='day', lookback=3, ) }} SELECT ... FROM {{ ref('events') }} ``` dbt 自动按 day 分批跑 → backfill 时按天 chunk → 单个 chunk 失败不 影响别天。比手写 `is_incremental()` 干净。 ## 我们的 pipeline ``` events (raw, 5 亿行) ↓ [hourly] events_hourly (incremental, 24h window) ↓ [daily] daily_metrics (incremental, 7d window) ↓ monthly_summary (incremental, 1m window) ↓ dashboard ``` 每层 incremental,各自 window 覆盖延迟。 全量 backfill 用 `--full-refresh` 串行跑全部。 ## 真实 vs 全量 我们一个 200 个 model 的 dbt 项目,把 80% 大 model 改 incremental 后: - 总跑时间从 4h → 35min - Snowflake 月成本从 $8000 → $1200 - 失败重试代价低很多 incremental 是 dbt 最重要的 production pattern。 ## 踩过的坑 1. **unique_key 不对**:merge 进重复 → 数据涨。每次改 unique_key 必须 `--full-refresh`。 2. **`is_incremental()` 没用**:写在 CTE 里 / 没读 `{{ this }}` → 每次都全量。看生成的 SQL(`compile`)确认。 3. **schema change**:表加列 → 默认 incremental 报错(schema mismatch)。 `on_schema_change='append_new_columns'` 或 `'sync_all_columns'`。 4. **late-arriving 漏数**:window 太窄 → 晚到事件没进表。监控 `events vs metrics` 一致性。 5. **partition 没 prune**:BigQuery 大 partition 表 WHERE 写 timestamp 比较,但 model 没 partition by → full scan。 `partition_by` 配 + WHERE 用 partition 列。

Prefect vs Airflow:现代数据流编排选哪个

## 起因 数据 pipeline 需要: - 定时 / 触发跑(cron / event) - 任务依赖 DAG - 重试 / 失败告警 - UI 看历史 / 调试 `Airflow` 是 2014 起的事实标准。`Prefect`(2018+)是现代挑战者。 最近从 Airflow 迁了一半 pipeline 到 Prefect,下面对比。 ## Airflow ```python from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime def extract(): print('extract') def transform(): print('transform') def load(): print('load') with DAG('etl', start_date=datetime(2025, 1, 1), schedule='@daily') as dag: e = PythonOperator(task_id='extract', python_callable=extract) t = PythonOperator(task_id='transform', python_callable=transform) l = PythonOperator(task_id='load', python_callable=load) e >> t >> l ``` DAG 是静态文件,scheduler 周期扫描 + render。 ### 优势 - 业界事实标准(10 年沉淀) - 巨大 operator 生态(数百个 connector) - 大规模成熟(Airbnb 几万 DAG) - Kubernetes / Celery executor 久经考验 ### 劣势 - DAG 必须静态(运行时 branch / 动态 task 难) - 调试本地难(要起 scheduler / webserver / DB / executor) - Python function 跟 task framework 耦合(PythonOperator wrapper) - 升级痛苦(2.x vs 1.x 大变;插件兼容差) - 巨型部署(PG + scheduler + worker + webserver 几组件) ## Prefect 2/3 ```python from prefect import flow, task @task(retries=3) def extract(): return [1, 2, 3] @task def transform(x): return x * 2 @task def load(items): print(f'loaded {items}') @flow def etl(): data = extract() transformed = [transform(x) for x in data] load(transformed) if __name__ == '__main__': etl() ``` `@flow` / `@task` 装饰器即编排。普通 Python 函数 + decorator。 ### 优势 - **DAG 动态生成**(运行时根据数据决定 task) - 本地 dev 简单:`python etl.py` 跑 - 装饰器轻,function 仍是 function - API + UI 现代 - Prefect Cloud free tier 个人 OK ### 劣势 - 生态比 Airflow 小(operator / connector 少) - 还在快速演化(Prefect 3 vs 2 部分 API 变) - 大规模 production 沉淀少 ## 调度模型对比 **Airflow**: ``` Scheduler 每分钟扫 DAG → 决定哪 task 该跑 → 推到 worker ``` 集中式,scheduler 是瓶颈。 **Prefect**: ``` Flow run 由 trigger(schedule / API / event)启动 Worker pull pending run 跑 ``` 更松散,worker 任意机器都能 pull。 ## 动态 task ```python # Airflow(静态 → 用 mapped task 模拟动态) @task def process(item): ... @task def get_items(): return [1, 2, 3, 4] @dag def my_dag(): items = get_items() process.expand(item=items) ``` Airflow 2.3+ 加了 dynamic task mapping,写起来扭。 ```python # Prefect:原生 @flow def my_flow(): items = get_items() for item in items: # 普通 Python process(item) ``` Prefect 直接用 Python loop,运行时决定 task 数。 ## subflow Prefect 鼓励 flow 嵌套: ```python @flow def daily_etl(): for region in ['us', 'eu', 'asia']: region_flow(region) # subflow 单独 run,独立 retry @flow def region_flow(region): extract(region) transform(region) load(region) ``` UI 里嵌套展开。复杂 pipeline 模块化。 ## 部署模型 **Airflow**: ``` - PG(metadata) - Redis / Celery(queue) - Webserver - Scheduler - N × Worker - 自动化 deploy DAG file → /dags 文件夹 ``` K8s 部署 6+ 容器。 **Prefect**: ``` - Server(API + UI)(Prefect Cloud 替代) - Worker(pull flow run) - Code 存哪都行(git / S3 / docker) ``` 简单很多。可以全 serverless(Cloud + ECS / Lambda worker)。 ## 本地开发 Airflow 本地: ```bash docker compose up # airflow-init + scheduler + webserver + worker # 改 DAG → 等 60s scheduler 扫 ``` Prefect 本地: ```bash prefect server start # 起 server python my_flow.py # 直接跑 ``` iter loop 快 5-10x。 ## 与 Dagster 对比 Dagster 是第三玩家,asset-based 编排(pipeline 是 "asset 之间的 graph")。 更声明式,更适合数据 platform 团队。 | | Airflow | Prefect | Dagster | |---|---|---|---| | 模型 | task DAG | flow + task | asset graph | | 上手 | 中 | 低 | 中高 | | 动态 | 弱 | 强 | 中 | | 生态 | 最大 | 中 | 中 | | 适合 | 大企业 / 复杂 ETL | 中小项目 / Pythonic | 数据 platform | ## 迁移 case 我们 30 个 Airflow DAG,三类: 1. 简单 ETL(SQL → SQL):保留 Airflow(operator 现成,省事) 2. 复杂 Python pipeline(动态 logic):迁 Prefect 3. ML 训练 pipeline:迁 Prefect(model artifact + dynamic 强) 混合架构:Airflow 跑标准 ETL,Prefect 跑 dynamic / Pythonic 流程。 ## 与 dbt 集成 两者都能跑 dbt: ```python # Prefect from prefect_dbt.cli.commands import DbtCoreOperation @flow def dbt_flow(): DbtCoreOperation(commands=['dbt run --select tag:hourly']).run() ``` ```python # Airflow from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator # 或者 BashOperator('dbt run ...') ``` 差不多。Prefect block 概念让 dbt cli 复用简单。 ## API trigger ```python # Prefect:HTTP trigger flow run from prefect.client.orchestration import get_client async with get_client() as client: await client.create_flow_run_from_deployment( deployment_id='abc', parameters={'date': '2025-01-01'}, ) ``` 外部系统(webhook / event)触发简单。Airflow 也有 REST API 但繁琐。 ## 监控 / 告警 两者都有 UI + email / Slack 通知 + Prometheus metrics。 Prefect Cloud 免费 tier 告警直接配。 Airflow self-host 要自己拼 alertmanager。 ## 选型决策 - **大企业 + 复杂 ETL + 团队熟 Airflow** → Airflow - **新项目 + Python pipeline** → Prefect - **数据 platform 团队 + asset 思维** → Dagster - **简单 cron + 几个 job** → 别上编排框架,systemd timer + GitHub Actions 够 ## 踩过的坑(迁移) 1. **Airflow XCom → Prefect return value**:XCom 传数据有限(< 48KB metadata)。Prefect 直接 return Python object,但 cluster 间要序 列化 → 用 storage block。 2. **schedule timezone**:Airflow `start_date` UTC;Prefect 默认 UTC 但 cron string 解释要明确。 3. **retries 默认 0**:忘配 → 失败不重试。生产 retries=3 + backoff 默认。 4. **flow concurrency limit**:同 flow 多 run 并行 → DB 锁。 `concurrency_limit` 控制。 5. **Prefect 3 升级**:从 2.x 迁 3.x 有 breaking change。读 migration guide。