知识广场

按学科筛选:计算机科学 / 后端开发 / Python
清除筛选

«计算机科学 / 后端开发 / Python» 分类下共 12 篇帖子

用 uv 起一个最小 Django 5 项目(替代 pip + venv)

`uv` 是 Astral(Ruff 团队)的 Python 项目管理器,Rust 写的, 比 `pip + venv + pip-tools` 快 10-100 倍。2024 之后基本是新项目的默认。 下面 5 分钟起一个 Django 5 项目。 ## 1. 装 uv ```bash curl -LsSf https://astral.sh/uv/install.sh | sh # 或 macOS: brew install uv # 或 Windows: powershell -c "irm https://astral.sh/uv/install.ps1 | iex" uv --version ``` ## 2. 起项目 ```bash uv init myapp --python 3.12 cd myapp ``` `uv init` 生成 `pyproject.toml` + `.python-version` + `hello.py`, 不创建 venv 直到需要。 ## 3. 加依赖 ```bash uv add django gunicorn psycopg[binary] python-dotenv uv add --dev ruff pytest pytest-django ``` `uv add` 会: 1. 解析依赖(极快) 2. 写入 `pyproject.toml` 3. 锁定到 `uv.lock` 4. 装到 `.venv/`(自动创建) `psycopg[binary]` 是 psycopg3 的预编译版(生产建议 `psycopg[c]` 自己编)。 ## 4. Django 项目结构 ```bash uv run django-admin startproject myapp . uv run python manage.py startapp blog ``` `uv run` 等价于"在项目 venv 里跑"。比 `source .venv/bin/activate && python ...` 直接,跨 shell / CI 都一样。 ## 5. 跑 ```bash uv run python manage.py migrate uv run python manage.py createsuperuser uv run python manage.py runserver ``` ## 6. CI / Docker 中使用 `Dockerfile`: ```dockerfile FROM python:3.12-slim # 装 uv 二进制 COPY --from=ghcr.io/astral-sh/uv:latest /uv /usr/local/bin/uv WORKDIR /app COPY pyproject.toml uv.lock ./ # --frozen 严格按 lock 安装,不解析 RUN uv sync --frozen --no-install-project COPY . . RUN uv sync --frozen EXPOSE 8000 CMD ["uv", "run", "gunicorn", "myapp.wsgi:application", "-b", "0.0.0.0:8000"] ``` GitHub Actions: ```yaml - uses: astral-sh/setup-uv@v3 with: enable-cache: true - run: uv sync --frozen - run: uv run pytest ``` `enable-cache: true` 跨 job 缓存 wheel,CI 飞快。 ## 7. 升级 / 锁定 ```bash uv lock --upgrade-package django # 只升 django uv lock --upgrade # 全部升到最新允许的范围 uv add 'django>=5.1,<6' # 改约束 uv pip compile pyproject.toml > requirements.txt # 兼容 pip 导出 ``` ## 8. 与 pip / poetry 对比 | 操作 | pip / pip-tools | uv | |---|---|---| | 装 100 个依赖 | 30-60s | 1-3s | | 解析 lock | 几秒到几十秒 | < 1s | | 创建 venv | `python -m venv .venv` (慢) | 自动且快 | | 跨平台 lock | 麻烦 | 内置 | | Python 多版本 | pyenv 配合 | `uv python install 3.12` 内置 | ## 踩过的坑 - `uv.lock` 必须进 git。它包含跨平台依赖锁,删了会让 `uv sync --frozen` 失败。 - `uv add` 默认只在主 group;要 dev 依赖加 `--dev`;要 optional group 加 `--optional groupname`。 - `psycopg[binary]` 在 musl-libc(Alpine)镜像上没预编译 wheel,会回退到 源码编译——慢。改用 `python:3.12-slim`(glibc)镜像。 - VSCode 自动识别 `.venv/` —— 但要把 Python 解释器手动选为 `.venv/bin/python`, 否则 import 检查走系统 Python。

FastAPI + Pydantic v2 严格校验请求与响应(含自定义错误格式)

FastAPI 的核心卖点是"用 Python 类型注解定义 API schema,自动校验 + 生成 OpenAPI 文档"。Pydantic v2 是其背后的校验引擎,比 v1 快 5-50 倍。 ## 1. 最小例子 ```python from fastapi import FastAPI from pydantic import BaseModel, EmailStr, Field app = FastAPI() class CreateUser(BaseModel): email: EmailStr nickname: str = Field(min_length=2, max_length=30) age: int = Field(ge=0, le=150) class UserOut(BaseModel): id: int email: EmailStr nickname: str @app.post('/users', response_model=UserOut, status_code=201) def create_user(payload: CreateUser) -> UserOut: # ... 写入 DB ... return UserOut(id=42, email=payload.email, nickname=payload.nickname) ``` 发请求时任何字段不合法都返回 422 + 详细错误。打开 `/docs` 看自动文档。 ## 2. 自定义 validator ```python from pydantic import field_validator class CreateUser(BaseModel): nickname: str @field_validator('nickname') @classmethod def no_whitespace(cls, v: str) -> str: if v != v.strip() or ' ' in v: raise ValueError('昵称不能含首尾或连续空格') return v ``` V2 必须加 `@classmethod`。 ## 3. 计算字段(动态) ```python from pydantic import computed_field class UserOut(BaseModel): nickname: str username: str @computed_field @property def display_name(self) -> str: return f'{self.nickname}@{self.username}' ``` ## 4. 加载 / 序列化别名 ```python class UserIn(BaseModel): email_address: str = Field(alias='email') # 接收的 JSON 里是 "email",Python 属性是 email_address ``` ## 5. 严格模式:拒绝多余字段 V2 默认允许额外字段被忽略。生产里建议严格: ```python class CreateUser(BaseModel): model_config = {'extra': 'forbid'} email: EmailStr # 客户端发 {"email": ..., "foo": ...} 会 422 ``` ## 6. 统一错误格式 FastAPI 默认 422 响应是 Pydantic 原始结构。给前端友好点: ```python from fastapi import Request, status from fastapi.exceptions import RequestValidationError from fastapi.responses import JSONResponse @app.exception_handler(RequestValidationError) async def validation_handler(request: Request, exc: RequestValidationError): errors = [ { 'field': '.'.join(str(x) for x in e['loc']), 'message': e['msg'], 'type': e['type'], } for e in exc.errors() ] return JSONResponse( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, content={'detail': '请求数据校验失败', 'errors': errors}, ) ``` ## 7. 路径 / 查询 / Body / Header / Cookie ```python from fastapi import Path, Query, Header, Cookie @app.get('/items/{item_id}') def get_item( item_id: int = Path(ge=1), fields: list[str] | None = Query(None, max_length=10), user_agent: str | None = Header(None), session: str | None = Cookie(None), ): ... ``` ## 8. Depends 注入 ```python from fastapi import Depends def get_db(): db = SessionLocal() try: yield db finally: db.close() @app.get('/users/{uid}') def read_user(uid: int, db = Depends(get_db)): return db.query(User).get(uid) ``` `Depends` 是 FastAPI 的 DI 系统;可以套娃(依赖里又用 Depends)。 ## 9. 鉴权依赖 ```python from fastapi import HTTPException, status from fastapi.security import OAuth2PasswordBearer oauth2 = OAuth2PasswordBearer(tokenUrl='token') def current_user(token: str = Depends(oauth2)): user = decode_jwt(token) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, headers={'WWW-Authenticate': 'Bearer'}) return user @app.get('/me') def me(user = Depends(current_user)): return user ``` ## 10. 后台任务(轻量) ```python from fastapi import BackgroundTasks def send_welcome_email(email: str): # ... 同步发邮件 ... ... @app.post('/users') def create_user(payload: CreateUser, tasks: BackgroundTasks): user = save(payload) tasks.add_task(send_welcome_email, user.email) return user ``` 请求立即返回,邮件在响应发出后异步执行。注意:失败没有重试。 要可靠就上 Celery / RQ / Arq。 ## 11. CORS ```python from fastapi.middleware.cors import CORSMiddleware app.add_middleware( CORSMiddleware, allow_origins=['https://example.com'], allow_credentials=True, allow_methods=['*'], allow_headers=['*'], ) ``` ## 12. 运行 ```bash uv add fastapi 'uvicorn[standard]' uv run uvicorn main:app --reload # 开发 uv run uvicorn main:app --workers 4 --host 0.0.0.0 --port 8000 # 生产 # 生产更稳的是用 gunicorn 启 uvicorn worker: uv run gunicorn -k uvicorn.workers.UvicornWorker -w 4 main:app ``` ## 踩过的坑 - Pydantic v1 / v2 在同一项目混用:v1 model 的 `.dict()`、`.json()` 在 v2 是 `.model_dump()` 和 `.model_dump_json()`。代码改名后老的方法 调用会静默返回不正确的格式。 - 用 dataclass 替代 BaseModel:dataclass 在 FastAPI 路径参数处不会被校验, 只在 response_model 处校验。混着用很容易出问题。 - `response_model` 会**过滤**响应字段(不在 model 里的字段被丢掉), 这是 feature 不是 bug。想全输出就别设 response_model 或用 `dict`。 - `BackgroundTasks` 是同一进程里的协程,长任务会撑住 worker; 超过 30 秒的任务就该上 Celery。

Python typing.Protocol:写库时拥抱 duck typing 又有类型提示

