# XGBoost实战:5个工业级应用案例解析(附Python代码)
如果你已经对XGBoost的原理和基本用法有所了解,那么下一步最迫切的问题可能就是:这东西在实际项目中到底怎么用?它真的能解决我手头那些复杂、混乱、充满挑战的工业数据问题吗?答案是肯定的,但关键在于如何将理论转化为有效的实践。这篇文章不是一篇入门教程,而是一次深度实战巡礼。我们将穿越五个截然不同的工业场景——从金融风控的严谨逻辑,到电商推荐的动态博弈,再到医疗诊断的精准追求,以及供应链和能源领域的复杂预测。每个案例都不仅仅是“跑通一个模型”,而是会深入探讨业务背景、数据挑战、特征工程的独特考量、模型调优的实战技巧,以及最终如何将模型结果落地并产生实际价值。我们会提供清晰、可复现的Python代码片段,但更重要的是,分享代码背后那些来自一线的决策逻辑和避坑指南。无论你是希望将XGBoost引入现有业务的数据科学家,还是寻求拓宽应用视野的开发者,这里都有你需要的“弹药”。
## 1. 金融信贷违约预测:从数据清洗到风险定价
金融风控是XGBoost最经典的应用领域之一。这里的核心任务通常是二分类:预测一个贷款申请者未来是否会违约。但工业级应用远不止调用`fit`和`predict`那么简单。
**业务挑战与数据特性**:金融数据往往具有高维度、强相关性、大量缺失值和极端不平衡(违约样本极少)的特点。此外,特征往往具有明确的业务含义(如“近6个月查询次数”、“负债收入比”),模型的可解释性要求极高,因为风控策略需要人工审核和监管报备。
一个典型的实战流程始于数据理解。假设我们有一份包含用户基本信息、信用历史、资产负债和行为数据的表格。
```python
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split, StratifiedKFold, GridSearchCV
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score
import xgboost as xgb
import warnings
warnings.filterwarnings('ignore')
# 模拟加载数据
# df = pd.read_csv('loan_application.csv')
# 这里我们创建一个模拟数据集以演示
np.random.seed(42)
n_samples = 10000
data = {
'age': np.random.randint(20, 70, n_samples),
'income': np.random.lognormal(mean=10, sigma=0.5, size=n_samples),
'debt_to_income_ratio': np.random.beta(a=2, b=5, size=n_samples) * 2, # 模拟DTI比率
'credit_inquiries_6m': np.random.poisson(lam=1.5, size=n_samples),
'payment_history_avg': np.random.uniform(0.5, 1.0, n_samples), # 模拟还款记录评分
'existing_loan_num': np.random.randint(0, 5, n_samples),
'job_type_cat': np.random.choice(['A', 'B', 'C', 'D'], n_samples), # 职业类别
'missing_income_ind': np.random.binomial(1, 0.02, n_samples) # 2%的收入数据缺失
}
df = pd.DataFrame(data)
# 模拟生成违约标签(非线性和交互关系)
log_odds = (-3 + 0.05*df['age'] - 0.8*np.log(df['income'])
+ 2.5*df['debt_to_income_ratio'] + 0.3*df['credit_inquiries_6m']**2
- 4*df['payment_history_avg'] + np.where(df['job_type_cat']=='D', 0.5, 0))
prob_default = 1 / (1 + np.exp(-log_odds))
df['default'] = np.random.binomial(1, prob_default, n_samples)
print(f"违约率: {df['default'].mean():.2%}")
print(df.head())
```
**特征工程实战要点**:
1. **缺失值处理**:XGBoost本身能处理缺失值(`np.nan`),它会学习缺失值的最佳分裂方向。但在金融场景,缺失本身可能就是重要信息(例如,拒绝填写某些项的用户风险更高)。因此,我们常同时采用两种策略:
```python
# 方法1:利用XGBoost原生能力,将缺失值保留为np.nan
# 方法2:创建缺失指示器,并将缺失值填充为一个远离正常值的数(如-999)
df['income_imputed'] = df['income'].where(df['missing_income_ind']==0, -999)
df['income_missing_flag'] = df['missing_income_ind']
```
2. **分箱与WOE编码**:对于连续变量,如“年龄”和“收入”,直接输入模型可能无法捕捉非线性关系。在金融风控中,**证据权重(Weight of Evidence, WOE)编码**是标准做法。它将连续变量离散化为若干箱,并用每个箱内好坏样本的比例来计算一个代表风险高低的数值。这不仅能提升模型性能,还能极大增强可解释性,满足评分卡模型的要求。
3. **交互特征与业务理解**:单纯的特征可能不够。例如,“高负债收入比”且“近期多次信用查询”的用户风险可能呈指数上升。创建这样的交互特征需要深厚的业务知识。
> 注意:在金融建模中,必须严格防范**数据泄露(Data Leakage)**。任何特征都不能包含未来信息(例如,用“是否发生违约”后的交易记录来预测“是否违约”)。特征工程和缺失值填充都应在交叉验证的每个折叠内独立进行,或严格基于时间窗口。
**模型训练与调优**:面对不平衡数据,我们使用`scale_pos_weight`参数,并采用分层交叉验证。
```python
# 准备特征和标签
# 这里简化处理,实际中需进行完整的特征工程
X = df.drop(columns=['default', 'missing_income_ind']) # 简单起见,先去掉原始缺失指示列
y = df['default']
# 对分类变量进行标签编码(注意:对于无序多分类,最好使用独热编码或目标编码)
label_encoders = {}
for col in X.select_dtypes(include=['object']).columns:
le = LabelEncoder()
X[col] = le.fit_transform(X[col])
label_encoders[col] = le
# 划分训练集和测试集(按时间划分更符合实际,此处随机划分仅作演示)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, stratify=y, random_state=42)
# 初始化XGBoost分类器,设置处理不平衡数据的初始参数
model = xgb.XGBClassifier(
objective='binary:logistic',
eval_metric='auc',
use_label_encoder=False,
scale_pos_weight=(len(y_train) - sum(y_train)) / sum(y_train), # 自动计算正负样本权重比
n_estimators=100,
max_depth=6,
learning_rate=0.1,
subsample=0.8,
colsample_bytree=0.8,
random_state=42
)
# 使用交叉验证进行训练和早停,防止过拟合
eval_set = [(X_train, y_train), (X_test, y_test)]
model.fit(X_train, y_train, eval_set=eval_set, verbose=False)
# 评估模型
y_pred_proba = model.predict_proba(X_test)[:, 1]
y_pred = (y_pred_proba > 0.5).astype(int) # 默认阈值为0.5
print("测试集AUC: ", roc_auc_score(y_test, y_pred_proba))
print("\n分类报告:")
print(classification_report(y_test, y_pred))
print("\n混淆矩阵:")
print(confusion_matrix(y_test, y_pred))
```
**模型解释与部署**:训练完成后,**特征重要性(Feature Importance)** 分析至关重要。XGBoost提供了`weight`(被用作分裂点的次数)、`gain`(分裂带来的平均增益)和`cover`(分裂覆盖的样本数)三种重要性度量。通常`gain`最具参考价值。
```python
import matplotlib.pyplot as plt
# 获取特征重要性(基于增益)
importance_df = pd.DataFrame({
'feature': X.columns,
'importance': model.feature_importances_
}).sort_values('importance', ascending=False)
print("特征重要性排名(基于增益):")
print(importance_df)
# 可视化
plt.figure(figsize=(10, 6))
plt.barh(importance_df['feature'][:10], importance_df['importance'][:10])
plt.xlabel('Feature Importance (Gain)')
plt.title('Top 10 Feature Importance')
plt.gca().invert_yaxis()
plt.show()
```
最终,模型输出的违约概率可以映射到具体的风险分数,集成到自动审批决策引擎中,或作为人工信审员的重要参考。
## 2. 电商个性化点击率预估:实时排序的引擎
在电商和内容平台,推荐系统的核心任务之一是**点击率预估**。给定一个用户和一个商品/内容,预测用户点击它的概率。这是一个典型的二分类问题,但有其特殊性:数据量极其庞大(数十亿样本),特征维度高(用户画像、商品属性、上下文环境),且需要近乎实时的预测以支持在线排序。
**场景与数据挑战**:数据通常是用户行为日志流,包含正样本(点击)和大量负样本(曝光未点击)。特征通常分为几大类:
- **用户特征**:用户ID、 demographics、历史行为统计(点击率、购买率、品类偏好向量)。
- **商品特征**:商品ID、品类、价格、品牌、上架时间、历史统计CTR。
- **上下文特征**:请求时间、星期几、用户设备、网络环境、当前页面位置。
- **交叉特征**:用户-商品交互特征,如用户对该商品所属品类的历史偏好度。
由于用户ID和商品ID是海量且稀疏的类别特征,直接进行标签编码或独热编码不可行。工业界普遍采用**嵌入(Embedding)** 或 **特征哈希(Hashing Trick)** 将其转化为稠密向量,或者使用**均值编码(Mean Encoding/Target Encoding)**,即用目标变量(如点击率)的统计量(如全局均值、平滑后的类别均值)来替代ID本身。
**使用XGBoost进行CTR预估的实战流程**:
```python
# 模拟电商点击日志数据
np.random.seed(123)
n_logs = 50000
user_ids = np.random.randint(0, 10000, n_logs) # 1万个用户
item_ids = np.random.randint(0, 5000, n_logs) # 5千个商品
# 生成一些模拟特征
data_ctr = pd.DataFrame({
'user_id': user_ids,
'item_id': item_ids,
'user_avg_ctr': np.random.beta(a=2, b=10, size=n_logs), # 用户历史平均CTR
'item_avg_ctr': np.random.beta(a=1, b=20, size=n_logs), # 商品历史平均CTR
'price_level': np.random.choice(['low', 'medium', 'high'], n_logs, p=[0.6, 0.3, 0.1]),
'category_match': np.random.rand(n_logs) > 0.7, # 商品类别是否匹配用户偏好
'position': np.random.randint(1, 11, n_logs), # 展示位置,1为最前
'hour_of_day': np.random.randint(0, 24, n_logs)
})
# 模拟点击标签生成(依赖特征的非线性组合)
click_logit = (-2.5
+ 1.5 * data_ctr['user_avg_ctr']
+ 2.0 * data_ctr['item_avg_ctr']
+ 0.8 * data_ctr['category_match']
- 0.1 * data_ctr['position']
+ np.sin(data_ctr['hour_of_day'] / 24 * 2 * np.pi) * 0.5 # 模拟时间效应
+ np.random.normal(0, 0.5, n_logs))
click_prob = 1 / (1 + np.exp(-click_logit))
data_ctr['click'] = np.random.binomial(1, click_prob, n_logs)
print(f"整体CTR: {data_ctr['click'].mean():.4f}")
```
**关键特征工程:目标编码**:对于`user_id`和`item_id`这类高基数特征,我们使用目标编码。为了防止过拟合(即用未来信息编码当前样本),必须在交叉验证的每个折叠内,仅使用该折叠训练集的数据来计算编码值。
```python
from sklearn.model_selection import KFold
def target_encode_with_cv(df, col, target, n_splits=5, smooth=50):
"""
使用交叉验证进行目标编码,防止数据泄露。
df: 数据框
col: 需要编码的列名
target: 目标列名
n_splits: 交叉验证折数
smooth: 平滑系数,用于在小样本类别时向全局均值收缩
"""
df = df.copy()
global_mean = df[target].mean()
df[f'{col}_encoded'] = np.nan
kf = KFold(n_splits=n_splits, shuffle=True, random_state=42)
for train_idx, val_idx in kf.split(df):
df_train, df_val = df.iloc[train_idx], df.iloc[val_idx]
# 计算训练集上每个类别的目标均值(平滑处理)
agg = df_train.groupby(col)[target].agg(['mean', 'count'])
# 平滑公式: (count * mean + smooth * global_mean) / (count + smooth)
smoothed_means = (agg['count'] * agg['mean'] + smooth * global_mean) / (agg['count'] + smooth)
# 映射到验证集
df.loc[df.index[val_idx], f'{col}_encoded'] = df_val[col].map(smoothed_means).fillna(global_mean).values
# 对于训练集整体,可以用全体数据计算一个最终编码用于最终模型(或留一法)
# 此处为简化,用交叉验证填充后的均值填充可能剩余的NaN(通常很少)
df[f'{col}_encoded'].fillna(global_mean, inplace=True)
return df
# 对user_id和item_id进行目标编码
data_ctr = target_encode_with_cv(data_ctr, 'user_id', 'click', n_splits=5)
data_ctr = target_encode_with_cv(data_ctr, 'item_id', 'click', n_splits=5)
# 处理其他分类特征
data_ctr['price_level'] = data_ctr['price_level'].map({'low':0, 'medium':1, 'high':2})
data_ctr['category_match'] = data_ctr['category_match'].astype(int)
# 准备训练数据
features_ctr = ['user_avg_ctr', 'item_avg_ctr', 'price_level', 'category_match', 'position', 'hour_of_day', 'user_id_encoded', 'item_id_encoded']
X_ctr = data_ctr[features_ctr]
y_ctr = data_ctr['click']
X_train_ctr, X_test_ctr, y_train_ctr, y_test_ctr = train_test_split(X_ctr, y_ctr, test_size=0.2, random_state=42, stratify=y_ctr)
# 训练XGBoost CTR模型
# 注意:CTR预估通常使用对数损失(logloss)作为评估指标
ctr_model = xgb.XGBClassifier(
objective='binary:logistic',
eval_metric='logloss',
use_label_encoder=False,
n_estimators=200,
max_depth=5, # 树不宜过深,防止过拟合稀疏特征
learning_rate=0.05,
subsample=0.8,
colsample_bytree=0.8,
reg_alpha=1, # L1正则,有助于稀疏特征
reg_lambda=10, # L2正则
random_state=42
)
ctr_model.fit(X_train_ctr, y_train_ctr, eval_set=[(X_test_ctr, y_test_ctr)], verbose=50)
# 评估
y_pred_proba_ctr = ctr_model.predict_proba(X_test_ctr)[:, 1]
print(f"测试集LogLoss: {log_loss(y_test_ctr, y_pred_proba_ctr):.4f}")
print(f"测试集AUC: {roc_auc_score(y_test_ctr, y_pred_proba_ctr):.4f}")
```
**在线服务与模型更新**:训练好的模型可以通过XGBoost的`save_model`和`load_model`函数序列化,并集成到微服务中,接受特征向量并返回CTR预估值。由于用户行为模式会变化,模型需要定期(如每天)用新数据重新训练或进行在线学习(增量更新)。XGBoost支持在已有模型基础上继续训练(`xgb.train`中设置`xgb_model`参数),这为在线学习提供了可能。
## 3. 医疗诊断辅助:处理高维异构数据与可解释性
在医疗领域,XGBoost常用于辅助诊断、疾病风险预测和患者分群。数据来源多样,包括电子健康记录、医学影像特征、基因组学数据等,具有高维、异构、小样本(相对于特征数)和缺失普遍的特点。模型的可解释性和可靠性是生命攸关的要求。
**案例:基于临床指标的糖尿病预测**。我们使用公开的Pima Indians Diabetes数据集作为示例,但会模拟更复杂的工业场景,如添加缺失值和更多衍生特征。
```python
# 加载并模拟扩展数据集
from sklearn.datasets import load_diabetes
# 注意:load_diabetes是回归数据集,我们这里用另一个分类数据集示例,并手动构造
# 这里我们使用一个模拟的、更复杂的医疗数据集
np.random.seed(7)
n_patients = 2000
data_medical = pd.DataFrame({
'age': np.random.normal(50, 15, n_patients).clip(20, 90),
'bmi': np.random.normal(28, 6, n_patients).clip(15, 50),
'blood_pressure': np.random.normal(80, 12, n_patients).clip(60, 180),
'cholesterol_total': np.random.lognormal(5.0, 0.3, n_patients),
'cholesterol_hdl': np.random.lognormal(1.3, 0.2, n_patients),
'glucose_fasting': np.random.normal(100, 30, n_patients).clip(70, 300),
'insulin': np.random.lognormal(3.5, 0.8, n_patients),
'family_history': np.random.binomial(1, 0.3, n_patients), # 家族史,0/1
'smoking_status': np.random.choice(['never', 'former', 'current'], n_patients, p=[0.5, 0.3, 0.2]),
'exercise_freq': np.random.choice(['none', 'light', 'moderate', 'heavy'], n_patients, p=[0.2, 0.5, 0.2, 0.1])
})
# 计算一些衍生指标,这在医疗中很常见
data_medical['bmi_category'] = pd.cut(data_medical['bmi'], bins=[0, 18.5, 25, 30, 100], labels=['under', 'normal', 'over', 'obese'])
data_medical['chol_ratio'] = data_medical['cholesterol_total'] / data_medical['cholesterol_hdl'] # 总胆固醇/HDL比值,重要指标
data_medical['glucose_category'] = pd.cut(data_medical['glucose_fasting'], bins=[0, 100, 126, 300], labels=['normal', 'prediabetes', 'diabetes'])
# 模拟生成糖尿病标签(基于非线性关系)
# 高风险因素:高年龄、高BMI、高血糖、低HDL、有家族史
risk_score = (0.03 * (data_medical['age'] - 50)
+ 0.05 * (data_medical['bmi'] - 25)
+ 0.02 * (data_medical['glucose_fasting'] - 100)
- 0.1 * (data_medical['cholesterol_hdl'] - 1.3)
+ 0.8 * data_medical['family_history']
+ np.where(data_medical['smoking_status']=='current', 0.3, 0)
+ np.where(data_medical['exercise_freq']=='none', 0.4, 0))
prob_diabetes = 1 / (1 + np.exp(-risk_score))
data_medical['diabetes'] = np.random.binomial(1, prob_diabetes, n_patients)
# 模拟数据缺失(医疗数据常见)
for col in ['cholesterol_hdl', 'insulin', 'glucose_fasting']:
missing_mask = np.random.rand(n_patients) < 0.05 # 5%缺失
data_medical.loc[missing_mask, col] = np.nan
print(f"糖尿病患病率: {data_medical['diabetes'].mean():.2%}")
print(data_medical.isnull().sum())
```
**处理医疗数据的特殊考量**:
- **缺失值**:XGBoost可以处理缺失值。但对于关键指标,业务规则填充(如用正常范围中值)有时比模型学习更可控。
- **特征标准化**:虽然树模型对尺度不敏感,但为了某些衍生计算和解释,对连续特征进行标准化(如Z-score)有时仍有帮助。
- **类别特征处理**:对于有序类别(如`exercise_freq`),可以使用标签编码或映射为数值。对于无序类别(如`smoking_status`),使用独热编码或目标编码。
```python
# 数据预处理
# 处理有序类别特征
exercise_map = {'none': 0, 'light': 1, 'moderate': 2, 'heavy': 3}
data_medical['exercise_freq_encoded'] = data_medical['exercise_freq'].map(exercise_map)
smoking_map = {'never': 0, 'former': 1, 'current': 2}
data_medical['smoking_status_encoded'] = data_medical['smoking_status'].map(smoking_map)
# 处理无序类别特征(bmi_category)使用独热编码
bmi_dummies = pd.get_dummies(data_medical['bmi_category'], prefix='bmi', drop_first=True) # drop_first避免共线性
data_medical = pd.concat([data_medical, bmi_dummies], axis=1)
# 选择用于建模的特征
feature_cols = ['age', 'bmi', 'blood_pressure', 'cholesterol_total', 'cholesterol_hdl',
'glucose_fasting', 'insulin', 'family_history', 'chol_ratio',
'exercise_freq_encoded', 'smoking_status_encoded',
'bmi_over', 'bmi_obese', 'bmi_under'] # 注意'bmi_normal'作为基准被drop了
X_med = data_medical[feature_cols]
y_med = data_medical['diabetes']
# 划分数据集
X_train_med, X_test_med, y_train_med, y_test_med = train_test_split(X_med, y_med, test_size=0.2, stratify=y_med, random_state=42)
# 训练模型 - 医疗模型更注重校准性和可解释性,可能使用更简单的树
med_model = xgb.XGBClassifier(
objective='binary:logistic',
eval_metric='logloss',
use_label_encoder=False,
n_estimators=150,
max_depth=4, # 限制深度,增强可解释性
learning_rate=0.05,
subsample=0.9,
colsample_bytree=0.8,
reg_lambda=5, # 较强的正则化防止过拟合,提高泛化能力
random_state=42
)
med_model.fit(X_train_med, y_train_med, eval_set=[(X_test_med, y_test_med)], verbose=50)
# 评估
y_pred_proba_med = med_model.predict_proba(X_test_med)[:, 1]
y_pred_med = (y_pred_proba_med > 0.3).astype(int) # 医疗场景可能调整阈值,平衡召回率和精确率
print("医疗诊断模型评估:")
print(f"AUC: {roc_auc_score(y_test_med, y_pred_proba_med):.4f}")
print(classification_report(y_test_med, y_pred_med, target_names=['Non-Diabetic', 'Diabetic']))
```
**模型可解释性进阶:SHAP值**:在医疗领域,仅仅知道特征重要性不够,还需要知道每个特征对单个预测的具体贡献是正向还是负向。SHAP(SHapley Additive exPlanations)值提供了这种个体级别的解释。
```python
import shap
# 计算SHAP值(这可能需要一些时间)
explainer = shap.TreeExplainer(med_model)
shap_values = explainer.shap_values(X_test_med)
# 1. 全局特征重要性(与XGBoost内置的略有不同,但更一致)
shap.summary_plot(shap_values, X_test_med, plot_type="bar")
# 2. 特征影响方向与程度摘要图
shap.summary_plot(shap_values, X_test_med)
# 3. 对单个样本的预测进行解释
sample_idx = 10 # 查看测试集中第10个样本
shap.force_plot(explainer.expected_value, shap_values[sample_idx, :], X_test_med.iloc[sample_idx, :], matplotlib=True)
```
通过SHAP图,医生可以理解为什么模型对某个患者给出了高风险预测——例如,主要是因为“空腹血糖过高”和“HDL胆固醇过低”这两个因素。这种可解释性是医疗AI模型获得信任和临床应用的关键。
## 4. 供应链需求预测:应对时间序列与外部因素
供应链管理中的需求预测是一个经典的回归问题,但往往具有时间序列特性,并受多种外部因素(促销、节假日、天气、竞争对手行为)影响。XGBoost因其能灵活处理各种特征类型和非线性关系,在此领域表现出色。
**场景**:预测某零售商品未来一周的日需求量。特征可能包括:
- **历史需求**:滞后特征(如过去1天、7天、28天的销量)。
- **时间特征**:年、月、日、星期几、是否为节假日、是否为月初/月末。
- **促销信息**:是否有促销、促销类型、折扣力度。
- **商品信息**:品类、价格、生命周期阶段。
- **外部因素**:天气(温度、降水量)、当地事件、经济指标。
**关键点**:必须严格避免使用未来信息进行预测。所有特征都必须是“已知到预测时刻为止”的信息。例如,预测第T天的需求,不能使用第T天及之后的促销信息。
```python
# 模拟创建时间序列需求数据
np.random.seed(2023)
date_range = pd.date_range(start='2022-01-01', end='2023-12-31', freq='D')
n_days = len(date_range)
# 生成基础趋势、季节性和随机性
trend = np.linspace(100, 150, n_days) # 缓慢上升趋势
weekly_seasonality = 20 * np.sin(2 * np.pi * np.arange(n_days) / 7) # 以7天为周期
yearly_seasonality = 30 * np.sin(2 * np.pi * np.arange(n_days) / 365.25) # 以年为周期
noise = np.random.normal(0, 10, n_days)
# 生成促销和节假日效应
promo_days = np.random.choice(n_days, size=60, replace=False)
holiday_days = np.random.choice(n_days, size=20, replace=False)
promo_effect = np.zeros(n_days)
holiday_effect = np.zeros(n_days)
promo_effect[promo_days] = np.random.uniform(30, 80, len(promo_days)) # 促销提升30-80单位
holiday_effect[holiday_days] = np.random.uniform(-20, 50, len(holiday_days)) # 节假日效应不定
# 合成需求
demand = trend + weekly_seasonality + yearly_seasonality + promo_effect + holiday_effect + noise
demand = np.maximum(demand, 0).astype(int) # 需求非负
# 创建DataFrame
df_demand = pd.DataFrame({
'date': date_range,
'demand': demand,
'is_promo': (np.isin(np.arange(n_days), promo_days)).astype(int),
'is_holiday': (np.isin(np.arange(n_days), holiday_days)).astype(int),
'temperature': np.random.normal(20, 10, n_days).clip(-5, 35), # 模拟温度
'weekday': date_range.weekday, # 周一=0, 周日=6
'month': date_range.month,
})
# 创建滞后特征(必须小心,避免数据泄露)
lags = [1, 7, 14, 28] # 过去1天、1周、2周、4周的需求
for lag in lags:
df_demand[f'demand_lag_{lag}'] = df_demand['demand'].shift(lag)
# 创建滚动统计特征
df_demand['demand_rolling_mean_7'] = df_demand['demand'].shift(1).rolling(window=7, min_periods=1).mean()
df_demand['demand_rolling_std_7'] = df_demand['demand'].shift(1).rolling(window=7, min_periods=1).std()
# 由于创建了滞后特征,前28行数据会有NaN
df_demand = df_demand.iloc[28:].reset_index(drop=True) # 丢弃前28天
print(df_demand.head())
```
**构建回归模型**:目标变量是连续的需求量,因此使用回归任务。
```python
# 定义特征和目标
feature_cols_demand = ['is_promo', 'is_holiday', 'temperature', 'weekday', 'month',
'demand_lag_1', 'demand_lag_7', 'demand_lag_14', 'demand_lag_28',
'demand_rolling_mean_7', 'demand_rolling_std_7']
target_col = 'demand'
X_demand = df_demand[feature_cols_demand]
y_demand = df_demand[target_col]
# 按时间划分训练集和测试集(不能随机划分!)
split_date = pd.Timestamp('2023-06-01')
train_mask = df_demand['date'] < split_date
test_mask = df_demand['date'] >= split_date
X_train_demand, X_test_demand = X_demand[train_mask], X_demand[test_mask]
y_train_demand, y_test_demand = y_demand[train_mask], y_demand[test_mask]
print(f"训练集大小: {len(X_train_demand)}, 测试集大小: {len(X_test_demand)}")
# 训练XGBoost回归模型
demand_model = xgb.XGBRegressor(
objective='reg:squarederror',
eval_metric='rmse',
n_estimators=300,
max_depth=6,
learning_rate=0.05,
subsample=0.8,
colsample_bytree=0.8,
reg_alpha=0.1,
reg_lambda=1,
random_state=42
)
demand_model.fit(X_train_demand, y_train_demand, eval_set=[(X_test_demand, y_test_demand)], verbose=50)
# 评估
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
y_pred_demand = demand_model.predict(X_test_demand)
print("需求预测模型评估:")
print(f"R^2 Score: {r2_score(y_test_demand, y_pred_demand):.4f}")
print(f"MAE: {mean_absolute_error(y_test_demand, y_pred_demand):.2f}")
print(f"RMSE: {np.sqrt(mean_squared_error(y_test_demand, y_pred_demand)):.2f}")
# 可视化部分预测结果
plt.figure(figsize=(14, 6))
plt.plot(df_demand['date'][test_mask], y_test_demand.values, label='Actual Demand', alpha=0.7)
plt.plot(df_demand['date'][test_mask], y_pred_demand, label='Predicted Demand', alpha=0.7, linestyle='--')
plt.xlabel('Date')
plt.ylabel('Demand')
plt.title('Demand Forecast vs Actual')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()
```
**模型迭代与监控**:供应链需求预测模型需要持续监控和更新。可以计算预测误差(如平均绝对百分比误差MAPE),并设置警报阈值。当误差持续增大时,可能意味着市场模式发生了变化,需要重新训练模型或调整特征。
## 5. 工业设备故障预测:从状态监测到预测性维护
预测性维护旨在利用设备传感器数据,预测其未来发生故障的概率或剩余使用寿命,从而安排维护,避免非计划停机。这是一个分类(是否故障)或回归(剩余寿命)问题,数据通常是高频率、多变量的时间序列。
**场景**:基于多台设备的传感器读数(如振动、温度、压力、电流)历史数据,预测未来24小时内发生故障的概率。
**数据与特征工程挑战**:
1. **数据不平衡**:故障事件远少于正常状态。
2. **时间序列特征**:需要从原始传感器信号中提取有意义的特征(时域、频域、时频域特征)。
3. **工况识别**:设备在不同负载、速度下的“正常”状态可能不同。
4. **序列建模**:故障往往由一段时间内的异常模式导致,而非单个时间点。
**实战步骤**:
1. **数据准备**:将连续的传感器数据切割成固定长度的时间窗口(例如,过去6小时的数据作为一个样本)。
2. **特征提取**:对每个时间窗口内的每个传感器通道计算统计特征。
3. **标签生成**:如果窗口结束后的一定时间(如24小时)内发生了故障,则该窗口的标签为“正”(即将故障)。
4. **建模与评估**:使用XGBoost进行分类。
```python
# 模拟设备传感器数据
np.random.seed(99)
n_machines = 50
n_timesteps = 10000
time_index = pd.date_range(start='2023-01-01', periods=n_timesteps, freq='H')
# 模拟三个传感器:振动、温度、电流
sensor_data = []
for machine_id in range(n_machines):
# 基础正常信号
vibration_base = np.random.normal(0, 1, n_timesteps)
temperature_base = np.random.normal(70, 5, n_timesteps)
current_base = np.random.normal(10, 0.5, n_timesteps)
# 模拟故障发生(随机选择一台机器,在某个时间点附近引入异常)
if machine_id == 5: # 以第5台机器为例
fault_start = 8000
# 故障前兆:振动幅度缓慢增大,温度轻微上升,电流波动加剧
vibration_base[fault_start-200:fault_start] += np.linspace(0, 3, 200) * np.random.randn(200)
temperature_base[fault_start-200:fault_start] += np.linspace(0, 10, 200)
current_base[fault_start-200:fault_start] += np.random.normal(0, 0.3, 200)
# 故障发生:信号突变
vibration_base[fault_start:] += np.random.normal(5, 2, n_timesteps-fault_start)
temperature_base[fault_start:] += np.random.normal(15, 3, n_timesteps-fault_start)
current_base[fault_start:] += np.random.normal(2, 0.8, n_timesteps-fault_start)
fault_label = 1
else:
fault_label = 0
df_machine = pd.DataFrame({
'timestamp': time_index,
'machine_id': machine_id,
'vibration': vibration_base,
'temperature': temperature_base,
'current': current_base,
'fault_occurred': 0 # 初始化为0
})
# 标记故障发生的时间点(如果发生了故障)
if fault_label == 1:
df_machine.loc[fault_start:, 'fault_occurred'] = 1
sensor_data.append(df_machine)
df_all = pd.concat(sensor_data, ignore_index=True)
# 为每台机器生成预测标签:如果未来24小时内发生故障,则当前时刻标签为1
df_all['fault_in_next_24h'] = df_all.groupby('machine_id')['fault_occurred'].shift(-24).fillna(0).astype(int)
# 注意:在真实场景中,需要确保在故障发生后的一段时间内(如维修期)的数据不被用作正样本或负样本,这里简化处理。
print(f"故障预警正样本比例: {df_all['fault_in_next_24h'].mean():.4%}")
```
**特征工程:滑动窗口特征提取**:
```python
def extract_window_features(df_group, window_size=6):
"""为单个设备的数据提取滑动窗口特征"""
features_list = []
sensor_cols = ['vibration', 'temperature', 'current']
for i in range(len(df_group) - window_size + 1):
window = df_group.iloc[i:i+window_size]
features = {}
features['machine_id'] = df_group['machine_id'].iloc[0]
features['timestamp'] = df_group['timestamp'].iloc[i + window_size - 1] # 窗口结束时间
for col in sensor_cols:
series = window[col].values
# 时域特征
features[f'{col}_mean'] = np.mean(series)
features[f'{col}_std'] = np.std(series)
features[f'{col}_max'] = np.max(series)
features[f'{col}_min'] = np.min(series)
features[f'{col}_range'] = features[f'{col}_max'] - features[f'{col}_min']
features[f'{col}_skew'] = pd.Series(series).skew()
features[f'{col}_kurt'] = pd.Series(series).kurtosis()
# 简单频域特征:通过FFT获取主频幅度(简化)
fft_vals = np.fft.fft(series)
magnitudes = np.abs(fft_vals)
features[f'{col}_fft_max'] = np.max(magnitudes[1:len(magnitudes)//2]) # 忽略直流分量,取一半
# 标签是窗口结束时刻的“未来24小时故障”标签
features['label'] = df_group['fault_in_next_24h'].iloc[i + window_size - 1]
features_list.append(features)
return pd.DataFrame(features_list)
# 应用特征提取(这里只对示例机器进行以节省时间,实际应对所有机器)
sample_machine_ids = [5, 10, 15] # 包含故障机器和正常机器
windowed_features_list = []
for mid in sample_machine_ids:
df_machine = df_all[df_all['machine_id'] == mid].reset_index(drop=True)
features_df = extract_window_features(df_machine, window_size=12) # 12小时窗口
windowed_features_list.append(features_df)
features_df_all = pd.concat(windowed_features_list, ignore_index=True)
print(f"特征数据集形状: {features_df_all.shape}")
print(f"正样本比例: {features_df_all['label'].mean():.4%}")
```
**训练故障预测模型**:
```python
# 准备训练数据
X_pm = features_df_all.drop(columns=['machine_id', 'timestamp', 'label'])
y_pm = features_df_all['label']
# 由于数据高度不平衡,我们使用scale_pos_weight参数
pos_weight = (len(y_pm) - sum(y_pm)) / sum(y_pm)
X_train_pm, X_test_pm, y_train_pm, y_test_pm = train_test_split(X_pm, y_pm, test_size=0.3, stratify=y_pm, random_state=42)
pm_model = xgb.XGBClassifier(
objective='binary:logistic',
eval_metric='aucpr', # 对于不平衡数据,AUCPR(精确率-召回率曲线下面积)有时比AUC更合适
use_label_encoder=False,
scale_pos_weight=pos_weight,
n_estimators=200,
max_depth=5,
learning_rate=0.05,
subsample=0.7,
colsample_bytree=0.7,
reg_alpha=0.5,
reg_lambda=5,
random_state=42
)
pm_model.fit(X_train_pm, y_train_pm, eval_set=[(X_test_pm, y_test_pm)], verbose=50)
# 评估 - 重点关注召回率(尽可能捕捉故障)和精确率(减少误报)
y_pred_proba_pm = pm_model.predict_proba(X_test_pm)[:, 1]
# 寻找最佳阈值(例如,最大化F2-score,更重视召回率)
from sklearn.metrics import precision_recall_curve, fbeta_score
precisions, recalls, thresholds = precision_recall_curve(y_test_pm, y_pred_proba_pm)
f2_scores = (5 * precisions * recalls) / (4 * precisions + recalls + 1e-10) # F_beta, beta=2
optimal_idx = np.argmax(f2_scores[:-1]) # 最后一个元素是1.0
optimal_threshold = thresholds[optimal_idx]
y_pred_opt = (y_pred_proba_pm >= optimal_threshold).astype(int)
print("设备故障预测模型评估 (优化阈值后):")
print(f"最优阈值: {optimal_threshold:.3f}")
print(classification_report(y_test_pm, y_pred_opt, target_names=['Normal', 'Imminent Failure']))
print(f"AUCPR: {average_precision_score(y_test_pm, y_pred_proba_pm):.4f}")
```
**部署与行动**:模型输出的是未来一段时间内的故障概率。运维团队可以设置概率阈值(如0.7),当概率超过阈值时触发预警工单,安排预防性检查。通过持续收集新的传感器数据和维护结果,模型可以不断迭代优化。