起因
要做服务间异步消息:常用选择 Kafka / RabbitMQ / Redis Streams。
Kafka 要装 ZooKeeper 或 KRaft,集群配置复杂;RabbitMQ 单机够但
"事件流"语义弱;Redis Streams 简单但不算 first-class。
NATS 是 CNCF 项目,单二进制、< 20MB 内存、跨平台。
JetStream 是 NATS 内置的持久化层,把 NATS pub/sub 升级到 Kafka-like
能力:persistent stream / consumer / replay / 多副本。
5 分钟起服务
1. 装
# 二进制下载
curl -fsSL https://github.com/nats-io/nats-server/releases/latest/download/nats-server-v2.10.20-linux-amd64.tar.gz \
| tar xz
sudo install nats-server-v2.10.20-linux-amd64/nats-server /usr/local/bin/
# 或 Docker
docker run -p 4222:4222 -p 8222:8222 \
nats:latest -js -m 8222 # -js 开 JetStream;-m 开监控 endpoint
CLI 工具:
go install github.com/nats-io/natscli/nats@latest
nats account info
2. 创建 stream
nats stream add ORDERS \
--subjects "orders.>" \
--storage file \
--retention limits \
--max-msgs 1000000 \
--max-age 30d \
--discard old \
--replicas 1
"orders.>" 通配符:所有以 orders. 开头的 subject 都进这个 stream
(orders.created / orders.shipped / orders.cancelled)。
3. 发消息
nats pub orders.created '{"id": 1, "amount": 99.5}'
nats pub orders.shipped '{"id": 1, "carrier": "fedex"}'
4. 创建 consumer + 消费
nats consumer add ORDERS analytics \
--filter "orders.>" \
--ack explicit \
--deliver all \
--replay instant
# Pull-based consumer
nats consumer next ORDERS analytics --count 10
# 拿 10 条
Python 客户端
uv add nats-py
import asyncio
import nats
from nats.js.api import StreamConfig, ConsumerConfig, RetentionPolicy
async def main():
nc = await nats.connect('nats://localhost:4222')
js = nc.jetstream()
# 确保 stream 存在(幂等)
await js.add_stream(name='ORDERS', subjects=['orders.>'],
retention=RetentionPolicy.LIMITS,
max_age=30 * 24 * 60 * 60)
# 发
await js.publish('orders.created', b'{"id": 1, "amount": 99.5}')
# 收(pull-based subscriber)
psub = await js.pull_subscribe('orders.>', 'analytics', stream='ORDERS')
while True:
msgs = await psub.fetch(10, timeout=5)
for m in msgs:
print(m.subject, m.data)
await m.ack()
asyncio.run(main())
与 Kafka / RabbitMQ 对比
| NATS JetStream | Kafka | RabbitMQ | |
|---|---|---|---|
| 二进制大小 | 20MB | 100MB+ | 50MB+ |
| 内存 | < 50MB | 几百 MB | 几百 MB |
| 集群安装 | 一行命令 | 复杂 | 中等 |
| 消息 retention | ✅ | ✅ | 弱 |
| Stream 模式 | ✅ | ✅ | 弱 |
| Queue 模式 | ✅ | 需要外部 | ✅ |
| 性能 | 100k+ msg/s | 1M+ msg/s | 几十 k |
| 跨地域副本 | 容易(NATS leaf) | 复杂(MirrorMaker) | 中 |
| 生态成熟度 | 中(快增) | 极成熟 | 极成熟 |
适合 NATS:小到中规模、要求轻量、跨地域、自托管简单的场景。
Kafka 仍然适合:百万级吞吐 / 已有 Kafka 生态 / 大数据 pipeline。
几个 NATS 杀手 feature
1. Request-Reply (RPC)
# server
async def handler(msg):
await msg.respond(b'pong')
await nc.subscribe('rpc.ping', cb=handler)
# client
response = await nc.request('rpc.ping', b'', timeout=1)
print(response.data) # b'pong'
不需要 HTTP framework,直接通过 NATS 做 RPC。
比 gRPC 简单:无需 .proto,subject 即 route。
2. Key-Value Store
kv = await js.create_key_value(bucket='configs')
await kv.put('app.version', b'1.2.3')
entry = await kv.get('app.version')
print(entry.value) # b'1.2.3'
# Watch changes
async for entry in kv.watch_all():
print('changed:', entry.key, entry.value)
JetStream 上的小 K/V,replicated + watch 通知。
替代 etcd 的简单场景。
3. Object Store
obs = await js.create_object_store(bucket='photos')
await obs.put('logo.png', b'<png bytes>')
data = await obs.get('logo.png')
S3 替代品(小规模),数据在 stream 里。
4. Leaf node(跨地域 / 边缘)
总部跑 main NATS server,分公司跑 leaf node 连过来:
# leaf node config
leafnodes {
remotes = [
{ url: "nats://hq.example.com:7422" }
]
}
leaf 上发的消息自动同步到总部。流量本地优先,节省跨地域带宽。
实战:订单事件总线
service: order-api ↘
service: payment-svc ↘ publish orders.* ↘
service: shipment-svc ↗ ↘
JetStream
↗ ↙
service: analytics (subscribe orders.> as 'analytics')
service: email-svc (subscribe orders.created as 'email-notifier')
service: webhook-svc (subscribe orders.> as 'webhook-publisher')
每个消费方独立 consumer,独立维护 offset。
order-api 一次 publish,5 个下游各自异步处理。
加新下游不影响现有:
# 新加 'audit-log' consumer
await js.add_consumer(stream='ORDERS', config=ConsumerConfig(
durable_name='audit-log',
filter_subject='orders.>',
deliver_policy=DeliverPolicy.ALL, # 从最早开始
))
deliver_policy=ALL 让新 consumer 从 stream 最早消息开始处理,
回放历史(Kafka 一样能力)。
监控
NATS 自带 :8222/varz /:8222/jsz 等 endpoint。Prometheus exporter:
docker run -p 7777:7777 \
natsio/prometheus-nats-exporter:latest \
-varz -connz -channelz -subz http://nats:8222
Grafana 用现成 dashboard ID 2279 / 2280。
高可用:cluster + replicas
3 node cluster:
nats-server -p 4222 -n n1 -cluster_listen 0.0.0.0:6222 -routes nats://n2:6222,nats://n3:6222 -js -sd /data/n1
nats-server -p 4223 -n n2 -cluster_listen 0.0.0.0:6222 -routes nats://n1:6222,nats://n3:6222 -js -sd /data/n2
nats-server -p 4224 -n n3 -cluster_listen 0.0.0.0:6222 -routes nats://n1:6222,nats://n2:6222 -js -sd /data/n3
Stream --replicas 3 → 3 副本(Raft 共识)。一台挂仍可用。
效果
我们一个中型 SaaS 把消息中间件从 Redis Streams 换 NATS JetStream:
- 容器内存从 250MB(Redis 加事件订阅逻辑)→ 60MB(NATS 单进程)
- 跨地域分公司间消息同步 native 支持(之前要自己写桥)
- delivery semantics 更清楚(at-least-once 默认,exactly-once 配置开)
- consumer 重连 / replay 等行为类 Kafka,无需自己处理
踩过的坑
-
-js没加忘了启 JetStream:以为 stream 创建失败,
failed to find storage directory。第一次启动一定加-js。 -
--max-msgs/--max-bytes一定要设:默认 unlimited,
磁盘吃满。 -
consumer ack 超时配置:处理慢的 consumer 默认 30s ack 不及
消息被 redelivered。--ack-wait调大。 -
多语言客户端兼容性:Go / Python / Node / Java / Rust / C# 都有
官方 client;version 一致性要注意(client 落后 server 可能少 feature)。 -
TLS / auth:开放公网必须配
--tls+--auth+ 用户密码或
nkey。默认 anonymous + plain 安全为零。
登录后参与评论。