Redis Streams vs NATS JetStream:轻量消息队列怎么选

起因

需要异步消息队列处理:

  • 用户上传图片 → 后台压缩
  • 发邮件 / 短信
  • 跑数据 export

Kafka 太重(要 ZooKeeper 时代 / 现在 KRaft 也复杂);
RabbitMQ 装着 OK 但对小项目仍偏重;
Redis Streams 和 NATS JetStream 是轻量替代。

下面对比两者 + 用法。

Redis Streams

Redis 5.0+ 内置的 stream 数据结构,类似 Kafka topic 但单实例。

Redis 7.x 默认带:

sudo apt install -y redis
redis-cli --version

Producer

import redis
r = redis.Redis()

# 向 stream "tasks" 加一条消息
r.xadd('tasks', {'type': 'send_email', 'to': '[email protected]', 'subject': 'hi'})
# 返回 message id:'1716543210000-0'

ID 是 timestamp-seq 自动生成。

Consumer Group

# 创建 consumer group(每个 worker 进程一份消费状态)
r.xgroup_create('tasks', 'email-workers', id='0', mkstream=True)

# Worker 拉消息
while True:
    msgs = r.xreadgroup(
        groupname='email-workers',
        consumername='worker-1',
        streams={'tasks': '>'},     # > 表示新消息
        count=10,
        block=5000,                  # 5s 没消息阻塞
    )
    for stream_name, messages in msgs:
        for msg_id, data in messages:
            try:
                send_email(data[b'to'], data[b'subject'])
                # ack:确认处理完
                r.xack('tasks', 'email-workers', msg_id)
            except Exception as e:
                # 不 ack:消息留在 pending list,可被 reclaim 或 retry
                log.exception('failed')

特性:

  • at-least-once delivery(ack 确认;不 ack 会被 reclaim)
  • 多 consumer 共享 group:消息分配给空闲 worker
  • 持久化:写盘(开 AOF 时) + 全消息历史可重读

维护:MAXLEN 防爆

r.xadd('tasks', {...}, maxlen=100000, approximate=True)
# 超过 10w 条自动删最老的(近似,性能好)

否则 stream 无限增长。

Pending list(处理失败 / worker 挂)

# 看哪些消息被 worker-1 取了但没 ack
r.xpending_range('tasks', 'email-workers',
                 min='-', max='+', count=100, consumer='worker-1')

# Worker 挂掉超过 60s 没 ack 的消息 → reclaim 给 worker-2
r.xclaim('tasks', 'email-workers', 'worker-2',
         min_idle_time=60000, message_ids=stuck_ids)

成熟 retry / dead letter 都要自己 wrap。

NATS JetStream

NATS 是 Go 写的轻量消息中间件,JetStream 是它的持久化层(v2.2+)。

curl -L https://github.com/nats-io/nats-server/releases/latest/download/nats-server-linux-amd64.tar.gz \
    | sudo tar xz -C /usr/local/bin --strip-components=1 nats-server-*/nats-server

# 启动 + JetStream
nats-server -js

Docker:

docker run -d -p 4222:4222 nats:latest -js

装 client

uv add nats-py

Producer

import asyncio
import nats

async def main():
    nc = await nats.connect('nats://localhost:4222')
    js = nc.jetstream()

    # 创建 stream(一次性)
    await js.add_stream(
        name='TASKS',
        subjects=['tasks.>'],         # 接受 tasks.X 各 subject
        retention='workqueue',         # work queue 模式(被消费就删)
        max_msgs=100000,
    )

    # 发消息
    await js.publish('tasks.email', b'{"to":"[email protected]"}')

    await nc.close()

asyncio.run(main())

Consumer

async def worker():
    nc = await nats.connect('nats://localhost:4222')
    js = nc.jetstream()

    # 创建 durable consumer
    psub = await js.pull_subscribe(
        subject='tasks.email',
        durable='email-workers',
        config=ConsumerConfig(
            ack_policy='explicit',
            max_deliver=3,           # 最多重试 3 次
            ack_wait=30,             # 30s 内 ack
        ),
    )

    while True:
        try:
            msgs = await psub.fetch(batch=10, timeout=5)
            for msg in msgs:
                try:
                    await send_email(json.loads(msg.data))
                    await msg.ack()
                except Exception:
                    await msg.nak()   # 立刻重投
        except nats.errors.TimeoutError:
            continue

asyncio.run(worker())

特性:

  • at-least-once / exactly-once (with dedup)
  • 多 consumer 模式:work queue / fanout / replay
  • subject 多级 routingtasks.> / orders.created.*
  • 集群 / 副本:3 节点 raft 内置

多副本 + HA

