知识广场

按学科筛选:计算机科学 / 后端开发
清除筛选

«计算机科学 / 后端开发» 分类下共 35 篇帖子

Server-Sent Events (SSE):替代 WebSocket 做服务端推送

## 起因 要做一个"任务跑完通知前端"的功能。WebSocket 是常见方案但: - 需要保持长连接 + 心跳 + 重连逻辑 - HTTP/2 + nginx 反代各种配置 - 鉴权 / cookies / CORS 跟普通 HTTP 差异大 如果**只需要服务端推、客户端不需要回传**(80% 通知场景), Server-Sent Events (SSE) 是更简单的方案:基于 HTTP 长连接,浏览器 原生支持 EventSource API,自动重连。 ## 解决方案 ### 1. 服务端:FastAPI ```python from fastapi import FastAPI from fastapi.responses import StreamingResponse import asyncio, json app = FastAPI() async def event_stream(user_id: str): while True: # 等下一条 event(可以从 Redis pubsub / DB poll / 内存队列拿) event = await pubsub.get_event_for_user(user_id) # SSE 格式:data: <json>\n\n yield f"data: {json.dumps(event)}\n\n" @app.get('/events') async def sse(user_id: str): return StreamingResponse( event_stream(user_id), media_type='text/event-stream', headers={ 'Cache-Control': 'no-cache', 'X-Accel-Buffering': 'no', # nginx 不要 buffer }, ) ``` SSE 是纯 HTTP,response body 持续 stream,每个 event 一段: ``` event: notification data: {"type": "task_done", "id": 42} id: 12345 retry: 5000 event: update data: {"progress": 0.5} ``` 字段: - `data:` 必填,可多行 - `event:` 可选事件类型(前端按类型路由) - `id:` 可选,用于断线重连时 Last-Event-Id - `retry:` 客户端断线后重连等待时长(ms) - 双 `\n\n` 结束一个 event ### 2. 前端:EventSource API ```js const es = new EventSource('/events?user_id=42') es.onopen = () => console.log('connected') es.onmessage = (e) => { // 默认 event(没 event: 字段时) const data = JSON.parse(e.data) console.log('event:', data) } es.addEventListener('notification', (e) => { const n = JSON.parse(e.data) showToast(n.message) }) es.addEventListener('progress', (e) => { updateProgressBar(JSON.parse(e.data).percent) }) es.onerror = (e) => { console.warn('SSE error, will reconnect automatically') // 浏览器自动 reconnect(用 retry 字段指定的间隔) } // 主动关闭 // es.close() ``` `EventSource` 浏览器自动: - 重连(断线后等 retry 时间) - 发送 `Last-Event-Id` header 让服务端从断点继续 - HTTP/1.1 长连接 ### 3. 实际后端实现:Redis pubsub 桥 ```python import redis.asyncio as redis r = redis.from_url('redis://localhost') async def event_stream(user_id: str): pubsub = r.pubsub() await pubsub.subscribe(f'user:{user_id}') # heartbeat:每 30s 发一个注释行,防止反代认为连接死了断开 async def heartbeat(): while True: await asyncio.sleep(30) yield ': keepalive\n\n' try: async for msg in pubsub.listen(): if msg['type'] != 'message': continue data = msg['data'].decode() yield f'data: {data}\n\n' finally: await pubsub.unsubscribe(f'user:{user_id}') await pubsub.close() ``` 业务方在任何地方 publish 事件: ```python import redis r = redis.from_url(...) # 任务完成时 r.publish(f'user:{user_id}', json.dumps({'type': 'task_done', 'task_id': 42})) ``` ### 4. 断线重连 + Last-Event-Id 服务端要支持"从某 id 之后开始重发": ```python from fastapi import Header @app.get('/events') async def sse( user_id: str, last_event_id: str | None = Header(None, alias='Last-Event-Id'), ): async def gen(): # 1. 重连场景:发送 last_event_id 之后错过的事件 if last_event_id: for e in get_events_after(user_id, last_event_id): yield f'id: {e.id}\ndata: {e.data}\n\n' # 2. 然后实时推送 async for e in subscribe_user_events(user_id): yield f'id: {e.id}\ndata: {e.data}\n\n' return StreamingResponse(gen(), media_type='text/event-stream') ``` 每个 event 给 `id:` → 浏览器记住 → 重连时自动 header 发 `Last-Event-ID`。 保证不丢事件。 ### 5. nginx 配置 SSE 需要不被 buffer + 不超时: ```nginx location /events { proxy_pass http://app; proxy_buffering off; # 关键:不要 buffer proxy_cache off; proxy_set_header Connection ''; proxy_http_version 1.1; chunked_transfer_encoding off; proxy_read_timeout 24h; # 长连接不要超时 } ``` `X-Accel-Buffering: no` header(应用层发的)也起同样作用。 ### 6. 鉴权 SSE 是 HTTP,cookie 自动带(同源)。跨域要 CORS: ```python @app.get('/events') async def sse(...): return StreamingResponse( gen(), media_type='text/event-stream', headers={ 'Access-Control-Allow-Origin': 'https://web.example.com', 'Access-Control-Allow-Credentials': 'true', }, ) ``` ```js new EventSource('https://api.example.com/events', { withCredentials: true }) ``` EventSource 不支持 custom headers(如 Authorization),鉴权要走 cookie 或 URL query。query 传 token 不安全(log 可能记),生产用 cookie session。 ### 7. 多 worker 的事件分发 多进程 / 多机器 worker 时,event publish 给某 user 但 SSE 连接在别的 worker → 丢。Redis pubsub 解决:所有 worker 订阅,user 的连接在哪个 worker 就由那个 worker push。 或者用 Centrifugo / Mercure 等专用 SSE/WebSocket 后端,扩容简单。 ## 与 WebSocket / polling 对比 | | SSE | WebSocket | Long polling | |---|---|---|---| | 协议 | HTTP(单向) | 自定义 frame(双向) | HTTP | | 浏览器原生 | EventSource | WebSocket | XHR | | 自动重连 | ✅ | ❌(自己写) | ❌ | | 鉴权 | cookie 简单 | 复杂 | 简单 | | Header / Auth | 受限 | 灵活 | 灵活 | | nginx 反代 | 简单 | 中(HTTP/1.1 upgrade) | 简单 | | 客户端到服务端 | 用普通 HTTP | 内置 | 用 XHR | 适合 SSE 场景: - 通知 / push 单向流(订单状态 / 后台任务进度 / 行情 ticker) - AI 聊天 / LLM streaming response - log tail / real-time dashboard 适合 WebSocket: - 双向频繁(聊天 / 多人编辑 / 游戏) - 二进制(视频 / 音频) ### LLM streaming 是 SSE 的杀手应用 OpenAI / Anthropic / Ollama 等 LLM API 的 streaming 输出都用 SSE: ```python async def chat_stream(prompt: str): yield 'data: {"role": "assistant", "content": ""}\n\n' async for chunk in llm.stream(prompt): yield f'data: {json.dumps({"content": chunk})}\n\n' yield 'data: [DONE]\n\n' ``` 前端用 EventSource 接收每个 token,UI 打字机效果显示。 ## 效果 我们改 SSE 替代之前的 long-polling 通知: - 推送延迟从 1-5s(poll 间隔)→ < 100ms - 服务端请求数 -90%(不再每 5s 轮询) - 客户端代码 60 行 → 15 行(EventSource 包办) - 断线重连自动 + 无丢事件(Last-Event-Id) ## 踩过的坑 1. **nginx buffer 没关**:response 在 nginx 端聚合,客户端永远收不到 单个 event(全部等到 buffer 满才发)。`proxy_buffering off` 必加。 2. **同源限制 6 个 SSE 连接**:浏览器对每域名 HTTP/1.1 连接数限制 ~6。开多个 SSE 把额度吃光,普通请求被排队。 解决:HTTP/2(多路复用,无 6 限制)+ 同源访问。 3. **多 tab 共享一个 SSE 用 SharedWorker**:1 个 tab 1 个 SSE 连接, 10 个 tab 客户端 10 倍开销 + 服务端 10 倍连接。SharedWorker 让多 tab 共享一个连接。 4. **mobile Safari 后台断连**:iOS 把后台 tab 的 SSE 断了不通知。 `visibilitychange` 事件触发时检查 EventSource.readyState, 不是 OPEN 就重连。 5. **回避 EventSource 不支持自定义 header**:要 JWT 鉴权又不想存 cookie: 用 `EventSourcePolyfill`(npm 包)支持 fetch API + header。

OAuth 2.0 + PKCE:给 SPA / mobile 做 third-party 登录

## 起因 应用要"用 Google / GitHub 账号登录"。 OAuth 2.0 有多种 flow: - Authorization Code(有 backend,推荐) - Implicit(弃用) - Resource Owner Password(弃用) - Client Credentials(machine-to-machine) SPA / mobile 之前用 implicit flow(token 直接放 URL),现在标准是 **Authorization Code + PKCE**。 ## PKCE 是什么 Authorization Code flow 需要 client secret 验证 client 身份。 SPA / mobile 不能藏 secret(公共 client)。 PKCE (Proof Key for Code Exchange) 用临时 challenge 替代 secret: 1. client 生成随机 `code_verifier`(高熵 string) 2. SHA256 hash 得到 `code_challenge` 3. authorize 请求带 `code_challenge` 4. callback 拿到 code 后换 token 时带原 `code_verifier` 5. server 验 hash(verifier) == challenge → 确认是同一 client 防止"中间人截获 authorization code 后冒充换 token"。 ## flow 详细 ### 1. client 生成 verifier + challenge ```js const codeVerifier = generateRandomString(64); const codeChallenge = base64UrlEncode(await sha256(codeVerifier)); // 存到 sessionStorage(callback 时取) sessionStorage.setItem('pkce_verifier', codeVerifier); ``` ### 2. 跳转 authorize URL ```js const params = new URLSearchParams({ client_id: 'my-client-id', redirect_uri: 'https://app.example.com/callback', response_type: 'code', scope: 'openid profile email', state: randomState, // 防 CSRF code_challenge: codeChallenge, code_challenge_method: 'S256', }); window.location = `https://auth.example.com/authorize?${params}`; ``` 用户在 IdP 登录 → 同意授权 → IdP redirect 回: ``` https://app.example.com/callback?code=ABC123&state=... ``` ### 3. callback 换 token ```js async function callback() { const code = new URLSearchParams(location.search).get('code'); const verifier = sessionStorage.getItem('pkce_verifier'); const res = await fetch('https://auth.example.com/token', { method: 'POST', headers: { 'Content-Type': 'application/x-www-form-urlencoded' }, body: new URLSearchParams({ grant_type: 'authorization_code', code, client_id: 'my-client-id', redirect_uri: 'https://app.example.com/callback', code_verifier: verifier, }), }); const tokens = await res.json(); // { access_token, refresh_token, id_token, expires_in } sessionStorage.removeItem('pkce_verifier'); storeTokens(tokens); } ``` server 验 `hash(verifier) == challenge` → 发 token。 ## token 存哪 SPA 痛点:access_token 存哪? - **localStorage**:XSS 一漏全暴露 - **sessionStorage**:同 localStorage + tab close 丢 - **memory (JS variable)**:刷新丢 - **httpOnly cookie**:最安全但需要 same-origin 或者 cross-origin 配置 + BFF backend 主流推荐:**BFF (Backend-for-Frontend) 模式** ``` [SPA] ↔ same-origin httpOnly cookie ↔ [BFF Node/Python] ↓ access_token [Resource API] ``` - BFF 持 token,SPA 仅 session cookie - XSS 拿不到 token - CSRF 用 sameSite=strict cookie 防 复杂但最安全。如果纯前端:access_token in memory + refresh 在 httpOnly cookie。 ## refresh token access_token 短(15 min - 1 h),过期用 refresh_token 换新。 PKCE flow 也能给 refresh_token:`scope=offline_access`。 refresh_token rotation(每次用过失效)防 leak 后无限续。 现代 OAuth 服务器(Auth0 / Okta / Keycloak)默认开。 ## state 防 CSRF ```js // 跳 authorize 之前 const state = generateRandom(); sessionStorage.setItem('oauth_state', state); // 加到 URL params // callback 验 const returnedState = new URLSearchParams(location.search).get('state'); const expectedState = sessionStorage.getItem('oauth_state'); if (returnedState !== expectedState) throw 'CSRF'; ``` 不验 state → 攻击者可让用户登入攻击者账号(account takeover)。 **必须验**。 ## scope OAuth scope 控制 token 能干啥: ``` scope=openid profile email # OIDC 标准 scope=read:user user:email # GitHub scope=https://www.googleapis.com/auth/drive.readonly # Google API ``` **最小权限**:只要 `email`,不要 `read:user`。 ## OIDC vs OAuth - OAuth 2.0:授权("app 能代你访问 API") - OpenID Connect (OIDC):在 OAuth 上面加身份层("who is the user") OIDC 加 `id_token`(JWT containing 用户信息)+ `userinfo` endpoint。 login 场景用 OIDC(need user identity),不仅是 API 授权。 scope 加 `openid` 启用 OIDC。 ## Library 不要手写 OAuth,用库: - JS: `oidc-client-ts`, `@auth0/auth0-spa-js` - Python: `authlib`, `httpx-auth` - Go: `golang.org/x/oauth2` PKCE flow 几行代码: ```js import { UserManager } from 'oidc-client-ts'; const userManager = new UserManager({ authority: 'https://auth.example.com', client_id: 'my-client', redirect_uri: 'https://app.example.com/callback', response_type: 'code', scope: 'openid profile email', // PKCE 自动 }); await userManager.signinRedirect(); // 跳 authorize const user = await userManager.signinRedirectCallback(); // callback 处理 ``` ## device code flow CLI / TV / IoT 没浏览器,用 device flow: 1. 应用问 server 拿 `device_code` + `user_code`(如 "ABC-123") 2. 用户开浏览器去 `https://example.com/device` 输 user_code 3. 应用 poll `/token` endpoint 等用户授权完成 GitHub CLI `gh auth login` 就是 device flow。 ## 实战 case:第三方"用 GitHub 登录" ```python # backend (FastAPI + authlib) from authlib.integrations.starlette_client import OAuth oauth = OAuth() oauth.register( 'github', client_id=GITHUB_CLIENT_ID, client_secret=GITHUB_CLIENT_SECRET, access_token_url='https://github.com/login/oauth/access_token', authorize_url='https://github.com/login/oauth/authorize', api_base_url='https://api.github.com/', client_kwargs={'scope': 'user:email'}, ) @app.get('/auth/github') async def login(request): redirect = request.url_for('callback') return await oauth.github.authorize_redirect(request, redirect) @app.get('/auth/github/callback') async def callback(request): token = await oauth.github.authorize_access_token(request) user = await oauth.github.get('user', token=token) user_data = user.json() # create / find local user + issue session ... ``` backend 持 client_secret(GitHub OAuth app 经典 confidential client), 不需要 PKCE。 如果纯 SPA 直接调 GitHub → PKCE。但 GitHub OAuth app 不支持 PKCE, 要用 GitHub App + device flow / 后端中转。 ## 踩过的坑 1. **redirect_uri 没 register**:authorize 返回 invalid redirect。 到 IdP console 加 callback URL(每环境一个:dev/staging/prod)。 2. **state 验缺失**:CSRF 风险。库一般帮做,但自己写时容易漏。 3. **token 存 localStorage**:XSS 后即失。BFF 或者 httpOnly cookie。 4. **scope 过宽**:要 email 却 request 整个 drive 访问 → 用户怕 + 不授权。最小 scope。 5. **PKCE verifier 没存好**:刷新页 / 跨 tab → callback 找不到 verifier。sessionStorage 同 tab 内 OK;BFF 模式用 server session。

