消息队列两个最主流:RabbitMQ(AMQP)和 Kafka(log-based)。设计哲学
完全不同,选错会很疼。
一句话区分
- RabbitMQ:消息任务,消费后即删,适合"事件触发处理"
- Kafka:消息日志,消费后保留,适合"事件流回放 / 多消费者"
详细对比
| 维度 | RabbitMQ | Kafka |
|---|---|---|
| 数据模型 | queue + exchange | partitioned log |
| 消息 retention | 消费后删 | 时间 / 大小(默认 7 天) |
| 顺序保证 | 单 queue | 单 partition |
| 多消费者 | competing consumers(争抢) | consumer group(offset 独立) |
| 持久化 | 可选 | 总是 |
| 单 broker 吞吐 | 1k-50k msg/s | 100k-1M msg/s |
| 重试 / 死信 | 内置 DLX | 自己处理 |
| 路由 | 灵活(topic / fanout / direct / headers) | 简单(topic + partition key) |
| 协议 | AMQP / STOMP / MQTT | 自家二进制 |
| 复杂度 | 中 | 高 |
何时选 RabbitMQ
- "用户注册了,发欢迎邮件"——单次任务,一个消费者处理
- 任务队列(Celery 用的就是 RabbitMQ / Redis)
- RPC over message
- 中等吞吐(< 50k msg/s)
何时选 Kafka
- "用户做了 N 个动作,多个下游各自处理"——同一消息多消费者
- 事件流 / 实时分析(订单流 → 多个团队订阅)
- 日志聚合管道(应用日志 → Kafka → Logstash / ELK)
- 大吞吐(> 100k msg/s)
RabbitMQ 最小部署
docker run -d --hostname rabbit \
--name rabbit -p 5672:5672 -p 15672:15672 \
rabbitmq:4-management
Web 管理界面 http://localhost:15672 默认 guest/guest。
发 / 收(Python pika)
uv add pika
# producer.py
import pika
conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
ch = conn.channel()
ch.queue_declare(queue='tasks', durable=True)
for i in range(100):
ch.basic_publish(
exchange='',
routing_key='tasks',
body=f'task {i}'.encode(),
properties=pika.BasicProperties(delivery_mode=2), # persistent
)
conn.close()
# consumer.py
import pika
def callback(ch, method, props, body):
print(f'got: {body!r}')
# 假装处理
ch.basic_ack(delivery_tag=method.delivery_tag)
conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
ch = conn.channel()
ch.queue_declare(queue='tasks', durable=True)
ch.basic_qos(prefetch_count=1) # 一次只抓一条,处理完才下一条
ch.basic_consume(queue='tasks', on_message_callback=callback)
ch.start_consuming()
启动多个 consumer:消息在它们之间分配(competing consumers)。
死信队列(DLX)
任务失败重试几次后丢进 DLQ 人工检查:
ch.queue_declare(
queue='tasks',
durable=True,
arguments={
'x-dead-letter-exchange': '',
'x-dead-letter-routing-key': 'tasks-dlq',
'x-message-ttl': 30000, # 30 秒处理超时
'x-max-length': 100000, # 队列上限
},
)
ch.queue_declare(queue='tasks-dlq', durable=True)
Kafka 最小部署
Kafka 4.x KRaft 模式(不再需要 ZooKeeper):
# docker-compose.yml
services:
kafka:
image: apache/kafka:3.8.0
ports: [ "9092:9092", "9093:9093" ]
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
docker compose up -d
发 / 收(Python kafka-python)
uv add kafka-python
# producer.py
from kafka import KafkaProducer
import json
p = KafkaProducer(
bootstrap_servers='localhost:9092',
value_serializer=lambda v: json.dumps(v).encode(),
acks='all', # 等所有副本确认(最安全)
)
for i in range(100):
p.send('events', value={'id': i, 'type': 'click'})
p.flush()
# consumer.py
from kafka import KafkaConsumer
c = KafkaConsumer(
'events',
bootstrap_servers='localhost:9092',
group_id='analytics',
auto_offset_reset='earliest', # 从头开始(如果是新 group)
value_deserializer=lambda v: json.loads(v.decode()),
enable_auto_commit=False,
)
for msg in c:
print(f'partition={msg.partition} offset={msg.offset} value={msg.value}')
# 处理...
c.commit() # 手动 commit offset,处理失败下次重读
多消费组
KafkaConsumer('events', group_id='analytics', ...)
KafkaConsumer('events', group_id='billing', ...)
KafkaConsumer('events', group_id='email', ...)
三个 group 独立从头消费同一 topic —— 这是 Kafka 的核心能力。
partition + 顺序
p.send('events', key=b'user-42', value=...)
# 同一 key 总是去同一 partition → 同一 user 的事件保证顺序
partition 数决定并行度上限:一个 partition 同时只能被同 group 内一个
consumer 消费。
运维差异
RabbitMQ:装在一台机器开心用,复杂集群 erlang 配很麻烦
Kafka:单机能跑但生产至少 3 broker;K8s 用 Strimzi operator 简化
监控
RabbitMQ:内置 Web UI + Prometheus exporter
Kafka:Kafka Manager / Conduktor / Kowl / kafka_exporter
替代 / 衍生
- NATS / JetStream:超轻量 message + streaming,Go 写的
- Pulsar:Kafka + RabbitMQ 优点合一,复杂度也合一
- Redis Streams:Redis 内置 stream,5.x+,比 Kafka 简单 50x
- AWS SQS / Google PubSub:托管,省运维
踩过的坑
- RabbitMQ 没设
prefetch_count→ 一个 consumer 抓走全部消息,
其它 consumer 闲着。设 prefetch=1 或较小数。 - Kafka offset 自动 commit + 处理失败:消息已经"算消费过了",重启后丢失。
生产用手动 commit。 - Kafka 一开始 partition 数定得太少:扩 partition 复杂(顺序 / rebalance)。
规划时按峰值并发 × 2-3 倍。 - 把 RabbitMQ 当 log 用(永久保留消息):会胀死 broker。换 Kafka。
登录后参与评论。