Python实现多个文档同时上传主要涉及Web应用、FTP传输和自动化测试三大场景,每种场景都有特定的实现方法。以下是三种主要场景的完整实现方案对比:
| 场景 | 核心技术 | 适用场景 | 关键特性 |
|------|---------|---------|---------|
| Web应用上传 | Flask框架 | Web表单多文件上传 | 支持批量选择、类型验证、大小限制 |
| FTP传输 | ftplib模块 | 服务器文件传输 | 支持目录递归上传、中文编码处理 |
| 自动化上传 | Selenium | Web自动化测试 | 支持input和非input标签上传 |
## 1. Flask Web应用多文件上传
### 1.1 基础配置和设置
```python
from flask import Flask, request, render_template
import os
from werkzeug.utils import secure_filename
app = Flask(__name__)

# Upload settings read by the routes and helpers below.
app.config['UPLOAD_FOLDER'] = 'uploads/'
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024  # 16 MB cap on the request body
app.config['ALLOWED_EXTENSIONS'] = {'txt', 'pdf', 'doc', 'docx', 'xls', 'xlsx'}

# Make sure the upload directory exists. exist_ok=True replaces the previous
# "check, then create" sequence, which could fail when two processes started
# at the same time and both saw the directory as missing.
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
def allowed_file(filename):
    """Return True when *filename* has an extension listed in ALLOWED_EXTENSIONS.

    The extension is whatever follows the last dot, compared
    case-insensitively. A name without any dot is rejected.
    """
    _, dot, extension = filename.rpartition('.')
    if not dot:
        return False
    return extension.lower() in app.config['ALLOWED_EXTENSIONS']
```
### 1.2 多文件上传接口实现
```python
@app.route('/upload', methods=['GET', 'POST'])
def upload_files():
    """Serve the upload form (GET) or store the submitted files (POST).

    POST expects a multipart form whose file input is named ``files``.
    Every accepted file is written to UPLOAD_FOLDER under a random prefix.

    Returns:
        For POST, a dict with the counts and details of uploaded and
        rejected files. For GET, the HTML upload form.
    """
    if request.method == 'POST':
        # The HTML <input> must be named "files".
        files = request.files.getlist('files')
        uploaded_files = []
        failed_files = []
        for file in files:
            if not file or not allowed_file(file.filename):
                failed_files.append(file.filename if file else '空文件')
                continue

            # Sanitise the client-supplied name before using it on disk.
            filename = secure_filename(file.filename)
            # secure_filename() drops non-ASCII characters, so a name such as
            # "文档.pdf" collapses to "pdf" and loses its extension. Re-attach
            # the extension that allowed_file() has already validated.
            extension = file.filename.rsplit('.', 1)[1].lower()
            if not filename.lower().endswith(f'.{extension}'):
                filename = f'upload.{extension}'

            # A random prefix keeps same-named uploads from overwriting each other.
            unique_filename = f"{os.urandom(8).hex()}_{filename}"
            filepath = os.path.join(app.config['UPLOAD_FOLDER'], unique_filename)
            file.save(filepath)
            uploaded_files.append({
                'original_name': file.filename,
                'saved_name': unique_filename,
                'size': os.path.getsize(filepath)
            })
        return {
            'success': True,
            'uploaded': len(uploaded_files),
            'failed': len(failed_files),
            'uploaded_files': uploaded_files,
            'failed_files': failed_files
        }
    # GET: return the upload form.
    return '''
<!doctype html>
<html>
<head><title>多文件上传</title></head>
<body>
<h2>上传多个文档</h2>
<form method="post" enctype="multipart/form-data">
<input type="file" name="files" multiple>
<input type="submit" value="上传">
</form>
</body>
</html>
'''
```
### 1.3 高级功能:数据库记录和文件管理
```python
import sqlite3
from datetime import datetime
def init_db():
    """Create the ``uploaded_files`` table in ``uploads.db`` if it is missing.

    Safe to call repeatedly. The connection is closed even when the
    statement fails, so a schema error does not leak a handle.
    """
    conn = sqlite3.connect('uploads.db')
    try:
        cursor = conn.cursor()
        cursor.execute('''
CREATE TABLE IF NOT EXISTS uploaded_files (
id INTEGER PRIMARY KEY AUTOINCREMENT,
original_name TEXT NOT NULL,
saved_name TEXT NOT NULL,
file_size INTEGER,
upload_time TIMESTAMP,
file_type TEXT,
uploader_ip TEXT
)
''')
        conn.commit()
    finally:
        conn.close()
@app.route('/upload_with_db', methods=['POST'])
def upload_with_database():
    """Store the uploaded files and record each one in ``uploads.db``.

    Files that fail the extension check are skipped. The connection is
    always closed, and rows are committed once after the whole batch.

    Returns:
        A dict whose message reports how many files were actually saved.
    """
    files = request.files.getlist('files')
    uploader_ip = request.remote_addr
    saved_count = 0
    conn = sqlite3.connect('uploads.db')
    try:
        cursor = conn.cursor()
        for file in files:
            if not (file and allowed_file(file.filename)):
                continue
            filename = secure_filename(file.filename)
            uploaded_at = datetime.now()
            # The timestamp alone has one-second resolution, so two files with
            # the same name in one batch would overwrite each other. A short
            # random token keeps the stored names distinct.
            unique_filename = (
                f"{uploaded_at.strftime('%Y%m%d_%H%M%S')}_"
                f"{os.urandom(4).hex()}_{filename}"
            )
            filepath = os.path.join(app.config['UPLOAD_FOLDER'], unique_filename)
            file.save(filepath)
            # Record the upload.
            cursor.execute('''
INSERT INTO uploaded_files
(original_name, saved_name, file_size, upload_time, file_type, uploader_ip)
VALUES (?, ?, ?, ?, ?, ?)
''', (
                file.filename,
                unique_filename,
                os.path.getsize(filepath),
                # Same text the implicit sqlite3 datetime adapter produced,
                # without relying on that adapter (deprecated in Python 3.12).
                uploaded_at.isoformat(' '),
                # Use the original name: secure_filename() can strip a
                # non-ASCII stem and leave no dot to split on.
                file.filename.rsplit('.', 1)[1].lower(),
                uploader_ip
            ))
            saved_count += 1
        conn.commit()
    finally:
        conn.close()
    # Report the files actually saved, not the number submitted.
    return {'status': 'success', 'message': f'成功上传{saved_count}个文件'}
```
## 2. FTP多文件上传
### 2.1 基础FTP上传实现
```python
from ftplib import FTP
import os
class FTPUploader:
    """Upload single files, file lists, or directory trees over FTP."""

    def __init__(self, host, username, password, port=21):
        """Connect to *host*:*port* and log in with the given credentials."""
        self.ftp = FTP()
        try:
            self.ftp.connect(host, port)
            self.ftp.login(username, password)
        except Exception:
            # Do not leave a half-open socket behind when login fails.
            self.ftp.close()
            raise
        print(f"已连接到FTP服务器: {host}")

    def upload_single_file(self, local_path, remote_path):
        """Send the local file at *local_path* to *remote_path* in binary mode."""
        with open(local_path, 'rb') as file:
            self.ftp.storbinary(f'STOR {remote_path}', file)
        print(f"已上传: {local_path} -> {remote_path}")

    def upload_multiple_files(self, file_list):
        """Upload every ``(local_path, remote_path)`` pair in *file_list*.

        Pairs whose local path is not an existing file are reported and skipped.
        """
        for local_path, remote_path in file_list:
            if os.path.isfile(local_path):
                self.upload_single_file(local_path, remote_path)
            else:
                print(f"文件不存在: {local_path}")

    def upload_directory(self, local_dir, remote_dir):
        """Recursively upload *local_dir* into *remote_dir*.

        *remote_dir* is created when it cannot be entered. On return the
        connection's working directory is *remote_dir*.
        """
        from ftplib import error_perm

        # Enter the remote directory, creating it if the server refuses.
        try:
            self.ftp.cwd(remote_dir)
        except error_perm:
            self.ftp.mkd(remote_dir)
            self.ftp.cwd(remote_dir)

        for item in os.listdir(local_dir):
            local_path = os.path.join(local_dir, item)
            if os.path.isfile(local_path):
                self.upload_single_file(local_path, item)
            else:
                # The recursive call changes into the child directory. Step
                # back afterwards, otherwise the remaining siblings would be
                # uploaded into that child.
                parent = self.ftp.pwd()
                self.upload_directory(local_path, item)
                self.ftp.cwd(parent)

    def close(self):
        """Send QUIT and close the connection."""
        self.ftp.quit()
        print("FTP连接已关闭")
# Usage example
if __name__ == "__main__":
    # Open the FTP connection.
    ftp = FTPUploader('ftp.example.com', 'username', 'password')
    try:
        # Upload a list of files.
        files_to_upload = [
            ('/local/path/file1.pdf', '/remote/path/file1.pdf'),
            ('/local/path/file2.docx', '/remote/path/file2.docx'),
            ('/local/path/file3.xlsx', '/remote/path/file3.xlsx')
        ]
        ftp.upload_multiple_files(files_to_upload)
        # Upload a whole directory tree.
        ftp.upload_directory('/local/documents', '/remote/documents')
    finally:
        # Close the connection even when an upload fails.
        ftp.close()
```
### 2.2 处理中文编码问题
```python
class FTPUploaderWithEncoding(FTPUploader):
    """FTPUploader variant for servers that use a non-UTF-8 filename encoding."""

    def __init__(self, host, username, password, port=21, encoding='gbk'):
        """Connect, then switch the control-channel encoding to *encoding*.

        NOTE(review): the encoding is applied after login, so the user name
        and password are still sent with ftplib's default encoding.
        """
        super().__init__(host, username, password, port)
        self.encoding = encoding
        # ftplib encodes every command it sends with this attribute.
        self.ftp.encoding = encoding

    def upload_single_file(self, local_path, remote_path):
        """Upload a file whose remote name may contain Chinese characters."""
        # Pass the name through unchanged: ftplib already encodes the command
        # with self.ftp.encoding. The old encode(...).decode('latin-1') step
        # only suits a latin-1 control channel. Combined with the encoding set
        # above, the name was encoded twice and arrived garbled or raised
        # UnicodeEncodeError.
        with open(local_path, 'rb') as file:
            self.ftp.storbinary(f'STOR {remote_path}', file)
        print(f"已上传(中文): {local_path} -> {remote_path}")

    def list_directory(self, path='.'):
        """Return the raw ``LIST`` output lines for *path*."""
        files = []
        self.ftp.retrlines(f'LIST {path}', files.append)
        return files
```
## 3. Selenium自动化多文件上传
### 3.1 input标签的多文件上传
```python
from selenium import webdriver
from selenium.webdriver.common.by import By
import time
class SeleniumUploader:
    """Drive Chrome through Selenium to upload files to a web page."""

    def __init__(self, driver_path='chromedriver'):
        """Start Chrome using the chromedriver binary at *driver_path*."""
        from selenium.webdriver.chrome.service import Service

        try:
            # Selenium 4 takes the driver location through a Service object.
            # The ``executable_path`` keyword was removed in 4.10.
            self.driver = webdriver.Chrome(
                service=Service(executable_path=driver_path)
            )
        except TypeError:
            # Selenium 3 does not know the ``service`` keyword.
            self.driver = webdriver.Chrome(executable_path=driver_path)

    def upload_to_input(self, url, input_selector, file_paths):
        """Upload several files through an ``<input type="file">`` element.

        :param url: URL of the target page
        :param input_selector: CSS selector of the file input
        :param file_paths: list of file paths to upload
        :raises ValueError: if *file_paths* is empty
        """
        from selenium.common.exceptions import WebDriverException

        if not file_paths:
            raise ValueError("file_paths must contain at least one path")

        self.driver.get(url)
        # Locate the file input.
        file_input = self.driver.find_element(By.CSS_SELECTOR, input_selector)
        # Selenium takes multiple files as one newline-separated string on
        # every platform. The input needs the ``multiple`` attribute.
        file_input.send_keys("\n".join(file_paths))
        # Give the page a moment to register the selection.
        time.sleep(2)
        # Best effort: submit the form when a submit button is present.
        try:
            submit_button = self.driver.find_element(By.CSS_SELECTOR, "input[type='submit']")
            submit_button.click()
        except WebDriverException:
            # No usable submit button. The page may upload on selection.
            pass
        print(f"已上传 {len(file_paths)} 个文件")

    def upload_with_autoit(self, file_paths):
        """Fill a native file dialog by running an AutoIt script.

        Use this when the page opens the system file dialog instead of
        exposing an ``<input>`` element. The paths are joined with ``|`` and
        passed to ``upload_files.au3`` as its first argument.
        """
        # Companion AutoIt script (upload_files.au3). It builds one string of
        # quoted paths and sets the edit box once. Setting the box inside the
        # loop would keep only the last path.
        #
        #   Local $file_paths = $CmdLine[1]
        #   Local $files = StringSplit($file_paths, "|")
        #   ; wait for the file dialog
        #   WinWait("打开")
        #   WinActivate("打开")
        #   ; build: "path1" "path2" ...
        #   Local $joined = ""
        #   For $i = 1 To $files[0]
        #       $joined &= '"' & $files[$i] & '" '
        #   Next
        #   ControlSetText("打开", "", "Edit1", $joined)
        #   ; press the Open button
        #   ControlClick("打开", "", "Button1")
        import subprocess

        file_paths_str = "|".join(file_paths)
        subprocess.run(['autoit3.exe', 'upload_files.au3', file_paths_str])

    def close(self):
        """Quit the browser."""
        self.driver.quit()
# Usage example
if __name__ == "__main__":
    uploader = SeleniumUploader()
    try:
        # Upload several files to an input that supports ``multiple``.
        files = [
            r'C:\documents\report1.pdf',
            r'C:\documents\report2.docx',
            r'C:\documents\data.xlsx'
        ]
        uploader.upload_to_input(
            url='https://example.com/upload',
            input_selector='input[type="file"]',
            file_paths=files
        )
    finally:
        # Quit the browser even when the upload fails.
        uploader.close()
```
### 3.2 处理复杂上传场景
```python
class AdvancedSeleniumUploader(SeleniumUploader):
    """SeleniumUploader with helpers for drop zones and large files."""

    def upload_with_drag_drop(self, url, drop_zone_selector, file_paths):
        """Upload through a drop zone by clicking it and typing each path.

        Each path is pasted from the clipboard and confirmed with Enter.
        NOTE(review): assumes the click opens a native file dialog that has
        keyboard focus, and that Ctrl+V is the paste shortcut. Confirm for
        the target platform.
        """
        from selenium.webdriver.common.action_chains import ActionChains
        import pyautogui
        import pyperclip

        self.driver.get(url)
        drop_zone = self.driver.find_element(By.CSS_SELECTOR, drop_zone_selector)
        # Click the drop zone to activate it.
        ActionChains(self.driver).click(drop_zone).perform()
        time.sleep(1)
        for file_path in file_paths:
            # Copy the path to the clipboard, paste it, and confirm.
            pyperclip.copy(file_path)
            pyautogui.hotkey('ctrl', 'v')
            pyautogui.press('enter')
            time.sleep(0.5)

    def upload_large_files(self, url, input_selector, file_paths, chunk_size=1024*1024):
        """Walk each file in *chunk_size* pieces as a chunked-upload skeleton.

        The actual transfer of each chunk is left to the caller's interface.
        This method only reads the chunks and reports progress.
        *input_selector* is currently unused.
        """
        import os

        self.driver.get(url)
        for file_path in file_paths:
            file_size = os.path.getsize(file_path)
            # Ceiling division: the old ``size // chunk + 1`` counted one
            # extra, empty chunk for exact multiples and for empty files.
            chunks = -(-file_size // chunk_size)
            print(f"开始上传 {file_path},大小: {file_size} bytes,分 {chunks} 块")
            with open(file_path, 'rb') as f:
                for i in range(chunks):
                    chunk = f.read(chunk_size)
                    # Send ``chunk`` through the real upload interface here,
                    # usually a JavaScript or AJAX call.
                    print(f" 上传第 {i+1}/{chunks} 块")
                    time.sleep(0.5)
```
## 4. 综合应用示例:带进度显示的多文件上传
```python
import threading
import queue
from tqdm import tqdm
class MultiThreadUploader:
    """Run queued upload callables on a small pool of worker threads."""

    def __init__(self, max_workers=3):
        """Create an empty task queue served by up to *max_workers* threads."""
        self.max_workers = max_workers
        self.task_queue = queue.Queue()
        # Return values of the tasks that completed.
        self.results = []
        # (upload_func, args, kwargs, exception) for every task that raised.
        self.errors = []

    def add_task(self, upload_func, *args, **kwargs):
        """Queue ``upload_func(*args, **kwargs)`` for execution."""
        self.task_queue.put((upload_func, args, kwargs))

    def worker(self, progress_bar):
        """Drain the queue, running one task at a time.

        A failing task is recorded in ``self.errors`` and the worker moves on
        to the next task. *progress_bar* only needs an ``update(n)`` method.
        """
        while True:
            # Only the dequeue is guarded by queue.Empty, so an exception
            # raised inside a task is never mistaken for an empty queue.
            try:
                upload_func, args, kwargs = self.task_queue.get_nowait()
            except queue.Empty:
                break
            try:
                self.results.append(upload_func(*args, **kwargs))
            except Exception as exc:
                # Previously an error killed this worker and could strand the
                # remaining tasks.
                self.errors.append((upload_func, args, kwargs, exc))
            finally:
                progress_bar.update(1)
                self.task_queue.task_done()

    def start(self, total_tasks=None):
        """Run all queued tasks and return the list of their results.

        :param total_tasks: length of the progress bar. Defaults to the
            number of tasks currently queued.
        """
        if total_tasks is None:
            total_tasks = self.task_queue.qsize()
        threads = []
        with tqdm(total=total_tasks, desc="上传进度") as progress_bar:
            for _ in range(self.max_workers):
                thread = threading.Thread(target=self.worker, args=(progress_bar,))
                thread.start()
                threads.append(thread)
            # Wait for every worker to finish.
            for thread in threads:
                thread.join()
        return self.results
# 使用示例
def flask_upload_task(file_path, upload_url, timeout=60):
    """POST one file to the Flask upload endpoint and return its JSON reply.

    :param file_path: local file to send under the form field ``files``
    :param upload_url: URL of the upload endpoint
    :param timeout: seconds to wait for the server. Without a timeout,
        ``requests`` waits indefinitely and a stalled server would block the
        worker thread forever.
    """
    import requests

    with open(file_path, 'rb') as f:
        files = {'files': f}
        response = requests.post(upload_url, files=files, timeout=timeout)
    return response.json()
if __name__ == "__main__":
    # Build the uploader and queue one task per document.
    uploader = MultiThreadUploader(max_workers=5)
    files_to_upload = [
        'document1.pdf',
        'document2.docx',
        'document3.xlsx',
        'document4.txt',
        'document5.pdf'
    ]
    upload_url = 'http://localhost:5000/upload'
    for path in files_to_upload:
        uploader.add_task(flask_upload_task, path, upload_url)
    # Run the uploads in parallel and count the replies that report success.
    results = uploader.start(len(files_to_upload))
    succeeded = len([reply for reply in results if reply.get('success')])
    print(f"上传完成,成功: {succeeded}")
```
## 5. 最佳实践建议
### 5.1 安全性考虑
```python
class SecureUploader:
    """Content-based validation for uploaded files."""

    # MIME type reported by libmagic -> canonical extension.
    ALLOWED_MIMES = {
        'application/pdf': 'pdf',
        'application/msword': 'doc',
        'application/vnd.openxmlformats-officedocument.wordprocessingml.document': 'docx',
        'application/vnd.ms-excel': 'xls',
        'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet': 'xlsx',
        'text/plain': 'txt'
    }

    @staticmethod
    def validate_file(file_stream, max_size=10*1024*1024):
        """Check an uploaded stream's size and sniffed MIME type.

        :param file_stream: seekable binary stream; left at position 0
        :param max_size: largest accepted size in bytes
        :return: ``(True, extension)`` when accepted, otherwise
            ``(False, reason)``
        """
        # Measure the size by seeking to the end, then rewind.
        file_stream.seek(0, 2)
        size = file_stream.tell()
        file_stream.seek(0)
        if size > max_size:
            return False, "文件过大"

        # Imported only when content sniffing is needed (python-magic).
        import magic

        # python-magic recommends sniffing at least the first 2048 bytes.
        # Shorter buffers can misidentify some formats.
        header = file_stream.read(2048)
        file_stream.seek(0)
        mime_type = magic.from_buffer(header, mime=True)
        extension = SecureUploader.ALLOWED_MIMES.get(mime_type)
        if extension is None:
            return False, "不支持的文件类型"
        return True, extension
```
### 5.2 错误处理和重试机制
```python
class ResilientUploader:
def upload_with_retry(self, upload_func, file_path, max_retries=3):
"""带重试机制的上传"""
import time
for attempt in range(max_retries):
try:
result = upload_func(file_path)
return result
except Exception as e:
if attempt == max_retries - 1:
raise e
print(f"上传失败,{max_retries-attempt-1}次重试