LunarLander-v2 in Reinforcement Learning

2021-10-06  Rain师兄

This article covers the PPO algorithm and uses it to train an agent on LunarLander-v2.

(About this environment: rocket trajectory optimization is a classic topic in optimal control.

According to Pontryagin's maximum principle, it is optimal to fire the engine at full throttle or turn it off completely, which is why this environment works well with discrete actions: engine on or engine off.

The landing pad is always at coordinates (0,0). The coordinates are the first two numbers in the state vector. The reward for moving from the top of the screen to the landing pad with zero speed is about 100..140 points.

If the lander moves away from the landing pad, it loses reward. If the lander crashes or comes to rest, it receives an additional -100 or +100 points respectively.

Each leg with ground contact is +10 points. Firing the main engine is -0.3 points per frame. Firing a side engine is -0.03 points per frame. Solved is 200 points.

Landing outside the landing pad is possible. Fuel is infinite, so an agent can learn to fly and then land on its first attempt. Please see the source code for details.)

By the end of this tutorial, you will know how to apply a policy-based learning method within an Actor-Critic framework in order to learn to control any game environment with a discrete action space.

This article uses an OpenAI Gym environment, but you can use any other game environment; just make sure it supports the OpenAI Gym API. If you want to adapt the code for another environment, just make sure your inputs and outputs are correct.

Running the LunarLander-v2 Environment

In this environment:

Action space (Discrete): 0 - Do nothing, 1 - Fire left orientation engine, 2 - Fire main (downward) engine, 3 - Fire right orientation engine
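If you want to check the spaces yourself, here is a minimal sketch (not part of the original tutorial); the observation is a vector of 8 values describing the lander:

import gym

env = gym.make("LunarLander-v2")
print(env.action_space)       # Discrete(4)
print(env.observation_space)  # Box of 8 values: x, y, x-velocity, y-velocity, angle, angular velocity, left and right leg contact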

Proximal Policy Optimization (PPO)

The PPO algorithm was introduced by the OpenAI team in 2017 and quickly became one of the most popular reinforcement learning methods, pushing most other RL approaches aside at the time.

PPO works by collecting a small batch of experiences from interacting with the environment and using that batch to update the decision-making policy. Once the policy has been updated with this batch, the experiences are thrown away and a new batch is collected with the newly updated policy.

This is why it is an on-policy method: the collected experience samples are only used to update the current policy.

The main idea is that after an update, the new policy should not differ too much from the old policy.

This reduces variance in training at the cost of some bias, but it ensures smoother training and makes sure the agent does not go down an unrecoverable path of taking senseless actions. So let's go ahead and break our agent down in more detail and see how it defines and updates its policy.

The Actor-Critic model's structure

PPO uses an Actor-Critic approach for the agent.

What does this mean? It means that it uses two models, one called the Actor and the other called the Critic.

The Actor model

The Actor model learns which action to take for a particular observed state of the environment. In LunarLander-v2, it takes the game's list of eight values as input, which represents the current state of the rocket, and it outputs which engine to fire.
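Since the Actor's output layer is a softmax over the four discrete actions, choosing an engine simply means sampling from those probabilities. A tiny illustration with made-up numbers:

import numpy as np

probs = np.array([0.1, 0.2, 0.6, 0.1])  # hypothetical softmax output: [do nothing, left, main, right]
action = np.random.choice(4, p=probs)   # sample an action index according to the policy
print(action)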

Custom PPO loss

This is the PPO algorithm, so let's understand its loss function.

The action probabilities and old probabilities represent the policy defined by our Actor neural network model. By training this model, we want to improve these probabilities so that they give us better and better actions over time.

The main problem with some reinforcement learning methods is that once our model adopts a bad policy, it only takes bad actions in the game, so from then on we cannot generate any good actions and just keep going down an unrecoverable slope. PPO tries to solve this by only making small update steps to the model, thus keeping the training process stable.
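For reference, this is the clipped surrogate objective from the PPO paper (https://arxiv.org/abs/1707.06347) that the custom loss below implements, with probability ratio r_t and clipping parameter \epsilon = 0.2 (LOSS_CLIPPING in the code); the code maximizes it by minimizing its negative and adds a small entropy bonus:

r_t(\theta) = \frac{\pi_\theta(a_t \mid s_t)}{\pi_{\theta_\text{old}}(a_t \mid s_t)}

L^{CLIP}(\theta) = \mathbb{E}_t\left[ \min\left( r_t(\theta)\,\hat{A}_t,\ \operatorname{clip}(r_t(\theta),\, 1-\epsilon,\, 1+\epsilon)\,\hat{A}_t \right) \right]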

The Critic model

We send the action predicted by the Actor to our environment and observe what happens in the game. If our action led to something positive, such as the spaceship landing, the environment sends back positive feedback in the form of a reward. But if our spaceship falls, we receive a negative reward. These rewards are used when training the Critic model.
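In other words (a reminder, not stated explicitly in the original), the Critic estimates the state-value function, i.e. the expected discounted return from a state; in the code further down it is trained to regress toward the GAE-based return targets (advantages + values):

V^{\pi}(s_t) = \mathbb{E}_{\pi}\left[ \sum_{k=0}^{\infty} \gamma^{k}\, r_{t+k} \right]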

import gym
import random

env = gym.make("LunarLander-v2")

def Random_games():
    # Each of these episodes is its own game.
    for episode in range(10):
        env.reset()
        # this loops over frames until the episode ends...
        # ...but we won't last long with random actions.
        while True:
            # This will display the environment
            # Only display if you really want to see it.
            # Takes much longer to display it.
            env.render()

            # This will just create a sample action in any environment.
            # In this environment, the action is a single integer from 0 to 3.
            action = env.action_space.sample()

            # this executes the environment with an action,
            # and returns the observation of the environment,
            # the reward, if the env is over, and other info.
            next_state, reward, done, info = env.step(action)

            # lets print everything in one line:
            print(next_state, reward, done, info, action)
            if done:
                break

Random_games()

section 1

import numpy as np
import tensorflow as tf
from tensorflow.keras import backend as K
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense

class Actor_Model:
    def __init__(self, input_shape, action_space, lr, optimizer):
        X_input = Input(input_shape)
        self.action_space = action_space

        X = Dense(512, activation="relu", kernel_initializer=tf.random_normal_initializer(stddev=0.01))(X_input)
        X = Dense(256, activation="relu", kernel_initializer=tf.random_normal_initializer(stddev=0.01))(X)
        X = Dense(64, activation="relu", kernel_initializer=tf.random_normal_initializer(stddev=0.01))(X)
        output = Dense(self.action_space, activation="softmax")(X)

        self.Actor = Model(inputs=X_input, outputs=output)
        self.Actor.compile(loss=self.ppo_loss, optimizer=optimizer(lr=lr))

    def ppo_loss(self, y_true, y_pred):
        # Defined in https://arxiv.org/abs/1707.06347
        # y_true packs [advantages | old action probabilities | one-hot actions] into one array
        advantages, prediction_picks, actions = y_true[:, :1], y_true[:, 1:1+self.action_space], y_true[:, 1+self.action_space:]
        LOSS_CLIPPING = 0.2
        ENTROPY_LOSS = 0.001

        # probability of the taken action under the new and the old policy
        prob = actions * y_pred
        old_prob = actions * prediction_picks

        prob = K.clip(prob, 1e-10, 1.0)
        old_prob = K.clip(old_prob, 1e-10, 1.0)

        ratio = K.exp(K.log(prob) - K.log(old_prob))

        # clipped surrogate objective
        p1 = ratio * advantages
        p2 = K.clip(ratio, min_value=1 - LOSS_CLIPPING, max_value=1 + LOSS_CLIPPING) * advantages

        actor_loss = -K.mean(K.minimum(p1, p2))

        # entropy bonus to encourage exploration
        entropy = -(y_pred * K.log(y_pred + 1e-10))
        entropy = ENTROPY_LOSS * K.mean(entropy)

        total_loss = actor_loss - entropy

        return total_loss

    def predict(self, state):
        return self.Actor.predict(state)
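A quick usage sketch, not part of the original tutorial; the Adam optimizer and the learning rate here are just example choices:

from tensorflow.keras.optimizers import Adam

actor = Actor_Model(input_shape=(8,), action_space=4, lr=0.00025, optimizer=Adam)
state = np.zeros((1, 8), dtype=np.float32)  # a dummy LunarLander-v2 observation
print(actor.predict(state))                 # softmax probabilities over the 4 actions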

section 2

class Critic_Model:
    def __init__(self, input_shape, action_space, lr, optimizer):
        X_input = Input(input_shape)
        old_values = Input(shape=(1,))

        V = Dense(512, activation="relu", kernel_initializer='he_uniform')(X_input)
        V = Dense(256, activation="relu", kernel_initializer='he_uniform')(V)
        V = Dense(64, activation="relu", kernel_initializer='he_uniform')(V)
        value = Dense(1, activation=None)(V)

        self.Critic = Model(inputs=[X_input, old_values], outputs=value)
        self.Critic.compile(loss=[self.critic_PPO2_loss(old_values)], optimizer=optimizer(lr=lr))

    def critic_PPO2_loss(self, values):
        def loss(y_true, y_pred):
            LOSS_CLIPPING = 0.2
            # clip the new value prediction so it stays close to the old value estimate
            clipped_value_loss = values + K.clip(y_pred - values, -LOSS_CLIPPING, LOSS_CLIPPING)
            v_loss1 = (y_true - clipped_value_loss) ** 2
            v_loss2 = (y_true - y_pred) ** 2

            value_loss = 0.5 * K.mean(K.maximum(v_loss1, v_loss2))
            #value_loss = K.mean((y_true - y_pred) ** 2) # standard PPO loss
            return value_loss
        return loss

    def predict(self, state):
        # the old_values input is only needed by the loss, so pass zeros at prediction time
        return self.Critic.predict([state, np.zeros((state.shape[0], 1))])
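A matching usage sketch with the same example hyperparameters; note that the second model input, old_values, only matters for the clipped value loss, which is why predict feeds it zeros:

critic = Critic_Model(input_shape=(8,), action_space=4, lr=0.00025, optimizer=Adam)
state = np.zeros((1, 8), dtype=np.float32)
print(critic.predict(state))  # estimated state value V(s)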

section 3

import copy

def get_gaes(self, rewards, dones, values, next_values, gamma=0.99, lamda=0.9, normalize=True):
    # one-step TD errors: delta_t = r_t + gamma * (1 - done) * V(s_{t+1}) - V(s_t)
    deltas = [r + gamma * (1 - d) * nv - v for r, d, nv, v in zip(rewards, dones, next_values, values)]
    deltas = np.stack(deltas)
    gaes = copy.deepcopy(deltas)
    # accumulate the TD errors backwards in time with the GAE discount gamma * lambda
    for t in reversed(range(len(deltas) - 1)):
        gaes[t] = gaes[t] + (1 - dones[t]) * gamma * lamda * gaes[t + 1]

    # the Critic's regression targets are advantages + value estimates
    target = gaes + values
    if normalize:
        gaes = (gaes - gaes.mean()) / (gaes.std() + 1e-8)
    return np.vstack(gaes), np.vstack(target)
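For reference, the loop above implements the Generalized Advantage Estimation (GAE) recursion, here with gamma = 0.99 and lambda = 0.9:

\delta_t = r_t + \gamma (1 - d_t)\, V(s_{t+1}) - V(s_t)

\hat{A}_t = \delta_t + \gamma \lambda (1 - d_t)\, \hat{A}_{t+1}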

section 4

def replay(self, states, actions, rewards, predictions, dones, next_states):
    # reshape memory to appropriate shape for training
    states = np.vstack(states)
    next_states = np.vstack(next_states)
    actions = np.vstack(actions)
    predictions = np.vstack(predictions)

    # Get Critic network predictions
    values = self.Critic.predict(states)
    next_values = self.Critic.predict(next_states)

    # Compute discounted rewards and advantages
    advantages, target = self.get_gaes(rewards, dones, np.squeeze(values), np.squeeze(next_values))

    # pack all advantages, predictions and actions into y_true;
    # the custom PPO loss function unpacks them again on the other side
    y_true = np.hstack([advantages, predictions, actions])

    # training Actor and Critic networks
    a_loss = self.Actor.Actor.fit(states, y_true, epochs=self.epochs, verbose=0, shuffle=self.shuffle)
    c_loss = self.Critic.Critic.fit([states, values], target, epochs=self.epochs, verbose=0, shuffle=self.shuffle)

def run_batch(self): # train every self.Training_batch steps
    state = self.env.reset()
    state = np.reshape(state, [1, self.state_size[0]])
    done, score, SAVING = False, 0, ''
    while True:
        # Instantiate or reset games memory
        states, next_states, actions, rewards, predictions, dones = [], [], [], [], [], []
        for t in range(self.Training_batch):
            self.env.render()
            # Actor picks an action
            action, action_onehot, prediction = self.act(state)
            # Retrieve new state, reward, and whether the state is terminal
            next_state, reward, done, _ = self.env.step(action)
            # Memorize (state, action, reward) for training
            states.append(state)
            next_states.append(np.reshape(next_state, [1, self.state_size[0]]))
            actions.append(action_onehot)
            rewards.append(reward)
            dones.append(done)
            predictions.append(prediction)
            # Update current state
            state = np.reshape(next_state, [1, self.state_size[0]])
            score += reward
            if done:
                self.episode += 1
                average, SAVING = self.PlotModel(score, self.episode)
                print("episode: {}/{}, score: {}, average: {:.2f} {}".format(self.episode, self.EPISODES, score, average, SAVING))

                state, done, score, SAVING = self.env.reset(), False, 0, ''
                state = np.reshape(state, [1, self.state_size[0]])

        self.replay(states, actions, rewards, predictions, dones, next_states)
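Note that run_batch also calls self.act and self.PlotModel, which are not shown in this excerpt. A minimal sketch of what act could look like, assuming the agent stores the Actor from section 1 and an action_size attribute (both of which are assumptions here):

def act(self, state):
    # Query the Actor for this state's action probabilities (sketch; not shown in the original excerpt)
    prediction = self.Actor.predict(state)[0]
    # Sample one action from the policy distribution
    action = np.random.choice(self.action_size, p=prediction)
    # One-hot encode the action for the PPO loss
    action_onehot = np.zeros([self.action_size])
    action_onehot[action] = 1
    return action, action_onehot, prediction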

section 5
