# 深入解析网易云音乐评论API的加密机制:从AES到RSA的完整逆向实战
如果你曾经尝试过用Python爬取网易云音乐的评论数据,大概率会遇到一个令人头疼的问题——那些看似简单的POST请求背后,隐藏着一套复杂的加密体系。params和encSecKey这两个参数就像两把锁,牢牢地保护着API的访问权限。今天,我们不只给你现成的代码,而是要带你深入这套加密机制的核心,理解每一个加密环节的设计逻辑,让你真正掌握逆向工程的核心思维。
我最初接触这个问题时,也像大多数人一样,在网上找现成的代码。但很快发现,很多代码要么已经失效,要么只知其然不知其所以然。直到我决定亲自深入JavaScript源码,一步步追踪加密过程,才真正理解了这套系统的精妙之处。这个过程虽然耗时,但收获的不仅仅是代码,更是一种解决问题的思维方式。
## 1. 加密体系架构全景解析
网易云音乐评论API采用的是一种典型的**客户端-服务器端非对称加密验证体系**。这套系统不是简单的单一加密,而是多层加密的组合拳,每一层都有其特定的安全考量。
### 1.1 整体加密流程概览
让我们先通过一个流程图来理解整个加密过程的数据流向:
```
明文请求数据 → JSON序列化 → 第一次AES加密 → 第二次AES加密 → 生成params
↓
随机密钥生成 → RSA加密 → 生成encSecKey
```
这个流程中包含了几个关键的设计理念:
1. **双重AES加密**:使用不同的密钥对同一数据进行两次加密,增加破解难度
2. **随机密钥机制**:每次请求都生成新的随机密钥,防止重放攻击
3. **RSA非对称加密**:保护随机密钥的安全传输
4. **固定参数混淆**:通过看似随机的固定字符串增加逆向分析难度
### 1.2 加密参数详解
在深入代码之前,我们需要先理解几个核心参数的含义:
| 参数名 | 类型 | 作用 | 是否固定 |
|--------|------|------|----------|
| `params` | 字符串 | 经过双重AES加密后的请求数据 | 每次请求变化 |
| `encSecKey` | 字符串 | RSA加密后的随机密钥 | 每次请求变化 |
| `i` | 字符串 | 16位随机密钥 | 每次请求变化 |
| `e` | 字符串 | RSA公钥指数 | 固定值"010001" |
| `f` | 字符串 | RSA公钥模数 | 固定值(长字符串) |
| `g` | 字符串 | 第一次AES加密的密钥 | 固定值"0CoJUm6Qyw8W8jud" |
> 注意:这里的固定参数`e`、`f`、`g`虽然值固定,但它们在JavaScript源码中是通过函数调用生成的,这是常见的混淆手段,目的是增加直接搜索的难度。
## 2. JavaScript逆向分析与断点调试实战
要真正理解加密过程,最直接的方法就是深入JavaScript源码。这里我分享几个实用的调试技巧,这些技巧不仅适用于网易云音乐,也适用于大多数Web端的加密分析。
### 2.1 浏览器开发者工具的高级用法
现代浏览器的开发者工具提供了强大的调试功能,但很多人只用了最基本的部分。下面是一些进阶技巧:
```javascript
// 在控制台中可以执行的实用命令
// 1. 监控特定函数的调用
var originalFunction = window.asrsea;
window.asrsea = function() {
console.log('asrsea被调用,参数:', arguments);
console.trace(); // 打印调用栈
return originalFunction.apply(this, arguments);
};
// 2. 设置条件断点
// 在Sources面板中,右键行号选择"Add conditional breakpoint"
// 输入条件如:arguments[0].rid === "R_SO_4_167827"
// 3. 使用XHR断点
// 在Network面板中,右键请求选择"Break on" -> "Request"
```
### 2.2 关键加密函数定位策略
网易云音乐的加密代码经过了混淆处理,函数名和变量名都失去了可读性。但通过以下方法,我们仍然可以找到关键函数:
1. **搜索特征字符串**:在混淆的代码中搜索`encText`、`encSecKey`等关键词
2. **追踪网络请求**:在发起评论请求时设置XHR断点,然后逐步回溯调用栈
3. **分析参数生成**:观察`params`和`encSecKey`的生成过程,找到对应的加密函数
我在实际分析中发现,加密核心函数通常被包裹在一个立即执行函数表达式(IIFE)中,形式如下:
```javascript
!function() {
// 加密函数定义
function d(d, e, f, g) {
var h = {}, i = a(16);
return h.encText = b(d, g),
h.encText = b(h.encText, i),
h.encSecKey = c(i, e, f),
h
}
window.asrsea = d;
// ... 其他函数定义
}();
```
### 2.3 参数传递链分析
通过断点调试,我们可以清晰地看到参数是如何一步步传递和转换的:
```
用户请求参数 → i6c对象 → JSON.stringify() → 第一次AES加密 → 第二次AES加密 → params
↓
随机密钥i → RSA加密 → encSecKey
```
这里有一个重要的发现:随机密钥`i`不仅用于第二次AES加密,还被RSA加密后作为`encSecKey`发送给服务器。这意味着服务器端需要先用私钥解密`encSecKey`得到`i`,再用`i`解密`params`得到原始请求数据。
## 3. Python实现完整加密流程
理解了加密原理后,我们就可以用Python完整地复现整个加密过程。这里我提供的是一个模块化、可维护的实现方案,而不是简单的脚本堆砌。
### 3.1 项目结构与依赖管理
首先,我们创建一个结构清晰的项目:
```
netease_encrypt/
├── __init__.py
├── encrypt.py # 加密核心模块
├── request.py # 请求处理模块
├── utils.py # 工具函数
└── config.py # 配置参数
```
安装必要的依赖:
```bash
pip install pycryptodome requests
```
> 注意:这里使用`pycryptodome`而不是`pycrypto`,因为后者已停止维护,且在某些系统上安装会有问题。
### 3.2 AES加密实现细节
AES加密有几个关键点需要注意:工作模式、填充方案、初始向量(IV)。网易云音乐使用的是CBC模式,PKCS7填充,固定IV。
```python
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad
import base64
class AESEncryptor:
"""AES加密器,专门处理网易云音乐的加密需求"""
def __init__(self):
# 固定IV,与JavaScript中一致
self.iv = "0102030405060708"
def encrypt(self, text: str, key: str) -> str:
"""
AES-CBC加密
Args:
text: 要加密的文本
key: 加密密钥,必须是16位
Returns:
Base64编码的加密结果
"""
if len(key) != 16:
raise ValueError(f"密钥长度必须为16位,当前为{len(key)}位")
# 确保文本是字节类型
text_bytes = text.encode('utf-8')
key_bytes = key.encode('utf-8')
iv_bytes = self.iv.encode('utf-8')
# 创建加密器
cipher = AES.new(key_bytes, AES.MODE_CBC, iv_bytes)
# PKCS7填充
padded_text = pad(text_bytes, AES.block_size)
# 加密
ciphertext = cipher.encrypt(padded_text)
# Base64编码
return base64.b64encode(ciphertext).decode('utf-8')
def double_encrypt(self, text: str, first_key: str, second_key: str) -> str:
"""
双重AES加密,模拟JavaScript中的两次加密
Args:
text: 原始文本
first_key: 第一次加密的密钥
second_key: 第二次加密的密钥
Returns:
双重加密后的结果
"""
# 第一次加密
first_encrypted = self.encrypt(text, first_key)
# 第二次加密
second_encrypted = self.encrypt(first_encrypted, second_key)
return second_encrypted
```
### 3.3 RSA加密实现
虽然Python标准库没有直接提供RSA加密函数,但我们可以根据网易云音乐JavaScript代码中的实现,用Python复现相同的逻辑。
```python
import binascii
import math
class RSAEncryptor:
"""RSA加密器,复现JavaScript中的RSA加密逻辑"""
def __init__(self, e: str, n: str):
"""
初始化RSA加密器
Args:
e: 公钥指数(十六进制字符串)
n: 公钥模数(十六进制字符串)
"""
self.e = int(e, 16)
self.n = int(n, 16)
def encrypt(self, text: str) -> str:
"""
RSA加密
Args:
text: 要加密的文本
Returns:
加密后的十六进制字符串
"""
# 将文本转换为大整数
m = self._text_to_int(text)
# 计算密文: c = m^e mod n
c = pow(m, self.e, self.n)
# 转换为十六进制字符串
return format(c, 'x')
def _text_to_int(self, text: str) -> int:
"""
将文本转换为大整数
这个转换逻辑需要与JavaScript中的实现完全一致
"""
result = 0
# 反向处理字符串,与JavaScript中的逻辑匹配
for char in reversed(text):
result = (result << 8) | ord(char)
return result
@staticmethod
def generate_random_key(length: int = 16) -> str:
"""
生成随机密钥
Args:
length: 密钥长度
Returns:
随机密钥字符串
"""
import random
import string
# 生成随机字符串,字符集与JavaScript中一致
chars = string.ascii_letters + string.digits
return ''.join(random.choice(chars) for _ in range(length))
```
### 3.4 完整加密流程整合
现在我们将AES和RSA加密整合起来,复现完整的加密流程:
```python
import json
from typing import Dict, Any
class NeteaseEncryptor:
"""网易云音乐加密器"""
# 固定参数
RSA_E = "010001"
RSA_N = "00e0b509f6259df8642dbc35662901477df22677ec152b5ff68ace615bb7b725152b3ab17a876aea8a5aa76d2e417629ec4ee341f56135fccf695280104e0312ecbda92557c93870114af6c9d05c4f7f0c3685b7a46bee255932575cce10b424d813cfe4875d3e82047b97ddef52741d546b8e289dc6935b3ece0462db0a22b8e7"
FIRST_AES_KEY = "0CoJUm6Qyw8W8jud"
def __init__(self):
self.aes = AESEncryptor()
self.rsa = RSAEncryptor(self.RSA_E, self.RSA_N)
def encrypt_request(self, request_data: Dict[str, Any]) -> Dict[str, str]:
"""
加密请求数据
Args:
request_data: 请求参数字典
Returns:
包含params和encSecKey的字典
"""
# 1. 生成随机密钥
random_key = self.rsa.generate_random_key(16)
# 2. 将请求数据转换为JSON字符串
json_str = json.dumps(request_data, separators=(',', ':'))
# 3. 双重AES加密得到params
params = self.aes.double_encrypt(
json_str,
self.FIRST_AES_KEY,
random_key
)
# 4. RSA加密随机密钥得到encSecKey
enc_seckey = self.rsa.encrypt(random_key)
return {
"params": params,
"encSecKey": enc_seckey
}
def create_comment_request(self, song_id: str, page: int = 1, page_size: int = 20) -> Dict[str, Any]:
"""
创建评论请求的完整数据
Args:
song_id: 歌曲ID
page: 页码
page_size: 每页数量
Returns:
完整的请求数据
"""
# 计算偏移量
offset = (page - 1) * page_size
request_data = {
"csrf_token": "",
"cursor": str(-1 if page == 1 else offset),
"offset": str(offset),
"orderType": "1",
"pageNo": str(page),
"pageSize": str(page_size),
"rid": f"R_SO_4_{song_id}",
"threadId": f"R_SO_4_{song_id}"
}
return self.encrypt_request(request_data)
```
## 4. 请求处理与错误处理机制
有了加密功能,我们还需要一个健壮的请求处理模块。这个模块不仅要能发送请求,还要能处理各种异常情况。
### 4.1 请求头配置策略
正确的请求头是成功获取数据的关键。网易云音乐对请求头有一定的验证机制。
```python
import requests
import time
from typing import Optional, Dict, Any
class NeteaseRequest:
"""网易云音乐请求处理器"""
BASE_URL = "https://music.163.com/weapi/comment/resource/comments/get?csrf_token="
def __init__(self, user_agent: Optional[str] = None):
"""
初始化请求处理器
Args:
user_agent: 自定义User-Agent,如果为None则使用默认值
"""
self.session = requests.Session()
self.encryptor = NeteaseEncryptor()
# 配置请求头
self.headers = {
'User-Agent': user_agent or 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36',
'Referer': 'https://music.163.com/',
'Origin': 'https://music.163.com',
'Content-Type': 'application/x-www-form-urlencoded',
'Accept': 'application/json, text/plain, */*',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
'Accept-Encoding': 'gzip, deflate, br',
'Connection': 'keep-alive',
}
# 配置会话
self.session.headers.update(self.headers)
def get_comments(self, song_id: str, page: int = 1, page_size: int = 20,
max_retries: int = 3) -> Dict[str, Any]:
"""
获取歌曲评论
Args:
song_id: 歌曲ID
page: 页码
page_size: 每页数量
max_retries: 最大重试次数
Returns:
评论数据
"""
# 加密请求数据
encrypted_data = self.encryptor.create_comment_request(
song_id, page, page_size
)
# 发送请求
for attempt in range(max_retries):
try:
response = self.session.post(
self.BASE_URL,
data=encrypted_data,
timeout=10
)
# 检查响应状态
response.raise_for_status()
# 解析JSON
result = response.json()
# 检查API返回状态
if result.get('code') != 200:
raise ValueError(f"API返回错误: {result.get('msg', '未知错误')}")
return result
except requests.exceptions.RequestException as e:
if attempt == max_retries - 1:
raise
# 等待后重试
wait_time = 2 ** attempt # 指数退避
time.sleep(wait_time)
except (ValueError, KeyError) as e:
# JSON解析错误或数据格式错误
raise ValueError(f"数据解析错误: {str(e)}")
def get_all_comments(self, song_id: str, max_pages: Optional[int] = None,
delay: float = 1.0) -> list:
"""
获取所有评论(分页获取)
Args:
song_id: 歌曲ID
max_pages: 最大页数,如果为None则获取所有页
delay: 请求间隔(秒),避免请求过快
Returns:
所有评论列表
"""
all_comments = []
# 先获取第一页,了解总评论数
first_page = self.get_comments(song_id, page=1)
total = first_page.get('data', {}).get('total', 0)
page_size = 20 # 网易云音乐固定每页20条
# 计算总页数
total_pages = (total + page_size - 1) // page_size
# 限制最大页数
if max_pages is not None:
total_pages = min(total_pages, max_pages)
# 添加第一页评论
all_comments.extend(first_page.get('data', {}).get('comments', []))
# 获取剩余页
for page in range(2, total_pages + 1):
try:
page_data = self.get_comments(song_id, page=page)
comments = page_data.get('data', {}).get('comments', [])
all_comments.extend(comments)
# 延迟,避免请求过快
time.sleep(delay)
except Exception as e:
print(f"获取第{page}页失败: {str(e)}")
# 可以选择继续或中断
continue
return all_comments
```
### 4.2 错误处理与重试机制
在实际使用中,网络请求可能会遇到各种问题。一个健壮的系统需要有完善的错误处理机制。
```python
class RobustNeteaseRequest(NeteaseRequest):
"""增强版的请求处理器,包含更完善的错误处理"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.request_count = 0
self.error_count = 0
def safe_request(self, url: str, data: Dict[str, str],
max_retries: int = 5, backoff_factor: float = 0.5) -> requests.Response:
"""
安全的请求方法,包含重试和退避机制
Args:
url: 请求URL
data: 请求数据
max_retries: 最大重试次数
backoff_factor: 退避因子
Returns:
响应对象
"""
last_exception = None
for attempt in range(max_retries):
try:
self.request_count += 1
response = self.session.post(url, data=data, timeout=15)
# 检查HTTP状态码
if response.status_code == 200:
return response
elif response.status_code == 403:
raise PermissionError("访问被拒绝,可能IP被封禁")
elif response.status_code == 404:
raise ValueError("资源不存在")
elif 500 <= response.status_code < 600:
# 服务器错误,可以重试
raise requests.exceptions.HTTPError(f"服务器错误: {response.status_code}")
except (requests.exceptions.Timeout,
requests.exceptions.ConnectionError) as e:
last_exception = e
self.error_count += 1
# 计算等待时间(指数退避)
wait_time = backoff_factor * (2 ** attempt)
time.sleep(wait_time)
if attempt == max_retries - 1:
raise last_exception
raise last_exception or RuntimeError("请求失败")
def get_rate_limit_info(self) -> Dict[str, Any]:
"""
获取请求统计信息
Returns:
统计信息字典
"""
return {
"total_requests": self.request_count,
"total_errors": self.error_count,
"success_rate": (self.request_count - self.error_count) / self.request_count
if self.request_count > 0 else 0
}
```
## 5. 数据解析与存储方案
获取到数据后,我们需要有效地解析和存储。这里提供几种不同的方案,适用于不同的使用场景。
### 5.1 数据结构解析
网易云音乐的评论数据有固定的结构,了解这个结构有助于我们高效地提取信息。
```python
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional
@dataclass
class Comment:
"""评论数据类"""
comment_id: int
user_id: int
nickname: str
avatar_url: str
content: str
time: datetime
liked_count: int
reply_count: int
is_hot: bool
is_top: bool
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'Comment':
"""
从API返回的字典创建Comment对象
Args:
data: API返回的评论数据
Returns:
Comment对象
"""
# 时间戳转换(网易云使用毫秒级时间戳)
timestamp = data.get('time', 0) / 1000
comment_time = datetime.fromtimestamp(timestamp)
return cls(
comment_id=data.get('commentId', 0),
user_id=data.get('user', {}).get('userId', 0),
nickname=data.get('user', {}).get('nickname', ''),
avatar_url=data.get('user', {}).get('avatarUrl', ''),
content=data.get('content', ''),
time=comment_time,
liked_count=data.get('likedCount', 0),
reply_count=data.get('replyCount', 0),
is_hot=data.get('hot', False),
is_top=data.get('top', False)
)
class CommentParser:
"""评论解析器"""
@staticmethod
def parse_response(response_data: Dict[str, Any]) -> Dict[str, Any]:
"""
解析API响应
Args:
response_data: API返回的原始数据
Returns:
解析后的数据
"""
data = response_data.get('data', {})
# 解析评论
comments = [Comment.from_dict(c) for c in data.get('comments', [])]
hot_comments = [Comment.from_dict(c) for c in data.get('hotComments', [])]
# 提取其他信息
result = {
'total': data.get('total', 0),
'cursor': data.get('cursor', ''),
'has_more': data.get('hasMore', False),
'comments': comments,
'hot_comments': hot_comments,
'top_comments': data.get('topComments', []),
'current_page': data.get('pageNo', 1),
'page_size': data.get('pageSize', 20)
}
return result
@staticmethod
def filter_comments(comments: List[Comment],
min_likes: int = 0,
after_date: Optional[datetime] = None,
keyword: Optional[str] = None) -> List[Comment]:
"""
过滤评论
Args:
comments: 评论列表
min_likes: 最小点赞数
after_date: 在此日期之后
keyword: 包含的关键词
Returns:
过滤后的评论列表
"""
filtered = comments
# 按点赞数过滤
if min_likes > 0:
filtered = [c for c in filtered if c.liked_count >= min_likes]
# 按时间过滤
if after_date:
filtered = [c for c in filtered if c.time > after_date]
# 按关键词过滤
if keyword:
filtered = [c for c in filtered if keyword.lower() in c.content.lower()]
return filtered
```
### 5.2 多种存储方案
根据不同的需求,我们可以选择不同的存储方案。这里提供三种常见方案:JSON文件、CSV文件和SQLite数据库。
```python
import json
import csv
import sqlite3
from pathlib import Path
from typing import List, Union
class CommentStorage:
"""评论存储管理器"""
def __init__(self, base_path: Union[str, Path] = "./data"):
"""
初始化存储管理器
Args:
base_path: 数据存储基础路径
"""
self.base_path = Path(base_path)
self.base_path.mkdir(parents=True, exist_ok=True)
def save_to_json(self, comments: List[Comment],
filename: str = "comments.json") -> str:
"""
保存评论到JSON文件
Args:
comments: 评论列表
filename: 文件名
Returns:
保存的文件路径
"""
filepath = self.base_path / filename
# 将Comment对象转换为字典
data = []
for comment in comments:
comment_dict = {
'comment_id': comment.comment_id,
'user_id': comment.user_id,
'nickname': comment.nickname,
'avatar_url': comment.avatar_url,
'content': comment.content,
'time': comment.time.isoformat(),
'liked_count': comment.liked_count,
'reply_count': comment.reply_count,
'is_hot': comment.is_hot,
'is_top': comment.is_top
}
data.append(comment_dict)
# 保存到文件
with open(filepath, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
return str(filepath)
def save_to_csv(self, comments: List[Comment],
filename: str = "comments.csv") -> str:
"""
保存评论到CSV文件
Args:
comments: 评论列表
filename: 文件名
Returns:
保存的文件路径
"""
filepath = self.base_path / filename
# 定义CSV列
fieldnames = [
'comment_id', 'user_id', 'nickname', 'avatar_url',
'content', 'time', 'liked_count', 'reply_count',
'is_hot', 'is_top'
]
# 写入CSV
with open(filepath, 'w', newline='', encoding='utf-8') as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
for comment in comments:
row = {
'comment_id': comment.comment_id,
'user_id': comment.user_id,
'nickname': comment.nickname,
'avatar_url': comment.avatar_url,
'content': comment.content,
'time': comment.time.isoformat(),
'liked_count': comment.liked_count,
'reply_count': comment.reply_count,
'is_hot': comment.is_hot,
'is_top': comment.is_top
}
writer.writerow(row)
return str(filepath)
def save_to_sqlite(self, comments: List[Comment],
db_name: str = "comments.db") -> str:
"""
保存评论到SQLite数据库
Args:
comments: 评论列表
db_name: 数据库文件名
Returns:
数据库文件路径
"""
db_path = self.base_path / db_name
# 连接数据库
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# 创建表
cursor.execute('''
CREATE TABLE IF NOT EXISTS comments (
id INTEGER PRIMARY KEY AUTOINCREMENT,
comment_id INTEGER UNIQUE,
user_id INTEGER,
nickname TEXT,
avatar_url TEXT,
content TEXT,
time TIMESTAMP,
liked_count INTEGER,
reply_count INTEGER,
is_hot BOOLEAN,
is_top BOOLEAN,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# 创建索引
cursor.execute('CREATE INDEX IF NOT EXISTS idx_user_id ON comments(user_id)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_time ON comments(time)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_liked_count ON comments(liked_count)')
# 插入数据
for comment in comments:
try:
cursor.execute('''
INSERT OR REPLACE INTO comments
(comment_id, user_id, nickname, avatar_url, content, time,
liked_count, reply_count, is_hot, is_top)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
comment.comment_id,
comment.user_id,
comment.nickname,
comment.avatar_url,
comment.content,
comment.time.isoformat(),
comment.liked_count,
comment.reply_count,
comment.is_hot,
comment.is_top
))
except sqlite3.IntegrityError:
# 重复数据,跳过
continue
# 提交并关闭
conn.commit()
conn.close()
return str(db_path)
def save_to_all_formats(self, comments: List[Comment],
base_name: str = "comments") -> Dict[str, str]:
"""
保存评论到所有格式
Args:
comments: 评论列表
base_name: 基础文件名
Returns:
各格式文件路径字典
"""
return {
'json': self.save_to_json(comments, f"{base_name}.json"),
'csv': self.save_to_csv(comments, f"{base_name}.csv"),
'sqlite': self.save_to_sqlite(comments, f"{base_name}.db")
}
```
### 5.3 数据去重与增量更新
在实际应用中,我们经常需要定期更新数据。为了避免重复存储,我们需要实现去重和增量更新功能。
```python
class CommentManager:
"""评论管理器,处理去重和增量更新"""
def __init__(self, storage: CommentStorage):
"""
初始化评论管理器
Args:
storage: 存储管理器
"""
self.storage = storage
self.seen_comments = set()
# 加载已存在的评论ID
self._load_existing_comments()
def _load_existing_comments(self):
"""加载已存在的评论ID"""
# 检查SQLite数据库
db_path = self.storage.base_path / "comments.db"
if db_path.exists():
try:
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
cursor.execute('SELECT comment_id FROM comments')
rows = cursor.fetchall()
self.seen_comments = {row[0] for row in rows}
conn.close()
except:
# 如果数据库读取失败,从JSON文件加载
self._load_from_json()
def _load_from_json(self):
"""从JSON文件加载评论ID"""
json_path = self.storage.base_path / "comments.json"
if json_path.exists():
try:
with open(json_path, 'r', encoding='utf-8') as f:
data = json.load(f)
self.seen_comments = {item['comment_id'] for item in data}
except:
self.seen_comments = set()
def filter_new_comments(self, comments: List[Comment]) -> List[Comment]:
"""
过滤出新评论
Args:
comments: 评论列表
Returns:
新评论列表
"""
new_comments = []
for comment in comments:
if comment.comment_id not in self.seen_comments:
new_comments.append(comment)
self.seen_comments.add(comment.comment_id)
return new_comments
def update_comments(self, new_comments: List[Comment],
append: bool = True) -> Dict[str, Any]:
"""
更新评论数据
Args:
new_comments: 新评论列表
append: 是否追加到现有数据
Returns:
更新统计信息
"""
# 过滤重复评论
unique_comments = self.filter_new_comments(new_comments)
if not unique_comments:
return {
'total_received': len(new_comments),
'new_comments': 0,
'duplicate_comments': len(new_comments)
}
# 保存新评论
if append:
# 加载现有评论
existing_comments = self._load_all_comments()
all_comments = existing_comments + unique_comments
else:
all_comments = unique_comments
# 保存到所有格式
self.storage.save_to_all_formats(all_comments)
return {
'total_received': len(new_comments),
'new_comments': len(unique_comments),
'duplicate_comments': len(new_comments) - len(unique_comments),
'total_stored': len(all_comments)
}
def _load_all_comments(self) -> List[Comment]:
"""加载所有评论"""
# 优先从SQLite加载
db_path = self.storage.base_path / "comments.db"
if db_path.exists():
try:
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
cursor.execute('SELECT * FROM comments ORDER BY time DESC')
rows = cursor.fetchall()
conn.close()
# 转换为Comment对象
comments = []
for row in rows:
# 跳过自增ID列
comment_data = {
'commentId': row[1],
'user': {
'userId': row[2],
'nickname': row[3],
'avatarUrl': row[4]
},
'content': row[5],
'time': int(datetime.fromisoformat(row[6]).timestamp() * 1000),
'likedCount': row[7],
'replyCount': row[8],
'hot': bool(row[9]),
'top': bool(row[10])
}
comments.append(Comment.from_dict(comment_data))
return comments
except:
pass
# 如果SQLite加载失败,从JSON加载
json_path = self.storage.base_path / "comments.json"
if json_path.exists():
try:
with open(json_path, 'r', encoding='utf-8') as f:
data = json.load(f)
comments = []
for item in data:
# 将JSON数据转换为API格式
comment_data = {
'commentId': item['comment_id'],
'user': {
'userId': item['user_id'],
'nickname': item['nickname'],
'avatarUrl': item['avatar_url']
},
'content': item['content'],
'time': int(datetime.fromisoformat(item['time']).timestamp() * 1000),
'likedCount': item['liked_count'],
'replyCount': item['reply_count'],
'hot': item['is_hot'],
'top': item['is_top']
}
comments.append(Comment.from_dict(comment_data))
return comments
except:
pass
return []
```
## 6. 高级功能与性能优化
对于大规模数据采集,我们需要考虑性能优化和高级功能。这里提供几个实用的优化方案。
### 6.1 并发请求处理
当需要采集大量数据时,串行请求效率太低。我们可以使用并发来提高效率。
```python
import concurrent.futures
from typing import List, Dict, Any
class ConcurrentNeteaseRequest:
"""并发请求处理器"""
def __init__(self, max_workers: int = 5):
"""
初始化并发请求处理器
Args:
max_workers: 最大工作线程数
"""
self.max_workers = max_workers
self.requesters = [RobustNeteaseRequest() for _ in range(max_workers)]
def fetch_multiple_songs(self, song_ids: List[str],
pages_per_song: int = 5) -> Dict[str, List[Comment]]:
"""
并发获取多首歌曲的评论
Args:
song_ids: 歌曲ID列表
pages_per_song: 每首歌曲获取的页数
Returns:
歌曲ID到评论列表的映射
"""
results = {}
with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
# 创建任务
future_to_song = {}
for i, song_id in enumerate(song_ids):
# 轮询使用不同的请求器
requester = self.requesters[i % len(self.requesters)]
future = executor.submit(
self._fetch_song_comments,
requester, song_id, pages_per_song
)
future_to_song[future] = song_id
# 收集结果
for future in concurrent.futures.as_completed(future_to_song):
song_id = future_to_song[future]
try:
comments = future.result()
results[song_id] = comments
except Exception as e:
print(f"获取歌曲{song_id}的评论失败: {str(e)}")
results[song_id] = []
return results
def _fetch_song_comments(self, requester: RobustNeteaseRequest,
song_id: str, max_pages: int) -> List[Comment]:
"""
获取单首歌曲的评论(内部方法)
"""
comments = []
for page in range(1, max_pages + 1):
try:
response = requester.get_comments(song_id, page=page)
parser = CommentParser()
parsed = parser.parse_response(response)
comments.extend(parsed['comments'])
# 随机延迟,避免请求过快
import random
time.sleep(random.uniform(0.5, 1.5))
except Exception as e:
print(f"获取歌曲{song_id}第{page}页失败: {str(e)}")
break
return comments
def batch_update(self, song_ids: List[str],
storage: CommentStorage) -> Dict[str, Any]:
"""
批量更新多首歌曲的评论
Args:
song_ids: 歌曲ID列表
storage: 存储管理器
Returns:
批量更新统计信息
"""
manager = CommentManager(storage)
all_results = self.fetch_multiple_songs(song_ids)
total_stats = {
'total_songs': len(song_ids),
'successful_songs': 0,
'failed_songs': 0,
'total_comments_received': 0,
'total_new_comments': 0
}
for song_id, comments in all_results.items():
if comments:
stats = manager.update_comments(comments, append=True)
total_stats['successful_songs'] += 1
total_stats['total_comments_received'] += stats['total_received']
total_stats['total_new_comments'] += stats['new_comments']
else:
total_stats['failed_songs'] += 1
return total_stats
```
### 6.2 缓存机制实现
为了避免重复请求相同的数据,我们可以实现一个简单的缓存机制。
```python
import hashlib
import pickle
from datetime import datetime, timedelta
class RequestCache:
"""请求缓存管理器"""
def __init__(self, cache_dir: str = "./cache",
ttl_hours: int = 24):
"""
初始化缓存管理器
Args:
cache_dir: 缓存目录
ttl_hours: 缓存有效期(小时)
"""
self.cache_dir = Path(cache_dir)
self.cache_dir.mkdir(parents=True, exist_ok=True)
self.ttl = timedelta(hours=ttl_hours)
def _get_cache_key(self, song_id: str, page: int) -> str:
"""
生成缓存键
Args:
song_id: 歌曲ID
page: 页码
Returns:
缓存键
"""
key_string = f"{song_id}_{page}"
return hashlib.md5(key_string.encode()).hexdigest()
def _get_cache_path(self, cache_key: str) -> Path:
"""
获取缓存文件路径
Args:
cache_key: 缓存键
Returns:
缓存文件路径
"""
return self.cache_dir / f"{cache_key}.pkl"
def get(self, song_id: str, page: int) -> Optional[Dict[str, Any]]:
"""
从缓存获取数据
Args:
song_id: 歌曲ID
page: 页码
Returns:
缓存数据,如果不存在或已过期则返回None
"""
cache_key = self._get_cache_key(song_id, page)
cache_path = self._get_cache_path(cache_key)
if not cache_path.exists():
return None
try:
with open(cache_path, 'rb') as f:
cached_data = pickle.load(f)
# 检查是否过期
cached_time = cached_data.get('timestamp')
if cached_time and datetime.now() - cached_time > self.ttl:
# 缓存过期,删除文件
cache_path.unlink()
return None
return cached_data.get('data')
except (pickle.PickleError, EOFError, KeyError):
# 缓存文件损坏,删除
cache_path.unlink()
return None
def set(self, song_id: str, page: int, data: Dict[str, Any]):
"""
设置缓存
Args:
song_id: 歌曲ID
page: 页码
data: 要缓存的数据
"""
cache_key = self._get_cache_key(song_id, page)
cache_path = self._get_cache_path(cache_key)
cache_data = {
'timestamp': datetime.now(),
'data': data
}
try:
with open(cache_path, 'wb') as f:
pickle.dump(cache_data, f)
except:
# 缓存写入失败,忽略
pass
def clear_expired(self):
"""清理过期的缓存"""
now = datetime.now()
for cache_file in self.cache_dir.glob("*.pkl"):
try:
with open(cache_file, 'rb') as f:
cached_data = pickle.load(f)
cached_time = cached_data.get('timestamp')
if cached_time and now - cached_time > self.ttl:
cache_file.unlink()
except:
# 文件损坏,删除
cache_file.unlink()
class CachedNeteaseRequest(RobustNeteaseRequest):
"""带缓存的请求处理器"""
def __init__(self, cache_dir: str = "./cache", *args, **kwargs):
super().__init__(*args, **kwargs)
self.cache = RequestCache(cache_dir)
def get_comments(self, song_id: str, page: int = 1,
use_cache: bool = True, **kwargs) -> Dict[str, Any]:
"""
获取评论(带缓存)
Args:
song_id: 歌曲ID
page: 页码
use_cache: 是否使用缓存
**kwargs: 其他参数
Returns:
评论数据
"""
# 检查缓存
if use_cache:
cached = self.cache.get(song_id, page)
if cached is not None:
return cached
# 缓存未命中或禁用缓存,发起请求
result = super().get_comments(song_id, page, **kwargs)
# 保存到缓存
if use_cache:
self.cache.set(song_id, page, result)
return result
def get_all_comments(self, song_id: str, max_pages: Optional[int] = None,
use_cache: bool = True, **kwargs) -> list:
"""
获取所有评论(带缓存)
Args:
song_id: 歌曲ID
max_pages: 最大页数
use_cache: 是否使用缓存
**kwargs: 其他参数
Returns:
所有评论列表
"""
all_comments = []
# 先获取第一页,了解总评论数
first_page = self.get_comments(song_id, page=1, use_cache=use_cache)
total = first_page.get('data', {}).get('total', 0)
page_size = 20
# 计算总页数
total_pages = (total + page_size - 1) // page_size
# 限制最大页数
if max_pages is not None:
total_pages = min(total_pages, max_pages)
# 添加第一页评论
all_comments.extend(first_page.get('data', {}).get('comments', []))
# 获取剩余页
for page in range(2, total_pages + 1):
try:
page_data = self.get_comments(song_id, page=page, use_cache=use_cache)
comments = page_data.get('data', {}).get('comments', [])
all_comments.extend(comments)
# 延迟
time.sleep(kwargs.get('delay', 1.0))
except Exception as e:
print(f"获取第{page}页失败: {str(e)}")
continue
return all_comments
```
### 6.3 监控与日志系统
对于长期运行的数据采集任务,一个完善的监控和日志系统是必不可少的。
```python
import logging
from logging.handlers import RotatingFileHandler
import sys
class MonitoringSystem:
"""监控系统"""
def __init__(self, log_dir: str = "./logs"):
"""
初始化监控系统
Args:
log_dir: 日志目录
"""
self.log_dir = Path(log_dir)
self.log_dir.mkdir(parents=True, exist_ok=True)
# 配置日志
self._setup_logging()
# 监控数据
self.metrics = {
'requests_total': 0,
'requests_successful': 0,
'requests_failed': 0,
'comments_collected': 0,
'cache_hits': 0,
'cache_misses': 0,
'start_time': datetime.now()
}
def _setup_logging(self):
"""配置日志系统"""
# 创建logger
self.logger = logging.getLogger('netease_monitor')
self.logger.setLevel(logging.INFO)
# 避免重复添加handler
if not self.logger.handlers:
# 文件handler(按大小轮转)
file_handler = RotatingFileHandler(
self.log_dir / 'netease.log',
maxBytes=10*1024*1024, # 10MB
backupCount=5
)
file_handler.setLevel(logging.INFO)
# 控制台handler
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(logging.INFO)
# 格式化
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)
# 添加handler
self.logger.addHandler(file_handler)
self.logger.addHandler(console_handler)
def log_request(self, song_id: str, page: int,
success: bool, comments_count: int = 0):
"""
记录请求日志
Args:
song_id: 歌曲ID
page: 页码
success: 是否成功
comments_count: 获取的评论数
"""
self.metrics['requests_total'] += 1
if success:
self.metrics['requests_successful'] += 1
self.metrics['comments_collected'] += comments_count
self.logger.info(
f"请求成功 - 歌曲: {song_id}, 页码: {page}, "
f"评论数: {comments_count}"
)
else:
self.metrics['requests_failed'] += 1
self.logger.warning(
f"请求失败 - 歌曲: {song_id}, 页码: {page}"
)
def log_cache(self, hit: bool):
"""
记录缓存日志
Args:
hit: 是否命中缓存
"""
if hit:
self.metrics['cache_hits'] += 1
else:
self.metrics['cache_misses'] += 1
def get_metrics(self) -> Dict[str, Any]:
"""
获取监控指标
Returns:
监控指标字典
"""
# 计算运行时间
run_time = datetime.now() - self.metrics['start_time']
# 计算成功率
total = self.metrics['requests_total']
successful = self.metrics['requests_successful']
success_rate = successful / total if total > 0 else 0
# 计算缓存命中率
cache_total = self.metrics['cache_hits'] + self.metrics['cache_misses']
cache_hit_rate = (self.metrics['cache_hits'] / cache_total
if cache_total > 0 else 0)
return {
**self.metrics,
'run_time_seconds': run_time.total_seconds(),
'success_rate': success_rate,
'cache_hit_rate': cache_hit_rate,
'comments_per_request': (self.metrics['comments_collected'] / successful
if successful > 0 else 0)
}
def generate_report(self) -> str:
"""
生成监控报告
Returns:
报告字符串
"""
metrics = self.get_metrics()
report_lines = [
"=" * 50,
"网易云音乐评论采集监控报告",
"=" * 50,
f"开始时间: {metrics['start_time'].strftime('%Y-%m-%d %H:%M:%S')}",
f"运行时间: {metrics['run_time_seconds']:.2f} 秒",
f"总请求数: {metrics['requests_total']}",
f"成功请求: {metrics['requests_successful']}",
f"失败请求: {metrics['requests_failed']}",
f"成功率: {metrics['success_rate']:.2%}",
f"采集评论: {metrics['comments_collected']} 条",
f"平均每请求评论数: {metrics['comments_per_request']:.2f}",
f"缓存命中: {metrics['cache_hits']}",
f"缓存未命中: {metrics['cache_misses']}",
f"缓存命中率: {metrics['cache_hit_rate']:.2%}",
"=" * 50
]
return "\n".join(report_lines)
def save_report(self, filename: str = "monitor_report.txt"):
"""
保存监控报告
Args:
filename: 报告文件名
"""
report = self.generate_report()
report_path = self.log_dir / filename
with open(report_path, 'w', encoding='utf-8') as f:
f.write(report)
# 同时记录到日志
self.logger.info("监控报告已生成")
for line in report.split('\n'):
self.logger.info(line)
```
### 6.4 完整示例:端到端的评论采集系统
最后,让我们把这些组件组合成一个完整的评论采集系统。
```python
class NeteaseCommentCollector:
"""网易云音乐评论采集系统"""
def __init__(self, config: Optional[Dict[str, Any]] = None):
"""
初始化采集系统
Args:
config: 配置字典
"""
# 默认配置
self.config = {
'cache_dir': './cache',
'data_dir': './data',
'log_dir': './logs',
'max_workers': 3,
'request_delay': 1.0,
'cache_ttl_hours': 24,
'max_retries': 3,
'user_agent': None
}
# 更新用户配置
if config:
self.config.update(config)
# 初始化组件
self._init_components()
def _init_components(self):
"""初始化所有组件"""
# 监控系统
self.monitor = MonitoringSystem(self.config['log_dir'])
# 存储系统
self.storage = CommentStorage(self.config['data_dir'])
# 请求系统(带缓存)
self.requester = CachedNeteaseRequest(
cache_dir=self.config['cache_dir'],
user_agent=self.config['user_agent']
)
# 评论管理器
self.manager = CommentManager(self.storage)
# 并发处理器(如果需要)
self.concurrent = None
if self.config['max_workers'] > 1:
self.concurrent = ConcurrentNeteaseRequest(
max_workers=self.config['max_workers']
)
def collect_single_song(self, song_id: str,
max_pages: Optional[int] = None) -> Dict[str, Any]:
"""
采集单首歌曲的评论
Args:
song_id: 歌曲ID
max_pages: 最大页数
Returns:
采集结果
"""
self.monitor.logger.info(f"开始采集歌曲 {song_id}")
try:
# 获取评论
comments_data = self.requester.get_all_comments(
song_id=song_id,
max_pages=max_pages,
delay=self.config['request_delay']
)
# 解析评论
parser = CommentParser()
comments = [Comment.from_dict(c) for c in comments_data]
# 记录请求
self.monitor.log_request(
song_id=song_id,
page=1, # 这里简化处理,实际应该记录每页
success=True,
comments_count=len(comments)
)
# 更新到存储
update_stats = self.manager.update_comments(comments, append=True)
result = {
'success': True,
'song_id': song_id,
'comments_received': len(comments),
'new_comments': update_stats['new_comments'],
'duplicate_comments': update_stats['duplicate_comments'],
'total_stored': update_stats['total_stored']
}
self.monitor.logger.info(
f"歌曲 {song_id} 采集完成: "
f"收到 {len(comments)} 条评论, "
f"其中 {update_stats['new_comments']} 条为新评论"
)
return result
except Exception as e:
self.monitor.logger.error(f"采集歌曲 {song_id} 失败: {str(e)}")
self.monitor.log_request(song_id, 1, False)
return {
'success': False,
'song_id': song_id,
'error': str(e)
}
def collect_multiple_songs(self, song_ids: List[str],
pages_per_song: int = 5) -> List[Dict[str, Any]]:
"""
采集多首歌曲的评论
Args:
song_ids: 歌曲ID列表
pages_per_song: 每首歌曲采集的页数
Returns:
每首歌曲的采集结果
"""
self.monitor.logger.info(f"开始批量采集 {len(song_ids)} 首歌曲")
results = []
if self.concurrent and len(song_ids) > 1:
# 使用并发采集
all_comments = self.concurrent.fetch_multiple_songs(
song_ids, pages_per_song
)
for song_id, comments in all_comments.items():
if comments:
# 记录请求(简化处理)
self.monitor.log_request(
song_id=song_id,
page=1,
success=True,
comments_count=len(comments)
)
# 更新到存储
update_stats = self.manager.update_comments(comments, append=True)
results.append({
'success': True,
'song_id': song_id,
'comments_received': len(comments),
'new_comments': update_stats['new_comments'],
'duplicate_comments': update_stats['duplicate_comments']
})
else:
results.append({
'success': False,
'song_id': song_id,
'error': '未获取到评论'
})
else:
# 串行采集
for song_id in song_ids:
result = self.collect_single_song(song_id, pages_per_song)
results.append(result)
# 歌曲间延迟
time.sleep(self.config['request_delay'] * 2)
# 生成报告
success_count = sum(1 for r in results if r['success'])
total_comments = sum(r.get('comments_received', 0) for r in results)
self.monitor.logger.info(
f"批量采集完成: {success_count}/{len(song_ids)} 首歌曲成功, "
f"共采集 {total_comments} 条评论"
)
return results
def run_scheduled_collection(self, song_ids: List[str],
interval_hours: int = 6,
pages_per_song: int = 3):
"""
运行定时采集任务
Args:
song_ids: 歌曲ID列表
interval_hours: 采集间隔(小时)
pages_per_song: 每首歌曲采集的页数
"""
self.monitor.logger.info("启动定时采集任务")
import schedule
import time as time_module
def collection_job():
"""采集任务"""
self.monitor.logger.info("执行定时采集")
results = self.collect_multiple_songs(song_ids, pages_per_song)
# 保存报告
self.monitor.save_report()
# 清理过期缓存
self.requester.cache.clear_expired()
return results
# 安排任务
schedule.every(interval_hours).hours.do(collection_job)
# 立即执行一次
collection_job()
# 运行调度器
self.monitor.logger.info(f"定时采集已安排,每 {interval_hours} 小时执行一次")
try:
while True:
schedule.run_pending()
time_module.sleep(60) # 每分钟检查一次
except KeyboardInterrupt:
self.monitor.logger.info("定时采集任务被用户中断")
finally:
# 最终报告
final_report = self.monitor.generate_report()
print(final_report)
self.monitor.save_report("final_report.txt")
def export_data(self, format: str = 'all',
filename: Optional[str] = None) -> Dict[str, str]:
"""
导出数据
Args:
format: 导出格式 ('json', 'csv', 'sqlite', 'all')
filename: 文件名(不含扩展名)
Returns:
导出的文件路径
"""
# 加载所有评论
comments = self.manager._load_all_comments()
if not comments:
self.monitor.logger.warning("没有可导出的数据")
return {}
# 确定文件名
if filename is None:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"netease_comments_{timestamp}"
# 导出数据
if format == 'all':
return self.storage.save_to_all_formats(comments, filename)
elif format == 'json':
path = self.storage.save_to_json(comments, f"{filename}.json")
return {'json': path}
elif format == 'csv':
path = self.storage.save_to_csv(comments, f"{filename}.csv")
return {'csv': path}
elif format == 'sqlite':
path = self.storage.save_to_sqlite(comments, f"{filename}.db")
return {'sqlite': path}
else:
raise ValueError(f"不支持的格式: {format}")
```
这个完整的采集系统包含了从加密、请求、解析到存储的所有功能,并且具备了监控、缓存、并发等高级特性。你可以根据自己的需求调整配置,或者扩展新的功能模块。