如何用Python调用TikTok用户作品列表API(附完整代码示例)

# 从零构建Python驱动的TikTok用户作品数据采集与分析系统 如果你是一位对社交媒体数据感兴趣的开发者,或者正在构建一个需要整合TikTok内容的应用,那么直接通过API获取用户作品列表无疑是最直接、最高效的方式。但官方API的复杂性、访问限制以及文档的缺失,常常让开发者望而却步。这篇文章将带你深入探索如何利用Python构建一个健壮、可扩展的TikTok用户作品数据采集系统,而不仅仅是调用一个简单的接口。我们将从核心概念讲起,逐步深入到工程化实践,包括身份认证、分页处理、数据解析、错误重试、数据存储以及性能优化,最终形成一个可以直接集成到生产环境中的解决方案。 在开始之前,我们需要明确一个核心理念:**数据采集的合法性**。任何自动化工具的使用都必须严格遵守平台的服务条款和当地法律法规。本文旨在分享技术实现方案,用于合法的数据分析、研究或个人学习,请勿将其用于任何违反平台规则或侵犯用户隐私的用途。一个负责任的开发者,技术能力与法律意识同等重要。 ## 1. 理解TikTok数据接口的核心机制 在动手写代码之前,我们必须先理解TikTok数据接口的基本工作原理。与许多现代社交媒体平台一样,TikTok并未提供完全公开、文档化的REST API供普通开发者随意调用其核心数据。我们通常所说的“API调用”,实际上是通过分析其移动端或网页端的网络请求,模拟这些请求来获取数据。 ### 1.1 关键概念:`sec_user_id` 与用户身份标识 要获取特定用户的作品列表,首先需要找到该用户的唯一标识符。在TikTok的体系中,每个用户拥有多个ID,但最关键的是 `sec_user_id`。这是一个加密的用户标识符,比常见的用户名(`@username`)或数字UID更加稳定和安全。 **如何获取一个用户的 `sec_user_id`?** 最直接的方式是通过用户的个人主页URL。例如,一个典型的TikTok用户主页链接如下: ``` https://www.tiktok.com/@tiktok ``` 通过浏览器开发者工具(F12)查看网络请求,你会发现类似这样的请求: ``` GET /api/user/detail/?uniqueId=tiktok&... HTTP/1.1 ``` 在返回的JSON响应中,你可以找到 `userInfo.user.secUid` 字段,其值类似于: ``` MS4wLjABAAAAv7iBp6J3b9Qd5J6Q5J6Q5J6Q5J6Q5J6Q5J6Q5J6Q5J6Q5J6Q5J6Q5J6Q5J6Q5 ``` 这就是我们需要的 `sec_user_id`。它是一个Base64编码的字符串,通常以 `MS4wLjABAAAA` 开头。在实际项目中,我们可以通过以下方式自动化获取: ```python import re import requests def extract_sec_user_id_from_profile(profile_url): """ 从用户主页HTML中提取sec_user_id 注意:此方法依赖于页面结构,TikTok更新后可能失效 """ headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36', 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', 'Accept-Language': 'en-US,en;q=0.5', 'Accept-Encoding': 'gzip, deflate', 'DNT': '1', 'Connection': 'keep-alive', 'Upgrade-Insecure-Requests': '1', } try: response = requests.get(profile_url, headers=headers, timeout=10) response.raise_for_status() # 方法1:从JavaScript变量中提取 pattern = r'"secUid":"([^"]+)"' matches = re.findall(pattern, response.text) if matches: return matches[0] # 方法2:从数据属性中提取(备用方案) pattern2 = r'secUid[=:]\s*["\']([^"\']+)["\']' matches2 = re.findall(pattern2, response.text) if matches2: return matches2[0] except Exception as e: print(f"提取sec_user_id失败: {e}") return None # 使用示例 profile_url = "https://www.tiktok.com/@tiktok" sec_user_id = extract_sec_user_id_from_profile(profile_url) print(f"提取到的sec_user_id: {sec_user_id}") ``` > **注意**:网页结构可能随时变化,这种提取方法需要定期维护。对于生产环境,建议结合多种提取策略,并设置备用方案。 ### 1.2 接口请求参数解析 获取用户作品列表的核心接口通常需要以下几个关键参数: | 参数名 | 类型 | 说明 | 示例值 | |--------|------|------|--------| | `sec_user_id` | string | 用户安全ID,必需 | `MS4wLjABAAAAv7iBp6J3...` | | `count` | integer | 每次请求返回的作品数量 | `20` | | `max_cursor` | integer | 分页游标,第一次请求为0 | `0` | | `min_cursor` | integer | 最小游标,通常为0 | `0` | | `aid` | integer | 应用ID,通常为固定值 | `1988` | | `cookie` | string | 用户会话Cookie,用于认证 | `tt_chain_token=...` | 在实际请求中,你可能会遇到更复杂的参数,包括时间戳、签名等。这些参数是为了防止未授权的访问和确保请求的合法性。 ### 1.3 响应数据结构深度解析 TikTok的作品列表接口返回的JSON数据结构非常丰富,包含了作品的完整元数据。以下是一个简化的结构概览: ```json { "status_code": 0, "has_more": true, "max_cursor": "1620000000000", "min_cursor": "0", "aweme_list": [ { "aweme_id": "1234567890123456789", "desc": "视频描述文本", "create_time": 1620000000, "author": { "uid": "123456789", "unique_id": "username", "nickname": "用户昵称", "signature": "个性签名" }, "video": { "play_addr": { "url_list": ["视频播放地址1", "视频播放地址2"] }, "cover": { "url_list": ["封面图地址1", "封面图地址2"] }, "duration": 15000, "width": 720, "height": 1280 }, "statistics": { "digg_count": 1000, "comment_count": 200, "share_count": 300, "play_count": 50000 }, "music": { "id": "music_123", "title": "音乐标题", "author": "音乐作者", "play_url": { "url_list": ["音乐播放地址"] } } } ], "extra": { "now": 1620000000000, "logid": "20211111111111111111111111111111" } } ``` 理解这个数据结构对于后续的数据处理至关重要。每个字段都包含了特定的信息,我们可以根据需求提取相应的数据。 ## 2. 构建健壮的Python数据采集客户端 现在让我们进入实战环节,构建一个完整的Python客户端。我们将采用面向对象的设计,创建一个可重用、可扩展的 `TikTokAPI` 类。 ### 2.1 基础客户端实现 首先,我们需要设置一个基础客户端,处理HTTP请求、错误处理和基础配置: ```python import requests import time import json import logging from typing import Dict, List, Optional, Any, Union from dataclasses import dataclass from datetime import datetime import hashlib import random # 配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) @dataclass class TikTokVideo: """视频数据模型""" aweme_id: str desc: str create_time: int author_id: str author_name: str video_url: str cover_url: str duration: int width: int height: int digg_count: int comment_count: int share_count: int play_count: int music_id: str music_title: str music_author: str music_url: str collected_at: datetime = None def __post_init__(self): if self.collected_at is None: self.collected_at = datetime.now() def to_dict(self) -> Dict[str, Any]: """转换为字典格式""" return { 'aweme_id': self.aweme_id, 'desc': self.desc, 'create_time': self.create_time, 'create_time_iso': datetime.fromtimestamp(self.create_time).isoformat(), 'author_id': self.author_id, 'author_name': self.author_name, 'video_url': self.video_url, 'cover_url': self.cover_url, 'duration': self.duration, 'width': self.width, 'height': self.height, 'digg_count': self.dig_count, 'comment_count': self.comment_count, 'share_count': self.share_count, 'play_count': self.play_count, 'music_id': self.music_id, 'music_title': self.music_title, 'music_author': self.music_author, 'music_url': self.music_url, 'collected_at': self.collected_at.isoformat() } class TikTokAPIClient: """TikTok API客户端""" def __init__( self, session_cookie: Optional[str] = None, user_agent: Optional[str] = None, timeout: int = 30, max_retries: int = 3, retry_delay: float = 1.0 ): """ 初始化TikTok API客户端 Args: session_cookie: 可选的会话Cookie user_agent: 自定义User-Agent timeout: 请求超时时间(秒) max_retries: 最大重试次数 retry_delay: 重试延迟(秒) """ self.session = requests.Session() self.timeout = timeout self.max_retries = max_retries self.retry_delay = retry_delay # 设置默认请求头 self.headers = { 'User-Agent': user_agent or ( 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) ' 'AppleWebKit/537.36 (KHTML, like Gecko) ' 'Chrome/120.0.0.0 Safari/537.36' ), 'Accept': 'application/json, text/plain, */*', 'Accept-Language': 'en-US,en;q=0.9', 'Accept-Encoding': 'gzip, deflate, br', 'Referer': 'https://www.tiktok.com/', 'Connection': 'keep-alive', } if session_cookie: self.headers['Cookie'] = session_cookie def _make_request( self, method: str, url: str, params: Optional[Dict] = None, data: Optional[Dict] = None, json_data: Optional[Dict] = None, **kwargs ) -> Optional[Dict]: """ 发送HTTP请求,包含重试机制 Args: method: HTTP方法(GET/POST等) url: 请求URL params: URL参数 data: 表单数据 json_data: JSON数据 **kwargs: 其他requests参数 Returns: 解析后的JSON响应或None """ for attempt in range(self.max_retries): try: response = self.session.request( method=method, url=url, params=params, data=data, json=json_data, headers=self.headers, timeout=self.timeout, **kwargs ) response.raise_for_status() # 尝试解析JSON try: return response.json() except json.JSONDecodeError as e: logger.error(f"JSON解析失败: {e}, 响应内容: {response.text[:200]}") return None except requests.exceptions.RequestException as e: logger.warning(f"请求失败 (尝试 {attempt + 1}/{self.max_retries}): {e}") if attempt < self.max_retries - 1: sleep_time = self.retry_delay * (2 ** attempt) + random.uniform(0, 0.5) logger.info(f"等待 {sleep_time:.2f} 秒后重试...") time.sleep(sleep_time) else: logger.error(f"请求失败,已达到最大重试次数: {e}") raise return None ``` 这个基础客户端提供了请求重试、错误处理和基本的配置管理。接下来,我们需要实现具体的作品列表获取功能。 ### 2.2 实现作品列表获取功能 现在让我们实现核心的作品列表获取方法。这里的关键是正确处理分页逻辑和参数构造: ```python class TikTokAPIClient(TikTokAPIClient): """扩展TikTok API客户端,添加作品列表功能""" def get_user_videos( self, sec_user_id: str, max_count: int = 100, max_cursor: int = 0, count_per_request: int = 20 ) -> List[TikTokVideo]: """ 获取用户的所有视频作品 Args: sec_user_id: 用户安全ID max_count: 最大获取数量(0表示无限制) max_cursor: 起始游标 count_per_request: 每次请求获取的数量 Returns: 视频对象列表 """ videos = [] current_cursor = max_cursor has_more = True request_count = 0 # 计算最大请求次数(防止无限循环) max_requests = (max_count // count_per_request) + 2 if max_count > 0 else 100 while has_more and request_count < max_requests: try: logger.info(f"获取用户 {sec_user_id} 的视频,游标: {current_cursor}") # 构建请求参数 params = { 'sec_user_id': sec_user_id, 'count': count_per_request, 'max_cursor': current_cursor, 'min_cursor': 0, 'aid': 1988, # TikTok的App ID 'cookie_enabled': 'true', 'screen_width': 1920, 'screen_height': 1080, 'browser_language': 'en-US', 'browser_platform': 'Win32', 'browser_name': 'Mozilla', 'browser_version': '5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36', 'browser_online': 'true', 'timezone_name': 'America/New_York', 'priority_region': '', 'verifyFp': self._generate_verify_fp(), 'app_name': 'tiktok_web', 'app_language': 'en', 'webcast_language': 'en', 'tz_name': 'America/New_York', 'is_page_visible': 'true', 'focus_state': 'true', 'is_fullscreen': 'false', 'history_len': 4, 'battery_info': '1', 'from_page': 'user', 'device_id': self._generate_device_id(), 'language': 'en', 'priority_region': '', 'rotate_count': 0, 'screen_orientation': 'portrait', 'webid': self._generate_webid(), 'msToken': self._generate_ms_token(), 'X-Bogus': self._generate_x_bogus(params), '_signature': self._generate_signature(params) } # 发送请求 response = self._make_request( 'GET', 'https://www.tiktok.com/api/post/item_list/', params=params ) if not response: logger.error(f"获取用户 {sec_user_id} 视频失败") break # 检查响应状态 if response.get('status_code') != 0: logger.error(f"API返回错误: {response.get('status_msg', 'Unknown error')}") break # 解析视频数据 aweme_list = response.get('aweme_list', []) for item in aweme_list: try: video = self._parse_video_item(item) if video: videos.append(video) except Exception as e: logger.error(f"解析视频数据失败: {e}, 原始数据: {item.get('aweme_id', 'unknown')}") # 更新分页状态 has_more = response.get('has_more', False) current_cursor = response.get('max_cursor', current_cursor + count_per_request) request_count += 1 # 检查是否达到最大数量限制 if max_count > 0 and len(videos) >= max_count: videos = videos[:max_count] break # 添加延迟,避免请求过于频繁 time.sleep(random.uniform(1.0, 2.5)) except Exception as e: logger.error(f"获取视频过程中发生错误: {e}") break logger.info(f"成功获取 {len(videos)} 个视频") return videos def _parse_video_item(self, item: Dict) -> Optional[TikTokVideo]: """解析单个视频项""" try: # 提取基本信息 aweme_id = item.get('aweme_id', '') desc = item.get('desc', '') create_time = item.get('create_time', 0) # 提取作者信息 author = item.get('author', {}) author_id = author.get('uid', '') author_name = author.get('nickname', '') # 提取视频信息 video_info = item.get('video', {}) play_addr = video_info.get('play_addr', {}) video_urls = play_addr.get('url_list', []) video_url = video_urls[0] if video_urls else '' cover_info = video_info.get('cover', {}) cover_urls = cover_info.get('url_list', []) cover_url = cover_urls[0] if cover_urls else '' duration = video_info.get('duration', 0) width = video_info.get('width', 0) height = video_info.get('height', 0) # 提取统计信息 stats = item.get('statistics', {}) digg_count = stats.get('digg_count', 0) comment_count = stats.get('comment_count', 0) share_count = stats.get('share_count', 0) play_count = stats.get('play_count', 0) # 提取音乐信息 music_info = item.get('music', {}) music_id = music_info.get('id', '') music_title = music_info.get('title', '') music_author = music_info.get('author', '') music_play_url = music_info.get('play_url', {}) music_urls = music_play_url.get('url_list', []) music_url = music_urls[0] if music_urls else '' return TikTokVideo( aweme_id=aweme_id, desc=desc, create_time=create_time, author_id=author_id, author_name=author_name, video_url=video_url, cover_url=cover_url, duration=duration, width=width, height=height, digg_count=digg_count, comment_count=comment_count, share_count=share_count, play_count=play_count, music_id=music_id, music_title=music_title, music_author=music_author, music_url=music_url ) except Exception as e: logger.error(f"解析视频项失败: {e}") return None def _generate_verify_fp(self) -> str: """生成verifyFp参数(简化版)""" # 实际实现需要更复杂的算法 import uuid return f"verify_{uuid.uuid4().hex[:16]}" def _generate_device_id(self) -> str: """生成设备ID""" import uuid return str(uuid.uuid4().hex[:16]).upper() def _generate_webid(self) -> str: """生成webid""" import uuid return str(uuid.uuid4().hex[:19]) def _generate_ms_token(self) -> str: """生成msToken参数""" import random import string length = 107 chars = string.ascii_letters + string.digits + '-_' return ''.join(random.choice(chars) for _ in range(length)) def _generate_x_bogus(self, params: Dict) -> str: """生成X-Bogus签名(简化版)""" # 注意:实际实现需要逆向TikTok的X-Bogus生成算法 # 这里返回一个模拟值,实际使用可能需要更复杂的实现 return "DFSzswVY8ANANtHT6cWXyQqXgQq1" def _generate_signature(self, params: Dict) -> str: """生成_signature参数(简化版)""" # 实际实现需要逆向TikTok的签名算法 import hashlib import time param_str = '&'.join([f'{k}={v}' for k, v in sorted(params.items())]) timestamp = str(int(time.time() * 1000)) to_sign = f"{param_str}&{timestamp}" return hashlib.md5(to_sign.encode()).hexdigest()[:32] ``` > **重要提示**:上面的 `_generate_x_bogus` 和 `_generate_signature` 方法是简化版本。在实际生产环境中,这些参数的生成算法可能非常复杂且会定期更新。你可能需要研究最新的逆向工程成果或使用维护良好的第三方库。 ### 2.3 添加数据导出和存储功能 获取到数据后,我们通常需要将其保存到文件或数据库中。让我们添加一些实用的导出功能: ```python import csv import json from pathlib import Path from typing import List class TikTokDataExporter: """TikTok数据导出器""" @staticmethod def export_to_json(videos: List[TikTokVideo], filename: str) -> bool: """ 将视频数据导出为JSON文件 Args: videos: 视频对象列表 filename: 输出文件名 Returns: 是否成功 """ try: data = [video.to_dict() for video in videos] with open(filename, 'w', encoding='utf-8') as f: json.dump(data, f, ensure_ascii=False, indent=2) logger.info(f"成功导出 {len(videos)} 个视频到 {filename}") return True except Exception as e: logger.error(f"导出JSON失败: {e}") return False @staticmethod def export_to_csv(videos: List[TikTokVideo], filename: str) -> bool: """ 将视频数据导出为CSV文件 Args: videos: 视频对象列表 filename: 输出文件名 Returns: 是否成功 """ try: if not videos: logger.warning("没有视频数据可导出") return False # 获取所有字段 first_video = videos[0].to_dict() fieldnames = list(first_video.keys()) with open(filename, 'w', newline='', encoding='utf-8-sig') as f: writer = csv.DictWriter(f, fieldnames=fieldnames) writer.writeheader() for video in videos: writer.writerow(video.to_dict()) logger.info(f"成功导出 {len(videos)} 个视频到 {filename}") return True except Exception as e: logger.error(f"导出CSV失败: {e}") return False @staticmethod def export_to_sqlite(videos: List[TikTokVideo], db_path: str) -> bool: """ 将视频数据导出到SQLite数据库 Args: videos: 视频对象列表 db_path: 数据库文件路径 Returns: 是否成功 """ try: import sqlite3 from datetime import datetime # 创建数据库连接 conn = sqlite3.connect(db_path) cursor = conn.cursor() # 创建表 cursor.execute(''' CREATE TABLE IF NOT EXISTS tiktok_videos ( id INTEGER PRIMARY KEY AUTOINCREMENT, aweme_id TEXT UNIQUE, desc TEXT, create_time INTEGER, create_time_iso TEXT, author_id TEXT, author_name TEXT, video_url TEXT, cover_url TEXT, duration INTEGER, width INTEGER, height INTEGER, digg_count INTEGER, comment_count INTEGER, share_count INTEGER, play_count INTEGER, music_id TEXT, music_title TEXT, music_author TEXT, music_url TEXT, collected_at TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ''') # 创建索引 cursor.execute('CREATE INDEX IF NOT EXISTS idx_aweme_id ON tiktok_videos(aweme_id)') cursor.execute('CREATE INDEX IF NOT EXISTS idx_author_id ON tiktok_videos(author_id)') cursor.execute('CREATE INDEX IF NOT EXISTS idx_create_time ON tiktok_videos(create_time)') # 插入数据 inserted_count = 0 for video in videos: video_dict = video.to_dict() # 移除id字段(自增) if 'id' in video_dict: del video_dict['id'] # 构建插入语句 columns = ', '.join(video_dict.keys()) placeholders = ', '.join(['?' for _ in video_dict]) values = list(video_dict.values()) try: cursor.execute( f'INSERT OR REPLACE INTO tiktok_videos ({columns}) VALUES ({placeholders})', values ) inserted_count += 1 except sqlite3.IntegrityError: # 重复数据,跳过 pass conn.commit() conn.close() logger.info(f"成功插入/更新 {inserted_count} 个视频到数据库 {db_path}") return True except ImportError: logger.error("SQLite支持不可用,请确保已安装sqlite3") return False except Exception as e: logger.error(f"导出到SQLite失败: {e}") return False ``` ## 3. 高级功能:并发采集与性能优化 当需要采集大量用户的数据时,单线程的方式会非常慢。我们可以使用并发编程来显著提高采集效率。 ### 3.1 使用异步IO进行并发采集 ```python import asyncio import aiohttp import async_timeout from typing import List, Dict, Any import logging logger = logging.getLogger(__name__) class AsyncTikTokClient: """异步TikTok客户端""" def __init__( self, session_cookie: Optional[str] = None, max_concurrent: int = 5, request_timeout: int = 30 ): """ 初始化异步客户端 Args: session_cookie: 会话Cookie max_concurrent: 最大并发数 request_timeout: 请求超时时间 """ self.session_cookie = session_cookie self.max_concurrent = max_concurrent self.request_timeout = request_timeout self.semaphore = asyncio.Semaphore(max_concurrent) # 请求头 self.headers = { 'User-Agent': ( 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) ' 'AppleWebKit/537.36 (KHTML, like Gecko) ' 'Chrome/120.0.0.0 Safari/537.36' ), 'Accept': 'application/json, text/plain, */*', 'Accept-Language': 'en-US,en;q=0.9', 'Referer': 'https://www.tiktok.com/', } if session_cookie: self.headers['Cookie'] = session_cookie async def fetch_user_videos( self, sec_user_ids: List[str], max_videos_per_user: int = 50 ) -> Dict[str, List[Dict]]: """ 并发获取多个用户的视频 Args: sec_user_ids: 用户ID列表 max_videos_per_user: 每个用户最大视频数 Returns: 用户ID到视频列表的映射 """ tasks = [] for sec_user_id in sec_user_ids: task = self._fetch_single_user_videos(sec_user_id, max_videos_per_user) tasks.append(task) # 并发执行所有任务 results = await asyncio.gather(*tasks, return_exceptions=True) # 处理结果 user_videos = {} for sec_user_id, result in zip(sec_user_ids, results): if isinstance(result, Exception): logger.error(f"获取用户 {sec_user_id} 视频失败: {result}") user_videos[sec_user_id] = [] else: user_videos[sec_user_id] = result return user_videos async def _fetch_single_user_videos( self, sec_user_id: str, max_videos: int ) -> List[Dict]: """ 获取单个用户的视频 Args: sec_user_id: 用户ID max_videos: 最大视频数 Returns: 视频数据列表 """ videos = [] has_more = True max_cursor = 0 count_per_request = 20 async with aiohttp.ClientSession(headers=self.headers) as session: while has_more and len(videos) < max_videos: try: async with self.semaphore: # 构建请求参数 params = self._build_request_params(sec_user_id, max_cursor, count_per_request) async with async_timeout.timeout(self.request_timeout): async with session.get( 'https://www.tiktok.com/api/post/item_list/', params=params ) as response: if response.status != 200: logger.warning(f"请求失败: {response.status}") break data = await response.json() if data.get('status_code') != 0: logger.warning(f"API错误: {data.get('status_msg')}") break # 解析视频 aweme_list = data.get('aweme_list', []) for item in aweme_list: parsed = self._parse_video_item(item) if parsed: videos.append(parsed) # 更新分页状态 has_more = data.get('has_more', False) max_cursor = data.get('max_cursor', max_cursor + count_per_request) # 随机延迟,避免请求过快 await asyncio.sleep(1.0 + random.random()) except asyncio.TimeoutError: logger.warning(f"请求超时: {sec_user_id}") break except Exception as e: logger.error(f"获取用户 {sec_user_id} 视频失败: {e}") break logger.info(f"用户 {sec_user_id} 获取到 {len(videos)} 个视频") return videos def _build_request_params(self, sec_user_id: str, max_cursor: int, count: int) -> Dict: """构建请求参数""" # 这里需要实现实际的参数构建逻辑 # 包括生成必要的签名参数 return { 'sec_user_id': sec_user_id, 'count': count, 'max_cursor': max_cursor, 'min_cursor': 0, 'aid': 1988, # ... 其他必要参数 } def _parse_video_item(self, item: Dict) -> Optional[Dict]: """解析视频项""" try: return { 'aweme_id': item.get('aweme_id'), 'desc': item.get('desc', ''), 'create_time': item.get('create_time', 0), 'author': item.get('author', {}).get('nickname', ''), 'video_url': item.get('video', {}).get('play_addr', {}).get('url_list', [''])[0], 'stats': item.get('statistics', {}) } except Exception as e: logger.error(f"解析视频项失败: {e}") return None # 使用示例 async def main(): # 用户ID列表 user_ids = [ "MS4wLjABAAAAv7iBp6J3b9Qd5J6Q5J6Q5J6Q5J6Q5J6Q5J6Q5J6Q5J6Q5J6Q5J6Q5J6Q5J6Q5", # 添加更多用户ID ] client = AsyncTikTokClient() results = await client.fetch_user_videos(user_ids, max_videos_per_user=30) # 处理结果 for user_id, videos in results.items(): print(f"用户 {user_id}: {len(videos)} 个视频") TikTokDataExporter.export_to_json(videos, f"{user_id}_videos.json") # 运行异步主函数 if __name__ == "__main__": asyncio.run(main()) ``` ### 3.2 使用线程池进行并发处理 如果你更熟悉传统的多线程编程,也可以使用 `concurrent.futures` 来实现并发: ```python from concurrent.futures import ThreadPoolExecutor, as_completed from typing import List, Dict, Tuple import time class ConcurrentTikTokClient: """并发TikTok客户端(使用线程池)""" def __init__( self, max_workers: int = 5, request_timeout: int = 30 ): """ 初始化并发客户端 Args: max_workers: 最大工作线程数 request_timeout: 请求超时时间 """ self.max_workers = max_workers self.request_timeout = request_timeout self.client = TikTokAPIClient() # 使用之前实现的同步客户端 def fetch_multiple_users_videos( self, user_data: List[Tuple[str, int]], output_dir: str = "output" ) -> Dict[str, List[TikTokVideo]]: """ 并发获取多个用户的视频 Args: user_data: 用户数据列表,每个元素是(sec_user_id, max_videos)元组 output_dir: 输出目录 Returns: 用户ID到视频列表的映射 """ from pathlib import Path # 创建输出目录 Path(output_dir).mkdir(parents=True, exist_ok=True) results = {} with ThreadPoolExecutor(max_workers=self.max_workers) as executor: # 提交所有任务 future_to_user = { executor.submit(self._fetch_user_videos_task, sec_user_id, max_videos, output_dir): (sec_user_id, max_videos) for sec_user_id, max_videos in user_data } # 处理完成的任务 for future in as_completed(future_to_user): sec_user_id, max_videos = future_to_user[future] try: videos = future.result(timeout=self.request_timeout + 10) results[sec_user_id] = videos logger.info(f"用户 {sec_user_id} 完成,获取 {len(videos)} 个视频") except Exception as e: logger.error(f"用户 {sec_user_id} 任务失败: {e}") results[sec_user_id] = [] return results def _fetch_user_videos_task( self, sec_user_id: str, max_videos: int, output_dir: str ) -> List[TikTokVideo]: """ 单个用户的视频获取任务 Args: sec_user_id: 用户ID max_videos: 最大视频数 output_dir: 输出目录 Returns: 视频列表 """ try: # 获取视频 videos = self.client.get_user_videos( sec_user_id=sec_user_id, max_count=max_videos ) # 保存到文件 if videos: output_file = Path(output_dir) / f"{sec_user_id}_videos.json" TikTokDataExporter.export_to_json(videos, str(output_file)) return videos except Exception as e: logger.error(f"获取用户 {sec_user_id} 视频失败: {e}") return [] def batch_process_users_from_file( self, input_file: str, output_dir: str = "output", max_videos_per_user: int = 50 ) -> Dict[str, List[TikTokVideo]]: """ 从文件批量处理用户 Args: input_file: 输入文件(每行一个用户ID) output_dir: 输出目录 max_videos_per_user: 每个用户最大视频数 Returns: 处理结果 """ try: with open(input_file, 'r', encoding='utf-8') as f: user_ids = [line.strip() for line in f if line.strip()] user_data = [(user_id, max_videos_per_user) for user_id in user_ids] return self.fetch_multiple_users_videos(user_data, output_dir) except FileNotFoundError: logger.error(f"输入文件不存在: {input_file}") return {} except Exception as e: logger.error(f"批量处理失败: {e}") return {} ``` ## 4. 错误处理与监控 在生产环境中,健壮的错误处理和监控是必不可少的。让我们为我们的采集系统添加这些功能。 ### 4.1 实现完善的错误处理 ```python from enum import Enum from typing import Optional, Dict, Any import traceback from datetime import datetime, timedelta class TikTokErrorCode(Enum): """TikTok API错误代码枚举""" SUCCESS = 0 RATE_LIMITED = 10202 USER_NOT_FOUND = 10210 VIDEO_NOT_FOUND = 10211 PRIVATE_ACCOUNT = 10212 SESSION_EXPIRED = 10214 NETWORK_ERROR = 10000 PARSE_ERROR = 10001 UNKNOWN_ERROR = 99999 class TikTokAPIError(Exception): """TikTok API异常基类""" def __init__( self, code: TikTokErrorCode, message: str, details: Optional[Dict] = None ): self.code = code self.message = message self.details = details or {} super().__init__(f"[{code.name}] {message}") class RateLimitError(TikTokAPIError): """速率限制异常""" pass class AuthenticationError(TikTokAPIError): """认证异常""" pass class TikTokClientWithErrorHandling(TikTokAPIClient): """带有完善错误处理的TikTok客户端""" def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.error_stats = { 'total_requests': 0, 'successful_requests': 0, 'failed_requests': 0, 'rate_limit_hits': 0, 'auth_errors': 0, 'network_errors': 0, 'last_error': None, 'last_error_time': None } self.rate_limit_reset = None def get_user_videos_with_retry( self, sec_user_id: str, max_count: int = 100, max_retries: int = 3 ) -> List[TikTokVideo]: """ 带重试机制的用户视频获取 Args: sec_user_id: 用户ID max_count: 最大视频数 max_retries: 最大重试次数 Returns: 视频列表 """ self.error_stats['total_requests'] += 1 for attempt in range(max_retries): try: # 检查速率限制 if self.rate_limit_reset and datetime.now() < self.rate_limit_reset: wait_seconds = (self.rate_limit_reset - datetime.now()).total_seconds() logger.warning(f"速率限制中,等待 {wait_seconds:.1f} 秒") time.sleep(wait_seconds + 1) videos = self.get_user_videos(sec_user_id, max_count) self.error_stats['successful_requests'] += 1 return videos except RateLimitError as e: self.error_stats['rate_limit_hits'] += 1 self.error_stats['failed_requests'] += 1 # 解析重试时间 retry_after = e.details.get('retry_after', 60) self.rate_limit_reset = datetime.now() + timedelta(seconds=retry_after) if attempt < max_retries - 1: wait_time = retry_after + random.uniform(5, 15) logger.warning(f"速率限制,等待 {wait_time:.1f} 秒后重试 (尝试 {attempt + 1}/{max_retries})") time.sleep(wait_time) else: logger.error(f"达到最大重试次数,放弃获取用户 {sec_user_id} 的视频") raise except AuthenticationError as e: self.error_stats['auth_errors'] += 1 self.error_stats['failed_requests'] += 1 self.error_stats['last_error'] = str(e) self.error_stats['last_error_time'] = datetime.now() logger.error(f"认证失败: {e}") raise except Exception as e: self.error_stats['failed_requests'] += 1 self.error_stats['last_error'] = str(e) self.error_stats['last_error_time'] = datetime.now() if attempt < max_retries - 1: wait_time = 2 ** attempt + random.uniform(1, 3) logger.warning(f"请求失败,{wait_time:.1f} 秒后重试 (尝试 {attempt + 1}/{max_retries}): {e}") time.sleep(wait_time) else: logger.error(f"达到最大重试次数,最终失败: {e}") raise return [] def _handle_api_response(self, response: Dict) -> None: """ 处理API响应,检查错误 Args: response: API响应数据 Raises: TikTokAPIError: 如果响应包含错误 """ status_code = response.get('status_code', 0) if status_code == TikTokErrorCode.SUCCESS.value: return error_msg = response.get('status_msg', 'Unknown error') if status_code == TikTokErrorCode.RATE_LIMITED.value: raise RateLimitError( TikTokErrorCode.RATE_LIMITED, f"Rate limited: {error_msg}", {'retry_after': 60} # 默认60秒后重试 ) elif status_code == TikTokErrorCode.USER_NOT_FOUND.value: raise TikTokAPIError( TikTokErrorCode.USER_NOT_FOUND, f"User not found: {error_msg}" ) elif status_code == TikTokErrorCode.SESSION_EXPIRED.value: raise AuthenticationError( TikTokErrorCode.SESSION_EXPIRED, f"Session expired: {error_msg}" ) else: raise TikTokAPIError( TikTokErrorCode.UNKNOWN_ERROR, f"API error {status_code}: {error_msg}", {'raw_response': response} ) def get_error_stats(self) -> Dict[str, Any]: """获取错误统计信息""" success_rate = 0 if self.error_stats['total_requests'] > 0: success_rate = (self.error_stats['successful_requests'] / self.error_stats['total_requests']) * 100 return { **self.error_stats, 'success_rate': f"{success_rate:.2f}%", 'failure_rate': f"{100 - success_rate:.2f}%" } def reset_error_stats(self) -> None: """重置错误统计""" self.error_stats = { 'total_requests': 0, 'successful_requests': 0, 'failed_requests': 0, 'rate_limit_hits': 0, 'auth_errors': 0, 'network_errors': 0, 'last_error': None, 'last_error_time': None } ``` ### 4.2 添加监控和日志记录 ```python import logging from logging.handlers import RotatingFileHandler import json from datetime import datetime from typing import Dict, Any class TikTokMonitor: """TikTok采集监控器""" def __init__(self, log_file: str = "tiktok_monitor.log"): """ 初始化监控器 Args: log_file: 日志文件路径 """ self.logger = logging.getLogger('TikTokMonitor') self.logger.setLevel(logging.INFO) # 避免重复添加handler if not self.logger.handlers: # 文件处理器(按大小轮转) file_handler = RotatingFileHandler( log_file, maxBytes=10*1024*1024, # 10MB backupCount=5, encoding='utf-8' ) file_handler.setLevel(logging.INFO) file_formatter = logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) file_handler.setFormatter(file_formatter) self.logger.addHandler(file_handler) # 控制台处理器 console_handler = logging.StreamHandler() console_handler.setLevel(logging.INFO) console_formatter = logging.Formatter( '%(asctime)s - %(levelname)s - %(message)s' ) console_handler.setFormatter(console_formatter) self.logger.addHandler(console_handler) self.metrics = { 'start_time': datetime.now(), 'total_users_processed': 0, 'total_videos_collected': 0, 'successful_requests': 0, 'failed_requests': 0, 'rate_limit_events': 0, 'last_activity': None, 'user_stats': {} } def log_request(self, user_id: str, success: bool, video_count: int = 0) -> None: """ 记录请求日志 Args: user_id: 用户ID success: 是否成功 video_count: 获取的视频数量 """ self.metrics['total_users_processed'] += 1 self.metrics['last_activity'] = datetime.now() if success: self.metrics['successful_requests'] += 1 self.metrics['total_videos_collected'] += video_count # 更新用户统计 if user_id not in self.metrics['user_stats']: self.metrics['user_stats'][user_id] = { 'first_seen': datetime.now(), 'total_videos': 0, 'last_success': None, 'failure_count': 0 } user_stat = self.metrics['user_stats'][user_id] user_stat['total_videos'] += video_count user_stat['last_success'] = datetime.now() self.logger.info( f"成功获取用户 {user_id} 的 {video_count} 个视频。" f"总计: {self.metrics['total_videos_collected']} 个视频" ) else: self.metrics['failed_requests'] += 1 if user_id not in self.metrics['user_stats']: self.metrics['user_stats'][user_id] = { 'first_seen': datetime.now(), 'total_videos': 0, 'last_success': None, 'failure_count': 0 } self.metrics['user_stats'][user_id]['failure_count'] += 1 self.logger.warning(f"获取用户 {user_id} 视频失败") def log_rate_limit(self, user_id: str, retry_after: int) -> None: """ 记录速率限制事件 Args: user_id: 用户ID retry_after: 重试等待时间(秒) """ self.metrics['rate_limit_events'] += 1 self.logger.warning( f"用户 {user_id} 触发速率限制,{retry_after} 秒后重试" ) def get_metrics_report(self) -> Dict[str, Any]: """ 获取监控报告 Returns: 监控指标报告 """ current_time = datetime.now() runtime = current_time - self.metrics['start_time'] report = { 'runtime_seconds': runtime.total_seconds(), 'runtime_human': str(runtime), 'total_users_processed': self.metrics['total_users_processed'], 'total_videos_collected': self.metrics['total_videos_collected'], 'successful_requests': self.metrics['successful_requests'], 'failed_requests': self.metrics['failed_requests'], 'rate_limit_events': self.metrics['rate_limit_events'], 'success_rate': 0, 'last_activity': self.metrics['last_activity'].isoformat() if self.metrics['last_activity'] else None, 'top_users_by_videos': [], 'users_with_most_failures': [] } # 计算成功率 total_requests = self.metrics['successful_requests'] + self.metrics['failed_requests'] if total_requests > 0: report['success_rate'] = ( self.metrics['successful_requests'] / total_requests * 100 ) # 获取视频最多的用户 user_video_counts = [ (user_id, stats['total_videos']) for user_id, stats in self.metrics['user_stats'].items() ] user_video_counts.sort(key=lambda x: x[1], reverse=True) report['top_users_by_videos'] = user_video_counts[:10] # 获取失败最多的用户 user_failure_counts = [ (user_id, stats['failure_count']) for user_id, stats in self.metrics['user_stats'].items() if stats['failure_count'] > 0 ] user_failure_counts.sort(key=lambda x: x[1], reverse=True) report['users_with_most_failures'] = user_failure_counts[:10] return report def save_metrics_report(self, filename: str = "metrics_report.json") -> None: """ 保存监控报告到文件 Args: filename: 输出文件名 """ report = self.get_metrics_report() try: with open(filename, 'w', encoding='utf-8') as f: json.dump(report, f, ensure_ascii=False, indent=2, default=str) self.logger.info(f"监控报告已保存到 {filename}") except Exception as e: self.logger.error(f"保存监控报告失败: {e}") def print_summary(self) -> None: """打印监控摘要""" report = self.get_metrics_report() print("\n" + "="*60) print("TikTok采集监控摘要") print("="*60) print(f"运行时间: {report['runtime_human']}") print(f"处理用户数: {report['total_users_processed']}") print(f"采集视频数: {report['total_videos_collected']}") print(f"成功请求: {report['successful_requests']}") print(f"失败请求: {report['failed_requests']}") print(f"成功率: {report['success_rate']:.2f}%") print(f"速率限制事件: {report['rate_limit_events']}") if report['top_users_by_videos']: print("\n视频最多的用户:") for user_id, count in report['top_users_by_videos'][:5]: print(f" {user_id[:20]}...: {count} 个视频") if report['users_with_most_failures']: print("\n失败最多的用户:") for user_id, failures in report['users_with_most_failures'][:5]: print(f" {user_id[:20]}...: {failures} 次失败") print("="*60) ``` ## 5. 完整示例:端到端的数据采集系统 现在让我们把所有组件组合起来,创建一个完整的端到端数据采集系统: ```python import argparse import sys from pathlib import Path from typing import List, Dict, Any import json class TikTokDataCollector: """TikTok数据采集系统""" def __init__(self, config_file: str = "config.json"): """ 初始化数据采集系统 Args: config_file: 配置文件路径 """ self.config = self._load_config(config_file) self.client = TikTokClientWithErrorHandling( session_cookie=self.config.get('session_cookie'), timeout=self.config.get('timeout', 30), max_retries=self.config.get('max_retries', 3) ) self.monitor = TikTokMonitor( log_file=self.config.get('log_file', 'tiktok_collector.log') ) self.exporter = TikTokDataExporter() # 创建输出目录 self.output_dir = Path(self.config.get('output_dir', 'data')) self.output_dir.mkdir(parents=True, exist_ok=True) def _load_config(self, config_file: str) -> Dict[str, Any]: """加载配置文件""" default_config = { 'session_cookie': None, 'timeout': 30, 'max_retries': 3, 'output_dir': 'data', 'log_file': 'tiktok_collector.log', 'max_videos_per_user': 100, 'batch_size': 5, 'request_delay': 1.0 } try: if Path(config_file).exists(): with open(config_file, 'r', encoding='utf-8') as f: user_config = json.load(f) # 合并配置 default_config.update(user_config) except Exception as e: print(f"加载配置文件失败,使用默认配置: {e}") return default_config def collect_user_videos( self, sec_user_id: str, max_videos: Optional[int] = None ) -> List[TikTokVideo]: """ 采集单个用户的视频 Args: sec_user_id: 用户ID max_videos: 最大视频数(None表示使用配置) Returns: 视频列表 """ if max_videos is None: max_videos = self.config['max_videos_per_user'] try: self.monitor.logger.info(f"开始采集用户 {sec_user_id} 的视频...") videos = self.client.get_user_videos_with_retry( sec_user_id=sec_user_id, max_count=max_videos ) success = len(videos) > 0 self.monitor.log_request(sec_user_id, success, len(videos)) if videos: # 保存数据 self._save_user_data(sec_user_id, videos) self.monitor.logger.info(f"用户 {sec_user_id} 采集完成,共 {len(videos)} 个视频") else: self.monitor.logger.warning(f"用户 {sec_user_id} 没有采集到视频") return videos except Exception as e: self.monitor.log_request(sec_user_id, False, 0) self.monitor.logger.error(f"采集用户 {sec_user_id} 失败: {e}") return [] def collect_multiple_users( self, user_ids: List[str], output_format: str = 'json' ) -> Dict[str, List[TikTokVideo]]: """ 批量采集多个用户的视频 Args: user_ids: 用户ID列表 output_format: 输出格式(json/csv/sqlite) Returns: 用户ID到视频列表的映射 """ results = {} self.monitor.logger.info(f"开始批量采集 {len(user_ids)} 个用户...") for i, user_id in enumerate(user_ids, 1): self.monitor.logger.info(f"处理用户 {i}/{len(user_ids)}: {user_id}") videos = self.collect_user_videos(user_id) results[user_id] = videos # 批量处理延迟 if i < len(user_ids): delay = self.config.get('request_delay', 1.0) time.sleep(delay + random.uniform(0, 0.5)) # 生成汇总报告 self._generate_summary_report(results, output_format) return results def _save_user_data( self, user_id: str, videos: List[TikTokVideo] ) -> None: """保存用户数据""" # 创建用户目录 user_dir = self.output_dir / user_id user_dir.mkdir(exist_ok=True) timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') # 保存为JSON json_file = user_dir / f"videos_{timestamp}.json" self.exporter.export_to_json(videos, str(json_file)) # 保存为CSV csv_file = user_dir / f"videos_{timestamp}.csv" self.exporter.export_to_csv(videos, str(csv_file)) # 保存到SQLite(可选) if self.config.get('use_sqlite', False): db_file = self.output_dir / "tiktok_data.db" self.exporter.export_to_sqlite(videos, str(db_file)) def _generate_summary_report( self, results: Dict[str, List[TikTokVideo]], output_format: str ) -> None: """生成汇总报告""" total_users = len(results) total_videos = sum(len(videos) for videos in results.values()) users_with_videos = sum(1 for videos in results.values() if videos) report = { 'collection_time': datetime.now().isoformat(), 'total_users': total_users, 'users_with_videos': users_with_videos, 'users_without_videos': total_users - users_with_videos, 'total_videos_collected': total_videos, 'average_videos_per_user': total_videos / users_with_videos if users_with_videos > 0 else 0, 'user_details': {} } for user_id, videos in results.items(): report['user_details'][user_id] = { 'video_count': len(videos), 'has_videos': len(videos) > 0, 'first_video_time': None, 'last_video_time': None } if videos: # 按时间排序 sorted_videos = sorted(videos, key=lambda x: x.create_time) report['user_details'][user_id]['first_video_time'] = datetime.fromtimestamp( sorted_videos[0].create_time ).isoformat() if sorted_videos[0].create_time > 0 else None report['user_details'][user_id]['last_video_time'] = datetime.fromtimestamp( sorted_videos[-1].create_time ).isoformat() if sorted_videos[-1].create_time > 0 else None # 保存报告 report_file = self.output_dir / f"collection_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" with open(report_file, 'w', encoding='utf-8') as f: json.dump(report, f, ensure_ascii=False, indent=2) self.monitor.logger.info(f"汇总报告已保存到 {report_file}") # 打印摘要 print(f"\n采集完成!") print(f"处理用户数: {total_users}") print(f"成功获取视频的用户: {users_with_videos}") print(f"采集视频总数: {total_videos}") if users_with_videos > 0: print(f"平均每个用户视频数: {total_videos/users_with_videos:.1f}") def run_from_file(self, input_file: str) -> None: """ 从文件读取用户ID并运行采集 Args: input_file: 输入文件路径(每行一个用户ID) """ try: with open(input_file, 'r', encoding='utf-8') as f: user_ids = [line.strip() for line in f if line.strip()] if not user_ids: self.monitor.logger.error("输入文件为空") return self.monitor.logger.info(f"从文件读取到 {len(user_ids)} 个用户ID") # 分批处理 batch_size = self.config.get('batch_size', 5) for i in range(0, len(user_ids), batch_size): batch = user_ids[i:i + batch_size] self.monitor.logger.info(f"处理批次 {i//batch_size + 1}/{(len(user_ids)-1)//batch_size + 1}") self.collect_multiple_users(batch) # 批次间延迟 if i + batch_size < len(user_ids): batch_delay = self.config.get('batch_delay', 5.0) time.sleep(batch_delay) # 保存监控报告 self.monitor.save_metrics_report(str(self.output_dir / "metrics_report.json")) self.monitor.print_summary() except FileNotFoundError: self.monitor.logger.error(f"输入文件不存在: {input_file}") except Exception as e: self.monitor.logger.error(f"运行失败: {e}") def main(): """主函数""" parser = argparse.ArgumentParser(description='TikTok用户视频采集系统') parser.add_argument('--config', '-c', default='config.json', help='配置文件路径') parser.add_argument('--input', '-i', help='用户ID列表文件路径') parser.add_argument('--user', '-u', help='单个用户ID') parser.add_argument('--max-videos', '-m', type=int, default=100, help='每个用户最大视频数') parser.add_argument('--output-dir', '-o', default='data', help='输出目录') args = parser.parse_args() # 创建采集器 collector = TikTokDataCollector(args.config) # 覆盖配置 if args.output_dir: collector.output_dir = Path(args.output_dir) collector.output_dir.mkdir(parents=True, exist_ok=True) # 运行采集 if args.input: collector.run_from_file(args.input) elif args.user: videos = collector.collect_user_videos(args.user, args.max_videos) print(f"采集到 {len(videos)} 个视频") else: print("请指定输入文件(--input)或单个用户ID(--user)") sys.exit(1) if __name__ == "__main__": main() ``` 这个完整的系统提供了从配置加载、数据采集、错误处理、监控日志到数据导出的完整功能。你可以通过命令行参数来控制采集行为,也可以修改配置文件来调整各种参数。 ## 6. 配置文件和运行示例 创建一个配置文件 `config.json`: ```json { "session_cookie": "你的TikTok会话Cookie(可选)", "timeout": 30, "max_retries": 3, "output_dir": "./tiktok_data", "log_file": "./logs/tiktok_collector.log", "max_videos_per_user": 100, "batch_size": 5, "request_delay": 1.5, "batch_delay": 10.0, "use_sqlite": true } ``` 创建一个用户ID列表文件 `users.txt`: ``` MS4wLjABAAAAv7iBp6J3b9Qd5J6Q5J6Q5J6Q5J6Q5J6Q5J6Q5J6Q5J6Q5J6Q5J6Q5J6Q5J6Q5 MS4wLjABAAAAAnotherExampleUserIDHere1234567890 MS4wLjABAAAAYetAnotherExampleUserID9876543210 ``` 运行采集系统: ```bash # 从文件批量采集 python tiktok_collector.py --input users.txt --config config.json # 采集单个用户 python tiktok_collector.py --user MS4wLjABAAAAv7iBp6J3b9Qd5J6Q5J6Q5J6Q5J6Q5J6Q5J6Q5J6Q5J6Q5J6Q5J6Q5J6Q5 --max-videos 50 # 指定输出目录 python tiktok_collector.py --input users.txt --output-dir ./my_tiktok_data ``` 系统运行后,会在指定的输出目录中为每个用户创建子目录,保存JSON和CSV格式的数据,同时生成详细的采集报告和监控日志。 ## 7. 实际应用中的注意事项与最佳实践 在真实的生产环境中使用这样的系统时,有几个关键点需要特别注意: ### 7.1 遵守平台规则与法律合规 **速率限制**:TikTok和其他社交媒体平台都有严格的速率限制。过于频繁的请求会导致IP被封禁。建议: - 为每个请求添加随机延迟(1-3秒) - 实现指数退避的重试机制 - 监控速率限制响应,动态调整请求频率 **数据使用条款**:仔细阅读TikTok的服务条款,确保你的数据使用方式符合规定。特别是: - 不要大规模采集个人隐私数据 - 尊重用户的隐私设置 - 不要将数据用于垃圾邮件、骚扰或其他不当用途 ### 7.2 数据存储与处理优化 对于大规模数据采集,考虑以下优化策略: ```python # 使用数据库连接池 from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker from sqlalchemy.pool import QueuePool class DatabaseManager: """数据库管理器""" def __init__(self, db_url: str): self.engine = create_engine( db_url, poolclass=QueuePool, pool_size=10, max_overflow=20, pool_timeout=30, pool_recycle=3600 ) self.Session = sessionmaker(bind=self.engine) def batch_insert_videos(self, videos: List[TikTokVideo]): """批量插入视频数据""" session = self.Session() try: # 使用批量插入 video_dicts = [video.to_dict() for video in videos] # 使用SQLAlchemy Core进行批量插入,性能更好 from sqlalchemy import Table, MetaData metadata = MetaData() videos_table = Table('tiktok_videos', metadata, autoload_with=self.engine) # 分批插入,每批100条 batch_size = 100 for i in range(0, len(video_dicts), batch_size): batch = video_dicts[i:i + batch_size] session.execute(videos_table.insert(), batch) session.commit() except Exception as e: session.rollback() raise e finally: session.close() ``` ### 7.3 错误恢复与数据完整性 实现检查点机制,确保在程序崩溃或网络中断后能够恢复: ```python import pickle from pathlib import Path class CheckpointManager: """检查点管理器""" def __init__(self, checkpoint_dir: str = "checkpoints"): self.checkpoint_dir = Path(checkpoint_dir) self.checkpoint_dir.mkdir(exist_ok=True) def save_checkpoint(self, user_id: str, cursor: int, videos: List[TikTokVideo]): """保存检查点""" checkpoint_data = { 'user_id': user_id, 'cursor': cursor, 'videos': [video.to_dict() for video in videos], 'timestamp': datetime.now().isoformat() } checkpoint_file = self.checkpoint_dir / f"{user_id}_checkpoint.pkl" with open(checkpoint_file, 'wb') as f: pickle.dump(checkpoint_data, f) def load_checkpoint(self, user_id: str) -> Optional[Dict]: """加载检查点""" checkpoint_file = self.checkpoint_dir / f"{user_id}_checkpoint.pkl" if checkpoint_file.exists(): with open(checkpoint_file, 'rb') as f: return pickle.load(f) return None def clear_checkpoint(self, user_id: str): """清除检查点""" checkpoint_file = self.checkpoint_dir / f"{user_id}_checkpoint.pkl" if checkpoint_file.exists(): checkpoint_file.unlink() ``` ### 7.4 监控与告警 在生产环境中,需要设置监控和告警: ```python import smtplib from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart class AlertManager: """告警管理器""" def __init__(self, config: Dict): self.config = config def send_alert(self, subject: str, message: str, level: str = "ERROR"): """发送告警""" if level not in self.config.get('alert_levels', ['ERROR

