Dagster vs Airflow vs Prefect:现代 ETL pipeline 选哪个

起因

要搭一个数据 pipeline:每天凌晨 1 点从 S3 拉新数据 → 清洗 → 入
ClickHouse → 跑特征工程 → 训 ML model → push 到 staging。
失败要重试 + 告警 + 部分重跑。

三个主流编排工具:

  • Airflow:老牌,2014 出自 Airbnb,业界事实标准
  • Prefect:Airflow 工程师出走重新设计,更轻量
  • Dagster:从数据资产角度重新思考 pipeline,2024 风头最劲

试了一周下来记录。

Airflow

定义 DAG:

# airflow_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator

default_args = {
    'owner': 'data-team',
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'daily_etl',
    default_args=default_args,
    schedule='0 1 * * *',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    max_active_runs=1,
) as dag:

    def fetch_data(**ctx):
        ds = ctx['ds']    # '2024-05-24'
        download_from_s3(f's3://bucket/raw/{ds}/')

    def clean_data(**ctx):
        ds = ctx['ds']
        clean(input_dir, output_dir)

    fetch = PythonOperator(task_id='fetch', python_callable=fetch_data)
    clean = PythonOperator(task_id='clean', python_callable=clean_data)
    load = PythonOperator(task_id='load', python_callable=load_to_ch)
    train = PythonOperator(task_id='train', python_callable=train_model)

    fetch >> clean >> load >> train

跑:

airflow webserver
airflow scheduler
# 浏览器 http://localhost:8080 看 DAG

优点

  • 业界最广,会的人多
  • operator 库巨大(Bash / Spark / EMR / Snowflake / Postgres / Slack / ...)
  • 成熟的 SLA / lineage / retry 机制
  • K8s 部署成熟(KubernetesExecutor / KubernetesPodOperator)

缺点

  • DAG = 全局 Python module,导入慢(scheduler 反复 import 几千 DAG)
  • 任务间传数据麻烦(XCom 限大小;走 S3 / DB 才行)
  • 本地开发体验差(webserver + scheduler + executor 三件套)
  • DAG 语法重,3 个 task 也 50 行

Prefect

# prefect_flow.py
from prefect import flow, task
from datetime import datetime, timedelta

@task(retries=2, retry_delay_seconds=300)
def fetch_data(date: str) -> str:
    path = f's3://bucket/raw/{date}/'
    download_from_s3(path)
    return path

@task
def clean_data(raw_path: str) -> str:
    out = clean(raw_path)
    return out

@task
def load_to_ch(clean_path: str):
    load(clean_path)

@task
def train_model():
    train()

@flow(name='daily_etl', log_prints=True)
def daily_etl(date: str = None):
    date = date or datetime.utcnow().date().isoformat()
    raw = fetch_data(date)
    clean = clean_data(raw)
    load_to_ch(clean)
    train_model()


if __name__ == '__main__':
    daily_etl()

跑:

# 本地直接跑
python prefect_flow.py

# 部署到 Prefect Cloud / 自托管 server
prefect deploy prefect_flow.py:daily_etl --name nightly --cron '0 1 * * *'
prefect worker start --pool default

优点

  • 写 flow 像写普通 Python 函数,task 间传值像普通调用
  • 本地开发体验好(直接 run flow)
  • Prefect Cloud 免费层够小团队用
  • 动态 task / sub-flow 支持好

缺点

  • 生态不如 Airflow(operator / connector 少)
  • 2.0 vs 3.0 几次大改 breaking
  • 自托管 server 还需要装 Postgres + Redis 等

Dagster

# dagster_pipeline.py
from dagster import asset, AssetExecutionContext, Definitions, ScheduleDefinition, define_asset_job

@asset
def raw_data(context: AssetExecutionContext) -> str:
    ds = context.partition_key   # '2024-05-24'
    return download_from_s3(f's3://bucket/raw/{ds}/')

@asset(deps=[raw_data])
def cleaned_data(context, raw_data: str) -> str:
    return clean(raw_data)

@asset(deps=[cleaned_data])
def clickhouse_table(cleaned_data: str):
    load_to_ch(cleaned_data)

@asset(deps=[clickhouse_table])
def ml_model() -> str:
    return train()

daily_job = define_asset_job('daily', selection='*')
daily_schedule = ScheduleDefinition(job=daily_job, cron_schedule='0 1 * * *')

defs = Definitions(
    assets=[raw_data, cleaned_data, clickhouse_table, ml_model],
    schedules=[daily_schedule],
)

跑:

