tensorflow实战

深层神经网络对semeion手写数据集进行识别

2018-11-27  本文已影响0人  YinliX

运行环境

python3.6.7、tensorflow1.4.0

数据来源

https://archive.ics.uci.edu/ml/datasets/semeion+handwritten+digit

思路

数据分析

从数据及其说明可以知道,其原始数据共有1593组,每组数据都代表一张手写数字的图片,经过处理后化为256个像素,每个像素取值低于127设为0,高于127设为1,每组数据包含256个0-1特征和10个0-1型one-hot编码,代表了此数字的实际取值。在这里取前1195组数据作为训练集,用后面398组数据进行测试,后398组数据中0~9都有,且分布均匀,可以认为能很好地测试神经网络的分类效果。

模型构建

这里采用含有一个隐含层的神经网络,此隐含层有500个结点,采用ReLU激活函数,输入节点为256个,代表256个像素,输出节点为10个,分别代表0~9的10个数字。损失函数采用交叉熵,为了避免过拟合,引入L2正则化函数,总损失为交叉熵和正则化损失的和,另外为了迭代学习的平缓,采用了指数衰减的学习率进行学习,为了使模型更健壮,采用了滑动平均模型。模型的训练采用了基于batch的随机梯度下降算法,每个batch的数量为100。

实现细节

代码实现总共有四个文件,inference.py、load_data.py、train.py、eval.py。

inference.py是一个工具实现,用来封装神经网络的前向传播过程,可以选择是否提供正则化函数,因为在训练的时候需要提供正则化函数来避免过拟合,而测试验证数据的时候则不需要提供正则化函数,输入要预测的数据的特征张量即可输出预测结果。
load_data.py也是一个工具,是用来从文件中读取数据并将其处理并返回的,返回tr变量包括训练特征、训练标签、测试特征和测试标签四个矩阵。
train.py是用来训练神经网络的,在训练过程中,每10轮训练输出一次当前的损失值,并将当前模型保存至文件,共训练500次。
eval.py是用来测试神经网络的,和训练程序同步运行,每0.2秒读取存在文件中的最新模型,并将此模型用在测试集上,打印输出准确率,持续监视当前的新模型。

源代码

inference.py

# -*- coding: UTF-8 -*-
#Author:Yinli

import tensorflow as tf

'''
这是一个工具,用来封装神经网络的前向传播过程,可以选择是否采用正则化函数
只需传入输入的张量,即可输出最终预测值
因为这里只有一个程序调这个工具,所以命名空间不用声明reuse=true
'''

#定义相关参数,输入节点共784,因为一张图共784个像素点
#输出节点10个,因为0~9共10个数字,中间隐藏层设为500个结点
INPUT_NODE = 256
OUTPUT_NODE = 10
LAYER1_NODE = 500

#在每一层里面都有个weights通过命名空间的不同来达到解耦和的效果,
def get_weight_variable(shape, regularizer):
    #取名字为weights的变量记为weights
    weights = tf.get_variable("weights", shape, initializer=tf.truncated_normal_initializer(stddev=0.1))
    #如果给出了正则化函数,那么将当前的变量正则化之后的损失加入名为losses的集合
    if regularizer != None:
        tf.add_to_collection("losses", regularizer(weights))
    return weights

#定义神经网络的前向传播过程
def inference(input_tensor, regularizer):
    #首先在输出层到隐藏层的传播中定义命名空间为layer1
    with tf.variable_scope('layer1'):
        #给定权重矩阵的形状还有正则化函数,把weights取出来
        weights = get_weight_variable([INPUT_NODE, LAYER1_NODE], regularizer)
        #初始化偏置项,命名为biases
        biases = tf.get_variable("biases", [LAYER1_NODE], initializer=tf.constant_initializer(0.0))
        #计算隐藏层的值,用到了激活函数
        layer1 = tf.nn.relu(tf.matmul(input_tensor, weights) + biases)

    #然后在从隐藏层到输出层的传播中定义命名空间为layer2
    with tf.variable_scope('layer2'):
        #给定权重矩阵的形状还有正则化函数,把weights取出来
        weights = get_weight_variable([LAYER1_NODE, OUTPUT_NODE], regularizer)
        #创建初始化偏置项,命名为biases
        biases = tf.get_variable("biases", [OUTPUT_NODE], initializer=tf.constant_initializer(0.0))
        #计算输出层的值,这里不用激活函数
        layer2 = tf.matmul(layer1, weights) + biases

    #返回输出层的值
    return layer2

load_data.py

# -*- coding: UTF-8 -*-
#Author:Yinli

import numpy as np

import inference

'''
一个工具,用来从文件中读取数据并将其处理并返回结果
返回变量包括训练特征、训练标签、测试特征和测试标签四个矩阵
'''

#定义训练数据和测试数据的数量
NUM_OF_TRAINING = 1195
NUM_OF_TESTING = 1593-1195

