您现在的位置是:首页 > 技术教程 正文

【深度强化学习】(6) PPO 模型解析,附Pytorch完整代码

admin 阅读: 2024-03-20
后台-插件-广告管理-内容页头部广告(手机)

大家好,今天和各位分享一下深度强化学习中的近端策略优化算法(proximal policy optimization,PPO),并借助 OpenAI 的 gym 环境完成一个小案例,完整代码可以从我的 GitHub 中获得:

https://github.com/LiSir-HIT/Reinforcement-Learning/tree/main/Model


1. 算法原理

PPO 算法之所以被提出,根本原因在于 Policy Gradient 在处理连续动作空间时 Learning rate 取值抉择困难。Learning rate 取值过小,就会导致深度强化学习收敛性较差,陷入完不成训练的局面,取值过大则导致新旧策略迭代时数据不一致,造成学习波动较大或局部震荡。除此之外,Policy Gradient 因为在线学习的性质,进行迭代策略时原先的采样数据无法被重复利用,每次迭代都需要重新采样

同样地置信域策略梯度算法(Trust Region Policy Optimization,TRPO)虽然利用重要性采样(Important-sampling)、共轭梯度法求解提升了样本效率、训练速率等,但在处理函数的二阶近似时会面临计算量过大,以及实现过程复杂、兼容性差等缺陷。 

PPO 算法具备 Policy Gradient、TRPO 的部分优点采样数据和使用随机梯度上升方法优化代替目标函数之间交替进行,虽然标准的策略梯度方法对每个数据样本执行一次梯度更新,但 PPO 提出新目标函数,可以实现小批量更新。

鉴于上述问题,该算法在迭代更新时,观察当前策略在 t 时刻智能体处于状态 s 所采取的行为概率\pi (a_t |s_t)

如果优势函数为正数,需要增大新旧策略比值 r_t

PPO 算法主要由 Actor 和 Critic 两部分构成,Critic 部分更新方式与其他Actor-Critic 类型相似,通常采用计算 TD  error(时序差分误差)形式。对于 Actor 的更新方式,PPO 可在KLPENL 、CLIPL 之间选择对于当前实验环境稳定性适用性更强的目标函数,经过 OpenAI 研究团队实验论证,PPO- Clip 比 PPO- Penalty有更好的数据效率和可行性 


2. 代码实现

