针对使用 Python 3 爬取气象站数据的需求,核心在于选择合适的库来发送网络请求、解析网页内容以及处理数据。以下将解构此问题,并提供基于不同场景的代码方案。
### 核心库选择与功能对比
| 库名 | 主要用途 | 适用场景 | 特点 |
| :--- | :--- | :--- | :--- |
| **`requests`** | 发送 HTTP 请求,获取网页原始内容。 | 静态网页抓取,API 接口调用。 | 简单易用,同步请求,是网络请求的基础库。 |
| **`aiohttp`** | 发送异步 HTTP 请求。 | 需要高性能、高并发的爬虫场景,如抓取大量页面。 | 基于 asyncio,能显著提升爬取效率。 |
| **`BeautifulSoup`** | 解析 HTML/XML 文档,提取所需数据。 | 网页结构清晰,数据嵌入在 HTML 标签中。 | 解析方式灵活,支持多种选择器(如 find, select)。 |
| **`lxml`** | 解析 HTML/XML 文档,XPath 提取数据。 | 需要高性能解析或复杂 XPath 路径定位。 | 解析速度极快,XPath 语法功能强大。 |
| **`pandas`** | 数据清洗、分析和存储(如 CSV, Excel)。 | 对抓取到的结构化数据进行处理和分析。 | 提供了强大的 DataFrame 数据结构,便于数据操作。 |
| **`sqlite3`** | 轻量级数据库操作。 | 需要将数据持久化存储到本地数据库。 | Python 内置,无需安装,适合中小规模数据存储。 |
### 方案推演与代码示例
根据目标网站的类型(静态、动态、API接口),爬取策略有所不同。
#### 场景一:爬取静态历史天气网页(以中国天气网为例)
此场景下,数据直接存在于 HTML 页面中,使用 `requests` 获取页面,再用 `BeautifulSoup` 解析是经典组合。
```python
import requests
from bs4 import BeautifulSoup
import pandas as pd
import time
def crawl_historical_weather(city_code, start_year, end_year):
"""
爬取指定城市在指定年份区间的历史天气数据(示例:中国天气网历史数据页面结构)。
"""
base_url = "http://www.weather.com.cn/weather/{}{:02d}{:02d}.shtml" # 示例URL,实际需分析
all_data = []
for year in range(start_year, end_year + 1):
for month in range(1, 13):
# 构造URL,此处为示例,实际URL规则需根据目标网站分析
url = base_url.format(city_code, year, month)
try:
headers = {'User-Agent': 'Mozilla/5.0'} # 添加请求头模拟浏览器
resp = requests.get(url, headers=headers, timeout=10)
resp.encoding = 'utf-8' # 根据网页实际编码调整
if resp.status_code == 200:
soup = BeautifulSoup(resp.text, 'html.parser')
# 假设数据在 class 为 `weather_data` 的表格中
weather_table = soup.find('table', class_='weather_data')
if weather_table:
rows = weather_table.find_all('tr')[1:] # 跳过表头
for row in rows:
cols = row.find_all('td')
date = cols[0].text.strip()
high_temp = cols[1].text.strip()
low_temp = cols[2].text.strip()
weather = cols[3].text.strip()
all_data.append([f"{year}-{month:02d}", date, high_temp, low_temp, weather])
print(f"成功抓取 {year}年{month}月 数据")
else:
print(f"请求失败: {url}, 状态码: {resp.status_code}")
time.sleep(1) # 礼貌性延时,避免对服务器造成压力
except Exception as e:
print(f"抓取 {url} 时出错: {e}")
continue
# 将数据保存为CSV文件
df = pd.DataFrame(all_data, columns=['年月', '日期', '最高温', '最低温', '天气状况'])
df.to_csv(f'weather_data_{city_code}_{start_year}_{end_year}.csv', index=False, encoding='utf-8-sig')
print("数据已保存至CSV文件。")
return df
# 示例调用:假设城市代码为`101010100`(北京),抓取2018-2019年数据
# historical_df = crawl_historical_weather('101010100', 2018, 2019)
```
**代码说明**:
1. **请求与解析**:使用 `requests.get()` 获取网页,`BeautifulSoup` 解析 HTML。
2. **数据定位**:通过 `find()` 和 `find_all()` 方法,结合标签名和属性(如 `class`)定位数据所在标签。
3. **数据提取与存储**:遍历行和列,提取文本,存入列表,最后用 `pandas` 保存为 CSV 文件。
4. **反爬策略**:设置了 `User-Agent` 请求头和 `time.sleep()` 延时,是应对基础反爬机制的常见做法 [ref_3]。
#### 场景二:爬取动态加载的气象站数据(以温室数据系统为例)
许多现代网站的数据通过 JavaScript 动态加载,直接请求 HTML 页面无法获得数据。此时需要分析其网络请求(XHR/Fetch),找到真实的数据 API 接口 [ref_1]。
```python
import requests
import pandas as pd
import json
def crawl_dynamic_weather_station(station_id, element, start_date, end_date):
"""
爬取动态网站(如温室数据系统)的气象站数据。
通过分析浏览器开发者工具中的网络请求,找到数据接口。
"""
# 这是示例API地址,实际地址需通过分析网站XHR请求获得
api_url = "http://data.sheshiyuanyi.com/WeatherData/GetWeatherData"
# 构造POST请求的参数,这些参数通常可通过分析请求负载(Payload)获得
params = {
'stationID': station_id, # 气象站ID
'element': element, # 气象要素,如`TEM_Max`代表日最高气温
'startDate': start_date,
'endDate': end_date,
'type': 'day' # 数据类型,日值
}
headers = {
'User-Agent': 'Mozilla/5.0',
'Content-Type': 'application/x-www-form-urlencoded', # 根据实际情况调整
'Referer': 'http://data.sheshiyuanyi.com/WeatherData/' # 有时需要Referer
}
try:
# 发送POST请求
resp = requests.post(api_url, data=params, headers=headers, timeout=15)
if resp.status_code == 200:
# 假设返回的是JSON格式数据
data_json = resp.json()
# 将JSON数据转换为pandas DataFrame
df = pd.DataFrame(data_json['data']) # 具体键名根据实际JSON结构调整
print(f"成功获取气象站 {station_id} 的 {element} 数据,共 {len(df)} 条记录。")
# 保存数据
df.to_csv(f'station_{station_id}_{element}.csv', index=False, encoding='utf-8-sig')
return df
else:
print(f"API请求失败,状态码: {resp.status_code}")
return None
except Exception as e:
print(f"请求过程中发生错误: {e}")
return None
# 示例调用:爬取兴海气象站(假设ID: 12345)2005年的日最高气温数据
# dynamic_df = crawl_dynamic_weather_station('12345', 'TEM_Max', '2005-01-01', '2005-12-31')
```
**代码说明**:
1. **接口分析**:关键步骤是使用浏览器开发者工具(F12)的“网络(Network)”标签,筛选 XHR/Fetch 请求,找到返回气象数据的真实接口地址和请求参数 [ref_1]。
2. **请求构造**:使用 `requests.post()` 或 `requests.get()` 模拟浏览器向该接口发送请求,并携带必要的参数(如站号、要素、时间范围)。
3. **数据处理**:接口通常返回 JSON 格式数据,使用 `.json()` 方法解析,并用 `pandas` 轻松转换为表格。
#### 场景三:高性能异步爬取(以 NOAA 气象数据为例)
当需要爬取大量数据或页面时,同步请求效率低下。使用 `aiohttp` 进行异步并发请求可以极大提升速度 [ref_6]。
```python
import aiohttp
import asyncio
from bs4 import BeautifulSoup
import pandas as pd
import time
async def fetch_one_page(session, url, semaphore):
"""异步获取单个页面的内容。"""
async with semaphore: # 使用信号量控制并发度,避免被封IP
try:
async with session.get(url, timeout=10) as response:
if response.status == 200:
html = await response.text()
return html
else:
print(f"请求失败: {url}, 状态码: {response.status}")
return None
except Exception as e:
print(f"请求出错 {url}: {e}")
return None
async def parse_and_save(html, page_id):
"""解析HTML并提取数据(示例)。"""
if not html:
return []
soup = BeautifulSoup(html, 'html.parser')
# 假设每页有一个数据列表,每个列表项在`<li class="data-item">`里
data_items = soup.find_all('li', class_='data-item')
page_data = []
for item in data_items:
# 提取具体数据,这里仅为示例
date = item.find('span', class_='date').text
temp = item.find('span', class_='temp').text
page_data.append([page_id, date, temp])
return page_data
async def main_async_crawler(url_list, concurrency=5):
"""异步爬虫主函数。"""
semaphore = asyncio.Semaphore(concurrency) # 控制最大并发数
all_data = []
async with aiohttp.ClientSession(headers={'User-Agent': 'Mozilla/5.0'}) as session:
tasks = []
for idx, url in enumerate(url_list):
task = asyncio.create_task(fetch_one_page(session, url, semaphore))
tasks.append((task, idx)) # 记录任务和页面ID
# 等待所有任务完成并处理结果
for task, page_id in tasks:
html = await task
if html:
page_data = await parse_and_save(html, page_id)
all_data.extend(page_data)
await asyncio.sleep(0.5) # 异步延时,更友好
# 保存所有数据
df = pd.DataFrame(all_data, columns=['页码', '日期', '温度'])
df.to_csv('async_weather_data.csv', index=False)
print(f"异步爬取完成,共获取 {len(df)} 条数据。")
return df
# 示例调用:假设有一个URL列表
# url_list = [f'https://example.com/weather/page/{i}' for i in range(1, 11)]
# 在异步环境中运行主函数
# asyncio.run(main_async_crawler(url_list))
```
**代码说明**:
1. **异步核心**:使用 `async/await` 语法定义异步函数,`aiohttp.ClientSession` 管理 HTTP 会话。
2. **并发控制**:通过 `asyncio.Semaphore` 限制同时发起的请求数量,是防止因请求过快而被封禁的重要策略 [ref_6]。
3. **性能优势**:在 I/O 等待(网络请求)期间,CPU 可以处理其他任务,从而充分利用系统资源,大幅提升爬取海量数据的效率。
#### 场景四:数据存储到数据库
对于需要长期存储、查询或增量更新的场景,将数据存入数据库(如 SQLite)是更佳选择 [ref_5]。
```python
import sqlite3
import pandas as pd
def save_to_sqlite(data_df, db_path='weather_data.db', table_name='weather'):
"""
将DataFrame数据存储到SQLite数据库。
"""
conn = sqlite3.connect(db_path)
try:
# 将DataFrame写入数据库,如果表存在则替换
data_df.to_sql(table_name, conn, if_exists='replace', index=False)
print(f"数据已成功存入数据库 {db_path} 的表 {table_name} 中。")
# 示例:查询数据验证
cursor = conn.cursor()
cursor.execute(f"SELECT COUNT(*) FROM {table_name}")
count = cursor.fetchone()[0]
print(f"表中现有 {count} 条记录。")
# 示例:查询最低温度
cursor.execute(f"SELECT MIN(最低温) FROM {table_name} WHERE 最低温 != 'N/A'")
min_temp = cursor.fetchone()[0]
print(f"数据库中记录的最低温度是: {min_temp}")
except Exception as e:
print(f"数据库操作出错: {e}")
finally:
conn.close()
# 假设已有从网页抓取并处理好的DataFrame `cleaned_df`
# save_to_sqlite(cleaned_df)
```
**代码说明**:
1. **数据库连接**:使用 `sqlite3.connect()` 创建或连接数据库。
2. **数据入库**:`pandas.DataFrame.to_sql()` 方法能直接将 DataFrame 写入数据库表,极大简化了操作。
3. **数据管理**:存储在数据库中便于进行复杂查询、数据关联和长期维护 [ref_5]。
### 总结与建议
1. **先分析,后编码**:爬取任何网站前,务必先使用浏览器开发者工具分析其数据加载方式(静态HTML、动态JS、API接口),这是成功的关键 [ref_1][ref_4]。
2. **库的组合使用**:
* **基础爬虫**:`requests` + `BeautifulSoup`/`lxml` 足以应对大部分静态网站。
* **动态网站**:分析并调用 `API`,使用 `requests` 直接获取 `JSON` 数据。
* **大规模爬取**:采用 `aiohttp` 进行异步并发,配合 `asyncio` 管理任务。
* **数据处理与存储**:`pandas` 用于数据清洗和分析,`sqlite3` 或其它数据库驱动用于持久化存储。
3. **遵守规则**:爬取时注意设置合理的请求间隔(如 `time.sleep`),尊重网站的 `robots.txt` 协议,避免对目标服务器造成过大负担 [ref_6]。
4. **错误处理**:网络请求不稳定,代码中必须包含 `try...except` 块进行异常捕获和重试机制,确保爬虫的健壮性。
以上代码示例和库的选择覆盖了从简单到复杂、从同步到异步的多种气象数据爬取场景,可根据具体任务需求进行调整和组合。