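# Hands-On Feature Engineering in Python: Data Preprocessing with Pandas and Scikit-learn (with Code Examples)
If you have spent any time in data science or machine learning, you will probably agree with this saying: **data and features set the upper bound of what machine learning can achieve; models and algorithms are only tools for approaching that bound**. I have seen many teams spend months tuning complex models for a gain of a few percentage points, and I have seen seemingly simple models perform remarkably well in production because the feature engineering was done well.
Feature engineering is not an exact science; it is closer to a craft. It takes a sharp intuition for data, a solid understanding of the business, and the patience to try things and validate them. What I want to share here is not textbook theory but techniques I have validated repeatedly in real projects. We will use two core Python libraries, Pandas and Scikit-learn, to work through practical data preprocessing problems step by step.
## 1. Data Cleaning: From Dirty Data to Clean Data
The first step after receiving raw data is always cleaning. It looks basic, but it determines the quality of everything that follows. I usually split data cleaning into three layers: **surface cleanup**, **deep repair**, and **consistency checks**.
### 1.1 Missing Values: More Than Just Filling In
Handling missing values is the most common cleaning task. Many people reach for the mean, median, or mode as soon as they see a missing value, but applied blindly this is often one of the worst choices. A real case: I once worked on an e-commerce user dataset in which 30% of the "age" field was missing. Filling with the mean would have created an artificial spike of users around age 30 and badly distorted the true age distribution.
**The right approach is to analyze the missingness pattern first**: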
```python
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
# Load the data
df = pd.read_csv('user_data.csv')
# Summarize missing values per column
missing_summary = pd.DataFrame({
    'missing_count': df.isnull().sum(),
    'missing_pct': df.isnull().sum() / len(df) * 100,
    'dtype': df.dtypes
})
print(missing_summary[missing_summary['missing_count'] > 0])
```
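Depending on the missing ratio and pattern, I usually apply a tiered strategy:
| Missing ratio | Strategy | When it applies |
|---------|---------|---------|
| < 5% | Drop the rows with missing values | Plenty of data, values missing at random |
| 5%-30% | Impute by predicting from other features | Missingness follows a discernible pattern |
| > 30% | Consider dropping the feature or adding a missing-value flag | The feature carries too little information |
When imputation is called for, Scikit-learn's `SimpleImputer` offers several strategies: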
```python
from sklearn.impute import SimpleImputer
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer
# Simple imputation strategies
imputer_mean = SimpleImputer(strategy='mean')
imputer_median = SimpleImputer(strategy='median')
imputer_most_frequent = SimpleImputer(strategy='most_frequent')
# For numeric features I prefer iterative imputation
iterative_imputer = IterativeImputer(
    max_iter=10,
    random_state=42,
    initial_strategy='median'
)
df_numeric = df.select_dtypes(include=[np.number])
df_imputed = pd.DataFrame(
    iterative_imputer.fit_transform(df_numeric),
    columns=df_numeric.columns,
    index=df_numeric.index  # keep the original row index aligned with df
)
```
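> **Note**: Fit the imputer on the training set only, then apply it to the test set, so that the test set is filled with training statistics (mean, median, etc.). Computing those statistics on the full dataset leaks test information into training.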
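A minimal sketch of that split-then-fit order. The toy `age` column and the 70/30 split are assumptions made for illustration, not data from the project described above:

```python
import numpy as np
import pandas as pd
from sklearn.impute import SimpleImputer
from sklearn.model_selection import train_test_split

# Hypothetical toy data: one numeric column with missing values
toy = pd.DataFrame({'age': [22, 25, np.nan, 31, 40, np.nan, 28, 35, 52, 19]})

# Split first, so the test rows never influence the imputation statistics
train, test = train_test_split(toy, test_size=0.3, random_state=0)

imputer = SimpleImputer(strategy='median')
imputer.fit(train[['age']])                      # statistics from the training split only

train_filled = imputer.transform(train[['age']])
test_filled = imputer.transform(test[['age']])   # reuses the training median

train_median = train['age'].median()
print(f"training median used for both splits: {train_median}")
```

The same pattern applies to any fitted preprocessing step (scalers, encoders, iterative imputers): `fit` on the training split, `transform` on both.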
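### 1.2 Outlier Detection: Noise or Signal?
Outliers need careful handling. Some are data entry errors that should be corrected or removed; others are important business signals. In financial risk control, for example, an unusually large transaction may be fraud, and deleting it throws away key information.
These are the outlier detection methods I use most often:
**Statistical methods**: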
```python
def detect_outliers_iqr(df, column, threshold=1.5):
    """Detect outliers with the IQR rule."""
    Q1 = df[column].quantile(0.25)
    Q3 = df[column].quantile(0.75)
    IQR = Q3 - Q1
    lower_bound = Q1 - threshold * IQR
    upper_bound = Q3 + threshold * IQR
    outliers = df[(df[column] < lower_bound) | (df[column] > upper_bound)]
    return outliers, lower_bound, upper_bound

# Visualize outliers
def plot_outliers(df, column):
    fig, axes = plt.subplots(1, 2, figsize=(12, 4))
    # Box plot
    axes[0].boxplot(df[column].dropna())
    axes[0].set_title(f'{column} - box plot')
    # Histogram with mean and median markers
    axes[1].hist(df[column].dropna(), bins=50, edgecolor='black')
    axes[1].axvline(df[column].mean(), color='red', linestyle='--', label='mean')
    axes[1].axvline(df[column].median(), color='green', linestyle='--', label='median')
    axes[1].legend()
    axes[1].set_title(f'{column} - distribution')
    plt.tight_layout()
    plt.show()
```
**Model-based methods** work better in complex, multivariate settings:
```python
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
# Use the imputed matrix from section 1.1: these estimators generally do not accept NaN
# Isolation Forest
iso_forest = IsolationForest(
    contamination=0.1,  # expected share of outliers
    random_state=42,
    n_estimators=100
)
outlier_labels = iso_forest.fit_predict(df_imputed)
# Local Outlier Factor (LOF)
lof = LocalOutlierFactor(
    contamination=0.1,
    novelty=False
)
lof_labels = lof.fit_predict(df_imputed)
# Flag outliers (both estimators return -1 for outliers, 1 for inliers)
df['is_outlier_iso'] = outlier_labels == -1
df['is_outlier_lof'] = lof_labels == -1
```
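When handling outliers, I usually follow this decision process:
1. **Business judgment**: discuss with domain experts whether the outlier is plausible
2. **Impact assessment**: measure the effect on the model (for example, compare model performance with and without the outliers)
3. **Selective treatment**:
   - Clearly erroneous data: correct or remove
   - Plausible but extreme data: keep, and consider a more robust model
   - Uncertain data: add a feature that flags the outlier and let the model learn from it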
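The "flag it and let the model decide" option can be sketched as follows. The helper name `flag_and_clip_iqr` and the sample amounts are hypothetical; the clipping step is one possible companion treatment, not a requirement:

```python
import pandas as pd

def flag_and_clip_iqr(series, threshold=1.5):
    """Return a 0/1 outlier flag and a copy of the series clipped to the IQR fences."""
    q1, q3 = series.quantile([0.25, 0.75])
    iqr = q3 - q1
    lower, upper = q1 - threshold * iqr, q3 + threshold * iqr
    is_outlier = ((series < lower) | (series > upper)).astype(int)
    clipped = series.clip(lower=lower, upper=upper)
    return is_outlier, clipped

# Hypothetical transaction amounts with one extreme value
amounts = pd.Series([12.0, 15.0, 14.0, 13.0, 16.0, 15.0, 500.0], name='amount')
flag, clipped = flag_and_clip_iqr(amounts)
print(pd.DataFrame({'amount': amounts, 'is_outlier': flag, 'amount_clipped': clipped}))
```

Keeping both the flag and the clipped value preserves the signal that something unusual happened while limiting the leverage of the raw extreme value.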
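## 2. Feature Encoding: Making Categorical Data Machine-Readable
Encoding categorical data is a key step in feature engineering. The wrong encoding can sharply degrade model performance or even introduce spurious correlations.
### 2.1 Choosing a Categorical Encoding in Practice
**One-hot encoding** is the most common method, but its pitfalls are less well known. With many categories (postal codes or product IDs, for example), one-hot encoding produces a large number of sparse features, which raises the computational cost and can trigger the curse of dimensionality.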
```python
from sklearn.preprocessing import OneHotEncoder
import pandas as pd
# Build example data
df_categorical = pd.DataFrame({
    'city': ['Beijing', 'Shanghai', 'Guangzhou', 'Shenzhen', 'Beijing', 'Shanghai'],
    'category': ['A', 'B', 'C', 'A', 'B', 'C'],
    'size': ['S', 'M', 'L', 'XL', 'S', 'M']
})
# Basic one-hot encoding (sparse_output requires scikit-learn >= 1.2)
ohe = OneHotEncoder(sparse_output=False, handle_unknown='ignore')
encoded = ohe.fit_transform(df_categorical[['city']])
encoded_df = pd.DataFrame(
    encoded,
    columns=ohe.get_feature_names_out(['city'])
)
print("Original feature count: 1")
print(f"Encoded feature count: {encoded_df.shape[1]}")
```
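One way to keep the one-hot width under control is to merge rare categories into a single bucket before encoding. This is a minimal sketch; the helper `group_rare_categories`, the `min_count` threshold, and the sample cities are assumptions for illustration:

```python
import pandas as pd

def group_rare_categories(series, min_count=2, other_label='other'):
    """Replace categories seen fewer than min_count times with a shared label."""
    counts = series.value_counts()
    rare = counts[counts < min_count].index
    return series.where(~series.isin(rare), other_label)

# Hypothetical city column: two frequent values and two that appear once
cities = pd.Series(['Beijing', 'Shanghai', 'Beijing', 'Lhasa', 'Shanghai', 'Urumqi'])
grouped = group_rare_categories(cities, min_count=2)
encoded_cities = pd.get_dummies(grouped, prefix='city')
print(encoded_cities.columns.tolist())
```

In a train/test setting the list of rare categories should be computed on the training data only and then reused on new data.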
For high-cardinality categorical features, I recommend a few alternatives:
**Target encoding**:
```python
from category_encoders import TargetEncoder
import numpy as np
# Assume a binary target variable (random here, for illustration only)
np.random.seed(42)
df_categorical['target'] = np.random.randint(0, 2, size=len(df_categorical))
# Target encoding with the category_encoders library
te = TargetEncoder(cols=['city'])
df_encoded = te.fit_transform(df_categorical[['city']], df_categorical['target'])

# Hand-rolled version with additive smoothing to limit overfitting
def target_encode_with_smoothing(df, col, target, m=10):
    """Target encoding with smoothing; m controls the pull toward the global mean."""
    # Global mean of the target
    global_mean = df[target].mean()
    # Per-category count and mean
    agg = df.groupby(col)[target].agg(['count', 'mean'])
    counts = agg['count']
    means = agg['mean']
    # Smoothed encoding: small categories shrink toward the global mean
    smooth = (counts * means + m * global_mean) / (counts + m)
    return df[col].map(smooth)

df_categorical['city_encoded'] = target_encode_with_smoothing(
    df_categorical, 'city', 'target', m=5
)
```
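Smoothing alone does not stop a row's own label from leaking into its encoding. A common remedy is out-of-fold encoding, sketched below under the assumption of a binary target; the function name `target_encode_oof` and the random demo data are hypothetical:

```python
import numpy as np
import pandas as pd
from sklearn.model_selection import KFold

def target_encode_oof(df, col, target, n_splits=5, m=10, random_state=42):
    """Encode each row using target statistics from the other folds only."""
    encoded = pd.Series(np.nan, index=df.index, dtype=float)
    kf = KFold(n_splits=n_splits, shuffle=True, random_state=random_state)
    for fit_idx, enc_idx in kf.split(df):
        fit_part = df.iloc[fit_idx]
        global_mean = fit_part[target].mean()
        agg = fit_part.groupby(col)[target].agg(['count', 'mean'])
        smooth = (agg['count'] * agg['mean'] + m * global_mean) / (agg['count'] + m)
        # Categories unseen in the fitting folds fall back to the global mean
        encoded.iloc[enc_idx] = (
            df.iloc[enc_idx][col].map(smooth).fillna(global_mean).to_numpy()
        )
    return encoded

# Hypothetical demo data: three categories and a random binary target
rng = np.random.default_rng(0)
demo = pd.DataFrame({
    'city': rng.choice(['A', 'B', 'C'], size=60),
    'target': rng.integers(0, 2, size=60),
})
demo['city_te'] = target_encode_oof(demo, 'city', 'target', n_splits=5, m=5)
print(demo.groupby('city')['city_te'].describe())
```

Rows in the same category now receive slightly different values depending on their fold, which is the intended effect: no row sees its own label.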
**Frequency encoding** also works well in some scenarios:
```python
# Frequency encoding: map each category to its relative frequency
frequency_map = df_categorical['city'].value_counts(normalize=True).to_dict()
df_categorical['city_freq'] = df_categorical['city'].map(frequency_map)
```
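### 2.2 Special Handling for Ordinal Categories
For ordered categories (for example, ratings of poor, fair, good), a generic label encoding assigns integers in an arbitrary order (typically alphabetical), which can misrepresent both the ordering and the distances between levels. A better approach is **ordinal encoding** with an explicit order, or a **custom mapping**: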
```python
# Custom ordinal mapping
size_mapping = {
    'XS': 1,
    'S': 2,
    'M': 3,
    'L': 4,
    'XL': 5,
    'XXL': 6
}
df_categorical['size_ordinal'] = df_categorical['size'].map(size_mapping)
# Alternatively, use scikit-learn's OrdinalEncoder
from sklearn.preprocessing import OrdinalEncoder
# Define the category order explicitly
size_categories = [['XS', 'S', 'M', 'L', 'XL', 'XXL']]
ordinal_encoder = OrdinalEncoder(categories=size_categories)
# The encoder returns a 2-D array with 0-based codes; flatten it for a single column
df_categorical['size_encoded'] = ordinal_encoder.fit_transform(
    df_categorical[['size']]
).ravel()
```
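## 3. Numeric Features: Scale, Distribution, and Nonlinearity
How numeric features are processed directly affects linear models and distance-based models. I often see beginners feed raw numeric features straight into a model and end up with slow convergence and disappointing performance.
### 3.1 Standardization vs. Normalization
Many people confuse standardization with normalization. In short:
- **Standardization**: rescales data to mean 0 and standard deviation 1; suitable for most scenarios
- **Normalization**: rescales data to the range [0, 1] or [-1, 1]; suitable for models that need a fixed input range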
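The two definitions correspond to the formulas z = (x - mean) / std and x' = (x - min) / (max - min). A quick check on a small, made-up array shows that they match what `StandardScaler` and `MinMaxScaler` compute with default settings:

```python
import numpy as np
from sklearn.preprocessing import StandardScaler, MinMaxScaler

# Hypothetical single-feature sample
x = np.array([[1.0], [2.0], [3.0], [4.0], [10.0]])

# Standardization: subtract the mean, divide by the population std (ddof=0)
z_manual = (x - x.mean()) / x.std()
# Normalization to [0, 1]: subtract the min, divide by the range
mm_manual = (x - x.min()) / (x.max() - x.min())

z_sklearn = StandardScaler().fit_transform(x)
mm_sklearn = MinMaxScaler().fit_transform(x)

print(np.allclose(z_manual, z_sklearn), np.allclose(mm_manual, mm_sklearn))
```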
```python
from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler
import numpy as np
import matplotlib.pyplot as plt
# Generate example data: an exponential sample plus two extreme values
np.random.seed(42)
data = np.random.exponential(scale=2, size=(1000, 1))
data_with_outliers = np.concatenate([data, np.array([[50], [100]])])
# Compare scaling methods (None marks the cases handled separately below)
scalers = {
    'Raw data': None,
    'StandardScaler': StandardScaler(),
    'MinMaxScaler': MinMaxScaler(),
    'RobustScaler': RobustScaler(),
    'Log transform': None
}
fig, axes = plt.subplots(2, 3, figsize=(15, 8))
axes = axes.flatten()
for i, (name, scaler) in enumerate(scalers.items()):
    ax = axes[i]
    if name == 'Raw data':
        scaled_data = data_with_outliers
    elif name == 'Log transform':
        scaled_data = np.log1p(data_with_outliers)
    else:
        scaled_data = scaler.fit_transform(data_with_outliers)
    ax.hist(scaled_data.ravel(), bins=50, edgecolor='black', alpha=0.7)
    ax.set_title(name)
    ax.set_xlabel('Value')
    ax.set_ylabel('Count')
axes[-1].set_visible(False)  # five panels on a 2x3 grid; hide the unused one
plt.tight_layout()
plt.show()
```
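When choosing a scaling method, I refer to this decision table:
| Data characteristics | Recommended method | Reason |
|---------|---------|------|
| Approximately normal | StandardScaler | Preserves the standard normal shape |
| Bounded data (e.g., image pixels) | MinMaxScaler | Fixed input range |
| Outliers present | RobustScaler | Uses the median and IQR, so it is insensitive to outliers |
| Heavily skewed | Apply a log or Box-Cox transform first | Makes the distribution more symmetric |
### 3.2 Handling Skewed Distributions
Financial data and user behavior data are often heavily skewed. Used as-is, a handful of extreme values can dominate the model.
**The log transform** is the simplest effective fix: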
```python
# Variants of the log transform
def apply_log_transforms(series):
    """Apply several log-type transforms to a pandas Series."""
    transforms = {}
    # 1. Plain log (requires values > 0; other rows are dropped)
    transforms['log'] = np.log(series[series > 0])
    # 2. log1p (handles zeros)
    transforms['log1p'] = np.log1p(series)
    # 3. Signed log (handles negative values)
    transforms['signed_log'] = np.sign(series) * np.log1p(np.abs(series))
    return transforms

# Box-Cox transform (a more general power transform)
from scipy import stats

def boxcox_transform(series, plot=True):
    """Box-Cox transform with the lambda chosen by maximum likelihood."""
    # Box-Cox requires strictly positive input; non-positive values are dropped
    positive_data = series[series > 0]
    # Fit lambda and transform
    transformed_data, fitted_lambda = stats.boxcox(positive_data)
    if plot:
        fig, axes = plt.subplots(1, 2, figsize=(10, 4))
        # Original distribution
        axes[0].hist(series, bins=50, edgecolor='black', alpha=0.7)
        axes[0].set_title('Original distribution')
        axes[0].set_xlabel('Value')
        axes[0].set_ylabel('Count')
        # Transformed distribution
        axes[1].hist(transformed_data, bins=50, edgecolor='black', alpha=0.7)
        axes[1].set_title(f'After Box-Cox (lambda={fitted_lambda:.3f})')
        axes[1].set_xlabel('Transformed value')
        axes[1].set_ylabel('Count')
        plt.tight_layout()
        plt.show()
    return transformed_data, fitted_lambda
```
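To judge whether a transform actually helped, it is often enough to compare skewness before and after. The sketch below uses a synthetic log-normal "spend" column, which is an assumption standing in for real skewed data:

```python
import numpy as np
import pandas as pd

# Synthetic right-skewed data (log-normal), standing in for e.g. order amounts
rng = np.random.default_rng(42)
spend = pd.Series(rng.lognormal(mean=3.0, sigma=1.0, size=5000))

skew_before = spend.skew()
skew_after = np.log1p(spend).skew()

print(f"skewness before: {skew_before:.2f}, after log1p: {skew_after:.2f}")
```

A skewness near zero after the transform suggests the distribution is now roughly symmetric; if it stays large, a Box-Cox or Yeo-Johnson transform may be worth trying instead.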
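### 3.3 Binning Strategies
Binning converts a continuous variable into a categorical one. It can capture nonlinear relationships and make a model more robust to noise and outliers. The key question is **how to choose the bin boundaries**.
**Equal-width vs. equal-frequency binning**: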
```python
from sklearn.cluster import KMeans

def compare_binning_strategies(data, n_bins=5):
    """Compare binning strategies on a pandas Series."""
    # Equal-width binning
    equal_width_bins = pd.cut(data, bins=n_bins, labels=False)
    # Equal-frequency (quantile) binning; duplicate edges may yield fewer bins
    equal_freq_bins = pd.qcut(data, q=n_bins, labels=False, duplicates='drop')
    # Clustering-based binning (KMeans labels are arbitrary, not ordered by value)
    kmeans = KMeans(n_clusters=n_bins, random_state=42, n_init=10)
    cluster_bins = kmeans.fit_predict(data.values.reshape(-1, 1))
    # Visual comparison
    fig, axes = plt.subplots(1, 3, figsize=(15, 4))
    strategies = [
        ('Equal-width', equal_width_bins),
        ('Equal-frequency', equal_freq_bins),
        ('KMeans', cluster_bins)
    ]
    for idx, (title, bins) in enumerate(strategies):
        ax = axes[idx]
        for bin_num in range(n_bins):
            bin_data = data[bins == bin_num]
            ax.hist(bin_data, bins=30, alpha=0.5, label=f'bin {bin_num}')
        ax.set_title(title)
        ax.set_xlabel('Value')
        ax.set_ylabel('Count')
        ax.legend()
    plt.tight_layout()
    plt.show()
    return strategies
```
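In real projects I usually bin with business knowledge in mind. For user age, for example, I do not simply use equal-width or equal-frequency bins; I follow standard demographic groups: 18-24 (students), 25-34 (young professionals), 35-44 (middle-aged), 45-54 (late middle-aged), 55+ (seniors).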
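Those age groups translate directly into explicit bin edges for `pd.cut`. A minimal sketch, with made-up ages; note that with these edges anyone under 18 would come out as NaN and need separate handling:

```python
import pandas as pd

# Hypothetical ages, chosen to sit on and around the bin edges
ages = pd.Series([18, 24, 25, 34, 35, 47, 54, 55, 71])

# Left-closed intervals: [18, 25), [25, 35), [35, 45), [45, 55), [55, inf)
bins = [18, 25, 35, 45, 55, float('inf')]
labels = ['18-24', '25-34', '35-44', '45-54', '55+']
age_group = pd.cut(ages, bins=bins, labels=labels, right=False)

print(pd.DataFrame({'age': ages, 'age_group': age_group}))
```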
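## 4. Feature Construction: Mining Gold from Existing Data
Feature construction is the most creative part of feature engineering, and good constructed features can lift model performance to another level. I divide it into three tiers: **basic combinations**, **domain-knowledge-driven features**, and **automated generation**.
### 4.1 Basic Feature Combinations
The simplest constructed features are sums, differences, products, and ratios of existing features. Even these simple operations should make business sense.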
```python
def create_basic_interactions(df):
    """Create basic pairwise interaction features."""
    new_features = pd.DataFrame(index=df.index)
    # Arithmetic on numeric features
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    # Sum, difference, product, and ratio for every pair of columns
    for i, col1 in enumerate(numeric_cols):
        for col2 in numeric_cols[i+1:]:
            # Sum
            new_features[f'{col1}_plus_{col2}'] = df[col1] + df[col2]
            # Difference
            new_features[f'{col1}_minus_{col2}'] = df[col1] - df[col2]
            # Product (interaction term)
            new_features[f'{col1}_times_{col2}'] = df[col1] * df[col2]
            # Ratio (NaN where the denominator is zero)
            mask = df[col2] != 0
            new_features[f'{col1}_div_{col2}'] = np.where(
                mask, df[col1] / df[col2], np.nan
            )
    return new_features

# Polynomial features (generated automatically)
from sklearn.preprocessing import PolynomialFeatures

def create_polynomial_features(df, degree=2, interaction_only=False):
    """Create polynomial features from the numeric columns."""
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    numeric_data = df[numeric_cols].fillna(0)
    poly = PolynomialFeatures(
        degree=degree,
        interaction_only=interaction_only,
        include_bias=False
    )
    poly_features = poly.fit_transform(numeric_data)
    feature_names = poly.get_feature_names_out(numeric_cols)
    poly_df = pd.DataFrame(poly_features, columns=feature_names, index=df.index)
    # Drop the original columns (degree-1 terms) to avoid duplicates
    poly_df = poly_df[[col for col in poly_df.columns
                       if col not in numeric_cols]]
    return poly_df
```
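### 4.2 Domain-Knowledge-Driven Feature Construction
This is the most valuable part of feature engineering. Meaningful features come from a deep understanding of the business.
Take an e-commerce scenario as an example: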
```python
def create_ecommerce_features(df):
    """Feature construction for an e-commerce scenario."""
    features = pd.DataFrame(index=df.index)
    # 1. User behavior features
    if all(col in df.columns for col in ['view_count', 'cart_count', 'purchase_count']):
        # Conversion rates (+1 in the denominator avoids division by zero)
        features['view_to_cart_rate'] = df['cart_count'] / (df['view_count'] + 1)
        features['cart_to_purchase_rate'] = df['purchase_count'] / (df['cart_count'] + 1)
        features['view_to_purchase_rate'] = df['purchase_count'] / (df['view_count'] + 1)
        # Share of each action type in total activity
        total_actions = df[['view_count', 'cart_count', 'purchase_count']].sum(axis=1)
        for col in ['view_count', 'cart_count', 'purchase_count']:
            features[f'{col}_ratio'] = df[col] / (total_actions + 1)
    # 2. Time features
    if 'timestamp' in df.columns:
        timestamp = pd.to_datetime(df['timestamp'])  # local copy; the input is not modified
        # Decompose the timestamp
        features['hour'] = timestamp.dt.hour
        features['day_of_week'] = timestamp.dt.dayofweek
        features['is_weekend'] = features['day_of_week'].isin([5, 6]).astype(int)
        features['month'] = timestamp.dt.month
        features['quarter'] = timestamp.dt.quarter
        # Peak shopping hours (assumed to be 9-12, 14-17, 20-22; bounds inclusive)
        features['is_peak_hour'] = (
            (features['hour'].between(9, 12)) |
            (features['hour'].between(14, 17)) |
            (features['hour'].between(20, 22))
        ).astype(int)
    # 3. Price sensitivity features
    if all(col in df.columns for col in ['avg_price_viewed', 'avg_price_purchased']):
        features['price_sensitivity'] = (
            df['avg_price_purchased'] / (df['avg_price_viewed'] + 1e-6)
        )
        features['premium_indicator'] = (
            df['avg_price_purchased'] > df['avg_price_viewed'] * 1.2
        ).astype(int)
    # 4. Repeat-purchase features
    if 'days_since_last_purchase' in df.columns and 'purchase_frequency' in df.columns:
        features['purchase_regularity'] = 1 / (df['days_since_last_purchase'] + 1)
        features['is_frequent_buyer'] = (
            df['purchase_frequency'] > df['purchase_frequency'].median()
        ).astype(int)
    return features
```
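### 4.3 Automated Feature Generation Tools
For large-scale feature engineering, automated tools can help. They cannot fully replace manual work, but they can greatly improve efficiency.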
```python
# Automated feature generation with featuretools (1.x API)
import featuretools as ft
from woodwork.logical_types import Categorical, Double, Integer

def automated_feature_engineering(df, entity_id='user_id', time_index='timestamp'):
    """Automated feature engineering with featuretools."""
    # Create the entity set
    es = ft.EntitySet(id='transactions')
    # Add the dataframe to the entity set
    es = es.add_dataframe(
        dataframe_name='transactions',
        dataframe=df,
        index='transaction_id',
        time_index=time_index,
        logical_types={
            'user_id': Categorical,
            'product_id': Categorical,
            'category': Categorical,
            'amount': Double,
            'quantity': Integer
        }
    )
    # Define relationships by normalizing out users and products
    es = es.normalize_dataframe(
        base_dataframe_name='transactions',
        new_dataframe_name='users',
        index=entity_id
    )
    es = es.normalize_dataframe(
        base_dataframe_name='transactions',
        new_dataframe_name='products',
        index='product_id'
    )
    # Generate features with Deep Feature Synthesis
    feature_matrix, feature_defs = ft.dfs(
        entityset=es,
        target_dataframe_name='users',
        agg_primitives=['sum', 'mean', 'count', 'min', 'max', 'std', 'skew'],
        trans_primitives=['month', 'weekday', 'is_weekend'],
        max_depth=2,
        verbose=True
    )
    return feature_matrix, feature_defs

# Time series features with tsfresh
from tsfresh import extract_features
from tsfresh.feature_extraction import MinimalFCParameters
from tsfresh.utilities.dataframe_functions import roll_time_series

def extract_time_series_features(df, column_id='user_id', column_sort='timestamp'):
    """Extract time series features over rolling windows."""
    # Build rolling time windows
    df_rolled = roll_time_series(
        df,
        column_id=column_id,
        column_sort=column_sort,
        max_timeshift=7,
        min_timeshift=1
    )
    # Extract features.
    # Note (assumption to verify): roll_time_series labels each window in an 'id'
    # column; depending on the tsfresh version you may need column_id='id' here.
    extracted_features = extract_features(
        df_rolled,
        column_id=column_id,
        column_sort=column_sort,
        default_fc_parameters=MinimalFCParameters()
    )
    return extracted_features
```
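## 5. Feature Selection: Separating the Wheat from the Chaff
More features are not always better. Too many features increase model complexity, can cause overfitting, and raise the computational cost. The goal of feature selection is to find the most informative subset.
### 5.1 Filter Methods
Filter methods select features based on their own statistical properties. They are fast to compute and well suited to a first pass.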
```python
from sklearn.feature_selection import (
    VarianceThreshold, SelectKBest, f_classif, mutual_info_classif,
    chi2, f_regression
)
from sklearn.datasets import make_classification
# Generate example data
X, y = make_classification(
    n_samples=1000,
    n_features=20,
    n_informative=5,
    n_redundant=5,
    random_state=42
)

def filter_methods_selection(X, y, problem_type='classification'):
    """Run several filter-style feature selection methods."""
    results = {}
    # 1. Variance threshold (drop low-variance features)
    selector_var = VarianceThreshold(threshold=0.01)
    X_var_selected = selector_var.fit_transform(X)
    results['variance_threshold'] = {
        'selected_features': X_var_selected.shape[1],
        'selector': selector_var
    }
    # 2. Selection based on statistical tests
    if problem_type == 'classification':
        # ANOVA F-value
        selector_anova = SelectKBest(score_func=f_classif, k=10)
        X_anova = selector_anova.fit_transform(X, y)
        results['anova_f'] = {
            'selected_features': X_anova.shape[1],
            'scores': selector_anova.scores_,
            'selector': selector_anova
        }
        # Mutual information
        selector_mi = SelectKBest(score_func=mutual_info_classif, k=10)
        X_mi = selector_mi.fit_transform(X, y)
        results['mutual_info'] = {
            'selected_features': X_mi.shape[1],
            'scores': selector_mi.scores_,
            'selector': selector_mi
        }
        # Chi-squared test (valid for non-negative features only)
        X_nonnegative = X - X.min() + 1e-6  # shift so that all values are positive
        selector_chi2 = SelectKBest(score_func=chi2, k=10)
        X_chi2 = selector_chi2.fit_transform(X_nonnegative, y)
        results['chi2'] = {
            'selected_features': X_chi2.shape[1],
            'scores': selector_chi2.scores_,
            'selector': selector_chi2
        }
    elif problem_type == 'regression':
        # F-test for regression
        selector_f = SelectKBest(score_func=f_regression, k=10)
        X_f = selector_f.fit_transform(X, y)
        results['f_regression'] = {
            'selected_features': X_f.shape[1],
            'scores': selector_f.scores_,
            'selector': selector_f
        }
    return results

# Visualize feature scores
def plot_feature_scores(scores_dict, feature_names):
    """Plot feature scores for every method that reports them."""
    # Skip methods without scores (e.g. the variance threshold) to avoid empty panels
    scored = {k: v for k, v in scores_dict.items() if 'scores' in v}
    fig, axes = plt.subplots(
        len(scored), 1, figsize=(12, 4 * len(scored)), squeeze=False
    )
    for ax, (method_name, method_info) in zip(axes[:, 0], scored.items()):
        scores = method_info['scores']
        # Build a DataFrame so the scores can be sorted
        score_df = pd.DataFrame({
            'feature': feature_names[:len(scores)],
            'score': scores
        }).sort_values('score', ascending=False)
        # Horizontal bar chart
        ax.barh(score_df['feature'], score_df['score'])
        ax.set_xlabel('Score')
        ax.set_title(f'{method_name} - feature scores')
        ax.invert_yaxis()  # highest score at the top
    plt.tight_layout()
    plt.show()
```
### 5.2 Wrapper Methods
Wrapper methods treat feature selection as a search problem: they pick features by evaluating how each candidate subset affects model performance.
```python
from sklearn.feature_selection import RFE, RFECV
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import StratifiedKFold, cross_val_score

def wrapper_methods_selection(X, y, cv_folds=5):
    """Run several wrapper-style feature selection methods."""
    results = {}
    # 1. Recursive feature elimination (RFE)
    estimator = LogisticRegression(max_iter=1000, random_state=42)
    rfe = RFE(
        estimator=estimator,
        n_features_to_select=10,
        step=1  # number of features removed per iteration
    )
    rfe.fit(X, y)
    results['rfe'] = {
        'selected_features': rfe.support_,
        'ranking': rfe.ranking_,
        'estimator': rfe
    }
    # 2. Recursive feature elimination with cross-validation (RFECV)
    rfecv = RFECV(
        estimator=estimator,
        cv=StratifiedKFold(cv_folds),
        scoring='accuracy',
        min_features_to_select=5,
        n_jobs=-1
    )
    rfecv.fit(X, y)
    results['rfecv'] = {
        'selected_features': rfecv.support_,
        'ranking': rfecv.ranking_,
        'cv_scores': rfecv.cv_results_['mean_test_score'],
        'optimal_features': rfecv.n_features_,
        'estimator': rfecv
    }

    # 3. Forward selection (custom implementation)
    def forward_selection(X, y, n_features_to_select, estimator, cv=5):
        """Greedy forward feature selection."""
        n_features = X.shape[1]
        selected = []
        candidates = list(range(n_features))
        for i in range(n_features_to_select):
            scores = []
            for feature in candidates:
                features = selected + [feature]
                X_subset = X[:, features]
                # Evaluate the candidate subset with cross-validation
                cv_scores = cross_val_score(
                    estimator, X_subset, y, cv=cv, scoring='accuracy'
                )
                scores.append(cv_scores.mean())
            # Keep the feature that gives the best score
            best_idx = np.argmax(scores)
            best_feature = candidates[best_idx]
            selected.append(best_feature)
            candidates.remove(best_feature)
        return selected

    forward_selected = forward_selection(
        X, y, n_features_to_select=10,
        estimator=LogisticRegression(max_iter=1000, random_state=42)
    )
    results['forward_selection'] = {
        'selected_features': forward_selected
    }
    return results
```
### 5.3 Embedded Methods
Embedded methods perform feature selection as part of model training, usually based on feature importances or coefficients.
```python
from sklearn.linear_model import Lasso, Ridge, ElasticNet
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.svm import LinearSVC

def embedded_methods_selection(X, y):
    """Run several embedded feature selection methods (binary classification)."""
    results = {}
    # 1. L1 regularization (Lasso) yields sparse coefficients.
    #    Lasso is a regressor; here it is fit on the 0/1 labels as a rough screen.
    lasso = Lasso(alpha=0.01, random_state=42)
    lasso.fit(X, y)
    results['lasso'] = {
        'coefficients': lasso.coef_,
        'selected_features': np.abs(lasso.coef_) > 0.01,
        'model': lasso
    }
    # 2. Random forest feature importances
    rf = RandomForestClassifier(n_estimators=100, random_state=42)
    rf.fit(X, y)
    results['random_forest'] = {
        'importances': rf.feature_importances_,
        'selected_features': rf.feature_importances_ > np.mean(rf.feature_importances_),
        'model': rf
    }
    # 3. Gradient boosting feature importances
    gb = GradientBoostingClassifier(n_estimators=100, random_state=42)
    gb.fit(X, y)
    results['gradient_boosting'] = {
        'importances': gb.feature_importances_,
        'selected_features': gb.feature_importances_ > np.mean(gb.feature_importances_),
        'model': gb
    }
    # 4. Coefficients of an L1-penalized linear SVM
    svm = LinearSVC(C=0.01, penalty='l1', dual=False, random_state=42)
    svm.fit(X, y)
    results['linear_svm'] = {
        'coefficients': svm.coef_[0],
        'selected_features': np.abs(svm.coef_[0]) > 0.01,
        'model': svm
    }
    # Compare the importances visually
    fig, axes = plt.subplots(2, 2, figsize=(15, 10))
    axes = axes.flatten()
    methods = ['lasso', 'random_forest', 'gradient_boosting', 'linear_svm']
    titles = ['Lasso coefficients', 'Random forest importances',
              'Gradient boosting importances', 'Linear SVM coefficients']
    for idx, (method, title) in enumerate(zip(methods, titles)):
        ax = axes[idx]
        info = results[method]
        importance = info['importances'] if 'importances' in info else info['coefficients']
        # Sort and show the 20 largest by absolute value
        sorted_idx = np.argsort(np.abs(importance))[-20:]
        ax.barh(range(len(sorted_idx)), np.abs(importance[sorted_idx]))
        ax.set_yticks(range(len(sorted_idx)))
        ax.set_yticklabels([f'feature {i}' for i in sorted_idx])
        ax.set_xlabel('Importance / absolute coefficient')
        ax.set_title(title)
    plt.tight_layout()
    plt.show()
    return results
```
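### 5.4 A Practical Feature Selection Strategy
In real projects I usually select features in layers:
1. **Layer 1: variance and correlation**
   - Remove features with very low variance (nearly constant)
   - Remove highly correlated features (correlation coefficient > 0.95)
2. **Layer 2: business understanding**
   - Discuss with domain experts and remove clearly irrelevant features
   - Consider the cost of obtaining each feature and remove those that are hard to obtain
3. **Layer 3: automated selection**
   - Use filter methods for a first pass
   - Use embedded methods to obtain feature importances
   - Use wrapper methods for the final refinement
4. **Layer 4: model validation**
   - Compare feature subsets with cross-validation
   - Consider feature stability (consistency across different data subsets)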
```python
def comprehensive_feature_selection_pipeline(X, y, feature_names, cv=5):
    """Layered feature selection; returns the reduced matrix and the kept feature names."""
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.feature_selection import RFECV
    from sklearn.model_selection import cross_val_score
    # Step 1: remove low-variance features
    var_selector = VarianceThreshold(threshold=0.01)
    X_var = var_selector.fit_transform(X)
    kept = np.arange(X.shape[1])[var_selector.get_support()]  # original column indices
    print(f"Step 1: low-variance filter, kept {X_var.shape[1]}/{X.shape[1]} features")
    # Step 2: remove highly correlated features
    correlation_matrix = np.corrcoef(X_var.T)
    highly_correlated = set()
    for i in range(len(correlation_matrix)):
        for j in range(i+1, len(correlation_matrix)):
            if abs(correlation_matrix[i, j]) > 0.95:
                highly_correlated.add(j)  # drop the later feature of the pair
    keep_indices = [i for i in range(X_var.shape[1]) if i not in highly_correlated]
    X_uncorrelated = X_var[:, keep_indices]
    kept = kept[keep_indices]
    print(f"Step 2: correlation filter, kept {X_uncorrelated.shape[1]}/{X_var.shape[1]} features")
    # Step 3: model-based feature importance
    rf = RandomForestClassifier(n_estimators=100, random_state=42)
    rf.fit(X_uncorrelated, y)
    importances = rf.feature_importances_
    threshold = np.percentile(importances, 25)  # keep the top 75% by importance
    important_indices = np.where(importances > threshold)[0]
    X_important = X_uncorrelated[:, important_indices]
    kept = kept[important_indices]
    print(f"Step 3: importance filter, kept {X_important.shape[1]}/{X_uncorrelated.shape[1]} features")
    # Step 4: recursive feature elimination
    rfecv = RFECV(
        estimator=LogisticRegression(max_iter=1000, random_state=42),
        cv=StratifiedKFold(cv),
        scoring='accuracy',
        min_features_to_select=5,
        n_jobs=-1
    )
    rfecv.fit(X_important, y)
    X_final = X_important[:, rfecv.support_]
    kept = kept[rfecv.support_]
    print(f"Step 4: recursive feature elimination, kept {X_final.shape[1]} features")
    # Validate the final feature set
    base_score = cross_val_score(
        LogisticRegression(max_iter=1000, random_state=42),
        X, y, cv=cv, scoring='accuracy'
    ).mean()
    final_score = cross_val_score(
        LogisticRegression(max_iter=1000, random_state=42),
        X_final, y, cv=cv, scoring='accuracy'
    ).mean()
    print("\nPerformance comparison:")
    print(f"All features ({X.shape[1]}): accuracy = {base_score:.4f}")
    print(f"Final features ({X_final.shape[1]}): accuracy = {final_score:.4f}")
    print(f"Feature reduction: {100*(1 - X_final.shape[1]/X.shape[1]):.1f}%")
    # Map the surviving columns back to their names in the original matrix
    selected_names = [feature_names[i] for i in kept]
    return X_final, selected_names
```
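The stability check mentioned in the fourth layer can be approximated by repeating a selection step on bootstrap resamples and counting how often each feature survives. This is a rough sketch; the function name `selection_frequency`, the 0.8 cutoff, and the synthetic data are assumptions for illustration:

```python
import numpy as np
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.feature_selection import SelectFromModel

def selection_frequency(X, y, n_rounds=10, random_state=42):
    """Fraction of bootstrap rounds in which each feature is selected."""
    rng = np.random.default_rng(random_state)
    counts = np.zeros(X.shape[1])
    for _ in range(n_rounds):
        idx = rng.integers(0, len(X), size=len(X))  # bootstrap resample with replacement
        selector = SelectFromModel(
            RandomForestClassifier(n_estimators=50, random_state=0),
            threshold='median',
        )
        selector.fit(X[idx], y[idx])
        counts += selector.get_support()
    return counts / n_rounds

# Synthetic demo data
X_demo, y_demo = make_classification(
    n_samples=300, n_features=10, n_informative=3, n_redundant=2, random_state=0
)
freq = selection_frequency(X_demo, y_demo, n_rounds=5)
stable = np.where(freq >= 0.8)[0]
print("selection frequency per feature:", freq)
print("features selected in at least 80% of rounds:", stable)
```

Features that are picked in nearly every round are safer to rely on than features that appear only for particular resamples.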
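## 6. Feature Engineering in Practice: A Complete Workflow
Let us bring the techniques above together in one complete example. Suppose we have an e-commerce user dataset and want to predict whether a user will purchase a given product.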
```python
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import (
    StandardScaler, OneHotEncoder, PowerTransformer
)
from sklearn.feature_selection import SelectFromModel
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, roc_auc_score
import joblib

class FeatureEngineeringPipeline:
    """End-to-end feature engineering pipeline."""
    def __init__(self, random_state=42):
        self.random_state = random_state
        self.preprocessor = None
        self.feature_selector = None
        self.final_features = None

    def load_and_explore_data(self, filepath):
        """Load the data and print a quick overview."""
        print("Step 1: load and explore the data")
        df = pd.read_csv(filepath)
        print(f"Dataset shape: {df.shape}")
        print("\nColumn dtypes:")
        print(df.dtypes.value_counts())
        print("\nMissing values:")
        missing = df.isnull().sum()
        missing_pct = (missing / len(df)) * 100
        missing_df = pd.DataFrame({
            'missing_count': missing,
            'missing_pct': missing_pct
        }).sort_values('missing_count', ascending=False)
        print(missing_df[missing_df['missing_count'] > 0])
        return df

    def create_features(self, df):
        """Create new features."""
        print("\nStep 2: create new features")
        # Work on a copy of the raw data
        df_engineered = df.copy()
        # 1. Time features
        if 'signup_date' in df.columns:
            df_engineered['signup_date'] = pd.to_datetime(df_engineered['signup_date'])
            df_engineered['signup_year'] = df_engineered['signup_date'].dt.year
            df_engineered['signup_month'] = df_engineered['signup_date'].dt.month
            df_engineered['signup_day'] = df_engineered['signup_date'].dt.day
            df_engineered['signup_dayofweek'] = df_engineered['signup_date'].dt.dayofweek
            df_engineered['signup_quarter'] = df_engineered['signup_date'].dt.quarter
            # Account age in days, relative to a fixed reference date
            reference_date = pd.Timestamp('2024-01-01')
            df_engineered['days_since_signup'] = (reference_date - df_engineered['signup_date']).dt.days
        # 2. Interaction features
        if all(col in df.columns for col in ['page_views', 'session_duration']):
            df_engineered['views_per_minute'] = df_engineered['page_views'] / (
                df_engineered['session_duration'] / 60 + 1e-6
            )
        if all(col in df.columns for col in ['cart_adds', 'wishlist_adds']):
            df_engineered['cart_to_wishlist_ratio'] = df_engineered['cart_adds'] / (
                df_engineered['wishlist_adds'] + 1
            )
        # 3. Aggregate features (when per-user history is available)
        if 'user_id' in df.columns and 'purchase_amount' in df.columns:
            agg_spec = {'purchase_amount': ['mean', 'std', 'sum', 'count']}
            if 'session_duration' in df.columns:  # only aggregate columns that exist
                agg_spec['session_duration'] = ['mean', 'sum']
            user_stats = df.groupby('user_id').agg(agg_spec).fillna(0)
            user_stats.columns = ['_'.join(col).strip() for col in user_stats.columns.values]
            user_stats = user_stats.reset_index()
            df_engineered = df_engineered.merge(user_stats, on='user_id', how='left')
        print(f"Created {len(df_engineered.columns) - len(df.columns)} new features")
        print(f"Total features: {len(df_engineered.columns)}")
        return df_engineered

    def build_preprocessing_pipeline(self, df, target_column):
        """Build the preprocessing pipeline."""
        print("\nStep 3: build the preprocessing pipeline")
        # Separate features and target
        X = df.drop(columns=[target_column])
        y = df[target_column]
        # Identify feature types ('number' also covers int32 columns such as dt.year)
        numeric_features = X.select_dtypes(include='number').columns.tolist()
        categorical_features = X.select_dtypes(include=['object', 'category']).columns.tolist()
        datetime_features = X.select_dtypes(include=['datetime64']).columns.tolist()
        print(f"Numeric features: {len(numeric_features)}")
        print(f"Categorical features: {len(categorical_features)}")
        print(f"Datetime features: {len(datetime_features)}")
        # Drop raw datetime columns (their components were already extracted)
        X = X.drop(columns=datetime_features)
        # Numeric features
        numeric_transformer = Pipeline(steps=[
            ('imputer', SimpleImputer(strategy='median')),
            ('scaler', StandardScaler()),
            ('power_transform', PowerTransformer(method='yeo-johnson'))
        ])
        # Categorical features
        categorical_transformer = Pipeline(steps=[
            ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
            ('onehot', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
        ])
        # Combine the preprocessing steps
        self.preprocessor = ColumnTransformer(
            transformers=[
                ('num', numeric_transformer, numeric_features),
                ('cat', categorical_transformer, categorical_features)
            ],
            remainder='passthrough'
        )
        return X, y, numeric_features, categorical_features

    def train_and_evaluate(self, X, y, test_size=0.2):
        """Train and evaluate the model (binary classification)."""
        print("\nStep 4: train and evaluate the model")
        # Train/test split
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=test_size, random_state=self.random_state, stratify=y
        )
        # Fit preprocessing on the training set only, then apply to both
        X_train_processed = self.preprocessor.fit_transform(X_train)
        X_test_processed = self.preprocessor.transform(X_test)
        # Output names from the fitted ColumnTransformer (covers passthrough columns too)
        all_features = list(self.preprocessor.get_feature_names_out())
        # Feature selection
        print("Selecting features...")
        selector = SelectFromModel(
            RandomForestClassifier(n_estimators=100, random_state=self.random_state),
            threshold='median'
        )
        X_train_selected = selector.fit_transform(X_train_processed, y_train)
        X_test_selected = selector.transform(X_test_processed)
        self.feature_selector = selector
        self.final_features = [all_features[i] for i in selector.get_support(indices=True)]
        print(f"Features before selection: {X_train_processed.shape[1]}")
        print(f"Features after selection: {X_train_selected.shape[1]}")
        # Train the final model
        print("\nTraining the final model...")
        model = RandomForestClassifier(
            n_estimators=200,
            max_depth=10,
            min_samples_split=5,
            min_samples_leaf=2,
            random_state=self.random_state,
            n_jobs=-1
        )
        model.fit(X_train_selected, y_train)
        # Evaluate the model
        y_pred = model.predict(X_test_selected)
        y_pred_proba = model.predict_proba(X_test_selected)[:, 1]
        print("\nModel performance:")
        print(classification_report(y_test, y_pred))
        print(f"ROC AUC: {roc_auc_score(y_test, y_pred_proba):.4f}")
        # Feature importance
        feature_importance = pd.DataFrame({
            'feature': self.final_features,
            'importance': model.feature_importances_
        }).sort_values('importance', ascending=False)
        print("\nTop 10 features:")
        print(feature_importance.head(10))
        return model, feature_importance

    def save_pipeline(self, filepath):
        """Save the fitted preprocessing and selection steps."""
        pipeline = {
            'preprocessor': self.preprocessor,
            'feature_selector': self.feature_selector,
            'final_features': self.final_features
        }
        joblib.dump(pipeline, filepath)
        print(f"Pipeline saved to: {filepath}")

    def run_full_pipeline(self, data_path, target_column):
        """Run the full pipeline."""
        print("=" * 60)
        print("Starting the feature engineering pipeline")
        print("=" * 60)
        # 1. Load the data
        df = self.load_and_explore_data(data_path)
        # 2. Feature engineering
        df_engineered = self.create_features(df)
        # 3. Build the preprocessing pipeline
        X, y, _, _ = self.build_preprocessing_pipeline(df_engineered, target_column)
        # 4. Train and evaluate
        model, feature_importance = self.train_and_evaluate(X, y)
        print("\n" + "=" * 60)
        print("Feature engineering pipeline finished")
        print("=" * 60)
        return model, feature_importance

# Usage example
if __name__ == "__main__":
    # Initialize the pipeline
    pipeline = FeatureEngineeringPipeline(random_state=42)
    # Run the full pipeline
    # Note: replace the path and target column with your own
    # model, importance = pipeline.run_full_pipeline('your_data.csv', 'target_column')
    # Save the pipeline
    # pipeline.save_pipeline('feature_engineering_pipeline.pkl')
```
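This complete pipeline shows a systematic approach to feature engineering. In real projects I adjust each step to the problem at hand. For time series data, for example, I add lag features and rolling statistics; for text data, I add a text feature extraction step.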
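As a rough illustration of the lag and rolling features mentioned for time series data, here is a small pandas sketch. The `sales` frame, its column names, and the window size are made up for the example:

```python
import pandas as pd

# Hypothetical per-user daily amounts
sales = pd.DataFrame({
    'user_id': [1, 1, 1, 1, 2, 2, 2],
    'date': pd.to_datetime([
        '2024-01-01', '2024-01-02', '2024-01-03', '2024-01-04',
        '2024-01-01', '2024-01-02', '2024-01-03',
    ]),
    'amount': [10.0, 20.0, 30.0, 40.0, 5.0, 15.0, 25.0],
}).sort_values(['user_id', 'date'])

grouped = sales.groupby('user_id')['amount']

# Lag feature: the previous observation for the same user
sales['amount_lag_1'] = grouped.shift(1)

# Rolling mean over the two previous observations.
# Shifting before rolling keeps the current row's value out of its own feature.
sales['amount_roll_mean_2'] = grouped.transform(
    lambda s: s.shift(1).rolling(window=2, min_periods=1).mean()
)

print(sales)
```

The first row of each user has no history, so both features are NaN there and need the same missing-value treatment as any other column.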
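Feature engineering is an iterative process. My habit is to build a baseline version first, then keep refining it through feature importance analysis, model performance monitoring, and business feedback. I record the changes made in each iteration and the resulting performance gain, which builds up experience and also gives the team a library of reusable patterns.
One last point: there is no silver bullet in feature engineering. The best approach depends on your data, your business problem, and your compute budget. Try more, validate more, think more; that is the key to doing feature engineering well.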