def load_data():
    #从文件中逐行读取数据
    filename = "D:/tensorflow/code/第10次作业/data/semeion.data"
    fr = open(filename, 'r', encoding='utf-8')
    arrayOfLines = fr.readlines()
    #初始化训练特征矩阵,训练标签矩阵,测试特征矩阵和测试标签矩阵
    trainingImages = np.zeros([NUM_OF_TRAINING, inference.INPUT_NODE])
    trainingLabels = np.zeros([NUM_OF_TRAINING, inference.OUTPUT_NODE])
    testingImages = np.zeros([NUM_OF_TESTING, inference.INPUT_NODE])
    testingLabels = np.zeros([NUM_OF_TESTING, inference.OUTPUT_NODE])
    #处理用于训练的数据
    for i in range(NUM_OF_TRAINING):
        #去掉每一行末尾的空格和回车
        arrayOfLines[i] = arrayOfLines[i].rstrip(' \n')
        #将一行数据以空格分开存入列表
        currentLine = arrayOfLines[i].split(' ')
        #将特征值存入特征矩阵
        trainingImages[i][:] = currentLine[:256]
        #将标签值存入标签矩阵
        trainingLabels[i][:] = currentLine[256:]
    #处理用于测试的数据
    for i in range(NUM_OF_TESTING):
        #去掉每一行末尾的空格和回车
        arrayOfLines[i+NUM_OF_TRAINING] = arrayOfLines[i+NUM_OF_TRAINING].rstrip(' \n')
        #将一行数据以空格分开存入列表
        currentLine = arrayOfLines[i+NUM_OF_TRAINING].split(' ')
        #将特征值存入特征矩阵
        testingImages[i][:] = currentLine[:256]
        #将标签值存入标签矩阵
        testingLabels[i][:] = currentLine[256:]
    #返回处理后的矩阵
    return trainingImages,trainingLabels,testingImages,testingLabels

train.py

# -*- coding: UTF-8 -*-
#Author:Yinli

import os
import tensorflow as tf

import inference
import load_data

'''
训练神经网络的程序,每10轮训练输出一次当前的损失值
并将当前模型保存至文件,共训练500次
'''


#每批训练的数据数
BATCH_SIZE = 100
#定义初始学习率为0.8
LEARNING_RATE_BASE = 0.5
#定义学习率的衰减率为0.99
LEARNING_RATE_DECAY = 0.99
#定义正则化参数为0.0001
REGULARATION_RATE = 0.0001
#定义训练次数为500
TRAINING_STEPS = 500
#定义滑动平均率为0.99
MOVING_AVERAGE_DECAY = 0.99

#定义模型存储路径和文件名
MODEL_SAVE_PATH = "model/"
MODEL_NAME = "model.ckpt"

#定义训练函数
def train():
    #先定义输入输出的placeholder
    x = tf.placeholder(tf.float32, [None, inference.INPUT_NODE], name='x-input')
    y_ = tf.placeholder(tf.float32, [None, inference.OUTPUT_NODE], name='y-input')

    #定义正则化函数为l2正则
    regularizer = tf.contrib.layers.l2_regularizer(REGULARATION_RATE)
    #用工具计算预测结果值
    y = inference.inference(x,regularizer)
    #初始化全局步数,设定其为不可训练的参数
    global_step = tf.Variable(0,trainable=False)

    #初始化滑动平均类,给定滑动平均速率和训练轮数的变量
    variable_average = tf.train.ExponentialMovingAverage(MOVING_AVERAGE_DECAY, global_step)
    #定义滑动平均的操作,在所有代表神经网络参数的变量上进行滑动平均,除了那些定义trainable=False的变量
    #tf.trainable_variables()返回的就是整张计算图中所有没有定义trainable=False的变量
    variable_average_op = variable_average.apply(tf.trainable_variables())
    #定义交叉熵,sparse_cross_entropy_with_logits是一个tensorflow提供的函数
    #logits参数指的是神经网络的前向传播结果,不包含softmax层
    #labels参数指的是正确结果,因为传入参数是个长度为10的一维数组,所有要指定argmax函数返回按行比较的最大值索引标号
    cross_entropy = tf.nn.sparse_softmax_cross_entropy_with_logits(logits=y, labels=tf.argmax(y_,1))
    #用reduce_mean计算当前batch中所有样例的交叉熵平均值
    cross_entropy_mean = tf.reduce_mean(cross_entropy)
    #总损失等于交叉熵损失加上正则化损失的和,add_n实现的是一个集合元素的相加,这个集合在工具包里实现过
    loss = cross_entropy_mean + tf.add_n(tf.get_collection('losses'))
    #设置指数衰减的学习率,先指定基础学习率,再指定当前迭代的轮数
    #再指定过完所有训练数据需要迭代的次数为每次迭代的间隔
    #最后指定学习率衰减速度
    learning_rate = tf.train.exponential_decay(LEARNING_RATE_BASE, global_step,
                                               load_data.NUM_OF_TRAINING/BATCH_SIZE, LEARNING_RATE_DECAY)
    #使用梯度下降算法优化损失函数loss,这里损失函数包括了交叉熵和正则化损失,学习率是指数衰减的
    train_step = tf.train.GradientDescentOptimizer(learning_rate).minimize(loss,global_step=global_step)
    #因为每过一遍数据都要更新神经网络的参数,还要更新参数的滑动平均值
    #所以把两个操作合起来,下面两行程序相当于
    #train_op = tf.group(train_step, variables_average_op)
    with tf.control_dependencies([train_step,variable_average_op]):
        train_op = tf.no_op(name='train')

    #初始化tensorflow持久化类
    saver = tf.train.Saver()
    #创建一个默认会话
    with tf.Session() as sess:
        #初始化所有参数
        tf.global_variables_initializer().run()
        trainingImages, trainingLabels, testingImages, testingLabels = load_data.load_data()

        #循环训练
        for i in range(TRAINING_STEPS):
            #定义这一轮训练使用的一个batch的数据的起点和终点
            start = (i*BATCH_SIZE) % load_data.NUM_OF_TRAINING
            end = min(start+BATCH_SIZE, load_data.NUM_OF_TRAINING)
            #run返回三个结果,一个是训练步骤的返回结果,后续没有用,用_记录
            #另外返回训练后的损失值和当前迭代步数
            _, loss_value, step = sess.run([train_op, loss, global_step],
                                           feed_dict={x:trainingImages[start:end], y_:trainingLabels[start:end]})

            #每10轮训练输出一次当前的损失值,并将当前模型保存
            #这里指定了当前的轮数,这样每个保存文件后缀会加上当前轮数
            if i%10 == 0:
                print("经过%d次训练之后,损失为%g" % (step, loss_value))
                saver.save(sess,os.path.join(MODEL_SAVE_PATH, MODEL_NAME),global_step=global_step)

