RabbitMQ vs Kafka 选型 + 各自最小可用部署

消息队列两个最主流:RabbitMQ(AMQP)和 Kafka(log-based)。设计哲学
完全不同,选错会很疼。

一句话区分

  • RabbitMQ:消息任务,消费后即删,适合"事件触发处理"
  • Kafka:消息日志,消费后保留,适合"事件流回放 / 多消费者"

详细对比

维度 RabbitMQ Kafka
数据模型 queue + exchange partitioned log
消息 retention 消费后删 时间 / 大小(默认 7 天)
顺序保证 单 queue 单 partition
多消费者 competing consumers(争抢) consumer group(offset 独立)
持久化 可选 总是
单 broker 吞吐 1k-50k msg/s 100k-1M msg/s
重试 / 死信 内置 DLX 自己处理
路由 灵活(topic / fanout / direct / headers) 简单(topic + partition key)
协议 AMQP / STOMP / MQTT 自家二进制
复杂度

何时选 RabbitMQ

  • "用户注册了,发欢迎邮件"——单次任务,一个消费者处理
  • 任务队列(Celery 用的就是 RabbitMQ / Redis)
  • RPC over message
  • 中等吞吐(< 50k msg/s)

何时选 Kafka

  • "用户做了 N 个动作,多个下游各自处理"——同一消息多消费者
  • 事件流 / 实时分析(订单流 → 多个团队订阅)
  • 日志聚合管道(应用日志 → Kafka → Logstash / ELK)
  • 大吞吐(> 100k msg/s)

RabbitMQ 最小部署

docker run -d --hostname rabbit \
  --name rabbit -p 5672:5672 -p 15672:15672 \
  rabbitmq:4-management

Web 管理界面 http://localhost:15672 默认 guest/guest。

发 / 收(Python pika)

uv add pika
# producer.py
import pika
conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
ch = conn.channel()
ch.queue_declare(queue='tasks', durable=True)

for i in range(100):
    ch.basic_publish(
        exchange='',
        routing_key='tasks',
        body=f'task {i}'.encode(),
        properties=pika.BasicProperties(delivery_mode=2),   # persistent
    )

conn.close()
# consumer.py
import pika

def callback(ch, method, props, body):
    print(f'got: {body!r}')
    # 假装处理
    ch.basic_ack(delivery_tag=method.delivery_tag)

conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
ch = conn.channel()
ch.queue_declare(queue='tasks', durable=True)
ch.basic_qos(prefetch_count=1)        # 一次只抓一条,处理完才下一条
ch.basic_consume(queue='tasks', on_message_callback=callback)
ch.start_consuming()

启动多个 consumer:消息在它们之间分配(competing consumers)。

死信队列(DLX)

任务失败重试几次后丢进 DLQ 人工检查:

ch.queue_declare(
    queue='tasks',
    durable=True,
    arguments={
        'x-dead-letter-exchange': '',
        'x-dead-letter-routing-key': 'tasks-dlq',
        'x-message-ttl': 30000,            # 30 秒处理超时
        'x-max-length': 100000,            # 队列上限
    },
)
ch.queue_declare(queue='tasks-dlq', durable=True)

Kafka 最小部署

Kafka 4.x KRaft 模式(不再需要 ZooKeeper):

# docker-compose.yml
services:
  kafka:
    image: apache/kafka:3.8.0
    ports: [ "9092:9092", "9093:9093" ]
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
docker compose up -d

发 / 收(Python kafka-python)

uv add kafka-python
# producer.py
from kafka import KafkaProducer
import json

p = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode(),
    acks='all',   # 等所有副本确认(最安全)
)

for i in range(100):
    p.send('events', value={'id': i, 'type': 'click'})

p.flush()
# consumer.py
from kafka import KafkaConsumer

c = KafkaConsumer(
    'events',
    bootstrap_servers='localhost:9092',
    group_id='analytics',
    auto_offset_reset='earliest',   # 从头开始(如果是新 group)
    value_deserializer=lambda v: json.loads(v.decode()),
    enable_auto_commit=False,
)

for msg in c:
    print(f'partition={msg.partition} offset={msg.offset} value={msg.value}')
    # 处理...
    c.commit()   # 手动 commit offset,处理失败下次重读

多消费组

KafkaConsumer('events', group_id='analytics', ...)
KafkaConsumer('events', group_id='billing', ...)
KafkaConsumer('events', group_id='email', ...)

三个 group 独立从头消费同一 topic —— 这是 Kafka 的核心能力。

partition + 顺序

p.send('events', key=b'user-42', value=...)
# 同一 key 总是去同一 partition → 同一 user 的事件保证顺序

partition 数决定并行度上限:一个 partition 同时只能被同 group 内一个
consumer 消费。

运维差异

RabbitMQ:装在一台机器开心用,复杂集群 erlang 配很麻烦
Kafka:单机能跑但生产至少 3 broker;K8s 用 Strimzi operator 简化

监控

RabbitMQ:内置 Web UI + Prometheus exporter
Kafka:Kafka Manager / Conduktor / Kowl / kafka_exporter

替代 / 衍生

  • NATS / JetStream:超轻量 message + streaming,Go 写的
  • Pulsar:Kafka + RabbitMQ 优点合一,复杂度也合一
  • Redis Streams:Redis 内置 stream,5.x+,比 Kafka 简单 50x
  • AWS SQS / Google PubSub:托管,省运维

踩过的坑

  • RabbitMQ 没设 prefetch_count → 一个 consumer 抓走全部消息,
    其它 consumer 闲着。设 prefetch=1 或较小数。
  • Kafka offset 自动 commit + 处理失败:消息已经"算消费过了",重启后丢失。
    生产用手动 commit。
  • Kafka 一开始 partition 数定得太少:扩 partition 复杂(顺序 / rebalance)。
    规划时按峰值并发 × 2-3 倍。
  • 把 RabbitMQ 当 log 用(永久保留消息):会胀死 broker。换 Kafka。
精确评价 共 0 人评价
可复现性
可复现 · 0 不可复现 · 0
文风
文风流畅 · 0 文风晦涩 · 0
立场
支持 · 0 反对 · 0

登录后即可对本帖作出评价。

评论区 0 条 · 所有人可在此交流

登录后参与评论。

还没有评论,来说两句。