# 1901-2023年全国乡镇级降水数据处理实战:Python自动化工作流深度解析
最近在做一个区域气候风险评估项目,需要处理跨越百年的全国乡镇级降水数据。拿到手的是原始的栅格数据和各级行政区划的Shp文件,数据量庞大,手动操作几乎不可能。这让我不得不重新梳理一套高效的Python自动化处理流程。如果你也经常和地理空间数据、时间序列数据打交道,尤其是需要将栅格数据按行政单元进行统计汇总,那么今天分享的这套代码和思路,或许能帮你节省大量重复劳动的时间。本文面向有一定Python基础的数据分析师和GIS工程师,我们将从数据读取、空间计算、表格整合,一直讲到性能优化和常见坑点,目标是构建一个健壮、可复用的数据处理管道。
## 1. 环境搭建与核心库选择
工欲善其事,必先利其器。处理这类涉及地理空间和大量数值计算的任务,库的选择至关重要。我个人的环境基于Python 3.9+,经过多次实践,以下几个库的组合最为稳定高效。
首先,**地理空间数据处理**是核心。`geopandas` 是当之无愧的首选,它基于`pandas`和`shapely`,让处理矢量数据(如Shp文件)变得像操作DataFrame一样简单。`rasterio`则是读取和处理栅格数据(如我们的逐年降水TIFF文件)的利器,其接口清晰,性能优秀。对于空间叠加分析,我们离不开`shapely`库提供的几何对象操作。
其次,**数值计算与并行处理**能极大提升效率。`numpy`和`pandas`是数据分析的基石。当需要对大量栅格像元进行分区统计时,`xarray`结合`rioxarray`扩展提供了更优雅的网格数据操作方式。对于计算密集型任务,`dask`可以实现内存友好的并行计算,避免处理全国数据时内存爆掉。
最后,**工作流与可视化**辅助工具。`matplotlib`和`contextily`用于快速可视化检查数据处理结果是否正确,比如看看统计后的降水空间分布是否合理。`pathlib`和`tqdm`则让文件管理和进度提示更加现代化和友好。
下面是一个推荐的核心环境配置清单,你可以通过`pip`或`conda`安装:
```bash
# 使用 conda 安装地理空间库通常更省心
conda create -n climate_data python=3.9
conda activate climate_data
conda install -c conda-forge geopandas rasterio rioxarray xarray dask
conda install numpy pandas matplotlib contextily tqdm pathlib scipy
# 或者使用 pip
pip install geopandas rasterio rioxarray xarray dask numpy pandas matplotlib contextily tqdm scipy
```
> 注意:安装`geopandas`和`rasterio`时,如果遇到GDAL等底层C库的编译问题,强烈建议使用`conda-forge`频道安装,它能自动解决复杂的依赖关系。
## 2. 数据准备与规范化读取
数据来源通常五花八门,格式和规范不一。我们的目标是:将不同来源、不同格式的数据,统一加载到Python中,并转换为结构清晰、易于后续计算的数据结构。这一步的规范性直接决定了整个流程的顺畅度。
**2.1 行政边界矢量数据(Shp)读取与检查**
我们手头可能有省、市、县、镇四级行政边界的Shp文件。使用`geopandas`读取非常简单:
```python
import geopandas as gpd
# 读取省级边界
province_gdf = gpd.read_file(‘path/to/province_boundary.shp’)
# 读取乡镇级边界
town_gdf = gpd.read_file(‘path/to/town_boundary.shp’)
print(f“省级数据几何类型: {province_gdf.geometry.type.unique()}”)
print(f“乡镇数据记录数: {len(town_gdf)}”)
print(town_gdf.columns) # 查看属性字段
```
读取后,必须进行几项关键检查:
1. **坐标系统一**:所有矢量数据和栅格数据必须在同一个坐标系下才能进行空间计算。通常使用地理坐标系(如WGS84, EPSG:4326)或投影坐标系(如Albers等积投影)。
```python
print(province_gdf.crs) # 查看当前坐标系
# 如果需要进行面积计算,建议转换到投影坐标系
# province_gdf_projected = province_gdf.to_crs(epsg=xxxx)
```
2. **几何有效性**:Shp文件中的几何图形有时会存在自相交、空洞等无效情况,这会导致空间计算失败。
```python
# 检查并修复无效几何
invalid_geom = town_gdf[~town_gdf.is_valid]
if not invalid_geom.empty:
print(f“发现 {len(invalid_geom)} 个无效几何图形,正在尝试修复...”)
town_gdf.geometry = town_gdf.buffer(0) # 常用修复方法
```
3. **属性字段**:确认行政单元的唯一标识字段(如`adcode`、`name`)是否存在且无重复。这将是后续与Excel数据关联的关键。
**2.2 逐年降水栅格数据批量读取**
假设我们有1901-2023年共123个年份的降水栅格文件(如`precip_1901.tif`, `precip_1902.tif` …)。我们需要高效地批量读取并组织它们。
```python
import rasterio
from pathlib import Path
import numpy as np
# 定义栅格数据文件夹
raster_dir = Path(‘./annual_precipitation_tif’)
# 获取所有tif文件,并按年份排序
tif_files = sorted(raster_dir.glob(‘precip_*.tif’))
# 读取第一个文件获取元数据(坐标系、变换矩阵、形状等)
with rasterio.open(tif_files[0]) as src:
profile = src.profile # 元数据
transform = src.transform
crs = src.crs
# 初始化一个三维数组或使用xarray存储多年数据
# 方法A:使用列表存储每年数组(适合内存充足的情况)
annual_data_list = []
years = []
for tif_file in tif_files:
year = int(tif_file.stem.split(‘_’)[-1]) # 从文件名提取年份
years.append(year)
with rasterio.open(tif_file) as src:
data = src.read(1) # 读取第一个波段
# 处理NoData值
data = np.where(data == src.nodata, np.nan, data)
annual_data_list.append(data)
# 方法B:使用xarray,更利于处理时空数据
import xarray as xr
import rioxarray
# 可以一次性打开多个文件并合并
ds = xr.open_mfdataset([str(f) for f in tif_files], combine=‘by_coords’, engine=‘rasterio’)
# ds 现在是一个包含‘band’维度和时间维度的数据集
```
> 提示:在处理全国1km分辨率栅格数据时,单年份数据量就很大。务必注意内存管理。使用`xarray`并设置`chunks`参数,或结合`dask`进行惰性加载,是处理大数据集的推荐做法。
**2.3 Excel格式统计结果的整合**
有时我们可能已经有一部分处理好的Excel数据,或者需要将最终结果输出为Excel。`pandas`的`read_excel`函数非常强大。
```python
import pandas as pd
# 读取已有的市级降水统计Excel
city_precip_df = pd.read_excel(‘city_precipitation_summary.xlsx’, sheet_name=None) # 读取所有sheet
# 或者读取单个sheet
city_precip_df = pd.read_excel(‘city_precipitation_summary.xlsx’)
# 查看数据结构
print(city_precip_df.head())
print(city_precip_df.columns)
```
关键点在于,Excel表格中的行政单元需要能与Shp文件中的属性字段(如`adcode`)准确匹配,以便进行数据关联或验证。
## 3. 核心处理:基于行政边界的栅格分区统计
这是整个流程的技术核心:如何快速、准确地将每个像元的降水值,汇总到它所属的每个行政多边形内?这里介绍两种主流方法。
**3.1 方法一:基于`rasterio`和`geopandas`的`zonal_stats`**
这是最直观的方法。我们可以为每个行政多边形计算一系列统计值(总和、均值、最大值等)。对于降水总量,我们需要的是`sum`。
```python
from rasterstats import zonal_stats
# 注意:需要安装 rasterstats 库
# 假设我们已经有一个年份的栅格数据数组 `annual_data` 和其仿射变换 `transform`
# 以及一个GeoDataFrame `admin_gdf` (例如乡镇gdf)
# 计算每个乡镇的年降水总量
stats = zonal_stats(admin_gdf.geometry, # 几何对象
annual_data, # 栅格数值数组
affine=transform, # 仿射变换
stats=[‘sum’, ‘mean’, ‘count’], # 需要的统计量
nodata=np.nan, # 指定NoData值
all_touched=True) # 所有触及的像元都计入
# 将结果添加到GeoDataFrame中
admin_gdf[‘precip_sum_2023’] = [stat[‘sum’] for stat in stats]
admin_gdf[‘pixel_count_2023’] = [stat[‘count’] for stat in stats]
# 注意:直接求和得到的是像元值的总和,需要根据实际数据单位判断。
# 如果原始栅格是年均降水量(mm),则总和代表该区域总降水量(mm*像素数)。
# 更常见的需求是区域平均降水量,则使用‘mean’统计量。
```
这种方法逻辑清晰,但有一个明显缺点:当行政单元非常多(比如全国四万多个乡镇)且栅格数据分辨率高时,循环计算每个多边形的速度会非常慢。
**3.2 方法二:基于`xarray`和`rioxarray`的向量化区域统计(推荐)**
为了提升性能,尤其是处理多年份数据时,我们可以利用`xarray`和`rioxarray`的向量化操作。思路是将矢量数据栅格化,生成一个与降水栅格空间对齐的“标签数组”,每个像元的值是其所属行政单元的ID,然后利用`xarray`的`groupby`进行快速分组聚合。
```python
import xarray as xr
import rioxarray
from rasterio import features
# 1. 使用xarray打开多年份栅格数据(假设已合并为ds,维度为(‘year’, ‘y’, ‘x’))
# ds = xr.open_dataset(‘merged_precip.nc’)
# 2. 将行政边界GeoDataFrame栅格化,生成标签栅格
def rasterize_geometries(gdf, like_raster_da):
“””
根据参考栅格,将GeoDataFrame中的几何图形栅格化。
gdf: 包含几何图形和唯一ID字段(如‘adcode’)的GeoDataFrame
like_raster_da: 参考的xarray DataArray,用于获取坐标和形状
“””
shapes = ((geom, value) for geom, value in zip(gdf.geometry, gdf[‘adcode’]))
rasterized = features.rasterize(shapes,
out_shape=like_raster_da.shape[-2:], # (height, width)
transform=like_raster_da.rio.transform(),
fill=0, # 区域外填充0
dtype=‘int32’)
label_da = xr.DataArray(rasterized,
coords={‘y’: like_raster_da.y, ‘x’: like_raster_da.x},
dims=(‘y’, ‘x’))
return label_da
# 假设precip_da是某个年份的降水DataArray
label_da = rasterize_geometries(town_gdf, precip_da)
# 3. 使用groupby进行高效分区统计
# 将降水数据和标签数据堆叠到一起
stacked_da = xr.concat([precip_da.expand_dims(dim={‘var’: [‘precip’]}),
label_da.expand_dims(dim={‘var’: [‘label’]})],
dim=‘var’).transpose(‘y’, ‘x’, ‘var’)
# 按标签分组,计算总和(忽略标签为0的区域,即非行政区域)
grouped = stacked_da.groupby(‘label’)
# 计算每个行政单元内的降水总和
sum_result = grouped.sum(dim=[‘y’, ‘x’])
# sum_result 现在是一个包含每个adcode对应降水总和的DataArray
# 4. 将结果转换回DataFrame
result_df = sum_result.sel(var=‘precip’).to_dataframe(name=‘precip_sum’).reset_index()
result_df = result_df[result_df[‘label’] != 0] # 过滤掉背景值
# 将label与原始乡镇gdf的adcode关联
final_df = pd.merge(town_gdf[[‘adcode’, ‘name’]], result_df, left_on=‘adcode’, right_on=‘label’)
```
这种方法一次性对所有区域进行计算,避免了Python层面的循环,性能有数量级的提升,特别适合处理全国乡镇级这种海量多边形。
**3.3 多年份批量处理与结果组织**
我们需要对1901-2023年每个年份重复上述分区统计过程。为了高效管理,我将流程封装成一个函数,并利用`dask`进行并行化。
```python
import dask.array as da
from dask.diagnostics import ProgressBar
def calculate_annual_precip_for_admin(admin_gdf, raster_path_pattern, years):
“””
计算多年份行政单元降水统计。
admin_gdf: 行政边界GeoDataFrame
raster_path_pattern: 栅格文件路径模式,例如‘precip_{year}.tif’
years: 年份列表
“””
results = {}
# 预栅格化标签(只需一次)
with rasterio.open(raster_path_pattern.format(year=years[0])) as ref_src:
ref_transform = ref_src.transform
ref_shape = ref_src.shape
label_array = rasterize_geometries(admin_gdf, ref_transform, ref_shape)
for year in years:
print(f“Processing {year}...”)
raster_path = raster_path_pattern.format(year=year)
# 读取栅格数据为dask array(惰性加载)
with rasterio.open(raster_path) as src:
precip_array = src.read(1)
precip_da = da.from_array(precip_array, chunks=‘auto’)
# 使用numpy的bincount进行快速分组求和(向量化操作)
# 这里是一个简化的高性能实现逻辑
flat_precip = precip_array.ravel()
flat_labels = label_array.ravel()
# 使用掩码排除NoData值和标签为0的区域
valid_mask = (~np.isnan(flat_precip)) & (flat_labels > 0)
valid_precip = flat_precip[valid_mask]
valid_labels = flat_labels[valid_mask]
# 利用bincount按标签求和
unique_labels = np.unique(valid_labels)
sums = np.bincount(valid_labels.astype(int), weights=valid_precip, minlength=int(valid_labels.max())+1)
# sums[0]是背景区域的和,忽略
year_result = {label: sums[int(label)] for label in unique_labels if label > 0}
results[year] = year_result
# 将结果转换为以行政单元为行、年份为列的DataFrame
admin_ids = admin_gdf[‘adcode’].values
result_df = pd.DataFrame(index=admin_ids)
for year, year_dict in results.items():
result_df[year] = result_df.index.map(year_dict)
return result_df
# 调用函数
town_precip_df = calculate_annual_precip_for_admin(town_gdf,
‘./precip_data/precip_{year}.tif’,
range(1901, 2024))
```
最终得到的`town_precip_df`是一个`DataFrame`,索引是乡镇的`adcode`,列是年份(1901, 1902, …, 2023),每个单元格的值是该乡镇在该年份的降水统计值(总和或均值)。
## 4. 结果输出、验证与可视化
数据处理完,输出和验证同样重要。我们需要将结果保存为可共享、可后续分析的格式,并确保数据的准确性。
**4.1 输出为Shp和Excel格式**
将统计结果与原始的行政边界`GeoDataFrame`合并,然后输出。
```python
# 1. 将统计结果DataFrame合并回GeoDataFrame
# 假设 town_precip_df 索引是 adcode
town_gdf_with_data = town_gdf.merge(town_precip_df, left_on=‘adcode’, right_index=True, how=‘left’)
# 2. 输出为Shapefile
output_shp_path = ‘./output/town_annual_precipitation_1901_2023.shp’
town_gdf_with_data.to_file(output_shp_path, encoding=‘utf-8’)
print(f“Shapefile saved to {output_shp_path}”)
# 3. 输出为Excel(属性表)
output_excel_path = ‘./output/town_annual_precipitation_1901_2023.xlsx’
# 只输出属性数据,不包含几何列
attr_df = town_gdf_with_data.drop(columns=‘geometry’)
attr_df.to_excel(output_excel_path, index=False)
print(f“Excel file saved to {output_excel_path}”)
# 4. 输出为更高效的格式(如GeoPackage和Parquet)
# GeoPackage格式更现代,支持中文更好
town_gdf_with_data.to_file(‘./output/town_data.gpkg’, layer=‘precipitation’, driver=‘GPKG’)
# Parquet格式适合纯表格数据,压缩比高,读写快
attr_df.to_parquet(‘./output/town_precipitation.parquet’)
```
**4.2 数据质量验证**
在交付结果前,必须进行交叉验证。
* **总量核对**:随机选取几个区域,用GIS软件(如QGIS)手动计算栅格统计值,与程序输出结果对比。
* **空间一致性检查**:确保没有行政单元的数据是`NaN`(可能由于边界与栅格不重叠或几何无效)。检查极端值是否合理。
* **时间序列检查**:绘制几个典型乡镇的百年降水序列,查看是否存在异常的突变或缺失值。
```python
import matplotlib.pyplot as plt
# 随机选取5个乡镇,绘制其1901-2023年降水序列
sample_towns = town_precip_df.sample(5, random_state=42)
plt.figure(figsize=(12, 6))
for idx, row in sample_towns.iterrows():
plt.plot(row.index.astype(int), row.values, label=f“Town {idx}”, marker=‘.’, linestyle=‘-’)
plt.xlabel(‘Year’)
plt.ylabel(‘Annual Precipitation (mm)’)
plt.title(‘Sample Time Series of Town-level Precipitation’)
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig(‘./output/sample_timeseries.png’, dpi=300)
plt.show()
```
**4.3 空间分布可视化**
最后,生成一幅专题地图,直观展示某一年份(如2023年)的乡镇级降水空间分布。
```python
import contextily as ctx
fig, ax = plt.subplots(1, 1, figsize=(15, 10))
# 绘制2023年乡镇降水数据
town_gdf_with_data.plot(column=2023, # 假设列名是整数2023
ax=ax,
legend=True,
legend_kwds={‘label’: “Annual Precipitation (mm)”, ‘orientation’: “horizontal”, ‘shrink’: 0.6},
cmap=‘Blues’, # 使用蓝色系
edgecolor=‘grey’,
linewidth=0.2)
# 添加底图(在线地图,需网络)
try:
ctx.add_basemap(ax, crs=town_gdf_with_data.crs.to_string(), source=ctx.providers.CartoDB.Positron)
except:
print(“Could not add basemap. Proceeding without it.”)
ax.set_axis_off()
plt.title(‘Town-level Annual Precipitation (2023)’)
plt.tight_layout()
plt.savefig(‘./output/precipitation_map_2023.png’, dpi=300, bbox_inches=‘tight’)
```
通过这套从数据读取、核心计算到结果输出与验证的完整流程,我们实现了对海量历史气候数据的自动化、批量化处理。整个过程的关键在于选择合适的工具链(`geopandas`/`rasterio`/`xarray`),采用向量化或并行计算提升性能,并始终保持对数据质量和中间结果的检查。在实际项目中,你可能还需要根据数据的具体情况调整参数,例如处理NoData值的策略、选择最合适的分区统计方法等。希望这个实战指南能成为你处理类似空间-时间数据问题的有力参考。