创作声明:本文部分内容由AI辅助生成(AIGC),仅供参考

Python内容推荐

TikTokAPI-Python:TikTok API Python包装器

TikTokAPI-Python:TikTok API Python包装器

Python中的非官方TikTok API包装器这是一个非官方的TikTok Api python包装器。 我有一个使用此API的应用程序,因此将不断更新此包装器此实现受启发,但运行速度更快。目录通过

一个基于Python的TikTok语音文本转语音工具项目_极简说明是使用Python编程语言开发的简单程序能够将用户输入的文本内容转换为TikTok平台上的多种特色语音并输出为.zip

一个基于Python的TikTok语音文本转语音工具项目_极简说明是使用Python编程语言开发的简单程序能够将用户输入的文本内容转换为TikTok平台上的多种特色语音并输出为.zip

通过使用Python编程语言,开发者可以借助众多开源库和API来实现这一功能,例如Google的Text-to-Speech API、Amazon Polly、IBM Watson Text to Speech

Python库 | tiktokpy-0.7.3-py3-none-any.whl

Python库 | tiktokpy-0.7.3-py3-none-any.whl

**文档和示例**:一个完善的Python库通常会附带详细的使用文档和示例代码,帮助开发者快速上手并理解如何有效地使用`tiktokpy`。

