TensorFlow in Practice (19): Implementing A3C for a Racing Game
2018-06-19
天涯明月笙
Implementing A3C for a racing game
# -*- coding: UTF-8 -*-
"""
The A3C algorithm: the decision-making part, i.e. the agent's "brain"
"""
import numpy as np
import threading
import scipy.signal
import six.moves.queue as queue
from collections import namedtuple
import tensorflow as tf


# Decay x with the discount factor gamma
def discount(x, gamma):
    return scipy.signal.lfilter([1], [1, -gamma], x[::-1], axis=0)[::-1]
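
# A quick sanity check of what discount() computes (illustrative only, not
# part of the training flow): for x = [1, 1, 1] and gamma = 0.9,
#     >>> discount(np.array([1., 1., 1.]), 0.9)
#     array([2.71, 1.9 , 1.  ])
# i.e. discount(x, gamma)[t] = sum_k gamma**k * x[t + k], the discounted sum
# of everything from step t to the end of the sequence.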

# Process a rollout (a chunk of collected experience)
def process_rollout(rollout, gamma, lambda_=1.0):
    """
    Given a rollout, compute its returns and advantages
    """
    batch_si = np.asarray(rollout.states)
    batch_a = np.asarray(rollout.actions)
    rewards = np.asarray(rollout.rewards)
    vpred_t = np.asarray(rollout.values + [rollout.r])

    rewards_plus_v = np.asarray(rollout.rewards + [rollout.r])
    batch_r = discount(rewards_plus_v, gamma)[:-1]
    delta_t = rewards + gamma * vpred_t[1:] - vpred_t[:-1]
    # The advantage formula comes from the Generalized Advantage Estimation paper:
    # https://arxiv.org/abs/1506.02438
    batch_adv = discount(delta_t, gamma * lambda_)

    features = rollout.features[0]
    return Batch(batch_si, batch_a, batch_adv, batch_r, rollout.terminal, features)
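
# For reference, the quantities computed above are (annotation added here,
# following the GAE paper, not part of the original listing):
#     delta_t = r_t + gamma * V(s_{t+1}) - V(s_t)
#     A_t     = sum_{l >= 0} (gamma * lambda)^l * delta_{t+l}
#     R_t     = sum_{l >= 0} gamma^l * r_{t+l}   (bootstrapped with rollout.r)
# which is exactly what the two discount() calls implement.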

# A named tuple holding one batch of experience
Batch = namedtuple("Batch", ["si", "a", "adv", "r", "terminal", "features"])


class PartialRollout(object):
    """
    A piece of a complete rollout.
    Once the agent has run for enough steps,
    we process its experience.
    """
    def __init__(self):
        self.states = []
        self.actions = []
        self.rewards = []
        self.values = []
        self.r = 0.0
        self.terminal = False
        self.features = []
    def add(self, state, action, reward, value, terminal, features):
        self.states += [state]
        self.actions += [action]
        self.rewards += [reward]
        self.values += [value]
        self.terminal = terminal
        self.features += [features]

    def extend(self, other):
        assert not self.terminal
        self.states.extend(other.states)
        self.actions.extend(other.actions)
        self.rewards.extend(other.rewards)
        self.values.extend(other.values)
        self.r = other.r
        self.terminal = other.terminal
        self.features.extend(other.features)
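
# How the container is used (hypothetical values, not part of the original
# listing): each environment step appends one transition, and partial
# rollouts produced before an episode ends are stitched together with extend():
#
#     r1, r2 = PartialRollout(), PartialRollout()
#     r1.add(state, action, reward=1.0, value=0.5, terminal=False, features=feats)
#     r2.add(next_state, action, reward=0.0, value=0.4, terminal=True, features=feats)
#     r1.extend(r2)   # r1 now spans both segments and is marked terminal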

class RunnerThread(threading.Thread):
    """
    One of the biggest differences between a Universe environment and an
    ordinary one is that a Universe environment runs in real time, so a
    dedicated thread is needed to interact with it and tell it what to do.
    RunnerThread is that thread.
    """
    def __init__(self, env, policy, num_local_steps, visualise):
        threading.Thread.__init__(self)
        self.queue = queue.Queue(5)
        self.num_local_steps = num_local_steps
        self.env = env
        self.last_features = None
        self.policy = policy
        self.daemon = True
        self.sess = None
        self.summary_writer = None
        self.visualise = visualise

    def start_runner(self, sess, summary_writer):
        self.sess = sess
        self.summary_writer = summary_writer
        self.start()

    def run(self):
        with self.sess.as_default():
            self._run()

    def _run(self):
        rollout_provider = env_runner(self.env, self.policy, self.num_local_steps, self.summary_writer, self.visualise)
        while True:
            # The timeout value was chosen empirically.
            # If one worker crashes, the other workers should not crash with it.
            self.queue.put(next(rollout_provider), timeout=600.0)

def env_runner(env, policy, num_local_steps, summary_writer, render):
    """
    The main logic of RunnerThread.
    In short, it keeps executing the policy, and whenever the rollout exceeds
    a certain length it puts the rollout into the queue.
    """
    last_state = env.reset()
    last_features = policy.get_initial_features()
    length = 0
    rewards = 0

    while True:
        terminal_end = False
        rollout = PartialRollout()

        for _ in range(num_local_steps):
            fetched = policy.act(last_state, *last_features)
            action, value_, features = fetched[0], fetched[1], fetched[2:]
            # argmax converts the one-hot encoded action back into the index the env expects
            state, reward, terminal, info = env.step(action.argmax())
            if render:
                env.render()

            # Collect the experience
            rollout.add(last_state, action, reward, value_, terminal, last_features)
            length += 1
            rewards += reward

            last_state = state
            last_features = features

            if info:
                summary = tf.Summary()
                for k, v in info.items():
                    summary.value.add(tag=k, simple_value=float(v))
                summary_writer.add_summary(summary, policy.global_step.eval())
                summary_writer.flush()

            timestep_limit = env.spec.tags.get('wrapper_config.TimeLimit.max_episode_steps')
            if terminal or length >= timestep_limit:
                terminal_end = True
                if length >= timestep_limit or not env.metadata.get('semantics.autoreset'):
                    last_state = env.reset()
                last_features = policy.get_initial_features()
                print("Episode finished. Episode reward: {}. Episode length: {}".format(rewards, length))
                length = 0
                rewards = 0
                break

        if not terminal_end:
            rollout.r = policy.value(last_state, *last_features)

        # Once enough experience has been collected, yield it so that
        # RunnerThread can put it into the queue
        yield rollout

# An implementation of the A3C algorithm, adapted and optimised for VNC environments
class A3C(object):
    def __init__(self, env, task, visualise):
        self.env = env
        self.task = task
        worker_device = "/job:worker/task:{}/cpu:0".format(task)
        with tf.device(tf.train.replica_device_setter(1, worker_device=worker_device)):
            with tf.variable_scope("global"):
                self.network = LSTMPolicy(env.observation_space.shape, env.action_space.n)
                self.global_step = tf.get_variable("global_step", [], tf.int32,
                                                   initializer=tf.constant_initializer(0, dtype=tf.int32),
                                                   trainable=False)

        with tf.device(worker_device):
            with tf.variable_scope("local"):
                self.local_network = pi = LSTMPolicy(env.observation_space.shape, env.action_space.n)
                pi.global_step = self.global_step

            self.ac = tf.placeholder(tf.float32, [None, env.action_space.n], name="ac")
            self.adv = tf.placeholder(tf.float32, [None], name="adv")
            self.r = tf.placeholder(tf.float32, [None], name="r")

            log_prob_tf = tf.nn.log_softmax(pi.logits)
            prob_tf = tf.nn.softmax(pi.logits)

            # The so-called policy-gradient "loss":
            # its gradient is exactly the policy gradient.
            # self.adv holds the advantage, which is computed in process_rollout
            pi_loss = - tf.reduce_sum(tf.reduce_sum(log_prob_tf * self.ac, [1]) * self.adv)

            # The value-function loss
            vf_loss = 0.5 * tf.reduce_sum(tf.square(pi.vf - self.r))
            entropy = - tf.reduce_sum(prob_tf * log_prob_tf)

            bs = tf.to_float(tf.shape(pi.x)[0])
            self.loss = pi_loss + 0.5 * vf_loss - entropy * 0.01
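
            # Written out (annotation added for clarity, not in the original listing):
            #   pi_loss = - sum_t log pi(a_t | s_t) * A_t
            #   vf_loss = 0.5 * sum_t (V(s_t) - R_t)^2
            #   entropy = - sum_t sum_a pi(a | s_t) * log pi(a | s_t)
            #   loss    = pi_loss + 0.5 * vf_loss - 0.01 * entropy
            # The entropy bonus discourages premature convergence to a
            # deterministic policy; 0.5 and 0.01 are the coefficients used here.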

            # 20 is the number of "local steps": how many time steps we run the
            # policy before updating the parameters.
            # The more local steps, the lower the variance of the policy-gradient
            # estimate; on the other hand, parameters get updated less often,
            # which slows down learning.
            # 20 is a reasonably good hyperparameter value.
            self.runner = RunnerThread(env, pi, 20, visualise)

            grads = tf.gradients(self.loss, pi.var_list)

            # Summaries, so these quantities can be inspected in TensorBoard
            tf.summary.scalar("model/policy_loss", pi_loss / bs)
            tf.summary.scalar("model/value_loss", vf_loss / bs)
            tf.summary.scalar("model/entropy", entropy / bs)
            tf.summary.image("model/state", pi.x)
            tf.summary.scalar("model/grad_global_norm", tf.global_norm(grads))
            tf.summary.scalar("model/var_global_norm", tf.global_norm(pi.var_list))
            self.summary_op = tf.summary.merge_all()

            grads, _ = tf.clip_by_global_norm(grads, 40.0)

            # Copy the parameters from the ps (parameter server) into the worker's network
            self.sync = tf.group(*[v1.assign(v2) for v1, v2 in zip(pi.var_list, self.network.var_list)])

            grads_and_vars = list(zip(grads, self.network.var_list))
            inc_step = self.global_step.assign_add(tf.shape(pi.x)[0])

            # Each worker has its own separate set of Adam optimizer statistics
            opt = tf.train.AdamOptimizer(1e-4)
            self.train_op = tf.group(opt.apply_gradients(grads_and_vars), inc_step)
            self.summary_writer = None
            self.local_steps = 0
    def start(self, sess, summary_writer):
        self.runner.start_runner(sess, summary_writer)
        self.summary_writer = summary_writer

    # Pull one batch (rollout) out of RunnerThread's queue
    def pull_batch_from_queue(self):
        rollout = self.runner.queue.get(timeout=600.0)
        while not rollout.terminal:
            try:
                rollout.extend(self.runner.queue.get_nowait())
            except queue.Empty:
                break
        return rollout

    def process(self, sess):
        """
        Grab a rollout produced by RunnerThread and use it to update the
        parameters. The updated parameters are sent to the ps (parameter server).
        """
        sess.run(self.sync)  # copy the parameters from the ps (parameter server) to the local worker

        rollout = self.pull_batch_from_queue()
        batch = process_rollout(rollout, gamma=0.99, lambda_=1.0)

        should_compute_summary = self.task == 0 and self.local_steps % 11 == 0

        if should_compute_summary:
            fetches = [self.summary_op, self.train_op, self.global_step]
        else:
            fetches = [self.train_op, self.global_step]

        feed_dict = {
            self.local_network.x: batch.si,
            self.ac: batch.a,
            self.adv: batch.adv,
            self.r: batch.r,
            self.local_network.state_in[0]: batch.features[0],
            self.local_network.state_in[1]: batch.features[1],
        }

        fetched = sess.run(fetches, feed_dict=feed_dict)

        if should_compute_summary:
            self.summary_writer.add_summary(tf.Summary.FromString(fetched[0]), fetched[-1])
            self.summary_writer.flush()
        self.local_steps += 1

# Initializer used for variables: columns normalised to a given standard deviation
def normalized_columns_initializer(std=1.0):
    def _initializer(shape, dtype=None, partition_info=None):
        out = np.random.randn(*shape).astype(np.float32)
        out *= std / np.sqrt(np.square(out).sum(axis=0, keepdims=True))
        return tf.constant(out)
    return _initializer


# Flatten layer
def flatten(x):
    return tf.reshape(x, [-1, np.prod(x.get_shape().as_list()[1:])])


# 2-D convolution layer
def conv2d(x, num_filters, name, filter_size=(3, 3), stride=(1, 1), pad="SAME", dtype=tf.float32, collections=None):
    with tf.variable_scope(name):
        stride_shape = [1, stride[0], stride[1], 1]
        filter_shape = [filter_size[0], filter_size[1], int(x.get_shape()[3]), num_filters]

        fan_in = np.prod(filter_shape[:3])
        fan_out = np.prod(filter_shape[:2]) * num_filters

        # Initialise the weights (Xavier/Glorot-style uniform bound)
        w_bound = np.sqrt(6. / (fan_in + fan_out))

        w = tf.get_variable("W", filter_shape, dtype, tf.random_uniform_initializer(-w_bound, w_bound),
                            collections=collections)
        b = tf.get_variable("b", [1, 1, 1, num_filters], initializer=tf.constant_initializer(0.0),
                            collections=collections)
        return tf.nn.conv2d(x, w, stride_shape, pad) + b


# Linear (fully connected) layer: y = W * x + b
def linear(x, size, name, initializer=None, bias_init=0):
    w = tf.get_variable(name + "/w", [x.get_shape()[1], size], initializer=initializer)
    b = tf.get_variable(name + "/b", [size], initializer=tf.constant_initializer(bias_init))
    return tf.matmul(x, w) + b


# Sample an action index from the logits and return it one-hot encoded
def categorical_sample(logits, d):
    value = tf.squeeze(tf.multinomial(logits - tf.reduce_max(logits, [1], keep_dims=True), 1), [1])
    return tf.one_hot(value, d)

# The policy network: an LSTM (recurrent) network used for the policy gradient
class LSTMPolicy(object):
    def __init__(self, ob_space, ac_space):
        self.x = x = tf.placeholder(tf.float32, [None] + list(ob_space))

        for i in range(4):
            x = tf.nn.elu(conv2d(x, 32, "l{}".format(i + 1), [3, 3], [2, 2]))
        # Introduce a fake batch dimension of 1 after flattening,
        # so the LSTM can run over the time dimension
        x = tf.expand_dims(flatten(x), [0])

        size = 256
        lstm = tf.contrib.rnn.BasicLSTMCell(size, state_is_tuple=True)
        self.state_size = lstm.state_size
        step_size = tf.shape(self.x)[:1]

        c_init = np.zeros((1, lstm.state_size.c), np.float32)
        h_init = np.zeros((1, lstm.state_size.h), np.float32)
        self.state_init = [c_init, h_init]
        c_in = tf.placeholder(tf.float32, [1, lstm.state_size.c])
        h_in = tf.placeholder(tf.float32, [1, lstm.state_size.h])
        self.state_in = [c_in, h_in]

        state_in = tf.contrib.rnn.LSTMStateTuple(c_in, h_in)
        lstm_outputs, lstm_state = tf.nn.dynamic_rnn(
            lstm, x, initial_state=state_in, sequence_length=step_size,
            time_major=False)
        lstm_c, lstm_h = lstm_state
        x = tf.reshape(lstm_outputs, [-1, size])
        self.logits = linear(x, ac_space, "action", normalized_columns_initializer(0.01))
        self.vf = tf.reshape(linear(x, 1, "value", normalized_columns_initializer(1.0)), [-1])
        self.state_out = [lstm_c[:1, :], lstm_h[:1, :]]
        self.sample = categorical_sample(self.logits, ac_space)[0, :]
        self.var_list = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, tf.get_variable_scope().name)

    def get_initial_features(self):
        return self.state_init
    # Run the policy for one step: returns the sampled (one-hot) action,
    # the value estimate, and the new LSTM state
    def act(self, ob, c, h):
        sess = tf.get_default_session()
        return sess.run([self.sample, self.vf] + self.state_out,
                        {self.x: [ob], self.state_in[0]: c, self.state_in[1]: h})

    # Estimate the state value V(s)
    def value(self, ob, c, h):
        sess = tf.get_default_session()
        return sess.run(self.vf, {self.x: [ob], self.state_in[0]: c, self.state_in[1]: h})[0]
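
The listing above only defines the building blocks; in the full project a separate worker script drives them. The sketch below shows roughly how that loop might look in a single process, just to make the flow concrete. It is an assumption, not the author's worker code: `create_env()` is a hypothetical helper for constructing the racing-game environment, and the port numbers and single-process cluster are purely illustrative (in a real run, ps and workers are separate processes).

    # Illustrative driver loop (assumptions: create_env() builds the environment;
    # ps and worker normally run as separate processes)
    cluster = tf.train.ClusterSpec({"ps": ["localhost:12200"], "worker": ["localhost:12201"]})
    ps_server = tf.train.Server(cluster, job_name="ps", task_index=0)
    server = tf.train.Server(cluster, job_name="worker", task_index=0)

    env = create_env()                      # hypothetical environment factory
    trainer = A3C(env, 0, visualise=False)  # task index 0

    with tf.Session(server.target) as sess:
        sess.run(tf.global_variables_initializer())
        summary_writer = tf.summary.FileWriter("train_0")
        trainer.start(sess, summary_writer)  # launches RunnerThread
        while True:
            trainer.process(sess)            # sync, pull a rollout, apply gradients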
Reference on selecting GPUs with CUDA (by default, all GPUs are used): https://www.cnblogs.com/darkknightzh/p/6591923.html
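
As an illustration of that note (not from the original post): TensorFlow will grab every visible GPU by default, so a common way to restrict it is to set CUDA_VISIBLE_DEVICES before creating the session, and optionally to enable memory growth:

    import os
    import tensorflow as tf

    os.environ["CUDA_VISIBLE_DEVICES"] = "0"   # expose only GPU 0 to this process

    config = tf.ConfigProto()
    config.gpu_options.allow_growth = True     # allocate GPU memory on demand
    sess = tf.Session(config=config)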