Server-Sent Events (SSE):替代 WebSocket 做服务端推送

起因

要做一个"任务跑完通知前端"的功能。WebSocket 是常见方案但:

  • 需要保持长连接 + 心跳 + 重连逻辑
  • HTTP/2 + nginx 反代各种配置
  • 鉴权 / cookies / CORS 跟普通 HTTP 差异大

如果只需要服务端推、客户端不需要回传(80% 通知场景),
Server-Sent Events (SSE) 是更简单的方案:基于 HTTP 长连接,浏览器
原生支持 EventSource API,自动重连。

解决方案

1. 服务端:FastAPI

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio, json

app = FastAPI()

async def event_stream(user_id: str):
    while True:
        # 等下一条 event(可以从 Redis pubsub / DB poll / 内存队列拿)
        event = await pubsub.get_event_for_user(user_id)

        # SSE 格式:data: <json>\n\n
        yield f"data: {json.dumps(event)}\n\n"

@app.get('/events')
async def sse(user_id: str):
    return StreamingResponse(
        event_stream(user_id),
        media_type='text/event-stream',
        headers={
            'Cache-Control': 'no-cache',
            'X-Accel-Buffering': 'no',   # nginx 不要 buffer
        },
    )

SSE 是纯 HTTP,response body 持续 stream,每个 event 一段:

event: notification
data: {"type": "task_done", "id": 42}
id: 12345
retry: 5000

event: update
data: {"progress": 0.5}

字段:

  • data: 必填,可多行
  • event: 可选事件类型(前端按类型路由)
  • id: 可选,用于断线重连时 Last-Event-Id
  • retry: 客户端断线后重连等待时长(ms)
  • \n\n 结束一个 event

2. 前端:EventSource API

const es = new EventSource('/events?user_id=42')

es.onopen = () => console.log('connected')

es.onmessage = (e) => {
  // 默认 event(没 event: 字段时)
  const data = JSON.parse(e.data)
  console.log('event:', data)
}

es.addEventListener('notification', (e) => {
  const n = JSON.parse(e.data)
  showToast(n.message)
})

es.addEventListener('progress', (e) => {
  updateProgressBar(JSON.parse(e.data).percent)
})

es.onerror = (e) => {
  console.warn('SSE error, will reconnect automatically')
  // 浏览器自动 reconnect(用 retry 字段指定的间隔)
}

// 主动关闭
// es.close()

EventSource 浏览器自动:

  • 重连(断线后等 retry 时间)
  • 发送 Last-Event-Id header 让服务端从断点继续
  • HTTP/1.1 长连接

3. 实际后端实现:Redis pubsub 桥

import redis.asyncio as redis

r = redis.from_url('redis://localhost')

async def event_stream(user_id: str):
    pubsub = r.pubsub()
    await pubsub.subscribe(f'user:{user_id}')

    # heartbeat:每 30s 发一个注释行,防止反代认为连接死了断开
    async def heartbeat():
        while True:
            await asyncio.sleep(30)
            yield ': keepalive\n\n'

    try:
        async for msg in pubsub.listen():
            if msg['type'] != 'message':
                continue
            data = msg['data'].decode()
            yield f'data: {data}\n\n'
    finally:
        await pubsub.unsubscribe(f'user:{user_id}')
        await pubsub.close()

业务方在任何地方 publish 事件:

import redis
r = redis.from_url(...)

# 任务完成时
r.publish(f'user:{user_id}', json.dumps({'type': 'task_done', 'task_id': 42}))

4. 断线重连 + Last-Event-Id

服务端要支持"从某 id 之后开始重发":

from fastapi import Header

@app.get('/events')
async def sse(
    user_id: str,
    last_event_id: str | None = Header(None, alias='Last-Event-Id'),
):
    async def gen():
        # 1. 重连场景:发送 last_event_id 之后错过的事件
        if last_event_id:
            for e in get_events_after(user_id, last_event_id):
                yield f'id: {e.id}\ndata: {e.data}\n\n'

        # 2. 然后实时推送
        async for e in subscribe_user_events(user_id):
            yield f'id: {e.id}\ndata: {e.data}\n\n'

    return StreamingResponse(gen(), media_type='text/event-stream')

每个 event 给 id: → 浏览器记住 → 重连时自动 header 发 Last-Event-ID
保证不丢事件。

5. nginx 配置

SSE 需要不被 buffer + 不超时:

