Data science机器学习常用查询

神经网络和反向传播算法

2021-05-01  本文已影响0人  星光下的胖子

目录:


一、什么是神经网络

人工神经网络简称“神经网络”,简单来说,是一种数学模型,其借鉴了生物神经网络的工作原理,并用于做信息处理。

神经元

神经网络的基本结构是神经元,如下图所示:

神经元

整个过程可理解为:输入--->处理--->输出。

神经网络

多个神经元可组成一个神经网络,神经网络的结构分为:输入层、隐藏层、输出层。

神经网络

上图是一个简单的全连接神经网络,LayerL1是输入层,LayerL4是输出层,LayerL2+LayerL3为隐藏层。除了输入层以外,每一层的输入都是上一层的输出。

神经网络的训练

在神经网络模型中,w是要训练的模型参数。x是输入数据,y是目标值(实际值),h是输出结果(预测值)。模型训练大致流程:

二、前向传播(Forward Propagation)

上面,我们已经知道了单个神经元的输出:
\qquad \qquad \qquad h_{w,b}{(x)}=f(\sum_{i=0}^{n}{x_iw_i} ) \qquad \qquad \qquad
接下来,我们来探讨下神经网络(多层多神经元)的前向过程。

神经网络实际上就是一个输入向量\vec{x}到输出向量\vec{y}的函数,即:
\qquad \qquad \qquad \vec{y} = f_{network}(\vec{x})

前向传播的流程(l_i表示第i层):首先,根据输入的特征向量\vec{x},根据公式①计算l_1层每个节点(神经元)的输出,得到输出向量\vec{a_1}。然后,从前往后,依次将上一层l_{k-1}的输出{\vec{a_{k-1}}}作为当前层的输入,计算当前层l_{k}的输出向量\vec{a_{k}},直到最后的输出层。

前向传播的向量表示(\vec{W_i}表示第i层的参数,一共n层,f是激活函数):
\qquad \qquad \qquad \left. \begin{matrix} \vec{a_1} = f(\vec{W_1} \cdot \vec{x}) \\ \vec{a_2} = f(\vec{W_2} \cdot \vec{a_1}) \\ \ldots \\ \vec{a_k} = f(\vec{W_k} \cdot \vec{a_{k-1}}) \\ \ldots \\ \vec{y} = f(\vec{W_n} \cdot \vec{a_{n-1}}) \end{matrix} \right\}\qquad \qquad

三、误差的反向传播(Back Propagation,即BP算法)

误差的反向传播,简称反向传播,是指根据前向输出与真实值的误差/损失,递归应用链式法则来计算权重的梯度的这种方法。常用于人工神经网络进行梯度下降。

原理描述起来如此简单,那么计算误差关于权重的梯度是否有规律可循?下面我们就来寻找其中的规律,即推导BP算法的几个公式。

反向传播算法的公式推导

E_d表示样本d的误差,w_{ji}表示节点i到节点j的连接权重,{net}_j表示节点j加权输入a_j表示节点j的输出。

由上图可知,权重w_{ji}仅能通过影响节点j加权输入net_j来影响网络的其他部分,net_j仅能通过影响节点j的输出a_j来影响网络的其他部分。

我们知道,梯度下降法的参数迭代公式为:
\qquad \qquad \qquad w_{ji} \leftarrow w_{ji} - \eta \frac{\partial{E_d}}{\partial w_{ji}} \qquad \qquad
因此,关键就是如何求误差E_d对每个权重w_{ji}的偏导数(梯度)\frac{\partial{E_d}}{\partial w_{ji}}

定义\delta_j = \frac{\partial {E_d}}{\partial {{net}_j}}节点j的误差项,是误差E_d对节点j加权输入net_j的偏导数。
(ps:误差项\delta_j的定义是为了简化公式,便于理解。)
于是梯度的计算可转换为:
\qquad \qquad \Delta_{w_{ji}} = \frac{\partial{E_d}}{\partial w_{ji}} = \frac{\partial{E_d}}{\partial net_j} \frac{\partial{net_j}}{\partial w_{ji}} = \delta_j x_{ji} \qquad \qquad

