知识广场
按学科筛选:计算机科学 / 后端开发
«计算机科学 / 后端开发» 分类下共 35 篇帖子
Python web 应用部署最常见的问题:worker 数怎么定?sync 还是 async? gunicorn vs uvicorn 选哪个?下面讲清楚。 ## 1. WSGI vs ASGI - **WSGI**:传统同步接口(Flask / Django 默认) - **ASGI**:异步接口(FastAPI / Starlette / Django 4+ async) WSGI 接口里每个请求一个线程同步处理;ASGI 一个 event loop 协程并发。 ## 2. 几种部署组合 ### A. gunicorn + sync workers(WSGI 经典) ```bash gunicorn -w 4 myapp:app ``` `-w 4` 起 4 个 worker 进程,每个进程同步处理一个请求。 WSGI app(Flask / Django)的默认。 并发上限 = workers 数。worker 数建议 `2 × CPU + 1`。 ### B. gunicorn + gthread ```bash gunicorn -w 4 --threads 8 myapp:app ``` 每个 worker 起 8 个线程,并发 = 4 × 8 = 32。 合适 IO 密集应用(Python 线程 GIL 不阻塞 IO)。 ### C. gunicorn + gevent / eventlet ```bash gunicorn -w 4 -k gevent --worker-connections 1000 myapp:app ``` gevent monkey-patch socket → 每个 worker 处理 1000 并发协程。 极高并发但代码必须 monkey-patch 友好(少用 C 扩展)。 适用:Django / Flask 想要异步表现但不能完全重写。 注意:psycopg2 等 C 扩展跟 gevent 不友好,要用 psycogreen 包。 ### D. uvicorn(ASGI 推荐) ```bash uvicorn myapp:app --workers 4 --host 0.0.0.0 --port 8000 ``` uvicorn 起 4 个 worker,每个跑独立 event loop。 FastAPI / Starlette 项目默认。 ### E. gunicorn + uvicorn workers(生产 ASGI 推荐) ```bash gunicorn myapp:app -w 4 -k uvicorn.workers.UvicornWorker \ --bind 0.0.0.0:8000 \ --timeout 60 --keep-alive 5 ``` gunicorn 管理 worker 生命周期(restart 优雅、信号处理稳),uvicorn 处理 ASGI 协议。FastAPI 生产推荐这个。 为什么不直接 uvicorn?gunicorn 的进程管理更成熟(worker 数、reload 信号、 preload、failover)。uvicorn 单进程模式 dev 用就好。 ## 3. worker 数选择 CPU 密集(每个请求大量计算):`workers = CPU 核数` IO 密集 + 同步代码(Flask):`workers = 2 × CPU + 1`(部分 worker 阻塞时 另一个能接客) IO 密集 + async(FastAPI):`workers = CPU 核数`,依靠 event loop 内并发 具体数字看 profiling,不要拍脑袋。 ## 4. 内存 每个 worker 独立内存。Django / 大型 app 一个 worker 可能 200-500 MB。 8 worker × 300 MB = 2.4 GB 起步。 `--preload`:在 fork 前加载 app 代码,worker 共享只读 page → 节省内存: ```bash gunicorn --preload -w 4 myapp:app ``` 副作用:app 启动慢,且某些资源(DB connection / cache)不能在 fork 前 初始化。生产开 preload 时常踩这个坑。 ## 5. graceful restart ```bash # Reload code without dropping requests kill -HUP $(cat gunicorn.pid) ``` gunicorn 收到 HUP 重读代码,启动新 worker,等老 worker 处理完旧请求再退。 **代码部署的标准操作**。 uvicorn 单进程没有这个能力(要重启),所以生产基本是 gunicorn + uvicorn workers。 ## 6. timeout ```bash gunicorn --timeout 60 ... ``` 请求处理超过 60s worker 被 kill。默认 30s。慢 endpoint 设大一点; 但太大会让卡死的 worker 长期占资源。 uvicorn worker 类型支持 `--timeout-keep-alive`、`--limit-concurrency` 等。 ## 7. keepalive ```bash gunicorn --keep-alive 5 ... ``` 每条 TCP 连接保持 5s 等下一个请求。CDN / 反代后面建议 30-60s 减少 TCP 握手开销。 ## 8. systemd unit ```ini # /etc/systemd/system/myapp.service [Unit] Description=My FastAPI app After=network.target [Service] Type=notify User=app Group=app WorkingDirectory=/srv/myapp EnvironmentFile=/srv/myapp/.env # gunicorn + uvicorn worker ExecStart=/srv/myapp/.venv/bin/gunicorn myapp.main:app \ -w 4 -k uvicorn.workers.UvicornWorker \ --bind 127.0.0.1:8001 \ --timeout 60 --keep-alive 30 \ --access-logfile - --error-logfile - ExecReload=/bin/kill -s HUP $MAINPID KillMode=mixed TimeoutStopSec=30 Restart=on-failure RestartSec=5s [Install] WantedBy=multi-user.target ``` 注意: - `Type=notify`:gunicorn 启动好之后 notify systemd - `--access-logfile -`:日志到 stdout → systemd journal - `KillMode=mixed`:先 SIGTERM 父进程,超时再 KILL 子进程 ## 9. 前面反代(nginx) ```nginx upstream myapp { server 127.0.0.1:8001; # keepalive 减少 TCP 握手 keepalive 32; } server { location / { proxy_pass http://myapp; proxy_http_version 1.1; proxy_set_header Connection ""; proxy_set_header Host $host; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_set_header X-Forwarded-Proto $scheme; proxy_read_timeout 60s; proxy_connect_timeout 5s; } } ``` `proxy_http_version 1.1 + Connection ""` 让 nginx → gunicorn 用长连接。 ## 10. uvicorn 单独的场景 dev:`uvicorn --reload`(自动热加载 + 单进程) 小测试 / sidecar:`uvicorn --workers 2 ...` 生产不直接 uvicorn(信号处理 / preload / 优雅退出都不如 gunicorn)。 ## 11. Hypercorn / Daphne 等其它 ASGI server - **Hypercorn**:纯 Python,支持 HTTP/2 + HTTP/3 - **Daphne**:Django Channels 的官方推荐 - **Granian**:Rust 写的 ASGI,号称更快 uvicorn 是事实标准,其它有特定需求才考虑。 ## 12. asgi 与 wsgi 共存 老 Django 项目想加 async 路由:用 ASGI server + django.core.asgi.get_asgi_application(), sync 视图 / async 视图都能跑。 ## 踩过的坑 - 数据库连接没在 fork 后重建:preload + gunicorn → 所有 worker 共享一个 DB connection 导致游标错乱。`postworkerfork` hook 里重置 DB pool。 - worker 数太多:CPU 都被 context switch 吃了。8 核机器开 64 worker 是反优化。 - timeout 太短:报告 / 导出大数据的 endpoint 几十秒被 kill。慢 endpoint 设单独 timeout 或异步化(Celery)。 - 异步框架里 await 一个同步 IO(pandas / requests / boto3)→ 阻塞 event loop,其它请求全等。用 `run_in_executor` 或者换 async 库。
## 起因 应用要异步处理 / 解耦 / 削峰,需要消息中间件。 候选: - **Kafka**:日志型,高吞吐,持久化 - **RabbitMQ**:经典 broker,AMQP,灵活路由 - **NATS / NATS JetStream**:现代轻量,Go 写 - **SQS / Pub/Sub**:云托管 下面对比关键差异 + 各自适合的场景。 ## Kafka ``` [Producer] → topic (partition × N) → [Consumer Group] ↓ 持久化到 disk (broker × N,replicated) ``` - 消息是 append-only log,consumer 维护 offset - partition 是并行度(一个 partition 同时一个 consumer instance) - replication 跨 broker → HA - 默认保留消息几天到几周(retention 配置) ### 用法 ```python # Producer (confluent-kafka) from confluent_kafka import Producer p = Producer({'bootstrap.servers': 'kafka:9092'}) p.produce('orders', key='user-1', value='{"action": "buy"}') p.flush() # Consumer from confluent_kafka import Consumer c = Consumer({ 'bootstrap.servers': 'kafka:9092', 'group.id': 'order-processors', 'auto.offset.reset': 'earliest', }) c.subscribe(['orders']) while True: msg = c.poll(1.0) if msg: print(msg.value()) c.commit(msg) ``` ### 优势 - 极高吞吐(百万 msg/s/broker) - 重放历史(offset 任意 reset) - 分布式天生 HA - 流处理生态强(Kafka Streams / Flink / ksqlDB) ### 劣势 - 操作复杂(broker + zookeeper / KRaft + partition 设计) - 资源消耗大(broker JVM 4 GB+) - 不擅长 priority / per-message TTL / 复杂路由 - 消费速度受 partition 数限制 ## RabbitMQ ``` [Producer] → [Exchange] → routing → [Queue × N] → [Consumer] ``` - 经典 broker 模型(push-based) - AMQP 协议,丰富 routing(direct / fanout / topic / headers) - 消息消费后默认删除(除非 stream queue) - 单 broker 几万 msg/s ### 用法 ```python import pika connection = pika.BlockingConnection(pika.ConnectionParameters('rabbitmq')) channel = connection.channel() channel.queue_declare(queue='orders', durable=True) # Producer channel.basic_publish(exchange='', routing_key='orders', body='hello', properties=pika.BasicProperties(delivery_mode=2)) # 持久化 # Consumer def callback(ch, method, properties, body): print(body) ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_consume(queue='orders', on_message_callback=callback) channel.start_consuming() ``` ### 优势 - 灵活路由(topic exchange `order.*.created`) - per-message TTL / priority - 死信队列原生 - 部署简单(单 Erlang 进程) - 消费 ack/nack / retry 模型完善 ### 劣势 - 吞吐 < Kafka - 不擅长 replay(消费后消息没了) - HA cluster 配置棘手(mirror / quorum queue) - 老版本运维痛点多 ## NATS / JetStream ``` [Publisher] → subject → [Subscriber] (JetStream 模式可持久化) ``` - Go 写,极轻量(单 binary 30 MB) - subject-based(类似 Kafka topic 但更灵活) - core NATS:fire-and-forget(极快) - JetStream:持久化 + replication(Kafka 类似 semantics) ### 用法 ```python import asyncio import nats async def main(): nc = await nats.connect('nats://localhost:4222') js = nc.jetstream() await js.add_stream(name='ORDERS', subjects=['orders.>']) # publish await js.publish('orders.created', b'hello') # consume sub = await js.subscribe('orders.>', durable='processor') async for msg in sub.messages: print(msg.data) await msg.ack() asyncio.run(main()) ``` ### 优势 - 极轻量 + 易部署(单 binary,跨平台) - 性能好(百万 msg/s core,几十万 jetstream) - subject wildcard 路由 - 现代设计(gRPC 风格) ### 劣势 - 生态比 Kafka / RMQ 小 - 工具链 / 监控比较新 - 跨 cluster 复制弱(不像 Kafka MirrorMaker 成熟) ## 横向对比 | | Kafka | RabbitMQ | NATS JetStream | SQS | |---|---|---|---|---| | 吞吐 | 极高 | 中 | 高 | 中 | | 持久化 | 默认 | 可选 | 可选 | 默认 | | 路由 | partition+key | 丰富 (AMQP) | subject wildcard | queue | | 顺序 | partition 内 | queue 内 | stream 内 | FIFO queue 内 | | Replay | ✅ | 弱 | ✅ | ❌ | | HA | 强(replication) | 中(cluster) | 强 | 托管 | | 部署 | 复杂 | 简单 | 极简 | 0 | | 资源 | 大 | 中 | 小 | 0 | | 适合 | 日志 / 流 / 大规模 | 任务队列 / RPC | 现代云 / 服务网格 | 简单异步 | ## 推荐 - **任务队列(celery 类)** → RabbitMQ 或 Redis Streams - **日志聚合 / event sourcing** → Kafka - **微服务消息总线 + 现代云原生** → NATS - **简单异步 + 不想运维** → SQS / Pub/Sub - **数据 pipeline ETL** → Kafka - **极致吞吐(百万 QPS)** → Kafka ## 我的实际选择 中小项目(< 100k msg/s): - **Celery + Redis**:Django 任务队列首选,0 引入 - **NATS JetStream**:微服务通信,轻量 大项目 / 数据 pipeline: - **Kafka**:必备 ## 真实 case 1:Celery → Kafka 迁移 老 Celery + Redis 跑 task queue。规模大了: - 任务 50k/s 起 Redis 撑不住 - 失败任务重放难 - 监控弱 改 Kafka: - 任务塞 topic - consumer group 处理 - 失败 → 进 retry topic(带 delay)→ DLQ - 重放 / 重处理一行命令 挑战: - partition 设计(保证某 user 的 task 顺序) - consumer rebalance 期间 short downtime 但 throughput + observability 大改善。 ## 真实 case 2:RabbitMQ 路由 某客户系统:"订单创建" event 要通知: - 库存(扣库存) - 财务(开发票) - 物流(准备发货) - analytics(统计) RabbitMQ topic exchange: ```python # producer channel.basic_publish(exchange='order_events', routing_key='order.created.us', body=...) # 4 consumer 各自 bind channel.queue_bind('inventory_queue', 'order_events', 'order.created.*') channel.queue_bind('finance_queue', 'order_events', 'order.created.*') channel.queue_bind('logistics_queue', 'order_events', 'order.created.us') channel.queue_bind('analytics_queue', 'order_events', 'order.*') ``` 灵活订阅,新 consumer 加入无需 producer 改动。 Kafka 也能做但要 topic 设计多副本 + 自管 routing。 ## 与 outbox pattern 配 应用先写 DB + outbox 表 → background job 读 outbox → 发到 MQ。 保证 DB 写成功 + MQ 发出一致(事务安全)。 debezium 等 CDC 工具自动监听 PG WAL → Kafka,省手写。 ## 不要为了用而用 很多场景: - 单 DB + 简单异步:cron job + DB queue 表 - 微服务通信:HTTP / gRPC(不必每次 async) - 通知:直接 webhook MQ 增加运维 + 复杂度。确认真的需要 async / decouple / 高吞吐 / replay 再上。 ## 踩过的坑 1. **Kafka partition 数选错**:partition 是并行度上限。设 4,consumer 增到 8 后 4 个 idle。partition 加容易减难,初始就设大些(如 32+)。 2. **RabbitMQ 队列 unbounded**:consumer 慢 + producer 快 → 队列长几 亿条 → broker OOM。设 `x-max-length` 或 `x-message-ttl` 限。 3. **NATS JetStream stream 配错**:retention 设短 / max bytes 小 → 消息被驱逐。production 前 stress test。 4. **at-least-once 重复处理**:所有 MQ 都是 at-least-once。consumer 必须幂等(用 message id 去重)。 5. **monitoring 缺**:MQ 死了应用一段时间没察觉。最少监控 lag / queue size / consumer alive。
## 起因 新人经常困惑:"Python 怎么做并发?asyncio / threading / multiprocessing 都是干嘛的?什么时候用哪个?" 下面用具体场景拆开。 ## GIL 是什么 Python(CPython 实现)的 GIL(Global Interpreter Lock)让同一时刻 只有一个 thread 在跑 Python bytecode。 threading 在 CPU 密集任务上**得不到并行加速**。 CPython 3.13+ 实验性的 "free-threaded" build 移除 GIL,但默认还是有 GIL,下面假设默认。 ## 三种并发选哪个 | 场景 | 推荐 | |---|---| | IO 密集(HTTP / DB / file) + 高并发 | asyncio | | IO 密集 + 现有同步代码 + 中等并发 | threading | | CPU 密集(数学 / 加密 / 解析) | multiprocessing | | 数据科学(numpy / pandas) | 用 numpy 内部并行 | | 跑 N 个独立 task(如批处理) | concurrent.futures.ProcessPoolExecutor | ## 详细 1:asyncio (IO 密集) ```python import asyncio import aiohttp async def fetch(url): async with aiohttp.ClientSession() as s: async with s.get(url) as r: return await r.text() async def main(): urls = ['https://x.com', 'https://y.com', 'https://z.com'] results = await asyncio.gather(*[fetch(u) for u in urls]) print(len(results)) asyncio.run(main()) ``` 3 个 fetch 并发跑(不是真的并行,但 IO 等待时事件循环切换)。 适合: - web server(FastAPI / Sanic / aiohttp) - API gateway - crawler - WebSocket 服务端 GIL 不阻碍 IO,所以 asyncio 单进程能处理 10k+ 并发连接。 ## 详细 2:threading (IO 密集 + legacy code) ```python import threading import requests def fetch(url): return requests.get(url).text threads = [threading.Thread(target=fetch, args=(u,)) for u in urls] for t in threads: t.start() for t in threads: t.join() ``` 跟 asyncio 类似的 IO 并发,但用同步代码 + thread。 threading 优势: - 不用改成 async - 现有 sync code 直接用 - thread pool 简单 劣势: - 创建 thread 开销大(asyncio coroutine 几 KB / thread 几 MB) - 上下文切换贵 - 并发上限低(几百 thread vs asyncio 万级 coroutine) 实际: - Django / Flask 用 thread-based server (gunicorn sync workers): 通常够用 - 高并发 / 长连接 → 改 asyncio ## 详细 3:multiprocessing (CPU 密集) ```python from multiprocessing import Pool def cpu_heavy(n): return sum(i * i for i in range(n)) with Pool(processes=8) as pool: results = pool.map(cpu_heavy, [10_000_000] * 8) ``` 8 个 Python 进程并行跑 → 真正利用多核 CPU。 GIL 是 per-process 的。多进程绕过 GIL → 多核并行计算。 代价: - 进程启动慢(几十 ms) - 进程间通信只能 pickle(大数据传输贵) - 每进程独立 RAM(不共享 Python 对象) 适合: - 图像处理批量 - 机器学习预处理 - 复杂数学 / 解析 ## 详细 4:concurrent.futures(统一 API) ```python from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor # IO 密集 → thread with ThreadPoolExecutor(max_workers=20) as ex: results = list(ex.map(fetch_url, urls)) # CPU 密集 → process with ProcessPoolExecutor(max_workers=8) as ex: results = list(ex.map(cpu_heavy, datas)) ``` `concurrent.futures` 是 thread / process 的统一封装。 简单"batch 处理"场景首选。 ## 详细 5:用 numpy / pandas / pytorch 替代手写 CPU 并发 ```python import numpy as np # ❌ 手写循环(慢 + GIL) result = [x * 2 + y for x, y in zip(arr1, arr2)] # ✅ numpy vectorize result = arr1 * 2 + arr2 # C 层并行 + SIMD ``` NumPy / pandas / PyTorch 内部 release GIL 跑 C 代码 + 多核 SIMD。 "用对工具"比"加并发"快得多。 ## 混合:asyncio + thread pool asyncio 里需要调同步代码(如 pandas / requests): ```python import asyncio async def main(): loop = asyncio.get_running_loop() result = await loop.run_in_executor(None, blocking_function, arg) ``` `run_in_executor` 把同步函数丢线程池跑,async 继续。 小心:仍受 GIL 限制(同步函数还是单核)。CPU 密集时换 `ProcessPoolExecutor`: ```python from concurrent.futures import ProcessPoolExecutor pool = ProcessPoolExecutor() result = await loop.run_in_executor(pool, cpu_heavy, data) ``` ## 实战:crawler 需求:抓 10000 个网页 + 解析(IO 密集 + 轻量 CPU)。 ```python import asyncio import aiohttp from bs4 import BeautifulSoup async def fetch_and_parse(session, url): async with session.get(url) as r: html = await r.text() # 解析在主线程(轻量) soup = BeautifulSoup(html, 'html.parser') return soup.title.string if soup.title else '' async def main(urls): semaphore = asyncio.Semaphore(50) # 限并发 async with aiohttp.ClientSession() as session: async def bounded(url): async with semaphore: return await fetch_and_parse(session, url) results = await asyncio.gather(*[bounded(u) for u in urls]) return results asyncio.run(main(urls)) ``` 50 并发同时跑 → 10000 URL 几分钟完成。 单 Python 进程,内存几百 MB。 如果解析很重(NLP / image),把解析丢 process pool: ```python async def fetch_and_dispatch(session, url, pool): async with session.get(url) as r: html = await r.text() loop = asyncio.get_running_loop() # 解析跑 process pool(绕过 GIL) return await loop.run_in_executor(pool, heavy_parse, html) ``` ## 实战:图像批量处理 CPU 密集 → multiprocessing: ```python from multiprocessing import Pool from PIL import Image def process(path): img = Image.open(path) img.thumbnail((800, 800)) img.save(path.replace('.jpg', '_thumb.jpg')) if __name__ == '__main__': with Pool(processes=8) as pool: pool.map(process, image_paths) ``` `if __name__ == '__main__':` 必须(multiprocessing fork 模式要)。 8 核机器 ~ 8x 加速。 ## free-threaded Python (3.13t) Python 3.13 引入 `--disable-gil` build: ```bash # 装 free-threaded version uv python install 3.13t ``` 理论上 threading 在 CPU 密集任务能并行。 但: - 实验性 + 慢一些(单线程慢 10-20%) - C extension 兼容性问题(很多包还没支持) - 几年内还不是默认 生产暂时仍用 GIL build + multiprocessing。 ## 性能对比(10000 URL fetch) | 方法 | 时间 | 内存 | |---|---|---| | sync requests + for loop | 30 min | 50 MB | | threading + ThreadPool(20) | 3 min | 200 MB | | threading + ThreadPool(200) | 1.5 min | 800 MB | | asyncio + aiohttp + sem(50) | 2 min | 150 MB | | asyncio + aiohttp + sem(200) | 45s | 250 MB | | multiprocessing | 没意义(IO 任务) | - | IO 任务:asyncio 全胜。 CPU 任务(10000 张图缩略): | 方法 | 时间 | |---|---| | sync for loop | 50 min | | threading | 48 min(GIL,几乎没加速) | | multiprocessing(8) | 8 min | | numpy vectorize 改写(若适用) | 5 min | ## 选型决策树 ``` 你的任务是? ├── IO 密集(网络 / 文件 / DB) │ ├── 高并发(万级)→ asyncio │ ├── 已有 sync code 不想改 → threading + ThreadPoolExecutor │ └── 简单批处理 → concurrent.futures.ThreadPoolExecutor │ ├── CPU 密集 │ ├── 数学 / 矩阵 → numpy / pytorch(向量化) │ └── 一般计算 → multiprocessing / ProcessPoolExecutor │ └── 混合 └── asyncio + run_in_executor(ProcessPoolExecutor) ``` ## 踩过的坑 1. **threading + requests**:默认 `requests.Session` 不 thread-safe。 每线程独立 session 或用 `aiohttp` async。 2. **multiprocessing + fork**:Linux fork 复制整个进程 → 大数据 in parent 也复制 → 内存爆。改 `spawn` 或者把 data 写文件 worker 读。 3. **asyncio 里调 `time.sleep(5)`** → 阻塞整个 event loop。 `await asyncio.sleep(5)`。 4. **mixed event loop**:多个 `asyncio.run` 嵌套 → 报"event loop already running"。一般 `asyncio.run` 只在 main 调一次。 5. **multiprocessing on Windows / Mac**:spawn 模式要求 worker 函数 可 pickle + 子进程重新 import module → 用 `if __name__ == '__main__':` 保护。
## 起因 Go 写数据库代码两条路: - 用 ORM(GORM):自动 query 但隐藏 SQL;复杂 join 难表达 - 手写 sql.DB:每条 query 自己写、scanner 自己 typed、容易写错 `sqlc` 是另一条路:写纯 SQL,工具生成完全类型化的 Go 函数。 两者优点都有:SQL 完全可控 + Go 调用类型安全。 ## 解决方案 ### 1. 装 ```bash go install github.com/sqlc-dev/sqlc/cmd/sqlc@latest # 或 brew install sqlc / Docker ``` ### 2. 配 sqlc.yaml ```yaml version: "2" sql: - engine: "postgresql" queries: "db/query.sql" schema: "db/schema.sql" gen: go: package: "db" out: "internal/db" sql_package: "pgx/v5" # 或 "database/sql" ``` ### 3. 写 schema.sql ```sql -- db/schema.sql CREATE TABLE users ( id BIGSERIAL PRIMARY KEY, email TEXT NOT NULL UNIQUE, nickname TEXT NOT NULL, created_at TIMESTAMPTZ NOT NULL DEFAULT now() ); CREATE TABLE posts ( id BIGSERIAL PRIMARY KEY, user_id BIGINT NOT NULL REFERENCES users(id) ON DELETE CASCADE, title TEXT NOT NULL, body TEXT NOT NULL, published_at TIMESTAMPTZ ); ``` ### 4. 写 query.sql ```sql -- db/query.sql -- name: GetUser :one SELECT * FROM users WHERE id = $1; -- name: GetUserByEmail :one SELECT * FROM users WHERE email = $1; -- name: ListUsers :many SELECT * FROM users ORDER BY created_at DESC LIMIT $1 OFFSET $2; -- name: CreateUser :one INSERT INTO users (email, nickname) VALUES ($1, $2) RETURNING *; -- name: UpdateUserNickname :exec UPDATE users SET nickname = $2 WHERE id = $1; -- name: DeleteUser :exec DELETE FROM users WHERE id = $1; -- name: ListUserPosts :many SELECT p.*, u.nickname AS author_nickname FROM posts p JOIN users u ON u.id = p.user_id WHERE p.user_id = $1 ORDER BY p.published_at DESC LIMIT $2 OFFSET $3; -- name: CountPostsByUser :one SELECT count(*) FROM posts WHERE user_id = $1; ``` `-- name: X :one|many|exec|execrows` 告诉 sqlc 生成什么类型函数: - `:one` 返回单行 - `:many` 返回多行 - `:exec` 不返回(INSERT/UPDATE/DELETE) - `:execrows` 返回影响行数 ### 5. 生成 ```bash sqlc generate ``` `internal/db/` 下生成: ``` internal/db/ ├── db.go # 接口 + Queries struct ├── models.go # 表 → Go struct └── query.sql.go # 每个 query → Go 函数 ``` `models.go`: ```go type User struct { ID int64 Email string Nickname string CreatedAt time.Time } type Post struct { ID int64 UserID int64 Title string Body string PublishedAt pgtype.Timestamptz } ``` `query.sql.go`: ```go const getUser = `-- name: GetUser :one SELECT id, email, nickname, created_at FROM users WHERE id = $1 ` func (q *Queries) GetUser(ctx context.Context, id int64) (User, error) { row := q.db.QueryRow(ctx, getUser, id) var u User err := row.Scan(&u.ID, &u.Email, &u.Nickname, &u.CreatedAt) return u, err } ``` 类型完全推断自 schema。 ### 6. 用 ```go import ( "context" "github.com/jackc/pgx/v5/pgxpool" "myapp/internal/db" ) func main() { ctx := context.Background() pool, _ := pgxpool.New(ctx, "postgresql://localhost/myapp") defer pool.Close() queries := db.New(pool) // 创建 user, err := queries.CreateUser(ctx, db.CreateUserParams{ Email: "[email protected]", Nickname: "Alice", }) // 查 u, _ := queries.GetUserByEmail(ctx, "[email protected]") // 列表 posts, _ := queries.ListUserPosts(ctx, db.ListUserPostsParams{ UserID: u.ID, Limit: 20, Offset: 0, }) for _, p := range posts { fmt.Println(p.Title, p.AuthorNickname) } } ``` 写错列名 / 类型 / 参数 → 编译报错。重构 schema 后 `sqlc generate` 所有不兼容的 query 调用都立刻被编译器抓出。 ### 7. JOIN 出来的"虚拟表"自动生成 struct ```sql -- name: ListUserPosts :many SELECT p.*, u.nickname AS author_nickname FROM posts p JOIN users u ON u.id = p.user_id WHERE p.user_id = $1; ``` 生成: ```go type ListUserPostsRow struct { ID int64 UserID int64 Title string Body string PublishedAt pgtype.Timestamptz AuthorNickname string } func (q *Queries) ListUserPosts(...) ([]ListUserPostsRow, error) { ... } ``` 新结构体自动产生,不需要自己定义 DTO。 ### 8. transaction ```go tx, err := pool.Begin(ctx) defer tx.Rollback(ctx) qtx := queries.WithTx(tx) user, err := qtx.CreateUser(ctx, ...) _, err = qtx.CreateProfile(ctx, ...) return tx.Commit(ctx) ``` `WithTx(tx)` 返回绑定到事务的 queries 实例。所有调用都在同事务。 ### 9. 跑迁移 sqlc 不管迁移。配合 [goose](https://github.com/pressly/goose) / [golang-migrate](https://github.com/golang-migrate/migrate) / [atlas](https://atlasgo.io): ```bash goose -dir migrations postgres "postgresql://..." up ``` `migrations/0001_init.sql`: ```sql -- +goose Up CREATE TABLE users (...); -- +goose Down DROP TABLE users; ``` sqlc 的 schema.sql 通常是迁移结果的"快照"(开发时方便)。 生产以 migration 序列为准。 ### 10. 动态 query sqlc 主要服务于"静态 SQL"。动态 WHERE / ORDER BY 灵活性差。 解决: - 简单分页 / 排序:参数化 `LIMIT $1 OFFSET $2` - 可选 filter:`WHERE (@email::text IS NULL OR email = @email)` - 真正动态:用 squirrel / goqu 等 query builder,sqlc 处理静态部分 ## 与 GORM / ent 对比 | | GORM | ent | sqlc | |---|---|---|---| | 哲学 | ORM 全自动 | schema-first ORM | 写 SQL,生成 type-safe 代码 | | 学习曲线 | 低 | 中 | 极低(你已经会 SQL) | | 性能 | 中(reflection) | 高 | 高(直接 SQL) | | 复杂 JOIN | 难表达 | 好 | 自然写 SQL | | 类型安全 | 弱 | 强 | 强 | | migration | 内置 | 内置 | 需配合工具 | 我的取向:业务大量 SQL → sqlc;CRUD 简单 → GORM 也行;schema 变化 频繁 + 业务复杂 → ent。 ## 效果 我们 5 万行 Go 代码用 sqlc 替代手写 sql.DB.Query: - DB 相关 bug 减少 80%(编译期捕获) - 重构 schema 时编译器告诉所有要改的地方 - 团队新人 onboarding:会 SQL 就能写,不需要学 ORM 语法 - 性能比 GORM 快 ~30%(无 reflection / 直接 prepared statement) ## 踩过的坑 1. **nullable 列变 pgtype**:column `NULL` 在 Go 是 `pgtype.Text` / `pgtype.Int4` 等,不是 string / int。 `String.Valid` 字段判 null。 `SET sqlc.go.emit_pointers_for_null_types = true` 改用 `*string` 等。 2. **改 schema 后忘 sqlc generate**:编译失败但不知道为啥。 把 `sqlc generate` 加进 `go generate` + Makefile / justfile。 3. **复杂 query plan 看不见**:写了个 SQL 跑很慢,没人在 ORM 层 优化。`EXPLAIN ANALYZE` 是基本功 —— ORM 怎么也帮不上忙。 4. **time.Time vs pgtype.Timestamptz**:默认 timestamptz 列生成 `pgtype.Timestamptz`。`Time` 字段拿值,`Valid` 判 null。 设 `emit_exact_table_names = true` 让生成名跟列对应。 5. **JOIN 列名冲突**:两表都有 `id` 列 → 生成 struct 字段冲突。 SQL 里手动 alias:`SELECT p.id AS post_id, u.id AS user_id ...`。
REST 仍是默认选择,但有些场景 GraphQL 更合适。下面讲两者的取舍 + 最小骨架。 ## 哲学差异 - **REST**:服务端定义资源 + 端点,前端按需调用多个端点 - **GraphQL**:服务端定义 schema + resolver,前端按需 query 任意字段 ## 谁该选 GraphQL - 移动端 / 弱网:一次请求拿所有数据,比串多个 REST 快 - 复杂前端(仪表盘 / 列表 + 详情):避免 over-fetch / under-fetch - 多客户端(web / iOS / android)字段需求差异大 - 公共 API(GitHub / Shopify):用户按需 query ## 谁该选 REST - 简单 CRUD - 强缓存需求(HTTP cache 直接生效) - 流式 / 文件上传下载(REST 更自然) - 团队熟悉度 ## 1. REST 最小骨架 (FastAPI) ```python from fastapi import FastAPI from pydantic import BaseModel app = FastAPI() class User(BaseModel): id: int name: str email: str @app.get('/users/{uid}', response_model=User) def get_user(uid: int): ... @app.get('/users/{uid}/posts') def list_user_posts(uid: int): ... @app.post('/users', response_model=User, status_code=201) def create_user(payload: CreateUser): ... ``` 每个资源一组端点:GET / POST / PATCH / DELETE。 返回固定字段集合。 前端要拿"用户 + 他的 5 篇最新帖子": ```js // REST: 2 个请求 (worst case 串行) const user = await fetch('/users/42').then(r => r.json()) const posts = await fetch('/users/42/posts?limit=5').then(r => r.json()) ``` ## 2. GraphQL 最小骨架 (Strawberry, Python) ```bash uv add 'strawberry-graphql[fastapi]' ``` ```python import strawberry from typing import List @strawberry.type class Post: id: int title: str body: str @strawberry.type class User: id: int name: str email: str @strawberry.field def posts(self, limit: int = 5) -> List[Post]: return db.get_user_posts(self.id, limit) @strawberry.type class Query: @strawberry.field def user(self, id: int) -> User: return db.get_user(id) schema = strawberry.Schema(query=Query) from strawberry.fastapi import GraphQLRouter from fastapi import FastAPI app = FastAPI() app.include_router(GraphQLRouter(schema), prefix='/graphql') ``` 启动后浏览器打开 `http://localhost:8000/graphql`,自带 Playground IDE。 前端 query: ```graphql query { user(id: 42) { id name posts(limit: 5) { id title } } } ``` 一个请求拿全部,不传 email / body 等不需要的字段。 ## 3. 避免 N+1 query:DataLoader GraphQL 最大坑是 resolver 一对多时 N+1: ```python @strawberry.type class Post: author_id: int @strawberry.field def author(self) -> User: return db.get_user(self.author_id) # 每篇帖子都查一次 ``` 20 篇帖子 → 1 (list posts) + 20 (各自 author) = 21 次 DB 查询。 DataLoader 模式 batch + cache: ```python from strawberry.dataloader import DataLoader async def load_users(keys: List[int]) -> List[User]: rows = db.get_users_in(keys) by_id = {u.id: u for u in rows} return [by_id[k] for k in keys] user_loader = DataLoader(load_fn=load_users) @strawberry.type class Post: author_id: int @strawberry.field async def author(self) -> User: return await user_loader.load(self.author_id) ``` 20 篇帖子的 author 字段被合并成一次 `SELECT WHERE id IN (...)`。 所有现代 GraphQL 库都自带或推荐 DataLoader。 ## 4. mutation ```python @strawberry.input class CreatePostInput: title: str body: str @strawberry.type class Mutation: @strawberry.mutation def create_post(self, input: CreatePostInput) -> Post: return db.create_post(input) schema = strawberry.Schema(query=Query, mutation=Mutation) ``` 前端: ```graphql mutation { createPost(input: { title: "Hi", body: "..." }) { id title } } ``` ## 5. subscription(GraphQL over WebSocket) ```python @strawberry.type class Subscription: @strawberry.subscription async def comment_added(self, post_id: int) -> AsyncGenerator[Comment, None]: async for c in db.watch_comments(post_id): yield c schema = strawberry.Schema(query=Query, mutation=Mutation, subscription=Subscription) ``` 前端用 WebSocket: ```graphql subscription { commentAdded(postId: 1) { id body } } ``` ## 6. 缓存 REST:HTTP cache + ETag + max-age 直接生效,CDN 友好。 GraphQL:所有 query 都 POST 到 `/graphql`,HTTP cache 不起作用。 要客户端层缓存(Apollo Client / urql / Relay)。 ## 7. 鉴权 / 授权 REST:每个 endpoint 装饰器 `@require_login` 简单。 GraphQL:每个 resolver 自己检查 ctx。或者用 directive: ```graphql type Query { adminStats: Stats @auth(role: "admin") } ``` 实现时拦截 schema 执行。 ## 8. error handling REST:HTTP status code (4xx/5xx)。 GraphQL:固定 200,error 在响应 `errors` 数组: ```json { "data": { "user": null }, "errors": [{ "message": "User not found", "extensions": { "code": "NOT_FOUND" } }] } ``` 部分字段失败仍返回其它字段的结果(partial success)—— GraphQL 的特点。 ## 9. 性能 - REST 缓存友好;GraphQL 缓存差 - GraphQL N+1 不注意会严重慢;DataLoader 必备 - REST endpoint 容易优化(针对单接口加索引);GraphQL 查询任意组合, 查询计划难预测 ## 10. 实际生产经验 - 内部团队 API:REST 够用 + 简单 - 公共开放 API:GraphQL 客户体验好但运维难 - 移动 app:GraphQL 节省流量 - B2B + 强一致:REST + OpenAPI 文档生成 client SDK 最稳 可以混搭:核心业务 REST,前端聚合层 GraphQL(BFF 模式)。 ## 11. 替代:tRPC 如果前后端都是 TypeScript,tRPC 让你完全跳过 schema: ```ts // 后端 export const appRouter = router({ user: router({ get: publicProcedure .input(z.object({ id: z.string() })) .query(({ input }) => db.users.findUnique({ where: { id: input.id } })), }), }) // 前端(类型完全自动推导) const user = await trpc.user.get.query({ id: '1' }) ``` 不是 REST / GraphQL 任一阵营,但端到端类型安全极其爽。 ## 踩过的坑 - GraphQL schema 暴露太多内部字段 → 给客户端"复杂查询"的能力变成 DOS 武器。query depth limit + complexity limit 是必须的。 - 把 GraphQL 用成"REST + JSON in URL":每个 query 只取一个对象一层字段, 完全没用 GraphQL 的优势。这种场景就用 REST。 - DataLoader 跨 request 共享:缓存了别的 request 的数据 → 数据错乱。 每个 request 新建 loader 实例。 - GraphQL 客户端缓存(Apollo)配错 normalize key → mutation 后 UI 没刷新。 仔细读 normalize 文档。