非官方的抖音API包装在Python.zip

非官方的抖音API包装在Python.zip

另外,TikTok-Api_main.zip文件可能包含实际的Python库文件,即用于与抖音API通信的代码。

python大作业项目.zip

python大作业项目.zip

这可能涉及到API调用,如使用requests库访问OpenWeatherMap这样的服务。

手把手教你用Python开发抖音表白神器

手把手教你用Python开发抖音表白神器

**TikTok API**:虽然抖音没有公开的官方API供开发者使用,但我们可以利用网络爬虫技术抓取和分析抖音的相关数据。

python语言douyinshipin爬虫程序代码QZQ2.txt

python语言douyinshipin爬虫程序代码QZQ2.txt

数据获取方式:通过分析示例代码的URL和请求头,可以推断出爬虫程序是通过某个特定的接口获取视频数据,这涉及到对目标网站API的理解和使用。16.

FastAPI-Project:FastAPI是基于Web的高性能框架,可用于基于Python类型提示构建API

FastAPI-Project:FastAPI是基于Web的高性能框架,可用于基于Python类型提示构建API

**基于类型提示的参数解析**:FastAPI 利用 Python 的类型注解来定义 API 的输入和输出。这使得接口的定义更加清晰,同时在开发过程中提供强大的自动文档和代码补全功能。2.