下面我们分析如何计算节点的误差项\delta_j,注意输出层和隐藏层的误差项计算有所不同。
1)计算输出层节点的误差项\delta_j
\qquad \;\;\;\, \delta_j = \frac{\partial{E_d}}{\partial net_j} = \frac{\partial{E_d}}{\partial y_j} \frac{\partial{y_j}}{\partial net_j} \qquad \qquad

2)计算隐藏层节点的误差项\delta_j
定义节点j的直接下游节点的集合Downstream(j)
误差项:
\qquad \;\;\;\, \delta_j = \frac{\partial{E_d}}{\partial net_j}
\qquad \qquad = \sum_{k \in {Downstream(j)}} (\frac{\partial{E_d}}{\partial net_k} \frac{\partial{net_k}}{\partial net_j})
\qquad \qquad = \sum_{k \in {Downstream(j)}} (\frac{\partial{E_d}}{\partial net_k} \frac{\partial{net_k}}{\partial a_j} \frac{\partial{a_j}}{\partial net_j})
\qquad \qquad = \sum_{k \in {Downstream(j)}} (\frac{\partial{E_d}}{\partial net_k} w_{kj} \frac{\partial{a_j}}{\partial net_j})
\qquad \qquad = \sum_{k \in {Downstream(j)}} (\delta_k w_{kj} \frac{\partial{a_j}}{\partial net_j})
\qquad \qquad =\frac{\partial{a_j}}{\partial net_j} \sum_{k \in {Downstream(j)}} (\delta_k w_{kj}) \qquad \qquad

至此,我们已经推导出了反向传播算法,得到公式④、⑤、⑥。选择不同的激活函数、损失函数时,只需将对应的导数公式带入④、⑤、⑥即可。

假如,我们选择sigmoid为激活函数、误差平方和为损失函数,有:
\qquad节点的输出:y_j = sigmoid(net_j),a_j = sigmoid(net_j)
\qquad样本d的误差:E_d = \frac{1}{2} \sum_{i \in outputs} (t_i - y_i) ^ 2t_i是真实值。
易得导数公式:
\qquad \frac{\partial{y_j}}{\partial net_j} = y_i (1 - y_i),\frac{\partial{a_j}}{\partial net_j} = a_i (1 - a_i),\frac{\partial{E_d}}{\partial y_j} = y_i - t_i
将上述导数公式带入④、⑤、⑥,得:
\qquad输出层节点的误差项:\delta_j = (y_i - t_i)y_i(1- y_i)
\qquad隐藏层节点的误差项:\delta_j =a_i (1 - a_i) \sum_{k \in {Downstream(j)}} (\delta_k w_{kj})
\qquad误差对权重的梯度:\Delta_{w_{ji}} = \delta_j x_{ji}

四、手动实现一个基本的全连接神经网络FCNN

如上图,可以分解出5个领域对象来实现神经网络:

代码实现如下:

# 手动实现一个基本的BP全连接神经网络
import math
from functools import reduce
import random


# sigmoid激活函数
def sigmoid(z):
    return 1 / (1 + math.exp(-z))


# 1.Node节点
# 负责记录和维护节点自身信息以及与这个节点相关的上下游连接,实现输出值和误差项的计算。
class Node(object):
    def __init__(self, layer_index, node_index):
        '''
        构造节点对象。
        layer_index:节点所属的层的编号
        node_index:节点的编号
        '''
        self.layer_index = layer_index  # 节点所属的层的编号
        self.node_index = node_index  # 节点的编号
        self.upstream = []  # 上游连接
        self.downstream = []  # 下游连接
        self.output = 0.0  # 节点的输出
        self.delta = 0.0  # 节点的误差项

    def add_upstream_connection(self, conn):
        '''
        添加一个到上游节点的连接
        '''
        self.upstream.append(conn)

    def add_downstream_connection(self, conn):
        '''
        添加一个到下游节点的连接
        '''
        self.downstream.append(conn)

    def set_output(self, output):
        '''
        设置节点的输出值。如果节点属于输入层会用到这个函数。
        '''
        self.output = output

    def calc_output(self):
        '''
        计算节点的输出值,根据公式①
        '''
        output = reduce(lambda ret, conn: ret + conn.upstream_node.output * conn.weight, self.upstream, 0.0)
        self.output = sigmoid(output)

    def calc_output_layer_delta(self, label):
        '''
        计算输出层节点的误差项,公式⑤
        '''
        self.delta = (self.output - label) * self.output * (1 - self.output)

    def calc_hidden_layer_delta(self):
        '''
        计算隐藏层节点的误差项,公式⑥
        '''
        self.delta = self.output * (1 - self.output) * reduce(
            lambda ret, conn: ret + conn.downstream_node.delta * conn.weight, self.downstream, 0.0)


