2022

Visual Transformer (ViT) 代码实现 Py

2021-07-08  本文已影响0人  HaloZhang

简介

本文的目的是通过实际代码编写来实现ViT模型,进一步加对ViT模型的理解,如果还不知道ViT模型的话,可以先看下博客了解一下ViT的整体结构。
本文整体是对Implementing Vision Transformer (ViT) in PyTorch 的翻译,但是也加上了一些自己的注解。如果读者更习惯看英文版,建议直接去看原文。

ViT模型整体结构

按照惯例,先放上模型的架构图,如下:

ViT模型 输入图片被划分为一个个16x16的小块,也叫做patch。接着这些patch被送入一个全连接层得到embeddings,然后在embeddings前前加上一个特殊的cls token。然后给所有的embedding加上位置信息编码positional encoding。接着这个整体被送入Transformer Encoder,然后取cls token的输出特征送入MLP Head去做分类,总体流程就是这样。
代码的整体结构跟ViT模型的结构类似,大体可以分为以下几个部分:
我们将以自底向上的方式来逐步实现ViT模型。

Data

首先需要导入相关的依赖库,如下:

import torch
import torch.nn.functional as F
import matplotlib.pyplot as plt

from torch import nn
from torch import Tensor
from PIL import Image
from torchvision.transforms import Compose, Resize, ToTensor
from einops import rearrange, reduce, repeat
from einops.layers.torch import Rearrange, Reduce
from torchsummary import summary

首先我们需要打开一张图片,如下:

img = Image.open('./cat.jpg')
fig = plt.figure()
plt.imshow(img)

结果:

Cat
接着我们需要对图片进行预处理,主要是包含resize向量化等操作,代码如下:
# resize to imagenet size 
transform = Compose([Resize((224, 224)), ToTensor()])
x = transform(img)
x = x.unsqueeze(0) # 主要是为了添加batch这个维度
x.shape
输出

Pathches Embeddings

根据ViT模型结构,第一步是需要将图片划分为多个Patches,并且将其铺平。如下图:

原文的描述如下: 看起来很复杂,但是我们可以使用einops库来简化代码编写,如下:
patch_size = 16 # 16 pixels
pathes = rearrange(x, 'b c (h s1) (w s2) -> b (h w) (s1 s2 c)', s1=patch_size, s2=patch_size)
pathes.shape
结果
关于einops库的使用可以参考doc。这里解释一下这个结果[1,196,768]是怎么来的。我们知道原始图片向量x的大小为[1,3,224,224],当我们使用16x16大小的patch对其进行分割的时候,一共可以划分为224x224/16/16 = 196个patches,其次每个patch大小为16x16x3=768,故大小为[1,196,768]。
接着我们需要将这些patches通过一个线性映射层。 这里可以定义一个名为PatchEmbedding的类来使代码更加整洁:
class PatchEmbedding(nn.Module):
    def __init__(self, in_channels: int = 3, patch_size: int = 16, emb_size: int = 768):
        self.patch_size = patch_size
        super().__init__()
        self.projection = nn.Sequential(
            # break-down the image in s1 x s2 patches and flat them
            Rearrange('b c (h s1) (w s2) -> b (h w) (s1 s2 c)', s1=patch_size, s2=patch_size),
            # 注意这里的隐层大小设置的也是768,可以配置
            nn.Linear(patch_size * patch_size * in_channels, emb_size)
        )
                
    def forward(self, x: Tensor) -> Tensor:
        x = self.projection(x)
        return x
    
PatchEmbedding()(x).shape
实际查看原作者的代码,他并没有使用线性映射层来做这件事,出于效率考虑,作者使用了Conv2d层来实现相同的功能。这是通过设置卷积核大小和步长均为patch_size来实现的。直观上来看,卷积操作是分别应用在每个patch上的。所以,我们可以先应用一个卷积层,然后再对结果进行铺平,改进如下:
class PatchEmbedding(nn.Module):
    def __init__(self, in_channels: int = 3, patch_size: int = 16, emb_size: int = 768):
        self.patch_size = patch_size
        super().__init__()
        self.projection = nn.Sequential(
            # using a conv layer instead of a linear one -> performance gains
            nn.Conv2d(in_channels, emb_size, kernel_size=patch_size, stride=patch_size),
            # 将卷积操作后的patch铺平
            Rearrange('b e h w -> b (h w) e'),
        )
                
    def forward(self, x: Tensor) -> Tensor:
        x = self.projection(x)
        return x
    
