# Python MCP Server开发避坑指南:从STDIO到SSE的5个常见错误及解决方案
在构建面向AI应用的Python MCP Server时,开发者常常会遇到各种看似简单却影响深远的陷阱。这些陷阱不仅影响服务的稳定性,更可能让整个AI集成体验大打折扣。今天,我将基于实际项目经验,深入剖析从STDIO到SSE部署过程中最常遇到的5个典型问题,并提供经过实战检验的解决方案。
## 1. 类型注解缺失导致的调用失败与运行时异常
**问题现象**:你的MCP工具在本地测试时一切正常,但当AI客户端调用时却频繁出现参数类型错误或返回数据格式异常。更令人困惑的是,这些错误往往在运行时才暴露,缺乏明确的错误提示。
**根本原因分析**:MCP协议严重依赖类型注解来确保AI模型正确理解工具接口。Python作为动态类型语言,虽然运行时不需要类型注解,但MCP SDK在生成工具描述时会解析这些注解。缺少或错误的类型注解会导致生成的JSON Schema不完整,进而影响AI模型对工具的理解和调用。
**错误示例对比**:
```python
# 问题代码:缺乏类型注解
@mcp.tool()
def calculate_discount(price, discount_rate):
"""计算商品折扣价格"""
return price * (1 - discount_rate)
# 正确实现:完整类型注解
@mcp.tool()
def calculate_discount(price: float, discount_rate: float) -> dict:
"""
计算商品折扣价格
Args:
price: 商品原价,必须为正数
discount_rate: 折扣率,范围0-1之间的小数
Returns:
包含折扣后价格和节省金额的字典
"""
if price <= 0:
raise ValueError("价格必须为正数")
if not 0 <= discount_rate <= 1:
raise ValueError("折扣率必须在0-1之间")
discounted_price = round(price * (1 - discount_rate), 2)
savings = round(price - discounted_price, 2)
return {
"original_price": price,
"discount_rate": discount_rate,
"discounted_price": discounted_price,
"savings": savings,
"currency": "CNY"
}
```
**解决方案深度解析**:
1. **强制类型注解规范**:为每个工具函数的所有参数和返回值添加明确的类型注解。这不仅帮助MCP生成准确的工具描述,还能在开发阶段通过mypy等工具提前发现问题。
2. **使用Pydantic进行数据验证**:对于复杂的数据结构,建议使用Pydantic模型来确保数据完整性:
```python
from pydantic import BaseModel, Field, validator
from typing import List, Optional
class ProductInfo(BaseModel):
"""产品信息模型"""
id: str = Field(..., description="产品唯一标识符")
name: str = Field(..., min_length=1, max_length=100)
price: float = Field(..., gt=0, description="产品价格,必须大于0")
category: str = Field(..., description="产品分类")
tags: List[str] = Field(default_factory=list)
@validator('price')
def validate_price(cls, v):
if v <= 0:
raise ValueError('价格必须为正数')
return round(v, 2)
@mcp.tool()
def process_product(product_data: dict) -> ProductInfo:
"""处理产品信息并返回结构化数据"""
try:
# 使用Pydantic进行数据验证和转换
product = ProductInfo(**product_data)
return product.dict()
except ValidationError as e:
# 提供详细的错误信息
error_details = []
for error in e.errors():
error_details.append(f"{error['loc'][0]}: {error['msg']}")
raise ValueError(f"产品数据验证失败: {'; '.join(error_details)}")
```
3. **配置类型检查工具**:在开发环境中集成类型检查,确保代码质量:
```python
# pyproject.toml 配置示例
[tool.mypy]
python_version = "3.10"
warn_return_any = true
warn_unused_configs = true
disallow_untyped_defs = true
disallow_incomplete_defs = true
check_untyped_defs = true
disallow_untyped_decorators = true
[tool.ruff]
target-version = "py310"
select = ["E", "F", "I", "N", "UP", "YTT", "ANN"]
ignore = ["ANN101", "ANN102"]
```
> **关键提示**:类型注解不仅是给AI看的,更是给开发者自己看的。良好的类型注解可以显著减少调试时间,特别是在处理复杂业务逻辑时。
## 2. STDIO传输模式下的进程管理与资源泄漏
**问题现象**:在长时间运行的STDIO模式下,MCP Server进程可能出现内存持续增长、文件描述符耗尽,或者在AI客户端断开连接后进程无法正常退出。
**技术根源**:STDIO传输依赖于父子进程间的标准输入输出管道。当父进程(AI客户端)异常退出时,子进程(MCP Server)可能成为僵尸进程或孤儿进程,导致资源无法释放。
**进程管理最佳实践**:
```python
import asyncio
import signal
import sys
import logging
from contextlib import AsyncExitStack
from typing import Optional
from mcp.server.fastmcp import FastMCP
# 配置详细的日志记录
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(process)d - %(message)s',
handlers=[
logging.FileHandler('mcp_server.log'),
logging.StreamHandler(sys.stderr)
]
)
logger = logging.getLogger(__name__)
class ManagedMCPServer:
"""带有完善生命周期管理的MCP服务器"""
def __init__(self, name: str = "ManagedServer"):
self.mcp = FastMCP(name)
self.exit_stack = AsyncExitStack()
self.cleanup_tasks = []
self.shutdown_event = asyncio.Event()
# 注册信号处理器
signal.signal(signal.SIGINT, self._signal_handler)
signal.signal(signal.SIGTERM, self._signal_handler)
def _signal_handler(self, signum, frame):
"""处理终止信号"""
logger.info(f"收到终止信号 {signum},开始优雅关闭")
self.shutdown_event.set()
async def cleanup_resources(self):
"""清理所有资源"""
logger.info("开始清理资源...")
# 执行所有清理任务
cleanup_results = []
for task in self.cleanup_tasks:
try:
result = await task()
cleanup_results.append(result)
logger.debug(f"清理任务完成: {task.__name__}")
except Exception as e:
logger.error(f"清理任务失败: {e}")
# 关闭退出栈
await self.exit_stack.aclose()
logger.info("资源清理完成")
return cleanup_results
@mcp.tool()
async def monitor_resources(self) -> dict:
"""监控服务器资源使用情况"""
import psutil
import os
process = psutil.Process(os.getpid())
memory_info = process.memory_info()
return {
"pid": process.pid,
"memory_rss_mb": round(memory_info.rss / 1024 / 1024, 2),
"memory_vms_mb": round(memory_info.vms / 1024 / 1024, 2),
"cpu_percent": process.cpu_percent(interval=0.1),
"num_threads": process.num_threads(),
"num_fds": process.num_fds() if hasattr(process, 'num_fds') else None,
"create_time": process.create_time(),
"status": process.status()
}
async def run_with_management(self, transport: str = 'stdio'):
"""运行服务器并管理生命周期"""
try:
# 注册清理任务
self.cleanup_tasks.append(self._close_database_connections)
self.cleanup_tasks.append(self._flush_buffers)
# 启动健康检查协程
health_check_task = asyncio.create_task(self._health_check_loop())
# 运行MCP服务器
logger.info(f"启动MCP服务器,传输模式: {transport}")
await self.mcp.run(transport=transport)
except asyncio.CancelledError:
logger.info("服务器任务被取消")
except Exception as e:
logger.error(f"服务器运行异常: {e}", exc_info=True)
finally:
# 取消健康检查任务
health_check_task.cancel()
try:
await health_check_task
except asyncio.CancelledError:
pass
# 执行清理
await self.cleanup_resources()
logger.info("服务器完全关闭")
async def _health_check_loop(self):
"""健康检查循环"""
while not self.shutdown_event.is_set():
try:
# 定期检查资源使用情况
resources = await self.monitor_resources()
if resources["memory_rss_mb"] > 500: # 内存超过500MB警告
logger.warning(f"内存使用过高: {resources['memory_rss_mb']}MB")
await asyncio.sleep(60) # 每分钟检查一次
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"健康检查失败: {e}")
await asyncio.sleep(10)
async def _close_database_connections(self):
"""关闭数据库连接示例"""
# 实际项目中替换为真实的连接关闭逻辑
logger.info("关闭数据库连接")
return "数据库连接已关闭"
async def _flush_buffers(self):
"""刷新缓冲区示例"""
logger.info("刷新文件缓冲区")
return "缓冲区已刷新"
# 使用示例
if __name__ == "__main__":
server = ManagedMCPServer("ResourceManagedServer")
# 添加业务工具
@server.mcp.tool()
async def process_data(data: str) -> dict:
"""处理数据并返回结果"""
# 模拟数据处理
await asyncio.sleep(0.1)
return {"processed": data.upper(), "timestamp": time.time()}
# 运行服务器
asyncio.run(server.run_with_management(transport='stdio'))
```
**关键改进点**:
1. **信号处理**:捕获SIGINT和SIGTERM信号,实现优雅关闭
2. **资源监控**:定期检查内存、CPU等资源使用情况
3. **清理机制**:使用AsyncExitStack确保所有资源正确释放
4. **错误恢复**:对异常进行适当处理,避免进程崩溃
**进程监控脚本**(辅助工具):
```bash
#!/bin/bash
# monitor_mcp.sh - 监控MCP Server进程状态
SERVER_PID=$1
LOG_FILE="mcp_monitor_$(date +%Y%m%d_%H%M%S).log"
if [ -z "$SERVER_PID" ]; then
echo "Usage: $0 <MCP_SERVER_PID>"
exit 1
fi
echo "开始监控MCP Server进程 $SERVER_PID" | tee -a "$LOG_FILE"
while kill -0 "$SERVER_PID" 2>/dev/null; do
TIMESTAMP=$(date '+%Y-%m-%d %H:%M:%S')
# 获取进程状态
if ps -p "$SERVER_PID" -o pid,pcpu,pmem,rss,vsz,etime,cmd > /dev/null 2>&1; then
PROCESS_INFO=$(ps -p "$SERVER_PID" -o pcpu,pmem,rss,vsz,etime --no-headers)
CPU_USAGE=$(echo "$PROCESS_INFO" | awk '{print $1}')
MEM_PERCENT=$(echo "$PROCESS_INFO" | awk '{print $2}')
RSS_KB=$(echo "$PROCESS_INFO" | awk '{print $3}')
VSZ_KB=$(echo "$PROCESS_INFO" | awk '{print $4}')
ELAPSED_TIME=$(echo "$PROCESS_INFO" | awk '{print $5}')
# 转换为更易读的格式
RSS_MB=$((RSS_KB / 1024))
VSZ_MB=$((VSZ_KB / 1024))
echo "[$TIMESTAMP] PID:$SERVER_PID CPU:${CPU_USAGE}% MEM:${MEM_PERCENT}% RSS:${RSS_MB}MB VSZ:${VSZ_MB}MB TIME:$ELAPSED_TIME" | tee -a "$LOG_FILE"
else
echo "[$TIMESTAMP] 进程 $SERVER_PID 已退出" | tee -a "$LOG_FILE"
break
fi
# 检查文件描述符
if [ -d "/proc/$SERVER_PID/fd" ]; then
FD_COUNT=$(ls -1 "/proc/$SERVER_PID/fd" 2>/dev/null | wc -l)
echo "[$TIMESTAMP] 文件描述符数量: $FD_COUNT" | tee -a "$LOG_FILE"
# 如果文件描述符过多,发出警告
if [ "$FD_COUNT" -gt 1000 ]; then
echo "[$TIMESTAMP] 警告: 文件描述符数量过高 ($FD_COUNT)" | tee -a "$LOG_FILE"
fi
fi
sleep 30 # 每30秒检查一次
done
echo "监控结束" | tee -a "$LOG_FILE"
```
## 3. SSE连接稳定性与心跳机制缺失
**问题现象**:在SSE(Server-Sent Events)传输模式下,客户端经常遇到连接超时、数据丢失或连接意外断开的问题。特别是在网络不稳定的环境中,服务可用性大幅下降。
**技术挑战**:SSE连接本质上是长连接,需要维护持续的数据流。传统的HTTP请求-响应模式不适用于这种场景,需要专门的心跳和重连机制。
**完整的SSE服务器实现**:
```python
import asyncio
import json
import time
import logging
from typing import Dict, Optional, Set
from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from fastapi import FastAPI, Request, Response
from fastapi.responses import StreamingResponse
from sse_starlette.sse import EventSourceResponse
from mcp.server.fastmcp import FastMCP
logger = logging.getLogger(__name__)
@dataclass
class ClientSession:
"""客户端会话信息"""
client_id: str
last_activity: datetime
ip_address: str
user_agent: Optional[str] = None
reconnect_count: int = 0
subscriptions: Set[str] = field(default_factory=set)
class StableSSEServer:
"""具有连接稳定性的SSE服务器实现"""
def __init__(self, server_name: str = "StableSSEServer"):
self.mcp = FastMCP(server_name)
self.app = FastAPI(title=server_name)
self.active_sessions: Dict[str, ClientSession] = {}
self.heartbeat_interval = 30 # 心跳间隔(秒)
self.max_inactive_time = 300 # 最大无活动时间(秒)
self.reconnect_limit = 5 # 最大重连次数
# 设置路由
self.setup_routes()
# 启动清理任务
self.cleanup_task = asyncio.create_task(self._session_cleanup_loop())
def setup_routes(self):
"""设置FastAPI路由"""
@self.app.get("/sse")
async def sse_endpoint(request: Request, client_id: Optional[str] = None):
"""SSE连接端点"""
# 获取客户端信息
client_ip = request.client.host if request.client else "unknown"
user_agent = request.headers.get("user-agent")
# 生成或验证客户端ID
if not client_id or not self._validate_client_id(client_id):
client_id = self._generate_client_id()
logger.info(f"新客户端连接: {client_id} from {client_ip}")
else:
logger.info(f"客户端重连: {client_id} from {client_ip}")
# 创建或更新会话
session = self.active_sessions.get(client_id)
if session:
session.reconnect_count += 1
session.last_activity = datetime.now()
session.ip_address = client_ip
if session.reconnect_count > self.reconnect_limit:
logger.warning(f"客户端 {client_id} 重连次数过多")
return Response(
content=json.dumps({"error": "重连次数超限"}),
status_code=429,
media_type="application/json"
)
else:
session = ClientSession(
client_id=client_id,
last_activity=datetime.now(),
ip_address=client_ip,
user_agent=user_agent
)
self.active_sessions[client_id] = session
# 返回SSE流
return EventSourceResponse(
self._sse_generator(client_id),
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no" # 禁用Nginx缓冲
}
)
@self.app.get("/health")
async def health_check():
"""健康检查端点"""
active_clients = len(self.active_sessions)
return {
"status": "healthy",
"timestamp": datetime.now().isoformat(),
"active_clients": active_clients,
"server_uptime": time.time() - self.start_time
}
@self.app.post("/sse/{client_id}/subscribe")
async def subscribe_topic(client_id: str, topic: str):
"""订阅主题"""
if client_id not in self.active_sessions:
return {"error": "客户端不存在"}
self.active_sessions[client_id].subscriptions.add(topic)
return {"status": "subscribed", "topic": topic}
@self.app.post("/sse/{client_id}/unsubscribe")
async def unsubscribe_topic(client_id: str, topic: str):
"""取消订阅主题"""
if client_id not in self.active_sessions:
return {"error": "客户端不存在"}
self.active_sessions[client_id].subscriptions.discard(topic)
return {"status": "unsubscribed", "topic": topic}
async def _sse_generator(self, client_id: str):
"""SSE事件生成器"""
session = self.active_sessions.get(client_id)
if not session:
yield {"event": "error", "data": "会话不存在"}
return
try:
# 发送连接确认
yield {
"event": "connected",
"data": json.dumps({
"client_id": client_id,
"server_time": datetime.now().isoformat(),
"heartbeat_interval": self.heartbeat_interval
})
}
# 主事件循环
last_heartbeat = time.time()
while True:
# 检查会话是否过期
if (datetime.now() - session.last_activity).seconds > self.max_inactive_time:
logger.info(f"客户端 {client_id} 会话超时")
yield {"event": "timeout", "data": "会话已超时"}
break
# 发送心跳
current_time = time.time()
if current_time - last_heartbeat >= self.heartbeat_interval:
yield {
"event": "heartbeat",
"data": json.dumps({
"timestamp": datetime.now().isoformat(),
"client_id": client_id
})
}
last_heartbeat = current_time
session.last_activity = datetime.now()
# 检查是否有需要发送的数据
# 这里可以添加业务逻辑,比如检查消息队列
await asyncio.sleep(1)
except asyncio.CancelledError:
logger.info(f"客户端 {client_id} 连接断开")
except Exception as e:
logger.error(f"SSE生成器异常: {e}")
yield {"event": "error", "data": str(e)}
finally:
# 清理会话
if client_id in self.active_sessions:
del self.active_sessions[client_id]
logger.info(f"客户端 {client_id} 会话清理完成")
async def _session_cleanup_loop(self):
"""会话清理循环"""
while True:
try:
current_time = datetime.now()
expired_clients = []
for client_id, session in self.active_sessions.items():
inactive_time = (current_time - session.last_activity).seconds
if inactive_time > self.max_inactive_time:
expired_clients.append(client_id)
for client_id in expired_clients:
if client_id in self.active_sessions:
del self.active_sessions[client_id]
logger.info(f"清理过期客户端: {client_id}")
await asyncio.sleep(60) # 每分钟清理一次
except Exception as e:
logger.error(f"会话清理循环异常: {e}")
await asyncio.sleep(10)
def _generate_client_id(self) -> str:
"""生成客户端ID"""
import uuid
return f"client_{uuid.uuid4().hex[:8]}"
def _validate_client_id(self, client_id: str) -> bool:
"""验证客户端ID格式"""
return client_id.startswith("client_") and len(client_id) == 15
@mcp.tool()
async def get_session_stats(self) -> dict:
"""获取会话统计信息"""
now = datetime.now()
active_sessions = []
for client_id, session in self.active_sessions.items():
inactive_seconds = (now - session.last_activity).seconds
active_sessions.append({
"client_id": client_id,
"ip_address": session.ip_address,
"last_activity": session.last_activity.isoformat(),
"inactive_seconds": inactive_seconds,
"reconnect_count": session.reconnect_count,
"subscriptions": list(session.subscriptions)
})
return {
"total_sessions": len(self.active_sessions),
"active_sessions": active_sessions,
"server_time": now.isoformat(),
"heartbeat_interval": self.heartbeat_interval,
"max_inactive_time": self.max_inactive_time
}
@mcp.tool()
async def broadcast_message(self, topic: str, message: str) -> dict:
"""广播消息到指定主题的客户端"""
# 在实际实现中,这里应该将消息推送到订阅了该主题的客户端
# 为了简化示例,我们只记录日志
logger.info(f"广播消息到主题 {topic}: {message}")
subscribers = []
for client_id, session in self.active_sessions.items():
if topic in session.subscriptions:
subscribers.append(client_id)
return {
"topic": topic,
"message": message,
"subscriber_count": len(subscribers),
"subscribers": subscribers,
"timestamp": datetime.now().isoformat()
}
# 使用示例
if __name__ == "__main__":
import uvicorn
# 创建稳定的SSE服务器
server = StableSSEServer("EnhancedSSEServer")
# 添加更多MCP工具
@server.mcp.tool()
async def calculate_statistics(data: list) -> dict:
"""计算数据统计信息"""
if not data:
return {"error": "数据不能为空"}
import statistics
return {
"count": len(data),
"mean": statistics.mean(data),
"median": statistics.median(data),
"stdev": statistics.stdev(data) if len(data) > 1 else 0,
"min": min(data),
"max": max(data)
}
# 配置并启动服务器
@asynccontextmanager
async def lifespan(app: FastAPI):
"""应用生命周期管理"""
server.start_time = time.time()
logger.info("SSE服务器启动")
yield
# 清理任务
server.cleanup_task.cancel()
try:
await server.cleanup_task
except asyncio.CancelledError:
pass
logger.info("SSE服务器关闭")
server.app.lifespan = lifespan
# 启动服务器
uvicorn.run(
server.app,
host="0.0.0.0",
port=8000,
log_level="info",
access_log=True
)
```
**SSE客户端重连策略**:
```javascript
// 前端SSE客户端实现
class ResilientSSEClient {
constructor(url, options = {}) {
this.url = url;
this.options = {
retryDelay: 1000,
maxRetries: 5,
heartbeatTimeout: 45000, // 45秒心跳超时
...options
};
this.eventSource = null;
this.retryCount = 0;
this.lastMessageTime = Date.now();
this.heartbeatInterval = null;
this.listeners = new Map();
this.reconnectTimeout = null;
this.connect();
}
connect() {
if (this.eventSource) {
this.eventSource.close();
}
// 添加重试标识
const connectUrl = new URL(this.url);
if (this.retryCount > 0) {
connectUrl.searchParams.set('retry', this.retryCount);
}
this.eventSource = new EventSource(connectUrl.toString());
this.eventSource.onopen = () => {
console.log('SSE连接已建立');
this.retryCount = 0;
this.startHeartbeatCheck();
};
this.eventSource.onmessage = (event) => {
this.lastMessageTime = Date.now();
this.handleMessage(event);
};
this.eventSource.addEventListener('heartbeat', (event) => {
this.lastMessageTime = Date.now();
console.log('收到心跳信号');
});
this.eventSource.onerror = (error) => {
console.error('SSE连接错误:', error);
this.handleDisconnect();
};
}
handleMessage(event) {
try {
const data = JSON.parse(event.data);
const eventType = event.type || 'message';
// 触发对应事件的监听器
const callbacks = this.listeners.get(eventType) || [];
callbacks.forEach(callback => callback(data));
// 通用消息处理
const generalCallbacks = this.listeners.get('*') || [];
generalCallbacks.forEach(callback => callback({ type: eventType, data }));
} catch (error) {
console.error('消息解析错误:', error);
}
}
startHeartbeatCheck() {
if (this.heartbeatInterval) {
clearInterval(this.heartbeatInterval);
}
this.heartbeatInterval = setInterval(() => {
const timeSinceLastMessage = Date.now() - this.lastMessageTime;
if (timeSinceLastMessage > this.options.heartbeatTimeout) {
console.warn('心跳超时,重新连接...');
this.handleDisconnect();
}
}, 10000); // 每10秒检查一次
}
handleDisconnect() {
if (this.reconnectTimeout) {
clearTimeout(this.reconnectTimeout);
}
if (this.retryCount >= this.options.maxRetries) {
console.error('达到最大重连次数,停止重试');
this.emit('maxRetriesExceeded');
return;
}
this.retryCount++;
const delay = this.options.retryDelay * Math.pow(2, this.retryCount - 1);
console.log(`将在 ${delay}ms 后尝试第 ${this.retryCount} 次重连`);
this.reconnectTimeout = setTimeout(() => {
this.connect();
}, delay);
}
on(event, callback) {
if (!this.listeners.has(event)) {
this.listeners.set(event, []);
}
this.listeners.get(event).push(callback);
}
emit(event, data) {
const callbacks = this.listeners.get(event) || [];
callbacks.forEach(callback => callback(data));
}
close() {
if (this.eventSource) {
this.eventSource.close();
this.eventSource = null;
}
if (this.heartbeatInterval) {
clearInterval(this.heartbeatInterval);
this.heartbeatInterval = null;
}
if (this.reconnectTimeout) {
clearTimeout(this.reconnectTimeout);
this.reconnectTimeout = null;
}
console.log('SSE客户端已关闭');
}
}
// 使用示例
const sseClient = new ResilientSSEClient('http://localhost:8000/sse', {
retryDelay: 1000,
maxRetries: 10,
heartbeatTimeout: 60000
});
sseClient.on('connected', (data) => {
console.log('连接成功:', data);
// 订阅主题
fetch(`http://localhost:8000/sse/${data.client_id}/subscribe`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ topic: 'updates' })
});
});
sseClient.on('heartbeat', (data) => {
console.log('心跳:', data);
});
sseClient.on('error', (error) => {
console.error('错误事件:', error);
});
sseClient.on('*', (event) => {
console.log('收到事件:', event.type, event.data);
});
```
## 4. 异步处理与并发控制不当
**问题现象**:当多个AI客户端同时调用MCP工具时,服务器响应变慢甚至崩溃。特别是在处理I/O密集型或计算密集型任务时,性能问题尤为明显。
**问题分析**:Python的异步编程模型虽然强大,但使用不当会导致资源竞争、死锁或协程泄漏。MCP Server需要处理来自多个客户端的并发请求,必须合理管理异步任务。
**异步任务管理框架**:
```python
import asyncio
import time
from typing import Dict, List, Optional, Any
from dataclasses import dataclass, field
from enum import Enum
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import logging
from contextlib import asynccontextmanager
from mcp.server.fastmcp import FastMCP
logger = logging.getLogger(__name__)
class TaskPriority(Enum):
"""任务优先级枚举"""
LOW = 0
NORMAL = 1
HIGH = 2
CRITICAL = 3
class TaskStatus(Enum):
"""任务状态枚举"""
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
CANCELLED = "cancelled"
@dataclass
class AsyncTask:
"""异步任务封装"""
task_id: str
coroutine: Any
priority: TaskPriority = TaskPriority.NORMAL
created_at: float = field(default_factory=time.time)
started_at: Optional[float] = None
completed_at: Optional[float] = None
status: TaskStatus = TaskStatus.PENDING
result: Any = None
error: Optional[str] = None
timeout: Optional[float] = None
class AsyncTaskManager:
"""异步任务管理器"""
def __init__(
self,
max_concurrent: int = 10,
max_workers: int = 4,
use_process_pool: bool = False
):
self.max_concurrent = max_concurrent
self.semaphore = asyncio.Semaphore(max_concurrent)
self.tasks: Dict[str, AsyncTask] = {}
self.task_queue = asyncio.PriorityQueue()
# 线程池用于阻塞IO操作
self.thread_pool = ThreadPoolExecutor(max_workers=max_workers)
# 进程池用于CPU密集型操作
self.use_process_pool = use_process_pool
if use_process_pool:
self.process_pool = ProcessPoolExecutor(max_workers=max_workers)
# 任务清理协程
self.cleanup_task = asyncio.create_task(self._cleanup_completed_tasks())
async def submit(
self,
coroutine_func,
*args,
task_id: Optional[str] = None,
priority: TaskPriority = TaskPriority.NORMAL,
timeout: Optional[float] = None,
**kwargs
) -> str:
"""提交异步任务"""
if task_id is None:
import uuid
task_id = f"task_{uuid.uuid4().hex[:8]}"
# 创建任务对象
task = AsyncTask(
task_id=task_id,
coroutine=coroutine_func,
priority=priority,
timeout=timeout
)
# 存储任务
self.tasks[task_id] = task
# 根据优先级加入队列(优先级数字越小优先级越高)
priority_value = priority.value
await self.task_queue.put((priority_value, time.time(), task_id))
# 启动任务处理(如果未达到并发限制)
asyncio.create_task(self._process_task_queue())
return task_id
async def _process_task_queue(self):
"""处理任务队列"""
while not self.task_queue.empty():
async with self.semaphore:
try:
# 从队列获取任务
_, _, task_id = await self.task_queue.get()
task = self.tasks.get(task_id)
if not task or task.status != TaskStatus.PENDING:
self.task_queue.task_done()
continue
# 更新任务状态
task.status = TaskStatus.RUNNING
task.started_at = time.time()
# 执行任务
try:
if task.timeout:
result = await asyncio.wait_for(
task.coroutine(),
timeout=task.timeout
)
else:
result = await task.coroutine()
task.result = result
task.status = TaskStatus.COMPLETED
except asyncio.TimeoutError:
task.status = TaskStatus.FAILED
task.error = f"任务超时 ({task.timeout}秒)"
logger.warning(f"任务 {task_id} 超时")
except Exception as e:
task.status = TaskStatus.FAILED
task.error = str(e)
logger.error(f"任务 {task_id} 失败: {e}")
finally:
task.completed_at = time.time()
self.task_queue.task_done()
except Exception as e:
logger.error(f"任务处理异常: {e}")
async def run_in_thread(self, func, *args, **kwargs):
"""在线程池中运行阻塞函数"""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(self.thread_pool, lambda: func(*args, **kwargs))
async def run_in_process(self, func, *args, **kwargs):
"""在进程池中运行CPU密集型函数"""
if not self.use_process_pool:
raise RuntimeError("进程池未启用")
loop = asyncio.get_event_loop()
return await loop.run_in_executor(self.process_pool, lambda: func(*args, **kwargs))
async def get_task_result(self, task_id: str, wait: bool = False) -> Optional[Any]:
"""获取任务结果"""
task = self.tasks.get(task_id)
if not task:
return None
if wait and task.status == TaskStatus.RUNNING:
# 等待任务完成
while task.status in [TaskStatus.PENDING, TaskStatus.RUNNING]:
await asyncio.sleep(0.1)
task = self.tasks.get(task_id)
if not task:
return None
if task.status == TaskStatus.COMPLETED:
return task.result
elif task.status == TaskStatus.FAILED:
raise Exception(f"任务失败: {task.error}")
else:
return None
async def cancel_task(self, task_id: str) -> bool:
"""取消任务"""
task = self.tasks.get(task_id)
if not task:
return False
if task.status in [TaskStatus.PENDING, TaskStatus.RUNNING]:
task.status = TaskStatus.CANCELLED
task.completed_at = time.time()
return True
return False
async def get_task_status(self, task_id: str) -> Optional[Dict]:
"""获取任务状态"""
task = self.tasks.get(task_id)
if not task:
return None
return {
"task_id": task.task_id,
"status": task.status.value,
"created_at": task.created_at,
"started_at": task.started_at,
"completed_at": task.completed_at,
"priority": task.priority.value,
"error": task.error,
"duration": (task.completed_at or time.time()) - (task.started_at or task.created_at)
}
async def _cleanup_completed_tasks(self, retention_seconds: int = 3600):
"""清理已完成的任务"""
while True:
try:
current_time = time.time()
tasks_to_remove = []
for task_id, task in self.tasks.items():
if task.status in [TaskStatus.COMPLETED, TaskStatus.FAILED, TaskStatus.CANCELLED]:
if task.completed_at and (current_time - task.completed_at) > retention_seconds:
tasks_to_remove.append(task_id)
for task_id in tasks_to_remove:
del self.tasks[task_id]
await asyncio.sleep(300) # 每5分钟清理一次
except Exception as e:
logger.error(f"任务清理异常: {e}")
await asyncio.sleep(60)
async def shutdown(self):
"""关闭任务管理器"""
# 取消清理任务
self.cleanup_task.cancel()
try:
await self.cleanup_task
except asyncio.CancelledError:
pass
# 关闭线程池和进程池
self.thread_pool.shutdown(wait=True)
if self.use_process_pool:
self.process_pool.shutdown(wait=True)
# 取消所有进行中的任务
for task_id in list(self.tasks.keys()):
await self.cancel_task(task_id)
# 集成到MCP Server的示例
class ConcurrentMCPServer:
"""支持并发控制的MCP服务器"""
def __init__(self, name: str = "ConcurrentServer"):
self.mcp = FastMCP(name)
self.task_manager = AsyncTaskManager(
max_concurrent=20,
max_workers=8,
use_process_pool=True
)
# 注册工具
self._register_tools()
def _register_tools(self):
"""注册MCP工具"""
@self.mcp.tool()
async def process_batch_data(
data_list: List[dict],
batch_size: int = 10,
timeout_per_item: float = 30.0
) -> dict:
"""
批量处理数据
Args:
data_list: 待处理的数据列表
batch_size: 每批处理的数量
timeout_per_item: 每个项目的超时时间(秒)
"""
if not data_list:
return {"error": "数据列表不能为空"}
# 创建批处理任务
batches = [
data_list[i:i + batch_size]
for i in range(0, len(data_list), batch_size)
]
batch_tasks = []
for i, batch in enumerate(batches):
task_id = await self.task_manager.submit(
self._process_single_batch,
batch=batch,
batch_index=i,
priority=TaskPriority.NORMAL,
timeout=timeout_per_item * len(batch)
)
batch_tasks.append(task_id)
# 等待所有批处理完成
results = []
errors = []
for task_id in batch_tasks:
try:
result = await self.task_manager.get_task_result(task_id, wait=True)
if result:
results.append(result)
except Exception as e:
errors.append(str(e))
return {
"total_items": len(data_list),
"total_batches": len(batches),
"successful_batches": len(results),
"failed_batches": len(errors),
"results": results,
"errors": errors if errors else None
}
@self.mcp.tool()
async def cpu_intensive_calculation(
matrix_a: List[List[float]],
matrix_b: List[List[float]]
) -> List[List[float]]:
"""
CPU密集型矩阵乘法计算
Args:
matrix_a: 第一个矩阵
matrix_b: 第二个矩阵
"""
# 验证矩阵维度
if not matrix_a or not matrix_b:
raise ValueError("矩阵不能为空")
rows_a, cols_a = len(matrix_a), len(matrix_a[0])
rows_b, cols_b = len(matrix_b), len(matrix_b[0])
if cols_a != rows_b:
raise ValueError(f"矩阵维度不匹配: A({rows_a}x{cols_a}) * B({rows_b}x{cols_b})")
# 使用进程池执行CPU密集型计算
def multiply_matrices(a, b):
result = [[0] * cols_b for _ in range(rows_a)]
for i in range(rows_a):
for j in range(cols_b):
for k in range(cols_a):
result[i][j] += a[i][k] * b[k][j]
return result
return await self.task_manager.run_in_process(
multiply_matrices,
matrix_a,
matrix_b
)
@self.mcp.tool()
async def io_intensive_operation(
file_paths: List[str],
operation: str = "read"
) -> dict:
"""
I/O密集型文件操作
Args:
file_paths: 文件路径列表
operation: 操作类型(read/write/delete)
"""
import os
import shutil
async def process_file(file_path: str) -> dict:
try:
if operation == "read":
# 模拟读取大文件
await asyncio.sleep(0.5) # 模拟I/O延迟
if os.path.exists(file_path):
file_size = os.path.getsize(file_path)
return {
"file": file_path,
"status": "success",
"size": file_size,
"operation": "read"
}
else:
return {
"file": file_path,
"status": "error",
"error": "文件不存在",
"operation": "read"
}
elif operation == "write":
# 模拟写入文件
await asyncio.sleep(1.0)
with open(file_path, 'w') as f:
f.write("Test content")
return {
"file": file_path,
"status": "success",
"operation": "write"
}
elif operation == "delete":
# 删除文件
await asyncio.sleep(0.2)
if os.path.exists(file_path):
os.remove(file_path)
return {
"file": file_path,
"status": "success",
"operation": "delete"
}
else:
return {
"file": file_path,
"status": "error",
"error": "文件不存在",
"operation": "delete"
}
else:
return {
"file": file_path,
"status": "error",
"error": f"不支持的操作: {operation}"
}
except Exception as e:
return {
"file": file_path,
"status": "error",
"error": str(e),
"operation": operation
}
# 并发处理所有文件
tasks = []
for file_path in file_paths:
task_id = await self.task_manager.submit(
lambda fp=file_path: process_file(fp),
priority=TaskPriority.LOW
)
tasks.append(task_id)
# 收集结果
results = []
for task_id in tasks:
try:
result = await self.task_manager.get_task_result(task_id, wait=True)
if result:
results.append(result)
except Exception as e:
results.append({
"file": "unknown",
"status": "error",
"error": str(e)
})
return {
"operation": operation,
"total_files": len(file_paths),
"processed_files": len(results),
"results": results
}
@self.mcp.tool()
async def get_task_status(task_id: str) -> dict:
"""获取任务状态"""
status = await self.task_manager.get_task_status(task_id)
if status:
return status
else:
return {"error": "任务不存在"}
@self.mcp.tool()
async def cancel_task(task_id: str) -> dict:
"""取消任务"""
success = await self.task_manager.cancel_task(task_id)
return {
"task_id": task_id,
"cancelled": success,
"message": "任务已取消" if success else "任务不存在或无法取消"
}
async def _process_single_batch(self, batch: List[dict], batch_index: int) -> dict:
"""处理单个数据批次"""
processed_items = []
for item in batch:
try:
# 模拟数据处理
await asyncio.sleep(0.1)
# 这里添加实际的数据处理逻辑
processed_item = {
**item,
"processed": True,
"timestamp": time.time(),
"batch_index": batch_index
}
processed_items.append(processed_item)
except Exception as e:
processed_items.append({
**item,
"processed": False,
"error": str(e),
"batch_index": batch_index
})
return {
"batch_index": batch_index,
"batch_size": len(batch),
"successful_items": len([i for i in processed_items if i.get("processed")]),
"failed_items": len([i for i in processed_items if not i.get("processed")]),
"items": processed_items
}
async def run(self, transport: str = 'stdio'):
"""运行服务器"""
try:
await self.mcp.run(transport=transport)
finally:
# 确保任务管理器正确关闭
await self.task_manager.shutdown()
# 使用示例
if __name__ == "__main__":
import asyncio
server = ConcurrentMCPServer("AdvancedConcurrentServer")
# 运行服务器
asyncio.run(server.run(transport='stdio'))
```
**并发控制配置表**:
| 配置项 | 推荐值 | 说明 | 调整建议 |
|--------|--------|------|----------|
| `max_concurrent` | 10-50 | 最大并发任务数 | 根据服务器CPU核心数调整,通常为核心数的2-4倍 |
| `max_workers` | 4-8 | 线程池/进程池工作线程数 | I/O密集型用线程池,CPU密集型用进程池 |
| `task_timeout` | 30-300秒 | 任务超时时间 | 根据任务复杂度调整,避免资源占用过久 |
| `retention_seconds` | 3600秒 | 任务结果保留时间 | 根据内存限制调整,避免内存泄漏 |
| `heartbeat_interval` | 30秒 | 心跳间隔 | 网络不稳定时可适当缩短 |
| `max_retries` | 3-5次 | 最大重试次数 | 根据业务重要性调整 |
## 5. 配置管理与环境隔离不足
**问题现象**:不同环境(开发、测试、生产)的配置混用,敏感信息硬编码在代码中,环境变量管理混乱,导致部署时频繁出错。
**解决方案**:建立完善的配置管理系统,实现环境隔离和敏感信息保护。
**完整的配置管理实现**:
```python
import os
import json
import yaml
from typing import Dict, Any, Optional, Union
from pathlib import Path
from dataclasses import dataclass, field
from enum import Enum
import logging
from pydantic import BaseModel, Field, validator
from dotenv import load_dotenv
import hashlib
import base64
from cryptography.fernet import Fernet
logger = logging.getLogger(__name__)
class Environment(Enum):
"""环境枚举"""
DEVELOPMENT = "development"
TESTING = "testing"
STAGING = "staging"
PRODUCTION = "production"
class ConfigSource(Enum):
"""配置源枚举"""
ENV_VARS = "env_vars"
CONFIG_FILE = "config_file"
SECRETS_MANAGER = "secrets_manager"
DEFAULT = "default"
@dataclass
class ConfigValue:
"""配置值封装"""
value: Any
source: ConfigSource
required: bool = True
sensitive: bool = False
description: Optional[str] = None
def get_masked_value(self) -> str:
"""获取掩码后的值(用于日志)"""
if self.sensitive and self.value:
if isinstance(self.value, str):
if len(self.value) <= 4:
return "***"
return self.value[:2] + "***" + self.value[-2:]
return "***"
return str(self.value)
class ConfigSchema(BaseModel):
"""配置模式定义"""
# 服务器配置
server_name: str = Field(..., description="服务器名称")
server_host: str = Field("0.0.0.0", description="服务器监听地址")
server_port: int = Field(8000, description="服务器监听端口")
debug_mode: bool = Field(False, description="调试模式")
log_level: str = Field("INFO", description="日志级别")
# 数据库配置
database_url: str = Field(..., description="数据库连接URL")
database_pool_size: int = Field(10, description="数据库连接池大小")
database_max_overflow: int = Field(20, description="数据库最大溢出连接数")
# Redis配置
redis_url: Optional[str] = Field(None, description="Redis连接URL")
redis_password: Optional[str] = Field(None, description="Redis密码")
# 外部API配置
api_timeout: int = Field(30, description="API请求超时时间(秒)")
api_retry_count: int = Field(3, description="API重试次数")
# 安全配置
secret_key: str = Field(..., description="加密密钥")
token_expiry_hours: int = Field(24, description="令牌过期时间(小时)")
# 性能配置
max_concurrent_tasks: int = Field(20, description="最大并发任务数")
task_timeout_seconds: int = Field(300, description="任务超时时间(秒)")
# 验证器
@validator('server_port')
def validate_port(cls, v):
if not 1 <= v <= 65535:
raise ValueError('端口必须在1-65535之间')
return v
@validator('log_level')
def validate_log_level(cls, v):
valid_levels = ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']
if v.upper() not in valid_levels:
raise ValueError(f'日志级别必须是: {", ".join(valid_levels)}')
return v.upper()
@validator('database_url')
def validate_database_url(cls, v):
if not v.startswith(('postgresql://', 'mysql://', 'sqlite://')):
raise ValueError('数据库URL格式不正确')
return v
class ConfigurationManager:
"""配置管理器"""
def __init__(self, env: Environment = None):
self.env = env or self._detect_environment()
self.config: Dict[str, ConfigValue] = {}
self.schema = ConfigSchema
self.encryption_key = None
# 加载环境变量
load_dotenv()
# 加载配置
self._load_configuration()
# 验证配置
self._validate_configuration()
def _detect_environment(self) -> Environment:
"""检测当前环境"""
env_var = os.getenv('MCP_ENV', '').lower()
if env_var in ['prod', 'production']:
return Environment.PRODUCTION
elif env_var in ['stage', 'staging']:
return Environment.STAGING
elif env_var in ['test', 'testing']:
return Environment.TESTING
else:
return Environment.DEVELOPMENT
def _load_configuration(self):
"""加载配置"""
# 1. 首先加载默认配置
self._load_defaults()
# 2. 加载环境特定的配置文件
self._load_config_file()
# 3. 加载环境变量(覆盖文件配置)
self._load_environment_variables()
# 4. 加载密钥(从安全存储)
self._load_secrets()
def _load_defaults(self):
"""加载默认配置"""
default_config = {
'server_name': f'mcp-server-{self.env.value}',
'server_host': '0.0.0.0',
'server_port': 8000,
'debug_mode': self.env == Environment.DEVELOPMENT,
'log_level': 'DEBUG' if self.env == Environment.DEVELOPMENT else 'INFO',
'database_pool_size': 10,
'database_max_overflow': 20,
'api_timeout': 30,
'api_retry_count': 3,
'token_expiry_hours': 24,
'max_concurrent_tasks': 20,
'task_timeout_seconds': 300,
}
for key, value in default_config.items():
self.config[key] = ConfigValue(
value=value,
source=ConfigSource.DEFAULT,
required=False,
sensitive=False
)
def _load_config_file(self):
"""从配置文件加载配置"""
config_files = [
# 环境特定配置
Path(f'config/{self.env.value}.yaml'),
Path(f'config/{self.env.value}.json'),
# 通用配置
Path('config/config.yaml'),
Path('config/config.json'),
# 用户主目录配置
Path.home() / f'.mcp/{self.env.value}.yaml',
Path.home() / '.mcp/config.yaml',
]
for config_file in config_files:
if config_file.exists():
try:
if config_file.suffix == '.yaml':
with open(config_file, 'r', encoding='utf-8') as f:
file_config = yaml.safe_load(f)
else:
with open(config_file, 'r', encoding='utf-8') as f:
file_config = json.load(f)
self._merge_config(file_config, ConfigSource.CONFIG_FILE)
logger.info(f"已加载配置文件: {config_file}")
break
except Exception as e:
logger.warning(f"加载配置文件失败 {config_file}: {e}")
def _load_environment_variables(self):
"""从环境变量加载配置"""
env_mapping = {
'MCP_SERVER_NAME': 'server_name',
'MCP_SERVER_HOST': 'server_host',
'MCP_SERVER_PORT': 'server_port',
'MCP_DEBUG_MODE': 'debug_mode',
'MCP_LOG_LEVEL': 'log_level',
'MCP_DATABASE_URL': 'database_url',
'MCP_DATABASE_POOL_SIZE': 'database_pool_size',
'MCP_DATABASE_MAX_OVERFLOW': 'database_max_overflow',
'MCP_REDIS_URL': 'redis_url',
'MCP_REDIS_PASSWORD': 'redis_password',
'MCP_API_TIMEOUT': 'api_timeout',
'MCP_API_RETRY_COUNT': 'api_retry_count',
'MCP_SECRET_KEY': 'secret_key',
'MCP_TOKEN_EXPIRY_HOURS': 'token_expiry_hours',
'MCP_MAX_CONCURRENT_TASKS': 'max_concurrent_tasks',
'MCP_TASK_TIMEOUT_SECONDS': 'task_timeout_seconds',
}
for env_var, config_key in env_mapping.items():
value = os.getenv(env_var)
if value is not None:
# 转换类型
if config_key in ['debug_mode']:
value = value.lower() in ['true', '1', 'yes', 'on']
elif config_key in ['server_port', 'database_pool_size',
'database_max_overflow', 'api_timeout',
'api_retry_count', 'token_expiry_hours',
'max_concurrent_tasks', 'task_timeout_seconds']:
try:
value = int(value)
except ValueError:
logger.warning(f"环境变量 {env_var} 的值无法转换为整数: {value}")
continue
self.config[config_key] = ConfigValue(
value=value,
source=ConfigSource.ENV_VARS,
required=config_key in ['server_name', 'database_url', 'secret_key'],
sensitive=config_key in ['database_url', 'redis_password', 'secret_key']
)
def _load_secrets(self):
"""从密钥管理器加载敏感信息"""
# 这里可以集成各种密钥管理服务
# 例如:AWS Secrets Manager, HashiCorp Vault, Azure Key Vault等
# 示例:从文件加载密钥
secrets_file = Path('config/secrets.yaml')
if secrets_file.exists():
try:
with open(secrets_file, 'r', encoding='utf-8') as f:
secrets = yaml.safe_load(f)
for key, value in secrets.items():
if key in self.config:
self.config[key].value = value
self.config[key].source = ConfigSource.SECRETS_MANAGER
self.config[key].sensitive = True
except Exception as e:
logger.error(f"加载密钥文件失败: {e}")
def _merge_config(self, new_config: Dict[str, Any], source: ConfigSource):
"""合并配置"""
for key, value in new_config.items():
if key in self.config:
self.config[key].value = value
self.config[key].source = source
else:
self.config[key] = ConfigValue(
value=value,
source=source,
required=False,
sensitive=False
)
def _validate_configuration(self):
"""验证配置"""
# 检查必需配置
required_keys = ['server_name', 'database_url', 'secret_key']
missing_keys = []
for key in required_keys:
if key not in self.config or self.config[key].value is None:
missing_keys.append(key)
if missing_keys:
raise ValueError(f"缺少必需配置: {', '.join(missing_keys)}")
# 使用Pydantic验证配置模式
config_dict = {k: v.value for k, v in self.config.items()}
try:
validated_config = self.schema(**config_dict)
# 更新验证后的值
for key, value in validated_config.dict().items():
if key in self.config:
self.config[key].value = value
else:
self.config[key] = ConfigValue(
value=value,
source=ConfigSource.DEFAULT,
required=False,
sensitive=False
)
except Exception as e:
raise ValueError(f"配置验证失败: {e}")
def get(self, key: str, default: Any = None) -> Any:
"""获取配置值"""
config_value = self.config.get(key)
if config_value and config_value.value is not None:
return config_value.value
return default
def get_masked(self, key: str) -> str:
"""获取掩码后的配置值(用于日志)"""
config_value = self.config.get(key)
if config_value:
return config_value.get_masked_value()
return "***"
def get_source(self, key: str) -> Optional[ConfigSource]:
"""获取配置源"""
config_value = self.config.get(key)
if config_value:
return config_value.source
return None
def get_all(self, mask_sensitive: bool = True) -> Dict[str, Any]:
"""获取所有配置"""
result = {}
for key, config_value in self.config.items():
if mask_sensitive and config_value.sensitive:
result[key] = config_value.get_masked_value()
else:
result[key] = config_value.value
return result
def get_config_summary(self) -> Dict[str, Any]:
"""获取配置摘要"""
return {
'environment': self.env.value,
'total_configs': len(self.config),
'required_configs': sum(1 for v in self.config.values() if v.required),
'sensitive_configs': sum(1 for v in self.config.values() if v.sensitive),
'sources': {
source.value: sum(1 for v in self.config.values() if v.source == source)
for source in ConfigSource
}
}
def setup_encryption(self, key: Optional[str] = None):
"""设置加密"""
if key:
# 使用提供的密钥
key_bytes = key.encode()
if len(key_bytes) < 32:
# 填充到32字节
key_bytes = key_bytes.ljust(32, b'0')
elif len(key_bytes) > 32:
# 截断到32字节
key_bytes = key_bytes[:32]
self.encryption_key = base64.urlsafe_b64encode(key_bytes)
else:
# 生成随机密钥
self.encryption_key = Fernet.generate_key()
self.cipher = Fernet(self.encryption_key)
def encrypt_value(self, value: str) -> str:
"""加密值"""
if not hasattr(self, 'cipher'):
raise RuntimeError("加密未初始化")
return self.cipher.encrypt(value.encode()).decode()
def decrypt_value(self, encrypted_value: str) -> str:
"""解密值"""
if not hasattr(self, 'cipher'):
raise RuntimeError("加密未初始化")
return self.cipher.decrypt(encrypted_value.encode()).decode()
# 配置示例文件:config/development.yaml
development_config_example = """
# 开发环境配置
server:
name: "mcp-dev-server"
host: "127.0.0.1"
port: 8000
debug: true
database:
url: "postgresql://user:password@localhost:5432/mcp_dev"
pool_size: 5
max_overflow: 10
redis:
url: "redis://localhost:6379/0"
logging:
level: "DEBUG"
file: "logs/mcp_dev.log"
security:
secret_key: "dev-secret-key-change-in-production"
token_expiry_hours: 24
performance:
max_concurrent_tasks: 10
task_timeout_seconds: 60
"""
# 配置示例文件:config/production.yaml
production_config_example = """
# 生产环境配置
server:
name: "mcp-prod-server"
host: "0.0.0.0"
port: 8080
debug: false
database:
url: ${DATABASE_URL} # 从环境变量读取
pool_size: 20
max_overflow: 40
redis:
url: ${REDIS_URL}
password: ${REDIS_PASSWORD}
logging:
level: "INFO"
file: "/var/log/mcp/server.log"
security:
secret_k