自动语音识别 (ASR)、翻译的高性能异步 API(Python 源码)

自动语音识别 (ASR)、翻译的高性能异步 API(Python 源码)

Fast-Powerful-Whisper-AI-Services-API是一款用于自动语音识别 (ASR)、翻译的高性能异步 API。不需要购买Whisper API,使用本地运行的Whisper模

【Python编程】Python内存管理与垃圾回收机制

【Python编程】Python内存管理与垃圾回收机制

内容概要:本文深入剖析Python的内存管理架构,重点对比引用计数、标记清除、分代回收三种垃圾回收策略的协作机制与性能影响。文章从PyObject结构体的引用计数字段出发,详解循环引用的检测与打破策略、__del__析构方法的调用时机与陷阱、以及weakref弱引用在缓存设计中的应用。通过代码示例展示gc模块的手动回收控制、对象阈值调整、以及循环引用链的调试技巧,同时介绍内存池(pymalloc)对小对象分配的优化、大对象的直接mmap分配策略、以及tracemalloc的内存泄漏追踪能力,最后给出在长时间运行服务、大数据处理、游戏开发等场景下的内存优化建议与对象生命周期管理策略。 24直播网:www.yitevip.com 24直播网:www.xzxinlukeji.com 24直播网:www.xnpls.com 24直播网:www.gdhccc.com 24直播网:www.jssg929.com