# 2.ConstNode节点
# 实现一个输出恒为1的节点(计算偏置项bias时需要)
class ConstNode(object):
    def __init__(self, layer_index, node_index):
        '''
        构造常量节点对象,输出恒为1。
        layer_index:节点所属的层的编号
        node_index:节点的编号
        '''
        self.layer_index = layer_index  # 节点所属层的编号
        self.node_index = node_index  # 节点的编号
        self.downstream = []  # 下游连接
        self.output = 1  # 输出恒为1
        self.delta = 0.0  # 误差项恒为0

    def add_downstream_connection(self, conn):
        '''
        添加一个到下游节点的连接
        '''
        self.downstream.append(conn)

    def calc_hidden_layer_delta(self):
        '''
        计算隐藏层节点的误差项,公式⑥
        '''
        self.delta = 0.0  # 函数f(x)=y=1的导数为0


# 3.Layer层
# 由多个Node节点组成,提供对Node集合的操作。
class Layer(object):
    def __init__(self, layer_index, node_count):
        '''
        初始化一层(包括输入层、隐藏层、输出层)。
        layer_index:层编号
        node_count:层的节点个数
        '''
        self.layer_index = layer_index
        self.nodes = []
        for i in range(node_count):
            self.nodes.append(Node(layer_index, i))
        # 所有层添加一个ConstNode节点。但输出层没有ConstNode节点,后面的逻辑要考虑在内。
        self.nodes.append(ConstNode(layer_index, node_count))

    def set_output(self, data):
        '''
        设置层的输出。如果层是输入层会用到这个函数。
        '''
        for i in range(len(data)):
            self.nodes[i].set_output(data[i])

    def calc_output(self):
        '''
        计算层的各节点的输出
        '''
        for node in self.nodes[:-1]:  # 最后一个常量节点不用计算
            node.calc_output()


# 4.Connection连接
# 记录该连接的权重、与这个连接所关联的上下游节点,提供计算梯度和更新权重的功能。
class Connection(object):
    def __init__(self, upstream_node, downstream_node):
        '''
        初始化连接,权重初始化为一个很小的随机数。
        upstream_node: 连接的上游节点
        downstream_node: 连接的下游节点
        '''
        self.upstream_node = upstream_node  # 连接的上游节点
        self.downstream_node = downstream_node  # 连接的下游节点
        self.weight = random.uniform(-0.1, 0.1)  # 权重值,初始化为[-0.1, 0.1]之间的随机数
        self.gradient = 0.0  # 权重的梯度

    def calc_gradient(self):
        '''
        计算梯度,公式④
        '''
        self.gradient = self.downstream_node.delta * self.upstream_node.output

    def get_gradient(self):
        '''
        获取当前的梯度,"梯度检查"时使用
        '''
        return self.gradient

    def update_weight(self, rate):
        '''
        根据梯度下降法更新权重。为了方便,更新权重时先自动计算梯度。
        '''
        self.calc_gradient()
        self.weight -= rate * self.gradient


# 5.Connections类,提供Connection集合操作
class Connections(object):
    def __init__(self):
        self.connections = []

    def add_connection(self, conn):
        self.connections.append(conn)


