深度强化学习人工智能技术圈

深度强化学习(六):连续动作空间的问题

2019-06-05  本文已影响120人  fromeast

一、问题的引入

1.1、连续动作空间

在此之前,我们讨论和研究的都是离散的动作空间,而在实际问题中,存在大量的连续空间的问题,比如价格、角度、时间等。对于离散空间的问题,可以使用探索算法尽可能地将状态行动枚举出来,而对于连续动作的问题,想要枚举所有的动作变得更困难, 而如何将所有可行的动作逐一尝试出来也变得不那么可能。在讨论连续空间的问题前,不妨先看一下之前几种算法的更新方式。
DQN 算法的更新公式:Q(s, a)=Q(s, a)+\alpha\left(r+\gamma \max _{a} Q\left(s_{t+1}, a_{t+1}\right)-Q(s, a)\right)
在更新的过程中,先完成策略评估的工作,再进行策略改进,即需要先计算出下一时刻状态下所以动作的价值,并从中选出最优的行动价值,如果动作数量是有限的,这是可行的,而如果动作空间连续,无法进行这种计算和选择。
策略梯度的更新方式:\theta^{\prime}=\theta+\alpha \nabla_{\theta} J(\theta)
其中\nabla_{\theta} J(\theta)=E_{\tau \sim \pi_{\theta}(\tau)}\left[\left(\sum_{t=0}^{T} \nabla_{\theta} \log \pi_{\theta}\left(a_{i, t} | s_{i, t}\right) \right)\left(\sum_{t=0}^{T} r\left(s_{i, t}, a_{i, t}\right)\right)\right]
策略梯度法直接对轨迹的价值期望求导,不需要进行最优行动的选择,因此连续型动作空间的问题可以使用策略梯度算法求解。

1.2、确定性策略搜索

考虑采用策略梯度的方法后就又引入了策略搜索的问题。主要有以下两种策略:

二、Deterministic Policy Gradient(DPG)

为了更好地说明确定策略的算法,首先看一下随机策略的梯度计算公式:\nabla_{\theta} J\left(\pi_{\theta}\right)=E_{s-\rho^{\pi}, a-\pi_{\theta}}\left[\nabla_{\theta} \log \pi_{\theta}(a | s) Q^{\pi}(s, a)\right]其中状态的采样空间为\rho^{\pi}, \nabla_{\theta}log \pi_{\theta}(s,a)是分值函数,可见随机性策略梯度需要在整个动作的空间\pi_{\theta}进行采样。显见,策略梯度公式是关于状态和动作的期望,在求期望时,需要对状态分布和动作分布求积分,这就要求在状态空间和动作空间采集⼤量的样本,这样求均值才能近似期望。
由于策略产生的动作是确定的,即a=\mu_{\theta}(s),和随机策略不同,相同的策略(即\theta相同时),在状态为s时,动作是唯⼀确定的,因此策略梯度的求解不需要在动作空间采样积分。相比于随机策略方法,确定性策略需要的样本数据更小,算法效率⾼。
如果采用确定性策略,当初试状态已知时,⽤确定性策略所产⽣的轨迹是固定的,智能体⽆法探索其他轨迹或访问其他状态,即智能体⽆法学习。事实上,确定性策略使用Off-Policy 方法进行学习,即动作策略和评估策略不是同⼀个策略,此处具体为动作策略是随机策略,以保证充足的探索;评估策略是确定性策略,即利用函数逼近方法估计值函数。
则Off-Policy确定性策略梯度为:\nabla_{\theta} J_{\beta}\left(\mu_{\theta}\right)=E_{s-\rho^{a}}\left[\nabla_{\theta} \mu_{\theta}(s) \nabla_{a} Q^{\mu}\left.(s, a)\right|_{a=\mu_{\theta}(s)}\right]
DPG则是确定性策略梯度与AC算法的结合,Actor采用随机策略,Critic采用确定性策略,更新过程如下:\begin{array}{c}{\delta_{t}=r_{t}+\gamma Q^{w}\left(s_{t+1}, \mu_{\theta}\left(s_{t+1}\right)\right)-Q^{w}\left(s_{t}, a_{t}\right)} \\ {w_{t+1}=w_{t}+\alpha_{w} \delta_{t} \nabla_{w} Q^{w}\left(s_{t}, a_{t}\right)} \\ {\theta_{t+1}=\theta_{t}+\alpha_{\theta} \nabla_{\theta} \mu_{\theta}\left(s_{t}\right) \nabla_{a} Q^{w}\left.\left(s_{t}, a_{t}\right)\right|_{a=\mu_{\theta}(s)}}\end{array} 进行Critic的参数更新(即利用值函数逼近的方法更新值函数参数),即上面更新公式的前两行时,动作a为输入,权重w连接的是输入状态s和动作a。在进行Actor更新的时候,需要更新的参数是 \theta,确定性策略计算中\nabla_{a} Q^{w}\left.\left(s_{t}, a_{t}\right)\right|_{a=\mu_{\theta}(s)}与参数\theta无关。

