Web 请求里同步发邮件 / 调外部 API / 跑重计算就是把响应时间往沟里推。
异步丢任务队列,Web 立刻返回,后台 worker 慢慢干。Celery + Redis 是
Python 生态最常用的组合。
依赖
uv add 'celery[redis]'
[redis] extra 装上 redis-py,作为 broker + result backend。
项目结构
myapp/
├── celery_app.py
├── tasks.py
└── ... (Django/Flask/FastAPI)
celery_app.py
from celery import Celery
app = Celery(
'myapp',
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/1',
include=['myapp.tasks'],
)
app.conf.update(
task_serializer='json',
accept_content=['json'],
result_serializer='json',
timezone='Asia/Shanghai',
enable_utc=True,
task_acks_late=True, # worker 处理完才 ack;崩了任务会被重新派发
task_reject_on_worker_lost=True,
worker_prefetch_multiplier=1, # 长任务必须设 1,否则一个 worker 卡住所有 prefetch
task_time_limit=300, # 硬超时 5 分钟(SIGKILL)
task_soft_time_limit=270, # 软超时(抛 SoftTimeLimitExceeded)
)
prefetch_multiplier=1 是长任务必须改的——默认 4,意思是每个 worker
预取 4 个任务,第一个任务慢的话另外 3 个被卡住等。
tasks.py
from celery import shared_task
from celery.exceptions import SoftTimeLimitExceeded
import smtplib
@shared_task(
bind=True,
autoretry_for=(ConnectionError, smtplib.SMTPException),
retry_backoff=True, # 1s, 2s, 4s, 8s ...
retry_backoff_max=600,
max_retries=5,
)
def send_welcome_email(self, user_id, email):
try:
# ... 真的发邮件 ...
with smtplib.SMTP('smtp.example.com', 587) as s:
s.login(...)
s.sendmail('from@', email, 'Welcome!')
except SoftTimeLimitExceeded:
# 清理资源
raise
@shared_task
def heavy_calc(rows):
return sum(complex_fn(r) for r in rows)
autoretry_for + retry_backoff 是失败自动指数退避重试的现代写法。
比手写 self.retry(countdown=...) 简洁。
Web 端触发
# Django / Flask / FastAPI 都一样
from myapp.tasks import send_welcome_email
def signup_view(request):
user = User.objects.create(...)
send_welcome_email.delay(user.id, user.email)
return Response({'ok': True})
# .delay() 是 .apply_async() 的简写
# 想要更细控制:
send_welcome_email.apply_async(
args=(user.id, user.email),
countdown=60, # 60 秒后再执行
queue='email', # 路由到指定队列
expires=3600, # 1 小时后没执行就放弃
)
启动 worker
uv run celery -A myapp.celery_app worker -l info -c 4
# -c 4: 并发数;CPU 密集任务建议 = 物理核心数;IO 密集可以更高
多队列分开:
# 一个 worker 只跑邮件队列
uv run celery -A myapp.celery_app worker -l info -Q email -c 2
# 另一个 worker 跑默认 + 计算队列
uv run celery -A myapp.celery_app worker -l info -Q celery,heavy -c 8
周期任务(Celery Beat)
# celery_app.py
app.conf.beat_schedule = {
'cleanup-every-hour': {
'task': 'myapp.tasks.cleanup_temp_files',
'schedule': 3600.0, # 秒;或 crontab(hour=3, minute=0)
},
'send-digest-monday-9am': {
'task': 'myapp.tasks.send_weekly_digest',
'schedule': crontab(hour=9, minute=0, day_of_week='mon'),
},
}
启动 beat(单独进程,单实例):
uv run celery -A myapp.celery_app beat -l info
beat 是单实例的。多机部署只起一个 beat 进程,否则任务被触发多次。
用 redbeat 把 schedule 存 Redis 解决高可用。
监控:Flower
uv add flower
uv run celery -A myapp.celery_app flower --port=5555
# 浏览器打开 http://localhost:5555
可以看到所有 worker、队列长度、任务历史、失败重试。
生产部署清单
- broker / backend 用 不同 Redis DB(broker 流量大、result 流量小)
- 长时间不取的 result 自动过期:
result_expires=3600 - worker 用 systemd unit 管,开
Restart=on-failure - 监控队列长度(Prometheus celery exporter):超过阈值就扩 worker
- 别在 task 里用 Django ORM 然后忘记
connection.close():
长跑 worker 会泄连接
systemd 单元
# /etc/systemd/system/celery-worker.service
[Unit]
Description=Celery worker
After=network.target redis.service
[Service]
Type=simple
User=app
Group=app
WorkingDirectory=/srv/myapp
EnvironmentFile=/srv/myapp/.env
ExecStart=/srv/myapp/.venv/bin/celery -A myapp.celery_app worker -l info -c 4
Restart=on-failure
RestartSec=5s
KillMode=mixed
[Install]
WantedBy=multi-user.target
踩过的坑
- task 改了签名(加 / 删参数)后还有老格式的任务在队列里,新 worker 处理时
会爆 TypeError。新增字段加默认值,删字段做兼容 wrapper。 task_serializer='pickle'—— 不要!pickle 允许执行任意代码,broker 一被攻破
全完。一律 JSON。- worker 跑 Django ORM 任务时,每个任务后忘记
transaction.commit(),
数据库连接长期持有,下一个任务可能看到老数据。Django 4+ 默认 ATOMIC_REQUESTS - 显式
connection.close()能解决。 - Result backend 用 Redis 时,每个 task 结果默认占一个 key 直到 expire。
几十万任务的 backlog 能把 Redis 撑爆。要么task_ignore_result=True,
要么result_expires设短。
登录后参与评论。