限流算法实战:token bucket / leaky bucket / sliding window 各自怎么写

任何对外 API 都需要限流,否则一波突发流量 / 恶意刷接口能把服务打挂。 四种主流算法各有适用场景,下面手写各一份 + 选择建议。 ## 算法对比 | 算法 | 适合 | 突发支持 | 实现复杂度 | |---|---|---|---| | Fixed Window | 简单计数器 | 边界双倍突发 | 极简 | | Sliding Window | 平滑限流 | 可控 | 中 | | Leaky Bucket | 强制匀速输出 | 不支持 | 中 | | Token Bucket | 突发友好 | 支持 | 中 | ## 1. Fixed Window(最简,但有边界问题) ```python import redis r = redis.Redis() def fixed_window_allow(key: str, limit: int, window_sec: int) -> bool: bucket = f'{key}:{int(time.time()) // window_sec}' n = r.incr(bucket) if n == 1: r.expire(bucket, window_sec) return n <= limit # 用法:limit=100/分钟 allowed = fixed_window_allow(f'rl:user:{uid}', 100, 60) ``` 问题:59:59 一波 100 次 + 00:01 一波 100 次 = 2 秒内 200 次,违反"每分钟 100" 意图。 ## 2. Sliding Window(用 sorted set 精确,开销稍高) ```python def sliding_window_allow(key: str, limit: int, window_sec: int) -> bool: now = time.time() pipe = r.pipeline() pipe.zremrangebyscore(key, 0, now - window_sec) # 删窗口外 pipe.zcard(key) # 当前窗口内的请求数 pipe.zadd(key, {str(now): now}) # 加这次 pipe.expire(key, window_sec) _, count, _, _ = pipe.execute() return count < limit ``` 精确:当前到过去 window_sec 秒内最多 limit 次。 代价:每个 user 一个 sorted set;高并发下 Redis 内存增加。 ## 3. Leaky Bucket(强制匀速出) ```python def leaky_bucket_allow(key: str, capacity: int, leak_per_sec: float) -> bool: """ 模拟一个桶:用户每次请求往桶里加一滴水;桶在背景以 leak_per_sec 漏水。 桶满 → 拒绝。 """ now = time.time() state = r.hgetall(key) or {} last = float(state.get(b'last', now)) water = float(state.get(b'water', 0)) # 时间差里漏掉的水 water = max(0, water - (now - last) * leak_per_sec) if water >= capacity: return False water += 1 r.hset(key, mapping={'water': water, 'last': now}) r.expire(key, int(capacity / leak_per_sec) + 1) return True # 用法:bucket cap=10, 漏速 2/秒 → 最大允许短期 10 突发,长期 2/秒 allowed = leaky_bucket_allow(f'rl:lb:{uid}', 10, 2) ``` 漏速恒定 → 输出节奏稳定,但不允许突发"全速消费"。 ## 4. Token Bucket(突发友好,最常用) ```python def token_bucket_allow(key: str, capacity: int, refill_per_sec: float) -> bool: """ 桶里装 token:请求消耗一个 token;桶在背景以 refill_per_sec 添加 token。 没 token → 拒绝。 """ now = time.time() state = r.hgetall(key) or {} last = float(state.get(b'last', now)) tokens = float(state.get(b'tokens', capacity)) tokens = min(capacity, tokens + (now - last) * refill_per_sec) if tokens < 1: return False tokens -= 1 r.hset(key, mapping={'tokens': tokens, 'last': now}) r.expire(key, int(capacity / refill_per_sec) + 1) return True # 用法:cap=100, 速率 10/秒 → 短期可一次性 100 个,平均 10/秒 allowed = token_bucket_allow(f'rl:tb:{uid}', 100, 10) ``` 桶满了允许突发用 capacity 个,之后稳态 refill_per_sec。 **实际生产 99% 用这个**:API 客户端通常零散调用,偶尔一波,token bucket 最贴合。 ## 5. 原子性问题 上面 Python 版本有 race condition(get → 算 → set 之间被别的请求抢插)。 生产用 Lua 脚本一次性原子完成: ```lua -- token_bucket.lua local key = KEYS[1] local capacity = tonumber(ARGV[1]) local refill_rate = tonumber(ARGV[2]) local now = tonumber(ARGV[3]) local state = redis.call('HMGET', key, 'tokens', 'last') local tokens = tonumber(state[1]) or capacity local last = tonumber(state[2]) or now tokens = math.min(capacity, tokens + (now - last) * refill_rate) local allowed = 0 if tokens >= 1 then tokens = tokens - 1 allowed = 1 end redis.call('HMSET', key, 'tokens', tokens, 'last', now) redis.call('EXPIRE', key, math.ceil(capacity / refill_rate) + 1) return allowed ``` Python 调用: ```python TOKEN_BUCKET_SCRIPT = r.register_script(open('token_bucket.lua').read()) def allow(key, capacity, refill_rate): return TOKEN_BUCKET_SCRIPT( keys=[key], args=[capacity, refill_rate, time.time()], ) ``` ## 6. nginx 限流 不要总在应用层做。nginx 内置 limit_req(leaky bucket)很强: ```nginx # 每客户端 IP 10 req/s,最大突发 20 limit_req_zone $binary_remote_addr zone=api:10m rate=10r/s; server { location /api/ { limit_req zone=api burst=20 nodelay; proxy_pass http://backend; } } ``` `burst=20 nodelay` = 突发 20 个立即过,超过的直接 503。 不带 `nodelay` 是排队处理(leaky bucket 标准行为)。 API gateway(Kong / Tyk / Envoy)也都内置。 ## 7. 分布式限流的精度 单 Redis 实例 + Lua 是强一致;多 Redis cluster 时同一 user 的请求被 hash 到不同分片,限流可能略松。一般可接受。 需要严格全局限流可用 sentinel pattern + 唯一 master。 ## 8. 限流的响应 ```http HTTP/1.1 429 Too Many Requests Retry-After: 60 X-RateLimit-Limit: 100 X-RateLimit-Remaining: 0 X-RateLimit-Reset: 1735689600 ``` `Retry-After`(秒或日期)告诉客户端何时再试。 ## 9. 按维度限流 不同维度组合: - 按 IP:防匿名滥用 - 按 API key:按付费等级 - 按 endpoint:贵 endpoint 单独严格 - 按 user-id:登录用户 ```python def check_rate_limits(req): if not allow(f'ip:{req.ip}', 1000, 100): return 'IP too fast' if not allow(f'user:{req.user_id}', 100, 10): return 'user quota exceeded' if req.path == '/expensive': if not allow(f'user:{req.user_id}:exp', 5, 0.1): return 'expensive endpoint quota exceeded' return None ``` ## 10. 反例 + 注意 - 用全局锁实现限流:性能差,单点故障 - 用 DB 计数:DB 不是限流工具,QPS 上来扛不住 - 限流粒度太粗(all users 共享):恶意用户拖累所有人 ## 11. 第三方库 - **slowapi**(FastAPI / Starlette):装饰器限流 - **django-ratelimit**:Django 装饰器 - **redis-cell**:Redis module,提供原子 CL.THROTTLE 命令 ```python # slowapi 例 from slowapi import Limiter limiter = Limiter(key_func=lambda req: req.client.host) @app.get('/api/x') @limiter.limit('100/minute') def x(request: Request): ... ``` ## 踩过的坑 - 限流 key 包含敏感信息(user email),泄露 Redis = 数据泄漏。 hash 一下当 key。 - TTL 算错让 key 永远不过期,Redis 占用涨。 - 限流应用在 readiness probe 上 → 监控系统刷它把自己 ban 了。 排除 health endpoint。 - 测试期忘了关限流 → CI 测试因 429 间歇性失败。环境变量分场景。

Django Channels:给 Django 加 WebSocket(不引入 Node)

