PyTorch Study Notes

PyTorch Model Quantization: Layer-wise Quantization

2022-08-07  侠之大者_7d3f

Motivation

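Why quantize deep learning models
Model quantization is one of the key techniques for accelerating deep learning inference. A trained model is usually stored in float32 (FP32) format. FP32 values are wide and floating-point arithmetic is expensive in hardware, so memory bandwidth and model throughput become bottlenecks. Quantization is one of the main techniques for reducing the memory bandwidth and inference latency of FP32 models: it replaces FP32 with fixed-point or integer (for example INT8) data types, which are more hardware friendly.

How to learn and master quantization
Model quantization involves many concepts and algorithms, such as symmetric vs. asymmetric quantization and linear vs. non-linear quantization. Once these basic concepts are clear, it is best to combine them with practice in order to understand and remember them better.

This article uses the quantization examples provided by PyTorch to walk through a simple model quantization process.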


Requirements


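Basic principles of quantization

Quantization is essentially a mapping: a set of floating-point numbers is mapped to a set of integers (for example INT8).


# Quantize
xq = round(x_fp32 / scale + zero_point)

# Dequantize
xqd = (xq - zero_point) * scale

The quantization parameters scale and zero_point determine the quantization accuracy.

Let us verify the formulas above directly.
Example: quantize an FP32 tensor and inspect the output.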

import torch
import numpy as np
import random
import matplotlib.pyplot as plt
import os

seed = 1234
random.seed(seed)
os.environ['PYTHONHASHSEED'] = str(seed)
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
torch.cuda.manual_seed_all(seed)

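# Basic principles of quantization
'''
# Quantize
xq = round(x_fp32 / scale + zero_point)

# Dequantize
xqd = (xq - zero_point) * scale

The quantization parameters scale and zero_point determine the quantization accuracy.

'''

x = torch.rand(3, 4, dtype=torch.float32)

scale = 0.3
zero_point = 8
# xq holds integer values but is still stored as a float32 tensor here
xq = torch.round((x / scale + zero_point))
xqd = (xq - zero_point) * scale

print('FP32 tensor before quantization', x)
print('Integer-valued tensor after quantization', xq)
print('FP32 tensor after dequantization', xqd)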

plt.stem(x.flatten().numpy())
# plt.subplot(1, 2, 2)
plt.stem(xqd.flatten().float().numpy())
plt.show()

Because the quantization parameters scale and zero_point were chosen arbitrarily, the dequantized values differ noticeably from the original FP32 data.
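The parameters do not have to be picked by hand. The following is a minimal sketch, assuming the common min/max (affine) rule for an unsigned 8-bit range; the helper names `affine_qparams` and `fake_quantize` are hypothetical and are not part of the original example. It compares the reconstruction error of the arbitrary parameters above with parameters derived from the data.

```python
import torch


def affine_qparams(x, qmin=0, qmax=255):
    """Derive scale and zero_point from the min/max of x (hypothetical helper)."""
    # The representable range is extended to include 0 so that 0.0 maps to an integer.
    x_min = min(x.min().item(), 0.0)
    x_max = max(x.max().item(), 0.0)
    scale = max((x_max - x_min) / (qmax - qmin), 1e-8)
    zero_point = int(round(qmin - x_min / scale))
    zero_point = max(qmin, min(qmax, zero_point))
    return scale, zero_point


def fake_quantize(x, scale, zero_point, qmin=0, qmax=255):
    """Quantize then dequantize with the formulas above, clamping to [qmin, qmax]."""
    xq = torch.clamp(torch.round(x / scale + zero_point), qmin, qmax)
    return (xq - zero_point) * scale


torch.manual_seed(1234)
x = torch.rand(3, 4, dtype=torch.float32)

# Arbitrary parameters, as in the example above
err_arbitrary = (x - fake_quantize(x, 0.3, 8)).abs().max().item()

# Parameters derived from the observed value range
scale, zero_point = affine_qparams(x)
err_minmax = (x - fake_quantize(x, scale, zero_point)).abs().max().item()

print('max error with scale=0.3, zero_point=8:', err_arbitrary)
print('max error with min/max parameters:', err_minmax, '(scale =', scale, ')')
```

With data-derived parameters the rounding error of each element is bounded by about half of scale, which is why calibration matters for real models.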

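Using the quantization tools provided by PyTorch

PyTorch provides API functions for quantizing a tensor: torch.quantize_per_tensor and torch.quantize_per_channel.

The main difference between tensor-wise and channel-wise quantization is the granularity of the quantization parameters: one (scale, zero_point) pair for the whole tensor versus one pair per channel.

Tensor-wise Quantization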


# Tensor-wise quantization

def quantize_per_tensor():
    x = torch.rand(3, 4, dtype=torch.float32)
    # Quantize; the parameters (scale and zero_point) are stored in the quantized tensor
    xq = torch.quantize_per_tensor(x, scale=0.3, zero_point=8, dtype=torch.qint8)
    # Dequantize
    xqd = torch.dequantize(xq)

    print('FP32 tensor before quantization', x)
    print('INT8 tensor after quantization', xq)
    print('FP32 tensor after dequantization', xqd)

    # compare the x, xq
    # plt.subplot(1, 2, 1)
    plt.stem(x.flatten().numpy())
    # plt.subplot(1, 2, 2)
    plt.stem(xqd.flatten().float().numpy())
    plt.show()

quantize_per_tensor()
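The comment in the code above notes that the quantization parameters are stored in the quantized tensor. A small sketch of how they can be read back, using hand-picked input values (an assumption made here so that the result is easy to check against the manual formula):

```python
import torch

# Hand-picked values that are multiples of the scale
x = torch.tensor([[0.0, 0.3, 0.6], [0.9, 1.2, 1.5]], dtype=torch.float32)
xq = torch.quantize_per_tensor(x, scale=0.3, zero_point=8, dtype=torch.qint8)

stored_scale = xq.q_scale()            # scale kept inside the quantized tensor
stored_zero_point = xq.q_zero_point()  # zero_point kept inside the quantized tensor
int_values = xq.int_repr()             # underlying int8 storage

# The same integers computed with the manual formula from the first section
manual = torch.round(x / 0.3 + 8).to(torch.int8)

print('scale:', stored_scale, 'zero_point:', stored_zero_point)
print('int8 values:', int_values)
print('matches manual formula:', torch.equal(int_values, manual))
```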

Channel-wise Quantization


# Channel-wise quantization

def quantize_per_channel():
    x = torch.rand(3, 4, dtype=torch.float32)
    # Quantize; the per-channel parameters (scales and zero_points) are stored in the quantized tensor
    xq = torch.quantize_per_channel(x, scales=torch.tensor([0.3, 0.5, 0.4]), 
                                    zero_points=torch.tensor([8, 9, 10]),
                                    axis=0, dtype=torch.qint8)
    # Dequantize
    xqd = torch.dequantize(xq)

    print('FP32 tensor before quantization', x)
    print('INT8 tensor after quantization', xq)
    print('FP32 tensor after dequantization', xqd)

    # compare the x, xq
    # plt.subplot(1, 2, 1)
    plt.stem(x.flatten().numpy())
    # plt.subplot(1, 2, 2)
    plt.stem(xqd.flatten().float().numpy())
    plt.show()

quantize_per_channel()
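To make the granularity difference concrete, here is a rough sketch that quantizes the same tensor both ways. It assumes a symmetric min/max rule for the scales (the helper `symmetric_scale` is hypothetical) and a synthetic tensor whose rows have very different value ranges.

```python
import torch


def symmetric_scale(t, qmax=127):
    """Symmetric min/max scale for qint8 (hypothetical helper)."""
    return max(t.abs().max().item(), 1e-8) / qmax


torch.manual_seed(0)
# Three channels (rows) with ranges of roughly 0.1, 1 and 10
x = torch.rand(3, 4) * torch.tensor([[0.1], [1.0], [10.0]])

# Tensor-wise: a single scale must cover the largest channel
xq_t = torch.quantize_per_tensor(x, scale=symmetric_scale(x), zero_point=0,
                                 dtype=torch.qint8)

# Channel-wise: each row gets its own scale
scales_c = torch.tensor([symmetric_scale(row) for row in x])
zero_points_c = torch.zeros(3, dtype=torch.int64)
xq_c = torch.quantize_per_channel(x, scales=scales_c, zero_points=zero_points_c,
                                  axis=0, dtype=torch.qint8)

# Mean absolute reconstruction error per channel
err_t = (x - xq_t.dequantize()).abs().mean(dim=1)
err_c = (x - xq_c.dequantize()).abs().mean(dim=1)
print('tensor-wise error per channel: ', err_t)
print('channel-wise error per channel:', err_c)
```

The small-range channel suffers most under tensor-wise quantization, because the shared scale is dictated by the largest channel.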

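Quantizing a real CNN model

The examples above quantize a single tensor, which makes the idea easy to grasp. Next we quantize a real model, MobileNet-V2.

Preparation before quantization

Prepare the ImageNet-2012 dataset, including the training and validation splits. Real quantization requires calibration and accuracy evaluation, so both splits are needed.

MobileNet quantization implementation

Overview of the method:

For a CNN, RNN, Transformer or similar model, two kinds of tensors generally need to be quantized: weights and activations.

Depending on whether the activations are quantized offline, there are two approaches:

  1. Dynamic Quantization: the activation quantization parameters are determined dynamically while the model runs. The parameters obtained this way are fairly accurate, but the drawback is clear: computing them at runtime costs time, so the speedup is limited.
  2. Static Quantization: the activation quantization parameters are computed ahead of time through offline calibration.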
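For comparison with the static workflow used in the rest of the article, dynamic quantization can be sketched in a few lines. This is only an illustration on a toy two-layer MLP (an assumption; the article itself does not apply dynamic quantization to MobileNet), and it assumes a CPU build of PyTorch with a quantized backend available.

```python
import torch
from torch import nn

torch.manual_seed(0)

# Toy float model used only for illustration
float_model = nn.Sequential(
    nn.Linear(16, 32),
    nn.ReLU(),
    nn.Linear(32, 4),
).eval()

# Weights of nn.Linear are quantized to int8 ahead of time;
# activation parameters are computed on the fly for every input.
dynamic_model = torch.quantization.quantize_dynamic(
    float_model, {nn.Linear}, dtype=torch.qint8
)

x = torch.rand(2, 16)
with torch.no_grad():
    y_fp32 = float_model(x)
    y_int8 = dynamic_model(x)

print(dynamic_model)
print('max abs difference:', (y_fp32 - y_int8).abs().max().item())
```

No calibration data is needed here, which is the main practical difference from the static approach.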

The sample code below uses Static Quantization:


  1. Project structure:

    [Figure: project directory layout]

Utils: PyTorch boilerplate code that can be reused as-is

utils.py


import os
import torch
import torchvision
import torchvision.transforms as transforms


# Helper functions
# Used for ImageNet validation
class AverageMeter(object):
    def __init__(self, name, fmt=':f') -> None:
        self.name = name
        self.fmt = fmt
        self.reset()

    def reset(self):
        self.val = 0
        self.avg = 0
        self.sum = 0
        self.count = 0
    
    def update(self, val, n=1):
        self.val = val
        self.sum += val * n
        self.count += n
        self.avg = self.sum / self.count
    

    def __str__(self):
        fmtstr = '{name} {val' + self.fmt + '} ({avg' + self.fmt + '})'
        return fmtstr.format(**self.__dict__)


def accuracy(output, target, topk=(1,)):
    with torch.no_grad():
        maxk = max(topk)
        batch_size = target.size(0)

        _, pred = output.topk(maxk, 1, True, True)
        pred = pred.t()

        correct = pred.eq(target.view(1, -1).expand_as(pred))

        res = []
        for k in topk:
            correct_k = correct[:k].reshape(-1).float().sum(0, keepdim=True)
            res.append(correct_k.mul_(100.0 / batch_size))
        return res


def evaluate(model, criterion, data_loader, neval_batches, n_print=5):
    model.eval()
    top1 = AverageMeter('Acc@1', ':6.2f')
    top5 = AverageMeter('Acc@5', ':6.2f')
    cnt = 0
    with torch.no_grad():
        for image, target in data_loader:
            output = model(image)
            # loss = criterion(output, target)
            cnt += 1
            acc1, acc5 = accuracy(output, target, topk=(1, 5))
            print('.', end = '')
            top1.update(acc1[0], n=image.size(0))
            top5.update(acc5[0], n=image.size(0))
            if cnt % n_print == 0:
                print(f'{top1}, {top5}')
            if cnt >= neval_batches:
                 return top1, top5

    return top1, top5


def load_model(model, model_weight_file):
    state_dict = torch.load(model_weight_file)
    model.load_state_dict(state_dict)
    model.to('cpu')
    return model


def print_size_of_model(model):
    torch.save(model.state_dict(), "temp.p")
    print('Size (MB):', os.path.getsize("temp.p")/1e6)
    os.remove('temp.p')


def prepare_data_loaders(data_path, train_batch_size, eval_batch_size):
    '''
    Prepare the ImageNet dataset, including train_set and val_set
    '''
    normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                     std=[0.229, 0.224, 0.225])
    
    dataset_train = torchvision.datasets.ImageNet(
        data_path, split='train',
        transform=transforms.Compose(
            [
                transforms.RandomResizedCrop(224),
                transforms.RandomHorizontalFlip(),
                transforms.ToTensor(),
                normalize
            ]))
    
    dataset_test = torchvision.datasets.ImageNet(
        data_path, split='val',
        transform=transforms.Compose(
            [
                transforms.Resize(256),
                transforms.CenterCrop(224),
                transforms.ToTensor(),
                normalize
            ]))

    train_sampler = torch.utils.data.RandomSampler(dataset_train)
    test_sampler = torch.utils.data.SequentialSampler(dataset_test)
    
    data_loader_train = torch.utils.data.DataLoader(
        dataset_train, batch_size=train_batch_size,
        sampler=train_sampler
    )
    
    data_loader_test = torch.utils.data.DataLoader(
        dataset_test, batch_size=eval_batch_size,
        num_workers=8, sampler=test_sampler
        )

    return data_loader_train, data_loader_test

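  2. MobileNet-V2 model

The MobileNet-V2 (MV2) model is essentially the same as the one provided in torchvision; only a few places are modified to adapt it for quantization.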

model.py

import numpy as np
import os
import matplotlib.pyplot as plt
import random
import torch
from torch import nn
import torchvision
from torch.quantization import QuantStub, DeQuantStub

# model = torchvision.models.mobilenet_v2(pretrained=True, progress=True)

# # Setup warnings
import warnings
warnings.filterwarnings(
    action='ignore',
    category=DeprecationWarning,
    module=r'.*'
)
warnings.filterwarnings(
    action='default',
    module=r'torch.quantization'
)

# https://pytorch.org/tutorials/advanced/static_quantization_tutorial.html

seed = 1234
random.seed(seed)
os.environ['PYTHONHASHSEED'] = str(seed)
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
torch.cuda.manual_seed_all(seed)


# Define MobileNet

# MobileNetV2 needs a few small modifications:
# Replacing addition with nn.quantized.FloatFunctional
# Insert QuantStub and DeQuantStub at the beginning and end of the network.
# Replace ReLU6 with ReLU

def _make_divisible(v, divisor, min_value=None):
    """
    This function is taken from the original tf repo.
    It ensures that all layers have a channel number that is divisible by 8
    It can be seen here:
    https://github.com/tensorflow/models/blob/master/research/slim/nets/mobilenet/mobilenet.py
    :param v:
    :param divisor:
    :param min_value:
    :return:
    """
    if min_value is None:
        min_value = divisor
    new_v = max(min_value, int(v + divisor / 2) // divisor * divisor)
    # Make sure that round down does not go down by more than 10%.
    if new_v < 0.9 * v:
        new_v += divisor
    return new_v


class ConvBNReLU(nn.Sequential):
    def __init__(self, in_planes, out_planes, kernel_size=3, stride=1, groups=1):
        padding = (kernel_size - 1) // 2
        super(ConvBNReLU, self).__init__(
            nn.Conv2d(in_planes, out_planes, kernel_size, stride, padding, groups=groups, bias=False),
            nn.BatchNorm2d(out_planes, momentum=0.1),
            # Replace with ReLU
            nn.ReLU(inplace=False)
        )


class InvertedResidual(nn.Module):
    def __init__(self, inp, oup, stride, expand_ratio):
        super(InvertedResidual, self).__init__()
        self.stride = stride
        assert stride in [1, 2]

        hidden_dim = int(round(inp * expand_ratio))
        self.use_res_connect = self.stride == 1 and inp == oup

        layers = []
        if expand_ratio != 1:
            # pw
            layers.append(ConvBNReLU(inp, hidden_dim, kernel_size=1))
        layers.extend([
            # dw
            ConvBNReLU(hidden_dim, hidden_dim, stride=stride, groups=hidden_dim),
            # pw-linear
            nn.Conv2d(hidden_dim, oup, 1, 1, 0, bias=False),
            nn.BatchNorm2d(oup, momentum=0.1),
        ])

        self.conv = nn.Sequential(*layers)
        # Replace torch.add with FloatFunctional (modified for quantization)
        self.skip_add = nn.quantized.FloatFunctional()
    

    def forward(self, x):
        if self.use_res_connect:
            return self.skip_add.add(x, self.conv(x))   # modified for quantization
        else:
            return self.conv(x)


class MobileNetV2(nn.Module):
    def __init__(self, num_classes=1000, width_mult=1.0, inverted_residual_setting=None, round_nearest=8):
        """
        MobileNet V2 main class
        Args:
            num_classes (int): Number of classes
            width_mult (float): Width multiplier - adjusts number of channels in each layer by this amount
            inverted_residual_setting: Network structure
            round_nearest (int): Round the number of channels in each layer to be a multiple of this number
            Set to 1 to turn off rounding
        """
        super(MobileNetV2, self).__init__()
        block = InvertedResidual
        input_channel = 32
        last_channel = 1280

        if inverted_residual_setting is None:
            inverted_residual_setting = [
                # t, c, n, s
                [1, 16, 1, 1],
                [6, 24, 2, 2],
                [6, 32, 3, 2],
                [6, 64, 4, 2],
                [6, 96, 3, 1],
                [6, 160, 3, 2],
                [6, 320, 1, 1],
            ]

        # only check the first element, assuming user knows t,c,n,s are required
        if len(inverted_residual_setting) == 0 or len(inverted_residual_setting[0]) != 4:
            raise ValueError("inverted_residual_setting should be non-empty "
                             "or a 4-element list, got {}".format(inverted_residual_setting))

        # building first layer
        input_channel = _make_divisible(input_channel * width_mult, round_nearest)
        self.last_channel = _make_divisible(last_channel * max(1.0, width_mult), round_nearest)
        features = [ConvBNReLU(3, input_channel, stride=2)]
        # building inverted residual blocks
        for t, c, n, s in inverted_residual_setting:
            output_channel = _make_divisible(c * width_mult, round_nearest)
            for i in range(n):
                stride = s if i == 0 else 1
                features.append(block(input_channel, output_channel, stride, expand_ratio=t))
                input_channel = output_channel
        # building last several layers
        features.append(ConvBNReLU(input_channel, self.last_channel, kernel_size=1))
        # make it nn.Sequential
        self.features = nn.Sequential(*features)
        self.quant = QuantStub()
        self.dequant = DeQuantStub()
        # building classifier
        self.classifier = nn.Sequential(
            nn.Dropout(0.2),
            nn.Linear(self.last_channel, num_classes),
        )

        # weight initialization
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out')
                if m.bias is not None:
                    nn.init.zeros_(m.bias)
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.ones_(m.weight)
                nn.init.zeros_(m.bias)
            elif isinstance(m, nn.Linear):
                nn.init.normal_(m.weight, 0, 0.01)
                nn.init.zeros_(m.bias)

    def forward(self, x):

        x = self.quant(x)

        x = self.features(x)
        x = x.mean([2, 3])
        x = self.classifier(x)
        x = self.dequant(x)
        return x
    
    
    # Model fusion optimization
    def fuse_model(self):
        for m in self.modules():
            if type(m) == ConvBNReLU:
                torch.quantization.fuse_modules(m, ['0', '1', '2'], inplace=True)
            if type(m) == InvertedResidual:
                for idx in range(len(m.conv)):
                    if type(m.conv[idx]) == nn.Conv2d:
                        torch.quantization.fuse_modules(m.conv, [str(idx), str(idx + 1)], inplace=True)


if __name__ == '__main__':
    net = MobileNetV2()
    net.eval()
    print(net)

    inputs = torch.rand(1, 3, 224, 224)
    outputs = net(inputs)
    print(outputs.size())
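The three modifications (QuantStub/DeQuantStub, FloatFunctional for the residual add, and fusable Conv+BN+ReLU) are easier to see on a tiny model. The following is a minimal sketch of the eager-mode static quantization flow, assuming a hypothetical `TinyResidualNet`, random tensors as calibration data, and a CPU build with the fbgemm or qnnpack backend; it is not the MobileNet-V2 pipeline itself.

```python
import torch
from torch import nn
import torch.nn.quantized as nnq
from torch.quantization import QuantStub, DeQuantStub


class TinyResidualNet(nn.Module):
    """Hypothetical toy model with one Conv+BN+ReLU block and a skip connection."""

    def __init__(self):
        super().__init__()
        self.quant = QuantStub()              # float -> quantized at the input
        self.conv = nn.Conv2d(3, 3, 3, padding=1, bias=False)
        self.bn = nn.BatchNorm2d(3)
        self.relu = nn.ReLU()
        self.skip_add = nnq.FloatFunctional()  # quantizable replacement for "+"
        self.dequant = DeQuantStub()          # quantized -> float at the output

    def forward(self, x):
        x = self.quant(x)
        y = self.relu(self.bn(self.conv(x)))
        x = self.skip_add.add(x, y)
        return self.dequant(x)


torch.manual_seed(0)
model = TinyResidualNet().eval()

# 1. Fuse Conv + BN + ReLU into a single module
torch.quantization.fuse_modules(model, [['conv', 'bn', 'relu']], inplace=True)

# 2. Pick a backend supported on this machine and attach its default qconfig
engines = torch.backends.quantized.supported_engines
backend = 'fbgemm' if 'fbgemm' in engines else 'qnnpack'
torch.backends.quantized.engine = backend
model.qconfig = torch.quantization.get_default_qconfig(backend)

# 3. Insert observers, then calibrate (random data stands in for real images)
torch.quantization.prepare(model, inplace=True)
with torch.no_grad():
    for _ in range(4):
        model(torch.rand(2, 3, 8, 8))

# 4. Convert to an int8 model and run it
torch.quantization.convert(model, inplace=True)
with torch.no_grad():
    out = model(torch.rand(2, 3, 8, 8))

print(model)
print(out.shape, out.dtype)
```

After conversion the fused block is a quantized ConvReLU2d and the skip connection runs as a quantized add, which is exactly what the FloatFunctional replacement makes possible.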

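  3. Quantization tests

Three cases are tested: the baseline FP32 model, layer-wise quantization, and channel-wise quantization.

Baseline FP32 model accuracy
Since this is only a demonstration and the author runs the model on a CPU, only part of the samples are evaluated.
The pretrained model is downloaded from torchvision and renamed to mobilenet_pretrained_float.pth.

Code snippet: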


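import torch

# model.py and utils.py are the two files listed above
from model import MobileNetV2
from utils import evaluate, load_model, print_size_of_model, prepare_data_loaders

data_path = '/media/wei/Development/Datasets/imagenet'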
saved_model_dir = 'data/'
float_model_file = 'mobilenet_pretrained_float.pth'
scripted_float_model_file = 'mobilenet_quantization_scripted.pth'
scripted_quantized_model_file = 'mobilenet_quantization_scripted_quantized.pth'

train_batch_size = 30
eval_batch_size = 512
num_eval_batches = 10

def evaluate_baseline():
    net = MobileNetV2()
    net.eval()
    float_model = load_model(net, saved_model_dir + float_model_file).to('cpu')

    print('\n Inverted Residual Block: Before fusion \n\n', float_model.features[1].conv)
    float_model.eval()

    # Fuse modules: Conv, BN and ReLU operator fusion
    float_model.fuse_model()

    # Note fusion of Conv+BN+Relu and Conv+Relu
    print('\n Inverted Residual Block: After fusion\n\n',float_model.features[1].conv)

    print("Size of baseline model")
    print_size_of_model(float_model)

    # ImageNet
    _, data_loader_test = prepare_data_loaders(data_path=data_path, train_batch_size=train_batch_size, eval_batch_size=eval_batch_size)

    # Start to get baseline Accuracy
    top1, top5 = evaluate(float_model, None, data_loader_test, neval_batches=num_eval_batches, n_print=1)
    print('Evaluation accuracy on %d images, %2.2f'%(num_eval_batches * eval_batch_size, top1.avg))
    torch.jit.save(torch.jit.script(float_model), saved_model_dir + scripted_float_model_file)

Top-1 accuracy of the baseline model: Acc@Top1 = 78.57%

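Layer-wise quantization test
The quantization method must be configured first: model.qconfig = torch.quantization.default_qconfig

The definition of default_qconfig:

default_qconfig = QConfig(activation=default_observer,
                          weight=default_weight_observer)

Both default observers are built on MinMaxObserver, which tracks the minimum and maximum values of a tensor. The resulting quantization is therefore tensor-wise, also called layer-wise.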


class MinMaxObserver(_ObserverBase):
    r"""Observer module for computing the quantization parameters based on the
    running min and max values.

    This observer uses the tensor min/max statistics to compute the quantization
    parameters. The module records the running minimum and maximum of incoming
    tensors, and uses this statistic to compute the quantization parameters.
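An observer can also be used on its own, which makes the "running min and max" behaviour easy to inspect. A short sketch with hand-picked tensors (the input values are assumptions chosen for readability), next to the per-channel variant for contrast:

```python
import torch
from torch.quantization import MinMaxObserver, PerChannelMinMaxObserver

# Tensor-wise observer: one running min/max for everything it sees
obs = MinMaxObserver(dtype=torch.quint8, qscheme=torch.per_tensor_affine)
obs(torch.tensor([-1.0, 0.0, 2.0]))   # first calibration batch
obs(torch.tensor([0.5, 3.0]))         # second batch widens the max to 3.0
scale, zero_point = obs.calculate_qparams()
print('running min/max:', obs.min_val.item(), obs.max_val.item())
print('scale:', scale, 'zero_point:', zero_point)

# Channel-wise observer: one min/max (and one scale) per slice along ch_axis
w = torch.tensor([[-0.1, 0.05, 0.1],
                  [-2.0, 1.0, 4.0]])
pc_obs = PerChannelMinMaxObserver(ch_axis=0, dtype=torch.qint8,
                                  qscheme=torch.per_channel_symmetric)
pc_obs(w)
pc_scales, pc_zero_points = pc_obs.calculate_qparams()
print('per-channel scales:', pc_scales)
print('per-channel zero_points:', pc_zero_points)
```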

The test code:

def per_layer_quantize():
    num_calibration_batches = 32
    
    net = MobileNetV2()
    net.eval()
    model = load_model(net, saved_model_dir + float_model_file).to('cpu')

    model.eval()
    # Fuse modules: Conv, BN and ReLU operator fusion
    model.fuse_model()

    # Set the quantization configuration
    # MinMax observer, per-tensor quantize of weight
    model.qconfig = torch.quantization.default_qconfig
    print(model.qconfig)

    # model will be attached with qconfig
    torch.quantization.prepare(model, inplace=True)

    # Calibrate
    # Calibration should use a subset of the training set
    data_loader_train, data_loader_test = prepare_data_loaders(data_path=data_path, eval_batch_size=eval_batch_size, train_batch_size=train_batch_size)

    # Running the model lets the observers collect calibration statistics
    evaluate(model, criterion=None, data_loader=data_loader_train, neval_batches=num_calibration_batches)
    print('Calibration done...')

    # Convert to quantized model, Int8
    torch.quantization.convert(model, inplace=True)
    
    print('Quantize done')
    print('Size of quantized model')
    print_size_of_model(model)

    # test accuracy
    top1, top5 = evaluate(model, None, data_loader_test, neval_batches=num_eval_batches, n_print=1)
    print('Evaluation accuracy on %d images, %2.2f'%(num_eval_batches * eval_batch_size, top1.avg))

Result:
Acc@top1 = 65.37%, far below the baseline; the accuracy loss is too severe.

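Channel-wise quantization test
Because the Top-1 accuracy drops too much after layer-wise quantization, a different quantization scheme is needed. Channel-wise quantization is finer-grained than layer-wise quantization: it computes separate quantization parameters for each channel of a tensor, so its accuracy is expected to be higher than that of layer-wise quantization.

Implementing channel-wise quantization:
To support different quantization schemes, PyTorch provides different backends, each with its own default qconfig: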

fbgemm backend:

def get_default_qconfig(backend='fbgemm'):
    """
    Returns the default PTQ qconfig for the specified backend.

    Args:
      * `backend`: a string representing the target backend. Currently supports `fbgemm`
        and `qnnpack`.

    Return:
        qconfig
    """

    if backend == 'fbgemm':
        qconfig = QConfig(activation=HistogramObserver.with_args(reduce_range=True),
                          weight=default_per_channel_weight_observer)
    elif backend == 'qnnpack':
        qconfig = QConfig(activation=HistogramObserver.with_args(reduce_range=False),
                          weight=default_weight_observer)
    else:
        qconfig = default_qconfig
    return qconfig
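The difference between the two configurations can be checked directly by instantiating their weight observers. A small sketch (it only inspects the configs and does not quantize anything):

```python
import torch

layer_wise_qconfig = torch.quantization.default_qconfig
channel_wise_qconfig = torch.quantization.get_default_qconfig('fbgemm')

# QConfig stores observer factories; calling them creates observer instances
lw_weight_obs = layer_wise_qconfig.weight()
cw_weight_obs = channel_wise_qconfig.weight()

print('default_qconfig weight observer:', type(lw_weight_obs).__name__, lw_weight_obs.qscheme)
print('fbgemm qconfig weight observer: ', type(cw_weight_obs).__name__, cw_weight_obs.qscheme)
```

The weight observer of default_qconfig uses a per-tensor scheme, while the fbgemm default uses a per-channel scheme, which is the only change needed to switch from layer-wise to channel-wise weight quantization in the test code.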

Test code for channel-wise quantization:


def per_channel_quantize(evaluate_acc=True):
    num_calibration_batches = 32
    net = MobileNetV2()
    net.eval()
    model = load_model(net, saved_model_dir + float_model_file).to('cpu')
    model.eval()
    # Fuse modules: Conv, BN and ReLU operator fusion
    model.fuse_model()

    # Configure channel-wise quantization
    # qconfig = QConfig(activation=HistogramObserver.with_args(reduce_range=True),
    #               weight=default_per_channel_weight_observer)
    model.qconfig = torch.quantization.get_default_qconfig(backend='fbgemm')
    print('Quantization config:', model.qconfig)

    torch.quantization.prepare(model, inplace=True)

    # Calibrate
    # Calibration should use a subset of the training set
    data_loader_train, data_loader_test = prepare_data_loaders(data_path=data_path, eval_batch_size=eval_batch_size, train_batch_size=train_batch_size)

    # Running the model lets the observers collect calibration statistics
    evaluate(model, criterion=None, data_loader=data_loader_train, neval_batches=num_calibration_batches)
    print('Calibration done...')

    # Convert to quantized model, Int8
    torch.quantization.convert(model, inplace=True)
    
    print('Quantize done')
    print('Size of quantized model')
    print_size_of_model(model)

    # Test accuracy (skipped when the caller only needs the quantized model, e.g. speed_test)
    if evaluate_acc:
        top1, top5 = evaluate(model, None, data_loader_test, neval_batches=num_eval_batches, n_print=1)
        print('Evaluation accuracy on %d images, %2.2f'%(num_eval_batches * eval_batch_size, top1.avg))

    # Return the quantized model so that it can be reused by the caller
    return model

Quantization result:

[Figure: evaluation output of the channel-wise quantized model]

Inference speed before and after quantization

Tested on CPU, batch_size=4

import time

def speed_test():
    # Original FP32 model
    net = MobileNetV2()
    net.eval()
    model_fp32 = load_model(net, saved_model_dir + float_model_file).to('cpu')
    model_fp32.eval()
    # Fuse modules: Conv, BN and ReLU operator fusion
    model_fp32.fuse_model()

    # Int8 quantized model
    model_quantized = per_channel_quantize(evaluate_acc=False)

    def inference_with_time_ms(BS=4, model=None):
        n = 10
        total_time = 0.0
        inputs = torch.rand(BS, 3, 224, 224)
        # Both models run on the CPU, so no torch.cuda.synchronize() is needed
        with torch.no_grad():
            for i in range(n+2):
                t1 = time.perf_counter()
                model(inputs)
                t2 = time.perf_counter()
                # The first two iterations are warm-up and are not timed
                if i > 1:
                    total_time += (t2 - t1)
        return total_time / n * 1000

    print('Start speed test...')
    fp32_avg_time = inference_with_time_ms(BS=4, model=model_fp32)
    quantized_avg_time = inference_with_time_ms(BS=4, model=model_quantized)
    print(f'FP32 Avg time: {fp32_avg_time}ms')
    print(f'Quantized Avg time: {quantized_avg_time}ms')


Reference
