知识广场

按学科筛选:计算机科学
清除筛选

«计算机科学» 分类下共 256 篇帖子

收 webhook 端点设计:签名校验 + 幂等 + 异步重试

## 起因 要接 Stripe webhook 处理支付成功事件,"用户付款成功 → 给账户加余额"。 看似简单: ```python @app.post('/webhook/stripe') def handle(req): event = req.json() if event['type'] == 'payment_intent.succeeded': add_credit(event['data']['object']['customer'], 100) return {'ok': True} ``` 实际几个问题: 1. Stripe 重发同一事件(network 重试 / 我们一时返回 500)→ 用户余额被加多次 2. 没校验签名 → 任何人 POST 这个 URL 都能给账户加钱 3. 处理慢 → Stripe 超时 → 它重试 → 雪崩 4. add_credit 失败 → 错过事件 → 数据丢 正确实现需要 4 个东西:**签名校验、幂等、异步处理、死信队列**。 ## 解决方案 ### 1. 签名校验(永远第一步) Stripe 在 header `Stripe-Signature` 里发 HMAC: ```python import stripe STRIPE_WEBHOOK_SECRET = 'whsec_...' @app.post('/webhook/stripe') def handle(req): payload = req.body_bytes # 原始 bytes,不是 parsed JSON sig = req.headers.get('Stripe-Signature') try: event = stripe.Webhook.construct_event( payload, sig, STRIPE_WEBHOOK_SECRET ) except stripe.SignatureVerificationError: return Response(status=400) # ... 继续处理 ``` construct_event 内部验签 + parse。验签失败 → 400 拒绝。 **用原始 bytes,不要先 parse JSON 再 stringify**——序列化可能改字节 (key 顺序 / 空格),HMAC 算不对。 自己实现 webhook(不用 SDK)的签名校验: ```python import hmac, hashlib, time def verify(payload: bytes, sig_header: str, secret: str, tolerance=300): # 解 header: t=timestamp,v1=hex_signature parts = dict(p.split('=') for p in sig_header.split(',')) ts = int(parts['t']) if abs(time.time() - ts) > tolerance: raise ValueError('timestamp too old (replay?)') signed = f'{ts}.{payload.decode()}'.encode() expected = hmac.new(secret.encode(), signed, hashlib.sha256).hexdigest() if not hmac.compare_digest(expected, parts['v1']): raise ValueError('signature mismatch') ``` `hmac.compare_digest` 防时序攻击;`tolerance` 防 replay。 ### 2. 幂等:去重表 + 唯一 event id Stripe 每个 event 有唯一 `id` 字段,重发时 id 不变。 建一个表存已处理 id: ```sql CREATE TABLE webhook_events ( id TEXT PRIMARY KEY, -- Stripe event id (evt_...) type TEXT NOT NULL, received_at TIMESTAMPTZ DEFAULT now(), processed_at TIMESTAMPTZ, payload JSONB NOT NULL ); ``` 接收端: ```python @app.post('/webhook/stripe') def handle(req): # 1. 验签(上面) event = construct_event(...) # 2. 幂等检查 + 入库 try: db.execute( 'INSERT INTO webhook_events (id, type, payload) VALUES (%s, %s, %s)', (event.id, event.type, event.to_dict()) ) except UniqueViolation: # 重复事件,已经处理过,直接 200 OK return {'ok': True, 'duplicate': True} # 3. 异步处理 enqueue_event_processing.delay(event.id) # 4. 立刻返回 200(< 100ms) return {'ok': True} ``` INSERT + 唯一约束就是去重。返回前不真正处理业务——业务逻辑放到后台。 ### 3. 异步处理 ```python @celery_task(bind=True, max_retries=5) def enqueue_event_processing(self, event_id): e = db.fetch_event(event_id) if e.processed_at: return # 已处理 try: if e.type == 'payment_intent.succeeded': obj = e.payload['data']['object'] add_credit(obj['customer'], obj['amount_received']) elif e.type == 'invoice.payment_failed': send_payment_failed_email(...) # ... 其它事件 db.execute( 'UPDATE webhook_events SET processed_at = now() WHERE id = %s', (event_id,) ) except Exception as exc: # Celery 自动按 exponential backoff 重试 raise self.retry(exc=exc, countdown=2 ** self.request.retries) ``` 关键: - webhook endpoint 只做"放入队列",秒级返回 200 - 真处理在后台 worker,失败有重试 + 死信 - `processed_at` 字段防"重试 N 次都成功"被算多次 ### 4. 处理顺序 Webhook 可能乱序到达(network 重试)。如果业务关心顺序(如订单状态 pending → paid → shipped),需要: ```python def handle_event(e): obj_id = e.payload['data']['object']['id'] # 加锁防同对象并发处理 with db_lock(f'order:{obj_id}'): current = db.get_order(obj_id) if e.payload['data']['object']['updated'] < current.updated_at: return # 这是老事件,丢 update_order(obj_id, e.payload['data']['object']) ``` 用对象上的 timestamp / version 字段判断"这事件是不是 stale"。 ### 5. 死信队列 (DLQ) 重试 N 次都失败的事件不要丢,放 DLQ 人工 review: ```python @celery_task(bind=True, max_retries=5) def enqueue_event_processing(self, event_id): try: process(event_id) except Exception as exc: if self.request.retries >= self.max_retries: # 最后一次重试还失败 move_to_dlq.delay(event_id, str(exc)) else: raise self.retry(countdown=2 ** self.request.retries) ``` ```sql CREATE TABLE webhook_dlq ( id BIGSERIAL PRIMARY KEY, event_id TEXT REFERENCES webhook_events(id), error TEXT, moved_at TIMESTAMPTZ DEFAULT now(), resolved_at TIMESTAMPTZ ); ``` 定时 / Slack 告警:"DLQ 有 N 条未处理",人工修业务后从 DLQ 重放。 ### 6. 监控 ```python # Prometheus metrics webhook_received = Counter('webhook_received_total', '', ['provider', 'type']) webhook_processed = Counter('webhook_processed_total', '', ['provider', 'type', 'result']) webhook_lag = Histogram('webhook_lag_seconds', '') # 上报 webhook_received.labels('stripe', event.type).inc() # 处理完 webhook_processed.labels('stripe', event.type, 'ok').inc() ``` 仪表盘看: - 每分钟事件量 - 处理延迟 P95 - 错误率(按 event type 分) - DLQ 数量 ## 完整流程 ``` 1. Stripe POST 事件 ↓ 2. 验签(HMAC)→ 失败 400 拒绝 ↓ 3. INSERT 去重表(唯一 id)→ 已存在返 200 (dup) ↓ 4. 推入 Celery 队列 ↓ 5. 返回 200(end-to-end < 100ms) 后台: 6. worker 拿事件 ↓ 7. 业务处理(add_credit 等) ↓ 8. 成功 → 标记 processed_at 失败 → exponential retry ↓ 9. 重试 5 次都失败 → 进 DLQ + 告警 ``` ## 效果 按这套设计后: - 重复扣款 / 加钱 bug 归零 - 即使 Stripe 一秒发 1000 个事件,endpoint 不挂(异步队列削峰) - DLQ 偶尔有 2-3 条(被告警捕获,人工处理 < 1 小时) - 整套对 Stripe 的接入测试通过 Stripe 官方的 webhook test ## 安全 checklist 1. ✅ HMAC 签名校验 2. ✅ 时间戳验证防 replay 3. ✅ 用原始 bytes 验签 4. ✅ 不在 endpoint 里做业务 5. ✅ 业务幂等(DB unique constraint 等) 6. ✅ HTTPS only 7. ✅ Webhook secret 进 vault / env,不进 git ## 踩过的坑 1. **`request.json()` 之后再验签**:FastAPI / Flask 默认 parse JSON 后再处理,原始 body 拿不到。要用 `await request.body()` / `request.get_data()` 拿 bytes。 2. **endpoint 返回慢 → Stripe 重发**:处理逻辑写在 endpoint 里 → 8 秒返回 → Stripe 5 秒超时认为失败 → 重发 → 你又花 8 秒 → 死循环。 异步是必须的。 3. **重试时业务"看起来 OK 但 DB 没改"**:DB transaction 中间 commit 失败 → 业务部分生效 → 重试又跑一次。所有 update 必须 atomic transaction。 4. **DLQ 没人看**:上 DLQ 后没监控 / 告警,几个月后发现 1000 条 未处理。alert 是必须的。 5. **多 webhook 端点共享一个 secret**:Stripe / 其它 SaaS 给每个 endpoint 独立 secret 才对,混用一个 secret 出问题难定位来源。

用 strace 排查应用启动卡死 / 慢

应用启动卡了 30 秒、跑命令显示半天没动静、systemd 报 timeout —— 不知道 卡在哪步时,strace 能直接告诉你进程在做什么 syscall。 90% 的"卡"是某个 syscall 在等待:DNS / 网络连接 / 文件锁 / 互斥锁 / fsync。 ## 基本用法 ```bash # 跑一个新进程,观察所有 syscall strace -f -tt -o /tmp/trace.log ./myapp # attach 到正在运行的进程 strace -f -tt -p <PID> -o /tmp/trace.log # Ctrl-C 停(被 attach 的进程继续跑) ``` 关键参数: - `-f`:跟踪 fork 出来的子进程(多线程 / 多进程应用必选) - `-tt`:每行加微秒时间戳(看延迟必备) - `-o file`:写文件而不是混进 stdout - `-T`:每个 syscall 的耗时(找慢调用神器) - `-e trace=...`:只看某类 syscall 常见过滤: ```bash # 只看网络相关 strace -f -e trace=network -p <PID> # 只看文件 I/O strace -f -e trace=file -p <PID> # 只看慢的(> 100ms) strace -f -T -p <PID> 2>&1 | awk '/<[0-9]+\.[0-9]+>/{ split($NF, a, "<|>"); if (a[2]+0 > 0.1) print }' ``` ## 案例 1:应用启动卡 30 秒 ```bash strace -f -tt -T -o /tmp/start.log ./myapp # ... 30 秒后启动完成 sort -k1 /tmp/start.log | grep -E '\<[0-9]\.[0-9]+\>' | sort -k2 -t'<' -nr | head ``` 或者直接看时间戳跳变: ```bash awk '{print $1, $0}' /tmp/start.log | awk ' NR==1{prev=$1; next} { diff=$1-prev if (diff > 1) print "*** gap " diff "s at " $0 prev=$1 }' /tmp/start.log ``` 常见根因: - `connect(... 192.168.x.x:53 ...)` 后 timeout 几秒 → DNS 配置错或者 resolv.conf 第一个 server 不可达 - `openat(... /etc/...locked) = -1 EAGAIN` 反复重试 → 文件锁等其它进程 - `futex(... FUTEX_WAIT ...)` 长时间不返回 → 库内部 mutex 死锁 - `read(socket, ...)` 长时间阻塞 → 下游响应慢 ## 案例 2:DNS 慢 ```bash strace -f -e trace=network -tt curl http://api.example.com/ 2>&1 | head -30 # 11:23:45.001 socket(AF_INET, SOCK_DGRAM, 0) = 5 # 11:23:45.002 connect(5, {sa_family=AF_INET, sin_addr=8.8.8.8, ...}, 16) = 0 # 11:23:45.003 sendto(5, ...) = 32 # DNS query # 11:23:50.003 sendto(5, ...) = 32 # 5 秒后重试,第一次超时了 ``` 5 秒间隙说明第一个 nameserver 不响应。改 `/etc/resolv.conf`: ``` nameserver 1.1.1.1 nameserver 8.8.8.8 options timeout:1 attempts:2 ``` `timeout:1` 把单次 DNS 超时从默认 5 秒降到 1 秒。 ## 案例 3:找文件被打开了多少次 ```bash strace -f -e trace=openat -o /tmp/o.log ./myapp grep -oE '"[^"]+"' /tmp/o.log | sort | uniq -c | sort -rn | head # 832 "/etc/ld.so.cache" # 412 "/usr/lib/x86_64-linux-gnu/libc.so.6" # 158 "/proc/self/maps" # ... ``` 发现某个文件被 open 几百上千次 → 可能是配置文件没缓存、或者插件加载死循环。 ## strace vs ltrace vs perf | 工具 | 看什么 | 典型场景 | |---|---|---| | strace | syscalls | I/O 慢、卡 syscall、文件路径错 | | ltrace | 动态库函数调用 | 怀疑某个 libc 函数 / glib 函数行为 | | perf | CPU 采样 | CPU 100% 找热点函数 | | bpftrace | 几乎一切 | 复杂的内核态行为 | ## 性能影响 strace 会 **大幅** 拖慢被跟踪进程(每个 syscall 加上 ptrace 上下文切换)。 高并发服务上用 `-e ...` 严格限制范围,或者用 `bpftrace` 这种基于 eBPF 的"无停顿"工具。 ## 高级:只在某行附近抓 ```bash # 进程已经卡 5 分钟了,attach 抓 30 秒 strace -f -tt -p <PID> -o /tmp/probe.log & sleep 30 kill %1 ``` ## 踩过的坑 - 容器内可能没装 strace;`apt install strace` 在精简镜像装不上时, 从宿主 `nsenter` 进容器跑也行: ```bash sudo nsenter -t <PID> -p -m -n strace -f -p <PID> ``` - strace 输出的字符串默认截断 32 字符,要看完整路径加 `-s 256`。 - 多线程进程 attach 时 `-f` 会顺带跟所有线程,量大可能淹掉日志, 缩小范围用 `-e trace=...`。 - 不要在生产数据库上 attach strace —— 可能把 IO 慢到触发副节点切换 / 超时回滚。先看 perf / eBPF。

