使用大型數據集訓練大型深度神經網絡 (DNN) 的問題是深度學習領域的主要挑戰。 隨著 DNN 和數據集規模的增加,訓練這些模型的計算和內存需求也會增加。 這使得在計算資源有限的單臺機器上訓練這些模型變得困難甚至不可能。 使用大型數據集訓練大型 DNN 的一些主要挑戰包括:
- 訓練時間長:訓練過程可能需要數周甚至數月才能完成,具體取決于模型的復雜性和數據集的大小。
- 內存限制:大型 DNN 可能需要大量內存來存儲訓練期間的所有模型參數、梯度和中間激活。 這可能會導致內存不足錯誤并限制可在單臺機器上訓練的模型的大小。
為了應對這些挑戰,已經開發了各種技術來擴大具有大型數據集的大型 DNN 的訓練,包括模型并行性、數據并行性和混合并行性,以及硬件、軟件和算法的優化。
在本文中我們將演示使用 PyTorch 的數據并行性和模型并行性。
我們所說的并行性一般是指在多個gpu,或多臺機器上訓練深度神經網絡(dnn),以實現更少的訓練時間。數據并行背后的基本思想是將訓練數據分成更小的塊,讓每個GPU或機器處理一個單獨的數據塊。然后將每個節點的結果組合起來,用于更新模型參數。在數據并行中,模型體系結構在每個節點上是相同的,但模型參數在節點之間進行了分區。每個節點使用分配的數據塊訓練自己的本地模型,在每次訓練迭代結束時,模型參數在所有節點之間同步。這個過程不斷重復,直到模型收斂到一個令人滿意的結果。
下面我們用用Re.NET50和CIFAR10數據集來進行完整的代碼示例:
在數據并行中,模型架構在每個節點上保持相同,但模型參數在節點之間進行了分區,每個節點使用分配的數據塊訓練自己的本地模型。
PyTorch的DistributedDataParallel 庫可以進行跨節點的梯度和模型參數的高效通信和同步,實現分布式訓練。本文提供了如何使用ResNet50和CIFAR10數據集使用PyTorch實現數據并行的示例,其中代碼在多個gpu或機器上運行,每臺機器處理訓練數據的一個子集。訓練過程使用PyTorch的DistributedDataParallel 庫進行并行化。
導入必須要的庫
import os
from datetime import datetime
from time import time
import argparse
import torchvision
import torchvision.transforms as transforms
import torch
import torch.nn as nn
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel
接下來,我們將檢查GPU。
import subprocess
result = subprocess.run(['nvidia-smi'], stdout=subprocess.PIPE)
print(result.stdout.decode())
因為我們需要在多個服務器上運行,所以手動一個一個執行并不現實,所以需要有一個調度程序。這里我們使用SLURM文件來運行代碼(slurm面向linux和Unix類似內核的免費和開源工作調度程序),
def main():
# get distributed configuration from Slurm environment
parser = argparse.ArgumentParser()
parser.add_argument('-b', '--batch-size', default=128, type =int,
help='batch size. it will be divided in mini-batch for each worker')
parser.add_argument('-e','--epochs', default=2, type=int, metavar='N',
help='number of total epochs to run')
parser.add_argument('-c','--checkpoint', default=None, type=str,
help='path to checkpoint to load')
args = parser.parse_args()
rank = int(os.environ['SLURM_PROCID'])
local_rank = int(os.environ['SLURM_LOCALID'])
size = int(os.environ['SLURM_NTASKS'])
master_addr = os.environ["SLURM_SRUN_COMM_HOST"]
port = "29500"
node_id = os.environ['SLURM_NODEID']
ddp_arg = [rank, local_rank, size, master_addr, port, node_id]
train(args, ddp_arg)
然后,我們使用DistributedDataParallel 庫來執行分布式訓練。
def train(args, ddp_arg):
rank, local_rank, size, MASTER_ADDR, port, NODE_ID = ddp_arg
# display info
if rank == 0:
#print(">>> Training on ", len(hostnames), " nodes and ", size, " processes, master node is ", MASTER_ADDR)
print(">>> Training on ", size, " GPUs, master node is ", MASTER_ADDR)
#print("- Process {} corresponds to GPU {} of node {}".format(rank, local_rank, NODE_ID))
print("- Process {} corresponds to GPU {} of node {}".format(rank, local_rank, NODE_ID))
# configure distribution method: define address and port of the master node and initialise communication backend (NCCL)
#dist.init_process_group(backend='nccl', init_method='env://', world_size=size, rank=rank)
dist.init_process_group(
backend='nccl',
init_method='tcp://{}:{}'.format(MASTER_ADDR, port),
world_size=size,
rank=rank
)
# distribute model
torch.cuda.set_device(local_rank)
gpu = torch.device("cuda")
#model = ResNet18(classes=10).to(gpu)
model = torchvision.models.resnet50(pretrained=False).to(gpu)
ddp_model = DistributedDataParallel(model, device_ids=[local_rank])
if args.checkpoint is not None:
map_location = {'cuda:%d' % 0: 'cuda:%d' % local_rank}
ddp_model.load_state_dict(torch.load(args.checkpoint, map_location=map_location))
# distribute batch size (mini-batch)
batch_size = args.batch_size
batch_size_per_gpu = batch_size // size
# define loss function (criterion) and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(ddp_model.parameters(), 1e-4)
transform_train = transforms.Compose([
transforms.RandomCrop(32, padding=4),
transforms.RandomHorizontalFlip(),
transforms.ToTensor(),
transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
])
# load data with distributed sampler
#train_dataset = torchvision.datasets.CIFAR10(root='./data',
# train=True,
# transform=transform_train,
# download=False)
# load data with distributed sampler
train_dataset = torchvision.datasets.CIFAR10(root='./data',
train=True,
transform=transform_train,
download=False)
train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset,
num_replicas=size,
rank=rank)
train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
batch_size=batch_size_per_gpu,
shuffle=False,
num_workers=0,
pin_memory=True,
sampler=train_sampler)
# training (timers and display handled by process 0)
if rank == 0: start = datetime.now()
total_step = len(train_loader)
for epoch in range(args.epochs):
if rank == 0: start_dataload = time()
for i, (images, labels) in enumerate(train_loader):
# distribution of images and labels to all GPUs
images = images.to(gpu, non_blocking=True)
labels = labels.to(gpu, non_blocking=True)
if rank == 0: stop_dataload = time()
if rank == 0: start_training = time()
# forward pass
outputs = ddp_model(images)
loss = criterion(outputs, labels)
# backward and optimize
optimizer.zero_grad()
loss.backward()
optimizer.step()
if rank == 0: stop_training = time()
if (i + 1) % 10 == 0 and rank == 0:
print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}, Time data load: {:.3f}ms, Time training: {:.3f}ms'.format(epoch + 1, args.epochs,
i + 1, total_step, loss.item(), (stop_dataload - start_dataload)*1000,
(stop_training - start_training)*1000))
if rank == 0: start_dataload = time()
#Save checkpoint at every end of epoch
if rank == 0:
torch.save(ddp_model.state_dict(), './checkpoint/{}GPU_{}epoch.checkpoint'.format(size, epoch+1))
if rank == 0:
print(">>> Training complete in: " + str(datetime.now() - start))
if __name__ == '__main__':
main()
代碼將數據和模型分割到多個gpu上,并以分布式的方式更新模型。下面是代碼的一些解釋:
train(args, ddp_arg)有兩個參數,args和ddp_arg,其中args是傳遞給腳本的命令行參數,ddp_arg包含分布式訓練相關參數。
rank, local_rank, size, MASTER_ADDR, port, NODE_ID = ddp_arg:解包ddp_arg中分布式訓練相關參數。
如果rank為0,則打印當前使用的gpu數量和主節點IP地址信息。
dist.init_process_group(backend='nccl', init_method='tcp://{}:{}'.format(MASTER_ADDR, port), world_size=size, rank=rank) :使用NCCL后端初始化分布式進程組。
torch.cuda.set_device(local_rank):為這個進程選擇指定的GPU。
model = torchvision.models. ResNet50 (pretrained=False).to(gpu):從torchvision模型中加載ResNet50模型,并將其移動到指定的gpu。
ddp_model = DistributedDataParallel(model, device_ids=[local_rank]):將模型包裝在DistributedDataParallel模塊中,也就是說這樣我們就可以進行分布式訓練了
加載CIFAR-10數據集并應用數據增強轉換。
train_sampler=torch.utils.data.distributed.DistributedSampler(train_dataset,num_replicas=size,rank=rank):創建一個DistributedSampler對象,將數據集分割到多個gpu上。
train_loader =torch.utils.data.DataLoader(dataset=train_dataset,batch_size=batch_size_per_gpu,shuffle=False,num_workers=0,pin_memory=True,sampler=train_sampler):創建一個DataLoader對象,數據將批量加載到模型中,這與我們平常訓練的步驟是一致的只不過是增加了一個分布式的數據采樣DistributedSampler。
為指定的epoch數訓練模型,以分布式的方式使用optimizer.step()更新權重。
rank0在每個輪次結束時保存一個檢查點。
rank0每10個批次顯示損失和訓練時間。
結束訓練時打印訓練模型所花費的總時間也是在rank0上。
代碼測試
在使用1個節點1/2/3/4個gpu, 2個節點6/8個gpu,每個節點3/4個gpu上進行了訓練Cifar10上的Resnet50的測試如下圖所示,每次測試的批處理大小保持不變。完成每項測試所花費的時間以秒為單位記錄。隨著使用的gpu數量的增加,完成測試所需的時間會減少。當使用8個gpu時,需要320秒才能完成,這是記錄中最快的時間。這是肯定的,但是我們可以看到訓練的速度并沒有像GPU數量增長呈現線性的增長,這可能是因為Resnet50算是一個比較小的模型了,并不需要進行并行化訓練。
在多個gpu上使用數據并行可以顯著減少在給定數據集上訓練深度神經網絡(DNN)所需的時間。隨著gpu數量的增加,完成訓練過程所需的時間減少,這表明DNN可以更有效地并行訓練。
這種方法在處理大型數據集或復雜的DNN架構時特別有用。通過利用多個gpu,可以加快訓練過程,實現更快的模型迭代和實驗。但是需要注意的是,通過Data Parallelism實現的性能提升可能會受到通信開銷和GPU內存限制等因素的限制,需要仔細調優才能獲得最佳結果。
責任編輯:華軒來源: DeepHub IMBA