起因
要搭一个数据 pipeline:每天凌晨 1 点从 S3 拉新数据 → 清洗 → 入
ClickHouse → 跑特征工程 → 训 ML model → push 到 staging。
失败要重试 + 告警 + 部分重跑。
三个主流编排工具:
- Airflow:老牌,2014 出自 Airbnb,业界事实标准
- Prefect:Airflow 工程师出走重新设计,更轻量
- Dagster:从数据资产角度重新思考 pipeline,2024 风头最劲
试了一周下来记录。
Airflow
定义 DAG:
# airflow_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
default_args = {
'owner': 'data-team',
'retries': 2,
'retry_delay': timedelta(minutes=5),
}
with DAG(
'daily_etl',
default_args=default_args,
schedule='0 1 * * *',
start_date=datetime(2024, 1, 1),
catchup=False,
max_active_runs=1,
) as dag:
def fetch_data(**ctx):
ds = ctx['ds'] # '2024-05-24'
download_from_s3(f's3://bucket/raw/{ds}/')
def clean_data(**ctx):
ds = ctx['ds']
clean(input_dir, output_dir)
fetch = PythonOperator(task_id='fetch', python_callable=fetch_data)
clean = PythonOperator(task_id='clean', python_callable=clean_data)
load = PythonOperator(task_id='load', python_callable=load_to_ch)
train = PythonOperator(task_id='train', python_callable=train_model)
fetch >> clean >> load >> train
跑:
airflow webserver
airflow scheduler
# 浏览器 http://localhost:8080 看 DAG
优点:
- 业界最广,会的人多
- operator 库巨大(Bash / Spark / EMR / Snowflake / Postgres / Slack / ...)
- 成熟的 SLA / lineage / retry 机制
- K8s 部署成熟(KubernetesExecutor / KubernetesPodOperator)
缺点:
- DAG = 全局 Python module,导入慢(scheduler 反复 import 几千 DAG)
- 任务间传数据麻烦(XCom 限大小;走 S3 / DB 才行)
- 本地开发体验差(webserver + scheduler + executor 三件套)
- DAG 语法重,3 个 task 也 50 行
Prefect
# prefect_flow.py
from prefect import flow, task
from datetime import datetime, timedelta
@task(retries=2, retry_delay_seconds=300)
def fetch_data(date: str) -> str:
path = f's3://bucket/raw/{date}/'
download_from_s3(path)
return path
@task
def clean_data(raw_path: str) -> str:
out = clean(raw_path)
return out
@task
def load_to_ch(clean_path: str):
load(clean_path)
@task
def train_model():
train()
@flow(name='daily_etl', log_prints=True)
def daily_etl(date: str = None):
date = date or datetime.utcnow().date().isoformat()
raw = fetch_data(date)
clean = clean_data(raw)
load_to_ch(clean)
train_model()
if __name__ == '__main__':
daily_etl()
跑:
# 本地直接跑
python prefect_flow.py
# 部署到 Prefect Cloud / 自托管 server
prefect deploy prefect_flow.py:daily_etl --name nightly --cron '0 1 * * *'
prefect worker start --pool default
优点:
- 写 flow 像写普通 Python 函数,task 间传值像普通调用
- 本地开发体验好(直接 run flow)
- Prefect Cloud 免费层够小团队用
- 动态 task / sub-flow 支持好
缺点:
- 生态不如 Airflow(operator / connector 少)
- 2.0 vs 3.0 几次大改 breaking
- 自托管 server 还需要装 Postgres + Redis 等
Dagster
# dagster_pipeline.py
from dagster import asset, AssetExecutionContext, Definitions, ScheduleDefinition, define_asset_job
@asset
def raw_data(context: AssetExecutionContext) -> str:
ds = context.partition_key # '2024-05-24'
return download_from_s3(f's3://bucket/raw/{ds}/')
@asset(deps=[raw_data])
def cleaned_data(context, raw_data: str) -> str:
return clean(raw_data)
@asset(deps=[cleaned_data])
def clickhouse_table(cleaned_data: str):
load_to_ch(cleaned_data)
@asset(deps=[clickhouse_table])
def ml_model() -> str:
return train()
daily_job = define_asset_job('daily', selection='*')
daily_schedule = ScheduleDefinition(job=daily_job, cron_schedule='0 1 * * *')
defs = Definitions(
assets=[raw_data, cleaned_data, clickhouse_table, ml_model],
schedules=[daily_schedule],
)
跑:
dagster dev
# http://localhost:3000 看 asset 图
优点:
- "asset-based" 思维:每个 task 产出一个数据资产 (table / file /
model),blueprint = asset 图,自然 documentation - IDE 友好:type hint + 静态分析整套 pipeline
- backfill / partition / lineage 一等公民
- 内置数据测试 / asset checks
- 本地开发 =
dagster dev一条命令
缺点:
- 最新(2019),生态比 Airflow 小
- 学习曲线略陡(asset / op / job / sensor 几个概念)
- 团队上手时间长
选型对比
| 场景 | 推荐 |
|---|---|
| 已经在用 Airflow + 团队熟 | 继续 Airflow |
| 新项目 + 现代 Python + IDE 重度 | Dagster |
| 极简 + 个人项目 + Cloud SaaS | Prefect |
| 万千 DAG + 跨团队 + 老系统对接 | Airflow(生态最广) |
| ML pipeline + 强 data lineage | Dagster |
我个人新项目首选 Dagster:asset 抽象比 task 更贴合"数据流"思维。
共同 best practice
不管选哪个:
1. 任务幂等
@task
def load_to_ch(path):
# 用 INSERT OR REPLACE / DELETE then INSERT 让重跑不重复
db.execute(f"DELETE FROM table WHERE date = '{date}'")
db.execute(f"INSERT INTO table SELECT * FROM read_parquet('{path}')")
partition / date 当业务 key,重跑刷掉这个 partition 再插。
2. 数据 contract
定义"这个 task 输出什么 schema / row count / 范围":
@asset
def cleaned_data(...) -> Output:
df = ...
return Output(
value=df,
metadata={
'rows': len(df),
'columns': list(df.columns),
'null_rate': df.isna().mean().to_dict(),
}
)
下游任务依赖 contract 而非 implicit 假设。下次改变 schema 显式报错。
3. 分层
按 dbt 风格:
sources/ # 原始数据
staging/ # 轻清洗 / 类型转换
intermediate/ # 业务逻辑组合
marts/ # 业务可用的 final table
每层独立可测试 + 故障 isolate。
4. 告警 + 监控
- task 失败 → Slack / 邮件
- task 跑太久 → SLA miss 告警
- 数据指标异常(行数突变) → 检查告警
- Prometheus 暴露 task duration / success rate
5. backfill
历史日期重跑:
# Airflow
airflow dags backfill -s 2024-01-01 -e 2024-05-01 daily_etl
# Dagster
dagster job backfill --job daily --partitions '2024-01-01..2024-05-01'
# Prefect
prefect deployment run 'daily_etl/nightly' --param date=2024-01-15
6. data lineage / catalog
知道"这个表是哪些 task 生成的,哪些下游用了它"。
Dagster 内置;Airflow 接 OpenLineage / Marquez;Prefect 类似。
整套部署(举例 Dagster + K8s)
- dagster-webserver (UI)
- dagster-daemon (scheduler + sensor)
- code locations (你的 pipeline Python)
- PostgreSQL (metadata)
- S3 / GCS (intermediate storage)
- K8s(task 在 pod 跑)
helm chart 一键:
helm install dagster dagster/dagster --set ...
效果
我们的项目用 Dagster:
- 100+ assets 自动算依赖图
- 单次 backfill 半年数据:UI 选时间范围 + run,asset 自动并行
- 数据 schema 改动 → asset check fail → CI 拦下 → 改下游 → 再合
- 新成员看 asset 图 30 秒理解整套 pipeline
踩过的坑
Airflow 类
-
DAG 太多 scheduler 慢:每 30s 扫所有 DAG 文件 import。
把 DAG 分文件 + 用 dynamic DAG generation 减少 scheduler IO。 -
XCom 大数据:传几 MB 会爆 backend DB。task 间传 path(S3 key)
而非数据本身。
Prefect 类
- 2.x → 3.x breaking:API 大改。生产 pin 大版本 + 升级提前测。
Dagster 类
-
asset partition 配错:同 asset 一个 daily 一个 hourly partition
会冲突。统一一 asset 一种 partition scheme。 -
monorepo 多 code location:每个 location 独立 Python 进程,
依赖隔离好但启动慢。开发期合并;生产分。
登录后参与评论。