知识广场
按学科筛选:计算机科学
«计算机科学» 分类下共 256 篇帖子
开发期想要一个随手起、不污染主机、带管理界面的 PostgreSQL。 Compose 是最低摩擦方案。 ## 目录结构 ``` ~/pg-stack/ ├── docker-compose.yml ├── .env ├── init/ │ └── 01-init.sql └── data/ # 卷挂载点(git 忽略) ``` ## docker-compose.yml ```yaml services: db: image: postgres:16-alpine restart: unless-stopped environment: POSTGRES_USER: ${PG_USER} POSTGRES_PASSWORD: ${PG_PASS} POSTGRES_DB: ${PG_DB} # 让 init/*.sql 在首次启动时跑 PGDATA: /var/lib/postgresql/data/pgdata volumes: - ./data:/var/lib/postgresql/data - ./init:/docker-entrypoint-initdb.d:ro ports: - "127.0.0.1:5432:5432" # 只在 localhost 暴露 healthcheck: test: ["CMD-SHELL", "pg_isready -U $$POSTGRES_USER -d $$POSTGRES_DB"] interval: 10s timeout: 3s retries: 5 pgadmin: image: dpage/pgadmin4:latest restart: unless-stopped environment: PGADMIN_DEFAULT_EMAIL: ${PGADMIN_EMAIL} PGADMIN_DEFAULT_PASSWORD: ${PGADMIN_PASS} PGADMIN_LISTEN_PORT: 80 ports: - "127.0.0.1:5050:80" depends_on: db: condition: service_healthy volumes: - pgadmin-data:/var/lib/pgadmin volumes: pgadmin-data: ``` ## .env(不要进 git) ``` PG_USER=appuser PG_PASS=change-me-some-long-random-string PG_DB=appdb PGADMIN_EMAIL=admin@local PGADMIN_PASS=change-me-too ``` ## init/01-init.sql ```sql -- 启用常用扩展 CREATE EXTENSION IF NOT EXISTS pgcrypto; CREATE EXTENSION IF NOT EXISTS pg_stat_statements; -- 只读分析账号 CREATE USER analytics WITH PASSWORD 'analytics-pass'; GRANT CONNECT ON DATABASE appdb TO analytics; GRANT USAGE ON SCHEMA public TO analytics; ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO analytics; ``` ## 启动 ```bash docker compose up -d docker compose ps docker compose logs -f db # 看初始化 ``` ## 校验 ```bash # 通过 psql 客户端连 docker compose exec db psql -U appuser -d appdb -c '\dx' # 或者本地 psql psql -h 127.0.0.1 -U appuser -d appdb ``` pgAdmin 打开 `http://localhost:5050`,登录用 `.env` 里的邮箱密码。 进去后 "Add new server": - Host: `db`(容器名,pgAdmin 在同一 Compose 网络) - Port: 5432 - User/Password: 同 `.env` ## 备份 / 恢复 ```bash # 备份 docker compose exec -T db pg_dump -U appuser -d appdb -Fc > backup-$(date +%F).dump # 恢复(连接已存在的数据库会失败,先 drop) docker compose exec -T db psql -U postgres -c 'DROP DATABASE appdb;' docker compose exec -T db psql -U postgres -c 'CREATE DATABASE appdb OWNER appuser;' docker compose exec -T db pg_restore -U appuser -d appdb < backup-2026-05-20.dump ``` ## 升级 Postgres 大版本 主版本(如 16 → 17)跨越时 **数据文件不兼容**,需要 dump/restore: ```bash docker compose exec -T db pg_dumpall -U postgres > all.sql # 改 image: postgres:17-alpine # 清空 ./data 重启 docker compose down rm -rf ./data docker compose up -d docker compose exec -T db psql -U postgres < all.sql ``` ## 踩过的坑 - 端口写成 `5432:5432` 而不是 `127.0.0.1:5432:5432`:整个公网都能连到 你的数据库,扫描器十分钟内就来撞密码。一定要绑 127.0.0.1。 - `./data` 不要用 NFS 挂载点,PostgreSQL 对 `fsync` 行为要求严格, NFS 上跑会丢数据。 - `init/` 只在 **数据目录为空** 时执行;如果你后来改了 init.sql 想重跑, 得先 `docker compose down -v` 清掉。 - Alpine 镜像比 Debian 小但 glibc 差异偶尔翻车(特别是某些 extension), 生产用 `postgres:16-bookworm` 更稳。
MNIST 手写数字识别是 ML 的 "hello world"。下面用 PyTorch 写一个从 数据加载到训练 / 评估的完整 pipeline。60 行可读代码,能跑到 99%+ 准确率。 ## 完整代码 ```python import torch import torch.nn as nn import torch.nn.functional as F from torch.utils.data import DataLoader from torchvision import datasets, transforms device = 'cuda' if torch.cuda.is_available() else 'cpu' print(f'using {device}') # 1. 数据 transform = transforms.Compose([ transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,)), ]) train_set = datasets.MNIST('./data', train=True, download=True, transform=transform) test_set = datasets.MNIST('./data', train=False, download=True, transform=transform) train_loader = DataLoader(train_set, batch_size=128, shuffle=True, num_workers=4) test_loader = DataLoader(test_set, batch_size=512, shuffle=False, num_workers=4) # 2. 模型 class Net(nn.Module): def __init__(self): super().__init__() self.conv1 = nn.Conv2d(1, 32, 3, padding=1) self.conv2 = nn.Conv2d(32, 64, 3, padding=1) self.fc1 = nn.Linear(64 * 7 * 7, 128) self.fc2 = nn.Linear(128, 10) self.drop = nn.Dropout(0.25) def forward(self, x): x = F.relu(F.max_pool2d(self.conv1(x), 2)) # 28x28 -> 14x14 x = F.relu(F.max_pool2d(self.conv2(x), 2)) # 14x14 -> 7x7 x = x.flatten(1) x = F.relu(self.fc1(x)) x = self.drop(x) return self.fc2(x) model = Net().to(device) opt = torch.optim.Adam(model.parameters(), lr=1e-3) loss_fn = nn.CrossEntropyLoss() # 3. 训练 def train_epoch(epoch): model.train() total, correct, total_loss = 0, 0, 0.0 for x, y in train_loader: x, y = x.to(device), y.to(device) opt.zero_grad() logits = model(x) loss = loss_fn(logits, y) loss.backward() opt.step() total_loss += loss.item() * x.size(0) correct += (logits.argmax(1) == y).sum().item() total += x.size(0) print(f'epoch {epoch} train loss {total_loss/total:.4f} acc {correct/total:.4f}') # 4. 评估 def evaluate(): model.eval() total, correct = 0, 0 with torch.no_grad(): for x, y in test_loader: x, y = x.to(device), y.to(device) correct += (model(x).argmax(1) == y).sum().item() total += x.size(0) print(f'test acc {correct/total:.4f}') for ep in range(5): train_epoch(ep) evaluate() torch.save(model.state_dict(), 'mnist.pt') ``` 5 个 epoch 后测试集准确率应 ≥ 99%。GPU 上每 epoch < 10 秒; CPU 大约 30-60 秒。 ## 解释几个关键点 ### `Normalize((0.1307,), (0.3081,))` MNIST 训练集统计出的均值和标准差。归一化让输入分布更均匀, 训练更稳。 ### `num_workers=4` DataLoader 用 4 个子进程并行 prefetch 数据。GPU 训练时 IO 是瓶颈, 设大点(4-8)能让 GPU 持续吃满。CPU 训练时设 0 反而更快(避免进程切换)。 ### `opt.zero_grad()` PyTorch 默认 gradient 累加。每次 backward 前清零。 忘了清零 → loss 一直涨。 ### `with torch.no_grad():` 评估时不需要梯度,省内存 + 快。等价的还有装饰器 `@torch.no_grad()` 或 `torch.inference_mode()`(更激进,PyTorch 1.9+)。 ### `.to(device)` 每个 batch 显式搬到 GPU;model 一次性 `.to(device)`。 ## 加几个常见优化 ### 1. 学习率调度 ```python sched = torch.optim.lr_scheduler.CosineAnnealingLR(opt, T_max=5) # 每个 epoch 结束后调用 sched.step() ``` cosine annealing 在前期保持高 lr 探索,后期降低做精调。 ### 2. 混合精度(AMP) ```python scaler = torch.cuda.amp.GradScaler() for x, y in train_loader: x, y = x.to(device), y.to(device) opt.zero_grad() with torch.cuda.amp.autocast(): logits = model(x) loss = loss_fn(logits, y) scaler.scale(loss).backward() scaler.step(opt) scaler.update() ``` 混合精度让大部分计算用 fp16,需要精度的(loss / 权重更新)用 fp32。 RTX 30 系以上 ~2x 加速,显存降一半。MNIST 这种小模型差别不大, 但对 transformer / 大网络明显。 ### 3. 用 `torch.compile()`(PyTorch 2.0+) ```python model = torch.compile(model) ``` 编译期内联 + 算子融合,训练 / 推理 1.3-2x 加速。第一次 batch 慢 (编译),之后变快。 ### 4. 早停 ```python best_acc = 0 patience = 3 no_improve = 0 for ep in range(50): train_epoch(ep) acc = evaluate() if acc > best_acc: best_acc = acc no_improve = 0 torch.save(model.state_dict(), 'best.pt') else: no_improve += 1 if no_improve >= patience: print('early stop') break ``` ## 推理 ```python model = Net().to(device) model.load_state_dict(torch.load('mnist.pt')) model.eval() # 单张图片 from PIL import Image img = Image.open('test.png').convert('L').resize((28, 28)) x = transform(img).unsqueeze(0).to(device) with torch.no_grad(): pred = model(x).argmax(1).item() print(f'predicted: {pred}') ``` ## 可视化训练曲线 ```python import matplotlib.pyplot as plt losses = [] accs = [] # ... 在 train_epoch / evaluate 里 append fig, ax = plt.subplots(1, 2, figsize=(10, 4)) ax[0].plot(losses); ax[0].set_title('train loss') ax[1].plot(accs); ax[1].set_title('test acc') plt.savefig('curves.png') ``` 或者用 `tensorboard` / `wandb`(参见专题文章)。 ## 踩过的坑 - 第一次跑会下载 MNIST 数据(~10MB),需要网络。`download=True` 给 你这个能力。 - forward 里 `x.view(-1, 784)` vs `x.flatten(1)`:view 要求内存连续, flatten 不要求。前者偶尔报 "view size is not compatible..." 错。 - 不调用 `model.train()` / `model.eval()` → Dropout / BatchNorm 行为错。 eval 期间 Dropout 应该关(输出固定);BatchNorm 应该用 running stats。 - `torch.cuda.empty_cache()` 是给 PyTorch 内部 caching allocator 用的, 通常没必要手动调;调了也不会真的还给 OS。担心 OOM 应该减小 batch。
## 起因 存了 200 小时的会议录音 + 课程视频,想搜里面"我哪节课讲过 X 主题"。 传统方案:手工转录(贵 / 慢)、剪映等付费云转录(隐私 + 钱)。 Whisper 是 OpenAI 开源的 ASR 模型,本地跑能转 90+ 种语言, 中文识别质量直接打过国内大多数云服务。 ## 解决方案 最快的方式是 `whisper.cpp`(C++ 重新实现,CPU/GPU 都很快,无需 PyTorch)。 ### 装 ```bash git clone https://github.com/ggerganov/whisper.cpp cd whisper.cpp # Apple Silicon 用 Metal;NVIDIA 用 CUDA;纯 CPU 也可以 make -j # 下模型(large-v3 中文最准;medium 速度+准确度平衡;base 最快但中文一般) bash ./models/download-ggml-model.sh large-v3 # 1.5 GB 左右 ``` ### 转录单个文件 ```bash # 输入要是 16kHz mono WAV ffmpeg -i lecture.mp4 -vn -ar 16000 -ac 1 -c:a pcm_s16le tmp.wav ./build/bin/whisper-cli -m models/ggml-large-v3.bin \ -l zh -f tmp.wav -osrt -otxt # 生成 tmp.wav.srt + tmp.wav.txt ``` 参数解释: - `-l zh`:源语言中文(不指定的话自动检测) - `-osrt`:输出 SubRip 字幕 - `-otxt`:输出纯文本 ### 批量脚本 ```bash #!/usr/bin/env bash for f in *.mp4; do base="${f%.mp4}" [ -f "$base.srt" ] && { echo "skip $f (already done)"; continue; } ffmpeg -y -i "$f" -vn -ar 16000 -ac 1 -c:a pcm_s16le /tmp/_audio.wav ./whisper-cli -m models/ggml-large-v3.bin -l zh -f /tmp/_audio.wav \ -osrt -of "$base" done ``` `[ -f "$base.srt" ] && continue` 做断点续传:跑了一半中断重启不重做。 ### Python 版(openai-whisper) 如果要嵌入到 pipeline: ```bash uv add openai-whisper ``` ```python import whisper model = whisper.load_model('large-v3') # 第一次自动下载 result = model.transcribe('lecture.mp4', language='zh', verbose=True) print(result['text']) # 带时间戳 for seg in result['segments']: print(f"[{seg['start']:.1f}s] {seg['text']}") ``` GPU 大模型一篇 1 小时音频约 5-10 分钟。CPU 慢 5-10 倍。 ## 性能 / 选型 我笔记本 M1 Pro + whisper.cpp + large-v3: | 模型 | 中文 WER | 速度 | 显存 | |---|---|---|---| | tiny | ~25% | 30x realtime | 1 GB | | base | ~18% | 16x | 1 GB | | small | ~13% | 6x | 2 GB | | medium | ~9% | 2x | 5 GB | | **large-v3** | ~6% | 1x | 10 GB | "realtime" = 处理 1 分钟音频要多少秒。`large-v3` 比 medium 准确率明显 但速度慢 2 倍,中文场景值得。 ## 效果 - 200 小时录音转完一晚上跑完 - `.srt` 字幕直接拖进 VLC / 剪映对应视频 - 搜索 `grep -l "知识点关键词" *.srt` 一秒定位"哪一节哪一段说过" - 转录质量手工抽查:科技 / 普通话清晰场景准确率 > 95%,方言 / 嘈杂 环境降到 80% 左右 ## 踩过的坑 1. **GPU 装错版本**:whisper-py 默认装 CPU torch。要手动装 CUDA 版: `uv add torch --index https://download.pytorch.org/whl/cu124`。 2. **音频不是 16kHz mono**:whisper.cpp 严格要求格式;自动失败但 error message 不明显。永远先 `ffmpeg -ar 16000 -ac 1`。 3. **长视频中间 hallucination**:超过 30 秒的安静段落 whisper 会"幻想" 一些重复的"谢谢观看"之类的话。`--no-speech-threshold 0.6` 加严格点 过滤,或者 `--temperature 0` 减少创造性。 4. **断句不准**:whisper 自动给句号但有时一长串没标点。后处理用 `ja-tokenize` 或者再过一遍 LLM 加标点: ```python client.chat.completions.create(messages=[{ 'role': 'user', 'content': f'给下面文本加合理标点,不要改字:\n{raw_text}' }]) ``` 5. **GPU 显存不够**:large-v3 需要 10 GB+。降级到 medium(5 GB)+ `--word-timestamps` 也能拿很好结果。
## 起因 机器上同时维护 4 个项目: - 一个 Node 16 老仓库 - 一个 Node 20 新项目 - 一个 Python 3.9 后端 - 一个 Python 3.12 数据 pipeline 之前装了 nvm + pyenv + rbenv + goenv。每个工具一套 shell hook, shell 启动时间 1.5 秒(全是 PATH 操作)。切项目时 `nvm use` `pyenv shell` 排队按。 `mise`(前身 rtx)一个工具管所有语言版本,shell 启动只加 50ms。 ## 安装 ```bash # 一行装 curl https://mise.run | sh # 或包管理器 brew install mise sudo apt install mise # 装完接到 shell echo 'eval "$(mise activate bash)"' >> ~/.bashrc echo 'eval "$(mise activate zsh)"' >> ~/.zshrc echo 'mise activate fish | source' >> ~/.config/fish/config.fish mise --version ``` ## 装语言 ```bash # 全局装版本 mise use --global node@20 mise use --global [email protected] mise use --global [email protected] mise use --global rust@stable # 在某项目目录里指定(写入 .mise.toml 或 .tool-versions) cd ~/projects/legacy-app mise use node@16 # 这个目录用 Node 16 cd ~/projects/data-pipeline mise use [email protected] # 自动切换:cd 进目录 mise 自动 use 对应版本 ``` ## .mise.toml 项目配置 ```toml # .mise.toml [tools] node = "20" python = "3.12" go = "1.22" rust = "stable" "npm:pnpm" = "latest" # 通过 npm 装 pnpm "pipx:poetry" = "latest" # 通过 pipx 装 poetry "go:github.com/jesseduffield/lazygit" = "latest" [env] DATABASE_URL = "postgresql://localhost/myapp" PYTHONDONTWRITEBYTECODE = "1" [tasks.test] description = "Run tests" run = ["pytest", "npm test"] [tasks.dev] description = "Start dev server" run = "npm run dev" ``` 之后: ```bash mise install # 装 .mise.toml 声明的所有工具 mise run test # 跑 [tasks.test] mise run dev mise tasks # 列所有 tasks ``` ## 列出 / 切换 ```bash mise ls # 看当前激活的工具版本 mise ls --installed # 看本机装了哪些版本 mise outdated # 看哪些工具有新版本 mise current # 当前目录最终生效的版本(含继承) mise where node # 当前 node 二进制路径 ``` ## env 管理 `mise.toml` 的 `[env]` 段在 mise activate 后 cd 进目录自动注入: ```toml [env] AWS_PROFILE = "dev" DATABASE_URL = "postgresql://localhost/myapp" _.path = ["./bin", "./node_modules/.bin"] # 加 PATH _.file = ".env" # 也读 .env 文件 ``` 替代了 direnv 大部分用法。 ## 与 .tool-versions(asdf 兼容) asdf 用户已有 `.tool-versions`: ``` nodejs 20.10.0 python 3.12.1 ruby 3.2.0 ``` mise 直接读,零迁移。新项目也建议用更强大的 `.mise.toml` 格式。 ## 团队协作 把 `.mise.toml` 进 git: ```bash git add .mise.toml git commit -m 'chore: pin tool versions via mise' ``` 新人 clone 后: ```bash cd repo mise install # 装齐所有工具版本 ``` CI 里: ```yaml - uses: jdx/mise-action@v2 with: cache: true - run: mise run test ``` ## 性能对比 | | 启动延迟 | 多语言 | |---|---|---| | nvm | 500-1500ms | ❌ | | pyenv | 200-400ms | ❌ | | asdf | 200-500ms | ✅ | | **mise** | 30-50ms | ✅ | Rust 写的 + 智能 PATH shim 让启动几乎瞬时。 ## 替代了什么 机器上以前装的: - nvm → 删 - pyenv → 删 - rbenv → 删 - goenv → 删 - direnv(大部分用法)→ 删(保留给复杂 shell 逻辑) `.zshrc` 从 80 行清到 30 行,shell 启动 1.5s → 0.2s。 ## mise tasks vs Makefile `mise tasks` 比 Makefile 优势: - 不需要 tab 缩进 - 跨平台一致(make 在 Windows 不友好) - 自动激活该项目的工具版本 - TOML 比 Makefile 易读 ```toml [tasks.lint] run = ["ruff check .", "mypy src/"] [tasks.fmt] run = ["ruff format .", "ruff check --fix ."] [tasks.ci] depends = ["lint", "test"] run = "echo all green" [tasks.test] run = "pytest --cov" ``` ```bash mise run ci # 自动跑 lint + test ``` ## 效果 - 4 个项目切换无感(cd 进去自动切版本) - shell 启动 < 0.2s - 一个工具管所有语言:心智模型干净 - .mise.toml 进 git 让新同事 onboard "git clone + mise install" 完事 - task runner 顺带替代了一半 Makefile / package.json scripts ## 踩过的坑 1. **从 nvm 迁过来 PATH 顺序乱**:nvm 残留 PATH 里。彻底清 nvm: ```bash rm -rf ~/.nvm # ~/.bashrc 里删掉所有 nvm 相关行 ``` 2. **mise 后台编译 Python / Ruby 慢**:源码编译需要 build-essential + libssl-dev 等系统库。Debian/Ubuntu: ```bash sudo apt install -y build-essential libssl-dev libffi-dev \ libsqlite3-dev libbz2-dev libreadline-dev zlib1g-dev \ libncurses-dev liblzma-dev tk-dev ``` 3. **CI 没缓存导致每次都重装**:mise-action 加 `cache: true`。 4. **某些工具用 GitHub release 装**:网络问题在国内有时拉不下来。 设 `MISE_GITHUB_TOKEN` 或者镜像源。 5. **VSCode Python 解释器找不到**:mise 装的 Python 在 `~/.local/share/mise/installs/python/3.12.0/bin/python`。 `.vscode/settings.json` 写 `"python.defaultInterpreterPath": "${workspaceFolder}/.venv/bin/python"` 让它用项目 venv。
写到一半 feature 分支,突然 main 上有紧急 bug 要修。传统流程: 1. `git stash` 2. `git checkout main && git checkout -b hotfix` 3. 修完合回 4. `git checkout feature && git stash pop` 容易出错(stash 冲突、忘了某些 untracked 文件)。 `git worktree` 让你在同一个仓库下挂多个 working directory,每个独立 checkout 一个分支。 ## 1. 加一个 worktree ```bash cd ~/repos/myapp # 你正在 feature-x 分支上 git worktree add ../myapp-hotfix main # Preparing worktree (new branch 'hotfix' from 'main') # HEAD is now at a1b2c3d Latest main commit ``` 现在 `~/repos/myapp-hotfix/` 是一个完整的工作目录,checkout 在 main 上: ```bash cd ~/repos/myapp-hotfix git status # On branch main # nothing to commit, working tree clean ``` 你的原目录 `~/repos/myapp/` 还在 feature-x 上,没动。 修完 hotfix: ```bash cd ~/repos/myapp-hotfix git checkout -b hotfix-cve-1234 # 改代码 git commit -am 'fix: CVE-1234' git push -u origin hotfix-cve-1234 # 开 PR / 合并 ``` 完事删 worktree: ```bash cd ~/repos/myapp git worktree remove ../myapp-hotfix ``` ## 2. 列出所有 worktree ```bash git worktree list # /home/me/repos/myapp a1b2c3d [feature-x] # /home/me/repos/myapp-hotfix e4f5g6h [hotfix-cve-1234] # /home/me/repos/myapp-review i7j8k9l [pr-42-review] ``` ## 3. 常用 pattern ### A. PR review 不打断当前工作 ```bash git worktree add ../myapp-pr42 origin/feature-from-coworker cd ../myapp-pr42 # review、跑测试、不干扰主工作目录 ``` ### B. 每个项目固定一个 main worktree ```bash ~/repos/myapp/ # 永远 main,用于 git pull / 看 README ~/repos/myapp-dev/ # feature 分支工作目录 ~/repos/myapp-pr/ # 任意 review 用 ``` ### C. 用于 git bisect bisect 期间不能用工作目录干别的。开 worktree 让 bisect 单独跑, 你继续工作: ```bash git worktree add ../myapp-bisect HEAD cd ../myapp-bisect git bisect start git bisect good v1.0 git bisect bad HEAD # bisect 在这个 worktree 来回 checkout,不影响主工作目录 ``` ## 4. 用 alias 加速 ```bash git config --global alias.wta 'worktree add' git config --global alias.wtl 'worktree list' git config --global alias.wtr 'worktree remove' ``` 之后: ```bash git wta ../hotfix main git wtl git wtr ../hotfix ``` ## 5. 替代 multiple clone 老方法是 `git clone` 同一个 repo 多份: - 磁盘占用 N 倍 - 不能跨工作目录复用 stash / config / hooks - 拉新 commit 要在每个 clone 上 pull worktree 共享同一个 `.git/`,所有 worktree 共享对象数据库, 拉一次所有 worktree 立即可见。 ## 6. detached HEAD worktree(无分支) 只想 checkout 某个 tag / commit 看看: ```bash git worktree add ../myapp-v1.0 v1.0 # 自动 detached HEAD ``` 完了 `git worktree remove ../myapp-v1.0` 就清掉。 ## 7. 子目录 + IDE VSCode / IntelliJ 各开一个窗口指到不同 worktree,能同时调试多个分支 而不互相干扰。 ## 8. CI worktree CI 想在同一 runner 上并发跑多个分支的测试: ```bash git worktree add /tmp/test-pr-42 origin/pr-42 & git worktree add /tmp/test-pr-43 origin/pr-43 & wait # 然后并行 cd 进去跑测试 ``` 避免反复 clone 的开销。 ## 9. lock / unlock worktree 在外置磁盘 / 网络盘上时,磁盘没挂载 worktree 会被 "automatic prune"。给它加锁防止: ```bash git worktree lock ../myapp-external --reason 'on external SSD' git worktree unlock ../myapp-external ``` ## 10. prune worktree 目录被手动 rm 但 `.git/worktrees/` 里还有元数据: ```bash git worktree prune # 清理孤立元数据 ``` ## 踩过的坑 - 同一个分支不能同时 checkout 到两个 worktree(git 会拒绝)。需要的话 用 `git worktree add --force` 或者 detached HEAD 指到分支 tip。 - worktree 在 NTFS / FAT 上:`.git/worktrees/<name>/gitdir` 的路径用了 绝对路径,跨平台 / 移动后失效。Linux 用就好。 - 在 worktree 里删了文件然后 `git stash`:stash 是 repo 级别的,主 worktree 也能看到这个 stash。共享 stash 偶尔混淆,建议 stash 加 message 区分。 - 切换 IDE 不会自动重读 worktree —— 关掉 IDE 进程重新打开, 否则 IDE 缓存的 git 状态可能错。
## 起因 写一个数据导出库,接收任何"长得像 file-like 对象的"输入: 内置 `open()` 返回值、`io.BytesIO`、Django 的 `UploadedFile`、 S3 client 返回的 streaming body…… ```python def export(stream): stream.write(b'header') for row in data: stream.write(row.serialize()) ``` 类型怎么标?`stream: BinaryIO`?只 cover 标准库;`stream: object`? 失去类型提示。 `typing.Protocol`(PEP 544)解决:定义"结构子类型"(structural subtyping), 描述"具有 X 方法的任何对象",无需对方继承。 ## 解决方案 ```python from typing import Protocol class WritableBytes(Protocol): def write(self, data: bytes) -> int: ... def flush(self) -> None: ... def export(stream: WritableBytes) -> None: stream.write(b'header') for row in data: stream.write(row.serialize()) stream.flush() ``` 任何"有 write(bytes) → int 和 flush() → None" 方法的类都满足。 不需要 import 我的 Protocol、不需要 inherit、不需要 register。 类型检查器(mypy / pyright)静态确认: ```python import io export(io.BytesIO()) # ✅ export(open('out.bin', 'wb')) # ✅ export("not a file") # ❌ mypy 报错 ``` ## runtime check 需要在运行时判断:"这东西满足 Protocol 吗?" ```python from typing import Protocol, runtime_checkable @runtime_checkable class WritableBytes(Protocol): def write(self, data: bytes) -> int: ... if isinstance(obj, WritableBytes): obj.write(b'x') ``` `@runtime_checkable` 让 `isinstance` 工作。但只检查方法**存在**, 不检查方法签名 —— 比静态检查弱。 ## 实战例子:可插拔的 storage backend ```python from typing import Protocol class Storage(Protocol): def get(self, key: str) -> bytes | None: ... def set(self, key: str, value: bytes, ttl: int | None = None) -> None: ... def delete(self, key: str) -> None: ... class InMemoryStorage: def __init__(self): self._d: dict[str, bytes] = {} def get(self, key): return self._d.get(key) def set(self, key, value, ttl=None): self._d[key] = value def delete(self, key): self._d.pop(key, None) class RedisStorage: def __init__(self, url): self._r = redis.from_url(url) def get(self, key): return self._r.get(key) def set(self, key, value, ttl=None): if ttl: self._r.set(key, value, ex=ttl) else: self._r.set(key, value) def delete(self, key): self._r.delete(key) def setup_cache(storage: Storage) -> Cache: return Cache(storage) setup_cache(InMemoryStorage()) setup_cache(RedisStorage('redis://localhost')) ``` `InMemoryStorage` / `RedisStorage` 都没 inherit `Storage`, 但都 conform 该结构 → mypy 通过。 ## 与 ABC 对比 ```python from abc import ABC, abstractmethod class Storage(ABC): @abstractmethod def get(self, key: str) -> bytes | None: ... @abstractmethod def set(self, key: str, value: bytes, ttl: int | None = None) -> None: ... @abstractmethod def delete(self, key: str) -> None: ... class InMemoryStorage(Storage): # 必须显式继承 ... ``` ABC 强制继承。Protocol 不强制 → 第三方库的类不需要修改就能用。 适用场景: - **Protocol**:写库 / interface 定义,鸭子类型友好 - **ABC**:内部类型层次、要 share 实现(Mixin)、强制 inherit ## generic protocol ```python from typing import Protocol, TypeVar T = TypeVar('T', covariant=True) class Iterable(Protocol[T]): def __iter__(self) -> 'Iterator[T]': ... class Iterator(Protocol[T]): def __next__(self) -> T: ... ``` 实际 `Iterable` 已在 `typing` 模块,举例说明语法。 ## 在标准库里你已经在用 `typing` / `collections.abc` 模块里很多就是 Protocol: ```python from collections.abc import Iterable, Mapping, Hashable, Container from typing import Protocol, SupportsLen, SupportsInt ``` `SupportsLen` 是 `def __len__(self) -> int`。所以 `len(x)` 能用的 都满足。 ## 给现有类"贴" Protocol ```python from third_party import SomeClass class HasFoo(Protocol): def foo(self) -> str: ... # SomeClass 有 foo() 方法但作者没标注 x: HasFoo = SomeClass() # mypy 检查 OK x.foo() ``` 零侵入。 ## 实战 case:测试替身 ```python class Notifier(Protocol): def send(self, msg: str, to: str) -> None: ... def process_order(order: Order, notifier: Notifier) -> None: # ... notifier.send(f'Order {order.id} confirmed', to=order.email) # 生产 process_order(order, EmailNotifier()) # 测试 class FakeNotifier: def __init__(self): self.sent: list[tuple[str, str]] = [] def send(self, msg, to): self.sent.append((msg, to)) def test_confirm(): n = FakeNotifier() process_order(test_order, n) assert ('Order 1 confirmed', '[email protected]') in n.sent ``` `FakeNotifier` 不需要 inherit `Notifier` ABC,纯写实现即可。 ## 与 `__class_getitem__` / `TypeVar` 联用 ```python from typing import Protocol, TypeVar K = TypeVar('K') V = TypeVar('V') class Cache(Protocol[K, V]): def get(self, key: K) -> V | None: ... def set(self, key: K, value: V) -> None: ... class StringIntCache: def get(self, key: str) -> int | None: ... def set(self, key: str, value: int) -> None: ... def use(c: Cache[str, int]): v = c.get('x') # mypy 知道 v: int | None ``` ## 效果 - 库的 API 类型严格但不要求用户继承 - 测试时随便造 fake,不需要 mock 框架 - 重构内部实现时 Protocol 是"接口",业务代码改少 - mypy / pyright 在 IDE 里实时提示,写错立刻知道 ## 踩过的坑 1. **Protocol 不能 instantiate**:`x = WritableBytes()` 报错(没意义, 它是接口)。 2. **runtime_checkable 检查只看方法名**:`isinstance(obj, WritableBytes)` 只确认有 `write` 和 `flush`,不查签名。运行时碰到方法签名不对仍 crash。 3. **Protocol 的方法有 default impl** 让它变成 mixin 又不强制继承: ```python class Repr(Protocol): def __repr__(self) -> str: ... class Mixin(Repr): def __repr__(self): return f'<{type(self).__name__}>' ``` 语义微妙,团队约定清楚。 4. **structural subtyping 太宽容**:所有有 `read()` 方法的都被当 readable,比如自定义类 `class Sensor: def read(self): ...` 不该 是 file-like 但 mypy 不报错。给 Protocol 多放几个方法(read + readable 等)让匹配更严。 5. **Protocol 间不能继承 default impl**:跟 mixin 不同。要复用代码用 ABC + Protocol 组合,或者纯函数化。
## 起因 服务器上跑了 8 个 Docker 容器,每个都想用自己的子域名访问 + HTTPS。 nginx 每加一个服务要改 conf + 申证书 + reload;Caddy 也要改 Caddyfile。 有没有"加新容器自动注册"的方案? Traefik 是 Go 写的反代,原生支持"从 Docker / K8s / Consul 等动态发现 服务" + Let's Encrypt 自动签证书。加一个容器 = 加一组 label,零配置改动。 ## 解决方案 ### 1. Traefik 容器本身 ```yaml # /srv/traefik/docker-compose.yml services: traefik: image: traefik:v3.1 restart: unless-stopped ports: - "80:80" - "443:443" # Traefik dashboard (内网可见) - "127.0.0.1:8080:8080" volumes: - /var/run/docker.sock:/var/run/docker.sock:ro - ./letsencrypt:/letsencrypt - ./traefik.yml:/etc/traefik/traefik.yml:ro networks: - web networks: web: external: true ``` 创建共享网络: ```bash docker network create web ``` ### 2. traefik.yml ```yaml api: dashboard: true insecure: true # 仅 127.0.0.1 暴露所以可以 true entryPoints: web: address: ':80' http: redirections: entryPoint: to: websecure scheme: https permanent: true websecure: address: ':443' providers: docker: exposedByDefault: false # 容器必须显式 enable 才被 routing network: web certificatesResolvers: le: acme: email: [email protected] storage: /letsencrypt/acme.json httpChallenge: entryPoint: web # 想用 DNS-01 + Cloudflare: # dnsChallenge: # provider: cloudflare # delayBeforeCheck: 0 log: level: INFO accessLog: {} ``` 启动: ```bash docker compose up -d ``` ### 3. 加业务容器:只加 label,不改 traefik `/srv/myapp/docker-compose.yml`: ```yaml services: myapp: image: myorg/myapp:latest restart: unless-stopped networks: - web labels: - 'traefik.enable=true' - 'traefik.http.routers.myapp.rule=Host(`myapp.example.com`)' - 'traefik.http.routers.myapp.entrypoints=websecure' - 'traefik.http.routers.myapp.tls.certresolver=le' - 'traefik.http.services.myapp.loadbalancer.server.port=3000' networks: web: external: true ``` ```bash docker compose up -d ``` Traefik 一秒内: - 发现新容器 - 加路由 `myapp.example.com → myapp:3000` - 通过 HTTP-01 challenge 自动签 Let's Encrypt 证书 - 80 自动跳 443 `curl https://myapp.example.com/` 立刻可用。 ### 4. 多服务批量 ```yaml services: api: image: api:latest networks: [web] labels: - traefik.enable=true - traefik.http.routers.api.rule=Host(`api.example.com`) - traefik.http.routers.api.entrypoints=websecure - traefik.http.routers.api.tls.certresolver=le - traefik.http.services.api.loadbalancer.server.port=8000 blog: image: ghost:5 networks: [web] labels: - traefik.enable=true - traefik.http.routers.blog.rule=Host(`blog.example.com`) - traefik.http.routers.blog.entrypoints=websecure - traefik.http.routers.blog.tls.certresolver=le - traefik.http.services.blog.loadbalancer.server.port=2368 uptime: image: louislam/uptime-kuma:1 networks: [web] labels: - traefik.enable=true - traefik.http.routers.up.rule=Host(`up.example.com`) - traefik.http.routers.up.entrypoints=websecure - traefik.http.routers.up.tls.certresolver=le - traefik.http.services.up.loadbalancer.server.port=3001 ``` 3 个服务,3 套 label,3 个域名。零修改 traefik 配置。 ### 5. 加中间件:basic auth / IP 白名单 / rate limit label 里声明 + 应用: ```yaml labels: - traefik.enable=true - traefik.http.routers.admin.rule=Host(`admin.example.com`) - traefik.http.routers.admin.entrypoints=websecure - traefik.http.routers.admin.tls.certresolver=le - traefik.http.routers.admin.middlewares=auth,ratelimit - 'traefik.http.middlewares.auth.basicauth.users=alice:$$apr1$$abc...' - 'traefik.http.middlewares.ratelimit.ratelimit.average=10' - 'traefik.http.middlewares.ratelimit.ratelimit.burst=20' - traefik.http.services.admin.loadbalancer.server.port=8000 ``` basic auth password 用 `htpasswd -nb alice secret` 生成(注意 `$` 在 YAML 里 escape 成 `$$`)。 ### 6. dashboard 打开 SSH tunnel: ```bash ssh -L 8080:localhost:8080 server # 浏览器:http://localhost:8080 ``` 看到所有 router / service / middleware,状态可视化。 ## 效果 - 加新容器从"改 nginx + certbot + reload + DNS 配 + …" 收敛到 "docker-compose up,加 5 行 label" - 8 个服务的反代配置文件总长 < 100 行(之前 nginx 200+ 行 / 4 个 vhost 文件) - 证书自动续期,不再有 "忘续期 → 网站红框" 事故 - 加 / 删服务无 traefik 重启 - dashboard 一眼看到所有 route 状态 ## 与 nginx / Caddy / HAProxy 对比 | | Traefik | nginx | Caddy | HAProxy | |---|---|---|---|---| | Docker 自动发现 | ✅ 原生 | ❌(需第三方) | 一定程度 | ❌ | | 自动 HTTPS | ✅ | ❌(需 certbot) | ✅ | ❌ | | K8s ingress | ✅ | ✅ | ✅ | ✅ | | 性能 | 中 | 极高 | 高 | 极高 | | 配置 | YAML / labels | nginx 语法 | Caddyfile | HAProxy 语法 | | 学习曲线 | 中 | 高 | 低 | 高 | 容器化场景选 Traefik;静态站 / 极致性能选 nginx;混合简单场景 Caddy。 ## 踩过的坑 1. **`network: web` 不指定 Traefik 不知道连哪个 net**:容器在多个网络 时 Traefik 不确定走哪个。一定在 traefik.yml 里 `providers.docker.network: web` 或每个服务 label 写 `traefik.docker.network=web`。 2. **acme.json 权限不对**:Traefik 要求 600。`chmod 600 letsencrypt/acme.json`。 3. **HTTP-01 challenge 失败**:80 端口被占 / 路由器没转发。检查 `curl -I http://myapp.example.com/.well-known/acme-challenge/test` 能不能到 Traefik 容器。 4. **生产 dashboard 暴露公网**:`api.insecure=true` 千万别开公网。 要外网访问 dashboard 加 router + auth middleware 包起来。 5. **Let's Encrypt 频率限制**:测试期间反复重启 + 反复申证书会被 LE rate limit。dev 用 `acme.caServer: https://acme-staging-v02.api.letsencrypt.org/directory` 测试,跑通后切回生产 CA。
rsync 是最稳的小型备份方案:传输层 SSH 已加密,增量算法只传 delta, 配合 `--link-dest` 可以在目标侧做"硬链接快照"——每个快照看起来像 完整目录,磁盘占用却只算改动量。 适用场景:单机或少量机器的备份目标是另一台 Linux 主机 / NAS。 量大或对 dedup 要求高时换 Restic / Borg。 ## 备份脚本 ```bash #!/usr/bin/env bash # /usr/local/sbin/snap-backup.sh set -euo pipefail SRC="/srv/app /var/log /etc" DEST_HOST="[email protected]" DEST_BASE="/volume1/backups/$(hostname -s)" STAMP="$(date +%Y%m%d-%H%M%S)" LATEST="${DEST_BASE}/latest" NEW="${DEST_BASE}/${STAMP}" ssh "${DEST_HOST}" "mkdir -p '${DEST_BASE}'" rsync -aAXH --delete --numeric-ids \ --link-dest="${LATEST}" \ --rsync-path="ionice -c 3 rsync" \ --info=stats2,progress2 \ ${SRC} "${DEST_HOST}:${NEW}/" ssh "${DEST_HOST}" "rm -f '${LATEST}' && ln -sfn '${STAMP}' '${LATEST}'" # 保留最近 14 个快照 ssh "${DEST_HOST}" " cd '${DEST_BASE}' \ && ls -1 | grep -E '^[0-9]{8}-[0-9]{6}\$' | sort | head -n -14 \ | xargs -r -I{} rm -rf -- {} " ``` 关键参数: - `-aAXH`:等价于 `-rlptgoD` + 保留 ACL、xattr、硬链接关系 - `--numeric-ids`:用数字 UID/GID 而不是名字,跨机器一致性更好 - `--link-dest`:与上一次快照做硬链接,未改的文件几乎不占空间 - `--rsync-path="ionice -c 3 rsync"`:远端 rsync 进程降到 idle I/O, 备份时段不影响其它服务 ## 校验 ```bash # 远端校验 checksum 是否匹配(慢,每月一次足够) rsync -aAXH --dry-run --checksum --itemize-changes \ ${SRC} "${DEST_HOST}:${LATEST}/" | head -20 # 任何输出都意味着源 vs 备份不一致,需要排查 ``` ## SSH 配置 避免脚本里硬编码密钥路径,统一在 SSH config: ``` # ~/.ssh/config(或 /root/.ssh/config) Host nas.example.com User backup IdentityFile ~/.ssh/id_backup_ed25519 IdentitiesOnly yes ``` NAS 上限制这个 key 只能跑 rsync: ``` # 远端 ~backup/.ssh/authorized_keys command="rsync --server -vlogDtprAXe.iLsfxC --numeric-ids --delete --link-dest=* . /volume1/backups/*",no-port-forwarding,no-X11-forwarding,no-agent-forwarding,no-pty ssh-ed25519 AAAA... ``` ## systemd timer ```ini # /etc/systemd/system/snap-backup.timer [Timer] OnCalendar=*-*-* 02:00:00 RandomizedDelaySec=30m Persistent=true [Install] WantedBy=timers.target ``` ## 踩过的坑 - `--link-dest` 路径必须是 **目标侧** 的绝对路径,而且当时已存在 上一次快照目录;第一次跑没快照时 link-dest 自然失效,rsync 会回退到 完整拷贝,正常。 - 备份 `/etc` 不开 `-H` 时,sudoers / cron / 各种 symlink 容易脱钩, 恢复时一头雾水。 - `--delete` 是双刃剑:源被误删后下次备份会同步删掉目标。所以快照轮转 保留多个版本很重要。
## 起因 LLM 应用上生产后: - 用户报"AI 回答错" → 哪 prompt?哪 model call?怎么调试? - prompt 改一版 → 怎么评估质量没退步? - 月 OpenAI bill $5000 → 哪些 endpoint 烧钱? 传统 web app observability 工具不适合: - 输入 / 输出是大段文本 - 嵌套 LLM call(agent / tool use) - 需要 evaluation(不只是 metric) **Langfuse**:LLM 应用专用 observability + prompt management + eval。 open source + self-host 友好。 ## 装 ```bash # self-host docker compose up -d -f langfuse-docker-compose.yml # UI on localhost:3000 ``` 或 SaaS:cloud.langfuse.com 免费 tier。 ## SDK 集成 ```python from langfuse import Langfuse from langfuse.decorators import observe lf = Langfuse(public_key='pk-...', secret_key='sk-...') @observe() def answer_question(q: str) -> str: # 自动 trace 这函数 + 嵌套 LLM call context = retrieve(q) prompt = f"Context: {context}\n\nQ: {q}" response = openai.chat.completions.create( model='gpt-4', messages=[{'role': 'user', 'content': prompt}], ) return response.choices[0].message.content ``` `@observe()` 装饰器自动 track 函数调用 + 输入 / 输出 + 子调用 trace。 OpenAI call 用 Langfuse 包装的 client → 自动捕获 prompt / response / token / cost。 ## langchain / llamaindex 集成 ```python from langfuse.callback import CallbackHandler handler = CallbackHandler() chain.invoke({'q': q}, config={'callbacks': [handler]}) ``` LangChain 整 chain / agent 自动 trace(每步骤 / 每 tool call / 每 LLM call)。 ## UI trace view: ``` trace: answer_question("How to deploy?") ├─ retrieve (5 docs found) ├─ format_prompt ├─ openai.chat.completions │ model: gpt-4 │ tokens: 1234 in / 234 out │ cost: $0.024 │ latency: 1.2s └─ output: "To deploy, run..." ``` 每 LLM call 看到: - input / output 全文 - model / params - token + cost - latency - error 嵌套 trace 直观看 agent 步骤。 ## sessions + users ```python @observe() def chat(user_id: str, session_id: str, message: str): ... lf.update_current_trace(user_id=user_id, session_id=session_id) ``` UI 按 user / session 聚合 → 看某用户对话历史 + cost。 ## prompt management prompt 不写代码里,存 Langfuse: ```python prompt = lf.get_prompt('answer-question', version='production') formatted = prompt.compile(context=ctx, question=q) ``` UI 改 prompt → 自动 versioned → 部署不用改 code。 ## evaluation 每 trace 加 score: ```python lf.score( trace_id=trace.id, name='accuracy', value=0.9, ) ``` 或者 user feedback: ```python @app.post('/feedback') async def feedback(trace_id: str, thumbs_up: bool): lf.score(trace_id=trace_id, name='user_feedback', value=1 if thumbs_up else 0) ``` UI 聚合:哪些 prompt 评分低?哪些 model 效果好? LLM-as-judge eval: ```python def llm_judge(trace): judgment = openai.chat.completions.create( model='gpt-4', messages=[{ 'role': 'user', 'content': f'Rate this answer 1-5: Q: {trace.input}, A: {trace.output}', }], ) lf.score(trace_id=trace.id, name='llm_judge', value=parse(judgment)) ``` LLM 评 LLM → 自动 quality monitoring。 ## dataset + experiment ```python # 上传 test dataset lf.create_dataset(name='qa-test-set', items=[ {'input': q1, 'expected': a1}, {'input': q2, 'expected': a2}, ]) # 跑实验 for item in lf.get_dataset('qa-test-set'): actual = answer_question(item.input) lf.create_dataset_item_score(item, 'exact_match', actual == item.expected) ``` 新 prompt / model → 跑 dataset → 对比 score。 回归测试 for LLM。 ## cost tracking dashboard 看: - 每 model 总 cost - 每 user 总 cost - 每 endpoint trace count + cost - 趋势 ``` Last 7 days: gpt-4: $234 (1234 calls) gpt-4o: $89 (5678 calls) embedding: $12 Top user: user-42 ($45 last week) ``` 提前发现"某 endpoint 烧钱" → 优化(用 cheaper model / cache)。 ## 与替代品 | | Langfuse | LangSmith | Helicone | Weights & Biases | |---|---|---|---|---| | 开源 | ✅ | ❌(SaaS) | ✅ | ❌ | | Self-host | ✅ | ❌ | ✅ | enterprise | | trace | ✅ | ✅ | ✅ | ✅ | | prompt mgmt | ✅ | ✅ | 弱 | ❌ | | eval | ✅ | ✅ | 弱 | ✅ | | price | free / 用量 | 付费 | free / 用量 | 付费 | 我用 Langfuse self-host(数据敏感 + 开源 + 功能全)。 ## 真实 case 我们一个客户 RAG 应用上线: - 1000+ QPS LLM call - 接入 Langfuse 一周 - 发现: - 某 endpoint 答错率 30%(prompt 有 bug,UI 看 trace 立刻发现) - 某 user 每天 $50 cost(abuse 检测) - retrieval 召回率低 → 改 chunking 策略 如果没 trace,全靠 user complain → 慢 + 漏 80%。 ## privacy LLM 内容可能含 PII。 Langfuse 配 PII redaction: ```python lf.flush() # 含 mask sensitive ``` 或者在客户端 mask 后再发: ```python def mask(text): return re.sub(r'\b\d{16}\b', '[CARD]', text) @observe() def chat(msg): msg_masked = mask(msg) ... ``` ## 部署 self-host 简单 docker compose(langfuse + postgres + clickhouse)。 clickhouse 存 trace(大量 string 数据 → columnar 高效)。 ## 与 OTEL 集成 Langfuse 1.0+ 支持 OTLP receiver: ```python # OpenTelemetry trace 自动转 Langfuse ``` 跟现有 observability stack 整合。 ## 踩过的坑 1. **flush 异步**:默认异步 send,应用退出前要 `lf.flush()` 否则 trace 丢。 2. **trace input/output 大**:长文本占 DB。配 truncate。 3. **cost 计算不准**:Langfuse 内置 cost map 可能滞后于 OpenAI 价格 调整。自己 update 或者验证。 4. **self-host clickhouse 重**:单 server 几 GB RAM 起。小项目用 SaaS 简单。 5. **prompt version 滥**:每改一字一版本 → UI 难看。考虑 staging / production tag。
## 起因 我们的推荐模型在训练时算"用户过去 30 天点击次数"用 pandas, 线上推理时用 Redis lookup。两套代码必然漂移:一次 pandas 用错时区 窗口对不上 → 训练效果在线上拉胯。 "训练 / 在线特征不一致" 是 ML 生产最常见的痛点。 Feast 是开源 feature store,把特征定义统一在一处,训练 / 在线都从 同一份代码读。 ## 解决方案 ### 装 ```bash uv add feast feast version ``` ### 项目初始化 ```bash feast init my_store cd my_store/feature_repo ``` 生成示例 `example_repo.py`。 ### 定义特征 ```python # feature_repo/user_features.py from datetime import timedelta from feast import Entity, Feature, FeatureView, FileSource, ValueType from feast.types import Float32, Int64 user = Entity(name='user_id', value_type=ValueType.INT64) # 数据源:parquet 文件(生产用 BigQuery / Snowflake / S3 等) user_clicks_source = FileSource( path='data/user_clicks.parquet', timestamp_field='event_time', ) user_clicks_30d = FeatureView( name='user_clicks_30d', entities=[user], ttl=timedelta(days=30), schema=[ Feature(name='click_count', dtype=Int64), Feature(name='avg_dwell_seconds', dtype=Float32), Feature(name='top_category', dtype='string'), ], source=user_clicks_source, online=True, ) ``` ```bash feast apply # 注册 entity / feature view 元数据 ``` ### 训练时拉历史特征 ```python from feast import FeatureStore import pandas as pd store = FeatureStore(repo_path='feature_repo') # 训练样本(user_id + label + event_time) training_df = pd.DataFrame({ 'user_id': [1, 2, 3], 'event_time': pd.to_datetime(['2024-01-01', '2024-01-02', '2024-01-03']), 'label': [1, 0, 1], }) # point-in-time 拼接特征(保证不用未来数据) training = store.get_historical_features( entity_df=training_df, features=[ 'user_clicks_30d:click_count', 'user_clicks_30d:avg_dwell_seconds', 'user_clicks_30d:top_category', ], ).to_df() # train X = training[['click_count', 'avg_dwell_seconds']] y = training['label'] model.fit(X, y) ``` Feast 自动按 entity_df 的 event_time 去 source 找当时点的特征值, **避免 data leakage**(不会用未来的 click_count 训练历史样本)。 ### 在线 materialize(把最近特征推到 Redis) ```bash feast materialize-incremental $(date -u +%Y-%m-%dT%H:%M:%S) # 把 last_materialization → now 的特征写到 online store (Redis) ``` 定时跑(每小时 / 每天): ```bash # /etc/systemd/system/feast-materialize.timer [Timer] OnCalendar=*-*-* */1:00:00 ``` ### 在线推理时读特征 ```python features = store.get_online_features( features=[ 'user_clicks_30d:click_count', 'user_clicks_30d:avg_dwell_seconds', ], entity_rows=[{'user_id': 42}], ).to_dict() x = [[features['click_count'][0], features['avg_dwell_seconds'][0]]] prediction = model.predict(x) ``` **同一份 FeatureView 定义** 决定了训练和在线都拿"click_count"的相同 语义。漂移消失。 ### Online store backend Feast 支持多种 online store: ```yaml # feature_repo/feature_store.yaml online_store: type: redis connection_string: 'localhost:6379' # 或: online_store: type: dynamodb region: us-east-1 table_name: feast_online # 或: online_store: type: sqlite # dev 用 ``` ### 数据源支持 ```yaml offline_store: type: file # parquet # 或 bigquery / snowflake / redshift / spark / trino ``` 不同公司栈用不同 source,Feast 抽象统一。 ## 实战流程 ``` ┌─────────────────┐ │ FeatureView 定义 │ │ (Python 代码) │ └────────┬────────┘ │ ┌──────────────────┼──────────────────┐ │ │ │ ▼ ▼ ▼ ┌──────────┐ ┌────────────┐ ┌──────────┐ │ 训练 pipeline │ │ materialize │ │ 在线推理 │ │ get_historical│ │ → Redis │ │ get_online│ └──────────┘ └────────────┘ └──────────┘ ``` 特征定义是 single source of truth。 ## 何时该上 feature store - 多个模型共用相同特征(用户画像 / 商品画像) - 训练 / 在线漂移导致过线下效果不在线上复现 - 特征工程团队跟 ML 团队分工(特征 owner 模型) - 需要 point-in-time 一致性(防 data leakage) 何时**不**该上(杀鸡用牛刀): - 单模型 + 简单特征(计算一下 mean / sum 直接做) - 团队 < 5 人 ML 工程师 - 没有上线 / 用 batch 预测 ## 替代方案对比 | | Feast (OSS) | Hopsworks (商业) | Tecton (商业) | 自己拼 (Redis + DAG) | |---|---|---|---|---| | 复杂度 | 中 | 中 | 高 | 低-高 | | 价格 | 免费 | 付费 | 付费 | 0 | | 在线 store | 多选 | HBase 主 | 多 | 自选 | | 离线 store | 多选 | 自家 + S3 | 多 | 自选 | | 实时特征 | 较弱 | 强 | 强 | 看实现 | 中小团队 Feast;大企业 Tecton / Hopsworks(带运维支持); 极简团队拼 Redis + cron 也能 work。 ## 与训练 pipeline 集成 ```python # Kubeflow / Airflow / Dagster pipeline: @task def fetch_features(entity_df): store = FeatureStore(...) return store.get_historical_features(entity_df, ...).to_df() @task def train(df): model.fit(df[features], df['label']) return model @task def deploy(model): save_to_s3(model) feast materialize-incremental ... ``` 特征 fetch 是 pipeline 第一步,后续 train / deploy 都用 feast。 ## 效果 我们一个 churn 预测模型用 Feast 后: - 训练 / 在线特征一致性:100%(之前 ~95%,漂移 5% 是 bug) - 新模型上线时间从 2 周 → 3 天(特征不用重写,复用现成 view) - 多模型共享 user_features view:避免重复算 - "为什么这个用户预测分数低" 类调试:直接 feast 查在线特征 + 对照训练 分布 ## 踩过的坑 1. **`event_time` 时区**:feast 假设所有时间是 UTC。本地时间传进去 会算偏差。always to_datetime(...).tz_localize('UTC')。 2. **materialize 漏数据**:`materialize-incremental` 从 last_materialization 开始。如果之前没 materialize 过,从 ttl 之前开始,可能遗漏。 首次用 `materialize <start> <end>` 全量。 3. **online store 一致性**:Redis 单机时 materialize 期间挂掉 → 部分 特征写进去 + 部分没写 → 在线读到旧 + 新混合值。Redis cluster 有助稳定。 4. **特征 schema 变了**:加新字段简单(FeatureView 重新 apply); 删 / 改字段类型麻烦,需要重新 materialize 整 entity 群。 5. **point-in-time join 慢**:百万级 entity_df 跨多个 feature view join 几分钟到几十分钟。生产 source 用 BigQuery / Snowflake 让 join 推到 DB 端比较快。
asyncio 已经是 Python 标准并发模型。但坑也多——很多人以为加 async/await 就能"并行",结果代码顺序跑得跟同步一样。下面讲实际让代码并发的关键。 ## 1. 单纯加 async/await 不会并行 ```python async def fetch_one(url): async with aiohttp.ClientSession() as s: async with s.get(url) as r: return await r.text() async def main(): for url in urls: await fetch_one(url) # 串行!每个等上一个完 ``` `await` 让出控制权,但 for + await 仍然是顺序的。要并发用 `gather` 或 `TaskGroup`。 ## 2. asyncio.gather ```python async def main(): results = await asyncio.gather(*[fetch_one(u) for u in urls]) ``` 所有 fetch 同时进行,gather 等全部完成返回列表。 注意: - 任何一个抛异常默认会取消其它任务并向上抛 - 用 `return_exceptions=True` 让异常也作为结果返回,不取消其它 ```python results = await asyncio.gather(*coros, return_exceptions=True) for r in results: if isinstance(r, Exception): log.warning('one failed: %s', r) else: process(r) ``` ## 3. TaskGroup(Python 3.11+,推荐) ```python async def main(): async with asyncio.TaskGroup() as tg: t1 = tg.create_task(fetch_one(u1)) t2 = tg.create_task(fetch_one(u2)) t3 = tg.create_task(fetch_one(u3)) # 所有 task 自动完成 + 自动 cancel 兄弟 task 当一个失败 print(t1.result(), t2.result()) ``` TaskGroup 是 PEP 654 + 3.11 引入的结构化并发。优点: - 异常传播更清晰(aggregates as ExceptionGroup) - 自动 cancel 兄弟,避免 "task 还在跑" 的孤儿 - 比 gather 更结构化 新代码优先 TaskGroup。 ## 4. 限并发:Semaphore 如果有 10000 个 URL 要抓,全用 gather 同时跑会爆 socket / 被目标限流。 用 Semaphore 控制并发度: ```python sem = asyncio.Semaphore(20) async def bounded_fetch(url): async with sem: return await fetch_one(url) results = await asyncio.gather(*[bounded_fetch(u) for u in urls]) ``` `sem` 上限 20 → 最多 20 个 fetch 同时进行。 更高级的限速(按时间,比如每秒 50 个请求): ```python import aiometer async def fetch_one(url): ... results = await aiometer.run_on_each( fetch_one, urls, max_at_once=20, max_per_second=50, ) ``` ## 5. 取消 + 超时 ```python # 单个任务超时 try: result = await asyncio.wait_for(fetch_one(url), timeout=5.0) except asyncio.TimeoutError: log.warning('timeout') ``` `asyncio.wait_for` 在超时后取消 coroutine(抛 CancelledError)。 任务里要正确处理这个 cancel: ```python async def fetch_one(url): try: async with session.get(url) as r: return await r.text() except asyncio.CancelledError: # 清理资源(如果还在持有) log.info('cancelled') raise # 必须 re-raise,否则任务不算被取消 ``` 不 re-raise 会让 cancel 信号丢失,cleanup 顺序乱。 ## 6. 不要在 async 里跑同步阻塞 ```python async def bad(): time.sleep(5) # 阻塞整个事件循环! result = requests.get('...') # 同步 IO ``` 修复: ```python async def good(): await asyncio.sleep(5) async with aiohttp.ClientSession() as s: async with s.get(...) as r: await r.text() ``` 如果必须用同步库(pandas / boto3 / heavy CPU 计算): ```python # 跑到 thread pool 不阻塞 loop result = await asyncio.to_thread(blocking_function, args) ``` `asyncio.to_thread` 把同步函数包成 awaitable,在 thread pool 里跑。 ## 7. CPU 密集任务用 ProcessPoolExecutor ```python from concurrent.futures import ProcessPoolExecutor executor = ProcessPoolExecutor() loop = asyncio.get_running_loop() result = await loop.run_in_executor(executor, cpu_heavy_function, data) ``` CPU 密集(图片处理 / 加密 / 解析)放进程池,避开 GIL。 ## 8. 信号 / 优雅退出 ```python import signal async def main(): stop = asyncio.Event() loop = asyncio.get_running_loop() for sig in (signal.SIGINT, signal.SIGTERM): loop.add_signal_handler(sig, stop.set) server_task = asyncio.create_task(run_server()) await stop.wait() log.info('shutting down') server_task.cancel() try: await server_task except asyncio.CancelledError: pass asyncio.run(main()) ``` 收到 Ctrl-C 时优雅 cancel + 等待清理。Windows 不支持 add_signal_handler。 ## 9. 共享状态 asyncio 是单线程协作式并发,访问全局变量**不需要锁**。 但 await 之后状态可能被其它 coroutine 改了: ```python counter = 0 async def bad(): global counter n = counter await asyncio.sleep(0.001) # 让出执行 counter = n + 1 # n 可能已经过时 ``` 需要原子读改写时用 `asyncio.Lock`: ```python lock = asyncio.Lock() async def good(): async with lock: n = counter await ... counter = n + 1 ``` ## 10. 调试 ```python import asyncio asyncio.run(main(), debug=True) # 或 export PYTHONASYNCIODEBUG=1 ``` debug 模式会警告: - coroutine 没 await - 协程跑超过 100ms(可能阻塞了 loop) - 任务 leaks `asyncio.all_tasks()` 看当前所有任务。 ## 11. 常用第三方 - **aiohttp**:HTTP client/server - **httpx**:HTTP,同步 + 异步统一 API(更现代) - **asyncpg**:PostgreSQL,比 aiopg / sqlalchemy-asyncio 快很多 - **redis-py 4+**:内置 async - **anyio**:跨 asyncio / trio 的抽象层 新项目 HTTP 优先用 httpx,老项目 aiohttp 仍然稳定。 ## 踩过的坑 - `coro = some_async_fn()` 但忘 await:coroutine 不会执行 + Python 抛 "coroutine was never awaited" 警告。 - `asyncio.run()` 不能嵌套:在 Jupyter / IPython 里要用 `await main()` 直接(IPython 7+ 自动包 await)。 - 多线程跑 asyncio:每个线程需要自己 `asyncio.new_event_loop()`。 通常不推荐——asyncio 设计是单线程的。 - `aiohttp.ClientSession` 是有 connection pool 的,每个请求建一个新 session 性能差。整个程序生命周期共享一个 session。
flexbox 适合一维,CSS Grid 适合二维。"侧栏 + 主内容 + 副栏"经典三栏 布局用 Grid 写比 flex 简洁 5 倍。 ## 1. 最简版 ```html <div class="layout"> <aside>侧栏</aside> <main>主内容</main> <section>副栏</section> </div> ``` ```css .layout { display: grid; grid-template-columns: 200px 1fr 280px; gap: 24px; } ``` `fr` 是"剩余空间分数"。`1fr` 中间吃满,左右固定。 ## 2. 窄屏自动折叠 ```css .layout { display: grid; grid-template-columns: 200px 1fr 280px; gap: 24px; } @media (max-width: 1000px) { .layout { grid-template-columns: 200px 1fr; } .layout > section { grid-column: 1 / -1; /* 副栏跨整行,垂直堆叠 */ } } @media (max-width: 700px) { .layout { grid-template-columns: 1fr; } } ``` `grid-column: 1 / -1` 是 Grid 的常用语法:从第 1 条线到最后一条线, 即占满全宽。 ## 3. 命名区域(更清晰的多栏 / 多行) ```css .layout { display: grid; grid-template-columns: 200px 1fr 280px; grid-template-rows: auto 1fr auto; grid-template-areas: "header header header" "sidebar main rail" "footer footer footer"; gap: 16px; min-height: 100vh; } .layout > header { grid-area: header; } .layout > aside { grid-area: sidebar; } .layout > main { grid-area: main; } .layout > section { grid-area: rail; } .layout > footer { grid-area: footer; } ``` 窄屏重排,只改 `grid-template-areas`: ```css @media (max-width: 800px) { .layout { grid-template-columns: 1fr; grid-template-areas: "header" "main" "rail" "sidebar" "footer"; } } ``` ## 4. 12 列网格系统(不需要 Bootstrap) ```css .grid12 { display: grid; grid-template-columns: repeat(12, 1fr); gap: 20px; } .col-3 { grid-column: span 3; } .col-4 { grid-column: span 4; } .col-6 { grid-column: span 6; } .col-12 { grid-column: span 12; } @media (max-width: 800px) { .col-3, .col-4, .col-6 { grid-column: span 12; } } ``` ```html <div class="grid12"> <div class="col-4">A</div> <div class="col-4">B</div> <div class="col-4">C</div> <div class="col-6">D</div> <div class="col-6">E</div> </div> ``` 整套不到 30 行 CSS,干掉 Bootstrap grid 一整个模块。 ## 5. 让卡片网格自动决定列数(最常用!) ```css .cards { display: grid; grid-template-columns: repeat(auto-fill, minmax(260px, 1fr)); gap: 16px; } ``` `auto-fill` + `minmax`:每列最小 260px,最大平分;窗口宽就放多列、 窗口窄就放少列,不需要任何 media query。视口超过 4×260 时一行 4 个, 缩到 2×260 时一行 2 个。 ## 6. 让子元素拉满高度 ```css .cards { display: grid; grid-auto-rows: 1fr; ... } ``` `grid-auto-rows: 1fr` 让每行所有 cell 等高,再加 `align-self: stretch` 内部内容拉满。 ## 7. subgrid(2024 + 浏览器全支持) 子元素的 grid track 对齐父元素: ```css .card { display: grid; grid-template-rows: subgrid; grid-row: span 3; /* 跨父网格 3 行 */ } ``` 适合"卡片列表里每张卡片内部各自有 header / body / footer,但希望同一行 卡片的 header / body / footer 严格对齐"。 ## 8. DevTools Chrome / Firefox 的 Inspector 里点 `grid` 标记会显示 grid 线条和 area 名字, 是调试 grid 唯一有效的方法。别盲调。 ## 踩过的坑 - `minmax(260px, 1fr)` 在窄屏(< 260px 视口)会撑破父容器;想严格防溢出 用 `minmax(min(260px, 100%), 1fr)`。 - `gap` 在 Safari 14 之前的 flexbox 不支持,Grid 支持。如果用 flex 老 Safari 兼容性差。 - 用 `grid-template-areas` 时 area 名要 **每个 cell 都有** 字符串。 `"header . header"` 用 `.` 表示空。 - 大量 grid 嵌套对老设备性能不友好。深嵌套 grid 时考虑用 `contain: layout` 限制重排范围。
## 起因 Go 服务上线后偶发"connection pool exhausted" 和 PG "too many connections" 错误。debug 后发现 Go `database/sql` 默认连接池配置 极宽松(无 max),高并发下能瞬间打开几百个 DB 连接。 下面讲怎么正确配 + 排查问题。 ## 默认行为陷阱 ```go db, _ := sql.Open("postgres", dsn) // db 默认 MaxOpenConns = 0(无限制) // MaxIdleConns = 2 // ConnMaxLifetime = 0(永不过期) ``` 并发 1000 请求 + 每个 query 50ms → 瞬间 1000 个 DB 连接 → PG max_connections 100 撞墙 → 报错。 ## 正确配置 ```go db, err := sql.Open("postgres", dsn) if err != nil { ... } db.SetMaxOpenConns(25) // 同时最多 25 个连接 db.SetMaxIdleConns(10) // 池里 idle 保留 10 个 db.SetConnMaxIdleTime(5 * time.Minute) // idle 超 5 分钟关闭 db.SetConnMaxLifetime(30 * time.Minute) // 连接最长存活 30 分钟(轮换) ``` 四个参数: | | 作用 | 推荐 | |---|---|---| | MaxOpenConns | 并发上限 | DB max_connections / 进程数 | | MaxIdleConns | 闲置池上限 | MaxOpenConns 的 1/2 | | ConnMaxIdleTime | 闲置多久关 | 5-10 min | | ConnMaxLifetime | 总寿命 | 30-60 min(防 DB 重启 / NAT timeout) | ## 计算 MaxOpenConns 设 PG `max_connections = 100`,集群跑 4 个 Go 服务进程: ``` 每进程 MaxOpenConns = (100 - 10 reserved) / 4 = 22 ``` 留 10 个 superuser / 监控 / DBA 用。 如果你的应用还会跑 background worker 进程也吃连接,进一步分配。 ## 测试当前配置 ```go import "fmt" import "time" func printPoolStats(db *sql.DB) { for range time.Tick(5 * time.Second) { s := db.Stats() fmt.Printf( "[db] open=%d in_use=%d idle=%d wait_count=%d wait_dur=%s\n", s.OpenConnections, s.InUse, s.Idle, s.WaitCount, s.WaitDuration, ) } } go printPoolStats(db) ``` `WaitCount > 0` 持续增长 = MaxOpenConns 太小(请求在等池)。 `InUse` 接近 MaxOpenConns 时常 = 业务高峰;偶尔正常,持续要扩。 Prometheus exporter: ```go import "github.com/prometheus/client_golang/prometheus" func collectDBMetrics(db *sql.DB) { s := db.Stats() openGauge.Set(float64(s.OpenConnections)) inUseGauge.Set(float64(s.InUse)) waitCountCounter.Add(float64(s.WaitCount)) } ``` ## ctx 超时控制 ```go ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second) defer cancel() rows, err := db.QueryContext(ctx, "SELECT ...") ``` `QueryContext` 而不是 `Query` —— query 卡死时 ctx cancel 让 query 立刻断 + 连接归还池。 否则连接被卡住一直占。 ## 长事务陷阱 ```go // ❌ tx, _ := db.BeginTx(ctx, nil) sendEmail(...) // 慢 IO(几秒) tx.Commit() ``` 事务期间一直占一个连接。1000 请求 × 3 秒邮件 = 池被压爆。 ```go // ✅ sendEmail(...) // 先发邮件 tx, _ := db.BeginTx(ctx, nil) defer tx.Rollback() // 短的 DB 操作 tx.Commit() ``` 事务内只做 DB;非 DB 操作放外面。 ## rows 没 Close → 连接泄露 ```go // ❌ panic 时 rows.Close() 没跑 rows, _ := db.Query("...") for rows.Next() { ... } ``` ```go // ✅ defer rows, err := db.Query("...") if err != nil { return err } defer rows.Close() for rows.Next() { ... } return rows.Err() // 别忘 Err 检查 ``` 或者用更高级的 ORM(sqlx / GORM)封装这些细节。 ## prepared statement cache ```go stmt, err := db.PrepareContext(ctx, "SELECT * FROM users WHERE id = $1") // stmt 可以跨多次 query 复用 for _, id := range ids { rows, _ := stmt.QueryContext(ctx, id) // ... } stmt.Close() ``` 但 PG 的 prepared statement 是 per-connection 的。`database/sql` 自动处理(每次 PrepareContext 实际 ad-hoc prepared per connection)。 Postgres 推荐用 prepared statement 提升性能。或者用 `pgx` 替代 (更原生)。 ## pgx vs database/sql ```bash go get github.com/jackc/pgx/v5 ``` ```go // pgx 直接连接池 import "github.com/jackc/pgx/v5/pgxpool" pool, err := pgxpool.New(ctx, dsn) // 配置: config, _ := pgxpool.ParseConfig(dsn) config.MaxConns = 25 config.MinConns = 5 config.MaxConnLifetime = 30 * time.Minute pool, _ = pgxpool.NewWithConfig(ctx, config) // query rows, _ := pool.Query(ctx, "SELECT id FROM users WHERE x = $1", x) ``` pgx 优势: - 直接 PostgreSQL,性能比 database/sql 包 lib/pq 快 30-50% - 原生 prepared statement cache - 支持 PG 特有类型(jsonb / array / range) 如果你只用 PG,直接 pgx 替代。 ## PgBouncer:连接池中间层 如果有 N 个进程,每个开 25 连接 = N × 25 连接到 PG。 PG 单连接 5-10 MB → 100 连接 = 1 GB+ RAM。 PgBouncer 在 PG 前面挡: ``` app1 ─┐ app2 ─┤ ... ─┼→ PgBouncer (在 app 端,pool transaction-level) → PG (少量真连接) appN ─┘ ``` ```bash sudo apt install -y pgbouncer ``` `/etc/pgbouncer/pgbouncer.ini`: ```ini [databases] mydb = host=pg-server dbname=mydb [pgbouncer] listen_addr = 127.0.0.1 listen_port = 6432 auth_type = scram-sha-256 auth_file = /etc/pgbouncer/userlist.txt pool_mode = transaction # 关键:transaction-level pooling max_client_conn = 1000 default_pool_size = 25 reserve_pool_size = 5 server_idle_timeout = 60 ``` 应用端连 `localhost:6432` 替代 PG 5432。 `pool_mode=transaction` 让连接事务级共享,1000 client → ~25 PG 连接。 极适合 short-lived query 场景。 注意:transaction mode 不能用 prepared statement / SET LOCAL(跨事务 状态丢)。pgx 5.0+ 有 PgBouncer 兼容模式。 ## DB 端配置 PostgreSQL `postgresql.conf`: ``` max_connections = 200 # 集群总上限 shared_buffers = 8GB # 通常 RAM 25% effective_cache_size = 24GB # RAM 75% work_mem = 64MB # per query maintenance_work_mem = 1GB ``` `max_connections = 200` × `work_mem = 64MB` = 最差 12.8 GB(每 query 峰值)。可能 OOM。 工业上: - max_connections 100-200 - 应用端 PgBouncer pool 共享 - 单连接 work_mem 16-32MB ## 实战调参流程 1. 启动配 `MaxOpenConns = 25`,pprof 看实际使用 2. `WaitCount` 持续上涨 → 增大池或加 PG 3. PG 端 `pg_stat_activity` 看实际并发: ```sql SELECT count(*) FROM pg_stat_activity WHERE state = 'active'; ``` 4. 加 PgBouncer 减少 PG 物理连接数 ## 踩过的坑 1. **每个请求 `sql.Open` 新连接** → 没池化,几秒内打满。 `sql.Open` 一次性 + db 全局共享 + DI 注入。 2. **死锁 + 长事务**:事务 A 持有 row 锁 + 等连接(池满), 事务 B 持有连接 + 等 A 的 row 锁。 解决:短事务 + ctx timeout。 3. **MaxIdleConns > MaxOpenConns**:参数互相矛盾,实际生效的是 MaxOpenConns。Idle 上限被 cap。 4. **NAT 后面 DB**:长连接经过几小时 NAT 表项过期 → 应用以为还活, 实际 DB 已断 → 报 "broken pipe"。设 ConnMaxLifetime < NAT timeout (通常 30 min 安全)。 5. **使用 `Ping()` 测连接**:高频 Ping 浪费连接 + 是 round-trip 开销。 只在启动时验证一次;运行时用 ConnMaxLifetime + 实际 query 检测。
## 起因 写完代码 → 推分支 → 打开浏览器 → 找到仓库 → 点 "Compare & pull request" → 填标题描述 → 加 reviewer → 等审 → 收通知 → 看评论 → 回评论 → 改代码 → 重复。一天来回浏览器和编辑器几十次,每次都打断 flow。 `gh` 是 GitHub 官方 CLI,把 PR 的整个生命周期搬到终端。 ## 解决方案 ### 装 + 登录 ```bash # macOS / Linux brew install gh # 或 Debian/Ubuntu sudo apt install gh gh auth login # 选 GitHub.com → HTTPS → 浏览器登录 ``` ### 开 PR ```bash # 推完分支后一行开 PR gh pr create --base main --title "feat: add OAuth" \ --body "Closes #42. ... ## Test plan - [ ] manual login - [ ] CI green" # 或者完全交互式 gh pr create # 自动用最近的 commit 信息预填 ``` 更便利的: ```bash # 创建 + 自动指 reviewer + draft gh pr create --draft --reviewer alice,bob,@org/backend ``` ### 看 / review / 合 ```bash # 看本仓库所有 open PR gh pr list gh pr list --author @me gh pr list --label bug # 看某 PR 详情 gh pr view 123 gh pr view 123 --web # 还是想浏览器看 # 看 diff gh pr diff 123 gh pr diff 123 --color always | less -R # checkout 这个 PR 到本地 gh pr checkout 123 # Review gh pr review 123 --approve --body "LGTM" gh pr review 123 --comment --body "some notes" gh pr review 123 --request-changes --body "..." # 看 CI 状态 gh pr checks 123 # 合并(squash + 删分支) gh pr merge 123 --squash --delete-branch # admin override (绕过 required reviews,谨慎) gh pr merge 123 --admin --squash ``` ### 看自己/团队的 PR backlog 写个 alias: ```bash alias my-prs='gh pr list --author @me --state all --limit 20' alias to-review='gh pr list --search "review-requested:@me"' alias team-prs='gh pr list --search "org:my-org is:open"' ``` `to-review` 是我每天上班第一件事 —— 一行看到所有等我 review 的 PR。 ### issue 也一并管 ```bash gh issue list --assignee @me gh issue create --title "..." --body "..." --label bug --assignee @me gh issue close 42 --comment "fixed in PR #43" ``` ### CI / workflow ```bash # 看 actions run gh run list --workflow=test.yml gh run view 8123456789 gh run watch # 等当前分支最新 run 跑完,CI 通过通知 # 触发手动 workflow gh workflow run deploy.yml -f environment=prod ``` `gh run watch` 是 release 时神器:push 后跑一行,CI 跑完自动桌面通知, 不必去浏览器盯。 ### 自动复制 PR 链接 ```bash gh pr view 123 --json url --jq .url | pbcopy # macOS gh pr view 123 --json url --jq .url | xclip # Linux ``` 写到团队 Slack 时一秒粘上 URL。 ### gh extensions ```bash gh extension install dlvhdr/gh-dash # TUI 仪表盘 gh extension install vilmibm/gh-screensaver # ASCII 烟花(彩蛋) gh extension install yusukebe/gh-markdown-preview ``` `gh dash` 是装了之后再也戒不掉的 TUI:一屏看自己的 PR / 待 review / 最近 issue,j/k 浏览,o 在浏览器打开。 ## 效果 - 开 PR 平均 < 30 秒(之前要 2-3 分钟) - review 流程不切上下文:`gh pr diff 123` + `gh pr review --approve` - CI 状态 `gh pr checks` 比刷网页快 - 每天浏览器切换次数下降 70%+ ## 踩过的坑 1. **token 权限不够**:`gh auth status` 看 scope。如果要管 GitHub Actions / Pages 等需要额外 scope,`gh auth refresh -s admin:org` 补权限。 2. **多账号**:个人 + 公司 GitHub 都用 gh,`gh auth switch` 切。 `--user` flag 也能临时指定。 3. **company GitHub Enterprise**:`gh auth login --hostname github.company.com` 单独登录;命令默认走该 host。 4. **gh pr create 不显示 default branch**:仓库设置 default branch 没 配,gh 默认拿你 push 的 branch。先 `gh repo edit --default-branch main`。 5. **`gh pr checkout` 不能 fetch 别的 fork 的 PR 时**:在公开 fork 的 PR 上 有时 git 配置不对。手动 `git fetch upstream pull/123/head:pr-123` 然后 `git checkout pr-123`。
pandas 是 2010 年代标准工具但有几个本质局限: - 单线程(GIL) - eager evaluation,复杂 pipeline 中间结果都实例化 - API 历史包袱(10 种 index、SettingWithCopyWarning、`inplace=True`) polars 是 Rust 写的列存数据框,多线程 + lazy 模式 + 干净的链式 API。 1GB+ 的 CSV 处理快 5-30 倍。 ## 安装 ```bash uv add polars pyarrow ``` `pyarrow` 不是必需但读 Parquet / Feather 时性能好。 ## 30 秒上手 ```python import polars as pl df = pl.read_csv('sales.csv') print(df.head()) print(df.schema) # 过滤 + 聚合 + 排序 result = ( df.filter(pl.col('country') == 'CN') .group_by('product') .agg( pl.col('amount').sum().alias('total'), pl.col('amount').count().alias('orders'), ) .sort('total', descending=True) .head(10) ) print(result) ``` 注意: - `pl.col('x')` 是表达式,可以组合(`pl.col('x') * 2`) - `group_by().agg()` 链式 - `.alias()` 重命名 - 不像 pandas 那样有索引;纯列数据 ## lazy 模式 ```python # 用 scan_csv 而不是 read_csv 进 lazy 模式 q = ( pl.scan_csv('sales.csv') .filter(pl.col('country') == 'CN') .group_by('product') .agg(pl.col('amount').sum()) .sort('amount', descending=True) ) # 这一步还没读文件!只构建了 query plan print(q.explain()) # 看到优化后的 plan(如 filter 下推) # 真正执行 + 取结果 result = q.collect() ``` lazy 让 polars 做查询优化:predicate pushdown、projection pushdown、 predicate fusion 等。处理 10GB CSV 时 collect 之前都不占内存。 ## Parquet:列存的好处 ```python # 写 Parquet df.write_parquet('sales.parquet', compression='zstd') # 读 df = pl.read_parquet('sales.parquet') # lazy + projection q = (pl.scan_parquet('sales.parquet') .select(['country', 'amount']) # 只读这两列 .filter(pl.col('country') == 'US') .group_by('country').agg(pl.col('amount').sum()) .collect()) ``` Parquet 列存意味着 `.select(['country', 'amount'])` 完全不读其它列。 10GB 表只读 2 列可能只 IO 1GB。 ## 性能对比 |任务(1GB CSV,5000 万行)| pandas | polars eager | polars lazy | |---|---|---|---| | 读文件 | 18s | 6s | 0.5s (scan) | | filter + groupby + agg | 25s | 4s | 3s | | 内存峰值 | 8GB | 3GB | 1.5GB | 数字会因数据 / 机器而异,但量级对。 ## 常用对照表 | pandas | polars | |---|---| | `df['col']` | `df['col']` 或 `df.get_column('col')` | | `df[df['x'] > 0]` | `df.filter(pl.col('x') > 0)` | | `df.groupby('a')['b'].sum()` | `df.group_by('a').agg(pl.col('b').sum())` | | `df.merge(other, on='id')` | `df.join(other, on='id')` | | `df['col'].str.lower()` | `df.with_columns(pl.col('col').str.to_lowercase())` | | `df.dropna()` | `df.drop_nulls()` | | `df.fillna(0)` | `df.fill_null(0)` | | `pd.concat([df1, df2])` | `pl.concat([df1, df2])` | | `df.pivot_table` | `df.pivot()` | ## 窗口函数 ```python # 按 user 排序后的 cumsum df = df.with_columns( pl.col('amount').cum_sum().over('user').alias('cum_amount') ) # 计算每行相对所在组的平均 df = df.with_columns( (pl.col('amount') / pl.col('amount').mean().over('country')).alias('rel') ) ``` `.over(...)` 是 partition by 的简洁写法。 ## 与 pandas 互转 ```python # polars → pandas pdf = df.to_pandas() # pandas → polars df2 = pl.from_pandas(pdf) ``` 适合渐进迁移:现有 pandas 流程改一段为 polars 跑性能瓶颈。 ## 与 Arrow / DuckDB 联用 polars 内部就是 Arrow 格式,零拷贝传给 DuckDB / Arrow Compute: ```python import duckdb df = pl.read_parquet('big.parquet') # 直接把 polars df 当 DuckDB 视图 result = duckdb.sql("SELECT country, SUM(amount) FROM df GROUP BY 1").pl() ``` DuckDB 跑 SQL,polars 拿结果,全程 Arrow buffer。 ## 写入数据库 ```python import polars as pl df = pl.read_csv('users.csv') df.write_database( table_name='users', connection='postgresql://localhost/mydb', if_table_exists='replace', ) ``` 底层用 ConnectorX / SQLAlchemy 写。 ## 何时仍用 pandas - ML 库强制:scikit-learn 接受 numpy / pandas,polars 要 `.to_numpy()` - 小数据 + 现有代码:pandas 足够时迁移没收益 - 复杂的 multi-level index 需求 不过 polars 团队明确目标是覆盖 pandas 90% 的用法,差距越来越小。 ## 踩过的坑 - 没有 index 的概念:以前 pandas `df.loc['2024-01-01']` 这种行为不存在。 polars 都用 column filter。 - `with_columns` 返回新 DataFrame(immutable),忘了赋值回去 `df = ...`: ```python df.with_columns(pl.col('x') * 2) # 没改 df! df = df.with_columns(pl.col('x') * 2) # 对 ``` - group_by 的 Python 列表语法 → 改成表达式: ```python # pandas df.groupby(['a', 'b']).agg({'c': 'sum'}) # polars df.group_by(['a', 'b']).agg(pl.col('c').sum()) ``` - 日期解析自动推断不总是对:`pl.read_csv(..., try_parse_dates=True)` 或者显式 `pl.col('date').str.to_datetime('%Y-%m-%d')`。