Cartpole

2021-09-24  本文已影响0人  从此不迷茫
#!/usr/bin/env python3
import gym
from collections import namedtuple
import numpy as np
from tensorboardX import SummaryWriter

import torch
import torch.nn as nn
import torch.optim as optim
# tensorboard --logdir=

HIDDEN_SIZE = 128
BATCH_SIZE = 16
PERCENTILE = 70


class Net(nn.Module):
    def __init__(self, obs_size, hidden_size, n_actions):
        # 总结:所有放在构造函数__init__里面的层的都是这个模型的“固有属性”.
        super(Net, self).__init__()  # 第一句话,super调用父类的构造函数  =super().__init__()  # 第一句话,super调用父类的构造函数
        '''
        通过Sequential来包装层
即将几个层包装在一起作为一个大的层(块),前面的一篇文章详细介绍了Sequential类的使用,包括常见的三种方式,以及每一种方式的优缺点,
参见:https://blog.csdn.net/qq_27825451/article/details/90551513
        '''
        self.net = nn.Sequential(
            nn.Linear(obs_size, hidden_size),  # PyTorch的nn.Linear()是用于设置网络中的全连接层的,需要注意在二维图像处理的任务中,全连接层的输入与输出一般都设置为二维张量,形状通常为[batch_size, size],不同于卷积层要求输入输出是四维张量。
            nn.ReLU(),
            nn.Linear(hidden_size, n_actions)
        )
        # 此处定义为函数中的新定义,不是继承的
        # view()的作用相当于numpy中的reshape,重新定义矩阵的形状
        '''
        class Fruit():
            def __init__(self, color, shape):
                self.color = color
                self.shape = shape

        class Apple(Fruit):
            def __init__(self, color, shape, taste):
                Fruit.__init__(self, color, shape) # 等价于super().__init__(color, shape)
                self.taste = taste
    
            def feature(self):
                print("Apple's color is {}, shape is {} and taste {}".format(
                    self.color, self.shape, self.taste))
原文链接:https://blog.csdn.net/w1301100424/article/details/93858890
        '''

    def forward(self, x):
        return self.net(x)
# 3)forward方法是必须要重写的,它是实现模型的功能,实现各个层之间的连接关系的核心。

'''
因为元组的局限性:不能为元组内部的数据进行命名,所以往往我们并不知道一个元组所要表达的意义,
所以在这里引入了 collections.namedtuple 这个工厂函数,来构造一个带字段名的元组。
'''

Episode = namedtuple('Episode', field_names=['reward', 'steps'])
EpisodeStep = namedtuple('EpisodeStep', field_names=['observation', 'action'])
'''
# 两种方法来给 namedtuple 定义方法名
#User = collections.namedtuple('User', ['name', 'age', 'id'])
User = collections.namedtuple('User', 'name age id')
user = User('tester', '22', '464643123')
'''