## 起因 Django 项目要加"实时通知"功能(用户 like 时其他用户立刻看到 count 更新)。 两种思路: 1. 起独立 Node.js WebSocket server + Django 通过 Redis pub/sub 协调 2. Django Channels:在 Django 内部加 WebSocket / async 不想多维护一个 Node 服务 → Channels 直接。 ## Channels 是什么 Django 4.0+ 原生支持 ASGI,可以跑 async view。 Channels 是 Django Software Foundation 的官方扩展,处理: - WebSocket 协议(HTTP 升级) - 多 worker 间消息分发(channel layer,用 Redis) - SSE / 长连接 ## 装 ```bash uv add channels channels-redis daphne ``` ## 配置 ASGI `asgi.py`: ```python import os from django.core.asgi import get_asgi_application from channels.routing import ProtocolTypeRouter, URLRouter from channels.auth import AuthMiddlewareStack from django.urls import path os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myapp.settings') django_asgi_app = get_asgi_application() from chat.consumers import ChatConsumer from notifications.consumers import NotifyConsumer application = ProtocolTypeRouter({ 'http': django_asgi_app, # 普通 HTTP 走原本 Django 'websocket': AuthMiddlewareStack( # WebSocket 路由 URLRouter([ path('ws/chat/<str:room>/', ChatConsumer.as_asgi()), path('ws/notify/', NotifyConsumer.as_asgi()), ]) ), }) ``` `settings.py`: ```python INSTALLED_APPS = [ 'daphne', # 替代 staticfiles 的 ASGI server,必须排前面 'django.contrib.staticfiles', 'channels', # ... 你的 apps ] ASGI_APPLICATION = 'myapp.asgi.application' # channel layer:跨 worker 消息 CHANNEL_LAYERS = { 'default': { 'BACKEND': 'channels_redis.core.RedisChannelLayer', 'CONFIG': {'hosts': [('redis', 6379)]}, }, } ``` ## 写一个简单 Consumer ```python # chat/consumers.py import json from channels.generic.websocket import AsyncWebsocketConsumer class ChatConsumer(AsyncWebsocketConsumer): async def connect(self): self.room = self.scope['url_route']['kwargs']['room'] self.group_name = f'chat_{self.room}' self.user = self.scope['user'] if not self.user.is_authenticated: await self.close() return # 加入房间 group(Redis pub/sub) await self.channel_layer.group_add(self.group_name, self.channel_name) await self.accept() async def disconnect(self, close_code): await self.channel_layer.group_discard(self.group_name, self.channel_name) async def receive(self, text_data): # 客户端发来消息 data = json.loads(text_data) message = data['message'] # 广播给房间所有人 await self.channel_layer.group_send( self.group_name, { 'type': 'chat.message', # 调下面方法 'message': message, 'user': self.user.username, } ) async def chat_message(self, event): # 房间收到消息时(包括自己发的) await self.send(text_data=json.dumps({ 'message': event['message'], 'user': event['user'], })) ``` `channel_layer.group_send` 通过 Redis 通知**所有 worker** 的对应 consumer。3 个 daphne worker / 100 个客户端,消息正确广播。 ## 客户端 JS ```js const ws = new WebSocket(`wss://example.com/ws/chat/${roomId}/`) ws.onmessage = (e) => { const data = JSON.parse(e.data) addMessageToUI(data.user, data.message) } ws.onopen = () => { console.log('connected') } document.getElementById('send').addEventListener('click', () => { const text = document.getElementById('input').value ws.send(JSON.stringify({ message: text })) }) ``` ## 跑 ```bash # 开发用 runserver(自动支持 ASGI / Channels) python manage.py runserver # 生产用 daphne daphne -b 0.0.0.0 -p 8000 myapp.asgi:application # 多 worker gunicorn -k uvicorn.workers.UvicornWorker myapp.asgi:application --workers 4 ``` Redis 必须跑(channel layer 依赖)。 nginx 反代要支持 WebSocket: ```nginx location /ws/ { proxy_pass http://app; proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection "upgrade"; proxy_set_header Host $host; proxy_read_timeout 86400; # 长连接不要超时 } ``` ## 从 Django view 推 message 业务 view 里触发广播(非 WebSocket 路径): ```python # views.py from channels.layers import get_channel_layer from asgiref.sync import async_to_sync def like_post(request, post_id): post = Post.objects.get(pk=post_id) post.likes += 1 post.save() # 通知所有连接到这帖子的客户端 channel_layer = get_channel_layer() async_to_sync(channel_layer.group_send)( f'post_{post_id}', { 'type': 'post.update', # 调用 consumer 的 post_update 方法 'likes': post.likes, } ) return JsonResponse({'likes': post.likes}) ``` ```python class PostNotifyConsumer(AsyncWebsocketConsumer): async def connect(self): post_id = self.scope['url_route']['kwargs']['post_id'] self.group_name = f'post_{post_id}' await self.channel_layer.group_add(self.group_name, self.channel_name) await self.accept() async def post_update(self, event): await self.send(text_data=json.dumps({ 'likes': event['likes'], })) ``` 通用模式:**业务 view 改 DB → group_send 通知 → 所有 WebSocket 客户端 收到**。 ## ORM in async consumer WebSocket consumer 是 async,但 Django ORM 默认同步。 用 `sync_to_async`: ```python from asgiref.sync import sync_to_async class MyConsumer(AsyncWebsocketConsumer): async def receive(self, text_data): post = await sync_to_async(Post.objects.get)(pk=1) ``` 或者用 Django 4.1+ 的 async ORM: ```python post = await Post.objects.aget(pk=1) ``` ## 测试 Consumer ```python from channels.testing import WebsocketCommunicator from chat.consumers import ChatConsumer async def test_chat(): communicator = WebsocketCommunicator( ChatConsumer.as_asgi(), '/ws/chat/test/') connected, _ = await communicator.connect() assert connected await communicator.send_json_to({'message': 'hello'}) response = await communicator.receive_json_from() assert response['message'] == 'hello' await communicator.disconnect() ``` ## 性能 / 规模 Channels + Redis 配置 4 worker 在小服务器上: - 1000 并发 WebSocket connection 轻松 - 10000 取决于 Redis 配置 + 网络 - > 10w → 考虑专用 WebSocket server(Centrifugo / Phoenix Channels) ## 与替代品对比 | | Django Channels | 单独 Node WebSocket | Centrifugo | Phoenix LiveView | |---|---|---|---|---| | 语言 | Python | Node.js | Go | Elixir | | 集成 Django | 原生 | 需 Redis 中介 | 同 | N/A | | 并发上限 | 中(万级) | 高 | 极高(百万) | 极高 | | 复杂度 | 中 | 高(双栈) | 中 | 高(学语言) | | 适合 | Django app + 适量 WebSocket | 极致并发 + 现有 Node | 中大规模 push | 完全重 stack | ## 真实部署 case 我们一个内部协作工具: - Django 4.2 + Channels 4 - 500 实时在线协作 user - 用 1 台机器 (4 vCPU / 8 GB) + Redis - daphne 4 worker - nginx 反代 - 6 个月 0 down 足够。如果上 5000 在线考虑 Centrifugo。 ## 替代:HTMX + SSE 如果只是"服务端推" 不需要双向,用 SSE 替代(前面有篇): ```python @app.get('/sse/notifications') async def sse(): async def gen(): async for event in get_events(): yield f'data: {json.dumps(event)}\n\n' return StreamingHttpResponse(gen(), content_type='text/event-stream') ``` 更简单 / 更标准 HTTP / Django 5 原生支持。 WebSocket 仍是双向场景(聊天 / 实时协作)的更优选。 ## 踩过的坑 1. **daphne 没加进 INSTALLED_APPS**:staticfiles 没替换 → runserver 不识别 ASGI。 2. **channel layer 用 InMemory**:单 worker OK;多 worker 时消息 收不到(每 worker 独立 in-memory)。生产必用 RedisChannelLayer。 3. **WebSocket 鉴权**:默认 scope['user'] 是 AnonymousUser。 `AuthMiddlewareStack` 让 Django session cookie 生效。 JWT 等其它 auth 要自己写 middleware。 4. **客户端长时间不发消息断**:默认 keepalive 没设,nginx / proxy 一小时空闲 → 断。客户端定期发 ping:"" 空消息或者 protocol ping。 5. **`async_to_sync` + `sync_to_async` 混用** → ASGI ↔ WSGI 转换贵。 尽量整个 path 同 async / 同 sync。

nginx 调优:keepalive、worker、buffer 实操

## 起因 新部署的 nginx 默认配置在中等流量下出现: - P99 偶尔 200ms+ 突刺 - `connection reset` 客户端错误 - 上游 PHP-FPM / gunicorn `connection timeout` 不调优 → 撑 1k QPS 就开始 wobble。调几个 key 参数后 5k QPS 稳。 下面是我每个新部署都套的 baseline。 ## worker_processes / worker_connections ```nginx worker_processes auto; # = CPU 核心数 worker_rlimit_nofile 65535; # 文件描述符上限 events { worker_connections 8192; # 单 worker 最大连接 use epoll; # Linux 用 epoll multi_accept on; # 一次接多 connection } ``` `worker_processes auto` 让 nginx 自动用所有核。 `worker_connections × worker_processes` = 理论最大并发连接。 8 核 × 8192 = 65536 并发,远超 1k QPS 需求。 ## upstream keepalive(关键) 默认 nginx 反代每请求开新 TCP 连接到 upstream → TCP/TLS 握手成本。 ```nginx upstream backend { server 127.0.0.1:8000; server 127.0.0.1:8001; keepalive 64; # 与 upstream 保持 64 个空闲连接 keepalive_timeout 60s; keepalive_requests 1000; # 每连接最多 1000 请求后重建 } server { location / { proxy_pass http://backend; proxy_http_version 1.1; proxy_set_header Connection ""; # 必须!不传 Connection close } } ``` `proxy_set_header Connection ""` 这行最容易漏。 缺它 → upstream 默认 HTTP/1.0 不复用 → keepalive 不生效。 加上后 latency 降 30-50%(省 TCP 握手 + 不必要的 connection setup)。 ## buffer / timeout ```nginx # 请求 buffer client_max_body_size 20m; client_body_buffer_size 16k; client_header_buffer_size 4k; large_client_header_buffers 4 16k; # upstream buffer proxy_buffering on; proxy_buffer_size 8k; proxy_buffers 8 16k; proxy_busy_buffers_size 16k; # timeout proxy_connect_timeout 5s; proxy_send_timeout 60s; proxy_read_timeout 60s; send_timeout 60s; ``` `proxy_buffering on`(默认):nginx 先收完 response 再发给 client → 慢 client 不拖累 upstream。 特殊场景关闭(如 SSE / streaming): ```nginx location /sse/ { proxy_pass http://backend; proxy_buffering off; # streaming 不缓冲 proxy_cache off; } ``` ## gzip ```nginx gzip on; gzip_vary on; gzip_min_length 1024; gzip_proxied any; gzip_comp_level 5; gzip_types text/plain text/css application/json application/javascript text/xml application/xml application/xml+rss text/javascript image/svg+xml; ``` JSON / HTML 压缩 70-90%。 `gzip_comp_level 5` 在 CPU 和压缩比间平衡(1 - 9 范围)。 ## brotli (更好压缩) ```nginx brotli on; brotli_comp_level 6; brotli_types text/plain text/css application/json application/javascript; ``` 需要 brotli module 装(一些 distro 默认带 `nginx-extras` package)。 比 gzip 压缩比好 15-20%,CPU 占用稍高。 ## 静态文件 sendfile / tcp_nopush ```nginx sendfile on; # 内核态文件直接 send tcp_nopush on; # 凑满 packet 再发(搭配 sendfile) tcp_nodelay on; # 长连接 small payload 不延迟 # 静态文件 cache location /static/ { expires 30d; add_header Cache-Control "public, immutable"; access_log off; } ``` `sendfile` 是 zero-copy → 大文件传输 CPU 占用 1/4。 ## TLS 优化 ```nginx ssl_protocols TLSv1.2 TLSv1.3; ssl_ciphers ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384; ssl_prefer_server_ciphers off; ssl_session_cache shared:SSL:10m; # 10 MB session cache,约 40000 session ssl_session_timeout 1d; ssl_session_tickets off; ssl_stapling on; ssl_stapling_verify on; # HTTP/2 listen 443 ssl http2; # HTTP/3 (nginx 1.25+) listen 443 quic reuseport; add_header Alt-Svc 'h3=":443"; ma=86400'; ``` session cache 让 TLS handshake 从 2 RTT → 0 RTT(resumption)。 高 QPS 时显著省 CPU。 ## rate limit 防 abuse: ```nginx http { limit_req_zone $binary_remote_addr zone=api:10m rate=100r/s; limit_conn_zone $binary_remote_addr zone=conn:10m; } server { location /api/ { limit_req zone=api burst=200 nodelay; limit_conn conn 20; proxy_pass http://backend; } } ``` 每 IP 100 req/s + burst 200 + 同时 ≤ 20 connection。 基础防护,挡 script kiddie。 ## log 格式 + sampling ```nginx log_format main_json escape=json '{' '"time":"$time_iso8601",' '"remote":"$remote_addr",' '"method":"$request_method",' '"uri":"$request_uri",' '"status":$status,' '"bytes":$body_bytes_sent,' '"rt":$request_time,' '"upstream_rt":"$upstream_response_time",' '"ua":"$http_user_agent"' '}'; # 大量流量时只 log 一部分(health check / static) map $request_uri $loggable { default 1; /health 0; ~*\.ico 0; } access_log /var/log/nginx/access.log main_json if=$loggable; ``` JSON 格式 → ELK / Loki / Datadog 解析友好。 ## 实战调优 case 某 Django app,单 server,4 vCPU / 8 GB: 调前: - 1000 QPS 撑得住但 P99 300ms - upstream connection 数 4000+ 频繁 TIME_WAIT - 偶发 502 调后(上面 baseline): - 3000 QPS 稳定,P99 80ms - upstream connection 200(keepalive 复用) - 502 几乎绝迹 关键改动: 1. `upstream keepalive 64` + `Connection ""` header 2. worker_processes auto + worker_connections 8192 3. gzip on + brotli on ## 监控 `/nginx_status` (stub_status module): ```nginx location /nginx_status { stub_status; allow 127.0.0.1; deny all; } ``` prometheus exporter(nginx-vts / nginx-prometheus-exporter)抓 → Grafana panel: - active connections - accepted / handled / requests rate - per-status code rate - upstream response time 发现 spike / leak 第一时间。 ## 高级:worker_cpu_affinity ```nginx worker_processes 4; worker_cpu_affinity 0001 0010 0100 1000; ``` 绑 worker 到特定 CPU → cache locality 提升 5-10%。 8 核以下机器一般不必要;多核高压才显著。 ## 踩过的坑 1. **`Connection ""` 漏了**:upstream keepalive 无效 → 性能没提升却 以为配了。`tcpdump` / netstat 看 connection 数确认。 2. **worker_connections 高但 ulimit 低**:worker_rlimit_nofile 也要 配,否则 syscall fail。`ulimit -n` 检查。 3. **proxy_buffer_size 小**:上游 response header 大(cookie 多)→ `upstream sent too big header` 错误。调大到 16k+。 4. **server_tokens on**:默认显 nginx 版本在 header / error page → 信息泄。`server_tokens off`。 5. **reload 不等于 restart**:`nginx -s reload` 优雅 reload 是好习惯; `restart` 断现有连接。CI deploy 用 reload。

Django 4+ async views:什么时候真的有用,什么时候踩坑