# 6.Network神经网络
# 由若干Layer层对象+若干Connection连接对象组成,提供模型训练的功能,
# 包括前向传播(计算节点的输出)、反向传播(计算节点的误差项、计算权重的梯度)和权值更新等功能。
class Network(object):
    def __init__(self, layers):
        '''
        初始化一个全联接神经网络。
        layers: 一维数组,描述神经网络每层节点数。形如[3, 32, 10]表示输入层
                的特征维度为3,只有一个隐藏层且节点数为32,输出层节点数为10。
                (这里不包括ConstNode节点,常量节点的定义是为了计算偏置项bias)
        '''
        self.connections = Connections()  # 所有连接集合
        self.layers = []  # 所有层

        layer_count = len(layers)
        for layer_index in range(layer_count):  # 初始化层和节点
            self.layers.append(Layer(layer_index, layers[layer_index]))
        for i in range(layer_count - 1):  # 初始化连接
            connections = [Connection(upstream_node, downstream_node)
                           for upstream_node in self.layers[i].nodes
                           for downstream_node in self.layers[i + 1].nodes[:-1]]
            for conn in connections:  # 将连接添加到对应的节点中
                self.connections.add_connection(conn)  # "梯度检查"时使用
                conn.upstream_node.add_downstream_connection(conn)
                conn.downstream_node.add_upstream_connection(conn)

    def train(self, features, labels, rate, iteration):
        '''
        训练神经网络。
        features: 数组,训练样本特征。每个元素是一个样本的特征。
        labels: 数组,训练样本标签。每个元素是一个样本的标签。
        rate: 学习率
        iteration: 训练迭代次数
        '''
        for i in range(iteration):
            for j in range(len(features)):
                self.train_one_sample(features[j], labels[j], rate)

    def train_one_sample(self, feature, label, rate):
        '''
        内部函数,用一个样本训练网络。
        label: 单个样本的标签
        feature: 单个样本的特征
        rate: 学习率
        '''
        self.predict(feature)  # 前向过程
        self.calc_delta(label)  # 计算误差项
        self.update_weight(rate)  # 更新权重

    def calc_delta(self, label):
        '''
        内部函数,计算每个节点的误差项delta。
        label: 单个样本的标签
        '''
        # 先计算输出层节点的误差项
        for node, tag in zip(self.layers[-1].nodes[:-1], label):
            node.calc_output_layer_delta(tag)
        # 再计算隐藏层的误差项,从深层到浅层反向计算。
        for layer in self.layers[-2:0:-1]:  # 输入层的误差项不需要计算
            for node in layer.nodes:
                node.calc_hidden_layer_delta()

    def update_weight(self, rate):
        '''
        内部函数,更新每个连接的权重。
        rate:学习率
        '''
        for layer in self.layers[:-1]:
            for node in layer.nodes:
                for conn in node.downstream:
                    conn.update_weight(rate)

    def predict(self, feature):
        '''
        前向过程,即根据输入的样本特征计算每个节点的输出。
        feature: 单个样本的特征
        '''
        self.layers[0].set_output(feature)  # 直接将sample设置给输入层
        for i in range(1, len(self.layers)):  # 计算隐藏层和输出层的输出
            self.layers[i].calc_output()

        # 返回输出层的output。之前定义隐藏层时多加了一个ConstNode节点,要舍去。
        return list(map(lambda node: node.output, self.layers[-1].nodes[:-1]))

    def get_gradient(self, feature, label):
        '''
        计算网络在当前样本下的每个连接的梯度。用于"梯度检查"操作。
        label: 当前样本的标签
        sample: 当前样本的特征
        '''
        self.predict(feature)  # 前向过程
        self.calc_delta(label)  # 计算误差项
        self.calc_gradient()  # 计算梯度

    def calc_gradient(self):
        '''
        内部函数,计算每个连接中的梯度。
        '''
        for layer in self.layers[:-1]:
            for node in layer.nodes:
                for conn in node.downstream:
                    conn.calc_gradient()
梯度检查

根据导数的定义:
\qquad \qquad f'(\theta) = \lim_{\epsilon \to 0} \frac{f(\theta + \epsilon) - f(\theta - \epsilon)}{2 \epsilon}
E_d看作w_{ji}的导数,有:
\qquad \qquad \frac{\partial E_d(w_{ji})}{\partial w_{ji}} = \lim_{\epsilon \to 0} \frac{E_d(w_{ji} + \epsilon) - E_d(w_{ji} - \epsilon)}{2 \epsilon}
因此,如果把\epsilon设置为一个很小的数(如10^{-5}),可知:
\qquad \qquad \frac{\partial E_d(w_{ji})}{\partial w_{ji}} \approx \frac{E_d(w_{ji} + \epsilon) - E_d(w_{ji} - \epsilon)}{2 \epsilon} \qquad

