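# From Zero to One: Building an Automated News Data Collection System
If you are a data analyst, or your research depends on close reading of the news, copying and pasting articles by hand every day is a nightmare. Tracking developments in an industry, or comparing how different outlets cover the same event, is slow and error-prone when done manually. A Python crawler is a good fit for exactly this problem.
Most tutorials, however, stop at "fetch one page and save it", which falls far short of real collection needs. A mature news collection system has to deal with anti-crawling measures, data cleaning, scheduling, and error handling. This article walks through a complete, field-tested approach: not just how to fetch data, but how to build a stable, efficient, and maintainable automated data pipeline.
The approach suits researchers who track specific topics over the long term, hobbyists building a personal news archive, and data scientists who need bulk text for natural language processing or sentiment analysis. We start with basic requests and work up to dynamic content, data cleaning, and scheduling, ending with a complete solution.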
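## 1. Environment Setup and Choosing Core Libraries
Choosing the right tools matters before writing any code. The Python ecosystem has many good HTTP and parsing libraries, but not all of them suit news collection.
### 1.1 Comparing the Core Libraries
For news sites, I pick a tool combination based on how the target site is built. The table below compares the common libraries: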
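| Library | Primary use | Strengths | Weaknesses | Fit for news collection |
|---------|-------------|-----------|------------|-------------------------|
| **requests** | HTTP requests | Simple API, strong community support | Cannot render JavaScript | First choice for static pages |
| **httpx** | HTTP requests (sync and async) | Async support, optional HTTP/2 | Newer, with a smaller ecosystem than requests | High-concurrency crawling |
| **BeautifulSoup** | HTML parsing | Flexible API, tolerant of malformed markup | Relatively slow | Complex HTML structures |
| **lxml** | HTML/XML parsing | Fast, low memory use | Less forgiving of malformed HTML | Large-scale processing |
| **Selenium** | Browser automation | Executes JavaScript, simulates a real user | Resource-heavy and slow | Dynamically loaded content |
| **Playwright** | Browser automation | Cross-browser, modern API | Requires downloading browser binaries | Complex interactions |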
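In practice I use **requests + BeautifulSoup** for most news sites, because article content is usually served as static HTML. I turn to Selenium or Playwright only when the content cannot be obtained without executing JavaScript.
### 1.2 Setting Up the Environment
Installing these libraries is straightforward, but I recommend a dedicated virtual environment to manage dependencies: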
```bash
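# Create a virtual environment
python -m venv news_crawler_env
# Activate it (Windows)
news_crawler_env\Scripts\activate
# Activate it (macOS/Linux)
# source news_crawler_env/bin/activate
# Install the core libraries
pip install requests beautifulsoup4 lxml
# Used by later examples: random User-Agent strings and job scheduling
pip install fake-useragent schedule
# Optional: async HTTP client
pip install httpx
# Optional: browser automation tools
pip install selenium playwright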
```
> Tip: a virtual environment avoids dependency conflicts between projects, which matters most when you maintain several crawlers.
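Sites that render content dynamically need a real browser behind the automation library. Selenium drives a locally installed browser through a matching driver such as ChromeDriver, which Selenium 4.6 and later can fetch automatically through Selenium Manager. Playwright instead downloads its own browser builds with a one-time command:
```bash
# Install the Playwright browser (one-time step)
python -m playwright install chromium
```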
With the environment ready, create a basic project structure:
```
news_crawler/
├── config/
│   ├── __init__.py
│   └── settings.py        # configuration
├── spiders/
│   ├── __init__.py
│   ├── base_spider.py     # base crawler class
│   └── news_spider.py     # news crawler implementation
├── utils/
│   ├── __init__.py
│   ├── file_utils.py      # file helpers
│   └── text_utils.py      # text-processing helpers
├── data/
│   └── raw/               # raw data
├── logs/                  # log files
└── main.py                # program entry point
```
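The modular layout may look like overkill at first, but its advantages grow with the project. Each module has one clear responsibility, which makes the code easier to maintain and extend.
## 2. Basic Crawling: From Static Pages to Text Extraction
With tools chosen and the environment configured, we can start crawling. The first step, fetching the page, looks simple but hides several pitfalls.
### 2.1 Configuring Request Headers
Many beginners call `requests.get(url)` and expect it to work, only to get a 403 Forbidden response. Many news sites run basic anti-crawling checks that inspect request headers such as User-Agent.
What should a complete set of headers contain? This is my usual configuration: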
```python
import requests
from fake_useragent import UserAgent


def get_enhanced_headers():
    """Build a browser-like set of request headers."""
    ua = UserAgent()
    headers = {
        'User-Agent': ua.random,
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        # 'br' is left out: requests only decodes Brotli responses when the
        # optional brotli package is installed
        'Accept-Encoding': 'gzip, deflate',
        'Connection': 'keep-alive',
        'Upgrade-Insecure-Requests': '1',
        'Cache-Control': 'max-age=0',
        'Referer': 'https://www.google.com/',
        'Sec-Fetch-Dest': 'document',
        'Sec-Fetch-Mode': 'navigate',
        # A request that carries a Google Referer is cross-site, not 'none'
        'Sec-Fetch-Site': 'cross-site',
        'Sec-Fetch-User': '?1',
        'Pragma': 'no-cache',
    }
    return headers
```
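A few points to note:
- **User-Agent rotation**: the `fake_useragent` library supplies a random browser identifier, so requests do not all carry the same User-Agent
- **Complete HTTP headers**: send the headers a real browser sends, including Accept and Accept-Language
- **Referer**: a plausible Referer can lower the chance of being flagged as a crawler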
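If `fake_useragent` is not available, a small hand-maintained pool can stand in for it. The following is only a sketch: `UA_POOL`, its three strings, and `build_headers` are illustrative names that do not appear elsewhere in this article, and the strings would need refreshing as browsers update.

```python
import random
from typing import Optional

# Hypothetical, hand-maintained pool; refresh these strings periodically.
UA_POOL = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 "
    "(KHTML, like Gecko) Version/17.0 Safari/605.1.15",
    "Mozilla/5.0 (X11; Linux x86_64; rv:121.0) Gecko/20100101 Firefox/121.0",
]


def build_headers(referer: Optional[str] = None,
                  rng: Optional[random.Random] = None) -> dict:
    """Return browser-like headers with a User-Agent drawn from UA_POOL."""
    chooser = rng or random  # a seeded Random makes the choice reproducible
    headers = {
        "User-Agent": chooser.choice(UA_POOL),
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
    }
    if referer:
        # Only send a Referer when the caller has a plausible one
        headers["Referer"] = referer
    return headers
```

Passing a seeded `random.Random` keeps the choice reproducible in tests, while production code can rely on the default.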
### 2.2 Robust Request Handling
Network requests fail in many ways, so we need solid error handling:
```python
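import random
import re
import time
from typing import Optional

import requests
from fake_useragent import UserAgent
from requests.exceptions import RequestException


class RobustRequestor:
    """HTTP GET helper with retries, backoff, and encoding detection."""

    def __init__(self, max_retries: int = 3, timeout: int = 10):
        self.max_retries = max_retries
        self.timeout = timeout
        self.session = requests.Session()
        # Keep default headers on the session so that rotate_user_agent()
        # actually affects the following requests
        self.session.headers.update(get_enhanced_headers())

    def get_with_retry(self, url: str, headers: Optional[dict] = None) -> Optional[str]:
        """GET with retries; returns the decoded body, or None on failure."""
        for attempt in range(self.max_retries):
            try:
                response = self.session.get(
                    url,
                    headers=headers,  # per-request overrides, if any
                    timeout=self.timeout
                )
                # Branch on the HTTP status code
                if response.status_code == 200:
                    response.encoding = self.detect_encoding(response)
                    return response.text
                elif response.status_code == 403:
                    print(f"Access denied: {url}")
                    self.rotate_user_agent()
                elif response.status_code == 404:
                    print(f"Page not found: {url}")
                    return None
                else:
                    print(f"HTTP {response.status_code}: {url}")
            except RequestException as e:
                print(f"Request failed (attempt {attempt + 1}/{self.max_retries}): {e}")
            # Exponential backoff with jitter, skipped after the final attempt
            if attempt < self.max_retries - 1:
                wait_time = (2 ** attempt) + random.uniform(0, 1)
                time.sleep(wait_time)
        return None

    def detect_encoding(self, response) -> str:
        """Pick an encoding: HTTP header, then <meta> charset, then a guess."""
        # requests falls back to ISO-8859-1 for text/* responses that declare
        # no charset, so trust response.encoding only when the header names one
        content_type = response.headers.get('Content-Type', '')
        if 'charset=' in content_type.lower() and response.encoding:
            return response.encoding
        # Look for a charset declaration near the top of the raw bytes
        head = response.content[:2048].decode('ascii', errors='ignore')
        match = re.search(r'charset=["\']?([\w-]+)', head, re.IGNORECASE)
        if match:
            return match.group(1)
        # Fall back to statistical detection, then UTF-8
        return response.apparent_encoding or 'utf-8'

    def rotate_user_agent(self):
        """Switch the session to a new random User-Agent."""
        self.session.headers.update({'User-Agent': UserAgent().random})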
```
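The class provides:
- **Automatic retries**: failed requests are retried up to `max_retries` times
- **Exponential backoff**: the wait grows with each attempt (about 1 s, 2 s, 4 s, plus random jitter), which avoids hammering the server
- **Encoding detection**: pages in different encodings are decoded correctly
- **User-Agent rotation**: a new User-Agent is picked after a 403 to reduce the risk of being blocked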
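To make the backoff schedule concrete, the delays can be computed ahead of time. This is a sketch rather than part of `RobustRequestor`: `backoff_delays` and its `cap` parameter are assumptions added here for illustration, and the cap simply bounds the wait when `max_retries` is large.

```python
import random
from typing import List, Optional


def backoff_delays(max_retries: int, base: float = 1.0, cap: float = 30.0,
                   rng: Optional[random.Random] = None) -> List[float]:
    """Delay before each retry: base * 2**attempt plus up to 1 s of jitter, capped."""
    rng = rng or random.Random()
    delays = []
    for attempt in range(max_retries):
        # Doubling spreads retries out; jitter keeps clients from retrying in lockstep
        delay = min(cap, base * (2 ** attempt) + rng.uniform(0, 1))
        delays.append(delay)
    return delays


if __name__ == "__main__":
    for i, d in enumerate(backoff_delays(4, rng=random.Random(42))):
        print(f"retry {i + 1}: wait {d:.2f}s")
```

With the defaults, the waits fall in the ranges 1-2 s, 2-3 s, 4-5 s, and 8-9 s.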
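### 2.3 Extracting Content Precisely
Once we have the HTML, the next step is extracting the article body. The main difficulty is that HTML structure varies widely from site to site. These are the approaches I use to locate the body: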
```python
import re
from typing import Optional

from bs4 import BeautifulSoup


class ContentExtractor:
    """Extract title, body text, and publish date from article HTML."""

    def extract_news_content(self, html: str, url: str) -> Optional[dict]:
        """Return a dict describing the article, or None if no body is found."""
        soup = BeautifulSoup(html, 'lxml')
        # Read title and date first: _clean_content() removes nodes from the tree
        title = self._extract_title(soup)
        publish_date = self._extract_date(soup)

        def build(text: str) -> dict:
            # Every strategy returns the same shape
            return {
                'title': title,
                'content': text,
                'publish_date': publish_date,
                'source': url,
                'url': url
            }

        # Strategy 1: common content selectors
        content_selectors = [
            'article', '.article-content', '.content',
            '#content', '.news-content', '.post-content',
            '.article-body', '.story-body'
        ]
        for selector in content_selectors:
            element = soup.select_one(selector)
            if element and len(element.get_text(strip=True)) > 200:
                return build(self._clean_content(element))
        # Strategy 2: heuristic, the <div> holding the most text. This tends to
        # pick an outer wrapper, so treat it as a coarse fallback only.
        content_div = max(
            soup.find_all('div'),
            key=lambda d: len(d.get_text(strip=True)),
            default=None  # avoids ValueError on pages without any <div>
        )
        if content_div is not None and len(content_div.get_text(strip=True)) > 200:
            return build(self._clean_content(content_div))
        # Strategy 3: paragraph density
        content_paragraphs = []
        for p in soup.find_all('p'):
            text = p.get_text(strip=True)
            if len(text) > 50:  # short paragraphs are often ads or navigation
                content_paragraphs.append(text)
        if content_paragraphs:
            return build('\n\n'.join(content_paragraphs))
        return None

    def _clean_content(self, element) -> str:
        """Strip non-content elements and normalize whitespace."""
        # Remove scripts, styles, and page chrome
        for tag in element(['script', 'style', 'nav', 'footer', 'aside']):
            tag.decompose()
        text = element.get_text()
        text = re.sub(r'[ \t]+', ' ', text)       # collapse runs of spaces
        text = re.sub(r'\n\s*\n', '\n\n', text)   # collapse blank lines
        return text.strip()

    def _extract_title(self, soup) -> str:
        """Return the first title candidate found."""
        title_selectors = [
            'h1', '.article-title', '.news-title',
            'title', 'meta[property="og:title"]'
        ]
        for selector in title_selectors:
            element = soup.select_one(selector)
            if element:
                if selector.startswith('meta'):
                    return element.get('content', '')
                return element.get_text(strip=True)
        return "Title not found"

    def _extract_date(self, soup) -> str:
        """Return the publish date from meta tags or the page text."""
        # Most specific pattern first; Chinese dates put the year first
        date_patterns = [
            r'发布于\s*[::]?\s*(\d{4}[-/]\d{1,2}[-/]\d{1,2})',
            r'(\d{4}年\d{1,2}月\d{1,2}日)',
            r'(\d{4}[-/]\d{1,2}[-/]\d{1,2})'
        ]
        # Prefer the Open Graph meta tag
        date_meta = soup.find('meta', {'property': 'article:published_time'})
        if date_meta and date_meta.get('content'):
            return date_meta['content']
        # Otherwise search the page text
        text = soup.get_text()
        for pattern in date_patterns:
            match = re.search(pattern, text)
            if match:
                return match.group(1)
        return "Unknown date"
```
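The extractor layers several strategies:
1. **Selectors first**: try the CSS selectors that news sites commonly use for article bodies
2. **Heuristics**: fall back to text length and paragraph density
3. **Cleanup**: strip unrelated elements and keep the core content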
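The paragraph-density idea does not depend on BeautifulSoup. As a rough illustration, the sketch below applies the same length filter using only the standard library's `html.parser`; `ParagraphCollector` and `extract_paragraphs` are hypothetical names, and the 50-character threshold mirrors the one used in `ContentExtractor`.

```python
from html.parser import HTMLParser
from typing import List


class ParagraphCollector(HTMLParser):
    """Collect the text of each <p> element, ignoring script/style content."""

    SKIP = {"script", "style"}

    def __init__(self) -> None:
        super().__init__()
        self.paragraphs: List[str] = []
        self._buf: List[str] = []
        self._in_p = False
        self._skip_depth = 0

    def _flush(self) -> None:
        # Store the paragraph collected so far, if any, and reset the buffer
        if self._in_p:
            self.paragraphs.append("".join(self._buf).strip())
        self._in_p = False
        self._buf = []

    def handle_starttag(self, tag, attrs):
        if tag in self.SKIP:
            self._skip_depth += 1
        elif tag == "p":
            self._flush()  # an unclosed <p> ends where the next one starts
            self._in_p = True

    def handle_endtag(self, tag):
        if tag in self.SKIP:
            self._skip_depth = max(0, self._skip_depth - 1)
        elif tag == "p":
            self._flush()

    def handle_data(self, data):
        if self._in_p and not self._skip_depth:
            self._buf.append(data)


def extract_paragraphs(html: str, min_len: int = 50) -> List[str]:
    """Return paragraph texts of at least min_len characters, in document order."""
    parser = ParagraphCollector()
    parser.feed(html)
    parser.close()
    parser._flush()  # keep a trailing paragraph that was never closed
    return [p for p in parser.paragraphs if len(p) >= min_len]
```

Short paragraphs such as navigation links drop out, which is the same effect as strategy 3 above.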
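## 3. Advanced Techniques: Dynamic Content and Anti-Crawling Measures
More and more news sites load content with JavaScript, and anti-crawling measures keep getting more sophisticated. This part covers how to handle both.
### 3.1 Handling Dynamic Content
When requests cannot retrieve the full content, we need a browser automation tool. Selenium and Playwright are the two mainstream choices. I prefer Playwright for its more modern API and, in my experience, better performance: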
```python
import hashlib
from typing import Optional

from playwright.sync_api import sync_playwright


class DynamicContentFetcher:
    """Fetch pages that need JavaScript rendering."""

    def __init__(self, headless: bool = True):
        self.headless = headless
        self.playwright = None
        self.browser = None
        self.context = None

    def __enter__(self):
        """Start Playwright and open a browser context."""
        self.playwright = sync_playwright().start()
        self.browser = self.playwright.chromium.launch(
            headless=self.headless,
            args=['--disable-blink-features=AutomationControlled']
        )
        # Use a realistic browser context
        self.context = self.browser.new_context(
            viewport={'width': 1920, 'height': 1080},
            user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            locale='zh-CN',
            timezone_id='Asia/Shanghai',
            permissions=['geolocation']
        )
        # Extra HTTP headers sent with every request
        self.context.set_extra_http_headers({
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Sec-Ch-Ua': '"Not_A Brand";v="8", "Chromium";v="120"',
            'Sec-Ch-Ua-Mobile': '?0',
            'Sec-Ch-Ua-Platform': '"Windows"',
        })
        return self

    def fetch_dynamic_content(self, url: str, wait_for_selector: Optional[str] = None,
                              wait_time: int = 3) -> Optional[str]:
        """Load a page, wait for its content, and return the rendered HTML."""
        page = self.context.new_page()
        try:
            # Navigate to the page
            page.goto(url, wait_until='networkidle')
            # Wait for the content to appear
            if wait_for_selector:
                page.wait_for_selector(wait_for_selector, timeout=10000)
            else:
                page.wait_for_timeout(wait_time * 1000)  # fixed wait in ms
            # Scroll to trigger lazy loading
            self._scroll_page(page)
            # Grab the full rendered HTML
            content = page.content()
            # Debug screenshot; hashlib gives a stable name, unlike the
            # per-process randomized built-in hash()
            name = hashlib.md5(url.encode('utf-8')).hexdigest()[:8]
            page.screenshot(path=f'debug_{name}.png')
            return content
        except Exception as e:
            print(f"Dynamic fetch failed: {url}, error: {e}")
            return None
        finally:
            page.close()

    def _scroll_page(self, page, scroll_step: int = 300, scroll_delay: float = 0.1,
                     max_steps: int = 200):
        """Scroll down in steps; max_steps bounds infinite-scroll pages."""
        current_position = 0
        for _ in range(max_steps):
            # Re-read the height each step, since lazy loading can extend the page
            if current_position >= page.evaluate('document.body.scrollHeight'):
                break
            page.evaluate(f'window.scrollTo(0, {current_position})')
            current_position += scroll_step
            page.wait_for_timeout(int(scroll_delay * 1000))

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the context, the browser, and Playwright."""
        if self.context:
            self.context.close()
        if self.browser:
            self.browser.close()
        if self.playwright:
            self.playwright.stop()


# Usage example
with DynamicContentFetcher(headless=True) as fetcher:
    html = fetcher.fetch_dynamic_content(
        url="https://example-news-site.com/article",
        wait_for_selector=".article-content",
        wait_time=5
    )
```
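> Note: browser automation uses far more resources than plain HTTP requests, so use it only when necessary. For most news sites, static requests are enough.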
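One way to decide when the browser is needed is to inspect the static response first. The heuristic below is an assumption of this sketch, not an established rule: a page with script tags but very little visible text is probably rendered client-side. The function names and the 200-character threshold are illustrative.

```python
import re

# Matches whole <script>...</script> and <style>...</style> elements
_SCRIPT_STYLE = re.compile(r"<(script|style)\b.*?</\1\s*>", re.IGNORECASE | re.DOTALL)
_TAG = re.compile(r"<[^>]+>")


def visible_text_length(html: str) -> int:
    """Approximate the amount of human-visible text in an HTML string."""
    without_scripts = _SCRIPT_STYLE.sub(" ", html)
    text = _TAG.sub(" ", without_scripts)
    # Collapse whitespace so markup indentation does not inflate the count
    return len(" ".join(text.split()))


def probably_needs_rendering(html: str, min_chars: int = 200) -> bool:
    """Heuristic: scripts present but little visible text suggests a JS-rendered page."""
    has_scripts = "<script" in html.lower()
    return has_scripts and visible_text_length(html) < min_chars
```

A crawler could call this on the `requests` response and fall back to `DynamicContentFetcher` only when it returns `True`.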
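### 3.2 Strategies for Anti-Crawling Measures
Anti-crawling measures on modern news sites keep getting more complex, so we combine several strategies:
**Strategy 1: Request rate control**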
```python
import random
import time
from datetime import datetime, timedelta


class RateLimiter:
    """Limit the number of requests per minute."""

    def __init__(self, requests_per_minute: int = 30):
        self.requests_per_minute = requests_per_minute
        self.request_times = []

    def wait_if_needed(self):
        """Sleep if the per-minute budget is used up, then record this request."""
        now = datetime.now()
        # Drop records older than one minute
        one_minute_ago = now - timedelta(minutes=1)
        self.request_times = [t for t in self.request_times if t > one_minute_ago]
        # Check whether the limit has been reached
        if len(self.request_times) >= self.requests_per_minute:
            # Wait until the oldest request leaves the one-minute window
            oldest_request = min(self.request_times)
            wait_until = oldest_request + timedelta(minutes=1)
            wait_seconds = (wait_until - now).total_seconds()
            if wait_seconds > 0:
                print(f"Rate limit reached, waiting {wait_seconds:.1f} s")
                time.sleep(wait_seconds + random.uniform(0.5, 1.5))
        # Record this request
        self.request_times.append(datetime.now())
        # Add a random delay so requests are not evenly spaced
        time.sleep(random.uniform(0.5, 2.0))
```
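Because `RateLimiter` calls `datetime.now()` and `time.sleep()` directly, its logic is hard to test without actually waiting. One possible variation, sketched below with a hypothetical `SlidingWindowLimiter` class, takes the clock and sleep functions as parameters and uses `time.monotonic`, which is unaffected by system clock adjustments.

```python
import time
from collections import deque
from typing import Callable, Deque


class SlidingWindowLimiter:
    """Allow at most max_requests calls within any window_seconds span."""

    def __init__(self, max_requests: int, window_seconds: float = 60.0,
                 clock: Callable[[], float] = time.monotonic,
                 sleep: Callable[[float], None] = time.sleep) -> None:
        self.max_requests = max_requests
        self.window = window_seconds
        self._clock = clock
        self._sleep = sleep
        self._stamps: Deque[float] = deque()

    def _evict(self, now: float) -> None:
        # Forget requests that have left the window
        while self._stamps and now - self._stamps[0] >= self.window:
            self._stamps.popleft()

    def acquire(self) -> float:
        """Block until a slot is free; return the seconds waited."""
        now = self._clock()
        self._evict(now)
        waited = 0.0
        if len(self._stamps) >= self.max_requests:
            # Wait until the oldest recorded request leaves the window
            waited = self.window - (now - self._stamps[0])
            self._sleep(waited)
            now = self._clock()
            self._evict(now)
        self._stamps.append(now)
        return waited
```

In a test, `clock` and `sleep` can be replaced with a fake clock that advances instantly, so the waiting logic is checked without real delays.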
**Strategy 2: IP rotation and proxy pools**
```python
from typing import Optional

import requests


class ProxyManager:
    """Round-robin proxy selection."""

    def __init__(self, proxy_list: Optional[list] = None):
        self.proxies = proxy_list or []
        self.current_index = 0

    def get_proxy(self):
        """Return the next proxy, or None if the pool is empty."""
        if not self.proxies:
            return None
        proxy = self.proxies[self.current_index]
        self.current_index = (self.current_index + 1) % len(self.proxies)
        return proxy

    def test_proxy(self, proxy_url: str, test_url: str = "http://httpbin.org/ip") -> bool:
        """Check whether a proxy can complete a request."""
        try:
            response = requests.get(
                test_url,
                proxies={"http": proxy_url, "https": proxy_url},
                timeout=10
            )
            return response.status_code == 200
        except requests.RequestException:
            # Connection errors and timeouts both mean the proxy is unusable
            return False
```
**Strategy 3: Cookie and session management**
```python
import requests


class SessionManager:
    """Keep one requests.Session per domain."""

    def __init__(self):
        self.sessions = {}

    def get_session(self, domain: str):
        """Return the session for a domain, creating it on first use."""
        if domain not in self.sessions:
            session = requests.Session()
            # Seed the session with initial cookies
            session.cookies.update({
                'cookie_consent': 'true',
                'preferred_language': 'zh-CN'
            })
            self.sessions[domain] = session
        return self.sessions[domain]

    def rotate_session(self, domain: str):
        """Discard the current session for a domain and start a fresh one."""
        if domain in self.sessions:
            self.sessions[domain].close()
            del self.sessions[domain]
        return self.get_session(domain)
```
### 3.3 Handling CAPTCHAs
Most news sites do not use complex CAPTCHAs, but it is worth knowing the basic handling options:
```python
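class CaptchaHandler:
    """Basic CAPTCHA handling for a Playwright page."""

    @staticmethod
    def handle_simple_captcha(page) -> bool:
        """Return True if a CAPTCHA was detected and solved by hand."""
        # Selectors that commonly indicate a CAPTCHA
        captcha_selectors = [
            'img[src*="captcha"]',
            'div.captcha',
            'input[name="captcha"]'
        ]
        for selector in captcha_selectors:
            if page.query_selector(selector):
                print("CAPTCHA detected, waiting for manual input...")
                # Option 1: solve it by hand (needs a visible browser,
                # i.e. headless=False)
                input("Solve the CAPTCHA in the browser, then press Enter to continue...")
                return True
        return False

    # Option 2: capture the image for OCR (requires extra libraries)
    @staticmethod
    def _ocr_captcha(page) -> bool:
        """Save the CAPTCHA image; returns False until an OCR step is wired in."""
        try:
            # Screenshot the CAPTCHA image element
            captcha_element = page.query_selector('img[src*="captcha"]')
            if captcha_element:
                captcha_element.screenshot(path='captcha.png')
                # An OCR step, for example pytesseract or a third-party API,
                # could be integrated here
                print("CAPTCHA saved to captcha.png; solve it manually")
        except Exception as e:
            print(f"Could not capture the CAPTCHA: {e}")
        return False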
```
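## 4. Data Storage and Automation
Fetching data is only the first step. Storing, managing, and automating the whole workflow efficiently is the core of the system. In this part we build a complete news collection system.
### 4.1 A Smarter File Storage System
Saving content to plain TXT files is not enough. We need storage that also keeps metadata and an index: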
```python
import hashlib
import json
import threading
from datetime import datetime
from pathlib import Path
from typing import Optional


class NewsStorageSystem:
    """File-based storage for collected articles."""

    # Guards index.json, which the scheduler may update from several threads
    _index_lock = threading.Lock()

    def __init__(self, base_dir: str = "./news_data"):
        self.base_dir = Path(base_dir)
        self._init_structure()

    def _init_structure(self):
        """Create the directory layout."""
        directories = [
            'raw',        # original HTML
            'processed',  # processed text
            'metadata',   # metadata
            'logs',       # log files
            'backup',     # backups
            'temp'        # temporary files
        ]
        for dir_name in directories:
            (self.base_dir / dir_name).mkdir(parents=True, exist_ok=True)

    def generate_filename(self, url: str, title: Optional[str] = None) -> str:
        """Build a filename from a sanitized title plus a short URL hash."""
        # MD5 serves only as a short identifier here, not for security
        url_hash = hashlib.md5(url.encode('utf-8')).hexdigest()[:8]
        if title:
            # Drop characters that are illegal in filenames
            safe_title = ''.join(c for c in title if c.isalnum() or c in (' ', '-', '_'))
            safe_title = safe_title[:50].strip()  # limit the length
            if safe_title:
                return f"{safe_title}_{url_hash}"
        return url_hash

    def save_news_article(self, article_data: dict, format: str = 'txt'):
        """Save one article in the requested format and add it to the index."""
        # ContentExtractor stores the URL under 'source', so fall back to it
        url = article_data.get('url') or article_data.get('source', '')
        filename = self.generate_filename(url, article_data.get('title', ''))
        if format in ('txt', 'both'):
            self._save_as_txt(article_data, filename)
        if format in ('json', 'both'):
            self._save_as_json(article_data, filename)
        # Update the master index
        self._save_metadata(article_data, filename)
        return filename

    def _save_as_txt(self, article_data: dict, filename: str):
        """Write the article as a TXT file with a small header."""
        filepath = self.base_dir / 'processed' / f"{filename}.txt"
        fetch_time = article_data.get(
            'fetch_time', datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        )
        lines = [
            f"Title: {article_data.get('title', 'Untitled')}",
            f"Source: {article_data.get('source', 'Unknown')}",
            f"Published: {article_data.get('publish_date', 'Unknown')}",
            f"Fetched: {fetch_time}",
            f"Category: {article_data.get('category', 'Uncategorized')}",
            f"Keywords: {', '.join(article_data.get('keywords', []))}",
            '=' * 60,
            article_data.get('content', ''),
            '=' * 60,
            f"Original URL: {article_data.get('url', '')}",
        ]
        filepath.write_text('\n'.join(lines) + '\n', encoding='utf-8')

    def _save_as_json(self, article_data: dict, filename: str):
        """Write the article as a JSON file."""
        filepath = self.base_dir / 'metadata' / f"{filename}.json"
        # Add system fields to a copy so the caller's dict is not modified
        record = {
            **article_data,
            'storage_time': datetime.now().isoformat(),
            'file_reference': f"{filename}.txt",
            'content_length': len(article_data.get('content', ''))
        }
        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(record, f, ensure_ascii=False, indent=2)

    def _save_metadata(self, article_data: dict, filename: str):
        """Add the article to the master index (metadata/index.json)."""
        index_file = self.base_dir / 'metadata' / 'index.json'
        record = {
            'id': filename,
            'title': article_data.get('title', ''),
            'source': article_data.get('source', ''),
            'publish_date': article_data.get('publish_date', ''),
            'fetch_date': datetime.now().strftime('%Y-%m-%d'),
            'category': article_data.get('category', 'Uncategorized'),
            'keywords': article_data.get('keywords', []),
            'file_path': f"processed/{filename}.txt"
        }
        with self._index_lock:
            if index_file.exists():
                with open(index_file, 'r', encoding='utf-8') as f:
                    index_data = json.load(f)
            else:
                index_data = []
            # Replace an earlier record for the same article instead of duplicating it
            index_data = [r for r in index_data if r.get('id') != filename]
            index_data.append(record)
            with open(index_file, 'w', encoding='utf-8') as f:
                json.dump(index_data, f, ensure_ascii=False, indent=2)

    def search_articles(self, keyword: Optional[str] = None, category: Optional[str] = None,
                        start_date: Optional[str] = None, end_date: Optional[str] = None):
        """Filter the index by keyword, category, and publish-date range."""
        index_file = self.base_dir / 'metadata' / 'index.json'
        if not index_file.exists():
            return []
        with open(index_file, 'r', encoding='utf-8') as f:
            articles = json.load(f)
        filtered = articles
        if keyword:
            filtered = [a for a in filtered
                        if keyword.lower() in a.get('title', '').lower()
                        or keyword in a.get('keywords', [])]
        if category:
            filtered = [a for a in filtered
                        if a.get('category', '').lower() == category.lower()]
        # Dates are compared as strings, which is only reliable when
        # publish_date is stored in ISO format (YYYY-MM-DD)
        if start_date:
            filtered = [a for a in filtered
                        if a.get('publish_date', '') >= start_date]
        if end_date:
            filtered = [a for a in filtered
                        if a.get('publish_date', '') <= end_date]
        return filtered
```
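The filenames above are keyed on the URL, so the same wire story republished under two URLs is stored twice. One possible refinement, sketched here under the assumption that near-identical text should count as a duplicate, is to fingerprint the normalized body. `content_fingerprint` and `drop_duplicates` are hypothetical helpers, not methods of `NewsStorageSystem`.

```python
import hashlib
import re
from typing import Dict, Iterable, List


def content_fingerprint(text: str) -> str:
    """Hash the text after collapsing whitespace and lowercasing."""
    normalized = re.sub(r"\s+", " ", text).strip().lower()
    return hashlib.sha256(normalized.encode("utf-8")).hexdigest()


def drop_duplicates(articles: Iterable[Dict]) -> List[Dict]:
    """Keep the first article for each distinct content fingerprint."""
    seen = set()
    unique = []
    for article in articles:
        fp = content_fingerprint(article.get("content", ""))
        if fp in seen:
            continue  # same body already kept under another URL
        seen.add(fp)
        unique.append(article)
    return unique
```

This catches only exact matches after normalization. Detecting lightly edited copies would need a fuzzier technique, which is beyond this sketch.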
### 4.2 Automated Scheduling
News sources that need regular collection call for an automated scheduler:
```python
import json
import logging
import threading
import time
from pathlib import Path
from typing import Dict

import schedule


class NewsCrawlerScheduler:
    """Schedule and run crawlers for the configured news sources."""

    def __init__(self, config_file: str = "crawler_config.json"):
        self.config = self._load_config(config_file)
        self.crawlers = {}
        self.logger = self._setup_logger()

    def _load_config(self, config_file: str) -> Dict:
        """Load the JSON config, falling back to defaults if the file is missing."""
        default_config = {
            "sources": [
                {
                    "name": "example_news",
                    "url": "https://example-news.com/latest",
                    "schedule": "hourly",
                    "enabled": True,
                    "parser": "generic_news"
                }
            ],
            "storage": {
                "base_dir": "./news_data",
                "format": "both"
            },
            "performance": {
                "max_concurrent": 3,
                "request_delay": 2.0
            }
        }
        try:
            with open(config_file, 'r', encoding='utf-8') as f:
                return json.load(f)
        except FileNotFoundError:
            return default_config

    def _setup_logger(self):
        """Configure file and console logging."""
        logger = logging.getLogger('NewsCrawler')
        logger.setLevel(logging.INFO)
        if logger.handlers:
            # Already configured by an earlier instance; avoid duplicate output
            return logger
        # FileHandler does not create missing directories
        log_dir = Path(self.config['storage']['base_dir']) / 'logs'
        log_dir.mkdir(parents=True, exist_ok=True)
        file_handler = logging.FileHandler(log_dir / 'crawler.log', encoding='utf-8')
        file_handler.setLevel(logging.INFO)
        console_handler = logging.StreamHandler()
        console_handler.setLevel(logging.INFO)
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        file_handler.setFormatter(formatter)
        console_handler.setFormatter(formatter)
        logger.addHandler(file_handler)
        logger.addHandler(console_handler)
        return logger

    def register_crawler(self, name: str, crawler_class):
        """Register a crawler class under a parser name."""
        self.crawlers[name] = crawler_class

    def run_crawler(self, source_config: Dict):
        """Run the crawler for one source and store its articles."""
        crawler_name = source_config.get('parser', 'generic_news')
        if crawler_name not in self.crawlers:
            self.logger.error(f"Crawler not found: {crawler_name}")
            return
        try:
            crawler = self.crawlers[crawler_name]()
            articles = crawler.fetch(source_config['url'])
            storage = NewsStorageSystem(
                self.config['storage']['base_dir']
            )
            for article in articles:
                article['category'] = source_config.get('category', 'Uncategorized')
                storage.save_news_article(
                    article,
                    self.config['storage']['format']
                )
            self.logger.info(
                f"Collected {source_config['name']}: "
                f"{len(articles)} articles"
            )
        except Exception as e:
            self.logger.error(
                f"Collection failed for {source_config['name']}: {str(e)}"
            )

    def schedule_tasks(self):
        """Register a scheduled job for every enabled source."""
        for source in self.config['sources']:
            if not source.get('enabled', True):
                continue
            schedule_time = source.get('schedule', 'daily')
            if schedule_time == 'hourly':
                schedule.every().hour.do(
                    self.run_crawler, source
                )
            elif schedule_time == 'daily':
                schedule.every().day.at("09:00").do(
                    self.run_crawler, source
                )
            elif schedule_time == 'weekly':
                schedule.every().monday.at("09:00").do(
                    self.run_crawler, source
                )
            elif isinstance(schedule_time, str) and ':' in schedule_time:
                # Custom daily time such as "14:30"
                schedule.every().day.at(schedule_time).do(
                    self.run_crawler, source
                )
            self.logger.info(
                f"Scheduled: {source['name']} - {schedule_time}"
            )

    def run(self):
        """Start the scheduler and block forever."""
        self.schedule_tasks()
        self.logger.info("News collection scheduler started")
        # Run every enabled source once right away, staggered by request_delay
        for source in self.config['sources']:
            if source.get('enabled', True):
                threading.Thread(
                    target=self.run_crawler,
                    args=(source,)
                ).start()
                time.sleep(self.config['performance']['request_delay'])
        # Keep the scheduler alive
        while True:
            schedule.run_pending()
            time.sleep(60)  # check once a minute


# Example configuration
config_example = {
"sources": [
{
"name": "tech_news",
"url": "https://tech.example.com/news",
"schedule": "hourly",
"enabled": True,
"parser": "tech_news_parser",
"category": "科技"
},
{
"name": "finance_news",
"url": "https://finance.example.com/latest",
"schedule": "daily",
"enabled": True,
"parser": "finance_news_parser",
"category": "财经"
},
{
"name": "sports_news",
"url": "https://sports.example.com/updates",
"schedule": "14:30",
"enabled": True,
"parser": "sports_news_parser",
"category": "体育"
}
],
"storage": {
"base_dir": "./collected_news",
"format": "both"
},
"performance": {
"max_concurrent": 2,
"request_delay": 3.0
}
}
```
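The scheduler never defines what a crawler looks like. From `run_crawler`, the implied contract is a class that can be constructed without arguments and exposes `fetch(url)` returning a list of article dicts. A sketch of that contract follows; the `Crawler` protocol, `StaticListCrawler`, and `run_registered` are illustrative names, and the canned article stands in for real network access.

```python
from typing import Dict, List, Protocol


class Crawler(Protocol):
    """Interface the scheduler appears to rely on."""

    def fetch(self, url: str) -> List[Dict]:
        ...


class StaticListCrawler:
    """Hypothetical stand-in that returns a canned article instead of using the network."""

    def fetch(self, url: str) -> List[Dict]:
        return [{
            "title": "Sample headline",
            "content": "Sample body text.",
            "url": url,
            "keywords": [],
        }]


def run_registered(crawlers: Dict[str, type], parser_name: str, url: str) -> List[Dict]:
    """Mirror the lookup-and-call sequence used by run_crawler."""
    crawler_class = crawlers[parser_name]
    crawler: Crawler = crawler_class()  # constructed with no arguments
    return crawler.fetch(url)
```

With the real scheduler, the equivalent registration would be `scheduler.register_crawler("generic_news", StaticListCrawler)`, where the name matches the `parser` field of a source.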
### 4.3 Monitoring and Error Handling
A robust system needs solid monitoring and error handling:
```python
import json
import os
from datetime import datetime, timedelta
from typing import Dict, Optional


class CrawlerMonitor:
    """Collect request and article metrics for the crawler."""

    def __init__(self):
        self.metrics = {
            'total_requests': 0,
            'successful_requests': 0,
            'failed_requests': 0,
            'total_articles': 0,
            'last_run': None,
            'errors': []
        }

    def record_request(self, success: bool, url: Optional[str] = None,
                       error: Optional[str] = None):
        """Record the outcome of one request."""
        self.metrics['total_requests'] += 1
        self.metrics['last_run'] = datetime.now().isoformat()
        if success:
            self.metrics['successful_requests'] += 1
        else:
            self.metrics['failed_requests'] += 1
            if error:
                self.metrics['errors'].append({
                    'time': datetime.now().isoformat(),
                    'url': url,
                    'error': error
                })
                # Keep only the 100 most recent errors
                if len(self.metrics['errors']) > 100:
                    self.metrics['errors'] = self.metrics['errors'][-100:]

    def record_article(self, article_count: int = 1):
        """Add to the running article count."""
        self.metrics['total_articles'] += article_count

    def generate_report(self) -> Dict:
        """Build a report from the current metrics."""
        success_rate = 0
        if self.metrics['total_requests'] > 0:
            success_rate = (
                self.metrics['successful_requests'] /
                self.metrics['total_requests'] * 100
            )
        return {
            'timestamp': datetime.now().isoformat(),
            # Copy the error list too, so the report is a snapshot
            'metrics': {**self.metrics, 'errors': list(self.metrics['errors'])},
            'success_rate': f"{success_rate:.1f}%",
            'avg_articles_per_request': (
                self.metrics['total_articles'] /
                max(1, self.metrics['total_requests'])
            )
        }

    def save_report(self, filepath: str = "monitor_report.json"):
        """Write the latest report and append it to a 30-day history file."""
        report = self.generate_report()
        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(report, f, ensure_ascii=False, indent=2)
        # Also append to the history file
        history_file = filepath.replace('.json', '_history.json')
        if os.path.exists(history_file):
            with open(history_file, 'r', encoding='utf-8') as f:
                history = json.load(f)
        else:
            history = []
        history.append(report)
        # Keep only the last 30 days; ISO timestamps compare correctly as strings
        cutoff_date = (datetime.now() - timedelta(days=30)).isoformat()
        history = [h for h in history if h['timestamp'] > cutoff_date]
        with open(history_file, 'w', encoding='utf-8') as f:
            json.dump(history, f, ensure_ascii=False, indent=2)
```
### 4.4 Putting the System Together
Finally, we integrate all the components into one system:
```python
import json
import logging
import time
from datetime import datetime
from pathlib import Path
from typing import Dict, List

import schedule


class CompleteNewsCrawlerSystem:
    """End-to-end news collection system."""

    def __init__(self, config_path: str = "config/system_config.json"):
        self.config_path = config_path
        self.config = self._load_config()
        # Initialize the components
        self.storage = NewsStorageSystem(self.config['storage']['base_dir'])
        self.scheduler = NewsCrawlerScheduler(config_path)
        self.monitor = CrawlerMonitor()
        self.requestor = RobustRequestor(
            max_retries=self.config['crawling'].get('max_retries', 3),
            timeout=self.config['crawling'].get('request_timeout', 30)
        )
        self.logger = logging.getLogger('CompleteNewsCrawler')

    def _load_config(self):
        """Load the system config, merged over the defaults."""
        default_config = {
            "system": {
                "name": "News Collection System",
                "version": "1.0.0",
                "description": "Automated news collection and storage"
            },
            "storage": {
                "base_dir": "./news_system",
                "backup_enabled": True,
                "backup_interval_days": 7
            },
            "crawling": {
                "max_concurrent": 3,
                "request_timeout": 30,
                "retry_delay": [1, 2, 4],  # exponential backoff delays
                "user_agents_file": "user_agents.txt"
            },
            "processing": {
                "clean_html": True,
                "extract_keywords": True,
                "detect_language": True,
                "min_content_length": 200
            },
            "monitoring": {
                "enable_monitoring": True,
                "report_interval_hours": 24,
                "alert_on_failure_rate": 20.0  # alert above a 20% failure rate
            }
        }
        try:
            with open(self.config_path, 'r', encoding='utf-8') as f:
                user_config = json.load(f)
            # Deep-merge the user config over the defaults
            return self._deep_merge(default_config, user_config)
        except FileNotFoundError:
            return default_config

    def _deep_merge(self, base: Dict, update: Dict) -> Dict:
        """Recursively merge update into a copy of base."""
        result = base.copy()
        for key, value in update.items():
            if key in result and isinstance(result[key], dict) and isinstance(value, dict):
                result[key] = self._deep_merge(result[key], value)
            else:
                result[key] = value
        return result

    def run_daily_collection(self):
        """Collect, process, and store articles from every enabled source."""
        self.logger.info("Starting daily news collection")
        sources = self.config.get('sources', [])
        results = []
        for source in sources:
            if not source.get('enabled', True):
                continue
            self.logger.info(f"Collecting source: {source['name']}")
            try:
                articles = self._crawl_source(source)
                processed_articles = self._process_articles(articles, source)
                saved_count = self._save_articles(processed_articles, source)
                result = {
                    'source': source['name'],
                    'status': 'success',
                    'articles_found': len(articles),
                    'articles_saved': saved_count,
                    'timestamp': datetime.now().isoformat()
                }
                self.monitor.record_request(True)
                self.monitor.record_article(saved_count)
            except Exception as e:
                self.logger.error(f"Collection failed for {source['name']}: {str(e)}")
                result = {
                    'source': source['name'],
                    'status': 'failed',
                    'error': str(e),
                    'timestamp': datetime.now().isoformat()
                }
                self.monitor.record_request(False, source.get('url'), str(e))
            results.append(result)
        self._generate_daily_report(results)
        self.logger.info("Daily news collection finished")
        return results

    def _crawl_source(self, source_config: Dict) -> List[Dict]:
        """Dispatch to the crawler for the source type (RSS, API, or web page)."""
        crawler_type = source_config.get('type', 'web')
        if crawler_type == 'rss':
            return self._crawl_rss(source_config)
        elif crawler_type == 'api':
            return self._crawl_api(source_config)
        else:  # web
            return self._crawl_web(source_config)

    # The helpers below were not defined in the original listing. They are
    # minimal placeholders so the class runs; replace them per source.
    def _crawl_web(self, source_config: Dict) -> List[Dict]:
        """Placeholder: fetch one URL and extract a single article from it."""
        html = self.requestor.get_with_retry(source_config['url'])
        if not html:
            raise RuntimeError(f"No response from {source_config['url']}")
        article = ContentExtractor().extract_news_content(html, source_config['url'])
        return [article] if isinstance(article, dict) else []

    def _crawl_rss(self, source_config: Dict) -> List[Dict]:
        raise NotImplementedError("RSS crawling is not implemented in this example")

    def _crawl_api(self, source_config: Dict) -> List[Dict]:
        raise NotImplementedError("API crawling is not implemented in this example")

    def _clean_content(self, text: str) -> str:
        """Placeholder: only trims surrounding whitespace."""
        return text.strip()

    def _extract_keywords(self, article: Dict) -> List[str]:
        """Placeholder: keeps any keywords the crawler already supplied."""
        return article.get('keywords', [])

    def _detect_language(self, text: str) -> str:
        """Placeholder: no detection is performed."""
        return 'unknown'

    def _send_notification(self, report: Dict):
        """Placeholder: logs instead of sending a real notification."""
        self.logger.info(f"Daily report ready for {report['date']}")

    def _process_articles(self, articles: List[Dict], source_config: Dict) -> List[Dict]:
        """Clean, annotate, and filter the collected articles."""
        processed = []
        for article in articles:
            if self.config['processing']['clean_html']:
                article['content'] = self._clean_content(article.get('content', ''))
            if self.config['processing']['extract_keywords']:
                article['keywords'] = self._extract_keywords(article)
            if self.config['processing']['detect_language']:
                article['language'] = self._detect_language(article.get('content', ''))
            # Attach source information
            article['source_name'] = source_config['name']
            article['source_category'] = source_config.get('category', 'Uncategorized')
            article['fetch_time'] = datetime.now().isoformat()
            # Drop articles whose content is too short
            if len(article.get('content', '')) >= self.config['processing']['min_content_length']:
                processed.append(article)
        return processed

    def _save_articles(self, articles: List[Dict], source_config: Dict) -> int:
        """Save articles to storage and return how many were saved."""
        saved_count = 0
        for article in articles:
            try:
                filename = self.storage.save_news_article(
                    article,
                    format=self.config['storage'].get('format', 'both')
                )
                if filename:
                    saved_count += 1
                    self.logger.debug(f"Saved article: {article.get('title', 'Untitled')}")
            except Exception as e:
                self.logger.error(f"Failed to save article: {str(e)}")
        return saved_count

    def _generate_daily_report(self, results: List[Dict]):
        """Write the daily report as JSON and optionally send a notification."""
        report = {
            'date': datetime.now().strftime('%Y-%m-%d'),
            'summary': {
                'total_sources': len(results),
                'successful_sources': len([r for r in results if r['status'] == 'success']),
                'failed_sources': len([r for r in results if r['status'] == 'failed']),
                'total_articles': sum(r.get('articles_saved', 0) for r in results),
            },
            'details': results,
            'system_metrics': self.monitor.generate_report()
        }
        report_dir = Path(self.config['storage']['base_dir']) / 'reports'
        report_dir.mkdir(parents=True, exist_ok=True)
        report_file = report_dir / f"report_{datetime.now().strftime('%Y%m%d')}.json"
        with open(report_file, 'w', encoding='utf-8') as f:
            json.dump(report, f, ensure_ascii=False, indent=2)
        # Optional notification
        if self.config['monitoring'].get('send_notifications', False):
            self._send_notification(report)
        return report

    def start(self):
        """Start the system and block forever."""
        self.logger.info(f"Starting news collection system: {self.config['system']['name']}")
        # Make sure the storage directory exists
        storage_dir = Path(self.config['storage']['base_dir'])
        if not storage_dir.exists():
            storage_dir.mkdir(parents=True)
            self.logger.info(f"Created storage directory: {storage_dir}")
        # Schedule the daily job
        if self.config.get('schedule_tasks', True):
            schedule.every().day.at("08:00").do(self.run_daily_collection)
            self.logger.info("Scheduled daily collection at 08:00")
        # Schedule monitoring reports
        if self.config['monitoring']['enable_monitoring']:
            schedule.every(
                self.config['monitoring']['report_interval_hours']
            ).hours.do(self.monitor.save_report)
        # Run one collection right away
        self.run_daily_collection()
        # Keep running
        while True:
            schedule.run_pending()
            time.sleep(60)


# Example entry point
if __name__ == "__main__":
    # Configure logging first so messages from start-up are captured
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler("news_crawler.log", encoding='utf-8'),
            logging.StreamHandler()
        ]
    )
    system = CompleteNewsCrawlerSystem("config/my_news_config.json")
    try:
        system.start()
    except KeyboardInterrupt:
        print("\nShutting down...")
    except Exception as e:
        print(f"System error: {e}")
        logging.error(f"System error: {e}")
```
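The complete system covers collection, processing, storage, and monitoring. In practice you will need to adapt the parsing logic to each news source, but the overall architecture and the core components are reusable.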
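One loose end: the default configuration defines `alert_on_failure_rate`, but nothing in the listing reads it. A minimal sketch of such a check, operating on the `metrics` dict kept by `CrawlerMonitor`, could look like the following. The function names and the `min_requests` guard are assumptions added here for illustration.

```python
from typing import Dict


def failure_rate(metrics: Dict) -> float:
    """Failed requests as a percentage of all requests (0.0 when none were made)."""
    total = metrics.get("total_requests", 0)
    if total == 0:
        return 0.0
    # Multiply before dividing to limit floating-point rounding at round percentages
    return metrics.get("failed_requests", 0) * 100 / total


def should_alert(metrics: Dict, threshold_percent: float = 20.0,
                 min_requests: int = 10) -> bool:
    """True when the failure rate exceeds the threshold on a meaningful sample."""
    if metrics.get("total_requests", 0) < min_requests:
        return False  # too few requests to judge
    return failure_rate(metrics) > threshold_percent
```

A call such as `should_alert(monitor.metrics, config['monitoring']['alert_on_failure_rate'])` after each collection run would connect the setting to the monitor.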
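When I deployed this system in real projects, three things mattered most: sound error handling, thorough logging, and flexible configuration. After a few months of running, it had automatically collected tens of thousands of news articles, which gave later analysis work a solid foundation. The monitoring reports in particular let me locate problem sources quickly and adjust the collection strategy in time.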