dbt incremental models:每天增量跑 vs 全量重算

起因

每天跑一遍数据仓库 SQL transform:

-- daily_user_metrics
SELECT
    user_id,
    DATE(created_at) AS day,
    COUNT(*) AS events,
    SUM(value) AS total
FROM events
GROUP BY 1, 2;

全量跑:扫几亿行 events → 20 min + Snowflake credit 几百刀。
但实际上昨天之前的天数据是冻结的,每天只需算今天的新增。

dbt 的 incremental model 模式解决这问题。

全量 model

-- models/daily_user_metrics.sql
{{ config(materialized='table') }}

SELECT user_id, DATE(created_at) AS day, COUNT(*) AS events
FROM {{ ref('events') }}
GROUP BY 1, 2

materialized='table':每次 dbt run 都 DROP + CREATE TABLE AS。
小数据 OK;大数据浪费。

incremental model

-- models/daily_user_metrics.sql
{{ config(
    materialized='incremental',
    unique_key=['user_id', 'day'],
    on_schema_change='append_new_columns'
) }}

SELECT
    user_id,
    DATE(created_at) AS day,
    COUNT(*) AS events,
    SUM(value) AS total
FROM {{ ref('events') }}

{% if is_incremental() %}
    WHERE created_at >= (SELECT MAX(day) FROM {{ this }}) - INTERVAL '1 day'
{% endif %}

GROUP BY 1, 2

关键:

  • {{ this }} 引用当前 model 自己
  • is_incremental() 在"已存在且不是 --full-refresh"时为 true
  • WHERE 只取新数据
  • unique_key 让 dbt 知道按啥 merge

第一次 run:建 table + 全量。
之后每次 run:只算新数据 → MERGE 进现有 table。

效果

我们一个 metric model:

  • 事件表 5 亿行
  • 全量跑:22 分钟,$3 credit
  • incremental:1.5 分钟,$0.2 credit

12x 时间节省,15x 成本节省。

几种 strategy

dbt 支持几种 incremental 策略(按 warehouse):

strategy 行为
append 直接 INSERT 新数据(不去重)
merge 默认。MERGE INTO with unique_key
delete+insert 删 unique_key 匹配的行 + INSERT
insert_overwrite partition-level overwrite(BigQuery / Spark)
{{ config(
    materialized='incremental',
    incremental_strategy='merge',
    unique_key='id',
) }}

PG / Snowflake / Redshift 用 merge。BigQuery 大 partition 用
insert_overwrite。

late-arriving data

事件 24 小时后才到(移动 SDK 离线缓存)。
window 要更宽:

{% if is_incremental() %}
    WHERE created_at >= (SELECT MAX(day) FROM {{ this }}) - INTERVAL '7 days'
{% endif %}

回看 7 天,覆盖晚到的事件。unique_key 保证 merge 不重复。

trade-off:window 越宽 → 重算越多 → 增量收益减。

全量 backfill

老数据要重算(如 metric 公式改了):

dbt run --select daily_user_metrics --full-refresh

--full-refresh 让 incremental 当 table 处理 → DROP + 重建。

或者部分重算:

# 重算最近 30 天
dbt run --select daily_user_metrics --vars '{"start_date": "2025-04-01"}'

model 里读 var:

{% if var('start_date', false) %}
    WHERE created_at >= '{{ var("start_date") }}'
{% endif %}

跟 partitioned table 配

BigQuery / Snowflake partition:

{{ config(
    materialized='incremental',
    incremental_strategy='insert_overwrite',
    partition_by={'field': 'day', 'data_type': 'date'},
    cluster_by=['user_id'],
) }}

incremental insert overwrite 比 merge 更高效(partition 整块替换 vs
row-level merge)。

test incremental

dbt test 默认在 model run 后跑:

# models/schema.yml
models:
  - name: daily_user_metrics
    columns:
      - name: user_id
        tests:
          - not_null
      - name: day
        tests:
          - not_null
    tests:
      - dbt_utils.unique_combination_of_columns:
          combination_of_columns: [user_id, day]

increment 后唯一性测试关键:merge 配错 → 重复数据。

监控

dbt run 完看 timing:

1 of 5 START incremental model daily_user_metrics ........ [RUN]
1 of 5 OK created incremental model daily_user_metrics ... [SUCCESS in 89.42s]

如果 incremental 跑越来越慢(不应该):

  • WHERE 条件 partition / cluster key 利用不充分
  • merge 行数变大(window 太宽)
  • new column 加了导致 on_schema_change 触发

dbt 1.6+ microbatch

dbt 1.6 引入 microbatch incremental_strategy(更明确的 partition-by-time):

{{ config(
    materialized='incremental',
    incremental_strategy='microbatch',
    event_time='created_at',
    batch_size='day',
    lookback=3,
) }}

SELECT ... FROM {{ ref('events') }}

dbt 自动按 day 分批跑 → backfill 时按天 chunk → 单个 chunk 失败不
影响别天。比手写 is_incremental() 干净。

我们的 pipeline

events (raw, 5 亿行)
   ↓ [hourly]
events_hourly (incremental, 24h window)
   ↓ [daily]
daily_metrics (incremental, 7d window)
   ↓
monthly_summary (incremental, 1m window)
   ↓
dashboard

每层 incremental,各自 window 覆盖延迟。
全量 backfill 用 --full-refresh 串行跑全部。

真实 vs 全量

我们一个 200 个 model 的 dbt 项目,把 80% 大 model 改 incremental
后:

  • 总跑时间从 4h → 35min
  • Snowflake 月成本从 $8000 → $1200
  • 失败重试代价低很多

incremental 是 dbt 最重要的 production pattern。

踩过的坑

  1. unique_key 不对:merge 进重复 → 数据涨。每次改 unique_key 必须
    --full-refresh

  2. is_incremental() 没用:写在 CTE 里 / 没读 {{ this }}
    每次都全量。看生成的 SQL(compile)确认。

  3. schema change:表加列 → 默认 incremental 报错(schema mismatch)。
    on_schema_change='append_new_columns''sync_all_columns'

  4. late-arriving 漏数:window 太窄 → 晚到事件没进表。监控
    events vs metrics 一致性。

  5. partition 没 prune:BigQuery 大 partition 表 WHERE 写 timestamp
    比较,但 model 没 partition by → full scan。
    partition_by 配 + WHERE 用 partition 列。

精确评价 共 0 人评价
可复现性
可复现 · 0 不可复现 · 0
文风
文风流畅 · 0 文风晦涩 · 0
立场
支持 · 0 反对 · 0

登录后即可对本帖作出评价。

评论区 0 条 · 所有人可在此交流

登录后参与评论。

还没有评论,来说两句。