深度学习分布式训练(上)-Pytorch实现篇
前言
随着数据量越来越多以及模型层数的越来越复杂,深度学习展现了更强劲的效果,但随之而来也带来了负面影响:训练的时间也跟着变的更长,有的模型一训练就是好几天,怎么加快训练的时间?
俗话说人多力量大,很自然想到我们可以把多个GPU组合在一起形成一个集群训练模型来加快训练速度
本着先使用后深入理论,本篇主要讲Pytorch分布式训练的使用,下一篇将详细介绍分布式训练的原理
分布式训练
分布式训练根据并行策略的不同,可以分为模型并行和数据并行
数据并行中根据梯度同步的策略不同,又可以分为参数服务器同步和All-Reduce方式同步(当然这些都放在下一篇讲解)
本篇讲的Pytorch分布式训练采用数据并行方式,梯度信息同步采用All-Reduce
Pytorch分布式训练
废话不多说,我们在实战中学习,先跑个例子然后再慢慢解释每一段的意思,下面一段代码拷下来可以保存成mnist.py文件:
# -*- encoding: utf8 -*-
import torch
from torch import nn
import torch.nn.functional as F
import torch.distributed as dist
from torchvision import datasets, transforms
import argparse
import torch.optim as optim
from torch.utils.data.distributed import DistributedSampler
DATA_DIR = '~/data/mnist'
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(1, 20, 5, 1)
self.conv2 = nn.Conv2d(20, 50, 5, 1)
self.fc1 = nn.Linear(4*4*50, 500)
self.fc2 = nn.Linear(500, 10)
def forward(self, x):
x = F.relu(self.conv1(x))
x = F.max_pool2d(x, 2, 2)
x = F.relu(self.conv2(x))
x = F.max_pool2d(x, 2, 2)
x = x.view(-1, 4*4*50)
x = F.relu(self.fc1(x))
x = self.fc2(x)
return F.log_softmax(x, dim=1)
def train(model, train_loader, optimizer, epoch):
model.train()
for batch_idx, (data, target) in enumerate(train_loader):
data, target = data.to(DEVICE), target.to(DEVICE)
optimizer.zero_grad()
output = model(data)
loss = F.nll_loss(output, target)
loss.backward()
optimizer.step()
if batch_idx % 100 == 0:
print('Train Epoch: {} [{}/{} ({:.0f}%)]\tloss={:.4f}'.format(
epoch, batch_idx * len(data), len(train_loader.dataset),
100. * batch_idx / len(train_loader), loss.item()))
def test(model, test_loader):
model.eval()
test_loss = 0
correct = 0
with torch.no_grad():
for data, target in test_loader:
data, target = data.to(DEVICE), target.to(DEVICE)
output = model(data)
test_loss += F.nll_loss(output, target, reduction='sum').item() # sum up batch loss
pred = output.max(1, keepdim=True)[1] # get the index of the max log-probability
correct += pred.eq(target.view_as(pred)).sum().item()
test_loss /= len(test_loader.dataset)
print('\naccuracy={:.4f}\n'.format(float(correct) / len(test_loader.dataset)))
def load_data(dist, batch_size=64, test_batch_size=64):
train_kwargs = {'num_workers': 1, 'pin_memory': True}
test_kwargs = {'num_workers': 1, 'pin_memory': True}
train_data_set = datasets.MNIST(DATA_DIR, train=True, download=True,
transform=transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307,), (0.3081,))
]))
if dist.is_initialized():
# 如果采用分布式训练, 使用DistributedSampler让每个worker拿到训练数据集不同的子集
datasampler = DistributedSampler(train_data_set)
# sampler shuffle must be `False`
train_kwargs.update({'sampler': datasampler,
'shuffle': False
})
train_loader = torch.utils.data.DataLoader(train_data_set, batch_size=batch_size, **train_kwargs)
test_loader = torch.utils.data.DataLoader(
datasets.MNIST(DATA_DIR, train=False, transform=transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307,), (0.3081,))
])),
batch_size=test_batch_size, shuffle=True, **test_kwargs)
return train_loader, test_loader
def main():
parser = argparse.ArgumentParser(description='PyTorch MNIST Example')
parser.add_argument('--backend', type=str, help='Distributed backend',
choices=[dist.Backend.GLOO, dist.Backend.NCCL, dist.Backend.MPI],
default=dist.Backend.GLOO)
parser.add_argument('--init-method', default=None, type=str,
help='Distributed init_method')
parser.add_argument('--rank', default=-1, type=int,
help='Distributed rank')
parser.add_argument('--world-size', default=-1, type=int,
help='Distributed world_size')
args = parser.parse_args()
dist.init_process_group(backend=args.backend,
init_method=args.init_method,
rank=args.rank,
world_size=args.world_size
)
train_loader, test_loader = load_data(dist)
model = Net().to(DEVICE)
model = nn.parallel.DistributedDataParallel(model)
optimizer = optim.Adam(model.parameters(), lr=0.001)
for epoch in range(1, 2):
train(model, train_loader, optimizer, epoch)
test(model, test_loader)
if __name__ == '__main__':
main()
如果没有多台GPU机器,可以用本地指定端口号的方式来测试一下
在第一个终端运行:
$ python mnist.py --init-method tcp://127.0.0.1:22225 --rank 0 --world-size 3
在第二个第三个终端再运行:
$ python mnist.py --init-method tcp://127.0.0.1:22225 --rank 1 --world-size 3
$ python mnist.py --init-method tcp://127.0.0.1:22225 --rank 2 --world-size 3
wow 激动人心! 在三个进程里面我们等于模拟了三台机器在做分布式训练了,训练输出结果:
我们来先看一下第一段:
parser = argparse.ArgumentParser(description='PyTorch MNIST Example')
parser.add_argument('--backend', type=str, help='Distributed backend',
choices=[dist.Backend.GLOO, dist.Backend.NCCL, dist.Backend.MPI],
default=dist.Backend.GLOO)
parser.add_argument('--init-method', default=None, type=str,
help='Distributed init_method')
parser.add_argument('--rank', default=-1, type=int,
help='Distributed rank')
parser.add_argument('--world-size', default=-1, type=int,
help='Distributed world_size')
args = parser.parse_args()
dist.init_process_group(backend=args.backend,
init_method=args.init_method,
rank=args.rank,
world_size=args.world_size
)
这一段初始化Pytorch分布式训练的参数:
- rank:等级, 0为master, >0为worker
- world_size:进程总数量,Pytorch会等到所有world_size个进程就绪之后才会开心训练
- backend:指定当前进程要使用的通信后端,支持的通信后端有 gloo,mpi,nccl方式,支持如下所示:
- init_method:分布式训练的初始化方式,默认使用环境变量方式env
1.env:读取环境变量方式,会自动读取系统中的这些环境变量:
MASTER_ADDR: 要求(0级除外), 等级0节点的地址
MASTER_PORT: 机器上的自由端口
RANK: 等级, 0为master, >0为worker,也可以在调用init函数时设置
WORLD_SIZE: 进程数量,也可以在调用init函数时设置
env方式可以很方便的跟Kubeflow结合进行分布式训练,如果本地测试可以使用Pytorch提供的测试工具torch.distributed.launch来提交环境变量
$ python -m torch.distributed.launch --nproc_per_node=1 --nnodes=2 --node_rank=0 --master_addr="127.0.0.1" --master_port=22225 mnist.py
$ python -m torch.distributed.launch --nproc_per_node=1 --nnodes=2 --node_rank=1 --master_addr="127.0.0.1" --master_port=22225 mnist.py
$ python -m torch.distributed.launch --nproc_per_node=1 --nnodes=2 --node_rank=2 --master_addr="127.0.0.1" --master_port=22225 mnist.py
2.host+port的方式:指定通信的ip和端口号,可以在运行的时候输入
3.共享文件的方式:有个多个机器都能访问到的文件夹,那么可以在这里创建个文件来实现初始化
接下来加载数据load_data():
def load_data(dist, batch_size=64, test_batch_size=64):
train_kwargs = {'num_workers': 1, 'pin_memory': True}
test_kwargs = {'num_workers': 1, 'pin_memory': True}
train_data_set = datasets.MNIST(DATA_DIR, train=True, download=True,
transform=transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307,), (0.3081,))
]))
if dist.is_initialized():
# 如果采用分布式训练, 使用DistributedSampler让每个worker拿到训练数据集不同的子集
datasampler = DistributedSampler(train_data_set)
# sampler shuffle must be `False`
train_kwargs.update({'sampler': datasampler,
'shuffle': False
})
train_loader = torch.utils.data.DataLoader(train_data_set, batch_size=batch_size, **train_kwargs)
test_loader = torch.utils.data.DataLoader(
datasets.MNIST(DATA_DIR, train=False, transform=transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307,), (0.3081,))
])),
batch_size=test_batch_size, shuffle=True, **test_kwargs)
return train_loader, test_loader
-
Pytorch使用的是数据分布式训练,每个进程实际上是独立加载数据的,所以需要加载相同数据集后用一定的规则根据rank来顺序切割获取不同的数据子集,DistributedSampler就是用来确保dataloader只会load到整个数据集的一个特定子集的做法(实际上不用Pytorch提供的DistributedSampler工具,自己做加载数据后切分word_size个子集按rank顺序拿到子集效果也是一样)
-
同时为了能够按顺序划分数据子集,拿到不同部分数据,所以数据集不能够进行随机打散,所以用了参数 'shuffle': False
再接下来看模型的分布式:
model = nn.parallel.DistributedDataParallel(model)
- DistributedDataParallel 是实现多机多卡分布训练最核心东西,封装了All-Reduce方法,可以帮助我们在不同机器的多个模型拷贝之间平均梯度
总结
通过上面例子,我们看到Pytorch做分布式训练实现起来还是比较简单的:
- Pytorch模型使用DistributedDataParallel方法包装来实现梯度参数的All-Reduce传递
- 数据集需要在不同机器上按Rank进行切分,以保证每个GPU进程训练的数据集是不一样的
- 使用Kubeflow创建Docker Pod的方式配合Pytorch env环境变量的训练非常方便
参考
- [1] https://yangkky.github.io/2019/07/08/distributed-pytorch-tutorial.html
- [2] https://zhuanlan.zhihu.com/p/76638962
长按二维码,关注本公众号
微信公众号