scikit-learn Pipeline + ColumnTransformer 把"训练泄漏"杀掉

起因

ML 新手最容易写出"data leakage"——把测试集的统计信息(均值 / 编码 /
缺失值填补)混到训练里。常见错法:

# ❌ 整个数据集上算均值标准差,然后切分
from sklearn.preprocessing import StandardScaler
scaler = StandardScaler().fit(X)
X = scaler.transform(X)
X_train, X_test, y_train, y_test = train_test_split(X, y)

测试集的均值"漏"进了 scaler,cross-validation 分数偏高,部署到真实
新数据立刻拉胯。

Pipeline + ColumnTransformer 强制把所有 preprocessing 关在 cv split
内部,自动避免泄漏。

解决方案

1. 简单 Pipeline

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score

pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('clf', LogisticRegression()),
])

scores = cross_val_score(pipe, X, y, cv=5)
print(f'mean acc: {scores.mean():.3f} ± {scores.std():.3f}')

每折 cv 时 pipeline 重新 fit scaler,只在那一折的训练数据上算
mean / std。没有泄漏。

2. 混合类型特征:ColumnTransformer

数据集常常包含 numerical + categorical 两类:

import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer

df = pd.read_csv('users.csv')
y = df.pop('churn')
X = df

num_cols = ['age', 'days_active', 'monthly_revenue']
cat_cols = ['country', 'plan', 'referral_source']

preprocessor = ColumnTransformer([
    ('num', Pipeline([
        ('imp', SimpleImputer(strategy='median')),
        ('sc', StandardScaler()),
    ]), num_cols),
    ('cat', Pipeline([
        ('imp', SimpleImputer(strategy='constant', fill_value='missing')),
        ('oh', OneHotEncoder(handle_unknown='ignore', sparse_output=False)),
    ]), cat_cols),
], remainder='drop')

full_pipe = Pipeline([
    ('prep', preprocessor),
    ('clf', GradientBoostingClassifier()),
])

scores = cross_val_score(full_pipe, X, y, cv=5, scoring='roc_auc')

要点:

  • 每列类型独立配 imputer + encoder/scaler
  • handle_unknown='ignore':测试集里出现训练集没见过的 category 值不报错
  • remainder='drop' 显式:未声明的列扔掉(safer than passthrough

3. 超参搜索:所有阶段一起搜

from sklearn.model_selection import GridSearchCV

param_grid = {
    'prep__num__imp__strategy': ['mean', 'median'],
    'clf__n_estimators': [100, 200, 500],
    'clf__max_depth': [3, 5, 7],
}

search = GridSearchCV(full_pipe, param_grid, cv=5, scoring='roc_auc',
                      n_jobs=-1, verbose=1)
search.fit(X, y)
print(search.best_params_, search.best_score_)

注意命名:stage_name__sub_stage__param,双下划线层级。

n_jobs=-1 用所有 CPU 核并行 fit 各组合。

4. 保存 / 加载整个 pipeline

import joblib

joblib.dump(search.best_estimator_, 'model.pkl')

# 部署侧
model = joblib.load('model.pkl')
y_pred = model.predict(new_X)
# 自动跑 preprocessor → estimator,新数据原始 DataFrame 进去

整套 preprocessing + 模型在一个文件里,不需要"先 scale 再 predict"
两步走,部署更安全。

5. 自定义 transformer

业务相关的 feature engineering 包成 transformer:

from sklearn.base import BaseEstimator, TransformerMixin
import numpy as np

class LogRatio(BaseEstimator, TransformerMixin):
    """两列做 log(a/b) 派生"""
    def __init__(self, num_col, denom_col):
        self.num_col, self.denom_col = num_col, denom_col

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        ratio = (X[self.num_col] + 1) / (X[self.denom_col] + 1)
        return np.log(ratio).to_frame('log_ratio')

# 加进 ColumnTransformer
preprocessor = ColumnTransformer([
    ('lr', LogRatio('revenue', 'days_active'), ['revenue', 'days_active']),
    ('num', ..., num_cols),
    ('cat', ..., cat_cols),
])

业务知识沉淀进 pipeline,每次实验都用。

效果

  • cross-validation 分数与真实 holdout / 线上 A/B 偏差 < 1%
    (之前漏了泄漏时偏差 5-10%)
  • 部署只 joblib.load + predict,前后端不再重复实现 preprocessing
  • 改特征工程只动 pipeline 定义,下游所有实验自动跟着
  • 团队新同事接手时一个 .pkl 文件即可,无须读 100 行 prep 脚本

踩过的坑

  1. drop='first' 在 OneHotEncoder:drop 第一个 category 用于 linear
    model 避免共线性,但 tree-based model 不需要。GBT / RF 用 drop=None
    信息更全。

  2. categorical 列只出现在测试集的新值handle_unknown='error' 默认
    会报错。生产用 handle_unknown='ignore' 当全 0 处理;或 'infrequent_if_exist'
    归入 "rare" 桶。

  3. SimpleImputer 把整数列变浮点:填了 median 后 int → float。
    下游模型可能内存翻倍。pandas 2.0+ 用 nullable Int 类型避开。

  4. 大 categorical 高基数(user_id)做 OneHot:维度爆炸。
    用 target encoding / hashing trick / embeddings。

  5. fit_transform(X_train) + transform(X_test) 写错顺序
    测试集 fit_transform = 灾难。强迫自己用 Pipeline 就规避了
    —— 你只 pipe.fit(X_train) + pipe.predict(X_test),永远不会写
    到 transform 这个层。

精确评价 共 0 人评价
可复现性
可复现 · 0 不可复现 · 0
文风
文风流畅 · 0 文风晦涩 · 0
立场
支持 · 0 反对 · 0

登录后即可对本帖作出评价。

评论区 0 条 · 所有人可在此交流

登录后参与评论。

还没有评论,来说两句。