## 起因 Django 4.0+ 支持 async views,文档里说"并发 IO 性能提升"。 直接把所有 view 改 `async def`?踩了几个坑: - ORM 默认是同步的,async view 里 `User.objects.get()` 会 block event loop - middleware 不全 async,会有性能 penalty - 不是所有场景都受益 理解什么时候用、什么时候别用,才有价值。 ## 解决方案:分场景 ### 场景 A:view 里发 N 个外部 HTTP 请求(async 真的快) ```python # 同步版(用 requests) def aggregate_view(request): weather = requests.get('https://api.weather/...').json() news = requests.get('https://api.news/...').json() stocks = requests.get('https://api.stocks/...').json() return JsonResponse({'weather': weather, 'news': news, 'stocks': stocks}) ``` 3 个串行 IO,每个 300ms → 总 900ms。 ```python # async 版 import httpx async def aggregate_view(request): async with httpx.AsyncClient() as client: weather, news, stocks = await asyncio.gather( client.get('https://api.weather/...'), client.get('https://api.news/...'), client.get('https://api.stocks/...'), ) return JsonResponse({ 'weather': weather.json(), 'news': news.json(), 'stocks': stocks.json(), }) ``` 3 个并行,总 300ms(max)。3x 加速。 ### 场景 B:view 主要查 ORM(async 没用) ```python async def post_list(request): posts = Post.objects.filter(visibility='public')[:20] # ❌ return ... ``` Django ORM 默认同步。在 async view 里调用同步 ORM → 内部跑 `sync_to_async` 用线程池执行 → 比纯同步 view 还慢一点(多一次 context switch)。 Django 4.1+ 加了 async ORM: ```python async def post_list(request): posts = [p async for p in Post.objects.filter(visibility='public')[:20]] # 或: posts = await Post.objects.filter(...).a_in_bulk([1, 2, 3]) post = await Post.objects.aget(pk=1) await post.adelete() return ... ``` `a*` 系列方法是 async 版。但要点: - 这不让你"并发查多条"——还是单连接顺序查 - 唯一收益:不 block event loop(同进程能服务其它 async 请求) - 高 QPS + 复杂查询:Django ORM 仍是性能瓶颈(不是 async 能解的) ### 场景 C:streaming response(async 是必须的) ```python async def chat_stream(request): async def generator(): async for chunk in llm.stream(prompt): yield f'data: {json.dumps(chunk)}\n\n' return StreamingHttpResponse( generator(), content_type='text/event-stream', ) ``` LLM streaming / 长 polling / SSE → 必须 async 才能正常工作。 同步 view 在 stream 完成前 block 一个 worker。 ### 场景 D:mixed sync + async 需要在 async view 里调同步代码(如某个老 lib): ```python from asgiref.sync import sync_to_async @sync_to_async def heavy_sync_work(x): return cpu_bound_calc(x) async def view(request): result = await heavy_sync_work(42) return JsonResponse({'result': result}) ``` `sync_to_async` 把同步函数包成 awaitable,在线程池跑。 反向 `async_to_sync` 让 sync 调 async。 ## 启动方式 async views 需要 ASGI server(不是 WSGI): ```bash # 之前:gunicorn (WSGI) gunicorn myapp.wsgi # 现在:uvicorn (ASGI) uvicorn myapp.asgi:application --workers 4 # 或 gunicorn + uvicorn worker gunicorn -k uvicorn.workers.UvicornWorker -w 4 myapp.asgi:application ``` `myapp/asgi.py` 默认 Django 已生成。 WSGI server 跑 async view 也能跑(Django 自动用 sync_to_async 适配), 但性能不如 ASGI。 ## 性能测试(实际数据) 我对一个 endpoint 测试:3 个外部 API + 1 个 DB 查询。 | | wrk -t8 -c100 -d30s RPS | P95 latency | |---|---|---| | 同步 gunicorn(4 worker) | 35 | 2.9s | | async uvicorn(4 worker) | 320 | 380ms | 外部 IO 密集场景 async 收益巨大。 但纯 DB 查询 endpoint: | | RPS | P95 | |---|---|---| | 同步 gunicorn | 1200 | 80ms | | async uvicorn | 1100 | 90ms | async 反而略慢(额外的协程开销 + ORM 仍同步)。 ## 什么场景该用 async ✅ 适合: - 外部 API / webhook 调用密集 - SSE / streaming response - WebSocket (Django Channels) - 长 polling - AI / LLM 调用 ❌ 不适合: - 纯 CRUD(ORM 同步主导) - CPU 密集(GIL,async 没用) - 老 lib 没 async 版本 ## 实战:把现有 view 改 async 的流程 1. 评估:这 view 主要 IO 类型是什么?外部 HTTP > DB 查询 > 其它 → 值得改 2. 安装 ASGI server (uvicorn) 3. 把 view 函数 `def` → `async def` 4. 把 `requests` 换 `httpx` / `aiohttp` 5. 把 ORM 调用换 `aget` / `acreate` / async iter(如果用得到) 6. middleware:检查是否 async-aware,老 middleware 加 `async_capable = True` 7. 测试 + 性能 benchmark 对比 ## Channels(WebSocket) 如果要 WebSocket / SSE 大量并发,Django Channels 是标准: ```bash pip install channels channels-redis ``` ```python # routing.py from channels.routing import ProtocolTypeRouter, URLRouter from chat.consumers import ChatConsumer application = ProtocolTypeRouter({ 'websocket': URLRouter([ path('ws/chat/', ChatConsumer.as_asgi()), ]), }) ``` ```python # consumers.py from channels.generic.websocket import AsyncWebsocketConsumer class ChatConsumer(AsyncWebsocketConsumer): async def connect(self): await self.accept() async def receive(self, text_data): await self.send(text_data=f'echo: {text_data}') ``` Channels 让 Django 处理 WebSocket / SSE / HTTP / Channel layer (跨 worker 消息)全面 async。 ## 效果 我们一个 API gateway 类服务(聚合多个内部 service 数据)改 async: - P95 延迟从 1.2s → 230ms - 同硬件下 RPS 从 200 → 1500 - worker 数 from 16 减到 4(每个 worker 并发服务) - 内存占用减半 而我们的 CRUD 类 admin backend 改了一半发现没什么用,回滚保持同步。 ## 踩过的坑 1. **在 async view 里调同步 view function**:直接调会 block。 要 `await sync_to_async(other_view)(request)`。 2. **middleware 不 async**:所有 middleware 必须 async-compatible, 否则 Django 退化到 sync 模式。第三方 middleware 检查文档支持 async。 3. **DB connection pool**:async views 处理并发更高 → 同时打开的 DB connection 多 → DB 连接耗尽。配 PgBouncer 在前面。 4. **`@login_required` 等装饰器**:检查是否 async-compatible。 Django 4.1+ 内置装饰器都改了;第三方需要确认。 5. **测试**:`AsyncClient` 替代 `Client`: ```python async def test_view(): client = AsyncClient() response = await client.get('/api/...') ```

Go 标准库写一个带 ETag / Range / gzip 的 HTTP 文件服务

Go 标准库的 `net/http` + `http.FileServer` 五行就能起一个静态文件服务, 但缺生产里几个关键能力:ETag、Range(断点续传 / 视频拖动)、压缩、 正确的 Cache-Control。下面写一个完整版。 ## 五行起步 ```go package main import "net/http" func main() { http.Handle("/", http.FileServer(http.Dir("./public"))) http.ListenAndServe(":8080", nil) } ``` 这就行了——`http.FileServer` 已经支持 Range、Last-Modified、 If-Modified-Since、自动 Content-Type 推断。 但 ETag / 压缩 / 自定义 Cache-Control 要自己加。 ## 完整版 ```go package main import ( "compress/gzip" "crypto/sha256" "encoding/hex" "io" "log" "mime" "net/http" "os" "path/filepath" "strings" "time" ) const root = "./public" func main() { h := http.HandlerFunc(serve) log.Fatal(http.ListenAndServe(":8080", h)) } func serve(w http.ResponseWriter, r *http.Request) { // 1. 安全:阻止 ../ 跳出 root clean := filepath.Clean(r.URL.Path) if strings.HasPrefix(clean, "..") { http.Error(w, "forbidden", http.StatusForbidden) return } full := filepath.Join(root, clean) // 2. 默认页:目录请求映射到 index.html info, err := os.Stat(full) if err != nil { http.NotFound(w, r) return } if info.IsDir() { full = filepath.Join(full, "index.html") info, err = os.Stat(full) if err != nil { http.NotFound(w, r) return } } // 3. ETag(用 mtime + size 计算,足够稳) etag := computeETag(info) if match := r.Header.Get("If-None-Match"); match == etag { w.WriteHeader(http.StatusNotModified) return } w.Header().Set("ETag", etag) // 4. Cache-Control if isAsset(full) { // 带 hash 的资源:长期缓存 w.Header().Set("Cache-Control", "public, max-age=31536000, immutable") } else { w.Header().Set("Cache-Control", "public, max-age=300") } // 5. Content-Type ct := mime.TypeByExtension(filepath.Ext(full)) if ct != "" { w.Header().Set("Content-Type", ct) } // 6. 真正发文件 —— ServeFile 已包含 Range 支持 if acceptsGzip(r) && isCompressible(full) { w.Header().Set("Content-Encoding", "gzip") w.Header().Add("Vary", "Accept-Encoding") // 注意:开 gzip 后 Range 不再好用(gzip stream 不能任意定位) // 静态文件如果要支持 Range(音视频),别 gzip 它 gz := gzip.NewWriter(w) defer gz.Close() f, _ := os.Open(full) defer f.Close() io.Copy(gz, f) return } http.ServeFile(w, r, full) } func computeETag(info os.FileInfo) string { h := sha256.New() h.Write([]byte(info.Name())) h.Write([]byte(info.ModTime().UTC().Format(time.RFC3339Nano))) h.Write([]byte(string(rune(info.Size())))) return `"` + hex.EncodeToString(h.Sum(nil))[:16] + `"` } func acceptsGzip(r *http.Request) bool { return strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") } func isCompressible(p string) bool { switch strings.ToLower(filepath.Ext(p)) { case ".html", ".css", ".js", ".json", ".svg", ".txt", ".xml": return true } return false } func isAsset(p string) bool { // 文件名含 8+ 位 hex 视为带 hash 的资源 base := filepath.Base(p) for _, part := range strings.Split(base, ".") { if len(part) >= 8 && isHex(part) { return true } } return false } func isHex(s string) bool { for _, c := range s { if !((c >= '0' && c <= '9') || (c >= 'a' && c <= 'f')) { return false } } return true } ``` ## Range / 视频拖动 `http.ServeFile` 已经处理 Range 请求(206 Partial Content)。 不需要自己写。 校验: ```bash curl -I -H 'Range: bytes=0-100' http://localhost:8080/video.mp4 # HTTP/1.1 206 Partial Content # Content-Range: bytes 0-100/1234567 # Content-Length: 101 ``` 如果你不用 ServeFile 而自己 io.Copy,Range 就失效了。 ## ETag 的边界 我用 mtime + size 算 ETag —— 简单但有边界: - 同内容不同文件(不同 mtime)会返回不同 ETag - 改完没改大小且 mtime 精度不够(FAT32 只到 2 秒)会被认为没变 更稳的方案:第一次访问时算文件 SHA256,写到 sidecar 文件 `foo.css.etag` 作为缓存。代价是写文件 / 维护成本。 ## 测试 ```bash # ETag ETAG=$(curl -sI http://localhost:8080/main.css | grep -i etag | cut -d' ' -f2 | tr -d '\r\n') curl -I -H "If-None-Match: $ETAG" http://localhost:8080/main.css # 应该返回 304 # gzip curl -I -H 'Accept-Encoding: gzip' http://localhost:8080/main.css # Content-Encoding: gzip ``` ## 为什么不用 nginx nginx 这事做得更好(C 写的 + sendfile + zero-copy)。这个 Go 实现的 价值是: - 单二进制可执行,跨平台 - 嵌入到现有 Go 后端里 - 配合 `embed.FS` 直接把静态资源打包到二进制(go embed) ```go //go:embed public var staticFS embed.FS http.Handle("/", http.FileServer(http.FS(staticFS))) ``` 整个网站打包成一个 `myapp` 二进制丢服务器跑。CD 流程极简。 ## 踩过的坑 - 没做 `filepath.Clean` + 阻止 `..` → 经典目录穿越漏洞,攻击者请求 `/../../../etc/passwd` 把文件读走。 - `http.ServeFile` 在 path 含编码的 `%2E%2E` 时可能仍接受(早期 Go 版本 有 CVE)。升级到 Go 1.22+ 已修。 - 写自己的 io.Copy 而不用 ServeFile:失去 Range、失去 Last-Modified 比较、效率也差。能用 ServeFile 就用。 - 跨平台 mtime 精度差异:Windows 是 100ns、macOS 是 1ns、Linux 通常 ns。 Build 时给 docker COPY 文件 mtime 都被改为 build 时间,结果所有文件 ETag 一样。CI 里用 `touch -d` 还原 mtime 或换 SHA-based ETag。

systemd 管理服务:替代 supervisord / pm2 / forever

