知识广场

按学科筛选:计算机科学 / 后端开发
清除筛选

«计算机科学 / 后端开发» 分类下共 35 篇帖子

用 uv 起一个最小 Django 5 项目(替代 pip + venv)

`uv` 是 Astral(Ruff 团队)的 Python 项目管理器,Rust 写的, 比 `pip + venv + pip-tools` 快 10-100 倍。2024 之后基本是新项目的默认。 下面 5 分钟起一个 Django 5 项目。 ## 1. 装 uv ```bash curl -LsSf https://astral.sh/uv/install.sh | sh # 或 macOS: brew install uv # 或 Windows: powershell -c "irm https://astral.sh/uv/install.ps1 | iex" uv --version ``` ## 2. 起项目 ```bash uv init myapp --python 3.12 cd myapp ``` `uv init` 生成 `pyproject.toml` + `.python-version` + `hello.py`, 不创建 venv 直到需要。 ## 3. 加依赖 ```bash uv add django gunicorn psycopg[binary] python-dotenv uv add --dev ruff pytest pytest-django ``` `uv add` 会: 1. 解析依赖(极快) 2. 写入 `pyproject.toml` 3. 锁定到 `uv.lock` 4. 装到 `.venv/`(自动创建) `psycopg[binary]` 是 psycopg3 的预编译版(生产建议 `psycopg[c]` 自己编)。 ## 4. Django 项目结构 ```bash uv run django-admin startproject myapp . uv run python manage.py startapp blog ``` `uv run` 等价于"在项目 venv 里跑"。比 `source .venv/bin/activate && python ...` 直接,跨 shell / CI 都一样。 ## 5. 跑 ```bash uv run python manage.py migrate uv run python manage.py createsuperuser uv run python manage.py runserver ``` ## 6. CI / Docker 中使用 `Dockerfile`: ```dockerfile FROM python:3.12-slim # 装 uv 二进制 COPY --from=ghcr.io/astral-sh/uv:latest /uv /usr/local/bin/uv WORKDIR /app COPY pyproject.toml uv.lock ./ # --frozen 严格按 lock 安装,不解析 RUN uv sync --frozen --no-install-project COPY . . RUN uv sync --frozen EXPOSE 8000 CMD ["uv", "run", "gunicorn", "myapp.wsgi:application", "-b", "0.0.0.0:8000"] ``` GitHub Actions: ```yaml - uses: astral-sh/setup-uv@v3 with: enable-cache: true - run: uv sync --frozen - run: uv run pytest ``` `enable-cache: true` 跨 job 缓存 wheel,CI 飞快。 ## 7. 升级 / 锁定 ```bash uv lock --upgrade-package django # 只升 django uv lock --upgrade # 全部升到最新允许的范围 uv add 'django>=5.1,<6' # 改约束 uv pip compile pyproject.toml > requirements.txt # 兼容 pip 导出 ``` ## 8. 与 pip / poetry 对比 | 操作 | pip / pip-tools | uv | |---|---|---| | 装 100 个依赖 | 30-60s | 1-3s | | 解析 lock | 几秒到几十秒 | < 1s | | 创建 venv | `python -m venv .venv` (慢) | 自动且快 | | 跨平台 lock | 麻烦 | 内置 | | Python 多版本 | pyenv 配合 | `uv python install 3.12` 内置 | ## 踩过的坑 - `uv.lock` 必须进 git。它包含跨平台依赖锁,删了会让 `uv sync --frozen` 失败。 - `uv add` 默认只在主 group;要 dev 依赖加 `--dev`;要 optional group 加 `--optional groupname`。 - `psycopg[binary]` 在 musl-libc(Alpine)镜像上没预编译 wheel,会回退到 源码编译——慢。改用 `python:3.12-slim`(glibc)镜像。 - VSCode 自动识别 `.venv/` —— 但要把 Python 解释器手动选为 `.venv/bin/python`, 否则 import 检查走系统 Python。

FastAPI + Pydantic v2 严格校验请求与响应(含自定义错误格式)

FastAPI 的核心卖点是"用 Python 类型注解定义 API schema,自动校验 + 生成 OpenAPI 文档"。Pydantic v2 是其背后的校验引擎,比 v1 快 5-50 倍。 ## 1. 最小例子 ```python from fastapi import FastAPI from pydantic import BaseModel, EmailStr, Field app = FastAPI() class CreateUser(BaseModel): email: EmailStr nickname: str = Field(min_length=2, max_length=30) age: int = Field(ge=0, le=150) class UserOut(BaseModel): id: int email: EmailStr nickname: str @app.post('/users', response_model=UserOut, status_code=201) def create_user(payload: CreateUser) -> UserOut: # ... 写入 DB ... return UserOut(id=42, email=payload.email, nickname=payload.nickname) ``` 发请求时任何字段不合法都返回 422 + 详细错误。打开 `/docs` 看自动文档。 ## 2. 自定义 validator ```python from pydantic import field_validator class CreateUser(BaseModel): nickname: str @field_validator('nickname') @classmethod def no_whitespace(cls, v: str) -> str: if v != v.strip() or ' ' in v: raise ValueError('昵称不能含首尾或连续空格') return v ``` V2 必须加 `@classmethod`。 ## 3. 计算字段(动态) ```python from pydantic import computed_field class UserOut(BaseModel): nickname: str username: str @computed_field @property def display_name(self) -> str: return f'{self.nickname}@{self.username}' ``` ## 4. 加载 / 序列化别名 ```python class UserIn(BaseModel): email_address: str = Field(alias='email') # 接收的 JSON 里是 "email",Python 属性是 email_address ``` ## 5. 严格模式:拒绝多余字段 V2 默认允许额外字段被忽略。生产里建议严格: ```python class CreateUser(BaseModel): model_config = {'extra': 'forbid'} email: EmailStr # 客户端发 {"email": ..., "foo": ...} 会 422 ``` ## 6. 统一错误格式 FastAPI 默认 422 响应是 Pydantic 原始结构。给前端友好点: ```python from fastapi import Request, status from fastapi.exceptions import RequestValidationError from fastapi.responses import JSONResponse @app.exception_handler(RequestValidationError) async def validation_handler(request: Request, exc: RequestValidationError): errors = [ { 'field': '.'.join(str(x) for x in e['loc']), 'message': e['msg'], 'type': e['type'], } for e in exc.errors() ] return JSONResponse( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, content={'detail': '请求数据校验失败', 'errors': errors}, ) ``` ## 7. 路径 / 查询 / Body / Header / Cookie ```python from fastapi import Path, Query, Header, Cookie @app.get('/items/{item_id}') def get_item( item_id: int = Path(ge=1), fields: list[str] | None = Query(None, max_length=10), user_agent: str | None = Header(None), session: str | None = Cookie(None), ): ... ``` ## 8. Depends 注入 ```python from fastapi import Depends def get_db(): db = SessionLocal() try: yield db finally: db.close() @app.get('/users/{uid}') def read_user(uid: int, db = Depends(get_db)): return db.query(User).get(uid) ``` `Depends` 是 FastAPI 的 DI 系统;可以套娃(依赖里又用 Depends)。 ## 9. 鉴权依赖 ```python from fastapi import HTTPException, status from fastapi.security import OAuth2PasswordBearer oauth2 = OAuth2PasswordBearer(tokenUrl='token') def current_user(token: str = Depends(oauth2)): user = decode_jwt(token) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, headers={'WWW-Authenticate': 'Bearer'}) return user @app.get('/me') def me(user = Depends(current_user)): return user ``` ## 10. 后台任务(轻量) ```python from fastapi import BackgroundTasks def send_welcome_email(email: str): # ... 同步发邮件 ... ... @app.post('/users') def create_user(payload: CreateUser, tasks: BackgroundTasks): user = save(payload) tasks.add_task(send_welcome_email, user.email) return user ``` 请求立即返回,邮件在响应发出后异步执行。注意:失败没有重试。 要可靠就上 Celery / RQ / Arq。 ## 11. CORS ```python from fastapi.middleware.cors import CORSMiddleware app.add_middleware( CORSMiddleware, allow_origins=['https://example.com'], allow_credentials=True, allow_methods=['*'], allow_headers=['*'], ) ``` ## 12. 运行 ```bash uv add fastapi 'uvicorn[standard]' uv run uvicorn main:app --reload # 开发 uv run uvicorn main:app --workers 4 --host 0.0.0.0 --port 8000 # 生产 # 生产更稳的是用 gunicorn 启 uvicorn worker: uv run gunicorn -k uvicorn.workers.UvicornWorker -w 4 main:app ``` ## 踩过的坑 - Pydantic v1 / v2 在同一项目混用:v1 model 的 `.dict()`、`.json()` 在 v2 是 `.model_dump()` 和 `.model_dump_json()`。代码改名后老的方法 调用会静默返回不正确的格式。 - 用 dataclass 替代 BaseModel:dataclass 在 FastAPI 路径参数处不会被校验, 只在 response_model 处校验。混着用很容易出问题。 - `response_model` 会**过滤**响应字段(不在 model 里的字段被丢掉), 这是 feature 不是 bug。想全输出就别设 response_model 或用 `dict`。 - `BackgroundTasks` 是同一进程里的协程,长任务会撑住 worker; 超过 30 秒的任务就该上 Celery。

Python typing.Protocol:写库时拥抱 duck typing 又有类型提示