def iterate_batches(env, net, batch_size):  # 接受环境(来自Gym库的Env实例)、神经网络、以及每次迭代时应该生成的episode数量
    batch = []  # batch变量用于累积batch(一个Episode实例列表)
    episode_reward = 0.0  # 奖励计数器
    episode_steps = []  #
    obs = env.reset()  # 重新设定环境,获得第一个观察并创建softmax层,用于将网络输出装换成动作的概率分布
    sm = nn.Softmax(dim=1)
    '''
    def softmax(x):
        exp_x = np.exp(x)
        sum_exp_x = np.sum(exp_x) 
        y = exp_x/sum_exp_x
        return y
    改进:解决溢出问题
    def softmax(a):
        c = np.max(a)
        exp_a = np.exp(a - c) # 溢出对策
        sum_exp_a = np.sum(exp_a)
        y = exp_a / sum_exp_a
        return y
    softmax函数的输出是0.0到1.0之间的实数。并且,softmax函数的输出值的总和是1。
    输出总和为1是softmax函数的一个重要性质。正因为有了这个性质,我们才可以把softmax函数的输出解释为“概率”。
    
    所以,当nn.Softmax的输入是一个二维张量时,其参数dim = 0,是让列之和为1;dim = 1,是让行之和为1。
    '''
    while True:  # 进行环境循环
        obs_v = torch.FloatTensor([obs])  # 将观察值(在CartPole中是一个四个数字的向量,即cart_pos,cart_v,pole_angle,pole_v))转换成1*4的张量,这里用单一元素列表传递观察实现
        act_probs_v = sm(net(obs_v))  # 这里没有在网络中使用非线性特性,他将输出原始动作分值,此分值需要softmax函数提供 ,net = Net(obs_size, HIDDEN_SIZE, n_actions),此处obs_v相当于网络输入的x
        act_probs = act_probs_v.data.numpy()[0]   # 这里的网络和softmax层都返回能够跟踪梯度变化的张量,因此需要通过访问tensor.data字段,然后将张量转换为Numpy数组将其解包。
        # 这个数组具有和输入相同的二维结构,batch维度在0轴上,因此需要得到第一个batch元素,获得动作概率的一个一维向量

        action = np.random.choice(len(act_probs), p=act_probs)  # 根据已有的动作的概率分布,获得当前步骤采取的动作,通过使用Numpy.choice()函数对该分布进行采样实现,得到0~len(act_probs)-1整数列表
        next_obs, reward, is_done, _ = env.step(action)   # 之后,把这个动作传递到环境中,获得下一个观察、奖励以及episode是否结束的提示,step()是执行动作的方法
        episode_reward += reward  # 更新
        episode_steps.append(EpisodeStep(observation=obs, action=action))  # episode列表扩展了一个(用于选择动作的观察,动作)对
        if is_done:
            batch.append(Episode(reward=episode_reward, steps=episode_steps))  # 将最终的episode附加到batch中,保存总奖励和采取的步骤,Episode是具名元组
            episode_reward = 0.0  # 重置总奖励累加器并清理步骤列表
            episode_steps = []
            next_obs = env.reset()  # 充值环境重新开始
            if len(batch) == batch_size:
                yield batch  # 如果batch已经达到所需的episode数量,使用yield函数将其返回给调用者进行处理,返回具有不同的稍好一些(所期望)的行为
                batch = []  # 清理batch
        obs = next_obs   # 非常重要的一步是将从环境中获得的观察分配给当前的观察变量
# 这个函数逻辑中需要理解的一个非常重要的事实是,这里的网络训练和episode的生成是同时进行的。
'''
到这里你可能就明白yield和return的关系和区别了,带yield的函数是一个生成器,而不是一个函数了,这个生成器有一个函数就是next函数,next就相当于“下一步”生成哪个数,
这一次的next开始的地方是接着上一次的next停止的地方执行的,所以调用next的时候,生成器并不会从foo函数的开始执行,只是接着上一步停止的地方开始,然后遇到yield后,return出要生成的数,此步就结束。
原文链接:https://blog.csdn.net/mieleizhi0522/article/details/82142856
'''
def filter_batch(batch, percentile):
    rewards = list(map(lambda s: s.reward, batch))
    '''map() 会根据提供的函数对指定序列做映射。map(function, iterable, ...) ,第一个参数 function 以参数序列中的每一个元素调用 function 函数,返回包含每次 function 函数返回值的新列表。
    lambda (匿名函数):示例:add = lambda x,y:x+y  print(add(3,4))-》7
    '''
    reward_bound = np.percentile(rewards, percentile)
    # np.percentile(a, q, axis=None, out=None, overwrite_input=False, interpolation='linear', keepdims=False)
    # 作用:找到一组数的分位数值,如四分位数等(具体什么位置根据自己定义),注意实际百分位数计算方式
    reward_mean = float(np.mean(rewards))

    # 这个函数是交叉熵方法的核心:他从给定batch中的episode和百分位数中计算出一个边界奖励,用于筛选“精华”episode进行训练。为获得边界奖励,
    # 使用Numpy的百分位数函数,他从一组值列表和期望的百分位数中计算该百分位数对应的值。然后计算平均奖励,用于监控。

    train_obs = []
    train_act = []
    for example in batch:
        if example.reward < reward_bound:
            continue
        train_obs.extend(map(lambda step: step.observation, example.steps))  # 将example中的steps观察值列表扩展到train_obs中
        # extend() 函数用于在列表末尾一次性追加另一个序列中的多个值(用新列表扩展原来的列表)。
        train_act.extend(map(lambda step: step.action, example.steps))
    # 然后筛选episode。对于batch中每个episode,这里将检查该episode的总奖励是否高于边界,若是则填写要观察和行动的列表用于训练。

    train_obs_v = torch.FloatTensor(train_obs)
    train_act_v = torch.LongTensor(train_act)
    return train_obs_v, train_act_v, reward_bound, reward_mean
    # 该函数最后一步,需要把“精华”episode中的观察和动作转换为张量,并返回一个四元组:观察、动作、奖励边界和平均奖励。
    # 最后两个值仅用于将他们写入TensorBoard以检查智能体性能。