## 起因 部署一个 Python / Node / Go 应用,需要: - 后台跑(不 attach 终端) - 开机自启 - crash 自动重启 - 集中查 log - restart / stop 标准命令 老办法: - nohup + & + 写 pid 文件(朴素 + 不重启) - supervisord(Python,需装) - pm2(Node 圈,需装) - forever(更老的 Node 工具) systemd 是现代 Linux 默认(Ubuntu 16+ / RHEL 7+ / Debian 8+), **无需装第三方工具**。下面写一份 service 文件就够了。 ## 最小 service file ```ini # /etc/systemd/system/myapp.service [Unit] Description=My Web App After=network.target [Service] Type=simple User=www-data Group=www-data WorkingDirectory=/srv/myapp ExecStart=/srv/myapp/.venv/bin/gunicorn myapp.wsgi -b 127.0.0.1:8000 -w 4 Restart=on-failure RestartSec=5s [Install] WantedBy=multi-user.target ``` 启用 + 启动: ```bash sudo systemctl daemon-reload sudo systemctl enable --now myapp ``` 完事。开机自启 + 后台 + crash 自动 5 秒重启 + log 到 journald。 ## 常用命令 ```bash systemctl status myapp # 状态 systemctl start/stop/restart myapp systemctl reload myapp # 发 SIGHUP(如果应用支持 reload) systemctl enable / disable myapp # 开机启动开关 journalctl -u myapp # 看 log journalctl -u myapp -f # tail -f journalctl -u myapp --since '1 hour ago' journalctl -u myapp -p err # 只看 error 级别 ``` ## 环境变量 ```ini [Service] EnvironmentFile=/etc/myapp/env Environment="DJANGO_SETTINGS_MODULE=myapp.settings.production" Environment="PYTHONUNBUFFERED=1" ``` `/etc/myapp/env`: ``` DATABASE_URL=postgres://... SECRET_KEY=... ``` 权限 600 + owned by root → secret 安全。 ## 自动重启策略 ```ini [Service] Restart=on-failure # always / on-failure / on-abnormal RestartSec=5s StartLimitIntervalSec=300 StartLimitBurst=10 # 5 分钟内重启 > 10 次就放弃 ``` `Restart=always`:包括正常退出也重启(适合 worker)。 `Restart=on-failure`:只 exit code ≠ 0 才重启(适合 server)。 ## 资源限制 ```ini [Service] MemoryMax=2G # 超过被 OOM killed CPUQuota=200% # 最多用 2 核 TasksMax=512 # 子进程上限 LimitNOFILE=65535 # 文件描述符 ``` 防 runaway 进程吃光资源。容器化等价但 systemd 也能做。 ## 安全 hardening ```ini [Service] NoNewPrivileges=true # 不能 setuid PrivateTmp=true # 独立 /tmp ProtectSystem=strict # /usr / /boot 等只读 ProtectHome=true # /home 不可见 ReadWritePaths=/var/log/myapp /var/lib/myapp ProtectKernelTunables=true ProtectKernelModules=true ProtectControlGroups=true RestrictSUIDSGID=true ``` systemd-analyze security myapp 评分(0-10,越低越严)。 不必全开但默认加几个稳妥。 ## socket activation ```ini # /etc/systemd/system/myapp.socket [Unit] Description=myapp socket [Socket] ListenStream=127.0.0.1:8000 Accept=no [Install] WantedBy=sockets.target ``` ```ini # myapp.service [Service] ExecStart=/srv/myapp/server StandardInput=socket # 接收 socket fd ``` systemd 监听 port,第一个请求来才启动 service → 节省资源 + 启动期间 请求 buffered。 对 web app 较 niche,inetd 风格。 ## timer (取代 cron) ```ini # /etc/systemd/system/myapp-cleanup.service [Service] Type=oneshot ExecStart=/srv/myapp/.venv/bin/python /srv/myapp/cleanup.py ``` ```ini # /etc/systemd/system/myapp-cleanup.timer [Timer] OnCalendar=daily # 每天午夜 OnCalendar=*-*-* 03:30:00 # 每天 3:30 Persistent=true # boot 后补跑漏的 [Install] WantedBy=timers.target ``` ```bash systemctl enable --now myapp-cleanup.timer systemctl list-timers # 看下次跑时间 journalctl -u myapp-cleanup # 历史 log ``` cron 优势: - log 集成 journald - 重试 / failure 处理 systemd 标准 - random delay(多机错峰) - 可重用 service 单独执行 我现在新部署 0 cron,全 timer。 ## graceful shutdown ```ini [Service] ExecStart=/srv/myapp/server TimeoutStopSec=30s # SIGTERM 后等 30s KillSignal=SIGTERM ExecReload=/bin/kill -HUP $MAINPID ``` `systemctl stop myapp` → 发 SIGTERM → 应用清理 → 30s 内不退就 SIGKILL。 应用要 catch SIGTERM 完成 in-flight 请求再退。 ## 多实例 `@` template: ```ini # /etc/systemd/system/[email protected] [Service] ExecStart=/srv/myapp/server --port=%i ``` ```bash systemctl start myapp@8001 myapp@8002 myapp@8003 ``` 3 个 instance 不同 port,共享 service 模板。 ## user service 不需要 sudo: ```bash # 写到 ~/.config/systemd/user/myapp.service systemctl --user daemon-reload systemctl --user enable --now myapp loginctl enable-linger $USER # 退出 shell 后仍跑 ``` 适合:个人项目 / 不能 root 的服务器。 ## 与 supervisord 对比 | | systemd | supervisord | pm2 | |---|---|---|---| | 默认装 | ✅(modern Linux) | ❌(pip install) | ❌(npm install) | | 配置语法 | INI(详细) | INI(简) | JS/JSON | | 资源限制 | ✅ | ❌ | ❌ | | timer | ✅ | ❌ | ❌(用 cron) | | log | journald | 文件 | 文件 | | 跨平台 | Linux only | 多平台 | 多平台 | | 开机启动 | ✅ | 要 init 配 | pm2 startup | Linux 服务器 systemd 完胜(已经在那)。 非 Linux / 容器(无 systemd)→ supervisord / pm2。 ## docker 里要 systemd? 容器内一般 PID 1 跑应用,不需要 systemd。 特殊场景(多进程 in container)用 `tini` / `supervisord` / `s6-overlay`。 systemd in container:复杂,不推荐。如果非要,Podman 比 Docker 友好。 ## journald log 持久化 默认 journald 内存 + 临时存储。重启丢。 持久化: ```bash sudo mkdir -p /var/log/journal sudo systemctl restart systemd-journald ``` 或者 forward 到 rsyslog / Loki: ```ini # /etc/systemd/journald.conf [Journal] ForwardToSyslog=yes ``` ## 真实部署 case 部署 Django + gunicorn + celery worker + celery beat: ```ini # myapp.service (gunicorn) [Service] ExecStart=/srv/myapp/.venv/bin/gunicorn myapp.wsgi -b 127.0.0.1:8000 # myapp-worker.service [Service] ExecStart=/srv/myapp/.venv/bin/celery -A myapp worker --loglevel=info # myapp-beat.service [Service] ExecStart=/srv/myapp/.venv/bin/celery -A myapp beat --loglevel=info ``` 3 个 service,`systemctl status myapp myapp-worker myapp-beat` 全状态。 部署 deploy script: ```bash git pull .venv/bin/pip install -r requirements.txt .venv/bin/python manage.py migrate sudo systemctl restart myapp myapp-worker myapp-beat ``` 简单 + 工业级稳定。 ## 踩过的坑 1. **改了 service file 没 daemon-reload**:systemctl 用老版本。 `daemon-reload` 必须。 2. **WorkingDirectory 不存在**:service 启不来报错 213。 `journalctl -u myapp` 看具体原因。 3. **env 变量空格转义**:`Environment="K=v with space"` 双引号必须。 4. **Type=forking 错用**:应用 fork 后 systemd 跟丢主进程。多数 web server 用 `Type=simple` / `notify`。 5. **PID 文件错**:traditional daemon 写 PID 文件,systemd 不靠它。 `PIDFile=` 配置是给 forking type 用。

Celery + Redis 跑后台任务的最小可工作配置

Web 请求里同步发邮件 / 调外部 API / 跑重计算就是把响应时间往沟里推。 异步丢任务队列,Web 立刻返回,后台 worker 慢慢干。Celery + Redis 是 Python 生态最常用的组合。 ## 依赖 ```bash uv add 'celery[redis]' ``` `[redis]` extra 装上 `redis-py`,作为 broker + result backend。 ## 项目结构 ``` myapp/ ├── celery_app.py ├── tasks.py └── ... (Django/Flask/FastAPI) ``` ## celery_app.py ```python from celery import Celery app = Celery( 'myapp', broker='redis://localhost:6379/0', backend='redis://localhost:6379/1', include=['myapp.tasks'], ) app.conf.update( task_serializer='json', accept_content=['json'], result_serializer='json', timezone='Asia/Shanghai', enable_utc=True, task_acks_late=True, # worker 处理完才 ack;崩了任务会被重新派发 task_reject_on_worker_lost=True, worker_prefetch_multiplier=1, # 长任务必须设 1,否则一个 worker 卡住所有 prefetch task_time_limit=300, # 硬超时 5 分钟(SIGKILL) task_soft_time_limit=270, # 软超时(抛 SoftTimeLimitExceeded) ) ``` `prefetch_multiplier=1` 是长任务必须改的——默认 4,意思是每个 worker 预取 4 个任务,第一个任务慢的话另外 3 个被卡住等。 ## tasks.py ```python from celery import shared_task from celery.exceptions import SoftTimeLimitExceeded import smtplib @shared_task( bind=True, autoretry_for=(ConnectionError, smtplib.SMTPException), retry_backoff=True, # 1s, 2s, 4s, 8s ... retry_backoff_max=600, max_retries=5, ) def send_welcome_email(self, user_id, email): try: # ... 真的发邮件 ... with smtplib.SMTP('smtp.example.com', 587) as s: s.login(...) s.sendmail('from@', email, 'Welcome!') except SoftTimeLimitExceeded: # 清理资源 raise @shared_task def heavy_calc(rows): return sum(complex_fn(r) for r in rows) ``` `autoretry_for` + `retry_backoff` 是失败自动指数退避重试的现代写法。 比手写 `self.retry(countdown=...)` 简洁。 ## Web 端触发 ```python # Django / Flask / FastAPI 都一样 from myapp.tasks import send_welcome_email def signup_view(request): user = User.objects.create(...) send_welcome_email.delay(user.id, user.email) return Response({'ok': True}) # .delay() 是 .apply_async() 的简写 # 想要更细控制: send_welcome_email.apply_async( args=(user.id, user.email), countdown=60, # 60 秒后再执行 queue='email', # 路由到指定队列 expires=3600, # 1 小时后没执行就放弃 ) ``` ## 启动 worker ```bash uv run celery -A myapp.celery_app worker -l info -c 4 # -c 4: 并发数;CPU 密集任务建议 = 物理核心数;IO 密集可以更高 ``` 多队列分开: ```bash # 一个 worker 只跑邮件队列 uv run celery -A myapp.celery_app worker -l info -Q email -c 2 # 另一个 worker 跑默认 + 计算队列 uv run celery -A myapp.celery_app worker -l info -Q celery,heavy -c 8 ``` ## 周期任务(Celery Beat) ```python # celery_app.py app.conf.beat_schedule = { 'cleanup-every-hour': { 'task': 'myapp.tasks.cleanup_temp_files', 'schedule': 3600.0, # 秒;或 crontab(hour=3, minute=0) }, 'send-digest-monday-9am': { 'task': 'myapp.tasks.send_weekly_digest', 'schedule': crontab(hour=9, minute=0, day_of_week='mon'), }, } ``` 启动 beat(单独进程,单实例): ```bash uv run celery -A myapp.celery_app beat -l info ``` beat 是单实例的。多机部署只起一个 beat 进程,否则任务被触发多次。 用 `redbeat` 把 schedule 存 Redis 解决高可用。 ## 监控:Flower ```bash uv add flower uv run celery -A myapp.celery_app flower --port=5555 # 浏览器打开 http://localhost:5555 ``` 可以看到所有 worker、队列长度、任务历史、失败重试。 ## 生产部署清单 1. broker / backend 用 **不同 Redis DB**(broker 流量大、result 流量小) 2. 长时间不取的 result 自动过期:`result_expires=3600` 3. worker 用 systemd unit 管,开 `Restart=on-failure` 4. 监控队列长度(Prometheus celery exporter):超过阈值就扩 worker 5. 别在 task 里用 Django ORM 然后忘记 `connection.close()`: 长跑 worker 会泄连接 ## systemd 单元 ```ini # /etc/systemd/system/celery-worker.service [Unit] Description=Celery worker After=network.target redis.service [Service] Type=simple User=app Group=app WorkingDirectory=/srv/myapp EnvironmentFile=/srv/myapp/.env ExecStart=/srv/myapp/.venv/bin/celery -A myapp.celery_app worker -l info -c 4 Restart=on-failure RestartSec=5s KillMode=mixed [Install] WantedBy=multi-user.target ``` ## 踩过的坑 - task 改了签名(加 / 删参数)后还有老格式的任务在队列里,新 worker 处理时 会爆 TypeError。新增字段加默认值,删字段做兼容 wrapper。 - `task_serializer='pickle'` —— 不要!pickle 允许执行任意代码,broker 一被攻破 全完。一律 JSON。 - worker 跑 Django ORM 任务时,每个任务后忘记 `transaction.commit()`, 数据库连接长期持有,下一个任务可能看到老数据。Django 4+ 默认 ATOMIC_REQUESTS + 显式 `connection.close()` 能解决。 - Result backend 用 Redis 时,每个 task 结果默认占一个 key 直到 expire。 几十万任务的 backlog 能把 Redis 撑爆。要么 `task_ignore_result=True`, 要么 `result_expires` 设短。

Cloudflare Workers:边缘运行 JS / Wasm 的实际玩法

