LunarLander-v2 in reinforcement learning with PPO
This article covers the PPO algorithm and uses it to train an agent on LunarLander-v2.
(
About this environment: rocket trajectory optimization is a classic topic in optimal control.
According to Pontryagin's maximum principle, it is optimal to fire the engine at full throttle or turn it off entirely, which is why it is reasonable for this environment to have discrete actions: engine on or engine off.
The landing pad is always at coordinates (0, 0), and those coordinates are the first two numbers in the state vector. The reward for moving from the top of the screen to the landing pad with zero speed is about 100..140 points.
If the lander moves away from the landing pad, it loses reward. If the lander crashes or comes to rest, it receives an additional -100 or +100 points respectively.
Each leg in contact with the ground is +10 points. Firing the main engine is -0.3 points per frame. Firing a side engine is -0.03 points per frame. The environment counts as solved at 200 points.
Landing outside the landing pad is possible. Fuel is infinite, so the agent can learn to fly and then land
on its first attempt. See the source code for details.
)
By the end of this tutorial, you will know how to apply a policy-based learning method within the Actor-Critic framework to control any game environment with discrete actions.
This article uses an OpenAI Gym environment, but you can use any other game environment as long as it supports the OpenAI Gym API. If you want to adapt the code to a different environment, just make sure your inputs and outputs are correct.
Running the LunarLander-v2 Environment
In this environment:
Action space (Discrete, 4): 0 - do nothing, 1 - fire left orientation engine, 2 - fire main (down) engine, 3 - fire right orientation engine
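A quick way to check these spaces is to create the environment and print them. A minimal sketch (the listed observation components follow the LunarLander-v2 source; the exact print output depends on your gym version):

import gym

env = gym.make("LunarLander-v2")
print(env.action_space)        # Discrete(4)
print(env.observation_space)   # a Box with 8 values: x, y, x velocity, y velocity,
                               # angle, angular velocity, left leg contact, right leg contact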
Proximal Policy Optimization (PPO)
The PPO algorithm was introduced by the OpenAI team in 2017 and quickly became one of the most popular reinforcement learning methods, pushing most other RL approaches aside at the time.
PPO works by collecting a small batch of experience from interacting with the environment and using that batch to update the decision-making policy. Once the policy has been updated with this batch, the experience is thrown away and a new batch is collected with the newly updated policy.
This is why it is an on-policy method: the collected experience samples are only ever used to update the current policy.
The key idea is that after an update, the new policy should not be too far from the old policy.
This reduces variance in training at the cost of some bias, but it ensures smoother training and keeps the agent from going down an unrecoverable path of senseless actions. So let's break our agent down in more detail and see how it defines and updates its policy.
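Formally, this idea is captured by the clipped surrogate objective from the PPO paper (https://arxiv.org/abs/1707.06347), which the custom loss later in this article implements (plus a small entropy bonus):

L^{CLIP}(\theta) = \hat{\mathbb{E}}_t\left[\min\left(r_t(\theta)\,\hat{A}_t,\ \mathrm{clip}\big(r_t(\theta),\ 1-\epsilon,\ 1+\epsilon\big)\,\hat{A}_t\right)\right], \qquad r_t(\theta) = \frac{\pi_\theta(a_t \mid s_t)}{\pi_{\theta_{\mathrm{old}}}(a_t \mid s_t)}

With \epsilon = 0.2 (LOSS_CLIPPING in the code), the objective stops rewarding the new policy once it moves the probability of an action more than 20% away from the old policy's probability.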
The Actor-Critic model's structure
PPO uses an Actor-Critic approach for the agent.
What does that mean? It means we use two models: one called the Actor and one called the Critic.
The Actor model
The Actor model learns which action to take for a given observed state of the environment. In LunarLander-v2, it takes the game's list of eight values as input,
which represents the current state of the rocket, and outputs which engine to fire.
Custom PPO loss
Since this is the PPO algorithm, let's understand the loss function.
The action probabilities and the old probabilities represent the policy defined by our Actor neural network model. By training this model, we want to improve these probabilities so that they give us better and better actions over time.
A major problem with some reinforcement learning methods is that once the model picks up a bad policy, it only takes bad actions in the game, so it can no longer generate any good experience and slides down an unrecoverable slope. PPO tries to solve this by making only small update steps to the model, which keeps the training process stable.
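As a small numeric illustration of those "small update steps" (not part of the article's code, just the clipping arithmetic with the same epsilon = 0.2):

import numpy as np

epsilon = 0.2        # LOSS_CLIPPING in the loss below
advantage = 1.0      # positive advantage: the action was better than expected
ratio = 1.5          # the new policy already favours this action 1.5x more than the old one

p1 = ratio * advantage                                      # 1.5
p2 = np.clip(ratio, 1 - epsilon, 1 + epsilon) * advantage   # 1.2
print(min(p1, p2))   # 1.2: pushing the ratio beyond 1.2 brings no further gain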
The Critic model
We send the action predicted by the Actor to the environment and observe what happens in the game. If the action leads to something positive, such as the spaceship landing,
the environment sends back a positive response in the form of a reward. But if the spaceship crashes, we receive a negative reward. These rewards are used when training the Critic model.
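To make "the rewards are used to train the Critic" concrete: the Critic learns to predict how much total reward a state will lead to. The simplest such target is the discounted sum of future rewards, sketched below purely for illustration; the implementation in this article builds its targets from GAE in get_gaes (section 3) instead.

def discounted_returns(rewards, gamma=0.99):
    # Illustration only: discounted sum of future rewards for each step.
    returns, running = [], 0.0
    for r in reversed(rewards):
        running = r + gamma * running
        returns.insert(0, running)
    return returns

print(discounted_returns([0.0, 0.0, 1.0]))  # [0.9801, 0.99, 1.0]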
import gym
import random

env = gym.make("LunarLander-v2")

def Random_games():
    # Each of these episodes is its own game.
    for episode in range(10):
        env.reset()
        # This is each frame, up to 500... but we won't make it that far with random actions.
        while True:
            # This will display the environment.
            # Only display if you really want to see it.
            # It takes much longer to display it.
            env.render()

            # This will just create a sample action in any environment.
            # In this environment, the action is one of 4 discrete values: 0, 1, 2 or 3.
            action = env.action_space.sample()

            # This executes the environment with an action,
            # and returns the observation of the environment,
            # the reward, whether the episode is over, and other info.
            next_state, reward, done, info = env.step(action)

            # Let's print everything in one line:
            print(next_state, reward, done, info, action)
            if done:
                break

Random_games()
section 1
# Imports used by this and the following code sections.
import copy
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense
from tensorflow.keras import backend as K

class Actor_Model:
    def __init__(self, input_shape, action_space, lr, optimizer):
        X_input = Input(input_shape)
        self.action_space = action_space

        X = Dense(512, activation="relu", kernel_initializer=tf.random_normal_initializer(stddev=0.01))(X_input)
        X = Dense(256, activation="relu", kernel_initializer=tf.random_normal_initializer(stddev=0.01))(X)
        X = Dense(64, activation="relu", kernel_initializer=tf.random_normal_initializer(stddev=0.01))(X)
        output = Dense(self.action_space, activation="softmax")(X)

        self.Actor = Model(inputs=X_input, outputs=output)
        self.Actor.compile(loss=self.ppo_loss, optimizer=optimizer(lr=lr))

    def ppo_loss(self, y_true, y_pred):
        # Defined in https://arxiv.org/abs/1707.06347
        # y_true packs the advantages, the old action probabilities and the one-hot actions side by side.
        advantages, prediction_picks, actions = y_true[:, :1], y_true[:, 1:1+self.action_space], y_true[:, 1+self.action_space:]
        LOSS_CLIPPING = 0.2
        ENTROPY_LOSS = 0.001

        # Probability of the taken action under the new and the old policy.
        prob = actions * y_pred
        old_prob = actions * prediction_picks
        prob = K.clip(prob, 1e-10, 1.0)
        old_prob = K.clip(old_prob, 1e-10, 1.0)

        ratio = K.exp(K.log(prob) - K.log(old_prob))
        p1 = ratio * advantages
        p2 = K.clip(ratio, min_value=1 - LOSS_CLIPPING, max_value=1 + LOSS_CLIPPING) * advantages
        actor_loss = -K.mean(K.minimum(p1, p2))

        # Entropy bonus to encourage exploration.
        entropy = -(y_pred * K.log(y_pred + 1e-10))
        entropy = ENTROPY_LOSS * K.mean(entropy)

        total_loss = actor_loss - entropy
        return total_loss

    def predict(self, state):
        return self.Actor.predict(state)
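A rough usage sketch for LunarLander-v2 (the optimizer and learning rate here are illustrative placeholders, not values prescribed by this article): the input is the 8-dimensional state and the output is a softmax over the 4 actions.

from tensorflow.keras.optimizers import Adam

actor = Actor_Model(input_shape=(8,), action_space=4, lr=0.00025, optimizer=Adam)
probs = actor.predict(np.zeros((1, 8)))   # shape (1, 4): action probabilities that sum to 1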
section 2
class Critic_Model:
    def __init__(self, input_shape, action_space, lr, optimizer):
        X_input = Input(input_shape)
        # The old value predictions are fed in as a second input so the loss can clip against them.
        old_values = Input(shape=(1,))

        V = Dense(512, activation="relu", kernel_initializer='he_uniform')(X_input)
        V = Dense(256, activation="relu", kernel_initializer='he_uniform')(V)
        V = Dense(64, activation="relu", kernel_initializer='he_uniform')(V)
        value = Dense(1, activation=None)(V)

        self.Critic = Model(inputs=[X_input, old_values], outputs=value)
        self.Critic.compile(loss=[self.critic_PPO2_loss(old_values)], optimizer=optimizer(lr=lr))

    def critic_PPO2_loss(self, values):
        def loss(y_true, y_pred):
            LOSS_CLIPPING = 0.2
            # Clip the new value prediction to stay close to the old one, then take the worse of the two errors.
            clipped_value_loss = values + K.clip(y_pred - values, -LOSS_CLIPPING, LOSS_CLIPPING)
            v_loss1 = (y_true - clipped_value_loss) ** 2
            v_loss2 = (y_true - y_pred) ** 2
            value_loss = 0.5 * K.mean(K.maximum(v_loss1, v_loss2))
            #value_loss = K.mean((y_true - y_pred) ** 2) # standard PPO loss
            return value_loss
        return loss

    def predict(self, state):
        # The old_values input is only used inside the loss, so zeros are fed at prediction time.
        return self.Critic.predict([state, np.zeros((state.shape[0], 1))])
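And a matching sketch for the Critic (same illustrative optimizer and learning rate, with Adam imported as above): it maps a batch of states to one value estimate per state.

critic = Critic_Model(input_shape=(8,), action_space=4, lr=0.00025, optimizer=Adam)
values = critic.predict(np.zeros((4, 8)))  # shape (4, 1): one value estimate per state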
section 3
def get_gaes(self, rewards, dones, values, next_values, gamma=0.99, lamda=0.9, normalize=True):
    # One-step TD errors; (1 - d) zeroes out the bootstrap value at terminal states.
    deltas = [r + gamma * (1 - d) * nv - v for r, d, nv, v in zip(rewards, dones, next_values, values)]
    deltas = np.stack(deltas)
    gaes = copy.deepcopy(deltas)
    # Accumulate the discounted TD errors backwards through the batch.
    for t in reversed(range(len(deltas) - 1)):
        gaes[t] = gaes[t] + (1 - dones[t]) * gamma * lamda * gaes[t + 1]

    target = gaes + values
    if normalize:
        gaes = (gaes - gaes.mean()) / (gaes.std() + 1e-8)
    return np.vstack(gaes), np.vstack(target)
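get_gaes implements Generalized Advantage Estimation (GAE): first the one-step TD errors, then a backwards pass that accumulates them with an extra discount \lambda:

\delta_t = r_t + \gamma (1 - d_t) V(s_{t+1}) - V(s_t), \qquad \hat{A}_t = \delta_t + \gamma \lambda (1 - d_t) \hat{A}_{t+1}

The Critic's regression target is \hat{A}_t + V(s_t), and the advantages are normalized before being packed into the Actor's y_true.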
section 4
def replay(self, states, actions, rewards, predictions, dones, next_states):
    # Reshape memory to the appropriate shape for training
    states = np.vstack(states)
    next_states = np.vstack(next_states)
    actions = np.vstack(actions)
    predictions = np.vstack(predictions)

    # Get Critic network predictions for the current and next states
    values = self.Critic.predict(states)
    next_values = self.Critic.predict(next_states)

    # Compute advantages and value targets with GAE
    advantages, target = self.get_gaes(rewards, dones, np.squeeze(values), np.squeeze(next_values))

    # Pack advantages, old predictions and one-hot actions into y_true;
    # the custom PPO loss unpacks them again on the other side.
    y_true = np.hstack([advantages, predictions, actions])

    # Train the Actor and Critic networks
    a_loss = self.Actor.Actor.fit(states, y_true, epochs=self.epochs, verbose=0, shuffle=self.shuffle)
    c_loss = self.Critic.Critic.fit([states, values], target, epochs=self.epochs, verbose=0, shuffle=self.shuffle)

def run_batch(self): # train every self.Training_batch steps
    state = self.env.reset()
    state = np.reshape(state, [1, self.state_size[0]])
    done, score, SAVING = False, 0, ''
    while True:
        # Instantiate or reset the games memory
        states, next_states, actions, rewards, predictions, dones = [], [], [], [], [], []
        for t in range(self.Training_batch):
            self.env.render()
            # Actor picks an action
            action, action_onehot, prediction = self.act(state)
            # Retrieve the new state, the reward, and whether the state is terminal
            next_state, reward, done, _ = self.env.step(action)
            # Memorize (state, action, reward, ...) for training
            states.append(state)
            next_states.append(np.reshape(next_state, [1, self.state_size[0]]))
            actions.append(action_onehot)
            rewards.append(reward)
            dones.append(done)
            predictions.append(prediction)
            # Update the current state
            state = np.reshape(next_state, [1, self.state_size[0]])
            score += reward
            if done:
                self.episode += 1
                average, SAVING = self.PlotModel(score, self.episode)
                print("episode: {}/{}, score: {}, average: {:.2f} {}".format(self.episode, self.EPISODES, score, average, SAVING))
                state, done, score, SAVING = self.env.reset(), False, 0, ''
                state = np.reshape(state, [1, self.state_size[0]])
        self.replay(states, actions, rewards, predictions, dones, next_states)
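The act helper used in run_batch is not defined in this section. A minimal sketch of what it has to return, assuming the agent stores the number of actions as self.action_size (a hypothetical attribute name): the sampled action, its one-hot encoding for the loss, and the full probability vector, which later becomes the "old" policy inside the PPO loss.

def act(self, state):
    # Probability distribution over the actions from the current policy.
    prediction = self.Actor.predict(state)[0]
    # Sample an action according to those probabilities.
    action = np.random.choice(self.action_size, p=prediction)  # self.action_size is assumed here
    # One-hot encode it so the PPO loss can pick out the taken action's probability.
    action_onehot = np.zeros([self.action_size])
    action_onehot[action] = 1
    return action, action_onehot, prediction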
section 5