## 起因 写一个数据导出库,接收任何"长得像 file-like 对象的"输入: 内置 `open()` 返回值、`io.BytesIO`、Django 的 `UploadedFile`、 S3 client 返回的 streaming body…… ```python def export(stream): stream.write(b'header') for row in data: stream.write(row.serialize()) ``` 类型怎么标?`stream: BinaryIO`?只 cover 标准库;`stream: object`? 失去类型提示。 `typing.Protocol`(PEP 544)解决:定义"结构子类型"(structural subtyping), 描述"具有 X 方法的任何对象",无需对方继承。 ## 解决方案 ```python from typing import Protocol class WritableBytes(Protocol): def write(self, data: bytes) -> int: ... def flush(self) -> None: ... def export(stream: WritableBytes) -> None: stream.write(b'header') for row in data: stream.write(row.serialize()) stream.flush() ``` 任何"有 write(bytes) → int 和 flush() → None" 方法的类都满足。 不需要 import 我的 Protocol、不需要 inherit、不需要 register。 类型检查器(mypy / pyright)静态确认: ```python import io export(io.BytesIO()) # ✅ export(open('out.bin', 'wb')) # ✅ export("not a file") # ❌ mypy 报错 ``` ## runtime check 需要在运行时判断:"这东西满足 Protocol 吗?" ```python from typing import Protocol, runtime_checkable @runtime_checkable class WritableBytes(Protocol): def write(self, data: bytes) -> int: ... if isinstance(obj, WritableBytes): obj.write(b'x') ``` `@runtime_checkable` 让 `isinstance` 工作。但只检查方法**存在**, 不检查方法签名 —— 比静态检查弱。 ## 实战例子:可插拔的 storage backend ```python from typing import Protocol class Storage(Protocol): def get(self, key: str) -> bytes | None: ... def set(self, key: str, value: bytes, ttl: int | None = None) -> None: ... def delete(self, key: str) -> None: ... class InMemoryStorage: def __init__(self): self._d: dict[str, bytes] = {} def get(self, key): return self._d.get(key) def set(self, key, value, ttl=None): self._d[key] = value def delete(self, key): self._d.pop(key, None) class RedisStorage: def __init__(self, url): self._r = redis.from_url(url) def get(self, key): return self._r.get(key) def set(self, key, value, ttl=None): if ttl: self._r.set(key, value, ex=ttl) else: self._r.set(key, value) def delete(self, key): self._r.delete(key) def setup_cache(storage: Storage) -> Cache: return Cache(storage) setup_cache(InMemoryStorage()) setup_cache(RedisStorage('redis://localhost')) ``` `InMemoryStorage` / `RedisStorage` 都没 inherit `Storage`, 但都 conform 该结构 → mypy 通过。 ## 与 ABC 对比 ```python from abc import ABC, abstractmethod class Storage(ABC): @abstractmethod def get(self, key: str) -> bytes | None: ... @abstractmethod def set(self, key: str, value: bytes, ttl: int | None = None) -> None: ... @abstractmethod def delete(self, key: str) -> None: ... class InMemoryStorage(Storage): # 必须显式继承 ... ``` ABC 强制继承。Protocol 不强制 → 第三方库的类不需要修改就能用。 适用场景: - **Protocol**:写库 / interface 定义,鸭子类型友好 - **ABC**:内部类型层次、要 share 实现(Mixin)、强制 inherit ## generic protocol ```python from typing import Protocol, TypeVar T = TypeVar('T', covariant=True) class Iterable(Protocol[T]): def __iter__(self) -> 'Iterator[T]': ... class Iterator(Protocol[T]): def __next__(self) -> T: ... ``` 实际 `Iterable` 已在 `typing` 模块,举例说明语法。 ## 在标准库里你已经在用 `typing` / `collections.abc` 模块里很多就是 Protocol: ```python from collections.abc import Iterable, Mapping, Hashable, Container from typing import Protocol, SupportsLen, SupportsInt ``` `SupportsLen` 是 `def __len__(self) -> int`。所以 `len(x)` 能用的 都满足。 ## 给现有类"贴" Protocol ```python from third_party import SomeClass class HasFoo(Protocol): def foo(self) -> str: ... # SomeClass 有 foo() 方法但作者没标注 x: HasFoo = SomeClass() # mypy 检查 OK x.foo() ``` 零侵入。 ## 实战 case:测试替身 ```python class Notifier(Protocol): def send(self, msg: str, to: str) -> None: ... def process_order(order: Order, notifier: Notifier) -> None: # ... notifier.send(f'Order {order.id} confirmed', to=order.email) # 生产 process_order(order, EmailNotifier()) # 测试 class FakeNotifier: def __init__(self): self.sent: list[tuple[str, str]] = [] def send(self, msg, to): self.sent.append((msg, to)) def test_confirm(): n = FakeNotifier() process_order(test_order, n) assert ('Order 1 confirmed', '[email protected]') in n.sent ``` `FakeNotifier` 不需要 inherit `Notifier` ABC,纯写实现即可。 ## 与 `__class_getitem__` / `TypeVar` 联用 ```python from typing import Protocol, TypeVar K = TypeVar('K') V = TypeVar('V') class Cache(Protocol[K, V]): def get(self, key: K) -> V | None: ... def set(self, key: K, value: V) -> None: ... class StringIntCache: def get(self, key: str) -> int | None: ... def set(self, key: str, value: int) -> None: ... def use(c: Cache[str, int]): v = c.get('x') # mypy 知道 v: int | None ``` ## 效果 - 库的 API 类型严格但不要求用户继承 - 测试时随便造 fake,不需要 mock 框架 - 重构内部实现时 Protocol 是"接口",业务代码改少 - mypy / pyright 在 IDE 里实时提示,写错立刻知道 ## 踩过的坑 1. **Protocol 不能 instantiate**:`x = WritableBytes()` 报错(没意义, 它是接口)。 2. **runtime_checkable 检查只看方法名**:`isinstance(obj, WritableBytes)` 只确认有 `write` 和 `flush`,不查签名。运行时碰到方法签名不对仍 crash。 3. **Protocol 的方法有 default impl** 让它变成 mixin 又不强制继承: ```python class Repr(Protocol): def __repr__(self) -> str: ... class Mixin(Repr): def __repr__(self): return f'<{type(self).__name__}>' ``` 语义微妙,团队约定清楚。 4. **structural subtyping 太宽容**:所有有 `read()` 方法的都被当 readable,比如自定义类 `class Sensor: def read(self): ...` 不该 是 file-like 但 mypy 不报错。给 Protocol 多放几个方法(read + readable 等)让匹配更严。 5. **Protocol 间不能继承 default impl**:跟 mixin 不同。要复用代码用 ABC + Protocol 组合,或者纯函数化。

Python asyncio 实战要点:gather / TaskGroup / 取消 / 限并发

asyncio 已经是 Python 标准并发模型。但坑也多——很多人以为加 async/await 就能"并行",结果代码顺序跑得跟同步一样。下面讲实际让代码并发的关键。 ## 1. 单纯加 async/await 不会并行 ```python async def fetch_one(url): async with aiohttp.ClientSession() as s: async with s.get(url) as r: return await r.text() async def main(): for url in urls: await fetch_one(url) # 串行!每个等上一个完 ``` `await` 让出控制权,但 for + await 仍然是顺序的。要并发用 `gather` 或 `TaskGroup`。 ## 2. asyncio.gather ```python async def main(): results = await asyncio.gather(*[fetch_one(u) for u in urls]) ``` 所有 fetch 同时进行,gather 等全部完成返回列表。 注意: - 任何一个抛异常默认会取消其它任务并向上抛 - 用 `return_exceptions=True` 让异常也作为结果返回,不取消其它 ```python results = await asyncio.gather(*coros, return_exceptions=True) for r in results: if isinstance(r, Exception): log.warning('one failed: %s', r) else: process(r) ``` ## 3. TaskGroup(Python 3.11+,推荐) ```python async def main(): async with asyncio.TaskGroup() as tg: t1 = tg.create_task(fetch_one(u1)) t2 = tg.create_task(fetch_one(u2)) t3 = tg.create_task(fetch_one(u3)) # 所有 task 自动完成 + 自动 cancel 兄弟 task 当一个失败 print(t1.result(), t2.result()) ``` TaskGroup 是 PEP 654 + 3.11 引入的结构化并发。优点: - 异常传播更清晰(aggregates as ExceptionGroup) - 自动 cancel 兄弟,避免 "task 还在跑" 的孤儿 - 比 gather 更结构化 新代码优先 TaskGroup。 ## 4. 限并发:Semaphore 如果有 10000 个 URL 要抓,全用 gather 同时跑会爆 socket / 被目标限流。 用 Semaphore 控制并发度: ```python sem = asyncio.Semaphore(20) async def bounded_fetch(url): async with sem: return await fetch_one(url) results = await asyncio.gather(*[bounded_fetch(u) for u in urls]) ``` `sem` 上限 20 → 最多 20 个 fetch 同时进行。 更高级的限速(按时间,比如每秒 50 个请求): ```python import aiometer async def fetch_one(url): ... results = await aiometer.run_on_each( fetch_one, urls, max_at_once=20, max_per_second=50, ) ``` ## 5. 取消 + 超时 ```python # 单个任务超时 try: result = await asyncio.wait_for(fetch_one(url), timeout=5.0) except asyncio.TimeoutError: log.warning('timeout') ``` `asyncio.wait_for` 在超时后取消 coroutine(抛 CancelledError)。 任务里要正确处理这个 cancel: ```python async def fetch_one(url): try: async with session.get(url) as r: return await r.text() except asyncio.CancelledError: # 清理资源(如果还在持有) log.info('cancelled') raise # 必须 re-raise,否则任务不算被取消 ``` 不 re-raise 会让 cancel 信号丢失,cleanup 顺序乱。 ## 6. 不要在 async 里跑同步阻塞 ```python async def bad(): time.sleep(5) # 阻塞整个事件循环! result = requests.get('...') # 同步 IO ``` 修复: ```python async def good(): await asyncio.sleep(5) async with aiohttp.ClientSession() as s: async with s.get(...) as r: await r.text() ``` 如果必须用同步库(pandas / boto3 / heavy CPU 计算): ```python # 跑到 thread pool 不阻塞 loop result = await asyncio.to_thread(blocking_function, args) ``` `asyncio.to_thread` 把同步函数包成 awaitable,在 thread pool 里跑。 ## 7. CPU 密集任务用 ProcessPoolExecutor ```python from concurrent.futures import ProcessPoolExecutor executor = ProcessPoolExecutor() loop = asyncio.get_running_loop() result = await loop.run_in_executor(executor, cpu_heavy_function, data) ``` CPU 密集(图片处理 / 加密 / 解析)放进程池,避开 GIL。 ## 8. 信号 / 优雅退出 ```python import signal async def main(): stop = asyncio.Event() loop = asyncio.get_running_loop() for sig in (signal.SIGINT, signal.SIGTERM): loop.add_signal_handler(sig, stop.set) server_task = asyncio.create_task(run_server()) await stop.wait() log.info('shutting down') server_task.cancel() try: await server_task except asyncio.CancelledError: pass asyncio.run(main()) ``` 收到 Ctrl-C 时优雅 cancel + 等待清理。Windows 不支持 add_signal_handler。 ## 9. 共享状态 asyncio 是单线程协作式并发,访问全局变量**不需要锁**。 但 await 之后状态可能被其它 coroutine 改了: ```python counter = 0 async def bad(): global counter n = counter await asyncio.sleep(0.001) # 让出执行 counter = n + 1 # n 可能已经过时 ``` 需要原子读改写时用 `asyncio.Lock`: ```python lock = asyncio.Lock() async def good(): async with lock: n = counter await ... counter = n + 1 ``` ## 10. 调试 ```python import asyncio asyncio.run(main(), debug=True) # 或 export PYTHONASYNCIODEBUG=1 ``` debug 模式会警告: - coroutine 没 await - 协程跑超过 100ms(可能阻塞了 loop) - 任务 leaks `asyncio.all_tasks()` 看当前所有任务。 ## 11. 常用第三方 - **aiohttp**:HTTP client/server - **httpx**:HTTP,同步 + 异步统一 API(更现代) - **asyncpg**:PostgreSQL,比 aiopg / sqlalchemy-asyncio 快很多 - **redis-py 4+**:内置 async - **anyio**:跨 asyncio / trio 的抽象层 新项目 HTTP 优先用 httpx,老项目 aiohttp 仍然稳定。 ## 踩过的坑 - `coro = some_async_fn()` 但忘 await:coroutine 不会执行 + Python 抛 "coroutine was never awaited" 警告。 - `asyncio.run()` 不能嵌套:在 Jupyter / IPython 里要用 `await main()` 直接(IPython 7+ 自动包 await)。 - 多线程跑 asyncio:每个线程需要自己 `asyncio.new_event_loop()`。 通常不推荐——asyncio 设计是单线程的。 - `aiohttp.ClientSession` 是有 connection pool 的,每个请求建一个新 session 性能差。整个程序生命周期共享一个 session。

Go database/sql 连接池调优:避免"too many connections"和死锁