Tailwind CSS v3.4+ 设计令牌定制 + 一些"高级"用法

Tailwind 的核心是 utility-first CSS,但生产里要配合品牌设计系统, 直接用默认色板 / 间距是不行的。下面是 5 个把 Tailwind 用到位的点。 ## 1. 设计令牌:用 CSS 变量做主题 ```css /* src/styles/tokens.css */ @layer base { :root { --color-brand: 220 90% 56%; /* HSL,分量分开存便于 alpha */ --color-brand-soft: 220 90% 96%; --color-text: 222 47% 11%; --color-bg: 0 0% 100%; --radius: 0.75rem; } .dark { --color-brand: 220 90% 65%; --color-text: 210 40% 96%; --color-bg: 222 47% 11%; } } ``` `tailwind.config.ts`: ```ts export default { content: ['./src/**/*.{ts,tsx}'], darkMode: 'class', theme: { extend: { colors: { brand: 'hsl(var(--color-brand) / <alpha-value>)', 'brand-soft': 'hsl(var(--color-brand-soft) / <alpha-value>)', text: 'hsl(var(--color-text) / <alpha-value>)', bg: 'hsl(var(--color-bg) / <alpha-value>)', }, borderRadius: { DEFAULT: 'var(--radius)', }, }, }, } ``` 之后: ```html <div class="bg-brand text-white rounded"> <p class="text-brand/80">透明度 80%</p> </div> ``` `/80` 是 Tailwind v3.1+ 的 alpha 语法,配合 `<alpha-value>` 占位让 HSL 变量与 opacity 联动。 切换 dark mode: ```ts document.documentElement.classList.toggle('dark') ``` 所有 token 通过 CSS 变量级联变化。 ## 2. 自定义 plugin ```ts // tailwind.config.ts import plugin from 'tailwindcss/plugin' export default { plugins: [ plugin(({ addUtilities, theme }) => { addUtilities({ '.text-balance': { 'text-wrap': 'balance' }, '.scrollbar-thin': { 'scrollbar-width': 'thin', }, // 用 theme 引用其它 token '.no-x-scroll': { 'overflow-x': 'hidden', 'background': theme('colors.bg / 50%'), }, }) }), ], } ``` `<div class="text-balance">` 应用 `text-wrap: balance`(让标题换行均衡)。 ## 3. 任意值 / arbitrary values ```html <div class="w-[273px] grid-cols-[200px_1fr_auto] before:content-['→']"> ``` 方括号语法让你写一次性的非标准值,避免为了某个奇异需求改 config。 但常用值还是放 config 里。 ## 4. group / peer 改善交互样式 ```html <!-- 父 hover 时子元素响应 --> <div class="group"> <h3 class="group-hover:text-brand">Title</h3> <p class="group-hover:opacity-100 opacity-50">Detail</p> </div> <!-- 兄弟元素影响 --> <input type="checkbox" class="peer"> <label class="peer-checked:text-brand">Toggle me</label> <!-- 命名 peer --> <input type="checkbox" class="peer/agree"> <label class="peer-checked/agree:underline">同意条款</label> ``` `group-*` / `peer-*` 是 Tailwind 的 ":has" 替代品,不需要 JS。 ## 5. responsive + container queries ```html <!-- 视口响应式 --> <div class="text-sm md:text-base lg:text-lg"> <!-- 容器查询(Tailwind v3.4+) --> <div class="@container"> <div class="@md:flex @lg:grid"> ``` 启用 container queries 插件: ```ts import containerQueries from '@tailwindcss/container-queries' export default { plugins: [containerQueries], } ``` ## 6. dark mode 策略 ```ts darkMode: 'class' // 用 .dark 类 // 或: darkMode: 'media' // 用 prefers-color-scheme // 或最新: darkMode: ['selector', '[data-mode="dark"]'] ``` `class` 模式最灵活——可以手动切;`media` 跟随系统。 现代项目通常 class + 手动初始化跟随系统。 ## 7. arbitrary properties ```html <div class="[mask:linear-gradient(black,transparent)]"> <div class="[--my-var:42px] [width:var(--my-var)]"> ``` 让任何 CSS 属性都能写在 class 里——逃生舱。 ## 8. apply(谨慎用) ```css .btn { @apply px-4 py-2 rounded bg-brand text-white; } .btn:hover { @apply bg-brand-dark; } ``` `@apply` 让你把 Tailwind class 抽成自定义类。适合: - 公司风格按钮 / input 等基础组件 - 把超长 class 列表抽出(可读性) 不适合:日常 UI(违反 utility-first 哲学)。 ## 9. just-in-time(JIT)默认开启 Tailwind v3+ 默认 JIT 模式: - 只生成你用到的 class - 任意值(`w-[273px]`)即时编译 - watch 模式毫秒级重新生成 生产 CSS 体积通常 5-20 KB(gzipped),比传统 CSS 框架轻。 ## 10. 与 shadcn/ui `shadcn/ui` 不是组件库,是 "复制粘贴的 Tailwind 组件代码": ```bash npx shadcn-ui@latest init npx shadcn-ui@latest add button card dialog ``` 把组件 .tsx 文件复制到你的项目里,你拥有源码可改。 2024 后 React 项目用 Tailwind + shadcn 几乎是 default。 ## 11. typography 插件 ```bash npm i -D @tailwindcss/typography ``` ```html <article class="prose dark:prose-invert"> <h1>...</h1> <p>...</p> </article> ``` 给纯 markdown 渲染的文章一键加上专业排版(标题大小、行高、代码块、 列表等)。 ## 12. forms 插件 ```bash npm i -D @tailwindcss/forms ``` normalize 浏览器默认 input / select / checkbox 样式差异, 让你 utility class 控制更可预测。 ## 踩过的坑 - `content` 配置漏路径:JIT 扫不到你的源文件,class 没生成 → 看着没样式。 确认 `content: ['./src/**/*.{ts,tsx,html}']` 覆盖。 - 类名拼接 / 动态字符串:`text-{color}` 这种 Tailwind 扫描不到。 改成完整字符串 `text-red-500` 写出来,或在 safelist 里列出。 - `@apply` 滥用:所有按钮都 `.btn` `.btn-lg` `.btn-primary`,最后 CSS 比手写还多。utility-first 才是 Tailwind 的正道。 - dark mode class 加错位置:必须在 `<html>` 而不是 `<body>` 上,否则 某些组件查不到。

HashiCorp Vault:动态 secret + 自动 rotation

## 起因 应用要数据库密码 / API key / cloud credential。 反模式: - secret 写 .env 文件 commit git(最差) - 环境变量(多 deploy 多份) - AWS Secrets Manager 静态(仍要手动 rotate) - K8s Secret base64(任何 cluster admin 看见) **Vault**: - 动态生成 secret(每应用 / 每会话独立 credential) - TTL + 自动 revoke - 审计 log 每次访问 - 多 backend(DB / cloud / PKI / ...) ## 装 ```bash # dev 模式(生产不能用) vault server -dev # prod (单机) vault server -config=vault.hcl ``` ```hcl # vault.hcl ui = true storage "raft" { path = "/var/lib/vault" node_id = "node1" } listener "tcp" { address = "0.0.0.0:8200" tls_cert_file = "/etc/vault/cert.pem" tls_key_file = "/etc/vault/key.pem" } ``` init + unseal: ```bash vault operator init # 显示 unseal key + root token vault operator unseal <key1> vault operator unseal <key2> vault operator unseal <key3> ``` Shamir secret sharing:5 个 unseal key 中 3 个能解锁。 ## 静态 secret (KV) ```bash vault kv put secret/myapp/db password=secret123 username=admin vault kv get secret/myapp/db ``` 应用读: ```python import hvac client = hvac.Client(url='https://vault:8200', token='...') secret = client.secrets.kv.v2.read_secret_version(path='myapp/db') db_pass = secret['data']['data']['password'] ``` 跟 AWS Secrets Manager 类似,但 KV 不是 Vault 杀手锏。 ## 动态 secret (DB) 杀手 feature:vault 实时生成 DB credential。 ```bash # 配 PG backend vault secrets enable database vault write database/config/myapp-pg \ plugin_name=postgresql-database-plugin \ connection_url="postgresql://{{username}}:{{password}}@pg:5432/myapp" \ allowed_roles="readonly,readwrite" \ username="vault_admin" \ password="..." # 定义 role vault write database/roles/readonly \ db_name=myapp-pg \ creation_statements="CREATE USER \"{{name}}\" WITH PASSWORD '{{password}}' VALID UNTIL '{{expiration}}'; \ GRANT SELECT ON ALL TABLES IN SCHEMA public TO \"{{name}}\";" \ default_ttl="1h" \ max_ttl="24h" ``` 应用请求: ```bash vault read database/creds/readonly # username = v-token-readonly-xxx # password = randompw # lease_id = ... # lease_duration = 3600 ``` vault 实时在 PG 里 CREATE USER + GRANT,返回临时 credential。 1 小时后自动 DROP USER → credential 失效。 ## 应用集成 ```python import hvac client = hvac.Client(url='https://vault:8200') client.auth.kubernetes.login(role='myapp', jwt=open('/var/run/secrets/k8s/jwt').read()) creds = client.secrets.database.generate_credentials('readonly') username = creds['data']['username'] password = creds['data']['password'] # 用 credential 连 PG db = psycopg2.connect(host='pg', user=username, password=password) ``` 每应用启动拿一份独立 credential。 应用挂了 / 重启 → 新 credential。 泄露 1 个 credential → 影响仅那 1 个,1 小时后自动废。 ## auth method 应用怎么登 Vault? - **token**:root token / 长期 token(不推荐) - **AppRole**:app + role-id + secret-id(机器登录) - **Kubernetes**:pod service account JWT 自动登 - **AWS / GCP / Azure**:cloud IAM identity - **OIDC**:人通过 SSO K8s 集成最常用: ```yaml # Pod 自动有 service account token spec: serviceAccountName: myapp ``` Vault 配 K8s auth → JWT 验证后给 Vault token。 ## PKI (cert) vault 当内部 CA: ```bash vault secrets enable pki vault write pki/root/generate/internal common_name="myorg" ttl=8760h ``` 应用请求 cert: ```bash vault write pki/issue/myapp common_name=myapp.example.com ttl=24h # 返回 cert + key + ca chain ``` 服务每天 rotate cert,过期自动失效。 比 Let's Encrypt 还自动化(内部 service 间 mTLS)。 ## audit log ```bash vault audit enable file file_path=/var/log/vault/audit.log ``` 每次 secret 访问 log: ```json { "time": "2025-03-14T...", "auth": {"display_name": "k8s-myapp-pod-xxx"}, "request": {"path": "database/creds/readonly", "operation": "read"}, "response": {...} } ``` 事后审计:谁什么时候拿了什么 secret。compliance 必要。 ## 与 AWS Secrets Manager 对比 | | Vault | AWS Secrets Manager | |---|---|---| | 动态 secret | ✅ (DB / cloud / SSH) | 弱(仅 DB rotation) | | 多 cloud | ✅ | AWS only | | PKI | ✅ | ❌ | | 自托管 | ✅ | ❌ | | 价格 | OSS 0 | $0.40 / secret / month | | 复杂度 | 高 | 低 | 简单需求 → AWS Secrets Manager 省事。 多 cloud / 动态 secret 重 / 严格合规 → Vault。 ## sealed-secret(K8s 平民方案) 不想上 Vault 但又不想 git 存 secret: ```bash # 生成 sealed-secret echo -n "secret123" | kubectl create secret generic mysecret --dry-run=client --from-file=password=/dev/stdin -o yaml | kubeseal -o yaml > sealed.yaml ``` sealed.yaml 加密,commit git 安全。 集群里 sealed-secret-controller 解密 → 创建普通 K8s Secret。 简单 + GitOps friendly,但没动态 / rotation / audit。 小项目 OK,企业级要 Vault。 ## external-secrets operator K8s + 外部 secret store(AWS SM / Vault / GCP SM 等): ```yaml apiVersion: external-secrets.io/v1 kind: ExternalSecret metadata: name: db-pass spec: refreshInterval: 1h secretStoreRef: name: vault-backend kind: ClusterSecretStore target: name: db-pass-secret data: - secretKey: password remoteRef: key: secret/myapp/db property: password ``` operator 从 Vault 同步到 K8s Secret,应用用 K8s Secret 不变。 适合:"想用 Vault 集中管 secret + 应用不用改"。 ## Vault as service mesh (Consul Connect) Vault + Consul 一起做 mTLS service mesh。 某些场景替代 Istio。 ## 真实部署 我们 prod: - Vault HA cluster(3 node Raft) - AWS RDS / Redis / Slack API / GitHub token 都 Vault 管 - K8s auth method(pod 自动 login) - DB credential 1h TTL - audit log → Loki 效果: - 0 secret in git - credential leak 不再 catastrophic(1 小时失效) - audit 自动 → SOC2 / ISO 文档自动生成 挑战: - Vault 自身要 HA(挂了应用拿不到 secret → 失败) - unseal key 灾备(多人持有) - 学习曲线(团队培训几周) ## 踩过的坑 1. **unseal key 丢**:Vault crash 后没人能 unseal → 数据丢。 3 人各持 1 把 key + 异地存。 2. **lease 不 renew**:长跑应用 credential 1h 过期后失败。要么续 lease 要么处理 reauth。 3. **policy 写错**:太宽 → app 能读其它 app secret。policy review。 4. **Vault 单点**:3 节点 HA 但 quorum 挂 → 全停。raft snapshot 备份。 5. **dev 模式记 prod**:`vault server -dev` 数据 in-memory,重启丢。 生产绝不用 dev mode。