下面我就采用 Clip 形式的 PPO。模型构建代码如下。下面的模型适用于 action 是离散的情况,连续情况的代码可以从我的 GitHub 中获取。

  1. # 代码用于离散环境的模型
  2. import numpy as np
  3. import torch
  4. from torch import nn
  5. from torch.nn import functional as F
  6. # ----------------------------------- #
  7. # 构建策略网络--actor
  8. # ----------------------------------- #
  9. class PolicyNet(nn.Module):
  10. def __init__(self, n_states, n_hiddens, n_actions):
  11. super(PolicyNet, self).__init__()
  12. self.fc1 = nn.Linear(n_states, n_hiddens)
  13. self.fc2 = nn.Linear(n_hiddens, n_actions)
  14. def forward(self, x):
  15. x = self.fc1(x) # [b,n_states]-->[b,n_hiddens]
  16. x = F.relu(x)
  17. x = self.fc2(x) # [b, n_actions]
  18. x = F.softmax(x, dim=1) # [b, n_actions] 计算每个动作的概率
  19. return x
  20. # ----------------------------------- #
  21. # 构建价值网络--critic
  22. # ----------------------------------- #
  23. class ValueNet(nn.Module):
  24. def __init__(self, n_states, n_hiddens):
  25. super(ValueNet, self).__init__()
  26. self.fc1 = nn.Linear(n_states, n_hiddens)
  27. self.fc2 = nn.Linear(n_hiddens, 1)
  28. def forward(self, x):
  29. x = self.fc1(x) # [b,n_states]-->[b,n_hiddens]
  30. x = F.relu(x)
  31. x = self.fc2(x) # [b,n_hiddens]-->[b,1] 评价当前的状态价值state_value
  32. return x
  33. # ----------------------------------- #
  34. # 构建模型
  35. # ----------------------------------- #
  36. class PPO:
  37. def __init__(self, n_states, n_hiddens, n_actions,
  38. actor_lr, critic_lr, lmbda, epochs, eps, gamma, device):
  39. # 实例化策略网络
  40. self.actor = PolicyNet(n_states, n_hiddens, n_actions).to(device)
  41. # 实例化价值网络
  42. self.critic = ValueNet(n_states, n_hiddens).to(device)
  43. # 策略网络的优化器
  44. self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr=actor_lr)
  45. # 价值网络的优化器
  46. self.critic_optimizer = torch.optim.Adam(self.critic.parameters(), lr = critic_lr)
  47. self.gamma = gamma # 折扣因子
  48. self.lmbda = lmbda # GAE优势函数的缩放系数
  49. self.epochs = epochs # 一条序列的数据用来训练轮数
  50. self.eps = eps # PPO中截断范围的参数
  51. self.device = device
  52. # 动作选择
  53. def take_action(self, state):
  54. # 维度变换 [n_state]-->tensor[1,n_states]
  55. state = torch.tensor(state[np.newaxis, :]).to(self.device)
  56. # 当前状态下,每个动作的概率分布 [1,n_states]
  57. probs = self.actor(state)
  58. # 创建以probs为标准的概率分布
  59. action_list = torch.distributions.Categorical(probs)
  60. # 依据其概率随机挑选一个动作
  61. action = action_list.sample().item()
  62. return action
  63. # 训练
  64. def learn(self, transition_dict):
  65. # 提取数据集
  66. states = torch.tensor(transition_dict['states'], dtype=torch.float).to(self.device)
  67. actions = torch.tensor(transition_dict['actions']).to(self.device).view(-1,1)
  68. rewards = torch.tensor(transition_dict['rewards'], dtype=torch.float).to(self.device).view(-1,1)
  69. next_states = torch.tensor(transition_dict['next_states'], dtype=torch.float).to(self.device)
  70. dones = torch.tensor(transition_dict['dones'], dtype=torch.float).to(self.device).view(-1,1)
  71. # 目标,下一个状态的state_value [b,1]
  72. next_q_target = self.critic(next_states)
  73. # 目标,当前状态的state_value [b,1]
  74. td_target = rewards + self.gamma * next_q_target * (1-dones)
  75. # 预测,当前状态的state_value [b,1]
  76. td_value = self.critic(states)
  77. # 目标值和预测值state_value之差 [b,1]
  78. td_delta = td_target - td_value
  79. # 时序差分值 tensor-->numpy [b,1]
  80. td_delta = td_delta.cpu().detach().numpy()
  81. advantage = 0 # 优势函数初始化
  82. advantage_list = []
  83. # 计算优势函数
  84. for delta in td_delta[::-1]: # 逆序时序差分值 axis=1轴上倒着取 [], [], []
  85. # 优势函数GAE的公式
  86. advantage = self.gamma * self.lmbda * advantage + delta
  87. advantage_list.append(advantage)
  88. # 正序
  89. advantage_list.reverse()
  90. # numpy --> tensor [b,1]
  91. advantage = torch.tensor(advantage_list, dtype=torch.float).to(self.device)
  92. # 策略网络给出每个动作的概率,根据action得到当前时刻下该动作的概率
  93. old_log_probs = torch.log(self.actor(states).gather(1, actions)).detach()
  94. # 一组数据训练 epochs 轮
  95. for _ in range(self.epochs):
  96. # 每一轮更新一次策略网络预测的状态
  97. log_probs = torch.log(self.actor(states).gather(1, actions))
  98. # 新旧策略之间的比例
  99. ratio = torch.exp(log_probs - old_log_probs)
  100. # 近端策略优化裁剪目标函数公式的左侧项
  101. surr1 = ratio * advantage
  102. # 公式的右侧项,ratio小于1-eps就输出1-eps,大于1+eps就输出1+eps
  103. surr2 = torch.clamp(ratio, 1-self.eps, 1+self.eps) * advantage
  104. # 策略网络的损失函数
  105. actor_loss = torch.mean(-torch.min(surr1, surr2))
  106. # 价值网络的损失函数,当前时刻的state_value - 下一时刻的state_value
  107. critic_loss = torch.mean(F.mse_loss(self.critic(states), td_target.detach()))
  108. # 梯度清0
  109. self.actor_optimizer.zero_grad()
  110. self.critic_optimizer.zero_grad()
  111. # 反向传播
  112. actor_loss.backward()
  113. critic_loss.backward()
  114. # 梯度更新
  115. self.actor_optimizer.step()
  116. self.critic_optimizer.step()