## 起因 Go 服务上线后偶发"connection pool exhausted" 和 PG "too many connections" 错误。debug 后发现 Go `database/sql` 默认连接池配置 极宽松(无 max),高并发下能瞬间打开几百个 DB 连接。 下面讲怎么正确配 + 排查问题。 ## 默认行为陷阱 ```go db, _ := sql.Open("postgres", dsn) // db 默认 MaxOpenConns = 0(无限制) // MaxIdleConns = 2 // ConnMaxLifetime = 0(永不过期) ``` 并发 1000 请求 + 每个 query 50ms → 瞬间 1000 个 DB 连接 → PG max_connections 100 撞墙 → 报错。 ## 正确配置 ```go db, err := sql.Open("postgres", dsn) if err != nil { ... } db.SetMaxOpenConns(25) // 同时最多 25 个连接 db.SetMaxIdleConns(10) // 池里 idle 保留 10 个 db.SetConnMaxIdleTime(5 * time.Minute) // idle 超 5 分钟关闭 db.SetConnMaxLifetime(30 * time.Minute) // 连接最长存活 30 分钟(轮换) ``` 四个参数: | | 作用 | 推荐 | |---|---|---| | MaxOpenConns | 并发上限 | DB max_connections / 进程数 | | MaxIdleConns | 闲置池上限 | MaxOpenConns 的 1/2 | | ConnMaxIdleTime | 闲置多久关 | 5-10 min | | ConnMaxLifetime | 总寿命 | 30-60 min(防 DB 重启 / NAT timeout) | ## 计算 MaxOpenConns 设 PG `max_connections = 100`,集群跑 4 个 Go 服务进程: ``` 每进程 MaxOpenConns = (100 - 10 reserved) / 4 = 22 ``` 留 10 个 superuser / 监控 / DBA 用。 如果你的应用还会跑 background worker 进程也吃连接,进一步分配。 ## 测试当前配置 ```go import "fmt" import "time" func printPoolStats(db *sql.DB) { for range time.Tick(5 * time.Second) { s := db.Stats() fmt.Printf( "[db] open=%d in_use=%d idle=%d wait_count=%d wait_dur=%s\n", s.OpenConnections, s.InUse, s.Idle, s.WaitCount, s.WaitDuration, ) } } go printPoolStats(db) ``` `WaitCount > 0` 持续增长 = MaxOpenConns 太小(请求在等池)。 `InUse` 接近 MaxOpenConns 时常 = 业务高峰;偶尔正常,持续要扩。 Prometheus exporter: ```go import "github.com/prometheus/client_golang/prometheus" func collectDBMetrics(db *sql.DB) { s := db.Stats() openGauge.Set(float64(s.OpenConnections)) inUseGauge.Set(float64(s.InUse)) waitCountCounter.Add(float64(s.WaitCount)) } ``` ## ctx 超时控制 ```go ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second) defer cancel() rows, err := db.QueryContext(ctx, "SELECT ...") ``` `QueryContext` 而不是 `Query` —— query 卡死时 ctx cancel 让 query 立刻断 + 连接归还池。 否则连接被卡住一直占。 ## 长事务陷阱 ```go // ❌ tx, _ := db.BeginTx(ctx, nil) sendEmail(...) // 慢 IO(几秒) tx.Commit() ``` 事务期间一直占一个连接。1000 请求 × 3 秒邮件 = 池被压爆。 ```go // ✅ sendEmail(...) // 先发邮件 tx, _ := db.BeginTx(ctx, nil) defer tx.Rollback() // 短的 DB 操作 tx.Commit() ``` 事务内只做 DB;非 DB 操作放外面。 ## rows 没 Close → 连接泄露 ```go // ❌ panic 时 rows.Close() 没跑 rows, _ := db.Query("...") for rows.Next() { ... } ``` ```go // ✅ defer rows, err := db.Query("...") if err != nil { return err } defer rows.Close() for rows.Next() { ... } return rows.Err() // 别忘 Err 检查 ``` 或者用更高级的 ORM(sqlx / GORM)封装这些细节。 ## prepared statement cache ```go stmt, err := db.PrepareContext(ctx, "SELECT * FROM users WHERE id = $1") // stmt 可以跨多次 query 复用 for _, id := range ids { rows, _ := stmt.QueryContext(ctx, id) // ... } stmt.Close() ``` 但 PG 的 prepared statement 是 per-connection 的。`database/sql` 自动处理(每次 PrepareContext 实际 ad-hoc prepared per connection)。 Postgres 推荐用 prepared statement 提升性能。或者用 `pgx` 替代 (更原生)。 ## pgx vs database/sql ```bash go get github.com/jackc/pgx/v5 ``` ```go // pgx 直接连接池 import "github.com/jackc/pgx/v5/pgxpool" pool, err := pgxpool.New(ctx, dsn) // 配置: config, _ := pgxpool.ParseConfig(dsn) config.MaxConns = 25 config.MinConns = 5 config.MaxConnLifetime = 30 * time.Minute pool, _ = pgxpool.NewWithConfig(ctx, config) // query rows, _ := pool.Query(ctx, "SELECT id FROM users WHERE x = $1", x) ``` pgx 优势: - 直接 PostgreSQL,性能比 database/sql 包 lib/pq 快 30-50% - 原生 prepared statement cache - 支持 PG 特有类型(jsonb / array / range) 如果你只用 PG,直接 pgx 替代。 ## PgBouncer:连接池中间层 如果有 N 个进程,每个开 25 连接 = N × 25 连接到 PG。 PG 单连接 5-10 MB → 100 连接 = 1 GB+ RAM。 PgBouncer 在 PG 前面挡: ``` app1 ─┐ app2 ─┤ ... ─┼→ PgBouncer (在 app 端,pool transaction-level) → PG (少量真连接) appN ─┘ ``` ```bash sudo apt install -y pgbouncer ``` `/etc/pgbouncer/pgbouncer.ini`: ```ini [databases] mydb = host=pg-server dbname=mydb [pgbouncer] listen_addr = 127.0.0.1 listen_port = 6432 auth_type = scram-sha-256 auth_file = /etc/pgbouncer/userlist.txt pool_mode = transaction # 关键:transaction-level pooling max_client_conn = 1000 default_pool_size = 25 reserve_pool_size = 5 server_idle_timeout = 60 ``` 应用端连 `localhost:6432` 替代 PG 5432。 `pool_mode=transaction` 让连接事务级共享,1000 client → ~25 PG 连接。 极适合 short-lived query 场景。 注意:transaction mode 不能用 prepared statement / SET LOCAL(跨事务 状态丢)。pgx 5.0+ 有 PgBouncer 兼容模式。 ## DB 端配置 PostgreSQL `postgresql.conf`: ``` max_connections = 200 # 集群总上限 shared_buffers = 8GB # 通常 RAM 25% effective_cache_size = 24GB # RAM 75% work_mem = 64MB # per query maintenance_work_mem = 1GB ``` `max_connections = 200` × `work_mem = 64MB` = 最差 12.8 GB(每 query 峰值)。可能 OOM。 工业上: - max_connections 100-200 - 应用端 PgBouncer pool 共享 - 单连接 work_mem 16-32MB ## 实战调参流程 1. 启动配 `MaxOpenConns = 25`,pprof 看实际使用 2. `WaitCount` 持续上涨 → 增大池或加 PG 3. PG 端 `pg_stat_activity` 看实际并发: ```sql SELECT count(*) FROM pg_stat_activity WHERE state = 'active'; ``` 4. 加 PgBouncer 减少 PG 物理连接数 ## 踩过的坑 1. **每个请求 `sql.Open` 新连接** → 没池化,几秒内打满。 `sql.Open` 一次性 + db 全局共享 + DI 注入。 2. **死锁 + 长事务**:事务 A 持有 row 锁 + 等连接(池满), 事务 B 持有连接 + 等 A 的 row 锁。 解决:短事务 + ctx timeout。 3. **MaxIdleConns > MaxOpenConns**:参数互相矛盾,实际生效的是 MaxOpenConns。Idle 上限被 cap。 4. **NAT 后面 DB**:长连接经过几小时 NAT 表项过期 → 应用以为还活, 实际 DB 已断 → 报 "broken pipe"。设 ConnMaxLifetime < NAT timeout (通常 30 min 安全)。 5. **使用 `Ping()` 测连接**:高频 Ping 浪费连接 + 是 round-trip 开销。 只在启动时验证一次;运行时用 ConnMaxLifetime + 实际 query 检测。

Litestream:把 SQLite 实时复制到 S3(小项目灾备)

## 起因 个人 / 小项目用 SQLite: - 单文件,零运维 - 性能足够(WAL 模式撑万 QPS 读 + 千 QPS 写) - 备份 = cp 文件 缺点:单服务器挂了 → 服务停 + 上次备份后的数据丢。 `Litestream`(Ben Johnson):实时把 SQLite WAL 流式复制到 S3 / SFTP / 其它。无应用改动 + 几乎实时(秒级 RPO)。 ## 装 ```bash # binary wget https://github.com/benbjohnson/litestream/releases/download/v0.3.13/litestream-v0.3.13-linux-amd64.tar.gz tar -xf litestream-*.tar.gz sudo mv litestream /usr/local/bin/ ``` ## 配 `/etc/litestream.yml`: ```yaml dbs: - path: /srv/myapp/db.sqlite3 replicas: - type: s3 bucket: my-backups path: myapp/db region: us-east-1 access-key-id: ${AWS_ACCESS_KEY_ID} secret-access-key: ${AWS_SECRET_ACCESS_KEY} retention: 720h # 30 day ``` 启动: ```bash litestream replicate -config /etc/litestream.yml ``` ## systemd service ```ini # /etc/systemd/system/litestream.service [Unit] Description=Litestream After=network.target [Service] ExecStart=/usr/local/bin/litestream replicate -config /etc/litestream.yml Restart=always EnvironmentFile=/etc/litestream.env [Install] WantedBy=multi-user.target ``` ```bash sudo systemctl enable --now litestream ``` ## 怎么工作 SQLite WAL 模式下,写操作先到 -wal 文件,定期 checkpoint 到主 db。 Litestream: 1. open snapshot baseline 上传 S3 2. tail WAL → 流式上传增量 (每秒) 3. WAL 自动 checkpoint 时 → 新 snapshot S3 存: ``` myapp/db/ snapshots/ 20250314T100000.db.lz4 20250320T100000.db.lz4 wal/ 20250314T100000_000001.wal.lz4 ... ``` 可以 restore 到任意时间点(snapshot + replay WAL)。 ## restore ```bash # 最新 litestream restore -o /tmp/restored.db -config /etc/litestream.yml /srv/myapp/db.sqlite3 # 时间点 litestream restore -o /tmp/restored.db \ -timestamp '2025-03-14T10:00:00Z' \ -config /etc/litestream.yml \ /srv/myapp/db.sqlite3 ``` 5 分钟级别 RTO(取决于 DB size)。 ## 应用透明 litestream 不改 SQLite 行为。应用照常 read/write `db.sqlite3`。 litestream 是 sidecar 进程,监 WAL。 应用不知有 litestream 存在 → 0 风险。 ## 性能影响 litestream 跟 SQLite 共享 disk IO。 小 DB(< 1 GB)+ 适度写(< 100 wps):几乎无感。 高写 throughput → litestream 上传 bandwidth 跟得上吗? 我们 production: - 1 GB SQLite - 50 wps - litestream + S3:CPU < 1%,network 几 KB/s 平均 - WAL upload latency P99 < 2s ## 不替代 HA litestream 是 **disaster recovery**(备份 + restore),不是 HA。 HA 需要: - 多 server / 多 region 同时活 - failover 自动 SQLite + litestream 是"主备"模式:主挂了,备份机 restore + 起来 = 5 分钟 downtime。 真要 HA → Postgres + replication / managed DB。 ## read replica litestream 0.4+ 实验性 read replica: ```yaml dbs: - path: /srv/myapp/db.sqlite3 replicas: - type: s3 bucket: ... ``` ```bash # 在别 server 上 litestream replicate \ -read-only \ -config replica.yml ``` 从 S3 拉 + apply WAL → 本地 read-only SQLite。 读扩展可行(写仍主单机)。 ## 价格 S3 storage:1 GB DB + 1 month WAL = 几 GB → $0.10/月。 S3 PUT:每秒一次 WAL upload → 86400 PUT/day × $0.005/1k = $0.40/day。 总:几刀/月。比 RDS db.t3.micro($15/月)便宜很多。 ## SFTP / GCS / Azure 不一定 S3: ```yaml replicas: - type: sftp host: backup.example.com user: backup path: /backups/myapp key-path: ~/.ssh/backup_key ``` 或 GCS: ```yaml replicas: - type: gcs bucket: my-bucket path: myapp/db ``` ## 与 PG / MySQL 对比 | | SQLite + Litestream | Postgres | |---|---|---| | 写 QPS | 千级 | 万级 | | 多 reader | 是 | 是 | | 多 writer | 单 | 是 | | HA | 主备(5min downtime) | streaming replica | | 运维 | 极简 | 中 | | 备份 | litestream | pg_dump / WAL-G | 简单 app + 单 server → SQLite + litestream。 多 server / 高 throughput / HA 必须 → Postgres。 ## 真实部署 我个人项目 / 小 SaaS(< 1k DAU): - VPS $5/月(Hetzner) - Django + SQLite + litestream → S3 - nginx reverse proxy - Cloudflare CDN free 总成本 < $10/月。 db.sqlite3 + WAL 上 S3 自动 5 分钟内一份 (snapshot interval)。 服务器爆炸 → 新 VPS + restore + 部署,半小时上线。 SLA 不是 99.99% 但 99% 易达。 ## 与 cron + cp 对比 ```bash cp db.sqlite3 /backup/db-$(date +%F).sqlite3 ``` 简单但: - 间隔大(每日)→ RPO 一天 - 没 PITR - cp 时 WAL 可能不一致 litestream RPO 秒级 + PITR + WAL consistent。 ## 监控 litestream 暴露 prometheus metrics: ```yaml addr: ":9090" ``` - `litestream_replica_position_bytes` - `litestream_replica_last_sync_seconds` 报警:> 60s 没 sync。 ## 踩过的坑 1. **WAL 没启用**:`PRAGMA journal_mode=WAL;` 必须。不是 WAL litestream 不能 tail。 2. **multi-process write 麻烦**:SQLite 多进程写有限制。 只让一个进程写 → litestream tail 那 WAL。 3. **DB 删了**:手动 `rm db.sqlite3` → litestream 看到删,但 S3 上仍 有数据。restore 即可。但小心 `--full-resync` 误操作覆盖 backup。 4. **快速 restore 慢**:大 DB(10 GB+)restore 几分钟(下 snapshot + replay WAL)。RTO 不是 instant。 5. **monitor 缺失**:litestream 进程死了,应用照常跑没人知道 → 备份悄无声息断。systemd Restart=always + prometheus monitor。

Flask + SQLAlchemy 2.0 + Alembic 的最小可用骨架

