深度强化学习(六):连续动作空间的问题
一、问题的引入
1.1、连续动作空间
在此之前,我们讨论和研究的都是离散的动作空间,而在实际问题中,存在大量的连续空间的问题,比如价格、角度、时间等。对于离散空间的问题,可以使用探索算法尽可能地将状态行动枚举出来,而对于连续动作的问题,想要枚举所有的动作变得更困难, 而如何将所有可行的动作逐一尝试出来也变得不那么可能。在讨论连续空间的问题前,不妨先看一下之前几种算法的更新方式。
DQN 算法的更新公式:
在更新的过程中,先完成策略评估的工作,再进行策略改进,即需要先计算出下一时刻状态下所以动作的价值,并从中选出最优的行动价值,如果动作数量是有限的,这是可行的,而如果动作空间连续,无法进行这种计算和选择。
策略梯度的更新方式:
其中
策略梯度法直接对轨迹的价值期望求导,不需要进行最优行动的选择,因此连续型动作空间的问题可以使用策略梯度算法求解。
1.2、确定性策略搜索
考虑采用策略梯度的方法后就又引入了策略搜索的问题。主要有以下两种策略:
- 随机性策略:策略输出的是动作的概率,使用正态分布对动作进行采样选择,即每个动作都有概率被选到;优点,将探索和改进集成到一个策略中;缺点,需要大量训练数据。
-
确定性策略:策略输出即是动作;优点,需要采样的数据少,算法效率高;缺点,无法探索环境。
策略梯度法是一个On-Policy的方法, 它非常依赖与环境交互的过程, 而DQN方法直接对值函数进行优化,可以使用Off-Policy的方法进行训练。所以,如果想在连续行动空间使用Off-Policy 算法进行优化,可以考虑结合两种算法的特点。
在与环境交互时, DQN 算法一般使用的策略, 策略梯度则是从一个概率分布中采样得到的, 而DPG 的交互方式结合了前面两种算法。从形式上看, DPG 使用了的策略,以一定的概率使用随机策略,而在剩下的情况下使用最优行动; 从策略产生的动作上看, DPG 将先得到一个确定的行动,这个动作由确定的策略得到,不需要从概率分布中采样,相当于当前状态下的最优行动。如果决定使用随机策略,那么就在求出的确定行动基础上加上一定的噪声,反之则没有噪声。
虽然确定策略的思想和DQN 相近,但实际上, DPG 也可以看作是策略梯度法的一种特殊情况。我们知道随机策略梯度的输出是行动分布形式,对于离散行动空间,模型输出的是一个Category 的分布,也就是每一个取值的概率;而对于连续行动空间,一般会输出一个高斯分布,其中一部分值表示分布的均值,另一部分值表示分布的方差,然后可以使用这些分布的参数采样出动作值。DPG的输出也可以想象成一个连续的分布,只不过这个分布的方差为0 ,这样我们就把DPG 和策略梯度法统一起来了。
二、Deterministic Policy Gradient(DPG)
为了更好地说明确定策略的算法,首先看一下随机策略的梯度计算公式:其中状态的采样空间为, 是分值函数,可见随机性策略梯度需要在整个动作的空间进行采样。显见,策略梯度公式是关于状态和动作的期望,在求期望时,需要对状态分布和动作分布求积分,这就要求在状态空间和动作空间采集⼤量的样本,这样求均值才能近似期望。
由于策略产生的动作是确定的,即,和随机策略不同,相同的策略(即相同时),在状态为时,动作是唯⼀确定的,因此策略梯度的求解不需要在动作空间采样积分。相比于随机策略方法,确定性策略需要的样本数据更小,算法效率⾼。
如果采用确定性策略,当初试状态已知时,⽤确定性策略所产⽣的轨迹是固定的,智能体⽆法探索其他轨迹或访问其他状态,即智能体⽆法学习。事实上,确定性策略使用Off-Policy 方法进行学习,即动作策略和评估策略不是同⼀个策略,此处具体为动作策略是随机策略,以保证充足的探索;评估策略是确定性策略,即利用函数逼近方法估计值函数。
则Off-Policy确定性策略梯度为:
DPG则是确定性策略梯度与AC算法的结合,Actor采用随机策略,Critic采用确定性策略,更新过程如下: 进行Critic的参数更新(即利用值函数逼近的方法更新值函数参数),即上面更新公式的前两行时,动作为输入,权重连接的是输入状态和动作。在进行Actor更新的时候,需要更新的参数是 ,确定性策略计算中与参数无关。
三、Deep Deterministic Policy Gradient(DDPG)
DDPG 即深度确定性策略,所谓深度是指利⽤深度神经⽹络逼近值函数和确定性策略。为了打破数据之间的相关性,DDPG同样⽤了两个技巧:经验回放和独⽴的目标网络,其实现框架和流程如下图所示。
- Actor当前网络:负责策略网络参数的迭代更新,负责根据当前状态选择当前动作,用于和环境交互生成。
- Actor目标网络:负责根据经验回放池中采样的下一状态选择最优下一动作,网络参数定期从复制。
- Critic当前网络:负责价值网络参数的迭代更新,负责计算负责计算当前值。目标值
- Critic目标网络:负责计算目标Q值中的部分。网络参数定期从复制。
DDPG从当前网络到目标网络的复制和之前讲到的DQN不一样。回想DQN,我们是直接把将当前Q网络的参数复制到目标Q网络,即, DDPG没有使用这种硬更新,而是使用了软更新,即每次参数只更新一点点,即: 其中是更新系数,一般取的比较小,比如0.1或者0.01这样的值。
同时,为了学习过程可以增加一些随机性,增加学习的覆盖,DDPG对选择出来的动作会增加一定的噪声,例如Ornstein-Uhlenbeck噪声,,即最终和环境交互的动作的表达式是:
最后,我们来看看DDPG的损失函数。对于Critic当前网络,其损失函数和DQN是类似的,都是均方误差,即:
而对于 Actor当前网络,其损失函数就和之前讲的PG,A3C不同了,这里由于是确定性策略,原论文定义的损失梯度是:
其实理解起来很简单,假如对同一个状态,我们输出了两个不同的动作和,从Critic当前网络得到了两个反馈的Q值,分别是,假设,即采取动作1可以得到更多的奖励,那么策略梯度的思想就是增加的概率,降低的概率,也就是说,Actor想要尽可能的得到更大的Q值。所以我们的Actor的损失可以简单的理解为得到的反馈Q值越大损失越小,得到的反馈Q值越小损失越大,因此只要对状态估计网络返回的Q值取个负号即可,即:
以下为DDPG算法伪代码形式:
四、案例分析
倒立摆问题 Pendulum-v0 倒立摆问题是控制文献中的经典问题,如下图所示。 在这个版本的问题中,钟摆以随机位置开始,目标是将其向上摆动,使其保持直立,是一个连续控制问题。其状态及动作空间如下表所示,初始状态从和的随机角度,以及-1和1之间的随机速度,奖励的表示为,目标是保持零角度(垂直),旋转速度最小,力度最小。
Pendulum-v0及其坐标表示
状态及动作空间
以下为DDPG实现Pendulum-v0的过程。
- 包的调用及初始化参数设置,采用pytorch框架。
import os, sys, random
from itertools import count
import numpy as np
import gym
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Normal
from tensorboardX import SummaryWriter
TAU = 0.005
LR = 1e-3
GAMMA = 0.99
MEMORY_CAPACITY = 5000
BATCH_SIZE = 64
MAX_EPISODE = 100000
MODE = 'train' # or 'test'
sample_frequency = 256
log_interval = 50
render_interval = 100
exploration_noise = 0.1
max_length_of_trajectory = 2000
target_update_interval = 1
test_iteration = 10
update_iteration = 10
device = 'cuda' if torch.cuda.is_available() else 'cpu'
env = gym.make('Pendulum-v0').unwrapped
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.shape[0]
max_action = float(env.action_space.high[0])
min_Val = torch.tensor(1e-7).float().to(device)
directory = './runs'
- 经验回放缓存空间的类,包括存和取的操作。
class Replay_buffer():
def __init__(self,max_size=MEMORY_CAPACITY):
self.storage = []
self.max_size = max_size
self.ptr = 0
def push(self,data):
if len(self.storage) == self.max_size:
self.storage[int(self.ptr)] = data
self.ptr = (self.ptr+1) % self.max_size
else:
self.storage.append(data)
def sample(self,batch_size):
ind = np.random.randint(0,len(self.storage),size=batch_size)
x,y,u,r,d = [],[],[],[],[]
for i in ind:
X,Y,U,R,D = self.storage[i]
x.append(np.array(X,copy=False))
y.append(np.array(Y,copy=False))
u.append(np.array(U,copy=False))
r.append(np.array(R,copy=False))
d.append(np.array(D,copy=False))
return np.array(x),np.array(y),np.array(u),np.array(r),np.array(d)
- Actor与Critic网络的搭建,均采用三层线性全连接神经网络。
class Actor(nn.Module):
"""docstring for Actor"""
def __init__(self, state_dim,action_dim,max_action):
super(Actor, self).__init__()
self.l1 = nn.Linear(state_dim,400)
self.l2 = nn.Linear(400,300)
self.l3 = nn.Linear(300,action_dim)
self.max_action = max_action
def forward(self,x):
x = F.relu(self.l1(x))
x = F.relu(self.l2(x))
x = self.max_action * torch.tanh(self.l3(x))
return x
class Critic(nn.Module):
"""docstring for Critic"""
def __init__(self, state_dim,action_dim):
super(Critic, self).__init__()
self.l1 = nn.Linear(state_dim+action_dim,400)
self.l2 = nn.Linear(400,300)
self.l3 = nn.Linear(300,1)
def forward(self,x,u):
x = F.relu(self.l1(torch.cat([x,u],1)))
x = F.relu(self.l2(x))
x = self.l3(x)
return x
- 算法主体类(agent),包括动作选择,参数更新,模型保存和读取操作。
class DDPG(object):
"""docstring for DDPG"""
def __init__(self, state_dim,action_dim,max_action):
super(DDPG, self).__init__()
self.actor = Actor(state_dim,action_dim,max_action).to(device)
self.actor_target = Actor(state_dim,action_dim,max_action).to(device)
self.actor_target.load_state_dict(self.actor.state_dict())
self.actor_optimizer = optim.Adam(self.actor.parameters(),LR)
self.critic = Critic(state_dim,action_dim).to(device)
self.critic_target = Critic(state_dim,action_dim).to(device)
self.critic_target.load_state_dict(self.critic.state_dict())
self.critic_optimizer = optim.Adam(self.critic.parameters(),LR)
self.replay_buffer = Replay_buffer()
self.writer = SummaryWriter(directory)
self.num_critic_update_iteration = 0
self.num_actor_update_iteration = 0
self.num_training = 0
def select_action(self,state):
state = torch.FloatTensor(state.reshape(1,-1)).to(device)
return self.actor(state).cpu().data.numpy().flatten()
def update(self):
for it in range(update_iteration):
# sample replay buffer
x,y,u,r,d = self.replay_buffer.sample(BATCH_SIZE)
state = torch.FloatTensor(x).to(device)
action = torch.FloatTensor(u).to(device)
next_state = torch.FloatTensor(y).to(device)
done = torch.FloatTensor(d).to(device)
reward = torch.FloatTensor(r).to(device)
# compute the target Q value
target_Q = self.critic_target(next_state,self.actor_target(next_state))
target_Q = reward + ((1-done)*GAMMA*target_Q).detach()
# get current Q estimate
current_Q = self.critic(state,action)
# compute critic loss
critic_loss = F.mse_loss(current_Q,target_Q)
self.writer.add_scalar('Loss/critic_loss',critic_loss,global_step=self.num_critic_update_iteration)
# optimize the critic
self.critic_optimizer.zero_grad()
critic_loss.backward()
self.critic_optimizer.step()
# compute actor loss
actor_loss = - self.critic(state,self.actor(state)).mean()
self.writer.add_scalar('Loss/actor_loss',actor_loss,global_step=self.num_actor_update_iteration)
# optimize the actor
self.actor_optimizer.zero_grad()
actor_loss.backward()
self.actor_optimizer.step()
# update the frozen target models
for param,target_param in zip(self.critic.parameters(),self.critic_target.parameters()):
target_param.data.copy_(TAU * param.data + (1-TAU) * target_param.data)
for param,target_param in zip(self.actor.parameters(),self.actor_target.parameters()):
target_param.data.copy_(TAU * param.data + (1-TAU) * target_param.data)
self.num_actor_update_iteration += 1
self.num_critic_update_iteration += 1
def save(self):
torch.save(self.actor.state_dict(),directory+'actor.pth')
torch.save(self.critic.state_dict(),directory+'critic.pth')
print('model has been saved...')
def load(self):
self.actor.load_state_dict(torch.load(directory+'actor.pth'))
self.critic.load_state_dict(torch.load(directory+'critic.pth'))
print('model has been loaded...')
- 主函数,完成其功能实现。
def main():
agent = DDPG(state_dim,action_dim,max_action)
ep_r = 0
if MODE == 'test':
agent.load()
for i in range(test_iteration):
state = env.reset()
for t in count():
action = agent.select_action(state)
next_state, reawrd, done, info = env.step(np.float32(action))
ep_r += reward
env.render()
if done or t>=max_length_of_trajectory:
print('Episode:{}, Return:{:0.2f}, Step:{}'.format(i,ep_r,t))
ep_r = 0
break
state = next_state
elif MODE == 'train':
print('Collection Experience...')
for i in range(MAX_EPISODE):
state = env.reset()
for t in count():
action = agent.select_action(state)
# issue 3 add noise to action
action = (action + np.random.normal(0,exploration_noise,size=env.action_space.shape[0])).clip(env.action_space.low,env.action_space.high)
next_state, reward, done, info = env.step(action)
ep_r += reward
agent.replay_buffer.push((state,next_state,action,reward,np.float(done)))
state = next_state
if done or t>=max_length_of_trajectory:
agent.writer.add_scalar('ep_r',ep_r,global_step=i)
if i % 10 ==0:
print('Episode:{}, Return:{:0.2f}, Step:{}'.format(i,ep_r,t))
ep_r = 0
break
if (i+1) % 100 == 0:
print('Episode:{}, Memory size:{}'.format(i,len(agent.replay_buffer.storage)))
if i % log_interval == 0:
agent.save()
if len(agent.replay_buffer.storage) >= MEMORY_CAPACITY-1:
agent.update()
else:
raise NameError('model is wrong!!!')
if __name__ == '__main__':
main()
运行结果如下图所示,分别为损失函数和累积奖励值的变化情况(通过tensorboard存储及获取),以及运行的动态效果。
损失函数变化曲线
累计奖励值变化曲线
运行效果
参考资料
[1] Silver D, Lever G, Heess N, et al. Deterministic Policy Gradient Algorithms[C]// International Conference on Machine Learning. 2014:387-395.
[2] Lillicrap T P, Hunt J J, Pritzel A, et al. Continuous control with deep reinforcement learning[J]. Computer Science, 2015, 8(6):A187.
[3] Mnih V, Kavukcuoglu K, Silver D, et al. Human-level control through deep reinforcement learning.[J]. Nature, 2015, 518(7540):529.
[4] https://cloud.tencent.com/developer/article/1398544
[5] https://blog.csdn.net/kenneth_yu/article/details/78478356
[6] 冯超著 强化学习精要:核心算法与TensorFlow实现. ----北京:电子工业出版社 2018.
[7] 郭宪,方纯勇编著 深入浅出强化学习:原理入门. ----北京:电子工业出版社 2018.
南风知我意,吹梦到西洲。----《西洲曲》