## 起因 写一个数据导出库,接收任何"长得像 file-like 对象的"输入: 内置 `open()` 返回值、`io.BytesIO`、Django 的 `UploadedFile`、 S3 client 返回的 streaming body…… ```python def export(stream): stream.write(b'header') for row in data: stream.write(row.serialize()) ``` 类型怎么标?`stream: BinaryIO`?只 cover 标准库;`stream: object`? 失去类型提示。 `typing.Protocol`(PEP 544)解决:定义"结构子类型"(structural subtyping), 描述"具有 X 方法的任何对象",无需对方继承。 ## 解决方案 ```python from typing import Protocol class WritableBytes(Protocol): def write(self, data: bytes) -> int: ... def flush(self) -> None: ... def export(stream: WritableBytes) -> None: stream.write(b'header') for row in data: stream.write(row.serialize()) stream.flush() ``` 任何"有 write(bytes) → int 和 flush() → None" 方法的类都满足。 不需要 import 我的 Protocol、不需要 inherit、不需要 register。 类型检查器(mypy / pyright)静态确认: ```python import io export(io.BytesIO()) # ✅ export(open('out.bin', 'wb')) # ✅ export("not a file") # ❌ mypy 报错 ``` ## runtime check 需要在运行时判断:"这东西满足 Protocol 吗?" ```python from typing import Protocol, runtime_checkable @runtime_checkable class WritableBytes(Protocol): def write(self, data: bytes) -> int: ... if isinstance(obj, WritableBytes): obj.write(b'x') ``` `@runtime_checkable` 让 `isinstance` 工作。但只检查方法**存在**, 不检查方法签名 —— 比静态检查弱。 ## 实战例子:可插拔的 storage backend ```python from typing import Protocol class Storage(Protocol): def get(self, key: str) -> bytes | None: ... def set(self, key: str, value: bytes, ttl: int | None = None) -> None: ... def delete(self, key: str) -> None: ... class InMemoryStorage: def __init__(self): self._d: dict[str, bytes] = {} def get(self, key): return self._d.get(key) def set(self, key, value, ttl=None): self._d[key] = value def delete(self, key): self._d.pop(key, None) class RedisStorage: def __init__(self, url): self._r = redis.from_url(url) def get(self, key): return self._r.get(key) def set(self, key, value, ttl=None): if ttl: self._r.set(key, value, ex=ttl) else: self._r.set(key, value) def delete(self, key): self._r.delete(key) def setup_cache(storage: Storage) -> Cache: return Cache(storage) setup_cache(InMemoryStorage()) setup_cache(RedisStorage('redis://localhost')) ``` `InMemoryStorage` / `RedisStorage` 都没 inherit `Storage`, 但都 conform 该结构 → mypy 通过。 ## 与 ABC 对比 ```python from abc import ABC, abstractmethod class Storage(ABC): @abstractmethod def get(self, key: str) -> bytes | None: ... @abstractmethod def set(self, key: str, value: bytes, ttl: int | None = None) -> None: ... @abstractmethod def delete(self, key: str) -> None: ... class InMemoryStorage(Storage): # 必须显式继承 ... ``` ABC 强制继承。Protocol 不强制 → 第三方库的类不需要修改就能用。 适用场景: - **Protocol**:写库 / interface 定义,鸭子类型友好 - **ABC**:内部类型层次、要 share 实现(Mixin)、强制 inherit ## generic protocol ```python from typing import Protocol, TypeVar T = TypeVar('T', covariant=True) class Iterable(Protocol[T]): def __iter__(self) -> 'Iterator[T]': ... class Iterator(Protocol[T]): def __next__(self) -> T: ... ``` 实际 `Iterable` 已在 `typing` 模块,举例说明语法。 ## 在标准库里你已经在用 `typing` / `collections.abc` 模块里很多就是 Protocol: ```python from collections.abc import Iterable, Mapping, Hashable, Container from typing import Protocol, SupportsLen, SupportsInt ``` `SupportsLen` 是 `def __len__(self) -> int`。所以 `len(x)` 能用的 都满足。 ## 给现有类"贴" Protocol ```python from third_party import SomeClass class HasFoo(Protocol): def foo(self) -> str: ... # SomeClass 有 foo() 方法但作者没标注 x: HasFoo = SomeClass() # mypy 检查 OK x.foo() ``` 零侵入。 ## 实战 case:测试替身 ```python class Notifier(Protocol): def send(self, msg: str, to: str) -> None: ... def process_order(order: Order, notifier: Notifier) -> None: # ... notifier.send(f'Order {order.id} confirmed', to=order.email) # 生产 process_order(order, EmailNotifier()) # 测试 class FakeNotifier: def __init__(self): self.sent: list[tuple[str, str]] = [] def send(self, msg, to): self.sent.append((msg, to)) def test_confirm(): n = FakeNotifier() process_order(test_order, n) assert ('Order 1 confirmed', '[email protected]') in n.sent ``` `FakeNotifier` 不需要 inherit `Notifier` ABC,纯写实现即可。 ## 与 `__class_getitem__` / `TypeVar` 联用 ```python from typing import Protocol, TypeVar K = TypeVar('K') V = TypeVar('V') class Cache(Protocol[K, V]): def get(self, key: K) -> V | None: ... def set(self, key: K, value: V) -> None: ... class StringIntCache: def get(self, key: str) -> int | None: ... def set(self, key: str, value: int) -> None: ... def use(c: Cache[str, int]): v = c.get('x') # mypy 知道 v: int | None ``` ## 效果 - 库的 API 类型严格但不要求用户继承 - 测试时随便造 fake,不需要 mock 框架 - 重构内部实现时 Protocol 是"接口",业务代码改少 - mypy / pyright 在 IDE 里实时提示,写错立刻知道 ## 踩过的坑 1. **Protocol 不能 instantiate**:`x = WritableBytes()` 报错(没意义, 它是接口)。 2. **runtime_checkable 检查只看方法名**:`isinstance(obj, WritableBytes)` 只确认有 `write` 和 `flush`,不查签名。运行时碰到方法签名不对仍 crash。 3. **Protocol 的方法有 default impl** 让它变成 mixin 又不强制继承: ```python class Repr(Protocol): def __repr__(self) -> str: ... class Mixin(Repr): def __repr__(self): return f'<{type(self).__name__}>' ``` 语义微妙,团队约定清楚。 4. **structural subtyping 太宽容**:所有有 `read()` 方法的都被当 readable,比如自定义类 `class Sensor: def read(self): ...` 不该 是 file-like 但 mypy 不报错。给 Protocol 多放几个方法(read + readable 等)让匹配更严。 5. **Protocol 间不能继承 default impl**:跟 mixin 不同。要复用代码用 ABC + Protocol 组合,或者纯函数化。

Python asyncio 实战要点:gather / TaskGroup / 取消 / 限并发

asyncio 已经是 Python 标准并发模型。但坑也多——很多人以为加 async/await 就能"并行",结果代码顺序跑得跟同步一样。下面讲实际让代码并发的关键。 ## 1. 单纯加 async/await 不会并行 ```python async def fetch_one(url): async with aiohttp.ClientSession() as s: async with s.get(url) as r: return await r.text() async def main(): for url in urls: await fetch_one(url) # 串行!每个等上一个完 ``` `await` 让出控制权,但 for + await 仍然是顺序的。要并发用 `gather` 或 `TaskGroup`。 ## 2. asyncio.gather ```python async def main(): results = await asyncio.gather(*[fetch_one(u) for u in urls]) ``` 所有 fetch 同时进行,gather 等全部完成返回列表。 注意: - 任何一个抛异常默认会取消其它任务并向上抛 - 用 `return_exceptions=True` 让异常也作为结果返回,不取消其它 ```python results = await asyncio.gather(*coros, return_exceptions=True) for r in results: if isinstance(r, Exception): log.warning('one failed: %s', r) else: process(r) ``` ## 3. TaskGroup(Python 3.11+,推荐) ```python async def main(): async with asyncio.TaskGroup() as tg: t1 = tg.create_task(fetch_one(u1)) t2 = tg.create_task(fetch_one(u2)) t3 = tg.create_task(fetch_one(u3)) # 所有 task 自动完成 + 自动 cancel 兄弟 task 当一个失败 print(t1.result(), t2.result()) ``` TaskGroup 是 PEP 654 + 3.11 引入的结构化并发。优点: - 异常传播更清晰(aggregates as ExceptionGroup) - 自动 cancel 兄弟,避免 "task 还在跑" 的孤儿 - 比 gather 更结构化 新代码优先 TaskGroup。 ## 4. 限并发:Semaphore 如果有 10000 个 URL 要抓,全用 gather 同时跑会爆 socket / 被目标限流。 用 Semaphore 控制并发度: ```python sem = asyncio.Semaphore(20) async def bounded_fetch(url): async with sem: return await fetch_one(url) results = await asyncio.gather(*[bounded_fetch(u) for u in urls]) ``` `sem` 上限 20 → 最多 20 个 fetch 同时进行。 更高级的限速(按时间,比如每秒 50 个请求): ```python import aiometer async def fetch_one(url): ... results = await aiometer.run_on_each( fetch_one, urls, max_at_once=20, max_per_second=50, ) ``` ## 5. 取消 + 超时 ```python # 单个任务超时 try: result = await asyncio.wait_for(fetch_one(url), timeout=5.0) except asyncio.TimeoutError: log.warning('timeout') ``` `asyncio.wait_for` 在超时后取消 coroutine(抛 CancelledError)。 任务里要正确处理这个 cancel: ```python async def fetch_one(url): try: async with session.get(url) as r: return await r.text() except asyncio.CancelledError: # 清理资源(如果还在持有) log.info('cancelled') raise # 必须 re-raise,否则任务不算被取消 ``` 不 re-raise 会让 cancel 信号丢失,cleanup 顺序乱。 ## 6. 不要在 async 里跑同步阻塞 ```python async def bad(): time.sleep(5) # 阻塞整个事件循环! result = requests.get('...') # 同步 IO ``` 修复: ```python async def good(): await asyncio.sleep(5) async with aiohttp.ClientSession() as s: async with s.get(...) as r: await r.text() ``` 如果必须用同步库(pandas / boto3 / heavy CPU 计算): ```python # 跑到 thread pool 不阻塞 loop result = await asyncio.to_thread(blocking_function, args) ``` `asyncio.to_thread` 把同步函数包成 awaitable,在 thread pool 里跑。 ## 7. CPU 密集任务用 ProcessPoolExecutor ```python from concurrent.futures import ProcessPoolExecutor executor = ProcessPoolExecutor() loop = asyncio.get_running_loop() result = await loop.run_in_executor(executor, cpu_heavy_function, data) ``` CPU 密集(图片处理 / 加密 / 解析)放进程池,避开 GIL。 ## 8. 信号 / 优雅退出 ```python import signal async def main(): stop = asyncio.Event() loop = asyncio.get_running_loop() for sig in (signal.SIGINT, signal.SIGTERM): loop.add_signal_handler(sig, stop.set) server_task = asyncio.create_task(run_server()) await stop.wait() log.info('shutting down') server_task.cancel() try: await server_task except asyncio.CancelledError: pass asyncio.run(main()) ``` 收到 Ctrl-C 时优雅 cancel + 等待清理。Windows 不支持 add_signal_handler。 ## 9. 共享状态 asyncio 是单线程协作式并发,访问全局变量**不需要锁**。 但 await 之后状态可能被其它 coroutine 改了: ```python counter = 0 async def bad(): global counter n = counter await asyncio.sleep(0.001) # 让出执行 counter = n + 1 # n 可能已经过时 ``` 需要原子读改写时用 `asyncio.Lock`: ```python lock = asyncio.Lock() async def good(): async with lock: n = counter await ... counter = n + 1 ``` ## 10. 调试 ```python import asyncio asyncio.run(main(), debug=True) # 或 export PYTHONASYNCIODEBUG=1 ``` debug 模式会警告: - coroutine 没 await - 协程跑超过 100ms(可能阻塞了 loop) - 任务 leaks `asyncio.all_tasks()` 看当前所有任务。 ## 11. 常用第三方 - **aiohttp**:HTTP client/server - **httpx**:HTTP,同步 + 异步统一 API(更现代) - **asyncpg**:PostgreSQL,比 aiopg / sqlalchemy-asyncio 快很多 - **redis-py 4+**:内置 async - **anyio**:跨 asyncio / trio 的抽象层 新项目 HTTP 优先用 httpx,老项目 aiohttp 仍然稳定。 ## 踩过的坑 - `coro = some_async_fn()` 但忘 await:coroutine 不会执行 + Python 抛 "coroutine was never awaited" 警告。 - `asyncio.run()` 不能嵌套:在 Jupyter / IPython 里要用 `await main()` 直接(IPython 7+ 自动包 await)。 - 多线程跑 asyncio:每个线程需要自己 `asyncio.new_event_loop()`。 通常不推荐——asyncio 设计是单线程的。 - `aiohttp.ClientSession` 是有 connection pool 的,每个请求建一个新 session 性能差。整个程序生命周期共享一个 session。