## 起因 某些场景"在边缘节点跑代码" 比"回源" 更优: - 全球用户都需要的轻请求(API rate limit / auth check / A/B redirect) - 静态站定制(个性化 header / cookie 处理) - 流量塑形(特定 user 转到 staging) - 短 latency API(< 50ms 全球) `Cloudflare Workers`:V8 isolate 跑在 250+ 城市边缘节点。 冷启动 < 5ms(不是 Lambda 那种),按请求计费便宜。 ## hello world ```js // worker.js export default { async fetch(request, env, ctx) { return new Response('Hello from the edge!'); }, }; ``` 部署: ```bash npm install -g wrangler wrangler init my-worker cd my-worker wrangler deploy ``` 5 分钟得到 `my-worker.user.workers.dev` URL,全球 250+ 城市同时跑。 ## 完整示例:geo redirect ```js export default { async fetch(request) { const country = request.cf.country; // CF 自动加 country header const url = new URL(request.url); if (country === 'CN' && url.hostname === 'example.com') { return Response.redirect('https://cn.example.com' + url.pathname, 302); } // 其它转到 origin return fetch(request); }, }; ``` 中国用户自动重定向到 cn 子域,其它人正常回源。 ## A/B 测试 ```js export default { async fetch(request) { const cookie = request.headers.get('cookie') || ''; let variant = parseCookie(cookie, 'variant'); if (!variant) { variant = Math.random() < 0.5 ? 'A' : 'B'; } const response = await fetch( variant === 'B' ? request.url.replace('example.com', 'beta.example.com') : request, ); const newResponse = new Response(response.body, response); newResponse.headers.set('set-cookie', `variant=${variant}; Path=/; Max-Age=86400`); return newResponse; }, }; ``` 50/50 分流,cookie 粘性。无需改后端。 ## 鉴权 / API gateway ```js export default { async fetch(request, env) { const auth = request.headers.get('authorization'); if (!auth || !auth.startsWith('Bearer ')) { return new Response('Unauthorized', { status: 401 }); } const token = auth.slice(7); // 验 JWT(用 Web Crypto API) const valid = await verifyJWT(token, env.JWT_PUBLIC_KEY); if (!valid) return new Response('Invalid token', { status: 401 }); // 加 user info 转到 origin const newRequest = new Request(request); newRequest.headers.set('x-user-id', valid.sub); return fetch(newRequest); }, }; ``` 边缘 JWT 验证 → 无效请求不打到 origin → 省 origin 带宽 / CPU。 ## KV / D1 / R2 存储 Workers 配套存储: - **KV**:edge K/V,最终一致,读快写慢 - **D1**:SQLite at edge(每个 region 副本) - **R2**:S3 兼容对象存储,无 egress 费 ```js // 读 KV const value = await env.MY_KV.get('user:42'); // D1 query const result = await env.MY_DB.prepare( 'SELECT * FROM users WHERE id = ?').bind(42).first(); // R2 上传 await env.MY_BUCKET.put('file.bin', request.body); ``` Workers + KV / D1 / R2 全栈在边缘 → 静态站 + API + 数据全套。 ## Durable Objects 需要强一致 + stateful → DO: ```js export class Counter { constructor(state, env) { this.state = state; } async fetch(request) { let count = (await this.state.storage.get('count')) || 0; count++; await this.state.storage.put('count', count); return new Response(count.toString()); } } // Worker export default { async fetch(request, env) { const id = env.COUNTER.idFromName('global'); const obj = env.COUNTER.get(id); return obj.fetch(request); }, }; ``` DO 是全球唯一 instance(按 name),适合:counter / chat room / collaborative state。WebSocket 跨用户的协作 app 杀手锏。 ## limits - 单请求 CPU: 50ms (free) / 30s (paid) - memory: 128 MB - subrequest: 50 (free) / 1000 (paid) - script size: 1 MB compressed (10 MB paid) 不像 Lambda 可以重计算几分钟。 Workers 是"轻量边缘 hook",不是通用 compute。 ## 价格 - free tier: 100k req/day - $5/月: 10M req - $5 per additional 1M req KV / R2 / D1 各自计费但都便宜。 对比 Lambda(cold start + GB-second + egress):高 QPS edge 场景 Workers 便宜 5-10x。 ## 本地 dev ```bash wrangler dev # 本地起 worker(用 V8 模拟) wrangler dev --remote # 直接在 CF 边缘 dev ``` `wrangler.toml`: ```toml name = "my-worker" main = "src/index.js" compatibility_date = "2024-05-25" [vars] ENVIRONMENT = "production" [[kv_namespaces]] binding = "MY_KV" id = "..." [[d1_databases]] binding = "MY_DB" database_name = "myapp" database_id = "..." ``` ## TypeScript / Hono ```ts // 用 Hono framework import { Hono } from 'hono'; const app = new Hono(); app.get('/api/users/:id', async (c) => { const id = c.req.param('id'); const user = await c.env.MY_DB.prepare( 'SELECT * FROM users WHERE id = ?').bind(id).first(); return c.json(user); }); export default app; ``` Hono 是 Workers 上的 Express 替代,类型 + 路由友好。 ## 跟 Lambda@Edge 对比 | | CF Workers | AWS Lambda@Edge | |---|---|---| | 启动 | < 5ms (V8 isolate) | 100-500ms (cold) | | 语言 | JS / Wasm / Python | Node / Python | | 限制 | 50ms CPU | 5s | | 存储 | KV / D1 / R2 / DO | 需调外部 | | 价格 | $0.50 / M req | $0.60 / M req(更贵) | | Region | 250+ | CF 100+ | Workers 现在边缘 compute 几乎事实标准。 AWS 也在推 Lambda@Edge 但慢。 ## 真实 case:API rate limit 我们 API 想全球分布式 rate limit(不只是单 region)。 ```js export default { async fetch(request, env) { const ip = request.headers.get('cf-connecting-ip'); const key = `rl:${ip}`; const count = (await env.RL_KV.get(key)) || 0; if (count >= 100) { return new Response('Too Many Requests', { status: 429 }); } await env.RL_KV.put(key, (parseInt(count) + 1).toString(), { expirationTtl: 60 }); return fetch(request); }, }; ``` 每 IP 每分钟 100 req,跨 CF region 累计(KV 最终一致,5-10s 同步 → 某 region 用户可能 over limit 一点,可接受)。 强一致 → 用 DO + sliding window。 ## 不适合的场景 - 长跑 CPU(视频转码 / ML inference):用 Workers AI / cloud function - 大文件处理(> 100 MB body):直接 R2 - 复杂 stateful 流(DB transaction 多):传统 API server ## 踩过的坑 1. **request body 不能多次读**:`request.body` 是 stream,读一次。 需多用:`const body = await request.text()` 先 buffer。 2. **fetch subrequest 计数**:每 `fetch()` 算 1 subrequest,超 50 (free)报错。复杂 worker 谨慎。 3. **KV 写 eventual consistency**:刚写完读可能旧值。读 critical 用 D1 或 DO。 4. **environment binding 没配**:本地 dev 跑通,部署后 `env.MY_KV` undefined → 看 wrangler.toml 是否 deploy。 5. **package size 1MB**:依赖大(如 lodash 全部 import)→ size 超。 tree-shake + 小依赖(zod / arktype 替代)。

日志收集后端:Loki vs Elasticsearch(成本 vs 功能)

## 起因 应用日志几个 GB / day 起,需要: - 集中查询(多机器日志一个地方搜) - 告警(错误率 spike) - 长期保留(compliance / debug 历史) 主流方案: - **ELK / OpenSearch**:Elasticsearch + Logstash + Kibana - **Loki + Grafana**:Grafana Labs 出,"日志的 Prometheus" - **Datadog / NewRelic**:SaaS 全套,贵但省事 Loki 跟 ES 的核心差异:**Loki 只索引 metadata,不索引日志正文** → 存储成本 10-100x 低,但全文搜索弱。 ## ES 方式 ``` [app] → filebeat → [Logstash] → [Elasticsearch (集群)] → [Kibana] ``` 每条日志 parse + 索引每个 token: ``` ts: "2025-03-14T10:23:00Z" level: "ERROR" service: "api" msg: "Failed to connect to db at host pg-1, error: timeout" trace_id: "abc123" ``` 索引后: - 任意 token 模糊搜索 < 100ms - 字段聚合 / 统计快 - Kibana 仪表盘 / 告警丰富 代价: - 存储 5-10x 原日志 size(倒排索引膨胀) - ES 集群 RAM 重(每 node 8-32 GB) - 索引 CPU 高 - 1 TB/day 日志 → 10-100 TB 索引存储 ## Loki 方式 ``` [app] → Promtail / Fluent Bit → [Loki] → [Grafana] ``` 日志按 stream 存(compressed chunk),只索引 stream label: ``` labels: {service="api", level="ERROR", instance="api-1"} log line: "2025-03-14T10:23:00Z Failed to connect..." ``` 查: ```logql {service="api", level="ERROR"} |= "timeout" ``` - 先按 label 过滤 → 找到匹配的 chunk - 全文 grep chunk 内日志 结果: - 存储 1-2x 原 size(gzip 压缩,无倒排) - 单实例几 GB RAM 撑大量数据 - 全文搜限定 label 范围内 grep(不是全局倒排) ## 实测对比 我们一个项目 100 GB/day 日志: | | ELK | Loki | |---|---|---| | 存储 (30 day retention) | 30 TB | 3 TB | | 节点数 | 5 × 16GB ES + 3 logstash | 2 × 8GB Loki | | 月成本 (AWS) | ~$3500 | ~$400 | | 简单 query | < 100ms | 1-3s | | 复杂搜索 | 强 | 中(label-bound) | | 仪表盘 | Kibana 强 | Grafana 强 | | 告警 | watcher / ElastAlert | Loki alert rule | Loki **十分之一存储成本** + **十分之一计算成本**。 搜索体验稍弱但够用。 ## label 设计是关键 Loki 的 label 不能 high-cardinality(如 user_id / trace_id 不行)。 原则: - low cardinality(< 100k unique values):service / instance / level / env - 把 high cardinality 数据放 log line 里(grep 找) ``` ✅ {service="api", level="ERROR"} # 几十个 service × 4 level ❌ {user_id="12345"} # 百万 user → label 爆炸 ``` high cardinality label → Loki 索引膨胀 → 慢 + 内存 OOM。 ## LogQL 类 PromQL: ```logql # 简单 {service="api"} |= "error" # 多条件 {service=~"api|worker", level="ERROR"} |~ "timeout|connection refused" # rate (metric from logs) rate({service="api", level="ERROR"}[5m]) # parse json + filter {service="api"} | json | status >= 500 ``` `|=` 包含, `|~` regex, `!=` / `!~` 排除。 ## 部署 ### Loki ```yaml # docker-compose.yml services: loki: image: grafana/loki:3.0.0 ports: - 3100:3100 volumes: - ./loki-config.yml:/etc/loki/local-config.yaml - loki-data:/loki promtail: image: grafana/promtail:3.0.0 volumes: - /var/log:/var/log - ./promtail-config.yml:/etc/promtail/config.yml ``` ```yaml # promtail-config.yml clients: - url: http://loki:3100/loki/api/v1/push scrape_configs: - job_name: docker docker_sd_configs: - host: unix:///var/run/docker.sock relabel_configs: - source_labels: ['__meta_docker_container_name'] target_label: 'container' ``` ### Grafana 连 Loki Grafana 加 Loki data source `http://loki:3100`。Explore 直接查。 ## chunked storage Loki 支持 S3 / GCS / 本地: ```yaml storage_config: aws: s3: s3://my-bucket/loki region: us-east-1 ``` 老数据存 S3(几 $0.02/GB/月)→ 长期保留极便宜。 查询时 Loki 拉对应 chunk 解压扫。 ## alert rule ```yaml groups: - name: api-alerts rules: - alert: HighErrorRate expr: sum(rate({service="api", level="ERROR"}[5m])) > 10 for: 5m labels: severity: warning annotations: summary: "API error rate > 10/s for 5min" ``` Loki ruler 跑 alert,发 alertmanager → PagerDuty / Slack。 ## 与 SaaS 对比 - Datadog logs:1 GB/day 跑 $30/月 起,100 GB/day = $3000+/月 - New Relic:类似 - Self-host Loki:100 GB/day = $400/月 scale 上 SaaS 贵 10x+。中小 scale 时间成本 vs 现金成本 trade-off。 ## 何时仍选 ES - **全文搜索是核心**(不只是 grep):日志做"搜索引擎",ES 倒排是杀手锏 - **复杂聚合 / OLAP-like 分析**:ES aggregation 比 LogQL 强 - **业务已有 Kibana dashboard 多**:迁移成本高 - **多种数据 type 混搜**(log + APM trace + metric 一处) ## 与 Quickwit 对比 Quickwit 是另一选项:log 优化 + S3-native + 全文搜索(用 tantivy / Rust)。 比 ES 便宜 + 比 Loki 搜索强。 | | ES | Loki | Quickwit | |---|---|---|---| | 索引 | full | label only | full + S3-native | | 存储成本 | 高 | 极低 | 低 | | 全文搜 | 极强 | 弱 | 强 | | 部署 | 复杂 | 简单 | 中 | 新项目 Quickwit 在评估,但 Loki 仍是 mainstream 简单选择。 ## 我的选择 - **新项目,云原生** → Loki(成本 + Grafana 一体化) - **既有 ES 团队** → 保留 ES - **极致全文搜需求** → ES - **超大 scale + 成本敏感** → Quickwit ## 踩过的坑 1. **label cardinality 爆**:把 trace_id 放 label → 几百万 stream → Loki OOM。`max_streams_per_user` 限制。 2. **chunk size 配置**:默认 1.5 MB chunk,业务量大改 5 MB 减 IO。 3. **regex 慢**:`|~ "complex.*regex"` 全 chunk 扫。先 label filter 再 `|=` 关键词 prefilter,最后 regex。 4. **retention 策略**:默认 chunks 不删。配 `compactor` + retention policy。 5. **promtail 漏 log**:log file rotate 时 inode 变 → promtail 没 follow。`stat_config` 调 polling。

写一个真正有用的 /healthz 和 /readyz(不是返回 200 那么简单)

