每天AI你三千遍

深度残差网络理解与Tensorflow/keras/pytorc

2019-05-23  本文已影响30人  AI_Engine

欢迎关注本人的微信公众号AI_Engine

各位码神大家好,这是深度学习进阶的第一篇,不要吝啬你们的赞和转发哟~

深度残差网络 ResNet 的作者何凯明在ILSVRC和COCO 2015上的战绩中取得5项第一,这是CNN在图像领域中的一件里程碑事件。

从过往的建模经验来看,随着神经网络的深度增加,模型的复杂度越高,算法在一定程度上的效果也就越好。上述这一点已经在我之前的博文中证明过了,但是所有的事件都要遵循一个道理:过犹不及,物极必反。因为网络深度增加时,网络准确度一定会出现饱和状态,如果继续进行层次的加深,模型的准确效果会出现下降的现象。下图是使用20层网络和56层网络在cifar10数据中的表现,可以看出无论是在训练数据还是在测试数据中56层的神经网络效果比20层的效果要差一些。

上述现象其实是深度网络退化问题,不是过拟合问题。随着网络的加深,神经网络会存在梯度弥散的可能,这样参数训练的过程中不会找到最佳的方向,损失函数也永远不会找到全局最小值,这种诟病在循环神经网络RNN中也存在。所以说简单堆叠网络,算法效果不会更好。于是何博士提出了残差学习来解决退化问题,对于一个堆积层结构(几层堆积而成)当输入为x其学习到的特征为H(x),如果我们认为H(x)=x+y,并令y=f(x),那么可以得到H(x)=x+f(x),由于x本身作为一种输入,所以不用去学习,所以我们只要拟合出残差f(x)就可以达到学习到H(x)的目的,即f(x)=H(x)-x。但是我们要学习到的新特征还是H(x),它是有输入+残差组成的:f(x)+x。那这样做有什么好处呢?1.当残差为0时,此时堆积层仅仅做了恒等映射,至少网络性能不会下降。2.实际上残差不会为0,这也会使得堆积层在输入特征基础上学习到新的特征,从而拥有更好的性能。残差学习的单元结构===>残差模块(注意这里说的是学习单元,并不是网络结构)如图所示:

有了残差模块(residual block)这个概念,我们再来设计网络架构,架构很简单,基于VGG19的架构,我们首先把网络增加到34层,增加过后的网络我们叫做plain network,再此基础上,增加残差模块,得到我们的Residual Network。我们可以可以看到ResNet有很多旁路的支线将上一个残差模块的输出参与到本次残差模块的输出,这种连接方式被称为shortcut或skip connections.

下面我们再分析一下残差单元,ResNet使用两种残差单元,如左图对应的是浅层网络,而右图对应的是深层网络。当输入和输出维度一致时,可以直接将输入加到输出上。

但是:

1.当维度不一致时(对应的是维度增加一倍),这就不能直接相加。所以有两种策略:a.采用zero-padding增加维度,此时一般要先做一个downsamp,可以采用strde=2的池化(pooling),这样不会增加参数。b.采用新的映射(projection shortcut),一般采用1x1的卷积,这样会增加参数,也会增加计算量。

2.当channel不一致的时候,也不可以直接相加。此时对输入x进行卷积操作,使其channel数量与f(x)保持一致,然后二者相加得到H(x)

现在我们已经对残差单元和残差网络有了一定的了解,下面就要撸代码了,本篇主要通过tensorflow,keras,pytorch三个主流框架对深度残差网络进行实现,首先就是tensorflow实现ResNet V2。ResNet V2与上述介绍的ResNet V1主要区别在于:首先各个残差块通过skip connections连接时使用的Relu激活函数被替换为Identity Mapping(y=x)。其次,V2的残差模块在每层中都使用了BN归一化处理。这样变更后残差模块的训练会更容易,而且模型的泛化能力得到了增强。

# coding=utf-8

