# 哨兵2号影像处理实战:从数据下载到AI分类的完整Python流程
如果你刚开始接触遥感数据处理,面对哨兵2号(Sentinel-2)卫星提供的海量多光谱影像,可能会感到无从下手。数据在哪里下载?如何把原始的JP2文件变成可分析的数组?云层干扰怎么处理?又该如何利用这些数据训练一个能自动识别地物的AI模型?这些问题曾同样困扰过我。经过多个项目的实践,我梳理出了一套基于Python生态的完整工作流,它不仅能帮你自动化处理数据,更能将处理后的影像直接送入机器学习模型进行地物分类。这篇文章就是这套流程的详细拆解,我会分享每一步的具体代码、遇到的坑以及解决方案,目标是让你能直接复制代码并应用到自己的项目中。
## 1. 环境搭建与数据获取:构建可复现的Python工作流
在开始处理数据之前,一个稳定、可复现的Python环境至关重要。我强烈建议使用`conda`来管理环境,它能很好地解决地理空间数据处理中常见的依赖冲突问题。下面是我为哨兵2号处理任务配置的一个基础环境。
```bash
# 创建并激活一个新的conda环境
conda create -n sentinel2 python=3.9
conda activate sentinel2
# 安装核心数据处理库
conda install -c conda-forge gdal rasterio sentinelsat geopandas scipy
pip install sentinelsat
```
这里有几个关键库需要解释一下:`gdal`和`rasterio`是读写地理空间栅格数据的基石,几乎所有的遥感操作都离不开它们。`sentinelsat`则是我们自动化下载哨兵2号数据的利器,它提供了对哥白尼数据空间(Copernicus Data Space)的Python接口。`geopandas`用于处理矢量数据(比如你的研究区域边界),而`scipy`会在后续的图像处理中用到。
数据获取是整个流程的起点。过去我们依赖Copernicus Open Access Hub,但现在更推荐使用新的**Copernicus Data Space Ecosystem**。它的API更稳定,下载速度也更快。首先,你需要去其官网注册一个账号并获取API密钥。接下来,就可以用`sentinelsat`来搜索和下载数据了。
```python
from sentinelsat import SentinelAPI, read_geojson, geojson_to_wkt
from datetime import date
# 连接到Copernicus Data Space
api = SentinelAPI('你的用户名', '你的密码', 'https://catalogue.dataspace.copernicus.eu/')
# 定义搜索区域(这里用GeoJSON格式的边界框)
footprint = geojson_to_wkt({
"type": "Polygon",
"coordinates": [[
[116.0, 39.5],
[117.0, 39.5],
[117.0, 40.5],
[116.0, 40.5],
[116.0, 39.5]
]]
})
# 设置搜索条件
products = api.query(
footprint,
date=('20240101', '20240131'), # 时间范围
platformname='Sentinel-2',
processinglevel='Level-2A', # 直接获取经过大气校正的L2A级数据
cloudcoverpercentage=(0, 10) # 云量覆盖百分比,这里要求低于10%
)
# 将查询结果转换为Pandas DataFrame便于查看
products_df = api.to_dataframe(products)
print(f"找到 {len(products_df)} 景影像")
print(products_df[['title', 'cloudcoverpercentage', 'size']].head())
# 下载第一景影像
if not products_df.empty:
product_id = products_df.index[0]
api.download(product_id)
```
> **提示**:直接搜索L2A级数据可以省去本地运行Sen2Cor进行大气校正的步骤,但L2A数据的更新会有几天延迟。如果对时效性要求极高,也可以下载L1C数据,然后用Sen2Cor本地处理。
下载下来的数据是一个包含多个子文件夹的压缩包。解压后,你会看到类似`S2A_MSIL2A_20240115T030541_N0509_R075_T50TMK_20240115T063856.SAFE`的目录结构。我们需要的关键数据在`GRANULE/[Granule ID]/IMG_DATA/R10m/`(10米分辨率波段)和`R20m/`(20米分辨率波段)等文件夹中,格式为JP2。
## 2. 数据预处理:从原始JP2到规整的NumPy数组
下载的原始数据是分散的JP2文件,每个文件对应一个波段。预处理的目标是将这些文件读取、对齐、重采样(如果需要),并合并成一个多波段的多维数组,同时处理好空间参考信息。这一步是后续所有分析的基础。
首先,我们需要读取所有波段的文件。哨兵2号L2A产品提供了13个光谱波段,但最常用的是10米分辨率的4个波段(蓝、绿、红、近红外)和20米分辨率的红边、短波红外等波段。为了进行机器学习分类,我们通常希望所有特征(波段)具有相同的空间分辨率。因此,需要将20米分辨率的波段重采样到10米。
```python
import rasterio
from rasterio.enums import Resampling
import numpy as np
import os
def load_and_resample_bands(safe_folder_path, target_resolution='10m'):
"""
加载并重采样哨兵2号L2A数据的所有波段到一个统一的网格。
参数:
safe_folder_path: .SAFE文件夹的路径
target_resolution: 目标分辨率,'10m' 或 '20m'
返回:
stacked_array: 堆叠后的多波段数组 [bands, height, width]
profile: 包含地理信息的字典,可用于后续写出GeoTIFF
"""
base_img_path = os.path.join(safe_folder_path, 'GRANULE')
granule_folder = [f for f in os.listdir(base_img_path) if f.startswith('L')][0]
if target_resolution == '10m':
target_dir = 'R10m'
resample_factor = 1 # 10米波段不需要重采样
# 10米分辨率波段: B02(蓝), B03(绿), B04(红), B08(近红外)
band_files = ['B02', 'B03', 'B04', 'B08']
else:
target_dir = 'R20m'
resample_factor = 2 # 20米重采样到10米需要2倍上采样
# 20米分辨率波段示例: B05, B06, B07 (红边), B11, B12 (短波红外)
band_files = ['B05', 'B06', 'B07', 'B11', 'B12']
img_data_path = os.path.join(base_img_path, granule_folder, 'IMG_DATA', target_dir)
bands_data = []
profile = None
for band in band_files:
band_path = os.path.join(img_data_path, f'{band}_{target_resolution}.jp2')
with rasterio.open(band_path) as src:
if profile is None:
profile = src.profile.copy() # 以第一个波段为基准
if resample_factor > 1:
# 重采样到更高分辨率
data = src.read(
1,
out_shape=(
src.count,
int(src.height * resample_factor),
int(src.width * resample_factor)
),
resampling=Resampling.bilinear
)
else:
data = src.read(1)
bands_data.append(data)
# 将所有波段堆叠成一个三维数组 [波段数, 高, 宽]
stacked_array = np.stack(bands_data, axis=0)
# 更新profile以反映新的尺寸和波段数
profile.update({
'height': stacked_array.shape[1],
'width': stacked_array.shape[2],
'count': stacked_array.shape[0],
'dtype': stacked_array.dtype
})
return stacked_array, profile
# 使用示例
safe_path = './S2A_MSIL2A_20240115T030541_N0509_R075_T50TMK_20240115T063856.SAFE'
image_10m, profile_10m = load_and_resample_bands(safe_path, target_resolution='10m')
print(f"10米波段数据形状: {image_10m.shape}") # 例如 (4, 10980, 10980)
```
接下来是**云掩膜**。哨兵2号数据自带一个QA60质量评估波段,其中包含了云和云阴影的掩膜信息。正确使用这个波段能极大提升数据质量。
```python
def create_cloud_mask(safe_folder_path):
"""
从QA60波段创建云掩膜。
返回的掩膜中,True表示云/云阴影,False表示清晰像素。
"""
base_img_path = os.path.join(safe_folder_path, 'GRANULE')
granule_folder = [f for f in os.listdir(base_img_path) if f.startswith('L')][0]
qa60_path = os.path.join(base_img_path, granule_folder, 'IMG_DATA', 'R60m', 'MSK_CLDPRB_60m.jp2')
with rasterio.open(qa60_path) as src:
qa_band = src.read(1)
# QA60波段中,第10位(bit 10)表示不透明云,第11位(bit 11)表示卷云
# 这里我们创建一个简单的掩膜:如果任一云标志位被设置,则视为云像素
opaque_cloud = (qa_band & (1 << 10)) != 0
cirrus_cloud = (qa_band & (1 << 11)) != 0
cloud_mask = opaque_cloud | cirrus_cloud
# QA60是60米分辨率,需要重采样到与其他波段相同的分辨率(例如10米)
# 这里使用最近邻重采样,因为掩膜是二值数据
from scipy.ndimage import zoom
scale_factor = 6 # 从60米到10米
cloud_mask_high_res = zoom(cloud_mask, scale_factor, order=0)
# 裁剪到与其他波段相同的尺寸(由于边界效应,重采样后尺寸可能略有偏差)
target_height, target_width = profile_10m['height'], profile_10m['width']
cloud_mask_high_res = cloud_mask_high_res[:target_height, :target_width]
return cloud_mask_high_res
cloud_mask = create_cloud_mask(safe_path)
print(f"云覆盖比例: {np.mean(cloud_mask) * 100:.2f}%")
# 应用云掩膜:将云像素设置为NaN
image_clean = image_10m.copy()
for i in range(image_clean.shape[0]):
image_clean[i, cloud_mask] = np.nan
```
预处理最后一步通常是计算一些**光谱指数**,这些指数能增强特定地物的特征。最经典的是归一化植被指数(NDVI),它对植被非常敏感。
```python
def calculate_ndvi(image_array, red_band_idx=2, nir_band_idx=3):
"""
计算NDVI: (NIR - Red) / (NIR + Red)
假设image_array的波段顺序为 [B, G, R, NIR, ...]
"""
red = image_array[red_band_idx].astype(np.float32)
nir = image_array[nir_band_idx].astype(np.float32)
# 避免除零错误
denominator = nir + red
denominator[denominator == 0] = np.nan
ndvi = (nir - red) / denominator
return ndvi
ndvi = calculate_ndvi(image_10m)
```
至此,我们得到了一个干净、规整的多波段数组,以及一些衍生指数,可以用于后续的AI分类了。
## 3. 特征工程与样本准备:为机器学习模型准备“燃料”
原始波段值虽然是基础特征,但直接用于分类往往效果有限。通过特征工程,我们可以构造出对分类任务更有信息量的特征。对于遥感影像分类,特征工程通常包括光谱指数计算、纹理特征提取以及主成分分析(PCA)降维。
除了NDVI,其他有用的光谱指数还包括:
- **NDWI(归一化水体指数)**: `(Green - NIR) / (Green + NIR)`,用于提取水体。
- **NDBI(归一化建筑指数)**: `(SWIR1 - NIR) / (SWIR1 + NIR)`,用于提取建筑区域。
- **EVI(增强型植被指数)**: `2.5 * (NIR - Red) / (NIR + 6*Red - 7.5*Blue + 1)`,对高生物量区域更敏感。
我们可以批量计算这些指数并添加到特征集中:
```python
def calculate_spectral_indices(image_array):
"""
计算一组常用的光谱指数。
假设波段顺序: [B02(蓝), B03(绿), B04(红), B08(近红外), B11(短波红外1), B12(短波红外2)]
"""
blue = image_array[0].astype(np.float32)
green = image_array[1].astype(np.float32)
red = image_array[2].astype(np.float32)
nir = image_array[3].astype(np.float32)
swir1 = image_array[4].astype(np.float32) if image_array.shape[0] > 4 else None
swir2 = image_array[5].astype(np.float32) if image_array.shape[0] > 5 else None
indices = {}
# NDVI
indices['NDVI'] = (nir - red) / (nir + red + 1e-10)
# NDWI (McFeeters, 1996)
indices['NDWI'] = (green - nir) / (green + nir + 1e-10)
if swir1 is not None:
# NDBI
indices['NDBI'] = (swir1 - nir) / (swir1 + nir + 1e-10)
# MNDWI (改进的NDWI,对水体更敏感)
indices['MNDWI'] = (green - swir1) / (green + swir1 + 1e-10)
# EVI
indices['EVI'] = 2.5 * (nir - red) / (nir + 6*red - 7.5*blue + 1 + 1e-10)
return indices
# 计算所有指数
spectral_indices = calculate_spectral_indices(image_10m)
# 将指数堆叠成新的特征波段
additional_features = np.stack(list(spectral_indices.values()), axis=0)
print(f"新增光谱指数特征形状: {additional_features.shape}")
```
接下来是**纹理特征**。纹理能反映地物的空间结构信息,对于区分“森林”和“农田”这类光谱可能相似但纹理不同的地物很有帮助。灰度共生矩阵(GLCM)是常用的纹理计算方法,但计算量较大。我们可以使用`scikit-image`库来提取简单的纹理特征。
```python
from skimage.feature import graycomatrix, graycoprops
from skimage import img_as_ubyte
def calculate_texture_features(band_data, distances=[1], angles=[0], properties=['contrast', 'homogeneity']):
"""
计算单个波段的纹理特征。
为了性能,通常会对图像进行降采样或分块计算。
"""
# 将数据缩放到0-255并转换为整数类型
band_normalized = ((band_data - np.nanmin(band_data)) /
(np.nanmax(band_data) - np.nanmin(band_data)) * 255).astype(np.uint8)
# 计算GLCM
glcm = graycomatrix(band_normalized, distances=distances, angles=angles,
levels=256, symmetric=True, normed=True)
# 计算纹理属性
texture_features = []
for prop in properties:
texture_features.append(graycoprops(glcm, prop).ravel()[0])
return texture_features
# 示例:计算近红外波段(索引3)的纹理特征(在实际中可能需要对整个图像分块计算)
nir_texture = calculate_texture_features(image_10m[3])
```
对于高维特征(例如我们有了10个原始波段 + 5个光谱指数 + 2个纹理特征),直接使用可能会面临“维度灾难”和多重共线性问题。这时可以使用**主成分分析(PCA)**进行降维,保留大部分信息的同时减少特征数量。
```python
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
def apply_pca_to_image(image_stack, n_components=0.95):
"""
对多波段图像进行PCA降维。
image_stack: 形状为 (特征数, 高, 宽) 的数组
n_components: 保留的主成分数量或方差解释比例
"""
original_shape = image_stack.shape
# 将三维数组重塑为二维 [像素数, 特征数]
X = image_stack.reshape(original_shape[0], -1).T # 转置后每行是一个像素
# 处理NaN值(例如云掩膜产生的)
nan_mask = np.isnan(X).any(axis=1)
X_clean = X[~nan_mask]
# 标准化
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X_clean)
# PCA
pca = PCA(n_components=n_components)
X_pca = pca.fit_transform(X_scaled)
print(f"原始特征数: {X_scaled.shape[1]}")
print(f"PCA后特征数: {X_pca.shape[1]}")
print(f"解释方差比例: {np.sum(pca.explained_variance_ratio_):.3f}")
# 将结果重构回图像形状
pca_image = np.full((X_pca.shape[1], original_shape[1], original_shape[2]), np.nan)
for i in range(X_pca.shape[1]):
band_data = np.full(original_shape[1] * original_shape[2], np.nan)
band_data[~nan_mask] = X_pca[:, i]
pca_image[i] = band_data.reshape(original_shape[1], original_shape[2])
return pca_image, pca, scaler
# 将所有特征堆叠在一起
all_features = np.vstack([image_10m, additional_features]) # 这里简化处理,实际需考虑空间对齐
pca_result, pca_model, scaler_model = apply_pca_to_image(all_features, n_components=0.95)
```
现在,我们有了经过PCA降维的特征图像。接下来需要准备训练样本。在监督分类中,我们需要一些已知类别的像素作为“老师”来训练模型。样本可以来自实地调查、高分辨率影像目视解译,或者已有的土地利用数据。这里假设我们已经有了一个GeoJSON文件,其中包含了多边形区域及其类别标签。
```python
import geopandas as gpd
from rasterio import features
from sklearn.model_selection import train_test_split
def create_training_data(raster_path, label_geojson_path, label_field='class'):
"""
从矢量标签文件创建训练样本。
参数:
raster_path: 参考栅格文件的路径(用于获取坐标信息)
label_geojson_path: 包含分类多边形的GeoJSON文件路径
label_field: GeoJSON中存储类别标签的字段名
返回:
X: 特征矩阵 [样本数, 特征数]
y: 标签向量 [样本数]
"""
# 读取标签矢量数据
gdf = gpd.read_file(label_geojson_path)
# 读取参考栅格(这里用我们之前处理好的一个波段)
with rasterio.open(raster_path) as src:
raster_shape = src.shape
transform = src.transform
# 将多边形标签栅格化到与影像相同的网格
labels_raster = np.zeros(raster_shape, dtype=np.int32) - 1 # -1表示无标签
for idx, row in gdf.iterrows():
geom = row.geometry
label = row[label_field]
# 栅格化多边形
mask = features.geometry_mask([geom], out_shape=raster_shape,
transform=transform, invert=True)
labels_raster[mask] = label
# 读取所有特征波段(假设已经保存为多波段GeoTIFF)
with rasterio.open(raster_path) as src:
# 假设是多波段文件
feature_data = src.read() # 形状为 [波段数, 高, 宽]
# 提取有标签的像素
valid_pixels = labels_raster != -1
X = feature_data[:, valid_pixels].T # 转置为 [样本数, 特征数]
y = labels_raster[valid_pixels]
print(f"提取到 {len(y)} 个训练样本")
print(f"类别分布: {np.bincount(y[y >= 0])}")
return X, y, labels_raster
# 假设我们已经将PCA结果保存为GeoTIFF
# 这里我们使用第一个PCA分量作为参考栅格
with rasterio.open('pca_result.tif', 'w', **profile_10m) as dst:
dst.write(pca_result)
# 创建训练数据
X, y, label_map = create_training_data('pca_result.tif', 'training_labels.geojson', label_field='landcover')
# 划分训练集和测试集
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.3, random_state=42, stratify=y
)
print(f"训练集大小: {X_train.shape}, 测试集大小: {X_test.shape}")
```
至此,我们已经准备好了高质量的训练数据,可以开始构建分类模型了。
## 4. 机器学习模型训练与评估:让AI学会识别地物
有了特征和标签,我们就可以训练分类模型了。在遥感领域,随机森林(Random Forest)因其对高维数据的良好处理能力、不易过拟合以及能提供特征重要性评估而备受青睐。下面我们用`scikit-learn`来实现一个完整的随机森林分类流程。
首先,训练一个基础的随机森林分类器:
```python
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score
import joblib
# 初始化随机森林模型
rf_model = RandomForestClassifier(
n_estimators=100, # 树的数量
max_depth=15, # 树的最大深度,防止过拟合
min_samples_split=5, # 内部节点再划分所需最小样本数
min_samples_leaf=2, # 叶节点最少样本数
max_features='sqrt', # 寻找最佳分割时考虑的特征数
n_jobs=-1, # 使用所有CPU核心
random_state=42,
class_weight='balanced' # 平衡类别权重,处理样本不均衡
)
# 训练模型
rf_model.fit(X_train, y_train)
# 在测试集上预测
y_pred = rf_model.predict(X_test)
# 评估模型性能
accuracy = accuracy_score(y_test, y_pred)
print(f"整体分类精度: {accuracy:.4f}")
print("\n分类报告:")
print(classification_report(y_test, y_pred, target_names=['水体', '森林', '农田', '建筑']))
# 混淆矩阵
cm = confusion_matrix(y_test, y_pred)
print("混淆矩阵:")
print(cm)
```
随机森林的一个强大功能是能评估每个特征的重要性,这能帮助我们理解模型决策的依据,甚至优化特征选择。
```python
import pandas as pd
import matplotlib.pyplot as plt
# 获取特征重要性
feature_importance = rf_model.feature_importances_
# 假设我们的特征名称(根据之前的特征工程)
feature_names = [
'B02_Blue', 'B03_Green', 'B04_Red', 'B08_NIR', # 原始波段
'B05_RedEdge1', 'B06_RedEdge2', 'B07_RedEdge3', 'B11_SWIR1', 'B12_SWIR2', # 20米波段
'NDVI', 'NDWI', 'NDBI', 'MNDWI', 'EVI', # 光谱指数
'PCA1', 'PCA2', 'PCA3' # PCA主成分
]
# 创建重要性DataFrame
importance_df = pd.DataFrame({
'feature': feature_names[:len(feature_importance)],
'importance': feature_importance
}).sort_values('importance', ascending=False)
print("特征重要性排序:")
print(importance_df)
# 可视化
plt.figure(figsize=(10, 6))
plt.barh(importance_df['feature'], importance_df['importance'])
plt.xlabel('特征重要性')
plt.title('随机森林特征重要性')
plt.gca().invert_yaxis() # 最重要的特征在顶部
plt.tight_layout()
plt.show()
```
在实际项目中,我们很少只满足于一个默认参数的模型。**超参数调优**能显著提升模型性能。`scikit-learn`的`GridSearchCV`或`RandomizedSearchCV`可以帮我们系统性地搜索最佳参数组合。
```python
from sklearn.model_selection import GridSearchCV
# 定义参数网格
param_grid = {
'n_estimators': [50, 100, 200],
'max_depth': [10, 15, 20, None],
'min_samples_split': [2, 5, 10],
'min_samples_leaf': [1, 2, 4],
'max_features': ['sqrt', 'log2']
}
# 使用网格搜索(注意:这可能需要较长时间)
grid_search = GridSearchCV(
RandomForestClassifier(random_state=42, class_weight='balanced', n_jobs=-1),
param_grid,
cv=5, # 5折交叉验证
scoring='accuracy',
verbose=2,
n_jobs=-1
)
# 在小样本上运行网格搜索以节省时间(实际应用中应在完整训练集上运行)
sample_idx = np.random.choice(len(X_train), size=min(5000, len(X_train)), replace=False)
grid_search.fit(X_train[sample_idx], y_train[sample_idx])
print(f"最佳参数: {grid_search.best_params_}")
print(f"最佳交叉验证精度: {grid_search.best_score_:.4f}")
# 使用最佳参数重新训练完整模型
best_rf = grid_search.best_estimator_
best_rf.fit(X_train, y_train)
# 保存训练好的模型
joblib.dump(best_rf, 'sentinel2_landcover_rf_model.pkl')
```
模型训练好后,我们需要将其应用到整景影像上,生成分类结果图。
```python
def predict_entire_image(model, feature_image_path, output_path):
"""
使用训练好的模型对整个影像进行分类预测。
参数:
model: 训练好的scikit-learn模型
feature_image_path: 多波段特征影像路径
output_path: 分类结果输出路径
"""
with rasterio.open(feature_image_path) as src:
feature_data = src.read()
profile = src.profile
original_shape = feature_data.shape # [波段数, 高, 宽]
# 重塑为二维数组 [像素数, 波段数]
X_full = feature_data.reshape(original_shape[0], -1).T
# 处理NaN值
nan_mask = np.isnan(X_full).any(axis=1)
X_valid = X_full[~nan_mask]
# 预测(分批处理以避免内存不足)
batch_size = 100000
predictions_valid = np.zeros(X_valid.shape[0], dtype=np.uint8)
for i in range(0, X_valid.shape[0], batch_size):
end_idx = min(i + batch_size, X_valid.shape[0])
predictions_valid[i:end_idx] = model.predict(X_valid[i:end_idx])
# 将预测结果填回原始形状
predictions_full = np.zeros(original_shape[1] * original_shape[2], dtype=np.uint8)
predictions_full[~nan_mask] = predictions_valid
predictions_full[nan_mask] = 255 # 用255表示无效值(如云)
classification_result = predictions_full.reshape(original_shape[1], original_shape[2])
# 更新profile以保存分类结果
profile.update({
'count': 1,
'dtype': 'uint8',
'nodata': 255
})
# 保存结果
with rasterio.open(output_path, 'w', **profile) as dst:
dst.write(classification_result, 1)
print(f"分类结果已保存至: {output_path}")
return classification_result
# 应用模型
classification_map = predict_entire_image(
best_rf,
'pca_result.tif',
'landcover_classification.tif'
)
```
最后,我们可以用`matplotlib`将分类结果可视化,并与原始影像进行对比。
```python
import matplotlib.pyplot as plt
from matplotlib.colors import ListedColormap
# 定义分类颜色映射
class_names = ['水体', '森林', '农田', '建筑', '其他']
class_colors = ['#1f77b4', '#2ca02c', '#ff7f0e', '#d62728', '#7f7f7f']
cmap = ListedColormap(class_colors)
# 创建可视化图
fig, axes = plt.subplots(1, 3, figsize=(18, 6))
# 1. 真彩色合成 (B04, B03, B02)
rgb_image = np.stack([image_10m[2], image_10m[1], image_10m[0]], axis=-1) # 红、绿、蓝
# 拉伸对比度以便显示
rgb_display = np.clip((rgb_image / np.nanpercentile(rgb_image, 95)) * 255, 0, 255).astype(np.uint8)
axes[0].imshow(rgb_display)
axes[0].set_title('真彩色合成 (RGB)')
axes[0].axis('off')
# 2. NDVI显示
ndvi_display = np.clip((ndvi + 1) / 2 * 255, 0, 255).astype(np.uint8) # 将NDVI从[-1,1]映射到[0,255]
axes[1].imshow(ndvi_display, cmap='RdYlGn')
axes[1].set_title('NDVI (植被指数)')
axes[1].axis('off')
# 3. 分类结果
# 将无效值(255)设为透明
classification_display = classification_map.copy()
mask = classification_display == 255
classification_display = np.ma.masked_where(mask, classification_display)
im = axes[2].imshow(classification_display, cmap=cmap, vmin=0, vmax=len(class_colors)-1)
axes[2].set_title('土地利用分类结果')
axes[2].axis('off')
# 添加图例
from matplotlib.patches import Patch
legend_elements = [Patch(facecolor=color, edgecolor='black', label=name)
for color, name in zip(class_colors, class_names)]
axes[2].legend(handles=legend_elements, loc='lower right', fontsize=8)
plt.tight_layout()
plt.savefig('classification_result.png', dpi=300, bbox_inches='tight')
plt.show()
```
## 5. 流程优化与生产部署:从实验脚本到可复用工具
当我们在单景影像上验证了流程的有效性后,下一步就是将其优化为一个可以处理批量数据、健壮且高效的生产级工具。这涉及到错误处理、日志记录、并行处理和容器化等多个方面。
首先,我们可以将整个流程封装到一个类中,使其更易于管理和复用:
```python
import logging
from pathlib import Path
from typing import Optional, Dict, Tuple
import numpy as np
import rasterio
class Sentinel2Processor:
"""哨兵2号数据处理与分类流水线"""
def __init__(self, working_dir: str, log_level: str = 'INFO'):
self.working_dir = Path(working_dir)
self.working_dir.mkdir(parents=True, exist_ok=True)
# 设置日志
logging.basicConfig(
level=getattr(logging, log_level),
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(self.working_dir / 'sentinel2_processor.log'),
logging.StreamHandler()
]
)
self.logger = logging.getLogger(__name__)
# 存储中间结果路径
self.processed_files = {}
def download_product(self, product_id: str, api_user: str, api_pass: str) -> Path:
"""下载指定的哨兵2号产品"""
try:
from sentinelsat import SentinelAPI
api = SentinelAPI(api_user, api_pass, 'https://catalogue.dataspace.copernicus.eu/')
self.logger.info(f"开始下载产品: {product_id}")
# 检查是否已下载
product_info = api.get_product_odata(product_id)
safe_name = product_info['title']
safe_path = self.working_dir / f"{safe_name}.SAFE"
if safe_path.exists():
self.logger.info(f"产品已存在: {safe_path}")
return safe_path
# 下载产品
download_path = api.download(product_id, directory_path=self.working_dir)
self.logger.info(f"下载完成: {download_path}")
return Path(download_path)
except Exception as e:
self.logger.error(f"下载失败: {str(e)}")
raise
def process_single_scene(self, safe_path: Path, output_prefix: str) -> Dict[str, Path]:
"""处理单景影像:预处理、特征提取、分类"""
try:
self.logger.info(f"开始处理: {safe_path.name}")
# 1. 加载和重采样波段
image_10m, profile = self._load_and_resample_bands(safe_path)
# 2. 云掩膜
cloud_mask = self._create_cloud_mask(safe_path, profile['height'], profile['width'])
# 3. 计算光谱指数
spectral_indices = self._calculate_spectral_indices(image_10m)
# 4. 特征堆叠与PCA
all_features = self._stack_features(image_10m, spectral_indices)
pca_features, pca_model = self._apply_pca(all_features)
# 5. 保存中间结果
pca_path = self.working_dir / f"{output_prefix}_pca.tif"
self._save_raster(pca_features, profile, pca_path)
# 6. 分类预测(如果模型存在)
model_path = self.working_dir / 'trained_model.pkl'
if model_path.exists():
classification = self._predict_classification(pca_path, model_path)
class_path = self.working_dir / f"{output_prefix}_classification.tif"
self._save_raster(classification.reshape(1, *classification.shape),
profile, class_path)
self.processed_files[output_prefix] = {
'pca': pca_path,
'classification': class_path,
'cloud_mask': cloud_mask
}
else:
self.processed_files[output_prefix] = {
'pca': pca_path,
'cloud_mask': cloud_mask
}
self.logger.info(f"处理完成: {output_prefix}")
return self.processed_files[output_prefix]
except Exception as e:
self.logger.error(f"处理失败 {safe_path.name}: {str(e)}")
raise
def batch_process(self, product_ids: list, api_credentials: Tuple[str, str]):
"""批量处理多景影像"""
results = {}
for product_id in product_ids:
try:
safe_path = self.download_product(product_id, *api_credentials)
output_prefix = safe_path.stem.replace('.SAFE', '')
result = self.process_single_scene(safe_path, output_prefix)
results[product_id] = result
except Exception as e:
self.logger.error(f"产品 {product_id} 处理失败: {str(e)}")
continue
return results
# 以下为内部方法,实现上述步骤的具体细节(略)
def _load_and_resample_bands(self, safe_path: Path):
# 实现波段加载和重采样
pass
def _create_cloud_mask(self, safe_path: Path, height: int, width: int):
# 实现云掩膜创建
pass
def _calculate_spectral_indices(self, image_array: np.ndarray):
# 实现光谱指数计算
pass
def _stack_features(self, image_array: np.ndarray, indices: Dict):
# 实现特征堆叠
pass
def _apply_pca(self, features: np.ndarray):
# 实现PCA降维
pass
def _save_raster(self, data: np.ndarray, profile: Dict, path: Path):
# 实现栅格保存
pass
def _predict_classification(self, feature_path: Path, model_path: Path):
# 实现分类预测
pass
# 使用示例
processor = Sentinel2Processor('./sentinel2_workspace', log_level='INFO')
api_user = 'your_username'
api_pass = 'your_password'
product_ids = [
'S2A_MSIL2A_20240115T030541_N0509_R075_T50TMK_20240115T063856',
'S2B_MSIL2A_20240120T030539_N0509_R075_T50TMK_20240120T062812'
]
results = processor.batch_process(product_ids, (api_user, api_pass))
```
对于大规模数据处理,**并行计算**可以显著加速流程。我们可以使用Python的`concurrent.futures`模块来并行处理多景影像:
```python
from concurrent.futures import ProcessPoolExecutor, as_completed
import multiprocessing
def process_scene_wrapper(args):
"""包装函数,用于并行处理"""
product_id, api_user, api_pass, working_dir = args
processor = Sentinel2Processor(working_dir)
safe_path = processor.download_product(product_id, api_user, api_pass)
output_prefix = safe_path.stem.replace('.SAFE', '')
return processor.process_single_scene(safe_path, output_prefix)
def parallel_batch_process(product_ids, api_credentials, working_dir, max_workers=None):
"""并行批量处理"""
if max_workers is None:
max_workers = multiprocessing.cpu_count() - 1 # 留一个核心给系统
args_list = [(pid, api_credentials[0], api_credentials[1], working_dir)
for pid in product_ids]
results = {}
with ProcessPoolExecutor(max_workers=max_workers) as executor:
future_to_product = {
executor.submit(process_scene_wrapper, args): args[0]
for args in args_list
}
for future in as_completed(future_to_product):
product_id = future_to_product[future]
try:
result = future.result()
results[product_id] = result
print(f"完成处理: {product_id}")
except Exception as e:
print(f"处理失败 {product_id}: {str(e)}")
return results
# 并行处理示例
parallel_results = parallel_batch_process(
product_ids,
(api_user, api_pass),
'./sentinel2_parallel_workspace',
max_workers=4
)
```
最后,为了确保流程的可复现性和可移植性,我们可以使用**Docker**进行容器化。下面是一个简单的Dockerfile示例:
```dockerfile
# Dockerfile
FROM python:3.9-slim
# 安装系统依赖
RUN apt-get update && apt-get install -y \
gdal-bin \
libgdal-dev \
build-essential \
&& rm -rf /var/lib/apt/lists/*
# 设置工作目录
WORKDIR /app
# 复制依赖文件
COPY requirements.txt .
# 安装Python依赖
RUN pip install --no-cache-dir -r requirements.txt
# 复制应用代码
COPY sentinel2_processor.py .
COPY trained_model.pkl .
# 设置环境变量
ENV GDAL_DATA=/usr/share/gdal
# 运行命令
CMD ["python", "sentinel2_processor.py"]
```
对应的`requirements.txt`文件:
```
# requirements.txt
numpy>=1.21.0
rasterio>=1.3.0
scikit-learn>=1.0.0
scikit-image>=0.19.0
sentinelsat>=1.1.0
geopandas>=0.11.0
matplotlib>=3.5.0
pandas>=1.4.0
joblib>=1.1.0
```
构建并运行Docker容器:
```bash
# 构建镜像
docker build -t sentinel2-processor .
# 运行容器(挂载数据目录)
docker run -v $(pwd)/data:/app/data sentinel2-processor
```
通过这样的容器化部署,你可以确保在任何支持Docker的机器上都能获得完全一致的环境,这对于团队协作和生产部署至关重要。
在实际项目中,我遇到过几个常见的坑。首先是**内存管理**:哨兵2号一景影像的全分辨率数据可能超过1GB,直接加载到内存可能导致崩溃。解决方案是使用分块处理或`rasterio`的窗口读取功能。其次是**坐标系对齐**:不同来源的矢量数据和栅格数据可能有不同的坐标系,务必在预处理阶段进行统一的重投影。最后是**类别不平衡**:实地样本中“建筑”类可能远少于“植被”类,这会导致模型偏向多数类。除了使用`class_weight='balanced'`参数,还可以尝试过采样少数类或欠采样多数类。
这套流程经过多个项目的打磨,已经能够稳定处理数百景哨兵2号影像。最耗时的部分通常是数据下载和云掩膜生成,而模型预测部分通过批处理和并行化已经相当高效。如果你刚开始接触,建议先用一小块区域测试整个流程,确保每一步都按预期工作,再扩展到更大范围。