知识广场
按学科筛选:计算机科学
«计算机科学» 分类下共 256 篇帖子
跑 ML 实验最容易混乱的是"我那个 lr=0.001 + dropout=0.3 的 run 是哪天哪份代码?" wandb 自动记录:超参、loss / metric 曲线、代码 git hash、系统资源、模型权重。 免费层个人项目无限。 ## 安装 + 注册 ```bash uv add wandb wandb login # 浏览器打开,复制 API key 进来 # 或:export WANDB_API_KEY=... ``` ## 最小集成(5 行代码) ```python import wandb wandb.init( project='mnist-cnn', config={ 'lr': 0.001, 'batch_size': 128, 'epochs': 5, 'dropout': 0.25, 'model': 'cnn-v1', }, ) # 训练循环里 for epoch in range(5): train_loss, train_acc = train_epoch() val_loss, val_acc = evaluate() wandb.log({ 'train/loss': train_loss, 'train/acc': train_acc, 'val/loss': val_loss, 'val/acc': val_acc, 'epoch': epoch, }) wandb.finish() ``` 跑一下 `python train.py`,wandb 打印一个 URL,打开就能看到实时曲线。 ## config 优先级 ```python # 1. 默认在代码里 wandb.init(config={'lr': 0.001}) # 2. 命令行覆盖(用 argparse / typer / hydra) import argparse p = argparse.ArgumentParser() p.add_argument('--lr', type=float, default=0.001) args = p.parse_args() wandb.init(config=vars(args)) # 3. Sweep 时由 wandb 注入 config = wandb.config # 读,不能写 lr = config.lr ``` ## Sweep:自动超参搜索 ```yaml # sweep.yaml program: train.py method: bayes metric: name: val/acc goal: maximize parameters: lr: distribution: log_uniform_values min: 1e-5 max: 1e-2 dropout: values: [0.1, 0.25, 0.5] batch_size: values: [64, 128, 256] ``` ```bash wandb sweep sweep.yaml # Output: wandb agent yourname/project/sweep_id wandb agent yourname/project/sweep_id --count 20 ``` 跑 20 个实验,自动按 Bayesian / grid / random 选超参。 多台机器并行:每台跑 `wandb agent ...` 共享同一个 sweep。 ## 记录媒体 ```python # 图片 import matplotlib.pyplot as plt fig, ax = plt.subplots() ax.plot(losses) wandb.log({'curve': wandb.Image(fig)}) # 表格 wandb.log({ 'predictions': wandb.Table( columns=['image', 'pred', 'truth'], data=[[wandb.Image(x), p, y] for x, p, y in samples], ) }) # 直方图 wandb.log({'grad_norm': wandb.Histogram(grad_norms)}) # 视频 / 音频 wandb.log({'video': wandb.Video('output.mp4')}) ``` ## 保存模型(Artifact) ```python artifact = wandb.Artifact(name='mnist-cnn', type='model') artifact.add_file('checkpoints/best.pt') wandb.log_artifact(artifact) ``` 模型权重 + 元数据存在 wandb 服务端(免费层有配额)。后续加载: ```python api = wandb.Api() artifact = api.artifact('yourname/project/mnist-cnn:latest') artifact.download(root='./model') ``` ## 监控系统资源 `wandb` 自动记录: - GPU 利用率 / 显存 - CPU / 内存 - 磁盘 / 网络 - Python 进程 无需任何代码,看 dashboard 的 "System" 标签页。 ## 代码 + git 状态 `wandb` 自动 capture: - 当前 git commit hash - 未提交的 diff(不要在没干净 commit 时跑实验!) - 命令行参数 - Python 版本 + 包列表 回放某次 run:知道用了哪份代码 + 哪个数据。 ## 离线模式 无网时: ```bash wandb offline # 或:export WANDB_MODE=offline python train.py # 数据存在本地 ./wandb/ # 之后有网时 wandb sync ./wandb/offline-run-* ``` CI / 内网集群里非常有用。 ## 与 PyTorch Lightning / HuggingFace 集成 ```python # PyTorch Lightning from pytorch_lightning.loggers import WandbLogger logger = WandbLogger(project='myproj') trainer = Trainer(logger=logger) # HuggingFace Transformers training_args = TrainingArguments( report_to='wandb', run_name='bert-finetune-v3', ... ) ``` 不用手写 log,框架自动调 wandb。 ## 团队协作 - **Project** 是大目录(按代号 / 任务) - **Run** 是单次实验 - **Group** 让你把同一个 sweep / 同一组对比放一起 - **Tags** 给 run 打标签(baseline / experiment / final) ```python wandb.init( project='myproj', group='ablation-dropout', tags=['final', 'cnn-v2'], notes='increased lr to 5e-3 to test convergence speed', ) ``` ## 数据 dashboard dashboard 默认按时间线显示。常用功能: - **Reports**:把多个 run 拖到一份"报告"里,加文字 + 自动同步图表, 当作"项目周报"或 paper 草稿 - **Parallel Coordinates**:可视化超参 → metric 的关系, 找哪个超参影响最大 - **Compare runs**:勾几个 run 一起看曲线 / config diff ## 隐私 / 自托管 wandb cloud 免费个人无限,团队收费。如果数据敏感不能上 cloud: ```bash # 自托管 wandb server(社区版免费) docker run -p 8080:8080 wandb/local ``` Python 端: ```python wandb.init(project='...', host='https://wandb.your-company.com') ``` ## 替代方案 - **MLflow**:开源,自托管简单。tracking + model registry,UI 朴素 - **TensorBoard**:本地用,无云端 - **Comet / Neptune**:商业产品类似 - **Aim**:开源极简,无云 个人项目 wandb 最快;公司里数据合规要求高用 MLflow。 ## 踩过的坑 - 忘 `wandb.finish()` —— 长 run 退出后 wandb sync 一直挂着。 脚本最后必须 `finish()` 或者用 `with wandb.init(...) as run:`。 - 每次 `wandb.log` 都立刻发到云端 → 训练时大量请求可能拖慢。 设 `commit=False` 累积后批量发:`wandb.log({...}, commit=False)`, 最后 `wandb.log({}, commit=True)`。 - 在 jupyter 里 wandb.init 多次:会创建多个 run。每次重启 kernel 之前先 `wandb.finish()`。 - artifact 配额:免费个人 100GB,超了不能再传。定期清老 artifact。
Caddy 把"自动签 Let's Encrypt 证书 + HTTP/2 + 自动续期"做成了默认行为。 nginx 配同样的事情至少 30 行配置 + certbot crontab,Caddy 是 1 行: ``` example.com { reverse_proxy localhost:3000 } ``` ## 安装 ```bash # 官方 apt 仓库(也有 yum / 二进制下载) sudo apt install -y debian-keyring debian-archive-keyring apt-transport-https curl -1sLf 'https://dl.cloudsmith.io/public/caddy/stable/gpg.key' \ | sudo gpg --dearmor -o /usr/share/keyrings/caddy-stable-archive-keyring.gpg curl -1sLf 'https://dl.cloudsmith.io/public/caddy/stable/debian.deb.txt' \ | sudo tee /etc/apt/sources.list.d/caddy-stable.list sudo apt update && sudo apt install -y caddy ``` ## 最小可工作 Caddyfile `/etc/caddy/Caddyfile`: ``` example.com { encode zstd gzip reverse_proxy 127.0.0.1:3000 header { Strict-Transport-Security "max-age=31536000; includeSubDomains; preload" X-Content-Type-Options nosniff Referrer-Policy strict-origin-when-cross-origin } } api.example.com { reverse_proxy 127.0.0.1:8000 log { output file /var/log/caddy/api.log format json } } static.example.com { root * /var/www/static file_server encode zstd gzip } ``` `encode` 自动启用 Brotli / zstd / gzip 链路压缩;`reverse_proxy` 自动透传 `X-Forwarded-*`;TLS 证书 80 端口 ACME HTTP-01 自动签 + 续期。 ## 启用 ```bash sudo caddy validate --config /etc/caddy/Caddyfile sudo systemctl reload caddy journalctl -u caddy -n 50 ``` `reload` 是热重载,不会断现有连接。 ## 防火墙 ```bash sudo ufw allow 80/tcp # ACME challenge 需要 80 sudo ufw allow 443/tcp ``` 80 必须开 —— Let's Encrypt 通过 80 端口验证域名所有权。 ## 进阶:通配符证书 + DNS-01 如果你想签 `*.example.com`,必须用 DNS-01 而不是 HTTP-01。Caddy 需要装 带 DNS 插件的版本: ```bash sudo caddy add-package github.com/caddy-dns/cloudflare ``` 然后: ``` *.example.com { tls { dns cloudflare {env.CF_API_TOKEN} } reverse_proxy 127.0.0.1:3000 } ``` 环境变量在 systemd drop-in 里设: ```bash sudo systemctl edit caddy # [Service] # Environment=CF_API_TOKEN=xxxx ``` ## 与 nginx 对比 | 维度 | Caddy | nginx + certbot | |---|---|---| | 配置行数 | 1-3 行 | 20-30 行 | | 自动续期 | 默认 | crontab + reload hook | | HTTP/3 | 默认 | 编译时 `--with-http_v3_module` | | 性能 | 略低 | 略高(C 写的) | | 配置复杂度 | 低 | 高,但灵活 | 小规模 / 个人项目用 Caddy;流量上去后或者需要复杂 rewrite 用 nginx。 ## 踩过的坑 - 第一次签证书会卡几十秒,是正常 ACME 流程;不要看到日志没动静就 restart, 会把证书签到一半的 nonce 弄丢,触发 LE 频率限制。 - LE 每周每域名最多签 50 张;测试时用 `tls internal` 走 Caddy 内置 CA, 浏览器手动信任即可。 - Caddy 默认证书 / 凭据存 `/var/lib/caddy/.local/share/caddy/`,备份时别漏。
无状态鉴权用 JWT 是行业标准,但很多实现有安全 / UX 问题: - access token 一次性给 24 小时:泄露后窗口太长 - 永远不轮换:被偷了没法停 - 把 JWT 放 localStorage:XSS 直接拿走 正确做法:**access token 短(5-15 分钟)+ refresh token 长(7-30 天)+ refresh 时轮换**。 ## 1. 流程概览 ``` 登录 → 返回 access (15min) + refresh (7d) API 请求 → 带 access token access 过期 → 用 refresh 拿新的 access + 新的 refresh(轮换) 退出 → 把 refresh token 加入服务端 blacklist ``` ## 2. 后端:FastAPI 例子 ```python from datetime import datetime, timedelta import secrets import jwt from fastapi import FastAPI, HTTPException, Depends, Response, Cookie from passlib.hash import bcrypt SECRET = 'long-random-secret-from-env' ACCESS_TTL = timedelta(minutes=15) REFRESH_TTL = timedelta(days=7) app = FastAPI() def create_access_token(user_id: str) -> str: return jwt.encode({ 'sub': user_id, 'type': 'access', 'exp': datetime.utcnow() + ACCESS_TTL, 'jti': secrets.token_urlsafe(8), }, SECRET, algorithm='HS256') def create_refresh_token(user_id: str) -> tuple[str, str]: jti = secrets.token_urlsafe(16) token = jwt.encode({ 'sub': user_id, 'type': 'refresh', 'exp': datetime.utcnow() + REFRESH_TTL, 'jti': jti, }, SECRET, algorithm='HS256') return token, jti # 登录 @app.post('/auth/login') def login(email: str, password: str, response: Response): user = db.find_user(email) if not user or not bcrypt.verify(password, user.password_hash): raise HTTPException(401) access = create_access_token(str(user.id)) refresh, refresh_jti = create_refresh_token(str(user.id)) db.save_refresh_token(user_id=user.id, jti=refresh_jti, ip=request.client.host) # Refresh token 放 HttpOnly Cookie,不让 JS 碰 response.set_cookie( 'refresh_token', refresh, httponly=True, secure=True, samesite='strict', max_age=int(REFRESH_TTL.total_seconds()), path='/auth', ) return {'access_token': access, 'expires_in': int(ACCESS_TTL.total_seconds())} ``` ## 3. 用 access token 访问 API ```python def current_user(authorization: str = Header(...)) -> User: if not authorization.startswith('Bearer '): raise HTTPException(401) token = authorization[7:] try: payload = jwt.decode(token, SECRET, algorithms=['HS256']) except jwt.ExpiredSignatureError: raise HTTPException(401, detail='token_expired') except jwt.InvalidTokenError: raise HTTPException(401) if payload['type'] != 'access': raise HTTPException(401) return db.get_user(payload['sub']) @app.get('/api/me') def me(user = Depends(current_user)): return {'id': user.id, 'email': user.email} ``` ## 4. 刷新 ```python @app.post('/auth/refresh') def refresh(response: Response, refresh_token: str = Cookie(None)): if not refresh_token: raise HTTPException(401) try: payload = jwt.decode(refresh_token, SECRET, algorithms=['HS256']) except jwt.InvalidTokenError: raise HTTPException(401) if payload['type'] != 'refresh': raise HTTPException(401) user_id, jti = payload['sub'], payload['jti'] # 关键安全检查:refresh token 还在白名单里吗? stored = db.get_refresh_token(user_id, jti) if not stored or stored.revoked: raise HTTPException(401, detail='refresh_revoked') # 轮换:把旧 refresh 撤销,发新的 db.revoke_refresh_token(jti) new_access = create_access_token(user_id) new_refresh, new_jti = create_refresh_token(user_id) db.save_refresh_token(user_id=user_id, jti=new_jti, ip=request.client.host) response.set_cookie('refresh_token', new_refresh, httponly=True, ...) return {'access_token': new_access, 'expires_in': int(ACCESS_TTL.total_seconds())} ``` 每次刷新都换新 refresh token,旧的立即作废。被攻击者偷了旧 refresh 后, 用户下次正常刷新会让攻击者那份失效 → 攻击者被踢出。 ## 5. 退出 ```python @app.post('/auth/logout') def logout(response: Response, refresh_token: str = Cookie(None)): if refresh_token: try: payload = jwt.decode(refresh_token, SECRET, algorithms=['HS256']) db.revoke_refresh_token(payload['jti']) except jwt.InvalidTokenError: pass response.delete_cookie('refresh_token', path='/auth') return {'ok': True} ``` access token 因为短(15 分钟)不需要单独撤销 —— 自然过期。 真要立即吊销,加一个"用户已注销 / 改密"的 token version 字段: ```python # 在 access token 里加 user.token_version # 修改密码 / 强制下线时 user.token_version += 1 # 校验时检查 payload['ver'] == user.token_version ``` ## 6. 前端处理 ```ts // 拦截器:401 时自动 refresh + 重试 api.interceptors.response.use( r => r, async (err) => { if (err.response?.status === 401 && err.response?.data?.detail === 'token_expired') { const r = await fetch('/auth/refresh', { method: 'POST', credentials: 'include' }) if (r.ok) { const { access_token } = await r.json() saveAccessToken(access_token) // 重试原请求 err.config.headers.Authorization = `Bearer ${access_token}` return api.request(err.config) } // refresh 也失败 → 强制重新登录 window.location = '/login' } return Promise.reject(err) } ) ``` 注意: - access token 存内存(不要 localStorage / sessionStorage —— XSS 风险) - refresh token 走 HttpOnly cookie(JS 拿不到) - API fetch 加 `credentials: 'include'` 才会发送 cookie ## 7. 防 CSRF refresh 通过 cookie 发 → CSRF 风险。两种防御: 1. **SameSite=Strict cookie**:跨站请求不发 cookie(前面已经设了) 2. **CSRF token**:refresh 要求 header 里带 CSRF token(额外参数) SameSite=Strict 已经能挡 99% CSRF;想更严就加 CSRF token。 ## 8. 算法选择 - HS256:HMAC + 共享 secret。最简单,单后端 OK - RS256:RSA 非对称。分布式系统(验证方 ≠ 签发方)必选 - EdDSA / Ed25519:性能 + 安全比 RSA 更好(新版 PyJWT 支持) 不要用 `alg: 'none'`(无签名)—— 历史上多次造成漏洞。 ## 9. 密钥管理 ```bash # 生成强 secret openssl rand -base64 64 ``` 存 `.env`,不要进 git。 定期轮换: 1. 部署新 SECRET + 让代码支持 fallback 验证(旧 + 新两个 key) 2. 等所有老 token 过期(access 15 min + refresh 7d = 7 天后) 3. 移除老 SECRET ## 10. 单点登录 / SSO 业务复杂后用 OIDC(OpenID Connect)—— OAuth2 + 身份层。 Auth0 / Keycloak / Authelia / authentik 提供完整流程。 自己不要从零造身份系统。 ## 踩过的坑 - access TTL 设 24h:泄露窗口太长。15 分钟 + refresh 是 sweet spot。 - refresh 不轮换:被偷了攻击者长期持有。每次刷新换新 refresh + 撤销旧的。 - 把 JWT 放 localStorage:XSS 直接读走。永远内存(access)+ HttpOnly cookie (refresh)。 - 算法降级攻击:服务端用 `jwt.decode(token, key)` 但没指定 algorithms → 攻击者用 `alg: 'none'` 假签名通过。**永远指定 `algorithms=['HS256']`**。
## 起因 我想知道两种压缩方案哪个快——`gzip -9` vs `zstd -19` vs `xz -9`。 传统做法: ```bash time gzip -9 < big.txt > /dev/null # real 0m4.231s time zstd -19 < big.txt > /dev/null # real 0m3.876s ``` 跑一次的数字根本不可信(系统抖动 5-10%)。要科学就得跑 N 次取平均, 还要算标准差,手写麻烦。 `hyperfine` 是 Rust 写的 benchmark 工具,自动做 warmup + 多次运行 + 统计 + 多命令对比。 ## 安装 ```bash # Debian / Ubuntu sudo apt install hyperfine # 或者 brew install hyperfine cargo install hyperfine hyperfine --version ``` ## 单命令 benchmark ```bash hyperfine 'gzip -9 < big.txt > /dev/null' ``` 输出: ``` Benchmark 1: gzip -9 < big.txt > /dev/null Time (mean ± σ): 4.187 s ± 0.082 s [User: 4.135 s, System: 0.041 s] Range (min … max): 4.073 s … 4.298 s 10 runs ``` 默认跑 10 次(不够稳定会自动加),算均值 + 标准差 + 范围。 ## 多命令对比(最实用) ```bash hyperfine \ 'gzip -9 < big.txt > /dev/null' \ 'zstd -19 < big.txt > /dev/null' \ 'xz -9 < big.txt > /dev/null' ``` 输出: ``` Benchmark 1: gzip -9 < big.txt > /dev/null Time (mean ± σ): 4.187 s ± 0.082 s Benchmark 2: zstd -19 < big.txt > /dev/null Time (mean ± σ): 3.421 s ± 0.064 s Benchmark 3: xz -9 < big.txt > /dev/null Time (mean ± σ): 15.842 s ± 0.214 s Summary 'zstd -19 < big.txt > /dev/null' ran 1.22 ± 0.03 times faster than 'gzip -9 < big.txt > /dev/null' 4.63 ± 0.10 times faster than 'xz -9 < big.txt > /dev/null' ``` 最后的 `Summary` 直接告诉你倍数差距。 ## 参数扫描 ```bash hyperfine --parameter-list level 1,3,5,9,19 \ 'zstd -{level} < big.txt > /dev/null' ``` 会跑 5 个 benchmark(level=1, 3, 5, 9, 19),出对比表 + 倍数。 数值扫描: ```bash hyperfine --parameter-scan threads 1 16 \ 'cargo build --jobs {threads}' ``` `threads=1, 2, 3, ..., 16` 各跑一次。 ## warmup ```bash hyperfine --warmup 3 'some-command' ``` 先跑 3 次"不计入统计"——让 OS page cache 热起来 / JIT 预编译。 对涉及磁盘 IO 或冷启动的命令 essential。 ## 准备 / 清理钩子 ```bash hyperfine \ --prepare 'sync && echo 3 | sudo tee /proc/sys/vm/drop_caches' \ --cleanup 'rm /tmp/out' \ 'cp big.file /tmp/out' ``` `--prepare` 每次 run 之前跑(这里是清 page cache,模拟冷启动)。 `--cleanup` 每次 run 之后跑。 ## 导出原始数据 ```bash hyperfine --export-json bench.json --export-markdown bench.md cmd1 cmd2 # 或 CSV hyperfine --export-csv bench.csv cmd1 cmd2 ``` JSON 给脚本分析;Markdown 直接贴博客 / PR 描述。 ## 命令名 输出里的 `Benchmark 1` 不好看,自己命名: ```bash hyperfine \ -n 'old algorithm' 'old_cmd args' \ -n 'new algorithm' 'new_cmd args' Summary 'new algorithm' ran 1.34 ± 0.05 times faster than 'old algorithm' ``` PR 里直接贴这个 summary 是最好的"性能改进"证据。 ## 在 CI 跑 benchmark ```yaml # .github/workflows/bench.yml - name: Benchmark run: | hyperfine --warmup 3 --runs 20 \ --export-markdown bench.md \ 'cargo build --release' \ './before-binary' \ './after-binary' - name: Comment PR uses: peter-evans/create-or-update-comment@v4 with: issue-number: ${{ github.event.pull_request.number }} body-path: bench.md ``` 每次 PR 自动跑 benchmark + 评论到 PR。 ## 与 perf / time / Pythontimeit 对比 - **time**:粗糙,单次运行 - **/usr/bin/time -v**:详细但单次,还得自己算多次 - **Pythontimeit**:仅 Python 函数级 - **hyperfine**:黑盒命令多次运行 + 统计 它们解决不同问题: | 工具 | 适合 | |---|---| | hyperfine | "我的脚本快了吗" / "两个命令哪个快" | | perf | "函数 X 里哪行慢" | | flamegraph | "整个程序时间花在哪" | | timeit | "Python 这个表达式多快" | ## 高级:show-output 默认 hyperfine 把 stdout/stderr 重定向到 /dev/null(避免输出 IO 影响)。 debug 时打开: ```bash hyperfine --show-output 'cmd' ``` ## 限制时间 ```bash hyperfine --time-unit second --warmup 5 --min-runs 10 --max-runs 100 \ 'some-command' ``` `--min-runs` 保证统计意义,`--max-runs` 防止单次太慢导致总时间炸。 ## 效果 - 几秒钟解决 "A vs B 哪个快" 的争论,给出 ± 误差 - PR 里贴 hyperfine summary 比口头"快了一些"有说服力 100 倍 - CI 集成后性能回归被自动捕捉 - 团队选型(algorithm / lib / 构建工具)有了客观依据 ## 踩过的坑 1. **命令本身极快(< 1ms)**:hyperfine overhead 比命令还大。 测纳秒级用 `criterion`(Rust 库)或者 Python `timeit`。 2. **第一次跑 IO 命令明显慢**:page cache 没热。`--warmup 3` 或者 `--prepare 'sync && drop_caches'` 看你想测"warm" 还是"cold"。 3. **shell expansion 不一致**: ```bash # ❌ shell 把 *.txt 展开后传给 hyperfine,第一次有效,后续可能不一致 hyperfine 'cat *.txt > /dev/null' # ✅ 用 sh -c 让命令在子 shell 里展开 hyperfine 'sh -c "cat *.txt > /dev/null"' ``` 4. **CI runner 抖动大**:共享 runner 受其它 job 影响。专用 self-hosted runner 或者用 baseline 算相对差异,不看绝对数。 5. **--prepare 失败不退出**:prepare 命令出错 hyperfine 不知道。 prepare 命令里 `set -e` 自保。
## 起因 git diff 经典痛点: ```diff - function process(items) { + async function process(items, options = {}) { ``` 实际只加了 `async` 和 `options` 参数,但 diff 显示整行替换。 review 慢 + 看不出真正变化。 更糟:reformatter(prettier / black)改空白 → diff 一片噪音,藏住 真正逻辑改动。 **difftastic** (`difft`) 用 tree-sitter 解析语法 → 按 AST 节点 diff → 真正的"结构化" diff。 ## 装 ```bash brew install difftastic cargo install difftastic ``` ## 单 file diff ```bash $ difft old.js new.js ``` 输出 side-by-side,**按 AST 节点高亮变化**: ``` old.js new.js function process(items) { async function process(items, options = {}) { ^^^^ ^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^ (unchanged) new new ... ``` 只高亮变化的 token,不变的部分原样。 看一眼就知道 "加了 async" + "加了 options 参数"。 ## git 集成 `~/.gitconfig`: ```ini [diff] external = difft [pager] difftool = false [difftool] prompt = false [difftool "difftastic"] cmd = difft "$LOCAL" "$REMOTE" ``` 或者只在需要时用: ```bash GIT_EXTERNAL_DIFF=difft git diff GIT_EXTERNAL_DIFF=difft git log -p ``` alias 一下: ```bash alias gdt='GIT_EXTERNAL_DIFF=difft git diff' alias glogt='GIT_EXTERNAL_DIFF=difft git log -p' ``` ## 跨语言 difftastic 用 tree-sitter,支持几十种语言: - Python / JS / TS / Rust / Go / Java / C / C++ - HTML / CSS / Markdown - JSON / YAML / TOML - SQL / GraphQL 每语言知道 "function" / "class" / "import" 等结构。 ## 实际效果 reformatter run 改空白: ```diff # 普通 git diff - def foo(x): - return x + 1 + def foo(x): + return x + 1 # difftastic [no changes detected] ``` 啥都不改的格式化 → difftastic 不显,git diff 显一堆假动。 ## 重命名变量 ```python - def process(data): - result = data * 2 - return result + def process(items): + output = items * 2 + return output ``` 普通 git diff 看不出"几个变量重命名",看到"3 行换 3 行"。 difftastic 高亮:data→items, result→output。 理解快。 ## 大改动也清晰 ```js // before const x = a + b * c; // after const x = (a + b) * c; ``` 普通 diff: `- ... + ...` 整行。 difftastic 高亮加的括号 → 一眼看出。 ## 限制 - **性能**:大 file(>1MB)解析慢。git log -p 多 commit 时累。 - **只 syntax**:不是 semantic diff(不知 var rename 是同 var)。 - **某些语言 parse 错**:tree-sitter parser 偶有 bug。 不 fit 时降级 `git --no-external-diff diff`。 ## 与 delta 对比 `delta`(dandavison/delta)是另一 diff 工具: - 漂亮 syntax highlighting(仍是行级 diff) - side-by-side / line numbers - 不是真 AST diff,但视觉好 | | delta | difftastic | |---|---|---| | 方式 | 行级 + highlighting | AST 结构 | | 速度 | 快 | 中 | | 噪音 (whitespace) | 仍显示 | 跳过 | | 重命名 | 不识别 | 识别 | 我 daily git diff 用 delta(速度 + 通用),复杂 PR review 用 difftastic。 ## 配 GitHub Actions PR review ```yaml - run: | GIT_EXTERNAL_DIFF=difft git diff origin/main...HEAD > review.txt - uses: actions/upload-artifact@v4 with: name: structural-diff path: review.txt ``` CI 生成结构化 diff,review 时下载看。 ## 配 fzf ```bash # 选 commit 看 difft git log --oneline | fzf --preview 'GIT_EXTERNAL_DIFF=difft git show {1}' ``` ## 与 IDE diff 对比 VS Code / JetBrains 内置 diff 视图也强。 - 行级 + 块对比 - inline editing - 但不识别 AST 结构 difftastic 在 terminal / PR review 流程中 fill gap。 ## 真实 case 某 PR refactor 1000 行: - 普通 diff 显 800 行变化(多数是 whitespace + import 顺序) - difftastic 显 50 行真实结构变化 review 时间从 1 小时 → 15 分钟。 看到真正改的逻辑而不是噪声。 ## ast-grep 替代某些场景 semantic refactor 看 `ast-grep` (sg): ```bash sg --pattern 'console.log($A)' --rewrite 'logger.info($A)' ``` 按 AST pattern 替换。 不是 diff 工具但相关:semantic understanding of code。 ## 踩过的坑 1. **git log -p 慢**:每 commit difft → 100 commit 几分钟。 只在需要时用,不要全局 default。 2. **alias 没生效**:`GIT_EXTERNAL_DIFF=difft` 必须 env var, alias `git diff` 不带 env 不行。用 `git -c diff.external=difft diff`。 3. **某语言不识别**:esoteric 文件 difft 不解析 → 行级 fallback。 仍可用,但失去 AST 优势。 4. **side-by-side 太宽**:窄终端被裁。`--display inline` 用 inline 模式。 5. **PR tool 集成弱**:GitHub PR review UI 不能用 difftastic(GitHub 是 server-side)。本地拉 PR 后 difftastic 看。
## 起因 要做一个"任务跑完通知前端"的功能。WebSocket 是常见方案但: - 需要保持长连接 + 心跳 + 重连逻辑 - HTTP/2 + nginx 反代各种配置 - 鉴权 / cookies / CORS 跟普通 HTTP 差异大 如果**只需要服务端推、客户端不需要回传**(80% 通知场景), Server-Sent Events (SSE) 是更简单的方案:基于 HTTP 长连接,浏览器 原生支持 EventSource API,自动重连。 ## 解决方案 ### 1. 服务端:FastAPI ```python from fastapi import FastAPI from fastapi.responses import StreamingResponse import asyncio, json app = FastAPI() async def event_stream(user_id: str): while True: # 等下一条 event(可以从 Redis pubsub / DB poll / 内存队列拿) event = await pubsub.get_event_for_user(user_id) # SSE 格式:data: <json>\n\n yield f"data: {json.dumps(event)}\n\n" @app.get('/events') async def sse(user_id: str): return StreamingResponse( event_stream(user_id), media_type='text/event-stream', headers={ 'Cache-Control': 'no-cache', 'X-Accel-Buffering': 'no', # nginx 不要 buffer }, ) ``` SSE 是纯 HTTP,response body 持续 stream,每个 event 一段: ``` event: notification data: {"type": "task_done", "id": 42} id: 12345 retry: 5000 event: update data: {"progress": 0.5} ``` 字段: - `data:` 必填,可多行 - `event:` 可选事件类型(前端按类型路由) - `id:` 可选,用于断线重连时 Last-Event-Id - `retry:` 客户端断线后重连等待时长(ms) - 双 `\n\n` 结束一个 event ### 2. 前端:EventSource API ```js const es = new EventSource('/events?user_id=42') es.onopen = () => console.log('connected') es.onmessage = (e) => { // 默认 event(没 event: 字段时) const data = JSON.parse(e.data) console.log('event:', data) } es.addEventListener('notification', (e) => { const n = JSON.parse(e.data) showToast(n.message) }) es.addEventListener('progress', (e) => { updateProgressBar(JSON.parse(e.data).percent) }) es.onerror = (e) => { console.warn('SSE error, will reconnect automatically') // 浏览器自动 reconnect(用 retry 字段指定的间隔) } // 主动关闭 // es.close() ``` `EventSource` 浏览器自动: - 重连(断线后等 retry 时间) - 发送 `Last-Event-Id` header 让服务端从断点继续 - HTTP/1.1 长连接 ### 3. 实际后端实现:Redis pubsub 桥 ```python import redis.asyncio as redis r = redis.from_url('redis://localhost') async def event_stream(user_id: str): pubsub = r.pubsub() await pubsub.subscribe(f'user:{user_id}') # heartbeat:每 30s 发一个注释行,防止反代认为连接死了断开 async def heartbeat(): while True: await asyncio.sleep(30) yield ': keepalive\n\n' try: async for msg in pubsub.listen(): if msg['type'] != 'message': continue data = msg['data'].decode() yield f'data: {data}\n\n' finally: await pubsub.unsubscribe(f'user:{user_id}') await pubsub.close() ``` 业务方在任何地方 publish 事件: ```python import redis r = redis.from_url(...) # 任务完成时 r.publish(f'user:{user_id}', json.dumps({'type': 'task_done', 'task_id': 42})) ``` ### 4. 断线重连 + Last-Event-Id 服务端要支持"从某 id 之后开始重发": ```python from fastapi import Header @app.get('/events') async def sse( user_id: str, last_event_id: str | None = Header(None, alias='Last-Event-Id'), ): async def gen(): # 1. 重连场景:发送 last_event_id 之后错过的事件 if last_event_id: for e in get_events_after(user_id, last_event_id): yield f'id: {e.id}\ndata: {e.data}\n\n' # 2. 然后实时推送 async for e in subscribe_user_events(user_id): yield f'id: {e.id}\ndata: {e.data}\n\n' return StreamingResponse(gen(), media_type='text/event-stream') ``` 每个 event 给 `id:` → 浏览器记住 → 重连时自动 header 发 `Last-Event-ID`。 保证不丢事件。 ### 5. nginx 配置 SSE 需要不被 buffer + 不超时: ```nginx location /events { proxy_pass http://app; proxy_buffering off; # 关键:不要 buffer proxy_cache off; proxy_set_header Connection ''; proxy_http_version 1.1; chunked_transfer_encoding off; proxy_read_timeout 24h; # 长连接不要超时 } ``` `X-Accel-Buffering: no` header(应用层发的)也起同样作用。 ### 6. 鉴权 SSE 是 HTTP,cookie 自动带(同源)。跨域要 CORS: ```python @app.get('/events') async def sse(...): return StreamingResponse( gen(), media_type='text/event-stream', headers={ 'Access-Control-Allow-Origin': 'https://web.example.com', 'Access-Control-Allow-Credentials': 'true', }, ) ``` ```js new EventSource('https://api.example.com/events', { withCredentials: true }) ``` EventSource 不支持 custom headers(如 Authorization),鉴权要走 cookie 或 URL query。query 传 token 不安全(log 可能记),生产用 cookie session。 ### 7. 多 worker 的事件分发 多进程 / 多机器 worker 时,event publish 给某 user 但 SSE 连接在别的 worker → 丢。Redis pubsub 解决:所有 worker 订阅,user 的连接在哪个 worker 就由那个 worker push。 或者用 Centrifugo / Mercure 等专用 SSE/WebSocket 后端,扩容简单。 ## 与 WebSocket / polling 对比 | | SSE | WebSocket | Long polling | |---|---|---|---| | 协议 | HTTP(单向) | 自定义 frame(双向) | HTTP | | 浏览器原生 | EventSource | WebSocket | XHR | | 自动重连 | ✅ | ❌(自己写) | ❌ | | 鉴权 | cookie 简单 | 复杂 | 简单 | | Header / Auth | 受限 | 灵活 | 灵活 | | nginx 反代 | 简单 | 中(HTTP/1.1 upgrade) | 简单 | | 客户端到服务端 | 用普通 HTTP | 内置 | 用 XHR | 适合 SSE 场景: - 通知 / push 单向流(订单状态 / 后台任务进度 / 行情 ticker) - AI 聊天 / LLM streaming response - log tail / real-time dashboard 适合 WebSocket: - 双向频繁(聊天 / 多人编辑 / 游戏) - 二进制(视频 / 音频) ### LLM streaming 是 SSE 的杀手应用 OpenAI / Anthropic / Ollama 等 LLM API 的 streaming 输出都用 SSE: ```python async def chat_stream(prompt: str): yield 'data: {"role": "assistant", "content": ""}\n\n' async for chunk in llm.stream(prompt): yield f'data: {json.dumps({"content": chunk})}\n\n' yield 'data: [DONE]\n\n' ``` 前端用 EventSource 接收每个 token,UI 打字机效果显示。 ## 效果 我们改 SSE 替代之前的 long-polling 通知: - 推送延迟从 1-5s(poll 间隔)→ < 100ms - 服务端请求数 -90%(不再每 5s 轮询) - 客户端代码 60 行 → 15 行(EventSource 包办) - 断线重连自动 + 无丢事件(Last-Event-Id) ## 踩过的坑 1. **nginx buffer 没关**:response 在 nginx 端聚合,客户端永远收不到 单个 event(全部等到 buffer 满才发)。`proxy_buffering off` 必加。 2. **同源限制 6 个 SSE 连接**:浏览器对每域名 HTTP/1.1 连接数限制 ~6。开多个 SSE 把额度吃光,普通请求被排队。 解决:HTTP/2(多路复用,无 6 限制)+ 同源访问。 3. **多 tab 共享一个 SSE 用 SharedWorker**:1 个 tab 1 个 SSE 连接, 10 个 tab 客户端 10 倍开销 + 服务端 10 倍连接。SharedWorker 让多 tab 共享一个连接。 4. **mobile Safari 后台断连**:iOS 把后台 tab 的 SSE 断了不通知。 `visibilitychange` 事件触发时检查 EventSource.readyState, 不是 OPEN 就重连。 5. **回避 EventSource 不支持自定义 header**:要 JWT 鉴权又不想存 cookie: 用 `EventSourcePolyfill`(npm 包)支持 fetch API + header。
## 起因 业务给了一个 50GB 用户行为日志 CSV,让我做个简单的"每个城市 PV / UV / 转化率"统计。pandas 直接 `read_csv` 把机器爆了(机器只有 32GB 内存)。改成 chunksize=100000 一块块读、自己累加?写起来又难维护 (要管 partial state)。 polars 的 `scan_csv` + lazy + streaming 是这种"out-of-core"分析的 标准解法。 ## 解决方案 ### 装 ```bash uv add polars ``` ### 一行做完聚合 ```python import polars as pl result = ( pl.scan_csv('events_50gb.csv') # 不读,只 plan .filter(pl.col('event_type') == 'pageview') .group_by(['city', 'date']) .agg([ pl.col('user_id').n_unique().alias('uv'), pl.len().alias('pv'), ]) .sort('pv', descending=True) .collect(streaming=True) # 流式执行 ) print(result.head(20)) ``` 关键点: 1. **`scan_csv` 而不是 `read_csv`**:只构建 query plan,不读文件 2. **链式 lazy 操作**:filter → group_by → agg → sort 都在 plan 里 3. **`collect(streaming=True)`**:让 polars 流式跑(边读边算, 不把全部数据放内存) 50GB 文件在我 32GB 机器上跑了 8 分钟,内存峰值 ~4 GB。 ### 看 query plan ```python q = ( pl.scan_csv('events.csv') .filter(pl.col('event_type') == 'pageview') .group_by('city') .agg(pl.col('user_id').n_unique()) ) print(q.explain()) # AGGREGATE # [col("user_id").n_unique()] # BY [col("city")] # FROM # Csv SCAN events.csv # PROJECT 3/15 COLUMNS # SELECTION: [col("event_type") == "pageview"] ``` polars 自动做: - **column pruning**:只读用到的 3 列(不读 15 列里的其它 12 列) - **predicate pushdown**:filter 下推到 scan 层(不读不匹配的行) - **operator fusion**:能合并的操作合并 这些都是数据库 query optimizer 的标准手法,polars 把它带到 DataFrame 里。 ### 转 Parquet:以后查询快 10x ```python # 一次性把 CSV 转 Parquet(列存 + zstd 压缩) pl.scan_csv('events_50gb.csv').sink_parquet('events.parquet', compression='zstd') # 50GB CSV → ~12GB Parquet # 之后所有查询用 parquet result = pl.scan_parquet('events.parquet').filter(...).group_by(...).agg(...).collect() # 同样查询 8 分钟 → 30 秒 ``` Parquet 列存让"只读需要的列"在 IO 层就生效,比 CSV 快得多。 ### 按字段分区写入 ```python # 数据按 date 分区写 pl.scan_csv('events.csv').sink_parquet( 'events/', partition_by=['date'], ) # 生成:events/date=2024-01-01/data_0.parquet ... ``` ```python # 之后查询某一天的数据自动 prune 到该分区 pl.scan_parquet('events/').filter(pl.col('date') == '2024-01-01') ``` 巨大效率提升 —— 比扫整个 50GB 高几个数量级。 ### join 大表 ```python # 50GB 主表 join 100MB 维度表 result = ( pl.scan_parquet('events.parquet') .join(pl.scan_csv('cities.csv'), on='city_id', how='left') .group_by('city_name') .agg(pl.len().alias('events')) .collect(streaming=True) ) ``` polars 自动选 hash join(小表 build hash,大表 stream probe), 内存友好。 ### 性能 vs pandas / dask 50GB CSV → `count by city`: | 工具 | 时间 | 内存峰值 | 写代码量 | |---|---|---|---| | pandas chunked | 25 min | ~6 GB | 30 行(手动累加) | | dask | 12 min | ~8 GB | 8 行 | | **polars lazy + streaming** | 8 min | ~4 GB | 6 行 | | **polars on parquet** | 30 s | ~2 GB | 6 行 | 转 Parquet 一次后所有后续查询都飞快。 ## 效果 - 50GB CSV 单机能跑通(之前要上集群) - 探索性分析迭代速度 25 倍快 - 代码 30 行 → 6 行,可维护性大幅提升 - 后续团队类似分析都按"CSV → Parquet → polars query"模板做 ## 高级 tip ### 流式写入 巨大计算结果直接落盘,不全部放内存: ```python (pl.scan_csv('events.csv') .filter(pl.col('amount') > 100) .sink_parquet('high_value.parquet')) ``` `sink_*` 系列函数都是流式写。 ### profile 查询 ```python import time t0 = time.time() result = q.collect(streaming=True) print(f'took {time.time()-t0:.1f}s') # 更细:每个 node 时间 result, profile = q.profile() print(profile) ``` ### 字符串列消耗内存大 ```python df = df.with_columns( pl.col('country').cast(pl.Categorical), # 像 pandas categorical ) ``` categorical 把字符串映射到 int,对低基数字段省 80% 内存。 ## 踩过的坑 1. **`streaming=True` 不是所有操作都支持**:极个别 window 操作目前 还回落到 in-memory。运行时如果没真的流式,看 explain 输出会有 "STREAMING" 字样标记。 2. **多文件 scan 时 schema 不一致**:`scan_csv('logs/*.csv')` 假设所有 文件 schema 一样。第 N 个文件多一列就出错。`infer_schema_length=10000` 或显式指定 schema。 3. **`group_by(['col1', 'col2'])` 在 streaming 模式不一定流式**: 分组键基数极高(如 user_id)时内存爆。改成"先按 user_id hash 分桶 再聚合"的策略。 4. **写 Parquet 时 dtype 不对**:polars `null` 列写 Parquet 是 null type, 其它工具读不出来。`cast` 到具体类型再写。 5. **collect 后切回 eager 操作**:`collect()` 返回 `DataFrame`, 不再是 lazy。后续操作如果想再 lazy 就 `.lazy()`。
## 起因 shell prompt 需要显示: - 当前 git 分支 / 状态 - 当前 Python venv / Node 版本 - 上一命令 exit code - 是不是 ssh - 当前 k8s context 老办法: - bash 写 `PS1='...'` 一长串 escape - zsh 装 oh-my-zsh + theme - 每个 shell(bash / zsh / fish / nu)都要重新配 `starship`(Rust)是**跨 shell 统一 prompt engine**。一份 TOML 配置 所有 shell 受用 + 快 + 美观。 ## 装 ```bash brew install starship curl -sS https://starship.rs/install.sh | sh # 集成 eval "$(starship init bash)" # bash eval "$(starship init zsh)" # zsh starship init fish | source # fish # 加到 rc file echo 'eval "$(starship init zsh)"' >> ~/.zshrc ``` ## 默认效果 ``` ~/projects/myapp on feature/auth [✱] ❯ ``` 显示: - 路径 - git 分支 + 修改标记 - 自动检测项目类型(如 Node project 显示 node 版本,Rust 显示 cargo 版本) ## 自定义 ~/.config/starship.toml ```toml # 整体格式 format = """ $directory\ $git_branch$git_status\ $python$nodejs$rust\ $kubernetes\ $line_break\ $character""" # 单组件配置 [directory] truncation_length = 3 truncate_to_repo = true style = "bold cyan" [git_branch] symbol = " " style = "bold green" [git_status] modified = "*" staged = "+" untracked = "?" ahead = "⇡${count}" behind = "⇣${count}" diverged = "⇕" [python] symbol = " " format = '[$symbol$pyenv_prefix($version )(\($virtualenv\) )]($style)' [nodejs] symbol = " " disabled = false [kubernetes] disabled = false format = '[$symbol$context( \($namespace\))]($style) ' [character] success_symbol = "[❯](green)" error_symbol = "[❯](red)" ``` ## 性能 starship 测量自己执行时间: ```bash starship time ``` 100-200ms 慢的话,prompt 觉得卡。常见原因: - git status 在大 repo 慢 - python module check 慢 优化: ```toml [git_status] disabled = false ignore_submodules = true # 大 submodule repo 跳过 [python] disabled = true # 不显 python 版本 ``` 或者用 `command_timeout = 500`(毫秒)限制慢操作。 ## 项目类型自动检测 starship 在每次按回车跑一次,判断当前目录类型: - `package.json` → 显 node 版本 - `pyproject.toml` / `requirements.txt` → 显 python - `Cargo.toml` → rust - `go.mod` → go - `Dockerfile` → docker - `Gemfile` → ruby - ... 不在项目里时不显(不噪音)。 ## 多线设计 ```toml format = """ [╭─](bold yellow) $username@$hostname $directory\ $git_branch$git_status$git_state$git_metrics [╰─](bold yellow)$character""" ``` ``` ╭─ alice@laptop ~/proj/myapp on main ╰─❯ ``` 第二行只放 prompt 字符 → 命令永远在固定位置,长 prompt 不挤右侧。 ## right prompt ```toml right_format = "$cmd_duration $time" [cmd_duration] min_time = 500 # > 500ms 才显 format = "took [$duration](bold yellow) " [time] disabled = false format = '[$time]($style) ' time_format = "%R" ``` ``` ~/proj/myapp on main took 2.3s 15:42 ❯ ``` 长命令耗时 + 时间右侧显示。 ## 通过 ssh 改色 ```toml [hostname] ssh_only = true format = "[$hostname](bold red) " ``` 只在 ssh 里显主机名,本地不显。提醒"我在远程"。 ## kubernetes context ```toml [kubernetes] disabled = false format = '⛵ [$context(\($namespace\))]($style) ' style = "purple" [kubernetes.context_aliases] "gke_my-project-prod_us-central1_cluster" = "prod" "gke_my-project-staging_us-central1_cluster" = "stg" ``` ``` ⛵ prod(default) ~/k8s on main ❯ ``` 不同 context 染色 → 误操作 prod 之前肉眼能注意到。 ## env_var ```toml [env_var.AWS_PROFILE] format = "AWS:[$env_value](bold blue) " default = "" ``` 显当前 AWS profile(避免 prod 误操作)。 ## 与 zsh theme 对比 | | starship | powerlevel10k | oh-my-zsh themes | |---|---|---|---| | 跨 shell | ✅ | ❌(zsh only) | ❌ | | 速度 | 快(rust) | 极快(缓存) | 慢 | | 配置 | TOML | zsh wizard | shell script | | 可读性 | 高 | 中 | 中 | | 新人友好 | 高 | 中 | 中 | p10k 在 zsh 里最快(用 instant prompt)。 starship 跨 shell + 简单配置。我用 starship。 ## fish 友好 fish 用户最爱 starship。`fish_prompt` 自定义很麻烦,starship 一行接管。 ## nushell nushell 也支持: ```nu # ~/.config/nushell/config.nu $env.PROMPT_COMMAND = { || starship prompt } ``` ## 我的精简配置 ```toml # ~/.config/starship.toml format = """ $directory$git_branch$git_status$python$nodejs$kubernetes $character""" right_format = "$cmd_duration" [directory] truncation_length = 4 truncate_to_repo = true [git_branch] symbol = " " format = "on [$symbol$branch]($style) " [git_status] format = '([\[$all_status$ahead_behind\]]($style)) ' [python] symbol = "py " format = '[$symbol(\($virtualenv\) )]($style)' [nodejs] symbol = "node " detect_files = ["package.json"] [kubernetes] disabled = false symbol = "⎈ " format = '[$symbol$context]($style) ' [cmd_duration] min_time = 1000 format = "[$duration]($style) " [character] success_symbol = "[❯](green)" error_symbol = "[❯](red)" vimcmd_symbol = "[❮](green)" ``` 效果: ``` ~/proj/myapp on main [*+] ⎈ prod ❯ ``` 干净 + 信息量足。 ## 踩过的坑 1. **NerdFont 字符不显**:图标方块乱码。换支持 NerdFont 的字体 (JetBrainsMono Nerd / Hack Nerd / Iosevka)。 2. **大 monorepo git status 慢**:每次回车 100-500ms。 `disabled = true` 或限 `ignore_submodules = true`。 3. **conda env 没显**:默认 starship 不显 conda。`[conda] disabled = false`。 4. **远程 ssh prompt 不换色**:忘配 `[hostname] ssh_only = true`。 5. **shell init 慢**:`time zsh -i -c exit`。starship init 一般 < 10ms, 慢的话 starship 版本太老 / 配置 toml 错。
## 起因 应用要"用 Google / GitHub 账号登录"。 OAuth 2.0 有多种 flow: - Authorization Code(有 backend,推荐) - Implicit(弃用) - Resource Owner Password(弃用) - Client Credentials(machine-to-machine) SPA / mobile 之前用 implicit flow(token 直接放 URL),现在标准是 **Authorization Code + PKCE**。 ## PKCE 是什么 Authorization Code flow 需要 client secret 验证 client 身份。 SPA / mobile 不能藏 secret(公共 client)。 PKCE (Proof Key for Code Exchange) 用临时 challenge 替代 secret: 1. client 生成随机 `code_verifier`(高熵 string) 2. SHA256 hash 得到 `code_challenge` 3. authorize 请求带 `code_challenge` 4. callback 拿到 code 后换 token 时带原 `code_verifier` 5. server 验 hash(verifier) == challenge → 确认是同一 client 防止"中间人截获 authorization code 后冒充换 token"。 ## flow 详细 ### 1. client 生成 verifier + challenge ```js const codeVerifier = generateRandomString(64); const codeChallenge = base64UrlEncode(await sha256(codeVerifier)); // 存到 sessionStorage(callback 时取) sessionStorage.setItem('pkce_verifier', codeVerifier); ``` ### 2. 跳转 authorize URL ```js const params = new URLSearchParams({ client_id: 'my-client-id', redirect_uri: 'https://app.example.com/callback', response_type: 'code', scope: 'openid profile email', state: randomState, // 防 CSRF code_challenge: codeChallenge, code_challenge_method: 'S256', }); window.location = `https://auth.example.com/authorize?${params}`; ``` 用户在 IdP 登录 → 同意授权 → IdP redirect 回: ``` https://app.example.com/callback?code=ABC123&state=... ``` ### 3. callback 换 token ```js async function callback() { const code = new URLSearchParams(location.search).get('code'); const verifier = sessionStorage.getItem('pkce_verifier'); const res = await fetch('https://auth.example.com/token', { method: 'POST', headers: { 'Content-Type': 'application/x-www-form-urlencoded' }, body: new URLSearchParams({ grant_type: 'authorization_code', code, client_id: 'my-client-id', redirect_uri: 'https://app.example.com/callback', code_verifier: verifier, }), }); const tokens = await res.json(); // { access_token, refresh_token, id_token, expires_in } sessionStorage.removeItem('pkce_verifier'); storeTokens(tokens); } ``` server 验 `hash(verifier) == challenge` → 发 token。 ## token 存哪 SPA 痛点:access_token 存哪? - **localStorage**:XSS 一漏全暴露 - **sessionStorage**:同 localStorage + tab close 丢 - **memory (JS variable)**:刷新丢 - **httpOnly cookie**:最安全但需要 same-origin 或者 cross-origin 配置 + BFF backend 主流推荐:**BFF (Backend-for-Frontend) 模式** ``` [SPA] ↔ same-origin httpOnly cookie ↔ [BFF Node/Python] ↓ access_token [Resource API] ``` - BFF 持 token,SPA 仅 session cookie - XSS 拿不到 token - CSRF 用 sameSite=strict cookie 防 复杂但最安全。如果纯前端:access_token in memory + refresh 在 httpOnly cookie。 ## refresh token access_token 短(15 min - 1 h),过期用 refresh_token 换新。 PKCE flow 也能给 refresh_token:`scope=offline_access`。 refresh_token rotation(每次用过失效)防 leak 后无限续。 现代 OAuth 服务器(Auth0 / Okta / Keycloak)默认开。 ## state 防 CSRF ```js // 跳 authorize 之前 const state = generateRandom(); sessionStorage.setItem('oauth_state', state); // 加到 URL params // callback 验 const returnedState = new URLSearchParams(location.search).get('state'); const expectedState = sessionStorage.getItem('oauth_state'); if (returnedState !== expectedState) throw 'CSRF'; ``` 不验 state → 攻击者可让用户登入攻击者账号(account takeover)。 **必须验**。 ## scope OAuth scope 控制 token 能干啥: ``` scope=openid profile email # OIDC 标准 scope=read:user user:email # GitHub scope=https://www.googleapis.com/auth/drive.readonly # Google API ``` **最小权限**:只要 `email`,不要 `read:user`。 ## OIDC vs OAuth - OAuth 2.0:授权("app 能代你访问 API") - OpenID Connect (OIDC):在 OAuth 上面加身份层("who is the user") OIDC 加 `id_token`(JWT containing 用户信息)+ `userinfo` endpoint。 login 场景用 OIDC(need user identity),不仅是 API 授权。 scope 加 `openid` 启用 OIDC。 ## Library 不要手写 OAuth,用库: - JS: `oidc-client-ts`, `@auth0/auth0-spa-js` - Python: `authlib`, `httpx-auth` - Go: `golang.org/x/oauth2` PKCE flow 几行代码: ```js import { UserManager } from 'oidc-client-ts'; const userManager = new UserManager({ authority: 'https://auth.example.com', client_id: 'my-client', redirect_uri: 'https://app.example.com/callback', response_type: 'code', scope: 'openid profile email', // PKCE 自动 }); await userManager.signinRedirect(); // 跳 authorize const user = await userManager.signinRedirectCallback(); // callback 处理 ``` ## device code flow CLI / TV / IoT 没浏览器,用 device flow: 1. 应用问 server 拿 `device_code` + `user_code`(如 "ABC-123") 2. 用户开浏览器去 `https://example.com/device` 输 user_code 3. 应用 poll `/token` endpoint 等用户授权完成 GitHub CLI `gh auth login` 就是 device flow。 ## 实战 case:第三方"用 GitHub 登录" ```python # backend (FastAPI + authlib) from authlib.integrations.starlette_client import OAuth oauth = OAuth() oauth.register( 'github', client_id=GITHUB_CLIENT_ID, client_secret=GITHUB_CLIENT_SECRET, access_token_url='https://github.com/login/oauth/access_token', authorize_url='https://github.com/login/oauth/authorize', api_base_url='https://api.github.com/', client_kwargs={'scope': 'user:email'}, ) @app.get('/auth/github') async def login(request): redirect = request.url_for('callback') return await oauth.github.authorize_redirect(request, redirect) @app.get('/auth/github/callback') async def callback(request): token = await oauth.github.authorize_access_token(request) user = await oauth.github.get('user', token=token) user_data = user.json() # create / find local user + issue session ... ``` backend 持 client_secret(GitHub OAuth app 经典 confidential client), 不需要 PKCE。 如果纯 SPA 直接调 GitHub → PKCE。但 GitHub OAuth app 不支持 PKCE, 要用 GitHub App + device flow / 后端中转。 ## 踩过的坑 1. **redirect_uri 没 register**:authorize 返回 invalid redirect。 到 IdP console 加 callback URL(每环境一个:dev/staging/prod)。 2. **state 验缺失**:CSRF 风险。库一般帮做,但自己写时容易漏。 3. **token 存 localStorage**:XSS 后即失。BFF 或者 httpOnly cookie。 4. **scope 过宽**:要 email 却 request 整个 drive 访问 → 用户怕 + 不授权。最小 scope。 5. **PKCE verifier 没存好**:刷新页 / 跨 tab → callback 找不到 verifier。sessionStorage 同 tab 内 OK;BFF 模式用 server session。
图像是网页最大的字节来源。下面是一份能让 Lighthouse 分数从 60 涨到 95 的实战清单。 ## 1. 用对格式 - **AVIF**:2024 主流浏览器都支持,文件最小,复杂图比 WebP 还小 20-30% - **WebP**:兼容性更广(IE11 之外都行),编码快 - **JPEG**:照片,进度加载支持好 - **PNG**:透明背景、像素图 / icon - **SVG**:矢量图(logo、icon) 服务端按 Accept 头返回最佳格式: ```html <picture> <source srcset="hero.avif" type="image/avif"> <source srcset="hero.webp" type="image/webp"> <img src="hero.jpg" alt="Hero" width="1200" height="600" loading="lazy"> </picture> ``` 浏览器从上到下选第一个支持的。`<img>` 是兜底。 ## 2. 永远写 width / height ```html <img src="thumb.jpg" alt="" width="320" height="180"> ``` 让浏览器在加载图片前知道占位空间,避免 CLS(Cumulative Layout Shift)。 没写的图片加载完会"撑开"页面跳动。 CSS 控制实际尺寸: ```css img { max-width: 100%; height: auto; } ``` 但 HTML 的 width/height 用来告诉浏览器宽高比(aspect ratio),必加。 ## 3. responsive:不同视口不同尺寸 ```html <img src="hero-800.jpg" srcset="hero-400.jpg 400w, hero-800.jpg 800w, hero-1600.jpg 1600w" sizes="(max-width: 600px) 100vw, (max-width: 1200px) 50vw, 33vw" alt="Hero" width="1600" height="800" > ``` `srcset` 提供多分辨率,`sizes` 告诉浏览器图片实际显示大小, 浏览器结合 device pixel ratio 选最优 src。 ## 4. lazy loading(原生属性) ```html <img src="below-fold.jpg" loading="lazy" alt="" width="..." height="..."> <iframe src="..." loading="lazy"></iframe> ``` `loading="lazy"` 让浏览器在元素进入视口前不加载。 首屏 / above-the-fold 的图片不要加(用 `loading="eager"` 默认)。 ## 5. decoding="async" ```html <img src="x.jpg" loading="lazy" decoding="async"> ``` 让浏览器异步解码图片,不阻塞主线程。 ## 6. priority hints 首屏关键图加 `fetchpriority="high"`: ```html <img src="hero.jpg" fetchpriority="high" loading="eager"> ``` 预加载 + 高优先级,关键内容更快出现。LCP(Largest Contentful Paint) 指标提升明显。 ## 7. preload 关键资源 ```html <link rel="preload" as="image" href="hero.jpg" imagesrcset="hero-800.jpg 800w, hero-1600.jpg 1600w" imagesizes="100vw"> ``` 页面 head 里 preload 首屏大图,浏览器立刻并行下载。 ## 8. CDN 上做实时转换 ``` https://your-cdn.example.com/image?src=foo.jpg&w=800&fmt=avif ``` CDN 收到请求后按参数生成对应格式 / 尺寸,缓存返回。 服务:Cloudinary、imgix、Cloudflare Images、Bunny.net、自己用 imgproxy 搭建。 ## 9. 工具:批量优化 ```bash # AVIF npm install -g sharp-cli sharp -i 'images/*.jpg' -o 'dist/' avif --quality 60 # WebP cwebp -q 80 input.png -o output.webp # JPEG 进度 + mozjpeg 优化 mozjpeg -quality 80 -progressive -outfile out.jpg in.jpg # PNG 优化 pngquant --quality=70-90 in.png # 有损但视觉无差 oxipng --opt 4 in.png # 无损 # SVG svgo -i in.svg -o out.svg ``` CI 阶段把 source 全转一遍。 ## 10. Next / Vite 框架内置 ### Next.js ```tsx import Image from 'next/image' <Image src="/hero.jpg" width={1200} height={600} alt="Hero" priority // 首屏 LCP placeholder="blur" /> ``` `next/image` 自动生成多分辨率 + AVIF / WebP + 懒加载。 ### Vite `vite-imagetools` plugin: ```ts import hero from './hero.jpg?w=400;800;1600&format=avif;webp;jpg&as=picture' <img {...hero} loading="lazy" /> ``` Build 时自动生成所有尺寸 / 格式 + 注入 srcset。 ## 11. 模糊占位符(LQIP) 预生成 base64 编码的极小图当占位: ```html <img src="data:image/jpeg;base64,/9j/..." /> <!-- 同时异步加载真图,加载完替换 --> ``` 或者用 BlurHash 算法(更紧凑,几十字节)。Next.js `placeholder="blur"` 自动做这个。 ## 12. SVG sprite(多个 icon) ```html <svg style="display:none"> <symbol id="icon-cart" viewBox="0 0 24 24"> <path d="..." /> </symbol> </svg> <!-- 使用 --> <svg><use href="#icon-cart" /></svg> ``` 一次下载所有 icon,按需引用,比 icon font 性能 / 灵活性都好。 ## 13. 验证 Lighthouse / WebPageTest 跑一遍: - LCP < 2.5s ✅ - CLS < 0.1 ✅ - 没有 "Properly size images" 警告 - 没有 "Serve images in next-gen formats" 警告 ## 14. 视频替代 GIF GIF 巨大 + 不可压缩。替代: ```html <video autoplay loop muted playsinline> <source src="anim.webm" type="video/webm"> <source src="anim.mp4" type="video/mp4"> </video> ``` 同一段动画 WebM 通常比 GIF 小 80-90%。 ## 踩过的坑 - 用 `<img src=` 但没写 `width/height` → CLS 0.3+ 直接红色。 - AVIF 编码慢(CPU 几秒一张):build 时缓存输出,不要每次重编码。 - responsive `srcset` 算错 `sizes` → 浏览器选了过大或过小的图。 开 DevTools 看 Network 实际请求的 URL 校验。 - "压缩太狠" → JPEG quality 60 以下肉眼可见 artifact。建议 70-85 甜点。
任何对外 API 都需要限流,否则一波突发流量 / 恶意刷接口能把服务打挂。 四种主流算法各有适用场景,下面手写各一份 + 选择建议。 ## 算法对比 | 算法 | 适合 | 突发支持 | 实现复杂度 | |---|---|---|---| | Fixed Window | 简单计数器 | 边界双倍突发 | 极简 | | Sliding Window | 平滑限流 | 可控 | 中 | | Leaky Bucket | 强制匀速输出 | 不支持 | 中 | | Token Bucket | 突发友好 | 支持 | 中 | ## 1. Fixed Window(最简,但有边界问题) ```python import redis r = redis.Redis() def fixed_window_allow(key: str, limit: int, window_sec: int) -> bool: bucket = f'{key}:{int(time.time()) // window_sec}' n = r.incr(bucket) if n == 1: r.expire(bucket, window_sec) return n <= limit # 用法:limit=100/分钟 allowed = fixed_window_allow(f'rl:user:{uid}', 100, 60) ``` 问题:59:59 一波 100 次 + 00:01 一波 100 次 = 2 秒内 200 次,违反"每分钟 100" 意图。 ## 2. Sliding Window(用 sorted set 精确,开销稍高) ```python def sliding_window_allow(key: str, limit: int, window_sec: int) -> bool: now = time.time() pipe = r.pipeline() pipe.zremrangebyscore(key, 0, now - window_sec) # 删窗口外 pipe.zcard(key) # 当前窗口内的请求数 pipe.zadd(key, {str(now): now}) # 加这次 pipe.expire(key, window_sec) _, count, _, _ = pipe.execute() return count < limit ``` 精确:当前到过去 window_sec 秒内最多 limit 次。 代价:每个 user 一个 sorted set;高并发下 Redis 内存增加。 ## 3. Leaky Bucket(强制匀速出) ```python def leaky_bucket_allow(key: str, capacity: int, leak_per_sec: float) -> bool: """ 模拟一个桶:用户每次请求往桶里加一滴水;桶在背景以 leak_per_sec 漏水。 桶满 → 拒绝。 """ now = time.time() state = r.hgetall(key) or {} last = float(state.get(b'last', now)) water = float(state.get(b'water', 0)) # 时间差里漏掉的水 water = max(0, water - (now - last) * leak_per_sec) if water >= capacity: return False water += 1 r.hset(key, mapping={'water': water, 'last': now}) r.expire(key, int(capacity / leak_per_sec) + 1) return True # 用法:bucket cap=10, 漏速 2/秒 → 最大允许短期 10 突发,长期 2/秒 allowed = leaky_bucket_allow(f'rl:lb:{uid}', 10, 2) ``` 漏速恒定 → 输出节奏稳定,但不允许突发"全速消费"。 ## 4. Token Bucket(突发友好,最常用) ```python def token_bucket_allow(key: str, capacity: int, refill_per_sec: float) -> bool: """ 桶里装 token:请求消耗一个 token;桶在背景以 refill_per_sec 添加 token。 没 token → 拒绝。 """ now = time.time() state = r.hgetall(key) or {} last = float(state.get(b'last', now)) tokens = float(state.get(b'tokens', capacity)) tokens = min(capacity, tokens + (now - last) * refill_per_sec) if tokens < 1: return False tokens -= 1 r.hset(key, mapping={'tokens': tokens, 'last': now}) r.expire(key, int(capacity / refill_per_sec) + 1) return True # 用法:cap=100, 速率 10/秒 → 短期可一次性 100 个,平均 10/秒 allowed = token_bucket_allow(f'rl:tb:{uid}', 100, 10) ``` 桶满了允许突发用 capacity 个,之后稳态 refill_per_sec。 **实际生产 99% 用这个**:API 客户端通常零散调用,偶尔一波,token bucket 最贴合。 ## 5. 原子性问题 上面 Python 版本有 race condition(get → 算 → set 之间被别的请求抢插)。 生产用 Lua 脚本一次性原子完成: ```lua -- token_bucket.lua local key = KEYS[1] local capacity = tonumber(ARGV[1]) local refill_rate = tonumber(ARGV[2]) local now = tonumber(ARGV[3]) local state = redis.call('HMGET', key, 'tokens', 'last') local tokens = tonumber(state[1]) or capacity local last = tonumber(state[2]) or now tokens = math.min(capacity, tokens + (now - last) * refill_rate) local allowed = 0 if tokens >= 1 then tokens = tokens - 1 allowed = 1 end redis.call('HMSET', key, 'tokens', tokens, 'last', now) redis.call('EXPIRE', key, math.ceil(capacity / refill_rate) + 1) return allowed ``` Python 调用: ```python TOKEN_BUCKET_SCRIPT = r.register_script(open('token_bucket.lua').read()) def allow(key, capacity, refill_rate): return TOKEN_BUCKET_SCRIPT( keys=[key], args=[capacity, refill_rate, time.time()], ) ``` ## 6. nginx 限流 不要总在应用层做。nginx 内置 limit_req(leaky bucket)很强: ```nginx # 每客户端 IP 10 req/s,最大突发 20 limit_req_zone $binary_remote_addr zone=api:10m rate=10r/s; server { location /api/ { limit_req zone=api burst=20 nodelay; proxy_pass http://backend; } } ``` `burst=20 nodelay` = 突发 20 个立即过,超过的直接 503。 不带 `nodelay` 是排队处理(leaky bucket 标准行为)。 API gateway(Kong / Tyk / Envoy)也都内置。 ## 7. 分布式限流的精度 单 Redis 实例 + Lua 是强一致;多 Redis cluster 时同一 user 的请求被 hash 到不同分片,限流可能略松。一般可接受。 需要严格全局限流可用 sentinel pattern + 唯一 master。 ## 8. 限流的响应 ```http HTTP/1.1 429 Too Many Requests Retry-After: 60 X-RateLimit-Limit: 100 X-RateLimit-Remaining: 0 X-RateLimit-Reset: 1735689600 ``` `Retry-After`(秒或日期)告诉客户端何时再试。 ## 9. 按维度限流 不同维度组合: - 按 IP:防匿名滥用 - 按 API key:按付费等级 - 按 endpoint:贵 endpoint 单独严格 - 按 user-id:登录用户 ```python def check_rate_limits(req): if not allow(f'ip:{req.ip}', 1000, 100): return 'IP too fast' if not allow(f'user:{req.user_id}', 100, 10): return 'user quota exceeded' if req.path == '/expensive': if not allow(f'user:{req.user_id}:exp', 5, 0.1): return 'expensive endpoint quota exceeded' return None ``` ## 10. 反例 + 注意 - 用全局锁实现限流:性能差,单点故障 - 用 DB 计数:DB 不是限流工具,QPS 上来扛不住 - 限流粒度太粗(all users 共享):恶意用户拖累所有人 ## 11. 第三方库 - **slowapi**(FastAPI / Starlette):装饰器限流 - **django-ratelimit**:Django 装饰器 - **redis-cell**:Redis module,提供原子 CL.THROTTLE 命令 ```python # slowapi 例 from slowapi import Limiter limiter = Limiter(key_func=lambda req: req.client.host) @app.get('/api/x') @limiter.limit('100/minute') def x(request: Request): ... ``` ## 踩过的坑 - 限流 key 包含敏感信息(user email),泄露 Redis = 数据泄漏。 hash 一下当 key。 - TTL 算错让 key 永远不过期,Redis 占用涨。 - 限流应用在 readiness probe 上 → 监控系统刷它把自己 ban 了。 排除 health endpoint。 - 测试期忘了关限流 → CI 测试因 429 间歇性失败。环境变量分场景。
经常要 `cd ~/projects/my-app/src/components` 这种长路径——zoxide 让你只打部分名字就能跳过去。它会自动学习你访问过的目录及频率, 下次靠模糊匹配 + 频率排序找最可能的目标。 ## 安装 ```bash # Debian 12+ / Ubuntu 22.04+ sudo apt install -y zoxide # 或 cargo install zoxide zoxide --version ``` ## 集成到 shell ```bash # bash: ~/.bashrc eval "$(zoxide init bash)" # zsh: ~/.zshrc eval "$(zoxide init zsh)" # fish: ~/.config/fish/config.fish zoxide init fish | source ``` `zoxide init` 注入一个 `z` 函数 + Tab completion + 命令钩子。 重新加载 shell 后开始用。 ## 用法 ```bash cd ~/projects/my-app/src/components # 第一次像往常一样 cd # zoxide 学到这个目录 cd ~/projects/other-app # 再去别处 # ... # 之后只打部分名字: z components # → ~/projects/my-app/src/components z my-app # → ~/projects/my-app z proj # → ~/projects/my-app(最近访问 + 频率高的) ``` 匹配规则: - 子串匹配(不是 fuzzy) - 按 "frecency" 排序:frequency × recency 的组合分 - 最近访问 + 经常去的最优先 ## 交互式选择(zi) 不确定哪个: ```bash zi compon # 弹出 fzf 让你选 ``` 需要装 `fzf`。`zi` 是 zoxide 的交互模式。 ## 多 token ```bash z my src # 必须同时含 "my" 和 "src" z app /home # 用 / 把不同部分隔开 ``` ## 列 / 清理 ```bash z -l # 列所有记录的目录(按 frecency) z -l projects # 列匹配 "projects" 的 zoxide remove ~/old-dir # 删某条 ``` ## 和原生 cd 共存 `z` 是新加的命令,`cd` 还是 `cd`。一些人喜欢 alias: ```bash alias cd='z' # 重激进;遇到 z 没记录的目录还是会回退到 cd ``` 我个人保留 `cd` 给一级目录(`cd ~`、`cd /etc`、`cd ..`), `z` 用于深层项目目录。 ## 与 fzf / starship 配合 ```bash # 用 fzf 浏览全部记录 zoxide query --interactive # 等价于 zi ``` starship prompt 不影响 z,但 z 命令很快所以 prompt 渲染不会卡。 ## 实际工作流改变 之前: ```bash cd ~/projects/myapp cd ../another-app/backend/src/main cd ../../../config ``` 之后: ```bash z myapp z main z config ``` 每次 1-2 个字符,速度感受非常明显。 ## 数据存哪 `~/.local/share/zoxide/db.zo`,纯二进制文件。换机器导出 + 导入: ```bash zoxide query --list --score > z.bak # 导出 # 新机器 cat z.bak | while read line; do score=$(echo "$line" | awk '{print $1}') path=$(echo "$line" | cut -d' ' -f2-) zoxide add "$path" done ``` 或者干脆删掉重新养——通常一周就练熟。 ## 与 docker / container 的限制 在容器内 zoxide 数据库是独立的,不会自动同步主机的。挂载主机的 `~/.local/share/zoxide` 也行,但容器路径和主机路径不一样时跳进 不存在的目录。容器开发一般不用 z。 ## 高级 hooks zoxide 在每次 cd / pwd 改变时记录目录。修改 `_z_dir_hook`: ```bash # 只记录 git 仓库根目录 _zoxide_should_add() { git -C "$PWD" rev-parse --show-toplevel >/dev/null 2>&1 } ``` 通常默认就够,不必魔改。 ## 踩过的坑 - 装了但 z 命令不工作:忘了 `eval "$(zoxide init bash)"` 或者放在了 rc 文件错的位置。`type z` 确认是个函数而不是 alias。 - `z foo` 跳到完全无关的目录:第一次访问陌生路径优先匹配最高频的记录; 解决办法 `cd /full/path/to/foo` 让 zoxide 学一下。 - 老 NFS 目录 unmount 后 zoxide 仍记着,每次 z 这个名字报错 "no directory matched"。`zoxide remove '/old/nfs/*'` 清掉。 - 同名目录在多个项目下:z 只给一个结果(frecency 最高)。要别的版本 用 `zi name` 交互选。
## 起因 RAG 场景需要向量检索: - embed 文档 chunk → 存向量 - query 时 embed query → 找最近的 K 个 chunk - chunk 喂 LLM 做答 向量 DB 选择多:pgvector / Qdrant / Weaviate / Milvus / Pinecone / LanceDB / Chroma。痛苦。 下面对比 + 我的选型建议。 ## 候选 ### pgvector(Postgres 扩展) ```sql CREATE EXTENSION vector; CREATE TABLE chunks ( id BIGSERIAL PRIMARY KEY, content TEXT, embedding vector(1536) ); CREATE INDEX ON chunks USING hnsw (embedding vector_cosine_ops); -- 查 SELECT content FROM chunks ORDER BY embedding <=> '[0.1, 0.2, ...]'::vector LIMIT 10; ``` - 已有 PG → 0 引入新组件 - 同事务支持 metadata + 向量 - 性能:HNSW index 几百万 vector OK,千万级别仍可(精度 vs 速度调) ### Qdrant(Rust) ```python from qdrant_client import QdrantClient from qdrant_client.models import Distance, VectorParams, PointStruct client = QdrantClient('localhost', port=6333) client.recreate_collection('docs', vectors_config=VectorParams(size=1536, distance=Distance.COSINE)) client.upsert('docs', points=[ PointStruct(id=1, vector=[0.1]*1536, payload={'text': '...'}), ]) results = client.search('docs', query_vector=[0.1]*1536, limit=10, query_filter={'must': [{'key': 'lang', 'match': {'value': 'zh'}}]}) ``` - Rust 写,单机性能强 - payload 支持 filter(带 metadata 过滤) - 部署简单(docker run 一行) ### LanceDB ```python import lancedb db = lancedb.connect('./lance_data') tbl = db.create_table('docs', data=[ {'vector': [0.1]*1536, 'text': '...'}, ]) results = tbl.search([0.1]*1536).limit(10).to_pandas() ``` - 嵌入式(无 server,类似 SQLite) - 单 binary,无依赖 - 数据存 Lance 列式格式 - 适合 < 1 亿 vector / 不要分布式 ### Milvus - 老牌(2019+) - 真正分布式 + 集群(cloud-native) - 适合数十亿 vector / 高 QPS - 部署复杂 ### Chroma / Weaviate - Chroma:embedded / server,开发体验最好 - Weaviate:Go 写,schema + GraphQL - 我用得少 ## 性能对比 10M vectors / 1536d / 测试: | DB | 索引时间 | P50 query | RAM | |---|---|---|---| | pgvector (HNSW) | 30 min | 12 ms | 25 GB | | Qdrant | 15 min | 5 ms | 18 GB | | LanceDB | 10 min | 8 ms | 8 GB (disk-based) | | Milvus | 12 min | 4 ms | 20 GB | Qdrant / Milvus 在 raw 性能最强。pgvector 略慢但 ergonomics 最好。 ## 选型建议 | 场景 | 推荐 | |---|---| | 已有 PG 应用 + < 1000w vector | pgvector | | 独立向量服务 + 中等规模(千万级) | Qdrant | | 嵌入应用 / 单机 / 不想跑 server | LanceDB | | 数十亿规模 / 集群 | Milvus | | 全托管不想运维 | Pinecone(贵)/ Qdrant Cloud / Pinecone serverless | 我个人项目 100% pgvector: - DB 已经在 - transaction 跟其它 data 一致 - 不想多维护一个组件 - 性能够(< 100w vector) ## pgvector 详细 + 优化 ```sql -- HNSW index(更适合高维) CREATE INDEX ON chunks USING hnsw (embedding vector_cosine_ops) WITH (m = 16, ef_construction = 64); -- 查询时 ef_search 控制精度/速度 SET hnsw.ef_search = 100; -- 半精度向量(节省一半空间,损失小) ALTER TABLE chunks ALTER COLUMN embedding TYPE halfvec(1536); CREATE INDEX ON chunks USING hnsw (embedding halfvec_cosine_ops); ``` `halfvec`(pgvector 0.7+)省内存 50%,精度损失 1-2%。 ### filter + 向量混合查 ```sql -- 找符合 metadata + 最近向量 SELECT content FROM chunks WHERE lang = 'zh' AND created_at > '2025-01-01' ORDER BY embedding <=> $query LIMIT 10; ``` PG 查询计划器自动选 index。带 filter 时 HNSW + B-tree 配合(pg17+ 更智能 prefilter)。 ## hybrid search 向量召回往往不如 keyword + 向量混合。 ```python def hybrid_search(query, k=10): # 1. BM25 (PG FTS) bm25_results = pg.execute(""" SELECT id, ts_rank(...) AS score FROM chunks WHERE chunk_tsv @@ to_tsquery(%s) ORDER BY score DESC LIMIT 50 """, query) # 2. 向量 vec_results = pg.execute(""" SELECT id, 1 - (embedding <=> %s::vector) AS score FROM chunks ORDER BY embedding <=> %s::vector LIMIT 50 """, [vec, vec]) # 3. RRF (Reciprocal Rank Fusion) return rrf_merge(bm25_results, vec_results, k=k) ``` 实际 RAG 效果显著提升。pgvector / Qdrant 都有内置 hybrid(不同程度)。 ## embedding model 选 - **OpenAI text-embedding-3-small / large**:好但要 API - **bge-m3** / **bge-large-zh**(BGE 系列):开源中英双语强 - **nomic-embed-text-v1.5**:开源,128 维 - 768 维可调 - **e5-mistral-7b**:高质量但贵 - **multilingual-e5-large**:100+ 语言 中文 RAG 我用 bge-m3,足够好 + 开源 + 本地 GPU 跑(~150 MB model)。 ## 真实 case:知识库 RAG 我们一个内部知识库: - 5 万文档 / 切 chunk 后 30 万段 - 用 bge-m3 embedding(1024 维) - 存 pgvector - query:BM25 + vector hybrid 部署: - Postgres 16 + pgvector 0.7 - 单 16 GB RAM 服务器 - query P50: 50 ms(含 embed 模型推理) 成本:$50/月 server,对比 Pinecone $200+。 功能足够,没必要专门 vector DB。 ## chunk 策略 向量质量第一影响 chunk: - 太大(> 1000 token):embedding 模糊,定位差 - 太小(< 100 token):上下文缺失 - 重叠(10-20%):边界 case 通用 500 token + 100 overlap。技术文档可能 800 + 150。 ## 踩过的坑 1. **pgvector index 建得慢**:百万 vector 建 HNSW 30 分钟。 `SET maintenance_work_mem = '2GB'` 加快。 2. **embedding dimension 改了**:换模型 → 维度变 → 老数据 schema 不 兼容。必须重新索引全部。 3. **filter selectivity 低 + 向量查**:query plan 选 seq scan vector field(不走 HNSW)。`SET enable_seqscan = off` 测试。 4. **cosine vs dot**:normalized vector 用 cosine OK;non-normalized 用 dot product。混 → 结果错。 5. **vector 类型 cast**:`'[0.1]'::vector` JSON-like 字符串。从 list `[0.1, 0.2]` 转字符串小心 numpy float 序列化精度丢失。
分布式锁是绝大多数后端早晚都要的"防止并发执行某操作"机制。 Redis 是最常用的实现,但实现细节不对会导致**重复执行 / 死锁 / 误释放**。 下面讲两种正确写法。 ## 单实例 SET NX PX(最常用) 适合非关键路径:"同一任务同一时刻只跑一份",可以接受极小概率冲突。 ```python import secrets import redis r = redis.Redis(decode_responses=True) def acquire_lock(key: str, ttl_ms: int) -> str | None: """Return token if locked, None otherwise.""" token = secrets.token_hex(16) ok = r.set(key, token, nx=True, px=ttl_ms) return token if ok else None def release_lock(key: str, token: str) -> bool: """只在 token 匹配时才删 —— 防止误释放别人的锁。""" lua = """ if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end """ return bool(r.eval(lua, 1, key, token)) ``` 使用: ```python token = acquire_lock('job:nightly-report', ttl_ms=60000) if not token: print('another worker is running this; skip') return try: do_the_work() finally: release_lock('job:nightly-report', token) ``` ## 三个关键点 1. **TTL 必须**:worker 崩了 / 网断了,锁自动过期,否则死锁 2. **token 必须**:删锁时验证是不是自己的;否则你的锁超时了, 别人拿了同 key 的锁,你回头 del 把别人的锁删了 3. **释放用 Lua**:GET + DEL 不是原子的,中间可能锁到期被别人拿走 不做 token 验证的"`del`" 释放是大坑。 ## 上下文管理器封装 ```python import contextlib @contextlib.contextmanager def redis_lock(key, ttl_ms=30000): token = acquire_lock(key, ttl_ms) if not token: raise RuntimeError(f'lock {key} already held') try: yield finally: release_lock(key, token) # 用法 with redis_lock('job:sync', ttl_ms=120000): do_sync() ``` ## 长任务的 TTL 续期(看门狗) 如果 do_the_work 可能跑超过 ttl,需要在后台周期性续期: ```python import threading, time class WatchdogLock: def __init__(self, key, ttl_ms=30000): self.key, self.ttl_ms, self.token = key, ttl_ms, None self._stop = threading.Event() self._thread = None def __enter__(self): self.token = acquire_lock(self.key, self.ttl_ms) if not self.token: raise RuntimeError('lock busy') self._thread = threading.Thread(target=self._renew, daemon=True) self._thread.start() return self def __exit__(self, *args): self._stop.set() self._thread.join() release_lock(self.key, self.token) def _renew(self): # 每 ttl/3 续一次 while not self._stop.wait(self.ttl_ms / 3 / 1000): lua = """ if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('pexpire', KEYS[1], ARGV[2]) else return 0 end """ r.eval(lua, 1, self.key, self.token, self.ttl_ms) ``` ## Redlock —— 多 Redis 实例时 如果 Redis 是单机,主从切换的瞬间锁可能丢(旧 master 上有锁, 新 master 上没有)。**Redlock 算法** 在多个独立 Redis 实例上同时 申请锁,多数派同意才算成功。 但 Redlock 有争议(Martin Kleppmann vs antirez 之争):分布式系统专家 认为它在某些时钟漂移场景下不安全。**结论:除非你真的不能容忍上面这种 极端故障,否则单实例 SET NX PX 已经够好**。 如果一定要 Redlock,用 [redis-py-lock](https://github.com/redis/redis-py) 内置的 `r.lock()` 实现(基于 Redlock): ```python import redis r = redis.Redis() with r.lock('myresource', timeout=60, blocking_timeout=5) as locked: do_work() ``` 或者多实例版: ```python from redis import Redis from redis.lock import Lock rs = [Redis(host=h) for h in ('r1', 'r2', 'r3')] # 你需要自己写 Redlock 算法:在 majority 上获取 ``` Python 生态用 [`redlock-py`](https://github.com/SPSCommerce/redlock-py) 更省事。 ## 什么时候**不要**用 Redis 锁 - **严格不可重复执行**:付款扣库存、唯一 ID 申领。 数据库的事务 / unique constraint 是真正的源(Redis 锁可能因为时钟 / 网络分区误放过两次)。 - **跨数据中心**:Redis 跨 DC 复制延迟通常 > 锁的 TTL;用 etcd / Zookeeper 的 lease 机制。 - **强一致需求**:见上,用支持 raft / paxos 的工具。 ## 整数 ID / 任务队列防重复 如果你要"任务 X 在 30 分钟内只处理一次",比锁更简单的是 dedup key: ```python key = f'dedup:taskX:{job_id}' if not r.set(key, '1', nx=True, ex=1800): return # 已经处理过 / 正在处理 do_work() ``` 不需要释放——靠 TTL 自然过期。简单粗暴。 ## 踩过的坑 - 没 token 直接 `del`:worker A 拿锁后任务跑超时锁过期 → worker B 拿到 同 key 锁 → A 任务结束直接 del → A 把 B 的锁删了 → C 又拿到锁 → 重复执行。 - TTL 太短:网络抖动 / GC 暂停 / 数据库慢一下,TTL 到了任务还没完, 下一个 worker 进来。要么调长 TTL 要么用看门狗。 - 用 Redis 锁保护 DB 操作:注意 Redis 锁本身只是建议性,DB 一致性还是 靠 DB 事务 / UNIQUE。锁主要是"避免重复劳动"而不是"保证正确性"。 - redis-py 默认 connection pool 在多线程不安全。每个线程拿自己的 Redis 实例,或者用 redis-py 4+ 的 connection pool(已优化)。
## 起因 要把一个 30M 参数的 ResNet 部署到 ARM 手机上。FP32 模型 120 MB + 推理慢得卡顿。INT8 量化把模型缩到 30 MB + 推理快 3 倍,精度只掉 0.5%。 深度学习模型 INT8 量化已成熟,几行代码搞定。 ## 三种量化策略 ### A. Dynamic quantization:仅 weight 量化,最简单 ```python import torch from torchvision.models import resnet50 model = resnet50(pretrained=True).eval() # 一行:把所有 Linear 层 weight 量化到 INT8 quantized = torch.quantization.quantize_dynamic( model, {torch.nn.Linear, torch.nn.LSTM, torch.nn.RNN}, dtype=torch.qint8, ) torch.save(quantized.state_dict(), 'resnet50_int8.pt') ``` **适合**:BERT / transformer / RNN 类(Linear 主导)。 **不适合**:CNN(Conv2d 占比大,dynamic 不量化它)。 ### B. Static quantization:weight + activation 全量化 需要 calibration(用代表性数据跑一遍找 activation 范围): ```python import torch import torch.ao.quantization as Q model = MyModel().eval() # 1. 准备:插入 observer 收 activation 统计 model.qconfig = Q.get_default_qconfig('fbgemm') # x86; 'qnnpack' for ARM model_prepared = Q.prepare(model, inplace=False) # 2. Calibration: 跑 ~100-1000 张代表性图片 with torch.no_grad(): for img in calibration_loader: model_prepared(img) # 3. Convert: observer → 实际 quant op model_int8 = Q.convert(model_prepared, inplace=False) torch.save(model_int8.state_dict(), 'resnet50_static_int8.pt') ``` 效果通常 比 dynamic 更激进,**全模型 INT8**。 代价:要 calibration data + 模型架构必须支持(含 Conv-BN fuse 等)。 ### C. Quantization-aware training (QAT):训练时模拟量化 精度损失最小(< 0.5%)但要重训: ```python model.qconfig = Q.get_default_qat_qconfig('fbgemm') model_prepared = Q.prepare_qat(model, inplace=False) # 训练(模型在前向时模拟 INT8 round 噪声) for epoch in range(5): for x, y in train_loader: loss = criterion(model_prepared(x), y) loss.backward() optimizer.step() model_int8 = Q.convert(model_prepared.eval()) ``` QAT 适合:精度对生产关键的模型,可承受 5-20 epoch 重训。 ## 实测对比(ResNet50 + ImageNet val) | | 大小 | CPU 推理(ms/img) | top-1 acc | |---|---|---|---| | FP32 | 98 MB | 65 | 76.13% | | Dynamic INT8 | ~95 MB | 65 | 76.13% (Linear 没主导) | | Static INT8 | 25 MB | 28 | 75.84% | | QAT INT8 | 25 MB | 28 | 76.02% | CNN 类 static / QAT 显著有效。BERT 类 dynamic 也能 4x 小 + 2-3x 快。 ## ONNX Runtime + INT8(生产推荐) PyTorch 量化导出 ONNX 后用 ONNX Runtime 跑,性能 / 跨平台都更好: ```python import torch from torch.ao.quantization import quantize_dynamic q_model = quantize_dynamic(model, {torch.nn.Linear}, dtype=torch.qint8) dummy = torch.randn(1, 3, 224, 224) torch.onnx.export(q_model, dummy, 'model_int8.onnx', opset_version=13) ``` 或者直接 ONNX Runtime 的量化工具(更稳): ```python from onnxruntime.quantization import quantize_dynamic, QuantType quantize_dynamic( model_input='model_fp32.onnx', model_output='model_int8.onnx', weight_type=QuantType.QInt8, ) ``` ONNX Runtime 在 ARM / x86 / Apple Silicon 都有 INT8 优化 kernel。 ## bitsandbytes:LLM 用 4-bit / 8-bit quantization ```bash uv add bitsandbytes accelerate ``` ```python from transformers import AutoModelForCausalLM, BitsAndBytesConfig bnb_config = BitsAndBytesConfig( load_in_4bit=True, bnb_4bit_quant_type='nf4', bnb_4bit_compute_dtype=torch.bfloat16, bnb_4bit_use_double_quant=True, ) model = AutoModelForCausalLM.from_pretrained( 'meta-llama/Llama-3.1-70B-Instruct', quantization_config=bnb_config, device_map='auto', ) ``` 70B 模型从 140 GB → 35 GB。单 A100 80GB 或 4090 + offload 跑得动。 精度损失:相对 FP16 通常 < 1% benchmark(NF4 比 INT4 更稳)。 ## GPTQ / AWQ:post-training quantization for LLM 针对 LLM 优化的 4-bit 量化算法(比 bitsandbytes NF4 更好): ```python # GPTQ from auto_gptq import AutoGPTQForCausalLM, BaseQuantizeConfig quantize_config = BaseQuantizeConfig(bits=4, group_size=128) model = AutoGPTQForCausalLM.from_pretrained( 'meta-llama/Llama-3.1-8B', quantize_config=quantize_config, ) model.quantize(calibration_dataset) model.save_quantized('llama-3.1-8b-4bit-gptq') ``` ```python # AWQ from awq import AutoAWQForCausalLM model = AutoAWQForCausalLM.from_pretrained('llama-3.1-8b') model.quantize(tokenizer, quant_config={...}) ``` 社区已有大量预量化的 GPTQ / AWQ model 在 HuggingFace(搜 `-GPTQ-4bit` / `-AWQ` 后缀)。直接下载用,省得自己量化。 ## 部署侧:vLLM / llama.cpp 用量化模型 ```bash # vLLM vllm serve TheBloke/Llama-3.1-70B-AWQ --quantization awq # llama.cpp(CPU / Apple Silicon Metal 极快) llama-cli -m llama-3.1-8b.Q4_K_M.gguf -p 'hello' ``` llama.cpp GGUF 格式包含量化(Q4_K_M / Q5_K_M / Q8_0 等), Mac M 系列上 8B 模型 30+ tokens/s。 ## 效果 我们的几个生产模型量化后: | 模型 | 之前 | 量化后 | 精度损失 | |---|---|---|---| | ResNet50 (移动 app) | 98 MB / 65ms | 25 MB / 22ms | -0.3% | | BERT-base (后台) | 440 MB / 80ms | 110 MB / 30ms | -0.5% | | Llama 7B (RAG) | 14 GB / 100 token/s | 4 GB / 230 token/s | < 1% | 移动 / 边缘 / CPU 推理场景量化几乎是必做。 ## 几个陷阱 1. **量化前 fuse 模块**: ```python torch.ao.quantization.fuse_modules( model, [['conv', 'bn', 'relu']], inplace=True) ``` Conv-BN-ReLU 合成一个 op 后量化效果更好。漏 fuse 精度可能掉 2-5%。 2. **observer 范围错**:calibration 数据不代表 production → activation 范围估计错 → 量化 clip 严重。calibration 一定用真实分布数据。 3. **某些 layer 不能量化**:softmax / layernorm 等保留 FP32。 `model.qconfig = ...` 全局设后,对这些 layer 显式 `qconfig=None`。 4. **不同硬件 backend**:`fbgemm` 是 x86 优化,`qnnpack` 是 ARM 优化。 部署目标错了性能差 2-5 倍。 5. **量化后调试难**:bug 是模型本身的还是量化引入的?保留 FP32 reference 模型对比每层 activation 找漂移最大的 layer。 ## 总结 | 场景 | 推荐 | |---|---| | BERT / transformer post-hoc | dynamic INT8 | | CNN 上 ARM / edge | static INT8 + QAT | | LLM 推理 | bitsandbytes NF4 / AWQ / GGUF | | 跨平台部署 | ONNX Runtime + INT8 | | 极致精度要求 | QAT | | 不想自己折腾 | 用社区预量化模型 | 量化是 ML 生产工程的标准动作,不做白扔 70% 推理性能。