Flask + SQLAlchemy 2.0 + Alembic 的最小可用骨架

Flask 的微框架哲学意味着每个项目都要自己组装数据层。下面是一套 经过几个生产项目验证的最小骨架:Flask 3.x + SQLAlchemy 2.0 + Alembic 迁移 + 应用工厂模式。 ## 项目结构 ``` myapp/ ├── pyproject.toml ├── alembic.ini ├── migrations/ │ ├── env.py │ └── versions/ └── src/myapp/ ├── __init__.py # create_app() ├── extensions.py # db, ... ├── models.py ├── routes.py └── config.py ``` ## 1. 依赖 ```bash uv add flask 'sqlalchemy>=2.0' alembic psycopg[binary] python-dotenv ``` ## 2. extensions.py ```python from flask_sqlalchemy import SQLAlchemy from sqlalchemy.orm import DeclarativeBase class Base(DeclarativeBase): pass db = SQLAlchemy(model_class=Base) ``` SQLAlchemy 2.0 推荐用 `DeclarativeBase` 替代 `declarative_base()`, 类型提示更友好。 ## 3. models.py ```python from datetime import datetime from sqlalchemy import String, DateTime, ForeignKey from sqlalchemy.orm import Mapped, mapped_column, relationship from .extensions import db, Base class User(Base): __tablename__ = 'users' id: Mapped[int] = mapped_column(primary_key=True) email: Mapped[str] = mapped_column(String(120), unique=True) nickname: Mapped[str] = mapped_column(String(60)) created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow) posts: Mapped[list['Post']] = relationship(back_populates='author') class Post(Base): __tablename__ = 'posts' id: Mapped[int] = mapped_column(primary_key=True) title: Mapped[str] = mapped_column(String(200)) body: Mapped[str] author_id: Mapped[int] = mapped_column(ForeignKey('users.id')) author: Mapped[User] = relationship(back_populates='posts') ``` `Mapped[...]` + `mapped_column(...)` 是 SA 2.0 的新风格, 完全类型化,mypy / pyright 直接懂。 ## 4. config.py ```python import os from dotenv import load_dotenv load_dotenv() class Config: SQLALCHEMY_DATABASE_URI = os.environ.get( 'DATABASE_URL', 'postgresql+psycopg://localhost/myapp') SQLALCHEMY_TRACK_MODIFICATIONS = False SECRET_KEY = os.environ['SECRET_KEY'] ``` ## 5. __init__.py(应用工厂) ```python from flask import Flask from .config import Config from .extensions import db def create_app(config_object=Config): app = Flask(__name__) app.config.from_object(config_object) db.init_app(app) from . import routes app.register_blueprint(routes.bp) return app ``` ## 6. routes.py ```python from flask import Blueprint, jsonify, request from sqlalchemy import select from .extensions import db from .models import User, Post bp = Blueprint('main', __name__) @bp.get('/users/<int:uid>/posts') def user_posts(uid): stmt = select(Post).where(Post.author_id == uid) posts = db.session.scalars(stmt).all() return jsonify([{'id': p.id, 'title': p.title} for p in posts]) @bp.post('/users') def create_user(): data = request.get_json() u = User(email=data['email'], nickname=data['nickname']) db.session.add(u) db.session.commit() return jsonify({'id': u.id}), 201 ``` SA 2.0 用 `select()` + `db.session.scalars()`;老的 `Model.query.filter_by(...)` 仍可用但官方建议迁移。 ## 7. Alembic 初始化 ```bash uv run alembic init -t async migrations # 或同步:uv run alembic init migrations ``` 改 `alembic.ini`: ```ini sqlalchemy.url = postgresql+psycopg://localhost/myapp ``` 改 `migrations/env.py`,让它能找到 metadata: ```python from myapp import create_app from myapp.extensions import db, Base # ... target_metadata = Base.metadata app = create_app() with app.app_context(): ... ``` ## 8. 第一次迁移 ```bash uv run alembic revision --autogenerate -m 'init users + posts' # 检查生成的 migrations/versions/*.py uv run alembic upgrade head ``` ## 9. 运行 ```bash uv run flask --app src.myapp run --debug # 或: uv run gunicorn 'src.myapp:create_app()' -b 0.0.0.0:8000 ``` ## 10. 测试 ```python # tests/conftest.py import pytest from src.myapp import create_app from src.myapp.extensions import db class TestConfig: SQLALCHEMY_DATABASE_URI = 'sqlite:///:memory:' SECRET_KEY = 'test' @pytest.fixture def app(): app = create_app(TestConfig) with app.app_context(): db.create_all() yield app @pytest.fixture def client(app): return app.test_client() ``` ## 踩过的坑 - Alembic autogenerate 看不到 model:检查 `env.py` 是否真的 import 了 所有 model 模块(光 `import models` 子模块不自动加载)。我们一般写 `from myapp import models # noqa`。 - 加字段时 default 是 Python 端的,DB 端不会自动加 default。 改 model 后跑 autogenerate 会生成 `op.add_column` 但 nullable 字段 没初值时升级会失败。手动给 server_default 或者分两步迁移。 - SA 2.0 的 lazy loading 默认严格:N+1 查询会触发警告。生产建议 `select(...).options(selectinload(Post.author))` 显式预取。 - Flask-SQLAlchemy 3.x 配 SA 2.0 时,model 必须用 `db.Model` 或自己定义 的 `Base`,不能混用。

用 prometheus_client 给 Python 应用暴露指标(4 种 metric 用法)

