# 从数据到洞察:Python实战解析XJTU电池数据集全流程
如果你刚接触电池数据科学,面对XJTU这样结构复杂、信息量庞大的公开数据集,可能会感到无从下手。数据就在那里,但如何把它变成清晰的图表和有价值的结论?这正是我们今天要解决的问题。这篇文章不是一篇简单的代码罗列,而是一份手把手的实战指南。我会带你从零开始,用Python一步步拆解XJTU数据集,完成从数据加载、清洗、特征提取到高级可视化的完整流程。无论你是电池领域的研究者,还是想学习如何处理时序数据的工程师,都能在这里找到可复用的方法和避开常见陷阱的技巧。我们不仅会重现经典分析,更会探索一些你可能没试过的数据洞察角度。
## 1. 理解XJTU数据集:结构与挑战
XJTU电池数据集是电池健康度预测领域一个广受认可的基准数据集。它包含了55个18650型锂离子电池在多种充放电策略下的老化实验数据,采样频率为1Hz,记录了电压、电流、温度、容量等关键参数。数据集按实验策略分为6个批次(Batch),每个批次对应不同的应力条件,例如不同的充电倍率、放电截止电压或随机放电模式。
初次接触这个数据集,你可能会遇到几个典型的挑战:
- **数据结构嵌套复杂**:原始数据是`.mat`格式的MATLAB文件,内部包含多层嵌套的数组和结构体,直接查看不易理解。
- **数据不完整与噪声**:并非所有循环都进行了完整的充放电,有些循环仅用于“测试容量”,导致数据点缺失。传感器噪声和实验误差也需要处理。
- **多阶段信号分离**:每个充放电循环包含充电、静置、放电等多个阶段,分析时需要准确地将这些阶段的数据剥离出来。
- **跨批次分析一致性**:不同批次的实验协议不同,在进行对比分析或构建统一模型时,需要做归一化或对齐处理。
为了高效应对这些挑战,我们不能简单地逐行读取数据,而是需要构建一个**面向对象的、功能完整的数据处理管道**。下面这个表格概括了数据集六个批次的核心差异,这决定了我们后续处理策略的选择:
| 批次 | 充电策略 | 放电策略 | 关键特点 | 数据完整性挑战 |
| :--- | :--- | :--- | :--- | :--- |
| Batch-1 | 固定2C恒流恒压充电 | 固定1C放电至2.5V | 基准条件,数据规整 | 低 |
| Batch-2 | 固定3C恒流恒压充电 | 固定1C放电至2.5V | 高倍率充电应力 | 低 |
| Batch-3 | 固定2C恒流恒压充电 | 变倍率(0.5C-5C)放电至2.5V | 放电应力变化 | 容量曲线波动大 |
| Batch-4 | 固定2C恒流恒压充电 | 变倍率放电至3.0V | 不完全放电 | 需插值获取真实容量 |
| Batch-5 | 变倍率随机充电/放电 | 随机放电时长与电流 | 模拟随机负载 | 数据高度非结构化 |
| Batch-6 | 固定2C恒流恒压充电 | 变时长放电(模拟卫星轨道) | 周期性负载 | 阶段划分与对齐复杂 |
> 提示:在处理Batch-4、5、6时,要特别注意`description`字段。其中包含“test capacity”标识的循环是用于测量基准容量的,其他循环的放电可能未完成,直接使用记录的“容量”值会导致错误。
## 2. 构建稳健的数据加载与清洗类
直接使用原始代码中的`Battery`类是一个不错的起点,但在实际项目中,我们需要更健壮、更易扩展的代码。我重构了一个增强版的`XJTU_Battery_Analyzer`类,它集成了错误处理、数据缓存和更灵活的数据查询方法。
首先,我们设置环境并定义核心类。我强烈建议使用`pathlib`来处理路径,它比传统的字符串拼接更安全、更直观。
```python
import numpy as np
import pandas as pd
from scipy.io import loadmat
from scipy import interpolate
from pathlib import Path
import matplotlib.pyplot as plt
from dataclasses import dataclass
from typing import Optional, Dict, List, Tuple
import warnings
warnings.filterwarnings('ignore')
plt.rcParams['font.sans-serif'] = ['SimHei'] # 用来正常显示中文标签
plt.rcParams['axes.unicode_minus'] = False # 用来正常显示负号
```
接下来是核心的数据加载器。我增加了类型提示和详细的文档字符串,这能让代码在几个月后依然易于理解和维护。
```python
@dataclass
class CycleData:
"""一个数据类,用于整洁地存储单个循环的数据"""
cycle_id: int
voltage: np.ndarray
current: np.ndarray
temperature: np.ndarray
capacity: float
description: str
timestamps: np.ndarray # 相对时间,单位分钟
class XJTU_Battery_Analyzer:
"""
增强版XJTU电池数据分析器。
支持数据懒加载、缓存、自动阶段分割和常见特征计算。
"""
def __init__(self, data_path: Path, batch_id: str):
self.data_path = Path(data_path)
self.batch_id = batch_id
self._raw_data = None
self._cycle_cache = {} # 缓存解析后的循环数据,提升性能
self._load_and_parse()
def _load_and_parse(self) -> None:
"""加载.mat文件并解析关键元数据"""
try:
mat_data = loadmat(self.data_path)
except FileNotFoundError:
raise FileNotFoundError(f"无法在路径 {self.data_path} 找到数据文件")
self._raw_data = mat_data['data']
self.summary = mat_data['summary'][0][0]
# 解析元数据
self.cycle_life = int(self.summary[8][0][0])
self.battery_description = self.summary[9][0]
self.nominal_capacity = 2.0 # Ah,根据数据集文档
# 变量名映射
self.var_map = {
0: 'system_time',
1: 'relative_time_min',
2: 'voltage_V',
3: 'current_A',
4: 'capacity_Ah',
5: 'power_Wh',
6: 'temperature_C',
7: 'description'
}
print(f"[INFO] 成功加载电池数据: {self.data_path.name}")
print(f" 循环寿命: {self.cycle_life}, 描述: {self.battery_description}")
def get_cycle(self, cycle_num: int, use_cache: bool = True) -> CycleData:
"""
获取指定循环编号的完整数据。
参数:
cycle_num: 循环编号(从1开始)
use_cache: 是否使用缓存以加速重复访问
返回:
CycleData对象
"""
if not 1 <= cycle_num <= self.cycle_life:
raise ValueError(f"循环编号需在1到{self.cycle_life}之间")
cache_key = cycle_num
if use_cache and cache_key in self._cycle_cache:
return self._cycle_cache[cache_key]
# 原始数据索引调整(MATLAB索引从1开始)
cycle_idx = cycle_num - 1
cycle_raw = self._raw_data[0][cycle_idx]
# 提取各通道数据
data_dict = {}
for var_idx, var_name in self.var_map.items():
raw_array = cycle_raw[var_idx]
# 描述字段是字符串,其他是数值数组
if var_name == 'description':
data_dict[var_name] = raw_array[0] if raw_array.size > 0 else ''
else:
data_dict[var_name] = raw_array.flatten()
# 计算该循环的放电容量(Ah)
# 注意:对于未完全放电的循环,此值为负的充电容量或无效值
capacity_ah = data_dict['capacity_Ah'][-1] if len(data_dict['capacity_Ah']) > 0 else 0.0
cycle_data = CycleData(
cycle_id=cycle_num,
voltage=data_dict['voltage_V'],
current=data_dict['current_A'],
temperature=data_dict['temperature_C'],
capacity=capacity_ah,
description=data_dict['description'],
timestamps=data_dict['relative_time_min']
)
if use_cache:
self._cycle_cache[cache_key] = cycle_data
return cycle_data
```
这个类的基础功能已经比原始版本更清晰。但真正的挑战在于数据清洗。XJTU数据集存在一些常见的数据质量问题,我们需要系统性地处理它们。
**常见数据问题与清洗策略:**
1. **时间戳重置点识别**:在每个循环中,`relative_time_min`会在阶段转换时重置为0。我们需要准确找到这些重置点来分割充电、静置、放电阶段。
2. **传感器异常值处理**:偶尔会出现电流或电压的瞬时尖峰,这可能是测量噪声。
3. **不完全放电循环的容量计算**:对于Batch-4,5,6,很多循环没有放电至截止电压,记录的“容量”值没有意义。我们需要利用“测试容量”循环进行插值。
下面是一个集成到类中的高级清洗与特征提取方法:
```python
def segment_cycle_phases(self, cycle_num: int) -> Dict[str, Dict]:
"""
将一个完整的循环分割为充电(CC/CV)、静置、放电三个阶段。
返回每个阶段的起始和结束索引,以及该阶段的数据切片。
"""
cycle = self.get_cycle(cycle_num)
timestamps = cycle.timestamps
# 找到时间戳重置为0的点,这些是阶段边界
# 注意:第一个点总是0,所以从第二个点开始找
zero_indices = np.where(timestamps[1:] == 0)[0] + 1
# 在开头和结尾补上索引
phase_boundaries = [0] + zero_indices.tolist() + [len(timestamps)]
phases = {}
phase_names = ['charge', 'rest_1', 'discharge', 'rest_2']
# Batch-6只有三个阶段
if 'Simulate satellite' in self.battery_description and len(phase_boundaries) == 4:
phase_names = ['charge', 'rest', 'discharge']
for i, (start, end) in enumerate(zip(phase_boundaries[:-1], phase_boundaries[1:])):
if i < len(phase_names):
phase_key = phase_names[i]
phases[phase_key] = {
'start_idx': start,
'end_idx': end,
'voltage': cycle.voltage[start:end],
'current': cycle.current[start:end],
'temperature': cycle.temperature[start:end],
'time': timestamps[start:end]
}
return phases
```
> 注意:阶段分割的准确性直接影响后续特征提取(如增量容量分析)的结果。务必通过可视化验证分割点是否正确,特别是在数据噪声较大的情况下。
## 3. 深度特征工程:超越基础曲线
仅仅画出电压、电流曲线是不够的。为了从数据中挖掘出电池健康状态的深层信息,我们需要计算一系列“衍生特征”。这些特征往往比原始信号更能揭示电池的老化机理。
### 3.1 健康状态(SOH)与容量衰减轨迹
电池的健康状态通常用容量衰减来定义。对于完全放电的循环(如Batch-1,2,3),我们可以直接使用放电容量。但对于不完全放电的批次,需要进行插值处理。
```python
def calculate_soh_trajectory(self, method: str = 'interpolate') -> Tuple[np.ndarray, np.ndarray]:
"""
计算电池从初始到终结的健康状态(SOH)轨迹。
SOH = 当前容量 / 额定容量 * 100%
参数:
method: 'raw' - 直接使用记录容量(仅适用于完全放电批次)
'interpolate' - 利用测试容量循环插值(推荐)
返回:
cycles: 循环编号数组
soh: 对应的SOH值数组(百分比)
"""
rated_capacity = self.nominal_capacity # 2.0 Ah
if method == 'raw':
# 简单方法:直接提取每个循环的容量
capacities = []
for cycle in range(1, self.cycle_life + 1):
cycle_data = self.get_cycle(cycle)
capacities.append(cycle_data.capacity)
capacities = np.array(capacities)
elif method == 'interpolate':
# 稳健方法:仅使用明确标记为容量测试的循环
test_cycles = []
test_capacities = []
for cycle in range(1, self.cycle_life + 1):
desc = self.get_cycle(cycle).description
if 'test capacity' in desc.lower():
test_cycles.append(cycle)
test_capacities.append(self.get_cycle(cycle).capacity)
if len(test_cycles) < 2:
warnings.warn(f"电池{self.batch_id}的测试容量循环少于2个,使用原始容量数据。")
return self.calculate_soh_trajectory(method='raw')
# 使用三次样条插值(数据点少时退化为线性)
try:
interp_func = interpolate.interp1d(
test_cycles, test_capacities,
kind='cubic',
fill_value='extrapolate'
)
except:
interp_func = interpolate.interp1d(
test_cycles, test_capacities,
kind='linear',
fill_value='extrapolate'
)
all_cycles = np.arange(1, self.cycle_life + 1)
capacities = interp_func(all_cycles)
else:
raise ValueError("method参数必须是'raw'或'interpolate'")
# 计算SOH
soh = capacities / rated_capacity * 100
cycles = np.arange(1, self.cycle_life + 1)
return cycles, soh
```
### 3.2 增量容量分析(ICA)与差分电压分析(DVA)
增量容量分析是研究锂离子电池老化机理的**黄金标准工具**。它通过分析容量对电压的微分(dQ/dV),可以识别电池内部副反应的发生,如固体电解质界面(SEI)膜的生长、锂沉积等。
```python
def incremental_capacity_analysis(self, cycle_num: int,
voltage_range: Tuple[float, float] = (3.5, 4.2),
voltage_step: float = 0.001) -> Tuple[np.ndarray, np.ndarray]:
"""
执行增量容量分析(ICA)。
参数:
cycle_num: 循环编号
voltage_range: 分析的电压窗口
voltage_step: 重采样的电压间隔(V)
返回:
voltages: 电压数组
ic_curve: 增量容量(dQ/dV)数组
"""
# 获取恒流充电阶段数据
phases = self.segment_cycle_phases(cycle_num)
charge_phase = phases.get('charge', {})
if not charge_phase:
raise ValueError(f"循环{cycle_num}未找到充电阶段数据")
v_charge = charge_phase['voltage']
i_charge = charge_phase['current']
t_charge = charge_phase['time'] # 分钟
# 转换为小时用于容量计算
t_hours = t_charge / 60.0
# 计算累积充电容量 Q = ∫ I dt
# 使用梯形数值积分
delta_t_hours = np.diff(t_hours)
avg_current = (i_charge[:-1] + i_charge[1:]) / 2.0
delta_q = avg_current * delta_t_hours # Ah
q_cumulative = np.concatenate(([0], np.cumsum(delta_q)))
# 筛选指定电压范围内的数据点
mask = (v_charge >= voltage_range[0]) & (v_charge <= voltage_range[1])
v_filtered = v_charge[mask]
q_filtered = q_cumulative[mask]
if len(v_filtered) < 10:
raise ValueError(f"电压范围{voltage_range}内数据点不足,无法进行ICA")
# 在均匀电压网格上重采样容量数据
voltage_grid = np.arange(voltage_range[0], voltage_range[1] + voltage_step, voltage_step)
# 使用线性插值,避免过拟合
q_interp_func = interpolate.interp1d(v_filtered, q_filtered, kind='linear',
bounds_error=False, fill_value='extrapolate')
q_grid = q_interp_func(voltage_grid)
# 计算微分 dQ/dV
dq = np.diff(q_grid)
dv = np.diff(voltage_grid)
ic_curve = dq / dv
# 返回电压中点对应的IC值
voltage_midpoints = (voltage_grid[:-1] + voltage_grid[1:]) / 2.0
return voltage_midpoints, ic_curve
```
在实际分析中,ICA曲线上的峰值位置和高度变化能告诉我们很多信息。例如,主峰的左移通常表示电池内阻增加,而峰高的降低则与活性锂的损失相关。为了系统化地提取这些特征,我们可以设计一个特征提取函数:
```python
def extract_ica_features(self, cycle_num: int) -> Dict[str, float]:
"""
从ICA曲线中提取关键特征点。
返回:
包含以下特征的字典:
- peak_voltage: 主峰电压位置(V)
- peak_height: 主峰高度(Ah/V)
- peak_area: 主峰面积(近似积分)
- curve_center: 曲线重心电压(V)
"""
voltages, ic = self.incremental_capacity_analysis(cycle_num)
if len(ic) == 0:
return {}
# 找到主峰(最高峰)
peak_idx = np.argmax(ic)
peak_voltage = voltages[peak_idx]
peak_height = ic[peak_idx]
# 计算主峰面积(在峰值±0.1V范围内积分)
window = 0.1
mask = (voltages >= peak_voltage - window) & (voltages <= peak_voltage + window)
peak_area = np.trapz(ic[mask], voltages[mask])
# 计算曲线重心(加权平均电压)
if np.sum(ic) > 0:
curve_center = np.average(voltages, weights=np.abs(ic))
else:
curve_center = np.mean(voltages)
return {
'peak_voltage': float(peak_voltage),
'peak_height': float(peak_height),
'peak_area': float(peak_area),
'curve_center': float(curve_center)
}
```
### 3.3 温度动力学特征
电池温度不仅是安全指标,也反映了内部反应的热效应。我们可以从温度曲线中提取多个特征:
```python
def extract_temperature_features(self, cycle_num: int) -> Dict[str, float]:
"""
提取温度相关特征。
返回:
包含以下特征的字典:
- max_temp: 最高温度(°C)
- temp_rise: 温升(最高-最低)
- avg_charge_temp: 充电阶段平均温度
- temp_integral: 温度-时间积分,反映总产热
"""
cycle = self.get_cycle(cycle_num)
phases = self.segment_cycle_phases(cycle_num)
features = {}
# 整体温度特征
features['max_temp'] = float(np.max(cycle.temperature))
features['min_temp'] = float(np.min(cycle.temperature))
features['temp_rise'] = features['max_temp'] - features['min_temp']
features['avg_temp'] = float(np.mean(cycle.temperature))
# 分阶段温度分析
for phase_name, phase_data in phases.items():
if phase_data:
phase_temp = phase_data['temperature']
phase_time = phase_data['time']
if len(phase_temp) > 0:
key_prefix = f"{phase_name}_"
features[f"{key_prefix}avg_temp"] = float(np.mean(phase_temp))
features[f"{key_prefix}max_temp"] = float(np.max(phase_temp))
# 温度-时间积分(近似产热)
if len(phase_time) > 1:
time_min = phase_time[-1] - phase_time[0]
features[f"{key_prefix}temp_integral"] = float(np.trapz(phase_temp, phase_time))
return features
```
## 4. 高级可视化:从静态图表到动态洞察
有了清洗后的数据和提取的特征,可视化是呈现洞察的关键。我们将超越简单的折线图,创建一系列信息丰富、可用于出版或报告的专业图表。
### 4.1 多电池对比分析仪表板
当需要同时分析多个电池或不同批次时,一个综合仪表板比多个独立图表更有价值。下面的代码创建一个包含四个关键视图的仪表板:
```python
def create_battery_dashboard(analyzer_list, battery_names, selected_cycle=100):
"""
创建多电池对比分析仪表板。
参数:
analyzer_list: XJTU_Battery_Analyzer对象列表
battery_names: 对应的电池名称列表
selected_cycle: 用于详细分析的特定循环
"""
fig = plt.figure(figsize=(16, 12))
# 1. 容量衰减曲线对比
ax1 = plt.subplot(2, 2, 1)
colors = plt.cm.tab10(np.linspace(0, 1, len(analyzer_list)))
for idx, (analyzer, name, color) in enumerate(zip(analyzer_list, battery_names, colors)):
cycles, soh = analyzer.calculate_soh_trajectory(method='interpolate')
ax1.plot(cycles, soh, label=name, color=color, linewidth=1.5, alpha=0.8)
ax1.set_xlabel('循环次数', fontsize=11)
ax1.set_ylabel('健康状态 SOH (%)', fontsize=11)
ax1.set_title('容量衰减轨迹对比', fontsize=13, fontweight='bold')
ax1.grid(True, alpha=0.3)
ax1.legend(loc='best', fontsize=9)
# 2. 特定循环的电压-电流曲线
ax2 = plt.subplot(2, 2, 2)
for idx, (analyzer, name, color) in enumerate(zip(analyzer_list, battery_names, colors)):
try:
cycle_data = analyzer.get_cycle(selected_cycle)
# 只绘制前5000个点以保证清晰度
sample_points = min(5000, len(cycle_data.voltage))
ax2.plot(cycle_data.voltage[:sample_points],
cycle_data.current[:sample_points],
label=name, color=color, alpha=0.7, linewidth=1)
except:
continue
ax2.set_xlabel('电压 (V)', fontsize=11)
ax2.set_ylabel('电流 (A)', fontsize=11)
ax2.set_title(f'第{selected_cycle}次循环的电压-电流曲线', fontsize=13, fontweight='bold')
ax2.grid(True, alpha=0.3)
ax2.legend(loc='best', fontsize=9)
# 3. ICA曲线对比(早期、中期、晚期循环)
ax3 = plt.subplot(2, 2, 3)
cycle_markers = [1, selected_cycle//2, selected_cycle]
marker_styles = ['o', 's', '^']
for idx, analyzer in enumerate(analyzer_list[:3]): # 只显示前3个电池
for cycle, marker in zip(cycle_markers, marker_styles):
try:
voltages, ic = analyzer.incremental_capacity_analysis(cycle)
if len(ic) > 10:
ax3.plot(voltages, ic, marker=marker, markersize=4,
label=f'{battery_names[idx]} 循环{cycle}',
linewidth=1, alpha=0.7)
except:
continue
ax3.set_xlabel('电压 (V)', fontsize=11)
ax3.set_ylabel('增量容量 dQ/dV (Ah/V)', fontsize=11)
ax3.set_title('增量容量分析(ICA)曲线演变', fontsize=13, fontweight='bold')
ax3.grid(True, alpha=0.3)
ax3.legend(loc='best', fontsize=8, ncol=2)
# 4. 温度特征散点图
ax4 = plt.subplot(2, 2, 4)
for idx, (analyzer, name, color) in enumerate(zip(analyzer_list, battery_names, colors)):
try:
# 采样一些循环的温度特征
sample_cycles = np.linspace(1, analyzer.cycle_life, 20, dtype=int)
max_temps = []
temp_rises = []
for cycle in sample_cycles:
features = analyzer.extract_temperature_features(cycle)
if features:
max_temps.append(features.get('max_temp', np.nan))
temp_rises.append(features.get('temp_rise', np.nan))
ax4.scatter(max_temps, temp_rises, label=name, color=color,
alpha=0.6, s=50, edgecolors='w', linewidth=0.5)
except:
continue
ax4.set_xlabel('最高温度 (°C)', fontsize=11)
ax4.set_ylabel('温升 (°C)', fontsize=11)
ax4.set_title('温度特征相关性分析', fontsize=13, fontweight='bold')
ax4.grid(True, alpha=0.3)
ax4.legend(loc='best', fontsize=9)
plt.tight_layout()
return fig
```
### 4.2 老化轨迹的3D可视化
电池老化是一个多维过程,仅用2D图表难以全面展示。我们可以将循环次数、电压和增量容量三个维度结合起来,创建3D曲面图来直观显示老化过程。
```python
def plot_3d_ica_evolution(analyzer, cycle_step=10, voltage_range=(3.5, 4.2)):
"""
创建ICA曲线随循环次数演变的3D曲面图。
参数:
analyzer: XJTU_Battery_Analyzer实例
cycle_step: 采样的循环间隔
voltage_range: 电压分析范围
"""
from mpl_toolkits.mplot3d import Axes3D
# 准备数据
cycles_to_plot = range(1, analyzer.cycle_life + 1, cycle_step)
all_voltages = []
all_cycles = []
all_ic_values = []
for cycle in cycles_to_plot:
try:
voltages, ic = analyzer.incremental_capacity_analysis(cycle, voltage_range=voltage_range)
if len(ic) > 5: # 确保有足够的数据点
all_voltages.extend(voltages)
all_cycles.extend([cycle] * len(voltages))
all_ic_values.extend(ic)
except Exception as e:
continue
if not all_voltages:
print("没有足够的数据创建3D图")
return None
# 转换为数组并重塑为网格
voltages_array = np.array(all_voltages)
cycles_array = np.array(all_cycles)
ic_array = np.array(all_ic_values)
# 创建唯一值网格
unique_v = np.unique(voltages_array)
unique_c = np.unique(cycles_array)
# 创建网格
V, C = np.meshgrid(unique_v, unique_c)
# 使用网格插值填充IC值
from scipy.interpolate import griddata
IC_grid = griddata((voltages_array, cycles_array), ic_array, (V, C), method='cubic')
# 创建3D图
fig = plt.figure(figsize=(12, 8))
ax = fig.add_subplot(111, projection='3d')
# 绘制曲面
surf = ax.plot_surface(V, C, IC_grid, cmap='viridis',
alpha=0.8, linewidth=0, antialiased=True)
ax.set_xlabel('电压 (V)', labelpad=10)
ax.set_ylabel('循环次数', labelpad=10)
ax.set_zlabel('增量容量 dQ/dV', labelpad=10)
ax.set_title(f'ICA曲面演化 - {analyzer.batch_id}', fontsize=14, fontweight='bold')
# 添加颜色条
fig.colorbar(surf, ax=ax, shrink=0.5, aspect=5, label='dQ/dV (Ah/V)')
# 调整视角
ax.view_init(elev=25, azim=-45)
return fig
```
### 4.3 交互式可视化与动态探索
对于深度分析,静态图表有时不够用。我们可以使用`plotly`库创建交互式图表,让读者能够缩放、平移和悬停查看数据点详情。
```python
def create_interactive_degradation_plot(analyzer):
"""
使用plotly创建交互式容量衰减曲线。
需要安装plotly: pip install plotly
"""
try:
import plotly.graph_objects as go
from plotly.subplots import make_subplots
except ImportError:
print("请先安装plotly: pip install plotly")
return None
# 计算SOH轨迹
cycles, soh = analyzer.calculate_soh_trajectory(method='interpolate')
# 创建主图
fig = make_subplots(
rows=2, cols=2,
subplot_titles=('容量衰减轨迹', '循环内电压曲线', '温度分布', 'ICA特征演化'),
specs=[[{"type": "scatter"}, {"type": "scatter"}],
[{"type": "box"}, {"type": "scatter"}]]
)
# 1. 容量衰减曲线
fig.add_trace(
go.Scatter(x=cycles, y=soh, mode='lines+markers',
name='SOH', line=dict(color='royalblue', width=2),
hovertemplate='循环: %{x}<br>SOH: %{y:.2f}%<extra></extra>'),
row=1, col=1
)
# 添加趋势线(多项式拟合)
if len(cycles) > 10:
z = np.polyfit(cycles, soh, 3)
p = np.poly1d(z)
trendline = p(cycles)
fig.add_trace(
go.Scatter(x=cycles, y=trendline, mode='lines',
name='趋势线', line=dict(color='firebrick', width=2, dash='dash')),
row=1, col=1
)
# 2. 采样几个循环的电压曲线
sample_cycles = np.linspace(1, min(analyzer.cycle_life, 500), 5, dtype=int)
colors = px.colors.sequential.Viridis
for i, cycle in enumerate(sample_cycles):
try:
cycle_data = analyzer.get_cycle(cycle)
# 下采样以提升性能
step = max(1, len(cycle_data.voltage) // 1000)
fig.add_trace(
go.Scatter(x=np.arange(len(cycle_data.voltage[::step])),
y=cycle_data.voltage[::step],
mode='lines', name=f'循环{cycle}',
line=dict(color=colors[i], width=1),
hovertemplate='时间点: %{x}<br>电压: %{y:.3f}V<extra></extra>',
showlegend=True),
row=1, col=2
)
except:
continue
# 3. 温度分布箱线图
# 采样多个循环的温度数据
temp_samples = []
sample_indices = np.linspace(1, analyzer.cycle_life, 20, dtype=int)
for cycle in sample_indices:
try:
cycle_data = analyzer.get_cycle(cycle)
if len(cycle_data.temperature) > 0:
# 取中间部分避免边缘效应
mid_idx = len(cycle_data.temperature) // 2
sample = cycle_data.temperature[mid_idx-100:mid_idx+100]
temp_samples.append(sample)
except:
continue
if temp_samples:
fig.add_trace(
go.Box(y=np.concatenate(temp_samples), name='温度分布',
boxpoints='outliers', jitter=0.3, pointpos=-1.8,
marker_color='lightseagreen'),
row=2, col=1
)
# 4. ICA峰值电压随循环的变化
peak_voltages = []
peak_cycles = []
for cycle in range(1, analyzer.cycle_life + 1, max(1, analyzer.cycle_life//50)):
try:
features = analyzer.extract_ica_features(cycle)
if features and 'peak_voltage' in features:
peak_voltages.append(features['peak_voltage'])
peak_cycles.append(cycle)
except:
continue
if peak_cycles:
fig.add_trace(
go.Scatter(x=peak_cycles, y=peak_voltages, mode='lines+markers',
name='ICA峰值电压', line=dict(color='darkorange', width=2),
hovertemplate='循环: %{x}<br>峰值电压: %{y:.3f}V<extra></extra>'),
row=2, col=2
)
# 更新布局
fig.update_layout(
height=800,
title_text=f"电池{analyzer.batch_id}综合分析仪表板",
showlegend=True,
hovermode='closest'
)
# 更新坐标轴标签
fig.update_xaxes(title_text="循环次数", row=1, col=1)
fig.update_yaxes(title_text="SOH (%)", row=1, col=1)
fig.update_xaxes(title_text="时间点", row=1, col=2)
fig.update_yaxes(title_text="电压 (V)", row=1, col=2)
fig.update_yaxes(title_text="温度 (°C)", row=2, col=1)
fig.update_xaxes(title_text="循环次数", row=2, col=2)
fig.update_yaxes(title_text="ICA峰值电压 (V)", row=2, col=2)
return fig
```
## 5. 实战案例:Batch-3随机放电模式分析
让我们以Batch-3数据集为例,展示如何应用上述方法进行深入分析。Batch-3的特点是放电电流在{0.5, 1, 2, 3, 5}C之间循环变化,这为我们研究放电倍率对电池老化的影响提供了绝佳案例。
首先,加载数据并初始化分析器:
```python
# 实战示例:分析Batch-3中的随机放电模式
batch3_path = Path('./data/Batch-3/R2.5_battery-1.mat') # 请替换为实际路径
analyzer_b3 = XJTU_Battery_Analyzer(batch3_path, batch_id='Batch-3-R2.5')
# 查看基本信息
print(f"电池描述: {analyzer_b3.battery_description}")
print(f"总循环次数: {analyzer_b3.cycle_life}")
# 提取放电倍率序列
discharge_rates = []
for cycle in range(1, min(100, analyzer_b3.cycle_life) + 1): # 只看前100个循环
desc = analyzer_b3.get_cycle(cycle).description
# 从描述中解析放电倍率
if 'discharge' in desc.lower():
# 简单解析逻辑,实际可能需要更复杂的正则表达式
if '0.5C' in desc:
discharge_rates.append(0.5)
elif '1C' in desc:
discharge_rates.append(1.0)
elif '2C' in desc:
discharge_rates.append(2.0)
elif '3C' in desc:
discharge_rates.append(3.0)
elif '5C' in desc:
discharge_rates.append(5.0)
else:
discharge_rates.append(np.nan)
else:
discharge_rates.append(np.nan)
print(f"前100次循环的放电倍率序列: {discharge_rates[:20]}...") # 显示前20个
```
接下来,我们分析不同放电倍率下的电池行为差异。我将创建一个综合对比图,展示五个不同放电倍率循环的关键参数:
```python
def analyze_discharge_rate_impact(analyzer, cycle_numbers, rate_labels):
"""
分析不同放电倍率对电池行为的影响。
参数:
analyzer: 电池分析器实例
cycle_numbers: 要分析的循环编号列表
rate_labels: 对应的放电倍率标签列表
"""
fig, axes = plt.subplots(2, 3, figsize=(15, 10))
axes = axes.flatten()
# 为每个倍率分配颜色
colors = plt.cm.Set3(np.linspace(0, 1, len(cycle_numbers)))
# 1. 电压曲线对比
ax = axes[0]
for cycle, label, color in zip(cycle_numbers, rate_labels, colors):
try:
phases = analyzer.segment_cycle_phases(cycle)
if 'discharge' in phases:
v_discharge = phases['discharge']['voltage']
time_norm = np.linspace(0, 1, len(v_discharge)) # 归一化时间
ax.plot(time_norm, v_discharge, label=f'{label}',
color=color, linewidth=2, alpha=0.8)
except:
continue
ax.set_xlabel('归一化时间', fontsize=11)
ax.set_ylabel('放电电压 (V)', fontsize=11)
ax.set_title('不同放电倍率下的电压曲线', fontsize=12, fontweight='bold')
ax.grid(True, alpha=0.3)
ax.legend(title='放电倍率', fontsize=9)
# 2. 温度上升对比
ax = axes[1]
max_temps = []
temp_rises = []
for cycle, label in zip(cycle_numbers, rate_labels):
try:
features = analyzer.extract_temperature_features(cycle)
if features:
max_temps.append(features.get('discharge_max_temp', np.nan))
temp_rises.append(features.get('discharge_temp_rise', np.nan))
else:
max_temps.append(np.nan)
temp_rises.append(np.nan)
except:
max_temps.append(np.nan)
temp_rises.append(np.nan)
x_pos = np.arange(len(rate_labels))
width = 0.35
ax.bar(x_pos - width/2, max_temps, width, label='最高温度',
color='lightcoral', alpha=0.8)
ax.bar(x_pos + width/2, temp_rises, width, label='温升',
color='steelblue', alpha=0.8)
ax.set_xlabel('放电倍率', fontsize=11)
ax.set_ylabel('温度 (°C)', fontsize=11)
ax.set_title('放电过程中的温度特性', fontsize=12, fontweight='bold')
ax.set_xticks(x_pos)
ax.set_xticklabels(rate_labels)
ax.grid(True, alpha=0.3, axis='y')
ax.legend(fontsize=9)
# 3. 放电容量对比
ax = axes[2]
discharge_capacities = []
for cycle in cycle_numbers:
try:
cycle_data = analyzer.get_cycle(cycle)
# 计算放电容量(放电电流对时间积分)
phases = analyzer.segment_cycle_phases(cycle)
if 'discharge' in phases:
i_discharge = phases['discharge']['current']
t_discharge = phases['discharge']['time'] / 60.0 # 转换为小时
# 只考虑放电阶段(电流为负)
discharge_mask = i_discharge < 0
if np.any(discharge_mask):
i_discharge_neg = i_discharge[discharge_mask]
t_discharge_neg = t_discharge[discharge_mask]
# 梯形积分
if len(t_discharge_neg) > 1:
capacity = np.trapz(np.abs(i_discharge_neg), t_discharge_neg)
discharge_capacities.append(capacity)
else:
discharge_capacities.append(np.nan)
else:
discharge_capacities.append(np.nan)
else:
discharge_capacities.append(np.nan)
except:
discharge_capacities.append(np.nan)
ax.bar(rate_labels, discharge_capacities, color='mediumseagreen', alpha=0.7)
ax.set_xlabel('放电倍率', fontsize=11)
ax.set_ylabel('放电容量 (Ah)', fontsize=11)
ax.set_title('不同倍率下的放电容量', fontsize=12, fontweight='bold')
ax.grid(True, alpha=0.3, axis='y')
# 4. 内阻估算(ΔV/ΔI)
ax = axes[3]
estimated_resistance = []
for cycle, label in zip(cycle_numbers, rate_labels):
try:
phases = analyzer.segment_cycle_phases(cycle)
if 'discharge' in phases:
v_discharge = phases['discharge']['voltage']
i_discharge = phases['discharge']['current']
# 找到放电开始和稳定后的点
# 开始点:电流首次小于-0.1A
discharge_start = np.where(i_discharge < -0.1)[0]
if len(discharge_start) > 10:
start_idx = discharge_start[0]
# 稳定点:开始后1秒(假设1Hz采样)
stable_idx = min(start_idx + 10, len(v_discharge) - 1)
ΔV = v_discharge[start_idx] - v_discharge[stable_idx]
ΔI = abs(i_discharge[start_idx] - i_discharge[stable_idx])
if ΔI > 0.1: # 避免除零
resistance = ΔV / ΔI
estimated_resistance.append(resistance)
else:
estimated_resistance.append(np.nan)
else:
estimated_resistance.append(np.nan)
else:
estimated_resistance.append(np.nan)
except:
estimated_resistance.append(np.nan)
ax.plot(rate_labels, estimated_resistance, 'o-', linewidth=2,
markersize=8, color='darkviolet', alpha=0.8)
ax.set_xlabel('放电倍率', fontsize=11)
ax.set_ylabel('估算内阻 (Ω)', fontsize=11)
ax.set_title('放电倍率与内阻关系', fontsize=12, fontweight='bold')
ax.grid(True, alpha=0.3)
# 5. 能量效率计算
ax = axes[4]
energy_efficiencies = []
for cycle in cycle_numbers:
try:
phases = analyzer.segment_cycle_phases(cycle)
if 'charge' in phases and 'discharge' in phases:
# 充电能量
v_charge = phases['charge']['voltage']
i_charge = phases['charge']['current']
t_charge = phases['charge']['time'] / 60.0 # 小时
charge_power = v_charge * i_charge
charge_energy = np.trapz(charge_power, t_charge)
# 放电能量(只考虑负电流部分)
v_discharge = phases['discharge']['voltage']
i_discharge = phases['discharge']['current']
t_discharge = phases['discharge']['time'] / 60.0
discharge_mask = i_discharge < 0
if np.any(discharge_mask):
discharge_power = v_discharge[discharge_mask] * i_discharge[discharge_mask]
discharge_energy = np.trapz(np.abs(discharge_power), t_discharge[discharge_mask])
if charge_energy > 0:
efficiency = discharge_energy / charge_energy * 100
energy_efficiencies.append(efficiency)
else:
energy_efficiencies.append(np.nan)
else:
energy_efficiencies.append(np.nan)
else:
energy_efficiencies.append(np.nan)
except:
energy_efficiencies.append(np.nan)
ax.bar(rate_labels, energy_efficiencies, color='goldenrod', alpha=0.7)
ax.set_xlabel('放电倍率', fontsize=11)
ax.set_ylabel('能量效率 (%)', fontsize=11)
ax.set_title('不同倍率下的充放电能量效率', fontsize=12, fontweight='bold')
ax.grid(True, alpha=0.3, axis='y')
ax.set_ylim([80, 100]) # 典型锂离子电池效率范围
# 6. 电压弛豫分析(放电结束后的电压恢复)
ax = axes[5]
for cycle, label, color in zip(cycle_numbers, rate_labels, colors):
try:
phases = analyzer.segment_cycle_phases(cycle)
if 'discharge' in phases and 'rest_2' in phases:
# 放电最后10秒的电压
v_discharge_end = phases['discharge']['voltage'][-10:]
v_discharge_mean = np.mean(v_discharge_end) if len(v_discharge_end) > 0 else np.nan
# 静置开始10秒的电压
v_rest_start = phases['rest_2']['voltage'][:10]
v_rest_mean = np.mean(v_rest_start) if len(v_rest_start) > 0 else np.nan
if not (np.isnan(v_discharge_mean) or np.isnan(v_rest_mean)):
voltage_recovery = v_rest_mean - v_discharge_mean
ax.bar(label, voltage_recovery, color=color, alpha=0.7)
except:
continue
ax.set_xlabel('放电倍率', fontsize=11)
ax.set_ylabel('电压恢复 (V)', fontsize=11)
ax.set_title('放电结束后的电压弛豫', fontsize=12, fontweight='bold')
ax.grid(True, alpha=0.3, axis='y')
plt.tight_layout()
return fig
# 使用示例:分析Batch-3中不同放电倍率的循环
# 假设我们已经识别出放电倍率为0.5C, 1C, 2C, 3C, 5C的循环编号
sample_cycles = [2, 3, 4, 5, 6] # 根据实际数据调整
rate_labels = ['0.5C', '1C', '2C', '3C', '5C']
fig = analyze_discharge_rate_impact(analyzer_b3, sample_cycles, rate_labels)
plt.show()
```
这个分析揭示了几个关键发现:高倍率放电会导致更明显的电压降、更高的温升、更低的放电容量和能量效率。这些洞察对于电池管理系统(BMS)的设计和电池使用策略的优化至关重要。
## 6. 构建可复用的分析管道与最佳实践
在完成上述分析后,我们需要思考如何将这个过程产品化,使其能够自动化处理整个数据集。以下是我在实际项目中总结的一些最佳实践和可复用代码模式。
### 6.1 批量处理与特征存储
当需要分析多个电池或整个批次时,手动逐个处理是不现实的。我们可以构建一个批处理管道,将所有特征提取出来并存储为结构化的格式(如CSV或Parquet),便于后续的机器学习建模。
```python
import pandas as pd
from tqdm import tqdm
import pickle
class BatchProcessor:
"""批量处理多个电池数据的处理器"""
def __init__(self, data_dir: Path, batch_pattern: str = "*.mat"):
self.data_dir = Path(data_dir)
self.batch_pattern = batch_pattern
self.analyzers = {}
self.features_df = None
def load_all_batteries(self) -> Dict[str, XJTU_Battery_Analyzer]:
"""加载指定目录下的所有电池数据"""
mat_files = list(self.data_dir.glob(self.batch_pattern))
print(f"找到 {len(mat_files)} 个数据文件")
for mat_file in tqdm(mat_files, desc="加载电池数据"):
try:
battery_id = mat_file.stem
analyzer = XJTU_Battery_Analyzer(mat_file, battery_id)
self.analyzers[battery_id] = analyzer
except Exception as e:
print(f"加载文件 {mat_file.name} 时出错: {e}")
return self.analyzers
def extract_features_for_battery(self, battery_id: str,
sample_cycles: int = 50) -> pd.DataFrame:
"""
为单个电池提取特征。
参数:
battery_id: 电池标识符
sample_cycles: 采样的循环数量(用于控制计算量)
返回:
包含所有提取特征的DataFrame
"""
if battery_id not in self.analyzers:
raise ValueError(f"未找到电池 {battery_id}")
analyzer = self.analyzers[battery_id]
features_list = []
# 确定采样哪些循环
if sample_cycles >= analyzer.cycle_life:
cycles_to_sample = range(1, analyzer.cycle_life + 1)
else:
cycles_to_sample = np.linspace(1, analyzer.cycle_life, sample_cycles, dtype=int)
for cycle in tqdm(cycles_to_sample, desc=f"处理 {battery_id}", leave=False):
try:
# 基础循环信息
cycle_data = analyzer.get_cycle(cycle)
cycle_features = {
'battery_id': battery_id,
'cycle': cycle,
'description': cycle_data.description,
'capacity_ah': cycle_data.capacity,
}
# SOH(如果可用)
if cycle == 1:
# 首次循环容量作为参考
initial_capacity = cycle_data.capacity
# 温度特征
temp_features = analyzer.extract_temperature_features(cycle)
if temp_features:
cycle_features.update({f'temp_{k}': v for k, v in temp_features.items()})
# ICA特征(仅在充电循环中计算)
if 'charge' in cycle_data.description.lower():
try:
ica_features = analyzer.extract_ica_features(cycle)
if ica_features:
cycle_features.update({f'ica_{k}': v for k, v in ica_features.items()})
except:
pass # 某些循环可能无法计算ICA
# 电压统计特征
voltage_stats = {
'voltage_mean': float(np.mean(cycle_data.voltage)),
'voltage_std': float(np.std(cycle_data.voltage)),
'voltage_min': float(np.min(cycle_data.voltage)),
'voltage_max': float(np.max(cycle_data.voltage)),
'voltage_range': float(np.ptp(cycle_data.voltage))
}
cycle_features.update(voltage_stats)
# 电流统计特征
current_stats = {
'current_mean': float(np.mean(cycle_data.current)),
'current_std': float(np.std(cycle_data.current)),
'current_min': float(np.min(cycle_data.current)),
'current_max': float(np.max(cycle_data.current))
}
cycle_features.update(current_stats)
features_list.append(cycle_features)
except Exception as e:
print(f"处理电池 {battery_id} 循环 {cycle} 时出错: {e}")
continue
return pd.DataFrame(features_list)
def process_batch(self, output_dir: Path, sample_cycles: int = 100) -> pd.DataFrame:
"""
处理所有电池并保存特征到文件。
返回:
合并所有电池特征的DataFrame
"""
output_dir = Path(output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
all_features = []
for battery_id in tqdm(self.analyzers.keys(), desc="处理所有电池"):
try:
df_battery = self.extract_features_for_battery(battery_id, sample_cycles)
# 保存单个电池的特征
csv_path = output_dir / f"{battery_id}_features.csv"
df_battery.to_csv(csv_path, index=False)
all_features.append(df_battery)
# 同时保存处理后的数据对象(可选,用于后续分析)
pkl_path = output_dir / f"{battery_id}_analyzer.pkl"
with open(pkl_path, 'wb') as f:
pickle.dump(self.analyzers[battery_id], f)
except Exception as e:
print(f"处理电池 {battery_id} 时出错: {e}")
continue
# 合并所有电池的特征
if all_features:
self.features_df = pd.concat(all_features, ignore_index=True)
batch_csv_path = output_dir / "all_batteries_features.csv"
self.features_df.to_csv(batch_csv_path, index=False)
print(f"特征已保存至 {batch_csv_path}")
return self.features_df
def create_summary_report(self, output_dir: Path) -> None:
"""创建处理摘要报告"""
if self.features_df is None or self.features_df.empty:
print("没有可用的特征数据,请先运行process_batch()")
return
report_lines = [
"# XJTU电池数据集处理报告",
f"生成时间: {pd.Timestamp.now()}",
f"处理的电池数量: {self.features_df['battery_id'].nunique()}",
f"总循环数: {len(self.features_df)}",
"\n## 电池统计信息",
]
# 按电池分组统计
battery_stats = self.features_df.groupby('battery_id').agg({
'cycle': ['count', 'min', 'max'],
'capacity_ah': ['mean', 'std', 'min', 'max'],
'voltage_mean': ['mean', 'std'],
'temp_max': ['mean', 'max'] if 'temp_max' in self.features_df.columns else ('mean', 'mean')
}).round(3)
report_lines.append(battery_stats.to_string())
# 特征相关性分析
numeric_cols = self.features_df.select_dtypes(include=[np.number]).columns
if len(numeric_cols) > 1:
# 计算容量与其他特征的相关性
if 'capacity_ah' in numeric_cols:
corr_with_capacity = self.features_df[numeric_cols].corr()['capacity_ah'].sort_values(ascending=False)
report_lines.append("\n## 特征与容量的相关性")
report_lines.append(corr_with_capacity.head(10).to_string())
# 写入报告文件
report_path = output_dir / "processing_report.md"
with open(report_path, 'w', encoding='utf-8') as f:
f.write('\n'.join(report_lines))
print(f"报告已保存至 {report_path}")
# 使用示例
processor = BatchProcessor(Path('./data/Batch-3'), "*.mat")
processor.load_all_batteries()
features_df = processor.process_batch(Path('./output/Batch-3'), sample_cycles=100)
processor.create_summary_report(Path('./output/Batch-3'))
```
### 6.2 性能优化与内存管理
处理大型数据集时,性能是关键考虑因素。以下是一些优化技巧:
```python
class OptimizedBatteryAnalyzer(XJTU_Battery_Analyzer):
"""针对大型数据集优化的电池分析器"""
def __init__(self, data_path: Path, batch_id: str,
use_disk_cache: bool = True, cache_dir: Path = None):
super().__init__(data_path, batch_id)
self.use_disk_cache = use_disk_cache
self.cache_dir = cache_dir or Path('./cache')
self.cache_dir.mkdir(exist_ok=True)
# 使用更高效的数据结构
self._cycle_data_cache = {} # 内存缓存
self._precomputed_features = {} # 预计算特征缓存
def get_cycle_optimized(self, cycle_num: int) -> CycleData:
"""带磁盘缓存的循环数据获取"""
cache_key = f"{self.batch_id}_cycle_{cycle_num}"
# 检查内存缓存
if cache_key in self._cycle_data_cache:
return self._cycle_data_cache[cache_key]
# 检查磁盘缓存
if self.use_disk_cache:
cache_file = self.cache_dir / f"{cache_key}.pkl"
if cache_file.exists():
try:
with open(cache_file, 'rb') as f:
cycle_data = pickle.load(f)
self._cycle_data_cache[cache_key] = cycle_data
return cycle_data
except:
pass # 缓存文件损坏,重新计算
# 计算并缓存
cycle_data = super().get_cycle(cycle_num, use_cache=False)
# 更新内存缓存
self._cycle_data_cache[cache_key] = cycle_data
# 更新磁盘缓存
if self.use_disk_cache:
cache_file = self.cache_dir / f"{cache_key}.pkl"
with open(cache_file, 'wb') as f:
pickle.dump(cycle_data, f, protocol=pickle.HIGHEST_PROTOCOL)
return cycle_data
def precompute_common_features(self, cycle_step: int = 1) -> None:
"""预计算常用特征以加速后续访问"""
print(f"预计算电池 {self.batch_id} 的特征...")
for cycle in tqdm(range(1, self.cycle_life + 1, cycle_step),
desc=f"预计算 {self.batch_id}"):
cache_key = f"{self.batch_id}_features_{cycle}"
# 跳过已计算的
if cache_key in self._precomputed_features:
continue
features = {}
try:
# 计算并存储各种特征
cycle_data = self.get_cycle_optimized(cycle)
# 基础统计
features['voltage_stats'] = {
'mean': float(np.mean(cycle_data.voltage)),
'std': float(np.std(cycle_data.voltage)),
'min': float(np.min(cycle_data.voltage)),
'max': float(np.max(cycle_data.voltage))
}
features['current_stats'] = {
'mean': float(np.mean(cycle_data.current)),
'std': float(np.std(cycle_data.current)),
'min': float(np.min(cycle_data.current)),
'max': float(np.max(cycle_data.current))
}
# 温度特征
temp_features = self.extract_temperature_features(cycle)
if temp_features:
features['temperature'] = temp_features
# 阶段分割(缓存结果)
phases = self.segment_cycle_phases(cycle)
features['phase_boundaries'] = {
name: (data['start_idx'], data['end_idx'])
for name, data in phases.items() if data
}
self._precomputed_features[cache_key] = features
except Exception as e:
# 记录错误但继续处理其他循环
print(f"预计算循环 {cycle} 时出错: {e}")
continue
# 将预计算的特征保存到磁盘
if self.use_disk_cache:
features_file = self.cache_dir / f"{self.batch_id}_precomputed.pkl"
with open(features_file, 'wb') as f:
pickle.dump(self._precomputed_features, f, protocol=pickle.HIGHEST_PROTOCOL)
print(f"预计算完成,缓存了 {len(self._precomputed_features)} 个循环的特征")
```
### 6.3 错误处理与数据验证
在实际应用中,数据质量问题不可避免。我们需要健壮的错误处理和数据验证机制:
```python
def validate_battery_data(analyzer: XJTU_Battery_Analyzer,
report_file: Path = None) -> Dict[str, List[str]]:
"""
验证电池数据的完整性和质量。
返回发现的问题字典。
"""
issues = {
'missing_data': [],
'out_of_range': [],
'inconsistent_phases': [],
'anomalies': []
}
print(f"开始验证电池 {analyzer.batch_id}...")
for cycle in tqdm(range(1, analyzer.cycle_life + 1),
desc=f"验证 {analyzer.batch_id}"):
try:
cycle_data = analyzer.get_cycle(cycle)
# 检查数据完整性
if len(cycle_data.voltage) == 0:
issues['missing_data'].append(f"循环 {cycle}: 电压数据为空")
if len(cycle_data.current) == 0:
issues['missing_data'].append(f"循环 {cycle}: 电流数据为空")
if len(cycle_data.temperature) == 0:
issues['missing_data'].append(f"循环 {cycle}: 温度数据为空")
# 检查数值范围
if np.any(cycle_data.voltage < 2.0) or np.any(cycle_data.voltage > 5.0):
issues['out_of_range'].append(f"循环 {cycle}: 电压超出合理范围 (2.0-5.0V)")
if np.any(np.abs(cycle_data.current) > 10.0):
issues['out_of_range'].append(f"循环 {cycle}: 电流绝对值超过10A")
if np.any(cycle_data.temperature < 15) or np.any(cycle_data.temperature > 50):
issues['out_of_range'].append(f"循环 {cycle}: 温度超出合理范围 (15-50°C)")
# 检查阶段一致性
phases = analyzer.segment_cycle_phases(cycle)
phase_counts = len([p for p in phases.values() if p])
# Batch-6应有3个阶段,其他批次应有4个阶段
expected_phases = 3 if 'satellite' in analyzer.battery_description.lower() else 4
if phase_counts != expected_phases:
issues['inconsistent_phases'].append(
f"循环 {cycle}: 发现 {phase_counts} 个阶段,预期 {expected_phases} 个"
)
# 检测异常值(使用3σ原则)
for var_name, data in [('电压', cycle_data.voltage),
('电流', cycle_data.current),
('温度', cycle_data.temperature)]:
if len(data) > 10: # 需要有足够的数据点
mean_val = np.mean(data)
std_val = np.std(data)
outliers = np.abs(data - mean_val) > 3 * std_val
if np.any(outliers):
issues['anomalies'].append(
f"循环 {cycle}: {var_name} 检测到 {np.sum(outliers)} 个异常值"
)
except Exception as e:
issues['missing_data'].append(f"循环 {cycle}: 处理错误 - {str(e)}")
# 生成报告
if report_file:
with open(report_file, 'w', encoding='utf-8') as f:
f.write(f"电池 {analyzer.batch_id} 数据验证报告\n")
f.write(f"生成时间: {pd.Timestamp.now()}\n")
f.write(f"总循环数: {analyzer.cycle_life}\n")
f.write("=" * 50 + "\n\n")
for issue_type, issue_list in issues.items():
f.write(f"## {issue_type.replace('_', ' ').title()}\n")
f.write(f"发现问题数量: {len(issue_list)}\n")
if issue_list:
for issue in issue_list[:10]: # 只显示前10个问题
f.write(f"- {issue}\n")
if len(issue_list) > 10:
f.write(f"- ... 还有 {len(issue_list) - 10} 个问题未显示\n")
f.write("\n")
# 打印摘要
print(f"\n验证完成 - 电池 {analyzer.batch_id}:")
for issue_type, issue_list in issues.items():
print(f" {issue_type}: {len(issue_list)} 个问题")
return issues
# 使用验证函数
issues = validate_battery_data(analyzer_b3, Path('./validation_report.txt'))
```
处理XJTU这类复杂数据集时,最耗时的部分往往是数据加载和预处理。我在实际项目中发现,将清洗后的数据保存为高效的二进制格式(如HDF5或Feather),可以大幅提升后续分析的效率。对于需要频繁访问的数据,建立内存缓存机制也是必要的。另一个容易忽略的细节是时间戳的处理——原始数据中的相对时间戳会在每个阶段重置为零,正确识别这些重置点是准确分割充放电阶段的关键,我通常会在分割后添加一个全局时间戳以便于跨阶段分析。