# 从手动到智能:构建你的ECMWF大气数据自动化下载工作流
如果你正在处理遥感影像,特别是涉及地表温度反演这类对大气条件极其敏感的分析,那么你一定对ECMWF(欧洲中期天气预报中心)的大气再分析数据不陌生。无论是ERA5还是ERA5-Land,这些数据集提供了全球范围内高时空分辨率的大气状态变量,是校正大气影响、提升反演精度的关键。然而,当你的研究区域从单景影像扩展到省级、国家级,甚至全球尺度,时间序列从几天延伸到数年时,手动在网页上点点选选、一一下载数据,很快就会从必要工作变成一场噩梦。数据请求的排队、网络的不稳定、文件命名的混乱,每一项都在消耗着宝贵的研究时间。
这正是自动化脚本的价值所在。今天,我们不谈空洞的理论,直接切入实战,分享一套经过实际项目检验的、用于**批量自动化下载ECMWF大气数据**的Python解决方案。这套方案的核心目标很明确:**将你从重复、繁琐的手动操作中解放出来,让数据获取流程像流水线一样自动、可靠地运行**。无论你是需要为数百景Landsat或Sentinel影像匹配同期大气数据,还是需要定期更新某个区域的长时序数据集,这篇文章都将为你提供一个可直接集成、高度可定制的技术框架。
我们将从最基础的API配置讲起,逐步深入到如何根据遥感影像元数据(如`.xml`或`.txt`格式的元数据文件)智能解析时间与空间范围,并自动触发数据下载。整个过程会包含完整的代码示例、关键参数的详细解释、常见的“坑”以及如何优雅地处理异常。最终,你将拥有一个健壮的工具,只需一个命令,就能完成从元数据解析到数据下载归档的全过程。
## 1. 基石:ECMWF API的配置与环境搭建
在开始编写任何一行下载代码之前,我们必须先拿到访问数据的“钥匙”。ECMWF通过其气候数据存储(CDS)和大气数据存储(ADS)提供了官方的API接口,这比模拟网页请求要稳定和规范得多。
### 1.1 获取你的API凭证
首先,访问 [Copernicus Climate Data Store](https://cds.climate.copernicus.eu) 进行注册。这个过程是免费的,但需要验证邮箱。注册成功后,登录并进入你的用户页面。
> 注意:确保你注册时使用的邮箱是长期有效的,因为API key的维护和通知会通过该邮箱进行。
获取API Key的步骤非常直接:
1. 在用户页面找到“API key”或“How to use the API”的链接。
2. 页面会显示两行关键信息,通常格式如下:
```
url: https://cds.climate.copernicus.eu/api/v2
key: 123456:abcdefghij-1234-5678-90ab-cdef12345678
```
这里的`key`由你的用户ID和API令牌组成。
### 1.2 本地配置 `.cdsapirc` 文件
API客户端库需要通过一个配置文件来读取你的凭证。标准做法是在你的用户主目录下创建一个名为`.cdsapirc`的纯文本文件。
- **对于Linux/macOS用户**:主目录通常是 `~/`。你可以在终端中执行 `cd ~` 然后创建文件。
- **对于Windows用户**:主目录是 `C:\Users\<你的用户名>\`。你需要确保系统显示隐藏文件,才能看到以点开头的文件。
用文本编辑器(如Notepad++, VS Code,甚至系统自带的记事本)创建这个文件,内容就是刚才复制的两行:
```
url: https://cds.climate.copernicus.eu/api/v2
key: 123456:abcdefghij-1234-5678-90ab-cdef12345678
```
保存时,注意文件名必须是 **`.cdsapirc`**,包括开头的点号。在Windows记事本中,你可能需要将“保存类型”选为“所有文件”,然后在文件名输入框中手动输入完整的`.cdsapirc`。
### 1.3 安装必要的Python库
ECMWF官方提供了 `cdsapi` 这个Python库,它是我们与服务器通信的桥梁。安装它只需要一条命令。建议在虚拟环境中进行操作,以避免依赖冲突。
```bash
# 使用pip安装cdsapi库
pip install cdsapi
```
如果安装速度慢,可以考虑使用国内的镜像源:
```bash
pip install cdsapi -i https://pypi.tuna.tsinghua.edu.cn/simple
```
安装完成后,你可以通过一个简单的测试脚本来验证配置是否成功:
```python
import cdsapi
c = cdsapi.Client()
print("API客户端初始化成功!")
```
如果这段代码没有报错,恭喜你,环境搭建已经完成。如果遇到`Invalid key`或文件未找到的错误,请回头检查`.cdsapirc`文件的位置和内容是否正确。
## 2. 核心引擎:封装一个健壮的数据下载函数
直接使用API示例代码进行单次下载很简单,但我们的目标是批量化和自动化。这就需要我们将下载逻辑封装成一个函数,并充分考虑健壮性。
### 2.1 理解数据请求参数
以最常用的`reanalysis-era5-pressure-levels`(ERA5气压层数据)为例,一个数据请求包含多个维度的参数:
| 参数类别 | 参数名 | 说明 | 示例/注意事项 |
| :--- | :--- | :--- | :--- |
| **数据集标识** | `'reanalysis-era5-pressure-levels'` | 指定要下载的数据产品 | 不可更改 |
| **产品类型** | `'product_type'` | 通常为`'reanalysis'`(再分析数据) | |
| **变量** | `'variable'` | 需要下载的大气变量列表 | 如温度、湿度、风场等 |
| **气压层** | `'pressure_level'` | 所需的气压层高度(单位:hPa) | 从`'1'`到`'1000'`,可选 |
| **时间** | `'year'`, `'month'`, `'day'`, `'time'` | 数据的时间点 | **`'time'`必须为UTC整点,格式`'HH:MM'** |
| **空间范围** | `'area'` | 区域的经纬度范围 `[北纬, 西经, 南纬, 东经]` | 单位:度,北纬和东经为正 |
| **格式** | `'format'` | 输出文件格式 | 常用`'netcdf'`或`'grib'` |
一个典型的请求字典结构如下所示。注意,`'variable'`和`'pressure_level'`是列表,这意味着我们可以一次性请求多个变量和层次。
### 2.2 构建下载函数与异常处理
基于以上参数,我们可以构建一个核心下载函数。**异常处理是这里的关键**,因为网络超时、请求参数错误、服务器队列满等情况在批量处理中时有发生。
```python
import cdsapi
import sys
import os
from typing import Tuple, Optional
def download_era5_single_time(
year: str,
month: str,
day: str,
time: str, # 格式: "00:00", "06:00"等
area: list, # 格式: [north, west, south, east], 例如 [50, 100, 20, 120]
output_path: str,
variables: Optional[list] = None,
pressure_levels: Optional[list] = None
) -> Tuple[bool, str]:
"""
下载指定时空范围的单一时次ERA5气压层数据。
参数:
year, month, day, time: 数据时间。
area: 空间范围 [北, 西, 南, 东]。
output_path: 输出NetCDF文件的完整路径。
variables: 可选,大气变量列表。默认为常用变量集。
pressure_levels: 可选,气压层列表。默认为常用层次。
返回:
(success, message): 成功标志和描述信息。
"""
# 设置默认变量和气压层,用户可自定义覆盖
if variables is None:
variables = [
'temperature', # 温度
'relative_humidity', # 相对湿度
'specific_humidity', # 比湿
'u_component_of_wind', # 纬向风
'v_component_of_wind', # 经向风
'geopotential', # 位势高度
]
if pressure_levels is None:
# 选择一些代表性气压层,减少数据量
pressure_levels = ['1000', '850', '700', '500', '300', '200']
client = cdsapi.Client()
# 构建请求字典
request_params = {
'product_type': 'reanalysis',
'variable': variables,
'pressure_level': pressure_levels,
'year': year,
'month': month,
'day': day,
'time': time,
'area': area, # 顺序:北、西、南、东
'format': 'netcdf',
}
try:
print(f"[INFO] 开始请求数据: {year}-{month}-{day} {time}, 区域: {area}")
# 这里是核心的下载调用,result是一个临时文件对象
result = client.retrieve('reanalysis-era5-pressure-levels', request_params)
# 将数据下载到指定路径
result.download(output_path)
print(f"[SUCCESS] 数据已保存至: {output_path}")
return True, "下载成功"
except cdsapi.api.ClientError as e:
# 处理客户端错误,如参数错误、无数据等
error_msg = f"客户端请求错误: {e}"
print(f"[ERROR] {error_msg}")
return False, error_msg
except Exception as e:
# 处理其他异常,如网络错误、磁盘空间不足等
error_msg = f"下载过程发生未知错误: {e}"
print(f"[ERROR] {error_msg}")
return False, error_msg
```
这个函数做了几件重要的事情:
1. **参数化**:将所有可变的要素(时间、空间、变量、输出路径)都作为函数参数,灵活性极高。
2. **设置默认值**:为`variables`和`pressure_levels`提供了经过筛选的常用默认值,在满足多数需求的同时控制了数据量。
3. **精细化异常捕获**:专门捕获`cdsapi.api.ClientError`,这通常是请求参数问题;用通用的`Exception`捕获其他意外错误。
4. **清晰的日志**:在关键步骤打印信息,便于跟踪批量任务的进度和定位问题。
> 提示:ECMWF API对时间参数有严格限制。对于`reanalysis-era5-pressure-levels`,`time`参数必须是UTC时间的整点(如`"00:00"`, `"06:00"`, `"12:00"`, `"18:00"`)。传入非整点时间会直接导致请求被服务器拒绝。
## 3. 大脑:从遥感影像元数据中自动提取请求参数
自动化流程的“智能”体现在它能自动决策。在我们的场景里,决策依据就是遥感影像的元数据。我们需要编写一个模块,专门解析元数据文件,提取出影像的**获取时间**和**空间范围**,并将其转换为ECMWF API所需的参数格式。
### 3.1 解析不同卫星的元数据格式
不同的遥感数据源,其元数据格式差异很大。例如,Landsat系列通常使用`.txt`或`.MTL`文件,而Sentinel-2则使用`.xml`文件。我们的解析器需要能处理这些常见格式。
下面是一个支持Landsat MTL文件(.txt)和类Sentinel XML文件的解析函数示例:
```python
import os
from xml.dom.minidom import parse
import re
def parse_metadata(file_path: str):
"""
自动识别并解析元数据文件,提取中心时间与空间四至。
参数:
file_path: 元数据文件的路径。
返回:
dict: 包含解析出的时间、日期和边界框信息。
格式: {
'date': 'YYYY-MM-DD',
'time': 'HH:MM:SS',
'bbox': [north, west, south, east]
}
解析失败时返回None。
"""
file_ext = os.path.splitext(file_path)[1].lower()
if file_ext == '.txt' or 'mtl' in file_path.lower():
# 处理Landsat MTL格式
return _parse_landsat_mtl(file_path)
elif file_ext == '.xml':
# 处理Sentinel等XML格式
return _parse_sentinel_xml(file_path)
else:
print(f"[WARNING] 不支持的元数据格式: {file_path}")
return None
def _parse_landsat_mtl(txt_path):
"""解析Landsat MTL文件"""
info = {}
with open(txt_path, 'r') as f:
for line in f:
line = line.strip()
# 匹配 DATE_ACQUIRED 和 SCENE_CENTER_TIME
if line.startswith('DATE_ACQUIRED'):
info['date'] = line.split('=')[1].strip().strip('"')
elif line.startswith('SCENE_CENTER_TIME'):
# 时间可能带引号,如 "12:30:15.123456Z"
time_str = line.split('=')[1].strip().strip('"')
# 提取时分秒,忽略毫秒和Z
match = re.search(r'(\d{2}):(\d{2}):(\d{2})', time_str)
if match:
info['time'] = f"{match.group(1)}:{match.group(2)}:{match.group(3)}"
# 解析空间四至 (示例,实际MTL文件角点关键词可能不同)
elif 'CORNER_UL_LAT_PRODUCT' in line:
info['ul_lat'] = float(line.split('=')[1].strip().strip('"'))
elif 'CORNER_UR_LAT_PRODUCT' in line:
info['ur_lat'] = float(line.split('=')[1].strip().strip('"'))
elif 'CORNER_LL_LON_PRODUCT' in line:
info['ll_lon'] = float(line.split('=')[1].strip().strip('"'))
elif 'CORNER_LR_LON_PRODUCT' in line:
info['lr_lon'] = float(line.split('=')[1].strip().strip('"'))
# 计算边界框:北=max(纬度),南=min(纬度),西=min(经度),东=max(经度)
if all(k in info for k in ['ul_lat', 'ur_lat', 'll_lon', 'lr_lon']):
north = max(info['ul_lat'], info['ur_lat'])
south = min(info['ul_lat'], info['ur_lat']) # 这里简化了,实际需所有角点
west = min(info['ll_lon'], info['lr_lon'])
east = max(info['ll_lon'], info['lr_lon'])
info['bbox'] = [north, west, south, east]
return info if 'date' in info and 'time' in info and 'bbox' in info else None
def _parse_sentinel_xml(xml_path):
"""解析Sentinel SAFE格式中的MTD_*.xml文件"""
try:
dom = parse(xml_path)
root = dom.documentElement
# 查找时间信息 (Sentinel-2示例)
# 实际XPath可能因产品版本而异,这里是一个通用示例
start_time_elem = root.getElementsByTagName('PRODUCT_START_TIME')
if not start_time_elem:
start_time_elem = root.getElementsByTagName('StartTime') # 其他可能标签
if start_time_elem:
time_str = start_time_elem[0].firstChild.data
# 假设格式为 "2023-10-27T02:30:45.123Z"
date_part, time_part = time_str.split('T')
info = {'date': date_part, 'time': time_part[:8]} # 取到秒
# 查找地理范围 (通常以多边形或角点形式存在)
# 这里需要根据具体的XML结构进行解析,可能涉及‘geometricInfo’
# 以下为简化示例,实际应用需适配具体schema
# ...
# info['bbox'] = [north, west, south, east]
return info
except Exception as e:
print(f"[ERROR] 解析XML文件 {xml_path} 时出错: {e}")
return None
```
### 3.2 时间匹配策略:UTC转换与整点逼近
遥感影像的获取时间通常是UTC时间,但ECMWF的ERA5数据只在UTC的整点时刻有输出(例如00:00, 01:00, ...)。因此,我们需要一个策略,为影像时间匹配最接近的、可用的ECMWF数据时次。
一个简单有效的策略是**四舍五入到最近的整点**。例如,影像时间是`2023-10-27 02:25:00`,那么最接近的ECMWF整点时间是`02:00`。如果分钟数大于等于30,则向上取整(如`02:35:00`匹配`03:00`)。
```python
from datetime import datetime, timedelta
def match_era5_time(acquisition_datetime: datetime) -> Tuple[str, str, str, str]:
"""
将影像获取时间匹配到最接近的ERA5整点时间。
参数:
acquisition_datetime: datetime对象,表示影像获取的UTC时间。
返回:
(year, month, day, era5_time_str): 匹配后的年、月、日和ERA5时间字符串("HH:00")。
"""
# 计算最接近的整点小时
hour = acquisition_datetime.hour
minute = acquisition_datetime.minute
if minute >= 30:
# 向上取整,需要考虑跨日的情况
matched_dt = acquisition_datetime + timedelta(hours=1)
matched_dt = matched_dt.replace(minute=0, second=0, microsecond=0)
else:
# 向下取整
matched_dt = acquisition_datetime.replace(minute=0, second=0, microsecond=0)
# 格式化输出
year = matched_dt.strftime("%Y")
month = matched_dt.strftime("%m")
day = matched_dt.strftime("%d")
era5_time = matched_dt.strftime("%H:00") # ERA5要求的格式
print(f"[INFO] 原始时间: {acquisition_datetime} -> 匹配ERA5时间: {year}-{month}-{day} {era5_time}")
return year, month, day, era5_time
# 使用示例
from datetime import datetime
img_time = datetime.strptime("2023-10-27 02:25:00", "%Y-%m-%d %H:%M:%S")
y, m, d, t = match_era5_time(img_time)
# 输出: y='2023', m='10', d='27', t='02:00'
```
### 3.3 空间范围处理:缓冲区与区域合并
直接从元数据中提取的影像四至范围(Bounding Box)通常刚好覆盖影像。为了确保下载的大气数据能完全覆盖研究区域,并且考虑到大气模型网格的边界效应,我们通常会在原始范围的基础上**添加一个缓冲区**。
例如,对每个方向扩展0.5到1个经纬度。同时,如果你要处理一个区域内多景相邻的影像,一个更高效的策略是**计算这些影像的并集范围**,然后为这个合并后的大区域一次性下载数据,这比逐景下载要快得多,也减少了请求次数。
```python
def add_buffer_to_bbox(bbox: list, buffer_deg: float = 0.5) -> list:
"""
给边界框添加缓冲区。
参数:
bbox: 原始边界框 [north, west, south, east]。
buffer_deg: 缓冲区大小(度)。
返回:
list: 扩展后的边界框 [north_buf, west_buf, south_buf, east_buf]。
"""
north, west, south, east = bbox
north_buf = min(90, north + buffer_deg) # 北边界不能超过90度
south_buf = max(-90, south - buffer_deg) # 南边界不能超过-90度
west_buf = west - buffer_deg
east_buf = east + buffer_deg
# 经度处理(如果跨过180度经线需要特殊处理,此处简化)
return [north_buf, west_buf, south_buf, east_buf]
def merge_bboxes(bbox_list: list) -> list:
"""
合并多个边界框,返回一个能覆盖所有区域的最小外接矩形。
参数:
bbox_list: 多个边界框的列表,每个都是[north, west, south, east]。
返回:
list: 合并后的边界框。
"""
if not bbox_list:
return None
all_norths = [b[0] for b in bbox_list]
all_wests = [b[1] for b in bbox_list]
all_souths = [b[2] for b in bbox_list]
all_easts = [b[3] for b in bbox_list]
merged_north = max(all_norths)
merged_west = min(all_wests)
merged_south = min(all_souths)
merged_east = max(all_easts)
return [merged_north, merged_west, merged_south, merged_east]
```
## 4. 实战整合:构建完整的自动化批处理流水线
现在,我们将前面所有的模块组合起来,形成一个端到端的自动化脚本。这个脚本的核心逻辑是:**遍历指定目录下的所有遥感影像元数据文件 -> 解析每个文件获取时空信息 -> 匹配并格式化ECMWF请求参数 -> 调用下载函数 -> 记录日志**。
### 4.1 主控脚本设计
下面是一个主控脚本的框架,它定义了整个工作流的步骤:
```python
import os
import sys
import logging
from datetime import datetime
from pathlib import Path
# 导入我们之前编写的模块
from metadata_parser import parse_metadata, match_era5_time, add_buffer_to_bbox
from era5_downloader import download_era5_single_time
def setup_logging(log_dir: Path):
"""配置日志,同时输出到文件和终端"""
log_dir.mkdir(parents=True, exist_ok=True)
log_file = log_dir / f"era5_batch_download_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(log_file, encoding='utf-8'),
logging.StreamHandler(sys.stdout)
]
)
return logging.getLogger(__name__)
def find_metadata_files(input_dir: Path, extensions: tuple = ('.txt', '.xml', '.MTL')) -> list:
"""递归查找指定目录下所有特定后缀的元数据文件"""
metadata_files = []
for ext in extensions:
for file_path in input_dir.rglob(f'*{ext}'):
# 可以添加更精确的文件名过滤,例如 *MTL.txt
if file_path.is_file():
metadata_files.append(file_path)
logging.info(f"在目录 {input_dir} 下找到 {len(metadata_files)} 个元数据文件。")
return metadata_files
def process_single_metadata(meta_file: Path, output_dir: Path, buffer_deg: float = 0.5) -> bool:
"""处理单个元数据文件:解析、匹配、下载"""
logger = logging.getLogger(__name__)
# 1. 解析元数据
meta_info = parse_metadata(str(meta_file))
if not meta_info:
logger.error(f"无法解析元数据文件: {meta_file}")
return False
# 2. 时间匹配
try:
acq_date_str = meta_info['date']
acq_time_str = meta_info['time']
acq_datetime = datetime.strptime(f"{acq_date_str} {acq_time_str}", "%Y-%m-%d %H:%M:%S")
year, month, day, era5_time = match_era5_time(acq_datetime)
except Exception as e:
logger.error(f"处理文件 {meta_file} 的时间信息时出错: {e}")
return False
# 3. 空间范围处理(添加缓冲区)
original_bbox = meta_info['bbox']
request_bbox = add_buffer_to_bbox(original_bbox, buffer_deg)
logger.info(f"文件 {meta_file.name}: 原始范围 {original_bbox} -> 请求范围 {request_bbox}")
# 4. 准备输出路径
# 使用影像获取日期和时间(匹配后)作为文件名的一部分,避免重复
output_filename = f"ERA5_{year}{month}{day}_{era5_time.replace(':', '')}_{original_bbox[0]:.2f}_{original_bbox[1]:.2f}.nc"
output_path = output_dir / output_filename
# 5. 调用下载函数
success, message = download_era5_single_time(
year=year,
month=month,
day=day,
time=era5_time,
area=request_bbox,
output_path=str(output_path)
)
if success:
logger.info(f"文件 {meta_file.name} 处理成功。")
else:
logger.warning(f"文件 {meta_file.name} 处理失败: {message}")
return success
def main(input_directory: str, output_directory: str):
"""主函数"""
input_dir = Path(input_directory)
output_dir = Path(output_directory)
log_dir = output_dir / "logs"
if not input_dir.exists():
print(f"错误:输入目录不存在 {input_dir}")
sys.exit(1)
output_dir.mkdir(parents=True, exist_ok=True)
logger = setup_logging(log_dir)
logger.info("=== ECMWF ERA5 批量下载任务开始 ===")
logger.info(f"输入目录: {input_dir}")
logger.info(f"输出目录: {output_dir}")
# 查找所有元数据文件
meta_files = find_metadata_files(input_dir)
if not meta_files:
logger.warning("未找到任何元数据文件,任务结束。")
return
total_files = len(meta_files)
success_count = 0
fail_count = 0
# 遍历处理每个文件
for idx, meta_file in enumerate(meta_files, 1):
logger.info(f"处理进度: ({idx}/{total_files}) - {meta_file.name}")
try:
if process_single_metadata(meta_file, output_dir):
success_count += 1
else:
fail_count += 1
except Exception as e:
logger.exception(f"处理文件 {meta_file} 时发生未捕获的异常: {e}")
fail_count += 1
# 任务总结
logger.info("=== 批量下载任务结束 ===")
logger.info(f"总计: {total_files} 个文件")
logger.info(f"成功: {success_count} 个")
logger.info(f"失败: {fail_count} 个")
if fail_count > 0:
logger.warning("部分文件处理失败,请查看上方错误日志。")
if __name__ == "__main__":
# 示例:通过命令行参数指定输入输出目录
if len(sys.argv) != 3:
print("用法: python era5_batch_download.py <输入目录> <输出目录>")
print("示例: python era5_batch_download.py ./landsat_scenes ./era5_data")
sys.exit(1)
input_dir = sys.argv[1]
output_dir = sys.argv[2]
main(input_dir, output_dir)
```
### 4.2 高级技巧与优化建议
直接使用上述脚本已经能解决大部分问题,但在生产环境中,我们还可以进行更多优化:
**1. 请求合并与队列管理**
ECMWF API对请求频率和队列长度有限制。如果你有成千上万个请求,一股脑提交会导致大量请求失败。一个更好的策略是:
- **合并空间或时间相近的请求**:例如,同一天内同一区域的多景影像,可以合并为一个请求,下载一个覆盖该区域全天的数据文件,然后在本地按需裁剪。
- **实现请求队列**:使用`queue.Queue`和线程池,控制同时发起的请求数量(例如,最多5个并发)。每个请求完成后,根据返回状态(成功、失败、排队中)决定是等待、重试还是记录错误。
**2. 断点续传与状态持久化**
批量下载可能耗时很长,网络中断或程序崩溃时有发生。我们可以将任务状态保存到本地文件(如JSON或SQLite数据库),记录每个元数据文件对应的处理状态(待处理、处理中、成功、失败及错误信息)。当程序重新启动时,先读取状态文件,跳过已成功的,重试或标记失败的。
**3. 使用更高效的数据格式和变量筛选**
- **格式选择**:`NetCDF`是科学计算的标准格式,但如果你后续只用Python的`xarray`或`netCDF4`库处理,它是不错的选择。如果你需要与其他气象软件交互,`GRIB`格式可能更通用。
- **变量与层次筛选**:不是所有反演算法都需要全部37个气压层和所有变量。仔细阅读你的反演模型文档,只请求必需的变量和层次,能显著减少数据量、下载时间和存储压力。例如,如果只做温度反演,可能只需要`temperature`、`relative_humidity`和`surface_pressure`(如果是单层请求)。
**4. 错误重试与退避策略**
网络请求失败是常态。简单的`try-except`不够健壮。应该实现一个带有**指数退避**的重试机制。例如,第一次失败后等待2秒重试,第二次失败等待4秒,第三次等待8秒,最多重试3-5次。
```python
import time
from tenacity import retry, stop_after_attempt, wait_exponential
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
def robust_download_request(client, request_params, output_path):
"""带有重试机制的下载请求"""
result = client.retrieve('reanalysis-era5-pressure-levels', request_params)
result.download(output_path)
```
这里使用了`tenacity`库来优雅地实现重试逻辑。记得用`pip install tenacity`安装它。
将上述模块——配置、下载、解析、匹配、主控——按照清晰的目录结构组织起来,你就得到了一个专属于你的、高度自动化的ECMWF大气数据下载工具箱。这个工具不仅能用于温度反演,稍加修改,就能适配任何需要时空匹配大气数据的遥感应用场景,如大气校正、天气影响分析等。