Flask 的微框架哲学意味着每个项目都要自己组装数据层。下面是一套 经过几个生产项目验证的最小骨架:Flask 3.x + SQLAlchemy 2.0 + Alembic 迁移 + 应用工厂模式。 ## 项目结构 ``` myapp/ ├── pyproject.toml ├── alembic.ini ├── migrations/ │ ├── env.py │ └── versions/ └── src/myapp/ ├── __init__.py # create_app() ├── extensions.py # db, ... ├── models.py ├── routes.py └── config.py ``` ## 1. 依赖 ```bash uv add flask 'sqlalchemy>=2.0' alembic psycopg[binary] python-dotenv ``` ## 2. extensions.py ```python from flask_sqlalchemy import SQLAlchemy from sqlalchemy.orm import DeclarativeBase class Base(DeclarativeBase): pass db = SQLAlchemy(model_class=Base) ``` SQLAlchemy 2.0 推荐用 `DeclarativeBase` 替代 `declarative_base()`, 类型提示更友好。 ## 3. models.py ```python from datetime import datetime from sqlalchemy import String, DateTime, ForeignKey from sqlalchemy.orm import Mapped, mapped_column, relationship from .extensions import db, Base class User(Base): __tablename__ = 'users' id: Mapped[int] = mapped_column(primary_key=True) email: Mapped[str] = mapped_column(String(120), unique=True) nickname: Mapped[str] = mapped_column(String(60)) created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow) posts: Mapped[list['Post']] = relationship(back_populates='author') class Post(Base): __tablename__ = 'posts' id: Mapped[int] = mapped_column(primary_key=True) title: Mapped[str] = mapped_column(String(200)) body: Mapped[str] author_id: Mapped[int] = mapped_column(ForeignKey('users.id')) author: Mapped[User] = relationship(back_populates='posts') ``` `Mapped[...]` + `mapped_column(...)` 是 SA 2.0 的新风格, 完全类型化,mypy / pyright 直接懂。 ## 4. config.py ```python import os from dotenv import load_dotenv load_dotenv() class Config: SQLALCHEMY_DATABASE_URI = os.environ.get( 'DATABASE_URL', 'postgresql+psycopg://localhost/myapp') SQLALCHEMY_TRACK_MODIFICATIONS = False SECRET_KEY = os.environ['SECRET_KEY'] ``` ## 5. __init__.py(应用工厂) ```python from flask import Flask from .config import Config from .extensions import db def create_app(config_object=Config): app = Flask(__name__) app.config.from_object(config_object) db.init_app(app) from . import routes app.register_blueprint(routes.bp) return app ``` ## 6. routes.py ```python from flask import Blueprint, jsonify, request from sqlalchemy import select from .extensions import db from .models import User, Post bp = Blueprint('main', __name__) @bp.get('/users/<int:uid>/posts') def user_posts(uid): stmt = select(Post).where(Post.author_id == uid) posts = db.session.scalars(stmt).all() return jsonify([{'id': p.id, 'title': p.title} for p in posts]) @bp.post('/users') def create_user(): data = request.get_json() u = User(email=data['email'], nickname=data['nickname']) db.session.add(u) db.session.commit() return jsonify({'id': u.id}), 201 ``` SA 2.0 用 `select()` + `db.session.scalars()`;老的 `Model.query.filter_by(...)` 仍可用但官方建议迁移。 ## 7. Alembic 初始化 ```bash uv run alembic init -t async migrations # 或同步:uv run alembic init migrations ``` 改 `alembic.ini`: ```ini sqlalchemy.url = postgresql+psycopg://localhost/myapp ``` 改 `migrations/env.py`,让它能找到 metadata: ```python from myapp import create_app from myapp.extensions import db, Base # ... target_metadata = Base.metadata app = create_app() with app.app_context(): ... ``` ## 8. 第一次迁移 ```bash uv run alembic revision --autogenerate -m 'init users + posts' # 检查生成的 migrations/versions/*.py uv run alembic upgrade head ``` ## 9. 运行 ```bash uv run flask --app src.myapp run --debug # 或: uv run gunicorn 'src.myapp:create_app()' -b 0.0.0.0:8000 ``` ## 10. 测试 ```python # tests/conftest.py import pytest from src.myapp import create_app from src.myapp.extensions import db class TestConfig: SQLALCHEMY_DATABASE_URI = 'sqlite:///:memory:' SECRET_KEY = 'test' @pytest.fixture def app(): app = create_app(TestConfig) with app.app_context(): db.create_all() yield app @pytest.fixture def client(app): return app.test_client() ``` ## 踩过的坑 - Alembic autogenerate 看不到 model:检查 `env.py` 是否真的 import 了 所有 model 模块(光 `import models` 子模块不自动加载)。我们一般写 `from myapp import models # noqa`。 - 加字段时 default 是 Python 端的,DB 端不会自动加 default。 改 model 后跑 autogenerate 会生成 `op.add_column` 但 nullable 字段 没初值时升级会失败。手动给 server_default 或者分两步迁移。 - SA 2.0 的 lazy loading 默认严格:N+1 查询会触发警告。生产建议 `select(...).options(selectinload(Post.author))` 显式预取。 - Flask-SQLAlchemy 3.x 配 SA 2.0 时,model 必须用 `db.Model` 或自己定义 的 `Base`,不能混用。

Tailscale / WireGuard:给团队建私有 VPN(不开公网)

## 起因 团队 / 个人需要访问内部服务: - 内网 dev server / Postgres / 监控仪表盘 - 多机 ssh 不暴露公网 - 跨地点协作(家 / 办公室 / 出差) 老方案: - 开 22 端口公网 + fail2ban:被扫被打 - OpenVPN:配置复杂、性能一般 - ssh tunnel + jump host:能用但碎 **WireGuard**:现代 VPN 协议,几百行 C 代码,内核态,10x OpenVPN 性能。 **Tailscale**:WireGuard 之上的 mesh VPN,自动 NAT 穿透 + 身份认证。 我家里 / 公司 / 客户机器 30+ 节点全 Tailscale,5 分钟搭好。 ## Tailscale 用法 ```bash # Mac brew install tailscale # Linux curl -fsSL https://tailscale.com/install.sh | sh # 启动 + 登录(浏览器 OAuth) sudo tailscale up # 看节点 tailscale status ``` 每台机器装好登录同账号 → 自动加入私有 mesh。 每节点拿个 `100.x.y.z` 内网 IP(CGNAT range)。 跨机器: ```bash ssh [email protected] # 用 tailscale IP ssh user@my-laptop # 或者用 hostname (MagicDNS) ``` ## MagicDNS ```bash tailscale set --accept-dns # 启用 MagicDNS ``` 每节点名(hostname)自动解析: ```bash ssh my-laptop # 等价 100.64.1.2 curl http://my-server:8080 ``` 不用记 IP。 ## 节点 expose ```bash # 在内网 server 上 tailscale up --advertise-routes=10.0.0.0/24 ``` 让 tailscale network 能访问那台机器背后的整个子网(subnet router)。 家里 NAS / 路由器 / 旧设备一并访问。 ## ACL Tailscale admin 控制台配 ACL(json): ```json { "acls": [ { "action": "accept", "src": ["group:admin"], "dst": ["*:*"] }, { "action": "accept", "src": ["group:dev"], "dst": ["tag:dev-server:*"] }, ], "groups": { "group:admin": ["[email protected]"], "group:dev": ["[email protected]", "[email protected]"], }, "tagOwners": { "tag:dev-server": ["group:admin"], }, } ``` dev 组只能访问 dev-server tag 节点,admin 能访问全部。 基于身份的 zero-trust 网络。 ## SSH ```bash # 启用 Tailscale SSH(替代 OpenSSH) sudo tailscale up --ssh ``` `tailscale ssh user@host` 用 tailscale 身份做 SSH key 替代。 忘记管 SSH key + ACL 一处控制。 ## funnel(公网暴露) ```bash tailscale serve --https=443 localhost:3000 # 内部用 tailscale funnel --https=443 localhost:3000 # 公网暴露(HTTPS 自动) ``` `funnel` 让任意 tailscale URL 公网可达,自动 HTTPS(Let's Encrypt)。 不开 router 端口 + 不要 cloudflare 中转就把本地服务对公网。 ## WireGuard 原生(不用 Tailscale) 更轻量但要自己 NAT 穿透 / key 分发。 ```bash sudo apt install wireguard # 生成 key wg genkey | tee privatekey | wg pubkey > publickey ``` server `wg0.conf`: ```ini [Interface] PrivateKey = <server-priv> Address = 10.0.0.1/24 ListenPort = 51820 [Peer] PublicKey = <client-pub> AllowedIPs = 10.0.0.2/32 ``` client: ```ini [Interface] PrivateKey = <client-priv> Address = 10.0.0.2/24 [Peer] PublicKey = <server-pub> Endpoint = vpn.example.com:51820 AllowedIPs = 10.0.0.0/24 PersistentKeepalive = 25 ``` 启动: ```bash sudo wg-quick up wg0 ``` 简单可控但**没 NAT 穿透**(client 直连 server 端口必须可达)。 mesh 不行(每节点连每节点 → key 矩阵)。 ## Tailscale 是 WireGuard 之上的什么 - key 分发自动化 - DERP 中继服务(双 NAT 时 fallback) - 身份系统(OAuth + ACL) - MagicDNS - admin UI WireGuard 协议是 Tailscale 的 data plane。 ## 性能 iperf3 测试,两台 1 Gbps 机器: - 直连:940 Mbps - WireGuard:920 Mbps (~2% overhead) - Tailscale:900 Mbps (NAT 穿透成功) - Tailscale (DERP 中继):100-300 Mbps NAT 友好时性能几乎无损。复杂 NAT fallback DERP 会慢。 ## Headscale(开源 Tailscale 控制面) Tailscale 客户端开源 + 协议公开,但控制面是 SaaS。 担心 vendor lock-in / 完全自托管 → Headscale: ```bash docker run -d -p 8080:8080 \ -v ./config.yaml:/etc/headscale/config.yaml \ headscale/headscale serve ``` 自己跑控制面,Tailscale 客户端连。 免费 + 完全控制。 ## 与 NetMaker / ZeroTier 对比 - **ZeroTier**:跟 Tailscale 类似 mesh VPN,更老,OG - **NetMaker**:开源,self-host,WireGuard 之上 - **Nebula**(Slack 出):CA 模型,性能极好 我用 Tailscale: - 易用性最好 - 个人免费 tier 100 device - ACL / SSH / funnel 集成 ## 实战 case:跨地点开发 我家、办公室、客户公司、3 个云服务器(DigitalOcean / Hetzner / AWS) 全 Tailscale: - ssh 任意机器:`ssh alice@office-server` - 内网 DB 直连:`psql -h 100.x.y.z` - 客户给我个 server 加入 group → 我能 ssh,3 个月后他 revoke - 出差咖啡店 wifi → 一样工作 - 公网 0 开放(除了 80/443 给 web) 安全 + 方便 + 0 复杂网络配置。 ## 与 Cloudflare Zero Trust 对比 Cloudflare Access (Zero Trust): - 主要给 HTTP 服务(不是 IP-level mesh) - 优势:浏览器原生 + SSO 集成 - 劣势:每协议要单独配 / 不能 ssh Tailscale 是 network layer(all IP traffic)。 Cloudflare Access 是 application layer。 我两个都用: - ssh / 内网服务 → Tailscale - 内部 web app → Cloudflare Access (OIDC) ## 踩过的坑 1. **subnet router 不工作**:`--advertise-routes` 后 admin console 要 approve route。 2. **double NAT 性能慢**:`tailscale netcheck` 看 NAT 类型。CGNAT 背后 fallback DERP,慢。 3. **MagicDNS 冲突**:本机 /etc/hosts 已经有 hostname → 解析错。 `tailscale set --accept-dns=false` 或者删 /etc/hosts。 4. **WireGuard MTU**:跨 VPN 隧道 packet MTU 减小,应用大 packet 慢。 `MTU = 1280` 或者调。 5. **退订机器忘删**:员工离职后 device 还在 admin console。定期清理 + ACL 跟着员工 group 自动。

全文搜索后端:Elasticsearch vs Meilisearch vs Quickwit

