ML feature store:自己搭轻量版(不上 Feast / Tecton)

起因

ML 项目 6 个月后症状:

  • train code 跑 feature engineering 一遍
  • predict serve code 跑 feature engineering 另一遍
  • 两套实现微妙不一致 → "线上线下不一致 (train-serve skew)"
  • 跨 model 重复 feature 算 N 次

Feature store 解决:

  1. feature 定义统一(train 跟 serve 同源)
  2. 离线(batch)+ 在线(low-latency)双重 storage
  3. point-in-time 正确性(避免 future leakage)

工业级方案:Feast / Tecton / Hopsworks。重 + 学习曲线陡。
中小项目自己搭一个轻量的够用。

轻量设计

[raw data 表]
    ↓
[feature transformation SQL / Python]  ← 唯一定义源
    ↓
batch ──→ [feature 表(PG / Snowflake)]   ← train 时读
realtime → [Redis]                          ← serve 时读

关键:transformation 写一次,batch 和 realtime 两边都跑同样逻辑。

实现:transformation as dbt model

-- models/features/user_features.sql
{{ config(materialized='incremental') }}

SELECT
    user_id,
    -- profile
    DATE_PART('year', CURRENT_DATE - birthdate) AS age,
    country,
    plan,
    -- recent activity (30 days)
    (SELECT COUNT(*) FROM events
     WHERE events.user_id = u.user_id
       AND created_at > CURRENT_DATE - INTERVAL '30 days') AS events_30d,
    (SELECT AVG(amount) FROM orders
     WHERE orders.user_id = u.user_id
       AND created_at > CURRENT_DATE - INTERVAL '30 days') AS avg_order_30d,
    CURRENT_TIMESTAMP AS computed_at
FROM users u
{% if is_incremental() %}
WHERE updated_at > (SELECT MAX(computed_at) FROM {{ this }})
{% endif %}

dbt run 每小时跑一次 → user_features 表更新。

sync to Redis

# tasks/sync_features_to_redis.py
import pandas as pd
import redis
import json

r = redis.Redis()
df = pd.read_sql("SELECT * FROM user_features WHERE computed_at > NOW() - INTERVAL '1 hour'", pg)

with r.pipeline() as pipe:
    for _, row in df.iterrows():
        key = f"features:user:{row.user_id}"
        pipe.hset(key, mapping=row.drop(['user_id', 'computed_at']).to_dict())
        pipe.expire(key, 86400 * 2)        # 2 day TTL
    pipe.execute()

每小时跑 → Redis 永远有"最新批次"的 feature。
预测时 Redis HGETALL 一下 → 几 ms。

train 时用

# 离线 train
features_df = pd.read_sql("""
    SELECT * FROM user_features
    WHERE computed_at <= '2025-04-30'    -- 训练数据截止日
""", pg)

labels_df = pd.read_sql("""
    SELECT user_id, churned
    FROM user_outcomes
    WHERE outcome_date BETWEEN '2025-04-30' AND '2025-05-30'
""", pg)

df = features_df.merge(labels_df, on='user_id')
X, y = df.drop('churned', axis=1), df.churned

model.fit(X, y)

serve 时用

# 在线 predict
def predict_churn(user_id):
    features = r.hgetall(f'features:user:{user_id}')
    features = {k.decode(): float(v) for k, v in features.items()}
    X = pd.DataFrame([features])[FEATURE_COLUMNS]
    return model.predict(X)[0]

FEATURE_COLUMNS 是 train 时保存的 column 顺序。
顺序一致 + Redis 来的 feature 跟 train 同源 → 无 skew。

point-in-time

train 时不要让 feature 包含 label 之后的数据(leakage):

-- bad
SELECT user_id, COUNT(*) FROM events WHERE user_id = u.user_id
-- 没限时间 → 包含 label 期之后

-- good
SELECT user_id, COUNT(*) FROM events
WHERE user_id = u.user_id AND created_at < {{ label_date }}

更严格的 point-in-time:feature value 用 label 时点的 snapshot:

SELECT user_id, label,
       (SELECT events_30d FROM user_features
        WHERE user_id = labels.user_id
          AND computed_at < labels.label_date
        ORDER BY computed_at DESC LIMIT 1) AS events_30d
FROM labels;

每个 label 取它"那时" 的 feature。

monitoring

每天对比 train vs serve feature 分布:

def check_skew():
    train_df = pd.read_sql("SELECT * FROM user_features TABLESAMPLE 1%", pg)
    serve_keys = r.scan_iter('features:user:*', count=10000)
    serve_df = pd.DataFrame([...])     # 拉 Redis 几千 sample

    for col in FEATURE_COLUMNS:
        train_mean = train_df[col].mean()
        serve_mean = serve_df[col].mean()
        skew = abs(train_mean - serve_mean) / train_mean
        if skew > 0.1:
            alert(f'{col} skew {skew:.2%}')

10% 飘移 → 告警。

何时升级到 Feast

我们轻量方案撑到:

  • ~50 feature
  • 1 个 model in production
  • batch update 频率小时级
  • 单团队

超过这规模:

  • 多 model 共享 feature
  • 多团队
  • 需要 streaming feature(秒级更新)
  • 严格 point-in-time correctness 自动验证

→ 上 Feast。但 Feast 学习曲线 + 运维成本,不是必要别强上。

与 in-line 计算对比

自己算 feature store
实现 简单 复杂
skew 风险
latency 中(每次 query DB) 低(Redis)
复用 0
适合 < 5 model 5+ model

第一个 model 直接 in-line 算。
第二个 model 用相同 feature → 抽出 user_features 表。
第三个 model → 上轻量 store。
N 个 → Feast。

真实 case

churn model 上线半年:

  • 一开始 in-line 算 feature
  • predict serve 跑 30 秒 / user(heavy SQL JOIN)
  • 改 feature 表(dbt)+ Redis cache
  • 跌到 5 ms / user
  • 同时 train script 用相同表 → 删除重复 code
  • 半年后第二个 model(LTV prediction)用同 feature 表 → 立即 reuse

踩过的坑

  1. dtype 不一致:Redis 存 string,train 时 pandas 是 int → cast
    错 → predict 0% 。serve code 严格 cast + 类型检查。

  2. feature drift 没监控:上线半年后 model 慢慢失效,原因
    feature distribution 变了没察觉。每周 skew check 必要。

  3. feature 名字大小写:Redis HSET key 大小写敏感。train 大写
    serve 小写 → KeyError。规范一律 snake_case。

  4. Redis TTL:feature 旧但不 expire → 用陈旧 feature 预测 → 越
    错越多。明确 TTL + sync 频率 > TTL 1/2。

  5. 新 feature 加 + 老 model 还要跑:feature 表加列,老 model
    的 FEATURE_COLUMNS 不变(model 还按老 schema 输入)。
    model artifact 跟 schema 绑定。

精确评价 共 0 人评价
可复现性
可复现 · 0 不可复现 · 0
文风
文风流畅 · 0 文风晦涩 · 0
立场
支持 · 0 反对 · 0

登录后即可对本帖作出评价。

评论区 0 条 · 所有人可在此交流

登录后参与评论。

还没有评论,来说两句。