为确保代码实现没有BUG,我们根据公式⑥来进行梯度检查。将手动计算的近似梯度与网络计算的梯度相比较,如果两者相差非常小,那么就说明代码实现是正确的。
检查权重w_{ji}的梯度的几个步骤:
1、初始化一个神经网络,使用一个样本进行训练,计算每个w_{ji}的梯度。
2、将w_{ji}加上一个很小的数\epsilon,重新计算神经网络在这个样本d下的误差E_{d+}
3、将w_{ji}减去一个很小的数\epsilon,重新计算神经网络在这个样本d下的误差E_{d-}
4、根据公式⑦计算近似的梯度值,并与第一步中网络计算的梯度值做比较。

梯度检查的代码实现如下:

def gradient_check(network, sample_feature, sample_label):
    '''
    梯度检查
    network: 神经网络对象
    sample_feature: 样本的特征
    sample_label: 样本的标签
    '''
    # 定义计算网络误差的函数
    network_error = lambda vec1, vec2: \
        0.5 * reduce(lambda a, b: a + b,
                     map(lambda v: (v[0] - v[1]) ** 2, zip(vec1, vec2)))

    # 计算网络在当前样本下每个连接的梯度
    network.get_gradient(sample_feature, sample_label)
    # 对每个权重做梯度检查
    for conn in network.connections.connections:
        # 获取指定连接的梯度
        actual_gradient = conn.get_gradient()

        # 加上一个很小的值,计算网络的误差
        epsilon = 0.00001
        conn.weight += epsilon
        error1 = network_error(network.predict(sample_feature), sample_label)

        # 减去一个很小的值,计算网络的误差
        conn.weight -= 2 * epsilon  # 刚才加过了一次,因此这里需要减去2倍
        error2 = network_error(network.predict(sample_feature), sample_label)

        # 根据公式⑦手动计算近似的梯度值
        expected_gradient = (error1 - error2) / (2 * epsilon)

        # 打印
        print('expected gradient: \t%f\nactual gradient: \t%f' % (expected_gradient, actual_gradient))


if __name__ == '__main__':
    # 输入层特征数为3,隐藏层节点数为4,输出层节点数为2
    network = Network(layers=[3, 4, 2])
    sample_feature = [0.8, 0.6, 0.2]
    sample_label = [0.9, 0.1]
    gradient_check(network, sample_feature, sample_label)

至此,我们已经完成了BP算法的推导、代码实现和BUG检查。接下来还需要不断的实践,我们用刚刚写过的神经网络去识别手写数字。

五、神经网络实战——手写数字识别

MNIST手写数字识别数据集,数字为0-9的一个10分类问题。60000个训练集+10000个测试集,每个训练数据是28*28的灰度图,一共784个像素。

超参数的确定

如何确定网络的层数和每层的节点数?这个没有什么理论化的方法,大家都是根据经验来设计。如果没有经验,我们就多设计几种模型,进行训练对比,然后选择效果最好的那一个。这是一门手艺+技术活,也是为什么算法工程师有时候被戏称为“调参侠”或“炼丹师”。

不过,有些基本道理我们还是明白的,我们知道网络层数越多越好,也知道层数越多训练难度越大。对于全连接网络,隐藏层最好不要超过三层。那么,我们可以先试试仅有一个隐藏层的神经网络效果怎么样。毕竟模型小的话,训练起来也快些。

输入层的节点数是确定的,一个像素对应一个特征,设置为784。

输出层的节点数也是确定的,用每个节点对应一个分类,设置为10。输出层10个节点中,输出最大值的那个节点对应的分类,就是模型的预测结果。

隐藏层的节点数是不好确定的,需要多次炼丹才能确定更好的模型。在这里,我们根据几个经验公式,先暂定隐藏层节点数为100。
\qquad \qquad m = \sqrt{n + l} + \alpha,m = \sqrt{nl},m = \log_2 n
n:输入层节点数,l:输出层节点数,m:隐藏层节点数。\alpha:1到10之间的常数。

