该提问与当前博客内容无直接关联。
需要继续实现异步HTTP用户管理服务的客户端测试示例,以下是完整的Python测试客户端代码实现:
```python
# test_client.py
import requests
import json
import time
import threading
import statistics
def test_user_api():
base_url = "http://localhost:8080"
# 测试健康检查
print("1. 测试健康检查...")
response = requests.get(f"{base_url}/health")
print(f" 状态码: {response.status_code}")
print(f" 响应: {response.json()}")
# 测试创建用户
print("\n2. 测试创建用户...")
user_data = {
"username": "testuser",
"email": "test@example.com"
}
response = requests.post(f"{base_url}/api/users", json=user_data)
print(f" 状态码: {response.status_code}")
print(f" 响应: {response.json()}")
if response.status_code == 201:
user_id = response.json()["id"]
# 测试获取用户
print(f"\n3. 测试获取用户(ID: {user_id})...")
response = requests.get(f"{base_url}/api/users/{user_id}")
print(f" 状态码: {response.status_code}")
print(f" 响应: {response.json()}")
# 测试更新用户
print(f"\n4. 测试更新用户(ID: {user_id})...")
update_data = {
"username": "updateduser",
"email": "updated@example.com"
}
response = requests.put(f"{base_url}/api/users/{user_id}", json=update_data)
print(f" 状态码: {response.status_code}")
print(f" 响应: {response.json()}")
# 验证更新
print(f"\n5. 验证用户更新...")
response = requests.get(f"{base_url}/api/users/{user_id}")
user_info = response.json()
print(f" 更新后用户名: {user_info['data']['username']}")
print(f" 更新后邮箱: {user_info['data']['email']}")
# 测试获取用户列表
print(f"\n6. 测试获取用户列表...")
response = requests.get(f"{base_url}/api/users")
print(f" 状态码: {response.status_code}")
list_info = response.json()
print(f" 总用户数: {list_info['data']['total']}")
print(f" 当前页用户数: {len(list_info['data']['users'])}")
# 测试分页
print(f"\n7. 测试分页功能...")
response = requests.get(f"{base_url}/api/users?page=1&pageSize=5")
print(f" 状态码: {response.status_code}")
page_info = response.json()
print(f" 第1页大小: {page_info['data']['pageSize']}")
# 测试删除用户
print(f"\n8. 测试删除用户(ID: {user_id})...")
response = requests.delete(f"{base_url}/api/users/{user_id}")
print(f" 状态码: {response.status_code}")
print(f" 响应: {response.json()}")
# 验证删除
print(f"\n9. 验证用户删除...")
response = requests.get(f"{base_url}/api/users/{user_id}")
print(f" 状态码: {response.status_code}")
print(f" 响应: {response.json()}")
# 测试监控指标
print(f"\n10. 测试监控指标...")
response = requests.get(f"{base_url}/metrics")
print(f" 状态码: {response.status_code}")
print(f" 响应类型: {response.headers.get('Content-Type')}")
print(f" 响应大小: {len(response.text)} 字节")
# 测试错误处理
print(f"\n11. 测试错误处理...")
# 测试不存在的用户
print(" a) 获取不存在的用户...")
response = requests.get(f"{base_url}/api/users/99999")
print(f" 状态码: {response.status_code}")
# 测试无效JSON
print(" b) 测试无效JSON请求...")
response = requests.post(f"{base_url}/api/users",
data="invalid json",
headers={"Content-Type": "application/json"})
print(f" 状态码: {response.status_code}")
# 测试缺少必要字段
print(" c) 测试缺少必要字段...")
response = requests.post(f"{base_url}/api/users",
json={"username": "test"}) # 缺少email
print(f" 状态码: {response.status_code}")
else:
print("创建用户失败,跳过后续测试")
def performance_test():
"""性能测试"""
print("\n=== 性能测试 ===")
base_url = "http://localhost:8080"
# 创建测试数据
test_users = []
for i in range(10):
test_users.append({
"username": f"perfuser{i}",
"email": f"perfuser{i}@example.com"
})
# 并发创建用户
print("1. 并发创建用户测试...")
def create_user_task(user_data):
start_time = time.time()
response = requests.post(f"{base_url}/api/users", json=user_data)
end_time = time.time()
return end_time - start_time
latencies = []
for user_data in test_users:
latency = create_user_task(user_data)
latencies.append(latency)
print(f" 平均延迟: {statistics.mean(latencies):.4f}秒")
print(f" 最大延迟: {max(latencies):.4f}秒")
print(f" 最小延迟: {min(latencies):.4f}秒")
# 并发查询测试
print("\n2. 并发查询测试...")
def query_user_task(user_id):
start_time = time.time()
response = requests.get(f"{base_url}/api/users/{user_id}")
end_time = time.time()
return end_time - start_time
# 先获取已创建的用户ID
response = requests.get(f"{base_url}/api/users?pageSize=10")
user_ids = [user['id'] for user in response.json()['data']['users']]
query_latencies = []
for user_id in user_ids[:5]: # 测试前5个用户
latency = query_user_task(user_id)
query_latencies.append(latency)
print(f" 平均查询延迟: {statistics.mean(query_latencies):.4f}秒")
# 清理测试数据
print("\n3. 清理测试数据...")
for user_id in user_ids:
requests.delete(f"{base_url}/api/users/{user_id}")
def stress_test():
"""压力测试"""
print("\n=== 压力测试 ===")
base_url = "http://localhost:8080"
# 创建大量用户
print("创建100个测试用户...")
created_ids = []
for i in range(100):
user_data = {
"username": f"stressuser{i}",
"email": f"stressuser{i}@example.com"
}
response = requests.post(f"{base_url}/api/users", json=user_data)
if response.status_code == 201:
created_ids.append(response.json()["id"])
print(f"成功创建 {len(created_ids)} 个用户")
# 并发读取测试
print("\n并发读取测试...")
def stress_read_task(user_id):
try:
response = requests.get(f"{base_url}/api/users/{user_id}", timeout=5)
return response.status_code == 200
except:
return False
# 使用多线程进行并发测试
successful_reads = 0
total_reads = 50
def worker():
nonlocal successful_reads
import random
user_id = random.choice(created_ids)
if stress_read_task(user_id):
successful_reads += 1
threads = []
for _ in range(total_reads):
t = threading.Thread(target=worker)
threads.append(t)
t.start()
for t in threads:
t.join()
print(f"并发读取成功率: {successful_reads}/{total_reads} ({successful_reads/total_reads*100:.1f}%)")
# 清理数据
print("\n清理压力测试数据...")
for user_id in created_ids:
try:
requests.delete(f"{base_url}/api/users/{user_id}", timeout=2)
except:
pass
def api_documentation():
"""生成API文档"""
print("\n=== API文档 ===")
docs = {
"健康检查": {
"端点": "GET /health",
"描述": "检查服务是否健康",
"响应": {
"status": "healthy",
"timestamp": "Unix时间戳"
}
},
"监控指标": {
"端点": "GET /metrics",
"描述": "获取Prometheus格式的监控指标",
"响应类型": "text/plain"
},
"创建用户": {
"端点": "POST /api/users",
"描述": "创建新用户",
"请求体": {
"username": "字符串,用户名",
"email": "字符串,邮箱地址"
},
"响应": {
"status": "success",
"id": "创建的用户ID"
}
},
"获取用户": {
"端点": "GET /api/users/{id}",
"描述": "根据ID获取用户信息",
"路径参数": {
"id": "用户ID"
},
"响应": {
"status": "success",
"data": {
"id": "用户ID",
"username": "用户名",
"email": "邮箱"
}
}
},
"更新用户": {
"端点": "PUT /api/users/{id}",
"描述": "更新用户信息",
"路径参数": {
"id": "用户ID"
},
"请求体": {
"username": "字符串,新用户名",
"email": "字符串,新邮箱地址"
},
"响应": {
"status": "success/error",
"message": "操作结果消息"
}
},
"删除用户": {
"端点": "DELETE /api/users/{id}",
"描述": "删除用户",
"路径参数": {
"id": "用户ID"
},
"响应": {
"status": "success/error",
"message": "操作结果消息"
}
},
"获取用户列表": {
"端点": "GET /api/users",
"描述": "获取用户列表(分页)",
"查询参数": {
"page": "页码,默认1",
"pageSize": "每页大小,默认20"
},
"响应": {
"status": "success",
"data": {
"users": "用户数组",
"total": "总用户数",
"page": "当前页码",
"pageSize": "每页大小"
}
}
}
}
for api_name, api_info in docs.items():
print(f"\n{api_name}:")
print(f" 端点: {api_info['端点']}")
print(f" 描述: {api_info['描述']}")
if '请求体' in api_info:
print(f" 请求体: {json.dumps(api_info['请求体'], indent=6, ensure_ascii=False)}")
if '路径参数' in api_info:
print(f" 路径参数: {json.dumps(api_info['路径参数'], indent=6, ensure_ascii=False)}")
if '查询参数' in api_info:
print(f" 查询参数: {json.dumps(api_info['查询参数'], indent=6, ensure_ascii=False)}")
if '响应' in api_info:
print(f" 响应: {json.dumps(api_info['响应'], indent=6, ensure_ascii=False)}")
def main():
"""主测试函数"""
print("=== 异步HTTP用户管理服务测试客户端 ===")
print(f"开始时间: {time.strftime('%Y-%m-%d %H:%M:%S')}")
try:
# 基本功能测试
test_user_api()
# 性能测试(可选)
performance_input = input("\n是否运行性能测试?(y/n): ")
if performance_input.lower() == 'y':
performance_test()
# 压力测试(可选)
stress_input = input("\n是否运行压力测试?(y/n): ")
if stress_input.lower() == 'y':
stress_test()
# 显示API文档
docs_input = input("\n是否显示API文档?(y/n): ")
if docs_input.lower() == 'y':
api_documentation()
print(f"\n测试完成时间: {time.strftime('%Y-%m-%d %H:%M:%S')}")
except requests.exceptions.ConnectionError:
print("错误: 无法连接到服务器,请确保服务器正在运行")
except Exception as e:
print(f"测试过程中发生错误: {e}")
if __name__ == "__main__":
main()
```
## 完整的Docker部署配置
```dockerfile
# Dockerfile
FROM ubuntu:22.04
# 安装系统依赖
RUN apt-get update && apt-get install -y \
build-essential \
cmake \
libssl-dev \
libboost-all-dev \
python3 \
python3-pip \
&& rm -rf /var/lib/apt/lists/*
# 创建工作目录
WORKDIR /app
# 复制源代码
COPY . .
# 创建构建目录并编译
RUN mkdir build && cd build && \
cmake .. -DCMAKE_BUILD_TYPE=Release && \
make -j$(nproc)
# 暴露端口
EXPOSE 8080
# 运行服务
CMD ["./build/async_http_server"]
```
```yaml
# docker-compose.yml
version: '3.8'
services:
async-http-server:
build: .
ports:
- "8080:8080"
environment:
- LOG_LEVEL=INFO
- MAX_CONNECTIONS=10000
volumes:
- ./logs:/app/logs
restart: unless-stopped
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8080/health"]
interval: 30s
timeout: 10s
retries: 3
prometheus:
image: prom/prometheus:latest
ports:
- "9090:9090"
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml
- prometheus_data:/prometheus
command:
- '--config.file=/etc/prometheus/prometheus.yml'
- '--storage.tsdb.path=/prometheus'
- '--web.console.libraries=/etc/prometheus/console_libraries'
- '--web.console.templates=/etc/prometheus/consoles'
- '--storage.tsdb.retention.time=200h'
- '--web.enable-lifecycle'
restart: unless-stopped
grafana:
image: grafana/grafana:latest
ports:
- "3000:3000"
volumes:
- grafana_data:/var/lib/grafana
- ./grafana/provisioning:/etc/grafana/provisioning
environment:
- GF_SECURITY_ADMIN_PASSWORD=admin
restart: unless-stopped
volumes:
prometheus_data:
grafana_data:
```
```yaml
# prometheus.yml
global:
scrape_interval: 15s
evaluation_interval: 15s
scrape_configs:
- job_name: 'async-http-server'
static_configs:
- targets: ['async-http-server:8080']
metrics_path: '/metrics'
scrape_interval: 10s
```
## 监控和告警配置
```python
# monitoring.py
import requests
import time
import json
from datetime import datetime
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
class ServiceMonitor:
def __init__(self, base_url="http://localhost:8080", check_interval=60):
self.base_url = base_url
self.check_interval = check_interval
self.alert_history = []
def check_health(self):
"""检查服务健康状态"""
try:
start_time = time.time()
response = requests.get(f"{self.base_url}/health", timeout=5)
latency = (time.time() - start_time) * 1000 # 毫秒
if response.status_code == 200:
data = response.json()
return {
"status": "healthy",
"latency_ms": latency,
"timestamp": data.get("timestamp"),
"response_time": datetime.now().isoformat()
}
else:
return {
"status": "unhealthy",
"error": f"HTTP {response.status_code}",
"response_time": datetime.now().isoformat()
}
except requests.exceptions.Timeout:
return {
"status": "timeout",
"error": "请求超时",
"response_time": datetime.now().isoformat()
}
except requests.exceptions.ConnectionError:
return {
"status": "connection_error",
"error": "连接失败",
"response_time": datetime.now().isoformat()
}
except Exception as e:
return {
"status": "error",
"error": str(e),
"response_time": datetime.now().isoformat()
}
def check_metrics(self):
"""检查监控指标"""
try:
response = requests.get(f"{self.base_url}/metrics", timeout=5)
if response.status_code == 200:
metrics = {}
for line