`prometheus_client` 是 Prometheus 官方的 Python 库,让任何 Python 应用 能在 `/metrics` 端点导出 Prometheus 格式的指标。 ## 安装 + 最小可用 ```bash uv add prometheus-client ``` ```python # 暴露在独立端口 from prometheus_client import start_http_server, Counter, Gauge, Histogram req_count = Counter('http_requests_total', 'Total HTTP requests', ['method', 'path', 'status']) start_http_server(8001) # 独立 :8001/metrics # 然后做你的事 ... req_count.labels(method='GET', path='/users', status='200').inc() ``` 或挂在 FastAPI 路由: ```python from prometheus_client import make_asgi_app app.mount('/metrics', make_asgi_app()) ``` Django: ```python # urls.py from prometheus_client import make_wsgi_app from django.urls import path from django.views.generic import View class MetricsView(View): def get(self, request): ... # 用 django-prometheus 包更省事 ``` 实际项目直接用 `django-prometheus`: ```bash uv add django-prometheus ``` 加 middleware 后内置一堆 Django 指标(视图延迟、SQL 时间、缓存命中等)。 ## 4 种 metric 用法 ### Counter:单调递增 ```python requests = Counter('requests_total', '...', ['method']) requests.labels(method='GET').inc() requests.labels(method='POST').inc(5) ``` PromQL: ``` rate(requests_total[5m]) # 每秒请求数(按 5 分钟窗口) ``` ### Gauge:可上可下 ```python queue_size = Gauge('queue_size', '...') queue_size.set(42) queue_size.inc(); queue_size.dec() # 用 callback 让 Prometheus 拉取时实时计算 queue_size.set_function(lambda: r.llen('jobs')) ``` 适合:当前队列长度、连接数、温度、内存使用。 ### Histogram:分布 ```python latency = Histogram('http_latency_seconds', 'HTTP latency', ['endpoint'], buckets=[0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10]) # 上下文管理器自动测延迟 with latency.labels(endpoint='/users').time(): do_work() ``` PromQL 出 p95: ``` histogram_quantile(0.95, sum(rate(http_latency_seconds_bucket[5m])) by (le, endpoint)) ``` bucket 选 5-12 个,覆盖典型延迟范围。bucket 多了占资源,bucket 少了 分位数不准。 ### Summary:分位数(不推荐) ```python from prometheus_client import Summary size = Summary('request_size_bytes', '...') size.observe(2048) ``` Summary 在客户端算分位数,无法在多实例上做正确聚合。**生产基本只用 Histogram**,需要分位数时用 `histogram_quantile()` 在服务端算。 ## label 设计原则 label 的不同值组合决定了 series 的数量。label cardinality 高了 Prometheus 内存爆。 ```python # 错: user_id 是无限基数 requests.labels(user_id=user.id).inc() # 错: 完整 URL 含动态 ID requests.labels(path='/users/123/posts').inc() # 对: 路径模板化 requests.labels(path='/users/{id}/posts').inc() ``` **label 数应该是个有限小集合**:endpoint 模板、HTTP method、status code、 某几个固定 region 等。 ## 一个完整中间件(FastAPI) ```python import time from prometheus_client import Counter, Histogram from starlette.middleware.base import BaseHTTPMiddleware REQ = Counter('http_requests_total', '...', ['method', 'path', 'status']) LATENCY = Histogram('http_latency_seconds', '...', ['method', 'path'], buckets=[.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5]) class MetricsMiddleware(BaseHTTPMiddleware): async def dispatch(self, request, call_next): t0 = time.perf_counter() response = await call_next(request) elapsed = time.perf_counter() - t0 # 用路由模板而不是实际路径 route = request.scope.get('route') path = route.path if route else request.url.path REQ.labels(method=request.method, path=path, status=str(response.status_code)).inc() LATENCY.labels(method=request.method, path=path).observe(elapsed) return response app.add_middleware(MetricsMiddleware) ``` `request.scope.get('route').path` 给出的是 `/users/{user_id}/posts` 这种模板,不是 `/users/42/posts` 这种实参。 ## 内置 collector `prometheus_client` 自带几个: ```python from prometheus_client import REGISTRY, GCCollector, PlatformCollector, ProcessCollector # 默认这些已经注册(CPython 平台 + 进程信息) # 可以手动反注册节省指标 REGISTRY.unregister(GCCollector(REGISTRY)) ``` `process_cpu_seconds_total`、`process_resident_memory_bytes` 等都是 免费白送的。 ## 多 worker 的坑 gunicorn / uvicorn 多 worker 时,每个 worker 是独立进程,独立的 metrics。 直接 scrape `/metrics` 只看到一个 worker 的数据。两种解法: 1. **mmap 共享**:设 `PROMETHEUS_MULTIPROC_DIR=/tmp/prom`,prometheus_client 会把指标写到共享目录,import 时聚合所有 worker 的数据。 2. **每个 worker 独立 scrape**:让 Prometheus 单独抓每个 worker 端口 (复杂度高,不推荐)。 mmap 版本写法: ```python # 进程启动时 import os os.environ['PROMETHEUS_MULTIPROC_DIR'] = '/tmp/prom' # 暴露指标时用 MultiProcessCollector from prometheus_client import multiprocess, CollectorRegistry, generate_latest registry = CollectorRegistry() multiprocess.MultiProcessCollector(registry) output = generate_latest(registry) ``` worker 退出时调 `multiprocess.mark_process_dead(pid)`。 ## 测试 ```python def test_counter_increments(): REQ.labels(method='GET', path='/x', status='200').inc() # 直接读 metric 内部值 val = REQ.labels(method='GET', path='/x', status='200')._value.get() assert val == 1 ``` ## 踩过的坑 - 把 user-id / session-id 当 label:1M 用户 = 1M series,Prometheus 崩盘。 这种"高基数"信息应该用 log,不用 metric。 - Histogram bucket 改了:旧数据和新数据不可比较,需要双写或重设。建议 bucket 一开始想清楚。 - `/metrics` 端点不要走鉴权:Prometheus scrape 没有简单的 auth; 挂内网或加 IP 白名单。 - 用 `with` 管 Histogram timer:抛异常时也会记录,所以错误请求的延迟 也算进 p95。如果想只看成功请求的延迟,加 try/except 区分。

FastAPI Depends() 实战:DI 让 testing 和 modularity 都受益

## 起因 FastAPI 的 `Depends()` 是其设计精华之一。新人常忽视它,把所有 view 写 成"自己 new DB connection + 拉 user 信息",导致: - 测试时无法 mock DB - 业务逻辑跟 framework 耦合 - 改 auth 流程要改 100 处 view 学会用 Depends 后代码组织上一个台阶。 ## 1. 基础 ```python from fastapi import FastAPI, Depends app = FastAPI() def get_db(): db = Session() try: yield db finally: db.close() @app.get('/users/{uid}') def read_user(uid: int, db = Depends(get_db)): return db.query(User).filter_by(id=uid).first() ``` `Depends(get_db)`: - 每个请求自动调 `get_db()` - yield 之前的代码:创建 session - yield 之后的代码:关闭 session - 类似 Django 的 middleware 但更灵活 ## 2. 嵌套依赖 ```python def get_db(): ... def get_current_user( token: str = Depends(oauth2_scheme), db: Session = Depends(get_db), ) -> User: user = decode_token(token, db) if not user: raise HTTPException(401) return user @app.get('/me') def me(user: User = Depends(get_current_user)): return user ``` FastAPI 自动解析依赖图:`me` 需要 `get_current_user` 需要 `get_db` 和 `oauth2_scheme`。 **同一个请求里 `get_db` 只调一次**(即使多个依赖都需要它)。 ## 3. 权限检查依赖 ```python def require_admin(user: User = Depends(get_current_user)) -> User: if user.role != 'admin': raise HTTPException(403, 'admin required') return user @app.delete('/users/{uid}') def delete_user(uid: int, admin: User = Depends(require_admin), db: Session = Depends(get_db)): db.query(User).filter_by(id=uid).delete() db.commit() ``` 权限检查从 view body 抽出来 → 复用 + 类型安全。 ## 4. 类做依赖(state-ful) ```python class CommonQueryParams: def __init__(self, q: str = '', skip: int = 0, limit: int = 20): self.q = q self.skip = skip self.limit = limit @app.get('/search') def search(params: CommonQueryParams = Depends()): # params.q / params.skip / params.limit ... ``` `Depends()` 无参数时自动用类型 `CommonQueryParams` 当 dependency。 重复的 query param 模式抽成 class。 ## 5. router-level dependency ```python from fastapi import APIRouter admin_router = APIRouter( prefix='/admin', dependencies=[Depends(require_admin)], # router 级依赖 ) @admin_router.get('/dashboard') def dashboard(): pass @admin_router.get('/users') def users(): pass ``` `admin_router` 下所有 endpoint 自动要求 admin。不需要每个 view 写 Depends。 或 app 级: ```python app = FastAPI(dependencies=[Depends(log_request)]) # 全局 ``` ## 6. 测试:覆盖依赖 ```python def get_db_test(): return MockDB() app.dependency_overrides[get_db] = get_db_test def test_read_user(): client = TestClient(app) response = client.get('/users/1') assert response.json() == {'id': 1, 'name': 'mock'} ``` `dependency_overrides` 让测试用 mock 实现替换生产依赖。 **整个测试不真连 DB**。 对比:"view 内直接 import DB" → 没法 mock,要么 mock 整个 module, 要么真起 DB。 ## 7. Generator dependency(cleanup 必须 finally) ```python def get_db(): db = SessionLocal() try: yield db finally: db.close() ``` 不 try/finally 的话,view 内 exception → cleanup 不跑 → 连接泄露。 **永远 try/finally**。 ## 8. 同步 vs async dependency 混用 OK: ```python def get_settings(): # 同步 return Settings() async def get_user(): # async return await fetch_user() @app.get('/x') async def x( s: Settings = Depends(get_settings), u: User = Depends(get_user), ): ... ``` FastAPI 自动决定 thread pool 还是 await。 ## 9. Sub-app / mount ```python admin_app = FastAPI() @admin_app.get('/stats') def stats(): pass app.mount('/admin', admin_app) ``` 完全独立的 FastAPI 应用挂载到主 app。各自有独立 dependency / docs / middleware。复杂场景拆分。 ## 10. lifecycle hooks vs Depends ```python @app.on_event('startup') async def startup(): app.state.db = await create_pool() @app.on_event('shutdown') async def shutdown(): await app.state.db.close() ``` 或现代 lifespan: ```python from contextlib import asynccontextmanager @asynccontextmanager async def lifespan(app): app.state.db = await create_pool() yield await app.state.db.close() app = FastAPI(lifespan=lifespan) ``` 应用启动 / 关闭一次性资源(DB pool / Redis client)放这里,不是 per-request Depends。 之后 Depends 引用: ```python def get_db(request: Request): return request.app.state.db.acquire() ``` ## 完整模板:blog API 骨架 ```python # deps.py from fastapi import Depends, HTTPException, Request from sqlalchemy.orm import Session def get_db(request: Request) -> Session: db = request.app.state.SessionLocal() try: yield db finally: db.close() def get_current_user( token: str = Depends(oauth2_scheme), db: Session = Depends(get_db), ) -> User: user = decode_jwt(token, db) if not user: raise HTTPException(401) return user def get_post(post_id: int, db: Session = Depends(get_db)) -> Post: post = db.query(Post).get(post_id) if not post: raise HTTPException(404) return post def require_post_owner( post: Post = Depends(get_post), user: User = Depends(get_current_user), ) -> Post: if post.author_id != user.id: raise HTTPException(403) return post ``` ```python # views.py @app.get('/posts/{post_id}') def read_post(post: Post = Depends(get_post)): return post @app.put('/posts/{post_id}') def update_post( data: UpdatePostIn, post: Post = Depends(require_post_owner), db: Session = Depends(get_db), ): post.title = data.title db.commit() return post @app.delete('/posts/{post_id}') def delete_post( post: Post = Depends(require_post_owner), db: Session = Depends(get_db), ): db.delete(post) db.commit() return {'ok': True} ``` 每个 view 体内不超过 5 行业务逻辑。权限 / DB session / 对象加载都 declarative。 ## 性能注意 每请求依赖图都重新解析。深嵌套 / 重操作 dependency 会累加。 但通常 dependency 都很轻(DB query / dict lookup),impact 极小。 ## 与 Django / Flask 对比 Django:middleware + `@login_required` 装饰器 + view-local code。 Flask:`@app.before_request` + 装饰器 + flask-login。 FastAPI Depends 优点: - type-safe(IDE / mypy 知道 user 是 User) - 显式(看签名就知道这个 endpoint 需要什么) - 测试友好(dependency_overrides) 但学习曲线略陡。习惯后回不去。 ## 踩过的坑 1. **Depends 写位置错**: ```python def view(user=Depends(get_user), uid: int = 1): # ❌ ``` Depends 不能有 default。改顺序: ```python def view(uid: int, user=Depends(get_user)): ``` 2. **循环依赖**:A depends B,B depends A → import error。架构上重新 设计;通常说明边界划错。 3. **dependency 内 raise** → FastAPI 自动 422 / 配你的 HTTPException。 写 `raise HTTPException(401, "Bad token")` 别 `return None`。 4. **测试 dependency_overrides 没清** → 测试间互相污染。每个测试 `app.dependency_overrides.clear()` 或用 fixture。 5. **`Depends()` 无参 + 类**:必须显式 type annotation: ```python def view(params: CommonQueryParams = Depends()): ``` 不写 type FastAPI 不知道用什么。