PatchEmbedding()(x).shape

CLS Token

下一步是对映射后的patches添加上cls token以及位置编码信息。cls token是一个随机初始化的torch Parameter对象,在forward方法中它需要被拷贝b次(b是batch的数量),然后使用torch.cat函数添加到patch前面。

class PatchEmbedding(nn.Module):
    def __init__(self, in_channels: int = 3, patch_size: int = 16, emb_size: int = 768):
        self.patch_size = patch_size
        super().__init__()
        self.proj = nn.Sequential(
            # using a conv layer instead of a linear one -> performance gains
            nn.Conv2d(in_channels, emb_size, kernel_size=patch_size, stride=patch_size),
            Rearrange('b e (h) (w) -> b (h w) e'),
        )
        # 生成一个维度为emb_size的向量当做cls_token
        self.cls_token = nn.Parameter(torch.randn(1, 1, emb_size))
        
    def forward(self, x: Tensor) -> Tensor:
        b, _, _, _ = x.shape # 单独先将batch缓存起来
        x = self.proj(x) # 进行卷积操作
        # 将cls_token 扩展b次
        cls_tokens = repeat(self.cls_token, '() n e -> b n e', b=b)
        print(cls_tokens.shape)
        print(x.shape)
        # prepend the cls token to the input on the dim 1
        x = torch.cat([cls_tokens, x], dim=1)
        return x
    
PatchEmbedding()(x).shape

Position Embedding

目前为止,模型还对patches的在图像中的原始位置一无所知。我们需要传递给模型这些空间上的信息。可以有很多种方法来实现这个功能,在ViT中,我们让模型自己去学习这个。位置编码信息只是一个形状为[N_PATCHES+1(token)m EMBED_SIZE]的张量,它直接加到映射后的patches上。

