知识图谱自然语言处理

从零开始强化学习(七)——DDPG

2022-07-10  本文已影响0人  晓柒NLP与药物设计

DDPG是google DeepMind团队提出的一种用于输出确定性动作的算法,它解决了Actor-Critic神经网络每次参数更新前后都存在相关性,导致神经网络只能片面的看待问题这一缺点。同时也解决了DQN不能用于连续性动作的缺点

1. DDPG简介

Deep Deterministic Policy Gradient(DDPG)即深度确定性策略梯度算法,是一种可以解决连续性控制问题的方法,属于model-free,off-policy,policy-based的方法

DDPG可以拆开来看,Deep是说明需要神经网络;Deterministic的意思就是最终确定地只输出一个动作。Policy Gradient是策略梯度算法。DDPG可以看成是DQN的扩展版,不同的是,以往的DQN在最终输出的是一个动作向量,对于DDPG是最终确定地只输出一个动作。而且,DDPG让DQN可以扩展到连续的动作空间

提出DDPG是为了让DQN可以扩展到连续的动作空间,比如车速、角度和电压这种的连续值。

策略网络的loss function是一个复合函数,把a = \mu_\theta(s)代进去,最终策略网络要优化的是策略网络的参数\theta。Q网络要优化的是Q_w(s,a)和Q_target之间的均方误差

但是Q网络的优化存在一个和DQN一模一样的问题就是它后面的Q_target是不稳定的。此外后面的Q_{\bar{w}}\left(s^{\prime}, a^{\prime}\right)也是不稳定的,因为Q_{\bar{w}}\left(s^{\prime}, a^{\prime}\right)也是一个预估值

为了稳定这个Q_target,DDPG分别给Q网络和策略网络都搭建了target network:

这里面训练需要用到的数据就是s,a,r,s′,只需要用到这四个数据。我们就用Replay Memory把这些数据存起来,然后再sample进来训练。这个经验回放的技巧跟DQN是一模一样的。因为DDPG使用了经验回放这个技巧,所以DDPG是一个off-policy的算法

2. Exploration vs. Exploitation

DDPG通过off-policy的方式来训练一个确定性策略。因为策略是确定的,如果agent使用同策略来探索,在一开始的时候,很可能不会尝试足够多的action来找到有用的学习信号。为了让DDPG的策略更好地探索,在训练的时候action加了噪音。DDPG的原作者推荐使用时间相关的OUnoise,但最近的结果表明不相关的、均值为0的Gaussian noise的效果非常好,由于后者更简单,因此更喜欢使用它。为了便于获得更高质量的训练数据,可以在训练过程中把噪声变小

在测试的时候,为了查看策略利用它学到的东西的表现,不会在action中加噪音

虽然DDPG表现很好,但它在超参数和其他类型的调整方面经常很敏感。DDPG常见的问题是已经学习好的Q函数开始显著地高估Q值,然后导致策略被破坏了,因为它利用了Q函数中的误差。可以拿实际的Q值跟这个Q-network输出的Q值进行对比。实际的Q值可以用MC来算。根据当前的policy采样1000条轨迹,得到G后取平均,得到实际的Q值

双延迟深度确定性策略梯度(Twin Delayed DDPG,简称 TD3)通过引入三个关键技巧来解决这个问题:

这三个技巧加在一起,使得性能相比基线DDPG有了大幅的提升

目标策略平滑化的工作原理如下:
a_{T D 3}\left(s^{\prime}\right)=\operatorname{clip}\left(\mu_{\theta, t a r g}\left(s^{\prime}\right)+\operatorname{clip}(\epsilon,-c, c), a_{\text {low }}, a_{\text {high }}\right)\tag{2}
其中\epsilon本质上是一个噪声,是从正态分布中取样得到的,即\epsilon \sim N(0,\sigma)目标策略平滑化是一种正则化方法

3. 算法流程

伪代码如下:

  1. 初始化Actor和Critic以及其各自的目标网络共4个网络以及经验池replay buffer R
  2. 在Actor网络输出动作时,DDPG通过添加随机噪声的方式实现exploration,可以让智能体更好的探索潜在的最优策略,之后是采取经验回放的技巧。把智能体与环境交互的数据(s_t,a_t,r_t,s_{t+1})存储到R。随后每次训练从中随机采样一个minibatch
  3. 在参数更新上,先利用Critic的目标网络Q'来计算目标值y_i,利用y_i与当前Q值的均方误差构造损失函数,进行梯度更新。对于Actor的策略网络,其实就是把Actor的确定性动作函数代进Q-function的a,然后求梯度,最后是更新目标网络

4. 总结

简单来说DQN+Actor-Critic=>Deep Deterministic Policy Gradient(DDPG)。实际上DDPG其实更接近DQN,只是采用了类似Actor-Critic的结构。DDPG吸收了Actor-Critic中策略梯度单步更新的优点,同时还吸收了DQN对Q值估计的技巧。DDPG 最大的优势就是能够在连续动作上更有效地学习

5. 代码

代码主要看DDPG算法主要几个模块:

5.1 Actor

Actor作用是接收状态描述,输出一个action,由于DDPG中的动作空间要求是连续的,所以使用了一个tanh

class Actor(nn.Module):
    def __init__(self, n_obs, n_actions, hidden_size, init_w=3e-3):
        super(Actor, self).__init__()  
        self.linear1 = nn.Linear(n_obs, hidden_size)
        self.linear2 = nn.Linear(hidden_size, hidden_size)
        self.linear3 = nn.Linear(hidden_size, n_actions)
        self.linear3.weight.data.uniform_(-init_w, init_w)
    self.linear3.bias.data.uniform_(-init_w, init_w)