Django Channels:给 Django 加 WebSocket(不引入 Node)

## 起因 Django 项目要加"实时通知"功能(用户 like 时其他用户立刻看到 count 更新)。 两种思路: 1. 起独立 Node.js WebSocket server + Django 通过 Redis pub/sub 协调 2. Django Channels:在 Django 内部加 WebSocket / async 不想多维护一个 Node 服务 → Channels 直接。 ## Channels 是什么 Django 4.0+ 原生支持 ASGI,可以跑 async view。 Channels 是 Django Software Foundation 的官方扩展,处理: - WebSocket 协议(HTTP 升级) - 多 worker 间消息分发(channel layer,用 Redis) - SSE / 长连接 ## 装 ```bash uv add channels channels-redis daphne ``` ## 配置 ASGI `asgi.py`: ```python import os from django.core.asgi import get_asgi_application from channels.routing import ProtocolTypeRouter, URLRouter from channels.auth import AuthMiddlewareStack from django.urls import path os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myapp.settings') django_asgi_app = get_asgi_application() from chat.consumers import ChatConsumer from notifications.consumers import NotifyConsumer application = ProtocolTypeRouter({ 'http': django_asgi_app, # 普通 HTTP 走原本 Django 'websocket': AuthMiddlewareStack( # WebSocket 路由 URLRouter([ path('ws/chat/<str:room>/', ChatConsumer.as_asgi()), path('ws/notify/', NotifyConsumer.as_asgi()), ]) ), }) ``` `settings.py`: ```python INSTALLED_APPS = [ 'daphne', # 替代 staticfiles 的 ASGI server,必须排前面 'django.contrib.staticfiles', 'channels', # ... 你的 apps ] ASGI_APPLICATION = 'myapp.asgi.application' # channel layer:跨 worker 消息 CHANNEL_LAYERS = { 'default': { 'BACKEND': 'channels_redis.core.RedisChannelLayer', 'CONFIG': {'hosts': [('redis', 6379)]}, }, } ``` ## 写一个简单 Consumer ```python # chat/consumers.py import json from channels.generic.websocket import AsyncWebsocketConsumer class ChatConsumer(AsyncWebsocketConsumer): async def connect(self): self.room = self.scope['url_route']['kwargs']['room'] self.group_name = f'chat_{self.room}' self.user = self.scope['user'] if not self.user.is_authenticated: await self.close() return # 加入房间 group(Redis pub/sub) await self.channel_layer.group_add(self.group_name, self.channel_name) await self.accept() async def disconnect(self, close_code): await self.channel_layer.group_discard(self.group_name, self.channel_name) async def receive(self, text_data): # 客户端发来消息 data = json.loads(text_data) message = data['message'] # 广播给房间所有人 await self.channel_layer.group_send( self.group_name, { 'type': 'chat.message', # 调下面方法 'message': message, 'user': self.user.username, } ) async def chat_message(self, event): # 房间收到消息时(包括自己发的) await self.send(text_data=json.dumps({ 'message': event['message'], 'user': event['user'], })) ``` `channel_layer.group_send` 通过 Redis 通知**所有 worker** 的对应 consumer。3 个 daphne worker / 100 个客户端,消息正确广播。 ## 客户端 JS ```js const ws = new WebSocket(`wss://example.com/ws/chat/${roomId}/`) ws.onmessage = (e) => { const data = JSON.parse(e.data) addMessageToUI(data.user, data.message) } ws.onopen = () => { console.log('connected') } document.getElementById('send').addEventListener('click', () => { const text = document.getElementById('input').value ws.send(JSON.stringify({ message: text })) }) ``` ## 跑 ```bash # 开发用 runserver(自动支持 ASGI / Channels) python manage.py runserver # 生产用 daphne daphne -b 0.0.0.0 -p 8000 myapp.asgi:application # 多 worker gunicorn -k uvicorn.workers.UvicornWorker myapp.asgi:application --workers 4 ``` Redis 必须跑(channel layer 依赖)。 nginx 反代要支持 WebSocket: ```nginx location /ws/ { proxy_pass http://app; proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection "upgrade"; proxy_set_header Host $host; proxy_read_timeout 86400; # 长连接不要超时 } ``` ## 从 Django view 推 message 业务 view 里触发广播(非 WebSocket 路径): ```python # views.py from channels.layers import get_channel_layer from asgiref.sync import async_to_sync def like_post(request, post_id): post = Post.objects.get(pk=post_id) post.likes += 1 post.save() # 通知所有连接到这帖子的客户端 channel_layer = get_channel_layer() async_to_sync(channel_layer.group_send)( f'post_{post_id}', { 'type': 'post.update', # 调用 consumer 的 post_update 方法 'likes': post.likes, } ) return JsonResponse({'likes': post.likes}) ``` ```python class PostNotifyConsumer(AsyncWebsocketConsumer): async def connect(self): post_id = self.scope['url_route']['kwargs']['post_id'] self.group_name = f'post_{post_id}' await self.channel_layer.group_add(self.group_name, self.channel_name) await self.accept() async def post_update(self, event): await self.send(text_data=json.dumps({ 'likes': event['likes'], })) ``` 通用模式:**业务 view 改 DB → group_send 通知 → 所有 WebSocket 客户端 收到**。 ## ORM in async consumer WebSocket consumer 是 async,但 Django ORM 默认同步。 用 `sync_to_async`: ```python from asgiref.sync import sync_to_async class MyConsumer(AsyncWebsocketConsumer): async def receive(self, text_data): post = await sync_to_async(Post.objects.get)(pk=1) ``` 或者用 Django 4.1+ 的 async ORM: ```python post = await Post.objects.aget(pk=1) ``` ## 测试 Consumer ```python from channels.testing import WebsocketCommunicator from chat.consumers import ChatConsumer async def test_chat(): communicator = WebsocketCommunicator( ChatConsumer.as_asgi(), '/ws/chat/test/') connected, _ = await communicator.connect() assert connected await communicator.send_json_to({'message': 'hello'}) response = await communicator.receive_json_from() assert response['message'] == 'hello' await communicator.disconnect() ``` ## 性能 / 规模 Channels + Redis 配置 4 worker 在小服务器上: - 1000 并发 WebSocket connection 轻松 - 10000 取决于 Redis 配置 + 网络 - > 10w → 考虑专用 WebSocket server(Centrifugo / Phoenix Channels) ## 与替代品对比 | | Django Channels | 单独 Node WebSocket | Centrifugo | Phoenix LiveView | |---|---|---|---|---| | 语言 | Python | Node.js | Go | Elixir | | 集成 Django | 原生 | 需 Redis 中介 | 同 | N/A | | 并发上限 | 中(万级) | 高 | 极高(百万) | 极高 | | 复杂度 | 中 | 高(双栈) | 中 | 高(学语言) | | 适合 | Django app + 适量 WebSocket | 极致并发 + 现有 Node | 中大规模 push | 完全重 stack | ## 真实部署 case 我们一个内部协作工具: - Django 4.2 + Channels 4 - 500 实时在线协作 user - 用 1 台机器 (4 vCPU / 8 GB) + Redis - daphne 4 worker - nginx 反代 - 6 个月 0 down 足够。如果上 5000 在线考虑 Centrifugo。 ## 替代:HTMX + SSE 如果只是"服务端推" 不需要双向,用 SSE 替代(前面有篇): ```python @app.get('/sse/notifications') async def sse(): async def gen(): async for event in get_events(): yield f'data: {json.dumps(event)}\n\n' return StreamingHttpResponse(gen(), content_type='text/event-stream') ``` 更简单 / 更标准 HTTP / Django 5 原生支持。 WebSocket 仍是双向场景(聊天 / 实时协作)的更优选。 ## 踩过的坑 1. **daphne 没加进 INSTALLED_APPS**:staticfiles 没替换 → runserver 不识别 ASGI。 2. **channel layer 用 InMemory**:单 worker OK;多 worker 时消息 收不到(每 worker 独立 in-memory)。生产必用 RedisChannelLayer。 3. **WebSocket 鉴权**:默认 scope['user'] 是 AnonymousUser。 `AuthMiddlewareStack` 让 Django session cookie 生效。 JWT 等其它 auth 要自己写 middleware。 4. **客户端长时间不发消息断**:默认 keepalive 没设,nginx / proxy 一小时空闲 → 断。客户端定期发 ping:"" 空消息或者 protocol ping。 5. **`async_to_sync` + `sync_to_async` 混用** → ASGI ↔ WSGI 转换贵。 尽量整个 path 同 async / 同 sync。