## 起因 要给 web app 加"搜索文章" 功能。SQL `LIKE '%word%'` 慢 + 不能模糊。 PG 的 `tsvector` / FTS5 能用但中文分词 + 高级 query 较弱。 专用搜索引擎选哪个? 三个主流选项: - **Elasticsearch**:业界事实标准,功能最全 + 最重 - **Meilisearch**:Rust 写的"现代轻量",开箱即用 - **Quickwit**:Rust 写的"日志搜索专用",对象存储友好 下面对比。 ## Elasticsearch (8.x) 老牌强者。基于 Lucene。 ### 装 ```bash docker run -d -p 9200:9200 \ -e "discovery.type=single-node" \ -e "xpack.security.enabled=false" \ -m 2g \ docker.elastic.co/elasticsearch/elasticsearch:8.15.0 ``` `-m 2g` 必须(ES Java 默认 1 GB heap + JVM overhead,最低 2 GB RAM)。 ### 索引 + 查询 ```python from elasticsearch import Elasticsearch es = Elasticsearch('http://localhost:9200') # 索引文档 es.index(index='articles', id='1', document={ 'title': 'PostgreSQL 全文搜索', 'body': 'tsvector ts_rank ...', 'tags': ['db', 'pg'], 'created_at': '2024-05-24', }) # 搜索 res = es.search(index='articles', query={ 'bool': { 'must': [ {'match': {'body': 'tsvector 全文'}}, ], 'filter': [ {'term': {'tags': 'pg'}}, {'range': {'created_at': {'gte': '2024-01-01'}}}, ], }, }) for hit in res['hits']['hits']: print(hit['_score'], hit['_source']['title']) ``` ### 优势 - 极完整:聚合 / 复杂 query / geo / vector - 大规模成熟(PB 级集群) - 生态广(Logstash / Kibana / Beats) - 中文分词通过 IK plugin ### 劣势 - 资源吃货(最少 2 GB RAM;生产 8-32 GB / node) - 集群运维复杂 - API 庞大学习曲线陡 - Elastic 公司协议 2021 改为 SSPL(云厂商不爽 fork AWS OpenSearch) 适合:日志 + 复杂搜索 + 已有 ES 经验 / 团队。 ## Meilisearch (1.x) 2018 年起的新晋。Rust 写。专为"产品内搜索"优化。 ### 装 ```bash docker run -d -p 7700:7700 \ -e MEILI_MASTER_KEY=your-key \ -v meili-data:/meili_data \ getmeili/meilisearch:v1.10 ``` 200 MB 镜像,启动几秒。 ### 索引 + 查询 ```python import meilisearch client = meilisearch.Client('http://localhost:7700', 'your-key') index = client.index('articles') # 索引 index.add_documents([ {'id': 1, 'title': 'PostgreSQL 全文搜索', 'body': '...', 'tags': ['db']}, {'id': 2, 'title': 'Elasticsearch 入门', 'body': '...', 'tags': ['search']}, ]) # 默认配置:所有字段都搜 results = index.search('全文搜索') # { # "hits": [{"id": 1, "title": "...", ...}], # "processingTimeMs": 3 # } # 配 filter / sort(需要先标记 filterable / sortable) index.update_filterable_attributes(['tags']) index.update_sortable_attributes(['created_at']) results = index.search('搜索', { 'filter': 'tags = db', 'sort': ['created_at:desc'], 'limit': 20, }) ``` ### 优势 - **零配置开箱即用**(typo-tolerance / instant search 默认开) - 极快:百万级文档 P99 < 50ms - API 极简 - 资源占用低(150 MB RAM 跑 10w 文档) - 内置 admin UI(http://localhost:7700) - 支持中文 / 日文等亚洲语言(自动 tokenize) ### 劣势 - 不像 ES 那么强大的复杂 query - 集群在 v1 是 Cloud-only feature(自托管单节点) - 生态相对小 - 不适合大日志(无 time-series 优化) 适合:电商 / 文档 / 博客 / 内容站的"搜索框"。 对比 ES:90% 用户的"产品内搜索"用 Meilisearch 更省心。 ### TypoTolerance + Synonyms ```python index.update_typo_tolerance({ 'enabled': True, 'minWordSizeForTypos': {'oneTypo': 4, 'twoTypos': 8}, }) index.update_synonyms({ 'js': ['javascript'], 'k8s': ['kubernetes'], }) ``` 打 "javasrcipt" 也能找到 "javascript";搜 "js" 同时匹配 "javascript"。 ## Quickwit 针对**日志搜索** 优化的现代后端,基于 tantivy(Rust Lucene)。 ### 装 ```bash docker run -d -p 7280:7280 -p 7281:7281 \ quickwit/quickwit:v0.8 run ``` ### 设计哲学 - 索引存对象存储(S3 / GCS / local),不依赖本地 SSD - 计算 / 存储分离,scale-to-zero - log search 优化:append-only,按时间分片,老数据冷存 ### 用法 ```bash # 创建索引 curl -X POST http://localhost:7280/api/v1/indexes -H 'Content-Type: application/yaml' --data ' version: 0.7 index_id: logs doc_mapping: field_mappings: - name: timestamp type: datetime input_formats: ['unix_timestamp'] fast: true - name: level type: text tokenizer: raw - name: message type: text tokenizer: default timestamp_field: timestamp search_settings: default_search_fields: [message] ' # Ingest logs curl -X POST http://localhost:7280/api/v1/logs/ingest \ -H 'Content-Type: application/json' \ -d '{"timestamp": 1716543210, "level": "ERROR", "message": "DB connection failed"}' # 查 curl 'http://localhost:7280/api/v1/logs/search?query=ERROR+database&start_timestamp=...&end_timestamp=...' ``` ### 优势 - 极便宜(对象存储几 $0.02/GB/月 vs SSD 10-50x) - "无限"保留(S3 不限容量) - 查询历史日志快(带时间过滤的 query) ### 劣势 - 仅 log / time-series 场景 - 不适合"产品内搜索" - 复杂 query 不如 ES 适合:日志聚合 / 审计 / 任何"时间序列 + 文本搜索"。 ## 决策矩阵 | | Elasticsearch | Meilisearch | Quickwit | PG FTS / SQLite FTS5 | |---|---|---|---|---| | 入门门槛 | 高 | 极低 | 中 | 低 | | 资源占用 | 2 GB+ | 150 MB | 1 GB | 共享 DB | | 文档规模 | PB 级 | 千万级 | EB 级 (log) | 千万级 | | Query 复杂度 | 极高 | 中 | 中(log 限定) | 中 | | 集群 / HA | 复杂 | 单 | 计算存储分离 | 跟 DB | | 实时性 | 秒 | 秒 | 秒 | 实时 | | 价格 | 资源贵 | 便宜 | 极便宜 | 0 | ## 推荐 - **数据规模小(< 10w 文档)+ 已经在用 PG / SQLite** → PG FTS / FTS5 - **产品内搜索(电商 / 文档 / 内容)** → Meilisearch - **日志聚合** → Quickwit(替代 ES + Loki 也行) - **大规模 + 复杂 query + 已有 ES 经验** → Elasticsearch - **企业偏好** → OpenSearch(ES 的 Apache fork) ## 实战:Meilisearch 集成 Django ```python # requirements.txt # meilisearch import meilisearch client = meilisearch.Client('http://localhost:7700', settings.MEILI_KEY) index = client.index('posts') # Django signal: 文档变更同步到 Meili from django.db.models.signals import post_save, post_delete @receiver(post_save, sender=Post) def index_post(sender, instance, **kwargs): index.add_documents([{ 'id': instance.id, 'title': instance.title, 'body': instance.body, 'tags': list(instance.tags.values_list('name', flat=True)), 'author': instance.author.name, 'created_at': instance.created_at.timestamp(), }]) @receiver(post_delete, sender=Post) def delete_post(sender, instance, **kwargs): index.delete_document(instance.id) # 搜索 view def search(request): q = request.GET.get('q', '') results = index.search(q, {'limit': 20}) return render(request, 'search.html', {'hits': results['hits']}) ``` 部署:Meilisearch 容器 + Django 配 MEILI_URL。 效果: - 搜索从 PG `ILIKE` 几秒 → Meili 50ms - 支持 typo / 高亮 / 相关性排序 - 资源占用比 PG 多 200 MB ## 踩过的坑 1. **Meilisearch 没 master key**:API 任何人能读写。生产必设。 2. **ES heap size**:默认 1 GB 不够大数据。Java options 设 `-Xms4g -Xmx4g`。 3. **重索引慢**:document 多了 reindex 几小时。设计时考虑增量 sync。 4. **filterable 字段没声明** → filter 不生效报错。Meilisearch 每加 filter 字段要先 `update_filterable_attributes`。 5. **数据一致性**:DB write success + Meili sync failed → 数据漂移。 重要场景用 outbox pattern:写 DB + outbox table → background worker 把 outbox sync 到 Meili。

Go errgroup + 信号量做 worker pool(不引入第三方)

## 起因 要并发处理 10 万个 URL 抓取。原生 goroutine + channel 写起来麻烦: - 限制并发数(不能开 10 万 goroutine 抓) - 错误传播(一个失败要么 cancel 其它要么继续) - 等所有任务完成 - 收集结果 `sync/errgroup` + `golang.org/x/sync/semaphore` 标准库组合解决, 不需要 ants / workerpool 第三方。 ## 解决方案 ### 1. 简单 errgroup ```go package main import ( "context" "fmt" "net/http" "time" "golang.org/x/sync/errgroup" ) func main() { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() g, ctx := errgroup.WithContext(ctx) urls := []string{ "https://example.com", "https://golang.org", "https://github.com", } results := make([]int, len(urls)) for i, u := range urls { i, u := i, u // capture range vars g.Go(func() error { req, err := http.NewRequestWithContext(ctx, "GET", u, nil) if err != nil { return err } resp, err := http.DefaultClient.Do(req) if err != nil { return err } defer resp.Body.Close() results[i] = resp.StatusCode return nil }) } if err := g.Wait(); err != nil { fmt.Println("error:", err) } fmt.Println(results) } ``` 关键点: - `g.Go(func() error)` 启动一个 goroutine - 任一 goroutine 返回 error → `errgroup` 自动 cancel ctx → 其它 goroutine 收到 cancel signal 退出 - `g.Wait()` 等所有完成,返回第一个 error ### 2. 限并发:SetLimit (Go 1.20+) ```go g, ctx := errgroup.WithContext(ctx) g.SetLimit(50) // 最多 50 个 goroutine 并发 for _, u := range urls { u := u g.Go(func() error { return fetch(ctx, u) }) } g.Wait() ``` `SetLimit(50)` + `Go()` 会阻塞 caller 直到有空位。 10 万 URL 排队,永远不超过 50 个并发。 ### 3. 老 Go 版本:semaphore ```go import "golang.org/x/sync/semaphore" sem := semaphore.NewWeighted(50) g, ctx := errgroup.WithContext(ctx) for _, u := range urls { u := u if err := sem.Acquire(ctx, 1); err != nil { break } g.Go(func() error { defer sem.Release(1) return fetch(ctx, u) }) } g.Wait() ``` `semaphore` 是计数信号量,权重可以不是 1(如内存敏感任务每个占 4)。 ### 4. 结果收集:channel ```go type result struct { url string code int err error } results := make(chan result, len(urls)) g, ctx := errgroup.WithContext(ctx) g.SetLimit(50) for _, u := range urls { u := u g.Go(func() error { code, err := fetch(ctx, u) results <- result{u, code, err} return nil // 不让 errgroup 因单个 fetch 失败 cancel 其它 }) } go func() { g.Wait() close(results) }() for r := range results { fmt.Println(r) } ``` 注意: - 用 buffered channel 避免阻塞 - 单个 fetch 错误不让 errgroup cancel(包成 result 一起传) - 单独 goroutine wait + close channel ### 5. 真正的 worker pool 如果 task 是 stream 进来(不预知数量),用 channel + worker: ```go type Task struct{ URL string } func RunPool(ctx context.Context, tasks <-chan Task, workers int) <-chan error { out := make(chan error, workers) var wg sync.WaitGroup for i := 0; i < workers; i++ { wg.Add(1) go func() { defer wg.Done() for t := range tasks { select { case <-ctx.Done(): return default: } if err := process(ctx, t); err != nil { out <- err } } }() } go func() { wg.Wait() close(out) }() return out } // 用 tasks := make(chan Task, 100) go func() { defer close(tasks) for _, u := range urls { tasks <- Task{URL: u} } }() errors := RunPool(ctx, tasks, 50) for err := range errors { log.Printf("err: %v", err) } ``` stream 处理 + back-pressure(producer 阻塞在 tasks 满时)。 ### 6. context cancel 在 worker 里 worker 函数必须**响应 ctx.Done()**,否则 cancel 没用: ```go func fetch(ctx context.Context, url string) error { req, err := http.NewRequestWithContext(ctx, "GET", url, nil) // 关键 if err != nil { return err } resp, err := http.DefaultClient.Do(req) // ctx cancel 时立刻断 // ... } ``` `http.NewRequestWithContext` / `db.QueryContext` / `redis.Get(ctx)` 等 所有 IO 都用 ctx 版本。 ### 7. 实际 benchmark 10 万 URL,50 并发: | 写法 | 总时长 | 内存峰值 | |---|---|---| | 串行 | 30 min+ | 50 MB | | 无限制 goroutine | 1.5 min(多数失败) | 800 MB | | errgroup SetLimit(50) | 4 min(全成功) | 80 MB | | worker pool channel | 4 min | 70 MB | 无限制 goroutine 看似快但实际:DNS 解析失败、连接被 ban、 socket fd 耗尽。**永远要限并发**。 ## 效果 - 10 万 task 在 5 分钟内可控完成 - 任意 task 失败 ctx cancel,剩余资源不浪费 - 内存控制在 50-100 MB,不会 OOM - 代码 50 行,比引第三方库少依赖 ## 何时还是要第三方库 - **panicking worker 自动 recover**:errgroup 不 recover panic。 生产建议每个 worker 函数 `defer recover()`。 - **复杂 retry / backoff 策略**:用 `github.com/cenkalti/backoff/v4` - **分布式(跨机器)worker pool**:用 Asynq / Machinery - **动态 worker 数**:errgroup SetLimit 启动后不能改 ## 踩过的坑 1. **range var 没 capture**: ```go for _, u := range urls { g.Go(func() error { return fetch(u) // ❌ 所有 goroutine 看到 last u }) } ``` Go 1.22 才修了 range var 默认 scope。1.21- 必须 `u := u` 显式 capture。 2. **g.Wait 不调**:goroutine leak。 ```go if err := g.Wait(); err != nil { ... } ``` 永远要 Wait。 3. **`g.Go` 里 panic** → 整个程序 crash(errgroup 不 recover)。 生产 worker 函数 wrap: ```go g.Go(func() (err error) { defer func() { if r := recover(); r != nil { err = fmt.Errorf("panic: %v", r) } }() return fetch(ctx, u) }) ``` 4. **共享 slice 写**:上面 `results[i] = ...` 不需要锁(每个 worker 写 不同 index),但如果是 `results = append(results, ...)` 则要 sync.Mutex 或 channel。 5. **HTTP client default reuse**:`http.DefaultClient` 全局共享 + connection pool。多个 goroutine 用 OK。但默认无超时——务必传 ctx 或自己 `Client{ Timeout: 30 * time.Second }`。

