起因
应用要异步处理 / 解耦 / 削峰,需要消息中间件。
候选:
- Kafka:日志型,高吞吐,持久化
- RabbitMQ:经典 broker,AMQP,灵活路由
- NATS / NATS JetStream:现代轻量,Go 写
- SQS / Pub/Sub:云托管
下面对比关键差异 + 各自适合的场景。
Kafka
[Producer] → topic (partition × N) → [Consumer Group]
↓ 持久化到 disk
(broker × N,replicated)
- 消息是 append-only log,consumer 维护 offset
- partition 是并行度(一个 partition 同时一个 consumer instance)
- replication 跨 broker → HA
- 默认保留消息几天到几周(retention 配置)
用法
# Producer (confluent-kafka)
from confluent_kafka import Producer
p = Producer({'bootstrap.servers': 'kafka:9092'})
p.produce('orders', key='user-1', value='{"action": "buy"}')
p.flush()
# Consumer
from confluent_kafka import Consumer
c = Consumer({
'bootstrap.servers': 'kafka:9092',
'group.id': 'order-processors',
'auto.offset.reset': 'earliest',
})
c.subscribe(['orders'])
while True:
msg = c.poll(1.0)
if msg:
print(msg.value())
c.commit(msg)
优势
- 极高吞吐(百万 msg/s/broker)
- 重放历史(offset 任意 reset)
- 分布式天生 HA
- 流处理生态强(Kafka Streams / Flink / ksqlDB)
劣势
- 操作复杂(broker + zookeeper / KRaft + partition 设计)
- 资源消耗大(broker JVM 4 GB+)
- 不擅长 priority / per-message TTL / 复杂路由
- 消费速度受 partition 数限制
RabbitMQ
[Producer] → [Exchange] → routing → [Queue × N] → [Consumer]
- 经典 broker 模型(push-based)
- AMQP 协议,丰富 routing(direct / fanout / topic / headers)
- 消息消费后默认删除(除非 stream queue)
- 单 broker 几万 msg/s
用法
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('rabbitmq'))
channel = connection.channel()
channel.queue_declare(queue='orders', durable=True)
# Producer
channel.basic_publish(exchange='', routing_key='orders', body='hello',
properties=pika.BasicProperties(delivery_mode=2)) # 持久化
# Consumer
def callback(ch, method, properties, body):
print(body)
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(queue='orders', on_message_callback=callback)
channel.start_consuming()
优势
- 灵活路由(topic exchange
order.*.created) - per-message TTL / priority
- 死信队列原生
- 部署简单(单 Erlang 进程)
- 消费 ack/nack / retry 模型完善
劣势
- 吞吐 < Kafka
- 不擅长 replay(消费后消息没了)
- HA cluster 配置棘手(mirror / quorum queue)
- 老版本运维痛点多
NATS / JetStream
[Publisher] → subject → [Subscriber]
(JetStream 模式可持久化)
- Go 写,极轻量(单 binary 30 MB)
- subject-based(类似 Kafka topic 但更灵活)
- core NATS:fire-and-forget(极快)
- JetStream:持久化 + replication(Kafka 类似 semantics)
用法
import asyncio
import nats
async def main():
nc = await nats.connect('nats://localhost:4222')
js = nc.jetstream()
await js.add_stream(name='ORDERS', subjects=['orders.>'])
# publish
await js.publish('orders.created', b'hello')
# consume
sub = await js.subscribe('orders.>', durable='processor')
async for msg in sub.messages:
print(msg.data)
await msg.ack()
asyncio.run(main())
优势
- 极轻量 + 易部署(单 binary,跨平台)
- 性能好(百万 msg/s core,几十万 jetstream)
- subject wildcard 路由
- 现代设计(gRPC 风格)
劣势
- 生态比 Kafka / RMQ 小
- 工具链 / 监控比较新
- 跨 cluster 复制弱(不像 Kafka MirrorMaker 成熟)
横向对比
| Kafka | RabbitMQ | NATS JetStream | SQS | |
|---|---|---|---|---|
| 吞吐 | 极高 | 中 | 高 | 中 |
| 持久化 | 默认 | 可选 | 可选 | 默认 |
| 路由 | partition+key | 丰富 (AMQP) | subject wildcard | queue |
| 顺序 | partition 内 | queue 内 | stream 内 | FIFO queue 内 |
| Replay | ✅ | 弱 | ✅ | ❌ |
| HA | 强(replication) | 中(cluster) | 强 | 托管 |
| 部署 | 复杂 | 简单 | 极简 | 0 |
| 资源 | 大 | 中 | 小 | 0 |
| 适合 | 日志 / 流 / 大规模 | 任务队列 / RPC | 现代云 / 服务网格 | 简单异步 |
推荐
- 任务队列(celery 类) → RabbitMQ 或 Redis Streams
- 日志聚合 / event sourcing → Kafka
- 微服务消息总线 + 现代云原生 → NATS
- 简单异步 + 不想运维 → SQS / Pub/Sub
- 数据 pipeline ETL → Kafka
- 极致吞吐(百万 QPS) → Kafka
我的实际选择
中小项目(< 100k msg/s):
- Celery + Redis:Django 任务队列首选,0 引入
- NATS JetStream:微服务通信,轻量
大项目 / 数据 pipeline:
- Kafka:必备
真实 case 1:Celery → Kafka 迁移
老 Celery + Redis 跑 task queue。规模大了:
- 任务 50k/s 起 Redis 撑不住
- 失败任务重放难
- 监控弱
改 Kafka:
- 任务塞 topic
- consumer group 处理
- 失败 → 进 retry topic(带 delay)→ DLQ
- 重放 / 重处理一行命令
挑战:
- partition 设计(保证某 user 的 task 顺序)
- consumer rebalance 期间 short downtime
但 throughput + observability 大改善。
真实 case 2:RabbitMQ 路由
某客户系统:"订单创建" event 要通知:
- 库存(扣库存)
- 财务(开发票)
- 物流(准备发货)
- analytics(统计)
RabbitMQ topic exchange:
# producer
channel.basic_publish(exchange='order_events',
routing_key='order.created.us', body=...)
# 4 consumer 各自 bind
channel.queue_bind('inventory_queue', 'order_events', 'order.created.*')
channel.queue_bind('finance_queue', 'order_events', 'order.created.*')
channel.queue_bind('logistics_queue', 'order_events', 'order.created.us')
channel.queue_bind('analytics_queue', 'order_events', 'order.*')
灵活订阅,新 consumer 加入无需 producer 改动。
Kafka 也能做但要 topic 设计多副本 + 自管 routing。
与 outbox pattern 配
应用先写 DB + outbox 表 → background job 读 outbox → 发到 MQ。
保证 DB 写成功 + MQ 发出一致(事务安全)。
debezium 等 CDC 工具自动监听 PG WAL → Kafka,省手写。
不要为了用而用
很多场景:
- 单 DB + 简单异步:cron job + DB queue 表
- 微服务通信:HTTP / gRPC(不必每次 async)
- 通知:直接 webhook
MQ 增加运维 + 复杂度。确认真的需要 async / decouple / 高吞吐 / replay
再上。
踩过的坑
-
Kafka partition 数选错:partition 是并行度上限。设 4,consumer
增到 8 后 4 个 idle。partition 加容易减难,初始就设大些(如 32+)。 -
RabbitMQ 队列 unbounded:consumer 慢 + producer 快 → 队列长几
亿条 → broker OOM。设x-max-length或x-message-ttl限。 -
NATS JetStream stream 配错:retention 设短 / max bytes 小 →
消息被驱逐。production 前 stress test。 -
at-least-once 重复处理:所有 MQ 都是 at-least-once。consumer
必须幂等(用 message id 去重)。 -
monitoring 缺:MQ 死了应用一段时间没察觉。最少监控 lag /
queue size / consumer alive。
登录后参与评论。