Django 4+ async views:什么时候真的有用,什么时候踩坑

## 起因 Django 4.0+ 支持 async views,文档里说"并发 IO 性能提升"。 直接把所有 view 改 `async def`?踩了几个坑: - ORM 默认是同步的,async view 里 `User.objects.get()` 会 block event loop - middleware 不全 async,会有性能 penalty - 不是所有场景都受益 理解什么时候用、什么时候别用,才有价值。 ## 解决方案:分场景 ### 场景 A:view 里发 N 个外部 HTTP 请求(async 真的快) ```python # 同步版(用 requests) def aggregate_view(request): weather = requests.get('https://api.weather/...').json() news = requests.get('https://api.news/...').json() stocks = requests.get('https://api.stocks/...').json() return JsonResponse({'weather': weather, 'news': news, 'stocks': stocks}) ``` 3 个串行 IO,每个 300ms → 总 900ms。 ```python # async 版 import httpx async def aggregate_view(request): async with httpx.AsyncClient() as client: weather, news, stocks = await asyncio.gather( client.get('https://api.weather/...'), client.get('https://api.news/...'), client.get('https://api.stocks/...'), ) return JsonResponse({ 'weather': weather.json(), 'news': news.json(), 'stocks': stocks.json(), }) ``` 3 个并行,总 300ms(max)。3x 加速。 ### 场景 B:view 主要查 ORM(async 没用) ```python async def post_list(request): posts = Post.objects.filter(visibility='public')[:20] # ❌ return ... ``` Django ORM 默认同步。在 async view 里调用同步 ORM → 内部跑 `sync_to_async` 用线程池执行 → 比纯同步 view 还慢一点(多一次 context switch)。 Django 4.1+ 加了 async ORM: ```python async def post_list(request): posts = [p async for p in Post.objects.filter(visibility='public')[:20]] # 或: posts = await Post.objects.filter(...).a_in_bulk([1, 2, 3]) post = await Post.objects.aget(pk=1) await post.adelete() return ... ``` `a*` 系列方法是 async 版。但要点: - 这不让你"并发查多条"——还是单连接顺序查 - 唯一收益:不 block event loop(同进程能服务其它 async 请求) - 高 QPS + 复杂查询:Django ORM 仍是性能瓶颈(不是 async 能解的) ### 场景 C:streaming response(async 是必须的) ```python async def chat_stream(request): async def generator(): async for chunk in llm.stream(prompt): yield f'data: {json.dumps(chunk)}\n\n' return StreamingHttpResponse( generator(), content_type='text/event-stream', ) ``` LLM streaming / 长 polling / SSE → 必须 async 才能正常工作。 同步 view 在 stream 完成前 block 一个 worker。 ### 场景 D:mixed sync + async 需要在 async view 里调同步代码(如某个老 lib): ```python from asgiref.sync import sync_to_async @sync_to_async def heavy_sync_work(x): return cpu_bound_calc(x) async def view(request): result = await heavy_sync_work(42) return JsonResponse({'result': result}) ``` `sync_to_async` 把同步函数包成 awaitable,在线程池跑。 反向 `async_to_sync` 让 sync 调 async。 ## 启动方式 async views 需要 ASGI server(不是 WSGI): ```bash # 之前:gunicorn (WSGI) gunicorn myapp.wsgi # 现在:uvicorn (ASGI) uvicorn myapp.asgi:application --workers 4 # 或 gunicorn + uvicorn worker gunicorn -k uvicorn.workers.UvicornWorker -w 4 myapp.asgi:application ``` `myapp/asgi.py` 默认 Django 已生成。 WSGI server 跑 async view 也能跑(Django 自动用 sync_to_async 适配), 但性能不如 ASGI。 ## 性能测试(实际数据) 我对一个 endpoint 测试:3 个外部 API + 1 个 DB 查询。 | | wrk -t8 -c100 -d30s RPS | P95 latency | |---|---|---| | 同步 gunicorn(4 worker) | 35 | 2.9s | | async uvicorn(4 worker) | 320 | 380ms | 外部 IO 密集场景 async 收益巨大。 但纯 DB 查询 endpoint: | | RPS | P95 | |---|---|---| | 同步 gunicorn | 1200 | 80ms | | async uvicorn | 1100 | 90ms | async 反而略慢(额外的协程开销 + ORM 仍同步)。 ## 什么场景该用 async ✅ 适合: - 外部 API / webhook 调用密集 - SSE / streaming response - WebSocket (Django Channels) - 长 polling - AI / LLM 调用 ❌ 不适合: - 纯 CRUD(ORM 同步主导) - CPU 密集(GIL,async 没用) - 老 lib 没 async 版本 ## 实战:把现有 view 改 async 的流程 1. 评估:这 view 主要 IO 类型是什么?外部 HTTP > DB 查询 > 其它 → 值得改 2. 安装 ASGI server (uvicorn) 3. 把 view 函数 `def` → `async def` 4. 把 `requests` 换 `httpx` / `aiohttp` 5. 把 ORM 调用换 `aget` / `acreate` / async iter(如果用得到) 6. middleware:检查是否 async-aware,老 middleware 加 `async_capable = True` 7. 测试 + 性能 benchmark 对比 ## Channels(WebSocket) 如果要 WebSocket / SSE 大量并发,Django Channels 是标准: ```bash pip install channels channels-redis ``` ```python # routing.py from channels.routing import ProtocolTypeRouter, URLRouter from chat.consumers import ChatConsumer application = ProtocolTypeRouter({ 'websocket': URLRouter([ path('ws/chat/', ChatConsumer.as_asgi()), ]), }) ``` ```python # consumers.py from channels.generic.websocket import AsyncWebsocketConsumer class ChatConsumer(AsyncWebsocketConsumer): async def connect(self): await self.accept() async def receive(self, text_data): await self.send(text_data=f'echo: {text_data}') ``` Channels 让 Django 处理 WebSocket / SSE / HTTP / Channel layer (跨 worker 消息)全面 async。 ## 效果 我们一个 API gateway 类服务(聚合多个内部 service 数据)改 async: - P95 延迟从 1.2s → 230ms - 同硬件下 RPS 从 200 → 1500 - worker 数 from 16 减到 4(每个 worker 并发服务) - 内存占用减半 而我们的 CRUD 类 admin backend 改了一半发现没什么用,回滚保持同步。 ## 踩过的坑 1. **在 async view 里调同步 view function**:直接调会 block。 要 `await sync_to_async(other_view)(request)`。 2. **middleware 不 async**:所有 middleware 必须 async-compatible, 否则 Django 退化到 sync 模式。第三方 middleware 检查文档支持 async。 3. **DB connection pool**:async views 处理并发更高 → 同时打开的 DB connection 多 → DB 连接耗尽。配 PgBouncer 在前面。 4. **`@login_required` 等装饰器**:检查是否 async-compatible。 Django 4.1+ 内置装饰器都改了;第三方需要确认。 5. **测试**:`AsyncClient` 替代 `Client`: ```python async def test_view(): client = AsyncClient() response = await client.get('/api/...') ```

Celery + Redis 跑后台任务的最小可工作配置