#主函数
def main(argv=None):
    train()

#入口
if __name__ == '__main__':
    tf.app.run()

eval.py

# -*- coding: UTF-8 -*-
#Author:Yinli

import time
import tensorflow as tf

import inference
import train
import load_data

'''
测试函数,每0.2秒读取存在文件中的最新模型
并将此模型用在测试集上,打印输出准确率
'''

#定义每1秒钟加载一次最新的模型,并在测试数据集上测试最新模型的正确率
EVAL_INTERVAL_SECS = 0.2

def evaluate():
    with tf.Graph().as_default() as g:
        #先定义好输入输出的格式
        x = tf.placeholder(tf.float32, [None, inference.INPUT_NODE], name='x-input')
        y_ = tf.placeholder(tf.float32, [None, inference.OUTPUT_NODE], name='y-input')

        trainingImages, trainingLabels, testingImages, testingLabels = load_data.load_data()
        #定义好输入输出的数据集
        validate_feed = {x:testingImages, y_:testingLabels}

        #计算前向传播的结果,因为测试的时候用不着正则化,所有将正则化函数设为None
        y = inference.inference(x,None)

        #计算正确率
        correct_prediction = tf.equal(tf.argmax(y,1), tf.argmax(y_,1))
        accuracy = tf.reduce_mean(tf.cast(correct_prediction,tf.float32))

        #通过变量重命名的方式来加载模型
        variable_averages = tf.train.ExponentialMovingAverage(train.MOVING_AVERAGE_DECAY)
        variables_to_restore = variable_averages.variables_to_restore()
        saver = tf.train.Saver(variables_to_restore)

        while True:
            with tf.Session() as sess:
                #get_checkpoint_state会通过checkpoint自动查找指定目录里的最新模型的文件名
                ckpt = tf.train.get_checkpoint_state(train.MODEL_SAVE_PATH)
                #如果有模型存在
                if ckpt and ckpt.model_checkpoint_path:
                    #加载模型
                    saver.restore(sess, ckpt.model_checkpoint_path)
                    #把此模型保存的迭代的轮数分离出来
                    global_step = ckpt.model_checkpoint_path.split('/')[-1].split('-')[-1]
                    #计算此模型在测试集上的准确率,并打印输出
                    accuracy_score = sess.run(accuracy, feed_dict=validate_feed)
                    print("经过%s次训练后,测试集正确率为%g" % (global_step, accuracy_score))
                #没有保存的模型则返回
                else:
                    print("No checkpoint file found")
                    return
                #线程挂起0.2秒钟
                time.sleep(EVAL_INTERVAL_SECS)

#主函数
def main(argv=None):
    evaluate()

#入口
if __name__ == '__main__':
    tf.app.run()

运行结果

train.py运行结果(部分)
eval.py运行结果
测试集最终正确率

结果分析

从结果中可以看得到,经过训练,总损失逐渐变小,到100次训练之后,总损失维持在0.07的水平,模型趋近收敛。而在100次训练后的模型在测试集上的表现也甚佳,可以稳定在90%的正确率,在最终经过了约500次训练之后,模型在测试集上的最终正确率为91.2%,可以认为此模型对这个semeion手写数字数据集的识别效果很好。

上一篇下一篇

猜你喜欢

热点阅读