hyperfine:CLI 命令 benchmark 工具

## 起因 想知道哪个命令更快: - `find . -name '*.py'` vs `fd -e py` - `cat file | grep foo` vs `rg foo file` - 两个不同 build 工具 老办法 `time cmd`:跑一次结果有噪声,不科学。 `hyperfine`(Rust)专门 benchmark:自动多次跑 + warmup + 统计 + 对比 + 优雅输出。 ## 装 ```bash brew install hyperfine cargo install hyperfine ``` ## 基本 ```bash $ hyperfine 'find . -name "*.py"' Benchmark 1: find . -name "*.py" Time (mean ± σ): 345.2 ms ± 12.3 ms [User: 90.1 ms, System: 245.6 ms] Range (min … max): 330.1 ms … 365.8 ms 10 runs ``` 自动跑 10 次 + mean / σ / range。 ## 对比两命令 ```bash $ hyperfine 'find . -name "*.py"' 'fd -e py' Benchmark 1: find . -name "*.py" Time (mean ± σ): 345.2 ms ± 12.3 ms Benchmark 2: fd -e py Time (mean ± σ): 62.1 ms ± 3.4 ms Summary 'fd -e py' ran 5.56 ± 0.36 times faster than 'find . -name "*.py"' ``` 直接看 fd 比 find 5.5x 快。 ## warmup 第一次跑可能 cold cache: ```bash hyperfine --warmup 3 'cmd' ``` 跑 3 次 warmup(不计时)+ 后面 10 次计时。 避免 disk cache miss 干扰。 ## prepare / cleanup ```bash hyperfine --prepare 'sync; echo 3 > /proc/sys/vm/drop_caches' 'cmd' ``` 每次 run 前 prepare(drop cache 测 cold path)。 或者每次后 cleanup(删 output file 之类)。 ```bash hyperfine --cleanup 'rm output.txt' 'process input > output.txt' ``` ## 参数 sweep ```bash hyperfine --parameter-list n 100,1000,10000 'sleep 0.{n}' ``` 跑 sleep 0.100 / 0.1000 / 0.10000 各 10 次 → 输出 sweep 表格。 例:测不同 thread 数: ```bash hyperfine --parameter-scan threads 1 8 'make -j{threads}' ``` 1-8 thread 各跑 make → 看 sweet spot。 ## export 数据 ```bash hyperfine 'cmd1' 'cmd2' --export-markdown result.md hyperfine 'cmd1' 'cmd2' --export-json result.json hyperfine 'cmd1' 'cmd2' --export-csv result.csv ``` Markdown 直接贴 PR 比较 before/after。 ## 真实 case 1:CI 测试加速 CI 跑 pytest 改 plugin: ```bash hyperfine --warmup 1 \ 'pytest -p no:cacheprovider' \ 'pytest' \ 'pytest -n 4' \ 'pytest -n auto' ``` | | mean | |---|---| | pytest (no cache) | 45s | | pytest (cache) | 38s | | pytest -n 4 | 14s | | pytest -n auto (8 cores) | 9s | 加 -n auto = 5x 加速 → CI yaml 一行改动收益巨大。 ## 真实 case 2:build 工具对比 ```bash hyperfine --warmup 2 --prepare 'rm -rf node_modules dist' \ 'npm install && npm run build' \ 'pnpm install && pnpm build' \ 'bun install && bun run build' ``` 得出"对此项目 pnpm 比 npm 快 2x"。 凭印象不如实测。 ## ignore failure ```bash hyperfine --ignore-failure 'might_fail_cmd' ``` 某些工具偶尔 fail 但你想 measure 成功的部分。 ## 数据可视化 ```bash hyperfine 'cmd1' 'cmd2' --export-json result.json python -m hyperfine.plot.histogram result.json ``` 直方图看 distribution(不只是 mean)。 发现"通常 1s 偶尔 10s" 的长尾。 ## warmup 不够准 ```bash hyperfine --runs 100 'cmd' ``` 跑 100 次 → CLT 收敛 → 更准 mean / σ。 缺点:慢命令做不到。 ## 与 perf / criterion 对比 - `perf stat cmd`:Linux 性能 counter(cache miss / branch misses 等),更深 - `criterion`(Rust lib):微基准,函数级别 - `hyperfine`:命令级别比较 hyperfine 是"对比工具 / shell 命令"的瑞士军刀。 深入 profile 用 perf。 ## 与 ab / wrk 对比 `ab` / `wrk` 测 HTTP server(并发 + RPS)。 hyperfine 测**单 invocation 时间**。 ```bash # 测 server 响应(应该用 wrk) wrk -t 4 -c 100 -d 30s http://localhost:8000/ # 测命令耗时 hyperfine 'curl http://localhost:8000/' ``` 不同用途。 ## 我的常用 alias ```bash alias bench='hyperfine --warmup 2' alias bench-cold='hyperfine --warmup 0 --prepare "sync; echo 3 | sudo tee /proc/sys/vm/drop_caches"' ``` 测优化前后效果: ```bash git stash # 老代码 bench 'cmd' git stash pop # 新代码 bench 'cmd' ``` 或者一次同时跑: ```bash git stash hyperfine 'cmd' --export-json before.json git stash pop hyperfine 'cmd' --export-json after.json hyperfine-compare before.json after.json ``` ## 踩过的坑 1. **stdout 输出大**:命令 print 几 MB → buffer 影响 measurement。 `> /dev/null` redirect。 2. **shell startup overhead**:测 sub-ms 命令 → shell fork 本身就 几 ms。极短命令 hyperfine 不准。 3. **multi-core 干扰**:测时其它进程跑 → 数字飘。`taskset 0x1 hyperfine ...` 绑核 isolation。 4. **测试 build 重复 work**:第二次 build incremental → 比第一次快。 `--prepare 'rm -rf build/'` 公平。 5. **system noise**:disk encryption / antivirus / Time Machine 等 背景影响。多次跑 + 看 σ。

PyTorch 训练 OOM 排查:activation checkpoint / 梯度累积 / offload

## 起因 要 fine-tune 一个 7B 模型,A100 40GB 显存,跑起来直接 CUDA OOM。 "换大卡"是简单解决但贵。理解几个技术能在同样显存里训更大模型 / 更大 batch。 ## 各项的显存占用拆解 训练时显存 ≈ 模型权重 + 梯度 + optimizer state + activations + 临时 buffer。以 7B FP16 模型 + AdamW 为例: | 项 | 公式 | 7B 模型 | |---|---|---| | 权重 | params × 2 bytes (fp16) | 14 GB | | 梯度 | params × 2 bytes | 14 GB | | optimizer state(AdamW) | params × 8 bytes (FP32 m+v) | 56 GB | | activations | 依 batch / seq | 几 GB-几十 GB | **总 = 84 GB + activations**。一张 A100 40GB 远不够。 ## 解决方案逐个上 ### 1. 混合精度(FP16/BF16)— 必选 ```python # pure PyTorch scaler = torch.cuda.amp.GradScaler() for batch in loader: with torch.cuda.amp.autocast(dtype=torch.bfloat16): loss = model(batch).loss scaler.scale(loss).backward() scaler.step(opt); scaler.update() ``` 权重 / 梯度从 FP32 4 bytes → FP16/BF16 2 bytes,对半省。 A100+ 推荐 BF16(无需 grad scaler,数值更稳)。 ### 2. Gradient Checkpointing — 用计算换显存 normal 前向把所有 activations 都存着(反向用)。checkpointing 只保存 某几层,其它 layer 反向时重新算前向: ```python model.gradient_checkpointing_enable() # transformers 一行 ``` 省 activations 50-80%,代价是训练慢 ~20-30%。LLM fine-tune 默认开。 ### 3. Gradient Accumulation — 模拟更大 batch 显存装不下 batch=32?跑 batch=8 累 4 次 = batch=32 等效: ```python accum_steps = 4 for i, batch in enumerate(loader): loss = model(batch).loss / accum_steps loss.backward() if (i + 1) % accum_steps == 0: opt.step(); opt.zero_grad() ``` 显存等同 batch=8,效果近似 batch=32。 ### 4. CPU offload(DeepSpeed / accelerate) 把 optimizer state 卸到 CPU 内存,反正它不参与每步前反向: ```python from accelerate import Accelerator acc = Accelerator( mixed_precision='bf16', gradient_accumulation_steps=4, ) model, opt, loader = acc.prepare(model, opt, loader) ``` 或用 DeepSpeed ZeRO-2 / ZeRO-3: ```python # accelerate config 选 DeepSpeed # 跑: accelerate launch --num_processes=1 \ --mixed_precision=bf16 \ --deepspeed_stage=2 \ train.py ``` ZeRO-2 把 optimizer state 分片(多卡时)/ offload 到 CPU(单卡时), 省 56 GB → 0 GB(cpu 接管)。代价:每 step 数据传输延迟。 ### 5. LoRA / QLoRA — 只训一小部分参数 ```python from peft import LoraConfig, get_peft_model config = LoraConfig( r=8, lora_alpha=16, lora_dropout=0.05, target_modules=['q_proj', 'v_proj'], task_type='CAUSAL_LM', ) model = get_peft_model(base_model, config) model.print_trainable_parameters() # trainable: 4.2M / 7B = 0.06% ``` 只有 LoRA 的小矩阵需要梯度 + optimizer state。7B 模型变成"7B 冻结 + 4M 可训",显存暴跌。 QLoRA 进一步把 base model 也量化到 4-bit: ```python from transformers import BitsAndBytesConfig bnb = BitsAndBytesConfig( load_in_4bit=True, bnb_4bit_quant_type='nf4', bnb_4bit_compute_dtype=torch.bfloat16, ) model = AutoModelForCausalLM.from_pretrained(name, quantization_config=bnb) model = get_peft_model(model, config) ``` 7B QLoRA 单 A100 40GB 训 batch=4 可以跑得动。 ## 效果(我的 case:A100 40GB fine-tune Qwen2 7B) | 配置 | 显存 | 训练速度 | 效果损失 | |---|---|---|---| | FP32 full ft | OOM | — | — | | BF16 full ft | OOM (~80 GB) | — | — | | BF16 + grad checkpoint | OOM (~50 GB) | — | — | | BF16 + checkpoint + ZeRO-2 cpu offload | 32 GB | 1x | 0 | | BF16 + LoRA | 24 GB | 1.3x | 微小 | | BF16 + QLoRA | 14 GB | 1.2x | 1-2% | 最终 QLoRA 跑通 fine-tune,loss 收敛、benchmark 比 base 提升 8%。 ## 调试技巧 ```python # 看每层显存 print(torch.cuda.memory_summary()) # 最大峰值 print(f'peak: {torch.cuda.max_memory_allocated() / 1e9:.2f} GB') torch.cuda.reset_peak_memory_stats() # 实时监控 nvidia-smi -l 1 # 或更细: nvtop ``` 跑 OOM 时立刻 `nvidia-smi` 看到底是 model load 时挂了还是 forward 时挂了, 对症下药。 ## 踩过的坑 1. **`del var` 不立刻释放**:PyTorch caching allocator 不还给 OS。 `torch.cuda.empty_cache()` 也只是把 cached block 让出来,**不会** 实际减少 OS 看到的进程显存。 2. **DataLoader pin_memory + num_workers 大**:每个 worker 一份 GPU 显存映射。OOM 时先减 `num_workers`。 3. **eval 不开 no_grad**:评估时没 `with torch.no_grad():`,accidentally build 完整 computation graph,显存翻倍。 4. **多个模型同时 load**:base model + LoRA + reward model 一起在 GPU 上时,DPO / RLHF 训练显存压力极大。把 reward model 量化或 freeze 后丢 CPU。 5. **使用 `compute_dtype=torch.float16` + Adam**:fp16 + Adam 数值 不稳定。一律 `bf16` 或者 fp32 master weight(mixed precision)。

