Pytorch单机多卡GPU的实现(原理概述、基本框架、常见报错)

Pytorch单机多卡GPU的实现(原理概述、基本框架、常见报错),第1张

Pytorch单机多卡GPU的实现(原理概述、基本框架、常见报错) Pytorch单机多卡GPU的实现(原理概述、基本框架、常见报错)

  通常在预训练等需要大规模训练任务时,单卡很难满足需求。因此需要使用集群等资源以满足加速的需求。通常在完成一些个人任务时,可以申请单机4~8张GPU卡(目前较为好的配置是V100,32G),而在工业级别可能会涉及到多机多卡。本文主要对基于pytorch的单机多卡并行的使用进行描述。

Pytorch提供了性能更好的DistributedDataParallel(DDP)模块,相比DataParallel更加稳定高效。因此本文主要基于DistributedDataParallel完成并行。


一、单机多卡原理概述


  并行有两种模式,分别是模型并行和数据并行,如上图所示。

  基于动态图的pytorch的GPU并行主要为“数据并行”,即同一时刻各个设备上保存的模型(参数)完全相同,而各个设备上喂入模型的数据不同。在所有设备上完成一次前向传播计算后,分别得到相应的损失loss,DistributedDataParallel(DDP)将会自动地将所有设备上的梯度统一到某一台设备上(例如0号设备)实现参数更新(通常会将所有设备的loss自动取均值进行更新),然后同步到所有设备的模型上。

  因此理论上,每一个batch之后,各个设备上的模型相同,各个设备上由于数据的不同,得到的梯度和损失也不同,但都由DDP自动完成。

二、基本框架

  我们以最经典的MNIST手写数字识别作为例子。首先定义卷积函数类,该类代码将作为后续模型的使用:

#定义卷积神经网络
class ConvNet(nn.Module):
    def __init__(self):
        super().__init__()
        # batch*1*28*28(每次会送入batch个样本,输入通道数1(黑白图像),图像分辨率是28x28)
        # 下面的卷积层Conv2d的第一个参数指输入通道数,第二个参数指输出通道数,第三个参数指卷积核的大小
        self.conv1 = nn.Conv2d(1, 10, 5) # 输入通道数1,输出通道数10,核的大小5
        self.conv2 = nn.Conv2d(10, 20, 3) # 输入通道数10,输出通道数20,核的大小3
        # 下面的全连接层Linear的第一个参数指输入通道数,第二个参数指输出通道数
        self.fc1 = nn.Linear(20*10*10, 500) # 输入通道数是2000,输出通道数是500
        self.fc2 = nn.Linear(500, 10) # 输入通道数是500,输出通道数是10,即10分类
    def forward(self,x):
        in_size = x.size(0) # 在本例中in_size=512,也就是BATCH_SIZE的值。输入的x可以看成是512*1*28*28的张量。
        out = self.conv1(x) # batch*1*28*28 -> batch*10*24*24(28x28的图像经过一次核为5x5的卷积,输出变为24x24)
        out = F.relu(out) # batch*10*24*24(激活函数ReLU不改变形状))
        out = F.max_pool2d(out, 2, 2) # batch*10*24*24 -> batch*10*12*12(2*2的池化层会减半)
        out = self.conv2(out) # batch*10*12*12 -> batch*20*10*10(再卷积一次,核的大小是3)
        out = F.relu(out) # batch*20*10*10
        out = out.view(in_size, -1) # batch*20*10*10 -> batch*2000(out的第二维是-1,说明是自动推算,本例中第二维是20*10*10)
        out = self.fc1(out) # batch*2000 -> batch*500
        out = F.relu(out) # batch*500
        out = self.fc2(out) # batch*500 -> batch*10
        out = F.log_softmax(out, dim=1) # 计算log(softmax(x))
        return out

  首先先看单机单卡训练任务代码:

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
torch.__version__
import ConvNet # 自定义的卷积网络模型

BATCH_SIZE=512 #大概需要2G的显存
EPOCHS=20 # 总共训练批次
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu") # 让torch判断是否使用GPU,建议使用GPU环境,因为会快很多

#下载训练集
train_loader = torch.utils.data.DataLoader(
        datasets.MNIST('data', train=True, download=True, 
                       transform=transforms.Compose([
                           transforms.ToTensor(),
                           transforms.Normalize((0.1307,), (0.3081,))
                       ])),
        batch_size=BATCH_SIZE, shuffle=True)

