上一篇博客中,我们介绍了 DQN 及其改进算法。DQN 的核心思路是学习动作价值函数 ,再根据价值函数选择动作。这类方法在离散动作空间中非常有效,但当动作空间变为连续时,直接枚举所有动作并取最大值就不再现实。
本文将转向另一类强化学习方法:Policy-based 方法。本文会从策略梯度和 Actor-Critic 出发,进一步介绍 TRPO 如何用信赖域约束控制策略更新幅度,最后重点推导 PPO 如何用更简单的目标函数近似 TRPO 的优化思想,并给出一个可运行的 PPO 实现框架。
# 策略梯度算法
Q-learning、DQN 及 DQN 改进算法都是 Value-based 的方法,其中 Q-learning 是处理有限状态的算法,而 DQN 可以用来解决连续状态的问题。但是基于值函数的方法都只能处理离散动作空间,而 Policy-based 的方法直接优化策略,可以处理连续动作空间的优化问题。
对比两者,基于值函数的方法主要是学习值函数,然后根据值函数导出一个策略,学习过程中并不存在一个显式的策略;而基于策略的方法则是直接显式地学习一个目标策略。本节将简单回顾策略梯度算法。
策略梯度方法则选择直接参数化策略 。其中 是策略网络参数。优化目标是让策略在状态 下更倾向于选择能带来高回报的动作 。策略梯度的基本形式可以写为:
其中 是优势函数,用来衡量动作 相对于当前状态平均水平到底好多少。其中优势函数一般写为 ,即当前动作价值与当前状态价值的差值。
该梯度公式表明:
- 如果 ,说明这个动作比平均水平更好,应该提高该动作的概率。
- 如果 ,说明这个动作比平均水平更差,应该降低该动作的概率。
上述梯度公式中,包含当前动作的动作价值,还有状态价值。如果采用蒙特卡洛搜索方法来估计,那就是 REINFORCE 算法。
由于使用蒙特卡洛方法,REINFORCE 算法的梯度估计是无偏的,但是方差很大,可能会造成一定程度上的不稳定,这也是 Actor-Critic 算法要解决的问题。Actor-Critic 算法使用动作价值函数代替蒙特卡洛算法来估计回报,属于有偏估计,但方差小,训练更加稳定。
Actor-Critic 算法采用两个网络:分别是 Actor(策略网络)和 Critic(价值网络),对应策略梯度公式中的 与 。
- Actor 与环境交互,并在 Critic 价值函数的指导下用策略梯度学习一个更好的策略。
- Critic 通过 Actor 与环境交互收集的数据学习一个价值函数,这个价值函数会用于判断在当前状态什么动作是好的,什么动作不是好的,进而帮助 Actor 进行策略更新。
关于 Actor-Critic 的其他变体算法,可参考之前的博客,这里不再赘述。
# TRPO 算法
REINFORCE 与 Actor-Critic 都属于比较直接的策略梯度方法。它们的基本思路是估计策略梯度,然后沿着提高期望回报的方向更新策略参数。这个思路本身没有问题,但在深度强化学习中会遇到一个很现实的困难:策略更新幅度不好控制。
如果学习率太小,策略变化很慢,样本利用效率较低;如果学习率太大,策略可能在一次更新后发生明显偏移。由于策略直接决定智能体如何采样动作,一次过大的策略更新可能会让智能体从一个尚可的策略突然跳到一个很差的策略,导致训练过程剧烈震荡甚至性能崩溃。
TRPO(Trust Region Policy Optimization,信赖域策略优化)就是为了解决这个问题提出的。
策略梯度方法本质是优化策略 的期望回报:
有研究指出,新策略 的期望回报等于旧策略的期望回报 + 新策略在旧策略优势函数上的累计期望,即:
这里的 是旧策略 的优势函数,默认情况下是对应动作价值与状态价值的差值。所以 TRPO 希望通过估计新策略在旧策略优势函数上的期望来评估新策略的性能提升。
经过数学推导(此处略去),上述式子可以推导得到如下形式:
其中 $$\rho_{\tilde {\pi}}(s)$$ 是新策略 访问状态 的概率。所以,当我们需要估计新策略的期望回报时,依赖于新策略的采样数据来估计状态访问分布 。显然这种方式在实际训练中是不可行的,因为每次尝试一个新策略都要重新采样大量数据代价太高。因此,引入一个技巧和简化,即当新旧策略足够接近时,可以用旧策略的状态访问分布 来近似新策略的状态访问分布 ,从而得到一个替代目标:
TRPO 通过 KL 散度来衡量两个策略的差异:
由于 TRPO 使用上述替代目标来近似新策略的期望回报,因此需要保证新旧策略足够接近,所以通过限制新旧策略之间的 KL 散度来实现这一点:
其中 是一个小的超参数,表示允许的新旧策略之间的最大 KL 散度。
回到上述替代目标,TRPO 优化策略时本质上是优化下面目标:
上述优化目标中,动作需要在新策略 下采样,这意味着无法直接使用旧策略采样得到的数据来估计这个目标。为了解决这个问题,TRPO 引入重要性采样,对动作分布进行修正:
整理上述推导过程,TRPO 训练过程等价于求解如下优化问题:
直接求解上式带约束的优化问题比较麻烦,TRPO 在其具体实现中做了一步近似操作来快速求解,包括共轭梯度、近似搜索等。这部分理论推导比较复杂,由于笔者精力有限,这部分内容就不再展开,详细可见动手学强化学习 TRPO 算法。
# PPO 算法原理
TRPO 算法在很多场景上的应用都很成功,但是它的计算过程非常复杂,每一步更新的运算量非常大。于是,TRPO 算法的改进版 PPO 算法在 2017 年被提出。PPO 基于 TRPO 的思想,但是其算法实现更加简单。并且大量的实验结果表明,与 TRPO 相比,PPO 能学习得一样好(甚至更快),这使得 PPO 成为非常流行的强化学习算法。
PPO 基于 TRPO 简化而来,本质想要求解的优化问题与 TRPO 是一致的,见上一节的推导结果。
针对 TRPO 的优化问题,PPO 用了一些相对简单的方法来求解。具体来说,PPO 有两种形式,一是 PPO - 惩罚,二是 PPO - 截断,我们接下来对这两种形式进行介绍。
# PPO-Penalty
PPO - 惩罚(PPO-Penalty)用拉格朗日乘数法直接将 KL 散度的限制放进了目标函数中,这就变成了一个无约束的优化问题,即:
改成上述表达式以后,可以直接应用梯度上升法优化策略参数。但是这里还有个核心问题需要回答:这个 (惩罚系数)该怎么确定?如果 是个固定常数,会存在巨大的隐患:
- 如果 设得太大:惩罚太重,策略稍微更新就会被严重惩罚,导致训练缓慢。
- 如果 设得太小:惩罚太轻,起不到约束作用,就会导致新策略与旧策略相近的前提条件被打破。
而且,在训练的不同阶段,由于策略网络参数在变,相同的参数变化带来的 KL 散度波动是完全不同的。因此, 必须跟着训练进程动态调整。
OpenAI 的 PPO 论文中给出了一个启发式自适应算法,在每一次迭代中,算法会做两件事:
-
使用带拉格朗日惩罚项的目标函数,直接用普通的梯度上升(比如 Adam 优化器)更新参数 ,持续训练几个 Epoch。
-
一轮训练结束后,计算新策略和旧策略之间真实的平均 KL 散度,记为 ,接着与 KL 散度约束的超参数 进行对比,按照如下公式更新:
简单理解为,当策略变化比预期小时,减小惩罚项;反之提高惩罚系数。
# PPO-Clip
相比于 PPO-Pnalty,工业界更喜欢 PPO - 截断(PPO-Clip)。因为 PPO-Penalty 里的系数 需要在训练过程中动态根据 KL 散度的大小编码一套复杂的调整逻辑,而 PPO-Clip 相对更加简单。
PPO-Clip 提出了下面这个目标函数:
其中 函数的作用是将概率比值限制在 的范围内。 是一个超参数,常见取值为 或 。
当使用梯度上升法优化上述目标函数时,如果当前动作的优势 取值为正,那么新策略会提高该动作值的输出概率,反之则降低输出概率,符合直观理解。
当新策略与就策略的概率比值在 ,优化目标即为不带约束的 TRPO 优化目标,并且因为该概率比值在此范围内,所以可以认为符合 KL 散度不能过大的约束。而当概率比值超过这一区间后,上述优化目标变成一个常数,进而保证组织策略进一步更新。
# 广义优势估计
PPO 的策略更新依赖动作价值的优势估计,Advantage 的质量会直接影响训练稳定性。
最简单的一步 TD Advantage 是:
这里 也可以看作 TD error。如果 ,说明实际得到的结果比 Critic 预测更好;如果 ,说明结果比 Critic 预测更差。
但是一步 TD 估计虽然方差较小,却可能有较大偏差。另一种做法是使用蒙特卡洛回报:
蒙特卡洛估计偏差较小,但方差较大。强化学习中经常需要在偏差和方差之间做权衡。
广义优势估计(Generalized Advantage Estimation,GAE)正是为了解决这个问题。根据多步时序差分的思想,有:
GAE 将这些不同步数的优势估计进行指数加权平均,得到:
其中 是折扣因子; 用于控制 Bias-Variance Tradeoff。
-
当 时,GAE 接近一步 TD,方差较小但偏差较大。
-
当 时,GAE 接近蒙特卡洛估计,偏差较小但方差较大。
此外,PPO 实现中通常会对 Advantage 做标准化,让不同 batch 中的 Advantage 具有相近尺度,减少训练过程中的梯度波动。
# PPO 的完整损失函数
PPO 实际训练时通常包含三个损失项:Actor loss、Critic loss、Entropy bonus。
Actor 的目标是最大化 clipped objective:
在使用梯度下降实现时,通常写成最小化负数:
Critic 的任务是让状态价值估计 接近目标回报 。常见损失是均方误差:
其中 可以由折扣回报或 GAE 相关方法构造。
如果策略过早变得确定,智能体可能会陷入局部最优。为了鼓励探索,PPO 常常加入策略熵:
熵越大,说明策略越随机,探索性越强。
最终 PPO 的损失函数常写为:
# PPO 代码实践
下面针对 gymnasium 库中的 CartPole-v1 环境,使用 PPO 算法进行训练。代码以理解算法为目标,没有加入复杂工程封装。
该环境中,有一辆小车,智能体的任务是通过左右移动保持车上的杆竖直,若杆的倾斜度数过大,或者车子离初始位置左右的偏离程度过大,或者坚持时间到达指定帧数,则游戏结束。