【Python编程】Python类型提示与静态类型检查实践

【Python编程】Python类型提示与静态类型检查实践

内容概要:本文系统讲解Python类型注解(PEP 484)的技术体系,重点对比typing模块的泛型、联合类型、可选类型与Python 3.10+内置类型语法的演进差异。文章从mypy静态检查器的工作原理出发,深入分析TypeVar泛型参数约束、Generic基类的自定义泛型、Protocol结构子类型(鸭子类型)的接口定义。通过代码示例展示Callable回调类型、TypedDict结构化字典、NamedTuple命名元组的类型安全用法,同时介绍Pydantic的运行时数据校验、dataclasses的自动类型推断、以及overload函数重载在类型 narrowing 中的应用,最后给出在大型项目、API契约、团队协作等场景下的类型系统落地策略与渐进式迁移方案。 24直播网:slzy120.com 24直播网:xstit.com 24直播网:cqylqxsc.cn 24直播网:m.dingdongda.cn 24直播网:m.ym56park.com

【Python编程】Python虚拟环境与依赖管理方案

【Python编程】Python虚拟环境与依赖管理方案

内容概要:本文深入对比Python虚拟环境管理工具的技术特性,重点分析venv、virtualenv、conda、pipenv、poetry在环境隔离、依赖解析、锁定机制上的差异。文章从site-packages路径隔离原理出发,详解pip的requirements.txt语义、pipenv的Pipfile.lock确定性安装、以及poetry的pyproject.toml标准配置。通过代码示例展示conda的多语言包管理能力、pyenv的Python版本切换、以及docker在部署环境的一致性保证,同时介绍pip-tools的依赖编译工作流、renovate/dependabot的自动更新策略、以及私有PyPI仓库的搭建方案,最后给出在团队协作、生产部署、科学计算等场景下的环境管理最佳实践与可复现构建策略。 24直播网:qxnwomen.org.cn 24直播网:anesthesiology.org.cn 24直播网:m.laicaitrading.com 24直播网:m.hncsjgmy.com 24直播网:hdyuguang.net.cn