三、Deep Deterministic Policy Gradient(DDPG)

DDPG 即深度确定性策略,所谓深度是指利⽤深度神经⽹络逼近值函数Q^{w}(s, a)和确定性策略\mu_{\theta}(s)。为了打破数据之间的相关性,DDPG同样⽤了两个技巧:经验回放和独⽴的目标网络,其实现框架和流程如下图所示。

DDPG实现框架

而对于 Actor当前网络,其损失函数就和之前讲的PG,A3C不同了,这里由于是确定性策略,原论文定义的损失梯度是:\nabla_{\theta}J(\theta) = \frac{1}{N}\sum\limits_{i=1}^N [\nabla_{a}Q_(s_i,a_i,w)|_{s=s_i,a=\pi_{\theta}(s)}\nabla_{\theta} \pi_{\theta(s)}|_{s=s_i}]

其实理解起来很简单,假如对同一个状态,我们输出了两个不同的动作a_1a_2,从Critic当前网络得到了两个反馈的Q值,分别是Q_1,Q_2,假设Q_1>Q_2,即采取动作1可以得到更多的奖励,那么策略梯度的思想就是增加a_1的概率,降低a_2的概率,也就是说,Actor想要尽可能的得到更大的Q值。所以我们的Actor的损失可以简单的理解为得到的反馈Q值越大损失越小,得到的反馈Q值越小损失越大,因此只要对状态估计网络返回的Q值取个负号即可,即:Loss = -\frac{1}{m}\sum\limits_{i=1}^m Q_(s_i,a_i,w)
以下为DDPG算法伪代码形式:

DDPG算法

四、案例分析

倒立摆问题 Pendulum-v0~~~ 倒立摆问题是控制文献中的经典问题,如下图所示。 在这个版本的问题中,钟摆以随机位置开始,目标是将其向上摆动,使其保持直立,是一个连续控制问题。其状态及动作空间如下表所示,初始状态从-\pi\pi的随机角度,以及-1和1之间的随机速度,奖励的表示为-\left(\theta^{2}+0.1 * \theta_{d t}^{2}+0.001 * \text {action}^{2}\right),目标是保持零角度(垂直),旋转速度最小,力度最小。

Pendulum-v0及其坐标表示
状态及动作空间

以下为DDPG实现Pendulum-v0的过程。

  1. 包的调用及初始化参数设置,采用pytorch框架。
import os, sys, random
from itertools import count 
import numpy as np
import gym

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Normal
from tensorboardX import SummaryWriter

TAU = 0.005
LR = 1e-3
GAMMA = 0.99
MEMORY_CAPACITY = 5000
BATCH_SIZE = 64
MAX_EPISODE = 100000
MODE = 'train' # or 'test'

sample_frequency = 256
log_interval = 50
render_interval = 100
exploration_noise = 0.1
max_length_of_trajectory = 2000
target_update_interval = 1
test_iteration = 10
update_iteration = 10

device = 'cuda' if torch.cuda.is_available() else 'cpu'

env = gym.make('Pendulum-v0').unwrapped
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.shape[0]
max_action = float(env.action_space.high[0])
min_Val = torch.tensor(1e-7).float().to(device)

directory = './runs'
  1. 经验回放缓存空间的类,包括存和取的操作。
