您现在的位置是:首页 > 技术教程 正文

深度强化学习-DDPG算法原理与代码

admin 阅读: 2024-03-23
后台-插件-广告管理-内容页头部广告(手机)

深度强化学习-DDPG算法原理与代码

引言

1 DDPG算法简介

2 DDPG算法原理

2.1 经验回放

2.2 目标网络

2.2.1 算法更新过程

2.2.2 目标网络的更新

2.2.3 引入目标网络的目的

2.3 噪声探索

3 DDPG算法伪代码

 4 代码实现

5 实验结果

6 结论


引言

Deep Deterministic Policy Gradient (DDPG)算法是DeepMind团队提出的一种专门用于解决连续控制问题的在线式(on-line)深度强化学习算法,它其实本质上借鉴了Deep Q-Network (DQN)算法里面的一些思想。本文就带领大家了解一下这个算法,论文和代码的链接见下方。

论文:https://arxiv.org/pdf/1509.02971.pdf

代码:https://github.com/indigoLovee/DDPG

喜欢的话请点个star哦!

1 DDPG算法简介

在DDPG算法之前,我们在求解连续动作空间问题时,主要有两种方式:一是对连续动作做离散化处理,然后再利用强化学习算法(例如DQN)进行求解。二是使用Policy Gradient (PG)算法 (例如Reinforce) 直接求解。但是对于方式一,离散化处理在一定程度上脱离了工程实际;对于方式二,PG算法在求解连续控制问题时效果往往不尽人意。为此,DDPG算法横空出世,在许多连续控制问题上取得了非常不错的效果。

DDPG算法是Actor-Critic (AC) 框架下的一种在线式深度强化学习算法,因此算法内部包括Actor网络和Critic网络,每个网络分别遵从各自的更新法则进行更新,从而使得累计期望回报最大化。

2 DDPG算法原理

DDPG算法将确定性策略梯度算法和DQN算法中的相关技术结合在一起,之前我们在讲DQN算法时,详细说明了其中的两个重要的技术:经验回放和目标网络。具体而言,DDPG算法主要包括以下三个关键技术:

(1)经验回放:智能体将得到的经验数据(s,a,r,s^{'},done)放入Replay%20Buffer中,更新网络参数时按照批量采样。

%20

(2)目标网络:在Actor网络和Critic网络外再使用一套用于估计目标的Target%20Actor网络和Target%20Critic网络。在更新目标网络时,为了避免参数更新过快,采用软更新方式。

%20

(3)噪声探索:确定性策略输出的动作为确定性动作,缺乏对环境的探索。在训练阶段,给Actor网络输出的动作加入噪声,从而让智能体具备一定的探索能力。

%20 2.1%20经验回放%20 %20 %20

经验回放就是一种让经验概率分布变得稳定的技术,可以提高训练的稳定性。经验回放主要有“存储”和“回放”两大关键步骤:

%20 %20

存储:将经验以形式存储在经验池中。

%20 %20

回放:按照某种规则从经验池中采样一条或多条经验数据。

%20 %20

存储的角度来看,经验回放可以分为集中式回放和分布式回放:
集中式回放:智能体在一个环境中运行,把经验统一存储在经验池中。

%20

分布式回放:多个智能体同时在多个环境中运行,并将经验统一存储在经验池中。由于多个智能体同时生成经验,所以能够使用更多资源的同时更快地收集经验。

%20

采样的角度来看,经验回放可以分为均匀回放和优先回放:
均匀回放:等概率从经验池中采样经验。
优先回放:为经验池中每条经验指定一个优先级,在采样经验时更倾向于选择优先级更高的经验。一般的做法是,如果某条经验(例如经验)的优先级为,那么选取该经验的概率为:

%20 %20 %20

%20 %20

优先回放可以具体参照这篇论文 :优先经验回放

%20 %20 %20

经验回放的优点:

%20 %20

1.在训练Q网络时,可以打破数据之间的相关性,使得数据满足独立同分布,从而减小参数更新的方差,提高收敛速度。

%20 %20

2.能够重复使用经验,数据利用率高,对于数据获取困难的情况尤其有用。

%20 %20

经验回放的缺点:

%20 %20

无法应用于回合更新和多步学习算法。但是将经验回放应用于Q学习,就规避了这个缺点。

%20 %20

 代码中采用集中式均匀回放,具体如下:

%20
  1. import%20numpy%20as%20np
  2. %20
  3. %20
  4. class%20ReplayBuffer:
  5. %20%20%20%20def%20__init__(self,%20max_size,%20state_dim,%20action_dim,%20batch_size):
  6. %20%20%20%20%20%20%20%20self.mem_size%20=%20max_size
  7. %20%20%20%20%20%20%20%20self.batch_size%20=%20batch_size
  8. %20%20%20%20%20%20%20%20self.mem_cnt%20=%200
  9. %20
  10. %20%20%20%20%20%20%20%20self.state_memory%20=%20np.zeros((self.mem_size,%20state_dim))
  11. %20%20%20%20%20%20%20%20self.action_memory%20=%20np.zeros((self.mem_size,%20action_dim))
  12. %20%20%20%20%20%20%20%20self.reward_memory%20=%20np.zeros((self.mem_size,%20))
  13. %20%20%20%20%20%20%20%20self.next_state_memory%20=%20np.zeros((self.mem_size,%20state_dim))
  14. %20%20%20%20%20%20%20%20self.terminal_memory%20=%20np.zeros((self.mem_size,%20),%20dtype=np.bool)
  15. %20
  16. %20%20%20%20def%20store_transition(self,%20state,%20action,%20reward,%20state_,%20done):
  17. %20%20%20%20%20%20%20%20mem_idx%20=%20self.mem_cnt%20%%20self.mem_size
  18. %20
  19. %20%20%20%20%20%20%20%20self.state_memory[mem_idx]%20=%20state
  20. %20%20%20%20%20%20%20%20self.action_memory[mem_idx]%20=%20action
  21. %20%20%20%20%20%20%20%20self.reward_memory[mem_idx]%20=%20reward
  22. %20%20%20%20%20%20%20%20self.next_state_memory[mem_idx]%20=%20state_
  23. %20%20%20%20%20%20%20%20self.terminal_memory[mem_idx]%20=%20done
  24. %20
  25. %20%20%20%20%20%20%20%20self.mem_cnt%20+=%201
  26. %20
  27. %20%20%20%20def%20sample_buffer(self):
  28. %20%20%20%20%20%20%20%20mem_len%20=%20min(self.mem_size,%20self.mem_cnt)
  29. %20%20%20%20%20%20%20%20batch%20=%20np.random.choice(mem_len,%20self.batch_size,%20replace=False)
  30. %20
  31. %20%20%20%20%20%20%20%20states%20=%20self.state_memory[batch]
  32. %20%20%20%20%20%20%20%20actions%20=%20self.action_memory[batch]
  33. %20%20%20%20%20%20%20%20rewards%20=%20self.reward_memory[batch]
  34. %20%20%20%20%20%20%20%20states_%20=%20self.next_state_memory[batch]
  35. %20%20%20%20%20%20%20%20terminals%20=%20self.terminal_memory[batch]
  36. %20
  37. %20%20%20%20%20%20%20%20return%20states,%20actions,%20rewards,%20states_,%20terminals
  38. %20
  39. %20%20%20%20def%20ready(self):
  40. %20%20%20%20%20%20%20%20return%20self.mem_cnt%20>=%20self.batch_size

2.2 目标网络

由于DDPG算法是基于AC框架,因此算法中必然含有Actor和Critic网络。另外每个网络都有其对应的目标网络,所以DDPG算法中包括四个网络,分别是Actor网络\mu(\cdot \mid \theta ^{\mu })

 上图中我们假设有三个动作,每个动作的实际动作价值依次是200,100和230,显然动作3的动作价值是最高的,智能体会选择动作3。如果网络出现过估计,并且是均匀的,假设过估计的量是100,那么网络评估出来的动作价值就依次是300,200和330,显然动作3的动作价值依然是最高的,智能体依旧会选择动作3。因此,均匀过估计对于最终的决策并不会产生影响。

 同样我们假设有三个动作,每个动作的实际动作价值依次是200,100和230,显然动作3的动作价值是最高的,智能体会选择动作3。如果网络出现不均匀过估计,评估出来的动作价值依次是280,300和240,此时显然动作2的动作价值是最高的,智能体会选择动作2。但是实际上动作2的真实动作价值是最低的,即该动作是最差的。因此,不均匀过估计对于最终的决策会产生很大的影响。

然而实际上网络的过估计是非均匀的,因此需要避免这个问题,本质上就是要解决Bootstrapping问题。采用目标网络后就能解决这个问题

y=r+\gamma(1- done) Q^{'}(s^{'},a^{'}\mid \theta ^{Q^{'}})