Great Expectations + dbt test:数据质量门禁

## 起因 数据 pipeline 跑出来的表: - 突然某天 row 数 -80%(上游断了) - 某列 null 比例飙到 30%(schema 改了没告知) - 重要 metric 暴增 100x(埋点 bug) 下游 BI 报表 / model 已经用上 → 业务 / model 出错。 **数据质量监控**是数据团队必须建的。 两个主流工具: - **dbt test**(轻量,pipeline 集成) - **Great Expectations / GX**(重量,schema + 历史 profile) ## dbt test 基础 dbt model 的 schema.yml 里加 test: ```yaml # models/orders/schema.yml models: - name: orders columns: - name: id tests: - unique - not_null - name: status tests: - accepted_values: values: ['pending', 'paid', 'cancelled', 'refunded'] - name: amount tests: - dbt_expectations.expect_column_values_to_be_between: min_value: 0 max_value: 1000000 tests: - dbt_utils.expression_is_true: expression: "amount = price * quantity" ``` ```bash dbt test --select orders ``` 每个 test → 一个 SQL `SELECT count(*) WHERE 失败条件`。 返回 > 0 → 失败。 ### 优势 - 跟 dbt run 同 workflow(test after run) - SQL-based 简单 - `dbt_utils` / `dbt_expectations` 几百 test - 失败定位明确(具体 model + column) ### 劣势 - 只能查"当前结果是否符合规则" - 没历史 baseline / trend - 不知道"昨天 100 行,今天 50 行"算 anomaly ## dbt 常用 test ```yaml tests: - dbt_utils.unique_combination_of_columns: combination_of_columns: [user_id, day] - dbt_utils.recency: datepart: day field: created_at interval: 1 # 最新 record 不能超过 1 天前 - dbt_expectations.expect_table_row_count_to_be_between: min_value: 1000 max_value: 100000 - dbt_expectations.expect_column_value_lengths_to_equal: column: phone value: 11 - dbt_expectations.expect_column_proportion_of_unique_values_to_be_between: column: email min_value: 0.9 ``` table_row_count 等 catch "今天数据突然少"。 ## Great Expectations (GX) ```python import great_expectations as gx context = gx.get_context() # 定义 expectation suite suite = context.suites.add(name='orders_suite') suite.add_expectation( gx.expectations.ExpectColumnValuesToBeBetween( column='amount', min_value=0, max_value=1000000)) suite.add_expectation( gx.expectations.ExpectColumnValuesToNotBeNull(column='user_id')) # 跑 validation validator = context.get_validator(...) results = validator.validate() ``` GX 比 dbt test 复杂但功能更多: - profile dataset → 自动生成 expectation - HTML 报告(哪 row 失败、% 等) - versioned expectation suite - data docs 生成 ## 自动 profile ```python profiler = gx.profile.UserConfigurableProfiler(profile_dataset=batch) suite = profiler.build_suite() ``` 让 GX 看现有数据,自动建 expectation(每列 min/max/null% etc)。 人审 + 调 → 提交。 第一次设 expectation 非常省时。 ## anomaly detection GX 0.18+ 支持 "expect column values stay close to historical mean": ```python gx.expectations.ExpectColumnMeanToBeBetween( column='daily_revenue', min_value=-3, max_value=3, strict_min=False, auto=True, # 自动 baseline last 30 day ) ``` 跟历史 baseline 比,3σ 之外报警。 catch "突然飙升 / 暴跌"。 ## dbt + GX 集成 `dbt-expectations` 包是 GX expectation 用 SQL 重写的 dbt test 版本。 所以两个工具的核心 expectation 高度重叠: ```yaml # dbt_expectations 在 dbt test 里 - dbt_expectations.expect_column_values_to_match_regex: column: email regex: '^[^@]+@[^@]+\.[^@]+$' ``` 简单 expectation → dbt_expectations 够,pipeline 内集成。 需要 profile / anomaly / 历史 → 用 GX。 ## 我们的 setup ``` 1. dbt run + dbt test(pipeline 内,failing test → 阻塞 pipeline) 2. GX 每天对核心表 daily check(独立 schedule) 3. anomaly detected → Slack 告警 + 人工调查 ``` dbt test 防"明显 bug"。GX 防"渐变趋势异常"。 ## test 严重级别 dbt 1.5+ test 有 severity: ```yaml - not_null: severity: error # 默认,阻塞 - not_null: severity: warn # 不阻塞,只 log ``` `warn` 给"偶尔不符合但还 acceptable"的规则。 ## 失败时怎么办 dbt test 默认 fail → 整 pipeline halt。 策略: 1. **block downstream**:dbt 默认行为,下游 model 不跑(避免坏数据传播) 2. **alert only**:`severity: warn`,下游照跑 + 通知人 3. **quarantine**:把坏 row 隔离到 errors table,好 row 继续 选哪个看业务容忍度。金融 → block。日志 → warn。 ## storing test results dbt test 默认结果不存。 `dbt-checkpoint` 或 自定义 macro 把结果写表: ```sql -- models/_test_results.sql SELECT current_timestamp AS run_at, '{{ this.name }}' AS test, {{ test_function() }} AS result ``` 历史化 → Grafana 看 test pass rate 趋势。 ## 真实 case:救命的 test 我们一个 ETL pipeline: ```yaml tests: - dbt_expectations.expect_table_row_count_to_be_between: min_value: 50000 # 历史日均 80k max_value: 200000 ``` 某天上游 partition 错 → 我们表只 catch 到 5k 行。 test 立刻失败 → pipeline halt → BI 没拿到坏数据 → 修 partition → re-run。 没这 test 的话,dashboard 显示 5k 行 = "今天业务下滑 95%", 高管 panic。 ## 与 monte-carlo / soda 对比 | | dbt test | GX | Monte Carlo | Soda | |---|---|---|---|---| | 部署 | 跟 dbt | self-host / cloud | SaaS | self/cloud | | 价格 | 0 | 0 | 贵 | mid | | anomaly | 基础 | 中 | 强 (ML) | 中 | | 集成 | dbt 原生 | API | data warehouse 联 | 类 GX | | 上手 | 极易 | 中 | easy(SaaS) | 中 | 预算紧 → dbt test + GX。 预算大 + 不想运维 → Monte Carlo。 ## 踩过的坑 1. **expectation 太严**:`amount > 0` 但实际有 refund 是负数 → 全部 alert false positive。expectation 必须 calibrate。 2. **suite 跟 schema 不同步**:表加列,suite 没改 → 没 cover。 review process。 3. **GX 版本升级**:0.x → 1.x breaking change 大。锁版本 / 小心升。 4. **test 跑慢**:每个 test 一条 query → 大表 N test 慢。dbt `--store-failures` 让结果存表 + 跑一次 query 多 test。 5. **silent broken**:test 跑了但通过(即使数据有问题)。覆盖度 review 重要。每次 incident 后加新 test,防同问题。

用 prometheus_client 给 Python 应用暴露指标(4 种 metric 用法)