location /events {
    proxy_pass http://app;
    proxy_buffering off;          # 关键:不要 buffer
    proxy_cache off;
    proxy_set_header Connection '';
    proxy_http_version 1.1;
    chunked_transfer_encoding off;
    proxy_read_timeout 24h;       # 长连接不要超时
}

X-Accel-Buffering: no header(应用层发的)也起同样作用。

6. 鉴权

SSE 是 HTTP,cookie 自动带(同源)。跨域要 CORS:

@app.get('/events')
async def sse(...):
    return StreamingResponse(
        gen(),
        media_type='text/event-stream',
        headers={
            'Access-Control-Allow-Origin': 'https://web.example.com',
            'Access-Control-Allow-Credentials': 'true',
        },
    )
new EventSource('https://api.example.com/events', { withCredentials: true })

EventSource 不支持 custom headers(如 Authorization),鉴权要走 cookie
或 URL query。query 传 token 不安全(log 可能记),生产用 cookie session。

7. 多 worker 的事件分发

多进程 / 多机器 worker 时,event publish 给某 user 但 SSE 连接在别的
worker → 丢。Redis pubsub 解决:所有 worker 订阅,user 的连接在哪个
worker 就由那个 worker push。

或者用 Centrifugo / Mercure 等专用 SSE/WebSocket 后端,扩容简单。

与 WebSocket / polling 对比

SSE WebSocket Long polling
协议 HTTP(单向) 自定义 frame(双向) HTTP
浏览器原生 EventSource WebSocket XHR
自动重连 ❌(自己写)
鉴权 cookie 简单 复杂 简单
Header / Auth 受限 灵活 灵活
nginx 反代 简单 中(HTTP/1.1 upgrade) 简单
客户端到服务端 用普通 HTTP 内置 用 XHR

适合 SSE 场景:

  • 通知 / push 单向流(订单状态 / 后台任务进度 / 行情 ticker)
  • AI 聊天 / LLM streaming response
  • log tail / real-time dashboard

适合 WebSocket:

  • 双向频繁(聊天 / 多人编辑 / 游戏)
  • 二进制(视频 / 音频)

LLM streaming 是 SSE 的杀手应用

OpenAI / Anthropic / Ollama 等 LLM API 的 streaming 输出都用 SSE:

async def chat_stream(prompt: str):
    yield 'data: {"role": "assistant", "content": ""}\n\n'
    async for chunk in llm.stream(prompt):
        yield f'data: {json.dumps({"content": chunk})}\n\n'
    yield 'data: [DONE]\n\n'

前端用 EventSource 接收每个 token,UI 打字机效果显示。

效果

我们改 SSE 替代之前的 long-polling 通知:

  • 推送延迟从 1-5s(poll 间隔)→ < 100ms
  • 服务端请求数 -90%(不再每 5s 轮询)
  • 客户端代码 60 行 → 15 行(EventSource 包办)
  • 断线重连自动 + 无丢事件(Last-Event-Id)

踩过的坑

  1. nginx buffer 没关:response 在 nginx 端聚合,客户端永远收不到
    单个 event(全部等到 buffer 满才发)。proxy_buffering off 必加。

  2. 同源限制 6 个 SSE 连接:浏览器对每域名 HTTP/1.1 连接数限制
    ~6。开多个 SSE 把额度吃光,普通请求被排队。
    解决:HTTP/2(多路复用,无 6 限制)+ 同源访问。

  3. 多 tab 共享一个 SSE 用 SharedWorker:1 个 tab 1 个 SSE 连接,
    10 个 tab 客户端 10 倍开销 + 服务端 10 倍连接。SharedWorker 让多 tab
    共享一个连接。

  4. mobile Safari 后台断连:iOS 把后台 tab 的 SSE 断了不通知。
    visibilitychange 事件触发时检查 EventSource.readyState,
    不是 OPEN 就重连。

  5. 回避 EventSource 不支持自定义 header:要 JWT 鉴权又不想存 cookie:
    EventSourcePolyfill(npm 包)支持 fetch API + header。

精确评价 共 0 人评价
可复现性
可复现 · 0 不可复现 · 0
文风
文风流畅 · 0 文风晦涩 · 0
立场
支持 · 0 反对 · 0

登录后即可对本帖作出评价。

评论区 0 条 · 所有人可在此交流

登录后参与评论。

还没有评论,来说两句。