# Python调用Dify API完整指南
Dify作为一个开源的大语言模型应用开发平台,提供了丰富的API接口供开发者调用。下面将详细介绍如何使用Python调用Dify API的完整流程。
## 1. 前期准备工作
### 1.1 获取API密钥和端点地址
在调用Dify API之前,首先需要获取以下关键信息:
| 配置项 | 说明 | 获取方式 |
|--------|------|----------|
| API密钥 | 用于身份验证 | Dify控制台 → 设置 → API密钥 |
| API端点 | API服务地址 | 通常是 `https://api.dify.ai/v1` 或本地部署地址 |
| 应用ID | 具体应用的标识符 | 在Dify应用中查看 |
```python
# 基础配置示例
import requests
import json
# Dify API配置
DIFY_API_KEY = "your-api-key-here"
DIFY_API_BASE = "https://api.dify.ai/v1" # 或者本地地址如 "http://localhost:5001/v1"
APP_ID = "your-app-id-here"
```
### 1.2 环境准备
确保安装必要的Python库:
```python
pip install requests
```
## 2. 基础API调用
### 2.1 聊天应用调用
Dify支持两种主要的调用模式:流式输出和阻塞模式[ref_4]。
#### 阻塞模式调用
```python
def call_dify_chat_blocking(question, conversation_id=None):
"""
阻塞模式调用Dify聊天API
等待执行完毕返回完整结果
"""
url = f"{DIFY_API_BASE}/chat-messages"
headers = {
"Authorization": f"Bearer {DIFY_API_KEY}",
"Content-Type": "application/json"
}
payload = {
"inputs": {},
"query": question,
"response_mode": "blocking", # 阻塞模式
"user": "python-client"
}
if conversation_id:
payload["conversation_id"] = conversation_id
try:
response = requests.post(url, headers=headers, json=payload, timeout=30)
response.raise_for_status()
result = response.json()
return {
"answer": result.get("answer", ""),
"conversation_id": result.get("conversation_id"),
"metadata": result.get("metadata", {})
}
except requests.exceptions.RequestException as e:
print(f"API调用失败: {e}")
return None
# 使用示例
result = call_dify_chat_blocking("你好,今天天气怎么样?")
if result:
print(f"AI回复: {result['answer']}")
print(f"会话ID: {result['conversation_id']}")
```
#### 流式输出调用
```python
def call_dify_chat_streaming(question, conversation_id=None):
"""
流式输出调用Dify聊天API
基于SSE实现打字机式返回效果[ref_4]
"""
url = f"{DIFY_API_BASE}/chat-messages"
headers = {
"Authorization": f"Bearer {DIFY_API_KEY}",
"Content-Type": "application/json"
}
payload = {
"inputs": {},
"query": question,
"response_mode": "streaming", # 流式模式
"user": "python-client"
}
if conversation_id:
payload["conversation_id"] = conversation_id
try:
response = requests.post(url, headers=headers, json=payload, stream=True, timeout=60)
response.raise_for_status()
full_answer = ""
for line in response.iter_lines():
if line:
line_text = line.decode('utf-8')
if line_text.startswith('data: '):
data = line_text[6:] # 移除 'data: ' 前缀
if data != '[DONE]':
try:
json_data = json.loads(data)
if 'answer' in json_data:
chunk = json_data['answer']
full_answer += chunk
print(chunk, end='', flush=True) # 实时输出
except json.JSONDecodeError:
continue
return full_answer
except requests.exceptions.RequestException as e:
print(f"流式API调用失败: {e}")
return None
```
## 3. 文件上传处理
当工作流涉及文件、图片、音视频上传时,需要先调用upload方法获取file_id[ref_4]。
```python
def upload_file_to_dify(file_path):
"""
上传文件到Dify并获取file_id
"""
upload_url = f"{DIFY_API_BASE}/files/upload"
headers = {
"Authorization": f"Bearer {DIFY_API_KEY}"
}
try:
with open(file_path, 'rb') as file:
files = {'file': (file_path, file, 'application/octet-stream')}
data = {'user': 'python-client'}
response = requests.post(upload_url, headers=headers, files=files, data=data)
response.raise_for_status()
result = response.json()
return result.get('id') # 返回file_id
except Exception as e:
print(f"文件上传失败: {e}")
return None
def chat_with_file(question, file_path):
"""
携带文件的聊天调用
"""
file_id = upload_file_to_dify(file_path)
if not file_id:
return "文件上传失败"
url = f"{DIFY_API_BASE}/chat-messages"
headers = {
"Authorization": f"Bearer {DIFY_API_KEY}",
"Content-Type": "application/json"
}
payload = {
"inputs": {
"file": file_id # 使用上传获得的file_id
},
"query": question,
"response_mode": "blocking",
"user": "python-client"
}
try:
response = requests.post(url, headers=headers, json=payload)
response.raise_for_status()
return response.json().get('answer', '')
except requests.exceptions.RequestException as e:
return f"API调用失败: {e}"
# 使用示例
# answer = chat_with_file("请分析这个文档的主要内容", "document.pdf")
```
## 4. 工作流调用
Dify的工作流模式提供了更复杂的业务流程处理能力[ref_4]。
```python
def call_dify_workflow(workflow_id, inputs):
"""
调用Dify工作流
"""
url = f"{DIFY_API_BASE}/workflows/{workflow_id}/run"
headers = {
"Authorization": f"Bearer {DIFY_API_KEY}",
"Content-Type": "application/json"
}
payload = {
"inputs": inputs,
"response_mode": "blocking",
"user": "python-client"
}
try:
response = requests.post(url, headers=headers, json=payload, timeout=60)
response.raise_for_status()
result = response.json()
return result.get('data', {}).get('outputs', {})
except requests.exceptions.RequestException as e:
print(f"工作流调用失败: {e}")
return None
# 使用示例
# workflow_inputs = {"text": "需要处理的文本", "param": "value"}
# outputs = call_dify_workflow("your-workflow-id", workflow_inputs)
```
## 5. 实际应用案例
### 5.1 微信自动回复系统
参考实际项目经验,可以构建微信自动回复系统[ref_5]:
```python
import time
from typing import Optional
class DifyWeChatBot:
"""基于Dify API的微信自动回复机器人"""
def __init__(self, api_key: str, base_url: str, app_id: str):
self.api_key = api_key
self.base_url = base_url
self.app_id = app_id
self.conversation_cache = {} # 用户会话缓存
def get_ai_response(self, user_id: str, user_message: str) -> Optional[str]:
"""获取AI回复"""
conversation_id = self.conversation_cache.get(user_id)
url = f"{self.base_url}/chat-messages"
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
payload = {
"inputs": {},
"query": user_message,
"response_mode": "blocking",
"user": user_id
}
if conversation_id:
payload["conversation_id"] = conversation_id
try:
response = requests.post(url, headers=headers, json=payload, timeout=30)
if response.status_code == 200:
result = response.json()
# 更新会话ID缓存
self.conversation_cache[user_id] = result.get('conversation_id')
return result.get('answer', '抱歉,我暂时无法回答这个问题。')
else:
return "服务暂时不可用,请稍后重试。"
except Exception as e:
print(f"AI服务调用异常: {e}")
return "网络连接异常,请检查网络设置。"
def process_wechat_message(self, user_id: str, message: str) -> str:
"""处理微信消息"""
# 这里可以添加消息预处理逻辑
ai_response = self.get_ai_response(user_id, message)
return ai_response
# 初始化机器人
wechat_bot = DifyWeChatBot(
api_key="your-dify-api-key",
base_url="https://api.dify.ai/v1",
app_id="your-app-id"
)
# 模拟微信消息处理
# user_message = "你好,能帮我推荐一些学习资料吗?"
# response = wechat_bot.process_wechat_message("user123", user_message)
```
### 5.2 与本地模型集成
如果使用本地部署的Dify配合Ollama或Xinference[ref_2][ref_3],配置会有所不同:
```python
# 本地Dify + Ollama配置示例
LOCAL_DIFY_CONFIG = {
"api_base": "http://localhost:5001/v1",
"api_key": "your-local-dify-key",
"model_provider": "ollama", # 或 "xinference"
"model_name": "qwen:7b" # 本地模型名称
}
def call_local_dify(question):
"""调用本地部署的Dify"""
url = f"{LOCAL_DIFY_CONFIG['api_base']}/chat-messages"
headers = {
"Authorization": f"Bearer {LOCAL_DIFY_CONFIG['api_key']}",
"Content-Type": "application/json"
}
payload = {
"inputs": {},
"query": question,
"response_mode": "blocking",
"user": "local-client"
}
try:
response = requests.post(url, headers=headers, json=payload, timeout=60)
return response.json().get('answer', '')
except Exception as e:
return f"本地服务调用失败: {e}"
```
## 6. 错误处理与最佳实践
### 6.1 完善的错误处理
```python
class DifyClient:
"""Dify API客户端封装"""
def __init__(self, api_key, base_url, app_id):
self.api_key = api_key
self.base_url = base_url
self.app_id = app_id
self.session = requests.Session()
self.session.headers.update({
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
})
def chat(self, message, conversation_id=None, stream=False):
"""发送聊天消息"""
url = f"{self.base_url}/chat-messages"
payload = {
"inputs": {},
"query": message,
"response_mode": "streaming" if stream else "blocking",
"user": "python-client"
}
if conversation_id:
payload["conversation_id"] = conversation_id
try:
if stream:
response = self.session.post(url, json=payload, stream=True, timeout=60)
return self._handle_stream_response(response)
else:
response = self.session.post(url, json=payload, timeout=30)
response.raise_for_status()
return response.json()
except requests.exceptions.Timeout:
raise Exception("请求超时,请检查网络连接或增加超时时间")
except requests.exceptions.ConnectionError:
raise Exception("网络连接错误,请检查API端点地址")
except requests.exceptions.HTTPError as e:
if e.response.status_code == 401:
raise Exception("API密钥无效,请检查认证信息")
elif e.response.status_code == 404:
raise Exception("API端点不存在,请检查URL配置")
else:
raise Exception(f"HTTP错误: {e.response.status_code}")
def _handle_stream_response(self, response):
"""处理流式响应"""
response.raise_for_status()
for line in response.iter_lines():
if line:
line_text = line.decode('utf-8')
if line_text.startswith('data: '):
data = line_text[6:]
if data != '[DONE]':
try:
yield json.loads(data)
except json.JSONDecodeError:
continue
```
### 6.2 性能优化建议
1. **连接复用**: 使用requests.Session()复用HTTP连接
2. **超时设置**: 根据场景合理设置超时时间
3. **重试机制**: 对临时性错误实现重试逻辑
4. **异步调用**: 对于高并发场景考虑使用异步请求
通过上述完整的Python调用示例,您可以轻松集成Dify API到各种应用中,无论是简单的聊天机器人还是复杂的企业级应用。关键是根据实际需求选择合适的调用模式,并做好错误处理和性能优化。