`prometheus_client` 是 Prometheus 官方的 Python 库,让任何 Python 应用 能在 `/metrics` 端点导出 Prometheus 格式的指标。 ## 安装 + 最小可用 ```bash uv add prometheus-client ``` ```python # 暴露在独立端口 from prometheus_client import start_http_server, Counter, Gauge, Histogram req_count = Counter('http_requests_total', 'Total HTTP requests', ['method', 'path', 'status']) start_http_server(8001) # 独立 :8001/metrics # 然后做你的事 ... req_count.labels(method='GET', path='/users', status='200').inc() ``` 或挂在 FastAPI 路由: ```python from prometheus_client import make_asgi_app app.mount('/metrics', make_asgi_app()) ``` Django: ```python # urls.py from prometheus_client import make_wsgi_app from django.urls import path from django.views.generic import View class MetricsView(View): def get(self, request): ... # 用 django-prometheus 包更省事 ``` 实际项目直接用 `django-prometheus`: ```bash uv add django-prometheus ``` 加 middleware 后内置一堆 Django 指标(视图延迟、SQL 时间、缓存命中等)。 ## 4 种 metric 用法 ### Counter:单调递增 ```python requests = Counter('requests_total', '...', ['method']) requests.labels(method='GET').inc() requests.labels(method='POST').inc(5) ``` PromQL: ``` rate(requests_total[5m]) # 每秒请求数(按 5 分钟窗口) ``` ### Gauge:可上可下 ```python queue_size = Gauge('queue_size', '...') queue_size.set(42) queue_size.inc(); queue_size.dec() # 用 callback 让 Prometheus 拉取时实时计算 queue_size.set_function(lambda: r.llen('jobs')) ``` 适合:当前队列长度、连接数、温度、内存使用。 ### Histogram:分布 ```python latency = Histogram('http_latency_seconds', 'HTTP latency', ['endpoint'], buckets=[0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10]) # 上下文管理器自动测延迟 with latency.labels(endpoint='/users').time(): do_work() ``` PromQL 出 p95: ``` histogram_quantile(0.95, sum(rate(http_latency_seconds_bucket[5m])) by (le, endpoint)) ``` bucket 选 5-12 个,覆盖典型延迟范围。bucket 多了占资源,bucket 少了 分位数不准。 ### Summary:分位数(不推荐) ```python from prometheus_client import Summary size = Summary('request_size_bytes', '...') size.observe(2048) ``` Summary 在客户端算分位数,无法在多实例上做正确聚合。**生产基本只用 Histogram**,需要分位数时用 `histogram_quantile()` 在服务端算。 ## label 设计原则 label 的不同值组合决定了 series 的数量。label cardinality 高了 Prometheus 内存爆。 ```python # 错: user_id 是无限基数 requests.labels(user_id=user.id).inc() # 错: 完整 URL 含动态 ID requests.labels(path='/users/123/posts').inc() # 对: 路径模板化 requests.labels(path='/users/{id}/posts').inc() ``` **label 数应该是个有限小集合**:endpoint 模板、HTTP method、status code、 某几个固定 region 等。 ## 一个完整中间件(FastAPI) ```python import time from prometheus_client import Counter, Histogram from starlette.middleware.base import BaseHTTPMiddleware REQ = Counter('http_requests_total', '...', ['method', 'path', 'status']) LATENCY = Histogram('http_latency_seconds', '...', ['method', 'path'], buckets=[.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5]) class MetricsMiddleware(BaseHTTPMiddleware): async def dispatch(self, request, call_next): t0 = time.perf_counter() response = await call_next(request) elapsed = time.perf_counter() - t0 # 用路由模板而不是实际路径 route = request.scope.get('route') path = route.path if route else request.url.path REQ.labels(method=request.method, path=path, status=str(response.status_code)).inc() LATENCY.labels(method=request.method, path=path).observe(elapsed) return response app.add_middleware(MetricsMiddleware) ``` `request.scope.get('route').path` 给出的是 `/users/{user_id}/posts` 这种模板,不是 `/users/42/posts` 这种实参。 ## 内置 collector `prometheus_client` 自带几个: ```python from prometheus_client import REGISTRY, GCCollector, PlatformCollector, ProcessCollector # 默认这些已经注册(CPython 平台 + 进程信息) # 可以手动反注册节省指标 REGISTRY.unregister(GCCollector(REGISTRY)) ``` `process_cpu_seconds_total`、`process_resident_memory_bytes` 等都是 免费白送的。 ## 多 worker 的坑 gunicorn / uvicorn 多 worker 时,每个 worker 是独立进程,独立的 metrics。 直接 scrape `/metrics` 只看到一个 worker 的数据。两种解法: 1. **mmap 共享**:设 `PROMETHEUS_MULTIPROC_DIR=/tmp/prom`,prometheus_client 会把指标写到共享目录,import 时聚合所有 worker 的数据。 2. **每个 worker 独立 scrape**:让 Prometheus 单独抓每个 worker 端口 (复杂度高,不推荐)。 mmap 版本写法: ```python # 进程启动时 import os os.environ['PROMETHEUS_MULTIPROC_DIR'] = '/tmp/prom' # 暴露指标时用 MultiProcessCollector from prometheus_client import multiprocess, CollectorRegistry, generate_latest registry = CollectorRegistry() multiprocess.MultiProcessCollector(registry) output = generate_latest(registry) ``` worker 退出时调 `multiprocess.mark_process_dead(pid)`。 ## 测试 ```python def test_counter_increments(): REQ.labels(method='GET', path='/x', status='200').inc() # 直接读 metric 内部值 val = REQ.labels(method='GET', path='/x', status='200')._value.get() assert val == 1 ``` ## 踩过的坑 - 把 user-id / session-id 当 label:1M 用户 = 1M series,Prometheus 崩盘。 这种"高基数"信息应该用 log,不用 metric。 - Histogram bucket 改了:旧数据和新数据不可比较,需要双写或重设。建议 bucket 一开始想清楚。 - `/metrics` 端点不要走鉴权:Prometheus scrape 没有简单的 auth; 挂内网或加 IP 白名单。 - 用 `with` 管 Histogram timer:抛异常时也会记录,所以错误请求的延迟 也算进 p95。如果想只看成功请求的延迟,加 try/except 区分。

marimo vs Jupyter:reactive notebook 的演化

## 起因 Jupyter notebook 用了 10 年,痛点积累: - cell 乱序执行 → 隐藏状态 / 难复现 - JSON 文件 git diff 不友好 - 无 IDE 类型检查 - 不能直接 run as script - import 难(要把 notebook 转 .py) `marimo`(2023+)是新一代 Python notebook,**reactive** 设计: - 单 file = `.py`(git friendly + 可 import) - cell dependency 图 → 改一个 cell 自动重跑下游 - 无隐藏状态(重启 → 一致结果) - 内置 UI widget(slider / dropdown 等) ## 装 ```bash pip install marimo # 或者 uv add marimo ``` ```bash marimo new # 新建 notebook marimo edit my.py # 编辑 marimo run my.py # 当 app 跑(read-only) ``` ## 文件格式 ```python # my_notebook.py import marimo __generated_with = "0.6.0" app = marimo.App() @app.cell def __(): import polars as pl df = pl.read_csv('data.csv') df @app.cell def __(df): summary = df.group_by('country').agg(pl.col('amount').sum()) summary @app.cell def __(summary): summary.plot.bar(x='country', y='amount') ``` 每 cell 是函数:参数 = 它依赖的变量。 marimo 自动构建 DAG,运行时知道改 `df` → `summary` 重算 → `plot` 重算。 ## reactivity 改第一个 cell,比如 `pl.read_csv('other.csv')`: - marimo 自动识别 `df` 变了 - 下游 cell(用 `df` 的)自动跑 - 不用手动 Shift+Enter 每个 cell vs Jupyter:你必须记得"我改了 df,要 re-run 下游"。少 re-run 一个就 state 不一致。 ## 无隐藏状态 Jupyter 经典 bug: ```python # cell 1 x = 5 # cell 2 print(x) # 5 # cell 1 修改 x = 10 # 不重跑 cell 2 # cell 3 print(x) # 10 # cell 2 显示 5,cell 3 显示 10 → 矛盾状态 ``` marimo 不允许:cell 1 改了 → cell 2 / 3 都重跑。 restart kernel + run all 跟正常 run 一样。 ## git friendly ```bash $ git diff my_notebook.py - df = pl.read_csv('data.csv') + df = pl.read_csv('orders.csv') ``` 正常 Python diff。 vs Jupyter `.ipynb`: ```bash $ git diff my.ipynb - "execution_count": 12, + "execution_count": 13, - "metadata": {"id": "abc"}, + "metadata": {"id": "def"}, - ... 几百行 base64 image diff ``` `.ipynb` 是 JSON + base64 编码的 output,PR review 痛苦。 工具如 `nbdime` / `jupytext` 能缓解但 marimo 原生 .py 更纯。 ## 当 script 跑 ```bash python my_notebook.py # 跟 jupyter nbconvert 不一样,marimo .py 真的是 Python 文件,直接跑 ``` CI 跑 notebook 测试 → 一行命令。 ## 当 import 用 ```python # main.py from my_notebook import summary # marimo 让 cell 变量 import 出来 ``` marimo notebook = Python module。 prototype 完直接被生产 code 引用,不用"先把 cell 改 function 再 import"。 ## UI widget ```python @app.cell def __(mo): slider = mo.ui.slider(0, 100, value=50, label='Sample size') slider @app.cell def __(slider, df): sample = df.head(slider.value) sample ``` slider 调 → 自动重算 sample → 显新结果。 不用 ipywidgets 复杂 callback。 ## marimo as app ```bash marimo run my.py --host 0.0.0.0 --port 8000 ``` 变成 web app(read-only),URL 分享给同事。 UI widget 仍可交互。 替代 streamlit 简单场景。 ## vs Jupyter 优势汇总 | | Jupyter | marimo | |---|---|---| | 文件格式 | JSON .ipynb | Python .py | | 隐藏状态 | 是 | 否 | | Reactive | 否 | 是 | | Git diff | 烂 | 好 | | Run as script | 要 nbconvert | 直接 | | Import | 要 nbconvert | 直接 | | UI widget | ipywidgets | marimo.ui | | AI 友好度 | 弱(JSON) | 强(.py) | ## Jupyter 仍胜的场景 - 巨大社区(教程 / 大量 .ipynb 内容) - Google Colab / Kaggle / VS Code 内置 Jupyter - 自由 cell 顺序(marimo 强制 DAG 无环) - 显示 rich output 历史悠久 + 稳 新项目我用 marimo,老 .ipynb 继续 Jupyter。 ## 我的工作流 数据探索: 1. `marimo edit explore.py` 2. 改 cell → 即时看下游结果(reactive) 3. 完成的 cell 重构成 function 4. 把 explore.py 里关键 function `import` 到 production code production analysis dashboard: 1. `marimo edit dashboard.py` 加 widget 2. `marimo run dashboard.py --headless` 部署 替代 Jupyter + nbconvert + streamlit 三个工具。 ## 与 nbdev 对比 `nbdev`(fast.ai):Jupyter 当源码,notebook 直接 ship 为库。 marimo:notebook 就是 .py,本来就是源码。 nbdev 改进 Jupyter;marimo 重新设计。我选 marimo(少抽象层)。 ## 与 Pluto.jl 对比 Julia 的 Pluto 是 marimo 的灵感来源(reactive notebook)。 marimo 在 Python 把这模型实现。 ## 性能 reactive DAG 计算开销小(几 ms / 改动)。 大 cell 仍跟 Jupyter 一样跑。 增量执行有时还更快(只跑变化的 cell + 下游)。 ## 踩过的坑 1. **强制 DAG 无环**:写 `x = 1` in cell 1 + `x = 2` in cell 2 报错 (same variable defined twice)。需要 refactor 到 function 或者 rename。开始不适应。 2. **某些 imperative pattern 不支持**:`for i in range(5): print(i)` in cell → marimo 不 reactive。包 function。 3. **plot library 兼容**:matplotlib OK;plotly / altair OK; 极少 niche lib 可能不渲染。 4. **AI tab completion 弱于 Jupyter**:jupyter-ai / VS Code Copilot Jupyter 集成强。marimo 在追赶。 5. **大 dataframe display 慢**:reactive 每次跑都重 display。 `df` 大时显式 `df.head()`,或者 cache。

Zod:TS 之外,运行时也要校验数据

