# PP-DocLayoutV3实操手册:从Swagger文档自动生成Python调用SDK的方法
## 1. 引言
如果你正在处理文档数字化项目,比如要把一堆扫描的合同、论文或者报告变成结构化的数据,那你肯定遇到过这样的问题:文档里的文字、表格、图片都混在一起,直接扔给OCR工具识别,结果往往乱七八糟。标题被当成正文,表格数据识别得一塌糊涂,图片区域更是直接被忽略。
这就是文档版面分析要解决的问题。简单来说,它就像给文档拍一张“X光片”,先把文档的“骨骼结构”看清楚——哪里是标题,哪里是正文,哪里是表格,哪里是图片。有了这个结构图,你再让OCR去识别每个区域的内容,准确率就能大幅提升。
PP-DocLayoutV3就是干这个活的专家。它是飞桨开源的一个专门分析文档版面的模型,能识别十几种不同的版面元素,并且给出精确的坐标位置。最近我在一个项目里用它处理了几百份扫描合同,效果确实不错。
但有个小麻烦:虽然它提供了标准的REST API(通过Swagger文档),但每次调用都要手动写HTTP请求,调试起来不太方便。特别是当你需要批量处理大量文档,或者要把这个功能集成到自己的系统里时,有个封装好的Python SDK会省事很多。
所以,今天我就来分享一个实用的方法:如何基于PP-DocLayoutV3的Swagger API文档,快速生成一个可以直接调用的Python SDK。这个方法不仅适用于PP-DocLayoutV3,对于其他提供标准OpenAPI/Swagger规范的AI服务也同样有效。
## 2. 环境准备与快速部署
### 2.1 部署PP-DocLayoutV3镜像
首先,你需要有一个正在运行的PP-DocLayoutV3服务。如果你还没有部署,可以按照以下步骤快速搭建:
1. **选择镜像**:在平台的镜像市场中搜索 `ins-doclayout-paddle33-v1`
2. **启动实例**:点击“部署”按钮,等待1-2分钟实例启动完成
3. **获取访问地址**:实例状态变为“已启动”后,记下你的实例IP地址
首次启动需要5-8秒加载模型到显存,这是正常现象。模型加载完成后,服务就准备好了。
### 2.2 验证服务状态
部署完成后,建议先通过Web界面快速验证服务是否正常:
```bash
# 访问WebUI(可视化界面)
# 在浏览器中打开:http://<你的实例IP>:7860
# 或者直接测试API
# 在浏览器中打开:http://<你的实例IP>:8000/docs
```
如果能看到Swagger API文档页面,说明服务运行正常。Swagger页面会显示所有可用的API接口、参数说明和请求示例,这是我们生成SDK的基础。
### 2.3 安装必要的Python工具
在开始生成SDK之前,确保你的本地环境安装了以下工具:
```bash
# 安装openapi-generator-cli(用于生成SDK)
# 这是一个Java工具,需要先安装Java 8或更高版本
# 对于Mac用户
brew install openapi-generator
# 对于Linux用户
sudo apt-get update
sudo apt-get install openjdk-11-jdk
wget https://repo1.maven.org/maven2/org/openapitools/openapi-generator-cli/6.6.0/openapi-generator-cli-6.6.0.jar -O openapi-generator-cli.jar
# 安装Python依赖(用于测试生成的SDK)
pip install requests pytest
```
如果你不想在本地安装Java环境,也可以使用在线的OpenAPI Generator,但本地工具会更方便一些。
## 3. 获取并解析Swagger文档
### 3.1 导出Swagger规范文件
PP-DocLayoutV3的API文档遵循OpenAPI 3.0规范,我们可以直接导出这个规范文件:
```python
import requests
import json
# PP-DocLayoutV3服务的地址(替换为你的实际IP)
BASE_URL = "http://你的实例IP:8000"
# 获取OpenAPI规范
def download_openapi_spec():
# 尝试不同的端点,FastAPI通常提供这些
endpoints = ["/openapi.json", "/docs/openapi.json", "/swagger.json"]
for endpoint in endpoints:
try:
response = requests.get(f"{BASE_URL}{endpoint}", timeout=10)
if response.status_code == 200:
spec = response.json()
# 保存到本地文件
with open("pp_doclayout_openapi.json", "w", encoding="utf-8") as f:
json.dump(spec, f, indent=2, ensure_ascii=False)
print(f"✅ OpenAPI规范已保存到 pp_doclayout_openapi.json")
print(f"📊 包含 {len(spec.get('paths', {}))} 个API端点")
return spec
except Exception as e:
print(f"尝试端点 {endpoint} 失败: {e}")
# 如果自动获取失败,手动从Swagger页面提取
print("⚠️ 无法自动获取OpenAPI规范,请手动操作:")
print("1. 访问 http://你的实例IP:8000/docs")
print("2. 在页面右上角找到 'Download' 或 'Export' 按钮")
print("3. 选择 'OpenAPI specification' 或 'JSON' 格式下载")
return None
# 执行下载
if __name__ == "__main__":
download_openapi_spec()
```
### 3.2 分析API结构
下载完OpenAPI规范后,我们先看看PP-DocLayoutV3提供了哪些接口:
```python
import json
def analyze_api_structure(spec_file="pp_doclayout_openapi.json"):
with open(spec_file, "r", encoding="utf-8") as f:
spec = json.load(f)
print("🔍 API结构分析结果:")
print("=" * 50)
# 基本信息
print(f"API标题: {spec.get('info', {}).get('title', '未知')}")
print(f"版本: {spec.get('info', {}).get('version', '未知')}")
print(f"描述: {spec.get('info', {}).get('description', '无描述')[:100]}...")
# 服务器信息
servers = spec.get('servers', [])
if servers:
print(f"服务器地址: {servers[0].get('url', '未知')}")
# 路径分析
paths = spec.get('paths', {})
print(f"\n📋 共发现 {len(paths)} 个API端点:")
for path, methods in paths.items():
print(f"\n路径: {path}")
for method, details in methods.items():
summary = details.get('summary', '无摘要')
operation_id = details.get('operationId', '未知')
print(f" {method.upper():6s} - {summary} (ID: {operation_id})")
# 显示参数
params = details.get('parameters', [])
if params:
print(" 参数:", ", ".join([p.get('name') for p in params]))
# 模型定义
schemas = spec.get('components', {}).get('schemas', {})
print(f"\n📦 数据模型 ({len(schemas)} 个):")
for name, schema in schemas.items():
props = schema.get('properties', {})
print(f" {name}: {len(props)} 个属性")
return spec
# 执行分析
analyze_api_structure()
```
运行这个脚本,你会看到类似这样的输出:
```
🔍 API结构分析结果:
==================================================
API标题: PP-DocLayoutV3 API
版本: 1.0.0
描述: PP-DocLayoutV3 文档版面分析模型的REST API接口...
服务器地址: http://localhost:8000
📋 共发现 3 个API端点:
路径: /analyze
POST - 分析文档版面 (ID: analyze_document)
参数: file
路径: /health
GET - 健康检查 (ID: health_check)
路径: /version
GET - 获取版本信息 (ID: get_version)
📦 数据模型 (4 个):
AnalysisRequest: 1 个属性
AnalysisResponse: 3 个属性
HealthResponse: 2 个属性
VersionResponse: 2 个属性
```
从分析结果可以看出,PP-DocLayoutV3主要提供了三个接口:
1. `/analyze` - 核心的文档分析接口
2. `/health` - 服务健康检查
3. `/version` - 获取版本信息
## 4. 自动生成Python SDK
### 4.1 使用OpenAPI Generator生成基础SDK
有了OpenAPI规范文件,我们现在可以用工具自动生成Python SDK:
```bash
# 使用openapi-generator生成Python客户端
# 如果你通过brew或apt安装了openapi-generator
openapi-generator generate \
-i pp_doclayout_openapi.json \
-g python \
-o pp_doclayout_client \
--package-name pp_doclayout_client \
--additional-properties=projectName=pp-doclayout-client,packageVersion=1.0.0
# 或者使用jar文件
java -jar openapi-generator-cli.jar generate \
-i pp_doclayout_openapi.json \
-g python \
-o pp_doclayout_client \
--package-name pp_doclayout_client
```
这个命令会生成一个完整的Python包,包含以下结构:
```
pp_doclayout_client/
├── README.md
├── requirements.txt
├── setup.py
├── pp_doclayout_client/
│ ├── __init__.py
│ ├── api_client.py
│ ├── configuration.py
│ ├── api/ # API接口类
│ │ ├── __init__.py
│ │ ├── default_api.py
│ │ └── ...
│ ├── models/ # 数据模型类
│ │ ├── __init__.py
│ │ ├── analysis_request.py
│ │ ├── analysis_response.py
│ │ └── ...
│ └── api_client.py
└── test/ # 测试文件
```
### 4.2 安装和测试生成的SDK
进入生成的目录,安装这个SDK包:
```bash
cd pp_doclayout_client
pip install -e .
```
现在我们可以写一个简单的测试脚本,验证SDK是否能正常工作:
```python
# test_sdk_basic.py
import sys
sys.path.append(".")
from pp_doclayout_client import ApiClient, Configuration
from pp_doclayout_client.api.default_api import DefaultApi
from pp_doclayout_client.models import AnalysisRequest
# 配置客户端
configuration = Configuration(
host="http://你的实例IP:8000" # 替换为你的实际地址
)
# 创建API客户端
with ApiClient(configuration) as api_client:
api_instance = DefaultApi(api_client)
# 测试健康检查
try:
health_response = api_instance.health_check()
print(f"✅ 服务健康状态: {health_response.status}")
print(f"🕒 服务启动时间: {health_response.timestamp}")
except Exception as e:
print(f"❌ 健康检查失败: {e}")
# 测试版本信息
try:
version_response = api_instance.get_version()
print(f"📦 模型版本: {version_response.model_version}")
print(f"🔧 API版本: {version_response.api_version}")
except Exception as e:
print(f"❌ 获取版本失败: {e}")
```
运行这个测试脚本,如果一切正常,你会看到类似这样的输出:
```
✅ 服务健康状态: healthy
🕒 服务启动时间: 2024-01-15T10:30:00Z
📦 模型版本: PP-DocLayoutV3-v1.0
🔧 API版本: 1.0.0
```
## 5. 定制化增强SDK功能
自动生成的SDK虽然能用,但通常比较基础。在实际使用中,我们可能需要添加一些便利功能。下面我来展示如何增强这个SDK。
### 5.1 创建更友好的客户端类
自动生成的SDK使用方式比较繁琐,我们可以封装一个更易用的客户端:
```python
# pp_doclayout_enhanced.py
import os
import time
from typing import Dict, List, Optional, Union, BinaryIO
from pathlib import Path
from pp_doclayout_client import ApiClient, Configuration
from pp_doclayout_client.api.default_api import DefaultApi
from pp_doclayout_client.models import AnalysisResponse
class PP_DocLayoutClient:
"""PP-DocLayoutV3增强版客户端"""
def __init__(self, base_url: str = "http://localhost:8000", timeout: int = 30):
"""
初始化客户端
Args:
base_url: PP-DocLayoutV3服务地址
timeout: 请求超时时间(秒)
"""
self.base_url = base_url.rstrip('/')
self.timeout = timeout
# 配置API客户端
config = Configuration(host=self.base_url)
self.api_client = ApiClient(configuration=config)
self.api_instance = DefaultApi(self.api_client)
# 验证连接
self._verify_connection()
def _verify_connection(self):
"""验证服务连接"""
try:
health = self.api_instance.health_check()
if health.status != "healthy":
raise ConnectionError(f"服务状态异常: {health.status}")
print(f"✅ 成功连接到 PP-DocLayoutV3 ({self.base_url})")
except Exception as e:
raise ConnectionError(f"无法连接到PP-DocLayoutV3服务: {e}")
def analyze_document(
self,
file_path: Union[str, Path, BinaryIO],
return_image: bool = False,
confidence_threshold: float = 0.5
) -> Dict:
"""
分析文档版面
Args:
file_path: 文档图片路径或文件对象
return_image: 是否返回标注图像
confidence_threshold: 置信度阈值(0.0-1.0)
Returns:
分析结果字典,包含区域信息和标注图像(如果启用)
"""
try:
# 准备文件
if isinstance(file_path, (str, Path)):
file_obj = open(file_path, 'rb')
file_name = os.path.basename(str(file_path))
else:
file_obj = file_path
file_name = "document.jpg"
# 调用API
start_time = time.time()
# 注意:这里需要根据实际的API参数调整
# 自动生成的SDK可能需要适配
api_response = self.api_instance.analyze_document(
file=(file_name, file_obj, 'image/jpeg')
)
processing_time = time.time() - start_time
# 转换为字典格式
result = {
"success": True,
"processing_time": round(processing_time, 3),
"regions_count": api_response.regions_count,
"regions": []
}
# 处理区域数据
for region in api_response.regions:
if region.confidence >= confidence_threshold:
result["regions"].append({
"label": region.label,
"confidence": round(region.confidence, 3),
"bbox": region.bbox, # [x1, y1, x2, y2]
"area": self._calculate_area(region.bbox)
})
# 过滤低置信度区域
result["filtered_regions_count"] = len(result["regions"])
# 如果传入了文件路径,记得关闭文件
if isinstance(file_path, (str, Path)):
file_obj.close()
return result
except Exception as e:
return {
"success": False,
"error": str(e),
"processing_time": 0
}
def _calculate_area(self, bbox: List[float]) -> float:
"""计算边界框面积"""
if len(bbox) != 4:
return 0
width = bbox[2] - bbox[0]
height = bbox[3] - bbox[1]
return width * height
def batch_analyze(
self,
file_paths: List[Union[str, Path]],
max_workers: int = 1
) -> List[Dict]:
"""
批量分析多个文档
Args:
file_paths: 文档图片路径列表
max_workers: 最大并发数(注意:服务端是单线程)
Returns:
每个文档的分析结果列表
"""
results = []
if max_workers > 1:
# 使用线程池并发处理
from concurrent.futures import ThreadPoolExecutor, as_completed
with ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_file = {
executor.submit(self.analyze_document, fp): fp
for fp in file_paths
}
for future in as_completed(future_to_file):
file_path = future_to_file[future]
try:
result = future.result()
result["file"] = str(file_path)
results.append(result)
except Exception as e:
results.append({
"file": str(file_path),
"success": False,
"error": str(e)
})
else:
# 顺序处理
for file_path in file_paths:
result = self.analyze_document(file_path)
result["file"] = str(file_path)
results.append(result)
return results
def get_statistics(self, results: List[Dict]) -> Dict:
"""
统计分析结果
Args:
results: analyze_document返回的结果列表
Returns:
统计信息
"""
if not results:
return {}
stats = {
"total_documents": len(results),
"successful_documents": sum(1 for r in results if r.get("success", False)),
"total_regions": 0,
"regions_by_type": {},
"avg_processing_time": 0,
"confidence_distribution": {
"high": 0, # >= 0.8
"medium": 0, # 0.5-0.8
"low": 0 # < 0.5
}
}
total_time = 0
total_regions = 0
for result in results:
if result.get("success"):
total_time += result.get("processing_time", 0)
regions = result.get("regions", [])
total_regions += len(regions)
for region in regions:
# 按类型统计
label = region.get("label", "unknown")
stats["regions_by_type"][label] = stats["regions_by_type"].get(label, 0) + 1
# 置信度分布
conf = region.get("confidence", 0)
if conf >= 0.8:
stats["confidence_distribution"]["high"] += 1
elif conf >= 0.5:
stats["confidence_distribution"]["medium"] += 1
else:
stats["confidence_distribution"]["low"] += 1
if stats["successful_documents"] > 0:
stats["avg_processing_time"] = round(total_time / stats["successful_documents"], 3)
stats["avg_regions_per_doc"] = round(total_regions / stats["successful_documents"], 1)
stats["total_regions"] = total_regions
return stats
def export_to_csv(self, results: List[Dict], output_path: str):
"""
将结果导出为CSV文件
Args:
results: 分析结果列表
output_path: 输出CSV文件路径
"""
import csv
with open(output_path, 'w', newline='', encoding='utf-8') as csvfile:
fieldnames = ['file', 'label', 'confidence', 'x1', 'y1', 'x2', 'y2', 'area']
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
for result in results:
if result.get("success"):
file_name = result.get("file", "unknown")
for region in result.get("regions", []):
bbox = region.get("bbox", [0, 0, 0, 0])
writer.writerow({
'file': file_name,
'label': region.get("label", ""),
'confidence': region.get("confidence", 0),
'x1': bbox[0],
'y1': bbox[1],
'x2': bbox[2],
'y2': bbox[3],
'area': region.get("area", 0)
})
print(f"✅ 结果已导出到: {output_path}")
def close(self):
"""关闭客户端连接"""
if self.api_client:
self.api_client.close()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
```
### 5.2 使用增强版SDK的示例
现在让我们看看如何使用这个增强版的客户端:
```python
# example_usage.py
from pathlib import Path
from pp_doclayout_enhanced import PP_DocLayoutClient
def main():
# 初始化客户端
client = PP_DocLayoutClient(base_url="http://你的实例IP:8000")
# 示例1: 分析单个文档
print("📄 示例1: 分析单个文档")
result = client.analyze_document("sample_document.jpg")
if result["success"]:
print(f"✅ 分析成功!")
print(f" 处理时间: {result['processing_time']}秒")
print(f" 检测到区域: {result['regions_count']}个")
print(f" 过滤后区域: {result['filtered_regions_count']}个")
# 显示前5个区域
print("\n 前5个区域详情:")
for i, region in enumerate(result["regions"][:5]):
print(f" {i+1}. {region['label']} (置信度: {region['confidence']})")
print(f" 坐标: {region['bbox']}")
print(f" 面积: {region['area']}像素")
else:
print(f"❌ 分析失败: {result.get('error')}")
# 示例2: 批量处理
print("\n📂 示例2: 批量处理文档")
document_folder = Path("./documents")
if document_folder.exists():
# 获取所有图片文件
image_files = list(document_folder.glob("*.jpg")) + \
list(document_folder.glob("*.png")) + \
list(document_folder.glob("*.jpeg"))
if image_files:
print(f"找到 {len(image_files)} 个文档文件")
# 批量分析(单线程,因为服务端是单线程)
batch_results = client.batch_analyze(image_files[:3], max_workers=1)
# 统计信息
stats = client.get_statistics(batch_results)
print(f"\n📊 批量处理统计:")
print(f" 总文档数: {stats['total_documents']}")
print(f" 成功数: {stats['successful_documents']}")
print(f" 总区域数: {stats['total_regions']}")
print(f" 平均处理时间: {stats['avg_processing_time']}秒/文档")
print(f" 平均区域数: {stats.get('avg_regions_per_doc', 0)}区域/文档")
# 区域类型分布
print(f"\n 区域类型分布:")
for label, count in stats.get('regions_by_type', {}).items():
print(f" {label}: {count}个")
# 导出结果
client.export_to_csv(batch_results, "analysis_results.csv")
# 示例3: 过滤特定类型的区域
print("\n🎯 示例3: 提取表格区域")
if result["success"]:
table_regions = [r for r in result["regions"] if r["label"] == "table"]
if table_regions:
print(f"找到 {len(table_regions)} 个表格区域:")
for i, table in enumerate(table_regions):
print(f" 表格{i+1}: 置信度={table['confidence']}, 坐标={table['bbox']}")
# 这里可以添加表格识别的后续处理
# 比如:裁剪图片区域,发送到表格识别模型
else:
print("未检测到表格区域")
# 关闭客户端
client.close()
if __name__ == "__main__":
main()
```
## 6. 高级功能与集成示例
### 6.1 与OCR工具集成
PP-DocLayoutV3的主要价值在于为OCR提供区域划分。下面是一个与PaddleOCR集成的示例:
```python
# ocr_integration.py
from pp_doclayout_enhanced import PP_DocLayoutClient
from PIL import Image
import numpy as np
class DocumentProcessor:
"""文档处理流水线:版面分析 + OCR"""
def __init__(self, doclayout_url="http://localhost:8000"):
"""
初始化文档处理器
Args:
doclayout_url: PP-DocLayoutV3服务地址
"""
self.doclayout_client = PP_DocLayoutClient(base_url=doclayout_url)
# 初始化OCR(这里以PaddleOCR为例)
try:
from paddleocr import PaddleOCR
self.ocr_engine = PaddleOCR(
use_angle_cls=True, # 使用方向分类器
lang='ch', # 中文识别
show_log=False # 关闭日志
)
print("✅ PaddleOCR初始化成功")
except ImportError:
print("⚠️ 未安装PaddleOCR,仅支持版面分析")
self.ocr_engine = None
def process_document(self, image_path, ocr_threshold=0.7):
"""
完整处理文档:版面分析 + OCR识别
Args:
image_path: 文档图片路径
ocr_threshold: OCR置信度阈值
Returns:
结构化文档数据
"""
# 步骤1: 版面分析
print(f"📄 处理文档: {image_path}")
layout_result = self.doclayout_client.analyze_document(image_path)
if not layout_result["success"]:
return {"error": "版面分析失败", "details": layout_result.get("error")}
# 步骤2: 按区域进行OCR
document_data = {
"metadata": {
"file": image_path,
"processing_time": layout_result["processing_time"],
"total_regions": layout_result["regions_count"]
},
"regions": []
}
# 打开图片
image = Image.open(image_path)
img_width, img_height = image.size
for i, region in enumerate(layout_result["regions"]):
region_data = {
"id": i + 1,
"type": region["label"],
"confidence": region["confidence"],
"bbox": region["bbox"],
"text": "",
"ocr_confidence": 0
}
# 根据区域类型决定是否进行OCR
if region["label"] in ["text", "title", "paragraph_title"]:
# 裁剪区域
x1, y1, x2, y2 = map(int, region["bbox"])
# 确保坐标在图片范围内
x1 = max(0, min(x1, img_width))
y1 = max(0, min(y1, img_height))
x2 = max(0, min(x2, img_width))
y2 = max(0, min(y2, img_height))
if x2 > x1 and y2 > y1: # 确保区域有效
region_img = image.crop((x1, y1, x2, y2))
# 进行OCR
if self.ocr_engine:
ocr_result = self.ocr_engine.ocr(np.array(region_img), cls=True)
if ocr_result and ocr_result[0]:
# 提取文本和置信度
texts = []
confidences = []
for line in ocr_result[0]:
text = line[1][0]
confidence = line[1][1]
texts.append(text)
confidences.append(confidence)
region_data["text"] = "\n".join(texts)
region_data["ocr_confidence"] = sum(confidences) / len(confidences) if confidences else 0
document_data["regions"].append(region_data)
return document_data
def export_to_markdown(self, document_data, output_path):
"""
将处理结果导出为Markdown格式
Args:
document_data: process_document返回的数据
output_path: 输出文件路径
"""
with open(output_path, 'w', encoding='utf-8') as f:
# 文档标题
f.write(f"# 文档分析结果\n\n")
f.write(f"**文件**: {document_data['metadata']['file']}\n")
f.write(f"**处理时间**: {document_data['metadata']['processing_time']}秒\n")
f.write(f"**区域总数**: {document_data['metadata']['total_regions']}\n\n")
# 按类型分组
regions_by_type = {}
for region in document_data["regions"]:
region_type = region["type"]
if region_type not in regions_by_type:
regions_by_type[region_type] = []
regions_by_type[region_type].append(region)
# 输出每个区域
for region_type, regions in regions_by_type.items():
f.write(f"## {region_type.upper()} 区域 ({len(regions)}个)\n\n")
for region in regions:
f.write(f"### 区域 {region['id']}\n")
f.write(f"- **置信度**: {region['confidence']:.3f}\n")
f.write(f"- **坐标**: {region['bbox']}\n")
if region["text"]:
f.write(f"- **OCR置信度**: {region['ocr_confidence']:.3f}\n")
f.write(f"- **识别文本**:\n```\n{region['text']}\n```\n")
else:
f.write(f"- **类型**: 非文本区域\n")
f.write("\n")
def close(self):
"""关闭所有连接"""
self.doclayout_client.close()
# 使用示例
if __name__ == "__main__":
# 初始化处理器
processor = DocumentProcessor("http://你的实例IP:8000")
try:
# 处理文档
result = processor.process_document("sample_contract.jpg")
if "error" not in result:
# 导出为Markdown
processor.export_to_markdown(result, "document_analysis.md")
print("✅ 文档处理完成,结果已保存到 document_analysis.md")
# 打印摘要
print(f"\n📊 处理摘要:")
print(f" 文件: {result['metadata']['file']}")
print(f" 总区域数: {result['metadata']['total_regions']}")
# 统计文本区域
text_regions = [r for r in result['regions'] if r['text']]
print(f" 文本区域: {len(text_regions)}个")
# 显示识别到的文本
if text_regions:
print(f"\n📝 识别到的文本示例:")
for region in text_regions[:3]: # 显示前3个
text_preview = region['text'][:100] + "..." if len(region['text']) > 100 else region['text']
print(f" [{region['type']}] {text_preview}")
else:
print(f"❌ 处理失败: {result['error']}")
finally:
processor.close()
```
### 6.2 异步处理支持
对于需要处理大量文档的场景,我们可以添加异步支持:
```python
# async_client.py
import asyncio
import aiohttp
from typing import List, Dict, Any
import base64
from pathlib import Path
class AsyncDocLayoutClient:
"""异步PP-DocLayoutV3客户端"""
def __init__(self, base_url: str = "http://localhost:8000"):
self.base_url = base_url.rstrip('/')
self.session = None
async def __aenter__(self):
self.session = aiohttp.ClientSession()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self.session:
await self.session.close()
async def analyze_document_async(self, file_path: str) -> Dict[str, Any]:
"""异步分析文档"""
if not self.session:
self.session = aiohttp.ClientSession()
url = f"{self.base_url}/analyze"
# 读取文件
with open(file_path, 'rb') as f:
file_data = f.read()
# 准备表单数据
data = aiohttp.FormData()
data.add_field('file',
file_data,
filename=Path(file_path).name,
content_type='image/jpeg')
try:
async with self.session.post(url, data=data) as response:
if response.status == 200:
result = await response.json()
return {
"success": True,
"file": file_path,
"data": result
}
else:
return {
"success": False,
"file": file_path,
"error": f"HTTP {response.status}: {await response.text()}"
}
except Exception as e:
return {
"success": False,
"file": file_path,
"error": str(e)
}
async def batch_analyze_async(self, file_paths: List[str], max_concurrent: int = 3) -> List[Dict]:
"""异步批量分析"""
semaphore = asyncio.Semaphore(max_concurrent)
async def analyze_with_semaphore(file_path):
async with semaphore:
return await self.analyze_document_async(file_path)
tasks = [analyze_with_semaphore(fp) for fp in file_paths]
results = await asyncio.gather(*tasks, return_exceptions=True)
# 处理异常
processed_results = []
for i, result in enumerate(results):
if isinstance(result, Exception):
processed_results.append({
"success": False,
"file": file_paths[i],
"error": str(result)
})
else:
processed_results.append(result)
return processed_results
# 使用示例
async def main_async():
# 文档列表
documents = [
"documents/contract1.jpg",
"documents/contract2.jpg",
"documents/report1.png",
"documents/report2.png",
"documents/invoice1.jpg"
]
# 过滤存在的文件
existing_docs = [d for d in documents if Path(d).exists()]
if not existing_docs:
print("未找到文档文件")
return
print(f"找到 {len(existing_docs)} 个文档,开始异步分析...")
async with AsyncDocLayoutClient("http://你的实例IP:8000") as client:
start_time = asyncio.get_event_loop().time()
# 批量分析,最大并发3个
results = await client.batch_analyze_async(existing_docs, max_concurrent=3)
end_time = asyncio.get_event_loop().time()
total_time = end_time - start_time
# 统计结果
successful = sum(1 for r in results if r["success"])
total_regions = sum(len(r.get("data", {}).get("regions", [])) for r in results if r["success"])
print(f"\n✅ 批量分析完成!")
print(f" 总文档数: {len(results)}")
print(f" 成功数: {successful}")
print(f" 总区域数: {total_regions}")
print(f" 总耗时: {total_time:.2f}秒")
print(f" 平均每个文档: {total_time/len(results):.2f}秒")
# 显示每个文档的结果
for result in results:
status = "✅" if result["success"] else "❌"
regions = len(result.get("data", {}).get("regions", [])) if result["success"] else 0
print(f" {status} {Path(result['file']).name}: {regions}个区域")
# 运行异步示例
if __name__ == "__main__":
asyncio.run(main_async())
```
## 7. 总结
通过本文的方法,你可以快速为PP-DocLayoutV3或其他提供Swagger/OpenAPI规范的AI服务生成Python SDK。整个过程可以分为几个关键步骤:
### 7.1 方法回顾
1. **获取API规范**:首先从服务的`/docs`或`/openapi.json`端点获取完整的OpenAPI规范文件。这是生成SDK的基础。
2. **自动生成基础SDK**:使用OpenAPI Generator工具,一行命令就能生成包含所有API接口和数据模型的完整Python包。虽然生成的代码可能比较“机械”,但功能是完整的。
3. **定制化增强**:在自动生成的基础上,根据实际需求添加便利功能。比如:
- 更友好的客户端类,简化调用方式
- 批量处理支持,提高处理效率
- 结果统计和导出功能,方便数据分析
- 与其他工具(如OCR)的集成接口
4. **实际应用**:将生成的SDK集成到你的文档处理流程中,实现自动化、批量化的文档版面分析。
### 7.2 核心价值
这种方法的最大价值在于**标准化和自动化**:
- **标准化接口**:无论后端服务如何变化,只要遵循OpenAPI规范,前端调用方式就是一致的
- **减少重复工作**:不用为每个API手动编写HTTP请求代码
- **类型安全**:自动生成的SDK包含完整的数据模型,IDE可以提供代码补全和类型检查
- **易于维护**:当API更新时,重新生成SDK即可,不需要手动修改大量代码
### 7.3 实践建议
在实际项目中,我有几点建议:
1. **版本管理**:将生成的SDK作为独立的Python包管理,使用`setup.py`或`pyproject.toml`定义依赖和版本
2. **错误处理**:增强错误处理逻辑,特别是网络异常、服务不可用、参数错误等情况
3. **性能优化**:对于批量处理场景,合理控制并发数,避免对服务端造成过大压力
4. **结果缓存**:对于相同的文档,可以考虑缓存分析结果,避免重复分析
5. **监控日志**:添加详细的日志记录,便于调试和监控处理状态
### 7.4 扩展思考
这个方法不仅适用于PP-DocLayoutV3,实际上适用于任何提供标准OpenAPI/Swagger文档的AI服务。你可以用同样的方法为:
- 图像识别服务生成SDK
- 自然语言处理服务生成SDK
- 语音处理服务生成SDK
- 任何RESTful API服务生成SDK
随着AI服务的标准化程度越来越高,这种“文档即代码”的自动化方法会变得越来越重要。它不仅能提高开发效率,还能保证不同服务之间接口的一致性。
最后,生成的SDK只是工具,真正的价值在于如何将它应用到实际的业务场景中。无论是档案数字化、合同处理、论文分析,还是其他文档处理需求,一个好的SDK都能让你的工作事半功倍。
---
> **获取更多AI镜像**
>
> 想探索更多AI镜像和应用场景?访问 [CSDN星图镜像广场](https://ai.csdn.net/?utm_source=mirror_blog_end),提供丰富的预置镜像,覆盖大模型推理、图像生成、视频生成、模型微调等多个领域,支持一键部署。