您现在的位置是:首页 > 技术教程 正文

【深度强化学习】(5) DDPG 模型解析,附Pytorch完整代码

admin 阅读: 2024-03-20
后台-插件-广告管理-内容页头部广告(手机)

大家好,今天和各位分享一下深度确定性策略梯度算法 (Deterministic Policy Gradient,DDPG)。并基于 OpenAI 的 gym 环境完成一个小游戏。完整代码在我的 GitHub 中获得:

https://github.com/LiSir-HIT/Reinforcement-Learning/tree/main/Model


1. 基本原理

深度确定性策略梯度算法是结合确定性策略梯度算法的思想,对 DQN 的一种改进,是一种无模型的深度强化学习算法。

DDPG 算法使用演员-评论家(Actor-Critic)算法作为其基本框架,采用深度神经网络作为策略网络和动作值函数的近似,使用随机梯度法训练策略网络和价值网络模型中的参数。DDPG 算法的原理如下图所示。

DDPG 算法架构中使用双重神经网络架构,对于策略函数和价值函数均使用双重神经网络模型架构(即 Online 网络和 Target 网络),使得算法的学习过程更加稳定,收敛的速度加快。同时该算法引入经验回放机制,Actor 与环境交互生产生的经验数据样本存储到经验池中,抽取批量数据样本进行训练,即类似于 DQN 的经验回放机制,去除样本的相关性和依赖性,使得算法更加容易收敛。 


2. 公式推导

为了便于大家理解 DDPG 的推导过程,算法框架如下图所示:

DDPG 共包含 4 个神经网络,用于对 Q 值函数和策略的近似表示。Critic 目标网络用于近似估计下一时刻的状态-动作的 Q 值函数 Q_{w'}(S_{t+1},\pi _{\theta '}(S_{t+1}))


3. 代码实现

DDPG 的伪代码如下:

