pytorch并行训练
参考:
https://zhuanlan.zhihu.com/p/105755472
torch.nn.DataParallel 与torch.distributed 的不同
- distributed较新,每个进程对应一个独立的训练过程,只有梯度等少量数据在进程中交互传递。
在各进程梯度计算完成之后,各进程需要将梯度进行汇总平均,然后再由 rank=0 的进程,将其 broadcast 到所有进程。之后,各进程用该梯度来更新参数。
由于各进程中的模型,初始参数一致 (初始时刻进行一次 broadcast),而每次用于更新参数的梯度也一致,因此,各进程的模型参数始终保持一致。 - DataParallel 中,全程维护一个 optimizer,对各 GPU 上梯度进行求和,而在主 GPU 进行参数更新,之后再将模型参数 broadcast 到其他 GPU。传输参数+梯度 数据量大,等待时间长。
- distributed 进程包含独立的进程锁,因此可以减少解释器和 GIL 使用冲突,这对于严重依赖 Python runtime 的 models 而言,比如说包含 RNN 层或大量小组件的 models 而言,这尤为重要。
RingAllReduce 与 TreeAllReduce
- TreeAllReduce 采用 PS(参数服务器) 计算模型的分布式,gpu多时通常会遇到网络的问题。由于仅使用某一个 GPU 做服务器,该GPU需要接收其他所有 GPU 的梯度,并求平均以及 broadcast 回去,若 GPU 数量越大时,通信成本也就越高。
- RingAllreduce GPU 集群被组织成一个逻辑环,每个 GPU 只从左邻居接受数据、并发送数据给右邻居,即每次同步每个 gpu 只获得部分梯度更新,等一个完整的 Ring 完成,每个 GPU 都获得了完整的参数。算法的每次通信成本是恒定的,与系统中 gpu 的数量无关,完全由系统中 gpu 之间最慢的连接决定。
分布式使用流程
并行训练又分为数据并行 (Data Parallelism) 和 模型并行两种。
数据并行指的是,多张 GPU 使用相同的模型副本,但是使用不同的数据批进行训练。而模型并行指的是,多张GPU 分别训练模型的不同部分,使用同一批数据。
基本概念
-
group:
即进程组。默认情况下,只有一个组,一个 job 即为一个组,也即一个 world。
当需要进行更加精细的通信时,可以通过 new_group 接口,使用 word 的子集,创建新组,用于集体通信等。
-
world size:
表示全局进程个数,进程总数。 -
rank:
表示进程序号,用于进程间通讯,表征进程优先级。rank = 0 的主机为 master 节点。 -
local_rank:
进程内,GPU 编号,非显式参数,由 torch.distributed.launch 内部指定。比方说, rank = 3,local_rank = 0 表示第 3 个进程内的第 1 块 GPU。
数据并行
import torch
net = torch.nn.DataParallel(model, device_ids=[0, 1, 2])
nn.DataParallel使用起来更加简单(通常只要封装模型然后跑训练代码就ok了)。
- 模型的权重都是在 一个进程上先算出来 然后再把他们分发到每个GPU上,所以网络通信就成为了一个瓶颈,而GPU使用率也通常很低。
- 需要所有的GPU都在一个节点(一台机器)上,且并不支持 [Apex] 的 [混合精度训练]
- 容易出现GPU负载不均衡,一个GPU占满,其他占用很少。
模型并行
nn.DistributedDataParallel 进行Multiprocessing可以在多个gpu之间复制该模型,每个gpu由一个进程控制。这些GPU可以位于同一个节点上,也可以分布在多个节点上。每个进程都执行相同的任务,并且每个进程与所有其他进程通信。
DistributedDataParallel
torch.nn.parallel.DistributedDataParallel 与 apex.parallel.DistributedDataParallel 区别在于初始化方式不一样,基本可以等价替换。
from apex.parallel import DistributedDataParallel as DDP
model = DDP(model)
from torch.nn.parallel import DistributedDataParallel as DDP
model = DDP(model,device_ids=[local_rank],output_device=local_rank)
带fp16的混合精度、多gpu训练最小实例
import os
from datetime import datetime
import argparse
import torchvision
import torchvision.transforms as transforms
import torch
import torch.nn as nn
import torch.distributed as dist
# from apex.parallel import DistributedDataParallel as DDP
from torch.nn.parallel import DistributedDataParallel as DDP
from apex import amp
def main():
parser = argparse.ArgumentParser()
parser.add_argument('-n', '--nodes', default=1, type=int, metavar='N',
help='number of data loading workers (default: 4)')
parser.add_argument('-g', '--gpus', default=1, type=int,
help='number of gpus per node')
parser.add_argument('-nr', '--nr', default=0, type=int,
help='ranking within the nodes')
parser.add_argument('--epochs', default=10, type=int, metavar='N',
help='number of total epochs to run')
parser.add_argument('--local_rank', default=-1, type=int,
help='node rank for distributed training') # 注意此参数必须加上
args = parser.parse_args()
os.environ['MASTER_ADDR'] = '127.0.0.1'
os.environ['MASTER_PORT'] = '8889' # 多个程序同一机器调试时需要改变port,一个程序时,这两行可以删除
dist.init_process_group(backend='nccl') # 初始化
torch.cuda.set_device(args.local_rank)
train(args)
class ConvNet(nn.Module):
def __init__(self, num_classes=10):
super(ConvNet, self).__init__()
self.layer1 = nn.Sequential(
nn.Conv2d(1, 16, kernel_size=5, stride=1, padding=2),
nn.BatchNorm2d(16),
nn.ReLU(),
nn.MaxPool2d(kernel_size=2, stride=2))
self.layer2 = nn.Sequential(
nn.Conv2d(16, 32, kernel_size=5, stride=1, padding=2),
nn.BatchNorm2d(32),
nn.ReLU(),
nn.MaxPool2d(kernel_size=2, stride=2))
self.fc = nn.Linear(7*7*32, num_classes)
def forward(self, x):
out = self.layer1(x)
out = self.layer2(out)
out = out.reshape(out.size(0), -1)
out = self.fc(out)
return out
def train(args):
gpu = args.local_rank # 可以用来控制模型保存,打印等
torch.manual_seed(0)
model = ConvNet()
model.cuda()
batch_size = 100
# define loss function (criterion) and optimizer
criterion = nn.CrossEntropyLoss().cuda()
optimizer = torch.optim.SGD(model.parameters(), 1e-4)
# Wrap the model
model, optimizer = amp.initialize(model, optimizer, opt_level='O1')
print('Error args.local_rank:',args.local_rank)
# model = DDP(model) # apex 两种方式都可以,注意对应关系
model = DDP(model,device_ids=[args.local_rank],output_device=args.local_rank) # torch
# Data loading code
train_dataset = torchvision.datasets.MNIST(
root='./data',
train=True,
transform=transforms.ToTensor(),
download=True
)
train_sampler = torch.utils.data.distributed.DistributedSampler(
train_dataset) # 数据分割,加速训练
train_loader = torch.utils.data.DataLoader(
dataset=train_dataset,
batch_size=batch_size,
shuffle=False, # 并行时需要关闭此项
num_workers=0,
pin_memory=True,
sampler=train_sampler
)
start = datetime.now()
total_step = len(train_loader)
for epoch in range(args.epochs):
for i, (images, labels) in enumerate(train_loader):
images = images.cuda(non_blocking=True)
labels = labels.cuda(non_blocking=True)
# Forward pass
outputs = model(images)
loss = criterion(outputs, labels)
# Backward and optimize
optimizer.zero_grad()
with amp.scale_loss(loss, optimizer) as scaled_loss:
scaled_loss.backward()
optimizer.step()
if (i + 1) % 100 == 0 and gpu == 0:
print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format(
epoch + 1,
args.epochs,
i + 1,
total_step,
loss.item())
)
if gpu == 0:
print("Training complete in: " + str(datetime.now() - start))
if __name__ == '__main__':
main()
训练命令
python -m torch.distributed.launch --nproc_per_node=2 distributed_train.py
并行训练的模型加载
并行训练保存的模型与单卡训练的模型有些不同,会导致加载模型出问题
def _Single2Parallel(self, origin_state):
"""
将串行的权值参数转换为并行的权值参数
:param origin_state : 原始串行权值参数
:return : 并行的权值参数
"""
converted = OrderedDict()
for k, v in origin_state.items():
name = "module." + k
converted[name] = v
return converted
def _Parallel2Single(self, origin_state):
"""
将并行的权值参数转换为串行的权值参数
:param origin_state : 原始串行权值参数
:return : 并行的权值参数
"""
converted = OrderedDict()
for k, v in origin_state.items():
name = k[7:]
converted[name] = v
return converted