class Replay_buffer():
    def __init__(self,max_size=MEMORY_CAPACITY):
        self.storage = []
        self.max_size = max_size
        self.ptr = 0

    def push(self,data):
        if len(self.storage) == self.max_size:
            self.storage[int(self.ptr)] = data
            self.ptr = (self.ptr+1) % self.max_size
        else:
            self.storage.append(data)

    def sample(self,batch_size):
        ind = np.random.randint(0,len(self.storage),size=batch_size)
        x,y,u,r,d = [],[],[],[],[]

        for i in ind:
            X,Y,U,R,D = self.storage[i]
            x.append(np.array(X,copy=False))
            y.append(np.array(Y,copy=False))
            u.append(np.array(U,copy=False))
            r.append(np.array(R,copy=False))
            d.append(np.array(D,copy=False))
        return np.array(x),np.array(y),np.array(u),np.array(r),np.array(d)
  1. Actor与Critic网络的搭建,均采用三层线性全连接神经网络。
class Actor(nn.Module):
    """docstring for Actor"""
    def __init__(self, state_dim,action_dim,max_action):
        super(Actor, self).__init__()
        
        self.l1 = nn.Linear(state_dim,400)
        self.l2 = nn.Linear(400,300)
        self.l3 = nn.Linear(300,action_dim)
        self.max_action = max_action

    def forward(self,x):
        x = F.relu(self.l1(x))
        x = F.relu(self.l2(x))
        x = self.max_action * torch.tanh(self.l3(x))
        return x

class Critic(nn.Module):
    """docstring for Critic"""
    def __init__(self, state_dim,action_dim):
        super(Critic, self).__init__()
        self.l1 = nn.Linear(state_dim+action_dim,400)
        self.l2 = nn.Linear(400,300)
        self.l3 = nn.Linear(300,1)
        
    def forward(self,x,u):
        x = F.relu(self.l1(torch.cat([x,u],1)))
        x = F.relu(self.l2(x))
        x = self.l3(x)
        return x
  1. 算法主体类(agent),包括动作选择,参数更新,模型保存和读取操作。
class DDPG(object):
    """docstring for DDPG"""
    def __init__(self, state_dim,action_dim,max_action):
        super(DDPG, self).__init__()
        
        self.actor = Actor(state_dim,action_dim,max_action).to(device)
        self.actor_target = Actor(state_dim,action_dim,max_action).to(device)
        self.actor_target.load_state_dict(self.actor.state_dict())
        self.actor_optimizer = optim.Adam(self.actor.parameters(),LR)

        self.critic = Critic(state_dim,action_dim).to(device)
        self.critic_target = Critic(state_dim,action_dim).to(device)
        self.critic_target.load_state_dict(self.critic.state_dict())
        self.critic_optimizer = optim.Adam(self.critic.parameters(),LR)

        self.replay_buffer = Replay_buffer()
        self.writer = SummaryWriter(directory)
        self.num_critic_update_iteration = 0
        self.num_actor_update_iteration = 0
        self.num_training = 0

    def select_action(self,state):
        state = torch.FloatTensor(state.reshape(1,-1)).to(device)
        return self.actor(state).cpu().data.numpy().flatten()

    def update(self):
        for it in range(update_iteration):
            # sample replay buffer
            x,y,u,r,d = self.replay_buffer.sample(BATCH_SIZE)
            state = torch.FloatTensor(x).to(device)
            action = torch.FloatTensor(u).to(device)
            next_state = torch.FloatTensor(y).to(device)
            done = torch.FloatTensor(d).to(device)
            reward = torch.FloatTensor(r).to(device)

            # compute the target Q value
            target_Q = self.critic_target(next_state,self.actor_target(next_state))
            target_Q = reward + ((1-done)*GAMMA*target_Q).detach()

            # get current Q estimate
            current_Q = self.critic(state,action)

            # compute critic loss
            critic_loss = F.mse_loss(current_Q,target_Q)
            self.writer.add_scalar('Loss/critic_loss',critic_loss,global_step=self.num_critic_update_iteration)

            # optimize the critic
            self.critic_optimizer.zero_grad()
            critic_loss.backward()
            self.critic_optimizer.step()

            # compute actor loss
            actor_loss = - self.critic(state,self.actor(state)).mean()
            self.writer.add_scalar('Loss/actor_loss',actor_loss,global_step=self.num_actor_update_iteration)

            # optimize the actor
            self.actor_optimizer.zero_grad()
            actor_loss.backward()
            self.actor_optimizer.step()

            # update the frozen target models
            for param,target_param in zip(self.critic.parameters(),self.critic_target.parameters()):
                target_param.data.copy_(TAU * param.data + (1-TAU) * target_param.data)

            for param,target_param in zip(self.actor.parameters(),self.actor_target.parameters()):
                target_param.data.copy_(TAU * param.data + (1-TAU) * target_param.data) 

            self.num_actor_update_iteration += 1
            self.num_critic_update_iteration += 1


    def save(self):
        torch.save(self.actor.state_dict(),directory+'actor.pth')
        torch.save(self.critic.state_dict(),directory+'critic.pth')
        print('model has been saved...')

    def load(self):
        self.actor.load_state_dict(torch.load(directory+'actor.pth'))
        self.critic.load_state_dict(torch.load(directory+'critic.pth'))
        print('model has been loaded...')
  1. 主函数,完成其功能实现。