## 起因 TypeScript 是编译时类型检查。运行时 API 返回数据 / 用户输入 / localStorage 读出来的东西,TS 不知道: ```ts type User = { id: number; email: string }; const res = await fetch('/api/me'); const user: User = await res.json(); // ⚠️ 类型断言不验运行时 user.email.toLowerCase(); // 万一 API 返回 null 或者 email 是 undefined? ``` 类型 lie → 运行时崩。 `Zod` 是 runtime schema validation + 自动 TS 类型推导。一份 schema 既 是验证 + 又是 type。 ## 基本 ```ts import { z } from 'zod'; const UserSchema = z.object({ id: z.number(), email: z.string().email(), name: z.string().min(1).max(100), age: z.number().int().positive().optional(), }); type User = z.infer<typeof UserSchema>; // { id: number; email: string; name: string; age?: number } const data = await res.json(); const user = UserSchema.parse(data); // throw if invalid // user 类型 = User,且运行时保证符合 ``` 一份 schema 解决: - 运行时验证 - TS 类型(infer) - 错误信息(with 路径) ## 复杂 schema ```ts const PostSchema = z.object({ id: z.string().uuid(), title: z.string(), body: z.string(), status: z.enum(['draft', 'published', 'archived']), tags: z.array(z.string()), author: z.object({ name: z.string(), avatar: z.string().url().optional(), }), createdAt: z.string().datetime(), }); ``` 嵌套 object + array + enum + format check(uuid / url / datetime)。 ## transform ```ts const DateSchema = z.string().datetime().transform((s) => new Date(s)); // input: string // output: Date const result = DateSchema.parse('2025-03-14T10:00:00Z'); // result: Date instance ``` parse 时把 string 自动转 Date。 input / output type 不同(z.infer 拿 output)。 ## refine (自定义校验) ```ts const passwordSchema = z .string() .min(8) .refine((s) => /[A-Z]/.test(s), { message: 'Need uppercase' }) .refine((s) => /\d/.test(s), { message: 'Need digit' }); ``` ## 错误处理 ```ts const result = UserSchema.safeParse(data); if (!result.success) { console.log(result.error.format()); // { // email: { _errors: ['Invalid email'] }, // age: { _errors: ['Expected positive number'] } // } return; } console.log(result.data); // 通过的 typed data ``` `safeParse` 不 throw,返回 result object。 form 验证场景常用。 ## 跟 React Hook Form 集成 ```tsx import { useForm } from 'react-hook-form'; import { zodResolver } from '@hookform/resolvers/zod'; const FormSchema = z.object({ email: z.string().email(), password: z.string().min(8), }); function LoginForm() { const { register, handleSubmit, formState: { errors } } = useForm({ resolver: zodResolver(FormSchema), }); return ( <form onSubmit={handleSubmit((data) => console.log(data))}> <input {...register('email')} /> {errors.email && <p>{errors.email.message}</p>} <input {...register('password')} type="password" /> {errors.password && <p>{errors.password.message}</p>} <button>Login</button> </form> ); } ``` form 校验自动用 Zod schema → message 自动显。 ## API 验证 (tRPC / Hono) ```ts // tRPC procedure import { z } from 'zod'; const userRouter = t.router({ create: t.procedure .input(z.object({ email: z.string().email(), name: z.string().min(1), })) .mutation(({ input }) => { // input 类型 + 运行时都符合 return db.user.create({ data: input }); }), }); ``` ```ts // Hono app.post('/users', zValidator('json', UserSchema), (c) => { const user = c.req.valid('json'); // typed return c.json(user); }); ``` backend 入口验证 input → 后续 code 类型 + runtime 一致。 ## 与 yup / joi 对比 | | Zod | yup | joi | |---|---|---|---| | TS 类型推导 | ✅ first-class | 弱 | 弱 | | bundle | 中(14 KB) | 中 | 大 | | API | builder | builder | builder | | 生态 | 大(现代) | 中 | 后端流行 | | 性能 | 中 | 中 | 慢 | TS 项目 → Zod。 JS 老项目 → yup。 Node API server 不要 TS → joi。 ## 与 valibot / arktype 对比 新一代竞争: - **valibot**:tree-shakable,bundle 极小(500B-2KB) - **arktype**:TS-as-schema(直接写 TS 类型当 schema),运行时极快 ```ts // valibot import { object, string, number } from 'valibot'; const UserSchema = object({ email: string([email()]), age: number([integer()]), }); ``` ```ts // arktype import { type } from 'arktype'; const UserSchema = type({ email: 'email', 'age?': 'number.integer>0', }); ``` valibot 适合极致 bundle size(移动 web)。 arktype 适合纯 TS 语法控。 Zod 仍最普及 + 生态最大。 ## ecosystem zod 生态丰富: - `@hookform/resolvers/zod` - `zod-to-openapi`:自动 OpenAPI 3 spec - `zod-to-ts`:生成 TS file - `prisma-zod-generator`:从 Prisma schema 生成 Zod - `zod-form-data`:multipart form 验证 ## 性能 百万次 parse benchmark: | | ops/sec | |---|---| | Zod | 350k | | valibot | 800k | | arktype | 1500k | | yup | 100k | Zod 不是最快但够用。极致性能场景才换。 ## 真实 case:API 返回防御 API 三方 / 老接口经常返回奇怪: ```ts // API 文档说返回 number,实际偶尔 null const result = await fetch('/old/api').then(r => r.json()); // 防御 const ResultSchema = z.object({ count: z.number().nullable().transform((n) => n ?? 0), items: z.array(ItemSchema), }); const safe = ResultSchema.parse(result); // safe.count 永远 number(null 转 0) ``` 避免下游 `result.count + 5` 报 NaN。 ## 复杂 union ```ts const EventSchema = z.discriminatedUnion('type', [ z.object({ type: z.literal('click'), x: z.number(), y: z.number() }), z.object({ type: z.literal('input'), value: z.string() }), z.object({ type: z.literal('submit'), formId: z.string() }), ]); const event = EventSchema.parse(data); if (event.type === 'click') { event.x; // typed } ``` `discriminatedUnion` 比 plain union 快 + type narrowing 更准。 ## env 变量验证 ```ts const envSchema = z.object({ DATABASE_URL: z.string().url(), PORT: z.coerce.number().int().positive(), NODE_ENV: z.enum(['development', 'production', 'test']), }); export const env = envSchema.parse(process.env); ``` 启动时验证 env,缺 / 错立刻报。 之后 code 用 `env.PORT` 类型 number 不是 string。 ## 踩过的坑 1. **z.string().uuid() too strict**:传统 v1 UUID 不过。`.uuid()` 是 严格 v4 格式。 2. **.parse vs .safeParse**:parse throw + safeParse 返 result。 form / API boundary 用 safeParse 控制响应;trust source 用 parse 简洁。 3. **infer 大 schema 慢**:深嵌套 union 等 TS 编译慢。break into smaller schema。 4. **transform 后 input type 隐**:`z.infer<typeof schema>` 拿 output; `z.input<typeof schema>` 拿 input。文档 / API 注意区分。 5. **生产 bundle 大**:Zod 14 KB gzip 在 mobile critical 可能多。 考虑 valibot 替换关键路径。

Go context.Context:超时 / 取消 / 传值的正确姿势

`context.Context` 是 Go 标准库里跨 API 边界传递取消信号 + 超时 + 请求值的 统一方式。所有"可能长时间运行 + 可能需要被取消"的函数都该接 ctx 作为 **第一个参数**。 ## 1. 基础 ```go ctx, cancel := context.WithCancel(context.Background()) defer cancel() // 必须 defer cancel 避免 goroutine 泄露 go doWork(ctx) time.Sleep(2 * time.Second) cancel() // 显式取消 ``` `cancel()` 触发后所有继承这个 ctx 的下游都收到信号。 ## 2. 超时 ```go ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() result, err := http.NewRequestWithContext(ctx, "GET", url, nil) // 5 秒后自动 cancel;HTTP 客户端会立即中断请求 ``` `WithTimeout` 是 `WithDeadline(time.Now().Add(d))` 的简写。 ## 3. 在函数里响应 ctx ```go func doWork(ctx context.Context) error { for i := 0; i < 100; i++ { select { case <-ctx.Done(): return ctx.Err() // context.Canceled 或 context.DeadlineExceeded case <-time.After(100 * time.Millisecond): process(i) } } return nil } ``` 关键模式:每次能 block 的地方都 `select { case <-ctx.Done(): ... }`。 对 I/O 操作通常用接 ctx 的版本: ```go req, _ := http.NewRequestWithContext(ctx, "GET", url, nil) resp, err := http.DefaultClient.Do(req) row := db.QueryRowContext(ctx, "SELECT ...", args...) ``` ## 4. 链式派生 ```go ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() // 派生:更短超时 ctxShort, cancel2 := context.WithTimeout(ctx, 2*time.Second) defer cancel2() callRemote(ctxShort) ``` 子 ctx 永远比父 ctx 早结束。取较短的 timeout 生效。 父 cancel → 所有子 ctx 都被 cancel。 ## 5. ctx 传值 ```go type traceIDKey struct{} ctx = context.WithValue(ctx, traceIDKey{}, "abc-123") // 下游取 tid, ok := ctx.Value(traceIDKey{}).(string) ``` 约定: - key 用未导出的私有 type,避免不同包冲突 - 只传"跨 API 边界的请求作用域元数据"(trace ID / user ID / locale), 不传业务参数 - 业务参数走显式函数签名 ## 6. context.Background() vs context.TODO() - `Background()`:根 context,main / init 用 - `TODO()`:当你不知道用哪个 ctx 时用(让 linter / 自己后续修) ```go ctx := context.TODO() // I'll come back to wire this ``` ## 7. HTTP 服务端:取请求 ctx ```go func handler(w http.ResponseWriter, r *http.Request) { ctx := r.Context() // 客户端断开连接 → ctx.Done() result, err := fetchFromDB(ctx, ...) if err == context.Canceled { // 客户端已经断开,没必要返回响应 return } json.NewEncoder(w).Encode(result) } ``` 客户端 close 连接时 r.Context 自动 cancel,所有下游 query 应自动中断。 省服务器 CPU / DB 连接。 ## 8. errgroup + ctx 并发 `sync/errgroup` 比手写 channel + WaitGroup 简洁: ```go import "golang.org/x/sync/errgroup" func fetchAll(ctx context.Context, urls []string) ([]string, error) { g, ctx := errgroup.WithContext(ctx) results := make([]string, len(urls)) for i, u := range urls { i, u := i, u // capture g.Go(func() error { data, err := fetch(ctx, u) if err != nil { return err // 任一失败 ctx 自动 cancel 其它 } results[i] = data return nil }) } if err := g.Wait(); err != nil { return nil, err } return results, nil } ``` `errgroup.WithContext` 返回的 ctx 在任一 goroutine 返回 error 时 自动 cancel。 限并发: ```go g, ctx := errgroup.WithContext(ctx) g.SetLimit(10) // 最多 10 个 goroutine 并发 ``` ## 9. 不要把 ctx 存结构体 反模式: ```go type Server struct { ctx context.Context // ❌ } ``` ctx 是请求 / 操作的属性,不是对象的属性。每次方法显式传: ```go type Server struct{} func (s *Server) Handle(ctx context.Context, req Req) (Resp, error) { ... } ``` 例外:长生命周期的"管理者"对象(如 server 自身的 shutdown ctx) 可以存——但要清楚标注。 ## 10. cancel() 必须 defer ```go ctx, cancel := context.WithTimeout(..., 5*time.Second) // 忘 defer cancel() → 即使函数正常返回,定时器还在 → goroutine leak defer cancel() ``` go vet 会警告"the cancel function is not used on all paths", 认真处理。 ## 11. ctx 的零成本约定 约定 ctx 永远作为第一个参数: ```go // ✅ func doWork(ctx context.Context, args Args) (Result, error) // ❌ func doWork(args Args, ctx context.Context) (Result, error) ``` 阅读 / IDE 补全 / linter 都基于这个约定。 ## 12. 实际生产 pattern ```go func main() { ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) defer cancel() srv := &http.Server{Addr: ":8080", Handler: setupHandler()} go func() { if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { log.Fatalf("server error: %v", err) } }() <-ctx.Done() log.Info("shutting down") shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() if err := srv.Shutdown(shutdownCtx); err != nil { log.Errorf("forced shutdown: %v", err) } } ``` `signal.NotifyContext`(Go 1.16+)一行处理信号 → ctx 模式: - 收 SIGINT / SIGTERM → ctx.Done() - main 等 done - `srv.Shutdown(timeout ctx)` 拒新连接 + 等老请求完,30 秒超时强制退出 ## 踩过的坑 - ctx 没传到下游:所有 hop 都要传 ctx。中间某个函数没接 → cancel 信号 断了,超时不生效。 - 用 `time.Sleep(d)` 而不是 `select { case <-time.After: case <-ctx.Done():}`: cancel 后还在 sleep,不响应。 - `errgroup.Go` 里的 goroutine panic → 整个 group 崩。在 Go 函数内部 recover 自保。 - `ctx.Value(string)` 用字符串作 key → 跨包冲突。永远用私有 type。

用 IntersectionObserver 做无限滚动列表(不卡、不漏、可中断)