模型的训练和评估

用60000个训练样本进行模型训练,用10000个测试样本进行测试评估。
我们用准确率作为模型的评估指标:
\qquad \qquad 正确率/准确率 = \frac{正确预测的样本数}{总样本数}
每训练10轮,评估一次准确率。当准确率开始下降时(出现了过拟合)终止训练。

代码实现

首先,我们需要把MNIST数据集处理为神经网络能够接受的形式。MNIST训练集的文件格式可以参考官方网站,这里不在赘述,如下图所示:

每个训练样本是一个28*28的图像,我们按照行优先,把它转化为一个784维的向量。每个标签是0-9的值,我们将其转换为一个10维的one-hot向量:如果标签值为n,我们就把向量的第n维(从0开始编号)设置为1,而其它维设置为0。例如,向量[0,0,1,0,0,0,0,0,0,0]表示标签值为2。

代码实现如下:

from bp import *
from datetime import datetime


# 数据加载器基类
class Loader(object):
    def __init__(self, path, count):
        '''
        初始化加载器
        path: 数据文件路径
        count: 文件中的样本个数
        '''
        self.path = path
        self.count = count

    def get_file_content(self):
        '''
        读取文件内容
        '''
        f = open(self.path, 'rb')
        content = f.read()
        f.close()
        return content


# 图像数据加载器
class ImageLoader(Loader):
    def norm(self, feature):
        '''
        内部函数,将一个像素值归一化到-0.5~0.5
        '''
        return feature / 255 - 0.5

    def get_picture(self, content, index):
        '''
        内部函数,从文件中获取图像
        '''
        start = index * 28 * 28 + 16
        picture = []
        for i in range(28):
            picture.append([])
            for j in range(28):
                picture[i].append(
                    self.norm(content[start + i * 28 + j]))
        return picture

    def get_one_sample(self, picture):
        '''
        内部函数,将图像转化为样本的输入向量
        '''
        sample = []
        for i in range(28):
            for j in range(28):
                sample.append(picture[i][j])
        return sample

    def load(self):
        '''
        加载数据文件,获得全部样本的输入向量
        '''
        content = self.get_file_content()
        data_set = []
        for index in range(self.count):
            data_set.append(
                self.get_one_sample(
                    self.get_picture(content, index)))
        return data_set


# 标签数据加载器
class LabelLoader(Loader):
    def norm(self, label):
        '''
        内部函数,将一个值转换为10维标签向量(one-hot编码)
        '''
        label_vec = []
        for i in range(10):
            if i == label:
                label_vec.append(1)
            else:
                label_vec.append(0)
        return label_vec

    def load(self):
        '''
        加载数据文件,获得全部样本的标签向量
        '''
        content = self.get_file_content()
        labels = []
        for index in range(self.count):
            labels.append(self.norm(content[index + 8]))
        return labels


def get_train_data_set():
    '''
    获得训练数据集
    '''
    image_loader = ImageLoader('train-images-idx3-ubyte', 60000)
    label_loader = LabelLoader('train-labels-idx1-ubyte', 60000)
    return image_loader.load(), label_loader.load()


def get_test_data_set():
    '''
    获得测试数据集
    '''
    image_loader = ImageLoader('t10k-images-idx3-ubyte', 10000)
    label_loader = LabelLoader('t10k-labels-idx1-ubyte', 10000)
    return image_loader.load(), label_loader.load()


def get_result(vec):
    '''
    获取向量最大值的索引。输出的一个10维向量中,最大值的索引为网络识别的类别。
    '''
    max_value_index = 0
    max_value = 0
    for i in range(len(vec)):
        if vec[i] > max_value:
            max_value = vec[i]
            max_value_index = i
    return max_value_index


def evaluate(network, test_data_set, test_labels):
    '''
    模型评估:计算测试集的正确率
    '''
    correct = 0
    total = len(test_data_set)
    for i in range(total):
        label = get_result(test_labels[i])
        predict = get_result(network.forward(test_data_set[i]))
        if label == predict:
            correct += 1
    return float(correct) / float(total)