import tensorflowas tf

import collections

from tensorflow.contribimport slim

from tensorflow.contrib.layers.python.layersimport utils

class Block(collections.namedtuple('block',['name','residual_unit','args'])):

'A named tuple descrbing a RseNet Block'

def conv2d_same(inputs,num_outputs,kernel_size,stride,scope=None):

if stride==1:

return slim.conv2d(inputs,num_outputs,kernel_size,stride=1,padding='SAME',scope=scope)

else:

pad_begin=(kernel_size-1)//2

        pad_end=kernel_size-1-pad_begin

inputs=tf.pad(inputs,[[0,0],[pad_begin,pad_end],[pad_begin,pad_end],[0,0]])

return slim.conv2d(inputs,num_outputs,kernel_size,stride=stride,padding='VALID',

                          normalizer_fn=None,activation_fn=tf.nn.relu,scope=scope)

@slim.add_arg_scope

def residual_unit(inputs,depth,depth_residual,stride,outputs_collections=None,scope=None):

with tf.variable_scope(scope,'residual_v2',[inputs])as sc:

#获取输入的通道数,及inputs的最后一个元素

        depth_input=utils.last_dimension(inputs.get_shape(),min_rank=4)

#使用slim.batch_norm()函数进行BatchNormalization操作

        preactivate=slim.batch_norm(inputs,activation_fn=tf.nn.relu,scope='preactivate')

if depth==depth_input:

if stride==1:

identity=inputs

else:

identity=slim.max_pool2d(inputs,[1,1],stride=stride,scope='shortcut')

else:

identity=slim.conv2d(preactivate,depth,[1,1],stride=stride,normalizer_fn=None,activation_fn=None,scope='shortcut')

residual=slim.conv2d(preactivate,depth_residual,[1,1],stride=1,normalizer_fn=None,activation_fn=tf.nn.relu,scope='conv1')

residual=conv2d_same(residual,depth_residual,3,stride=stride,scope='conv2')

residual=slim.conv2d(residual,depth,[1,1],stride=1,normalizer_fn=None,activation_fn=None,scope='conv3')

output=identity+residual

output=utils.collect_named_outputs(outputs_collections,sc.name,output)

return output

def resnet_v2(inputs,blocks,num_classes,reuse=None,scope=None):

with tf.variable_scope(scope,'resnet_v2',[inputs],reuse=reuse)as sc:

end_points_collection=sc.original_name_scope+'_end_points'

        with slim.arg_scope([residual_unit],outputs_collections=end_points_collection):

with slim.arg_scope([slim.conv2d],activation_fn=None,normalizer_fn=None):

net=conv2d_same(inputs,num_outputs=64,kernel_size=7,stride=2,scope='conv1')

net=slim.max_pool2d(net,[3,3],stride=2,scope="pool1")

for blockin blocks:

with tf.variable_scope(block.name,'block',[net])as sc:

for i,tuple_valuein enumerate(block.args):

with tf.variable_scope('unit_%d' % (i+1),values=[net]):

depth,depth_bottleneck,stride=tuple_value

net=block.residual_unit(net,depth=depth,depth_residual=depth_bottleneck,stride=stride)

net=utils.collect_named_outputs(end_points_collection,sc.name,net)

net=slim.batch_norm(net,activation_fn=tf.nn.relu,scope='postnorm')

net=tf.reduce_mean(net,[1,2],name='pool5',keep_dims=True)

if num_classesis not None:

net=slim.conv2d(net,num_classes,[1,1],activation_fn=None,normalizer_fn=None,scope='logits')

return net

def resnet_v2_50(inputs,num_classes=None,global_pool=True,reuse=None,scope='resnet_v2_6'):

blocks = [Block('block1', residual_unit, [(64, 64, 2)])]