Web 请求里同步发邮件 / 调外部 API / 跑重计算就是把响应时间往沟里推。 异步丢任务队列,Web 立刻返回,后台 worker 慢慢干。Celery + Redis 是 Python 生态最常用的组合。 ## 依赖 ```bash uv add 'celery[redis]' ``` `[redis]` extra 装上 `redis-py`,作为 broker + result backend。 ## 项目结构 ``` myapp/ ├── celery_app.py ├── tasks.py └── ... (Django/Flask/FastAPI) ``` ## celery_app.py ```python from celery import Celery app = Celery( 'myapp', broker='redis://localhost:6379/0', backend='redis://localhost:6379/1', include=['myapp.tasks'], ) app.conf.update( task_serializer='json', accept_content=['json'], result_serializer='json', timezone='Asia/Shanghai', enable_utc=True, task_acks_late=True, # worker 处理完才 ack;崩了任务会被重新派发 task_reject_on_worker_lost=True, worker_prefetch_multiplier=1, # 长任务必须设 1,否则一个 worker 卡住所有 prefetch task_time_limit=300, # 硬超时 5 分钟(SIGKILL) task_soft_time_limit=270, # 软超时(抛 SoftTimeLimitExceeded) ) ``` `prefetch_multiplier=1` 是长任务必须改的——默认 4,意思是每个 worker 预取 4 个任务,第一个任务慢的话另外 3 个被卡住等。 ## tasks.py ```python from celery import shared_task from celery.exceptions import SoftTimeLimitExceeded import smtplib @shared_task( bind=True, autoretry_for=(ConnectionError, smtplib.SMTPException), retry_backoff=True, # 1s, 2s, 4s, 8s ... retry_backoff_max=600, max_retries=5, ) def send_welcome_email(self, user_id, email): try: # ... 真的发邮件 ... with smtplib.SMTP('smtp.example.com', 587) as s: s.login(...) s.sendmail('from@', email, 'Welcome!') except SoftTimeLimitExceeded: # 清理资源 raise @shared_task def heavy_calc(rows): return sum(complex_fn(r) for r in rows) ``` `autoretry_for` + `retry_backoff` 是失败自动指数退避重试的现代写法。 比手写 `self.retry(countdown=...)` 简洁。 ## Web 端触发 ```python # Django / Flask / FastAPI 都一样 from myapp.tasks import send_welcome_email def signup_view(request): user = User.objects.create(...) send_welcome_email.delay(user.id, user.email) return Response({'ok': True}) # .delay() 是 .apply_async() 的简写 # 想要更细控制: send_welcome_email.apply_async( args=(user.id, user.email), countdown=60, # 60 秒后再执行 queue='email', # 路由到指定队列 expires=3600, # 1 小时后没执行就放弃 ) ``` ## 启动 worker ```bash uv run celery -A myapp.celery_app worker -l info -c 4 # -c 4: 并发数;CPU 密集任务建议 = 物理核心数;IO 密集可以更高 ``` 多队列分开: ```bash # 一个 worker 只跑邮件队列 uv run celery -A myapp.celery_app worker -l info -Q email -c 2 # 另一个 worker 跑默认 + 计算队列 uv run celery -A myapp.celery_app worker -l info -Q celery,heavy -c 8 ``` ## 周期任务(Celery Beat) ```python # celery_app.py app.conf.beat_schedule = { 'cleanup-every-hour': { 'task': 'myapp.tasks.cleanup_temp_files', 'schedule': 3600.0, # 秒;或 crontab(hour=3, minute=0) }, 'send-digest-monday-9am': { 'task': 'myapp.tasks.send_weekly_digest', 'schedule': crontab(hour=9, minute=0, day_of_week='mon'), }, } ``` 启动 beat(单独进程,单实例): ```bash uv run celery -A myapp.celery_app beat -l info ``` beat 是单实例的。多机部署只起一个 beat 进程,否则任务被触发多次。 用 `redbeat` 把 schedule 存 Redis 解决高可用。 ## 监控:Flower ```bash uv add flower uv run celery -A myapp.celery_app flower --port=5555 # 浏览器打开 http://localhost:5555 ``` 可以看到所有 worker、队列长度、任务历史、失败重试。 ## 生产部署清单 1. broker / backend 用 **不同 Redis DB**(broker 流量大、result 流量小) 2. 长时间不取的 result 自动过期:`result_expires=3600` 3. worker 用 systemd unit 管,开 `Restart=on-failure` 4. 监控队列长度(Prometheus celery exporter):超过阈值就扩 worker 5. 别在 task 里用 Django ORM 然后忘记 `connection.close()`: 长跑 worker 会泄连接 ## systemd 单元 ```ini # /etc/systemd/system/celery-worker.service [Unit] Description=Celery worker After=network.target redis.service [Service] Type=simple User=app Group=app WorkingDirectory=/srv/myapp EnvironmentFile=/srv/myapp/.env ExecStart=/srv/myapp/.venv/bin/celery -A myapp.celery_app worker -l info -c 4 Restart=on-failure RestartSec=5s KillMode=mixed [Install] WantedBy=multi-user.target ``` ## 踩过的坑 - task 改了签名(加 / 删参数)后还有老格式的任务在队列里,新 worker 处理时 会爆 TypeError。新增字段加默认值,删字段做兼容 wrapper。 - `task_serializer='pickle'` —— 不要!pickle 允许执行任意代码,broker 一被攻破 全完。一律 JSON。 - worker 跑 Django ORM 任务时,每个任务后忘记 `transaction.commit()`, 数据库连接长期持有,下一个任务可能看到老数据。Django 4+ 默认 ATOMIC_REQUESTS + 显式 `connection.close()` 能解决。 - Result backend 用 Redis 时,每个 task 结果默认占一个 key 直到 expire。 几十万任务的 backlog 能把 Redis 撑爆。要么 `task_ignore_result=True`, 要么 `result_expires` 设短。

gunicorn vs uvicorn vs uvicorn workers:Python web 进程模型对比

Python web 应用部署最常见的问题:worker 数怎么定?sync 还是 async? gunicorn vs uvicorn 选哪个?下面讲清楚。 ## 1. WSGI vs ASGI - **WSGI**:传统同步接口(Flask / Django 默认) - **ASGI**:异步接口(FastAPI / Starlette / Django 4+ async) WSGI 接口里每个请求一个线程同步处理;ASGI 一个 event loop 协程并发。 ## 2. 几种部署组合 ### A. gunicorn + sync workers(WSGI 经典) ```bash gunicorn -w 4 myapp:app ``` `-w 4` 起 4 个 worker 进程,每个进程同步处理一个请求。 WSGI app(Flask / Django)的默认。 并发上限 = workers 数。worker 数建议 `2 × CPU + 1`。 ### B. gunicorn + gthread ```bash gunicorn -w 4 --threads 8 myapp:app ``` 每个 worker 起 8 个线程,并发 = 4 × 8 = 32。 合适 IO 密集应用(Python 线程 GIL 不阻塞 IO)。 ### C. gunicorn + gevent / eventlet ```bash gunicorn -w 4 -k gevent --worker-connections 1000 myapp:app ``` gevent monkey-patch socket → 每个 worker 处理 1000 并发协程。 极高并发但代码必须 monkey-patch 友好(少用 C 扩展)。 适用:Django / Flask 想要异步表现但不能完全重写。 注意:psycopg2 等 C 扩展跟 gevent 不友好,要用 psycogreen 包。 ### D. uvicorn(ASGI 推荐) ```bash uvicorn myapp:app --workers 4 --host 0.0.0.0 --port 8000 ``` uvicorn 起 4 个 worker,每个跑独立 event loop。 FastAPI / Starlette 项目默认。 ### E. gunicorn + uvicorn workers(生产 ASGI 推荐) ```bash gunicorn myapp:app -w 4 -k uvicorn.workers.UvicornWorker \ --bind 0.0.0.0:8000 \ --timeout 60 --keep-alive 5 ``` gunicorn 管理 worker 生命周期(restart 优雅、信号处理稳),uvicorn 处理 ASGI 协议。FastAPI 生产推荐这个。 为什么不直接 uvicorn?gunicorn 的进程管理更成熟(worker 数、reload 信号、 preload、failover)。uvicorn 单进程模式 dev 用就好。 ## 3. worker 数选择 CPU 密集(每个请求大量计算):`workers = CPU 核数` IO 密集 + 同步代码(Flask):`workers = 2 × CPU + 1`(部分 worker 阻塞时 另一个能接客) IO 密集 + async(FastAPI):`workers = CPU 核数`,依靠 event loop 内并发 具体数字看 profiling,不要拍脑袋。 ## 4. 内存 每个 worker 独立内存。Django / 大型 app 一个 worker 可能 200-500 MB。 8 worker × 300 MB = 2.4 GB 起步。 `--preload`:在 fork 前加载 app 代码,worker 共享只读 page → 节省内存: ```bash gunicorn --preload -w 4 myapp:app ``` 副作用:app 启动慢,且某些资源(DB connection / cache)不能在 fork 前 初始化。生产开 preload 时常踩这个坑。 ## 5. graceful restart ```bash # Reload code without dropping requests kill -HUP $(cat gunicorn.pid) ``` gunicorn 收到 HUP 重读代码,启动新 worker,等老 worker 处理完旧请求再退。 **代码部署的标准操作**。 uvicorn 单进程没有这个能力(要重启),所以生产基本是 gunicorn + uvicorn workers。 ## 6. timeout ```bash gunicorn --timeout 60 ... ``` 请求处理超过 60s worker 被 kill。默认 30s。慢 endpoint 设大一点; 但太大会让卡死的 worker 长期占资源。 uvicorn worker 类型支持 `--timeout-keep-alive`、`--limit-concurrency` 等。 ## 7. keepalive ```bash gunicorn --keep-alive 5 ... ``` 每条 TCP 连接保持 5s 等下一个请求。CDN / 反代后面建议 30-60s 减少 TCP 握手开销。 ## 8. systemd unit ```ini # /etc/systemd/system/myapp.service [Unit] Description=My FastAPI app After=network.target [Service] Type=notify User=app Group=app WorkingDirectory=/srv/myapp EnvironmentFile=/srv/myapp/.env # gunicorn + uvicorn worker ExecStart=/srv/myapp/.venv/bin/gunicorn myapp.main:app \ -w 4 -k uvicorn.workers.UvicornWorker \ --bind 127.0.0.1:8001 \ --timeout 60 --keep-alive 30 \ --access-logfile - --error-logfile - ExecReload=/bin/kill -s HUP $MAINPID KillMode=mixed TimeoutStopSec=30 Restart=on-failure RestartSec=5s [Install] WantedBy=multi-user.target ``` 注意: - `Type=notify`:gunicorn 启动好之后 notify systemd - `--access-logfile -`:日志到 stdout → systemd journal - `KillMode=mixed`:先 SIGTERM 父进程,超时再 KILL 子进程 ## 9. 前面反代(nginx) ```nginx upstream myapp { server 127.0.0.1:8001; # keepalive 减少 TCP 握手 keepalive 32; } server { location / { proxy_pass http://myapp; proxy_http_version 1.1; proxy_set_header Connection ""; proxy_set_header Host $host; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_set_header X-Forwarded-Proto $scheme; proxy_read_timeout 60s; proxy_connect_timeout 5s; } } ``` `proxy_http_version 1.1 + Connection ""` 让 nginx → gunicorn 用长连接。 ## 10. uvicorn 单独的场景 dev:`uvicorn --reload`(自动热加载 + 单进程) 小测试 / sidecar:`uvicorn --workers 2 ...` 生产不直接 uvicorn(信号处理 / preload / 优雅退出都不如 gunicorn)。 ## 11. Hypercorn / Daphne 等其它 ASGI server - **Hypercorn**:纯 Python,支持 HTTP/2 + HTTP/3 - **Daphne**:Django Channels 的官方推荐 - **Granian**:Rust 写的 ASGI,号称更快 uvicorn 是事实标准,其它有特定需求才考虑。 ## 12. asgi 与 wsgi 共存 老 Django 项目想加 async 路由:用 ASGI server + django.core.asgi.get_asgi_application(), sync 视图 / async 视图都能跑。 ## 踩过的坑 - 数据库连接没在 fork 后重建:preload + gunicorn → 所有 worker 共享一个 DB connection 导致游标错乱。`postworkerfork` hook 里重置 DB pool。 - worker 数太多:CPU 都被 context switch 吃了。8 核机器开 64 worker 是反优化。 - timeout 太短:报告 / 导出大数据的 endpoint 几十秒被 kill。慢 endpoint 设单独 timeout 或异步化(Celery)。 - 异步框架里 await 一个同步 IO(pandas / requests / boto3)→ 阻塞 event loop,其它请求全等。用 `run_in_executor` 或者换 async 库。