模型代码如下:

  1. import torch
  2. from torch import nn
  3. from torch.nn import functional as F
  4. import numpy as np
  5. import collections
  6. import random
  7. # ------------------------------------- #
  8. # 经验回放池
  9. # ------------------------------------- #
  10. class ReplayBuffer:
  11. def __init__(self, capacity): # 经验池的最大容量
  12. # 创建一个队列,先进先出
  13. self.buffer = collections.deque(maxlen=capacity)
  14. # 在队列中添加数据
  15. def add(self, state, action, reward, next_state, done):
  16. # 以list类型保存
  17. self.buffer.append((state, action, reward, next_state, done))
  18. # 在队列中随机取样batch_size组数据
  19. def sample(self, batch_size):
  20. transitions = random.sample(self.buffer, batch_size)
  21. # 将数据集拆分开来
  22. state, action, reward, next_state, done = zip(*transitions)
  23. return np.array(state), action, reward, np.array(next_state), done
  24. # 测量当前时刻的队列长度
  25. def size(self):
  26. return len(self.buffer)
  27. # ------------------------------------- #
  28. # 策略网络
  29. # ------------------------------------- #
  30. class PolicyNet(nn.Module):
  31. def __init__(self, n_states, n_hiddens, n_actions, action_bound):
  32. super(PolicyNet, self).__init__()
  33. # 环境可以接受的动作最大值
  34. self.action_bound = action_bound
  35. # 只包含一个隐含层
  36. self.fc1 = nn.Linear(n_states, n_hiddens)
  37. self.fc2 = nn.Linear(n_hiddens, n_actions)
  38. # 前向传播
  39. def forward(self, x):
  40. x = self.fc1(x) # [b,n_states]-->[b,n_hiddens]
  41. x = F.relu(x)
  42. x = self.fc2(x) # [b,n_hiddens]-->[b,n_actions]
  43. x= torch.tanh(x) # 将数值调整到 [-1,1]
  44. x = x * self.action_bound # 缩放到 [-action_bound, action_bound]
  45. return x
  46. # ------------------------------------- #
  47. # 价值网络
  48. # ------------------------------------- #
  49. class QValueNet(nn.Module):
  50. def __init__(self, n_states, n_hiddens, n_actions):
  51. super(QValueNet, self).__init__()
  52. #
  53. self.fc1 = nn.Linear(n_states + n_actions, n_hiddens)
  54. self.fc2 = nn.Linear(n_hiddens, n_hiddens)
  55. self.fc3 = nn.Linear(n_hiddens, 1)
  56. # 前向传播
  57. def forward(self, x, a):
  58. # 拼接状态和动作
  59. cat = torch.cat([x, a], dim=1) # [b, n_states + n_actions]
  60. x = self.fc1(cat) # -->[b, n_hiddens]
  61. x = F.relu(x)
  62. x = self.fc2(x) # -->[b, n_hiddens]
  63. x = F.relu(x)
  64. x = self.fc3(x) # -->[b, 1]
  65. return x
  66. # ------------------------------------- #
  67. # 算法主体
  68. # ------------------------------------- #
  69. class DDPG:
  70. def __init__(self, n_states, n_hiddens, n_actions, action_bound,
  71. sigma, actor_lr, critic_lr, tau, gamma, device):
  72. # 策略网络--训练
  73. self.actor = PolicyNet(n_states, n_hiddens, n_actions, action_bound).to(device)
  74. # 价值网络--训练
  75. self.critic = QValueNet(n_states, n_hiddens, n_actions).to(device)
  76. # 策略网络--目标
  77. self.target_actor = PolicyNet(n_states, n_hiddens, n_actions, action_bound).to(device)
  78. # 价值网络--目标
  79. self.target_critic = QValueNet(n_states, n_hiddens, n_actions).to(device
  80. )
  81. # 初始化价值网络的参数,两个价值网络的参数相同
  82. self.target_critic.load_state_dict(self.critic.state_dict())
  83. # 初始化策略网络的参数,两个策略网络的参数相同
  84. self.target_actor.load_state_dict(self.actor.state_dict())
  85. # 策略网络的优化器
  86. self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr=actor_lr)
  87. # 价值网络的优化器
  88. self.critic_optimizer = torch.optim.Adam(self.critic.parameters(), lr=critic_lr)
  89. # 属性分配
  90. self.gamma = gamma # 折扣因子
  91. self.sigma = sigma # 高斯噪声的标准差,均值设为0
  92. self.tau = tau # 目标网络的软更新参数
  93. self.n_actions = n_actions
  94. self.device = device
  95. # 动作选择
  96. def take_action(self, state):
  97. # 维度变换 list[n_states]-->tensor[1,n_states]-->gpu
  98. state = torch.tensor(state, dtype=torch.float).view(1,-1).to(self.device)
  99. # 策略网络计算出当前状态下的动作价值 [1,n_states]-->[1,1]-->int
  100. action = self.actor(state).item()
  101. # 给动作添加噪声,增加搜索
  102. action = action + self.sigma * np.random.randn(self.n_actions)
  103. return action
  104. # 软更新, 意思是每次learn的时候更新部分参数
  105. def soft_update(self, net, target_net):
  106. # 获取训练网络和目标网络需要更新的参数
  107. for param_target, param in zip(target_net.parameters(), net.parameters()):
  108. # 训练网络的参数更新要综合考虑目标网络和训练网络
  109. param_target.data.copy_(param_target.data*(1-self.tau) + param.data*self.tau)
  110. # 训练
  111. def update(self, transition_dict):
  112. # 从训练集中取出数据
  113. states = torch.tensor(transition_dict['states'], dtype=torch.float).to(self.device) # [b,n_states]
  114. actions = torch.tensor(transition_dict['actions'], dtype=torch.float).view(-1,1).to(self.device) # [b,1]
  115. rewards = torch.tensor(transition_dict['rewards'], dtype=torch.float).view(-1,1).to(self.device) # [b,1]
  116. next_states = torch.tensor(transition_dict['next_states'], dtype=torch.float).to(self.device) # [b,next_states]
  117. dones = torch.tensor(transition_dict['dones'], dtype=torch.float).view(-1,1).to(self.device) # [b,1]
  118. # 价值目标网络获取下一时刻的动作[b,n_states]-->[b,n_actors]
  119. next_q_values = self.target_actor(next_states)
  120. # 策略目标网络获取下一时刻状态选出的动作价值 [b,n_states+n_actions]-->[b,1]
  121. next_q_values = self.target_critic(next_states, next_q_values)
  122. # 当前时刻的动作价值的目标值 [b,1]
  123. q_targets = rewards + self.gamma * next_q_values * (1-dones)
  124. # 当前时刻动作价值的预测值 [b,n_states+n_actions]-->[b,1]
  125. q_values = self.critic(states, actions)
  126. # 预测值和目标值之间的均方差损失
  127. critic_loss = torch.mean(F.mse_loss(q_values, q_targets))
  128. # 价值网络梯度
  129. self.critic_optimizer.zero_grad()
  130. critic_loss.backward()
  131. self.critic_optimizer.step()
  132. # 当前状态的每个动作的价值 [b, n_actions]
  133. actor_q_values = self.actor(states)
  134. # 当前状态选出的动作价值 [b,1]
  135. score = self.critic(states, actor_q_values)
  136. # 计算损失
  137. actor_loss = -torch.mean(score)
  138. # 策略网络梯度
  139. self.actor_optimizer.zero_grad()
  140. actor_loss.backward()
  141. self.actor_optimizer.step()
  142. # 软更新策略网络的参数
  143. self.soft_update(self.actor, self.target_actor)
  144. # 软更新价值网络的参数
  145. self.soft_update(self.critic, self.target_critic)