dagster dev
# http://localhost:3000 看 asset 图

优点

  • "asset-based" 思维:每个 task 产出一个数据资产 (table / file /
    model),blueprint = asset 图,自然 documentation
  • IDE 友好:type hint + 静态分析整套 pipeline
  • backfill / partition / lineage 一等公民
  • 内置数据测试 / asset checks
  • 本地开发 = dagster dev 一条命令

缺点

  • 最新(2019),生态比 Airflow 小
  • 学习曲线略陡(asset / op / job / sensor 几个概念)
  • 团队上手时间长

选型对比

场景 推荐
已经在用 Airflow + 团队熟 继续 Airflow
新项目 + 现代 Python + IDE 重度 Dagster
极简 + 个人项目 + Cloud SaaS Prefect
万千 DAG + 跨团队 + 老系统对接 Airflow(生态最广)
ML pipeline + 强 data lineage Dagster

我个人新项目首选 Dagster:asset 抽象比 task 更贴合"数据流"思维。

共同 best practice

不管选哪个:

1. 任务幂等

@task
def load_to_ch(path):
    # 用 INSERT OR REPLACE / DELETE then INSERT 让重跑不重复
    db.execute(f"DELETE FROM table WHERE date = '{date}'")
    db.execute(f"INSERT INTO table SELECT * FROM read_parquet('{path}')")

partition / date 当业务 key,重跑刷掉这个 partition 再插。

2. 数据 contract

定义"这个 task 输出什么 schema / row count / 范围":

@asset
def cleaned_data(...) -> Output:
    df = ...
    return Output(
        value=df,
        metadata={
            'rows': len(df),
            'columns': list(df.columns),
            'null_rate': df.isna().mean().to_dict(),
        }
    )

下游任务依赖 contract 而非 implicit 假设。下次改变 schema 显式报错。

3. 分层

按 dbt 风格:

sources/    # 原始数据
staging/    # 轻清洗 / 类型转换
intermediate/  # 业务逻辑组合
marts/      # 业务可用的 final table

每层独立可测试 + 故障 isolate。

4. 告警 + 监控

  • task 失败 → Slack / 邮件
  • task 跑太久 → SLA miss 告警
  • 数据指标异常(行数突变) → 检查告警
  • Prometheus 暴露 task duration / success rate

5. backfill

历史日期重跑:

# Airflow
airflow dags backfill -s 2024-01-01 -e 2024-05-01 daily_etl

# Dagster
dagster job backfill --job daily --partitions '2024-01-01..2024-05-01'

# Prefect
prefect deployment run 'daily_etl/nightly' --param date=2024-01-15

6. data lineage / catalog

知道"这个表是哪些 task 生成的,哪些下游用了它"。
Dagster 内置;Airflow 接 OpenLineage / Marquez;Prefect 类似。

整套部署(举例 Dagster + K8s)

- dagster-webserver (UI)
- dagster-daemon (scheduler + sensor)
- code locations (你的 pipeline Python)
- PostgreSQL (metadata)
- S3 / GCS (intermediate storage)
- K8s(task 在 pod 跑)

helm chart 一键:

helm install dagster dagster/dagster --set ...

效果

我们的项目用 Dagster:

  • 100+ assets 自动算依赖图
  • 单次 backfill 半年数据:UI 选时间范围 + run,asset 自动并行
  • 数据 schema 改动 → asset check fail → CI 拦下 → 改下游 → 再合
  • 新成员看 asset 图 30 秒理解整套 pipeline

踩过的坑

Airflow 类

  1. DAG 太多 scheduler 慢:每 30s 扫所有 DAG 文件 import。
    把 DAG 分文件 + 用 dynamic DAG generation 减少 scheduler IO。

  2. XCom 大数据:传几 MB 会爆 backend DB。task 间传 path(S3 key)
    而非数据本身。

Prefect 类

  1. 2.x → 3.x breaking:API 大改。生产 pin 大版本 + 升级提前测。

Dagster 类

  1. asset partition 配错:同 asset 一个 daily 一个 hourly partition
    会冲突。统一一 asset 一种 partition scheme。

  2. monorepo 多 code location:每个 location 独立 Python 进程,
    依赖隔离好但启动慢。开发期合并;生产分。

精确评价 共 0 人评价
可复现性
可复现 · 0 不可复现 · 0
文风
文风流畅 · 0 文风晦涩 · 0
立场
支持 · 0 反对 · 0

登录后即可对本帖作出评价。

评论区 0 条 · 所有人可在此交流

登录后参与评论。

还没有评论,来说两句。