# 最后一部分代码将所有函数结合一起,训练循环组成如下:
if __name__ == "__main__":
    env = gym.make("CartPole-v0")
    # env = gym.wrappers.Monitor(env, directory="mon", force=True)
    obs_size = env.observation_space.shape[0]
    # env.observation_space是Box属性,box(可能是无界的)在n维空间中。一个box代表n维封闭区间的笛卡尔积。
    # 假设集合A={a,b},集合B={0,1,2},则两个集合的笛卡尔积为{(a,0),(a,1),(a,2),(b,0),(b,1),(b,2)}。
    n_actions = env.action_space.n

    net = Net(obs_size, HIDDEN_SIZE, n_actions)  # HIDDEN_SIZE = 128,返回一个net,可输入参数为x
    objective = nn.CrossEntropyLoss()
    optimizer = optim.Adam(params=net.parameters(), lr=0.01)
    # 深度学习的优化算法Adam
    # torch.optim is a package implementing various optimization algorithms. Most commonly used methods are already supported,
    # and the interface is general enough, so that more sophisticated ones can be also easily integrated in the future.
    writer = SummaryWriter(comment="-cartpole")
    # 首先,创建所有必须的对象:环境、神经网络、目标函数、优化器、TensorBoard的摘要编写器。注释行创建一个监视器以写入智能体程序性能的视频。

    for iter_no, batch in enumerate(iterate_batches(env, net, BATCH_SIZE)):  # BATCH_SIZE = 16,enumerate返回索引和值
        '''
        for循环遍历的原理就是迭代,in后面必须是可迭代对象. iterate_batches()函数里面有yield()函数,自动变成可迭代对象
        '''
        obs_v, acts_v, reward_b, reward_m = filter_batch(batch, PERCENTILE)  # PERCENTILE = 70
        optimizer.zero_grad()
        action_scores_v = net(obs_v)
        loss_v = objective(action_scores_v, acts_v)
        loss_v.backward()
        optimizer.step()
    # 在训练循环中,迭代batch(一个episode对象的列表),然后使用filter_batch函数筛选“精华”episode。其结果就是观察和采取行动的变量,用于筛选的奖励边界和平均奖励。
    # 之后,将网络的梯度归零,并将观察传递给网络,获得其动作分值。这些分值被传递给目标函数,目标函数计算网络输出和智能体所采取的动作之间的交叉熵。这样做可以增强网络,
    # 以执行哪些可以带来良好奖励的“精华:动作。然后,计算损失梯度,并要求优化器调整网络。


        print("%d: loss=%.3f, reward_mean=%.1f, reward_bound=%.1f" % (
            iter_no, loss_v.item(), reward_m, reward_b))
        writer.add_scalar("loss", loss_v.item(), iter_no)
        writer.add_scalar("reward_bound", reward_b, iter_no)
        writer.add_scalar("reward_mean", reward_m, iter_no)
        # 循环其余部分是监控进度,在控制台上,显示迭代次数、损失、batch的平均奖励和奖励边界。这里还将相同的值写入TensorBoard,以获得一个漂亮的智能体学习性能图。
        if reward_m > 199:
            print("Solved!")
            break
    writer.close()
    # 训练最后一次检查是比较该batch中episode的平均奖励。若该平均奖励数值超过199时,就停止训练。为什么是199?在Gym中,当最后100个episode的平均奖励大于195时,
    # 此Cart Pole环境可以考虑为被解决完了,交叉熵方法收敛的非常快,以至于通常只需要100个episode。经过适当训练的智能体可以无限长时间的保持棍子平衡(获得任何数量的
    # 分数),但Cart Pole中的episode长度限制为200步(Cart Pole环境中,Time limit包装器,它会停止200步后的episode)考虑到此问题,这将在batch中的平均奖励大于199
    # 之后停止训练,者可以很好的表明智能体已经知道如何像一个专业者一样平衡棍子。

上一篇下一篇

猜你喜欢

热点阅读