Python: GIL / asyncio / multiprocessing / threading 选谁

## 起因 新人经常困惑:"Python 怎么做并发?asyncio / threading / multiprocessing 都是干嘛的?什么时候用哪个?" 下面用具体场景拆开。 ## GIL 是什么 Python(CPython 实现)的 GIL(Global Interpreter Lock)让同一时刻 只有一个 thread 在跑 Python bytecode。 threading 在 CPU 密集任务上**得不到并行加速**。 CPython 3.13+ 实验性的 "free-threaded" build 移除 GIL,但默认还是有 GIL,下面假设默认。 ## 三种并发选哪个 | 场景 | 推荐 | |---|---| | IO 密集(HTTP / DB / file) + 高并发 | asyncio | | IO 密集 + 现有同步代码 + 中等并发 | threading | | CPU 密集(数学 / 加密 / 解析) | multiprocessing | | 数据科学(numpy / pandas) | 用 numpy 内部并行 | | 跑 N 个独立 task(如批处理) | concurrent.futures.ProcessPoolExecutor | ## 详细 1:asyncio (IO 密集) ```python import asyncio import aiohttp async def fetch(url): async with aiohttp.ClientSession() as s: async with s.get(url) as r: return await r.text() async def main(): urls = ['https://x.com', 'https://y.com', 'https://z.com'] results = await asyncio.gather(*[fetch(u) for u in urls]) print(len(results)) asyncio.run(main()) ``` 3 个 fetch 并发跑(不是真的并行,但 IO 等待时事件循环切换)。 适合: - web server(FastAPI / Sanic / aiohttp) - API gateway - crawler - WebSocket 服务端 GIL 不阻碍 IO,所以 asyncio 单进程能处理 10k+ 并发连接。 ## 详细 2:threading (IO 密集 + legacy code) ```python import threading import requests def fetch(url): return requests.get(url).text threads = [threading.Thread(target=fetch, args=(u,)) for u in urls] for t in threads: t.start() for t in threads: t.join() ``` 跟 asyncio 类似的 IO 并发,但用同步代码 + thread。 threading 优势: - 不用改成 async - 现有 sync code 直接用 - thread pool 简单 劣势: - 创建 thread 开销大(asyncio coroutine 几 KB / thread 几 MB) - 上下文切换贵 - 并发上限低(几百 thread vs asyncio 万级 coroutine) 实际: - Django / Flask 用 thread-based server (gunicorn sync workers): 通常够用 - 高并发 / 长连接 → 改 asyncio ## 详细 3:multiprocessing (CPU 密集) ```python from multiprocessing import Pool def cpu_heavy(n): return sum(i * i for i in range(n)) with Pool(processes=8) as pool: results = pool.map(cpu_heavy, [10_000_000] * 8) ``` 8 个 Python 进程并行跑 → 真正利用多核 CPU。 GIL 是 per-process 的。多进程绕过 GIL → 多核并行计算。 代价: - 进程启动慢(几十 ms) - 进程间通信只能 pickle(大数据传输贵) - 每进程独立 RAM(不共享 Python 对象) 适合: - 图像处理批量 - 机器学习预处理 - 复杂数学 / 解析 ## 详细 4:concurrent.futures(统一 API) ```python from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor # IO 密集 → thread with ThreadPoolExecutor(max_workers=20) as ex: results = list(ex.map(fetch_url, urls)) # CPU 密集 → process with ProcessPoolExecutor(max_workers=8) as ex: results = list(ex.map(cpu_heavy, datas)) ``` `concurrent.futures` 是 thread / process 的统一封装。 简单"batch 处理"场景首选。 ## 详细 5:用 numpy / pandas / pytorch 替代手写 CPU 并发 ```python import numpy as np # ❌ 手写循环(慢 + GIL) result = [x * 2 + y for x, y in zip(arr1, arr2)] # ✅ numpy vectorize result = arr1 * 2 + arr2 # C 层并行 + SIMD ``` NumPy / pandas / PyTorch 内部 release GIL 跑 C 代码 + 多核 SIMD。 "用对工具"比"加并发"快得多。 ## 混合:asyncio + thread pool asyncio 里需要调同步代码(如 pandas / requests): ```python import asyncio async def main(): loop = asyncio.get_running_loop() result = await loop.run_in_executor(None, blocking_function, arg) ``` `run_in_executor` 把同步函数丢线程池跑,async 继续。 小心:仍受 GIL 限制(同步函数还是单核)。CPU 密集时换 `ProcessPoolExecutor`: ```python from concurrent.futures import ProcessPoolExecutor pool = ProcessPoolExecutor() result = await loop.run_in_executor(pool, cpu_heavy, data) ``` ## 实战:crawler 需求:抓 10000 个网页 + 解析(IO 密集 + 轻量 CPU)。 ```python import asyncio import aiohttp from bs4 import BeautifulSoup async def fetch_and_parse(session, url): async with session.get(url) as r: html = await r.text() # 解析在主线程(轻量) soup = BeautifulSoup(html, 'html.parser') return soup.title.string if soup.title else '' async def main(urls): semaphore = asyncio.Semaphore(50) # 限并发 async with aiohttp.ClientSession() as session: async def bounded(url): async with semaphore: return await fetch_and_parse(session, url) results = await asyncio.gather(*[bounded(u) for u in urls]) return results asyncio.run(main(urls)) ``` 50 并发同时跑 → 10000 URL 几分钟完成。 单 Python 进程,内存几百 MB。 如果解析很重(NLP / image),把解析丢 process pool: ```python async def fetch_and_dispatch(session, url, pool): async with session.get(url) as r: html = await r.text() loop = asyncio.get_running_loop() # 解析跑 process pool(绕过 GIL) return await loop.run_in_executor(pool, heavy_parse, html) ``` ## 实战:图像批量处理 CPU 密集 → multiprocessing: ```python from multiprocessing import Pool from PIL import Image def process(path): img = Image.open(path) img.thumbnail((800, 800)) img.save(path.replace('.jpg', '_thumb.jpg')) if __name__ == '__main__': with Pool(processes=8) as pool: pool.map(process, image_paths) ``` `if __name__ == '__main__':` 必须(multiprocessing fork 模式要)。 8 核机器 ~ 8x 加速。 ## free-threaded Python (3.13t) Python 3.13 引入 `--disable-gil` build: ```bash # 装 free-threaded version uv python install 3.13t ``` 理论上 threading 在 CPU 密集任务能并行。 但: - 实验性 + 慢一些(单线程慢 10-20%) - C extension 兼容性问题(很多包还没支持) - 几年内还不是默认 生产暂时仍用 GIL build + multiprocessing。 ## 性能对比(10000 URL fetch) | 方法 | 时间 | 内存 | |---|---|---| | sync requests + for loop | 30 min | 50 MB | | threading + ThreadPool(20) | 3 min | 200 MB | | threading + ThreadPool(200) | 1.5 min | 800 MB | | asyncio + aiohttp + sem(50) | 2 min | 150 MB | | asyncio + aiohttp + sem(200) | 45s | 250 MB | | multiprocessing | 没意义(IO 任务) | - | IO 任务:asyncio 全胜。 CPU 任务(10000 张图缩略): | 方法 | 时间 | |---|---| | sync for loop | 50 min | | threading | 48 min(GIL,几乎没加速) | | multiprocessing(8) | 8 min | | numpy vectorize 改写(若适用) | 5 min | ## 选型决策树 ``` 你的任务是? ├── IO 密集(网络 / 文件 / DB) │ ├── 高并发(万级)→ asyncio │ ├── 已有 sync code 不想改 → threading + ThreadPoolExecutor │ └── 简单批处理 → concurrent.futures.ThreadPoolExecutor │ ├── CPU 密集 │ ├── 数学 / 矩阵 → numpy / pytorch(向量化) │ └── 一般计算 → multiprocessing / ProcessPoolExecutor │ └── 混合 └── asyncio + run_in_executor(ProcessPoolExecutor) ``` ## 踩过的坑 1. **threading + requests**:默认 `requests.Session` 不 thread-safe。 每线程独立 session 或用 `aiohttp` async。 2. **multiprocessing + fork**:Linux fork 复制整个进程 → 大数据 in parent 也复制 → 内存爆。改 `spawn` 或者把 data 写文件 worker 读。 3. **asyncio 里调 `time.sleep(5)`** → 阻塞整个 event loop。 `await asyncio.sleep(5)`。 4. **mixed event loop**:多个 `asyncio.run` 嵌套 → 报"event loop already running"。一般 `asyncio.run` 只在 main 调一次。 5. **multiprocessing on Windows / Mac**:spawn 模式要求 worker 函数 可 pickle + 子进程重新 import module → 用 `if __name__ == '__main__':` 保护。