收 webhook 端点设计:签名校验 + 幂等 + 异步重试

## 起因 要接 Stripe webhook 处理支付成功事件,"用户付款成功 → 给账户加余额"。 看似简单: ```python @app.post('/webhook/stripe') def handle(req): event = req.json() if event['type'] == 'payment_intent.succeeded': add_credit(event['data']['object']['customer'], 100) return {'ok': True} ``` 实际几个问题: 1. Stripe 重发同一事件(network 重试 / 我们一时返回 500)→ 用户余额被加多次 2. 没校验签名 → 任何人 POST 这个 URL 都能给账户加钱 3. 处理慢 → Stripe 超时 → 它重试 → 雪崩 4. add_credit 失败 → 错过事件 → 数据丢 正确实现需要 4 个东西:**签名校验、幂等、异步处理、死信队列**。 ## 解决方案 ### 1. 签名校验(永远第一步) Stripe 在 header `Stripe-Signature` 里发 HMAC: ```python import stripe STRIPE_WEBHOOK_SECRET = 'whsec_...' @app.post('/webhook/stripe') def handle(req): payload = req.body_bytes # 原始 bytes,不是 parsed JSON sig = req.headers.get('Stripe-Signature') try: event = stripe.Webhook.construct_event( payload, sig, STRIPE_WEBHOOK_SECRET ) except stripe.SignatureVerificationError: return Response(status=400) # ... 继续处理 ``` construct_event 内部验签 + parse。验签失败 → 400 拒绝。 **用原始 bytes,不要先 parse JSON 再 stringify**——序列化可能改字节 (key 顺序 / 空格),HMAC 算不对。 自己实现 webhook(不用 SDK)的签名校验: ```python import hmac, hashlib, time def verify(payload: bytes, sig_header: str, secret: str, tolerance=300): # 解 header: t=timestamp,v1=hex_signature parts = dict(p.split('=') for p in sig_header.split(',')) ts = int(parts['t']) if abs(time.time() - ts) > tolerance: raise ValueError('timestamp too old (replay?)') signed = f'{ts}.{payload.decode()}'.encode() expected = hmac.new(secret.encode(), signed, hashlib.sha256).hexdigest() if not hmac.compare_digest(expected, parts['v1']): raise ValueError('signature mismatch') ``` `hmac.compare_digest` 防时序攻击;`tolerance` 防 replay。 ### 2. 幂等:去重表 + 唯一 event id Stripe 每个 event 有唯一 `id` 字段,重发时 id 不变。 建一个表存已处理 id: ```sql CREATE TABLE webhook_events ( id TEXT PRIMARY KEY, -- Stripe event id (evt_...) type TEXT NOT NULL, received_at TIMESTAMPTZ DEFAULT now(), processed_at TIMESTAMPTZ, payload JSONB NOT NULL ); ``` 接收端: ```python @app.post('/webhook/stripe') def handle(req): # 1. 验签(上面) event = construct_event(...) # 2. 幂等检查 + 入库 try: db.execute( 'INSERT INTO webhook_events (id, type, payload) VALUES (%s, %s, %s)', (event.id, event.type, event.to_dict()) ) except UniqueViolation: # 重复事件,已经处理过,直接 200 OK return {'ok': True, 'duplicate': True} # 3. 异步处理 enqueue_event_processing.delay(event.id) # 4. 立刻返回 200(< 100ms) return {'ok': True} ``` INSERT + 唯一约束就是去重。返回前不真正处理业务——业务逻辑放到后台。 ### 3. 异步处理 ```python @celery_task(bind=True, max_retries=5) def enqueue_event_processing(self, event_id): e = db.fetch_event(event_id) if e.processed_at: return # 已处理 try: if e.type == 'payment_intent.succeeded': obj = e.payload['data']['object'] add_credit(obj['customer'], obj['amount_received']) elif e.type == 'invoice.payment_failed': send_payment_failed_email(...) # ... 其它事件 db.execute( 'UPDATE webhook_events SET processed_at = now() WHERE id = %s', (event_id,) ) except Exception as exc: # Celery 自动按 exponential backoff 重试 raise self.retry(exc=exc, countdown=2 ** self.request.retries) ``` 关键: - webhook endpoint 只做"放入队列",秒级返回 200 - 真处理在后台 worker,失败有重试 + 死信 - `processed_at` 字段防"重试 N 次都成功"被算多次 ### 4. 处理顺序 Webhook 可能乱序到达(network 重试)。如果业务关心顺序(如订单状态 pending → paid → shipped),需要: ```python def handle_event(e): obj_id = e.payload['data']['object']['id'] # 加锁防同对象并发处理 with db_lock(f'order:{obj_id}'): current = db.get_order(obj_id) if e.payload['data']['object']['updated'] < current.updated_at: return # 这是老事件,丢 update_order(obj_id, e.payload['data']['object']) ``` 用对象上的 timestamp / version 字段判断"这事件是不是 stale"。 ### 5. 死信队列 (DLQ) 重试 N 次都失败的事件不要丢,放 DLQ 人工 review: ```python @celery_task(bind=True, max_retries=5) def enqueue_event_processing(self, event_id): try: process(event_id) except Exception as exc: if self.request.retries >= self.max_retries: # 最后一次重试还失败 move_to_dlq.delay(event_id, str(exc)) else: raise self.retry(countdown=2 ** self.request.retries) ``` ```sql CREATE TABLE webhook_dlq ( id BIGSERIAL PRIMARY KEY, event_id TEXT REFERENCES webhook_events(id), error TEXT, moved_at TIMESTAMPTZ DEFAULT now(), resolved_at TIMESTAMPTZ ); ``` 定时 / Slack 告警:"DLQ 有 N 条未处理",人工修业务后从 DLQ 重放。 ### 6. 监控 ```python # Prometheus metrics webhook_received = Counter('webhook_received_total', '', ['provider', 'type']) webhook_processed = Counter('webhook_processed_total', '', ['provider', 'type', 'result']) webhook_lag = Histogram('webhook_lag_seconds', '') # 上报 webhook_received.labels('stripe', event.type).inc() # 处理完 webhook_processed.labels('stripe', event.type, 'ok').inc() ``` 仪表盘看: - 每分钟事件量 - 处理延迟 P95 - 错误率(按 event type 分) - DLQ 数量 ## 完整流程 ``` 1. Stripe POST 事件 ↓ 2. 验签(HMAC)→ 失败 400 拒绝 ↓ 3. INSERT 去重表(唯一 id)→ 已存在返 200 (dup) ↓ 4. 推入 Celery 队列 ↓ 5. 返回 200(end-to-end < 100ms) 后台: 6. worker 拿事件 ↓ 7. 业务处理(add_credit 等) ↓ 8. 成功 → 标记 processed_at 失败 → exponential retry ↓ 9. 重试 5 次都失败 → 进 DLQ + 告警 ``` ## 效果 按这套设计后: - 重复扣款 / 加钱 bug 归零 - 即使 Stripe 一秒发 1000 个事件,endpoint 不挂(异步队列削峰) - DLQ 偶尔有 2-3 条(被告警捕获,人工处理 < 1 小时) - 整套对 Stripe 的接入测试通过 Stripe 官方的 webhook test ## 安全 checklist 1. ✅ HMAC 签名校验 2. ✅ 时间戳验证防 replay 3. ✅ 用原始 bytes 验签 4. ✅ 不在 endpoint 里做业务 5. ✅ 业务幂等(DB unique constraint 等) 6. ✅ HTTPS only 7. ✅ Webhook secret 进 vault / env,不进 git ## 踩过的坑 1. **`request.json()` 之后再验签**:FastAPI / Flask 默认 parse JSON 后再处理,原始 body 拿不到。要用 `await request.body()` / `request.get_data()` 拿 bytes。 2. **endpoint 返回慢 → Stripe 重发**:处理逻辑写在 endpoint 里 → 8 秒返回 → Stripe 5 秒超时认为失败 → 重发 → 你又花 8 秒 → 死循环。 异步是必须的。 3. **重试时业务"看起来 OK 但 DB 没改"**:DB transaction 中间 commit 失败 → 业务部分生效 → 重试又跑一次。所有 update 必须 atomic transaction。 4. **DLQ 没人看**:上 DLQ 后没监控 / 告警,几个月后发现 1000 条 未处理。alert 是必须的。 5. **多 webhook 端点共享一个 secret**:Stripe / 其它 SaaS 给每个 endpoint 独立 secret 才对,混用一个 secret 出问题难定位来源。

用 prometheus_client 给 Python 应用暴露指标(4 种 metric 用法)

