Prefect vs Airflow:现代数据流编排选哪个

起因

数据 pipeline 需要:

  • 定时 / 触发跑(cron / event)
  • 任务依赖 DAG
  • 重试 / 失败告警
  • UI 看历史 / 调试

Airflow 是 2014 起的事实标准。Prefect(2018+)是现代挑战者。
最近从 Airflow 迁了一半 pipeline 到 Prefect,下面对比。

Airflow

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def extract():
    print('extract')

def transform():
    print('transform')

def load():
    print('load')

with DAG('etl', start_date=datetime(2025, 1, 1), schedule='@daily') as dag:
    e = PythonOperator(task_id='extract', python_callable=extract)
    t = PythonOperator(task_id='transform', python_callable=transform)
    l = PythonOperator(task_id='load', python_callable=load)
    e >> t >> l

DAG 是静态文件,scheduler 周期扫描 + render。

优势

  • 业界事实标准(10 年沉淀)
  • 巨大 operator 生态(数百个 connector)
  • 大规模成熟(Airbnb 几万 DAG)
  • Kubernetes / Celery executor 久经考验

劣势

  • DAG 必须静态(运行时 branch / 动态 task 难)
  • 调试本地难(要起 scheduler / webserver / DB / executor)
  • Python function 跟 task framework 耦合(PythonOperator wrapper)
  • 升级痛苦(2.x vs 1.x 大变;插件兼容差)
  • 巨型部署(PG + scheduler + worker + webserver 几组件)

Prefect 2/3

from prefect import flow, task

@task(retries=3)
def extract():
    return [1, 2, 3]

@task
def transform(x):
    return x * 2

@task
def load(items):
    print(f'loaded {items}')

@flow
def etl():
    data = extract()
    transformed = [transform(x) for x in data]
    load(transformed)

if __name__ == '__main__':
    etl()

@flow / @task 装饰器即编排。普通 Python 函数 + decorator。

优势

  • DAG 动态生成(运行时根据数据决定 task)
  • 本地 dev 简单:python etl.py
  • 装饰器轻,function 仍是 function
  • API + UI 现代
  • Prefect Cloud free tier 个人 OK

劣势

  • 生态比 Airflow 小(operator / connector 少)
  • 还在快速演化(Prefect 3 vs 2 部分 API 变)
  • 大规模 production 沉淀少

调度模型对比

Airflow

Scheduler 每分钟扫 DAG → 决定哪 task 该跑 → 推到 worker

集中式,scheduler 是瓶颈。

Prefect

Flow run 由 trigger(schedule / API / event)启动
Worker pull pending run 跑

更松散,worker 任意机器都能 pull。

动态 task

# Airflow(静态 → 用 mapped task 模拟动态)
@task
def process(item): ...

@task
def get_items(): return [1, 2, 3, 4]

@dag
def my_dag():
    items = get_items()
    process.expand(item=items)

Airflow 2.3+ 加了 dynamic task mapping,写起来扭。

# Prefect:原生
@flow
def my_flow():
    items = get_items()
    for item in items:        # 普通 Python
        process(item)

Prefect 直接用 Python loop,运行时决定 task 数。

subflow

Prefect 鼓励 flow 嵌套:

@flow
def daily_etl():
    for region in ['us', 'eu', 'asia']:
        region_flow(region)        # subflow 单独 run,独立 retry

@flow
def region_flow(region):
    extract(region)
    transform(region)
    load(region)

UI 里嵌套展开。复杂 pipeline 模块化。

部署模型

Airflow

- PG(metadata)
- Redis / Celery(queue)
- Webserver
- Scheduler
- N × Worker
- 自动化 deploy DAG file → /dags 文件夹

K8s 部署 6+ 容器。

Prefect

- Server(API + UI)(Prefect Cloud 替代)
- Worker(pull flow run)
- Code 存哪都行(git / S3 / docker)

简单很多。可以全 serverless(Cloud + ECS / Lambda worker)。

本地开发

Airflow 本地:

docker compose up   # airflow-init + scheduler + webserver + worker
# 改 DAG → 等 60s scheduler 扫

Prefect 本地:

prefect server start    # 起 server
python my_flow.py       # 直接跑

iter loop 快 5-10x。

与 Dagster 对比

Dagster 是第三玩家,asset-based 编排(pipeline 是 "asset 之间的 graph")。
更声明式,更适合数据 platform 团队。

Airflow Prefect Dagster
模型 task DAG flow + task asset graph
上手 中高
动态
生态 最大
适合 大企业 / 复杂 ETL 中小项目 / Pythonic 数据 platform

迁移 case

我们 30 个 Airflow DAG,三类:

  1. 简单 ETL(SQL → SQL):保留 Airflow(operator 现成,省事)
  2. 复杂 Python pipeline(动态 logic):迁 Prefect
  3. ML 训练 pipeline:迁 Prefect(model artifact + dynamic 强)

混合架构:Airflow 跑标准 ETL,Prefect 跑 dynamic / Pythonic 流程。

与 dbt 集成

两者都能跑 dbt:

# Prefect
from prefect_dbt.cli.commands import DbtCoreOperation

@flow
def dbt_flow():
    DbtCoreOperation(commands=['dbt run --select tag:hourly']).run()
# Airflow
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator
# 或者 BashOperator('dbt run ...')

差不多。Prefect block 概念让 dbt cli 复用简单。

API trigger

# Prefect:HTTP trigger flow run
from prefect.client.orchestration import get_client

async with get_client() as client:
    await client.create_flow_run_from_deployment(
        deployment_id='abc',
        parameters={'date': '2025-01-01'},
    )

外部系统(webhook / event)触发简单。Airflow 也有 REST API 但繁琐。

监控 / 告警

两者都有 UI + email / Slack 通知 + Prometheus metrics。
Prefect Cloud 免费 tier 告警直接配。
Airflow self-host 要自己拼 alertmanager。

选型决策

  • 大企业 + 复杂 ETL + 团队熟 Airflow → Airflow
  • 新项目 + Python pipeline → Prefect
  • 数据 platform 团队 + asset 思维 → Dagster
  • 简单 cron + 几个 job → 别上编排框架,systemd timer + GitHub
    Actions 够

踩过的坑(迁移)

  1. Airflow XCom → Prefect return value:XCom 传数据有限(< 48KB
    metadata)。Prefect 直接 return Python object,但 cluster 间要序
    列化 → 用 storage block。

  2. schedule timezone:Airflow start_date UTC;Prefect 默认 UTC
    但 cron string 解释要明确。

  3. retries 默认 0:忘配 → 失败不重试。生产 retries=3 + backoff
    默认。

  4. flow concurrency limit:同 flow 多 run 并行 → DB 锁。
    concurrency_limit 控制。

  5. Prefect 3 升级:从 2.x 迁 3.x 有 breaking change。读 migration
    guide。

精确评价 共 0 人评价
可复现性
可复现 · 0 不可复现 · 0
文风
文风流畅 · 0 文风晦涩 · 0
立场
支持 · 0 反对 · 0

登录后即可对本帖作出评价。

评论区 0 条 · 所有人可在此交流

登录后参与评论。

还没有评论,来说两句。