# nats.conf
jetstream {
  store_dir: /var/lib/nats
}
cluster {
  name: my-cluster
  listen: 0.0.0.0:6222
  routes: [
    nats-route://node1:6222,
    nats-route://node2:6222,
    nats-route://node3:6222,
  ]
}
await js.add_stream(name='TASKS', subjects=['tasks.>'], num_replicas=3)

3 节点 raft 自动同步。一节点挂剩 2 节点继续工作。
Redis Streams 没有这个能力(要 sentinel + 第三方扩展实现)。

对比表

Redis Streams NATS JetStream
学习曲线 低(Redis 老熟人)
集群 / HA 弱(需 Sentinel) 内置 raft
吞吐 中(10k-50k msg/s) 高(100k+ msg/s)
持久化 AOF / RDB log 文件
多语言 client 极多 较多
资源占用 中(Redis 进程) 低(Go 单二进制)
Subject routing 无(按 stream 名) 多级 wildcard
跨集群联邦 leaf node
复杂场景 简单 work queue 复杂 routing + replay

Redis Streams 适合

  • 已经在用 Redis 不想加新组件
  • 单实例 OK,QPS < 10k
  • 简单 work queue / event log
  • 团队熟 Redis

JetStream 适合

  • 需要 HA / 多副本
  • 多消费模式(同一消息多 group 各取)
  • subject-based routing 复杂
  • 高吞吐(> 10k msg/s)
  • 不想运营 Kafka 但需要类似能力

跟 Kafka 对比

Kafka 适合超大规模(数百万 msg/s)+ 长期存储 + 多 consumer group
+ stream processing(Kafka Streams / Flink)。

JetStream / Redis Streams 都是 "Kafka 简化版",但 80% 场景够。

实战 case

我们小创业公司:

  • 1k-10k msg/s
  • 工作队列:图片处理 / 邮件 / API 调用 / data ETL
  • 不想运营 Kafka

NATS JetStream

  • 单二进制部署(5 分钟搞定)
  • 3 节点集群跑 6 个月 0 down
  • subject routing 让"按业务类型分流"自然(tasks.email.*tasks.image.*
  • Go client 性能极好(业务端 Go 服务直连)

对比之前用 Celery + RabbitMQ:

  • 资源占用 1/3
  • 吞吐 5x
  • 维护成本明显降

但 Celery 的"task discovery / chain / chord / scheduled task" 生态
更丰富,Python-heavy 项目仍是合理选择。

与 Celery / Sidekiq / dramatiq 比

这些是"任务队列 framework"(带 retry / scheduling / monitoring),
底层 broker 是 Redis / RabbitMQ。

Redis Streams / JetStream 是底层 broker。
要"task 框架体验" 在它们上套一层(如 dramatiq + Redis)。

uv add dramatiq[redis]
import dramatiq

@dramatiq.actor(max_retries=3, queue_name='email')
def send_email(to, subject, body):
    smtp.send(to, subject, body)

# 业务
send_email.send('[email protected]', 'Welcome', '...')

跟 Celery 用法类似但代码 1/3 + Redis Streams 后端。

监控

Redis

redis-cli xinfo stream tasks
# length / first-entry / last-entry / consumer group 详情

# 持续监控积压
watch -n 5 'redis-cli xlen tasks'

Prometheus redis_exporter 暴露 stream length / lag。

NATS

nats stream info TASKS
nats consumer info TASKS email-workers
# 看 pending / ack pending / lag

nats CLI + prometheus-nats-exporter。

踩过的坑

Redis Streams

  1. 没 MAXLEN → 无限增长:1 小时几十万消息后 Redis 内存爆。
    永远 xadd ... MAXLEN ~ 100000

  2. consumer name 重复:两个进程用 consumer-1 → pending list
    混乱。每 worker 独立 name(worker-${hostname}-${pid})。

  3. claim 没设 min_idle_time:抢走还没超时的消息 → 重复处理。
    生产 30-60s。

JetStream

  1. stream 创建后改 config:某些 config(subject / retention)改了
    要 delete + recreate,丢消息。一开始想清楚。

  2. pull vs push subscription:pull 简单可控;push 需要 client 一直
    连。新手用 pull。

  3. JetStream domain / account 隔离:多 tenant 时配错权限互通。
    小项目用 default 就好。

总结

简单"事件流" → Redis Streams(已经在用 Redis 的话)。
"想要类 Kafka 但更轻" → NATS JetStream。
极致简单 + Python → Celery + Redis broker。
真大规模 → Kafka。

精确评价 共 0 人评价
可复现性
可复现 · 0 不可复现 · 0
文风
文风流畅 · 0 文风晦涩 · 0
立场
支持 · 0 反对 · 0

登录后即可对本帖作出评价。

评论区 0 条 · 所有人可在此交流

登录后参与评论。

还没有评论,来说两句。