# 从自然语言到机器人动作:构建工业级指令解析系统的实战指南
想象一下,在嘈杂的工厂车间里,你对着一个移动机器人说:“去仓库第三排货架取两盒螺丝,然后送到装配线A区。”几秒钟后,机器人开始行动——它准确地理解了你的意图,规划出最优路径,执行了完整的取货送货流程。这不再是科幻电影的场景,而是今天的技术团队正在实现的现实。
对于机器人开发者来说,让机器理解人类的自然语言指令,并将其转化为精确、可执行的动作序列,一直是极具挑战性的任务。传统的机器人编程需要工程师编写复杂的代码,定义每一个动作的坐标、速度和时序。而现在,结合先进的文档解析引擎和智能体平台,我们可以构建一个端到端的自然语言指令解析系统,让机器人真正“听懂”人话。
这篇文章将带你深入探索如何利用TextIn xParse文档解析引擎与Coze平台的自然语言处理能力,构建一个工业级的机器人指令解析系统。我会分享从架构设计到代码实现的完整流程,包括如何处理复杂的地图坐标转换、设计鲁棒的异常处理逻辑,以及如何将这套系统集成到实际的机器人控制框架中。
## 1. 理解核心挑战:为什么自然语言指令解析如此困难
在深入技术实现之前,我们需要先理解机器人自然语言交互面临的核心挑战。这不仅仅是“语音转文字”那么简单,而是一个复杂的多阶段处理过程。
### 1.1 语义理解的复杂性
人类语言充满了歧义、省略和上下文依赖。当你说“去那边拿东西”时,“那边”具体指哪里?“东西”是什么?这些都需要系统结合环境信息进行推理。在工业场景中,指令可能更加复杂:
- **空间关系的模糊性**:“左边第二个货架”需要结合机器人的当前朝向和坐标系
- **数量单位的不确定性**:“几盒螺丝”中的“几”可能是2、3或更多
- **动作的隐含顺序**:“先检查再拿取”包含了时间顺序逻辑
- **异常情况的处理**:“如果找不到就换一个”需要预设备选方案
### 1.2 环境建模的精确性要求
机器人要执行指令,首先需要理解它所处的环境。这包括:
- **地图的精确表示**:不仅仅是二维平面,还包括高度、障碍物、可通行区域
- **坐标系的统一**:世界坐标系、机器人坐标系、地图坐标系之间的转换
- **物体识别与定位**:在复杂环境中准确识别目标物体并确定其位置
- **路径规划约束**:考虑机器人的运动学约束、动态障碍物、安全区域
### 1.3 动作序列的生成与验证
将高层指令分解为原子动作序列需要解决:
- **动作的粒度选择**:什么是“原子动作”?导航到点、抓取、放置、等待
- **前置条件的检查**:在执行动作前需要满足哪些条件
- **后置条件的验证**:动作执行后需要验证什么
- **异常处理策略**:当某个动作失败时应该怎么办
### 1.4 系统集成的工程挑战
即使算法层面解决了问题,工程实现上还有诸多挑战:
- **实时性要求**:从接收到指令到开始执行需要在秒级完成
- **可靠性保障**:工业环境要求系统7x24小时稳定运行
- **可扩展性设计**:需要支持不同类型的机器人、不同的任务场景
- **调试与监控**:当系统出错时,需要有完善的日志和诊断机制
理解了这些挑战,我们就能更好地设计解决方案。接下来,我将介绍如何利用现有的工具链构建一个能够应对这些挑战的系统。
## 2. 技术栈选择:为什么是TextIn xParse + Coze组合
在众多可选的技术方案中,我选择了TextIn xParse与Coze平台的组合。这个选择基于几个关键的考量因素,这些因素直接关系到系统的实用性、可维护性和扩展性。
### 2.1 TextIn xParse的核心优势
TextIn xParse不是一个简单的OCR工具,而是一个完整的文档理解引擎。在机器人指令解析场景中,它的价值体现在多个层面:
**结构化输出能力**
传统的文本提取工具只能提供原始的文本内容,而xParse能够理解文档的语义结构。对于机器人指令来说,这意味着:
```python
# 传统OCR输出(难以处理)
"去仓库第三排货架取两盒螺丝然后送到装配线A区"
# TextIn xParse结构化输出(易于处理)
{
"actions": [
{
"type": "navigate",
"destination": "仓库",
"sub_location": "第三排货架"
},
{
"type": "grasp",
"object": "螺丝",
"quantity": 2,
"unit": "盒"
},
{
"type": "navigate",
"destination": "装配线A区"
},
{
"type": "place",
"object": "螺丝"
}
]
}
```
**多模态理解能力**
机器人指令往往不是纯文本形式。在实际应用中,指令可能来自:
- **语音转文字后的文本**:包含口语化表达和停顿词
- **手写指令单扫描件**:需要识别手写字体和表格结构
- **包含示意图的文档**:需要理解文字与图形的对应关系
- **多语言混合指令**:在跨国工厂中常见
xParse能够处理这些复杂的输入格式,将其统一转化为结构化的数据表示。
**坐标信息的精确提取**
对于机器人导航来说,位置信息至关重要。xParse能够从包含坐标信息的文档中精确提取数值:
```json
{
"locations": [
{
"name": "仓库第三排货架",
"coordinates": {
"x": 12.5,
"y": 8.3,
"z": 0.0,
"theta": 1.57
},
"type": "storage_shelf",
"properties": {
"height": 2.0,
"width": 1.5,
"accessibility": "front_only"
}
}
]
}
```
### 2.2 Coze平台的自然语言处理优势
Coze平台提供了强大的大模型集成能力,特别适合处理自然语言理解和生成任务。在机器人指令解析场景中,它的优势包括:
**灵活的提示工程**
通过精心设计的提示词,我们可以引导大模型按照特定的格式和逻辑处理指令:
```python
# 机器人指令解析专用提示词模板
prompt_template = """
你是一个专业的机器人任务规划专家。请将以下自然语言指令解析为机器人可执行的动作序列。
输入指令:{instruction}
环境信息:
- 地图类型:{map_type}
- 可用动作:{available_actions}
- 坐标单位:{coordinate_unit}
输出要求:
1. 将指令分解为原子动作序列
2. 每个动作包含类型、参数、前置条件和后置条件
3. 考虑可能的异常情况并制定处理策略
4. 输出格式必须为JSON,符合以下schema:
{output_schema}
请严格按照要求输出,不要添加任何解释性文字。
"""
```
**工作流编排能力**
Coze的工作流功能允许我们将复杂的处理流程可视化编排:
```
┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ 开始节点 │───▶│ TextIn xParse│───▶│ 大模型解析 │───▶│ 输出节点 │
│ (接收指令) │ │ (文档结构化)│ │ (语义理解) │ │ (动作序列) │
└─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘
│ │ │ │
▼ ▼ ▼ ▼
┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ 文件上传 │ │ 格式转换 │ │ 意图识别 │ │ JSON验证 │
│ 语音输入 │ │ 结构提取 │ │ 动作分解 │ │ 格式优化 │
└─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘
```
**多模型支持与切换**
不同的任务可能需要不同的大模型。Coze支持多种模型的无缝切换:
| 模型类型 | 适用场景 | 优势 | 注意事项 |
|---------|---------|------|---------|
| 豆包·Pro | 复杂逻辑推理 | 上下文理解能力强,适合处理多步骤指令 | 响应时间稍长,成本较高 |
| 豆包·Lite | 简单指令解析 | 响应速度快,成本低 | 复杂指令可能解析不完整 |
| DeepSeek | 技术文档理解 | 代码和结构化数据理解能力强 | 对中文口语支持相对较弱 |
| Kimi | 长文本处理 | 上下文窗口大,适合处理详细的环境描述 | 实时性要求高的场景需谨慎 |
### 2.3 组合使用的协同效应
单独使用TextIn xParse或Coze都能解决部分问题,但两者的结合产生了1+1>2的效果:
**处理流程的优化**
```
原始输入 → TextIn xParse预处理 → 结构化中间表示 → Coze大模型深度理解 → 可执行动作序列
↓ ↓ ↓ ↓
各种格式文档 统一的结构化格式 语义丰富的表示 机器可读的JSON
```
**错误处理的层次化**
- **第一层**:TextIn xParse处理格式错误、编码问题
- **第二层**:Coze大模型处理语义歧义、逻辑矛盾
- **第三层**:自定义验证逻辑处理业务规则违反
**性能与成本的平衡**
通过将计算密集型任务合理分配,我们可以在保证质量的同时控制成本:
> 提示:在实际部署中,建议对简单指令使用轻量级处理流程(直接规则匹配),对复杂指令才启用完整的大模型解析流程。这样可以显著降低运营成本,同时保证响应速度。
## 3. 系统架构设计:从指令输入到动作执行
现在让我们深入系统的架构设计。一个好的架构应该清晰、可扩展、易于维护。我设计的这个架构经过了多个实际项目的验证,能够满足工业场景的需求。
### 3.1 整体架构概览
系统采用分层设计,每一层都有明确的职责:
```
┌─────────────────────────────────────────────────────────────────┐
│ 用户交互层 │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 语音输入 │ │ 文本输入 │ │ 文件上传 │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ 指令解析层 │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ TextIn预处理│ │ Coze语义理解│ │ 动作序列生成 │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ 环境感知层 │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 地图服务 │ │ 物体检测 │ │ 状态监控 │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ 动作执行层 │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 路径规划 │ │ 运动控制 │ │ 机械臂控制 │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────────────┘
```
### 3.2 指令解析层的详细设计
指令解析层是整个系统的核心,它负责将原始输入转化为结构化的动作序列。这一层的设计需要特别关注鲁棒性和可扩展性。
**TextIn xParse预处理模块**
这个模块负责处理各种格式的输入,将其转化为统一的中间表示:
```python
class InstructionPreprocessor:
"""指令预处理器,使用TextIn xParse处理各种输入格式"""
def __init__(self, app_id: str, secret_code: str):
self.app_id = app_id
self.secret_code = secret_code
self.session = requests.Session()
self.session.headers.update({
'x-ti-app-id': self.app_id,
'x-ti-secret-code': self.secret_code
})
def preprocess(self, input_data: Union[str, bytes, Path]) -> Dict:
"""
预处理输入数据,返回结构化中间表示
参数:
input_data: 可以是文本字符串、文件路径或二进制数据
返回:
包含结构化信息的字典
"""
# 判断输入类型并选择相应的处理方法
if isinstance(input_data, str) and input_data.startswith(('http://', 'https://')):
return self._process_url(input_data)
elif isinstance(input_data, Path) or (isinstance(input_data, str) and os.path.exists(input_data)):
return self._process_file(input_data)
elif isinstance(input_data, str):
return self._process_text(input_data)
elif isinstance(input_data, bytes):
return self._process_binary(input_data)
else:
raise ValueError(f"不支持的输入类型: {type(input_data)}")
def _process_file(self, file_path: Union[str, Path]) -> Dict:
"""处理本地文件"""
with open(file_path, 'rb') as f:
file_content = f.read()
# 根据文件类型选择不同的API端点
file_ext = Path(file_path).suffix.lower()
if file_ext in ['.pdf', '.doc', '.docx', '.xlsx']:
api_endpoint = 'https://api.textin.com/ai/service/v1/document/parse'
elif file_ext in ['.jpg', '.jpeg', '.png', '.bmp']:
api_endpoint = 'https://api.textin.com/ai/service/v1/image/parse'
else:
api_endpoint = 'https://api.textin.com/ai/service/v1/text/parse'
response = self.session.post(
api_endpoint,
data=file_content,
headers={'Content-Type': 'application/octet-stream'}
)
return self._parse_response(response)
def _process_text(self, text: str) -> Dict:
"""处理纯文本输入"""
# 对于纯文本,我们可以直接进行一些简单的预处理
# 比如分句、提取关键词等
return {
'type': 'text',
'content': text,
'sentences': self._split_sentences(text),
'keywords': self._extract_keywords(text),
'entities': self._extract_entities(text)
}
def _parse_response(self, response: requests.Response) -> Dict:
"""解析TextIn API的响应"""
if response.status_code != 200:
raise RuntimeError(f"API调用失败: {response.status_code}")
result = response.json()
if result.get('code') != 200:
raise RuntimeError(f"处理失败: {result.get('message', '未知错误')}")
# 提取结构化信息
structured_data = {
'raw_text': result.get('result', {}).get('text', ''),
'markdown': result.get('result', {}).get('markdown', ''),
'tables': result.get('result', {}).get('tables', []),
'images': result.get('result', {}).get('images', []),
'layout': result.get('result', {}).get('layout', {}),
'metadata': {
'processing_time': result.get('result', {}).get('processing_time', 0),
'page_count': result.get('result', {}).get('page_count', 1),
'confidence': result.get('result', {}).get('confidence', 0.0)
}
}
return structured_data
```
**Coze语义理解模块**
这个模块使用大模型进行深度的语义理解:
```python
class SemanticUnderstanding:
"""语义理解模块,使用Coze平台的大模型能力"""
def __init__(self, coze_api_token: str, workflow_id: str):
self.coze = Coze(auth=TokenAuth(token=coze_api_token))
self.workflow_id = workflow_id
self.prompt_templates = self._load_prompt_templates()
def understand_instruction(self,
structured_input: Dict,
context: Dict = None) -> Dict:
"""
理解指令的语义,生成动作规划
参数:
structured_input: TextIn预处理后的结构化数据
context: 环境上下文信息(地图、机器人状态等)
返回:
包含动作序列和理解结果的字典
"""
# 构建完整的提示词
prompt = self._build_prompt(structured_input, context)
# 调用Coze工作流
parameters = {
"instruction": prompt,
"context": json.dumps(context) if context else "{}"
}
try:
workflow_run = self.coze.workflows.runs.create(
workflow_id=self.workflow_id,
parameters=parameters
)
# 解析响应
result = json.loads(workflow_run.data)
return self._parse_action_plan(result)
except Exception as e:
logger.error(f"语义理解失败: {e}")
return self._fallback_plan(structured_input)
def _build_prompt(self, structured_input: Dict, context: Dict) -> str:
"""构建给大模型的提示词"""
template = self.prompt_templates['robot_instruction']
# 提取关键信息
instruction_text = structured_input.get('raw_text', '')
if not instruction_text and 'markdown' in structured_input:
instruction_text = structured_input['markdown']
# 如果有表格信息,也加入提示词
tables_info = ""
if 'tables' in structured_input and structured_input['tables']:
tables_info = "\n相关表格信息:\n"
for i, table in enumerate(structured_input['tables'][:3], 1):
tables_info += f"表格{i}:\n{table}\n"
# 构建上下文描述
context_desc = ""
if context:
context_desc = "\n环境上下文:\n"
if 'map' in context:
context_desc += f"- 地图尺寸: {context['map'].get('size', '未知')}\n"
context_desc += f"- 当前位置: {context['map'].get('current_position', '未知')}\n"
if 'robot' in context:
context_desc += f"- 机器人状态: {context['robot'].get('status', '未知')}\n"
context_desc += f"- 电池电量: {context['robot'].get('battery', '未知')}%\n"
return template.format(
instruction=instruction_text,
tables_info=tables_info,
context=context_desc,
available_actions=self._get_available_actions()
)
def _parse_action_plan(self, coze_response: Dict) -> Dict:
"""解析Coze返回的动作计划"""
try:
# 尝试从响应中提取JSON
if 'output' in coze_response:
output_str = coze_response['output']
if isinstance(output_str, str):
# 尝试提取JSON部分
import re
json_match = re.search(r'\{.*\}', output_str, re.DOTALL)
if json_match:
action_plan = json.loads(json_match.group())
else:
action_plan = {'raw_output': output_str}
else:
action_plan = output_str
else:
action_plan = coze_response
# 验证动作计划的结构
return self._validate_action_plan(action_plan)
except json.JSONDecodeError as e:
logger.warning(f"JSON解析失败,使用备用解析: {e}")
return self._extract_actions_from_text(coze_response)
def _validate_action_plan(self, plan: Dict) -> Dict:
"""验证动作计划的结构完整性"""
required_fields = ['plan_id', 'plan']
if not all(field in plan for field in required_fields):
raise ValueError(f"动作计划缺少必要字段: {required_fields}")
# 验证每个动作
valid_actions = []
for action in plan.get('plan', []):
if self._is_valid_action(action):
valid_actions.append(action)
else:
logger.warning(f"跳过无效动作: {action}")
plan['plan'] = valid_actions
plan['valid_action_count'] = len(valid_actions)
plan['total_action_count'] = len(plan.get('plan', []))
return plan
def _is_valid_action(self, action: Dict) -> bool:
"""检查单个动作是否有效"""
required = ['step', 'action', 'action_id', 'parameters']
return all(field in action for field in required)
```
### 3.3 动作序列生成器
这个组件负责将语义理解的结果转化为具体的、可执行的动作序列:
```python
class ActionSequenceGenerator:
"""动作序列生成器,将语义理解结果转化为可执行动作"""
def __init__(self, action_library: Dict):
self.action_library = action_library
self.coordinate_converter = CoordinateConverter()
def generate_sequence(self,
semantic_result: Dict,
environment: Dict) -> List[Dict]:
"""
生成可执行的动作序列
参数:
semantic_result: 语义理解模块的输出
environment: 环境信息(地图、物体位置等)
返回:
可执行的动作序列
"""
actions = []
# 提取高层意图
high_level_plan = semantic_result.get('plan', [])
for plan_item in high_level_plan:
# 将高层计划转化为具体动作
concrete_actions = self._plan_to_actions(plan_item, environment)
actions.extend(concrete_actions)
# 添加必要的连接动作
actions = self._add_transition_actions(actions, environment)
# 优化动作序列(去除冗余、合并相似动作)
actions = self._optimize_sequence(actions)
# 添加检查点和恢复策略
actions = self._add_checkpoints(actions)
return actions
def _plan_to_actions(self, plan_item: Dict, environment: Dict) -> List[Dict]:
"""将单个高层计划项转化为具体动作"""
action_type = plan_item.get('action', '').lower()
if action_type == 'navigate_to':
return self._generate_navigation_actions(plan_item, environment)
elif action_type == 'grasp':
return self._generate_grasp_actions(plan_item, environment)
elif action_type == 'place':
return self._generate_place_actions(plan_item, environment)
elif action_type == 'scan_area':
return self._generate_scan_actions(plan_item, environment)
elif action_type == 'wait':
return self._generate_wait_actions(plan_item, environment)
else:
logger.warning(f"未知动作类型: {action_type}")
return []
def _generate_navigation_actions(self, plan_item: Dict, env: Dict) -> List[Dict]:
"""生成导航相关动作"""
actions = []
# 提取目标位置
target_location = plan_item.get('parameters', {}).get('location', '')
if not target_location:
target_coords = plan_item.get('parameters', {}).get('coordinates')
else:
# 从环境地图中查找坐标
target_coords = self._find_coordinates(target_location, env.get('map', {}))
if not target_coords:
logger.error(f"无法找到位置: {target_location}")
return []
# 生成导航动作序列
actions.append({
'action_id': f"nav_prepare_{plan_item.get('step', 0)}",
'type': 'prepare_navigation',
'parameters': {
'target': target_coords,
'path_planning': 'global',
'obstacle_avoidance': True
},
'pre_conditions': [
{'type': 'battery', 'min_level': 20},
{'type': 'localization', 'required': True}
],
'estimated_duration': 2.0
})
actions.append({
'action_id': f"nav_execute_{plan_item.get('step', 0)}",
'type': 'execute_navigation',
'parameters': {
'target': target_coords,
'speed_limit': env.get('speed_limit', 0.5),
'precision': 0.1 # 米
},
'pre_conditions': [
{'type': 'path_planned', 'required': True}
],
'post_conditions': [
{'type': 'position_reached', 'tolerance': 0.15}
],
'estimated_duration': self._estimate_navigation_time(target_coords, env),
'retry_policy': {
'max_attempts': 3,
'retry_delay': 2.0,
'on_failure': f"nav_replan_{plan_item.get('step', 0)}"
}
})
actions.append({
'action_id': f"nav_confirm_{plan_item.get('step', 0)}",
'type': 'confirm_position',
'parameters': {
'expected_position': target_coords,
'tolerance': 0.15
},
'pre_conditions': [
{'type': 'navigation_complete', 'required': True}
],
'estimated_duration': 1.0
})
return actions
def _find_coordinates(self, location_name: str, map_data: Dict) -> Optional[Dict]:
"""在地图中查找位置坐标"""
# 首先检查预定义的路点
waypoints = map_data.get('waypoints', {})
if location_name in waypoints:
return waypoints[location_name]
# 尝试模糊匹配
for wp_name, coords in waypoints.items():
if location_name.lower() in wp_name.lower():
return coords
# 尝试从语义描述中解析坐标
# 例如:"仓库第三排货架" -> 可能需要更复杂的解析
return None
def _estimate_navigation_time(self, target_coords: Dict, env: Dict) -> float:
"""估计导航时间"""
current_pos = env.get('robot', {}).get('position', {'x': 0, 'y': 0})
target_pos = target_coords
# 计算欧几里得距离
dx = target_pos.get('x', 0) - current_pos.get('x', 0)
dy = target_pos.get('y', 0) - current_pos.get('y', 0)
distance = math.sqrt(dx*dx + dy*dy)
# 考虑平均速度和加速度
avg_speed = env.get('avg_speed', 0.3) # 米/秒
return distance / avg_speed
def _add_transition_actions(self, actions: List[Dict], env: Dict) -> List[Dict]:
"""在动作之间添加必要的过渡动作"""
if not actions:
return actions
enhanced_actions = []
for i, action in enumerate(actions):
enhanced_actions.append(action)
# 如果不是最后一个动作,检查是否需要添加过渡
if i < len(actions) - 1:
next_action = actions[i + 1]
# 如果当前是导航结束,下一个是抓取,可能需要调整姿态
if (action['type'] == 'confirm_position' and
next_action['type'] == 'prepare_grasp'):
transition = {
'action_id': f"transition_{i}",
'type': 'adjust_pose',
'parameters': {
'target_orientation': next_action['parameters'].get('approach_angle', 0),
'precision': 0.1
},
'estimated_duration': 1.5
}
enhanced_actions.append(transition)
return enhanced_actions
def _optimize_sequence(self, actions: List[Dict]) -> List[Dict]:
"""优化动作序列,去除冗余,合并相似动作"""
if len(actions) <= 1:
return actions
optimized = []
i = 0
while i < len(actions):
current = actions[i]
# 检查是否可以与下一个动作合并
if i + 1 < len(actions):
next_action = actions[i + 1]
# 合并连续的等待动作
if (current['type'] == 'wait' and
next_action['type'] == 'wait'):
merged_wait = {
'action_id': f"merged_wait_{i}",
'type': 'wait',
'parameters': {
'duration': current['parameters'].get('duration', 0) +
next_action['parameters'].get('duration', 0)
},
'estimated_duration': current.get('estimated_duration', 0) +
next_action.get('estimated_duration', 0)
}
optimized.append(merged_wait)
i += 2 # 跳过两个已合并的动作
continue
optimized.append(current)
i += 1
return optimized
def _add_checkpoints(self, actions: List[Dict]) -> List[Dict]:
"""在关键步骤添加检查点"""
checkpoints_added = 0
enhanced_actions = []
for i, action in enumerate(actions):
enhanced_actions.append(action)
# 在关键动作后添加检查点
if action['type'] in ['execute_navigation', 'grasp_object', 'place_object']:
checkpoint = {
'action_id': f"checkpoint_{i}",
'type': 'save_state',
'parameters': {
'checkpoint_name': f"cp_{checkpoints_added}",
'actions_completed': i + 1
},
'estimated_duration': 0.5
}
enhanced_actions.append(checkpoint)
checkpoints_added += 1
return enhanced_actions
```
### 3.4 异常处理与恢复机制
在工业环境中,异常处理是至关重要的。系统需要能够优雅地处理各种异常情况:
```python
class ExceptionHandler:
"""异常处理器,负责检测和处理执行过程中的异常"""
def __init__(self, recovery_strategies: Dict):
self.recovery_strategies = recovery_strategies
self.error_log = []
def handle_exception(self,
action: Dict,
exception: Exception,
context: Dict) -> Dict:
"""
处理执行异常,返回恢复策略
参数:
action: 失败的动作
exception: 异常对象
context: 当前上下文
返回:
恢复策略
"""
error_type = type(exception).__name__
error_msg = str(exception)
# 记录错误
self._log_error(action, error_type, error_msg, context)
# 根据错误类型选择恢复策略
recovery = self._select_recovery_strategy(error_type, action, context)
# 如果找不到特定策略,使用默认策略
if not recovery:
recovery = self._get_default_recovery(action, context)
return recovery
def _select_recovery_strategy(self,
error_type: str,
action: Dict,
context: Dict) -> Optional[Dict]:
"""根据错误类型选择恢复策略"""
action_type = action.get('type', '')
# 导航相关错误
if error_type in ['PathPlanningError', 'NavigationFailed', 'ObstacleDetected']:
if action_type == 'execute_navigation':
return {
'strategy': 'replan_path',
'parameters': {
'current_position': context.get('robot', {}).get('position'),
'target_position': action['parameters'].get('target'),
'avoidance_mode': 'aggressive',
'max_retries': 2
},
'fallback_action': {
'type': 'request_human_assistance',
'message': f'导航到 {action["parameters"].get("target")} 失败'
}
}
# 抓取相关错误
elif error_type in ['GraspFailed', 'ObjectNotFound', 'CollisionDetected']:
if action_type in ['grasp_object', 'prepare_grasp']:
return {
'strategy': 'retry_with_adjustment',
'parameters': {
'max_retries': 3,
'adjustment_angle': 15, # 度
'adjustment_distance': 0.05 # 米
},
'fallback_action': {
'type': 'scan_area',
'parameters': {
'area': action['parameters'].get('target_area'),
'object_class': action['parameters'].get('object_class'),
'timeout': 10
}
}
}
# 超时错误
elif error_type == 'TimeoutError':
return {
'strategy': 'retry_with_timeout_extended',
'parameters': {
'timeout_multiplier': 1.5,
'max_retries': 2
}
}
# 通信错误
elif error_type in ['ConnectionError', 'CommunicationError']:
return {
'strategy': 'reconnect_and_retry',
'parameters': {
'reconnect_attempts': 3,
'retry_delay': 2.0
}
}
return None
def _get_default_recovery(self, action: Dict, context: Dict) -> Dict:
"""获取默认恢复策略"""
return {
'strategy': 'skip_and_continue',
'parameters': {
'skip_action_id': action.get('action_id'),
'continue_from_next': True
},
'fallback_action': {
'type': 'log_error_and_pause',
'message': f'动作 {action.get("action_id")} 执行失败,已跳过',
'severity': 'warning'
}
}
def _log_error(self,
action: Dict,
error_type: str,
error_msg: str,
context: Dict):
"""记录错误日志"""
error_entry = {
'timestamp': datetime.now().isoformat(),
'action_id': action.get('action_id'),
'action_type': action.get('type'),
'error_type': error_type,
'error_message': error_msg,
'context': {
'robot_position': context.get('robot', {}).get('position'),
'battery_level': context.get('robot', {}).get('battery'),
'system_status': context.get('system', {}).get('status')
},
'recovery_strategy': None # 将在处理完成后更新
}
self.error_log.append(error_entry)
# 如果错误日志过长,移除旧记录
if len(self.error_log) > 1000:
self.error_log = self.error_log[-1000:]
def get_error_statistics(self) -> Dict:
"""获取错误统计信息"""
if not self.error_log:
return {'total_errors': 0, 'error_types': {}}
error_types = {}
for entry in self.error_log:
error_type = entry['error_type']
error_types[error_type] = error_types.get(error_type, 0) + 1
return {
'total_errors': len(self.error_log),
'error_types': error_types,
'recent_errors': self.error_log[-10:] if self.error_log else []
}
```
## 4. 完整实现:从指令解析到机器人控制
现在让我们把这些组件组合起来,构建一个完整的系统。我将提供一个完整的Python实现,你可以直接在自己的项目中使用。
### 4.1 主控制系统
```python
class RobotInstructionSystem:
"""机器人指令系统主控制器"""
def __init__(self,
textin_app_id: str,
textin_secret_code: str,
coze_api_token: str,
coze_workflow_id: str,
robot_interface: Optional[Any] = None):
"""
初始化指令系统
参数:
textin_app_id: TextIn应用ID
textin_secret_code: TextIn密钥
coze_api_token: Coze API令牌
coze_workflow_id: Coze工作流ID
robot_interface: 机器人控制接口(可选)
"""
# 初始化各组件
self.preprocessor = InstructionPreprocessor(textin_app_id, textin_secret_code)
self.semantic_understanding = SemanticUnderstanding(coze_api_token, coze_workflow_id)
self.action_generator = ActionSequenceGenerator(self._load_action_library())
self.exception_handler = ExceptionHandler(self._load_recovery_strategies())
# 机器人控制接口
self.robot = robot_interface
self.environment = self._load_environment()
# 状态跟踪
self.current_plan = None
self.execution_history = []
self.system_status = 'idle'
# 配置日志
self.logger = self._setup_logger()
def process_instruction(self,
instruction_input: Union[str, bytes, Path],
context: Optional[Dict] = None) -> Dict:
"""
处理自然语言指令
参数:
instruction_input: 指令输入(文本、文件路径或二进制数据)
context: 额外的上下文信息
返回:
处理结果,包含动作序列和执行状态
"""
self.logger.info(f"开始处理指令: {instruction_input}")
self.system_status = 'processing'
try:
# 步骤1:预处理输入
self.logger.debug("步骤1: 预处理输入")
structured_input = self.preprocessor.preprocess(instruction_input)
# 步骤2:语义理解
self.logger.debug("步骤2: 语义理解")
semantic_result = self.semantic_understanding.understand_instruction(
structured_input,
{**self.environment, **(context or {})}
)
# 步骤3:生成动作序列
self.logger.debug("步骤3: 生成动作序列")
action_sequence = self.action_generator.generate_sequence(
semantic_result,
{**self.environment, **(context or {})}
)
# 步骤4:验证动作序列
self.logger.debug("步骤4: 验证动作序列")
validation_result = self._validate_action_sequence(action_sequence)
if not validation_result['valid']:
self.logger.warning(f"动作序列验证失败: {validation_result['errors']}")
return {
'success': False,
'error': '动作序列验证失败',
'details': validation_result['errors'],
'action_sequence': action_sequence
}
# 保存当前计划
self.current_plan = {
'plan_id': f"plan_{int(time.time())}",
'input': instruction_input,
'structured_input': structured_input,
'semantic_result': semantic_result,
'action_sequence': action_sequence,
'created_at': datetime.now().isoformat(),
'status': 'generated'
}
self.logger.info(f"指令处理完成,生成 {len(action_sequence)} 个动作")
self.system_status = 'ready'
return {
'success': True,
'plan_id': self.current_plan['plan_id'],
'action_count': len(action_sequence),
'estimated_duration': sum(a.get('estimated_duration', 0) for a in action_sequence),
'action_sequence': action_sequence,
'validation_result': validation_result
}
except Exception as e:
self.logger.error(f"指令处理失败: {e}", exc_info=True)
self.system_status = 'error'
return {
'success': False,
'error': str(e),
'error_type': type(e).__name__
}
def execute_plan(self,
plan_id: Optional[str] = None,
start_from: int = 0,
callback: Optional[Callable] = None) -> Dict:
"""
执行动作计划
参数:
plan_id: 计划ID,如果为None则执行当前计划
start_from: 从哪个动作开始执行
callback: 每个动作执行后的回调函数
返回:
执行结果
"""
if plan_id and plan_id != self.current_plan.get('plan_id'):
return {
'success': False,
'error': f'计划 {plan_id} 不存在或不是当前计划'
}
if not self.current_plan:
return {
'success': False,
'error': '没有可执行的计划'
}
self.logger.info(f"开始执行计划 {self.current_plan['plan_id']}")
self.system_status = 'executing'
action_sequence = self.current_plan['action_sequence']
execution_results = []
failed_actions = []
for i in range(start_from, len(action_sequence)):
action = action_sequence[i]
action_index = i + 1
total_actions = len(action_sequence)
self.logger.info(f"执行动作 {action_index}/{total_actions}: {action['action_id']}")
try:
# 检查前置条件
pre_conditions_met = self._check_pre_conditions(action)
if not pre_conditions_met:
self.logger.warning(f"动作 {action['action_id']} 的前置条件不满足")
# 尝试恢复
recovery = self.exception_handler.handle_exception(
action,
Exception("前置条件不满足"),
self._get_current_context()
)
if recovery['strategy'] == 'skip_and_continue':
self.logger.info(f"跳过动作 {action['action_id']}")
execution_results.append({
'action_id': action['action_id'],
'status': 'skipped',
'reason': 'preconditions_not_met',
'timestamp': datetime.now().isoformat()
})
continue
else:
# 执行恢复策略
recovery_result = self._execute_recovery(recovery)
if not recovery_result['success']:
raise Exception(f"恢复策略执行失败: {recovery_result['error']}")
# 执行动作
execution_start = time.time()
action_result = self._execute_single_action(action)
execution_time = time.time() - execution_start
# 检查后置条件
post_conditions_met = self._check_post_conditions(action, action_result)
# 记录执行结果
result_entry = {
'action_id': action['action_id'],
'action_type': action['type'],
'status': 'success' if action_result['success'] and post_conditions_met else 'partial_success',
'execution_time': execution_time,
'result': action_result,
'post_conditions_met': post_conditions_met,
'timestamp': datetime.now().isoformat()
}
execution_results.append(result_entry)
# 更新环境状态
self._update_environment(action, action_result)
# 调用回调函数
if callback:
callback({
'action': action,
'result': result_entry,
'progress': {
'current': action_index,
'total': total_actions,
'percentage': (action_index / total_actions) * 100
}
})
# 如果动作执行失败
if not action_result['success']:
self.logger.error(f"动作 {action['action_id']} 执行失败: {action_result.get('error')}")
failed_actions.append({
'action': action,
'result': action_result,
'attempt': 1
})
# 检查重试策略
retry_policy = action.get('retry_policy', {})
max_attempts = retry_policy.get('max_attempts', 1)
if max_attempts > 1:
self.logger.info(f"准备重试动作 {action['action_id']},剩余尝试次数: {max_attempts - 1}")
# 这里可以添加重试逻辑
except Exception as e:
self.logger.error(f"执行动作 {action['action_id']} 时发生异常: {e}", exc_info=True)
# 处理异常
recovery = self.exception_handler.handle_exception(
action, e, self._get_current_context()
)
execution_results.append({
'action_id': action['action_id'],
'status': 'failed',
'error': str(e),
'recovery_strategy': recovery,
'timestamp': datetime.now().isoformat()
})
failed_actions.append({
'action': action,
'error': str(e),
'recovery': recovery
})
# 根据恢复策略决定是否继续
if recovery['strategy'] in ['skip_and_continue', 'retry_and_continue']:
self.logger.info(f"根据恢复策略继续执行: {recovery['strategy']}")
continue
else:
self.logger.error(f"无法恢复,停止执行")
break
# 更新计划状态
self.current_plan['execution_results'] = execution_results
self.current_plan['failed_actions'] = failed_actions
self.current_plan['completed_at'] = datetime.now().isoformat()
if failed_actions:
self.current_plan['status'] = 'partially_completed'
self.system_status = 'partial_success'
else:
self.current_plan['status'] = 'completed'
self.system_status = 'success'
# 生成执行报告
report = self._generate_execution_report(execution_results, failed_actions)
self.logger.info(f"计划执行完成,成功 {len(execution_results) - len(failed_actions)}/{len(execution_results)} 个动作")
return {
'success': len(failed_actions) == 0,
'plan_id': self.current_plan['plan_id'],
'total_actions': len(action_sequence),
'executed_actions': len(execution_results),
'successful_actions': len([r for r in execution_results if r['status'] == 'success']),
'failed_actions': len(failed_actions),
'execution_time': sum(r.get('execution_time', 0) for r in execution_results),
'report': report,
'details': {
'execution_results': execution_results,
'failed_actions': failed_actions
}
}
def _execute_single_action(self, action: Dict) -> Dict:
"""执行单个动作"""
action_type = action['type']
# 如果没有机器人接口,则模拟执行
if not self.robot:
return self._simulate_action(action)
# 根据动作类型调用相应的机器人接口
try:
if action_type == 'execute_navigation':
return self.robot.navigate_to(
target=action['parameters']['target'],
speed_limit=action['parameters'].get('speed_limit', 0.5)
)
elif action_type == 'grasp_object':
return self.robot.grasp(
object_id=action['parameters']['object_id'],
grasp_pose=action['parameters'].get('grasp_pose')
)
elif action_type == 'place_object':
return self.robot.place(
object_id=action['parameters']['object_id'],
target_location=action['parameters']['target_location']
)
elif action_type == 'scan_area':
return self.robot.scan(
area_id=action['parameters']['scan_area_id'],
target_classes=action['parameters'].get('target_object_class', []),
timeout=action['parameters'].get('timeout', 10)
)
elif action_type == 'wait':
time.sleep(action['parameters']['duration'])
return {'success': True, 'message': '等待完成'}
else:
return {
'success': False,
'error': f'未知动作类型: {action_type}'
}
except Exception as e:
return {
'success': False,
'error': str(e),
'error_type': type(e).__name__
}
def _simulate_action(self, action: Dict) -> Dict:
"""模拟执行动作(用于测试)"""
action_type = action['type']
estimated_duration = action.get('estimated_duration', 1.0)
# 模拟执行时间
time.sleep(min(estimated_duration, 0.1)) # 测试时缩短等待时间
# 90%的概率成功,10%的概率失败(用于测试异常处理)
import random
if random.random() < 0.9:
return {
'success': True,
'message': f'模拟执行 {action_type} 成功',
'simulated': True,
'execution_time': estimated_duration
}
else:
return {
'success': False,
'error': f'模拟执行 {action_type} 失败',
'simulated': True,
'execution_time': estimated_duration
}
def _check_pre_conditions(self, action: Dict) -> bool:
"""检查动作的前置条件"""
pre_conditions = action.get('pre_conditions', [])
for condition in pre_conditions:
condition_type = condition.get('type')
if condition_type == 'battery':
min_level = condition.get('min_level', 20)
current_battery = self.environment.get('robot', {}).get('battery', 100)
if current_battery < min_level:
self.logger.warning(f"电池电量不足: {current_battery}% < {min_level}%")
return False
elif condition_type == 'localization':
required = condition.get('required', True)
localized = self.environment.get('robot', {}).get('localized', False)
if required and not localized:
self.logger.warning("机器人未完成定位")
return False
# 可以添加更多条件检查
return True
def _check_post_conditions(self, action: Dict, result: Dict) -> bool:
"""检查动作的后置条件"""
post_conditions = action.get('post_conditions', [])
for condition in post_conditions:
condition_type = condition.get('type')
if condition_type == 'position_reached':
expected = action['parameters'].get('target')
tolerance = condition.get('tolerance', 0.1)
actual = result.get('actual_position', {})
if expected and actual:
# 计算位置误差
dx = expected.get('x', 0) - actual.get('x', 0)
dy = expected.get('y', 0) - actual.get('y', 0)
distance = math.sqrt(dx*dx + dy*dy)
if distance > tolerance:
self.logger.warning(f"位置误差过大: {distance:.3f}m > {tolerance}m")
return False
# 可以添加更多后置条件检查
return True
def _update_environment(self, action: Dict, result: Dict):
"""更新环境状态"""
if action['type'] == 'execute_navigation' and result.get('success'):
# 更新机器人位置
if 'actual_position' in result:
self.environment['robot']['position'] = result['actual_position']
# 更新电池电量(假设导航消耗电量)
if 'battery_consumed' in result:
current_battery = self.environment.get('robot', {}).get('battery', 100)
self.environment['robot']['battery'] = max(0, current_battery - result['battery_consumed'])
elif action['type'] == 'grasp_object' and result.get('success'):
# 更新抓取状态
object_id = action['parameters'].get('object_id')
if object_id:
self.environment['robot']['carrying'] = object_id
# 可以添加更多状态更新逻辑
def _get_current_context(self) -> Dict:
"""获取当前上下文"""
return {
'environment': self.environment,
'current_plan': self.current_plan,
'system_status': self.system_status,
'error_statistics': self.exception_handler.get_error_statistics()
}
def _validate_action_sequence(self, actions: List[Dict]) -> Dict:
"""验证动作序列的完整性"""
errors = []
warnings = []
if not actions:
errors.append("动作序列为空")
return {'valid': False, 'errors': errors, 'warnings': warnings}
# 检查必要的动作
action_types = [a['type'] for a in actions]
# 如果有抓取动作,前面应该有导航动作
if 'grasp_object' in action_types:
grasp_index = action_types.index('grasp_object')
preceding_actions = action_types[:grasp_index]
if 'execute_navigation' not in preceding_actions:
warnings.append("抓取动作前没有导航动作,机器人可能无法到达目标位置")
# 检查动作ID的唯一性
action_ids = [a['action_id'] for a in actions]
if len(action_ids) != len(set(action_ids)):
errors.append("动作ID不唯一")
# 检查时间估计的合理性
total_estimated = sum(a.get('estimated_duration', 0) for a in actions)
if total_estimated > 3600: # 超过1小时
warnings.append(f"总估计时间过长: {total_estimated:.1f}秒")
return {
'valid': len(errors) == 0,
'errors': errors,
'warnings': warnings,
'total_actions': len(actions),
'total_estimated_duration': total_estimated
}
def _generate_execution_report(self,
results: List[Dict],
failures: List[Dict]) -> Dict:
"""生成执行报告"""
successful = [r for r in results if r['status'] == 'success']
failed = [r for r in results if r['status'] == 'failed']
skipped = [r for r in results if r['status'] == 'skipped']
total_time = sum(r.get('execution_time', 0) for r in results)
avg_time = total_time / len(results) if results else 0
return {
'summary': {
'total_actions': len(results),
'successful': len(successful),
'failed': len(failed),
'skipped': len(skipped),
'success_rate': len(successful) / len(results) if results else 0,
'total_execution_time': total_time,
'average_action_time': avg_time
},
'timeline': [
{
'action_id': r['action_id'],
'action_type': r['action_type'],
'status': r['status'],
'timestamp': r['timestamp'],
'execution_time': r.get('execution_time', 0)
}
for r in results
],
'failures': [
{
'action_id': f['action']['action_id'],
'error': f.get('error', '未知错误'),
'recovery_strategy': f.get('recovery', {}).get('strategy')
}
for f in failures
],
'recommendations': self._generate_recommendations(results, failures)
}
def _generate_recommendations(self,
results: List[Dict],
failures: List[Dict]) -> List[str]:
"""根据执行结果生成改进建议"""
recommendations = []
# 分析失败原因
failure_types = {}
for failure in failures:
error_type = failure.get('error_type', 'unknown')
failure_types[error_type] = failure_types.get(error_type, 0) + 1
for error_type, count in failure_types.items():
if count > 2: # 同类型错误多次出现
if error_type == 'NavigationFailed':
recommendations.append("导航失败频繁,建议检查地图准确性和传感器校准")
elif error_type == 'GraspFailed':
recommendations.append("抓取失败频繁,建议检查机械爪校准和目标物体识别")
elif error_type == 'TimeoutError':
recommendations.append("超时错误频繁,建议调整动作超时时间或优化路径规划")
# 分析执行时间
execution_times = [r.get('execution_time', 0) for r in results if r.get('execution_time', 0) > 0]
if execution_times:
avg_time = sum(execution_times) / len(execution_times)
max_time = max(execution_times)
if max_time > avg_time * 3:
recommendations.append("部分动作执行时间异常,建议检查相关硬件状态")
return recommendations
def _setup_logger(self):
"""配置日志系统"""
logger = logging.getLogger('RobotInstructionSystem')
logger.setLevel(logging.INFO)
# 控制台处理器
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
console_format = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
console_handler.setFormatter(console_format)
logger.addHandler(console_handler)
# 文件处理器
file_handler = logging.FileHandler('robot_instruction_system.log')
file_handler.setLevel(logging.DEBUG)
file_format = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s'
)
file_handler.setFormatter(file_format)
logger.addHandler(file_handler)
return logger
def _load_action_library(self) -> Dict:
"""加载动作库"""
return {
'navigate_to': {
'description': '导航到指定位置',
'parameters': ['target', 'speed_limit', 'precision'],
'pre_conditions': ['battery', 'localization'],
'post_conditions': ['position_reached']
},
'grasp_object': {
'description': '抓取物体',
'parameters': ['object_id', 'grasp_pose', 'force_limit'],
'pre_conditions': ['object_in_range', 'gripper_ready'],
'post_conditions': ['object_grasped']
},
'place_object': {
'description': '放置物体',
'parameters': ['object_id', 'target_location'],
'pre_conditions': ['object_held', 'location_clear'],
'post_conditions': ['object_placed']
},
'scan_area': {
'description': '扫描区域',
'parameters': ['scan_area_id', 'target_object_class', 'scan_mode', 'timeout'],
'pre_conditions': ['camera_ready'],
'post_conditions': ['scan_complete']
},
'wait': {
'description': '等待指定时间',
'parameters': ['duration'],
'pre_conditions': [],
'post_conditions': []
}
}
def _load_recovery_strategies(self) -> Dict:
"""加载恢复策略"""
return {
'navigation_failure': {
'primary': 'replan_path',
'fallback': 'request_human_assistance',
'max_retries': 3
},
'grasp_failure': {
'primary': 'retry_with_adjustment',
'fallback': 'skip_and_continue',
'max_retries': 2
},
'communication_failure': {
'primary': 'reconnect_and_retry',
'fallback': 'pause_and_alert',
'max_retries': 5
},
'low_battery': {
'primary': 'return_to_charge',
'fallback': 'emergency_stop',
'max_retries': 0
}
}
def _load_environment(self) -> Dict:
"""加载环境配置"""
return {
'robot': {
'position': {'x': 0, 'y': 0, 'z': 0, 'theta': 0},
'battery': 100,
'status': 'ready',
'localized': True,
'carrying': None
},
'map': {
'size': {'width': 20, 'height': 20},
'resolution': 0.05,
'origin': {'x': -10, 'y': -10, 'theta': 0},
'waypoints': {
'charging_station': {'x': 0, 'y': 0, 'z': 0, 'theta': 0},
'storage_area': {'x': 5, 'y': 3, 'z': 0, 'theta': 1.57},
'assembly_line_a': {'x': 8, 'y': -2, 'z': 0, 'theta': 0},
'assembly_line_b': {'x': 8, 'y': 2, 'z': 0, 'theta': 3.14}
}
},
'objects': {
'screw_box': {
'type': 'container',
'location': 'storage_area',
'position': {'x': 5.2, 'y': 3.1, 'z': 0.5},
'quantity': 10
},
'bolt_box': {
'type': 'container',
'location': 'storage_area',
'position': {'x': 5.2, 'y': 2.9, 'z': 0.5},
'quantity': 8
}
}
}
```
### 4.2 使用示例
现在让我们看看如何使用这个系统:
```python
# 示例1:基本使用
def main():
# 初始化系统
system = RobotInstructionSystem(
textin_app_id='你的TextIn应用ID',
textin_secret_code='你的TextIn密钥',
coze_api_token='你的Coze API令牌',
coze_workflow_id='你的Coze工作流ID'
)
# 处理文本指令
instruction = "去仓库区拿两盒螺丝,然后送到装配线A区"
result = system.process_instruction(instruction)
if result['success']:
print(f"计划生成成功!共 {result['action_count']} 个动作")
print(f"预计耗时: {result['estimated_duration']:.1f} 秒")
# 显示动作序列
for i, action in enumerate(result['action_sequence'], 1):
print(f"{i}. {action['action_id']}: {action['type']}")
if 'parameters' in action:
print(f" 参数: {action['parameters']}")
# 执行计划
execution_result = system.execute_plan()
print(f"\n执行结果: {'成功' if execution_result['success'] else '失败'}")
print(f"成功动作: {execution_result['successful_actions']}/{execution_result['executed_actions']}")
# 显示详细报告
report = execution_result['report']
print(f"\n执行报告:")
print(f" 成功率: {report['summary']['success_rate']*100:.1f}%")
print(f" 总时间: {report['summary']['total_execution_time']:.1f}秒")
if report['recommendations']:
print(f"\n改进建议:")
for rec in report['recommendations']:
print(f" - {rec}")
else:
print(f"指令处理失败: {result.get('error')}")
# 示例2:处理文件指令
def process_file_instruction():
system = RobotInstructionSystem(
textin_app_id='你的TextIn应用ID',
textin_secret_code='你的TextIn密钥',
coze_api_token='你的Coze API令牌',
coze_workflow_id='你的Coze工作流ID'
)
# 处理包含指令的文档文件
# 可以是PDF、Word、图片等格式
result = system.process_instruction('path/to/instruction_document.pdf')
if result['success']:
# 添加自定义上下文信息
context = {
'robot': {
'battery': 85,
'position': {'x': 1.2, 'y': 0.8, 'theta': 0.5}
},
'map': {
'obstacles': [
{'x': 3.0, 'y': 2.0, 'radius': 0.5},
{'x': 4.5, 'y': 1.2, 'radius': 0.3}
]
}
}
# 重新处理,考虑上下文
result_with_context = system.process_instruction(
'path/to/instruction_document.pdf',
context=context
)
# 执行计划,并添加进度回调
def progress_callback(info):
print(f"进度: {info['progress']['percentage']:.1f}% - {info['action']['action_id']}")
if info['result']['status'] == 'success':
print(f" 执行成功,耗时: {info['result']['execution_time']:.2f}秒")
else:
print(f" 执行失败: {info['result'].get('error', '未知错误')}")
execution_result = system.execute_plan(callback=progress_callback)
# 保存执行记录
import json
with open('execution_log.json', 'w') as f:
json.dump({
'plan': system.current_plan,
'execution_result': execution_result
}, f, indent=2, ensure_ascii=False)
print(f"执行记录已保存到 execution_log.json")
# 示例3:批量处理指令
def batch_process_instructions():
system = RobotInstructionSystem(
textin_app_id='你的TextIn应用ID',
textin_secret_code='你的TextIn密钥',
coze_api_token='你的Coze API令牌',
coze_workflow_id='你的Coze工作流ID'
)
instructions = [
"去仓库取一盒螺栓",
"将零件送到质检台",
"返回充电站充电",
"扫描装配区域,报告零件数量"
]
results = []
for i, instruction in enumerate(instructions, 1):
print(f"处理指令 {i}/{len(instructions)}: {instruction}")
result = system.process_instruction(instruction)
if result['success']:
exec_result = system.execute_plan()
results.append({
'instruction': instruction,
'plan_id': result['plan_id'],
'execution_result': exec_result
})
else:
print(f" 处理失败: {result.get('error')}")
results.append({
'instruction': instruction,
'error': result.get('error')
})
# 生成批量处理报告
successful = [r for r in results if 'execution_result' in r and r['execution_result']['success']]
failed = [r for r in results if 'error' in r]
print(f"\n批量处理完成:")
print(f" 成功: {len(successful)}/{len(instructions)}")
print(f" 失败: {len(failed)}/{len(instructions)}")
if failed:
print(f"\n失败的指令:")
for f in failed:
print(f" - {f['instruction']}: {f['error']}")
if __name__ == "__main__":
# 运行示例
main()
# 或者运行其他示例
# process_file_instruction()
# batch_process_instructions()
```
### 4.3 配置与部署
为了让系统在生产环境中稳定运行,我们需要进行适当的配置和部署:
```python
# config.py - 配置文件
import os
from pathlib import Path
class Config:
"""系统配置"""
# TextIn配置
TEXTIN_APP_ID = os.getenv('TEXTIN_APP_ID', 'your_app_id_here')
TEXTIN_SECRET_CODE = os.getenv('TEXTIN_SECRET_CODE', 'your_secret_code_here')
# Coze配置
COZE_API_TOKEN = os.getenv('COZE_API_TOKEN', 'your_api_token_here')
COZE_WORKFLOW_ID = os.getenv('COZE_WORKFLOW_ID', 'your_workflow_id_here')
# 机器人配置
ROBOT_TYPE = os.getenv('ROBOT_TYPE', 'simulation') # simulation, turtlebot, ur5, etc.
ROBOT_CONFIG = {
'simulation': {
'max_speed': 0.5,
'acceleration': 0.2,
'gripper_force': 20.0
},
'turtlebot': {
'max_speed': 0.3,
'acceleration': 0.1,
'gripper_force': 15.0
},
'ur5': {
'max_speed': 0.8,
'acceleration': 0.3,
'gripper_force': 50.0
}
}
# 地图配置
MAP_CONFIG = {
'resolution': 0.05,
'origin': {'x': -10, 'y': -10, 'theta': 0},
'inflation_radius': 0.3,
'waypoints_file': 'config/waypoints.yaml'
}
# 动作配置
ACTION_CONFIG = {
'default_timeout': 30.0,
'max_retries': 3,
'retry_delay': 2.0,
'safety_margin': 0.1
}
# 日志配置
LOG_CONFIG = {
'level': 'INFO',
'file': 'logs/robot_system.log',
'max_size': 10485760, # 10MB
'backup_count': 5
}
# 性能配置
PERFORMANCE_CONFIG = {
'max_concurrent_actions': 1,
'plan_cache_size': 10,
'response_timeout': 10.0
}
@classmethod
def validate(cls):
"""验证配置"""
errors = []
if cls.TEXTIN_APP_ID == 'your_app_id_here':
errors.append("TEXTIN_APP_ID 未配置")
if cls.TEXTIN_SECRET_CODE == 'your_secret_code_here':
errors.append("TEXT