def main():
    agent = DDPG(state_dim,action_dim,max_action)
    ep_r = 0

    if MODE == 'test':
        agent.load()
        for i in range(test_iteration):
            state = env.reset()
            for t in count():
                action = agent.select_action(state)
                next_state, reawrd, done, info = env.step(np.float32(action))
                ep_r += reward
                env.render()
                if done or t>=max_length_of_trajectory:
                    print('Episode:{}, Return:{:0.2f}, Step:{}'.format(i,ep_r,t))
                    ep_r = 0
                    break
                state = next_state

    elif MODE == 'train':
        print('Collection Experience...')

        for i in range(MAX_EPISODE):
            state = env.reset()
            for t in count():
                action = agent.select_action(state)

                # issue 3 add noise to action
                action = (action + np.random.normal(0,exploration_noise,size=env.action_space.shape[0])).clip(env.action_space.low,env.action_space.high)

                next_state, reward, done, info = env.step(action)
                ep_r += reward
                agent.replay_buffer.push((state,next_state,action,reward,np.float(done)))
                
                state = next_state
                if done or t>=max_length_of_trajectory:
                    agent.writer.add_scalar('ep_r',ep_r,global_step=i)
                    if i % 10 ==0:
                        print('Episode:{}, Return:{:0.2f}, Step:{}'.format(i,ep_r,t))
                    ep_r = 0
                    break

            if (i+1) % 100 == 0:
                print('Episode:{}, Memory size:{}'.format(i,len(agent.replay_buffer.storage)))

            if i % log_interval == 0:
                agent.save()

            if len(agent.replay_buffer.storage) >= MEMORY_CAPACITY-1:
                agent.update()

    else:
        raise NameError('model is wrong!!!')

if __name__ == '__main__':
    main()

运行结果如下图所示,分别为损失函数和累积奖励值的变化情况(通过tensorboard存储及获取),以及运行的动态效果。


损失函数变化曲线
累计奖励值变化曲线
运行效果

参考资料

[1] Silver D, Lever G, Heess N, et al. Deterministic Policy Gradient Algorithms[C]// International Conference on Machine Learning. 2014:387-395.
[2] Lillicrap T P, Hunt J J, Pritzel A, et al. Continuous control with deep reinforcement learning[J]. Computer Science, 2015, 8(6):A187.
[3] Mnih V, Kavukcuoglu K, Silver D, et al. Human-level control through deep reinforcement learning.[J]. Nature, 2015, 518(7540):529.
[4] https://cloud.tencent.com/developer/article/1398544
[5] https://blog.csdn.net/kenneth_yu/article/details/78478356
[6] 冯超著 强化学习精要:核心算法与TensorFlow实现. ----北京:电子工业出版社 2018.
[7] 郭宪,方纯勇编著 深入浅出强化学习:原理入门. ----北京:电子工业出版社 2018.

南风知我意,吹梦到西洲。~~~----《西洲曲》

上一篇 下一篇

猜你喜欢

热点阅读