【Python编程】Python异常处理与自定义异常体系

【Python编程】Python异常处理与自定义异常体系

内容概要:本文深入探讨Python异常处理的完整机制,重点对比try-except-else-finally结构、异常捕获的粒度控制、异常链(exception chaining)与上下文管理。文章从异常类继承体系出发,详解BaseException与Exception的区别、内置异常类型的适用场景,以及raise from语法在异常转换中的追溯保留。通过代码示例展示contextlib模块的上下文管理器简化写法、suppress上下文的静默处理模式,同时介绍warnings模块的非致命告警机制、日志记录与异常信息的整合策略,最后给出在资源释放、事务回滚、API错误封装等场景下的异常处理最佳实践与反模式规避。 24直播网:m.jswoodfloor.com 24直播网:hztfzs.com 24直播网:m.gongshaguo.com 24直播网:heshengzou.com 24直播网:hnyyyl.com

基于多动作深度强化学习的柔性车间调度研究(Python代码实现)

基于多动作深度强化学习的柔性车间调度研究(Python代码实现)

内容概要:本文围绕“基于多动作深度强化学习的柔性车间调度研究”展开,结合Python代码实现,提出了一种面向复杂生产环境的智能调度解决方案。通过构建多动作深度强化学习框架,模型能够在同一决策时刻协同处理工序选择与机器分配等多个操作,有效提升调度系统的灵活性与效率。研究针对柔性作业车间调度问题(FJSP),系统设计了适配的任务状态空间、多维动作空间及精细化奖励函数,利用深度神经网络逼近策略函数,实现了对动态、不确定制造环境的自适应响应。文中配套提供了完整的Python代码实现方案,涵盖环境建模、智能体训练与调度结果可视化等环节,具备良好的可复现性与工程应用价值。; 适合人群:具备Python编程能力,掌握强化学习基本理论,从事智能制造、工业工程、自动化控制、运筹优化等相关领域的硕士/博士研究生、科研人员及企业研发工程师。; 使用场景及目标:① 解决传统启发式或数学规划方法难以应对的高维度、动态演化车间调度难题;② 掌握深度强化学习在生产调度中的建模方法与技术路径,推动智能工厂与工业4.0落地;③ 作为高水平学术论文复现、科研项目开发或课程实践的技术支撑资源。; 阅读建议:建议读者结合代码逐模块剖析算法实现细节,重点理解状态特征编码、多动作输出结构与奖励机制的设计逻辑,并在不同规模的标准算例上进行实验验证与参数调优,以深入掌握模型的泛化能力与改进潜力。