无限滚动早年是监听 `scroll` 事件 + 算 `scrollTop`,性能差且容易漏触发。 `IntersectionObserver` 是浏览器原生的 "元素进入视口" 通知机制, 节能 + 精准。 ## 1. 最小实现 ```html <ul id="list"></ul> <div id="sentinel" style="height: 1px"></div> ``` ```js const list = document.getElementById('list') const sentinel = document.getElementById('sentinel') let page = 0 let loading = false let done = false async function fetchPage(n) { const res = await fetch(`/api/items?page=${n}`) return res.json() // { items: [...], hasMore: true } } async function loadMore() { if (loading || done) return loading = true const data = await fetchPage(++page) data.items.forEach(item => { const li = document.createElement('li') li.textContent = item.title list.appendChild(li) }) if (!data.hasMore) done = true loading = false } const observer = new IntersectionObserver( entries => { if (entries[0].isIntersecting) loadMore() }, { rootMargin: '300px' } // 距视口 300px 时就触发,提前预加载 ) observer.observe(sentinel) loadMore() // 启动 ``` 关键点: - **sentinel 元素**:放在列表末尾的一个空 div,作为"快到底"的探测点 - **rootMargin**:把视口边界往外扩 300px,提前触发,用户感受不到等待 - **loading guard**:防止滚动太快连续触发同一次加载 ## 2. React 版 ```tsx import { useEffect, useRef, useState, useCallback } from 'react' export function useInfinite<T>(fetchFn: (page: number) => Promise<{items: T[], hasMore: boolean}>) { const [items, setItems] = useState<T[]>([]) const [page, setPage] = useState(0) const [loading, setLoading] = useState(false) const [done, setDone] = useState(false) const sentinelRef = useRef<HTMLDivElement | null>(null) const loadMore = useCallback(async () => { if (loading || done) return setLoading(true) const next = page + 1 const data = await fetchFn(next) setItems(prev => [...prev, ...data.items]) setPage(next) if (!data.hasMore) setDone(true) setLoading(false) }, [loading, done, page, fetchFn]) useEffect(() => { const el = sentinelRef.current if (!el) return const obs = new IntersectionObserver( entries => { if (entries[0].isIntersecting) loadMore() }, { rootMargin: '300px' } ) obs.observe(el) return () => obs.disconnect() }, [loadMore]) return { items, loading, done, sentinelRef } } ``` 使用: ```tsx function FeedPage() { const { items, loading, done, sentinelRef } = useInfinite(p => fetch(`/api/feed?page=${p}`).then(r => r.json()) ) return ( <> {items.map(i => <Item key={i.id} {...i} />)} <div ref={sentinelRef} /> {loading && <Spinner />} {done && <div>到底了</div>} </> ) } ``` ## 3. 处理"立即可见的 sentinel" 如果列表初始就短,sentinel 一开始就在视口里,`IntersectionObserver` 立即触发 → 立即触发下一页 → 加载完 sentinel 仍在视口 → 又触发…… 循环到 `done` 才停。 通常没问题(连续加载到首屏满),但如果数据源慢,会发出一堆并发请求。 解决:保留 loading guard + 让 loadMore 在 fetch 完成前不允许并发, 就够了。 更稳的做法:用 `useCallback` + 检查 `loading` 状态。React 18 严格模式 下会双调用,loading 必须用 ref 而不是 state: ```tsx const loadingRef = useRef(false) const loadMore = useCallback(async () => { if (loadingRef.current || done) return loadingRef.current = true try { /* ... */ } finally { loadingRef.current = false } }, [done]) ``` ## 4. 中断 / 卸载安全 组件卸载时正在 fetch:要么 abort,要么忽略结果。 ```tsx useEffect(() => { const ctrl = new AbortController() fetch('/api/...', { signal: ctrl.signal }) .then(r => r.json()) .then(data => { /* setState */ }) .catch(e => { if (e.name !== 'AbortError') console.error(e) }) return () => ctrl.abort() }, [page]) ``` ## 5. virtualized 列表的配合 万条以上的列表,光无限滚动还不够,DOM 节点太多会卡。配 react-virtuoso 或 react-window 做虚拟滚动,只渲染视口内的几十行。Virtuoso 自带 `endReached` 回调,IntersectionObserver 都省了。 ## 6. 浏览器兼容 `IntersectionObserver` 在 Safari 12.1+ 全支持,2024 年的所有目标浏览器都行。 不再需要 polyfill。 ## 踩过的坑 - 忘了 `observer.disconnect()` → 内存泄漏。React 用 cleanup 函数; vanilla JS 用一个全局 observer 在路由切换时手动 disconnect。 - sentinel 是 `display: none` → 永远不触发。要给它真实尺寸(至少 1px)。 - 滚动容器不是 window 时(例如内嵌可滚动的 div),要给 `IntersectionObserver` 设 `root: scrollContainer`,否则 viewport 默认是 window。 - 移动端慢网下重复触发:用户慢慢滑,每次 sentinel 进入视口都触发; 服务端在还没响应时收到多个相同 page 请求。后端用幂等返回 OK。

用 ONNX Runtime 部署 PyTorch 模型(CPU / GPU 通用、跨语言)

训练用 PyTorch 灵活,部署到生产时通常希望: - 没有 PyTorch 100+ MB 依赖 - 跨语言(C++ / Go / JS / Java 都能加载) - CPU / GPU 都能跑 - 性能更好(融合算子) ONNX 是开放神经网络交换格式,ONNX Runtime 是 Microsoft 的高性能推理引擎。 PyTorch 训练完导出 ONNX,运行时用 ORT 加载。 ## 装 ```bash uv add torch onnx onnxruntime # GPU 推理: uv add onnxruntime-gpu ``` ## 1. 导出 PyTorch 模型为 ONNX ```python import torch from your_model import Net model = Net() model.load_state_dict(torch.load('mnist.pt')) model.eval() # 一个 dummy 输入用于 trace dummy = torch.randn(1, 1, 28, 28) torch.onnx.export( model, dummy, 'mnist.onnx', input_names=['input'], output_names=['logits'], dynamic_axes={ 'input': {0: 'batch'}, # batch 维度可变 'logits': {0: 'batch'}, }, opset_version=17, ) ``` `dynamic_axes` 让导出的模型支持任意 batch size,否则固定为 dummy 的形状。 ## 2. 校验导出正确 ```python import onnx m = onnx.load('mnist.onnx') onnx.checker.check_model(m) print(onnx.helper.printable_graph(m.graph)) ``` `check_model` 不报错就 OK。 ## 3. 推理(Python) ```python import onnxruntime as ort import numpy as np sess = ort.InferenceSession('mnist.onnx', providers=['CPUExecutionProvider']) # GPU: providers=['CUDAExecutionProvider'] # 看输入输出 for i in sess.get_inputs(): print(f'input {i.name}: {i.shape} {i.type}') for o in sess.get_outputs(): print(f'output {o.name}: {o.shape} {o.type}') # 跑推理 x = np.random.rand(4, 1, 28, 28).astype(np.float32) logits = sess.run(['logits'], {'input': x})[0] pred = logits.argmax(axis=1) print(pred) ``` ## 4. 性能基准 ```python import time, numpy as np x = np.random.rand(1, 1, 28, 28).astype(np.float32) # warm up for _ in range(10): sess.run(['logits'], {'input': x}) t0 = time.time() for _ in range(1000): sess.run(['logits'], {'input': x}) print(f'avg latency: {(time.time()-t0)/1000*1000:.2f} ms') ``` 对比 PyTorch: ```python import torch model.eval() x = torch.randn(1, 1, 28, 28) with torch.no_grad(): for _ in range(10): model(x) t0 = time.time() for _ in range(1000): model(x) print(f'pytorch avg: {(time.time()-t0)/1000*1000:.2f} ms') ``` CPU 上 ONNX Runtime 通常比 PyTorch 快 1.5-3x(算子融合 + 简化 graph)。 ## 5. 性能优化 ```python sess = ort.InferenceSession( 'mnist.onnx', providers=['CPUExecutionProvider'], sess_options=ort.SessionOptions(), ) opts = ort.SessionOptions() opts.intra_op_num_threads = 4 opts.inter_op_num_threads = 1 opts.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL opts.execution_mode = ort.ExecutionMode.ORT_SEQUENTIAL sess = ort.InferenceSession('mnist.onnx', sess_options=opts, providers=['CPUExecutionProvider']) ``` 对于 transformer / 大模型,`enable_profiling=True` 让 ORT 输出每个算子的耗时 帮助找瓶颈。 ## 6. 多 provider ```python # 优先 GPU,没有就 CPU sess = ort.InferenceSession( 'mnist.onnx', providers=['CUDAExecutionProvider', 'CPUExecutionProvider'] ) print(sess.get_providers()) ``` NVIDIA:CUDAExecutionProvider / TensorrtExecutionProvider Apple:CoreMLExecutionProvider AMD:ROCMExecutionProvider Intel:OpenVINOExecutionProvider ## 7. 量化(减小模型 + 加速 CPU 推理) ```python from onnxruntime.quantization import quantize_dynamic, QuantType quantize_dynamic('mnist.onnx', 'mnist.int8.onnx', weight_type=QuantType.QInt8) ``` INT8 量化通常 2-4x 推理加速 + 模型大小 1/4。精度损失对 ResNet / 简单 CNN 很小(< 1% 准确率),对 BERT 类需要 calibration 复杂些。 ## 8. C++ 推理 ```cpp #include "onnxruntime_cxx_api.h" Ort::Env env(ORT_LOGGING_LEVEL_WARNING, "test"); Ort::SessionOptions opts; Ort::Session session(env, "mnist.onnx", opts); // ... 准备输入 tensor ... auto output = session.Run(...); ``` 完整 C++ 例子在 ONNX Runtime 仓库。集成到 C++ 服务里完全摆脱 Python 依赖。 ## 9. 浏览器推理(onnxruntime-web) ```html <script src="https://cdn.jsdelivr.net/npm/onnxruntime-web/dist/ort.min.js"></script> <script> const session = await ort.InferenceSession.create('mnist.onnx'); const input = new ort.Tensor('float32', data, [1, 1, 28, 28]); const results = await session.run({ input: input }); console.log(results.logits.data); </script> ``` WebGPU / WebGL / wasm 多后端自动选。小模型直接跑浏览器,数据不离开 用户设备。 ## 10. 部署模式比较 | 工具 | 适合 | |---|---| | ONNX Runtime | 跨语言、跨平台、单进程 | | TorchServe | 多 PyTorch 模型微服务 | | Triton Inference Server | NVIDIA GPU 多模型高并发 | | BentoML | Python 服务封装(含监控 / 队列) | | vLLM | LLM 专用(PagedAttention) | ONNX Runtime 是"最通用最简单"那档;要更高级 ops(动态 batching、 gpu 调度)上 Triton。 ## 踩过的坑 - 导出失败 "Unsupported ONNX opset version":升 `opset_version` 或者 降 PyTorch 中用的 op(替换 custom op)。 - 导出后形状对但数值差:训练时 BatchNorm 等 running stats 没保存好, 确保 `model.eval()` 后再 export。 - Dynamic axes 没写:onnx 模型固定 batch=1,部署时 batch=N 直接报错。 - ONNX 模型文件很大(含权重):考虑用 `onnx.save_model(m, ..., save_as_external_data=True)` 把权重分离存储,便于 CDN / 分发。

给一台 Linux 服务器装 Prometheus + Grafana + node_exporter(10 分钟版)