def forward(self, x):
    x = F.relu(self.linear1(x))
    x = F.relu(self.linear2(x))
    x = F.tanh(self.linear3(x))
    return x

实现方面,就是用了几个全连接层来设计的网络,输出的结果是一个连续的值

5.2 Critic

Critic批评者,在DDPG中接受来自Actor的一个Action值和当前的状态,输出的是当前状态下,采用Action动作以后得到的关于Q的期望

class Critic(nn.Module):
    def __init__(self, n_obs, n_actions, hidden_size, init_w=3e-3):
        super(Critic, self).__init__()
        self.linear1 = nn.Linear(n_obs + n_actions, hidden_size)
        self.linear2 = nn.Linear(hidden_size, hidden_size)
        self.linear3 = nn.Linear(hidden_size, 1)
        # 随机初始化为较小的值
        self.linear3.weight.data.uniform_(-init_w, init_w)
        self.linear3.bias.data.uniform_(-init_w, init_w)
    def forward(self, state, action):
        # 按维数1拼接
        x = torch.cat([state, action], 1)
        x = F.relu(self.linear1(x))
        x = F.relu(self.linear2(x))
        x = self.linear3(x)
        return x

5.3 Replay Buffer

Replay Buffer就是用来存储一系列等待学习的SARS片段。

class ReplayBuffer:
    def __init__(self, capacity):
        self.capacity = capacity
        self.buffer = []
        self.position = 0
    def push(self, state, action, reward, next_state, done):
        if len(self.buffer) < self.capacity:
            self.buffer.append(None)
        self.buffer[self.position] = (state, action, reward, next_state, done)
        self.position = (self.position + 1) % self.capacity
    def sample(self, batch_size):
        batch = random.sample(self.buffer, batch_size)
        state_batch, action_batch, reward_batch, next_state_batch, done_batch = map(np.stack, zip(*batch))
        return state_batch, action_batch, reward_batch, next_state_batch, done_batch
    def __len__(self):
        return len(self.buffer)

可以设置Replay Buffer的容量,push函数是向buffer中添加一个SARS片段;sample代表从buffer中采样batch size个片段

5.4 DDPG

DDPG用到了以上的所有对象,包括Critic、Target Critic、Actor、Target Actor、memory

init函数如下:

def __init__(self, n_states, n_actions, hidden_dim=30, device="cpu", critic_lr=1e-3,
                actor_lr=1e-4, gamma=0.99, soft_tau=1e-2, memory_capacity=100000, batch_size=128):
    self.device = device
    self.critic = Critic(n_states, n_actions, hidden_dim).to(device)
    self.actor = Actor(n_states, n_actions, hidden_dim).to(device)
    self.target_critic = Critic(n_states, n_actions, hidden_dim).to(device)
    self.target_actor = Actor(n_states, n_actions, hidden_dim).to(device)
    for target_param, param in zip(self.target_critic.parameters(), self.critic.parameters()):
        target_param.data.copy_(param.data)
    for target_param, param in zip(self.target_actor.parameters(), self.actor.parameters()):
        target_param.data.copy_(param.data)
    self.critic_optimizer = optim.Adam(
        self.critic.parameters(),  lr=critic_lr)
    self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=actor_lr)
    self.memory = ReplayBuffer(memory_capacity)
    self.batch_size = batch_size
    self.soft_tau = soft_tau
    self.gamma = gamma

其中核心的函数就是update函数:

def update(self):
    if len(self.memory) < self.batch_size:
        return
    state, action, reward, next_state, done = self.memory.sample(
        self.batch_size)
    # 将所有变量转为张量
    state = torch.FloatTensor(state).to(self.device)
    next_state = torch.FloatTensor(next_state).to(self.device)
    action = torch.FloatTensor(action).to(self.device)
    reward = torch.FloatTensor(reward).unsqueeze(1).to(self.device)
    done = torch.FloatTensor(np.float32(done)).unsqueeze(1).to(self.device)
    # 注意critic将(s_t,a)作为输入
    policy_loss = self.critic(state, self.actor(state))
    
    policy_loss = -policy_loss.mean()

    next_action = self.target_actor(next_state)
    target_value = self.target_critic(next_state, next_action.detach())
    expected_value = reward + (1.0 - done) * self.gamma * target_value
    expected_value = torch.clamp(expected_value, -np.inf, np.inf)

    value = self.critic(state, action)
    value_loss = nn.MSELoss()(value, expected_value.detach())
    
    self.actor_optimizer.zero_grad()
    policy_loss.backward()
    self.actor_optimizer.step()

    self.critic_optimizer.zero_grad()
    value_loss.backward()
    self.critic_optimizer.step()
    for target_param, param in zip(self.target_critic.parameters(), self.critic.parameters()):
        target_param.data.copy_(
            target_param.data * (1.0 - self.soft_tau) +
            param.data * self.soft_tau
        )
    for target_param, param in zip(self.target_actor.parameters(), self.actor.parameters()):
        target_param.data.copy_(
            target_param.data * (1.0 - self.soft_tau) +
            param.data * self.soft_tau
        )

整体流程如下:

next_action = self.target_actor(next_state)
target_value = self.target_critic(next_state, next_action.detach())

实现如下:

expected_value = reward + (1.0 - done) * self.gamma * target_value
expected_value = torch.clamp(expected_value, -np.inf, np.inf)

如果done为1,代表已经结束了,也就不需要这个系数了。第二行对expected value进行了数值上的限制

上一篇下一篇

猜你喜欢

热点阅读