class PatchEmbedding(nn.Module):
    def __init__(self, in_channels: int = 3, patch_size: int = 16, emb_size: int = 768, img_size: int = 224):
        self.patch_size = patch_size
        super().__init__()
        self.projection = nn.Sequential(
            # using a conv layer instead of a linear one -> performance gains
            nn.Conv2d(in_channels, emb_size, kernel_size=patch_size, stride=patch_size),
            Rearrange('b e (h) (w) -> b (h w) e'),
        )
        self.cls_token = nn.Parameter(torch.randn(1,1, emb_size))
        # 位置编码信息,一共有(img_size // patch_size)**2 + 1(cls token)个位置向量
        self.positions = nn.Parameter(torch.randn((img_size // patch_size)**2 + 1, emb_size))
        
    def forward(self, x: Tensor) -> Tensor:
        b, _, _, _ = x.shape
        x = self.projection(x)
        cls_tokens = repeat(self.cls_token, '() n e -> b n e', b=b)
        # prepend the cls token to the input
        x = torch.cat([cls_tokens, x], dim=1)
        # add position embedding
        print(x.shape, self.positions.shape)
        x += self.positions
        return x
    
PatchEmbedding()(x).shape

我们首先定义位置embedding向量,然后在forward函数中将其加到线性映射后的patches向量上去。

Transformer

现在我们来实现Transformer模块。ViT模型中只使用了Transformer的Encoder部分,其整体架构如下: Encoder架构

接下来依次实现。

Attention

attention部分有三个输入,分别是queries,keys,values矩阵,首先使用queries,keys矩阵去计算注意力矩阵,然后与values矩阵相乘,得到对应的输出。在下图中,multi-head注意力机制表示将输入划分成n份,然后将计算分到n个head上去。

Attentionn 我们可以使用pytorch的nn.MultiAttention模块或者自己实现一个,为了完整起见,我将完整的MultiAttention代码贴出来:
class MultiHeadAttention(nn.Module):
    def __init__(self, emb_size: int = 512, num_heads: int = 8, dropout: float = 0):
        super().__init__()
        self.emb_size = emb_size
        self.num_heads = num_heads
        self.keys = nn.Linear(emb_size, emb_size)
        self.queries = nn.Linear(emb_size, emb_size)
        self.values = nn.Linear(emb_size, emb_size)
        self.att_drop = nn.Dropout(dropout)
        self.projection = nn.Linear(emb_size, emb_size)
        
    def forward(self, x : Tensor, mask: Tensor = None) -> Tensor:
        # split keys, queries and values in num_heads
        queries = rearrange(self.queries(x), "b n (h d) -> b h n d", h=self.num_heads)
        keys = rearrange(self.keys(x), "b n (h d) -> b h n d", h=self.num_heads)
        values  = rearrange(self.values(x), "b n (h d) -> b h n d", h=self.num_heads)
        # sum up over the last axis
        energy = torch.einsum('bhqd, bhkd -> bhqk', queries, keys) # batch, num_heads, query_len, key_len
        if mask is not None:
            fill_value = torch.finfo(torch.float32).min
            energy.mask_fill(~mask, fill_value)
            
        scaling = self.emb_size ** (1/2)
        att = F.softmax(energy, dim=-1) / scaling
        att = self.att_drop(att)
        # sum up over the third axis
        out = torch.einsum('bhal, bhlv -> bhav ', att, values)
        out = rearrange(out, "b h n d -> b n (h d)")
        out = self.projection(out)
        return out

接下来,一步一步分析下上述代码。我们定义了4个全连接层,分别用于queries,keys,values,以及最后的线性映射层。关于这块更加详细的内容可以阅读The Illustrated Transformer 。主要的思想是使用querieskeys之间的乘积来计算输入序列中的每一个patch与剩余patch之间的匹配程度。然后使用这个匹配程度(数值)去对对应的values做缩放,再累加起来作为Encoder的输出。
forward方法将上一层的输出作为输入,使用三个线性映射层分别得到queries,keys,values。因为我们要实现multi-head注意力机制,我们需要将输出重排成多个head的形式。这一步是使用einops库的rearrange函数来完成的。
Queries,keys,values的形状是一样的,为了简便起见,它们都是基于同一个输入x

queries = rearrange(self.queries(x), "b n (h d) -> b h n d", h=self.n_heads)
keys = rearrange(self.keys(x), "b n (h d) -> b h n d", h=self.n_heads)
values  = rearrange(self.values(x), "b n (h d) -> b h n d", h=self.n_heads)

经过rearrange操作之后,Queries,keys,values的形状大小为[BATCH, HEADS, SEQUENCE_LEN, EMBEDDING_SIZE]. 然后我们将多个head拼接在一起就得到了最终的输出。

注意: 为了加快计算,我们可以使用单个矩阵一次性计算出Queries,keys,values

改进后的代码如下:

class MultiHeadAttention(nn.Module):
    def __init__(self, emb_size: int = 768, num_heads: int = 8, dropout: float = 0):
        super().__init__()
        self.emb_size = emb_size
        self.num_heads = num_heads
        # fuse the queries, keys and values in one matrix
        self.qkv = nn.Linear(emb_size, emb_size * 3)
        self.att_drop = nn.Dropout(dropout)
        self.projection = nn.Linear(emb_size, emb_size)
        
    def forward(self, x : Tensor, mask: Tensor = None) -> Tensor:
        # split keys, queries and values in num_heads
        print("1qkv's shape: ", self.qkv(x).shape)
        qkv = rearrange(self.qkv(x), "b n (h d qkv) -> (qkv) b h n d", h=self.num_heads, qkv=3)
        print("2qkv's shape: ", qkv.shape)
        
        queries, keys, values = qkv[0], qkv[1], qkv[2]
        print("queries's shape: ", queries.shape)
        print("keys's shape: ", keys.shape)
        print("values's shape: ", values.shape)
        
        # sum up over the last axis
        energy = torch.einsum('bhqd, bhkd -> bhqk', queries, keys) # batch, num_heads, query_len, key_len
        print("energy's shape: ", energy.shape)
        if mask is not None:
            fill_value = torch.finfo(torch.float32).min
            energy.mask_fill(~mask, fill_value)
        
        scaling = self.emb_size ** (1/2)
        print("scaling: ", scaling)
        att = F.softmax(energy, dim=-1) / scaling
        print("att1' shape: ", att.shape)
        att = self.att_drop(att)
        print("att2' shape: ", att.shape)
        
        # sum up over the third axis
        out = torch.einsum('bhal, bhlv -> bhav ', att, values)
        print("out1's shape: ", out.shape)
        out = rearrange(out, "b h n d -> b n (h d)")
        print("out2's shape: ", out.shape)
        out = self.projection(out)
        print("out3's shape: ", out.shape)
        return out
    
patches_embedded = PatchEmbedding()(x)
print("patches_embedding's shape: ", patches_embedded.shape)
MultiHeadAttention()(patches_embedded).shape
output

Residuals

Transformer模块也包含了残差连接,如下图: Skip Connection

我们可以单独封装一个处理残差连接的类如下:

class ResidualAdd(nn.Module):
    def __init__(self, fn):
        super().__init__()
        self.fn = fn
        
    def forward(self, x, **kwargs):
        res = x
        x = self.fn(x, **kwargs)
        x += res
        return x

接着attention层的输出首先通过BN层,紧跟其后一个全连接层,全连接层中采用了一个expansion因子来对输入进行上采样。同样这里也采用了类似resnet的残差连接的方式。如下图:

FFN 代码如下:
class FeedForwardBlock(nn.Sequential):
    def __init__(self, emb_size: int, expansion: int = 4, drop_p: float = 0.):
        super().__init__(
            nn.Linear(emb_size, expansion * emb_size),
            nn.GELU(),
            nn.Dropout(drop_p),
            nn.Linear(expansion * emb_size, emb_size),
        )

作者说,不知道为什么,很少看见人们直接继承nn.Sequential类,这样就可以避免重写forward方法了。
译者著,确实~又学到了一招。

最终,我们可以创建一个完整的Transformer Encoder 块了。

Encoder 利用我们之前定义好的ResidualAdd类,我们可以很优雅地定义出Transformer Encoder Block,如下:
class TransformerEncoderBlock(nn.Sequential):
    def __init__(self,
                 emb_size: int = 768,
                 drop_p: float = 0.,
                 forward_expansion: int = 4,
                 forward_drop_p: float = 0.,
                 ** kwargs):
        super().__init__(
            ResidualAdd(nn.Sequential(
                nn.LayerNorm(emb_size),
                MultiHeadAttention(emb_size, **kwargs),
                nn.Dropout(drop_p)
            )),
            ResidualAdd(nn.Sequential(
                nn.LayerNorm(emb_size),
                FeedForwardBlock(
                    emb_size, expansion=forward_expansion, drop_p=forward_drop_p),
                nn.Dropout(drop_p)
            )
            ))

我们来测试一下:

patches_embedded = PatchEmbedding()(x)
TransformerEncoderBlock()(patches_embedded).shape
output

Transformer

在ViT中只使用了原始Transformer中的Encoder部分。Encoder一共包含L个block,我们使用参数depth来指定,代码如下:

class TransformerEncoder(nn.Sequential):
    def __init__(self, depth: int = 12, **kwargs):
        super().__init__(*[TransformerEncoderBlock(**kwargs) for _ in range(depth)])

很简单不是嘛?

ViT的最后一层就是一个简单的全连接层,输出分类的概率值。它对整个序列执行一个mean操作。 MLP Head
class ClassificationHead(nn.Sequential):
    def __init__(self, emb_size: int = 768, n_classes: int = 1000):
        super().__init__(
            Reduce('b n e -> b e', reduction='mean'),
            nn.LayerNorm(emb_size), 
            nn.Linear(emb_size, n_classes))

我们将之前定义好的PatchEMbedding, TransformerEncoder,ClassificationHead整合起来,搭建出最终的ViT代码模型如下:

class ViT(nn.Sequential):
    def __init__(self,     
                in_channels: int = 3,
                patch_size: int = 16,
                emb_size: int = 768,
                img_size: int = 224,
                depth: int = 12,
                n_classes: int = 1000,
                **kwargs):
        super().__init__(
            PatchEmbedding(in_channels, patch_size, emb_size, img_size),
            TransformerEncoder(depth, emb_size=emb_size, **kwargs),
            ClassificationHead(emb_size, n_classes)
        )

我们可以使用torchsummary函数来计算参数量,输出如下:

参数量
与其他ViT实现代码相比,这个参数量是差不多的。
原文的代码仓库在https://github.com/FrancescoSaverioZuppichini/ViT

参考

上一篇下一篇

猜你喜欢

热点阅读