机器学习与数据挖掘

深度学习分布式训练(上)-Pytorch实现篇

2020-05-23  本文已影响0人  老居搞机

前言

随着数据量越来越多以及模型层数的越来越复杂,深度学习展现了更强劲的效果,但随之而来也带来了负面影响:训练的时间也跟着变的更长,有的模型一训练就是好几天,怎么加快训练的时间?

俗话说人多力量大,很自然想到我们可以把多个GPU组合在一起形成一个集群训练模型来加快训练速度

本着先使用后深入理论,本篇主要讲Pytorch分布式训练的使用,下一篇将详细介绍分布式训练的原理

分布式训练

分布式训练根据并行策略的不同,可以分为模型并行和数据并行

数据并行中根据梯度同步的策略不同,又可以分为参数服务器同步和All-Reduce方式同步(当然这些都放在下一篇讲解)

本篇讲的Pytorch分布式训练采用数据并行方式,梯度信息同步采用All-Reduce

Pytorch分布式训练

废话不多说,我们在实战中学习,先跑个例子然后再慢慢解释每一段的意思,下面一段代码拷下来可以保存成mnist.py文件:

# -*- encoding: utf8 -*-
import torch
from torch import nn
import torch.nn.functional as F
import torch.distributed as dist
from torchvision import datasets, transforms
import argparse
import torch.optim as optim
from torch.utils.data.distributed import DistributedSampler
​
DATA_DIR = '~/data/mnist'
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
​
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 20, 5, 1)
        self.conv2 = nn.Conv2d(20, 50, 5, 1)
        self.fc1 = nn.Linear(4*4*50, 500)
        self.fc2 = nn.Linear(500, 10)
​
    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.max_pool2d(x, 2, 2)
        x = F.relu(self.conv2(x))
        x = F.max_pool2d(x, 2, 2)
        x = x.view(-1, 4*4*50)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return F.log_softmax(x, dim=1)
​
def train(model, train_loader, optimizer, epoch):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(DEVICE), target.to(DEVICE)
        optimizer.zero_grad()
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        if batch_idx % 100 == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tloss={:.4f}'.format(
                epoch, batch_idx * len(data), len(train_loader.dataset),
                100. * batch_idx / len(train_loader), loss.item()))
                
def test(model, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(DEVICE), target.to(DEVICE)
            output = model(data)
            test_loss += F.nll_loss(output, target, reduction='sum').item() # sum up batch loss
            pred = output.max(1, keepdim=True)[1] # get the index of the max log-probability
            correct += pred.eq(target.view_as(pred)).sum().item()
​
    test_loss /= len(test_loader.dataset)
    print('\naccuracy={:.4f}\n'.format(float(correct) / len(test_loader.dataset)))
​
def load_data(dist, batch_size=64, test_batch_size=64):
    train_kwargs = {'num_workers': 1, 'pin_memory': True}
    test_kwargs = {'num_workers': 1, 'pin_memory': True}
​
    train_data_set = datasets.MNIST(DATA_DIR, train=True, download=True,
                       transform=transforms.Compose([
                           transforms.ToTensor(),
                           transforms.Normalize((0.1307,), (0.3081,))
                       ]))
​
    if dist.is_initialized():
        # 如果采用分布式训练, 使用DistributedSampler让每个worker拿到训练数据集不同的子集
        datasampler = DistributedSampler(train_data_set)
        # sampler shuffle must be `False`
        train_kwargs.update({'sampler': datasampler,
                             'shuffle': False
                             })
​
    train_loader = torch.utils.data.DataLoader(train_data_set, batch_size=batch_size, **train_kwargs)
    test_loader = torch.utils.data.DataLoader(
        datasets.MNIST(DATA_DIR, train=False, transform=transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.1307,), (0.3081,))
        ])),
        batch_size=test_batch_size, shuffle=True, **test_kwargs)
​
    return train_loader, test_loader
​
def main():
    parser = argparse.ArgumentParser(description='PyTorch MNIST Example')
    parser.add_argument('--backend', type=str, help='Distributed backend',
                        choices=[dist.Backend.GLOO, dist.Backend.NCCL, dist.Backend.MPI],
                        default=dist.Backend.GLOO)
    parser.add_argument('--init-method', default=None, type=str,
                        help='Distributed init_method')
    parser.add_argument('--rank', default=-1, type=int,
                        help='Distributed rank')
    parser.add_argument('--world-size', default=-1, type=int,
                        help='Distributed world_size')
    args = parser.parse_args()
​
    dist.init_process_group(backend=args.backend,
                            init_method=args.init_method,
                            rank=args.rank,
                            world_size=args.world_size
                            )
​
    train_loader, test_loader = load_data(dist)
    model = Net().to(DEVICE)
    model = nn.parallel.DistributedDataParallel(model)
    optimizer = optim.Adam(model.parameters(), lr=0.001)
​
    for epoch in range(1, 2):
        train(model, train_loader, optimizer, epoch)
        test(model, test_loader)
​
​
if __name__ == '__main__':
    main()

如果没有多台GPU机器,可以用本地指定端口号的方式来测试一下

在第一个终端运行:

$ python mnist.py --init-method tcp://127.0.0.1:22225 --rank 0 --world-size 3

在第二个第三个终端再运行:

$ python mnist.py --init-method tcp://127.0.0.1:22225 --rank 1 --world-size 3
$ python mnist.py --init-method tcp://127.0.0.1:22225 --rank 2 --world-size 3

wow 激动人心! 在三个进程里面我们等于模拟了三台机器在做分布式训练了,训练输出结果:

我们来先看一下第一段:

parser = argparse.ArgumentParser(description='PyTorch MNIST Example')
    parser.add_argument('--backend', type=str, help='Distributed backend',
                        choices=[dist.Backend.GLOO, dist.Backend.NCCL, dist.Backend.MPI],
                        default=dist.Backend.GLOO)
    parser.add_argument('--init-method', default=None, type=str,
                        help='Distributed init_method')
    parser.add_argument('--rank', default=-1, type=int,
                        help='Distributed rank')
    parser.add_argument('--world-size', default=-1, type=int,
                        help='Distributed world_size')
    args = parser.parse_args()
​
    dist.init_process_group(backend=args.backend,
                            init_method=args.init_method,
                            rank=args.rank,
                            world_size=args.world_size
                            )

这一段初始化Pytorch分布式训练的参数:

1.env:读取环境变量方式,会自动读取系统中的这些环境变量:

MASTER_ADDR: 要求(0级除外), 等级0节点的地址
MASTER_PORT: 机器上的自由端口
RANK: 等级, 0为master, >0为worker,也可以在调用init函数时设置
WORLD_SIZE:  进程数量,也可以在调用init函数时设置

env方式可以很方便的跟Kubeflow结合进行分布式训练,如果本地测试可以使用Pytorch提供的测试工具torch.distributed.launch来提交环境变量

$ python -m torch.distributed.launch --nproc_per_node=1 --nnodes=2 --node_rank=0 --master_addr="127.0.0.1" --master_port=22225 mnist.py
$ python -m torch.distributed.launch --nproc_per_node=1 --nnodes=2 --node_rank=1 --master_addr="127.0.0.1" --master_port=22225 mnist.py
$ python -m torch.distributed.launch --nproc_per_node=1 --nnodes=2 --node_rank=2 --master_addr="127.0.0.1" --master_port=22225 mnist.py

2.host+port的方式:指定通信的ip和端口号,可以在运行的时候输入
3.共享文件的方式:有个多个机器都能访问到的文件夹,那么可以在这里创建个文件来实现初始化

接下来加载数据load_data():

def load_data(dist, batch_size=64, test_batch_size=64):
    train_kwargs = {'num_workers': 1, 'pin_memory': True}
    test_kwargs = {'num_workers': 1, 'pin_memory': True}
​
    train_data_set = datasets.MNIST(DATA_DIR, train=True, download=True,
                       transform=transforms.Compose([
                           transforms.ToTensor(),
                           transforms.Normalize((0.1307,), (0.3081,))
                       ]))
​
    if dist.is_initialized():
        # 如果采用分布式训练, 使用DistributedSampler让每个worker拿到训练数据集不同的子集
        datasampler = DistributedSampler(train_data_set)
        # sampler shuffle must be `False`
        train_kwargs.update({'sampler': datasampler,
                             'shuffle': False
                             })
​
    train_loader = torch.utils.data.DataLoader(train_data_set, batch_size=batch_size, **train_kwargs)
    test_loader = torch.utils.data.DataLoader(
        datasets.MNIST(DATA_DIR, train=False, transform=transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.1307,), (0.3081,))
        ])),
        batch_size=test_batch_size, shuffle=True, **test_kwargs)
​
    return train_loader, test_loader

再接下来看模型的分布式:

model = nn.parallel.DistributedDataParallel(model)

总结

通过上面例子,我们看到Pytorch做分布式训练实现起来还是比较简单的:

参考


长按二维码,关注本公众号

微信公众号
上一篇下一篇

猜你喜欢

热点阅读