要在现有Pandas核心操作基础上详细扩展Python数据处理的内容,我们需要从数据处理的完整生命周期来组织内容,涵盖数据收集、清洗、转换、分析和可视化等关键环节。以下是详细补充:
## 一、数据收集与读取
Python提供了多种方式来收集和读取数据,这是数据处理的第一步。
### 1.1 从多种文件格式读取数据
除了CSV文件,Pandas支持读取多种格式的数据文件:
```python
import pandas as pd
import numpy as np
# 读取Excel文件
df_excel = pd.read_excel('data.xlsx', sheet_name='Sheet1') # [ref_6]
# 读取JSON文件
df_json = pd.read_json('data.json', orient='records') # [ref_6]
# 读取HTML表格(网页数据)
url = 'https://example.com/table.html'
df_html = pd.read_html(url)[0] # 返回列表,取第一个表格
# 读取SQL数据库
import sqlite3
conn = sqlite3.connect('database.db')
df_sql = pd.read_sql_query("SELECT * FROM table_name", conn) # [ref_6]
# 读取Parquet格式(大数据场景常用)
df_parquet = pd.read_parquet('data.parquet')
# 读取HDF5格式(科学计算常用)
df_hdf = pd.read_hdf('data.h5', key='dataset')
```
### 1.2 从API获取数据
```python
import requests
import json
# 从REST API获取数据
response = requests.get('https://api.example.com/data')
data = response.json()
df_api = pd.DataFrame(data['results']) # [ref_6]
# 处理分页API
all_data = []
for page in range(1, 11):
response = requests.get(f'https://api.example.com/data?page={page}')
page_data = response.json()['results']
all_data.extend(page_data)
df_paginated = pd.DataFrame(all_data)
```
## 二、数据清洗与预处理
数据清洗是确保数据质量的关键步骤,通常占数据处理工作的60-70%时间。
### 2.1 处理缺失值
```python
# 创建包含缺失值的示例数据
df = pd.DataFrame({
'A': [1, 2, np.nan, 4, 5],
'B': [np.nan, 2, 3, np.nan, 5],
'C': [1, 2, 3, 4, 5],
'D': ['a', 'b', np.nan, 'd', 'e']
})
# 1. 检测缺失值
print("缺失值统计:")
print(df.isnull().sum()) # 每列缺失值数量 [ref_4]
print(f"\n总缺失值比例:{df.isnull().sum().sum() / df.size:.2%}")
# 2. 删除缺失值
df_dropped = df.dropna() # 删除任何包含NaN的行
df_dropped_col = df.dropna(axis=1) # 删除任何包含NaN的列
df_dropped_thresh = df.dropna(thresh=3) # 至少要有3个非NaN值才保留该行 [ref_4]
# 3. 填充缺失值(根据业务场景选择合适的方法)
df_filled = df.copy()
# 数值列用统计量填充
df_filled['A'].fillna(df_filled['A'].mean(), inplace=True) # 均值填充 [ref_2][ref_4]
df_filled['B'].fillna(df_filled['B'].median(), inplace=True) # 中位数填充
df_filled['B'].fillna(method='ffill', inplace=True) # 前向填充
df_filled['B'].fillna(method='bfill', inplace=True) # 后向填充
# 分类列用众数或特定值填充
df_filled['D'].fillna('unknown', inplace=True) # 用特定值填充
df_filled['D'].fillna(df_filled['D'].mode()[0], inplace=True) # 用众数填充 [ref_4]
# 4. 插值法填充(时间序列数据常用)
df_interpolated = df.copy()
df_interpolated['A'] = df_interpolated['A'].interpolate(method='linear') # 线性插值
df_interpolated['A'] = df_interpolated['A'].interpolate(method='time') # 时间插值
```
### 2.2 处理异常值
```python
# 创建包含异常值的数据
np.random.seed(42)
data = pd.DataFrame({
'value': np.concatenate([np.random.normal(100, 10, 95),
np.random.normal(200, 10, 5)]) # 添加5个异常值
})
# 1. 基于标准差识别异常值(3σ原则)
mean = data['value'].mean()
std = data['value'].std()
lower_bound = mean - 3 * std
upper_bound = mean + 3 * std
outliers_std = data[(data['value'] < lower_bound) | (data['value'] > upper_bound)]
# 2. 基于IQR(四分位距)识别异常值
Q1 = data['value'].quantile(0.25)
Q3 = data['value'].quantile(0.75)
IQR = Q3 - Q1
lower_iqr = Q1 - 1.5 * IQR
upper_iqr = Q3 + 1.5 * IQR
outliers_iqr = data[(data['value'] < lower_iqr) | (data['value'] > upper_iqr)]
# 3. 处理异常值
# 方法1:删除异常值
data_cleaned = data[(data['value'] >= lower_iqr) & (data['value'] <= upper_iqr)]
# 方法2:缩尾处理(Winsorization)
from scipy.stats.mstats import winsorize
data['value_winsorized'] = winsorize(data['value'], limits=[0.05, 0.05])
# 方法3:用中位数或边界值替换
data['value_capped'] = data['value'].clip(lower=lower_iqr, upper=upper_iqr)
```
### 2.3 处理重复数据
```python
# 创建包含重复数据的数据框
df_duplicates = pd.DataFrame({
'ID': [1, 2, 3, 1, 2, 4],
'Name': ['Alice', 'Bob', 'Charlie', 'Alice', 'Bob', 'David'],
'Score': [85, 90, 88, 85, 90, 92]
})
print("原始数据形状:", df_duplicates.shape)
print("\n重复行检测:")
print(df_duplicates.duplicated()) # 标记重复行
print(f"\n基于ID列的重复行: {df_duplicates.duplicated(subset=['ID']).sum()}")
# 删除重复行
df_no_duplicates = df_duplicates.drop_duplicates() # 删除所有列完全相同的行 [ref_3]
df_no_id_duplicates = df_duplicates.drop_duplicates(subset=['ID'], keep='first') # 基于ID去重,保留第一个
df_no_id_duplicates_last = df_duplicates.drop_duplicates(subset=['ID'], keep='last') # 基于ID去重,保留最后一个
df_no_id_duplicates_none = df_duplicates.drop_duplicates(subset=['ID'], keep=False) # 删除所有重复项
print(f"\n删除重复后形状: {df_no_duplicates.shape}")
```
## 三、数据转换与特征工程
### 3.1 数据类型转换
```python
# 创建混合类型数据
df_types = pd.DataFrame({
'int_str': ['1', '2', '3', '4', '5'],
'float_str': ['1.1', '2.2', '3.3', '4.4', '5.5'],
'bool_str': ['True', 'False', 'True', 'False', 'True'],
'date_str': ['2023-01-01', '2023-01-02', '2023-01-03', '2023-01-04', '2023-01-05'],
'mixed': [1, 'two', 3.0, True, 'five']
})
# 1. 转换为数值类型
df_types['int_str'] = pd.to_numeric(df_types['int_str'], errors='coerce') # 转换失败设为NaN
df_types['float_str'] = df_types['float_str'].astype('float64')
# 2. 转换为布尔类型
df_types['bool_str'] = df_types['bool_str'].map({'True': True, 'False': False})
# 3. 转换为日期时间类型
df_types['date_str'] = pd.to_datetime(df_types['date_str'], format='%Y-%m-%d') # [ref_6]
# 4. 分类数据编码
df_types['mixed_cat'] = pd.Categorical(df_types['mixed'])
print(f"数据类型转换后:\n{df_types.dtypes}")
```
### 3.2 特征缩放与标准化
```python
from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler
# 创建示例数据
np.random.seed(42)
data_scaling = pd.DataFrame({
'feature1': np.random.normal(100, 50, 100),
'feature2': np.random.normal(10, 5, 100),
'feature3': np.random.exponential(1, 100)
})
# 1. 标准化(Z-score标准化)
scaler_std = StandardScaler()
data_standardized = pd.DataFrame(
scaler_std.fit_transform(data_scaling),
columns=data_scaling.columns
)
# 2. 归一化(Min-Max缩放)
scaler_minmax = MinMaxScaler()
data_normalized = pd.DataFrame(
scaler_minmax.fit_transform(data_scaling),
columns=data_scaling.columns
)
# 3. 鲁棒缩放(对异常值不敏感)
scaler_robust = RobustScaler()
data_robust = pd.DataFrame(
scaler_robust.fit_transform(data_scaling),
columns=data_scaling.columns
)
print("原始数据统计:")
print(data_scaling.describe())
print("\n标准化后统计:")
print(data_standardized.describe())
```
### 3.3 分类特征编码
```python
# 创建分类数据
df_categorical = pd.DataFrame({
'color': ['red', 'blue', 'green', 'blue', 'red', 'green'],
'size': ['S', 'M', 'L', 'M', 'S', 'XL'],
'price': [10, 20, 30, 25, 15, 40]
})
# 1. 标签编码(Label Encoding)
from sklearn.preprocessing import LabelEncoder
le = LabelEncoder()
df_categorical['color_encoded'] = le.fit_transform(df_categorical['color'])
# 2. 独热编码(One-Hot Encoding)
df_onehot = pd.get_dummies(df_categorical[['color', 'size']], prefix=['col', 'sz'])
# 3. 频率编码(Frequency Encoding)
color_freq = df_categorical['color'].value_counts(normalize=True)
df_categorical['color_freq'] = df_categorical['color'].map(color_freq)
# 4. 目标编码(Target Encoding,需要小心过拟合)
mean_price_by_color = df_categorical.groupby('color')['price'].mean()
df_categorical['color_target'] = df_categorical['color'].map(mean_price_by_color)
print("编码结果:")
print(df_categorical)
print("\n独热编码结果:")
print(df_onehot.head())
```
## 四、数据聚合与分组操作
### 4.1 基础分组聚合
```python
# 创建销售数据示例
sales_data = pd.DataFrame({
'region': ['North', 'South', 'East', 'West', 'North', 'South', 'East', 'West'],
'product': ['A', 'B', 'A', 'B', 'A', 'B', 'A', 'B'],
'sales': [100, 150, 200, 250, 120, 180, 220, 280],
'quantity': [10, 15, 20, 25, 12, 18, 22, 28]
})
# 1. 单维度分组
region_group = sales_data.groupby('region')
print("按地区分组统计:")
print(region_group['sales'].agg(['sum', 'mean', 'count', 'std']))
# 2. 多维度分组
multi_group = sales_data.groupby(['region', 'product'])
print("\n按地区和产品分组统计:")
print(multi_group['sales'].agg(['sum', 'mean']))
# 3. 对多列应用不同聚合函数
agg_dict = {
'sales': ['sum', 'mean', 'max'],
'quantity': ['sum', 'mean']
}
print("\n多列不同聚合:")
print(sales_data.groupby('region').agg(agg_dict))
```
### 4.2 高级分组操作
```python
# 1. 分组后应用自定义函数
def sales_performance(group):
"""计算销售表现指标"""
result = pd.Series({
'total_sales': group['sales'].sum(),
'avg_sales': group['sales'].mean(),
'sales_growth': (group['sales'].iloc[-1] - group['sales'].iloc[0]) / group['sales'].iloc[0] if len(group) > 1 else 0,
'max_sale': group['sales'].max(),
'min_sale': group['sales'].min()
})
return result
performance_by_region = sales_data.groupby('region').apply(sales_performance)
# 2. 分组过滤
# 只保留销售总额超过300的地区
high_sales_regions = sales_data.groupby('region').filter(lambda x: x['sales'].sum() > 300)
# 3. 分组转换(保持原始形状)
# 计算每个地区的销售占比
sales_data['region_sales_ratio'] = sales_data.groupby('region')['sales'].transform(
lambda x: x / x.sum()
)
# 4. 分组排名
sales_data['rank_in_region'] = sales_data.groupby('region')['sales'].rank(
method='dense', ascending=False
)
print("分组性能分析:")
print(performance_by_region)
print("\n高销售地区:")
print(high_sales_regions)
```
## 五、数据合并与连接
### 5.1 多种合并方式
```python
# 创建示例数据
df1 = pd.DataFrame({
'key': ['A', 'B', 'C', 'D'],
'value1': [1, 2, 3, 4],
'data1': ['x1', 'x2', 'x3', 'x4']
})
df2 = pd.DataFrame({
'key': ['B', 'C', 'D', 'E'],
'value2': [5, 6, 7, 8],
'data2': ['y2', 'y3', 'y4', 'y5']
})
df3 = pd.DataFrame({
'key': ['A', 'B', 'F'],
'value3': [9, 10, 11],
'data3': ['z1', 'z2', 'z3']
})
# 1. 内连接(默认)
inner_merge = pd.merge(df1, df2, on='key', how='inner')
print("内连接结果:")
print(inner_merge)
# 2. 左连接
left_merge = pd.merge(df1, df2, on='key', how='left')
print("\n左连接结果:")
print(left_merge)
# 3. 外连接
outer_merge = pd.merge(df1, df2, on='key', how='outer')
print("\n外连接结果:")
print(outer_merge)
# 4. 右连接
right_merge = pd.merge(df1, df2, on='key', how='right')
print("\n右连接结果:")
print(right_merge)
# 5. 多表合并
multi_merge = pd.merge(pd.merge(df1, df2, on='key', how='outer'),
df3, on='key', how='outer')
print("\n多表合并结果:")
print(multi_merge)
# 6. 基于索引合并
df1_indexed = df1.set_index('key')
df2_indexed = df2.set_index('key')
index_merge = df1_indexed.join(df2_indexed, how='inner', rsuffix='_right')
print("\n基于索引合并:")
print(index_merge)
```
### 5.2 复杂合并场景
```python
# 1. 多键合并
df_multi1 = pd.DataFrame({
'key1': ['A', 'B', 'C', 'D'],
'key2': [1, 2, 3, 4],
'value': ['a', 'b', 'c', 'd']
})
df_multi2 = pd.DataFrame({
'key1': ['A', 'B', 'C', 'E'],
'key2': [1, 2, 3, 5],
'data': ['x', 'y', 'z', 'w']
})
multi_key_merge = pd.merge(df_multi1, df_multi2, on=['key1', 'key2'], how='inner')
print("多键合并结果:")
print(multi_key_merge)
# 2. 处理重复列名
df_dup1 = pd.DataFrame({'key': ['A', 'B'], 'value': [1, 2]})
df_dup2 = pd.DataFrame({'key': ['A', 'B'], 'value': [3, 4]})
merge_with_suffix = pd.merge(df_dup1, df_dup2, on='key', suffixes=('_left', '_right'))
print("\n处理重复列名:")
print(merge_with_suffix)
# 3.