Kafka vs RabbitMQ vs NATS:消息中间件选哪个

起因

应用要异步处理 / 解耦 / 削峰,需要消息中间件。
候选:

  • Kafka:日志型,高吞吐,持久化
  • RabbitMQ:经典 broker,AMQP,灵活路由
  • NATS / NATS JetStream:现代轻量,Go 写
  • SQS / Pub/Sub:云托管

下面对比关键差异 + 各自适合的场景。

Kafka

[Producer] → topic (partition × N) → [Consumer Group]
                    ↓ 持久化到 disk
              (broker × N,replicated)
  • 消息是 append-only log,consumer 维护 offset
  • partition 是并行度(一个 partition 同时一个 consumer instance)
  • replication 跨 broker → HA
  • 默认保留消息几天到几周(retention 配置)

用法

# Producer (confluent-kafka)
from confluent_kafka import Producer
p = Producer({'bootstrap.servers': 'kafka:9092'})
p.produce('orders', key='user-1', value='{"action": "buy"}')
p.flush()

# Consumer
from confluent_kafka import Consumer
c = Consumer({
    'bootstrap.servers': 'kafka:9092',
    'group.id': 'order-processors',
    'auto.offset.reset': 'earliest',
})
c.subscribe(['orders'])
while True:
    msg = c.poll(1.0)
    if msg:
        print(msg.value())
        c.commit(msg)

优势

  • 极高吞吐(百万 msg/s/broker)
  • 重放历史(offset 任意 reset)
  • 分布式天生 HA
  • 流处理生态强(Kafka Streams / Flink / ksqlDB)

劣势

  • 操作复杂(broker + zookeeper / KRaft + partition 设计)
  • 资源消耗大(broker JVM 4 GB+)
  • 不擅长 priority / per-message TTL / 复杂路由
  • 消费速度受 partition 数限制

RabbitMQ

[Producer] → [Exchange] → routing → [Queue × N] → [Consumer]
  • 经典 broker 模型(push-based)
  • AMQP 协议,丰富 routing(direct / fanout / topic / headers)
  • 消息消费后默认删除(除非 stream queue)
  • 单 broker 几万 msg/s

用法

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('rabbitmq'))
channel = connection.channel()

channel.queue_declare(queue='orders', durable=True)

# Producer
channel.basic_publish(exchange='', routing_key='orders', body='hello',
    properties=pika.BasicProperties(delivery_mode=2))      # 持久化

# Consumer
def callback(ch, method, properties, body):
    print(body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue='orders', on_message_callback=callback)
channel.start_consuming()

