# 贝尔曼最优公式实战:用Python手把手教你实现强化学习中的策略优化
很多朋友在初次接触强化学习理论时,都会被贝尔曼方程、最优策略、值迭代这些概念绕得云里雾里。公式推导看似严谨,但总感觉隔着一层纱,不知道这些抽象的符号如何变成屏幕上能跑起来的代码,更不清楚一个智能体是如何通过计算“学会”做出最佳决策的。今天,我们就抛开复杂的数学证明,直接动手,用Python从零实现贝尔曼最优公式的求解过程,让你亲眼看到策略是如何一步步进化到最优的。
我们将构建一个经典的“网格世界”环境,智能体需要学会避开禁区,找到通往目标的最优路径。整个过程,我们会使用NumPy进行高效的矩阵运算,用Matplotlib动态可视化策略和状态值的变化,并深入探讨代码实现中的关键细节,比如如何避免维度错误、如何设置收敛条件,以及如何对算法进行性能优化。无论你是希望巩固理论的初学者,还是想寻找一个可运行代码模板的实践者,这篇文章都将为你提供一个清晰、可操作的实战指南。
## 1. 环境搭建与问题定义
在开始编码之前,我们必须先清晰地定义智能体所处的世界。强化学习的核心是智能体与环境交互,通过试错来学习。环境决定了智能体可以做什么(动作),做了之后会怎么样(状态转移和奖励)。我们选择“网格世界”作为示例,因为它直观且包含了强化学习的所有核心要素:状态、动作、奖励和动态。
我们的网格世界是一个4x4的方格,总共有16个状态。其中:
* **普通状态(白色格子)**:智能体可以自由移动的起点或路径。
* **目标状态(绿色格子)**:智能体到达此处获得正奖励,并结束本轮尝试。
* **禁区状态(红色格子)**:智能体进入此处获得负奖励(惩罚),并且通常也会结束或受到阻碍。
* **边界**:智能体试图移出网格时,会停留在原地并受到惩罚。
智能体在每个状态有四个可能的动作:上(0)、右(1)、下(2)、左(3)。环境会根据智能体执行的动作,按照一定的概率将其转移到下一个状态,并给予相应的奖励。为了简化,我们首先考虑**确定性环境**,即执行一个动作后,转移到下一个状态是100%确定的。后续我们可以轻松地将其扩展为随机环境。
下面,我们用Python的类来定义这个环境。这个类将封装所有环境逻辑,包括状态转移、奖励计算和可视化。
```python
import numpy as np
import matplotlib.pyplot as plt
from matplotlib import colors
class GridWorld:
def __init__(self, size=4, goal_state=None, forbidden_states=None):
"""
初始化网格世界。
:param size: 网格边长,默认为4,即4x4网格。
:param goal_state: 目标状态坐标,例如 (3,3)。
:param forbidden_states: 禁区状态坐标列表,例如 [(1,1), (2,2)]。
"""
self.size = size
self.n_states = size * size
self.n_actions = 4 # 上,右,下,左
self.actions = [0, 1, 2, 3]
# 定义目标状态和禁区
self.goal_state = goal_state if goal_state else (size-1, size-1)
self.forbidden_states = forbidden_states if forbidden_states else [(1,1), (2,2)]
# 将二维坐标转换为一维状态索引的辅助函数
self._state_to_index = lambda s: s[0] * self.size + s[1]
self._index_to_state = lambda i: (i // self.size, i % self.size)
# 预计算所有状态-动作对的下一个状态和奖励
self._build_transition_model()
def _build_transition_model(self):
"""构建确定性的状态转移模型和奖励函数。"""
# 初始化转移矩阵 P(s'|s,a) 和奖励矩阵 R(s,a,s')
# 这里我们用字典存储,键为(s,a),值为(下一个状态索引, 即时奖励)
self.transition = {}
for i in range(self.n_states):
row, col = self._index_to_state(i)
for a in range(self.n_actions):
next_row, next_col = row, col
if a == 0: # 上
next_row = max(row - 1, 0)
elif a == 1: # 右
next_col = min(col + 1, self.size - 1)
elif a == 2: # 下
next_row = min(row + 1, self.size - 1)
elif a == 3: # 左
next_col = max(col - 1, 0)
# 计算奖励
reward = 0.0
next_state = (next_row, next_col)
# 检查是否到达目标
if next_state == self.goal_state:
reward = 1.0
# 检查是否进入禁区
elif next_state in self.forbidden_states:
reward = -1.0
# 检查是否撞墙(试图移出边界但被阻挡)
if (a == 0 and row == 0) or (a == 1 and col == self.size-1) or \
(a == 2 and row == self.size-1) or (a == 3 and col == 0):
reward = -0.1 # 轻微的边界惩罚
next_state_idx = self._state_to_index(next_state)
self.transition[(i, a)] = (next_state_idx, reward)
def step(self, state_idx, action):
"""
执行一个动作。
:param state_idx: 当前状态的一维索引。
:param action: 动作索引。
:return: 下一个状态索引,即时奖励,是否终止(done)。
"""
next_state_idx, reward = self.transition[(state_idx, action)]
row, col = self._index_to_state(next_state_idx)
done = ((row, col) == self.goal_state) or ((row, col) in self.forbidden_states)
return next_state_idx, reward, done
def reset(self):
"""重置环境到起始状态(左上角)。"""
return 0 # 状态(0,0)的索引
def render(self, values=None, policy=None):
"""可视化网格世界,可选地叠加状态值或策略箭头。"""
fig, ax = plt.subplots(figsize=(6,6))
# 创建网格颜色映射
cmap = colors.ListedColormap(['white', 'red', 'green'])
grid_data = np.zeros((self.size, self.size))
for fs in self.forbidden_states:
grid_data[fs[0], fs[1]] = 1
grid_data[self.goal_state[0], self.goal_state[1]] = 2
ax.imshow(grid_data, cmap=cmap, vmin=0, vmax=2)
# 绘制网格线
ax.set_xticks(np.arange(-0.5, self.size, 1), minor=True)
ax.set_yticks(np.arange(-0.5, self.size, 1), minor=True)
ax.grid(which='minor', color='black', linestyle='-', linewidth=2)
ax.tick_params(which='both', bottom=False, left=False, labelbottom=False, labelleft=False)
# 如果提供了状态值,则在每个格子中心显示
if values is not None:
for i in range(self.n_states):
row, col = self._index_to_state(i)
ax.text(col, row, f'{values[i]:.2f}', ha='center', va='center', fontsize=12, color='blue')
# 如果提供了策略(确定性策略),则绘制箭头
if policy is not None:
arrow_map = {0: '↑', 1: '→', 2: '↓', 3: '←'}
for i in range(self.n_states):
row, col = self._index_to_state(i)
# 只在非终止状态绘制策略箭头
state_tuple = (row, col)
if state_tuple != self.goal_state and state_tuple not in self.forbidden_states:
action = policy[i]
ax.text(col, row, arrow_map[action], ha='center', va='center', fontsize=20, color='black')
plt.title("Grid World Environment")
plt.show()
```
> 注意:在`_build_transition_model`函数中,我们为撞墙设置了-0.1的奖励。这是一个常见的技巧,鼓励智能体寻找更直接的路径,而不是在边界“蹭墙”。你可以调整这个值来观察对最终策略的影响。
初始化环境并查看其样貌:
```python
env = GridWorld(size=4)
print(f"状态总数: {env.n_states}")
print(f"动作总数: {env.n_actions}")
# 测试一个状态转移
test_state = env._state_to_index((0,0)) # 左上角
next_state, reward, done = env.step(test_state, 1) # 向右移动
print(f"从状态(0,0)执行动作‘右’: 到达状态{env._index_to_state(next_state)}, 奖励{reward}, 终止? {done}")
env.render() # 可视化基础环境
```
通过这段代码,我们不仅创建了环境,还理解了状态、动作和奖励是如何在代码中表示的。这是将理论公式转化为代码的第一步,也是最关键的一步。一个设计良好的环境类,能让后续的算法实现变得清晰很多。
## 2. 贝尔曼最优方程与值迭代算法原理
在搭建好环境之后,我们需要理解驱动智能体学习的核心引擎:贝尔曼最优方程。很多教程会直接抛出公式,但我们先从一个更直观的问题开始:**一个状态的价值,究竟由什么决定?**
想象一下你在玩一个迷宫游戏。某个位置(状态)的价值,并不在于这个位置本身,而在于从**这个位置出发,你未来能获得多少总奖励**。这个总奖励是未来每一步即时奖励的加权和,距离越远的奖励,其权重(由折扣因子γ控制)越小。这就是**状态值函数V(s)** 的概念。贝尔曼方程则描述了这个价值函数自身的递归关系:一个状态的价值,等于采取某个动作后得到的即时奖励,加上下一个状态价值的折扣值,并对所有可能的动作和结果求期望。
而贝尔曼**最优**方程,则是在此基础上加了一个“最大化”操作。它不再针对某个给定的策略π,而是直接寻找那个能让状态价值最大化的**最优策略π***。其核心思想可以概括为:
**最优状态值V*(s),等于在所有可选动作中,能带来的最大“动作值”Q*(s, a)。而动作值Q*(s, a),等于执行该动作后的即时奖励,加上所有可能转移到的下一个状态s‘的最优价值V*(s’)的折扣期望。**
用公式表示就是:
```
V*(s) = max_a [ R(s,a) + γ * Σ_{s'} P(s'|s,a) * V*(s') ]
```
其中,`R(s,a)`是期望即时奖励,`P(s'|s,a)`是状态转移概率。
这个方程是一个**不动点方程**。我们要求解的V*,是使得方程两边相等的那个特殊值。直接求解这个方程组(尤其是状态空间大时)非常困难。幸运的是,数学上可以证明,这个方程右边的部分构成一个**压缩映射**。这意味着,我们可以从一个任意的价值函数估计V0开始,反复应用贝尔曼最优方程的右边部分来更新它,这个迭代过程最终一定会收敛到唯一的最优解V*。
这个迭代算法,就是著名的**值迭代算法**。其更新规则非常简单:
对于所有状态s,
1. 计算每个动作a的Q值:`Q(s,a) = R(s,a) + γ * Σ_{s'} P(s'|s,a) * V(s')`
2. 找出使Q值最大的动作:`a* = argmax_a Q(s,a)`
3. 用这个最大的Q值更新状态s的价值:`V(s) = max_a Q(s,a)`
不断重复这个过程,直到V的变化非常小(小于一个预设的阈值θ),我们就认为它收敛到了V*。一旦得到V*,最优策略就唾手可得:在每个状态s,选择使得Q*(s,a)最大的动作a即可。由于我们是在确定性环境下,这个策略通常是确定性的(以概率1选择某个动作)。
为了更清晰地对比值迭代与其他基础算法的关系,我们来看下面这个表格:
| 特性 | 策略迭代 (Policy Iteration) | 值迭代 (Value Iteration) | 区别与联系 |
| :--- | :--- | :--- | :--- |
| **核心思想** | 交替进行**策略评估**(计算当前策略的价值)和**策略改进**(基于当前价值选择更优动作)。 | 直接迭代更新状态价值函数V(s),使其向最优价值V*收敛,最后一次性提取策略。 | 值迭代可以看作是策略迭代的一种特殊情况,它将策略评估步骤压缩为一次更新(只更新一次价值就进行策略改进)。 |
| **更新对象** | 显式地维护并更新策略π和价值函数V。 | 只更新价值函数V,策略是隐式地从V中推导的。 | 值迭代的代码结构通常更简洁。 |
| **收敛速度** | 通常收敛到最优策略所需的迭代次数较少。 | 收敛到最优价值函数所需的迭代次数可能较多,但每次迭代计算量可能较小。 | 在实际中,值迭代往往更常用,因为其实现简单,且在很多问题上效率足够高。 |
| **终止条件** | 策略不再发生变化。 | 价值函数的变化量小于阈值θ。 | 值迭代的终止条件更易于数值判断。 |
> 提示:值迭代算法中的折扣因子γ是一个极其重要的超参数。γ越接近1,智能体越“远视”,更看重长期回报;γ越接近0,智能体越“短视”,只关注即时奖励。调整γ会显著改变最优策略的行为。
理解了这个原理,我们就可以着手用代码实现它了。算法的骨架非常清晰,但魔鬼藏在细节里,比如如何高效地计算Q值,如何处理终止状态,以及如何设置收敛条件。
## 3. 值迭代算法的Python实现
现在,我们将理论转化为实实在在的代码。我们将实现一个`value_iteration`函数,它接收我们创建的环境、折扣因子γ和收敛阈值θ作为输入,输出最优价值函数V*和最优策略π*。
实现的关键在于高效地计算Q值。我们可以利用之前环境类中预计算的转移模型`self.transition`。对于给定的状态s和动作a,我们已经知道确定性的下一个状态s‘和奖励r。因此,Q值的计算简化为:`Q(s,a) = r + γ * V[s']`。对于随机环境,我们需要遍历所有可能的s‘并计算期望,但确定性环境让我们的第一版代码更加简洁。
下面是我们值迭代算法的完整实现:
```python
def value_iteration(env, gamma=0.9, theta=1e-6, max_iter=1000):
"""
值迭代算法求解网格世界的最优价值函数和策略。
:param env: GridWorld环境实例。
:param gamma: 折扣因子,范围[0,1)。
:param theta: 收敛阈值,当所有状态的价值更新量小于此值时停止迭代。
:param max_iter: 最大迭代次数,防止无限循环。
:return: 最优价值函数V_star, 最优策略policy_star。
"""
# 初始化价值函数,所有状态价值为0
V = np.zeros(env.n_states)
# 初始化一个随机策略(可选,主要用于记录中间过程)
policy = np.zeros(env.n_states, dtype=int)
for i in range(max_iter):
delta = 0 # 记录本轮迭代中,所有状态价值变化的最大值
# 遍历所有状态
for s in range(env.n_states):
# 如果是终止状态(目标或禁区),其价值固定为0(或即时奖励),无需更新
# 在我们的环境中,终止状态的转移奖励已包含在模型中,但价值迭代中通常将其价值视为0并停止更新。
# 更通用的做法是:计算所有动作的Q值,但终止状态的下一个状态仍是自己,且奖励为0。
# 这里我们简单处理:跳过对终止状态的更新,保持其V值为0。
state_tuple = env._index_to_state(s)
if state_tuple == env.goal_state or state_tuple in env.forbidden_states:
continue
# 计算当前状态s下,所有动作的Q值
q_values = np.zeros(env.n_actions)
for a in range(env.n_actions):
next_s, r = env.transition[(s, a)]
q_values[a] = r + gamma * V[next_s]
# 找出最大的Q值
max_q = np.max(q_values)
# 找出所有能取得最大Q值的动作(可能有多个)
best_actions = np.where(q_values == max_q)[0]
# 更新策略:随机选择一个最优动作(确定性策略通常只选一个)
policy[s] = np.random.choice(best_actions)
# 计算价值更新量,并更新V(s)
delta = max(delta, np.abs(max_q - V[s]))
V[s] = max_q
# 检查是否收敛
if delta < theta:
print(f"值迭代在 {i+1} 次迭代后收敛。")
break
else:
# 如果for循环正常结束(未break),说明达到最大迭代次数
print(f"达到最大迭代次数 {max_iter},可能未完全收敛。")
# 根据最终的最优价值函数V,再计算一次确定性的最优策略
for s in range(env.n_states):
state_tuple = env._index_to_state(s)
if state_tuple == env.goal_state or state_tuple in env.forbidden_states:
policy[s] = -1 # 终止状态标记为-1
continue
q_values = np.zeros(env.n_actions)
for a in range(env.n_actions):
next_s, r = env.transition[(s, a)]
q_values[a] = r + gamma * V[next_s]
policy[s] = np.argmax(q_values) # 选择Q值最大的动作
return V, policy
```
让我们逐段解析这个实现中的关键点:
1. **初始化**:价值函数V初始化为全零。这是一个常见的起点,当然你也可以用随机小数值初始化。
2. **主循环**:算法在`max_iter`内循环,每次循环遍历所有非终止状态。
3. **Q值计算**:对于每个状态-动作对,我们利用环境预存的`transition`字典,直接获取`(s, a)`对应的下一个状态和奖励,然后套用公式`q = r + gamma * V[next_s]`。这是算法中最核心的计算步骤。
4. **策略更新**:我们找出能产生最大Q值的动作。注意,这里使用了`np.where`来找出所有最优动作,并随机选择其中一个。在确定性环境中,通常只有一个最优动作,但在某些特殊配置下(如对称情况)可能存在多个同等最优的动作。这一步在迭代过程中不是必须的,因为最终策略是从收敛的V中提取的,但记录中间策略有助于观察学习过程。
5. **价值更新与收敛判断**:用最大的Q值更新`V[s]`。我们记录本轮所有状态价值变化的最大值`delta`。当`delta`小于预设的阈值`theta`时,我们认为价值函数已经稳定,算法收敛。
6. **最终策略提取**:在循环结束后,我们基于最终的最优价值函数V,重新为每个状态计算一次Q值,并选择Q值最大的动作作为最优策略。对于终止状态,我们将其策略标记为-1。
现在,让我们运行这个算法,看看它能否为我们的网格世界找到最优路径。
```python
# 运行值迭代算法
gamma = 0.9
V_star, policy_star = value_iteration(env, gamma=gamma, theta=1e-6)
# 打印部分结果
print("最优状态价值函数 (V*):")
print(V_star.reshape((4,4)).round(3)) # 重塑为4x4网格便于查看
print("\n最优策略 (箭头表示):")
policy_grid = policy_star.reshape((4,4))
arrow_map = {0: '↑', 1: '→', 2: '↓', 3: '←', -1: 'T'}
for row in policy_grid:
print(' '.join([arrow_map.get(a, '?') for a in row]))
# 可视化结果
env.render(values=V_star, policy=policy_star)
```
运行这段代码,你会看到控制台输出收敛信息、最优价值函数矩阵以及用箭头表示的最优策略。可视化图形会同时显示每个格子的价值(数字)和策略(箭头)。一个成功的学习结果应该显示,智能体学会了一条从起点(0,0)到目标(3,3)的路径,并且通常会选择一条避开禁区(如果可能且划算)或总价值最高的路径。
## 4. 可视化学习过程与性能优化技巧
仅仅看到最终结果是不够的。观察价值函数和策略在迭代过程中如何演变,能极大地加深我们对算法动态行为的理解。同时,我们最初的实现虽然正确,但在效率上还有很大的优化空间,尤其是当状态空间增大时。
### 4.1 动态可视化迭代过程
我们可以修改值迭代函数,使其在每次迭代后记录当前的V和策略,然后制作一个动画来展示它们的演变。
```python
def value_iteration_with_history(env, gamma=0.9, theta=1e-6, max_iter=100):
"""带历史记录的值迭代,用于可视化。"""
V = np.zeros(env.n_states)
V_history = [V.copy()]
policy_history = []
for i in range(max_iter):
delta = 0
new_V = V.copy()
for s in range(env.n_states):
state_tuple = env._index_to_state(s)
if state_tuple == env.goal_state or state_tuple in env.forbidden_states:
continue
q_values = np.zeros(env.n_actions)
for a in range(env.n_actions):
next_s, r = env.transition[(s, a)]
q_values[a] = r + gamma * V[next_s]
max_q = np.max(q_values)
delta = max(delta, np.abs(max_q - new_V[s]))
new_V[s] = max_q
V = new_V
V_history.append(V.copy())
# 记录当前V对应的策略
current_policy = np.zeros(env.n_states, dtype=int)
for s in range(env.n_states):
state_tuple = env._index_to_state(s)
if state_tuple == env.goal_state or state_tuple in env.forbidden_states:
current_policy[s] = -1
continue
q_values = np.zeros(env.n_actions)
for a in range(env.n_actions):
next_s, r = env.transition[(s, a)]
q_values[a] = r + gamma * V[next_s]
current_policy[s] = np.argmax(q_values)
policy_history.append(current_policy.copy())
if delta < theta:
print(f"收敛于迭代 {i+1}")
break
return V, V_history, policy_history
# 运行并可视化历史
V_final, V_hist, policy_hist = value_iteration_with_history(env, gamma=0.9, max_iter=20)
# 绘制价值函数随迭代次数的变化(以某个状态为例)
state_of_interest = env._state_to_index((0,0)) # 观察起点状态的价值变化
values_at_state = [vh[state_of_interest] for vh in V_hist]
plt.figure(figsize=(10,4))
plt.subplot(1,2,1)
plt.plot(values_at_state, marker='o')
plt.xlabel('迭代次数')
plt.ylabel(f'状态(0,0)的价值 V(s)')
plt.title('特定状态价值收敛过程')
plt.grid(True)
# 绘制所有状态价值的总变化量(衡量收敛)
deltas = [np.max(np.abs(V_hist[i+1] - V_hist[i])) for i in range(len(V_hist)-1)]
plt.subplot(1,2,2)
plt.plot(deltas, marker='s', color='red')
plt.xlabel('迭代次数')
plt.ylabel('最大价值更新量 (Delta)')
plt.title('全局收敛情况')
plt.yscale('log') # 使用对数坐标更清晰地观察收敛速度
plt.grid(True)
plt.tight_layout()
plt.show()
```
通过绘制图表,你可以清晰地看到价值函数如何随着迭代次数增加而快速收敛,以及`delta`如何呈指数下降(在对数坐标下近似直线),这正是压缩映射理论所预测的。
### 4.2 向量化与性能优化
我们之前的实现使用了双重for循环(遍历状态和动作),这在状态空间较小时没有问题。但对于更大的问题(例如10x10的网格有100个状态),这种朴素实现会变慢。我们可以利用NumPy的向量化操作进行优化。
优化的核心思想是:**将基于循环的逐状态更新,转变为对整个价值函数向量V的批量更新**。这需要我们将状态转移模型表示为矩阵形式。
首先,我们需要从环境的`transition`字典中构建两个矩阵:
* **奖励矩阵R**:形状为`(n_states, n_actions)`,`R[s, a]`表示在状态s执行动作a的期望即时奖励。
* **转移概率矩阵P**:形状为`(n_states, n_actions, n_states)`,`P[s, a, s']`表示在状态s执行动作a后转移到状态s‘的概率。在确定性环境中,这个矩阵非常稀疏(每行只有一个1)。
有了这两个矩阵,值迭代的核心更新步骤`V(s) = max_a [ R(s,a) + γ * Σ_{s'} P(s,a,s') * V(s') ]` 就可以被向量化为:
```python
# 计算所有状态-动作对的Q值矩阵
Q = R + gamma * np.dot(P, V) # 注意:这里np.dot需要根据P的维度调整,实际是张量运算
# 对每个状态,取Q值的最大值来更新V
new_V = np.max(Q, axis=1)
```
让我们实现这个向量化版本:
```python
def build_transition_matrices(env):
"""从环境构建奖励矩阵R和转移概率矩阵P。"""
n_states = env.n_states
n_actions = env.n_actions
R = np.zeros((n_states, n_actions))
P = np.zeros((n_states, n_actions, n_states))
for s in range(n_states):
for a in range(n_actions):
next_s, r = env.transition[(s, a)]
R[s, a] = r
P[s, a, next_s] = 1.0 # 确定性转移
return R, P
def value_iteration_vectorized(env, gamma=0.9, theta=1e-6, max_iter=1000):
"""向量化版本的值迭代算法。"""
R, P = build_transition_matrices(env)
V = np.zeros(env.n_states)
for i in range(max_iter):
# 向量化计算Q值:Q[s,a] = R[s,a] + γ * Σ_s' P[s,a,s'] * V[s']
# 利用广播和点积,这里P的维度是(S,A,S),V是(S,),点积后得到(S,A)
Q = R + gamma * np.dot(P, V) # 注意:这里需要确保维度对齐,对于三维P和向量V,np.dot可能不是预期操作。
# 更准确的写法是:Q = R + gamma * np.einsum('sas, s -> sa', P, V)
# 或者显式循环动作维度,但利用矩阵乘法计算每个动作的期望价值。
# 为了清晰,我们采用一种更直观的向量化方式:
Q = R + gamma * np.matmul(P, V) # 如果P是(S,A,S),V是(S,),则matmul(P, V)得到(S,A)
new_V = np.max(Q, axis=1)
delta = np.max(np.abs(new_V - V))
V = new_V
if delta < theta:
print(f"向量化值迭代在 {i+1} 次迭代后收敛。")
break
else:
print(f"达到最大迭代次数 {max_iter}。")
# 从最终的V中提取策略
Q_final = R + gamma * np.matmul(P, V)
policy = np.argmax(Q_final, axis=1)
# 标记终止状态
for s in range(env.n_states):
state_tuple = env._index_to_state(s)
if state_tuple == env.goal_state or state_tuple in env.forbidden_states:
policy[s] = -1
return V, policy
# 测试向量化版本
import time
start = time.time()
V_vec, policy_vec = value_iteration_vectorized(env, gamma=0.9, theta=1e-6)
end = time.time()
print(f"向量化版本运行时间: {end-start:.4f} 秒")
print("最优价值函数 (向量化):")
print(V_vec.reshape((4,4)).round(3))
```
> 注意:上面的向量化代码中,`np.matmul(P, V)`在P是三维、V是一维时,其运算规则是`(s,a,s) * (s) -> (s,a)`,即对最后一个s维度进行求和。这与我们计算`Σ_{s'} P[s,a,s'] * V[s']`的意图一致。确保你理解这里的维度变化。
对于小型网格,速度提升可能不明显,但对于更大的状态空间,向量化实现会比纯Python循环快上一个数量级。这是在实际工程应用中必须掌握的优化技巧。
### 4.3 探索参数影响与常见陷阱
最后,我们来探讨几个关键参数和实现中容易踩的坑。
* **折扣因子γ的影响**:如前所述,γ控制着智能体的“远视”程度。尝试将γ设为0.5甚至0.1重新运行算法,观察最优策略的变化。你会发现,当γ很小时,智能体变得非常“短视”,可能宁愿绕远路避开眼前的禁区惩罚,也不愿承受短期惩罚以获取更快的长期回报。
```python
# 比较不同gamma下的策略
for g in [0.1, 0.5, 0.9, 0.99]:
V_g, policy_g = value_iteration(env, gamma=g, theta=1e-6)
print(f"\nGamma = {g}:")
# 可以简单打印策略或可视化
# env.render(values=V_g, policy=policy_g) # 注释掉以避免弹出太多窗口
# 或者计算路径长度来量化策略优劣
```
* **收敛阈值θ的选择**:θ决定了我们何时停止迭代。θ太小会导致不必要的迭代,增加计算时间;θ太大则可能导致算法提前停止,得到次优解。通常`1e-6`是一个安全的选择。你可以尝试将其改为`1e-3`或`1e-9`,观察收敛迭代次数和最终价值函数的细微差异。
* **初始化与收敛**:我们的V初始化为0。在存在正奖励的环境中,这通常没问题。但在某些所有奖励都为负的环境中,初始化为0可能过于乐观,导致收敛变慢。有时用随机小值初始化可能更好。此外,值迭代保证收敛,但迭代次数与γ有关。γ越接近1,收敛越慢。
* **处理多个最优动作**:在我们的策略提取中,当多个动作具有相同的最大Q值时,`np.argmax`默认返回第一个索引。这可能导致策略在对称场景下出现不希望的偏差。更健壮的做法是随机选择其中一个,就像我们在迭代过程中做的那样。
* **终止状态的处理**:在值迭代中,终止状态(如目标)的价值通常不再更新。在我们的代码中,我们直接跳过了对这些状态的更新。另一种常见做法是,在计算Q值时,如果下一个状态是终止状态,则其价值视为0(或一个固定值)。两种方式本质是等价的,但必须保持一致,否则会影响收敛性。
通过亲手实现、可视化并优化这个算法,你不仅理解了贝尔曼最优公式如何驱动学习,更掌握了将其转化为高效、健壮代码的完整流程。这为你后续学习更复杂的强化学习算法(如Q-Learning、深度强化学习)打下了坚实的实践基础。记住,理解算法最好的方式,就是让它在你自己的代码中运行起来,并观察它的一举一动。