`prometheus_client` 是 Prometheus 官方的 Python 库,让任何 Python 应用 能在 `/metrics` 端点导出 Prometheus 格式的指标。 ## 安装 + 最小可用 ```bash uv add prometheus-client ``` ```python # 暴露在独立端口 from prometheus_client import start_http_server, Counter, Gauge, Histogram req_count = Counter('http_requests_total', 'Total HTTP requests', ['method', 'path', 'status']) start_http_server(8001) # 独立 :8001/metrics # 然后做你的事 ... req_count.labels(method='GET', path='/users', status='200').inc() ``` 或挂在 FastAPI 路由: ```python from prometheus_client import make_asgi_app app.mount('/metrics', make_asgi_app()) ``` Django: ```python # urls.py from prometheus_client import make_wsgi_app from django.urls import path from django.views.generic import View class MetricsView(View): def get(self, request): ... # 用 django-prometheus 包更省事 ``` 实际项目直接用 `django-prometheus`: ```bash uv add django-prometheus ``` 加 middleware 后内置一堆 Django 指标(视图延迟、SQL 时间、缓存命中等)。 ## 4 种 metric 用法 ### Counter:单调递增 ```python requests = Counter('requests_total', '...', ['method']) requests.labels(method='GET').inc() requests.labels(method='POST').inc(5) ``` PromQL: ``` rate(requests_total[5m]) # 每秒请求数(按 5 分钟窗口) ``` ### Gauge:可上可下 ```python queue_size = Gauge('queue_size', '...') queue_size.set(42) queue_size.inc(); queue_size.dec() # 用 callback 让 Prometheus 拉取时实时计算 queue_size.set_function(lambda: r.llen('jobs')) ``` 适合:当前队列长度、连接数、温度、内存使用。 ### Histogram:分布 ```python latency = Histogram('http_latency_seconds', 'HTTP latency', ['endpoint'], buckets=[0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10]) # 上下文管理器自动测延迟 with latency.labels(endpoint='/users').time(): do_work() ``` PromQL 出 p95: ``` histogram_quantile(0.95, sum(rate(http_latency_seconds_bucket[5m])) by (le, endpoint)) ``` bucket 选 5-12 个,覆盖典型延迟范围。bucket 多了占资源,bucket 少了 分位数不准。 ### Summary:分位数(不推荐) ```python from prometheus_client import Summary size = Summary('request_size_bytes', '...') size.observe(2048) ``` Summary 在客户端算分位数,无法在多实例上做正确聚合。**生产基本只用 Histogram**,需要分位数时用 `histogram_quantile()` 在服务端算。 ## label 设计原则 label 的不同值组合决定了 series 的数量。label cardinality 高了 Prometheus 内存爆。 ```python # 错: user_id 是无限基数 requests.labels(user_id=user.id).inc() # 错: 完整 URL 含动态 ID requests.labels(path='/users/123/posts').inc() # 对: 路径模板化 requests.labels(path='/users/{id}/posts').inc() ``` **label 数应该是个有限小集合**:endpoint 模板、HTTP method、status code、 某几个固定 region 等。 ## 一个完整中间件(FastAPI) ```python import time from prometheus_client import Counter, Histogram from starlette.middleware.base import BaseHTTPMiddleware REQ = Counter('http_requests_total', '...', ['method', 'path', 'status']) LATENCY = Histogram('http_latency_seconds', '...', ['method', 'path'], buckets=[.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5]) class MetricsMiddleware(BaseHTTPMiddleware): async def dispatch(self, request, call_next): t0 = time.perf_counter() response = await call_next(request) elapsed = time.perf_counter() - t0 # 用路由模板而不是实际路径 route = request.scope.get('route') path = route.path if route else request.url.path REQ.labels(method=request.method, path=path, status=str(response.status_code)).inc() LATENCY.labels(method=request.method, path=path).observe(elapsed) return response app.add_middleware(MetricsMiddleware) ``` `request.scope.get('route').path` 给出的是 `/users/{user_id}/posts` 这种模板,不是 `/users/42/posts` 这种实参。 ## 内置 collector `prometheus_client` 自带几个: ```python from prometheus_client import REGISTRY, GCCollector, PlatformCollector, ProcessCollector # 默认这些已经注册(CPython 平台 + 进程信息) # 可以手动反注册节省指标 REGISTRY.unregister(GCCollector(REGISTRY)) ``` `process_cpu_seconds_total`、`process_resident_memory_bytes` 等都是 免费白送的。 ## 多 worker 的坑 gunicorn / uvicorn 多 worker 时,每个 worker 是独立进程,独立的 metrics。 直接 scrape `/metrics` 只看到一个 worker 的数据。两种解法: 1. **mmap 共享**:设 `PROMETHEUS_MULTIPROC_DIR=/tmp/prom`,prometheus_client 会把指标写到共享目录,import 时聚合所有 worker 的数据。 2. **每个 worker 独立 scrape**:让 Prometheus 单独抓每个 worker 端口 (复杂度高,不推荐)。 mmap 版本写法: ```python # 进程启动时 import os os.environ['PROMETHEUS_MULTIPROC_DIR'] = '/tmp/prom' # 暴露指标时用 MultiProcessCollector from prometheus_client import multiprocess, CollectorRegistry, generate_latest registry = CollectorRegistry() multiprocess.MultiProcessCollector(registry) output = generate_latest(registry) ``` worker 退出时调 `multiprocess.mark_process_dead(pid)`。 ## 测试 ```python def test_counter_increments(): REQ.labels(method='GET', path='/x', status='200').inc() # 直接读 metric 内部值 val = REQ.labels(method='GET', path='/x', status='200')._value.get() assert val == 1 ``` ## 踩过的坑 - 把 user-id / session-id 当 label:1M 用户 = 1M series,Prometheus 崩盘。 这种"高基数"信息应该用 log,不用 metric。 - Histogram bucket 改了:旧数据和新数据不可比较,需要双写或重设。建议 bucket 一开始想清楚。 - `/metrics` 端点不要走鉴权:Prometheus scrape 没有简单的 auth; 挂内网或加 IP 白名单。 - 用 `with` 管 Histogram timer:抛异常时也会记录,所以错误请求的延迟 也算进 p95。如果想只看成功请求的延迟,加 try/except 区分。

Go context.Context:超时 / 取消 / 传值的正确姿势

`context.Context` 是 Go 标准库里跨 API 边界传递取消信号 + 超时 + 请求值的 统一方式。所有"可能长时间运行 + 可能需要被取消"的函数都该接 ctx 作为 **第一个参数**。 ## 1. 基础 ```go ctx, cancel := context.WithCancel(context.Background()) defer cancel() // 必须 defer cancel 避免 goroutine 泄露 go doWork(ctx) time.Sleep(2 * time.Second) cancel() // 显式取消 ``` `cancel()` 触发后所有继承这个 ctx 的下游都收到信号。 ## 2. 超时 ```go ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() result, err := http.NewRequestWithContext(ctx, "GET", url, nil) // 5 秒后自动 cancel;HTTP 客户端会立即中断请求 ``` `WithTimeout` 是 `WithDeadline(time.Now().Add(d))` 的简写。 ## 3. 在函数里响应 ctx ```go func doWork(ctx context.Context) error { for i := 0; i < 100; i++ { select { case <-ctx.Done(): return ctx.Err() // context.Canceled 或 context.DeadlineExceeded case <-time.After(100 * time.Millisecond): process(i) } } return nil } ``` 关键模式:每次能 block 的地方都 `select { case <-ctx.Done(): ... }`。 对 I/O 操作通常用接 ctx 的版本: ```go req, _ := http.NewRequestWithContext(ctx, "GET", url, nil) resp, err := http.DefaultClient.Do(req) row := db.QueryRowContext(ctx, "SELECT ...", args...) ``` ## 4. 链式派生 ```go ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() // 派生:更短超时 ctxShort, cancel2 := context.WithTimeout(ctx, 2*time.Second) defer cancel2() callRemote(ctxShort) ``` 子 ctx 永远比父 ctx 早结束。取较短的 timeout 生效。 父 cancel → 所有子 ctx 都被 cancel。 ## 5. ctx 传值 ```go type traceIDKey struct{} ctx = context.WithValue(ctx, traceIDKey{}, "abc-123") // 下游取 tid, ok := ctx.Value(traceIDKey{}).(string) ``` 约定: - key 用未导出的私有 type,避免不同包冲突 - 只传"跨 API 边界的请求作用域元数据"(trace ID / user ID / locale), 不传业务参数 - 业务参数走显式函数签名 ## 6. context.Background() vs context.TODO() - `Background()`:根 context,main / init 用 - `TODO()`:当你不知道用哪个 ctx 时用(让 linter / 自己后续修) ```go ctx := context.TODO() // I'll come back to wire this ``` ## 7. HTTP 服务端:取请求 ctx ```go func handler(w http.ResponseWriter, r *http.Request) { ctx := r.Context() // 客户端断开连接 → ctx.Done() result, err := fetchFromDB(ctx, ...) if err == context.Canceled { // 客户端已经断开,没必要返回响应 return } json.NewEncoder(w).Encode(result) } ``` 客户端 close 连接时 r.Context 自动 cancel,所有下游 query 应自动中断。 省服务器 CPU / DB 连接。 ## 8. errgroup + ctx 并发 `sync/errgroup` 比手写 channel + WaitGroup 简洁: ```go import "golang.org/x/sync/errgroup" func fetchAll(ctx context.Context, urls []string) ([]string, error) { g, ctx := errgroup.WithContext(ctx) results := make([]string, len(urls)) for i, u := range urls { i, u := i, u // capture g.Go(func() error { data, err := fetch(ctx, u) if err != nil { return err // 任一失败 ctx 自动 cancel 其它 } results[i] = data return nil }) } if err := g.Wait(); err != nil { return nil, err } return results, nil } ``` `errgroup.WithContext` 返回的 ctx 在任一 goroutine 返回 error 时 自动 cancel。 限并发: ```go g, ctx := errgroup.WithContext(ctx) g.SetLimit(10) // 最多 10 个 goroutine 并发 ``` ## 9. 不要把 ctx 存结构体 反模式: ```go type Server struct { ctx context.Context // ❌ } ``` ctx 是请求 / 操作的属性,不是对象的属性。每次方法显式传: ```go type Server struct{} func (s *Server) Handle(ctx context.Context, req Req) (Resp, error) { ... } ``` 例外:长生命周期的"管理者"对象(如 server 自身的 shutdown ctx) 可以存——但要清楚标注。 ## 10. cancel() 必须 defer ```go ctx, cancel := context.WithTimeout(..., 5*time.Second) // 忘 defer cancel() → 即使函数正常返回,定时器还在 → goroutine leak defer cancel() ``` go vet 会警告"the cancel function is not used on all paths", 认真处理。 ## 11. ctx 的零成本约定 约定 ctx 永远作为第一个参数: ```go // ✅ func doWork(ctx context.Context, args Args) (Result, error) // ❌ func doWork(args Args, ctx context.Context) (Result, error) ``` 阅读 / IDE 补全 / linter 都基于这个约定。 ## 12. 实际生产 pattern ```go func main() { ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) defer cancel() srv := &http.Server{Addr: ":8080", Handler: setupHandler()} go func() { if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { log.Fatalf("server error: %v", err) } }() <-ctx.Done() log.Info("shutting down") shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() if err := srv.Shutdown(shutdownCtx); err != nil { log.Errorf("forced shutdown: %v", err) } } ``` `signal.NotifyContext`(Go 1.16+)一行处理信号 → ctx 模式: - 收 SIGINT / SIGTERM → ctx.Done() - main 等 done - `srv.Shutdown(timeout ctx)` 拒新连接 + 等老请求完,30 秒超时强制退出 ## 踩过的坑 - ctx 没传到下游:所有 hop 都要传 ctx。中间某个函数没接 → cancel 信号 断了,超时不生效。 - 用 `time.Sleep(d)` 而不是 `select { case <-time.After: case <-ctx.Done():}`: cancel 后还在 sleep,不响应。 - `errgroup.Go` 里的 goroutine panic → 整个 group 崩。在 Go 函数内部 recover 自保。 - `ctx.Value(string)` 用字符串作 key → 跨包冲突。永远用私有 type。

FastAPI Depends() 实战:DI 让 testing 和 modularity 都受益