def train_and_evaluate():
    '''
    网络模型的训练和评估
    '''
    network = Network([784, 100, 10])
    train_data_set, train_labels = get_train_data_set()
    test_data_set, test_labels = get_test_data_set()

    last_correct_ratio = 0.0
    epoch = 0
    while True:
        epoch += 1
        network.train(train_data_set, train_labels, rate=0.3, iteration=1)
        print('%s epoch %d finished' % (datetime.now(), epoch))
        if epoch % 2 == 0:
            correct_ratio = evaluate(network, test_data_set, test_labels)
            print('%s after epoch %d, correct ratio is %f' % (datetime.now(), epoch, correct_ratio))
            if correct_ratio < last_correct_ratio:
                break
            else:
                last_correct_ratio = correct_ratio


if __name__ == "__main__":
    train_and_evaluate()

可以正常训练,但训练速度很慢,需要对代码做很多的性能优化工作。下面介绍向量化编程。

六、向量化编程

让我们告别面向对象编程,去用一种更适合深度学习算法的编程方式:向量化编程。主要有两个原因:一个是我们事实上并不需要真的去定义Node、Layer这样的对象,直接把数学计算实现了就可以了;另一个原因,是底层算法库会针对向量运算做优化(甚至有专用的硬件,比如GPU),程序效率会提升很多。所以,在深度学习的世界里,我们总会想法设法的把计算表达为向量的形式。

重新实现前面的全连接神经网络:

# 全连接神经网络的优化版本——向量化编程
import numpy as np


# 全连接层实现类
class FullConnectedLayer(object):
    def __init__(self, input_size, output_size, activator):
        '''
        构造函数
        input_size: 本层输入向量的维度
        output_size: 本层输出向量的维度
        activator: 激活函数
        '''
        self.input_size = input_size
        self.output_size = output_size
        self.activator = activator
        self.W = np.random.uniform(-0.1, 0.1, (output_size, input_size))  # 权重数组W
        self.b = np.zeros((output_size, 1))  # 偏置项b
        self.output = np.zeros((output_size, 1))

    def forward(self, input_array):
        '''
        前向计算
        input_array: 输入向量,维度必须等于input_size
        '''
        self.input = input_array
        self.output = self.activator.forward(np.dot(self.W, input_array) + self.b)

    def backward(self, delta):
        '''
        反向计算W和b的梯度,同时计算并返回上层节点的误差项
        delta: 反向传递过来的误差项,即为当前层节点的误差项
        '''
        self.delta = delta
        self.W_grad = np.dot(delta, self.input.T)
        self.b_grad = delta
        return self.activator.backward(self.input) * np.dot(self.W.T, delta)

    def update(self, learning_rate):
        '''
        使用梯度下降算法更新权重
        '''
        self.W -= learning_rate * self.W_grad
        self.b -= learning_rate * self.b_grad


# Sigmoid激活函数类
class SigmoidActivator(object):
    def forward(self, weighted_input):
        return 1.0 / (1.0 + np.exp(-weighted_input))

    def backward(self, output):
        return output * (1 - output)


# 神经网络类
class Network(object):
    def __init__(self, layers):
        '''
        构造函数
        '''
        self.layers = []
        for i in range(len(layers) - 1):
            self.layers.append(FullConnectedLayer(layers[i], layers[i + 1], SigmoidActivator()))

    def predict(self, feature):
        '''
        前向过程,即根据输入的样本特征计算每个节点的输出。
        feature: 输入样本特征
        '''
        output = feature
        for layer in self.layers:
            layer.forward(output)
            output = layer.output
        return output

    def train(self, features, labels, rate, iteration):
        '''
        训练函数
        labels: 样本标签
        features: 样本特征
        rate: 学习速率
        iteration: 训练轮数
        '''
        for i in range(iteration):
            for d in range(len(features)):
                self.train_one_sample(features[d], labels[d], rate)

    def train_one_sample(self, feature, label, rate):
        self.predict(feature[:, None])
        self.calc_gradient(label[:, None])
        self.update_weight(rate)

    def calc_gradient(self, label):
        delta = (self.layers[-1].output - label) * self.layers[-1].activator.backward(self.layers[-1].output)
        for layer in self.layers[::-1]:
            delta = layer.backward(delta)
        return delta

    def update_weight(self, rate):
        for layer in self.layers:
            layer.update(rate)