K8s / 反代 / 监控系统都会查应用的健康状态。很多人把它们写成 `return {"ok": true}` 然后觉得搞定了——这种 endpoint 没区分**进程活着** 和**真的能服务请求**,到时候监控告警和实际故障对不上。 正确做法是分两个端点: - `/healthz` (liveness):**进程是否活着**。失败 → 重启容器 - `/readyz` (readiness):**能否接收新请求**。失败 → 从 LB 后端摘掉但不重启 ## liveness:尽量薄 ```python @app.get('/healthz') def liveness(): return {'status': 'alive'} ``` 就这么薄。原则:**不能查任何外部依赖**。因为: - DB 暂时不通 → 不应该重启 Web 进程 - Redis 慢 → 重启不能解决 - liveness 失败的语义是"进程已经损坏,没法自愈",只有 OOM / 死循环 / panic 这种才该 fail 加点点缀(确认 process 没死锁): ```python import time @app.get('/healthz') def liveness(): # 检查事件循环 / 主线程没卡住 return {'status': 'alive', 'ts': time.time()} ``` ## readiness:检查所有 hard dependency ```python import asyncio from sqlalchemy import text @app.get('/readyz') async def readiness(): checks = {} overall_ok = True # DB try: async with db_session() as s: await asyncio.wait_for( s.execute(text('SELECT 1')), timeout=2.0) checks['db'] = 'ok' except Exception as e: checks['db'] = f'fail: {e!r}' overall_ok = False # Redis try: await asyncio.wait_for(redis.ping(), timeout=1.0) checks['redis'] = 'ok' except Exception as e: checks['redis'] = f'fail: {e!r}' overall_ok = False # 关键外部 API(可选)—— 通常 readiness 不查第三方 API, # 因为他们挂了你也没办法 # checks['stripe'] = ... status = 200 if overall_ok else 503 return JSONResponse( status_code=status, content={'ok': overall_ok, 'checks': checks}, ) ``` 注意: - **wait_for + 超时**:依赖卡死时 readiness 自己别卡死 - **失败返回 503**,K8s 才会把这个 pod 从 service endpoints 里摘掉 - **同时返回详情**:人工排查时一眼看见哪个依赖挂了 ## startup probe(K8s 1.16+) 应用启动慢的(如加载大模型),需要第三种 probe:startup。 启动期间 readiness 还没就绪也别立刻杀,给它时间: ```yaml # K8s 部署 YAML 示例 livenessProbe: httpGet: { path: /healthz, port: 8000 } periodSeconds: 10 failureThreshold: 3 readinessProbe: httpGet: { path: /readyz, port: 8000 } periodSeconds: 5 failureThreshold: 2 startupProbe: httpGet: { path: /readyz, port: 8000 } periodSeconds: 5 failureThreshold: 60 # 给 60 * 5 = 300 秒启动时间 ``` `startupProbe` 没通过前 liveness / readiness 都不算。通过后切到正常 probe。 ## 不要写成的反模式 ```python # 错: 把 health 和 ready 写一起 @app.get('/health') def health(): db_ok = db.check() return {'ok': db_ok} # 问题:DB 抖一下整个进程被重启 → 雪崩 ``` ```python # 错: liveness 检查外部依赖 @app.get('/healthz') def liveness(): requests.get('https://api.example.com/ping', timeout=5) return {'ok': True} # 问题:第三方 API 慢 → liveness 慢 → K8s 觉得进程挂了 → 反复重启 ``` ```python # 错: 不区分 ok / 503 @app.get('/readyz') def ready(): return {'db': 'fail'} # status=200! LB 仍认为这个 instance 健康 ``` ## 给 readiness 加"我自己降级中"标志 有时候你想主动让某 pod 不接新请求(比如准备 deploy / drain): ```python ready_flag = True @app.get('/readyz') def readiness(): if not ready_flag: return JSONResponse(503, {'ok': False, 'reason': 'draining'}) return ... @app.post('/admin/drain') def drain(): global ready_flag ready_flag = False return {'ok': True, 'state': 'draining'} ``` 收到 SIGTERM 时先把 `ready_flag=False`、等 LB 摘掉、再退出: ```python import signal, asyncio async def graceful_shutdown(): global ready_flag ready_flag = False await asyncio.sleep(15) # 等 LB 注意到 # 然后退出 sys.exit(0) signal.signal(signal.SIGTERM, lambda *_: asyncio.create_task(graceful_shutdown())) ``` ## Metric 一起暴露 ```python ready_counter = Counter('readyz_total', 'readyz checks', ['result']) @app.get('/readyz') def readiness(): result = 'ok' if all_ok else 'fail' ready_counter.labels(result=result).inc() ... ``` Prometheus 上能看 readiness 通过率随时间变化。 ## 踩过的坑 - 用 `requests` 同步查依赖 → 阻塞事件循环 → readiness 用了几秒, 健康的 pod 也被错杀。所有依赖检查必须超时 + async。 - 检查 DB 用 `SELECT 1` 是基本健康但不能验证可写。如果你的服务必须能写, 检查 `SELECT 1` 同时 `INSERT ... ON CONFLICT DO NOTHING` 一条特殊行。 - 把 `/healthz` 和 `/readyz` 暴露在公网:让攻击者用慢请求 DoS 你的检查 端点。挂内网,或者加简单 IP 白名单。 - K8s 没配 `terminationGracePeriodSeconds` → SIGTERM 后 30 秒就 SIGKILL, graceful shutdown 没时间完成。把这个值调到至少 60 秒。

OpenTelemetry 追踪一个请求经过的所有微服务(含 propagation)

## 起因 微服务架构里一个用户请求穿过:API gateway → auth service → product service → inventory service → recommendation service → DB / cache 各种。 某个请求慢了或失败,看不出是哪一跳。 日志里有 request id 但要手工去多个服务的 log 里搜,痛苦。 OpenTelemetry (OTel) 是 CNCF 的分布式追踪标准,让一个请求在所有服务里的 执行被串成"瀑布图",秒级定位慢 / 错的环节。 ## 解决方案 ### 1. 整体架构 ``` service A → service B → service C | | | +-----------+-----------+ ↓ OTel Collector ↓ Jaeger / Tempo ↓ Grafana UI ``` 每个 service 用 OTel SDK 生成 span(一段工作)。span 通过 HTTP header 跨服务传递(context propagation)。所有 span 发到 Collector, 后端(Jaeger / Tempo / DataDog)存储 + 显示。 ### 2. Python 服务集成(FastAPI) ```bash uv add opentelemetry-distro opentelemetry-exporter-otlp \ opentelemetry-instrumentation-fastapi \ opentelemetry-instrumentation-requests \ opentelemetry-instrumentation-psycopg ``` 启动时打开: ```bash OTEL_SERVICE_NAME=my-api \ OTEL_TRACES_EXPORTER=otlp \ OTEL_EXPORTER_OTLP_ENDPOINT=http://collector:4317 \ opentelemetry-instrument uvicorn app.main:app ``` `opentelemetry-instrument` 自动 patch FastAPI / requests / psycopg / SQLAlchemy / Redis / Kafka / 几十种库。零代码改动。 每个 HTTP 请求自动开 trace span,DB query / outgoing HTTP 自动是 child span。 ### 3. 手动加 span(业务关键路径) ```python from opentelemetry import trace tracer = trace.get_tracer(__name__) def process_order(order_id: str): with tracer.start_as_current_span('process_order') as span: span.set_attribute('order.id', order_id) with tracer.start_as_current_span('validate'): validate(order_id) with tracer.start_as_current_span('charge_payment'): charge(order_id) with tracer.start_as_current_span('ship'): ship(order_id) ``` UI 里看到 process_order 总耗时 350ms,其中 validate 50ms / charge 280ms / ship 20ms。一眼定位 charge 是瓶颈。 ### 4. Go 服务集成 ```go import ( "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" "go.opentelemetry.io/otel/sdk/resource" sdktrace "go.opentelemetry.io/otel/sdk/trace" semconv "go.opentelemetry.io/otel/semconv/v1.24.0" "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" ) func setupTracing(ctx context.Context) (*sdktrace.TracerProvider, error) { exp, err := otlptracegrpc.New(ctx, otlptracegrpc.WithEndpoint("collector:4317"), otlptracegrpc.WithInsecure(), ) if err != nil { return nil, err } res := resource.NewWithAttributes( semconv.SchemaURL, semconv.ServiceName("my-go-svc"), ) tp := sdktrace.NewTracerProvider( sdktrace.WithBatcher(exp), sdktrace.WithResource(res), ) otel.SetTracerProvider(tp) return tp, nil } // HTTP server 加 middleware mux := http.NewServeMux() mux.Handle("/api/", otelhttp.NewHandler(yourHandler, "api")) // 业务里 tracer := otel.Tracer("my-go-svc") ctx, span := tracer.Start(ctx, "fetch_user") defer span.End() span.SetAttributes(attribute.String("user.id", uid)) ``` ### 5. 跨服务 propagation 服务间 HTTP 调用时自动透传 trace context(W3C `traceparent` header): ``` GET /products HTTP/1.1 Host: products-svc traceparent: 00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01 ``` 接收方解析 → 在同 trace 下创建 child span → 追踪连续。 Python / Go / Java / Node SDK 都自动处理。 ### 6. Collector 部署 `otel-collector-config.yaml`: ```yaml receivers: otlp: protocols: grpc: { endpoint: 0.0.0.0:4317 } http: { endpoint: 0.0.0.0:4318 } processors: batch: timeout: 5s memory_limiter: limit_mib: 1024 exporters: otlp/jaeger: endpoint: jaeger:4317 tls: { insecure: true } logging: { loglevel: warn } service: pipelines: traces: receivers: [otlp] processors: [memory_limiter, batch] exporters: [otlp/jaeger, logging] ``` ```bash docker run -p 4317:4317 -p 4318:4318 \ -v $(pwd)/otel-collector-config.yaml:/etc/otelcol/config.yaml \ otel/opentelemetry-collector-contrib ``` ### 7. 后端:Jaeger / Tempo ```bash # Jaeger(all-in-one,开发用) docker run -p 16686:16686 -p 4317:4317 jaegertracing/all-in-one:latest # 浏览器:http://localhost:16686 # 选 service → 查 traces → 点开看瀑布图 ``` 生产用 Tempo(与 Grafana / Loki 同生态)+ S3 后端。 ### 8. 在 trace 里看到日志(trace + log correlation) ```python import logging from opentelemetry.instrumentation.logging import LoggingInstrumentor LoggingInstrumentor().instrument(set_logging_format=True) logging.info('processing order %s', order_id) # log 自动带上 trace_id / span_id ``` ``` 2026-05-24 10:00:01 [INFO] [trace_id=abc... span_id=def...] processing order o-123 ``` Loki 配置识别 trace_id → 在 Grafana 里点 log 直接跳到对应 trace。 跨数据源关联无缝。 ### 9. 采样 100% 采样在高 QPS 时数据爆炸。生产建议: ```yaml processors: probabilistic_sampler: sampling_percentage: 5 # 5% 采样 ``` 或更智能 tail sampling:保留所有 error trace + 慢 trace + 5% 普通 trace。 ### 10. metrics + logs(统一 OTel) OTel 不止 trace,还有 metric 和 log。一份 SDK 配置三种信号都收。 逐步替代 Prometheus client / 各种 logger,到 OTel 标准化。 ## 效果 我们 5 微服务架构接 OTel 后: - "用户报反应慢" 类 issue 调查时间从 30min → 3min - 发现一个 N+1 query(隐藏在 lib 里)日浪费 500ms × 万次请求 - 知道哪个下游服务最不稳定(看 trace span 错误率) - DBA / SRE 不再需要"装 5 个服务的 log 拼接" ## 与 Sentry / DataDog 等对比 | | OTel + Jaeger/Tempo | DataDog APM | Sentry Performance | New Relic | |---|---|---|---|---| | 开源 / 自托管 | ✅ | ❌ | 部分 | ❌ | | 学习曲线 | 中 | 低 | 低 | 低 | | 价格 | 几乎免费 | 贵 | 中 | 贵 | | 标准化 | ✅ 行业标准 | 私有 | 私有 | 私有 | OTel 让你的代码 vendor-neutral:今天 Jaeger,明天换 DataDog 切 exporter 就行。 ## 踩过的坑 1. **collector 没起 → SDK 重试堆积 RAM**:SDK 默认会缓存 batch 失败 重试。collector 早死 → 应用内存涨。配 `max_export_batch_size` 和 超时 drop。 2. **trace context 跨异步任务丢**:Celery / async task 默认 trace 断开。 要手动用 OTel context inject / extract: ```python ctx = trace.set_span_in_context(current_span) carrier = {} inject(carrier) # carrier 里有 traceparent celery_task.delay(payload, ctx=carrier) ``` 3. **span 太多**:每个 SQL query 自动一个 span,1 个请求几百个 span, 存储成本飙。SQL instrumentation 配 `enable_commenter=False` 或 只 trace 慢 query。 4. **PII 泄漏**:默认 instrument 把 HTTP query string / body 记进 span attribute → trace 里包含密码 / token。配 `OTEL_ATTRIBUTE_VALUE_LENGTH_LIMIT` 或自定义 sanitizer。 5. **service 间时钟漂移**:trace 时间戳来自各服务本机时钟。两机器 差 100ms → 瀑布图错位。所有服务配 chrony 同 NTP。

gRPC vs REST:Go 服务间通信怎么选 + grpc-gateway 兼容方案

