NATS JetStream:1MB 二进制的"Kafka 平替"

起因

要做服务间异步消息:常用选择 Kafka / RabbitMQ / Redis Streams。
Kafka 要装 ZooKeeper 或 KRaft,集群配置复杂;RabbitMQ 单机够但
"事件流"语义弱;Redis Streams 简单但不算 first-class。

NATS 是 CNCF 项目,单二进制、< 20MB 内存、跨平台。
JetStream 是 NATS 内置的持久化层,把 NATS pub/sub 升级到 Kafka-like
能力:persistent stream / consumer / replay / 多副本。

5 分钟起服务

1. 装

# 二进制下载
curl -fsSL https://github.com/nats-io/nats-server/releases/latest/download/nats-server-v2.10.20-linux-amd64.tar.gz \
  | tar xz
sudo install nats-server-v2.10.20-linux-amd64/nats-server /usr/local/bin/

# 或 Docker
docker run -p 4222:4222 -p 8222:8222 \
  nats:latest -js -m 8222    # -js 开 JetStream;-m 开监控 endpoint

CLI 工具:

go install github.com/nats-io/natscli/nats@latest
nats account info

2. 创建 stream

nats stream add ORDERS \
  --subjects "orders.>" \
  --storage file \
  --retention limits \
  --max-msgs 1000000 \
  --max-age 30d \
  --discard old \
  --replicas 1

"orders.>" 通配符:所有以 orders. 开头的 subject 都进这个 stream
(orders.created / orders.shipped / orders.cancelled)。

3. 发消息

nats pub orders.created '{"id": 1, "amount": 99.5}'
nats pub orders.shipped '{"id": 1, "carrier": "fedex"}'

4. 创建 consumer + 消费

nats consumer add ORDERS analytics \
  --filter "orders.>" \
  --ack explicit \
  --deliver all \
  --replay instant
# Pull-based consumer

nats consumer next ORDERS analytics --count 10
# 拿 10 条

Python 客户端

uv add nats-py
import asyncio
import nats
from nats.js.api import StreamConfig, ConsumerConfig, RetentionPolicy

async def main():
    nc = await nats.connect('nats://localhost:4222')
    js = nc.jetstream()

    # 确保 stream 存在(幂等)
    await js.add_stream(name='ORDERS', subjects=['orders.>'],
                        retention=RetentionPolicy.LIMITS,
                        max_age=30 * 24 * 60 * 60)

    # 发
    await js.publish('orders.created', b'{"id": 1, "amount": 99.5}')

    # 收(pull-based subscriber)
    psub = await js.pull_subscribe('orders.>', 'analytics', stream='ORDERS')
    while True:
        msgs = await psub.fetch(10, timeout=5)
        for m in msgs:
            print(m.subject, m.data)
            await m.ack()

asyncio.run(main())

与 Kafka / RabbitMQ 对比

NATS JetStream Kafka RabbitMQ
二进制大小 20MB 100MB+ 50MB+
内存 < 50MB 几百 MB 几百 MB
集群安装 一行命令 复杂 中等
消息 retention
Stream 模式
Queue 模式 需要外部
性能 100k+ msg/s 1M+ msg/s 几十 k
跨地域副本 容易(NATS leaf) 复杂(MirrorMaker)
生态成熟度 中(快增) 极成熟 极成熟

适合 NATS:小到中规模、要求轻量、跨地域、自托管简单的场景。
Kafka 仍然适合:百万级吞吐 / 已有 Kafka 生态 / 大数据 pipeline。

几个 NATS 杀手 feature

1. Request-Reply (RPC)

# server
async def handler(msg):
    await msg.respond(b'pong')

await nc.subscribe('rpc.ping', cb=handler)

# client
response = await nc.request('rpc.ping', b'', timeout=1)
print(response.data)   # b'pong'

不需要 HTTP framework,直接通过 NATS 做 RPC。
比 gRPC 简单:无需 .proto,subject 即 route。

2. Key-Value Store

kv = await js.create_key_value(bucket='configs')

await kv.put('app.version', b'1.2.3')
entry = await kv.get('app.version')
print(entry.value)   # b'1.2.3'