3. 案例演示

基于 OpenAI 的 gym 环境完成一个推车游戏,一个离散的环境,目标是左右移动小车将黄色的杆子保持竖直。动作维度为2,属于离散值;状态维度为 4,分别是坐标、速度、角度、角速度。

  1. import numpy as np
  2. import matplotlib.pyplot as plt
  3. import gym
  4. import torch
  5. from RL_brain import PPO
  6. device = torch.device('cuda') if torch.cuda.is_available() \
  7. else torch.device('cpu')
  8. # ----------------------------------------- #
  9. # 参数设置
  10. # ----------------------------------------- #
  11. num_episodes = 100 # 总迭代次数
  12. gamma = 0.9 # 折扣因子
  13. actor_lr = 1e-3 # 策略网络的学习率
  14. critic_lr = 1e-2 # 价值网络的学习率
  15. n_hiddens = 16 # 隐含层神经元个数
  16. env_name = 'CartPole-v1'
  17. return_list = [] # 保存每个回合的return
  18. # ----------------------------------------- #
  19. # 环境加载
  20. # ----------------------------------------- #
  21. env = gym.make(env_name, render_mode="human")
  22. n_states = env.observation_space.shape[0] # 状态数 4
  23. n_actions = env.action_space.n # 动作数 2
  24. # ----------------------------------------- #
  25. # 模型构建
  26. # ----------------------------------------- #
  27. agent = PPO(n_states=n_states, # 状态数
  28. n_hiddens=n_hiddens, # 隐含层数
  29. n_actions=n_actions, # 动作数
  30. actor_lr=actor_lr, # 策略网络学习率
  31. critic_lr=critic_lr, # 价值网络学习率
  32. lmbda = 0.95, # 优势函数的缩放因子
  33. epochs = 10, # 一组序列训练的轮次
  34. eps = 0.2, # PPO中截断范围的参数
  35. gamma=gamma, # 折扣因子
  36. device = device
  37. )
  38. # ----------------------------------------- #
  39. # 训练--回合更新 on_policy
  40. # ----------------------------------------- #
  41. for i in range(num_episodes):
  42. state = env.reset()[0] # 环境重置
  43. done = False # 任务完成的标记
  44. episode_return = 0 # 累计每回合的reward
  45. # 构造数据集,保存每个回合的状态数据
  46. transition_dict = {
  47. 'states': [],
  48. 'actions': [],
  49. 'next_states': [],
  50. 'rewards': [],
  51. 'dones': [],
  52. }
  53. while not done:
  54. action = agent.take_action(state) # 动作选择
  55. next_state, reward, done, _, _ = env.step(action) # 环境更新
  56. # 保存每个时刻的状态\动作\...
  57. transition_dict['states'].append(state)
  58. transition_dict['actions'].append(action)
  59. transition_dict['next_states'].append(next_state)
  60. transition_dict['rewards'].append(reward)
  61. transition_dict['dones'].append(done)
  62. # 更新状态
  63. state = next_state
  64. # 累计回合奖励
  65. episode_return += reward
  66. # 保存每个回合的return
  67. return_list.append(episode_return)
  68. # 模型训练
  69. agent.learn(transition_dict)
  70. # 打印回合信息
  71. print(f'iter:{i}, return:{np.mean(return_list[-10:])}')
  72. # -------------------------------------- #
  73. # 绘图
  74. # -------------------------------------- #
  75. plt.plot(return_list)
  76. plt.title('return')
  77. plt.show()

训练100回合,绘制每回合的 return

标签:
声明

1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,请转载时务必注明文章作者和来源,不尊重原创的行为我们将追究责任;3.作者投稿可能会经我们编辑修改或补充。

在线投稿:投稿 站长QQ:1888636

后台-插件-广告管理-内容页尾部广告(手机)
关注我们

扫一扫关注我们,了解最新精彩内容

搜索
排行榜