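Background
Symptoms an ML project tends to show after about six months:
- the training code runs feature engineering one way
- the prediction-serving code runs it another way
- the two implementations drift apart subtly, producing train-serve skew (offline and online results disagree)
- features shared across models get recomputed N times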
What a feature store solves:
- one feature definition (training and serving read from the same source)
- dual storage: offline (batch) plus online (low latency)
- point-in-time correctness (no leakage from the future)
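Industrial-grade options: Feast / Tecton / Hopsworks. They are heavy and have a steep learning curve.
For small and mid-sized projects, a lightweight home-built store is enough.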
Lightweight design

    [raw data tables]
            ↓
    [feature transformation SQL / Python]          ← single source of definition
            ↓
    batch    ──→ [feature table (PG / Snowflake)]  ← read at training time
    realtime ──→ [Redis]                           ← read at serving time

Key point: write the transformation once, and run the same logic on both the batch and realtime sides.
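
To make the "write once, run on both sides" idea concrete, here is a minimal Python sketch. It is not the implementation used in this post (that is the dbt model in the next section); the function names and the input shapes are assumptions chosen for illustration. The point is only that the batch path and the realtime path call one shared definition instead of each keeping its own copy.

```python
from datetime import datetime, timedelta


def user_activity_features(events, orders, as_of):
    """Single definition of the 30-day activity features (hypothetical helper).

    events: list of event timestamps (datetime) for one user
    orders: list of (timestamp, amount) tuples for one user
    as_of:  the moment the features are computed for
    """
    window_start = as_of - timedelta(days=30)
    recent_events = [t for t in events if window_start < t <= as_of]
    recent_amounts = [amt for t, amt in orders if window_start < t <= as_of]
    return {
        "events_30d": len(recent_events),
        # None mirrors SQL AVG() returning NULL when there are no rows
        "avg_order_30d": (
            sum(recent_amounts) / len(recent_amounts) if recent_amounts else None
        ),
    }


def batch_job(users, as_of):
    """Batch path: compute features for every user in one pass."""
    return {
        uid: user_activity_features(u["events"], u["orders"], as_of)
        for uid, u in users.items()
    }


def realtime_update(user, now):
    """Realtime path: the same function, called for a single user."""
    return user_activity_features(user["events"], user["orders"], now)
```

Because both paths delegate to `user_activity_features`, a change to the 30-day window only has to be made in one place.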
Implementation: transformations as a dbt model

    -- models/features/user_features.sql
    {{ config(materialized='incremental') }}

    SELECT
        u.user_id,
        -- profile
        -- AGE() is PostgreSQL; DATE_PART on a plain date difference (an integer day count) fails there
        DATE_PART('year', AGE(CURRENT_DATE, u.birthdate)) AS age,
        u.country,
        u.plan,
        -- recent activity (30 days)
        (SELECT COUNT(*) FROM events e
          WHERE e.user_id = u.user_id
            AND e.created_at > CURRENT_DATE - INTERVAL '30 days') AS events_30d,
        (SELECT AVG(o.amount) FROM orders o
          WHERE o.user_id = u.user_id
            AND o.created_at > CURRENT_DATE - INTERVAL '30 days') AS avg_order_30d,
        CURRENT_TIMESTAMP AS computed_at
    FROM users u
    {% if is_incremental() %}
    -- Caveat: only users whose row changed since the last run are recomputed,
    -- so activity counts for unchanged users may go stale
    WHERE u.updated_at > (SELECT MAX(computed_at) FROM {{ this }})
    {% endif %}

dbt run executes every hour → the user_features table is refreshed.
Sync to Redis
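
    # tasks/sync_features_to_redis.py
    import os

    import pandas as pd
    import redis
    from sqlalchemy import create_engine

    # Assumption: the Postgres URL is supplied through a PG_URL environment variable
    pg = create_engine(os.environ["PG_URL"])
    r = redis.Redis()

    # Rows written by the most recent hourly dbt run
    df = pd.read_sql(
        "SELECT * FROM user_features WHERE computed_at > NOW() - INTERVAL '1 hour'", pg
    )

    with r.pipeline() as pipe:
        for _, row in df.iterrows():
            key = f"features:user:{row.user_id}"
            # Redis cannot store NULL/None: drop missing values and write the rest as strings
            values = row.drop(['user_id', 'computed_at']).dropna()
            pipe.delete(key)  # clear fields left over from an earlier sync
            pipe.hset(key, mapping={k: str(v) for k, v in values.items()})
            pipe.expire(key, 86400 * 2)  # 2-day TTL
        pipe.execute()
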
Run this hourly and Redis always holds the features from the latest batch.
At prediction time, one Redis HGETALL takes a few ms.
Using it at training time
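
    # Offline training (pg, model, and FEATURE_COLUMNS are defined elsewhere)
    # Latest snapshot per user as of the cutoff; DISTINCT ON is PostgreSQL syntax
    features_df = pd.read_sql("""
        SELECT DISTINCT ON (user_id) *
        FROM user_features
        WHERE computed_at <= '2025-04-30'   -- training data cutoff date
        ORDER BY user_id, computed_at DESC
    """, pg)

    labels_df = pd.read_sql("""
        SELECT user_id, churned
        FROM user_outcomes
        WHERE outcome_date BETWEEN '2025-04-30' AND '2025-05-30'
    """, pg)

    df = features_df.merge(labels_df, on='user_id')
    # Train only on the feature columns; user_id and computed_at are not features
    X, y = df[FEATURE_COLUMNS], df.churned
    model.fit(X, y)
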
Using it at serving time
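
    # Online prediction
    def predict_churn(user_id):
        raw = r.hgetall(f'features:user:{user_id}')
        decoded = {k.decode(): v.decode() for k, v in raw.items()}
        # Cast only the model's columns; assumes every column in FEATURE_COLUMNS is numeric
        features = {col: float(decoded[col]) for col in FEATURE_COLUMNS}
        X = pd.DataFrame([features])[FEATURE_COLUMNS]
        return model.predict(X)[0]
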
FEATURE_COLUMNS is the column order saved at training time.
With the same column order, and with the Redis features coming from the same source as training, there is no skew.
Point-in-time
At training time, don't let a feature include data from after the label (leakage):

    -- bad: no time bound, so events after the label period are counted too
    (SELECT COUNT(*) FROM events e
      WHERE e.user_id = u.user_id)

    -- good: only events before the label date
    (SELECT COUNT(*) FROM events e
      WHERE e.user_id = u.user_id AND e.created_at < {{ label_date }})

A stricter form of point-in-time: take each feature value from the snapshot at the label's point in time:

    SELECT user_id, label,
        (SELECT f.events_30d FROM user_features f
          WHERE f.user_id = labels.user_id
            AND f.computed_at < labels.label_date
          ORDER BY f.computed_at DESC LIMIT 1) AS events_30d
    FROM labels;

Each label gets the feature value as it was at that time.
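
The same lookup can be sketched in plain Python, which makes the "last snapshot strictly before the label date" rule easy to unit test. The data shapes here (tuples of `(computed_at, value)` per user) are assumptions for illustration; in practice the snapshots would come from the user_features table.

```python
from bisect import bisect_left
from datetime import datetime


def latest_snapshot_before(snapshots, label_date):
    """Return the value of the last snapshot strictly before label_date.

    snapshots: list of (computed_at, value), sorted by computed_at ascending.
    Returns None when no snapshot precedes the label.
    """
    times = [t for t, _ in snapshots]
    # bisect_left gives the first index whose computed_at >= label_date,
    # so everything to its left is strictly earlier than the label.
    idx = bisect_left(times, label_date)
    return snapshots[idx - 1][1] if idx > 0 else None


def point_in_time_join(labels, snapshots_by_user):
    """labels: iterable of (user_id, label_date, label)."""
    rows = []
    for user_id, label_date, label in labels:
        value = latest_snapshot_before(snapshots_by_user.get(user_id, []), label_date)
        rows.append({"user_id": user_id, "label": label, "events_30d": value})
    return rows
```

A snapshot computed exactly at the label date is excluded, matching the strict `<` in the SQL; users with no earlier snapshot get `None` rather than a value borrowed from the future.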
Monitoring
Compare the train and serve feature distributions every day:
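
    from itertools import islice

    def check_skew(sample_size=5000):
        # Offline sample: roughly 1% of rows (PostgreSQL TABLESAMPLE syntax)
        train_df = pd.read_sql("SELECT * FROM user_features TABLESAMPLE BERNOULLI (1)", pg)
        # Online sample: the first few thousand keys that SCAN returns
        keys = islice(r.scan_iter('features:user:*', count=10000), sample_size)
        rows = [{k.decode(): v.decode() for k, v in r.hgetall(key).items()} for key in keys]
        serve_df = pd.DataFrame(rows)
        for col in FEATURE_COLUMNS:
            train_mean = train_df[col].mean()
            serve_mean = pd.to_numeric(serve_df[col], errors='coerce').mean()
            if train_mean == 0:
                continue  # relative skew is undefined for a zero baseline
            skew = abs(train_mean - serve_mean) / abs(train_mean)
            if skew > 0.1:
                alert(f'{col} skew {skew:.2%}')
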
Drift of more than 10% → alert.
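
If the comparison is pulled out of the I/O code, it can be tested without a database or Redis. The following is a small sketch of that idea, with hypothetical function names; it keeps the same relative-mean rule and 10% threshold described above and simply skips columns whose training mean is zero, where a relative difference is undefined.

```python
from statistics import mean


def relative_mean_skew(train_values, serve_values):
    """Relative difference between serve and train means, or None if undefined."""
    train_mean = mean(train_values)
    serve_mean = mean(serve_values)
    if train_mean == 0:
        return None  # cannot express drift relative to a zero baseline
    return abs(train_mean - serve_mean) / abs(train_mean)


def skewed_features(train, serve, threshold=0.1):
    """train / serve: dicts mapping column name -> list of numeric samples.

    Returns {column: skew} for every column whose skew exceeds the threshold.
    """
    flagged = {}
    for col, train_values in train.items():
        skew = relative_mean_skew(train_values, serve[col])
        if skew is not None and skew > threshold:
            flagged[col] = skew
    return flagged
```

A mean comparison only catches shifts in the average; a change in spread or shape with the same mean would pass this check unnoticed.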
When to upgrade to Feast
Our lightweight setup holds up to:
- ~50 features
- 1 model in production
- hourly batch updates
- a single team
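Beyond that scale:
- multiple models sharing features
- multiple teams
- a need for streaming features (second-level updates)
- automated verification of strict point-in-time correctness
→ move to Feast. But Feast brings a learning curve and operational cost, so don't force it unless you need it.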
Comparison with in-line computation
| | In-line computation | Feature store |
|---|---|---|
| Implementation | Simple | Complex |
| Skew risk | High | Low |
| Latency | Medium (queries the DB every time) | Low (Redis) |
| Reuse | None | High |
| Best for | < 5 models | 5+ models |
First model: compute features in-line.
Second model using the same features → extract a user_features table.
Third model → move to the lightweight store.
N models → Feast.
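A real case
Six months of running a churn model in production:
- started out computing features in-line
- prediction serving took 30 seconds per user (heavy SQL JOINs)
- switched to a feature table (dbt) plus a Redis cache
- latency dropped to 5 ms per user
- the training script now reads the same table → duplicate code deleted
- six months later a second model (LTV prediction) used the same feature table → immediate reuse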
Pitfalls
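- dtype mismatch: Redis stores strings, while pandas held ints at training time → a wrong cast → the model predicted 0%. The serving code now casts strictly and checks types.
- feature drift went unmonitored: six months after launch the model slowly stopped working because the feature distribution had shifted without anyone noticing. A weekly skew check is a must.
- feature name casing: Redis hash field names are case-sensitive. Uppercase at training time and lowercase at serving time → KeyError. Standardize on snake_case everywhere.
- Redis TTL: old features that never expire → predictions made on stale features → the errors pile up. Set an explicit TTL and keep the sync interval shorter than half the TTL.
- adding a new feature while an old model still runs: add the column to the feature table and leave the old model's FEATURE_COLUMNS unchanged (the model still takes input in the old schema). Bind each model artifact to its schema.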
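
The dtype and casing pitfalls both come down to trusting whatever HGETALL returns. A strict decoder, sketched below under the assumption that every model feature is numeric, turns silent mis-casts into immediate errors. The function name is hypothetical; `raw` has the bytes-to-bytes shape that redis-py returns for a hash by default.

```python
import math


def decode_features(raw, feature_columns):
    """Decode a Redis hash into {column: float}, failing loudly on bad data.

    raw: mapping of bytes -> bytes, as returned by HGETALL.
    Raises KeyError for a missing column (including a casing mismatch)
    and TypeError for a value that is not a finite number.
    """
    decoded = {k.decode(): v.decode() for k, v in raw.items()}
    features = {}
    for col in feature_columns:
        if col not in decoded:
            raise KeyError(f"feature {col!r} missing from Redis hash")
        try:
            value = float(decoded[col])
        except ValueError as exc:
            raise TypeError(f"feature {col!r} is not numeric: {decoded[col]!r}") from exc
        if math.isnan(value) or math.isinf(value):
            raise TypeError(f"feature {col!r} is not finite: {decoded[col]!r}")
        features[col] = value
    return features
```

Whether to raise or fall back to a default when a feature is missing is a product decision; raising is shown here because it surfaces schema drift early.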