起因
数据 pipeline 需要:
- 定时 / 触发跑(cron / event)
- 任务依赖 DAG
- 重试 / 失败告警
- UI 看历史 / 调试
Airflow 是 2014 起的事实标准。Prefect(2018+)是现代挑战者。
最近从 Airflow 迁了一半 pipeline 到 Prefect,下面对比。
Airflow
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def extract():
print('extract')
def transform():
print('transform')
def load():
print('load')
with DAG('etl', start_date=datetime(2025, 1, 1), schedule='@daily') as dag:
e = PythonOperator(task_id='extract', python_callable=extract)
t = PythonOperator(task_id='transform', python_callable=transform)
l = PythonOperator(task_id='load', python_callable=load)
e >> t >> l
DAG 是静态文件,scheduler 周期扫描 + render。
优势
- 业界事实标准(10 年沉淀)
- 巨大 operator 生态(数百个 connector)
- 大规模成熟(Airbnb 几万 DAG)
- Kubernetes / Celery executor 久经考验
劣势
- DAG 必须静态(运行时 branch / 动态 task 难)
- 调试本地难(要起 scheduler / webserver / DB / executor)
- Python function 跟 task framework 耦合(PythonOperator wrapper)
- 升级痛苦(2.x vs 1.x 大变;插件兼容差)
- 巨型部署(PG + scheduler + worker + webserver 几组件)
Prefect 2/3
from prefect import flow, task
@task(retries=3)
def extract():
return [1, 2, 3]
@task
def transform(x):
return x * 2
@task
def load(items):
print(f'loaded {items}')
@flow
def etl():
data = extract()
transformed = [transform(x) for x in data]
load(transformed)
if __name__ == '__main__':
etl()
@flow / @task 装饰器即编排。普通 Python 函数 + decorator。
优势
- DAG 动态生成(运行时根据数据决定 task)
- 本地 dev 简单:
python etl.py跑 - 装饰器轻,function 仍是 function
- API + UI 现代
- Prefect Cloud free tier 个人 OK
劣势
- 生态比 Airflow 小(operator / connector 少)
- 还在快速演化(Prefect 3 vs 2 部分 API 变)
- 大规模 production 沉淀少
调度模型对比
Airflow:
Scheduler 每分钟扫 DAG → 决定哪 task 该跑 → 推到 worker
集中式,scheduler 是瓶颈。
Prefect:
Flow run 由 trigger(schedule / API / event)启动
Worker pull pending run 跑
更松散,worker 任意机器都能 pull。
动态 task
# Airflow(静态 → 用 mapped task 模拟动态)
@task
def process(item): ...
@task
def get_items(): return [1, 2, 3, 4]
@dag
def my_dag():
items = get_items()
process.expand(item=items)
Airflow 2.3+ 加了 dynamic task mapping,写起来扭。
# Prefect:原生
@flow
def my_flow():
items = get_items()
for item in items: # 普通 Python
process(item)
Prefect 直接用 Python loop,运行时决定 task 数。
subflow
Prefect 鼓励 flow 嵌套:
@flow
def daily_etl():
for region in ['us', 'eu', 'asia']:
region_flow(region) # subflow 单独 run,独立 retry
@flow
def region_flow(region):
extract(region)
transform(region)
load(region)
UI 里嵌套展开。复杂 pipeline 模块化。
部署模型
Airflow:
- PG(metadata)
- Redis / Celery(queue)
- Webserver
- Scheduler
- N × Worker
- 自动化 deploy DAG file → /dags 文件夹
K8s 部署 6+ 容器。
Prefect:
- Server(API + UI)(Prefect Cloud 替代)
- Worker(pull flow run)
- Code 存哪都行(git / S3 / docker)
简单很多。可以全 serverless(Cloud + ECS / Lambda worker)。
本地开发
Airflow 本地:
docker compose up # airflow-init + scheduler + webserver + worker
# 改 DAG → 等 60s scheduler 扫
Prefect 本地:
prefect server start # 起 server
python my_flow.py # 直接跑
iter loop 快 5-10x。
与 Dagster 对比
Dagster 是第三玩家,asset-based 编排(pipeline 是 "asset 之间的 graph")。
更声明式,更适合数据 platform 团队。
| Airflow | Prefect | Dagster | |
|---|---|---|---|
| 模型 | task DAG | flow + task | asset graph |
| 上手 | 中 | 低 | 中高 |
| 动态 | 弱 | 强 | 中 |
| 生态 | 最大 | 中 | 中 |
| 适合 | 大企业 / 复杂 ETL | 中小项目 / Pythonic | 数据 platform |
迁移 case
我们 30 个 Airflow DAG,三类:
- 简单 ETL(SQL → SQL):保留 Airflow(operator 现成,省事)
- 复杂 Python pipeline(动态 logic):迁 Prefect
- ML 训练 pipeline:迁 Prefect(model artifact + dynamic 强)
混合架构:Airflow 跑标准 ETL,Prefect 跑 dynamic / Pythonic 流程。
与 dbt 集成
两者都能跑 dbt:
# Prefect
from prefect_dbt.cli.commands import DbtCoreOperation
@flow
def dbt_flow():
DbtCoreOperation(commands=['dbt run --select tag:hourly']).run()
# Airflow
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator
# 或者 BashOperator('dbt run ...')
差不多。Prefect block 概念让 dbt cli 复用简单。
API trigger
# Prefect:HTTP trigger flow run
from prefect.client.orchestration import get_client
async with get_client() as client:
await client.create_flow_run_from_deployment(
deployment_id='abc',
parameters={'date': '2025-01-01'},
)
外部系统(webhook / event)触发简单。Airflow 也有 REST API 但繁琐。
监控 / 告警
两者都有 UI + email / Slack 通知 + Prometheus metrics。
Prefect Cloud 免费 tier 告警直接配。
Airflow self-host 要自己拼 alertmanager。
选型决策
- 大企业 + 复杂 ETL + 团队熟 Airflow → Airflow
- 新项目 + Python pipeline → Prefect
- 数据 platform 团队 + asset 思维 → Dagster
- 简单 cron + 几个 job → 别上编排框架,systemd timer + GitHub
Actions 够
踩过的坑(迁移)
-
Airflow XCom → Prefect return value:XCom 传数据有限(< 48KB
metadata)。Prefect 直接 return Python object,但 cluster 间要序
列化 → 用 storage block。 -
schedule timezone:Airflow
start_dateUTC;Prefect 默认 UTC
但 cron string 解释要明确。 -
retries 默认 0:忘配 → 失败不重试。生产 retries=3 + backoff
默认。 -
flow concurrency limit:同 flow 多 run 并行 → DB 锁。
concurrency_limit控制。 -
Prefect 3 升级:从 2.x 迁 3.x 有 breaking change。读 migration
guide。
登录后参与评论。