## 水质安全数据分析与可视化完整代码实现
### 1. 项目配置与数据加载
#### 1.1 环境配置与数据加载
```python
# main.py - 主程序入口
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime
import warnings
warnings.filterwarnings('ignore')
# 设置中文显示
plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei', 'DejaVu Sans']
plt.rcParams['axes.unicode_minus'] = False
class WaterQualityAnalyzer:
"""水质安全数据分析器"""
def __init__(self, data_path=None, url=None):
"""
初始化分析器
Args:
data_path: 本地数据文件路径
url: 在线数据URL(优先使用)
"""
self.data = None
self.processed_data = None
self.analysis_results = {}
if url:
self.load_from_url(url)
elif data_path:
self.load_from_file(data_path)
def load_from_url(self, url):
"""从URL加载数据集"""
print(f"正在从URL加载数据: {url}")
try:
self.data = pd.read_csv(url)
print(f"数据加载成功!形状: {self.data.shape}")
print(f"数据列名: {list(self.data.columns)}")
self._basic_info()
except Exception as e:
print(f"URL加载失败: {e}")
# 备用方案:使用requests
import requests
from io import StringIO
try:
response = requests.get(url)
if response.status_code == 200:
self.data = pd.read_csv(StringIO(response.text))
print(f"通过requests加载成功!形状: {self.data.shape}")
self._basic_info()
else:
raise ConnectionError(f"HTTP错误: {response.status_code}")
except Exception as e2:
print(f"备用方案也失败: {e2}")
def load_from_file(self, filepath):
"""从本地文件加载数据"""
try:
self.data = pd.read_csv(filepath)
print(f"本地文件加载成功!形状: {self.data.shape}")
self._basic_info()
except Exception as e:
print(f"文件加载失败: {e}")
def _basic_info(self):
"""显示数据基本信息"""
print("\n" + "="*50)
print("数据基本信息:")
print("="*50)
print(f"总行数: {len(self.data)}")
print(f"总列数: {len(self.data.columns)}")
print(f"列名: {list(self.data.columns)}")
print(f"数据类型:\n{self.data.dtypes}")
print(f"缺失值统计:\n{self.data.isnull().sum()}")
print(f"重复行数: {self.data.duplicated().sum()}")
```
### 2. 数据清洗与预处理
```python
class DataPreprocessor:
"""数据预处理类"""
def __init__(self, data):
self.data = data.copy()
self.numeric_cols = None
self.categorical_cols = None
def clean_data(self):
"""数据清洗主流程"""
print("\n开始数据清洗...")
# 1. 处理列名(去除空格,统一小写)
self.data.columns = [col.strip().lower().replace(' ', '_') for col in self.data.columns]
# 2. 识别列类型
self._identify_column_types()
# 3. 处理缺失值
self._handle_missing_values()
# 4. 处理异常值
self._handle_outliers()
# 5. 数据类型转换
self._convert_data_types()
print("数据清洗完成!")
return self.data
def _identify_column_types(self):
"""识别数值型和分类型列"""
self.numeric_cols = self.data.select_dtypes(
include=[np.number]
).columns.tolist()
self.categorical_cols = self.data.select_dtypes(
include=['object', 'category']
).columns.tolist()
print(f"数值型列 ({len(self.numeric_cols)}): {self.numeric_cols}")
print(f"分类型列 ({len(self.categorical_cols)}): {self.categorical_cols}")
def _handle_missing_values(self):
"""处理缺失值"""
missing_before = self.data.isnull().sum().sum()
print(f"处理前缺失值总数: {missing_before}")
if missing_before > 0:
# 数值型列用中位数填充
for col in self.numeric_cols:
if self.data[col].isnull().sum() > 0:
median_val = self.data[col].median()
self.data[col].fillna(median_val, inplace=True)
print(f" 列 '{col}': {self.data[col].isnull().sum()}个缺失值 -> 用中位数{median_val:.4f}填充")
# 分类型列用众数填充
for col in self.categorical_cols:
if self.data[col].isnull().sum() > 0:
mode_val = self.data[col].mode()[0] if not self.data[col].mode().empty else 'Unknown'
self.data[col].fillna(mode_val, inplace=True)
print(f" 列 '{col}': 用众数'{mode_val}'填充")
missing_after = self.data.isnull().sum().sum()
print(f"处理后缺失值总数: {missing_after}")
def _handle_outliers(self, method='iqr'):
"""处理异常值(IQR方法)"""
print("检测异常值...")
for col in self.numeric_cols:
Q1 = self.data[col].quantile(0.25)
Q3 = self.data[col].quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
outliers = self.data[(self.data[col] < lower_bound) | (self.data[col] > upper_bound)]
outlier_count = len(outliers)
if outlier_count > 0:
print(f" 列 '{col}': 发现{outlier_count}个异常值 ({outlier_count/len(self.data)*100:.1f}%)")
# 用边界值替换异常值
self.data[col] = np.where(
self.data[col] < lower_bound, lower_bound,
np.where(self.data[col] > upper_bound, upper_bound, self.data[col])
)
def _convert_data_types(self):
"""数据类型转换"""
# 检测日期列
date_patterns = ['date', 'time', 'year', 'month', 'day']
for col in self.data.columns:
if any(pattern in col.lower() for pattern in date_patterns):
try:
self.data[col] = pd.to_datetime(self.data[col])
print(f" 列 '{col}' 转换为日期时间类型")
except:
pass
# 检测布尔列
for col in self.data.columns:
if self.data[col].dtype == 'object':
unique_vals = self.data[col].unique()
if len(unique_vals) == 2 and set(unique_vals) == {'0', '1'}:
self.data[col] = self.data[col].astype('int')
print(f" 列 '{col}' 转换为整数类型")
```
### 3. 探索性数据分析(EDA)
```python
class ExploratoryDataAnalysis:
"""探索性数据分析"""
def __init__(self, data):
self.data = data
self.numeric_cols = data.select_dtypes(include=[np.number]).columns.tolist()
self.categorical_cols = data.select_dtypes(include=['object']).columns.tolist()
def comprehensive_analysis(self):
"""综合EDA分析"""
print("\n" + "="*50)
print("探索性数据分析(EDA)")
print("="*50)
# 1. 基本统计信息
self._basic_statistics()
# 2. 分布分析
self._distribution_analysis()
# 3. 相关性分析
self._correlation_analysis()
# 4. 时间趋势分析(如果有时间列)
self._time_series_analysis()
# 5. 分类变量分析
self._categorical_analysis()
def _basic_statistics(self):
"""基本统计信息"""
print("\n1. 基本统计信息:")
print("-"*30)
stats_df = pd.DataFrame({
'均值': self.data[self.numeric_cols].mean(),
'中位数': self.data[self.numeric_cols].median(),
'标准差': self.data[self.numeric_cols].std(),
'最小值': self.data[self.numeric_cols].min(),
'最大值': self.data[self.numeric_cols].max(),
'偏度': self.data[self.numeric_cols].skew(),
'峰度': self.data[self.numeric_cols].kurtosis(),
'缺失值': self.data[self.numeric_cols].isnull().sum()
})
print(stats_df.round(4))
# 保存到结果
self.stats_summary = stats_df
def _distribution_analysis(self):
"""分布分析"""
print("\n2. 分布分析:")
print("-"*30)
# 选择前6个数值列进行可视化
cols_to_plot = self.numeric_cols[:6] if len(self.numeric_cols) >= 6 else self.numeric_cols
fig, axes = plt.subplots(2, 3, figsize=(15, 10))
axes = axes.flatten()
for idx, col in enumerate(cols_to_plot):
if idx < len(axes):
# 直方图
axes[idx].hist(self.data[col].dropna(), bins=30, alpha=0.7, color='skyblue', edgecolor='black')
axes[idx].axvline(self.data[col].mean(), color='red', linestyle='--', linewidth=2, label='均值')
axes[idx].axvline(self.data[col].median(), color='green', linestyle='--', linewidth=2, label='中位数')
axes[idx].set_title(f'{col}分布', fontsize=12)
axes[idx].set_xlabel('值')
axes[idx].set_ylabel('频数')
axes[idx].legend()
axes[idx].grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig('distribution_analysis.png', dpi=300, bbox_inches='tight')
plt.show()
# 箱线图
fig, ax = plt.subplots(figsize=(12, 6))
box_data = [self.data[col].dropna() for col in cols_to_plot]
ax.boxplot(box_data, labels=cols_to_plot, patch_artist=True)
ax.set_title('数值变量箱线图', fontsize=14)
ax.set_ylabel('数值')
ax.grid(True, alpha=0.3)
plt.xticks(rotation=45)
plt.tight_layout()
plt.savefig('boxplot_analysis.png', dpi=300, bbox_inches='tight')
plt.show()
def _correlation_analysis(self):
"""相关性分析"""
print("\n3. 相关性分析:")
print("-"*30)
if len(self.numeric_cols) >= 2:
# 计算相关系数矩阵
corr_matrix = self.data[self.numeric_cols].corr()
print("相关系数矩阵:")
print(corr_matrix.round(3))
# 热力图可视化
plt.figure(figsize=(12, 10))
mask = np.triu(np.ones_like(corr_matrix, dtype=bool))
sns.heatmap(corr_matrix,
mask=mask,
annot=True,
fmt='.2f',
cmap='coolwarm',
center=0,
square=True,
linewidths=1,
cbar_kws={"shrink": 0.8})
plt.title('特征相关性热力图', fontsize=16)
plt.tight_layout()
plt.savefig('correlation_heatmap.png', dpi=300, bbox_inches='tight')
plt.show()
# 找出强相关特征(|r| > 0.7)
strong_corr = []
for i in range(len(corr_matrix.columns)):
for j in range(i+1, len(corr_matrix.columns)):
if abs(corr_matrix.iloc[i, j]) > 0.7:
strong_corr.append((
corr_matrix.columns[i],
corr_matrix.columns[j],
corr_matrix.iloc[i, j]
))
if strong_corr:
print("\n强相关特征对 (|r| > 0.7):")
for feat1, feat2, corr in strong_corr:
print(f" {feat1} - {feat2}: {corr:.3f}")
def _time_series_analysis(self):
"""时间序列分析"""
print("\n4. 时间序列分析:")
print("-"*30)
# 查找日期列
date_cols = [col for col in self.data.columns if 'date' in col.lower() or 'time' in col.lower()]
if date_cols:
date_col = date_cols[0]
try:
self.data[date_col] = pd.to_datetime(self.data[date_col])
# 按时间排序
self.data = self.data.sort_values(date_col)
# 提取时间特征
self.data['year'] = self.data[date_col].dt.year
self.data['month'] = self.data[date_col].dt.month
self.data['quarter'] = self.data[date_col].dt.quarter
# 时间趋势分析
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
# 1. 按年统计
yearly_stats = self.data.groupby('year')[self.numeric_cols[:3]].mean()
for i, col in enumerate(yearly_stats.columns[:3]):
axes[0, 0].plot(yearly_stats.index, yearly_stats[col], marker='o', label=col)
axes[0, 0].set_title('年度趋势', fontsize=12)
axes[0, 0].set_xlabel('年份')
axes[0, 0].set_ylabel('平均值')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)
# 2. 按月统计
monthly_stats = self.data.groupby('month')[self.numeric_cols[0]].mean()
axes[0, 1].bar(monthly_stats.index, monthly_stats.values, color='lightcoral')
axes[0, 1].set_title('月度变化', fontsize=12)
axes[0, 1].set_xlabel('月份')
axes[0, 1].set_ylabel('平均值')
axes[0, 1].grid(True, alpha=0.3)
# 3. 季节变化
seasonal_stats = self.data.groupby('quarter')[self.numeric_cols[0]].agg(['mean', 'std'])
axes[1, 0].errorbar(seasonal_stats.index, seasonal_stats['mean'],
yerr=seasonal_stats['std'], fmt='o-', capsize=5)
axes[1, 0].set_title('季度变化(带误差棒)', fontsize=12)
axes[1, 0].set_xlabel('季度')
axes[1, 0].set_ylabel('平均值±标准差')
axes[1, 0].grid(True, alpha=0.3)
# 4. 移动平均
if len(self.data) > 30:
window_size = min(30, len(self.data)//10)
moving_avg = self.data[self.numeric_cols[0]].rolling(window=window_size).mean()
axes[1, 1].plot(self.data[date_col], self.data[self.numeric_cols[0]],
alpha=0.5, label='原始值')
axes[1, 1].plot(self.data[date_col], moving_avg,
color='red', linewidth=2, label=f'{window_size}天移动平均')
axes[1, 1].set_title('时间序列与移动平均', fontsize=12)
axes[1, 1].set_xlabel('日期')
axes[1, 1].set_ylabel('值')
axes[1, 1].legend()
axes[1, 1].grid(True, alpha=0.3)
plt.xticks(rotation=45)
plt.tight_layout()
plt.savefig('time_series_analysis.png', dpi=300, bbox_inches='tight')
plt.show()
except Exception as e:
print(f"时间序列分析失败: {e}")
else:
print("未找到日期列,跳过时间序列分析")
def _categorical_analysis(self):
"""分类变量分析"""
print("\n5. 分类变量分析:")
print("-"*30)
if self.categorical_cols:
for col in self.categorical_cols[:3]: # 分析前3个分类变量
print(f"\n变量: {col}")
print(f" 唯一值数量: {self.data[col].nunique()}")
print(f" 值分布:")
value_counts = self.data[col].value_counts().head(10)
for val, count in value_counts.items():
percentage = count / len(self.data) * 100
print(f" {val}: {count} ({percentage:.1f}%)")
#