优势

  • 灵活路由(topic exchange order.*.created
  • per-message TTL / priority
  • 死信队列原生
  • 部署简单(单 Erlang 进程)
  • 消费 ack/nack / retry 模型完善

劣势

  • 吞吐 < Kafka
  • 不擅长 replay(消费后消息没了)
  • HA cluster 配置棘手(mirror / quorum queue)
  • 老版本运维痛点多

NATS / JetStream

[Publisher] → subject → [Subscriber]
              (JetStream 模式可持久化)
  • Go 写,极轻量(单 binary 30 MB)
  • subject-based(类似 Kafka topic 但更灵活)
  • core NATS:fire-and-forget(极快)
  • JetStream:持久化 + replication(Kafka 类似 semantics)

用法

import asyncio
import nats

async def main():
    nc = await nats.connect('nats://localhost:4222')
    js = nc.jetstream()

    await js.add_stream(name='ORDERS', subjects=['orders.>'])

    # publish
    await js.publish('orders.created', b'hello')

    # consume
    sub = await js.subscribe('orders.>', durable='processor')
    async for msg in sub.messages:
        print(msg.data)
        await msg.ack()

asyncio.run(main())

优势

  • 极轻量 + 易部署(单 binary,跨平台)
  • 性能好(百万 msg/s core,几十万 jetstream)
  • subject wildcard 路由
  • 现代设计(gRPC 风格)

劣势

  • 生态比 Kafka / RMQ 小
  • 工具链 / 监控比较新
  • 跨 cluster 复制弱(不像 Kafka MirrorMaker 成熟)

横向对比

Kafka RabbitMQ NATS JetStream SQS
吞吐 极高
持久化 默认 可选 可选 默认
路由 partition+key 丰富 (AMQP) subject wildcard queue
顺序 partition 内 queue 内 stream 内 FIFO queue 内
Replay
HA 强(replication) 中(cluster) 托管
部署 复杂 简单 极简 0
资源 0
适合 日志 / 流 / 大规模 任务队列 / RPC 现代云 / 服务网格 简单异步

推荐

  • 任务队列(celery 类) → RabbitMQ 或 Redis Streams
  • 日志聚合 / event sourcing → Kafka
  • 微服务消息总线 + 现代云原生 → NATS
  • 简单异步 + 不想运维 → SQS / Pub/Sub
  • 数据 pipeline ETL → Kafka
  • 极致吞吐(百万 QPS) → Kafka

我的实际选择

中小项目(< 100k msg/s):

  • Celery + Redis:Django 任务队列首选,0 引入
  • NATS JetStream:微服务通信,轻量

大项目 / 数据 pipeline:

  • Kafka:必备

真实 case 1:Celery → Kafka 迁移

老 Celery + Redis 跑 task queue。规模大了:

  • 任务 50k/s 起 Redis 撑不住
  • 失败任务重放难
  • 监控弱

改 Kafka:

  • 任务塞 topic
  • consumer group 处理
  • 失败 → 进 retry topic(带 delay)→ DLQ
  • 重放 / 重处理一行命令

挑战:

  • partition 设计(保证某 user 的 task 顺序)
  • consumer rebalance 期间 short downtime

但 throughput + observability 大改善。

真实 case 2:RabbitMQ 路由

某客户系统:"订单创建" event 要通知:

  • 库存(扣库存)
  • 财务(开发票)
  • 物流(准备发货)
  • analytics(统计)

RabbitMQ topic exchange:

# producer
channel.basic_publish(exchange='order_events',
    routing_key='order.created.us', body=...)

# 4 consumer 各自 bind
channel.queue_bind('inventory_queue', 'order_events', 'order.created.*')
channel.queue_bind('finance_queue',   'order_events', 'order.created.*')
channel.queue_bind('logistics_queue', 'order_events', 'order.created.us')
channel.queue_bind('analytics_queue', 'order_events', 'order.*')

灵活订阅,新 consumer 加入无需 producer 改动。
Kafka 也能做但要 topic 设计多副本 + 自管 routing。

与 outbox pattern 配

应用先写 DB + outbox 表 → background job 读 outbox → 发到 MQ。
保证 DB 写成功 + MQ 发出一致(事务安全)。

debezium 等 CDC 工具自动监听 PG WAL → Kafka,省手写。

不要为了用而用

很多场景:

  • 单 DB + 简单异步:cron job + DB queue 表
  • 微服务通信:HTTP / gRPC(不必每次 async)
  • 通知:直接 webhook

MQ 增加运维 + 复杂度。确认真的需要 async / decouple / 高吞吐 / replay
再上。

踩过的坑

  1. Kafka partition 数选错:partition 是并行度上限。设 4,consumer
    增到 8 后 4 个 idle。partition 加容易减难,初始就设大些(如 32+)。

  2. RabbitMQ 队列 unbounded:consumer 慢 + producer 快 → 队列长几
    亿条 → broker OOM。设 x-max-lengthx-message-ttl 限。

  3. NATS JetStream stream 配错:retention 设短 / max bytes 小 →
    消息被驱逐。production 前 stress test。

  4. at-least-once 重复处理:所有 MQ 都是 at-least-once。consumer
    必须幂等(用 message id 去重)。

  5. monitoring 缺:MQ 死了应用一段时间没察觉。最少监控 lag /
    queue size / consumer alive。

精确评价 共 0 人评价
可复现性
可复现 · 0 不可复现 · 0
文风
文风流畅 · 0 文风晦涩 · 0
立场
支持 · 0 反对 · 0

登录后即可对本帖作出评价。

评论区 0 条 · 所有人可在此交流

登录后参与评论。

还没有评论,来说两句。