起因
要做一个"任务跑完通知前端"的功能。WebSocket 是常见方案但:
- 需要保持长连接 + 心跳 + 重连逻辑
- HTTP/2 + nginx 反代各种配置
- 鉴权 / cookies / CORS 跟普通 HTTP 差异大
如果只需要服务端推、客户端不需要回传(80% 通知场景),
Server-Sent Events (SSE) 是更简单的方案:基于 HTTP 长连接,浏览器
原生支持 EventSource API,自动重连。
解决方案
1. 服务端:FastAPI
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio, json
app = FastAPI()
async def event_stream(user_id: str):
while True:
# 等下一条 event(可以从 Redis pubsub / DB poll / 内存队列拿)
event = await pubsub.get_event_for_user(user_id)
# SSE 格式:data: <json>\n\n
yield f"data: {json.dumps(event)}\n\n"
@app.get('/events')
async def sse(user_id: str):
return StreamingResponse(
event_stream(user_id),
media_type='text/event-stream',
headers={
'Cache-Control': 'no-cache',
'X-Accel-Buffering': 'no', # nginx 不要 buffer
},
)
SSE 是纯 HTTP,response body 持续 stream,每个 event 一段:
event: notification
data: {"type": "task_done", "id": 42}
id: 12345
retry: 5000
event: update
data: {"progress": 0.5}
字段:
data:必填,可多行event:可选事件类型(前端按类型路由)id:可选,用于断线重连时 Last-Event-Idretry:客户端断线后重连等待时长(ms)- 双
\n\n结束一个 event
2. 前端:EventSource API
const es = new EventSource('/events?user_id=42')
es.onopen = () => console.log('connected')
es.onmessage = (e) => {
// 默认 event(没 event: 字段时)
const data = JSON.parse(e.data)
console.log('event:', data)
}
es.addEventListener('notification', (e) => {
const n = JSON.parse(e.data)
showToast(n.message)
})
es.addEventListener('progress', (e) => {
updateProgressBar(JSON.parse(e.data).percent)
})
es.onerror = (e) => {
console.warn('SSE error, will reconnect automatically')
// 浏览器自动 reconnect(用 retry 字段指定的间隔)
}
// 主动关闭
// es.close()
EventSource 浏览器自动:
- 重连(断线后等 retry 时间)
- 发送
Last-Event-Idheader 让服务端从断点继续 - HTTP/1.1 长连接
3. 实际后端实现:Redis pubsub 桥
import redis.asyncio as redis
r = redis.from_url('redis://localhost')
async def event_stream(user_id: str):
pubsub = r.pubsub()
await pubsub.subscribe(f'user:{user_id}')
# heartbeat:每 30s 发一个注释行,防止反代认为连接死了断开
async def heartbeat():
while True:
await asyncio.sleep(30)
yield ': keepalive\n\n'
try:
async for msg in pubsub.listen():
if msg['type'] != 'message':
continue
data = msg['data'].decode()
yield f'data: {data}\n\n'
finally:
await pubsub.unsubscribe(f'user:{user_id}')
await pubsub.close()
业务方在任何地方 publish 事件:
import redis
r = redis.from_url(...)
# 任务完成时
r.publish(f'user:{user_id}', json.dumps({'type': 'task_done', 'task_id': 42}))
4. 断线重连 + Last-Event-Id
服务端要支持"从某 id 之后开始重发":
from fastapi import Header
@app.get('/events')
async def sse(
user_id: str,
last_event_id: str | None = Header(None, alias='Last-Event-Id'),
):
async def gen():
# 1. 重连场景:发送 last_event_id 之后错过的事件
if last_event_id:
for e in get_events_after(user_id, last_event_id):
yield f'id: {e.id}\ndata: {e.data}\n\n'
# 2. 然后实时推送
async for e in subscribe_user_events(user_id):
yield f'id: {e.id}\ndata: {e.data}\n\n'
return StreamingResponse(gen(), media_type='text/event-stream')
每个 event 给 id: → 浏览器记住 → 重连时自动 header 发 Last-Event-ID。
保证不丢事件。
5. nginx 配置
SSE 需要不被 buffer + 不超时:
location /events {
proxy_pass http://app;
proxy_buffering off; # 关键:不要 buffer
proxy_cache off;
proxy_set_header Connection '';
proxy_http_version 1.1;
chunked_transfer_encoding off;
proxy_read_timeout 24h; # 长连接不要超时
}
X-Accel-Buffering: no header(应用层发的)也起同样作用。
6. 鉴权
SSE 是 HTTP,cookie 自动带(同源)。跨域要 CORS:
@app.get('/events')
async def sse(...):
return StreamingResponse(
gen(),
media_type='text/event-stream',
headers={
'Access-Control-Allow-Origin': 'https://web.example.com',
'Access-Control-Allow-Credentials': 'true',
},
)
new EventSource('https://api.example.com/events', { withCredentials: true })
EventSource 不支持 custom headers(如 Authorization),鉴权要走 cookie
或 URL query。query 传 token 不安全(log 可能记),生产用 cookie session。
7. 多 worker 的事件分发
多进程 / 多机器 worker 时,event publish 给某 user 但 SSE 连接在别的
worker → 丢。Redis pubsub 解决:所有 worker 订阅,user 的连接在哪个
worker 就由那个 worker push。
或者用 Centrifugo / Mercure 等专用 SSE/WebSocket 后端,扩容简单。
与 WebSocket / polling 对比
| SSE | WebSocket | Long polling | |
|---|---|---|---|
| 协议 | HTTP(单向) | 自定义 frame(双向) | HTTP |
| 浏览器原生 | EventSource | WebSocket | XHR |
| 自动重连 | ✅ | ❌(自己写) | ❌ |
| 鉴权 | cookie 简单 | 复杂 | 简单 |
| Header / Auth | 受限 | 灵活 | 灵活 |
| nginx 反代 | 简单 | 中(HTTP/1.1 upgrade) | 简单 |
| 客户端到服务端 | 用普通 HTTP | 内置 | 用 XHR |
适合 SSE 场景:
- 通知 / push 单向流(订单状态 / 后台任务进度 / 行情 ticker)
- AI 聊天 / LLM streaming response
- log tail / real-time dashboard
适合 WebSocket:
- 双向频繁(聊天 / 多人编辑 / 游戏)
- 二进制(视频 / 音频)
LLM streaming 是 SSE 的杀手应用
OpenAI / Anthropic / Ollama 等 LLM API 的 streaming 输出都用 SSE:
async def chat_stream(prompt: str):
yield 'data: {"role": "assistant", "content": ""}\n\n'
async for chunk in llm.stream(prompt):
yield f'data: {json.dumps({"content": chunk})}\n\n'
yield 'data: [DONE]\n\n'
前端用 EventSource 接收每个 token,UI 打字机效果显示。
效果
我们改 SSE 替代之前的 long-polling 通知:
- 推送延迟从 1-5s(poll 间隔)→ < 100ms
- 服务端请求数 -90%(不再每 5s 轮询)
- 客户端代码 60 行 → 15 行(EventSource 包办)
- 断线重连自动 + 无丢事件(Last-Event-Id)
踩过的坑
-
nginx buffer 没关:response 在 nginx 端聚合,客户端永远收不到
单个 event(全部等到 buffer 满才发)。proxy_buffering off必加。 -
同源限制 6 个 SSE 连接:浏览器对每域名 HTTP/1.1 连接数限制
~6。开多个 SSE 把额度吃光,普通请求被排队。
解决:HTTP/2(多路复用,无 6 限制)+ 同源访问。 -
多 tab 共享一个 SSE 用 SharedWorker:1 个 tab 1 个 SSE 连接,
10 个 tab 客户端 10 倍开销 + 服务端 10 倍连接。SharedWorker 让多 tab
共享一个连接。 -
mobile Safari 后台断连:iOS 把后台 tab 的 SSE 断了不通知。
visibilitychange事件触发时检查 EventSource.readyState,
不是 OPEN 就重连。 -
回避 EventSource 不支持自定义 header:要 JWT 鉴权又不想存 cookie:
用EventSourcePolyfill(npm 包)支持 fetch API + header。
登录后参与评论。