## 起因 FastAPI 的 `Depends()` 是其设计精华之一。新人常忽视它,把所有 view 写 成"自己 new DB connection + 拉 user 信息",导致: - 测试时无法 mock DB - 业务逻辑跟 framework 耦合 - 改 auth 流程要改 100 处 view 学会用 Depends 后代码组织上一个台阶。 ## 1. 基础 ```python from fastapi import FastAPI, Depends app = FastAPI() def get_db(): db = Session() try: yield db finally: db.close() @app.get('/users/{uid}') def read_user(uid: int, db = Depends(get_db)): return db.query(User).filter_by(id=uid).first() ``` `Depends(get_db)`: - 每个请求自动调 `get_db()` - yield 之前的代码:创建 session - yield 之后的代码:关闭 session - 类似 Django 的 middleware 但更灵活 ## 2. 嵌套依赖 ```python def get_db(): ... def get_current_user( token: str = Depends(oauth2_scheme), db: Session = Depends(get_db), ) -> User: user = decode_token(token, db) if not user: raise HTTPException(401) return user @app.get('/me') def me(user: User = Depends(get_current_user)): return user ``` FastAPI 自动解析依赖图:`me` 需要 `get_current_user` 需要 `get_db` 和 `oauth2_scheme`。 **同一个请求里 `get_db` 只调一次**(即使多个依赖都需要它)。 ## 3. 权限检查依赖 ```python def require_admin(user: User = Depends(get_current_user)) -> User: if user.role != 'admin': raise HTTPException(403, 'admin required') return user @app.delete('/users/{uid}') def delete_user(uid: int, admin: User = Depends(require_admin), db: Session = Depends(get_db)): db.query(User).filter_by(id=uid).delete() db.commit() ``` 权限检查从 view body 抽出来 → 复用 + 类型安全。 ## 4. 类做依赖(state-ful) ```python class CommonQueryParams: def __init__(self, q: str = '', skip: int = 0, limit: int = 20): self.q = q self.skip = skip self.limit = limit @app.get('/search') def search(params: CommonQueryParams = Depends()): # params.q / params.skip / params.limit ... ``` `Depends()` 无参数时自动用类型 `CommonQueryParams` 当 dependency。 重复的 query param 模式抽成 class。 ## 5. router-level dependency ```python from fastapi import APIRouter admin_router = APIRouter( prefix='/admin', dependencies=[Depends(require_admin)], # router 级依赖 ) @admin_router.get('/dashboard') def dashboard(): pass @admin_router.get('/users') def users(): pass ``` `admin_router` 下所有 endpoint 自动要求 admin。不需要每个 view 写 Depends。 或 app 级: ```python app = FastAPI(dependencies=[Depends(log_request)]) # 全局 ``` ## 6. 测试:覆盖依赖 ```python def get_db_test(): return MockDB() app.dependency_overrides[get_db] = get_db_test def test_read_user(): client = TestClient(app) response = client.get('/users/1') assert response.json() == {'id': 1, 'name': 'mock'} ``` `dependency_overrides` 让测试用 mock 实现替换生产依赖。 **整个测试不真连 DB**。 对比:"view 内直接 import DB" → 没法 mock,要么 mock 整个 module, 要么真起 DB。 ## 7. Generator dependency(cleanup 必须 finally) ```python def get_db(): db = SessionLocal() try: yield db finally: db.close() ``` 不 try/finally 的话,view 内 exception → cleanup 不跑 → 连接泄露。 **永远 try/finally**。 ## 8. 同步 vs async dependency 混用 OK: ```python def get_settings(): # 同步 return Settings() async def get_user(): # async return await fetch_user() @app.get('/x') async def x( s: Settings = Depends(get_settings), u: User = Depends(get_user), ): ... ``` FastAPI 自动决定 thread pool 还是 await。 ## 9. Sub-app / mount ```python admin_app = FastAPI() @admin_app.get('/stats') def stats(): pass app.mount('/admin', admin_app) ``` 完全独立的 FastAPI 应用挂载到主 app。各自有独立 dependency / docs / middleware。复杂场景拆分。 ## 10. lifecycle hooks vs Depends ```python @app.on_event('startup') async def startup(): app.state.db = await create_pool() @app.on_event('shutdown') async def shutdown(): await app.state.db.close() ``` 或现代 lifespan: ```python from contextlib import asynccontextmanager @asynccontextmanager async def lifespan(app): app.state.db = await create_pool() yield await app.state.db.close() app = FastAPI(lifespan=lifespan) ``` 应用启动 / 关闭一次性资源(DB pool / Redis client)放这里,不是 per-request Depends。 之后 Depends 引用: ```python def get_db(request: Request): return request.app.state.db.acquire() ``` ## 完整模板:blog API 骨架 ```python # deps.py from fastapi import Depends, HTTPException, Request from sqlalchemy.orm import Session def get_db(request: Request) -> Session: db = request.app.state.SessionLocal() try: yield db finally: db.close() def get_current_user( token: str = Depends(oauth2_scheme), db: Session = Depends(get_db), ) -> User: user = decode_jwt(token, db) if not user: raise HTTPException(401) return user def get_post(post_id: int, db: Session = Depends(get_db)) -> Post: post = db.query(Post).get(post_id) if not post: raise HTTPException(404) return post def require_post_owner( post: Post = Depends(get_post), user: User = Depends(get_current_user), ) -> Post: if post.author_id != user.id: raise HTTPException(403) return post ``` ```python # views.py @app.get('/posts/{post_id}') def read_post(post: Post = Depends(get_post)): return post @app.put('/posts/{post_id}') def update_post( data: UpdatePostIn, post: Post = Depends(require_post_owner), db: Session = Depends(get_db), ): post.title = data.title db.commit() return post @app.delete('/posts/{post_id}') def delete_post( post: Post = Depends(require_post_owner), db: Session = Depends(get_db), ): db.delete(post) db.commit() return {'ok': True} ``` 每个 view 体内不超过 5 行业务逻辑。权限 / DB session / 对象加载都 declarative。 ## 性能注意 每请求依赖图都重新解析。深嵌套 / 重操作 dependency 会累加。 但通常 dependency 都很轻(DB query / dict lookup),impact 极小。 ## 与 Django / Flask 对比 Django:middleware + `@login_required` 装饰器 + view-local code。 Flask:`@app.before_request` + 装饰器 + flask-login。 FastAPI Depends 优点: - type-safe(IDE / mypy 知道 user 是 User) - 显式(看签名就知道这个 endpoint 需要什么) - 测试友好(dependency_overrides) 但学习曲线略陡。习惯后回不去。 ## 踩过的坑 1. **Depends 写位置错**: ```python def view(user=Depends(get_user), uid: int = 1): # ❌ ``` Depends 不能有 default。改顺序: ```python def view(uid: int, user=Depends(get_user)): ``` 2. **循环依赖**:A depends B,B depends A → import error。架构上重新 设计;通常说明边界划错。 3. **dependency 内 raise** → FastAPI 自动 422 / 配你的 HTTPException。 写 `raise HTTPException(401, "Bad token")` 别 `return None`。 4. **测试 dependency_overrides 没清** → 测试间互相污染。每个测试 `app.dependency_overrides.clear()` 或用 fixture。 5. **`Depends()` 无参 + 类**:必须显式 type annotation: ```python def view(params: CommonQueryParams = Depends()): ``` 不写 type FastAPI 不知道用什么。

JWT:HS256 vs RS256 vs EdDSA,啥时候哪个

## 起因 JWT 签名算法常见三个: - **HS256**:HMAC-SHA256,对称密钥 - **RS256**:RSA + SHA256,非对称 - **EdDSA (Ed25519)**:现代椭圆曲线,非对称 选哪个?默认 HS256 行不行?为啥很多场景非 RS256 不可? ## HS256 (HMAC) ```python import jwt SECRET = 'shared-secret-key' # 签 token = jwt.encode({'user_id': 42}, SECRET, algorithm='HS256') # 验 payload = jwt.decode(token, SECRET, algorithms=['HS256']) ``` 对称:**签和验用同一 secret**。 ### 优势 - 简单(一个 secret) - 性能极好(HMAC 比 RSA 快 100x) - token 短 ### 劣势 / 适用边界 - 任何能验 token 的服务都能签 token → **不能给第三方验** - 单 monolith 内部用 OK;微服务要严格区分签发方与验证方 适合:单一服务 + 用同一 secret 的自家系统。 ## RS256 (RSA 非对称) ```python # 服务端:private key 签 with open('private.pem', 'rb') as f: private_key = f.read() token = jwt.encode({'user_id': 42}, private_key, algorithm='RS256') # 任何方:public key 验 with open('public.pem', 'rb') as f: public_key = f.read() payload = jwt.decode(token, public_key, algorithms=['RS256']) ``` 非对称:**private key 签,public key 验**。 public key 公开(其它服务 / 客户端能拿到),但拿到也不能伪造 token。 ### 优势 - 安全分离:auth server 持 private key,resource server 只持 public key - public key 可发布(JWKS endpoint) - 大组织 / OAuth / SSO 标配 ### 劣势 - 性能:RSA 签验比 HMAC 慢 100-1000x - token 大(RSA 签名 256 字节起) - key 管理更复杂 适合:微服务 / OAuth / 第三方需验 token。 ## EdDSA / Ed25519 ```python token = jwt.encode({'user_id': 42}, ed25519_private_key, algorithm='EdDSA') ``` 非对称,基于 Ed25519 椭圆曲线。 RS256 的现代替代: | | RS256 (2048 bit) | EdDSA (Ed25519) | |---|---|---| | 签名速度 | 慢 | 快(10x+) | | 验证速度 | 较快 | 极快 | | key size | 2048 bit | 256 bit | | 签名 size | 256 byte | 64 byte | | 安全性 | 强 | 强(现代设计) | 新项目能用 EdDSA 就用,性能 + size 全面优于 RS256。 RS256 仍是 OAuth 兼容性最广(IdP / library 都支持)。 ## 决策 ``` 监控你的 token 谁签 / 谁验 单 monolith / 自家服务全套: → HS256 跨服务 / 公开 API / 给第三方 / SSO: → RS256(兼容性)或 EdDSA(性能) OAuth provider / IdP: → RS256(事实标准) ``` ## 错配灾难(algorithm confusion attack) ```python # 危险:alg 不锁定 payload = jwt.decode(token, public_key) # 不指定 algorithms ``` 攻击者把 token 的 alg header 改成 `HS256`,用 public key 当 secret 签 (因为 RS256 → HS256 confusion)→ 假 token 通过验证。 **正确**: ```python payload = jwt.decode(token, public_key, algorithms=['RS256']) # 指定接受的算法,不让 attacker 选 ``` 库默认大多防御此攻击,但**永远显式指定 algorithms**。 ## key rotation JWT 没内置 rotation。常见做法: - token 包含 `kid` (key ID) header - JWKS endpoint 暴露多个 active public key - 验证时根据 kid 选 key ```python # 假设 JWKS 已 cache def verify(token): header = jwt.get_unverified_header(token) kid = header['kid'] key = jwks_cache[kid] return jwt.decode(token, key, algorithms=['RS256']) ``` 旧 key 留 grace period 后删 → 老 token 平滑过渡。 ## token 内容设计 ```python { "iss": "https://auth.example.com", # issuer "sub": "user-12345", # subject (user) "aud": "https://api.example.com", # audience (intended verifier) "iat": 1716000000, # issued at "exp": 1716003600, # expiry "nbf": 1716000000, # not before "jti": "unique-id", # JWT ID(防重放) // custom "roles": ["admin", "editor"], "tenant": "acme" } ``` 验证时检查: - `exp` 没过 - `aud` 是自己 - `iss` 是受信任 issuer 很多 lib 自动。 ## refresh token JWT 短 expiry(15min - 1h)+ refresh token(长期,不透明 random string)。 access token 过期 → 用 refresh token 换新。 refresh token 存数据库(能撤销);access token 是 stateless JWT。 ## stateless vs session JWT 优势:服务端不存 session → 横向扩展无 sticky session 问题。 劣势:撤销难(除非短 expiry + refresh token 模式)。 中小项目其实 session cookie + Redis store 简单 + 够用。 微服务 / 多 IdP / OAuth 才必要 JWT。 ## 不要存敏感数据 ```python # bad jwt.encode({'password': 'secret123', ...}, ...) ``` JWT body **未加密**只签名。`base64decode` 立刻看到内容。 不要放:password / API secret / PII。 需要加密 → JWE (JSON Web Encryption),但复杂得多,少用。 ## 实战 case:迁移 HS256 → RS256 老项目 monolith HS256。 拆出 mobile API + 计划开放第三方 API → 需要 RS256(合作伙伴验 token 不该知 secret)。 迁移: 1. 生成 RSA key pair 2. JWKS endpoint 暴露 public key 3. token issue 支持双 alg:老 token HS256 + 新 token RS256 4. token verify 接受 HS256(kid="legacy")+ RS256 5. 等所有老 token expire(30 day) 6. 删 HS256 支持 平滑过渡,无服务中断。 ## 性能数据 签 / 验 100k token / single core: | | sign | verify | |---|---|---| | HS256 | 0.5s | 0.5s | | RS256 (2048) | 80s | 5s | | EdDSA | 3s | 1s | 签贵很多,验也比 HS256 慢。 高 QPS 验 token 时考虑 cache verified token 或者用 EdDSA。 ## 库 - Python: `PyJWT` / `python-jose` / `authlib` - Node: `jsonwebtoken` - Go: `github.com/golang-jwt/jwt` - Rust: `jsonwebtoken` PyJWT 简单 + 维护好。authlib 大全(含 OAuth)。 ## 踩过的坑 1. **algorithms 没指定**:confusion attack。永远 `algorithms=['XX']`。 2. **exp 时区**:服务器时间错 → 立刻过期 / 永不过期。NTP sync 必备。 3. **public key 写错**:copy 时 \n 丢 → 验失败。pem 用 file 加载, 不要在 env var 里塞 multi-line。 4. **JWT 当 session**:放 admin_panel=True 类敏感权限 → token leak 后 攻击者直接获取。敏感操作必须 DB 二次验证。 5. **长 expiry + 无 revoke**:30 day expiry JWT 用户改密码后老 token 仍有效。要么短 expiry + refresh,要么 blacklist 表(牺牲 stateless)。