%20

此时我们再让逼近目标值时,就已经不再是自举了(大家可以对照自举的含义仔细观察一下)。

%20 2.3%20噪声探索%20

 探索对于智能体来说是至关重要的,而确定性策略“天生”就缺乏探索能力,因此我们需要人为地给输出的动作上加入噪声,从而让智能体具备探索能力。在DDPG算法中,作者采用Ornstein%20Uhlenbeck过程作为动作噪声。Ornstein%20Uhlenbeck过程是用下列随机微分方程定义的%20(以一维的情况为例):

%20

%20

其中是参数(0,\sigma%20>0"%20src="https://latex.csdn.net/eq?%5Ctheta%20%3E0%2C%5Csigma%20%3E0">),是标准Brownain运动。当初始扰动是在原点的单点分布(即限定),并且时,上述方程的解为

%20

%20

 (证明:将代入,化简可得。将此式从0积到,得。当时化简可得结果。)

%20

这个解得均值为0,方差为,协方差为

%20

%20

(证明:由于均值为0,所以。另外,Ito%20Isometry告诉我们,所以,进一步化简可得结果。)

%20

对于总有,所以 0"%20src="https://latex.csdn.net/eq?Cov%28N_%7Bt%7D%2CN_%7Bs%7D%29%3E0">。据此可知,使用Ornstein%20Uhlenbeck过程让相邻扰动正相关,进而让动作向相近的方向偏移。

%20

OU噪声的代码实现:

%20
  1. class%20OUActionNoise:
  2. %20%20%20%20def%20__init__(self,%20mu,%20sigma=0.15,%20theta=0.2,%20dt=1e-2,%20x0=None):
  3. %20%20%20%20%20%20%20%20self.theta%20=%20theta
  4. %20%20%20%20%20%20%20%20self.mu%20=%20mu
  5. %20%20%20%20%20%20%20%20self.sigma%20=%20sigma
  6. %20%20%20%20%20%20%20%20self.dt%20=%20dt
  7. %20%20%20%20%20%20%20%20self.x0%20=%20x0
  8. %20%20%20%20%20%20%20%20self.reset()
  9. %20
  10. %20%20%20%20def%20__call__(self):
  11. %20%20%20%20%20%20%20%20x%20=%20self.x_prev%20+%20self.theta%20*%20(self.mu%20-%20self.x_prev)%20*%20self.dt%20+%20\
  12. %20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20self.sigma%20*%20np.sqrt(self.dt)%20*%20np.random.normal(size=self.mu.shape)
  13. %20%20%20%20%20%20%20%20self.x_prev%20=%20x
  14. %20
  15. %20%20%20%20%20%20%20%20return%20x
  16. %20
  17. %20%20%20%20def%20reset(self):
  18. %20%20%20%20%20%20%20%20self.x_prev%20=%20self.x0%20if%20self.x0%20is%20not%20None%20else%20np.zeros_like(self.mu)

看完OU噪声后,可能很多小伙伴是懵的,这个也太复杂了。不过我会告诉大家,其实OU噪声是没必要的,因为我们完全可以采用服从正态分布的噪声来取代它,实验结果也证实了这一点。因此Twin Delayed Deep Deterministic policy gradient (TD3)算法舍弃了OU噪声,而是采用服从正态分布的噪声,实现起来更加简单。

另外还需要提醒大家一点:噪声只会加在训练阶段Actor网络输出的动作上,推理阶段不要加上噪声,以及在更新网络参数时也不要加上噪声,前面已经提醒过了。因为我们只需要在训练阶段让智能体具备探索能力,推理时是不需要的。

3 DDPG算法伪代码

 4 代码实现

Actor和Critic网络的代码实现(networks.py):

  1. import torch as T
  2. import torch.nn as nn
  3. import torch.optim as optim
  4. device = T.device("cuda:0" if T.cuda.is_available() else "cpu")
  5. def weight_init(m):
  6. if isinstance(m, nn.Linear):
  7. nn.init.xavier_normal_(m.weight)
  8. if m.bias is not None:
  9. nn.init.constant_(m.bias, 0.0)
  10. elif isinstance(m, nn.BatchNorm1d):
  11. nn.init.constant_(m.weight, 1.0)
  12. nn.init.constant_(m.bias, 0.0)
  13. class ActorNetwork(nn.Module):
  14. def __init__(self, alpha, state_dim, action_dim, fc1_dim, fc2_dim):
  15. super(ActorNetwork, self).__init__()
  16. self.fc1 = nn.Linear(state_dim, fc1_dim)
  17. self.ln1 = nn.LayerNorm(fc1_dim)
  18. self.fc2 = nn.Linear(fc1_dim, fc2_dim)
  19. self.ln2 = nn.LayerNorm(fc2_dim)
  20. self.action = nn.Linear(fc2_dim, action_dim)
  21. self.optimizer = optim.Adam(self.parameters(), lr=alpha)
  22. self.apply(weight_init)
  23. self.to(device)
  24. def forward(self, state):
  25. x = T.relu(self.ln1(self.fc1(state)))
  26. x = T.relu(self.ln2(self.fc2(x)))
  27. action = T.tanh(self.action(x))
  28. return action
  29. def save_checkpoint(self, checkpoint_file):
  30. T.save(self.state_dict(), checkpoint_file)
  31. def load_checkpoint(self, checkpoint_file):
  32. self.load_state_dict(T.load(checkpoint_file))
  33. class CriticNetwork(nn.Module):
  34. def __init__(self, beta, state_dim, action_dim, fc1_dim, fc2_dim):
  35. super(CriticNetwork, self).__init__()
  36. self.fc1 = nn.Linear(state_dim, fc1_dim)
  37. self.ln1 = nn.LayerNorm(fc1_dim)
  38. self.fc2 = nn.Linear(fc1_dim, fc2_dim)
  39. self.ln2 = nn.LayerNorm(fc2_dim)
  40. self.fc3 = nn.Linear(action_dim, fc2_dim)
  41. self.q = nn.Linear(fc2_dim, 1)
  42. self.optimizer = optim.Adam(self.parameters(), lr=beta, weight_decay=0.001)
  43. self.apply(weight_init)
  44. self.to(device)
  45. def forward(self, state, action):
  46. x_s = T.relu(self.ln1(self.fc1(state)))
  47. x_s = self.ln2(self.fc2(x_s))
  48. x_a = self.fc3(action)
  49. x = T.relu(x_s + x_a)
  50. q = self.q(x)
  51. return q
  52. def save_checkpoint(self, checkpoint_file):
  53. T.save(self.state_dict(), checkpoint_file)
  54. def load_checkpoint(self, checkpoint_file):
  55. self.load_state_dict(T.load(checkpoint_file))

注:Actor和Critic网络中前面两个Linear层后面都跟上了Layer Normalization (LN)层。因为我在实验时发现了一个非常有趣的现象,如果不加LN层,或者加入Batch Normalization (BN)层,整个训练过程很容易坍塌或者训练效果很差,具体原因我也不是特别清楚。感兴趣的小伙伴可以把代码git下来跑一遍,如果知道原因的话不妨一起交流。

DDPG算法的代码实现(DDPG.py):

  1. import torch as T
  2. import torch.nn.functional as F
  3. import numpy as np
  4. from networks import ActorNetwork, CriticNetwork
  5. from buffer import ReplayBuffer
  6. device = T.device("cuda:0" if T.cuda.is_available() else "cpu")
  7. class DDPG:
  8. def __init__(self, alpha, beta, state_dim, action_dim, actor_fc1_dim,
  9. actor_fc2_dim, critic_fc1_dim, critic_fc2_dim, ckpt_dir,
  10. gamma=0.99, tau=0.005, action_noise=0.1, max_size=1000000,
  11. batch_size=256):
  12. self.gamma = gamma
  13. self.tau = tau
  14. self.action_noise = action_noise
  15. self.checkpoint_dir = ckpt_dir
  16. self.actor = ActorNetwork(alpha=alpha, state_dim=state_dim, action_dim=action_dim,
  17. fc1_dim=actor_fc1_dim, fc2_dim=actor_fc2_dim)
  18. self.target_actor = ActorNetwork(alpha=alpha, state_dim=state_dim, action_dim=action_dim,
  19. fc1_dim=actor_fc1_dim, fc2_dim=actor_fc2_dim)
  20. self.critic = CriticNetwork(beta=beta, state_dim=state_dim, action_dim=action_dim,
  21. fc1_dim=critic_fc1_dim, fc2_dim=critic_fc2_dim)
  22. self.target_critic = CriticNetwork(beta=beta, state_dim=state_dim, action_dim=action_dim,
  23. fc1_dim=critic_fc1_dim, fc2_dim=critic_fc2_dim)
  24. self.memory = ReplayBuffer(max_size=max_size, state_dim=state_dim, action_dim=action_dim,
  25. batch_size=batch_size)
  26. self.update_network_parameters(tau=1.0)
  27. def update_network_parameters(self, tau=None):
  28. if tau is None:
  29. tau = self.tau
  30. for actor_params, target_actor_params in zip(self.actor.parameters(),
  31. self.target_actor.parameters()):
  32. target_actor_params.data.copy_(tau * actor_params + (1 - tau) * target_actor_params)
  33. for critic_params, target_critic_params in zip(self.critic.parameters(),
  34. self.target_critic.parameters()):
  35. target_critic_params.data.copy_(tau * critic_params + (1 - tau) * target_critic_params)
  36. def remember(self, state, action, reward, state_, done):
  37. self.memory.store_transition(state, action, reward, state_, done)
  38. def choose_action(self, observation, train=True):
  39. self.actor.eval()
  40. state = T.tensor([observation], dtype=T.float).to(device)
  41. action = self.actor.forward(state).squeeze()
  42. if train:
  43. noise = T.tensor(np.random.normal(loc=0.0, scale=self.action_noise),
  44. dtype=T.float).to(device)
  45. action = T.clamp(action+noise, -1, 1)
  46. self.actor.train()
  47. return action.detach().cpu().numpy()
  48. def learn(self):
  49. if not self.memory.ready():
  50. return
  51. states, actions, reward, states_, terminals = self.memory.sample_buffer()
  52. states_tensor = T.tensor(states, dtype=T.float).to(device)
  53. actions_tensor = T.tensor(actions, dtype=T.float).to(device)
  54. rewards_tensor = T.tensor(reward, dtype=T.float).to(device)
  55. next_states_tensor = T.tensor(states_, dtype=T.float).to(device)
  56. terminals_tensor = T.tensor(terminals).to(device)
  57. with T.no_grad():
  58. next_actions_tensor = self.target_actor.forward(next_states_tensor)
  59. q_ = self.target_critic.forward(next_states_tensor, next_actions_tensor).view(-1)
  60. q_[terminals_tensor] = 0.0
  61. target = rewards_tensor + self.gamma * q_
  62. q = self.critic.forward(states_tensor, actions_tensor).view(-1)
  63. critic_loss = F.mse_loss(q, target.detach())
  64. self.critic.optimizer.zero_grad()
  65. critic_loss.backward()
  66. self.critic.optimizer.step()
  67. new_actions_tensor = self.actor.forward(states_tensor)
  68. actor_loss = -T.mean(self.critic(states_tensor, new_actions_tensor))
  69. self.actor.optimizer.zero_grad()
  70. actor_loss.backward()
  71. self.actor.optimizer.step()
  72. self.update_network_parameters()
  73. def save_models(self, episode):
  74. self.actor.save_checkpoint(self.checkpoint_dir + 'Actor/DDPG_actor_{}.pth'.format(episode))
  75. print('Saving actor network successfully!')
  76. self.target_actor.save_checkpoint(self.checkpoint_dir +
  77. 'Target_actor/DDPG_target_actor_{}.pth'.format(episode))
  78. print('Saving target_actor network successfully!')
  79. self.critic.save_checkpoint(self.checkpoint_dir + 'Critic/DDPG_critic_{}'.format(episode))
  80. print('Saving critic network successfully!')
  81. self.target_critic.save_checkpoint(self.checkpoint_dir +
  82. 'Target_critic/DDPG_target_critic_{}'.format(episode))
  83. print('Saving target critic network successfully!')
  84. def load_models(self, episode):
  85. self.actor.load_checkpoint(self.checkpoint_dir + 'Actor/DDPG_actor_{}.pth'.format(episode))
  86. print('Loading actor network successfully!')
  87. self.target_actor.load_checkpoint(self.checkpoint_dir +
  88. 'Target_actor/DDPG_target_actor_{}.pth'.format(episode))
  89. print('Loading target_actor network successfully!')
  90. self.critic.load_checkpoint(self.checkpoint_dir + 'Critic/DDPG_critic_{}'.format(episode))
  91. print('Loading critic network successfully!')
  92. self.target_critic.load_checkpoint(self.checkpoint_dir +
  93. 'Target_critic/DDPG_target_critic_{}'.format(episode))
  94. print('Loading target critic network successfully!')

算法仿真环境是gym库中的LunarLanderContinuous-v2环境,因此需要先配置好gym库。进入Aanconda中对应的Python环境中,执行下面的指令

pip install gym

但是,这样安装的gym库只包括少量的内置环境,如算法环境、简单文字游戏环境和经典控制环境,无法使用LunarLanderContinuous-v2。因此还要安装一些其他依赖项,具体可以参照这篇blog: AttributeError: module ‘gym.envs.box2d‘ has no attribute ‘LunarLander‘ 解决办法。如果已经配置好环境,那请忽略这一段。

训练脚本(train.py):

  1. import gym
  2. import numpy as np
  3. import argparse
  4. from DDPG import DDPG
  5. from utils import create_directory, plot_learning_curve, scale_action
  6. parser = argparse.ArgumentParser("DDPG parameters")
  7. parser.add_argument('--max_episodes', type=int, default=1000)
  8. parser.add_argument('--checkpoint_dir', type=str, default='./checkpoints/DDPG/')
  9. parser.add_argument('--figure_file', type=str, default='./output_images/reward.png')
  10. args = parser.parse_args()
  11. def main():
  12. env = gym.make('LunarLanderContinuous-v2')
  13. agent = DDPG(alpha=0.0003, beta=0.0003, state_dim=env.observation_space.shape[0],
  14. action_dim=env.action_space.shape[0], actor_fc1_dim=400, actor_fc2_dim=300,
  15. critic_fc1_dim=400, critic_fc2_dim=300, ckpt_dir=args.checkpoint_dir,
  16. batch_size=256)
  17. create_directory(args.checkpoint_dir,
  18. sub_paths=['Actor', 'Target_actor', 'Critic', 'Target_critic'])
  19. reward_history = []
  20. avg_reward_history = []
  21. for episode in range(args.max_episodes):
  22. done = False
  23. total_reward = 0
  24. observation = env.reset()
  25. while not done:
  26. action = agent.choose_action(observation, train=True)
  27. action_ = scale_action(action.copy(), env.action_space.high, env.action_space.low)
  28. observation_, reward, done, info = env.step(action_)
  29. agent.remember(observation, action, reward, observation_, done)
  30. agent.learn()
  31. total_reward += reward
  32. observation = observation_
  33. reward_history.append(total_reward)
  34. avg_reward = np.mean(reward_history[-100:])
  35. avg_reward_history.append(avg_reward)
  36. print('Ep: {} Reward: {:.1f} AvgReward: {:.1f}'.format(episode+1, total_reward, avg_reward))
  37. if (episode + 1) % 200 == 0:
  38. agent.save_models(episode+1)
  39. episodes = [i+1 for i in range(args.max_episodes)]
  40. plot_learning_curve(episodes, avg_reward_history, title='AvgReward',
  41. ylabel='reward', figure_file=args.figure_file)
  42. if __name__ == '__main__':
  43. main()

训练脚本中有三个参数,max_episodes表示训练幕数,checkpoint_dir表示训练权重保存路径,figure_file表示训练结果的保存路径(其实是一张累积奖励曲线图),按照默认设置即可。

训练时还会用到画图函数和创建文件夹函数,它们都被放在utils.py脚本中:

  1. import os
  2. import numpy as np
  3. import matplotlib.pyplot as plt
  4. class OUActionNoise:
  5. def __init__(self, mu, sigma=0.15, theta=0.2, dt=1e-2, x0=None):
  6. self.theta = theta
  7. self.mu = mu
  8. self.sigma = sigma
  9. self.dt = dt
  10. self.x0 = x0
  11. self.reset()
  12. def __call__(self):
  13. x = self.x_prev + self.theta * (self.mu - self.x_prev) * self.dt + \
  14. self.sigma * np.sqrt(self.dt) * np.random.normal(size=self.mu.shape)
  15. self.x_prev = x
  16. return x
  17. def reset(self):
  18. self.x_prev = self.x0 if self.x0 is not None else np.zeros_like(self.mu)
  19. def create_directory(path: str, sub_paths: list):
  20. for sub_path in sub_paths:
  21. if not os.path.exists(path + sub_path):
  22. os.makedirs(path + sub_path, exist_ok=True)
  23. print('Create path: {} successfully'.format(path+sub_path))
  24. else:
  25. print('Path: {} is already existence'.format(path+sub_path))
  26. def plot_learning_curve(episodes, records, title, ylabel, figure_file):
  27. plt.figure()
  28. plt.plot(episodes, records, color='r', linestyle='-')
  29. plt.title(title)
  30. plt.xlabel('episode')
  31. plt.ylabel(ylabel)
  32. plt.show()
  33. plt.savefig(figure_file)
  34. def scale_action(action, high, low):
  35. action = np.clip(action, -1, 1)
  36. weight = (high - low) / 2
  37. bias = (high + low) / 2
  38. action_ = action * weight + bias
  39. return action_

另外我们还提供了测试代码,主要用于测试训练效果以及观察环境的动态渲染 (test.py):

  1. import gym
  2. import imageio
  3. import argparse
  4. from DDPG import DDPG
  5. from utils import scale_action
  6. parser = argparse.ArgumentParser()
  7. parser.add_argument('--filename', type=str, default='./output_images/LunarLander.gif')
  8. parser.add_argument('--checkpoint_dir', type=str, default='./checkpoints/DDPG/')
  9. parser.add_argument('--save_video', type=bool, default=True)
  10. parser.add_argument('--fps', type=int, default=30)
  11. parser.add_argument('--render', type=bool, default=True)
  12. args = parser.parse_args()
  13. def main():
  14. env = gym.make('LunarLanderContinuous-v2')
  15. agent = DDPG(alpha=0.0003, beta=0.0003, state_dim=env.observation_space.shape[0],
  16. action_dim=env.action_space.shape[0], actor_fc1_dim=400, actor_fc2_dim=300,
  17. critic_fc1_dim=400, critic_fc2_dim=300, ckpt_dir=args.checkpoint_dir,
  18. batch_size=256)
  19. agent.load_models(1000)
  20. video = imageio.get_writer(args.filename, fps=args.fps)
  21. done = False
  22. observation = env.reset()
  23. while not done:
  24. if args.render:
  25. env.render()
  26. action = agent.choose_action(observation, train=True)
  27. action_ = scale_action(action.copy(), env.action_space.high, env.action_space.low)
  28. observation_, reward, done, info = env.step(action_)
  29. observation = observation_
  30. if args.save_video:
  31. video.append_data(env.render(mode='rgb_array'))
  32. if __name__ == '__main__':
  33. main()

测试脚本中包括五个参数,filename表示环境动态图的保存路径,checkpoint_dir表示加载的权重路径,save_video表示是否要保存动态图,fps表示动态图的帧率,rander表示是否开启环境渲染。大家只需要调整save_video和rander这两个参数,其余保持默认即可。

5 实验结果

通过平均奖励曲线可以看出,大概迭代到700步左右时算法趋于收敛。 

这是测试效果图,智能体能够很好地完成降落任务! 

6 结论

本文主要讲解了DDPG算法的原理以及代码实现。尽管它是一个非常优秀的算法,但是仍然存在一些问题需要改进,例如过估计。后面我们会讲解一下TD3算法,它其实就是在DDPG算法的基础做了一些改进工作,克服了DDPG算法中的一些问题,从而让算法的性能得到显著提升。

以上如果有出现错误的地方,欢迎各位怒斥!

标签:
声明

1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,请转载时务必注明文章作者和来源,不尊重原创的行为我们将追究责任;3.作者投稿可能会经我们编辑修改或补充。

在线投稿:投稿 站长QQ:1888636

后台-插件-广告管理-内容页尾部广告(手机)
关注我们

扫一扫关注我们,了解最新精彩内容

搜索