## 起因 10 个 Go 微服务之间互通: - REST + JSON:易调试 / 浏览器友好 / 工具多 - gRPC:类型严格 / 性能好 / 双向 streaming 之前都用 REST。后来内部 service 间改 gRPC,对外仍 REST, 靠 grpc-gateway 一份 proto 自动生成两套。 ## gRPC 优势(service 间) ### 1. 强类型 ```protobuf // user.proto syntax = "proto3"; service UserService { rpc GetUser(GetUserRequest) returns (User); rpc CreateUser(CreateUserRequest) returns (User); rpc ListUsers(ListUsersRequest) returns (ListUsersResponse); } message User { int64 id = 1; string email = 2; string name = 3; int64 created_at = 4; } message GetUserRequest { int64 id = 1; } ``` `protoc` 生成 Go / Python / TypeScript / Java client + server stub。 **改字段类型 → 客户端编译报错**。 vs REST 改 JSON schema → 客户端运行时炸。 ### 2. 性能 - HTTP/2 + protobuf binary:3-5x 比 JSON 快 + 小 - 长连接复用:避免每请求 TCP / TLS 握手 - 服务端 streaming / 双向 streaming 原生支持 我们一个高 QPS 服务从 REST + JSON 改 gRPC: | | REST | gRPC | |---|---|---| | P50 延迟 | 5ms | 1.5ms | | 单连接 RPS | 500 | 5000 | | CPU 占用 | 35% | 12% | 3-10x 提升。 ### 3. 双向 streaming ```protobuf service ChatService { rpc Chat(stream Message) returns (stream Message); } ``` 客户端 / 服务端都能持续发消息。WebSocket-like 但带类型。 适合:实时通知、聊天、log tail、bidirectional sync。 ## REST 优势(对外 API) - 浏览器直接调用(不需要特殊 client) - curl / Postman / 任意 HTTP tool 调试 - HTTP cache friendly(GET / If-Modified-Since 等) - OpenAPI / Swagger 文档丰富 - 简单 SDK 自动生成(openapi-generator 覆盖语言广) 对外暴露给"未知客户端" → 必须 REST 或 GraphQL。 ## 两者结合:grpc-gateway `grpc-gateway` 让一份 .proto 同时生成 gRPC server + REST proxy: ```protobuf import "google/api/annotations.proto"; service UserService { rpc GetUser(GetUserRequest) returns (User) { option (google.api.http) = { get: "/v1/users/{id}" }; } rpc CreateUser(CreateUserRequest) returns (User) { option (google.api.http) = { post: "/v1/users" body: "*" }; } } ``` `protoc` + `grpc-gateway` 插件生成: - `UserServiceServer` interface(你实现 gRPC server) - `RegisterUserServiceHandlerServer`(注册 REST → gRPC proxy) ```go // main.go func main() { // gRPC server grpcServer := grpc.NewServer() userpb.RegisterUserServiceServer(grpcServer, &userServer{}) // 启 gRPC :9090 lis, _ := net.Listen("tcp", ":9090") go grpcServer.Serve(lis) // REST gateway :8080 ctx := context.Background() mux := runtime.NewServeMux() err := userpb.RegisterUserServiceHandlerServer(ctx, mux, &userServer{}) http.ListenAndServe(":8080", mux) } ``` 客户端两种方式调: ```bash # REST curl http://localhost:8080/v1/users/42 # gRPC grpcurl -plaintext localhost:9090 user.UserService/GetUser -d '{"id": 42}' ``` 服务端逻辑写一遍,两套 API 自动并存。 **对外 REST,对内 gRPC**。 ## connect-rpc:现代替代 `buf.build` 出的 `connect-go`: - 兼容 gRPC 协议 - 同时支持 REST + JSON + gRPC-Web(无需 grpc-gateway) - 浏览器直接调 - 比 grpc-gateway 简洁 ```go type UserService struct{} func (s *UserService) GetUser(ctx context.Context, req *connect.Request[userv1.GetUserRequest]) (*connect.Response[userv1.User], error) { return connect.NewResponse(&userv1.User{Id: req.Msg.Id, Name: "..."}), nil } mux := http.NewServeMux() mux.Handle(userv1connect.NewUserServiceHandler(&UserService{})) http.ListenAndServe(":8080", mux) ``` ```bash curl -X POST http://localhost:8080/user.v1.UserService/GetUser \ -H 'Content-Type: application/json' \ -d '{"id": 42}' # {"id": 42, "name": "..."} ``` JSON over HTTP/1 / HTTP/2 / gRPC 同 endpoint 自动适配。 **2024 后新项目推荐**。 ## gRPC client(Go) ```go import "google.golang.org/grpc" conn, err := grpc.Dial("localhost:9090", grpc.WithTransportCredentials(insecure.NewCredentials())) defer conn.Close() client := userpb.NewUserServiceClient(conn) resp, err := client.GetUser(ctx, &userpb.GetUserRequest{Id: 42}) fmt.Println(resp.Name) ``` 调 client 跟调本地函数一样。 ### 配置 keep-alive ```go conn, _ := grpc.Dial(addr, grpc.WithKeepaliveParams(keepalive.ClientParameters{ Time: 10 * time.Second, Timeout: 3 * time.Second, PermitWithoutStream: true, }), ) ``` NAT 后面长连接 ping 保活。 ### 连接池 gRPC 单 connection 默认多路复用(HTTP/2 stream)→ 一般不需要 pool。 高 QPS 时 `grpc.WithDefaultServiceConfig('{"loadBalancingPolicy":"round_robin"}')` + DNS 解析多 IP 自动负载均衡。 ## 错误处理 ```go // 服务端 return nil, status.Errorf(codes.NotFound, "user %d not found", id) // 客户端 resp, err := client.GetUser(...) if err != nil { if status.Code(err) == codes.NotFound { // handle 404 equivalent } } ``` gRPC 错误码标准化(NotFound / Unauthenticated / PermissionDenied / etc)。 比 REST 的"HTTP status + 自定义 body" 类型严格。 ## interceptor(middleware) ```go func loggingInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { start := time.Now() resp, err := handler(ctx, req) log.Printf("%s took %v err=%v", info.FullMethod, time.Since(start), err) return resp, err } server := grpc.NewServer(grpc.UnaryInterceptor(loggingInterceptor)) ``` 或者用现成 middleware 库: - `github.com/grpc-ecosystem/go-grpc-middleware/v2` - prometheus / opentelemetry / auth / recovery / rate-limit ## 实战 case:微服务架构 我们的架构: ``` [mobile app / web] [外部 partner API] ↓ ↓ └─────── REST + JSON ───┘ ↓ [API Gateway (Go)] ↓ ┌─────────────┼─────────────┐ ↓ ↓ ↓ [User svc] [Order svc] [Payment svc] ↑ ↑ └─── gRPC ────┘ ``` - 对外 REST + OpenAPI 文档 - 内部 service 间 gRPC(性能 + 类型) - gateway 翻译 ## proto 仓库化 Monorepo / 单独 repo 存所有 .proto: ``` proto-repo/ ├── user/v1/user.proto ├── order/v1/order.proto └── buf.yaml ``` `buf` 工具 lint + 兼容性检查: ```bash buf lint buf breaking --against '.git#branch=main' # PR 不能破坏兼容 buf generate # 生成 Go / TS / Python ``` 防止"改 proto 删字段把客户端搞挂"。 ## 何时不用 gRPC - 客户端是浏览器 + 没 backend proxy → 用 REST / GraphQL(gRPC-Web 也是 选项但复杂) - 极简内部 tool → REST 更友好 - 团队不愿学 protobuf → 强推有阻力 ## 踩过的坑 1. **proto 字段 reserved**:删字段要 `reserved 5` 占位防 future wire-incompatible。 2. **enum 默认 0 值**:proto3 必须有 ENUM_UNSPECIFIED = 0;不写 迁移问题大。 3. **timestamp / duration 用 google.protobuf.Timestamp / Duration**: 不要用 int64 自己定义。Well-known types 跨语言兼容。 4. **streaming RPC 错误处理**:mid-stream error 客户端要正确退出。 ctx cancel + 关 stream。 5. **gRPC client 不复用 connection**:每次 NewClient 新 connection → DoS PG / DB。client 在应用启动时建一次 + 全局共享。

Envoy 当 API gateway:替代 nginx 的现代选择

## 起因 微服务架构需要 API gateway 做: - 路由(path → service) - LB - 限流 / 熔断 - TLS termination - 鉴权(OIDC / API key) - observability (metrics / traces) 老办法 nginx + Lua / openresty,扩展靠 module / 第三方 script。 **Envoy**(Lyft 出,CNCF)是云原生 API gateway 标准: - xDS API 动态配置(控制面 push 配置) - 内置 metric / trace - HTTP/2 / HTTP/3 / gRPC 一等公民 - Istio / Consul Service Mesh 底层用它 ## 装 / 跑 ```bash docker run -d -p 10000:10000 \ -v $(pwd)/envoy.yaml:/etc/envoy/envoy.yaml \ envoyproxy/envoy:v1.30.0 ``` ## 最小配置 ```yaml # envoy.yaml static_resources: listeners: - name: listener_0 address: socket_address: { address: 0.0.0.0, port_value: 10000 } filter_chains: - filters: - name: envoy.filters.network.http_connection_manager typed_config: "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager stat_prefix: ingress_http route_config: virtual_hosts: - name: backend domains: ["*"] routes: - match: { prefix: "/" } route: { cluster: backend_service } http_filters: - name: envoy.filters.http.router clusters: - name: backend_service type: STRICT_DNS lb_policy: ROUND_ROBIN load_assignment: cluster_name: backend_service endpoints: - lb_endpoints: - endpoint: address: socket_address: { address: backend, port_value: 8080 } ``` 监听 10000,转发到 backend:8080。 ## 路由 ```yaml routes: - match: { prefix: "/api/users" } route: { cluster: user_service } - match: { prefix: "/api/orders" } route: { cluster: order_service } - match: prefix: "/admin" headers: - name: x-internal-tool string_match: { exact: "true" } route: { cluster: admin_service } - match: { prefix: "/" } route: { cluster: web_frontend } ``` prefix / path / regex / header / query 匹配,路由到不同 cluster。 ## LB 策略 ```yaml clusters: - name: api_service lb_policy: LEAST_REQUEST # ROUND_ROBIN / LEAST_REQUEST / RANDOM / RING_HASH load_assignment: endpoints: - lb_endpoints: - endpoint: { address: { socket_address: { address: api-1, port_value: 8080 }}} - endpoint: { address: { socket_address: { address: api-2, port_value: 8080 }}} ``` `LEAST_REQUEST` 比 round-robin 更智能(少请求的 instance 接新请求)。 ## 限流 ```yaml http_filters: - name: envoy.filters.http.local_ratelimit typed_config: "@type": type.googleapis.com/envoy.extensions.filters.http.local_ratelimit.v3.LocalRateLimit stat_prefix: http_local_rate_limiter token_bucket: max_tokens: 100 tokens_per_fill: 100 fill_interval: 1s filter_enabled: runtime_key: local_rate_limit_enabled default_value: { numerator: 100 } filter_enforced: runtime_key: local_rate_limit_enforced default_value: { numerator: 100 } ``` 每秒 100 req,超出 429。 或者全局限流(多 envoy 共享)→ 外接 rate limit service(如 lyft/ratelimit)。 ## 熔断 ```yaml clusters: - name: backend circuit_breakers: thresholds: - max_connections: 100 - max_pending_requests: 50 - max_requests: 100 - max_retries: 3 ``` backend 慢 / 死 → envoy 主动拒新请求(避免雪崩)。 ## retry ```yaml routes: - match: { prefix: "/" } route: cluster: backend retry_policy: retry_on: "5xx,reset,connect-failure" num_retries: 3 per_try_timeout: 5s ``` 5xx / connection reset 自动重试 3 次。 对 idempotent 请求安全;POST / 写要小心。 ## TLS ```yaml listeners: - address: socket_address: { address: 0.0.0.0, port_value: 443 } filter_chains: - transport_socket: name: envoy.transport_sockets.tls typed_config: common_tls_context: tls_certificates: - certificate_chain: { filename: /etc/certs/cert.pem } private_key: { filename: /etc/certs/key.pem } ``` ALPN / SNI 自动。HTTP/2 自动协商。 HTTP/3 / QUIC 也支持(额外配 `udp_listener_config`)。 ## 鉴权:OAuth2 / JWT ```yaml http_filters: - name: envoy.filters.http.jwt_authn typed_config: "@type": type.googleapis.com/envoy.extensions.filters.http.jwt_authn.v3.JwtAuthentication providers: my_provider: issuer: https://auth.example.com remote_jwks: http_uri: uri: https://auth.example.com/.well-known/jwks.json cluster: jwks_cluster timeout: 5s cache_duration: 600s forward: true rules: - match: { prefix: "/api" } requires: { provider_name: my_provider } ``` envoy 自动验 JWT,无效 401。验过的 claim 转发给 backend。 backend 不必再写 JWT 验逻辑。 ## ext_authz(外部鉴权 service) 复杂鉴权(动态 ACL / per-resource permission)→ envoy 调外部 service: ```yaml http_filters: - name: envoy.filters.http.ext_authz typed_config: grpc_service: envoy_grpc: cluster_name: ext_authz_cluster ``` 每请求 envoy → gRPC 调 authz service → allow/deny。 集中 policy decision,service 不写 authz。 ## observability ```yaml # admin endpoint admin: address: socket_address: { address: 0.0.0.0, port_value: 9901 } ``` `http://localhost:9901/stats` → 几百个 metric。 `http://localhost:9901/clusters` → 当前 cluster 状态。 Prometheus scrape `/stats/prometheus`。 ## tracing ```yaml tracing: http: name: envoy.tracers.opentelemetry typed_config: grpc_service: envoy_grpc: cluster_name: otel_collector service_name: my-gateway ``` 每请求自动产生 span,发 OTEL collector → Jaeger / Tempo。 backend trace 跟 gateway 串联。 ## 动态配置(xDS) 静态 envoy.yaml 改配置要重启。 xDS API 让 envoy 从 control plane(Istio / Consul / 自家)pull 配置: ```yaml dynamic_resources: ads_config: api_type: GRPC grpc_services: - envoy_grpc: cluster_name: xds_cluster ``` router / cluster / listener 全运行时改,无中断。 大规模 / 多 envoy 必备。 ## vs nginx | | nginx | Envoy | |---|---|---| | 配置 | nginx.conf(声明式) | yaml + xDS | | 动态更新 | reload(断老连接) | xDS 真 hot reload | | HTTP/2 / 3 | 支持但 retrofit | first-class | | gRPC | 透传 | 原生(gRPC routing) | | observability | 第三方 | 内置 metric + trace | | 学习曲线 | 中 | 高(yaml 长) | | 资源 | 小 | 中 | 简单场景 nginx 够。微服务 + mesh / 动态路由 / gRPC → Envoy。 ## 与 API gateway product 对比 - **Kong**:基于 nginx + 插件 - **Tyk**:Go 写,open source - **APISIX**:基于 nginx + etcd - **Envoy + Istio**:service mesh 体系 Envoy 是底层 + 通用。Kong/APISIX/Tyk 是产品(envoy 上层封装)。 要 admin UI / 插件市场 → API gateway 产品。 完全自控 → envoy 自配。 ## 真实部署 我们一个 k8s 集群,envoy 当 ingress(替代 nginx-ingress): - 200 微服务 - 10w QPS 峰值 - envoy 2 replica(HPA 到 8) - xDS 控制面 = Istio + 自家 control plane 效果: - 配置变更 1 秒 propagate(vs nginx reload 几秒中断) - gRPC 路由原生(vs nginx 需要插件) - per-service mTLS 自动(mesh 模式) ## 踩过的坑 1. **yaml 配置长**:1000+ 行常见。用 helm / kustomize 模板化。 2. **memory 使用**:默认 envoy 几百 MB;大 route 表几 GB。监控 + adjust。 3. **JWKS cache miss**:JWT auth 频繁拉 JWKS endpoint → IdP rate limit。 `cache_duration: 600s` 加大。 4. **route order**:从上到下匹配第一个。`/` prefix 放最前 → 后面 都不生效。常见错。 5. **TLS cert reload**:file 改了 envoy 不自动 reload。SDS (Secret Discovery Service) 动态推或者用 cert-manager + restart envoy pod。