4. 案例演示

基于 OpenAI 的 gym 环境完成一个推车游戏,目标是将小车推到山顶旗子处。动作维度为1,属于连续值;状态维度为 2,分别是 x 坐标和小车速度。

代码如下:

  1. import numpy as np
  2. import torch
  3. import matplotlib.pyplot as plt
  4. import gym
  5. from parsers import args
  6. from RL_brain import ReplayBuffer, DDPG
  7. device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
  8. # -------------------------------------- #
  9. # 环境加载
  10. # -------------------------------------- #
  11. env_name = "MountainCarContinuous-v0" # 连续型动作
  12. env = gym.make(env_name, render_mode="human")
  13. n_states = env.observation_space.shape[0] # 状态数 2
  14. n_actions = env.action_space.shape[0] # 动作数 1
  15. action_bound = env.action_space.high[0] # 动作的最大值 1.0
  16. # -------------------------------------- #
  17. # 模型构建
  18. # -------------------------------------- #
  19. # 经验回放池实例化
  20. replay_buffer = ReplayBuffer(capacity=args.buffer_size)
  21. # 模型实例化
  22. agent = DDPG(n_states = n_states, # 状态数
  23. n_hiddens = args.n_hiddens, # 隐含层数
  24. n_actions = n_actions, # 动作数
  25. action_bound = action_bound, # 动作最大值
  26. sigma = args.sigma, # 高斯噪声
  27. actor_lr = args.actor_lr, # 策略网络学习率
  28. critic_lr = args.critic_lr, # 价值网络学习率
  29. tau = args.tau, # 软更新系数
  30. gamma = args.gamma, # 折扣因子
  31. device = device
  32. )
  33. # -------------------------------------- #
  34. # 模型训练
  35. # -------------------------------------- #
  36. return_list = [] # 记录每个回合的return
  37. mean_return_list = [] # 记录每个回合的return均值
  38. for i in range(10): # 迭代10回合
  39. episode_return = 0 # 累计每条链上的reward
  40. state = env.reset()[0] # 初始时的状态
  41. done = False # 回合结束标记
  42. while not done:
  43. # 获取当前状态对应的动作
  44. action = agent.take_action(state)
  45. # 环境更新
  46. next_state, reward, done, _, _ = env.step(action)
  47. # 更新经验回放池
  48. replay_buffer.add(state, action, reward, next_state, done)
  49. # 状态更新
  50. state = next_state
  51. # 累计每一步的reward
  52. episode_return += reward
  53. # 如果经验池超过容量,开始训练
  54. if replay_buffer.size() > args.min_size:
  55. # 经验池随机采样batch_size组
  56. s, a, r, ns, d = replay_buffer.sample(args.batch_size)
  57. # 构造数据集
  58. transition_dict = {
  59. 'states': s,
  60. 'actions': a,
  61. 'rewards': r,
  62. 'next_states': ns,
  63. 'dones': d,
  64. }
  65. # 模型训练
  66. agent.update(transition_dict)
  67. # 保存每一个回合的回报
  68. return_list.append(episode_return)
  69. mean_return_list.append(np.mean(return_list[-10:])) # 平滑
  70. # 打印回合信息
  71. print(f'iter:{i}, return:{episode_return}, mean_return:{np.mean(return_list[-10:])}')
  72. # 关闭动画窗格
  73. env.close()
  74. # -------------------------------------- #
  75. # 绘图
  76. # -------------------------------------- #
  77. x_range = list(range(len(return_list)))
  78. plt.subplot(121)
  79. plt.plot(x_range, return_list) # 每个回合return
  80. plt.xlabel('episode')
  81. plt.ylabel('return')
  82. plt.subplot(122)
  83. plt.plot(x_range, mean_return_list) # 每回合return均值
  84. plt.xlabel('episode')
  85. plt.ylabel('mean_return')
标签:
声明

1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,请转载时务必注明文章作者和来源,不尊重原创的行为我们将追究责任;3.作者投稿可能会经我们编辑修改或补充。

在线投稿:投稿 站长QQ:1888636

后台-插件-广告管理-内容页尾部广告(手机)
关注我们

扫一扫关注我们,了解最新精彩内容

搜索