'''

blocks=[Block('block1',residual_unit,[(256,64,1),(256,64,1),(256,64,2)]),

Block('block2',residual_unit,[(512,128,1)]*3+[(512,128,2)]),

Block('block3', residual_unit, [(1024, 256, 1)] * 5 + [(1024, 256, 2)]),

Block('block4', residual_unit, [(2048, 512, 1)] * 3 )]

'''

    return resnet_v2(inputs,blocks,num_classes,reuse=reuse,scope=scope)

inputs=tf.random_uniform([1,32,32,3])

net=resnet_v2_50(inputs,num_classes=10)

saver=tf.train.Saver()

with tf.Session()as sess:

sess.run(tf.global_variables_initializer())

net=sess.run(net)

saver.save(sess,'./resnet.ckpt')

训练后模型的部分结构如下图:

好吧,有些凌乱哈。说实话tensorflow在实现这种稍微复杂的神经网络的时候显得还是有一些麻烦,容易出错。但是没有办法,谁让人家是主流呢。下面我们看看tensorflow的高级框架keras是如何实现resnet_v2_50的。

# coding=utf-8

import collections

from keras.modelsimport Model

from keras.layersimport Input,Dense,Dropout,BatchNormalization,Conv2D,MaxPool2D,AveragePooling2D,ReLU,ZeroPadding2D,add,Flatten

from tensorflow.contrib.layers.python.layersimport utils

class Block(collections.namedtuple('block',['name','residual_unit','args'])):

'A named tuple descrbing a RseNet Block'

def residual_unit(inputs,depth,residual_depth,stride):

depth_input=utils.last_dimension(inputs.get_shape(),min_rank=4)

inputs=BatchNormalization(axis=3)(inputs)

if depth==depth_input:

if stride==(1,1):

identity=inputs

else:

identity=MaxPool2D(pool_size=(1,1),strides=stride)(inputs)

else:

identity=Conv2D(filters=depth,kernel_size=1,strides=stride,activation='relu')(inputs)

x=Conv2D(residual_depth,kernel_size=1,strides=(1,1),padding='same',activation='relu')(inputs)

x=Conv2D(residual_depth,kernel_size=3,strides=stride,padding='same',activation='relu')(x)

x=Conv2D(depth,kernel_size=1,strides=(1,1),padding='same',activation='relu')(x)

out_puts=add([x,identity])

return out_puts

def resnet_v2_50():

#blocks = [Block('block1', residual_unit, [(64, 64, 2)])]

    blocks=[Block('block1',residual_unit,[(256,64,1),(256,64,1),(256,64,2)]),

            Block('block2',residual_unit,[(512,128,1)]*3+[(512,128,2)]),

            Block('block3', residual_unit, [(1024, 256, 1)] *5 + [(1024, 256, 2)]),

            Block('block4', residual_unit, [(2048, 512, 1)] *3 )]

return resnet_v2(blocks,224,224,3,10)

def resnet_v2(blocks,weights,hight,channel,classes):

inputs=Input(batch_shape=(None,weights,hight,channel))

x=ZeroPadding2D((3,3))(inputs)

x=Conv2D(filters=64,kernel_size=(7,7),strides=(2,2),padding='valid')(x)

x=MaxPool2D(pool_size=(3,3),strides=(2,2))(x)

for blockin blocks:

for i, tuple_valuein enumerate(block.args):

depth, depth_bottleneck, stride = tuple_value

x = block.residual_unit(x, depth=depth, residual_depth=depth_bottleneck, stride=stride)

x=BatchNormalization(axis=3)(x)

x=ReLU()(x)

x=AveragePooling2D(pool_size=(2,2))(x)

x=Flatten()(x)

net=Dense(units=classes,activation='softmax')(x)

model=Model(inputs,net)

return model

model_file =r'./resnet_v2.h5'

model = resnet_v2_50()

model.save(model_file)

训练后模型的部分结构如下图:

现在好看多了是吧。最后我们看看pytorch是如何实现ResNet的:

# coding=utf-8