#下载测试集
test_loader = torch.utils.data.DataLoader(
        datasets.MNIST('data', train=False, transform=transforms.Compose([
                           transforms.ToTensor(),
                           transforms.Normalize((0.1307,), (0.3081,))
                       ])),
        batch_size=BATCH_SIZE, shuffle=True)
#训练
def train(model, device, train_loader, optimizer, epoch):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        if(batch_idx+1)%30 == 0: 
            print('Train Epoch: {} [{}/{} ({:.0f}%)]tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), len(train_loader.dataset),
                100. * batch_idx / len(train_loader), loss.item()))

#测试
def test(model, device, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            test_loss += F.nll_loss(output, target, reduction='sum').item() # 将一批的损失相加
            pred = output.max(1, keepdim=True)[1] # 找到概率最大的下标
            correct += pred.eq(target.view_as(pred)).sum().item()

    test_loss /= len(test_loader.dataset)
    print('nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)n'.format(
        test_loss, correct, len(test_loader.dataset),
        100. * correct / len(test_loader.dataset)))

if __name__ == '__main__':
    model = ConvNet().to(DEVICE)
    optimizer = optim.Adam(model.parameters())
    for epoch in range(1, EPOCHS + 1):
        train(model, DEVICE, train_loader, optimizer, epoch)
        test(model, DEVICE, test_loader)
    #保存训练完成后的模型
    torch.save(model, './MNIST.pth')

  在单机单卡代码中,通常只需要定义好Datasets以及DataLoader即可。而在多卡并行任务中,我们刚才说到GPU并行本质是数据并行,因此主要改动的则是Datasets以及DataLoader这两个部分。涉及到的内容如下:

获得所有GPU的数量:torch.cuda.device_count(),获得当前可用GPU的数量(进程数):dist.get_world_size()local_rank:需要显式定义ArgumentParser,并添加参数:parser.add_argument('--local_rank', default=-1, type=int)

local_rank参数需要显式定义,但用户在执行脚本时无需指定其取值。该参数则由系统根据可用的所有GPU卡(设备)自动进行编号,因此local_rank作为设备编号来表示。通常默认local_rank=0为基本卡,比如当模型在处理一些数据并保存磁盘时,或者模型在进行推理时,只需要在local_rank=0的卡上完成即可。
当在训练时,单卡一般直接进行tensor.cuda()实现将tensor转移到GPU上。而多卡训练时,则为tensor.cuda(args.local_rank)。

batch_size的指定:单机的batch_size则表示一次性喂入模型的样本数量,而DDP中batch_size为每张卡上喂入数据的数量。

通常开源项目中对batch_size可命名为“batch_size_per_gpu”。当然也可以使用BATCH_SIZE = bz // dist.get_world_size()的方法计算每个ka的batch。但需要注意每张卡的GPU显存大小。例如在完成BERT-large预训练时,如果只有16G的卡,每张卡的batch_size只能设置8。

DistributedSampler:用于分布式训练时的数据采样。其通常保证每张卡上均同时但不会重复的采样数据,默认情况下为随机采样;DataLoader:不同于单机单卡,其需要指定sampler=DistributedSampler作为采样器,同时需要显式设置shuffle=False。dist.barrier():用于同步,即确保所有设备上的程序均执行到该位置时,再继续执行。

  改为多GPU版本代码如下:

import argparse
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
from torch import distributed as dist
torch.__version__

print(torch.cuda.device_count())  # 打印gpu数量
dist.init_process_group(backend="nccl")  # 并行训练初始化,建议'nccl'模式
print('world_size', dist.get_world_size())  # 打印当前进程数

# 下面这个参数需要加上,torch内部调用多进程时,会使用该参数,对每个gpu进程而言,其local_rank都是不同的;
parser = argparse.ArgumentParser(description="Command")
parser.add_argument('--local_rank', default=-1, type=int)
args = parser.parse_args()
torch.cuda.set_device(args.local_rank)  # 设置gpu编号为local_rank;此句也可能看出local_rank的值是什么

BATCH_SIZE = 512 // dist.get_world_size()   # 大概需要2G的显存
EPOCHS = 20  # 总共训练批次
# DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu") # 让torch判断是否使用GPU,建议使用GPU环境,因为会快很多


# 下载训练集
train_dataset = datasets.MNIST('data', train=True, download=True,
                               transform=transforms.Compose([
                                   transforms.ToTensor(),
                                   transforms.Normalize((0.1307,), (0.3081,))
                               ]))
# 下载测试集
test_dataset = datasets.MNIST('data', train=False, transform=transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.1307,), (0.3081,))
]))

train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset)
train_loader = torch.utils.data.DataLoader(
    train_dataset,
    batch_size=BATCH_SIZE, shuffle=False,
    sampler=train_sampler)

test_loader = torch.utils.data.DataLoader(
    test_dataset,
    batch_size=BATCH_SIZE, shuffle=False)

# 训练
def train(model, train_loader, optimizer, epoch):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.cuda(args.local_rank), target.cuda(args.local_rank)
        optimizer.zero_grad()
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        if (batch_idx + 1) % 30 == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), len(train_loader.dataset),
                       100. * batch_idx / len(train_loader), loss.item()))

# 测试
def test(model, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.cuda(args.local_rank), target.cuda(args.local_rank)
            output = model(data)
            test_loss += F.nll_loss(output, target, reduction='sum').item()  # 将一批的损失相加
            pred = output.max(1, keepdim=True)[1]  # 找到概率最大的下标
            correct += pred.eq(target.view_as(pred)).sum().item()

    test_loss /= len(test_loader.dataset)
    print('nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)n'.format(
        test_loss, correct, len(test_loader.dataset),
        100. * correct / len(test_loader.dataset)))

if __name__ == '__main__':
    model = ConvNet().cuda(args.local_rank)
    model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model)  # 设置多个gpu的BN同步
    model = torch.nn.parallel.DistributedDataParallel(model,
                                                      device_ids=[args.local_rank],
                                                      output_device=args.local_rank,
                                                      find_unused_parameters=False,
                                                      broadcast_buffers=False)

    optimizer = optim.Adam(model.parameters())

    for epoch in range(1, EPOCHS + 1):
        train_sampler.set_epoch(epoch)  # 这句莫忘,否则相当于没有shuffle数据
        train(model, train_loader, optimizer, epoch)
        if args.local_rank == 0:
            test(model, test_loader)
        dist.barrier()

    # 保存训练完成后的模型
    torch.save(model, './MNIST.pth')

  因此,单卡多卡框架代码可以总结为:

import torch
import argparse
from torch import distributed as dist

print(torch.cuda.device_count())  # 打印gpu数量
dist.init_process_group(backend="nccl")  # 并行训练初始化,建议'nccl'模式
print('world_size', dist.get_world_size()) # 打印当前进程数

# 下面这个参数需要加上,torch内部调用多进程时,会使用该参数,对每个gpu进程而言,其local_rank都是不同的;
parser.add_argument('--local_rank', default=-1, type=int)  
args = parser.parse_args()
torch.cuda.set_device(args.local_rank)  # 设置gpu编号为local_rank;此句也可能看出local_rank的值是什么

def reduce_mean(tensor, nprocs):  # 用于平均所有gpu上的运行结果,比如loss
    rt = tensor.clone()
    dist.all_reduce(rt, op=dist.ReduceOp.SUM)
    rt /= nprocs
    return rt


'''
多卡训练加载数据:
# Dataset的设计上与单gpu一致,但是DataLoader上不一样。首先解释下原因:多gpu训练是,我们希望
# 同一时刻在每个gpu上的数据是不一样的,这样相当于batch size扩大了N倍,因此起到了加速训练的作用。
# 在DataLoader时,如何做到每个gpu上的数据是不一样的,且gpu1上训练过的数据如何确保接下来不被别
# 的gou再次训练。这时候就得需要DistributedSampler。
# dataloader设置方式如下,注意shuffle与sampler是冲突的,并行训练需要设置sampler,此时务必
# 要把shuffle设为False。但是这里shuffle=False并不意味着数据就不会乱序了,而是乱序的方式交给
# sampler来控制,实质上数据仍是乱序的。
'''
train_sampler = torch.utils.data.distributed.DistributedSampler(My_Dataset)
dataloader = torch.utils.data.DataLoader(ds,
                                         batch_size=batch_size,
                                         shuffle=False,
                                         num_workers=16,
                                         pin_memory=True,
                                         drop_last=True,
                                         sampler=self.train_sampler)



'''
多卡训练的模型设置:
# 最主要的是find_unused_parameters和broadcast_buffers参数;
# find_unused_parameters:如果模型的输出有不需要进行反传的,设置此参数为True;如果你的代码运行
# 后卡住某个地方不动,基本上就是该参数的问题。
# broadcast_buffers:设置为True时,在模型执行forward之前,gpu0会把buffer中的参数值全部覆盖
# 到别的gpu上。注意这和同步BN并不一样,同步BN应该使用上面那句代码。
'''
My_model = My_model.cuda(args.local_rank)  # 将模型拷贝到每个gpu上
My_model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(My_model) # 设置多个gpu的BN同步
My_model = torch.nn.parallel.DistributedDataParallel(My_model, 
                                                     device_ids=[args.local_rank], 
                                                     output_device=args.local_rank, 
                                                     find_unused_parameters=False, 
                                                     broadcast_buffers=False)

'''开始多卡训练:'''
for epoch in range(200):
    train_sampler.set_epoch(epoch)  # 这句莫忘,否则相当于没有shuffle数据
    My_model.train()
    for idx, sample in enumerate(dataloader):
        inputs, targets = sample[0].cuda(local_rank, non_blocking=True), sample[1].cuda(local_rank, non_blocking=True)
        opt.zero_grad()
        output = My_model(inputs)
        loss = My_loss(output, targets)  # 
        loss.backward()
        opt.step()
        loss = reduce_mean(loss, dist.get_world_size())  # 多gpu的loss进行平均。


'''多卡测试(evaluation):'''
if local_rank == 0:
    My_model.eval()
    with torch.no_grad():
        acc = My_eval(My_model)
    torch.save(My_model.module.state_dict(), model_save_path)
dist.barrier() # 这一句作用是:所有进程(gpu)上的代码都执行到这,才会执行该句下面的代码

程序的执行为:

代码运行:

python3 -m torch.distributed.launch --nproc_per_node=2 main.py

设置可见的设备:

CUDA_VISIBLE_DEVICES=0,1 python3 -m torch.distributed.launch --nproc_per_node=2 main.py

结合nohup:

CUDA_VISIBLE_DEVICES=0,1 nohup python3 -m torch.distributed.launch --nproc_per_node=2 main.py >> log.log &
三、常见错误

  简单列举了几个基于DDP的常见错误,该部分将持续更新。

1、Expected to have finished reduction in the prior iteration before starting a new one.

RuntimeError: Expected to have finished reduction in the prior iteration before starting a new one. This error indicates that your module has parameters that were not used in producing loss. You can enable unused parameter detection by (1) passing the key …

参考:https://blog.csdn.net/qxqxqzzz/article/details/116076355

出错原因:

通常是使用多GPU时,程序或模型存在BUG,需要仔细debug;使用多任务训练时,必要的模块loss没有作梯度更新;此时可以在torch.nn.parallel.DistributedDataParallel()添加参数find_unused_parameters=True。 2、RuntimeError: Address already in use

参考:https://blog.csdn.net/j___t/article/details/107774289

出错原因:

某一个端口已经存在DDP程序了。

解决方案:

可以开启新的端口,添加参数–master_port。例如:

python3 -m torch.distributed.launch --nproc_per_node=2 --master_port 8848 main.py
3、多GPU显存和内存泄漏问题

解决方案:

tensor.cuda(non_blocking=True) 将张量存入cuda并不取出;喂入torch.utils.data.dataset和dataloader时,不要先转换为tensor:

例如:

feature_dict = {
    'input_ids': [f.input_ids for f in features],
    'attention_masks': [f.attention_mask for f in features],
    'token_type_ids': [f.token_type_ids for f in features],
}
Dataset(feature_dict)

class DictDataset(Dataset):
    """A dataset of tensors that uses a dictionary for key-value mappings"""
    def __init__(self, **tensors):
        # tensors.values()
        # assert all(next(iter(tensors.values())).size(0) == tensor.size(0) for tensor in tensors.values())
        self.tensors = tensors
    def __getitem__(self, index):
        bool_list = ['is_mlms']
        return {key: torch.tensor(tensor[index], dtype=torch.bool) if key in bool_list else torch.tensor(tensor[index], dtype=torch.long) for key, tensor in self.tensors.items()}
    def __len__(self):
        return len(next(iter(self.tensors.values())))

也可以在Dataloader中指定自定义的batch处理函数collate_fn内实现tensor转换。

显式使用GC机制:GC是python的垃圾回收机制模块,通常python自动实现对无用的变量自动回收,但难免可能在多GPU场景下失效,因此可以显式调用:

import gc
gc.collect()

清理GPU缓存:可使用torch.cuda.empty_cache()清理存在于GPU上的无用变量。


参考文献:

【1】pytorch 并行训练之DistributedDataParallel(代码样例和解释)
【2】https://blog.csdn.net/qxqxqzzz/article/details/116076355
【3】https://blog.csdn.net/j___t/article/details/107774289
【4】pytorch多gpu并行训练

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5712116.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存