Great Expectations + dbt test:数据质量门禁

起因

数据 pipeline 跑出来的表:

  • 突然某天 row 数 -80%(上游断了)
  • 某列 null 比例飙到 30%(schema 改了没告知)
  • 重要 metric 暴增 100x(埋点 bug)

下游 BI 报表 / model 已经用上 → 业务 / model 出错。
数据质量监控是数据团队必须建的。

两个主流工具:

  • dbt test(轻量,pipeline 集成)
  • Great Expectations / GX(重量,schema + 历史 profile)

dbt test 基础

dbt model 的 schema.yml 里加 test:

# models/orders/schema.yml
models:
  - name: orders
    columns:
      - name: id
        tests:
          - unique
          - not_null
      - name: status
        tests:
          - accepted_values:
              values: ['pending', 'paid', 'cancelled', 'refunded']
      - name: amount
        tests:
          - dbt_expectations.expect_column_values_to_be_between:
              min_value: 0
              max_value: 1000000
    tests:
      - dbt_utils.expression_is_true:
          expression: "amount = price * quantity"
dbt test --select orders

每个 test → 一个 SQL SELECT count(*) WHERE 失败条件
返回 > 0 → 失败。

优势

  • 跟 dbt run 同 workflow(test after run)
  • SQL-based 简单
  • dbt_utils / dbt_expectations 几百 test
  • 失败定位明确(具体 model + column)

劣势

  • 只能查"当前结果是否符合规则"
  • 没历史 baseline / trend
  • 不知道"昨天 100 行,今天 50 行"算 anomaly

dbt 常用 test

tests:
  - dbt_utils.unique_combination_of_columns:
      combination_of_columns: [user_id, day]

  - dbt_utils.recency:
      datepart: day
      field: created_at
      interval: 1               # 最新 record 不能超过 1 天前

  - dbt_expectations.expect_table_row_count_to_be_between:
      min_value: 1000
      max_value: 100000

  - dbt_expectations.expect_column_value_lengths_to_equal:
      column: phone
      value: 11

  - dbt_expectations.expect_column_proportion_of_unique_values_to_be_between:
      column: email
      min_value: 0.9

table_row_count 等 catch "今天数据突然少"。

Great Expectations (GX)

import great_expectations as gx

context = gx.get_context()

# 定义 expectation suite
suite = context.suites.add(name='orders_suite')
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeBetween(
        column='amount', min_value=0, max_value=1000000))
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column='user_id'))

# 跑 validation
validator = context.get_validator(...)
results = validator.validate()

GX 比 dbt test 复杂但功能更多:

  • profile dataset → 自动生成 expectation
  • HTML 报告(哪 row 失败、% 等)
  • versioned expectation suite
  • data docs 生成

自动 profile

profiler = gx.profile.UserConfigurableProfiler(profile_dataset=batch)
suite = profiler.build_suite()

让 GX 看现有数据,自动建 expectation(每列 min/max/null% etc)。
人审 + 调 → 提交。

第一次设 expectation 非常省时。

anomaly detection

GX 0.18+ 支持 "expect column values stay close to historical mean":

gx.expectations.ExpectColumnMeanToBeBetween(
    column='daily_revenue',
    min_value=-3,
    max_value=3,
    strict_min=False,
    auto=True,      # 自动 baseline last 30 day
)

跟历史 baseline 比,3σ 之外报警。
catch "突然飙升 / 暴跌"。

dbt + GX 集成

dbt-expectations 包是 GX expectation 用 SQL 重写的 dbt test 版本。
所以两个工具的核心 expectation 高度重叠:

# dbt_expectations 在 dbt test 里
- dbt_expectations.expect_column_values_to_match_regex:
    column: email
    regex: '^[^@]+@[^@]+\.[^@]+$'

简单 expectation → dbt_expectations 够,pipeline 内集成。
需要 profile / anomaly / 历史 → 用 GX。

我们的 setup

1. dbt run + dbt test(pipeline 内,failing test → 阻塞 pipeline)
2. GX 每天对核心表 daily check(独立 schedule)
3. anomaly detected → Slack 告警 + 人工调查

dbt test 防"明显 bug"。GX 防"渐变趋势异常"。

test 严重级别

dbt 1.5+ test 有 severity:

- not_null:
    severity: error      # 默认,阻塞
- not_null:
    severity: warn       # 不阻塞,只 log

warn 给"偶尔不符合但还 acceptable"的规则。

失败时怎么办

dbt test 默认 fail → 整 pipeline halt。

策略:

  1. block downstream:dbt 默认行为,下游 model 不跑(避免坏数据传播)
  2. alert onlyseverity: warn,下游照跑 + 通知人
  3. quarantine:把坏 row 隔离到 errors table,好 row 继续

选哪个看业务容忍度。金融 → block。日志 → warn。

storing test results

dbt test 默认结果不存。
dbt-checkpoint 或 自定义 macro 把结果写表:

-- models/_test_results.sql
SELECT
    current_timestamp AS run_at,
    '{{ this.name }}' AS test,
    {{ test_function() }} AS result

历史化 → Grafana 看 test pass rate 趋势。

真实 case:救命的 test

我们一个 ETL pipeline:

tests:
  - dbt_expectations.expect_table_row_count_to_be_between:
      min_value: 50000      # 历史日均 80k
      max_value: 200000

某天上游 partition 错 → 我们表只 catch 到 5k 行。
test 立刻失败 → pipeline halt → BI 没拿到坏数据 → 修 partition → re-run。

没这 test 的话,dashboard 显示 5k 行 = "今天业务下滑 95%",
高管 panic。

与 monte-carlo / soda 对比

dbt test GX Monte Carlo Soda
部署 跟 dbt self-host / cloud SaaS self/cloud
价格 0 0 mid
anomaly 基础 强 (ML)
集成 dbt 原生 API data warehouse 联 类 GX
上手 极易 easy(SaaS)

预算紧 → dbt test + GX。
预算大 + 不想运维 → Monte Carlo。

踩过的坑

  1. expectation 太严amount > 0 但实际有 refund 是负数 → 全部
    alert false positive。expectation 必须 calibrate。

  2. suite 跟 schema 不同步:表加列,suite 没改 → 没 cover。
    review process。

  3. GX 版本升级:0.x → 1.x breaking change 大。锁版本 / 小心升。

  4. test 跑慢:每个 test 一条 query → 大表 N test 慢。dbt
    --store-failures 让结果存表 + 跑一次 query 多 test。

  5. silent broken:test 跑了但通过(即使数据有问题)。覆盖度
    review 重要。每次 incident 后加新 test,防同问题。

精确评价 共 0 人评价
可复现性
可复现 · 0 不可复现 · 0
文风
文风流畅 · 0 文风晦涩 · 0
立场
支持 · 0 反对 · 0

登录后即可对本帖作出评价。

评论区 0 条 · 所有人可在此交流

登录后参与评论。

还没有评论,来说两句。