Tiktok个人详情demo.7z

Tiktok个人详情demo.7z

test.py" 是一个Python源代码文件,很可能包含了实现TikTok个人详情提取逻辑的代码。这个文件可能包含了函数调用来获取用户信息,如使用API请求,或者通过解析网页源代码来抓取信息。

tiktokpy:自动TikTok互动的工具

tiktokpy:自动TikTok互动的工具

TikTokPy是一个用于自动化TikTok社交互动的Python工具,支持点赞、关注、浏览用户动态和热门视频等功能。通过简单的API调用实现批量操作,适用于提升账号曝光与粉丝增长。项目基于异步编程模

tiktok.com:tiktok.com的源代码*

tiktok.com:tiktok.com的源代码*

因此,这个压缩包可能包含了TikTok网站的完整源代码结构,包括HTML、CSS、JavaScript、服务器端脚本、配置文件等。【相关知识点】1.

tiktok

tiktok

API接口开发:Flask或Django等Python Web框架可用于快速开发API接口,实现不同服务间的交互和数据交换。三、TikTok的数据驱动策略1.

TikTok商品数据采集教程[代码]

TikTok商品数据采集教程[代码]

教程提供了Python代码示例,详细地展示了如何通过API获取数据,并将其转换为易于处理的DataFrame格式。

meelement_tiktok-api-1_255716_1771733585991.zip

meelement_tiktok-api-1_255716_1771733585991.zip

API对于第三方开发者具有重要意义,它能使得开发者在不深入了解底层技术细节的情况下,通过接口调用的方式实现特定的功能。

最新推荐最新推荐

recommend-type

Tiktok个人详情demo.7z

test.py" 是一个Python源代码文件,很可能包含了实现TikTok个人详情提取逻辑的代码。这个文件可能包含了函数调用来获取用户信息,如使用API请求,或者通过解析网页源代码来抓取信息。
recommend-type

TikTokAPI-Python:TikTok API Python包装器

Python中的非官方TikTok API包装器这是一个非官方的TikTok Api python包装器。 我有一个使用此API的应用程序,因此将不断更新此包装器此实现受启发,但运行速度更快。目录通过
recommend-type

tiktokpy:自动TikTok互动的工具

TikTokPy是一个用于自动化TikTok社交互动的Python工具,支持点赞、关注、浏览用户动态和热门视频等功能。通过简单的API调用实现批量操作,适用于提升账号曝光与粉丝增长。项目基于异步编程模
recommend-type

tiktok.com:tiktok.com的源代码*

