# Python流式请求实战:用httpx处理大模型API的StreamingResponse与EventSourceResponse
如果你正在构建一个需要与大模型API交互的Python应用,特别是那些需要实时显示生成结果的场景,比如聊天机器人、代码助手或者内容创作工具,那么流式响应处理绝对是你绕不开的技术点。想象一下,用户输入一个问题,然后看着答案一个字一个字地“流”出来,而不是等待几秒钟后突然看到一整段文字——这种体验上的差异,正是流式处理带来的核心价值。
在实际开发中,我发现很多开发者虽然知道流式响应的概念,但在具体实现时却常常陷入各种技术细节的泥潭:同步和异步客户端该怎么选?`StreamingResponse`和`EventSourceResponse`到底有什么区别?如何处理超时和连接中断?如何优雅地解析那些源源不断的数据块?这些问题如果不搞清楚,很容易写出既不稳定又难以维护的代码。
这篇文章就是为你解决这些实际问题而写的。我不会只给你一堆理论,而是会结合我最近在几个实际项目中的经验,从底层原理到具体实现,一步步带你掌握用`httpx`处理大模型API流式响应的完整技能栈。无论你是刚刚接触这个领域,还是已经有一些经验但想深入优化,相信都能从中找到实用的解决方案。
## 1. 理解流式响应的核心:两种协议与三种场景
在深入代码之前,我们需要先搞清楚几个基本概念。流式响应并不是什么神秘的黑科技,它本质上是一种数据传输方式,允许服务器在生成完整响应之前就开始向客户端发送数据。对于大模型API来说,这意味着模型每生成一个token(可以理解为词或字),服务器就能立即将其发送给客户端,而不是等到整个回答都生成完毕。
### 1.1 StreamingResponse vs EventSourceResponse:不只是名字不同
你可能在文档中看到过这两个术语,它们都用于实现流式响应,但设计理念和使用场景有着本质区别。
**StreamingResponse**是FastAPI框架提供的一种通用流式响应机制。它不关心你传输的是什么类型的数据——可以是文本、二进制文件、视频流,或者任何你能想到的数据格式。它的工作方式很简单:你提供一个生成器函数,这个函数会`yield`出一块块的数据,FastAPI负责把这些数据块通过HTTP连接发送给客户端。
```python
# 一个简单的StreamingResponse示例
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
app = FastAPI()
@app.get("/stream-text")
async def stream_text():
async def generate():
for i in range(10):
yield f"数据块 {i}\n"
await asyncio.sleep(0.5)
return StreamingResponse(generate(), media_type="text/plain")
```
**EventSourceResponse**(通常指SSE,Server-Sent Events)则是一种专门为实时事件推送设计的协议。它建立在HTTP之上,使用特定的数据格式,并且天然支持自动重连机制。SSE协议要求数据必须遵循特定的格式,每个事件以`data:`开头,以两个换行符结束。
```python
# SSE格式的数据示例
data: 这是第一条消息\n\n
data: 这是第二条消息\n\n
event: custom_event
data: 这是自定义事件\n\n
```
为了更清晰地理解两者的差异,我整理了一个对比表格:
| 特性 | StreamingResponse | EventSourceResponse (SSE) |
|------|-------------------|---------------------------|
| **协议标准** | 无特定标准,通用HTTP流 | 遵循SSE规范 |
| **数据格式** | 任意格式 | 必须符合`data: {内容}\n\n`格式 |
| **重连机制** | 需要手动实现 | 浏览器自动处理 |
| **适用场景** | 文件下载、视频流、自定义协议 | 实时通知、聊天消息、股票行情 |
| **前端兼容性** | 需要自定义处理 | 浏览器原生支持EventSource API |
| **连接管理** | 简单直接 | 支持事件类型、重试时间等元数据 |
### 1.2 大模型API的流式响应实践
现在的主流大模型API(如OpenAI API、Anthropic Claude、国内的各种大模型服务)大多同时支持两种响应方式:非流式的完整响应和流式的分块响应。当你设置`stream=True`参数时,API会返回一个SSE格式的流,每个数据块都是一个完整的JSON对象,包含模型生成的最新内容。
这里有一个关键点需要注意:**虽然大模型API返回的是SSE格式的数据,但我们在客户端处理时,通常使用`httpx`的流式请求功能,而不是浏览器原生的EventSource API**。这是因为Python后端服务需要作为中间层,接收大模型API的流式响应,然后再以适合前端的方式转发给用户界面。
### 1.3 为什么选择httpx?
在Python的HTTP客户端库中,`httpx`有几个明显的优势让它成为处理流式请求的首选:
1. **同时支持同步和异步**:你可以在同一个项目中根据需求选择同步或异步方式,API设计保持高度一致
2. **优秀的流式处理支持**:`client.stream()`方法专门为处理大响应设计,内存使用效率高
3. **灵活的超时配置**:可以分别为连接、读取、写入设置不同的超时时间
4. **活跃的社区和良好的文档**:遇到问题时更容易找到解决方案
我最近在一个需要同时对接多个大模型服务商的项目中就深有体会。有些服务商响应很快,有些则比较慢,还有些网络状况不稳定。`httpx`的灵活配置让我能够针对不同服务商设置不同的超时策略,大大提高了系统的稳定性。
## 2. 同步客户端的实战:从基础到生产级封装
让我们从最基础的同步客户端开始。虽然在实际的生产环境中,异步客户端更常见(毕竟流式处理天然适合异步),但理解同步版本的工作原理对于掌握整个技术栈至关重要。
### 2.1 基础实现:直接使用httpx.Client
先来看一个最简单的同步流式请求示例。这个例子展示了如何向一个大模型API发送请求并处理流式响应:
```python
import json
import httpx
from httpx_sse import EventSource
def simple_stream_request(api_url: str, api_key: str, prompt: str):
"""基础同步流式请求函数"""
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
data = {
"model": "gpt-3.5-turbo",
"messages": [{"role": "user", "content": prompt}],
"stream": True,
"temperature": 0.7
}
with httpx.Client() as client:
with client.stream('POST', api_url, headers=headers, json=data) as response:
# 检查响应类型
content_type = response.headers.get('content-type', '')
if 'text/event-stream' in content_type:
# 处理SSE流
for sse_event in EventSource(response).iter_sse():
if sse_event.data == '[DONE]':
break
try:
chunk_data = json.loads(sse_event.data)
content = chunk_data['choices'][0]['delta'].get('content', '')
if content:
yield content
except (json.JSONDecodeError, KeyError) as e:
print(f"解析数据块时出错: {e}")
continue
else:
# 非流式响应
result = response.read()
full_data = json.loads(result)
yield full_data['choices'][0]['message']['content']
# 使用示例
if __name__ == "__main__":
api_url = "https://api.openai.com/v1/chat/completions"
api_key = "your-api-key-here" # 实际使用时请替换为你的API密钥
print("开始流式响应:")
for chunk in simple_stream_request(api_url, api_key, "请用Python写一个快速排序算法"):
print(chunk, end='', flush=True)
print("\n响应结束")
```
这个基础版本虽然能工作,但在生产环境中会遇到几个问题:
1. 没有错误处理
2. 没有超时控制
3. 代码重复度高
4. 难以测试和维护
### 2.2 生产级封装:面向对象的客户端设计
基于上述问题,我设计了一个更加健壮的同步客户端类。这个类封装了所有的流式处理逻辑,提供了更好的错误处理和配置选项:
```python
import json
import time
from typing import Iterator, Optional, Dict, Any
import httpx
from httpx_sse import EventSource
class SyncStreamingClient:
"""同步流式HTTP客户端,专为大模型API设计"""
def __init__(
self,
base_url: str,
api_key: str,
timeout: float = 30.0,
max_retries: int = 3,
default_model: str = "gpt-3.5-turbo"
):
"""
初始化客户端
Args:
base_url: API基础URL
api_key: API密钥
timeout: 请求超时时间(秒)
max_retries: 最大重试次数
default_model: 默认模型名称
"""
self.base_url = base_url.rstrip('/')
self.api_key = api_key
self.timeout = timeout
self.max_retries = max_retries
self.default_model = default_model
# 配置默认headers
self.default_headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
"Accept": "text/event-stream"
}
def _parse_sse_chunk(self, sse_data: str) -> Optional[Dict[str, Any]]:
"""解析SSE数据块"""
if not sse_data or sse_data == '[DONE]':
return None
try:
return json.loads(sse_data)
except json.JSONDecodeError as e:
# 记录日志但不中断流程
print(f"JSON解析失败: {e}, 原始数据: {sse_data[:100]}")
return None
def _extract_content(self, chunk_data: Dict[str, Any]) -> str:
"""从数据块中提取文本内容"""
try:
# 适配不同API的响应格式
if 'choices' in chunk_data and len(chunk_data['choices']) > 0:
choice = chunk_data['choices'][0]
# OpenAI格式
if 'delta' in choice and 'content' in choice['delta']:
return choice['delta']['content']
# 其他可能的格式
if 'text' in choice:
return choice['text']
# Claude等格式
if 'message' in choice and 'content' in choice['message']:
return choice['message']['content']
# 如果没有找到内容,返回空字符串
return ""
except (KeyError, TypeError) as e:
print(f"提取内容时出错: {e}")
return ""
def stream_completion(
self,
messages: list,
model: Optional[str] = None,
temperature: float = 0.7,
max_tokens: int = 1000,
**kwargs
) -> Iterator[str]:
"""
流式生成文本
Args:
messages: 消息列表
model: 模型名称,如果为None则使用默认模型
temperature: 温度参数
max_tokens: 最大token数
**kwargs: 其他API参数
Yields:
生成的文本块
"""
model = model or self.default_model
# 构建请求数据
request_data = {
"model": model,
"messages": messages,
"stream": True,
"temperature": temperature,
"max_tokens": max_tokens,
**kwargs
}
# 配置超时
timeout_config = httpx.Timeout(
connect=5.0, # 连接超时
read=self.timeout, # 读取超时
write=10.0, # 写入超时
pool=5.0 # 连接池超时
)
retry_count = 0
last_exception = None
while retry_count <= self.max_retries:
try:
with httpx.Client(timeout=timeout_config) as client:
with client.stream(
'POST',
f"{self.base_url}/chat/completions",
headers=self.default_headers,
json=request_data
) as response:
response.raise_for_status()
content_type = response.headers.get('content-type', '').lower()
if 'text/event-stream' in content_type:
# 处理流式响应
for sse_event in EventSource(response).iter_sse():
chunk_data = self._parse_sse_chunk(sse_event.data)
if chunk_data is None:
continue
content = self._extract_content(chunk_data)
if content:
yield content
# 检查是否结束
if chunk_data.get('choices', [{}])[0].get('finish_reason'):
break
else:
# 非流式响应
result = response.read()
full_data = json.loads(result)
content = self._extract_content(full_data)
yield content
# 成功完成,跳出重试循环
return
except (httpx.RequestError, httpx.HTTPStatusError) as e:
last_exception = e
retry_count += 1
if retry_count <= self.max_retries:
wait_time = 2 ** retry_count # 指数退避
print(f"请求失败,{wait_time}秒后重试 ({retry_count}/{self.max_retries})")
time.sleep(wait_time)
else:
print(f"达到最大重试次数,最后错误: {last_exception}")
raise
# 如果所有重试都失败
if last_exception:
raise last_exception
def generate_with_callback(
self,
messages: list,
callback: callable,
**kwargs
) -> str:
"""
使用回调函数处理流式响应
Args:
messages: 消息列表
callback: 回调函数,接收两个参数:当前内容和完整内容
**kwargs: 传递给stream_completion的参数
Returns:
完整的生成文本
"""
full_content = ""
for chunk in self.stream_completion(messages, **kwargs):
full_content += chunk
callback(chunk, full_content)
return full_content
```
这个类提供了几个关键改进:
1. **完善的错误处理**:包括JSON解析错误、网络错误、API错误等
2. **重试机制**:使用指数退避策略自动重试失败的请求
3. **灵活的配置**:可以自定义超时时间、重试次数等参数
4. **回调支持**:方便集成到各种应用场景中
### 2.3 实际应用示例:构建一个简单的命令行聊天工具
让我们用上面封装的客户端来构建一个实用的命令行工具:
```python
import sys
from typing import List
class CommandLineChat:
"""命令行聊天界面"""
def __init__(self, client: SyncStreamingClient):
self.client = client
self.conversation_history: List[dict] = []
def print_streaming(self, chunk: str, full_content: str):
"""实时打印流式响应"""
sys.stdout.write(chunk)
sys.stdout.flush()
def chat_loop(self):
"""主聊天循环"""
print("=== 大模型命令行聊天工具 ===")
print("输入 'quit' 或 'exit' 退出")
print("输入 'clear' 清空对话历史")
print("=" * 40)
while True:
try:
# 获取用户输入
user_input = input("\n你: ").strip()
if user_input.lower() in ['quit', 'exit', 'q']:
print("再见!")
break
if user_input.lower() == 'clear':
self.conversation_history = []
print("对话历史已清空")
continue
if not user_input:
continue
# 添加到历史记录
self.conversation_history.append({
"role": "user",
"content": user_input
})
# 生成回复
print("\n助手: ", end='')
full_response = self.client.generate_with_callback(
messages=self.conversation_history,
callback=lambda chunk, full: sys.stdout.write(chunk)
)
# 将助手回复添加到历史记录
self.conversation_history.append({
"role": "assistant",
"content": full_response
})
print() # 换行
except KeyboardInterrupt:
print("\n\n程序被中断")
break
except Exception as e:
print(f"\n发生错误: {e}")
# 可以选择是否清空有问题的消息
if self.conversation_history and self.conversation_history[-1]["role"] == "user":
self.conversation_history.pop()
# 使用示例
if __name__ == "__main__":
# 配置客户端
client = SyncStreamingClient(
base_url="https://api.openai.com/v1",
api_key="your-api-key-here", # 请替换为实际API密钥
timeout=60.0,
max_retries=2
)
# 启动聊天
chat = CommandLineChat(client)
chat.chat_loop()
```
这个命令行工具展示了流式响应在实际应用中的价值:用户可以实时看到模型的思考过程,而不是等待完整的响应。这对于调试和理解模型行为特别有用。
> **注意**:在实际生产环境中,你需要考虑更多因素,比如API密钥的安全存储、请求频率限制、上下文长度管理等。上面的示例主要为了展示核心概念。
## 3. 异步客户端的深度探索:性能与并发的艺术
当你的应用需要处理大量并发请求,或者需要与其他异步服务集成时,异步客户端就成为了必然选择。Python的`asyncio`框架为异步编程提供了强大的支持,而`httpx`的异步客户端则让处理流式请求变得更加高效。
### 3.1 异步基础:理解async/await与流式处理
在深入代码之前,我们先理解几个关键概念。异步编程的核心是**非阻塞I/O**——当一个操作需要等待(比如网络请求)时,程序可以继续执行其他任务,而不是干等着。对于流式请求来说,这意味着我们可以同时处理多个请求,每个请求都在独立地接收数据流。
```python
import asyncio
import httpx
from httpx_sse import EventSource
import json
async def process_single_stream(url: str, data: dict):
"""处理单个流式请求"""
async with httpx.AsyncClient() as client:
async with client.stream('POST', url, json=data) as response:
async for sse_event in EventSource(response).aiter_sse():
if sse_event.data == '[DONE]':
break
try:
chunk = json.loads(sse_event.data)
content = chunk.get('choices', [{}])[0].get('delta', {}).get('content', '')
if content:
yield content
except json.JSONDecodeError:
continue
async def main():
"""主函数:演示基本异步流式处理"""
url = "https://api.example.com/v1/chat/completions"
data = {
"model": "gpt-3.5-turbo",
"messages": [{"role": "user", "content": "你好"}],
"stream": True
}
print("开始接收流式响应:")
async for chunk in process_single_stream(url, data):
print(chunk, end='', flush=True)
print("\n响应结束")
# 运行
if __name__ == "__main__":
asyncio.run(main())
```
这个基础示例展示了异步流式处理的核心模式:使用`async with`管理客户端和响应,使用`async for`遍历SSE事件流。
### 3.2 高级异步客户端:并发、超时与错误恢复
在实际项目中,我们需要处理更复杂的情况:多个并发请求、连接超时、网络中断、API限制等。下面是一个更加健壮的异步客户端实现:
```python
import asyncio
import json
import time
from typing import AsyncIterator, Dict, Any, Optional, List
import httpx
from httpx_sse import EventSource
class AsyncStreamingClient:
"""异步流式HTTP客户端,支持并发请求和高级错误处理"""
def __init__(
self,
base_url: str,
api_key: str,
max_concurrent: int = 10,
timeout: float = 30.0,
retry_delay: float = 1.0,
max_retries: int = 3
):
self.base_url = base_url
self.api_key = api_key
self.max_concurrent = max_concurrent
self.timeout = timeout
self.retry_delay = retry_delay
self.max_retries = max_retries
# 使用连接池提高性能
limits = httpx.Limits(
max_connections=max_concurrent,
max_keepalive_connections=5
)
self.client = httpx.AsyncClient(
limits=limits,
timeout=httpx.Timeout(timeout),
headers={
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
)
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.client.aclose()
async def stream_completion(
self,
messages: List[Dict[str, str]],
model: str = "gpt-3.5-turbo",
temperature: float = 0.7,
**kwargs
) -> AsyncIterator[str]:
"""
流式生成文本
Args:
messages: 消息列表
model: 模型名称
temperature: 温度参数
**kwargs: 其他API参数
Yields:
生成的文本块
"""
request_data = {
"model": model,
"messages": messages,
"stream": True,
"temperature": temperature,
**kwargs
}
retries = 0
last_error = None
while retries <= self.max_retries:
try:
async with self.client.stream(
'POST',
f"{self.base_url}/chat/completions",
json=request_data
) as response:
response.raise_for_status()
# 检查响应类型
content_type = response.headers.get('content-type', '')
if 'text/event-stream' in content_type:
async for sse_event in EventSource(response).aiter_sse():
if not sse_event.data or sse_event.data == '[DONE]':
break
try:
chunk = json.loads(sse_event.data)
if 'choices' in chunk and chunk['choices']:
delta = chunk['choices'][0].get('delta', {})
content = delta.get('content', '')
if content:
yield content
# 检查是否结束
if chunk['choices'][0].get('finish_reason'):
break
except json.JSONDecodeError:
continue
else:
# 非流式响应
data = await response.aread()
result = json.loads(data)
content = result['choices'][0]['message']['content']
yield content
# 成功完成
return
except (httpx.RequestError, httpx.HTTPStatusError) as e:
last_error = e
retries += 1
if retries <= self.max_retries:
wait_time = self.retry_delay * (2 ** (retries - 1))
print(f"请求失败,{wait_time:.1f}秒后重试 ({retries}/{self.max_retries})")
await asyncio.sleep(wait_time)
else:
print(f"达到最大重试次数: {last_error}")
raise last_error
if last_error:
raise last_error
async def concurrent_streams(
self,
requests: List[Dict[str, Any]]
) -> List[AsyncIterator[str]]:
"""
并发处理多个流式请求
Args:
requests: 请求参数列表,每个元素包含messages和可选的其他参数
Returns:
每个请求的流式响应迭代器列表
"""
tasks = []
for req in requests:
task = self.stream_completion(
messages=req['messages'],
model=req.get('model', 'gpt-3.5-turbo'),
temperature=req.get('temperature', 0.7)
)
tasks.append(task)
return tasks
async def collect_stream(
self,
stream: AsyncIterator[str],
callback: Optional[callable] = None
) -> str:
"""
收集流式响应的完整内容
Args:
stream: 流式响应迭代器
callback: 可选的回调函数,接收每个数据块
Returns:
完整的响应文本
"""
full_content = []
async for chunk in stream:
full_content.append(chunk)
if callback:
callback(chunk, ''.join(full_content))
return ''.join(full_content)
# 使用示例:并发处理多个请求
async def demo_concurrent_requests():
"""演示并发处理多个流式请求"""
# 初始化客户端
async with AsyncStreamingClient(
base_url="https://api.openai.com/v1",
api_key="your-api-key-here",
max_concurrent=5
) as client:
# 准备多个请求
requests = [
{
"messages": [{"role": "user", "content": "解释什么是机器学习"}],
"model": "gpt-3.5-turbo"
},
{
"messages": [{"role": "user", "content": "用Python写一个二分查找算法"}],
"model": "gpt-3.5-turbo"
},
{
"messages": [{"role": "user", "content": "简述量子计算的基本原理"}],
"model": "gpt-3.5-turbo"
}
]
# 获取所有流的迭代器
streams = await client.concurrent_streams(requests)
# 创建收集任务
tasks = []
for i, stream in enumerate(streams):
task = asyncio.create_task(
client.collect_stream(
stream,
callback=lambda chunk, full, idx=i: print(f"请求{idx}: 收到 {len(chunk)} 字符")
)
)
tasks.append(task)
# 等待所有任务完成
results = await asyncio.gather(*tasks, return_exceptions=True)
# 输出结果
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"请求{i}失败: {result}")
else:
print(f"\n请求{i}的完整响应:")
print(result[:200] + "..." if len(result) > 200 else result)
# 运行演示
if __name__ == "__main__":
asyncio.run(demo_concurrent_requests())
```
这个异步客户端类提供了几个重要特性:
1. **连接池管理**:通过`httpx.Limits`控制最大并发连接数
2. **并发请求处理**:可以同时处理多个流式请求
3. **完善的错误恢复**:指数退避重试机制
4. **资源管理**:使用`async with`确保连接正确关闭
### 3.3 性能优化技巧:从理论到实践
在处理大量流式请求时,性能优化变得至关重要。以下是我在实践中总结的几个关键技巧:
**连接复用**:避免为每个请求创建新的连接,使用连接池可以显著减少延迟。
```python
# 创建优化后的客户端
async def create_optimized_client():
return httpx.AsyncClient(
limits=httpx.Limits(
max_connections=100, # 最大连接数
max_keepalive_connections=20, # 保持活跃的连接数
keepalive_expiry=30.0 # 连接保持时间
),
timeout=httpx.Timeout(
connect=5.0,
read=30.0, # 流式请求需要更长的读取超时
write=10.0,
pool=5.0
)
)
```
**缓冲区管理**:适当调整缓冲区大小可以提高吞吐量。
```python
# 自定义缓冲区大小
async with client.stream(
'POST',
url,
json=data,
# 调整缓冲区大小
stream_buffer_size=1024 * 64 # 64KB缓冲区
) as response:
# 处理响应
```
**超时策略**:为不同类型的超时设置不同的值。
```python
timeout_config = httpx.Timeout(
connect=3.0, # 连接建立超时
read=60.0, # 读取超时(流式请求需要更长)
write=10.0, # 写入超时
pool=5.0 # 连接池超时
)
```
**并发控制**:使用信号量限制同时进行的请求数量。
```python
import asyncio
from typing import List
class RateLimitedClient:
"""带速率限制的客户端"""
def __init__(self, client: AsyncStreamingClient, max_concurrent: int = 5):
self.client = client
self.semaphore = asyncio.Semaphore(max_concurrent)
async def limited_stream(self, *args, **kwargs):
"""带并发限制的流式请求"""
async with self.semaphore:
return self.client.stream_completion(*args, **kwargs)
# 使用示例
async def process_with_rate_limit():
client = AsyncStreamingClient(...)
limited_client = RateLimitedClient(client, max_concurrent=3)
# 即使有100个请求,也只会同时进行3个
tasks = []
for i in range(100):
task = limited_client.limited_stream(
messages=[{"role": "user", "content": f"问题{i}"}]
)
tasks.append(task)
# 处理所有任务
results = await asyncio.gather(*tasks, return_exceptions=True)
```
## 4. 生产环境实战:从原型到部署的完整方案
在实际生产环境中,流式请求的处理需要考虑更多因素:监控、日志、错误处理、性能优化等。这一部分我将分享一个完整的生产级解决方案。
### 4.1 架构设计:分层与解耦
一个好的流式处理系统应该遵循分层架构原则,将不同的关注点分离:
```
应用层(Web界面/API)
↓
业务逻辑层(流式处理器)
↓
服务层(HTTP客户端)
↓
基础设施层(连接池、监控、日志)
```
让我们实现这样一个分层的系统:
```python
import logging
import time
from dataclasses import dataclass
from typing import Optional, Dict, Any, List
import httpx
from httpx_sse import EventSource
import json
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class StreamConfig:
"""流式配置数据类"""
base_url: str
api_key: str
model: str = "gpt-3.5-turbo"
timeout: float = 30.0
max_retries: int = 3
temperature: float = 0.7
max_tokens: int = 2000
class MetricsCollector:
"""指标收集器"""
def __init__(self):
self.metrics = {
'total_requests': 0,
'successful_requests': 0,
'failed_requests': 0,
'total_tokens': 0,
'total_time': 0.0
}
def record_request(self, success: bool, tokens: int, duration: float):
"""记录请求指标"""
self.metrics['total_requests'] += 1
if success:
self.metrics['successful_requests'] += 1
else:
self.metrics['failed_requests'] += 1
self.metrics['total_tokens'] += tokens
self.metrics['total_time'] += duration
def get_metrics(self) -> Dict[str, Any]:
"""获取当前指标"""
avg_time = (self.metrics['total_time'] / self.metrics['total_requests']
if self.metrics['total_requests'] > 0 else 0)
return {
**self.metrics,
'avg_time_per_request': avg_time,
'success_rate': (self.metrics['successful_requests'] /
self.metrics['total_requests']
if self.metrics['total_requests'] > 0 else 0)
}
class ProductionStreamingClient:
"""生产环境流式客户端"""
def __init__(self, config: StreamConfig):
self.config = config
self.metrics = MetricsCollector()
# 创建HTTP客户端
self.client = httpx.AsyncClient(
timeout=httpx.Timeout(
connect=5.0,
read=config.timeout,
write=10.0,
pool=5.0
),
limits=httpx.Limits(
max_connections=50,
max_keepalive_connections=10
),
headers={
"Authorization": f"Bearer {config.api_key}",
"Content-Type": "application/json"
}
)
async def stream_with_metrics(
self,
messages: List[Dict[str, str]],
**kwargs
):
"""带指标收集的流式请求"""
start_time = time.time()
tokens_received = 0
success = False
try:
request_data = {
"model": kwargs.get('model', self.config.model),
"messages": messages,
"stream": True,
"temperature": kwargs.get('temperature', self.config.temperature),
"max_tokens": kwargs.get('max_tokens', self.config.max_tokens),
**{k: v for k, v in kwargs.items()
if k not in ['model', 'temperature', 'max_tokens']}
}
async with self.client.stream(
'POST',
f"{self.config.base_url}/chat/completions",
json=request_data
) as response:
response.raise_for_status()
async for sse_event in EventSource(response).aiter_sse():
if not sse_event.data or sse_event.data == '[DONE]':
break
try:
chunk = json.loads(sse_event.data)
if 'choices' in chunk and chunk['choices']:
delta = chunk['choices'][0].get('delta', {})
content = delta.get('content', '')
if content:
tokens_received += len(content.split()) # 简单估算
yield {
'type': 'content',
'data': content,
'chunk': chunk
}
# 检查结束标志
if chunk['choices'][0].get('finish_reason'):
yield {
'type': 'done',
'reason': chunk['choices'][0]['finish_reason']
}
break
except json.JSONDecodeError as e:
logger.warning(f"JSON解析错误: {e}")
yield {
'type': 'error',
'error': 'parse_error',
'message': str(e)
}
success = True
except httpx.HTTPStatusError as e:
logger.error(f"HTTP错误: {e.response.status_code}")
yield {
'type': 'error',
'error': 'http_error',
'status_code': e.response.status_code,
'message': str(e)
}
except httpx.RequestError as e:
logger.error(f"请求错误: {e}")
yield {
'type': 'error',
'error': 'request_error',
'message': str(e)
}
except Exception as e:
logger.error(f"未知错误: {e}")
yield {
'type': 'error',
'error': 'unknown_error',
'message': str(e)
}
finally:
duration = time.time() - start_time
self.metrics.record_request(success, tokens_received, duration)
async def process_stream(
self,
messages: List[Dict[str, str]],
callback: callable,
**kwargs
) -> Dict[str, Any]:
"""处理流式响应并调用回调"""
full_content = []
metadata = {
'start_time': time.time(),
'chunks_received': 0,
'errors': []
}
async for event in self.stream_with_metrics(messages, **kwargs):
metadata['chunks_received'] += 1
if event['type'] == 'content':
content = event['data']
full_content.append(content)
# 调用回调
callback({
'chunk': content,
'full_text': ''.join(full_content),
'metadata': {
'chunk_index': metadata['chunks_received'],
'chunk_data': event.get('chunk')
}
})
elif event['type'] == 'error':
metadata['errors'].append(event)
logger.error(f"流处理错误: {event}")
elif event['type'] == 'done':
metadata['finish_reason'] = event['reason']
metadata['end_time'] = time.time()
metadata['duration'] = metadata['end_time'] - metadata['start_time']
break
return {
'content': ''.join(full_content),
'metadata': metadata,
'success': len(metadata['errors']) == 0
}
async def close(self):
"""关闭客户端"""
await self.client.aclose()
def get_metrics(self) -> Dict[str, Any]:
"""获取性能指标"""
return self.metrics.get_metrics()
# 使用示例:WebSocket集成
async def websocket_stream_handler(websocket, client: ProductionStreamingClient):
"""WebSocket处理器示例"""
import json as json_module
async for message in websocket:
try:
data = json_module.loads(message)
messages = data.get('messages', [])
if not messages:
await websocket.send(json_module.dumps({
'type': 'error',
'message': 'No messages provided'
}))
continue
# 定义回调函数,通过WebSocket发送数据
async def send_chunk(data):
await websocket.send(json_module.dumps({
'type': 'chunk',
'content': data['chunk'],
'full_text': data['full_text']
}))
# 处理流式响应
result = await client.process_stream(
messages=messages,
callback=send_chunk,
model=data.get('model', 'gpt-3.5-turbo')
)
# 发送完成消息
await websocket.send(json_module.dumps({
'type': 'complete',
'content': result['content'],
'metadata': result['metadata']
}))
except Exception as e:
logger.error(f"WebSocket处理错误: {e}")
await websocket.send(json_module.dumps({
'type': 'error',
'message': str(e)
}))
```
### 4.2 监控与告警:确保系统可靠性
在生产环境中,监控是必不可少的。以下是一些关键的监控指标和实现方法:
```python
import prometheus_client
from prometheus_client import Counter, Histogram, Gauge
import time
from typing import Dict, Any
# 定义Prometheus指标
REQUEST_COUNT = Counter(
'streaming_requests_total',
'Total streaming requests',
['status', 'model']
)
REQUEST_DURATION = Histogram(
'streaming_request_duration_seconds',
'Streaming request duration',
['model'],
buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0]
)
TOKENS_PER_REQUEST = Histogram(
'streaming_tokens_per_request',
'Tokens per streaming request',
['model'],
buckets=[10, 50, 100, 200, 500, 1000, 2000]
)
ACTIVE_REQUESTS = Gauge(
'streaming_active_requests',
'Currently active streaming requests'
)
class MonitoredStreamingClient:
"""带监控的流式客户端"""
def __init__(self, base_client: ProductionStreamingClient):
self.client = base_client
async def monitored_stream(self, messages: List[Dict[str, str]], **kwargs):
"""带监控的流式请求"""
model = kwargs.get('model', 'unknown')
start_time = time.time()
ACTIVE_REQUESTS.inc()
try:
tokens = 0
async for event in self.client.stream_with_metrics(messages, **kwargs):
if event['type'] == 'content':
# 估算token数量(实际应该使用tiktoken等库)
tokens += len(event['data'].split())
yield event
# 记录成功指标
duration = time.time() - start_time
REQUEST_COUNT.labels(status='success', model=model).inc()
REQUEST_DURATION.labels(model=model).observe(duration)
TOKENS_PER_REQUEST.labels(model=model).observe(tokens)
except Exception as e:
# 记录失败指标
REQUEST_COUNT.labels(status='error', model=model).inc()
raise e
finally:
ACTIVE_REQUESTS.dec()
# 启动Prometheus指标服务器(通常在单独的端口)
def start_metrics_server(port: int = 8000):
"""启动指标服务器"""
prometheus_client.start_http_server(port)
print(f"指标服务器启动在端口 {port}")
```
### 4.3 缓存与优化:减少重复计算
对于某些场景,我们可以使用缓存来优化性能:
```python
import hashlib
import pickle
from typing import Optional
from datetime import datetime, timedelta
class CachedStreamingClient:
"""带缓存的流式客户端"""
def __init__(self, base_client: ProductionStreamingClient, cache_ttl: int = 3600):
self.client = base_client
self.cache_ttl = cache_ttl # 缓存过期时间(秒)
self.cache = {} # 简单内存缓存,生产环境应该使用Redis等
def _get_cache_key(self, messages: List[Dict[str, str]], **kwargs) -> str:
"""生成缓存键"""
# 排除stream参数,因为缓存只用于非流式响应
cache_kwargs = {k: v for k, v in kwargs.items() if k != 'stream'}
cache_data = {
'messages': messages,
**cache_kwargs
}
# 使用SHA256生成唯一键
key_data = pickle.dumps(cache_data)
return hashlib.sha256(key_data).hexdigest()
async def cached_stream(self, messages: List[Dict[str, str]], **kwargs):
"""带缓存的流式/非流式请求"""
# 如果是流式请求,直接转发
if kwargs.get('stream', True):
async for event in self.client.stream_with_metrics(messages, **kwargs):
yield event
return
# 非流式请求,尝试从缓存获取
cache_key = self._get_cache_key(messages, **kwargs)
if cache_key in self.cache:
cache_entry = self.cache[cache_key]
# 检查是否过期
if datetime.now() - cache_entry['timestamp'] < timedelta(seconds=self.cache_ttl):
# 返回缓存结果
yield {
'type': 'content',
'data': cache_entry['content'],
'cached': True
}
yield {'type': 'done'}
return
# 缓存未命中,执行实际请求
full_content = []
async for event in self.client.stream_with_metrics(messages, **kwargs):
if event['type'] == 'content':
full_content.append(event['data'])
yield event
elif event['type'] == 'done':
# 缓存结果
self.cache[cache_key] = {
'content': ''.join(full_content),
'timestamp': datetime.now()
}
yield event
break
else:
yield event
```
### 4.4 部署考虑:容器化与扩缩容
在容器化部署时,需要考虑一些特定的配置:
```dockerfile
# Dockerfile示例
FROM python:3.9-slim
WORKDIR /app
# 安装依赖
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# 复制应用代码
COPY . .
# 设置环境变量
ENV PYTHONUNBUFFERED=1
ENV PYTHONPATH=/app
# 健康检查
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
CMD python -c "import httpx; httpx.get('http://localhost:8000/health')"
# 运行应用
CMD ["python", "app/main.py"]
```
对应的Kubernetes部署配置:
```yaml
# deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: streaming-service
spec:
replicas: 3
selector:
matchLabels:
app: streaming-service
template:
metadata:
labels:
app: streaming-service
spec:
containers:
- name: streaming-service
image: your-registry/streaming-service:latest
ports:
- containerPort: 8000
env:
- name: API_KEY
valueFrom:
secretKeyRef:
name: api-secrets
key: openai-api-key
- name: BASE_URL
value: "https://api.openai.com/v1"
resources:
requests:
memory: "256Mi"
cpu: "250m"
limits:
memory: "512Mi"
cpu: "500m"
livenessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
httpGet:
path: /ready
port: 8000
initialDelaySeconds: 5
periodSeconds: 5
---
# service.yaml
apiVersion: v1
kind: Service
metadata:
name: streaming-service
spec:
selector:
app: streaming-service
ports:
- port: 80
targetPort: 8000
type: LoadBalancer
```
### 4.5 测试策略:确保代码质量
最后,不要忘记为你的流式处理代码编写测试:
```python
import pytest
import pytest_asyncio
from unittest.mock import AsyncMock, MagicMock
import json
class TestStreamingClient:
"""流式客户端测试"""
@pytest_asyncio.fixture
async def mock_client(self):
"""创建模拟客户端"""
client = AsyncMock()
# 模拟响应
mock_response = AsyncMock()
mock_response.headers = {'content-type': 'text/event-stream'}
mock_response.raise_for_status = MagicMock()
# 模拟SSE事件
sse_events = [
MagicMock(data=json.dumps({
'choices': [{
'delta': {'content': 'Hello'},
'finish_reason': None
}]
})),
MagicMock(data=json.dumps({
'choices': [{
'delta': {'content': ' World'},
'finish_reason': None
}]
})),
MagicMock(data=json.dumps({
'choices': [{
'delta': {},
'finish_reason': 'stop'
}]
})),
MagicMock(data='[DONE]')
]
# 模拟EventSource
mock_event_source = AsyncMock()
mock_event_source.aiter_sse.return_value = sse_events
# 设置客户端行为
client.stream.return_value.__aenter__.return_value = mock_response
client.stream.return_value.__aexit__.return_value = None
# 这里需要根据实际实现调整
# 通常我们会使用patch来模拟httpx_sse.EventSource
return client
@pytest.mark.asyncio
async def test_stream_processing(self, mock_client):
"""测试流式处理"""
# 这里编写具体的测试逻辑
# 由于模拟比较复杂,实际测试中可能需要更详细的设置
pass
@pytest.mark.asyncio
async def test_error_handling(self):
"""测试错误处理"""
# 测试各种错误场景:网络错误、API错误、解析错误等
pass
@pytest.mark.asyncio
async def test_concurrent_requests(self):
"""测试并发请求"""
# 测试多个并发流式请求
pass
# 集成测试
@pytest.mark.integration
class TestIntegration:
"""集成测试"""
@pytest.mark.asyncio
async def test_real_api_connection(self):
"""测试真实API连接(需要网络和API密钥)"""
# 这个测试可以在有API密钥的环境中运行
# 或者使用测试专用的API端点
pass
```
在实际项目中,我通常会建立这样的测试金字塔:底层的单元测试覆盖核心逻辑,中间层的集成测试验证组件协作,顶层的端到端测试确保整个系统正常工作。对于流式处理这种涉及网络和并发的功能,集成测试尤其重要。
通过以上四个部分的深入探讨,我们从基础概念到生产实践,完整地覆盖了使用`httpx`处理大模型API流式响应的各个方面。无论是简单的同步请求,还是复杂的异步并发处理,亦或是生产环境中的监控和部署考虑,都有了具体的实现方案。