智能体的状态是一个维数为 4 的向量(车的位置,车的速度,杆的角度,杆尖速度),每一维都是连续的,其动作是离散的,动作空间大小为 2(向左移动,向右移动)。在游戏中每坚持一帧,智能体能获得分数为 1 的奖励,坚持时间越长,则最后的分数越高。
首先导入所需库和依赖:
import random | |
import torch | |
import torch.nn as nn | |
import torch.nn.functional as F | |
import gymnasium as gym | |
import numpy as np | |
from torch.distributions import Categorical |
定义所有超参数如下:
EPISODES = 500 # 训练的总回合数 | |
HIDDEN_DIM = 128 # 隐藏层神经元数量 | |
ACTOR_LR = 1e-3 # 策略网络(Actor)的学习率 | |
CRITIC_LR = 1e-3 # 价值网络(Critic)的学习率 | |
GAMMA = 0.99 # 折扣因子,用于计算未来奖励的现值 | |
LAMBDA = 0.95 # GAE(广义优势估计)中的 lambda 参数,控制方差与偏差的权衡 | |
PPO_EPOCHS = 8 # 每次更新时,在同一个轨迹上重复优化的轮数 | |
CLIP_EPS = 0.2 # PPO 裁剪范围,限制策略更新的幅度 | |
ENTROPY_COEF = 0.001 # 熵正则化系数,鼓励探索,防止策略过早确定性 | |
MAX_GRAD_NORM = 0.5 # 梯度裁剪的最大范数,防止梯度爆炸 |
定义所需的策略网络和价值网络,本质均为多层前馈神经网络。 PolicyNet 输入环境状态,输出两个动作的概率, ValueNet 输入环境状态,输出一个状态值。
class PolicyNet(nn.Module): | |
def __init__(self, state_dim, action_dim): | |
super().__init__() | |
self.net = nn.Sequential( | |
nn.Linear(state_dim, HIDDEN_DIM), | |
nn.ReLU(), | |
nn.Linear(HIDDEN_DIM, HIDDEN_DIM), | |
nn.ReLU(), | |
nn.Linear(HIDDEN_DIM, action_dim), | |
) | |
def forward(self, x): | |
return self.net(x) | |
class ValueNet(nn.Module): | |
def __init__(self, state_dim): | |
super().__init__() | |
self.net = nn.Sequential( | |
nn.Linear(state_dim, HIDDEN_DIM), | |
nn.ReLU(), | |
nn.Linear(HIDDEN_DIM, HIDDEN_DIM), | |
nn.ReLU(), | |
nn.Linear(HIDDEN_DIM, 1), | |
) | |
def forward(self, x): | |
return self.net(x) |
PPO 类封装了整个 PPO 算法的核心逻辑,包括动作选择和策略更新两个主要方法。
class PPO: | |
def __init__(self, state_dim, action_dim, device): | |
self.actor = PolicyNet(state_dim, action_dim).to(device) | |
self.critic = ValueNet(state_dim).to(device) | |
self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr=ACTOR_LR) | |
self.critic_optimizer = torch.optim.Adam(self.critic.parameters(), lr=CRITIC_LR) | |
self.device = device | |
def take_action(self, state): | |
state = torch.as_tensor(state, dtype=torch.float32, device=self.device).unsqueeze(0) | |
# 根据策略网络输出的 logits 构造分类分布并采样动作 | |
with torch.no_grad(): | |
dist = Categorical(logits=self.actor(state)) | |
action = dist.sample() | |
return action.item(), dist.log_prob(action).item() | |
def update(self, transitions): | |
states = torch.as_tensor(np.array(transitions["states"]), dtype=torch.float32, device=self.device) | |
actions = torch.as_tensor(transitions["actions"], dtype=torch.long, device=self.device) | |
old_log_probs = torch.as_tensor(transitions["log_probs"], dtype=torch.float32, device=self.device) | |
rewards = torch.as_tensor(transitions["rewards"], dtype=torch.float32, device=self.device) | |
next_states = torch.as_tensor(np.array(transitions["next_states"]), dtype=torch.float32, | |
device=self.device) | |
terminals = torch.as_tensor(transitions["terminals"], dtype=torch.float32, device=self.device) | |
episode_ends = torch.as_tensor(transitions["episode_ends"], dtype=torch.float32, device=self.device) | |
# 计算 GAE 优势函数以及目标回报值 | |
with torch.no_grad(): | |
values = self.critic(states).squeeze(1) | |
next_values = self.critic(next_states).squeeze(1) | |
td_deltas = rewards + GAMMA * next_values * (1.0 - terminals) - values | |
advantages = compute_gae(td_deltas, episode_ends) | |
returns = advantages + values | |
advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8) | |
actor_losses, critic_losses = [], [] | |
# 多轮 PPO 更新 | |
for _ in range(PPO_EPOCHS): | |
# 根据当前策略计算动作的对数概率 | |
dist = Categorical(logits=self.actor(states)) | |
log_probs = dist.log_prob(actions) | |
ratio = torch.exp(log_probs - old_log_probs) | |
# 策略损失 | |
surrogate1 = ratio * advantages | |
surrogate2 = torch.clamp(ratio, 1.0 - CLIP_EPS, 1.0 + CLIP_EPS) * advantages | |
actor_loss = -torch.min(surrogate1, surrogate2).mean() - ENTROPY_COEF * dist.entropy().mean() | |
# 价值损失 | |
critic_loss = F.mse_loss(self.critic(states).squeeze(1), returns) | |
# 更新策略网络 | |
self.actor_optimizer.zero_grad() | |
actor_loss.backward() | |
torch.nn.utils.clip_grad_norm_(self.actor.parameters(), MAX_GRAD_NORM) | |
self.actor_optimizer.step() | |
# 更新价值网络 | |
self.critic_optimizer.zero_grad() | |
critic_loss.backward() | |
torch.nn.utils.clip_grad_norm_(self.critic.parameters(), MAX_GRAD_NORM) | |
self.critic_optimizer.step() | |
# 记录本次更新的损失 | |
actor_losses.append(actor_loss.item()) | |
critic_losses.append(critic_loss.item()) | |
# 返回本轮更新中损失的平均值 | |
return float(np.mean(actor_losses)), float(np.mean(critic_losses)) |
PPO.take_action 方法根据当前状态通过策略网络输出动作的概率分布,并从中采样一个动作,同时返回该动作的对数概率。取对数概率是为了后续计算重要性采样比率时使用。另外,注意这里使用了 torch.no_grad() 来避免在动作选择过程中计算梯度,因为此处不需要更新网络参数。
PPO.update 方法接收一个包含状态、动作、奖励等信息的字典,表示旧策略下的采样数据,来自一条完整的 Agent 交互轨迹。其中每个键对应一个列表,包含了轨迹中每个时间步的相关信息。
该方法首先在 torch.no_grad() 环境下使用 critic 网络计算当前状态的价值和下一状态的价值,进而计算 TD error、GAE 优势函数以及目标回报值。其中优势函数经过标准化处理,确保在训练过程中具有相似的尺度,减少梯度波动。
接着,进行多轮 PPO 更新。在每轮更新中,计算当前策略下动作的对数概率,进而计算重要性采样比率,并根据 PPO 的截断目标函数计算策略损失和价值损失。最后分别更新策略网络和价值网络的参数。
上述代码中, compute_gae 函数用于计算 GAE 优势函数。它接受 td_deltas (TD error)和 episode_ends (表示每个时间步是否为 episode 结束的标志)作为输入,按照 GAE 的定义从后向往前递归计算,得到每个时间步的优势值。
def compute_gae(td_deltas, episode_ends): | |
advantages = torch.zeros_like(td_deltas) | |
gae = 0.0 | |
for t in reversed(range(len(td_deltas))): | |
gae = td_deltas[t] + GAMMA * LAMBDA * (1.0 - episode_ends[t]) * gae | |
advantages[t] = gae | |
return advantages |
最后,整合上述组件,定义训练函数 train ,在 CartPole-v1 环境中使用 PPO 算法进行训练,并记录每个 episode 的回报以及 Actor 和 Critic 的损失:
def train(): | |
device = torch.device("cuda" if torch.cuda.is_available() else "cpu") | |
# 初始化环境和 PPO 智能体 | |
env = gym.make("CartPole-v1") | |
state_dim = env.observation_space.shape[0] | |
action_dim = env.action_space.n | |
agent = PPO(state_dim, action_dim, device) | |
# 主训练循环 | |
returns, actor_losses, critic_losses = [], [], [] | |
for episode in range(EPISODES): | |
state, _ = env.reset() | |
transitions = { | |
"states": [], | |
"actions": [], | |
"log_probs": [], | |
"rewards": [], | |
"next_states": [], | |
"terminals": [], | |
"episode_ends": [], | |
} | |
episode_return = 0.0 | |
done = False | |
# 与环境交互,收集数据直到 episode 结束 | |
while not done: | |
action, log_prob = agent.take_action(state) | |
next_state, reward, terminated, truncated, _ = env.step(action) | |
done = terminated or truncated | |
transitions["states"].append(state) | |
transitions["actions"].append(action) | |
transitions["log_probs"].append(log_prob) | |
transitions["rewards"].append(reward) | |
transitions["next_states"].append(next_state) | |
transitions["terminals"].append(terminated) | |
transitions["episode_ends"].append(done) | |
state = next_state | |
episode_return += reward | |
actor_loss, critic_loss = agent.update(transitions) | |
returns.append(episode_return) | |
actor_losses.append(actor_loss) | |
critic_losses.append(critic_loss) | |
if (episode + 1) % 10 == 0 or episode == 0: | |
mean_return = np.mean(returns[-20:]) | |
print( | |
f"episode={episode + 1}, return={episode_return:.1f}, " | |
f"mean_return_20={mean_return:.1f}, actor_loss={actor_loss:.4f}, " | |
f"critic_loss={critic_loss:.4f}" | |
) | |
env.close() | |
return returns, actor_losses, critic_losses |
训练曲线如下:

