PyTorch Parallel Training

2020-04-10  carry_xz

Reference:
https://zhuanlan.zhihu.com/p/105755472

Differences between torch.nn.DataParallel and torch.distributed

RingAllReduce vs. TreeAllReduce

Distributed training workflow

Parallel training comes in two forms: data parallelism and model parallelism.
In data parallelism, multiple GPUs hold identical copies of the model but each trains on a different batch of data. In model parallelism, multiple GPUs each hold a different part of the model and process the same batch of data.
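
As a rough illustration of model parallelism (a sketch assuming two visible GPUs, cuda:0 and cuda:1; the layer sizes are arbitrary):

import torch
import torch.nn as nn

class TwoGPUNet(nn.Module):
    """Each half of the network lives on a different GPU; activations are moved between devices in forward()."""
    def __init__(self):
        super().__init__()
        self.part1 = nn.Linear(128, 64).to('cuda:0')
        self.part2 = nn.Linear(64, 10).to('cuda:1')

    def forward(self, x):
        x = self.part1(x.to('cuda:0'))
        return self.part2(x.to('cuda:1'))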

Basic Concepts

When finer-grained communication is needed, the new_group interface can be used to build new groups from a subset of the world, for example to run collective communication within that subset only.
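
As a rough sketch of the idea (assuming the default process group has already been initialized and the world contains more than two ranks), ranks 0 and 1 can form a subgroup and all-reduce a tensor among themselves:

import torch
import torch.distributed as dist

# Every process must call new_group, even those not included in the subgroup.
subgroup = dist.new_group(ranks=[0, 1])
t = torch.ones(1).cuda()
if dist.get_rank() in (0, 1):
    dist.all_reduce(t, op=dist.ReduceOp.SUM, group=subgroup)  # t becomes 2 on ranks 0 and 1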

Data Parallelism

import torch
net = torch.nn.DataParallel(model, device_ids=[0, 1, 2])

nn.DataParallel is the simpler of the two: usually you only need to wrap the model and then run your training code unchanged (see the sketch below).
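
A minimal sketch, assuming three visible GPUs and a toy nn.Linear model (any nn.Module works the same way):

import torch
import torch.nn as nn

model = nn.Linear(128, 10)
net = torch.nn.DataParallel(model, device_ids=[0, 1, 2]).cuda()
x = torch.randn(96, 128)   # each forward pass splits the batch across the 3 GPUs
out = net(x)               # outputs are gathered back on device_ids[0] (cuda:0)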

Distributed Data Parallelism (Multi-Process)

With nn.DistributedDataParallel and multiprocessing, the model is replicated across multiple GPUs, with each GPU controlled by its own process. The GPUs can all sit on the same node or be spread across multiple nodes. Every process performs the same work, and each process communicates with all the others.
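
For illustration, the per-process setup might look like the sketch below (assumptions: a single node, one process per visible GPU, and a free port 8889; the full runnable example later in this post uses torch.distributed.launch instead):

import torch
import torch.distributed as dist
import torch.multiprocessing as mp

def worker(rank, world_size):
    # Each spawned process joins the same process group and owns one GPU.
    dist.init_process_group(backend='nccl', init_method='tcp://127.0.0.1:8889',
                            rank=rank, world_size=world_size)
    torch.cuda.set_device(rank)
    # ... build the model, wrap it with DistributedDataParallel, train ...
    dist.destroy_process_group()

if __name__ == '__main__':
    world_size = torch.cuda.device_count()
    mp.spawn(worker, args=(world_size,), nprocs=world_size)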

DistributedDataParallel

torch.nn.parallel.DistributedDataParallel and apex.parallel.DistributedDataParallel differ mainly in how they are initialized; otherwise they are largely interchangeable.

# apex version
from apex.parallel import DistributedDataParallel as DDP
model = DDP(model)

# torch version
from torch.nn.parallel import DistributedDataParallel as DDP
model = DDP(model, device_ids=[local_rank], output_device=local_rank)

Minimal example: multi-GPU training with fp16 mixed precision

import os
from datetime import datetime
import argparse
import torchvision
import torchvision.transforms as transforms
import torch
import torch.nn as nn
import torch.distributed as dist
# from apex.parallel import DistributedDataParallel as DDP
from torch.nn.parallel import DistributedDataParallel as DDP
from apex import amp


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('-n', '--nodes', default=1, type=int, metavar='N',
                        help='number of nodes')
    parser.add_argument('-g', '--gpus', default=1, type=int,
                        help='number of gpus per node')
    parser.add_argument('-nr', '--nr', default=0, type=int,
                        help='ranking within the nodes')
    parser.add_argument('--epochs', default=10, type=int, metavar='N',
                        help='number of total epochs to run')
    parser.add_argument('--local_rank', default=-1, type=int,
                        help='local process rank for distributed training')  # required: torch.distributed.launch passes this argument to every process
    args = parser.parse_args()
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '8889'  # change the port when debugging several jobs on one machine; for a single job these two lines can be dropped
    dist.init_process_group(backend='nccl')  # initialize the default process group
    torch.cuda.set_device(args.local_rank)
    train(args)

class ConvNet(nn.Module):
    def __init__(self, num_classes=10):
        super(ConvNet, self).__init__()
        self.layer1 = nn.Sequential(
            nn.Conv2d(1, 16, kernel_size=5, stride=1, padding=2),
            nn.BatchNorm2d(16),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        self.layer2 = nn.Sequential(
            nn.Conv2d(16, 32, kernel_size=5, stride=1, padding=2),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        self.fc = nn.Linear(7*7*32, num_classes)

    def forward(self, x):
        out = self.layer1(x)
        out = self.layer2(out)
        out = out.reshape(out.size(0), -1)
        out = self.fc(out)
        return out


def train(args):
    gpu = args.local_rank  # used to gate model saving, logging, etc.
    torch.manual_seed(0)
    model = ConvNet()
    model.cuda()
    batch_size = 100
    # define loss function (criterion) and optimizer
    criterion = nn.CrossEntropyLoss().cuda()
    optimizer = torch.optim.SGD(model.parameters(), 1e-4)
    # Wrap the model
    model, optimizer = amp.initialize(model, optimizer, opt_level='O1')
    print('args.local_rank:', args.local_rank)
    # model = DDP(model)  # apex version; either wrapper works, just match it to the import above
    model = DDP(model, device_ids=[args.local_rank], output_device=args.local_rank)  # torch version
    # Data loading code
    train_dataset = torchvision.datasets.MNIST(
        root='./data',
        train=True,
        transform=transforms.ToTensor(),
        download=True
    )
    train_sampler = torch.utils.data.distributed.DistributedSampler(
        train_dataset)  # shards the dataset across processes so each GPU trains on a different subset
    train_loader = torch.utils.data.DataLoader(
        dataset=train_dataset,
        batch_size=batch_size,
        shuffle=False,  # must be False here; shuffling is handled by the DistributedSampler
        num_workers=0,
        pin_memory=True,
        sampler=train_sampler
    )

    start = datetime.now()
    total_step = len(train_loader)
    for epoch in range(args.epochs):
        train_sampler.set_epoch(epoch)  # reshuffle the sampler's shards each epoch
        for i, (images, labels) in enumerate(train_loader):
            images = images.cuda(non_blocking=True)
            labels = labels.cuda(non_blocking=True)
            # Forward pass
            outputs = model(images)
            loss = criterion(outputs, labels)

            # Backward and optimize
            optimizer.zero_grad()
            with amp.scale_loss(loss, optimizer) as scaled_loss:
                scaled_loss.backward()
            optimizer.step()
            if (i + 1) % 100 == 0 and gpu == 0:
                print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format(
                    epoch + 1,
                    args.epochs,
                    i + 1,
                    total_step,
                    loss.item())
                )
    if gpu == 0:
        print("Training complete in: " + str(datetime.now() - start))


if __name__ == '__main__':
    main()

Training command:
python -m torch.distributed.launch --nproc_per_node=2 distributed_train.py
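
For multi-node runs (a sketch assuming two nodes with two GPUs each; the master address and port are placeholders for your cluster), the same script is launched once per node with its own --node_rank:

python -m torch.distributed.launch --nproc_per_node=2 --nnodes=2 --node_rank=0 \
    --master_addr="192.168.1.1" --master_port=8889 distributed_train.py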

Loading Models Trained in Parallel

A checkpoint saved from parallel training differs from one saved from single-GPU training: every key in the state dict gains a "module." prefix, which breaks loading unless the keys are converted.

from collections import OrderedDict


def _Single2Parallel(self, origin_state):
    """
    Convert a single-GPU (non-parallel) state dict into a parallel one.
    :param origin_state : original single-GPU state dict
    :return             : state dict usable by a parallel-wrapped model
    """
    converted = OrderedDict()

    for k, v in origin_state.items():
        name = "module." + k
        converted[name] = v

    return converted


def _Parallel2Single(self, origin_state):
    """
    Convert a parallel state dict into a single-GPU (non-parallel) one.
    :param origin_state : original parallel state dict
    :return             : state dict usable by a plain, unwrapped model
    """
    converted = OrderedDict()

    for k, v in origin_state.items():
        name = k[7:]  # strip the leading "module." prefix
        converted[name] = v

    return converted
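
For illustration, a hedged usage sketch follows; it assumes the two helpers above belong to a hypothetical checkpoint-utility class (here called CheckpointConverter), that model is a plain unwrapped ConvNet, and that model_ddp.pth is a checkpoint saved during DDP training:

import torch

converter = CheckpointConverter()                          # hypothetical class holding the helpers above
state = torch.load('model_ddp.pth', map_location='cpu')    # hypothetical checkpoint path
model.load_state_dict(converter._Parallel2Single(state))   # strip the "module." prefix before loading

Saving model.module.state_dict() during parallel training is another common way to avoid the prefix entirely.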