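# Building a Python-Driven TikTok User Post Collection and Analysis System from Scratch
If you are a developer interested in social media data, or you are building an application that needs to integrate TikTok content, fetching a user's post list through an API is the most direct and efficient route. The complexity of the official API, its access restrictions, and its sparse documentation, however, often put developers off. This article walks through building a robust, extensible TikTok user post collection system in Python, rather than just calling a single endpoint. We start from the core concepts and work up to engineering practice: authentication, pagination, data parsing, error retries, data storage, and performance optimization, ending with a solution that can be integrated into a production environment.
Before we begin, one principle needs to be stated clearly: **data collection must be lawful**. Any use of automated tooling must strictly comply with the platform's terms of service and with local laws and regulations. This article shares a technical implementation for lawful data analysis, research, or personal study. Do not use it for anything that violates platform rules or infringes on user privacy. For a responsible developer, legal awareness matters as much as technical skill.
## 1. Understanding the Core Mechanics of TikTok's Data Interfaces
Before writing any code, we need to understand how TikTok's data interfaces work. Like many modern social media platforms, TikTok does not offer a fully public, documented REST API that ordinary developers can freely call for its core data. What is usually called an "API call" here actually means analyzing the network requests made by the mobile app or the website and replaying those requests to obtain data.
### 1.1 Key Concept: `sec_user_id` and User Identifiers
To fetch a specific user's post list, you first need that user's unique identifier. Each TikTok user has several IDs, and the most important one is `sec_user_id`. It is an encrypted user identifier that is more stable and more secure than the familiar username (`@username`) or the numeric UID.
**How do you obtain a user's `sec_user_id`?**
The most direct way is through the user's profile URL. For example, a typical TikTok profile link looks like this: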
```
https://www.tiktok.com/@tiktok
```
Inspecting the network requests in the browser's developer tools (F12) reveals a request like this:
```
GET /api/user/detail/?uniqueId=tiktok&... HTTP/1.1
```
In the JSON response, the `userInfo.user.secUid` field holds a value similar to:
```
MS4wLjABAAAAv7iBp6J3b9Qd5J6Q5J6Q5J6Q5J6Q5J6Q5J6Q5J6Q5J6Q5J6Q5J6Q5J6Q5J6Q5
```
This is the `sec_user_id` we need. It is a Base64-encoded string that usually starts with `MS4wLjABAAAA`. In a real project, retrieval can be automated as follows:
```python
import re
from typing import Optional

import requests


def extract_sec_user_id_from_profile(profile_url: str) -> Optional[str]:
    """
    Extract sec_user_id from the HTML of a user's profile page.

    Note: this relies on the page structure and may stop working after a TikTok update.
    """
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        'Accept-Language': 'en-US,en;q=0.5',
        'Accept-Encoding': 'gzip, deflate',
        'DNT': '1',
        'Connection': 'keep-alive',
        'Upgrade-Insecure-Requests': '1',
    }
    try:
        response = requests.get(profile_url, headers=headers, timeout=10)
        response.raise_for_status()

        # Strategy 1: the JSON key embedded in the page's JavaScript data
        pattern = r'"secUid":"([^"]+)"'
        matches = re.findall(pattern, response.text)
        if matches:
            return matches[0]

        # Strategy 2: a looser pattern for data attributes (fallback)
        pattern2 = r'secUid[=:]\s*["\']([^"\']+)["\']'
        matches2 = re.findall(pattern2, response.text)
        if matches2:
            return matches2[0]
    except Exception as e:
        print(f"Failed to extract sec_user_id: {e}")
    return None


# Usage example
profile_url = "https://www.tiktok.com/@tiktok"
sec_user_id = extract_sec_user_id_from_profile(profile_url)
print(f"Extracted sec_user_id: {sec_user_id}")
```
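> **Note**: The page structure can change at any time, so this extraction method needs regular maintenance. For production use, combine several extraction strategies and keep a fallback ready.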
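Because the regex fallbacks above can match unrelated text, it may help to sanity-check whatever they return. The sketch below uses only the `MS4wLjABAAAA` prefix mentioned earlier; the function name `looks_like_sec_user_id` and the assumption that the rest of the ID uses the URL-safe Base64 alphabet are illustrative, not something TikTok documents.

```python
import base64
import string

SEC_UID_PREFIX = "MS4wLjABAAAA"  # prefix quoted in the text above

# Assumption: the remainder of the ID uses URL-safe Base64 characters only.
_ALLOWED = set(string.ascii_letters + string.digits + "-_")


def looks_like_sec_user_id(value: str) -> bool:
    """Cheap plausibility check; it cannot prove the ID belongs to a real user."""
    if not value.startswith(SEC_UID_PREFIX):
        return False
    rest = value[len(SEC_UID_PREFIX):]
    return len(rest) > 0 and all(ch in _ALLOWED for ch in rest)


# The prefix itself is ordinary Base64 for a short version-like byte string.
print(base64.b64decode(SEC_UID_PREFIX))
print(looks_like_sec_user_id("MS4wLjABAAAAv7iBp6J3b9Qd5J6Q"))
print(looks_like_sec_user_id("tiktok"))
```

A check like this only filters out obvious mismatches; a value that passes can still be stale or belong to a different account.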
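### 1.2 Request Parameters
The core endpoint for fetching a user's post list typically needs the following key parameters:
| Parameter | Type | Description | Example |
|-----------|------|-------------|---------|
| `sec_user_id` | string | The user's secure ID; required | `MS4wLjABAAAAv7iBp6J3...` |
| `count` | integer | Number of posts returned per request | `20` |
| `max_cursor` | integer | Pagination cursor; `0` on the first request | `0` |
| `min_cursor` | integer | Minimum cursor; usually `0` | `0` |
| `aid` | integer | Application ID; usually a fixed value | `1988` |
| `cookie` | string | User session cookie used for authentication (sent as a request header) | `tt_chain_token=...` |
In real requests you may encounter more complex parameters, including timestamps and signatures. They exist to block unauthorized access and to verify that a request is legitimate.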
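As a minimal sketch of how the table maps onto an actual request URL, the snippet below encodes only the basic parameters. It uses the `item_list` endpoint that appears in the client code of this article; the helper name `build_item_list_url` is hypothetical, and the signature parameters are deliberately left out, so a URL built this way is unlikely to be accepted as-is.

```python
from urllib.parse import urlencode

# Endpoint taken from the client code in this article.
ITEM_LIST_URL = "https://www.tiktok.com/api/post/item_list/"


def build_item_list_url(sec_user_id: str, max_cursor: int = 0, count: int = 20) -> str:
    """Encode the basic query parameters from the table (no signatures)."""
    params = {
        "sec_user_id": sec_user_id,
        "count": count,
        "max_cursor": max_cursor,
        "min_cursor": 0,
        "aid": 1988,
    }
    # urlencode keeps dict insertion order and escapes reserved characters.
    return f"{ITEM_LIST_URL}?{urlencode(params)}"


print(build_item_list_url("MS4wLjABAAAAabc"))
```

The cookie from the last table row is not a query parameter; it travels in the `Cookie` request header.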
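### 1.3 Response Structure in Depth
The JSON returned by TikTok's post list endpoint is rich and carries the complete metadata for each post. A simplified overview of the structure: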
```json
{
  "status_code": 0,
  "has_more": true,
  "max_cursor": "1620000000000",
  "min_cursor": "0",
  "aweme_list": [
    {
      "aweme_id": "1234567890123456789",
      "desc": "video description text",
      "create_time": 1620000000,
      "author": {
        "uid": "123456789",
        "unique_id": "username",
        "nickname": "user nickname",
        "signature": "profile bio"
      },
      "video": {
        "play_addr": {
          "url_list": ["video playback URL 1", "video playback URL 2"]
        },
        "cover": {
          "url_list": ["cover image URL 1", "cover image URL 2"]
        },
        "duration": 15000,
        "width": 720,
        "height": 1280
      },
      "statistics": {
        "digg_count": 1000,
        "comment_count": 200,
        "share_count": 300,
        "play_count": 50000
      },
      "music": {
        "id": "music_123",
        "title": "music title",
        "author": "music author",
        "play_url": {
          "url_list": ["music playback URL"]
        }
      }
    }
  ],
  "extra": {
    "now": 1620000000000,
    "logid": "20211111111111111111111111111111"
  }
}
```
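Understanding this structure is essential for the data processing that follows. Each field carries specific information, and we can extract whichever ones a given task needs.
## 2. Building a Robust Python Collection Client
Now for the hands-on part: building a complete Python client. We take an object-oriented approach and create a reusable, extensible `TikTokAPIClient` class.
### 2.1 Basic Client Implementation
First, we set up a base client that handles HTTP requests, error handling, and basic configuration: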
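To make the nesting concrete, here is a small sketch that flattens such a response into rows. The helper names `dig` and `flatten_items` are illustrative, and the sample dict is a trimmed copy of the structure shown above rather than real API output.

```python
from typing import Any, Dict, List


def dig(data: Any, *keys: Any, default: Any = None) -> Any:
    """Follow a path of dict keys / list indexes, returning default on any miss."""
    current = data
    for key in keys:
        try:
            current = current[key]
        except (KeyError, IndexError, TypeError):
            return default
    return current


def flatten_items(response: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Pick a few fields out of every entry in aweme_list."""
    rows = []
    for item in response.get("aweme_list") or []:
        rows.append({
            "aweme_id": dig(item, "aweme_id", default=""),
            "nickname": dig(item, "author", "nickname", default=""),
            "video_url": dig(item, "video", "play_addr", "url_list", 0, default=""),
            "play_count": dig(item, "statistics", "play_count", default=0),
        })
    return rows


sample = {
    "status_code": 0,
    "aweme_list": [{
        "aweme_id": "1234567890123456789",
        "author": {"nickname": "user nickname"},
        "video": {"play_addr": {"url_list": ["https://example.com/v1", "https://example.com/v2"]}},
        "statistics": {"play_count": 50000},
    }],
}
print(flatten_items(sample))
```

Missing or empty branches fall back to the defaults instead of raising, which matters because individual posts do not always carry every field.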
```python
import requests
import time
import json
import logging
from typing import Dict, List, Optional, Any, Union
from dataclasses import dataclass
from datetime import datetime
import hashlib
import random

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


@dataclass
class TikTokVideo:
    """Video data model"""
    aweme_id: str
    desc: str
    create_time: int
    author_id: str
    author_name: str
    video_url: str
    cover_url: str
    duration: int
    width: int
    height: int
    digg_count: int
    comment_count: int
    share_count: int
    play_count: int
    music_id: str
    music_title: str
    music_author: str
    music_url: str
    collected_at: Optional[datetime] = None

    def __post_init__(self):
        if self.collected_at is None:
            self.collected_at = datetime.now()

    def to_dict(self) -> Dict[str, Any]:
        """Convert to a plain dictionary"""
        return {
            'aweme_id': self.aweme_id,
            'desc': self.desc,
            'create_time': self.create_time,
            'create_time_iso': datetime.fromtimestamp(self.create_time).isoformat(),
            'author_id': self.author_id,
            'author_name': self.author_name,
            'video_url': self.video_url,
            'cover_url': self.cover_url,
            'duration': self.duration,
            'width': self.width,
            'height': self.height,
            'digg_count': self.digg_count,  # was misspelled as self.dig_count
            'comment_count': self.comment_count,
            'share_count': self.share_count,
            'play_count': self.play_count,
            'music_id': self.music_id,
            'music_title': self.music_title,
            'music_author': self.music_author,
            'music_url': self.music_url,
            'collected_at': self.collected_at.isoformat()
        }


class TikTokAPIClient:
    """TikTok API client"""

    def __init__(
        self,
        session_cookie: Optional[str] = None,
        user_agent: Optional[str] = None,
        timeout: int = 30,
        max_retries: int = 3,
        retry_delay: float = 1.0
    ):
        """
        Initialize the TikTok API client.

        Args:
            session_cookie: optional session cookie
            user_agent: custom User-Agent
            timeout: request timeout in seconds
            max_retries: maximum number of attempts
            retry_delay: base retry delay in seconds
        """
        self.session = requests.Session()
        self.timeout = timeout
        self.max_retries = max_retries
        self.retry_delay = retry_delay

        # Default request headers
        self.headers = {
            'User-Agent': user_agent or (
                'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                'AppleWebKit/537.36 (KHTML, like Gecko) '
                'Chrome/120.0.0.0 Safari/537.36'
            ),
            'Accept': 'application/json, text/plain, */*',
            'Accept-Language': 'en-US,en;q=0.9',
            # 'br' is omitted: requests can only decode Brotli when an
            # optional Brotli package is installed.
            'Accept-Encoding': 'gzip, deflate',
            'Referer': 'https://www.tiktok.com/',
            'Connection': 'keep-alive',
        }
        if session_cookie:
            self.headers['Cookie'] = session_cookie

    def _make_request(
        self,
        method: str,
        url: str,
        params: Optional[Dict] = None,
        data: Optional[Dict] = None,
        json_data: Optional[Dict] = None,
        **kwargs
    ) -> Optional[Dict]:
        """
        Send an HTTP request with a retry mechanism.

        Args:
            method: HTTP method (GET, POST, ...)
            url: request URL
            params: URL query parameters
            data: form data
            json_data: JSON body
            **kwargs: additional arguments passed to requests

        Returns:
            The parsed JSON response, or None
        """
        for attempt in range(self.max_retries):
            try:
                response = self.session.request(
                    method=method,
                    url=url,
                    params=params,
                    data=data,
                    json=json_data,
                    headers=self.headers,
                    timeout=self.timeout,
                    **kwargs
                )
                response.raise_for_status()

                # Try to parse JSON. ValueError covers json.JSONDecodeError as
                # well as the decode errors raised by different requests versions.
                try:
                    return response.json()
                except ValueError as e:
                    logger.error(f"JSON parsing failed: {e}, response body: {response.text[:200]}")
                    return None
            except requests.exceptions.RequestException as e:
                logger.warning(f"Request failed (attempt {attempt + 1}/{self.max_retries}): {e}")
                if attempt < self.max_retries - 1:
                    # Exponential backoff plus a little random jitter
                    sleep_time = self.retry_delay * (2 ** attempt) + random.uniform(0, 0.5)
                    logger.info(f"Waiting {sleep_time:.2f} seconds before retrying...")
                    time.sleep(sleep_time)
                else:
                    logger.error(f"Request failed, maximum number of retries reached: {e}")
                    raise
        return None
```
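This base client provides request retries, error handling, and basic configuration management. Next, we implement the actual post list retrieval.
### 2.2 Implementing Post List Retrieval
Now let's implement the core method that fetches the post list. The key is to handle pagination and parameter construction correctly: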
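The retry schedule in `_make_request` is easier to reason about when computed on its own. The sketch below reproduces the same formula, `retry_delay * 2**attempt` plus up to 0.5 seconds of jitter; the function name `backoff_delays` is illustrative.

```python
import random
from typing import List


def backoff_delays(max_retries: int = 3, retry_delay: float = 1.0, jitter: float = 0.5) -> List[float]:
    """Delays slept between attempts; no sleep follows the final attempt."""
    return [
        retry_delay * (2 ** attempt) + random.uniform(0, jitter)
        for attempt in range(max_retries - 1)
    ]


delays = backoff_delays()
print([round(d, 2) for d in delays])
```

With the defaults there are three attempts and two sleeps, roughly 1 to 1.5 seconds and then 2 to 2.5 seconds, so a request that fails every time gives up after about 3 to 4 seconds of waiting plus the request timeouts themselves.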
```python
import string
import uuid


class TikTokAPIClient(TikTokAPIClient):
    """Extends the TikTok API client with post list retrieval.

    Subclassing under the same name rebinds TikTokAPIClient to the extended class.
    """

    def get_user_videos(
        self,
        sec_user_id: str,
        max_count: int = 100,
        max_cursor: int = 0,
        count_per_request: int = 20
    ) -> List[TikTokVideo]:
        """
        Fetch all of a user's videos.

        Args:
            sec_user_id: the user's secure ID
            max_count: maximum number of videos to fetch (0 means no limit)
            max_cursor: starting cursor
            count_per_request: number of videos requested per call

        Returns:
            A list of video objects
        """
        videos = []
        current_cursor = max_cursor
        has_more = True
        request_count = 0

        # Cap the number of requests to guard against an infinite loop
        max_requests = (max_count // count_per_request) + 2 if max_count > 0 else 100

        while has_more and request_count < max_requests:
            try:
                logger.info(f"Fetching videos for user {sec_user_id}, cursor: {current_cursor}")

                # Build the request parameters
                params = {
                    'sec_user_id': sec_user_id,
                    'count': count_per_request,
                    'max_cursor': current_cursor,
                    'min_cursor': 0,
                    'aid': 1988,  # TikTok's App ID
                    'cookie_enabled': 'true',
                    'screen_width': 1920,
                    'screen_height': 1080,
                    'browser_language': 'en-US',
                    'browser_platform': 'Win32',
                    'browser_name': 'Mozilla',
                    'browser_version': '5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
                    'browser_online': 'true',
                    'timezone_name': 'America/New_York',
                    'priority_region': '',
                    'verifyFp': self._generate_verify_fp(),
                    'app_name': 'tiktok_web',
                    'app_language': 'en',
                    'webcast_language': 'en',
                    'tz_name': 'America/New_York',
                    'is_page_visible': 'true',
                    'focus_state': 'true',
                    'is_fullscreen': 'false',
                    'history_len': 4,
                    'battery_info': '1',
                    'from_page': 'user',
                    'device_id': self._generate_device_id(),
                    'language': 'en',
                    'rotate_count': 0,
                    'screen_orientation': 'portrait',
                    'webid': self._generate_webid(),
                    'msToken': self._generate_ms_token(),
                }
                # The signature parameters are derived from the other parameters,
                # so they can only be added once the dict exists. (Referencing
                # `params` inside its own literal raises a NameError.)
                params['X-Bogus'] = self._generate_x_bogus(params)
                params['_signature'] = self._generate_signature(params)

                # Send the request
                response = self._make_request(
                    'GET',
                    'https://www.tiktok.com/api/post/item_list/',
                    params=params
                )
                if not response:
                    logger.error(f"Failed to fetch videos for user {sec_user_id}")
                    break

                # Check the response status
                if response.get('status_code') != 0:
                    logger.error(f"API returned an error: {response.get('status_msg', 'Unknown error')}")
                    break

                # Parse the video data
                aweme_list = response.get('aweme_list', [])
                for item in aweme_list:
                    try:
                        video = self._parse_video_item(item)
                        if video:
                            videos.append(video)
                    except Exception as e:
                        logger.error(f"Failed to parse video data: {e}, item: {item.get('aweme_id', 'unknown')}")

                # Update the pagination state
                has_more = response.get('has_more', False)
                current_cursor = response.get('max_cursor', current_cursor + count_per_request)
                request_count += 1

                # Stop once the requested number of videos is reached
                if max_count > 0 and len(videos) >= max_count:
                    videos = videos[:max_count]
                    break

                # Pause between requests to avoid hitting the server too often
                time.sleep(random.uniform(1.0, 2.5))
            except Exception as e:
                logger.error(f"Error while fetching videos: {e}")
                break

        logger.info(f"Fetched {len(videos)} videos")
        return videos

    def _parse_video_item(self, item: Dict) -> Optional[TikTokVideo]:
        """Parse a single video item"""
        try:
            # Basic information
            aweme_id = item.get('aweme_id', '')
            desc = item.get('desc', '')
            create_time = item.get('create_time', 0)

            # Author information
            author = item.get('author', {})
            author_id = author.get('uid', '')
            author_name = author.get('nickname', '')

            # Video information
            video_info = item.get('video', {})
            play_addr = video_info.get('play_addr', {})
            video_urls = play_addr.get('url_list', [])
            video_url = video_urls[0] if video_urls else ''
            cover_info = video_info.get('cover', {})
            cover_urls = cover_info.get('url_list', [])
            cover_url = cover_urls[0] if cover_urls else ''
            duration = video_info.get('duration', 0)
            width = video_info.get('width', 0)
            height = video_info.get('height', 0)

            # Statistics
            stats = item.get('statistics', {})
            digg_count = stats.get('digg_count', 0)
            comment_count = stats.get('comment_count', 0)
            share_count = stats.get('share_count', 0)
            play_count = stats.get('play_count', 0)

            # Music information
            music_info = item.get('music', {})
            music_id = music_info.get('id', '')
            music_title = music_info.get('title', '')
            music_author = music_info.get('author', '')
            music_play_url = music_info.get('play_url', {})
            music_urls = music_play_url.get('url_list', [])
            music_url = music_urls[0] if music_urls else ''

            return TikTokVideo(
                aweme_id=aweme_id,
                desc=desc,
                create_time=create_time,
                author_id=author_id,
                author_name=author_name,
                video_url=video_url,
                cover_url=cover_url,
                duration=duration,
                width=width,
                height=height,
                digg_count=digg_count,
                comment_count=comment_count,
                share_count=share_count,
                play_count=play_count,
                music_id=music_id,
                music_title=music_title,
                music_author=music_author,
                music_url=music_url
            )
        except Exception as e:
            logger.error(f"Failed to parse video item: {e}")
            return None

    def _generate_verify_fp(self) -> str:
        """Generate the verifyFp parameter (simplified)"""
        # A real implementation needs a more complex algorithm
        return f"verify_{uuid.uuid4().hex[:16]}"

    def _generate_device_id(self) -> str:
        """Generate a device ID"""
        return str(uuid.uuid4().hex[:16]).upper()

    def _generate_webid(self) -> str:
        """Generate a webid"""
        return str(uuid.uuid4().hex[:19])

    def _generate_ms_token(self) -> str:
        """Generate the msToken parameter"""
        length = 107
        chars = string.ascii_letters + string.digits + '-_'
        return ''.join(random.choice(chars) for _ in range(length))

    def _generate_x_bogus(self, params: Dict) -> str:
        """Generate the X-Bogus signature (simplified)"""
        # Note: a real implementation requires reverse engineering TikTok's
        # X-Bogus algorithm. This returns a fixed mock value only.
        return "DFSzswVY8ANANtHT6cWXyQqXgQq1"

    def _generate_signature(self, params: Dict) -> str:
        """Generate the _signature parameter (simplified)"""
        # A real implementation requires reverse engineering TikTok's signing algorithm
        param_str = '&'.join([f'{k}={v}' for k, v in sorted(params.items())])
        timestamp = str(int(time.time() * 1000))
        to_sign = f"{param_str}&{timestamp}"
        return hashlib.md5(to_sign.encode()).hexdigest()[:32]
```
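> **Important**: The `_generate_x_bogus` and `_generate_signature` methods above are simplified placeholders: one returns a fixed string and the other an MD5 digest of the parameters. In a real production environment, the algorithms behind these parameters can be very complex and are updated regularly. You may need to study current reverse-engineering results or use a well-maintained third-party library.
### 2.3 Adding Data Export and Storage
Once the data has been fetched, we usually want to save it to a file or a database. Let's add some practical export functions: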
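The cursor-driven pagination inside `get_user_videos` can also be studied in isolation from HTTP and signing. The sketch below is one way to express it as a generator; `iter_items`, `fetch_page`, and the `FAKE_PAGES` data are hypothetical stand-ins, and the extra guard against a cursor that fails to advance is an addition that the original loop does not have.

```python
from typing import Any, Callable, Dict, Iterator


def iter_items(
    fetch_page: Callable[[int], Dict[str, Any]],
    max_pages: int = 100,
) -> Iterator[Dict[str, Any]]:
    """Yield items page by page until has_more is false or a cap is hit."""
    cursor = 0
    for _ in range(max_pages):
        page = fetch_page(cursor)
        yield from page.get("aweme_list", [])
        if not page.get("has_more"):
            break
        # The sample response shows max_cursor as a string, so normalize it.
        next_cursor = int(page.get("max_cursor", cursor))
        if next_cursor == cursor:
            break  # the cursor did not advance; stop instead of looping
        cursor = next_cursor


# Canned pages keyed by the cursor that requests them.
FAKE_PAGES = {
    0: {"aweme_list": [{"aweme_id": "1"}, {"aweme_id": "2"}], "has_more": True, "max_cursor": "200"},
    200: {"aweme_list": [{"aweme_id": "3"}], "has_more": False, "max_cursor": "300"},
}

ids = [item["aweme_id"] for item in iter_items(lambda cursor: FAKE_PAGES[cursor])]
print(ids)
```

Separating the loop from the transport this way also makes the pagination logic testable without touching the network.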
```python
import csv
import json
from pathlib import Path
from typing import List


class TikTokDataExporter:
    """TikTok data exporter"""

    @staticmethod
    def export_to_json(videos: List[TikTokVideo], filename: str) -> bool:
        """
        Export video data to a JSON file.

        Args:
            videos: list of video objects
            filename: output file name

        Returns:
            Whether the export succeeded
        """
        try:
            data = [video.to_dict() for video in videos]
            with open(filename, 'w', encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False, indent=2)
            logger.info(f"Exported {len(videos)} videos to {filename}")
            return True
        except Exception as e:
            logger.error(f"JSON export failed: {e}")
            return False

    @staticmethod
    def export_to_csv(videos: List[TikTokVideo], filename: str) -> bool:
        """
        Export video data to a CSV file.

        Args:
            videos: list of video objects
            filename: output file name

        Returns:
            Whether the export succeeded
        """
        try:
            if not videos:
                logger.warning("No video data to export")
                return False

            # Take the column names from the first record
            first_video = videos[0].to_dict()
            fieldnames = list(first_video.keys())

            # utf-8-sig writes a BOM so spreadsheet tools detect the encoding
            with open(filename, 'w', newline='', encoding='utf-8-sig') as f:
                writer = csv.DictWriter(f, fieldnames=fieldnames)
                writer.writeheader()
                for video in videos:
                    writer.writerow(video.to_dict())
            logger.info(f"Exported {len(videos)} videos to {filename}")
            return True
        except Exception as e:
            logger.error(f"CSV export failed: {e}")
            return False

    @staticmethod
    def export_to_sqlite(videos: List[TikTokVideo], db_path: str) -> bool:
        """
        Export video data to a SQLite database.

        Args:
            videos: list of video objects
            db_path: path of the database file

        Returns:
            Whether the export succeeded
        """
        try:
            import sqlite3

            # Open the database connection
            conn = sqlite3.connect(db_path)
            try:
                cursor = conn.cursor()

                # Create the table. "desc" is an SQL keyword, so it is quoted.
                cursor.execute('''
                    CREATE TABLE IF NOT EXISTS tiktok_videos (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        aweme_id TEXT UNIQUE,
                        "desc" TEXT,
                        create_time INTEGER,
                        create_time_iso TEXT,
                        author_id TEXT,
                        author_name TEXT,
                        video_url TEXT,
                        cover_url TEXT,
                        duration INTEGER,
                        width INTEGER,
                        height INTEGER,
                        digg_count INTEGER,
                        comment_count INTEGER,
                        share_count INTEGER,
                        play_count INTEGER,
                        music_id TEXT,
                        music_title TEXT,
                        music_author TEXT,
                        music_url TEXT,
                        collected_at TEXT,
                        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                    )
                ''')

                # Create indexes
                cursor.execute('CREATE INDEX IF NOT EXISTS idx_aweme_id ON tiktok_videos(aweme_id)')
                cursor.execute('CREATE INDEX IF NOT EXISTS idx_author_id ON tiktok_videos(author_id)')
                cursor.execute('CREATE INDEX IF NOT EXISTS idx_create_time ON tiktok_videos(create_time)')

                # Insert the data
                inserted_count = 0
                for video in videos:
                    video_dict = video.to_dict()
                    # Drop the id field if present (it is auto-incremented)
                    if 'id' in video_dict:
                        del video_dict['id']

                    # Build the insert statement with quoted column names
                    columns = ', '.join(f'"{name}"' for name in video_dict)
                    placeholders = ', '.join(['?' for _ in video_dict])
                    values = list(video_dict.values())
                    try:
                        cursor.execute(
                            f'INSERT OR REPLACE INTO tiktok_videos ({columns}) VALUES ({placeholders})',
                            values
                        )
                        inserted_count += 1
                    except sqlite3.IntegrityError:
                        # OR REPLACE already resolves duplicate aweme_id values;
                        # this only skips rows that violate another constraint.
                        pass
                conn.commit()
            finally:
                conn.close()
            logger.info(f"Inserted/updated {inserted_count} videos in database {db_path}")
            return True
        except ImportError:
            logger.error("SQLite support is unavailable; the sqlite3 module is missing from this Python build")
            return False
        except Exception as e:
            logger.error(f"SQLite export failed: {e}")
            return False
```
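One detail of `INSERT OR REPLACE` is worth knowing: on a conflict SQLite deletes the old row and inserts a new one, so the auto-incremented `id` changes each time a video is collected again. If stable row IDs matter, an upsert is an alternative. The sketch below uses an in-memory database and a reduced, hypothetical `videos` table; it assumes the bundled SQLite is version 3.24 or newer, which is when `ON CONFLICT ... DO UPDATE` became available.

```python
import sqlite3

rows = [
    {"aweme_id": "1", "digg_count": 10},
    {"aweme_id": "2", "digg_count": 5},
    {"aweme_id": "1", "digg_count": 12},  # the same video collected again later
]

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE videos ("
    "id INTEGER PRIMARY KEY AUTOINCREMENT, "
    "aweme_id TEXT UNIQUE, "
    "digg_count INTEGER)"
)
# Named placeholders let each dict be passed straight to executemany.
conn.executemany(
    "INSERT INTO videos (aweme_id, digg_count) VALUES (:aweme_id, :digg_count) "
    "ON CONFLICT(aweme_id) DO UPDATE SET digg_count = excluded.digg_count",
    rows,
)
conn.commit()
stored = conn.execute("SELECT id, aweme_id, digg_count FROM videos ORDER BY id").fetchall()
conn.close()
print(stored)
```

Here the first video keeps its original `id` while its `digg_count` is refreshed, which keeps any foreign keys or external references to that row valid.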
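## 3. Advanced Features: Concurrent Collection and Performance Optimization
When data has to be collected for a large number of users, a single-threaded approach is very slow. Concurrency can raise collection throughput significantly.
### 3.1 Concurrent Collection with Async IO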
```python
import asyncio
import json
import logging
import random
from typing import Any, Dict, List, Optional

import aiohttp
import async_timeout

logger = logging.getLogger(__name__)


class AsyncTikTokClient:
    """Asynchronous TikTok client"""

    def __init__(
        self,
        session_cookie: Optional[str] = None,
        max_concurrent: int = 5,
        request_timeout: int = 30
    ):
        """
        Initialize the asynchronous client.

        Args:
            session_cookie: session cookie
            max_concurrent: maximum number of concurrent requests
            request_timeout: request timeout in seconds
        """
        self.session_cookie = session_cookie
        self.max_concurrent = max_concurrent
        self.request_timeout = request_timeout
        self.semaphore = asyncio.Semaphore(max_concurrent)

        # Request headers
        self.headers = {
            'User-Agent': (
                'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                'AppleWebKit/537.36 (KHTML, like Gecko) '
                'Chrome/120.0.0.0 Safari/537.36'
            ),
            'Accept': 'application/json, text/plain, */*',
            'Accept-Language': 'en-US,en;q=0.9',
            'Referer': 'https://www.tiktok.com/',
        }
        if session_cookie:
            self.headers['Cookie'] = session_cookie

    async def fetch_user_videos(
        self,
        sec_user_ids: List[str],
        max_videos_per_user: int = 50
    ) -> Dict[str, List[Dict]]:
        """
        Fetch videos for several users concurrently.

        Args:
            sec_user_ids: list of user IDs
            max_videos_per_user: maximum number of videos per user

        Returns:
            A mapping from user ID to that user's video list
        """
        tasks = []
        for sec_user_id in sec_user_ids:
            task = self._fetch_single_user_videos(sec_user_id, max_videos_per_user)
            tasks.append(task)

        # Run all tasks concurrently
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Collect the results
        user_videos = {}
        for sec_user_id, result in zip(sec_user_ids, results):
            if isinstance(result, Exception):
                logger.error(f"Failed to fetch videos for user {sec_user_id}: {result}")
                user_videos[sec_user_id] = []
            else:
                user_videos[sec_user_id] = result
        return user_videos

    async def _fetch_single_user_videos(
        self,
        sec_user_id: str,
        max_videos: int
    ) -> List[Dict]:
        """
        Fetch the videos of a single user.

        Args:
            sec_user_id: user ID
            max_videos: maximum number of videos

        Returns:
            A list of video dictionaries
        """
        videos = []
        has_more = True
        max_cursor = 0
        count_per_request = 20

        async with aiohttp.ClientSession(headers=self.headers) as session:
            while has_more and len(videos) < max_videos:
                try:
                    # Hold the semaphore only while the request is in flight
                    async with self.semaphore:
                        # Build the request parameters
                        params = self._build_request_params(sec_user_id, max_cursor, count_per_request)
                        async with async_timeout.timeout(self.request_timeout):
                            async with session.get(
                                'https://www.tiktok.com/api/post/item_list/',
                                params=params
                            ) as response:
                                if response.status != 200:
                                    logger.warning(f"Request failed: {response.status}")
                                    break
                                data = await response.json()

                    if data.get('status_code') != 0:
                        logger.warning(f"API error: {data.get('status_msg')}")
                        break

                    # Parse the videos
                    aweme_list = data.get('aweme_list', [])
                    for item in aweme_list:
                        parsed = self._parse_video_item(item)
                        if parsed:
                            videos.append(parsed)

                    # Update the pagination state
                    has_more = data.get('has_more', False)
                    max_cursor = data.get('max_cursor', max_cursor + count_per_request)

                    # Random delay so requests are not sent too quickly
                    await asyncio.sleep(1.0 + random.random())
                except asyncio.TimeoutError:
                    logger.warning(f"Request timed out: {sec_user_id}")
                    break
                except Exception as e:
                    logger.error(f"Failed to fetch videos for user {sec_user_id}: {e}")
                    break

        videos = videos[:max_videos]  # the last page may overshoot the limit
        logger.info(f"User {sec_user_id}: fetched {len(videos)} videos")
        return videos

    def _build_request_params(self, sec_user_id: str, max_cursor: int, count: int) -> Dict:
        """Build the request parameters"""
        # The real parameter construction belongs here,
        # including generation of the required signature parameters
        return {
            'sec_user_id': sec_user_id,
            'count': count,
            'max_cursor': max_cursor,
            'min_cursor': 0,
            'aid': 1988,
            # ... other required parameters
        }

    def _parse_video_item(self, item: Dict) -> Optional[Dict]:
        """Parse a video item"""
        try:
            return {
                'aweme_id': item.get('aweme_id'),
                'desc': item.get('desc', ''),
                'create_time': item.get('create_time', 0),
                'author': item.get('author', {}).get('nickname', ''),
                'video_url': item.get('video', {}).get('play_addr', {}).get('url_list', [''])[0],
                'stats': item.get('statistics', {})
            }
        except Exception as e:
            logger.error(f"Failed to parse video item: {e}")
            return None


# Usage example
async def main():
    # List of user IDs
    user_ids = [
        "MS4wLjABAAAAv7iBp6J3b9Qd5J6Q5J6Q5J6Q5J6Q5J6Q5J6Q5J6Q5J6Q5J6Q5J6Q5J6Q5J6Q5",
        # Add more user IDs here
    ]
    client = AsyncTikTokClient()
    results = await client.fetch_user_videos(user_ids, max_videos_per_user=30)

    # Process the results. This client returns plain dicts, so they are written
    # directly; TikTokDataExporter.export_to_json expects TikTokVideo objects.
    for user_id, videos in results.items():
        print(f"User {user_id}: {len(videos)} videos")
        with open(f"{user_id}_videos.json", 'w', encoding='utf-8') as f:
            json.dump(videos, f, ensure_ascii=False, indent=2)


# Run the async entry point
if __name__ == "__main__":
    asyncio.run(main())
```
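The effect of the semaphore in `AsyncTikTokClient` can be observed without any network access. The following sketch replaces the HTTP request with a short sleep and records how many workers are inside the guarded section at once; `run_limited` and its counters are illustrative names only.

```python
import asyncio


async def run_limited(n_tasks: int, limit: int) -> int:
    """Run n_tasks workers and return the peak number active at the same time."""
    semaphore = asyncio.Semaphore(limit)
    active = 0
    peak = 0

    async def worker() -> None:
        nonlocal active, peak
        async with semaphore:
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)  # stands in for a network request
            active -= 1

    await asyncio.gather(*(worker() for _ in range(n_tasks)))
    return peak


peak = asyncio.run(run_limited(n_tasks=20, limit=5))
print(f"peak concurrency: {peak}")
```

However many tasks are submitted, the peak never exceeds the limit, which is the property the client relies on to keep the request rate bounded.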
### 3.2 Concurrent Processing with a Thread Pool
If you are more comfortable with traditional multithreading, `concurrent.futures` works as well:
```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import List, Dict, Tuple
import time


class ConcurrentTikTokClient:
    """Concurrent TikTok client (thread pool based)"""

    def __init__(
        self,
        max_workers: int = 5,
        request_timeout: int = 30
    ):
        """
        Initialize the concurrent client.

        Args:
            max_workers: maximum number of worker threads
            request_timeout: request timeout in seconds
        """
        self.max_workers = max_workers
        self.request_timeout = request_timeout
        # Reuses the synchronous client implemented earlier.
        # Note: this single client is shared by all worker threads.
        self.client = TikTokAPIClient()

    def fetch_multiple_users_videos(
        self,
        user_data: List[Tuple[str, int]],
        output_dir: str = "output"
    ) -> Dict[str, List[TikTokVideo]]:
        """
        Fetch videos for several users concurrently.

        Args:
            user_data: list of (sec_user_id, max_videos) tuples
            output_dir: output directory

        Returns:
            A mapping from user ID to that user's video list
        """
        # Create the output directory
        Path(output_dir).mkdir(parents=True, exist_ok=True)
        results = {}

        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Submit all tasks
            future_to_user = {
                executor.submit(self._fetch_user_videos_task, sec_user_id, max_videos, output_dir):
                (sec_user_id, max_videos)
                for sec_user_id, max_videos in user_data
            }

            # Handle tasks as they complete
            for future in as_completed(future_to_user):
                sec_user_id, max_videos = future_to_user[future]
                try:
                    videos = future.result(timeout=self.request_timeout + 10)
                    results[sec_user_id] = videos
                    logger.info(f"User {sec_user_id} done, fetched {len(videos)} videos")
                except Exception as e:
                    logger.error(f"Task for user {sec_user_id} failed: {e}")
                    results[sec_user_id] = []
        return results

    def _fetch_user_videos_task(
        self,
        sec_user_id: str,
        max_videos: int,
        output_dir: str
    ) -> List[TikTokVideo]:
        """
        Fetch task for a single user.

        Args:
            sec_user_id: user ID
            max_videos: maximum number of videos
            output_dir: output directory

        Returns:
            The list of videos
        """
        try:
            # Fetch the videos
            videos = self.client.get_user_videos(
                sec_user_id=sec_user_id,
                max_count=max_videos
            )

            # Save to a file
            if videos:
                output_file = Path(output_dir) / f"{sec_user_id}_videos.json"
                TikTokDataExporter.export_to_json(videos, str(output_file))
            return videos
        except Exception as e:
            logger.error(f"Failed to fetch videos for user {sec_user_id}: {e}")
            return []

    def batch_process_users_from_file(
        self,
        input_file: str,
        output_dir: str = "output",
        max_videos_per_user: int = 50
    ) -> Dict[str, List[TikTokVideo]]:
        """
        Process a batch of users read from a file.

        Args:
            input_file: input file with one user ID per line
            output_dir: output directory
            max_videos_per_user: maximum number of videos per user

        Returns:
            The processing results
        """
        try:
            with open(input_file, 'r', encoding='utf-8') as f:
                user_ids = [line.strip() for line in f if line.strip()]
            user_data = [(user_id, max_videos_per_user) for user_id in user_ids]
            return self.fetch_multiple_users_videos(user_data, output_dir)
        except FileNotFoundError:
            logger.error(f"Input file does not exist: {input_file}")
            return {}
        except Exception as e:
            logger.error(f"Batch processing failed: {e}")
            return {}
```
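`ConcurrentTikTokClient` shares one `TikTokAPIClient`, and therefore one `requests.Session`, across all worker threads. It is safer not to assume that a shared session is thread-safe, so one possible variation is to give each thread its own client through `threading.local`. The sketch below shows the pattern with a `StubClient` standing in for the real client so that it runs offline; all names in it are hypothetical.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class StubClient:
    """Stand-in for TikTokAPIClient so the sketch runs without network access."""

    def get_user_videos(self, sec_user_id: str) -> list:
        return [f"{sec_user_id}-video"]


_local = threading.local()


def get_thread_client() -> StubClient:
    """Return this thread's client, creating it on first use."""
    client = getattr(_local, "client", None)
    if client is None:
        client = StubClient()
        _local.client = client
    return client


def task(sec_user_id: str):
    client = get_thread_client()
    return threading.get_ident(), id(client), client.get_user_videos(sec_user_id)


with ThreadPoolExecutor(max_workers=3) as pool:
    results = list(pool.map(task, ["u1", "u2", "u3", "u4", "u5", "u6"]))

# Group the client identities seen by each worker thread.
clients_per_thread = {}
for thread_id, client_id, _ in results:
    clients_per_thread.setdefault(thread_id, set()).add(client_id)
print({tid: len(ids) for tid, ids in clients_per_thread.items()})
```

Each worker thread ends up with exactly one client of its own, so connection state and cookies are never touched by two threads at once.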
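## 4. Error Handling and Monitoring
In production, robust error handling and monitoring are indispensable. Let's add both to the collection system.
### 4.1 Implementing Thorough Error Handling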
```python
from enum import Enum
from typing import Optional, Dict, Any, List
import random
import time
import traceback
from datetime import datetime, timedelta


class TikTokErrorCode(Enum):
    """TikTok API error codes"""
    SUCCESS = 0
    RATE_LIMITED = 10202
    USER_NOT_FOUND = 10210
    VIDEO_NOT_FOUND = 10211
    PRIVATE_ACCOUNT = 10212
    SESSION_EXPIRED = 10214
    NETWORK_ERROR = 10000
    PARSE_ERROR = 10001
    UNKNOWN_ERROR = 99999


class TikTokAPIError(Exception):
    """Base class for TikTok API exceptions"""

    def __init__(
        self,
        code: TikTokErrorCode,
        message: str,
        details: Optional[Dict] = None
    ):
        self.code = code
        self.message = message
        self.details = details or {}
        super().__init__(f"[{code.name}] {message}")


class RateLimitError(TikTokAPIError):
    """Rate limit exception"""
    pass


class AuthenticationError(TikTokAPIError):
    """Authentication exception"""
    pass


class TikTokClientWithErrorHandling(TikTokAPIClient):
    """TikTok client with thorough error handling"""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.error_stats = self._empty_error_stats()
        self.rate_limit_reset = None

    @staticmethod
    def _empty_error_stats() -> Dict[str, Any]:
        """Fresh counters, shared by __init__ and reset_error_stats"""
        return {
            'total_requests': 0,
            'successful_requests': 0,
            'failed_requests': 0,
            'rate_limit_hits': 0,
            'auth_errors': 0,
            'network_errors': 0,
            'last_error': None,
            'last_error_time': None
        }

    def get_user_videos_with_retry(
        self,
        sec_user_id: str,
        max_count: int = 100,
        max_retries: int = 3
    ) -> List[TikTokVideo]:
        """
        Fetch a user's videos with a retry mechanism.

        Args:
            sec_user_id: user ID
            max_count: maximum number of videos
            max_retries: maximum number of attempts

        Returns:
            The list of videos
        """
        self.error_stats['total_requests'] += 1

        for attempt in range(max_retries):
            try:
                # Respect an active rate limit window
                if self.rate_limit_reset and datetime.now() < self.rate_limit_reset:
                    wait_seconds = (self.rate_limit_reset - datetime.now()).total_seconds()
                    logger.warning(f"Rate limited, waiting {wait_seconds:.1f} seconds")
                    time.sleep(wait_seconds + 1)

                videos = self.get_user_videos(sec_user_id, max_count)
                self.error_stats['successful_requests'] += 1
                return videos
            except RateLimitError as e:
                self.error_stats['rate_limit_hits'] += 1
                self.error_stats['failed_requests'] += 1

                # Work out when to retry
                retry_after = e.details.get('retry_after', 60)
                self.rate_limit_reset = datetime.now() + timedelta(seconds=retry_after)
                if attempt < max_retries - 1:
                    wait_time = retry_after + random.uniform(5, 15)
                    logger.warning(f"Rate limited, retrying in {wait_time:.1f} seconds (attempt {attempt + 1}/{max_retries})")
                    time.sleep(wait_time)
                else:
                    logger.error(f"Maximum retries reached, giving up on videos for user {sec_user_id}")
                    raise
            except AuthenticationError as e:
                self.error_stats['auth_errors'] += 1
                self.error_stats['failed_requests'] += 1
                self.error_stats['last_error'] = str(e)
                self.error_stats['last_error_time'] = datetime.now()
                logger.error(f"Authentication failed: {e}")
                raise
            except Exception as e:
                self.error_stats['failed_requests'] += 1
                self.error_stats['last_error'] = str(e)
                self.error_stats['last_error_time'] = datetime.now()
                if attempt < max_retries - 1:
                    wait_time = 2 ** attempt + random.uniform(1, 3)
                    logger.warning(f"Request failed, retrying in {wait_time:.1f} seconds (attempt {attempt + 1}/{max_retries}): {e}")
                    time.sleep(wait_time)
                else:
                    logger.error(f"Maximum retries reached, final failure: {e}")
                    raise
        return []

    def _handle_api_response(self, response: Dict) -> None:
        """
        Inspect an API response and raise on errors.

        Note: get_user_videos as written in section 2.2 does not call this
        method and swallows its own errors, so this check has to be invoked on
        each raw response for RateLimitError and AuthenticationError to reach
        get_user_videos_with_retry.

        Args:
            response: API response data

        Raises:
            TikTokAPIError: if the response contains an error
        """
        status_code = response.get('status_code', 0)
        if status_code == TikTokErrorCode.SUCCESS.value:
            return

        error_msg = response.get('status_msg', 'Unknown error')
        if status_code == TikTokErrorCode.RATE_LIMITED.value:
            raise RateLimitError(
                TikTokErrorCode.RATE_LIMITED,
                f"Rate limited: {error_msg}",
                {'retry_after': 60}  # retry after 60 seconds by default
            )
        elif status_code == TikTokErrorCode.USER_NOT_FOUND.value:
            raise TikTokAPIError(
                TikTokErrorCode.USER_NOT_FOUND,
                f"User not found: {error_msg}"
            )
        elif status_code == TikTokErrorCode.SESSION_EXPIRED.value:
            raise AuthenticationError(
                TikTokErrorCode.SESSION_EXPIRED,
                f"Session expired: {error_msg}"
            )
        else:
            raise TikTokAPIError(
                TikTokErrorCode.UNKNOWN_ERROR,
                f"API error {status_code}: {error_msg}",
                {'raw_response': response}
            )

    def get_error_stats(self) -> Dict[str, Any]:
        """Return the error statistics"""
        success_rate = 0
        if self.error_stats['total_requests'] > 0:
            success_rate = (self.error_stats['successful_requests'] /
                            self.error_stats['total_requests']) * 100
        return {
            **self.error_stats,
            'success_rate': f"{success_rate:.2f}%",
            'failure_rate': f"{100 - success_rate:.2f}%"
        }

    def reset_error_stats(self) -> None:
        """Reset the error statistics"""
        self.error_stats = self._empty_error_stats()
```
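`_handle_api_response` hard-codes a 60 second wait after a rate limit. If the server also sends the standard HTTP `Retry-After` header, which this article does not confirm for TikTok, that value could replace the default. The sketch below parses both forms the HTTP specification allows, a number of seconds or an HTTP date; `parse_retry_after` is a hypothetical helper, not part of the client above.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional


def parse_retry_after(
    value: Optional[str],
    default: float = 60.0,
    now: Optional[datetime] = None,
) -> float:
    """Turn a Retry-After header value into a number of seconds to wait."""
    if not value:
        return default
    value = value.strip()
    if value.isdigit():
        return float(value)  # delay-seconds form
    try:
        target = parsedate_to_datetime(value)  # HTTP-date form
    except (TypeError, ValueError):
        return default
    if target.tzinfo is None:
        target = target.replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    # A date in the past means the wait is already over.
    return max(0.0, (target - now).total_seconds())


fixed_now = datetime(2015, 10, 21, 7, 27, 0, tzinfo=timezone.utc)
print(parse_retry_after("120"))
print(parse_retry_after("Wed, 21 Oct 2015 07:28:00 GMT", now=fixed_now))
print(parse_retry_after(None))
```

The result could be passed as the `retry_after` entry in the details of `RateLimitError`, leaving the retry loop itself unchanged.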
### 4.2 Adding Monitoring and Logging
```python
import logging
from logging.handlers import RotatingFileHandler
import json
from datetime import datetime
from typing import Dict, Any


class TikTokMonitor:
    """Monitor for the TikTok collector"""

    def __init__(self, log_file: str = "tiktok_monitor.log"):
        """
        Initialize the monitor.

        Args:
            log_file: path of the log file
        """
        self.logger = logging.getLogger('TikTokMonitor')
        self.logger.setLevel(logging.INFO)

        # Avoid adding handlers more than once
        if not self.logger.handlers:
            # File handler (rotated by size)
            file_handler = RotatingFileHandler(
                log_file,
                maxBytes=10*1024*1024,  # 10MB
                backupCount=5,
                encoding='utf-8'
            )
            file_handler.setLevel(logging.INFO)
            file_formatter = logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            )
            file_handler.setFormatter(file_formatter)
            self.logger.addHandler(file_handler)

            # Console handler
            console_handler = logging.StreamHandler()
            console_handler.setLevel(logging.INFO)
            console_formatter = logging.Formatter(
                '%(asctime)s - %(levelname)s - %(message)s'
            )
            console_handler.setFormatter(console_formatter)
            self.logger.addHandler(console_handler)

        self.metrics = {
            'start_time': datetime.now(),
            'total_users_processed': 0,
            'total_videos_collected': 0,
            'successful_requests': 0,
            'failed_requests': 0,
            'rate_limit_events': 0,
            'last_activity': None,
            'user_stats': {}
        }

    def _user_stat(self, user_id: str) -> Dict[str, Any]:
        """Return the per-user stats entry, creating it on first sight"""
        return self.metrics['user_stats'].setdefault(user_id, {
            'first_seen': datetime.now(),
            'total_videos': 0,
            'last_success': None,
            'failure_count': 0
        })

    def log_request(self, user_id: str, success: bool, video_count: int = 0) -> None:
        """
        Record a request.

        Args:
            user_id: user ID
            success: whether the request succeeded
            video_count: number of videos fetched
        """
        self.metrics['total_users_processed'] += 1
        self.metrics['last_activity'] = datetime.now()
        user_stat = self._user_stat(user_id)

        if success:
            self.metrics['successful_requests'] += 1
            self.metrics['total_videos_collected'] += video_count

            # Update the per-user statistics
            user_stat['total_videos'] += video_count
            user_stat['last_success'] = datetime.now()
            self.logger.info(
                f"Fetched {video_count} videos for user {user_id}. "
                f"Total: {self.metrics['total_videos_collected']} videos"
            )
        else:
            self.metrics['failed_requests'] += 1
            user_stat['failure_count'] += 1
            self.logger.warning(f"Failed to fetch videos for user {user_id}")

    def log_rate_limit(self, user_id: str, retry_after: int) -> None:
        """
        Record a rate limit event.

        Args:
            user_id: user ID
            retry_after: seconds to wait before retrying
        """
        self.metrics['rate_limit_events'] += 1
        self.logger.warning(
            f"User {user_id} hit the rate limit, retrying in {retry_after} seconds"
        )

    def get_metrics_report(self) -> Dict[str, Any]:
        """
        Build the monitoring report.

        Returns:
            A report of the monitoring metrics
        """
        current_time = datetime.now()
        runtime = current_time - self.metrics['start_time']
        report = {
            'runtime_seconds': runtime.total_seconds(),
            'runtime_human': str(runtime),
            'total_users_processed': self.metrics['total_users_processed'],
            'total_videos_collected': self.metrics['total_videos_collected'],
            'successful_requests': self.metrics['successful_requests'],
            'failed_requests': self.metrics['failed_requests'],
            'rate_limit_events': self.metrics['rate_limit_events'],
            'success_rate': 0,
            'last_activity': self.metrics['last_activity'].isoformat()
            if self.metrics['last_activity'] else None,
            'top_users_by_videos': [],
            'users_with_most_failures': []
        }

        # Compute the success rate
        total_requests = self.metrics['successful_requests'] + self.metrics['failed_requests']
        if total_requests > 0:
            report['success_rate'] = (
                self.metrics['successful_requests'] / total_requests * 100
            )

        # Users with the most videos
        user_video_counts = [
            (user_id, stats['total_videos'])
            for user_id, stats in self.metrics['user_stats'].items()
        ]
        user_video_counts.sort(key=lambda x: x[1], reverse=True)
        report['top_users_by_videos'] = user_video_counts[:10]

        # Users with the most failures
        user_failure_counts = [
            (user_id, stats['failure_count'])
            for user_id, stats in self.metrics['user_stats'].items()
            if stats['failure_count'] > 0
        ]
        user_failure_counts.sort(key=lambda x: x[1], reverse=True)
        report['users_with_most_failures'] = user_failure_counts[:10]
        return report

    def save_metrics_report(self, filename: str = "metrics_report.json") -> None:
        """
        Save the monitoring report to a file.

        Args:
            filename: output file name
        """
        report = self.get_metrics_report()
        try:
            with open(filename, 'w', encoding='utf-8') as f:
                json.dump(report, f, ensure_ascii=False, indent=2, default=str)
            self.logger.info(f"Monitoring report saved to {filename}")
        except Exception as e:
            self.logger.error(f"Failed to save monitoring report: {e}")

    def print_summary(self) -> None:
        """Print a monitoring summary"""
        report = self.get_metrics_report()
        print("\n" + "="*60)
        print("TikTok Collection Monitoring Summary")
        print("="*60)
        print(f"Runtime: {report['runtime_human']}")
        print(f"Users processed: {report['total_users_processed']}")
        print(f"Videos collected: {report['total_videos_collected']}")
        print(f"Successful requests: {report['successful_requests']}")
        print(f"Failed requests: {report['failed_requests']}")
        print(f"Success rate: {report['success_rate']:.2f}%")
        print(f"Rate limit events: {report['rate_limit_events']}")
        if report['top_users_by_videos']:
            print("\nUsers with the most videos:")
            for user_id, count in report['top_users_by_videos'][:5]:
                print(f"  {user_id[:20]}...: {count} videos")
        if report['users_with_most_failures']:
            print("\nUsers with the most failures:")
            for user_id, failures in report['users_with_most_failures'][:5]:
                print(f"  {user_id[:20]}...: {failures} failures")
        print("="*60)
```
## 5. Complete Example: An End-to-End Data Collection System
Now let's combine all the components into a complete end-to-end data collection system:
```python
import argparse
import json
import random
import sys
import time
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

# TikTokVideo, TikTokClientWithErrorHandling, TikTokMonitor and
# TikTokDataExporter are the classes defined in the earlier sections.


class TikTokDataCollector:
    """TikTok data collection system."""

    def __init__(self, config_file: str = "config.json"):
        """
        Initialize the data collection system.

        Args:
            config_file: Path to the configuration file.
        """
        self.config = self._load_config(config_file)
        self.client = TikTokClientWithErrorHandling(
            session_cookie=self.config.get('session_cookie'),
            timeout=self.config.get('timeout', 30),
            max_retries=self.config.get('max_retries', 3)
        )
        self.monitor = TikTokMonitor(
            log_file=self.config.get('log_file', 'tiktok_collector.log')
        )
        self.exporter = TikTokDataExporter()
        # Create the output directory
        self.output_dir = Path(self.config.get('output_dir', 'data'))
        self.output_dir.mkdir(parents=True, exist_ok=True)

    def _load_config(self, config_file: str) -> Dict[str, Any]:
        """Load the configuration file, falling back to defaults."""
        default_config = {
            'session_cookie': None,
            'timeout': 30,
            'max_retries': 3,
            'output_dir': 'data',
            'log_file': 'tiktok_collector.log',
            'max_videos_per_user': 100,
            'batch_size': 5,
            'request_delay': 1.0
        }
        try:
            if Path(config_file).exists():
                with open(config_file, 'r', encoding='utf-8') as f:
                    user_config = json.load(f)
                # User settings take precedence over the defaults
                default_config.update(user_config)
        except Exception as e:
            print(f"Failed to load config file, using defaults: {e}")
        return default_config

    def collect_user_videos(
        self,
        sec_user_id: str,
        max_videos: Optional[int] = None
    ) -> List[TikTokVideo]:
        """
        Collect the videos of a single user.

        Args:
            sec_user_id: User ID.
            max_videos: Maximum number of videos (None means use the config value).

        Returns:
            List of videos.
        """
        if max_videos is None:
            max_videos = self.config['max_videos_per_user']
        try:
            self.monitor.logger.info(f"Collecting videos for user {sec_user_id}...")
            videos = self.client.get_user_videos_with_retry(
                sec_user_id=sec_user_id,
                max_count=max_videos
            )
            success = len(videos) > 0
            self.monitor.log_request(sec_user_id, success, len(videos))
            if videos:
                # Persist the data
                self._save_user_data(sec_user_id, videos)
                self.monitor.logger.info(
                    f"User {sec_user_id} done, {len(videos)} videos collected"
                )
            else:
                self.monitor.logger.warning(f"No videos collected for user {sec_user_id}")
            return videos
        except Exception as e:
            self.monitor.log_request(sec_user_id, False, 0)
            self.monitor.logger.error(f"Failed to collect user {sec_user_id}: {e}")
            return []

    def collect_multiple_users(
        self,
        user_ids: List[str],
        output_format: str = 'json'
    ) -> Dict[str, List[TikTokVideo]]:
        """
        Collect the videos of several users in sequence.

        Args:
            user_ids: List of user IDs.
            output_format: Output format (json/csv/sqlite).

        Returns:
            Mapping from user ID to list of videos.
        """
        results = {}
        self.monitor.logger.info(f"Starting batch collection of {len(user_ids)} users...")
        for i, user_id in enumerate(user_ids, 1):
            self.monitor.logger.info(f"Processing user {i}/{len(user_ids)}: {user_id}")
            videos = self.collect_user_videos(user_id)
            results[user_id] = videos
            # Pause between users, with a little random jitter
            if i < len(user_ids):
                delay = self.config.get('request_delay', 1.0)
                time.sleep(delay + random.uniform(0, 0.5))
        # Generate the summary report
        self._generate_summary_report(results, output_format)
        return results

    def _save_user_data(
        self,
        user_id: str,
        videos: List[TikTokVideo]
    ) -> None:
        """Save one user's data."""
        # Create the per-user directory
        user_dir = self.output_dir / user_id
        user_dir.mkdir(exist_ok=True)
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        # Save as JSON
        json_file = user_dir / f"videos_{timestamp}.json"
        self.exporter.export_to_json(videos, str(json_file))
        # Save as CSV
        csv_file = user_dir / f"videos_{timestamp}.csv"
        self.exporter.export_to_csv(videos, str(csv_file))
        # Save to SQLite (optional)
        if self.config.get('use_sqlite', False):
            db_file = self.output_dir / "tiktok_data.db"
            self.exporter.export_to_sqlite(videos, str(db_file))

    def _generate_summary_report(
        self,
        results: Dict[str, List[TikTokVideo]],
        output_format: str
    ) -> None:
        """Generate the summary report."""
        total_users = len(results)
        total_videos = sum(len(videos) for videos in results.values())
        users_with_videos = sum(1 for videos in results.values() if videos)
        report = {
            'collection_time': datetime.now().isoformat(),
            'total_users': total_users,
            'users_with_videos': users_with_videos,
            'users_without_videos': total_users - users_with_videos,
            'total_videos_collected': total_videos,
            'average_videos_per_user': (
                total_videos / users_with_videos if users_with_videos > 0 else 0
            ),
            'user_details': {}
        }
        for user_id, videos in results.items():
            report['user_details'][user_id] = {
                'video_count': len(videos),
                'has_videos': len(videos) > 0,
                'first_video_time': None,
                'last_video_time': None
            }
            if videos:
                # Sort by creation time
                sorted_videos = sorted(videos, key=lambda x: x.create_time)
                report['user_details'][user_id]['first_video_time'] = datetime.fromtimestamp(
                    sorted_videos[0].create_time
                ).isoformat() if sorted_videos[0].create_time > 0 else None
                report['user_details'][user_id]['last_video_time'] = datetime.fromtimestamp(
                    sorted_videos[-1].create_time
                ).isoformat() if sorted_videos[-1].create_time > 0 else None
        # Save the report
        report_file = (
            self.output_dir
            / f"collection_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        )
        with open(report_file, 'w', encoding='utf-8') as f:
            json.dump(report, f, ensure_ascii=False, indent=2)
        self.monitor.logger.info(f"Summary report saved to {report_file}")
        # Print a short summary
        print("\nCollection finished!")
        print(f"Users processed: {total_users}")
        print(f"Users with videos: {users_with_videos}")
        print(f"Total videos collected: {total_videos}")
        if users_with_videos > 0:
            print(f"Average videos per user: {total_videos / users_with_videos:.1f}")

    def run_from_file(self, input_file: str) -> None:
        """
        Read user IDs from a file and run the collection.

        Args:
            input_file: Path to the input file (one user ID per line).
        """
        try:
            with open(input_file, 'r', encoding='utf-8') as f:
                user_ids = [line.strip() for line in f if line.strip()]
            if not user_ids:
                self.monitor.logger.error("Input file is empty")
                return
            self.monitor.logger.info(f"Read {len(user_ids)} user IDs from file")
            # Process in batches
            batch_size = self.config.get('batch_size', 5)
            total_batches = (len(user_ids) - 1) // batch_size + 1
            for i in range(0, len(user_ids), batch_size):
                batch = user_ids[i:i + batch_size]
                self.monitor.logger.info(
                    f"Processing batch {i // batch_size + 1}/{total_batches}"
                )
                self.collect_multiple_users(batch)
                # Pause between batches
                if i + batch_size < len(user_ids):
                    batch_delay = self.config.get('batch_delay', 5.0)
                    time.sleep(batch_delay)
            # Save the monitoring report
            self.monitor.save_metrics_report(str(self.output_dir / "metrics_report.json"))
            self.monitor.print_summary()
        except FileNotFoundError:
            self.monitor.logger.error(f"Input file not found: {input_file}")
        except Exception as e:
            self.monitor.logger.error(f"Run failed: {e}")


def main():
    """Command-line entry point."""
    parser = argparse.ArgumentParser(description='TikTok user video collection system')
    parser.add_argument('--config', '-c', default='config.json',
                        help='Path to the configuration file')
    parser.add_argument('--input', '-i', help='Path to a file listing user IDs')
    parser.add_argument('--user', '-u', help='A single user ID')
    # Default to None so these flags override the config file only when
    # given explicitly; with non-None defaults they would always win.
    parser.add_argument('--max-videos', '-m', type=int, default=None,
                        help='Maximum videos per user (overrides the config file)')
    parser.add_argument('--output-dir', '-o', default=None,
                        help='Output directory (overrides the config file)')
    args = parser.parse_args()
    # Create the collector
    collector = TikTokDataCollector(args.config)
    # Apply command-line overrides
    if args.output_dir:
        collector.output_dir = Path(args.output_dir)
        collector.output_dir.mkdir(parents=True, exist_ok=True)
    if args.max_videos is not None:
        collector.config['max_videos_per_user'] = args.max_videos
    # Run the collection
    if args.input:
        collector.run_from_file(args.input)
    elif args.user:
        videos = collector.collect_user_videos(args.user)
        print(f"Collected {len(videos)} videos")
    else:
        print("Please specify an input file (--input) or a single user ID (--user)")
        sys.exit(1)


if __name__ == "__main__":
    main()
```
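This system covers the full pipeline: configuration loading, data collection, error handling, monitoring and logging, and data export. You can control a run through command-line arguments, or edit the configuration file to adjust individual parameters.
## 6. Configuration File and Example Runs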
Create a configuration file named `config.json`:
```json
{
  "session_cookie": "your TikTok session cookie (optional)",
  "timeout": 30,
  "max_retries": 3,
  "output_dir": "./tiktok_data",
  "log_file": "./logs/tiktok_collector.log",
  "max_videos_per_user": 100,
  "batch_size": 5,
  "request_delay": 1.5,
  "batch_delay": 10.0,
  "use_sqlite": true
}
```
Create a user ID list file named `users.txt`, with one `sec_user_id` per line:
```
MS4wLjABAAAAv7iBp6J3b9Qd5J6Q5J6Q5J6Q5J6Q5J6Q5J6Q5J6Q5J6Q5J6Q5J6Q5J6Q5J6Q5
MS4wLjABAAAAAnotherExampleUserIDHere1234567890
MS4wLjABAAAAYetAnotherExampleUserID9876543210
```
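Before handing such a file to the collector, it can help to clean the list first. The following is a minimal sketch, not part of the collector above: `clean_user_ids` and `looks_like_sec_user_id` are hypothetical helper names, and the prefix check relies only on the earlier observation that `sec_user_id` values usually start with `MS4wLjABAAAA`, so treat it as a heuristic rather than a validation rule.

```python
from typing import Iterable, List

# Assumption: sec_user_id values usually start with this prefix, as noted
# earlier in the article. This is a heuristic, not a guarantee.
SEC_UID_PREFIX = "MS4wLjABAAAA"


def clean_user_ids(lines: Iterable[str]) -> List[str]:
    """Strip whitespace, skip blanks and '#' comments, drop duplicates in order."""
    seen = set()
    cleaned = []
    for raw in lines:
        user_id = raw.strip()
        if not user_id or user_id.startswith("#"):
            continue
        if user_id in seen:
            continue
        seen.add(user_id)
        cleaned.append(user_id)
    return cleaned


def looks_like_sec_user_id(user_id: str) -> bool:
    """Heuristic format check based on the common prefix."""
    return user_id.startswith(SEC_UID_PREFIX)
```

Deduplicating up front avoids spending requests on the same user twice within one run, and flagging IDs that fail the prefix check makes copy-paste mistakes visible before any network traffic happens.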
Run the collection system:
```bash
# Collect in batch from a file
python tiktok_collector.py --input users.txt --config config.json
# Collect a single user
python tiktok_collector.py --user MS4wLjABAAAAv7iBp6J3b9Qd5J6Q5J6Q5J6Q5J6Q5J6Q5J6Q5J6Q5J6Q5J6Q5J6Q5J6Q5J6Q5 --max-videos 50
# Specify the output directory
python tiktok_collector.py --input users.txt --output-dir ./my_tiktok_data
```
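After a run, the system creates a subdirectory for each user under the chosen output directory, saves the data there in JSON and CSV formats, and writes a collection report and monitoring logs.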
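Because every run writes new timestamped files, downstream analysis usually wants only the newest export per user. Here is a small sketch that assumes the layout produced by `_save_user_data` (one subdirectory per user containing `videos_<timestamp>.json` files); `latest_exports` is a hypothetical helper name, not part of the collector.

```python
from pathlib import Path
from typing import Dict


def latest_exports(output_dir: str) -> Dict[str, Path]:
    """Map each user subdirectory to its most recent videos_*.json file."""
    latest = {}
    for user_dir in sorted(Path(output_dir).iterdir()):
        if not user_dir.is_dir():
            continue  # skip report files stored next to the user folders
        # Timestamps use %Y%m%d_%H%M%S, so lexicographic order is chronological.
        exports = sorted(user_dir.glob("videos_*.json"))
        if exports:
            latest[user_dir.name] = exports[-1]
    return latest
```

Sorting by file name instead of modification time keeps the result stable even if the files are later copied or restored from a backup.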
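## 7. Practical Considerations and Best Practices
When running a system like this in a real production environment, a few points deserve particular attention:
### 7.1 Platform Rules and Legal Compliance
**Rate limiting**: TikTok, like other social media platforms, enforces strict rate limits, and overly frequent requests can get your IP blocked. Recommendations:
- Add a random delay (1-3 seconds) to every request
- Implement a retry mechanism with exponential backoff
- Monitor rate-limit responses and adjust the request frequency dynamically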
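The three recommendations above can be sketched in a few lines. This is an illustrative sketch under stated assumptions, not the retry logic of `TikTokClientWithErrorHandling`: `backoff_delay` and `AdaptiveDelay` are hypothetical names, the "full jitter" variant of exponential backoff is one common choice among several, and the multipliers (2x on a rate limit, 0.9x on success) are arbitrary starting values to tune.

```python
import random


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 60.0) -> float:
    """Exponential backoff with full jitter.

    Returns a value drawn uniformly from [0, min(cap, base * 2**attempt)].
    """
    upper = min(cap, base * (2 ** attempt))
    return random.uniform(0, upper)


class AdaptiveDelay:
    """Grow the pause after rate-limit responses, shrink it slowly after successes."""

    def __init__(self, initial: float = 1.0, minimum: float = 1.0, maximum: float = 30.0):
        self.minimum = minimum
        self.maximum = maximum
        self.current = initial

    def on_rate_limited(self) -> None:
        # Double the base delay, up to the configured ceiling.
        self.current = min(self.maximum, self.current * 2)

    def on_success(self) -> None:
        # Recover gradually so one success does not undo the slowdown.
        self.current = max(self.minimum, self.current * 0.9)

    def next_sleep(self) -> float:
        # Up to 2 seconds of jitter: roughly the 1-3 second range at the minimum delay.
        return self.current + random.uniform(0, 2.0)
```

A caller would sleep for `next_sleep()` before each request, call `on_rate_limited()` when the response indicates throttling (for example HTTP 429), and use `backoff_delay(attempt)` between retries of the same request.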
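**Data usage terms**: Read TikTok's Terms of Service carefully and make sure your use of the data complies with them. In particular:
- Do not collect personal or private data at scale
- Respect users' privacy settings
- Do not use the data for spam, harassment, or other improper purposes
### 7.2 Data Storage and Processing Optimization
For large-scale data collection, consider optimizations such as the following: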
```python
# Use a database connection pool
from typing import List

from sqlalchemy import MetaData, Table, create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import QueuePool


class DatabaseManager:
    """Database manager."""

    def __init__(self, db_url: str):
        self.engine = create_engine(
            db_url,
            poolclass=QueuePool,
            pool_size=10,
            max_overflow=20,
            pool_timeout=30,
            pool_recycle=3600
        )
        self.Session = sessionmaker(bind=self.engine)

    def batch_insert_videos(self, videos: List[TikTokVideo]):
        """Insert video records in batches."""
        session = self.Session()
        try:
            video_dicts = [video.to_dict() for video in videos]
            # Use SQLAlchemy Core for bulk inserts, which is faster than the ORM.
            # The tiktok_videos table must already exist; its columns are reflected.
            metadata = MetaData()
            videos_table = Table('tiktok_videos', metadata, autoload_with=self.engine)
            # Insert in chunks of 100 rows
            batch_size = 100
            for i in range(0, len(video_dicts), batch_size):
                batch = video_dicts[i:i + batch_size]
                session.execute(videos_table.insert(), batch)
            session.commit()
        except Exception:
            session.rollback()
            raise  # re-raise with the original traceback
        finally:
            session.close()
```
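Repeated runs will fetch many of the same videos again, so a plain bulk insert either fails on a unique key or stores duplicates. One way to keep inserts idempotent is sketched below with the standard-library `sqlite3` module. The column names (`video_id`, `author_id`, `create_time`) are assumptions for illustration and may not match the fields returned by `TikTokVideo.to_dict()`; `insert_new_videos` is a hypothetical helper.

```python
import sqlite3
from typing import Dict, Iterable


def insert_new_videos(conn: sqlite3.Connection, rows: Iterable[Dict]) -> int:
    """Insert rows, skipping video_ids that already exist; return rows added."""
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS tiktok_videos (
            video_id TEXT PRIMARY KEY,
            author_id TEXT,
            create_time INTEGER
        )
        """
    )
    before = conn.total_changes
    # INSERT OR IGNORE silently skips rows that violate the primary key.
    conn.executemany(
        "INSERT OR IGNORE INTO tiktok_videos (video_id, author_id, create_time) "
        "VALUES (:video_id, :author_id, :create_time)",
        list(rows),
    )
    conn.commit()
    return conn.total_changes - before
```

Other databases offer equivalents, such as `ON CONFLICT DO NOTHING` in PostgreSQL, so the same idea carries over to the pooled engine shown above.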
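### 7.3 Error Recovery and Data Integrity
Implement a checkpoint mechanism so that collection can resume after a crash or a network interruption: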
```python
import pickle
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional


class CheckpointManager:
    """Checkpoint manager."""

    def __init__(self, checkpoint_dir: str = "checkpoints"):
        self.checkpoint_dir = Path(checkpoint_dir)
        self.checkpoint_dir.mkdir(parents=True, exist_ok=True)

    def save_checkpoint(self, user_id: str, cursor: int, videos: List[TikTokVideo]):
        """Save a checkpoint."""
        checkpoint_data = {
            'user_id': user_id,
            'cursor': cursor,
            'videos': [video.to_dict() for video in videos],
            'timestamp': datetime.now().isoformat()
        }
        checkpoint_file = self.checkpoint_dir / f"{user_id}_checkpoint.pkl"
        with open(checkpoint_file, 'wb') as f:
            pickle.dump(checkpoint_data, f)

    def load_checkpoint(self, user_id: str) -> Optional[Dict]:
        """Load a checkpoint, or return None if there is none."""
        checkpoint_file = self.checkpoint_dir / f"{user_id}_checkpoint.pkl"
        if checkpoint_file.exists():
            # Only unpickle files this program wrote itself:
            # pickle.load can execute arbitrary code from untrusted data.
            with open(checkpoint_file, 'rb') as f:
                return pickle.load(f)
        return None

    def clear_checkpoint(self, user_id: str):
        """Remove a checkpoint."""
        checkpoint_file = self.checkpoint_dir / f"{user_id}_checkpoint.pkl"
        if checkpoint_file.exists():
            checkpoint_file.unlink()
```
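A checkpoint written while the process is dying can itself end up truncated. A common way to reduce that risk is to write to a temporary file and rename it over the target. The sketch below shows this pattern with JSON instead of pickle; it is an alternative illustration rather than a drop-in replacement for `CheckpointManager`, and `write_checkpoint_atomic` and `read_checkpoint` are hypothetical names.

```python
import json
import os
import tempfile
from pathlib import Path
from typing import Any, Dict, Optional


def write_checkpoint_atomic(path: Path, data: Dict[str, Any]) -> None:
    """Write JSON to a temp file in the same directory, then rename over the target."""
    path.parent.mkdir(parents=True, exist_ok=True)
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False)
            f.flush()
            os.fsync(f.fileno())  # push the bytes to disk before renaming
        # os.replace is atomic when source and target share a filesystem.
        os.replace(tmp_name, path)
    except BaseException:
        if os.path.exists(tmp_name):
            os.remove(tmp_name)
        raise


def read_checkpoint(path: Path) -> Optional[Dict[str, Any]]:
    """Return the stored checkpoint, or None if it is missing or unreadable."""
    try:
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        return None
```

With this pattern a reader sees either the previous checkpoint or the new one, never a half-written file, and JSON avoids the code-execution risk of loading pickle data.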
### 7.4 Monitoring and Alerting
In a production environment, you also need monitoring and alerting:
```python
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from typing import Dict


class AlertManager:
    """Alert manager."""

    def __init__(self, config: Dict):
        self.config = config

    def send_alert(self, subject: str, message: str, level: str = "ERROR"):
        """Send an alert."""
        if level not in self.config.get('alert_levels', ['ERROR