任何长期运行的服务都该有监控。Prometheus + Grafana 是事实标准: Prom 抓指标,Grafana 画图,免费 + 自托管。下面把它们装起来。 ## 1. 装 node_exporter(被监控机器上) ```bash # 在每台要被监控的机器上 curl -fsSL https://github.com/prometheus/node_exporter/releases/latest/download/node_exporter-1.8.2.linux-amd64.tar.gz \ | sudo tar xz -C /usr/local/bin --strip-components=1 \ node_exporter-1.8.2.linux-amd64/node_exporter sudo useradd -rs /bin/false node_exporter ``` systemd unit `/etc/systemd/system/node_exporter.service`: ```ini [Unit] Description=Prometheus node exporter After=network.target [Service] User=node_exporter ExecStart=/usr/local/bin/node_exporter \ --collector.systemd \ --collector.processes \ --collector.textfile.directory=/var/lib/node_exporter NoNewPrivileges=true ProtectSystem=strict ProtectHome=true [Install] WantedBy=multi-user.target ``` ```bash sudo systemctl enable --now node_exporter curl localhost:9100/metrics | head ``` ## 2. 装 Prometheus(中心机器上) ```bash curl -fsSL https://github.com/prometheus/prometheus/releases/latest/download/prometheus-2.55.1.linux-amd64.tar.gz \ | sudo tar xz -C /opt/ sudo ln -s /opt/prometheus-2.55.1.linux-amd64 /opt/prometheus sudo useradd -rs /bin/false prometheus sudo mkdir -p /var/lib/prometheus sudo chown -R prometheus:prometheus /var/lib/prometheus /opt/prometheus* ``` 配置 `/opt/prometheus/prometheus.yml`: ```yaml global: scrape_interval: 15s evaluation_interval: 15s external_labels: cluster: 'prod' scrape_configs: - job_name: 'prometheus' static_configs: - targets: ['localhost:9090'] - job_name: 'node' static_configs: - targets: - 'server1.example.com:9100' - 'server2.example.com:9100' - 'server3.example.com:9100' labels: environment: prod ``` systemd `/etc/systemd/system/prometheus.service`: ```ini [Unit] Description=Prometheus After=network.target [Service] User=prometheus ExecStart=/opt/prometheus/prometheus \ --config.file=/opt/prometheus/prometheus.yml \ --storage.tsdb.path=/var/lib/prometheus \ --storage.tsdb.retention.time=30d \ --web.listen-address=:9090 Restart=on-failure [Install] WantedBy=multi-user.target ``` ```bash sudo systemctl enable --now prometheus # 浏览器:http://中心机:9090 ``` ## 3. 装 Grafana ```bash sudo apt install -y apt-transport-https software-properties-common sudo mkdir -p /etc/apt/keyrings/ wget -q -O - https://apt.grafana.com/gpg.key | gpg --dearmor | \ sudo tee /etc/apt/keyrings/grafana.gpg > /dev/null echo "deb [signed-by=/etc/apt/keyrings/grafana.gpg] https://apt.grafana.com stable main" | \ sudo tee /etc/apt/sources.list.d/grafana.list sudo apt update && sudo apt install -y grafana sudo systemctl enable --now grafana-server # 浏览器:http://中心机:3000 (默认 admin/admin) ``` ## 4. 在 Grafana 加数据源 UI 上: ``` Connections → Data sources → Add data source → Prometheus URL: http://localhost:9090 Save & test ``` ## 5. 导入 node_exporter 仪表盘 Grafana 社区已经有现成的: ``` Dashboards → New → Import → ID: 1860 (Node Exporter Full) ``` 瞬间得到 CPU / RAM / 磁盘 / 网络 / I/O / 进程等完整指标可视化。 ## 6. 几条 PromQL 救命查询 ```promql # CPU 使用率(5 分钟均值) 100 - (avg by (instance) (rate(node_cpu_seconds_total{mode="idle"}[5m])) * 100) # 内存使用率 100 * (1 - (node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes)) # 磁盘使用率(根分区) 100 - (node_filesystem_avail_bytes{mountpoint="/"} * 100 / node_filesystem_size_bytes{mountpoint="/"}) # 5 分钟内 5xx 错误率(如果你也抓 nginx 指标) sum(rate(nginx_http_requests_total{status=~"5.."}[5m])) by (instance) # 哪些机器 90% 内存 topk(5, 100 * (1 - node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes)) ``` ## 7. 告警 `prometheus.yml` 加: ```yaml rule_files: - /opt/prometheus/rules/*.yml alerting: alertmanagers: - static_configs: - targets: ['localhost:9093'] ``` `/opt/prometheus/rules/node.yml`: ```yaml groups: - name: node rules: - alert: HighCPU expr: 100 - avg by (instance)(rate(node_cpu_seconds_total{mode="idle"}[5m])) * 100 > 80 for: 10m labels: severity: warning annotations: summary: 'CPU > 80% on {{ $labels.instance }} for 10m' - alert: DiskAlmostFull expr: 100 - node_filesystem_avail_bytes{mountpoint="/"} * 100 / node_filesystem_size_bytes{mountpoint="/"} > 85 for: 5m labels: severity: critical annotations: summary: 'Disk > 85% on {{ $labels.instance }}' ``` 装 Alertmanager 把告警送到 Slack / 邮件 / 钉钉: ```bash curl -fsSL https://github.com/prometheus/alertmanager/releases/latest/download/alertmanager-0.27.0.linux-amd64.tar.gz \ | sudo tar xz -C /opt/ ``` `/opt/alertmanager/alertmanager.yml`: ```yaml route: receiver: slack group_wait: 30s group_interval: 5m repeat_interval: 4h receivers: - name: slack slack_configs: - api_url: 'https://hooks.slack.com/services/...' channel: '#alerts' title: '{{ .CommonAnnotations.summary }}' text: '{{ range .Alerts }}{{ .Annotations.summary }}\n{{ end }}' ``` ## 8. 防火墙 ```bash # Prometheus 端 sudo ufw allow from <Grafana_IP> to any port 9090 # node_exporter 端 sudo ufw allow from <Prometheus_IP> to any port 9100 ``` 或者 Prometheus / Grafana 都跑同一台(自己监控自己时常见), 只开 Grafana 80 / 443 给外网。 ## 9. 安全:永远别 9100 直接暴露公网 node_exporter 的指标包含敏感信息(启动参数、进程列表)。一定要: - 内网防火墙限制源 IP - 用 nginx 在前面套 basic auth - 或者 node_exporter 用 `--web.tls.config.file` 启 mTLS ## 10. 存储 retention ```bash --storage.tsdb.retention.time=30d # 默认 15d --storage.tsdb.retention.size=50GB ``` 长期存储用 Thanos / VictoriaMetrics / Cortex。单机 30 天指标 通常占几 GB-几十 GB。 ## 踩过的坑 - Prom UI 显示 target "down":`curl host:9100/metrics` 看是节点没起 还是网络不通;常常是防火墙 9100 没开给 Prom。 - 时区:Prometheus 内部用 UTC;Grafana 展示按浏览器时区。两者不一致 时建议在 Grafana 强制时区到本地。 - `scrape_interval` 太短:1s 抓导致 Prom 数据爆炸 + 网络流量大。 15s 是稳健默认;30s / 1m 在节点多时更省。 - 指标 cardinality 失控:label 取了 user_id / request_id 这种值无限的, Prom 时序数量爆炸 OOM。这是 Prom 最常见的 outage 根因。

FastAPI Depends() 实战:DI 让 testing 和 modularity 都受益

## 起因 FastAPI 的 `Depends()` 是其设计精华之一。新人常忽视它,把所有 view 写 成"自己 new DB connection + 拉 user 信息",导致: - 测试时无法 mock DB - 业务逻辑跟 framework 耦合 - 改 auth 流程要改 100 处 view 学会用 Depends 后代码组织上一个台阶。 ## 1. 基础 ```python from fastapi import FastAPI, Depends app = FastAPI() def get_db(): db = Session() try: yield db finally: db.close() @app.get('/users/{uid}') def read_user(uid: int, db = Depends(get_db)): return db.query(User).filter_by(id=uid).first() ``` `Depends(get_db)`: - 每个请求自动调 `get_db()` - yield 之前的代码:创建 session - yield 之后的代码:关闭 session - 类似 Django 的 middleware 但更灵活 ## 2. 嵌套依赖 ```python def get_db(): ... def get_current_user( token: str = Depends(oauth2_scheme), db: Session = Depends(get_db), ) -> User: user = decode_token(token, db) if not user: raise HTTPException(401) return user @app.get('/me') def me(user: User = Depends(get_current_user)): return user ``` FastAPI 自动解析依赖图:`me` 需要 `get_current_user` 需要 `get_db` 和 `oauth2_scheme`。 **同一个请求里 `get_db` 只调一次**(即使多个依赖都需要它)。 ## 3. 权限检查依赖 ```python def require_admin(user: User = Depends(get_current_user)) -> User: if user.role != 'admin': raise HTTPException(403, 'admin required') return user @app.delete('/users/{uid}') def delete_user(uid: int, admin: User = Depends(require_admin), db: Session = Depends(get_db)): db.query(User).filter_by(id=uid).delete() db.commit() ``` 权限检查从 view body 抽出来 → 复用 + 类型安全。 ## 4. 类做依赖(state-ful) ```python class CommonQueryParams: def __init__(self, q: str = '', skip: int = 0, limit: int = 20): self.q = q self.skip = skip self.limit = limit @app.get('/search') def search(params: CommonQueryParams = Depends()): # params.q / params.skip / params.limit ... ``` `Depends()` 无参数时自动用类型 `CommonQueryParams` 当 dependency。 重复的 query param 模式抽成 class。 ## 5. router-level dependency ```python from fastapi import APIRouter admin_router = APIRouter( prefix='/admin', dependencies=[Depends(require_admin)], # router 级依赖 ) @admin_router.get('/dashboard') def dashboard(): pass @admin_router.get('/users') def users(): pass ``` `admin_router` 下所有 endpoint 自动要求 admin。不需要每个 view 写 Depends。 或 app 级: ```python app = FastAPI(dependencies=[Depends(log_request)]) # 全局 ``` ## 6. 测试:覆盖依赖 ```python def get_db_test(): return MockDB() app.dependency_overrides[get_db] = get_db_test def test_read_user(): client = TestClient(app) response = client.get('/users/1') assert response.json() == {'id': 1, 'name': 'mock'} ``` `dependency_overrides` 让测试用 mock 实现替换生产依赖。 **整个测试不真连 DB**。 对比:"view 内直接 import DB" → 没法 mock,要么 mock 整个 module, 要么真起 DB。 ## 7. Generator dependency(cleanup 必须 finally) ```python def get_db(): db = SessionLocal() try: yield db finally: db.close() ``` 不 try/finally 的话,view 内 exception → cleanup 不跑 → 连接泄露。 **永远 try/finally**。 ## 8. 同步 vs async dependency 混用 OK: ```python def get_settings(): # 同步 return Settings() async def get_user(): # async return await fetch_user() @app.get('/x') async def x( s: Settings = Depends(get_settings), u: User = Depends(get_user), ): ... ``` FastAPI 自动决定 thread pool 还是 await。 ## 9. Sub-app / mount ```python admin_app = FastAPI() @admin_app.get('/stats') def stats(): pass app.mount('/admin', admin_app) ``` 完全独立的 FastAPI 应用挂载到主 app。各自有独立 dependency / docs / middleware。复杂场景拆分。 ## 10. lifecycle hooks vs Depends ```python @app.on_event('startup') async def startup(): app.state.db = await create_pool() @app.on_event('shutdown') async def shutdown(): await app.state.db.close() ``` 或现代 lifespan: ```python from contextlib import asynccontextmanager @asynccontextmanager async def lifespan(app): app.state.db = await create_pool() yield await app.state.db.close() app = FastAPI(lifespan=lifespan) ``` 应用启动 / 关闭一次性资源(DB pool / Redis client)放这里,不是 per-request Depends。 之后 Depends 引用: ```python def get_db(request: Request): return request.app.state.db.acquire() ``` ## 完整模板:blog API 骨架 ```python # deps.py from fastapi import Depends, HTTPException, Request from sqlalchemy.orm import Session def get_db(request: Request) -> Session: db = request.app.state.SessionLocal() try: yield db finally: db.close() def get_current_user( token: str = Depends(oauth2_scheme), db: Session = Depends(get_db), ) -> User: user = decode_jwt(token, db) if not user: raise HTTPException(401) return user def get_post(post_id: int, db: Session = Depends(get_db)) -> Post: post = db.query(Post).get(post_id) if not post: raise HTTPException(404) return post def require_post_owner( post: Post = Depends(get_post), user: User = Depends(get_current_user), ) -> Post: if post.author_id != user.id: raise HTTPException(403) return post ``` ```python # views.py @app.get('/posts/{post_id}') def read_post(post: Post = Depends(get_post)): return post @app.put('/posts/{post_id}') def update_post( data: UpdatePostIn, post: Post = Depends(require_post_owner), db: Session = Depends(get_db), ): post.title = data.title db.commit() return post @app.delete('/posts/{post_id}') def delete_post( post: Post = Depends(require_post_owner), db: Session = Depends(get_db), ): db.delete(post) db.commit() return {'ok': True} ``` 每个 view 体内不超过 5 行业务逻辑。权限 / DB session / 对象加载都 declarative。 ## 性能注意 每请求依赖图都重新解析。深嵌套 / 重操作 dependency 会累加。 但通常 dependency 都很轻(DB query / dict lookup),impact 极小。 ## 与 Django / Flask 对比 Django:middleware + `@login_required` 装饰器 + view-local code。 Flask:`@app.before_request` + 装饰器 + flask-login。 FastAPI Depends 优点: - type-safe(IDE / mypy 知道 user 是 User) - 显式(看签名就知道这个 endpoint 需要什么) - 测试友好(dependency_overrides) 但学习曲线略陡。习惯后回不去。 ## 踩过的坑 1. **Depends 写位置错**: ```python def view(user=Depends(get_user), uid: int = 1): # ❌ ``` Depends 不能有 default。改顺序: ```python def view(uid: int, user=Depends(get_user)): ``` 2. **循环依赖**:A depends B,B depends A → import error。架构上重新 设计;通常说明边界划错。 3. **dependency 内 raise** → FastAPI 自动 422 / 配你的 HTTPException。 写 `raise HTTPException(401, "Bad token")` 别 `return None`。 4. **测试 dependency_overrides 没清** → 测试间互相污染。每个测试 `app.dependency_overrides.clear()` 或用 fixture。 5. **`Depends()` 无参 + 类**:必须显式 type annotation: ```python def view(params: CommonQueryParams = Depends()): ``` 不写 type FastAPI 不知道用什么。