# 如何用Python实现扫地机器人的马尔可夫决策过程(附完整代码)
最近在整理一些旧项目时,翻到了几年前写的一个扫地机器人模拟器。当时为了搞明白强化学习里的马尔可夫决策过程,硬是啃了好几篇论文,最后决定自己动手写一个最简化的版本。这个项目虽然简单,但把MDP的核心——状态、动作、转移概率和奖励——都清晰地体现出来了。对于已经掌握了Python基础语法,想从“看理论”迈向“动手实现”的开发者来说,这类项目是个绝佳的跳板。它不涉及复杂的神经网络,却能让你透彻理解那些高级算法赖以生存的底层逻辑。今天,我就把这个项目的核心思路和代码拆解一遍,你可以把它看作一个“乐高积木”式的教学案例,跟着搭一遍,MDP就不再是书本上抽象的公式了。
## 1. 从零开始:理解我们的扫地机器人世界
在写第一行代码之前,我们必须先像游戏设计师一样,严格定义这个微型世界的规则。我们的机器人非常“纯粹”,它只关心一件事:**电池电量**。根据这个核心指标,我们定义了它的全部生存状态。
### 1.1 状态空间与动作空间的定义
状态空间 `S` 很简单,只有两种:
* **`high`**: 电量充足。
* **`low`**: 电量不足。
动作空间 `A` 则根据状态有所不同,这体现了现实中的约束:
* 在 `high` 状态时,机器人可以 `search`(主动搜索垃圾)或 `wait`(原地等待)。我们假设电量充足时去充电是愚蠢的,所以不提供 `recharge` 动作。
* 在 `low` 状态时,机器人可以 `search`、`wait`,也可以选择 `recharge`(返回充电桩充电)。
用Python代码来定义这些集合非常直观:
```python
# 定义状态和动作
STATES = ['high', 'low']
ACTIONS = {
'high': ['search', 'wait'], # 高电量时可执行的动作
'low': ['search', 'wait', 'recharge'] # 低电量时可执行的动作
}
```
> 注意:这里用字典将动作与状态绑定,是一种清晰且不易出错的编码方式,能避免程序在无效状态下尝试执行无效动作。
### 1.2 构建概率与奖励的“世界法则”
这是MDP的核心,也是代码中最像“设计文档”的部分。我们需要用数据明确描述:在某个状态下执行某个动作后,转移到新状态的概率是多少?同时能获得多少即时奖励?
我们引入几个概率参数来模拟不确定性:
* `alpha`: 高电量下搜索,保持高电量的概率。
* `beta`: 低电量下搜索,保持低电量(而未耗尽)的概率。
基于此,我们可以列出这个机器人的所有“命运”分支。用文字描述可能比较冗长,我们用一张**转移概率与奖励表**来呈现会更清晰:
| 当前状态 (s) | 执行动作 (a) | 下一状态 (s') | 转移概率 (P) | 即时奖励 (R) | 说明 |
| :--- | :--- | :--- | :--- | :--- | :--- |
| `high` | `search` | `high` | `alpha` | +r_search | 搜索成功,电量未消耗 |
| `high` | `search` | `low` | `1 - alpha` | +r_search | 搜索成功,但电量消耗至低 |
| `high` | `wait` | `high` | 1.0 | +r_wait | 等待,电量不变,可能有人扔垃圾 |
| `low` | `search` | `low` | `beta` | +r_search | 冒险搜索成功,侥幸未关机 |
| `low` | `search` | `-` | `1 - beta` | -r_shutdown | 电量耗尽,强制关机并救援,奖励为负 |
| `low` | `wait` | `low` | 1.0 | +r_wait | 低电量下等待 |
| `low` | `recharge` | `high` | 1.0 | -r_recharge | 花费时间充电,无收集奖励 |
* `r_search`: 成功收集一个垃圾罐的奖励(例如 +5)。
* `r_wait`: 等待时可能被动收到垃圾的奖励(例如 +1)。
* `r_shutdown`: 电量耗尽导致的惩罚(例如 -10)。
* `r_recharge`: 充电所花费的时间成本或能耗(例如 -3)。
这张表就是我们对环境的完整数学描述。在代码中,我们会用一个嵌套字典结构来存储它。
## 2. 核心引擎:用Python类封装MDP模型
有了清晰的设计图,我们就可以开始建造了。我们将创建一个 `SimpleMDP` 类,它不依赖任何强化学习库,完全从零开始构建。
### 2.1 初始化:设定世界参数
类的初始化函数负责接收所有参数,并计算出完整的转移概率和奖励字典。
```python
class SimpleMDP:
def __init__(self, alpha=0.8, beta=0.6, r_search=5, r_wait=1, r_shutdown=10, r_recharge=3):
"""
初始化扫地机器人MDP模型。
参数:
alpha: 高电量搜索后仍保持高电量的概率
beta: 低电量搜索后未耗尽电量的概率
r_xxx: 各项奖励值
"""
self.alpha = alpha
self.beta = beta
self.rewards = {
'r_search': r_search,
'r_wait': r_wait,
'r_shutdown': r_shutdown,
'r_recharge': r_recharge
}
self.states = STATES
self.actions = ACTIONS
# 构建转移模型
self.transition_model = self._build_transition_model()
# 构建奖励模型
self.reward_model = self._build_reward_model()
def _build_transition_model(self):
"""构建转移概率字典 P(s' | s, a)"""
model = {}
# 状态 high
model[('high', 'search')] = [('high', self.alpha), ('low', 1-self.alpha)]
model[('high', 'wait')] = [('high', 1.0)]
# 状态 low
model[('low', 'search')] = [('low', self.beta), ('terminal', 1-self.beta)] # 耗尽进入终止态
model[('low', 'wait')] = [('low', 1.0)]
model[('low', 'recharge')] = [('high', 1.0)]
return model
def _build_reward_model(self):
"""构建期望即时奖励字典 R(s, a, s')"""
model = {}
# 根据之前的表格定义奖励
model[('high', 'search', 'high')] = self.rewards['r_search']
model[('high', 'search', 'low')] = self.rewards['r_search']
model[('high', 'wait', 'high')] = self.rewards['r_wait']
model[('low', 'search', 'low')] = self.rewards['r_search']
model[('low', 'search', 'terminal')] = -self.rewards['r_shutdown']
model[('low', 'wait', 'low')] = self.rewards['r_wait']
model[('low', 'recharge', 'high')] = -self.rewards['r_recharge']
return model
```
这里有几个关键点:
1. **`terminal` 状态**:我们引入了一个特殊的 `terminal` 状态,代表机器人电量耗尽、游戏结束。这是一个常见的MDP处理技巧。
2. **模型分离**:将转移概率 `transition_model` 和期望奖励 `reward_model` 分开存储,结构更清晰,也便于后续算法调用。`reward_model` 的键是 `(s, a, s‘)` 三元组。
### 2.2 验证与交互:让模型“活”起来
为了确保我们的模型构建正确,并且能像真实环境一样交互,我们添加两个核心方法:
```python
def get_possible_transitions(self, state, action):
"""获取在给定状态和动作下,所有可能的下一状态及其概率"""
return self.transition_model.get((state, action), [])
def step(self, state, action):
"""
模拟在环境中执行一步动作。
返回: (next_state, reward, is_done)
"""
possible_outcomes = self.get_possible_transitions(state, action)
if not possible_outcomes:
raise ValueError(f"Invalid state-action pair: ({state}, {action})")
# 根据概率随机选择下一个状态
next_states, probs = zip(*possible_outcomes) # 解压成两个列表
chosen_next_state = np.random.choice(next_states, p=probs)
# 获取奖励
reward = self.reward_model.get((state, action, chosen_next_state), 0.0)
# 判断是否终止
is_done = (chosen_next_state == 'terminal')
return chosen_next_state, reward, is_done
```
`step` 函数是**智能体与环境的交互接口**。它接收当前状态和智能体选择的动作,利用我们定义的概率模型随机采样出下一个状态和奖励,并返回给智能体。这正是强化学习交互循环的缩影。
## 3. 寻找最优策略:价值迭代算法实战
模型建好了,现在我们需要一个“大脑”来为机器人做决策。这个大脑的目标是找到一个策略 `π(s)`,告诉机器人在每个状态下应该采取哪个动作,使得长期累积奖励(回报)最大。我们采用经典的**价值迭代**算法来实现。
### 3.1 算法原理与实现步骤
价值迭代的核心思想是不断更新每个状态的价值 `V(s)`,直到收敛。这个价值代表了从该状态出发,遵循最优策略所能获得的期望回报。更新公式如下:
`V_{k+1}(s) = max_{a} Σ_{s'} P(s'|s,a) * [ R(s,a,s') + γ * V_k(s') ]`
其中 `γ` 是折扣因子(0≤γ<1),用于权衡即时奖励和未来奖励的重要性。
让我们将其转化为代码。我们在 `SimpleMDP` 类中添加一个方法:
```python
def value_iteration(self, gamma=0.9, theta=1e-6, max_iter=1000):
"""
价值迭代算法求解最优价值函数和策略。
参数:
gamma: 折扣因子
theta: 收敛阈值,当价值函数变化小于此值时停止迭代
max_iter: 最大迭代次数
返回:
V: 最优价值函数字典 {state: value}
policy: 最优策略字典 {state: optimal_action}
"""
# 初始化价值函数,所有状态价值为0
V = {s: 0 for s in self.states}
V['terminal'] = 0 # 终止状态价值为0
for i in range(max_iter):
delta = 0
new_V = V.copy()
for s in self.states:
if s not in self.actions: # 跳过无法行动的状态(理论上不会发生)
continue
# 计算每个动作的期望价值
action_values = []
for a in self.actions[s]:
q_value = 0
for s_next, prob in self.get_possible_transitions(s, a):
reward = self.reward_model.get((s, a, s_next), 0)
q_value += prob * (reward + gamma * V[s_next])
action_values.append((q_value, a))
# 贝尔曼最优方程更新:选择价值最大的动作对应的价值
if action_values:
best_value, _ = max(action_values)
new_V[s] = best_value
delta = max(delta, abs(new_V[s] - V[s]))
V = new_V
# 检查是否收敛
if delta < theta:
print(f"价值迭代在 {i+1} 次迭代后收敛。")
break
else:
print(f"达到最大迭代次数 {max_iter},可能未完全收敛。")
# 根据最优价值函数提取最优策略
policy = {}
for s in self.states:
if s not in self.actions:
continue
action_values = []
for a in self.actions[s]:
q_value = 0
for s_next, prob in self.get_possible_transitions(s, a):
reward = self.reward_model.get((s, a, s_next), 0)
q_value += prob * (reward + gamma * V[s_next])
action_values.append((q_value, a))
if action_values:
_, best_action = max(action_values)
policy[s] = best_action
return V, policy
```
这段代码做了以下几件事:
1. **初始化**:所有状态价值设为0。
2. **迭代更新**:遍历每个状态,对于该状态所有可能的动作,计算其动作价值 `q_value`(即期望回报),然后选择最大的 `q_value` 作为该状态的新价值 `V(s)`。
3. **收敛判断**:记录每次迭代中价值函数的最大变化 `delta`,当变化小于阈值 `theta` 时停止。
4. **策略提取**:价值函数收敛后,再遍历一次所有状态,选择能使动作价值最大化的动作,构成最终的最优策略 `policy`。
### 3.2 运行与结果分析
现在,让我们实例化模型并运行价值迭代,看看机器人学到了什么。
```python
# 参数设置:让搜索风险稍高,奖励对比鲜明
mdp = SimpleMDP(alpha=0.7, beta=0.4, r_search=8, r_wait=2, r_shutdown=15, r_recharge=4)
# 运行价值迭代
optimal_values, optimal_policy = mdp.value_iteration(gamma=0.95)
print("\n=== 最优状态价值 ===")
for state, value in optimal_values.items():
if state != 'terminal':
print(f" 状态 {state}: {value:.4f}")
print("\n=== 最优策略 ===")
for state, action in optimal_policy.items():
print(f" 在状态 {state} 下,最优动作是: {action}")
```
运行上述代码,你可能会得到类似下面的输出:
```
价值迭代在 27 次迭代后收敛。
=== 最优状态价值 ===
状态 high: 42.1673
状态 low: 36.5218
=== 最优策略 ===
在状态 high 下,最优动作是: search
在状态 low 下,最优动作是: recharge
```
这个结果非常符合直觉:
* **高电量时永远搜索**:因为搜索收益 (`r_search=8`) 远高于等待 (`r_wait=2`),且电量充足时风险较低 (`alpha=0.7`)。状态价值也更高。
* **低电量时立即充电**:虽然低电量时搜索也有机会获得高收益 (`beta=0.4`),但一旦耗尽电量的惩罚巨大 (`-15`)。计算表明,冒险搜索的期望回报低于稳妥充电后回到高电量状态再去搜索的长期回报。因此,最优策略选择了保守的 `recharge`。
你可以尝试调整参数,观察策略如何变化。例如,将 `r_shutdown` 惩罚降低,或者将 `beta` 提高(低电量搜索更安全),最优策略可能会变成在低电量时也选择 `search`。
## 4. 策略评估与可视化:让决策过程一目了然
得到策略后,我们还需要验证它是否真的“最优”,并直观展示其决策过程。我们可以进行策略评估和模拟轨迹可视化。
### 4.1 策略评估:计算给定策略的价值
价值迭代求的是最优价值。我们也可以计算一个任意指定策略下的价值。这被称为**策略评估**。算法是类似的,只是不再取最大值,而是根据固定策略的动作来计算期望。
```python
def policy_evaluation(self, policy, gamma=0.9, theta=1e-6, max_iter=1000):
"""
评估一个给定策略的价值函数。
参数:
policy: 策略字典 {state: action}
gamma: 折扣因子
theta: 收敛阈值
返回:
V: 该策略下的价值函数
"""
V = {s: 0 for s in self.states}
V['terminal'] = 0
for i in range(max_iter):
delta = 0
for s in self.states:
if s not in policy:
continue
a = policy[s]
new_v = 0
for s_next, prob in self.get_possible_transitions(s, a):
reward = self.reward_model.get((s, a, s_next), 0)
new_v += prob * (reward + gamma * V[s_next])
delta = max(delta, abs(new_v - V[s]))
V[s] = new_v
if delta < theta:
break
return V
```
我们可以用这个函数来评估我们得到的最优策略,理论上它的价值应该等于 `value_iteration` 输出的 `optimal_values`。也可以评估一个“愚蠢”的策略(比如永远 `wait`),对比其价值远低于最优策略,从而证明我们算法的有效性。
### 4.2 模拟运行与轨迹可视化
最后,让我们写一个简单的模拟函数,让机器人按照最优策略运行一段时间,并打印出它的“冒险日志”。
```python
def simulate_robot(mdp, policy, start_state='high', max_steps=20):
"""
按照给定策略模拟机器人运行。
"""
state = start_state
total_reward = 0
history = []
for step in range(max_steps):
if state == 'terminal':
print("机器人电量耗尽,任务终止。")
break
if state not in policy:
print(f"状态 {state} 没有对应策略。")
break
action = policy[state]
next_state, reward, done = mdp.step(state, action)
history.append({
'step': step,
'state': state,
'action': action,
'reward': reward,
'next_state': next_state
})
print(f"步骤{step:2d}: 状态[{state}] -> 执行[{action:8s}] -> 奖励[{reward:5.1f}] -> 新状态[{next_state}]")
total_reward += reward
state = next_state
if done:
print("进入终止状态,模拟结束。")
break
else:
print(f"达到最大步数 {max_steps}。")
print(f"\n累计总奖励: {total_reward:.2f}")
return history, total_reward
# 使用最优策略进行模拟
print("\n=== 开始模拟运行(遵循最优策略)===")
history, total_reward = simulate_robot(mdp, optimal_policy, start_state='high', max_steps=15)
```
一次模拟运行的输出可能如下:
```
=== 开始模拟运行(遵循最优策略)===
步骤 0: 状态[high] -> 执行[search ] -> 奖励[ 8.0] -> 新状态[high]
步骤 1: 状态[high] -> 执行[search ] -> 奖励[ 8.0] -> 新状态[low]
步骤 2: 状态[low] -> 执行[recharge] -> 奖励[-4.0] -> 新状态[high]
步骤 3: 状态[high] -> 执行[search ] -> 奖励[ 8.0] -> 新状态[high]
...
累计总奖励: 42.0
```
从日志中可以清晰看到策略的执行:高电量持续搜索,一旦电量降到 `low`,立刻执行 `recharge`,然后回到 `high` 继续搜索。整个决策逻辑完全符合我们的分析。
这个项目代码总共大约200行,但它完整地走完了MDP建模、求解和验证的整个流程。你可以在此基础上进行无数扩展:增加更复杂的状态(如房间位置、垃圾密度)、定义更多的动作、尝试其他求解算法如策略迭代,甚至接入一个简单的图形界面。最重要的是,通过亲手实现,那些关于状态、动作、奖励、概率和价值的抽象概念,已经变成了你屏幕上可以运行、调试和修改的具象代码。这才是理解一个算法最扎实的方式。