重新加载数据集,并进行模型训练:

from bp_optimize import *
from datetime import datetime
import numpy as np


def load_images(filepath):
    with open(filepath, 'rb') as f:
        data = f.read()

    # 解析前16字节为4个数
    magic_number, num_items = int.from_bytes(data[:4], byteorder="big"), int.from_bytes(data[4:8], byteorder="big")
    if magic_number != 2051:
        raise TypeError("this is not a image file.")

    # 使用numpy解析二进制
    return np.frombuffer(data[16:], dtype=np.uint8).reshape(num_items, -1)


def load_labels(filepath):
    with open(filepath, 'rb') as f:
        data = f.read()

    # 解析前8字节为2个数
    magic_number, num_items = int.from_bytes(data[:4], byteorder="big"), int.from_bytes(data[4:8], byteorder="big")
    # 验证是否是label文件,否则手动抛出异常
    if magic_number != 2049:
        raise TypeError("this is not a label file.")

    # 使用numpy解析二进制
    labels = np.frombuffer(data[8:], dtype=np.uint8)
    return labels


def to_onehot(labels, num_classes=10):  # num_classes表示类别数
    labels_onehot = np.zeros((labels.shape[0], num_classes))
    for i, label in enumerate(labels):
        labels_onehot[i][label] = 1
    return labels_onehot


def get_train_data_set():
    '''
    获得训练数据集
    '''
    train_images = load_images('train-images-idx3-ubyte')
    train_labels = load_labels('train-labels-idx1-ubyte')
    train_images = train_images / 255 - 0.5  # 归一化到-0.5~0.5
    train_labels = to_onehot(train_labels)  # 转换为one-hot编码
    return train_images, train_labels


def get_test_data_set():
    '''
    获得测试数据集
    '''
    test_images = load_images('t10k-images-idx3-ubyte')
    test_labels = load_labels('t10k-labels-idx1-ubyte')
    test_images = test_images / 255 - 0.5  # 归一化到-0.5~0.5
    test_labels = to_onehot(test_labels)  # 转换为one-hot编码
    return test_images, test_labels


def get_result(vec):
    '''
    获取向量最大值的索引。输出的一个10维向量中,最大值的索引为网络识别的类别。
    '''
    max_value_index = 0
    max_value = 0
    for i in range(len(vec)):
        if vec[i] > max_value:
            max_value = vec[i]
            max_value_index = i
    return max_value_index


def evaluate(network, test_data_set, test_labels):
    '''
    模型评估:计算测试集的正确率
    '''
    correct = 0
    total = len(test_data_set)
    for i in range(total):
        predict_res = network.predict(test_data_set[i][:, None])
        if np.argmax(test_labels[i]) == np.argmax(predict_res):
            correct += 1
    return float(correct) / float(total)


def train_and_evaluate():
    '''
    网络模型的训练和评估
    '''
    network = Network([784, 100, 10])
    train_data_set, train_labels = get_train_data_set()
    test_data_set, test_labels = get_test_data_set()

    last_correct_ratio = 0.0
    epoch = 0
    while True:
        epoch += 1
        network.train(train_data_set, train_labels, rate=0.3, iteration=1)
        print('%s epoch %d finished' % (datetime.now(), epoch))
        if epoch % 10 == 0:
            correct_ratio = evaluate(network, test_data_set, test_labels)
            print('%s after epoch %d, correct ratio is %f' % (datetime.now(), epoch, correct_ratio))
            if correct_ratio < last_correct_ratio:
                break
            else:
                last_correct_ratio = correct_ratio


if __name__ == "__main__":
    train_and_evaluate()

完美!不仅代码变得简洁,训练速度也快了几百倍。下面是训练10轮的效果:


参考资料:https://www.zybuluo.com/hanbingtao/note/476663

上一篇下一篇

猜你喜欢

热点阅读