import torchas t

from torchimport nn

from torch.nnimport functionalas F

import math

class Bottleneck(nn.Module):

expansion=4

    def __init__(self,in_channel,out_channel,stride=1,downsample=None):

super(Bottleneck,self).__init__()

self.conv1=nn.Conv2d(in_channel,out_channel,kernel_size=1,bias=False)

self.bn1=nn.BatchNorm2d(out_channel)

self.conv2=nn.Conv2d(out_channel,out_channel,kernel_size=3,stride=stride,padding=1,bias=False)

self.bn2=nn.BatchNorm2d(out_channel)

self.conv3=nn.Conv2d(out_channel,out_channel*self.expansion,kernel_size=1,bias=False)

self.bn3=nn.BatchNorm2d(out_channel*self.expansion)

self.relu=nn.ReLU(inplace=True)

self.downsample=downsample

self.stride=stride

def forward(self, x):

residual=x

out=self.conv1(x)

out=self.bn1(out)

out=self.relu(out)

out=self.conv2(out)

out=self.bn2(out)

out=self.relu(out)

out=self.conv3(out)

out=self.bn3(out)

if self.downsampleis not None:

residual=self.downsample(x)

out+=residual

out=self.relu(out)

return out

class ResNet_v2(nn.Module):

def __init__(self,block,layer,num_classes=10):

self.in_channel=64

        super(ResNet_v2,self).__init__()

self.conv1=nn.Conv2d(3,self.in_channel,kernel_size=7,stride=2,padding=3,bias=False)

self.bn1=nn.BatchNorm2d(self.in_channel)

self.relu=nn.ReLU(inplace=True)

self.maxpool=nn.MaxPool2d(kernel_size=3,stride=2,padding=1)

self.layer1=self._make_layer(block,self.in_channel,layer[0])

self.layer2=self._make_layer(block,128,layer[1],stride=2)

self.layer3=self._make_layer(block,256,layer[2],stride=2)

self.layer4=self._make_layer(block,512,layer[3],stride=2)

self.avgpool=nn.AvgPool2d(7,stride=1)

self.fc=nn.Linear(512*block.expansion,num_classes)

for min self.modules():

if isinstance(m, nn.Conv2d):

n = m.kernel_size[0] * m.kernel_size[1] * m.out_channels

m.weight.data.normal_(0, math.sqrt(2. / n))

elif isinstance(m, nn.BatchNorm2d):

m.weight.data.fill_(1)

m.bias.data.zero_()

def _make_layer(self, block, planes, blocks, stride=1):

downsample =None

        if stride !=1 or self.inplanes != planes * block.expansion:

downsample = nn.Sequential(nn.Conv2d(self.inplanes, planes * block.expansion,kernel_size=1, stride=stride, bias=False),

                nn.BatchNorm2d(planes * block.expansion))

layers = []

layers.append(block(self.inplanes, planes, stride, downsample))

self.inplanes = planes * block.expansion

for iin range(1, blocks):

layers.append(block(self.inplanes, planes))

return nn.Sequential(*layers)

def forward(self, x):

x =self.conv1(x)

x =self.bn1(x)

x =self.relu(x)

x =self.maxpool(x)

x =self.layer1(x)

x =self.layer2(x)

x =self.layer3(x)

x =self.layer4(x)

x =self.avgpool(x)

x = x.view(x.size(0), -1)

return x

model=ResNet_v2(Bottleneck,[3,4,6,3])

t.save(model,'./resnet.pkl')

训练后模型的部分结构如下图:

这就是上述三种框架实现深度残差网络的基本过程,虽然现在框架逐渐完善而且易操作,但是坚持徒手撸一遍实现代码会对自己更好。即便不是工作需要,对自己的成长也是有好处的。同时呢常回忆,多理解,好啦,不早啦,各位码神早点休息吧~

上一篇下一篇

猜你喜欢

热点阅读