起因
需要异步消息队列处理:
- 用户上传图片 → 后台压缩
- 发邮件 / 短信
- 跑数据 export
Kafka 太重(要 ZooKeeper 时代 / 现在 KRaft 也复杂);
RabbitMQ 装着 OK 但对小项目仍偏重;
Redis Streams 和 NATS JetStream 是轻量替代。
下面对比两者 + 用法。
Redis Streams
Redis 5.0+ 内置的 stream 数据结构,类似 Kafka topic 但单实例。
装
Redis 7.x 默认带:
sudo apt install -y redis
redis-cli --version
Producer
import redis
r = redis.Redis()
# 向 stream "tasks" 加一条消息
r.xadd('tasks', {'type': 'send_email', 'to': '[email protected]', 'subject': 'hi'})
# 返回 message id:'1716543210000-0'
ID 是 timestamp-seq 自动生成。
Consumer Group
# 创建 consumer group(每个 worker 进程一份消费状态)
r.xgroup_create('tasks', 'email-workers', id='0', mkstream=True)
# Worker 拉消息
while True:
msgs = r.xreadgroup(
groupname='email-workers',
consumername='worker-1',
streams={'tasks': '>'}, # > 表示新消息
count=10,
block=5000, # 5s 没消息阻塞
)
for stream_name, messages in msgs:
for msg_id, data in messages:
try:
send_email(data[b'to'], data[b'subject'])
# ack:确认处理完
r.xack('tasks', 'email-workers', msg_id)
except Exception as e:
# 不 ack:消息留在 pending list,可被 reclaim 或 retry
log.exception('failed')
特性:
- at-least-once delivery(ack 确认;不 ack 会被 reclaim)
- 多 consumer 共享 group:消息分配给空闲 worker
- 持久化:写盘(开 AOF 时) + 全消息历史可重读
维护:MAXLEN 防爆
r.xadd('tasks', {...}, maxlen=100000, approximate=True)
# 超过 10w 条自动删最老的(近似,性能好)
否则 stream 无限增长。
Pending list(处理失败 / worker 挂)
# 看哪些消息被 worker-1 取了但没 ack
r.xpending_range('tasks', 'email-workers',
min='-', max='+', count=100, consumer='worker-1')
# Worker 挂掉超过 60s 没 ack 的消息 → reclaim 给 worker-2
r.xclaim('tasks', 'email-workers', 'worker-2',
min_idle_time=60000, message_ids=stuck_ids)
成熟 retry / dead letter 都要自己 wrap。
NATS JetStream
NATS 是 Go 写的轻量消息中间件,JetStream 是它的持久化层(v2.2+)。
装
curl -L https://github.com/nats-io/nats-server/releases/latest/download/nats-server-linux-amd64.tar.gz \
| sudo tar xz -C /usr/local/bin --strip-components=1 nats-server-*/nats-server
# 启动 + JetStream
nats-server -js
Docker:
docker run -d -p 4222:4222 nats:latest -js
装 client
uv add nats-py
Producer
import asyncio
import nats
async def main():
nc = await nats.connect('nats://localhost:4222')
js = nc.jetstream()
# 创建 stream(一次性)
await js.add_stream(
name='TASKS',
subjects=['tasks.>'], # 接受 tasks.X 各 subject
retention='workqueue', # work queue 模式(被消费就删)
max_msgs=100000,
)
# 发消息
await js.publish('tasks.email', b'{"to":"[email protected]"}')
await nc.close()
asyncio.run(main())
Consumer
async def worker():
nc = await nats.connect('nats://localhost:4222')
js = nc.jetstream()
# 创建 durable consumer
psub = await js.pull_subscribe(
subject='tasks.email',
durable='email-workers',
config=ConsumerConfig(
ack_policy='explicit',
max_deliver=3, # 最多重试 3 次
ack_wait=30, # 30s 内 ack
),
)
while True:
try:
msgs = await psub.fetch(batch=10, timeout=5)
for msg in msgs:
try:
await send_email(json.loads(msg.data))
await msg.ack()
except Exception:
await msg.nak() # 立刻重投
except nats.errors.TimeoutError:
continue
asyncio.run(worker())
特性:
- at-least-once / exactly-once (with dedup)
- 多 consumer 模式:work queue / fanout / replay
- subject 多级 routing:
tasks.>/orders.created.* - 集群 / 副本:3 节点 raft 内置
多副本 + HA
# nats.conf
jetstream {
store_dir: /var/lib/nats
}
cluster {
name: my-cluster
listen: 0.0.0.0:6222
routes: [
nats-route://node1:6222,
nats-route://node2:6222,
nats-route://node3:6222,
]
}
await js.add_stream(name='TASKS', subjects=['tasks.>'], num_replicas=3)
3 节点 raft 自动同步。一节点挂剩 2 节点继续工作。
Redis Streams 没有这个能力(要 sentinel + 第三方扩展实现)。
对比表
| Redis Streams | NATS JetStream | |
|---|---|---|
| 学习曲线 | 低(Redis 老熟人) | 中 |
| 集群 / HA | 弱(需 Sentinel) | 内置 raft |
| 吞吐 | 中(10k-50k msg/s) | 高(100k+ msg/s) |
| 持久化 | AOF / RDB | log 文件 |
| 多语言 client | 极多 | 较多 |
| 资源占用 | 中(Redis 进程) | 低(Go 单二进制) |
| Subject routing | 无(按 stream 名) | 多级 wildcard |
| 跨集群联邦 | 无 | leaf node |
| 复杂场景 | 简单 work queue | 复杂 routing + replay |
Redis Streams 适合
- 已经在用 Redis 不想加新组件
- 单实例 OK,QPS < 10k
- 简单 work queue / event log
- 团队熟 Redis
JetStream 适合
- 需要 HA / 多副本
- 多消费模式(同一消息多 group 各取)
- subject-based routing 复杂
- 高吞吐(> 10k msg/s)
- 不想运营 Kafka 但需要类似能力
跟 Kafka 对比
Kafka 适合超大规模(数百万 msg/s)+ 长期存储 + 多 consumer group
+ stream processing(Kafka Streams / Flink)。
JetStream / Redis Streams 都是 "Kafka 简化版",但 80% 场景够。
实战 case
我们小创业公司:
- 1k-10k msg/s
- 工作队列:图片处理 / 邮件 / API 调用 / data ETL
- 不想运营 Kafka
选 NATS JetStream:
- 单二进制部署(5 分钟搞定)
- 3 节点集群跑 6 个月 0 down
- subject routing 让"按业务类型分流"自然(
tasks.email.*、tasks.image.*) - Go client 性能极好(业务端 Go 服务直连)
对比之前用 Celery + RabbitMQ:
- 资源占用 1/3
- 吞吐 5x
- 维护成本明显降
但 Celery 的"task discovery / chain / chord / scheduled task" 生态
更丰富,Python-heavy 项目仍是合理选择。
与 Celery / Sidekiq / dramatiq 比
这些是"任务队列 framework"(带 retry / scheduling / monitoring),
底层 broker 是 Redis / RabbitMQ。
Redis Streams / JetStream 是底层 broker。
要"task 框架体验" 在它们上套一层(如 dramatiq + Redis)。
uv add dramatiq[redis]
import dramatiq
@dramatiq.actor(max_retries=3, queue_name='email')
def send_email(to, subject, body):
smtp.send(to, subject, body)
# 业务
send_email.send('[email protected]', 'Welcome', '...')
跟 Celery 用法类似但代码 1/3 + Redis Streams 后端。
监控
Redis
redis-cli xinfo stream tasks
# length / first-entry / last-entry / consumer group 详情
# 持续监控积压
watch -n 5 'redis-cli xlen tasks'
Prometheus redis_exporter 暴露 stream length / lag。
NATS
nats stream info TASKS
nats consumer info TASKS email-workers
# 看 pending / ack pending / lag
nats CLI + prometheus-nats-exporter。
踩过的坑
Redis Streams
-
没 MAXLEN → 无限增长:1 小时几十万消息后 Redis 内存爆。
永远xadd ... MAXLEN ~ 100000。 -
consumer name 重复:两个进程用
consumer-1→ pending list
混乱。每 worker 独立 name(worker-${hostname}-${pid})。 -
claim 没设 min_idle_time:抢走还没超时的消息 → 重复处理。
生产 30-60s。
JetStream
-
stream 创建后改 config:某些 config(subject / retention)改了
要 delete + recreate,丢消息。一开始想清楚。 -
pull vs push subscription:pull 简单可控;push 需要 client 一直
连。新手用 pull。 -
JetStream domain / account 隔离:多 tenant 时配错权限互通。
小项目用 default 就好。
总结
简单"事件流" → Redis Streams(已经在用 Redis 的话)。
"想要类 Kafka 但更轻" → NATS JetStream。
极致简单 + Python → Celery + Redis broker。
真大规模 → Kafka。
登录后参与评论。