起因
数据 pipeline 跑出来的表:
- 突然某天 row 数 -80%(上游断了)
- 某列 null 比例飙到 30%(schema 改了没告知)
- 重要 metric 暴增 100x(埋点 bug)
下游 BI 报表 / model 已经用上 → 业务 / model 出错。
数据质量监控是数据团队必须建的。
两个主流工具:
- dbt test(轻量,pipeline 集成)
- Great Expectations / GX(重量,schema + 历史 profile)
dbt test 基础
dbt model 的 schema.yml 里加 test:
# models/orders/schema.yml
models:
- name: orders
columns:
- name: id
tests:
- unique
- not_null
- name: status
tests:
- accepted_values:
values: ['pending', 'paid', 'cancelled', 'refunded']
- name: amount
tests:
- dbt_expectations.expect_column_values_to_be_between:
min_value: 0
max_value: 1000000
tests:
- dbt_utils.expression_is_true:
expression: "amount = price * quantity"
dbt test --select orders
每个 test → 一个 SQL SELECT count(*) WHERE 失败条件。
返回 > 0 → 失败。
优势
- 跟 dbt run 同 workflow(test after run)
- SQL-based 简单
dbt_utils/dbt_expectations几百 test- 失败定位明确(具体 model + column)
劣势
- 只能查"当前结果是否符合规则"
- 没历史 baseline / trend
- 不知道"昨天 100 行,今天 50 行"算 anomaly
dbt 常用 test
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns: [user_id, day]
- dbt_utils.recency:
datepart: day
field: created_at
interval: 1 # 最新 record 不能超过 1 天前
- dbt_expectations.expect_table_row_count_to_be_between:
min_value: 1000
max_value: 100000
- dbt_expectations.expect_column_value_lengths_to_equal:
column: phone
value: 11
- dbt_expectations.expect_column_proportion_of_unique_values_to_be_between:
column: email
min_value: 0.9
table_row_count 等 catch "今天数据突然少"。
Great Expectations (GX)
import great_expectations as gx
context = gx.get_context()
# 定义 expectation suite
suite = context.suites.add(name='orders_suite')
suite.add_expectation(
gx.expectations.ExpectColumnValuesToBeBetween(
column='amount', min_value=0, max_value=1000000))
suite.add_expectation(
gx.expectations.ExpectColumnValuesToNotBeNull(column='user_id'))
# 跑 validation
validator = context.get_validator(...)
results = validator.validate()
GX 比 dbt test 复杂但功能更多:
- profile dataset → 自动生成 expectation
- HTML 报告(哪 row 失败、% 等)
- versioned expectation suite
- data docs 生成
自动 profile
profiler = gx.profile.UserConfigurableProfiler(profile_dataset=batch)
suite = profiler.build_suite()
让 GX 看现有数据,自动建 expectation(每列 min/max/null% etc)。
人审 + 调 → 提交。
第一次设 expectation 非常省时。
anomaly detection
GX 0.18+ 支持 "expect column values stay close to historical mean":
gx.expectations.ExpectColumnMeanToBeBetween(
column='daily_revenue',
min_value=-3,
max_value=3,
strict_min=False,
auto=True, # 自动 baseline last 30 day
)
跟历史 baseline 比,3σ 之外报警。
catch "突然飙升 / 暴跌"。
dbt + GX 集成
dbt-expectations 包是 GX expectation 用 SQL 重写的 dbt test 版本。
所以两个工具的核心 expectation 高度重叠:
# dbt_expectations 在 dbt test 里
- dbt_expectations.expect_column_values_to_match_regex:
column: email
regex: '^[^@]+@[^@]+\.[^@]+$'
简单 expectation → dbt_expectations 够,pipeline 内集成。
需要 profile / anomaly / 历史 → 用 GX。
我们的 setup
1. dbt run + dbt test(pipeline 内,failing test → 阻塞 pipeline)
2. GX 每天对核心表 daily check(独立 schedule)
3. anomaly detected → Slack 告警 + 人工调查
dbt test 防"明显 bug"。GX 防"渐变趋势异常"。
test 严重级别
dbt 1.5+ test 有 severity:
- not_null:
severity: error # 默认,阻塞
- not_null:
severity: warn # 不阻塞,只 log
warn 给"偶尔不符合但还 acceptable"的规则。
失败时怎么办
dbt test 默认 fail → 整 pipeline halt。
策略:
- block downstream:dbt 默认行为,下游 model 不跑(避免坏数据传播)
- alert only:
severity: warn,下游照跑 + 通知人 - quarantine:把坏 row 隔离到 errors table,好 row 继续
选哪个看业务容忍度。金融 → block。日志 → warn。
storing test results
dbt test 默认结果不存。
dbt-checkpoint 或 自定义 macro 把结果写表:
-- models/_test_results.sql
SELECT
current_timestamp AS run_at,
'{{ this.name }}' AS test,
{{ test_function() }} AS result
历史化 → Grafana 看 test pass rate 趋势。
真实 case:救命的 test
我们一个 ETL pipeline:
tests:
- dbt_expectations.expect_table_row_count_to_be_between:
min_value: 50000 # 历史日均 80k
max_value: 200000
某天上游 partition 错 → 我们表只 catch 到 5k 行。
test 立刻失败 → pipeline halt → BI 没拿到坏数据 → 修 partition → re-run。
没这 test 的话,dashboard 显示 5k 行 = "今天业务下滑 95%",
高管 panic。
与 monte-carlo / soda 对比
| dbt test | GX | Monte Carlo | Soda | |
|---|---|---|---|---|
| 部署 | 跟 dbt | self-host / cloud | SaaS | self/cloud |
| 价格 | 0 | 0 | 贵 | mid |
| anomaly | 基础 | 中 | 强 (ML) | 中 |
| 集成 | dbt 原生 | API | data warehouse 联 | 类 GX |
| 上手 | 极易 | 中 | easy(SaaS) | 中 |
预算紧 → dbt test + GX。
预算大 + 不想运维 → Monte Carlo。
踩过的坑
-
expectation 太严:
amount > 0但实际有 refund 是负数 → 全部
alert false positive。expectation 必须 calibrate。 -
suite 跟 schema 不同步:表加列,suite 没改 → 没 cover。
review process。 -
GX 版本升级:0.x → 1.x breaking change 大。锁版本 / 小心升。
-
test 跑慢:每个 test 一条 query → 大表 N test 慢。dbt
--store-failures让结果存表 + 跑一次 query 多 test。 -
silent broken:test 跑了但通过(即使数据有问题)。覆盖度
review 重要。每次 incident 后加新 test,防同问题。
登录后参与评论。