# Watch changes
async for entry in kv.watch_all():
    print('changed:', entry.key, entry.value)

JetStream 上的小 K/V,replicated + watch 通知。
替代 etcd 的简单场景。

3. Object Store

obs = await js.create_object_store(bucket='photos')

await obs.put('logo.png', b'<png bytes>')
data = await obs.get('logo.png')

S3 替代品(小规模),数据在 stream 里。

4. Leaf node(跨地域 / 边缘)

总部跑 main NATS server,分公司跑 leaf node 连过来:

# leaf node config
leafnodes {
  remotes = [
    { url: "nats://hq.example.com:7422" }
  ]
}

leaf 上发的消息自动同步到总部。流量本地优先,节省跨地域带宽。

实战:订单事件总线

service: order-api          ↘
service: payment-svc        ↘  publish orders.* ↘
service: shipment-svc       ↗                    ↘
                                                  JetStream
                          ↗                       ↙
service: analytics  (subscribe orders.> as 'analytics')
service: email-svc  (subscribe orders.created as 'email-notifier')
service: webhook-svc (subscribe orders.> as 'webhook-publisher')

每个消费方独立 consumer,独立维护 offset。
order-api 一次 publish,5 个下游各自异步处理。

加新下游不影响现有:

# 新加 'audit-log' consumer
await js.add_consumer(stream='ORDERS', config=ConsumerConfig(
    durable_name='audit-log',
    filter_subject='orders.>',
    deliver_policy=DeliverPolicy.ALL,    # 从最早开始
))

deliver_policy=ALL 让新 consumer 从 stream 最早消息开始处理,
回放历史(Kafka 一样能力)。

监控

NATS 自带 :8222/varz /:8222/jsz 等 endpoint。Prometheus exporter:

docker run -p 7777:7777 \
  natsio/prometheus-nats-exporter:latest \
  -varz -connz -channelz -subz http://nats:8222

Grafana 用现成 dashboard ID 2279 / 2280。

高可用:cluster + replicas

3 node cluster:

nats-server -p 4222 -n n1 -cluster_listen 0.0.0.0:6222 -routes nats://n2:6222,nats://n3:6222 -js -sd /data/n1
nats-server -p 4223 -n n2 -cluster_listen 0.0.0.0:6222 -routes nats://n1:6222,nats://n3:6222 -js -sd /data/n2
nats-server -p 4224 -n n3 -cluster_listen 0.0.0.0:6222 -routes nats://n1:6222,nats://n2:6222 -js -sd /data/n3

Stream --replicas 3 → 3 副本(Raft 共识)。一台挂仍可用。

效果

我们一个中型 SaaS 把消息中间件从 Redis Streams 换 NATS JetStream:

  • 容器内存从 250MB(Redis 加事件订阅逻辑)→ 60MB(NATS 单进程)
  • 跨地域分公司间消息同步 native 支持(之前要自己写桥)
  • delivery semantics 更清楚(at-least-once 默认,exactly-once 配置开)
  • consumer 重连 / replay 等行为类 Kafka,无需自己处理

踩过的坑

  1. -js 没加忘了启 JetStream:以为 stream 创建失败,
    failed to find storage directory。第一次启动一定加 -js

  2. --max-msgs / --max-bytes 一定要设:默认 unlimited,
    磁盘吃满。

  3. consumer ack 超时配置:处理慢的 consumer 默认 30s ack 不及
    消息被 redelivered。--ack-wait 调大。

  4. 多语言客户端兼容性:Go / Python / Node / Java / Rust / C# 都有
    官方 client;version 一致性要注意(client 落后 server 可能少 feature)。

  5. TLS / auth:开放公网必须配 --tls + --auth + 用户密码或
    nkey。默认 anonymous + plain 安全为零。

精确评价 共 0 人评价
可复现性
可复现 · 0 不可复现 · 0
文风
文风流畅 · 0 文风晦涩 · 0
立场
支持 · 0 反对 · 0

登录后即可对本帖作出评价。

评论区 0 条 · 所有人可在此交流

登录后参与评论。

还没有评论,来说两句。