起因
每天跑一遍数据仓库 SQL transform:
-- daily_user_metrics
SELECT
user_id,
DATE(created_at) AS day,
COUNT(*) AS events,
SUM(value) AS total
FROM events
GROUP BY 1, 2;
全量跑:扫几亿行 events → 20 min + Snowflake credit 几百刀。
但实际上昨天之前的天数据是冻结的,每天只需算今天的新增。
dbt 的 incremental model 模式解决这问题。
全量 model
-- models/daily_user_metrics.sql
{{ config(materialized='table') }}
SELECT user_id, DATE(created_at) AS day, COUNT(*) AS events
FROM {{ ref('events') }}
GROUP BY 1, 2
materialized='table':每次 dbt run 都 DROP + CREATE TABLE AS。
小数据 OK;大数据浪费。
incremental model
-- models/daily_user_metrics.sql
{{ config(
materialized='incremental',
unique_key=['user_id', 'day'],
on_schema_change='append_new_columns'
) }}
SELECT
user_id,
DATE(created_at) AS day,
COUNT(*) AS events,
SUM(value) AS total
FROM {{ ref('events') }}
{% if is_incremental() %}
WHERE created_at >= (SELECT MAX(day) FROM {{ this }}) - INTERVAL '1 day'
{% endif %}
GROUP BY 1, 2
关键:
{{ this }}引用当前 model 自己is_incremental()在"已存在且不是 --full-refresh"时为 true- WHERE 只取新数据
unique_key让 dbt 知道按啥 merge
第一次 run:建 table + 全量。
之后每次 run:只算新数据 → MERGE 进现有 table。
效果
我们一个 metric model:
- 事件表 5 亿行
- 全量跑:22 分钟,$3 credit
- incremental:1.5 分钟,$0.2 credit
12x 时间节省,15x 成本节省。
几种 strategy
dbt 支持几种 incremental 策略(按 warehouse):
| strategy | 行为 |
|---|---|
append |
直接 INSERT 新数据(不去重) |
merge |
默认。MERGE INTO with unique_key |
delete+insert |
删 unique_key 匹配的行 + INSERT |
insert_overwrite |
partition-level overwrite(BigQuery / Spark) |
{{ config(
materialized='incremental',
incremental_strategy='merge',
unique_key='id',
) }}
PG / Snowflake / Redshift 用 merge。BigQuery 大 partition 用
insert_overwrite。
late-arriving data
事件 24 小时后才到(移动 SDK 离线缓存)。
window 要更宽:
{% if is_incremental() %}
WHERE created_at >= (SELECT MAX(day) FROM {{ this }}) - INTERVAL '7 days'
{% endif %}
回看 7 天,覆盖晚到的事件。unique_key 保证 merge 不重复。
trade-off:window 越宽 → 重算越多 → 增量收益减。
全量 backfill
老数据要重算(如 metric 公式改了):
dbt run --select daily_user_metrics --full-refresh
--full-refresh 让 incremental 当 table 处理 → DROP + 重建。
或者部分重算:
# 重算最近 30 天
dbt run --select daily_user_metrics --vars '{"start_date": "2025-04-01"}'
model 里读 var:
{% if var('start_date', false) %}
WHERE created_at >= '{{ var("start_date") }}'
{% endif %}
跟 partitioned table 配
BigQuery / Snowflake partition:
{{ config(
materialized='incremental',
incremental_strategy='insert_overwrite',
partition_by={'field': 'day', 'data_type': 'date'},
cluster_by=['user_id'],
) }}
incremental insert overwrite 比 merge 更高效(partition 整块替换 vs
row-level merge)。
test incremental
dbt test 默认在 model run 后跑:
# models/schema.yml
models:
- name: daily_user_metrics
columns:
- name: user_id
tests:
- not_null
- name: day
tests:
- not_null
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns: [user_id, day]
increment 后唯一性测试关键:merge 配错 → 重复数据。
监控
dbt run 完看 timing:
1 of 5 START incremental model daily_user_metrics ........ [RUN]
1 of 5 OK created incremental model daily_user_metrics ... [SUCCESS in 89.42s]
如果 incremental 跑越来越慢(不应该):
- WHERE 条件 partition / cluster key 利用不充分
- merge 行数变大(window 太宽)
- new column 加了导致
on_schema_change触发
dbt 1.6+ microbatch
dbt 1.6 引入 microbatch incremental_strategy(更明确的 partition-by-time):
{{ config(
materialized='incremental',
incremental_strategy='microbatch',
event_time='created_at',
batch_size='day',
lookback=3,
) }}
SELECT ... FROM {{ ref('events') }}
dbt 自动按 day 分批跑 → backfill 时按天 chunk → 单个 chunk 失败不
影响别天。比手写 is_incremental() 干净。
我们的 pipeline
events (raw, 5 亿行)
↓ [hourly]
events_hourly (incremental, 24h window)
↓ [daily]
daily_metrics (incremental, 7d window)
↓
monthly_summary (incremental, 1m window)
↓
dashboard
每层 incremental,各自 window 覆盖延迟。
全量 backfill 用 --full-refresh 串行跑全部。
真实 vs 全量
我们一个 200 个 model 的 dbt 项目,把 80% 大 model 改 incremental
后:
- 总跑时间从 4h → 35min
- Snowflake 月成本从 $8000 → $1200
- 失败重试代价低很多
incremental 是 dbt 最重要的 production pattern。
踩过的坑
-
unique_key 不对:merge 进重复 → 数据涨。每次改 unique_key 必须
--full-refresh。 -
is_incremental()没用:写在 CTE 里 / 没读{{ this }}→
每次都全量。看生成的 SQL(compile)确认。 -
schema change:表加列 → 默认 incremental 报错(schema mismatch)。
on_schema_change='append_new_columns'或'sync_all_columns'。 -
late-arriving 漏数:window 太窄 → 晚到事件没进表。监控
events vs metrics一致性。 -
partition 没 prune:BigQuery 大 partition 表 WHERE 写 timestamp
比较,但 model 没 partition by → full scan。
partition_by配 + WHERE 用 partition 列。
登录后参与评论。