Celery + Redis 跑后台任务的最小可工作配置

Web 请求里同步发邮件 / 调外部 API / 跑重计算就是把响应时间往沟里推。
异步丢任务队列,Web 立刻返回,后台 worker 慢慢干。Celery + Redis 是
Python 生态最常用的组合。

依赖

uv add 'celery[redis]'

[redis] extra 装上 redis-py,作为 broker + result backend。

项目结构

myapp/
├── celery_app.py
├── tasks.py
└── ... (Django/Flask/FastAPI)

celery_app.py

from celery import Celery

app = Celery(
    'myapp',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/1',
    include=['myapp.tasks'],
)

app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='Asia/Shanghai',
    enable_utc=True,
    task_acks_late=True,           # worker 处理完才 ack;崩了任务会被重新派发
    task_reject_on_worker_lost=True,
    worker_prefetch_multiplier=1,  # 长任务必须设 1,否则一个 worker 卡住所有 prefetch
    task_time_limit=300,           # 硬超时 5 分钟(SIGKILL)
    task_soft_time_limit=270,      # 软超时(抛 SoftTimeLimitExceeded)
)

prefetch_multiplier=1 是长任务必须改的——默认 4,意思是每个 worker
预取 4 个任务,第一个任务慢的话另外 3 个被卡住等。

tasks.py

from celery import shared_task
from celery.exceptions import SoftTimeLimitExceeded
import smtplib

@shared_task(
    bind=True,
    autoretry_for=(ConnectionError, smtplib.SMTPException),
    retry_backoff=True,        # 1s, 2s, 4s, 8s ...
    retry_backoff_max=600,
    max_retries=5,
)
def send_welcome_email(self, user_id, email):
    try:
        # ... 真的发邮件 ...
        with smtplib.SMTP('smtp.example.com', 587) as s:
            s.login(...)
            s.sendmail('from@', email, 'Welcome!')
    except SoftTimeLimitExceeded:
        # 清理资源
        raise

@shared_task
def heavy_calc(rows):
    return sum(complex_fn(r) for r in rows)

autoretry_for + retry_backoff 是失败自动指数退避重试的现代写法。
比手写 self.retry(countdown=...) 简洁。

Web 端触发

# Django / Flask / FastAPI 都一样
from myapp.tasks import send_welcome_email

def signup_view(request):
    user = User.objects.create(...)
    send_welcome_email.delay(user.id, user.email)
    return Response({'ok': True})

# .delay() 是 .apply_async() 的简写
# 想要更细控制:
send_welcome_email.apply_async(
    args=(user.id, user.email),
    countdown=60,           # 60 秒后再执行
    queue='email',          # 路由到指定队列
    expires=3600,           # 1 小时后没执行就放弃
)

启动 worker

uv run celery -A myapp.celery_app worker -l info -c 4
# -c 4: 并发数;CPU 密集任务建议 = 物理核心数;IO 密集可以更高

多队列分开:

# 一个 worker 只跑邮件队列
uv run celery -A myapp.celery_app worker -l info -Q email -c 2

# 另一个 worker 跑默认 + 计算队列
uv run celery -A myapp.celery_app worker -l info -Q celery,heavy -c 8

周期任务(Celery Beat)

# celery_app.py
app.conf.beat_schedule = {
    'cleanup-every-hour': {
        'task': 'myapp.tasks.cleanup_temp_files',
        'schedule': 3600.0,    # 秒;或 crontab(hour=3, minute=0)
    },
    'send-digest-monday-9am': {
        'task': 'myapp.tasks.send_weekly_digest',
        'schedule': crontab(hour=9, minute=0, day_of_week='mon'),
    },
}

启动 beat(单独进程,单实例):

uv run celery -A myapp.celery_app beat -l info

beat 是单实例的。多机部署只起一个 beat 进程,否则任务被触发多次。
redbeat 把 schedule 存 Redis 解决高可用。

监控:Flower

uv add flower
uv run celery -A myapp.celery_app flower --port=5555
# 浏览器打开 http://localhost:5555

可以看到所有 worker、队列长度、任务历史、失败重试。

生产部署清单

  1. broker / backend 用 不同 Redis DB(broker 流量大、result 流量小)
  2. 长时间不取的 result 自动过期:result_expires=3600
  3. worker 用 systemd unit 管,开 Restart=on-failure
  4. 监控队列长度(Prometheus celery exporter):超过阈值就扩 worker
  5. 别在 task 里用 Django ORM 然后忘记 connection.close()
    长跑 worker 会泄连接

systemd 单元

# /etc/systemd/system/celery-worker.service
[Unit]
Description=Celery worker
After=network.target redis.service

[Service]
Type=simple
User=app
Group=app
WorkingDirectory=/srv/myapp
EnvironmentFile=/srv/myapp/.env
ExecStart=/srv/myapp/.venv/bin/celery -A myapp.celery_app worker -l info -c 4
Restart=on-failure
RestartSec=5s
KillMode=mixed

[Install]
WantedBy=multi-user.target

踩过的坑

  • task 改了签名(加 / 删参数)后还有老格式的任务在队列里,新 worker 处理时
    会爆 TypeError。新增字段加默认值,删字段做兼容 wrapper。
  • task_serializer='pickle' —— 不要!pickle 允许执行任意代码,broker 一被攻破
    全完。一律 JSON。
  • worker 跑 Django ORM 任务时,每个任务后忘记 transaction.commit()
    数据库连接长期持有,下一个任务可能看到老数据。Django 4+ 默认 ATOMIC_REQUESTS
  • 显式 connection.close() 能解决。
  • Result backend 用 Redis 时,每个 task 结果默认占一个 key 直到 expire。
    几十万任务的 backlog 能把 Redis 撑爆。要么 task_ignore_result=True
    要么 result_expires 设短。
精确评价 共 0 人评价
可复现性
可复现 · 0 不可复现 · 0
文风
文风流畅 · 0 文风晦涩 · 0
立场
支持 · 0 反对 · 0

登录后即可对本帖作出评价。

评论区 0 条 · 所有人可在此交流

登录后参与评论。

还没有评论,来说两句。