因此,这个压缩包可能包含了TikTok网站的完整源代码结构,包括HTML、CSS、JavaScript、服务器端脚本、配置文件等。【相关知识点】1.
recommend-type

tiktok

API接口开发:Flask或Django等Python Web框架可用于快速开发API接口,实现不同服务间的交互和数据交换。三、TikTok的数据驱动策略1.
recommend-type

学生成绩管理系统C++课程设计与实践

资源摘要信息:"学生成绩信息管理系统-C++(1).doc" 1. 系统需求分析与设计 在进行学生成绩信息管理系统开发前,首先需要进行系统需求分析,这是确定系统开发目标与范围的过程。需求分析应包括数据需求和功能需求两个方面。 - 数据需求分析: - 学生成绩信息:需要收集学生的姓名、学号、课程成绩等数据。 - 数据类型和长度:明确每个数据项的数据类型(如字符串、整型等)和长度,例如学号可能是字符串类型且长度为一定值。 - 描述:详细描述每个数据项的意义,以确保系统能够准确处理。 - 功能需求分析: - 列出功能列表:用户界面应提供清晰的操作指引,列出所有可用功能。 - 查询学生成绩:系统应能通过学号或姓名查询学生的成绩信息。 - 增加学生成绩信息:允许用户添加未保存的学生成绩信息。 - 删除学生成绩信息:能够通过学号或姓名删除已经保存的成绩信息。 - 修改学生成绩信息:通过学号或姓名修改已有的成绩记录。 - 退出程序:提供安全退出程序的选项,并确保所有修改都已保存。 2. 系统设计 系统设计阶段主要完成内存数据结构设计、数据文件设计、代码设计、输入输出设计、用户界面设计和处理过程设计。 - 内存数据结构设计: - 使用链表结构组织内存中的数据,便于动态增删查改操作。 - 数据文件设计: - 选择文本文件存储数据,便于查看和编辑。 - 代码设计: - 根据功能需求,编写相应的函数和模块。 - 输入输出设计: - 设计简洁明了的输入输出提示信息和操作流程。 - 用户界面设计: - 用户界面应为字符界面,方便在命令行环境下使用。 - 处理过程设计: - 设计数据处理流程,确保每个操作都有明确的处理逻辑。 3. 系统实现与测试 实现阶段需要根据设计阶段的成果编写程序代码,并进行系统测试。 - 程序编写: - 完成系统设计中所有功能的程序代码编写。 - 系统测试: - 设计测试用例,通过测试用例上机测试系统。 - 记录测试方法和测试结果,确保系统稳定可靠。 4. 设计报告撰写 最后,根据系统开发的各个阶段,撰写详细的设计报告。 - 系统描述:包括问题说明、数据需求和功能需求。 - 系统设计:详细记录内存数据结构设计、数据文件设计、代码设计、输入/输出设计、用户界面设计、处理过程设计。 - 系统测试:包括测试用例描述、测试方法和测试结果。 - 设计特点、不足、收获和体会:反思整个开发过程,总结经验和教训。 时间安排: - 第19周(7月12日至7月16日)完成项目。 - 7月9日8:00到计算机学院实验中心(三楼)提交程序和课程设计报告。 指导教师和系主任(或责任教师)需要在文档上签名确认。 系统需求分析: - 使用表格记录系统需求分析的结果,包括数据项、数据类型、数据长度和描述。 - 分析数据项如学生成绩信息、状态器、链表节点等,确定其属性和行为。 以上就是文档中提到的学生成绩信息管理系统开发的关键知识点。开发此类系统需要熟练掌握C++编程基础,了解面向对象的程序设计思想,以及熟悉文件操作和链表等数据结构的应用。此外,良好的软件开发流程意识、测试意识和文档撰写能力也是必不可少的。
recommend-type

别再手动拖拽了!用Lumerical脚本批量创建FDTD仿真结构(附完整代码)

# 告别低效建模:Lumerical脚本自动化实战指南 在光子学仿真领域,时间就是科研生命线。当同行还在GUI界面里反复点击菜单时,你已经用脚本批量生成了20组参数化结构——这不是未来场景,而是每位FDTD用户都应该掌握的基础生产力革命。本文将彻底改变你与Lumerical的交互方式,从手动拖拽的农耕时代,跃迁到自动化建模的工业文明。 ## 1. 为何脚本建模是必然选择 2019年Nature Photonics的一项研究显示,科研工作者在仿真工具上平均浪费37%的时间在重复性操作上。对于需要参数扫描的纳米光学结构设计,这个数字可能更高。手动创建10个不同尺寸的纳米柱阵列意味着: -
recommend-type

Java邮件解析任务中,如何安全高效地提取HTML邮件内容并避免硬编码、资源泄漏和类型转换异常?

<think>我们被要求优化一段Java代码,该代码用于处理邮件(特别是来自特定发件人的构建通知邮件)。代码的主要问题包括: 1. 重复获取邮件内容:在检查MIME类型后,多次调用`msg[i].getContent()`,这可能导致性能问题或流关闭异常。 2. 类型转换问题:直接将邮件内容转换为`Multipart`而不进行类型检查,可能引发`ClassCastException`。 3. 代码结构问题:逻辑嵌套过深,可读性差,且存在重复代码(如插入邮件详情的操作在两个地方都有)。 4. 硬编码和魔法值:例如在解析HTML表格时使用了硬编码的索引(如list3.get(10)),这容易因邮件
recommend-type

RH公司应收账款管理优化策略研究

资源摘要信息:"本文针对RH公司的应收账款管理问题进行了深入研究,并提出了改进策略。文章首先分析了应收账款在企业管理中的重要性,指出其对于提高企业竞争力、扩大销售和充分利用生产能力的作用。然后,以RH公司为例,探讨了公司应收账款管理的现状,并识别出合同管理、客户信用调查等方面的不足。在此基础上,文章提出了一系列改善措施,包括完善信用政策、改进业务流程、加强信用调查和提高账款回收力度。特别强调了建立专门的应收账款回收部门和流程的重要性,并建议在实际应用过程中进行持续优化。同时,文章也意识到企业面临复杂多变的内外部环境,因此提出的策略需要根据具体情况调整和优化。 针对财务管理领域的专业学生和从业者,本文提供了一个关于应收账款管理问题的案例研究,具有实际指导意义。文章还探讨了信用管理和征信体系在应收账款管理中的作用,强调了它们对于提升企业信用风险控制和市场竞争能力的重要性。通过对比国内外企业在应收账款管理上的差异,文章总结了适合中国企业实际环境的应收账款管理方法和策略。" 根据提供的文件内容,以下是详细的知识点: 1. 应收账款管理的重要性:应收账款作为企业的一项重要资产,其有效管理关系到企业的现金流、财务健康以及市场竞争力。不良的应收账款管理会导致资金链断裂、坏账损失增加等问题,严重影响企业的正常运营和长远发展。 2. 应收账款的信用风险:在信用交易日益频繁的商业环境中,企业必须对客户信用进行评估,以便采取合理的信用政策,降低信用风险。 3. 合同管理的薄弱环节:合同是应收账款管理的法律基础,严格的合同管理能够保障企业权益,减少因合同问题导致的应收账款风险。 4. 客户信用调查:了解客户的信用状况对于预测和控制应收账款风险至关重要。企业需要建立有效的客户信用调查机制,识别和筛选信用良好的客户。 5. 应收账款回收策略:企业应建立有效的账款回收机制,包括定期的账款跟进、逾期账款的催收等。同时,建立专门的应收账款回收部门可以提升回收效率。 6. 应收账款管理流程优化:通过改进企业内部管理流程,如简化审批流程、提高工作效率等措施,能够提升应收账款的管理效率。 7. 应收账款管理策略的调整和优化:由于企业的内外部环境复杂多变,因此制定的管理策略需要根据实际情况进行动态调整和持续优化。 8. 信用管理和征信体系的作用:建立和完善企业内部信用管理体系和征信体系,有助于企业更好地控制信用风险,并在市场竞争中占据有利地位。 9. 对比国内外应收账款管理实践:通过研究国内外企业在应收账款管理上的不同做法和经验,可以借鉴先进的管理理念和方法,提升国内企业的应收账款管理水平。 综上所述,本文深入探讨了应收账款管理的多个方面,为RH公司乃至其他同类型企业提供了应收账款管理的改进方向和策略,对于财务管理专业的教育和实践都具有重要的参考价值。
recommend-type

新手别慌!用BingPi-M2开发板带你5分钟搞懂Tina Linux SDK目录结构

# 新手别慌!用BingPi-M2开发板带你5分钟搞懂Tina Linux SDK目录结构 第一次拿到BingPi-M2开发板时,面对Tina Linux SDK里密密麻麻的文件夹,我完全不知道从哪下手。就像走进一个陌生的大仓库,每个货架上都堆满了工具和零件,却找不到操作手册。这种困惑持续了整整两天,直到我意识到——理解目录结构比死记硬背每个文件更重要。 ## 1. 为什么SDK目录结构如此重要 想象你正在组装一台复杂的模型飞机。如果所有零件都混在一个箱子里,你需要花大量时间寻找每个螺丝和面板。但如果有分门别类的隔层,标注着"机身部件"、"电子设备"、"紧固件",组装效率会成倍提升。Ti