BatchNorm实现
2018-09-08 本文已影响55人
yanghedada
# -*- coding: utf-8 -*-
"""
Created on Wed Apr 25 16:47:30 2018
@author: yanghe
"""
import tensorflow as tf
import math
from tensorflow.examples.tutorials.mnist import input_data
print("Tensorflow version " + tf.__version__)
tf.set_random_seed(0.0)
# Download images and labels into mnist.test (10K images+labels) and mnist.train (60K images+labels)
mnist = input_data.read_data_sets(r'E:\python\mnist_data', one_hot=True)
# neural network structure for this sample:
#
# · · · · · · · · · · (input data, 1-deep) X [batch, 28, 28, 1]
# @ @ @ @ @ @ @ @ @ @ -- conv. layer +BN 6x6x1=>24 stride 1 W1 [5, 5, 1, 24] B1 [24]
# ∶∶∶∶∶∶∶∶∶∶∶∶∶∶∶∶∶∶∶ Y1 [batch, 28, 28, 6]
# @ @ @ @ @ @ @ @ -- conv. layer +BN 5x5x6=>48 stride 2 W2 [5, 5, 6, 48] B2 [48]
# ∶∶∶∶∶∶∶∶∶∶∶∶∶∶∶ Y2 [batch, 14, 14, 12]
# @ @ @ @ @ @ -- conv. layer +BN 4x4x12=>64 stride 2 W3 [4, 4, 12, 64] B3 [64]
# ∶∶∶∶∶∶∶∶∶∶∶ Y3 [batch, 7, 7, 24] => reshaped to YY [batch, 7*7*24]
# \x/x\x\x/ ✞ -- fully connected layer (relu+dropout+BN) W4 [7*7*24, 200] B4 [200]
# · · · · Y4 [batch, 200]
# \x/x\x/ -- fully connected layer (softmax) W5 [200, 10] B5 [10]
# · · · Y [batch, 10]
# input X: 28x28 grayscale images, the first dimension (None) will index the images in the mini-batch
X = tf.placeholder(tf.float32, [None, 28, 28, 1])
# correct answers will go here
Y_ = tf.placeholder(tf.float32, [None, 10])
# test flag for batch norm
tst = tf.placeholder(tf.bool)
iter = tf.placeholder(tf.int32)
# dropout probability
pkeep = tf.placeholder(tf.float32)
pkeep_conv = tf.placeholder(tf.float32)
def batchnorm(Ylogits, is_test, iteration, offset, convolutional=False):
exp_moving_avg = tf.train.ExponentialMovingAverage(0.999, iteration) # adding the iteration prevents from averaging across non-existing iterations
bnepsilon = 1e-5
if convolutional:
mean, variance = tf.nn.moments(Ylogits, [0, 1, 2])
else:
mean, variance = tf.nn.moments(Ylogits, [0])
update_moving_averages = exp_moving_avg.apply([mean, variance])
m = tf.cond(is_test, lambda: exp_moving_avg.average(mean), lambda: mean)
v = tf.cond(is_test, lambda: exp_moving_avg.average(variance), lambda: variance)
Ybn = tf.nn.batch_normalization(Ylogits, m, v, offset, None, bnepsilon)
return Ybn, update_moving_averages
def no_batchnorm(Ylogits, is_test, iteration, offset, convolutional=False):
return Ylogits, tf.no_op()
def compatible_convolutional_noise_shape(Y):
noiseshape = tf.shape(Y)
noiseshape = noiseshape * tf.constant([1,0,0,1]) + tf.constant([0,1,1,0])
return noiseshape
# three convolutional layers with their channel counts, and a
# fully connected layer (tha last layer has 10 softmax neurons)
K = 24 # first convolutional layer output depth
L = 48 # second convolutional layer output depth
M = 64 # third convolutional layer
N = 200 # fully connected layer
W1 = tf.Variable(tf.truncated_normal([6, 6, 1, K], stddev=0.1)) # 6x6 patch, 1 input channel, K output channels
B1 = tf.Variable(tf.constant(0.1, tf.float32, [K]))
W2 = tf.Variable(tf.truncated_normal([5, 5, K, L], stddev=0.1))
B2 = tf.Variable(tf.constant(0.1, tf.float32, [L]))
W3 = tf.Variable(tf.truncated_normal([4, 4, L, M], stddev=0.1))
B3 = tf.Variable(tf.constant(0.1, tf.float32, [M]))
W4 = tf.Variable(tf.truncated_normal([7 * 7 * M, N], stddev=0.1))
B4 = tf.Variable(tf.constant(0.1, tf.float32, [N]))
W5 = tf.Variable(tf.truncated_normal([N, 10], stddev=0.1))
B5 = tf.Variable(tf.constant(0.1, tf.float32, [10]))
# The model
# batch norm scaling is not useful with relus
# batch norm offsets are used instead of biases
stride = 1 # output is 28x28
Y1l = tf.nn.conv2d(X, W1, strides=[1, stride, stride, 1], padding='SAME')
Y1bn, update_ema1 = batchnorm(Y1l, tst, iter, B1, convolutional=True)
Y1r = tf.nn.relu(Y1bn)
Y1 = tf.nn.dropout(Y1r, pkeep_conv, compatible_convolutional_noise_shape(Y1r))
stride = 2 # output is 14x14
Y2l = tf.nn.conv2d(Y1, W2, strides=[1, stride, stride, 1], padding='SAME')
Y2bn, update_ema2 = batchnorm(Y2l, tst, iter, B2, convolutional=True)
Y2r = tf.nn.relu(Y2bn)
Y2 = tf.nn.dropout(Y2r, pkeep_conv, compatible_convolutional_noise_shape(Y2r))
stride = 2 # output is 7x7
Y3l = tf.nn.conv2d(Y2, W3, strides=[1, stride, stride, 1], padding='SAME')
Y3bn, update_ema3 = batchnorm(Y3l, tst, iter, B3, convolutional=True)
Y3r = tf.nn.relu(Y3bn)
Y3 = tf.nn.dropout(Y3r, pkeep_conv, compatible_convolutional_noise_shape(Y3r))
# reshape the output from the third convolution for the fully connected layer
YY = tf.reshape(Y3, shape=[-1, 7 * 7 * M])
Y4l = tf.matmul(YY, W4)
Y4bn, update_ema4 = batchnorm(Y4l, tst, iter, B4)
Y4r = tf.nn.relu(Y4bn)
Y4 = tf.nn.dropout(Y4r, pkeep)
Ylogits = tf.matmul(Y4, W5) + B5
Y = tf.nn.softmax(Ylogits)
update_ema = tf.group(update_ema1, update_ema2, update_ema3, update_ema4)
# cross-entropy loss function (= -sum(Y_i * log(Yi)) ), normalised for batches of 100 images
# TensorFlow provides the softmax_cross_entropy_with_logits function to avoid numerical stability
# problems with log(0) which is NaN
cross_entropy = tf.nn.softmax_cross_entropy_with_logits(logits=Ylogits, labels=Y_)
cross_entropy = tf.reduce_mean(cross_entropy)*100
# accuracy of the trained model, between 0 (worst) and 1 (best)
correct_prediction = tf.equal(tf.argmax(Y, 1), tf.argmax(Y_, 1))
accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
# matplotlib visualisation
allweights = tf.concat([tf.reshape(W1, [-1]), tf.reshape(W2, [-1]), tf.reshape(W3, [-1]), tf.reshape(W4, [-1]), tf.reshape(W5, [-1])], 0)
allbiases = tf.concat([tf.reshape(B1, [-1]), tf.reshape(B2, [-1]), tf.reshape(B3, [-1]), tf.reshape(B4, [-1]), tf.reshape(B5, [-1])], 0)
conv_activations = tf.concat([tf.reshape(tf.reduce_max(Y1r, [0]), [-1]), tf.reshape(tf.reduce_max(Y2r, [0]), [-1]), tf.reshape(tf.reduce_max(Y3r, [0]), [-1])], 0)
dense_activations = tf.reduce_max(Y4r, [0])
# training step
# the learning rate is: # 0.0001 + 0.03 * (1/e)^(step/1000)), i.e. exponential decay from 0.03->0.0001
lr = 0.0001 + tf.train.exponential_decay(0.02, iter, 1600, 1/math.e)
train_step = tf.train.AdamOptimizer(lr).minimize(cross_entropy)
# init
init = tf.global_variables_initializer()
sess = tf.Session()
sess.run(init)
import numpy as np
# You can call this function in a loop to train the model, 100 images at a time
def training_step(i, update_test_data, update_train_data):
# training on batches of 100 images with 100 labels
batch_X, batch_Y = mnist.train.next_batch(100)
batch_X = np.reshape(batch_X ,[-1,28,28,1])
# compute training values for visualisation
if update_train_data:
a, c, ca, da, l = sess.run([accuracy, cross_entropy, conv_activations, dense_activations, lr],
feed_dict={X: batch_X, Y_: batch_Y, iter: i, tst: False, pkeep: 1.0, pkeep_conv: 1.0})
print(str(i) + ": accuracy:" + str(a) + " loss: " + str(c) + " (lr:" + str(l) + ")")
# compute test values for visualisation
if update_test_data:
a, c = sess.run([accuracy, cross_entropy, ],
feed_dict={X: mnist.test.images, Y_: mnist.test.labels, tst: True, pkeep: 1.0, pkeep_conv: 1.0})
print(str(i) + ": ********* epoch " + str(i*100//mnist.train.images.shape[0]+1) + " ********* test accuracy:" + str(a) + " test loss: " + str(c))
# the backpropagation training step
sess.run(train_step, {X: batch_X, Y_: batch_Y, tst: False, iter: i, pkeep: 0.75, pkeep_conv: 1.0})
sess.run(update_ema, {X: batch_X, Y_: batch_Y, tst: False, iter: i, pkeep: 1.0, pkeep_conv: 1.0})
for i in range(100):
training_step(i, False,True)