
PyTorch multi-GPU parallel training: usage, principles, and precautions

2022-06-26 06:16:00 Le0v1n

1. Common multi-GPU usage patterns

(Figure: model parallelism vs. data parallelism)

  • Model parallelism (model parallel) -> for large networks (does not speed up training)
    When the model is too large to fit into the memory of a single GPU, the network is split across several GPUs, so a very large network can still be trained (see the sketch after this list).
  • Data parallelism (data parallel) -> speeds up training
    When the whole model fits on a single GPU, we can place a full copy of the model on every GPU and let them all train simultaneously (forward pass + backward pass).
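
As a minimal sketch of model parallelism (not from this article's code; it assumes a machine that exposes at least two GPUs, cuda:0 and cuda:1), the layers of one network are simply placed on different devices:

import torch.nn as nn

# Model parallelism: one network split across two GPUs because it is too large for a single card.
class SplitNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.part1 = nn.Sequential(nn.Linear(1024, 4096), nn.ReLU()).to("cuda:0")
        self.part2 = nn.Linear(4096, 10).to("cuda:1")

    def forward(self, x):
        x = self.part1(x.to("cuda:0"))     # the first half runs on GPU 0
        return self.part2(x.to("cuda:1"))  # its output is moved to GPU 1 for the second half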

2. Relationship between training speed and the number of GPUs

Benchmark setup (data from the Bilibili author 霹雳吧啦WZ):

  • PyTorch 1.7
  • CUDA:10.1
  • Model:ResNet-34
  • Dataset: flower_photos (a very small dataset)
  • BatchSize:16
  • Optimizer:SGD
  • GPU: Tesla V100 (the previous-generation flagship GPU)

(Figure: training speedup as a function of the number of GPUs)

As the figure shows, training speed does not scale linearly with the number of GPUs. As the number of GPUs grows, the marginal speedup shrinks, because the GPUs have to communicate with each other and this communication costs performance.

3. Key issues

  1. How the dataset is split across different devices
  2. How error gradients are communicated between different devices
  3. How Batch Normalization statistics are synchronized across different devices

3.1 Dataset allocation

Dataset allocation across devices is handled by DistributedSampler, which is explained in detail in Section 6.2.

3.2 Error gradient communication

When training with multiple GPUs, each GPU produces its own gradients after every step. These gradients need to be averaged over all GPUs (if every GPU simply learned from its own data, the model copies would drift apart and the parallel training would be meaningless); only after averaging does each GPU incorporate the training results of the others.
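
Conceptually, this averaging is what DDP performs for us after every backward pass. A hand-rolled sketch of the idea (assuming a process group has already been initialized; when using DDP you never call this yourself):

import torch
import torch.distributed as dist

def average_gradients(model: torch.nn.Module):
    """Average the gradients of all parameters across every process in the group
    (an illustration of what DistributedDataParallel does automatically)."""
    world_size = dist.get_world_size()
    for param in model.parameters():
        if param.grad is not None:
            # Sum the gradient tensors from all processes in place ...
            dist.all_reduce(param.grad, op=dist.ReduceOp.SUM)
            # ... then divide by the number of processes to get the mean gradient.
            param.grad /= world_size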

3.3 BN synchronization

(Figure: how BN computes per-channel mean and variance for a batch of two feature maps)

Suppose BS=2 and that feature1 and feature2 are the feature maps output by a convolution layer somewhere in the network; because BS=2, there are two feature maps. During the forward pass, BN computes the mean and variance of each channel over these feature maps, then applies "subtract the mean, divide by the standard deviation" to the data of each channel, which yields the feature maps after BN.


(Figure: each GPU computing its own BN statistics)

When training with multiple GPUs, every GPU computes its own mean and variance. Assume again that the data on each GPU has BS=2; then each BN layer's mean $\mu_i$ and variance $\sigma_i^2$ are computed over just two feature maps.

As mentioned earlier, the larger the BS, the closer BN's mean and variance are to the statistics of the whole dataset, and the higher the accuracy.

So when training with multiple GPUs, we should consider computing each BN layer's mean and variance over the data on all devices; the statistics obtained this way are more meaningful.

If we ignore the relationship between the BN parameters on different GPUs, then the mean and variance of each BN layer are computed from only the two input samples (BS=2) on that device.

What happens if we take the other devices into account?

The BN layer on GPU1 gets two feature maps, 1 and 2, and the BN layer on GPU2 gets two feature maps, 3 and 4. If the BN statistics are computed over feature maps 1, 2, 3, and 4 together, then BN effectively behaves as if BS=4, which helps the final training result.

According to the video author (霹雳吧啦WZ), if you do not use synchronized BN (i.e. plain nn.BatchNorm), the results are basically the same as those obtained on a single GPU.
Of course, even without synchronized BN, using multiple GPUs still helps training speed a lot.
With synchronized BN enabled, the final result is usually improved by nearly one point, so synchronized BN does have a measurable effect.

If your GPUs have a lot of memory and a single GPU can already hold a very large BS, synchronized BN makes little difference.

Synchronized BN is mainly used when the network is large and the BS on a single GPU cannot be set very high; in that case it helps accuracy considerably.

Note: with synchronized BN, multi-GPU parallel speed drops, possibly by around 30%.

  • If you want more speed -> do not use synchronized BN
  • If you want higher accuracy -> use synchronized BN
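
Enabling synchronized BN in PyTorch is a single call (the same one used in the DDP script in Section 5.2). A minimal sketch, assuming CUDA is available and the distributed process group has already been initialized:

import torch
import torchvision

device = torch.device("cuda")

model = torchvision.models.resnet34(num_classes=5)
# Replace every BatchNorm layer with SyncBatchNorm, so that BN statistics are
# computed across all GPUs in the process group instead of per device.
model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model).to(device)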

4. Multi-GPU parallelism in PyTorch

There are two approaches:

  1. DataParallel (DP)
    The solution PyTorch has provided officially for a long time
  2. DistributedDataParallel (DDP)
    The newer generation of multi-GPU training

DDP is not limited to single-machine multi-GPU; it also works in multi-machine multi-GPU scenarios.

  1. PyTorch documentation on DP
  2. [PyTorch documentation on DDP](DistributedDataParallel — PyTorch 1.11.0 documentation)

(Figure: DP vs. DDP comparison from the PyTorch documentation)

  • DP is single-process, multi-threaded, and only works on a single machine (a single node; it does not apply to multi-machine setups)
  • DDP is multi-process and works in both single-machine and multi-machine scenarios
  • DP is usually slower than DDP (even on a single machine)

"Single machine" and "multiple machines" here do not refer to a single GPU versus multiple GPUs; they refer to a single server versus multiple servers.


Both DP and DDP can be used on a single machine; only DDP can be used across multiple machines, and even on a single machine (one machine with several GPUs) DDP is faster than DP.
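
A rough sketch of how the two wrappers are applied (not this article's training script; the DDP part assumes the process was started by a launcher that sets MASTER_ADDR/MASTER_PORT and LOCAL_RANK, e.g. torch.distributed.launch --use_env):

import os
import torch
import torch.distributed as dist
import torchvision

model = torchvision.models.resnet34(num_classes=5)

# Option 1 - DataParallel: a single process on a single machine, one extra line of code.
dp_model = torch.nn.DataParallel(model.cuda())

# Option 2 - DistributedDataParallel: one process per GPU; a process group must exist first.
# (In practice you pick one of the two options, not both in the same script.)
dist.init_process_group(backend="nccl", init_method="env://")  # reads the env variables set by the launcher
local_rank = int(os.environ["LOCAL_RANK"])
torch.cuda.set_device(local_rank)
ddp_model = torch.nn.parallel.DistributedDataParallel(model.cuda(), device_ids=[local_rank])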

[PyTorch tutorial on single-machine and multi-machine multi-GPU training](PyTorch Distributed Overview — PyTorch Tutorials 1.11.0+cu102 documentation)

5. Common ways to launch multi-GPU training in PyTorch

  1. torch.distributed.launch: less code, quick to start

    python -m torch.distributed.launch --nproc_per_node=8 --use_env train_multi_GPU.py
    
    # -m: run a library module as a script
    # --nproc_per_node: how many processes to start on this machine (one per GPU); the value 8
    #   and the script name train_multi_GPU.py are only placeholders for your own setup
    # --use_env: export RANK / WORLD_SIZE / LOCAL_RANK as environment variables (read by init_distributed_mode below)
    # --help: run `python -m torch.distributed.launch --help` to see all options
    

    In PyTorch's official Faster R-CNN implementation, multi-GPU training is launched with distributed.launch, so the rest of this article is mainly based on distributed.launch.

  2. torch.multiprocessing: more code, but more control and flexibility (a minimal sketch follows below)
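
A minimal sketch of the torch.multiprocessing route (the worker function, the port, and the arguments here are illustrative assumptions, not taken from this article's scripts):

import torch
import torch.distributed as dist
import torch.multiprocessing as mp

def worker(rank: int, world_size: int):
    """One process per GPU; the rank is passed in automatically by mp.spawn."""
    dist.init_process_group(backend="nccl", init_method="tcp://127.0.0.1:29500",
                            world_size=world_size, rank=rank)
    torch.cuda.set_device(rank)
    # ... build the model, wrap it in DistributedDataParallel, run the training loop ...
    dist.destroy_process_group()

if __name__ == "__main__":
    world_size = torch.cuda.device_count()
    # Start world_size processes, each running worker(rank, world_size).
    mp.spawn(worker, args=(world_size,), nprocs=world_size, join=True)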


Notes:

  • When training with torch.distributed.launch, if you manually force-terminate the training program (Ctrl+C) after it has started, there is a small chance that some processes are not killed.
    These leftover processes keep occupying GPU memory and resources, so you need to find them and kill -9 them.

5.1 Single-GPU training script

5.1.1 main

"""  Single card training script  ——  Training ResNet34/101 """


import os
import math
import argparse

import torch
import torch.optim as optim
from torch.utils.tensorboard import SummaryWriter
from torchvision import transforms
import torch.optim.lr_scheduler as lr_scheduler

from model import resnet34, resnet101
from my_dataset import MyDataSet
from utils import read_split_data
from multi_train_utils.train_eval_utils import train_one_epoch, evaluate


def main(args):
    # Check the machine configuration (use the GPU if available, otherwise fall back to the CPU)
    device = torch.device(args.device if torch.cuda.is_available() else "cpu")

    print(args)  # print the parsed arguments
    print('Start Tensorboard with "tensorboard --logdir=runs", view at http://localhost:6006/')
    tb_writer = SummaryWriter()  # create the TensorBoard writer
    if os.path.exists("./weights") is False:  # create the folder for saving weights if it does not exist
        os.makedirs("./weights")

    train_info, val_info, num_classes = read_split_data(args.data_path)
    train_images_path, train_images_label = train_info
    val_images_path, val_images_label = val_info

    # check num_classes
    assert args.num_classes == num_classes, "dataset num_classes: {}, input {}".format(args.num_classes,
                                                                                       num_classes)

    data_transform = {
        "train": transforms.Compose([transforms.RandomResizedCrop(224),
                                     transforms.RandomHorizontalFlip(),
                                     transforms.ToTensor(),
                                     transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])]),
        "val": transforms.Compose([transforms.Resize(256),
                                   transforms.CenterCrop(224),
                                   transforms.ToTensor(),
                                   transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])])}

    #  Instantiate training data sets 
    train_data_set = MyDataSet(images_path=train_images_path,
                               images_class=train_images_label,
                               transform=data_transform["train"])

    #  Instantiate validation data sets 
    val_data_set = MyDataSet(images_path=val_images_path,
                             images_class=val_images_label,
                             transform=data_transform["val"])

    batch_size = args.batch_size
    
    # Choose the number of dataloader workers (num_workers) from the batch size and the number of CPU cores
    nw = min([os.cpu_count(), batch_size if batch_size > 1 else 0, 8])  # number of workers
    print('Using {} dataloader workers every process'.format(nw))

    #  Read training data 
    train_loader = torch.utils.data.DataLoader(train_data_set,
                                               batch_size=batch_size,
                                               shuffle=True,
                                               pin_memory=True,
                                               num_workers=nw,
                                               collate_fn=train_data_set.collate_fn)

    #  Read validation data 
    val_loader = torch.utils.data.DataLoader(val_data_set,
                                             batch_size=batch_size,
                                             shuffle=False,
                                             pin_memory=True,
                                             num_workers=nw,
                                             collate_fn=val_data_set.collate_fn)

    # Instantiate the model and move it to the target device
    model = resnet34(num_classes=args.num_classes).to(device)
    # Load pre-trained weights if a path was provided
    if args.weights != "":
        if os.path.exists(args.weights):
            # Load the weights stored in the specified file with torch.load
            weights_dict = torch.load(args.weights, map_location=device)

            # Keep only the key/value pairs whose tensor sizes match the current model
            load_weights_dict = {k: v for k, v in weights_dict.items()
                                 if model.state_dict()[k].numel() == v.numel()}
            # Load the filtered state dict (strict=False) and print which keys were skipped
            print(model.load_state_dict(load_weights_dict, strict=False))
        else:
            raise FileNotFoundError("not found weights file: {}".format(args.weights))

    # Optionally freeze weights
    if args.freeze_layers:
        for name, para in model.named_parameters():  # name: layer name; para: its parameters
            # Freeze everything except the final fully-connected layer
            if "fc" not in name:  # all layers except fc get no gradient (no backprop, hence no update)
                para.requires_grad_(False)

    # Collect the parameters that still require gradients into the list pg
    pg = [p for p in model.parameters() if p.requires_grad]

    # Define the optimizer; its first argument is the list of trainable parameters collected above
    optimizer = optim.SGD(pg, lr=args.lr, momentum=0.9, weight_decay=0.005)

    # Learning-rate schedule (a cosine over [0, pi]); see https://arxiv.org/pdf/1812.01187.pdf
    lf = lambda x: ((1 + math.cos(x * math.pi / args.epochs)) / 2) * (1 - args.lrf) + args.lrf  # cosine

    # The scheduler adjusts the optimizer's learning rate according to the lambda above
    scheduler = lr_scheduler.LambdaLR(optimizer, lr_lambda=lf)

    #  Start iterative training 
    for epoch in range(args.epochs):
        # train
        """  You can see , there train The stage just returns the average loss, There is no predictive probability  """
        mean_loss = train_one_epoch(model=model,
                                    optimizer=optimizer,
                                    data_loader=train_loader,
                                    device=device,
                                    epoch=epoch)

        # Step the scheduler once per epoch so that the optimizer's learning rate is updated
        scheduler.step()

        # validate
        sum_num = evaluate(model=model,
                           data_loader=val_loader,
                           device=device)
        acc = sum_num / len(val_data_set)  # top-1 accuracy = number of correct predictions / number of validation samples

        # Print the accuracy for this epoch
        print("[epoch {}] accuracy: {}".format(epoch, round(acc, 3)))

        # Log the training and validation results to TensorBoard
        # optimizer.param_groups[0]["lr"] is the learning rate used in this epoch
        tags = ["loss", "accuracy", "learning_rate"]
        tb_writer.add_scalar(tags[0], mean_loss, epoch)
        tb_writer.add_scalar(tags[1], acc, epoch)
        tb_writer.add_scalar(tags[2], optimizer.param_groups[0]["lr"], epoch)

        # Save the weights for every epoch (no best-model selection here; you could use acc to keep only the best checkpoints)
        torch.save(model.state_dict(), "./weights/model-{}.pth".format(epoch))


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--num_classes', type=int, default=5)
    parser.add_argument('--epochs', type=int, default=30)
    parser.add_argument('--batch-size', type=int, default=16)
    parser.add_argument('--lr', type=float, default=0.001)
    # lrf is the final learning-rate factor: the learning rate decays from lr down to lr * lrf over training
    parser.add_argument('--lrf', type=float, default=0.1)

    #  The root directory where the dataset is located 
    # http://download.tensorflow.org/example_images/flower_photos.tgz
    parser.add_argument('--data-path', type=str,
                        default="/home/w180662/my_project/my_github/data_set/flower_data/flower_photos")

    # resnet34  Official weight download address 
    # https://download.pytorch.org/models/resnet34-333f7ec4.pth
    parser.add_argument('--weights', type=str, default='resNet34.pth',
                        help='initial weights path')  # an empty string "" means training without pre-trained weights
    parser.add_argument('--freeze-layers', type=bool, default=False)
    parser.add_argument('--device', default='cuda', help='device id (i.e. 0 or 0,1 or cpu)')

    opt = parser.parse_args()

    main(opt)

5.1.2 train_eval_utils

import sys

from tqdm import tqdm
import torch

from multi_train_utils.distributed_utils import reduce_value, is_main_process


def train_one_epoch(model, optimizer, data_loader, device, epoch):
    model.train()  # put the model into training mode
    loss_function = torch.nn.CrossEntropyLoss()  # define the loss function
    mean_loss = torch.zeros(1).to(device)  # zero tensor of shape [1] on the target device, used for the running mean loss

    # Clear any stale gradients in the optimizer
    optimizer.zero_grad()

    # Print the training progress only in process 0
    if is_main_process():  # check whether this is the main process
        # Wrap data_loader with tqdm so that iterating over it shows a progress bar
        data_loader = tqdm(data_loader, file=sys.stdout)

    # Iterate over data_loader to get each step and its batch of data.
    # The batch size is fixed when data_loader is created, so step = number of images / BS:
    # each step processes one batch (step 1 -> batch 1, step 2 -> batch 2, ...)
    for step, data in enumerate(data_loader):
        images, labels = data  # data holds the preprocessed images and their ground-truth labels

        # Forward pass: get the predictions for this batch
        pred = model(images.to(device))  

        # Compute the loss from the predictions and the ground-truth labels
        loss = loss_function(pred, labels.to(device))

        # Back-propagate the loss
        loss.backward()

        # Average the loss across GPUs
        """ reduce_value (defined in distributed_utils):
        def reduce_value(value, average=True):
            world_size = get_world_size()
            if world_size < 2:   # single-GPU case
                return value     # return the value unchanged
            with torch.no_grad():
                dist.all_reduce(value)
                if average:
                    value /= world_size  # divide by the number of GPUs before returning
                return value
        """
        loss = reduce_value(loss, average=True)

        # Update the running mean of the loss
        mean_loss = (mean_loss * step + loss.detach()) / (step + 1)  # update mean losses

        # Print the mean loss only in process 0
        if is_main_process():  # check whether this is the main process
            data_loader.desc = "[epoch {}] mean loss {}".format(epoch, round(mean_loss.item(), 3))

        # Make sure the loss is finite (it must not be inf/nan)
        if not torch.isfinite(loss):
            print('WARNING: non-finite loss, ending training ', loss)
            sys.exit(1)  # abort training if the loss is not finite

        optimizer.step()  #  The parameter optimizer updates the parameters 
        optimizer.zero_grad()  #  After the parameter optimizer updates the parameters , The gradient needs to be emptied 

    # Wait for all processes to finish their computation
    if device != torch.device("cpu"):
        torch.cuda.synchronize(device)  # wait until all kernels in all streams on this CUDA device have completed
    
    #  Returns the calculated average loss
    return mean_loss.item()


"""  It's used here @torch.no_grad() This decorator modifies the method , You can also use  with torch.no_grad:  This context manager  """
@torch.no_grad()
def evaluate(model, data_loader, device):
    model.eval()  #  Declare model state  -> 1.  close BN; 2. Dropout

    #  Used to store the number of correctly predicted samples 
    sum_num = torch.zeros(1).to(device)

    # Print the validation progress only in process 0
    if is_main_process():
        data_loader = tqdm(data_loader, file=sys.stdout)  # wrap the dataloader only in the main process

    for step, data in enumerate(data_loader):
        images, labels = data
        pred = model(images.to(device))  # get the prediction scores
        pred = torch.max(pred, dim=1)[1]  # take the index of the highest-scoring class
        # torch.eq(tensor, tensor/value) compares two tensors element by element:
        # True where the elements at the same position are equal, False otherwise
        sum_num += torch.eq(pred, labels.to(device)).sum()  # count the correct predictions

    #  Wait for all processes to complete the calculation 
    if device != torch.device("cpu"):
        torch.cuda.synchronize(device)

    sum_num = reduce_value(sum_num, average=False)  # sum the correct-prediction counts from all GPUs

    return sum_num.item()

5.2 Distributed training

5.2.1 main

import os
import math
import tempfile
import argparse

import torch
import torch.optim as optim
import torch.optim.lr_scheduler as lr_scheduler
from torch.utils.tensorboard import SummaryWriter
from torchvision import transforms

from model import resnet34
from my_dataset import MyDataSet
from utils import read_split_data, plot_data_loader_image
from multi_train_utils.distributed_utils import init_distributed_mode, dist, cleanup
from multi_train_utils.train_eval_utils import train_one_epoch, evaluate


def main(args):
    if torch.cuda.is_available() is False:  # abort immediately if no GPU is available
        raise EnvironmentError("No GPU device found for training.")

    # Initialize the distributed environment for this process
    # -> adds three fields to args: 1. args.rank; 2. args.world_size; 3. args.gpu
    init_distributed_mode(args=args)

    # Copy the DDP-related values added to args into local variables
    rank = args.rank
    device = torch.device(args.device)
    batch_size = args.batch_size
    weights_path = args.weights
    """ Why scale the learning rate?
    With multi-GPU training the gradients of the GPUs are averaged. On a single card, one
    step moves the parameters by, say, 1m (just for intuition). With 2 GPUs, one "step"
    processes two batches, but because the gradients are averaged the parameters still only
    move about 1m, so the learning rate is effectively reduced and we scale it back up. """
    args.lr *= args.world_size  # scale the learning rate by the number of GPUs (a simple, crude rule)

    """ With DDP, write and print operations are usually done only in the first (main)
    process; there is no need to repeat them in every process. """
    if rank == 0:  # print info and create the TensorBoard writer only in the first process
        print(args)
        print('Start Tensorboard with "tensorboard --logdir=runs", view at http://localhost:6006/')
        tb_writer = SummaryWriter()
        if os.path.exists("./weights") is False:
            os.makedirs("./weights")

    train_info, val_info, num_classes = read_split_data(args.data_path)
    train_images_path, train_images_label = train_info
    val_images_path, val_images_label = val_info

    # check num_classes
    assert args.num_classes == num_classes, "dataset num_classes: {}, input {}".format(args.num_classes,
                                                                                       num_classes)

    data_transform = {
        "train": transforms.Compose([transforms.RandomResizedCrop(224),
                                     transforms.RandomHorizontalFlip(),
                                     transforms.ToTensor(),
                                     transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])]),
        "val": transforms.Compose([transforms.Resize(256),
                                   transforms.CenterCrop(224),
                                   transforms.ToTensor(),
                                   transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])])}

    #  Instantiate training data sets 
    train_data_set = MyDataSet(images_path=train_images_path,
                               images_class=train_images_label,
                               transform=data_transform["train"])

    #  Instantiate validation data sets 
    val_data_set = MyDataSet(images_path=val_images_path,
                             images_class=val_images_label,
                             transform=data_transform["val"])

    # Assign training-sample indices to the process of each rank
    train_sampler = torch.utils.data.distributed.DistributedSampler(train_data_set)
    val_sampler = torch.utils.data.distributed.DistributedSampler(val_data_set)

    # Group the sample indices into lists of batch_size elements
    # (BatchSampler post-processes train_sampler)
    train_batch_sampler = torch.utils.data.BatchSampler(train_sampler, 
                                                        batch_size, 
                                                        drop_last=True)

    nw = min([os.cpu_count(), batch_size if batch_size > 1 else 0, 8])  # number of workers
    if rank == 0:  #  Printing is also in the first process 
        print('Using {} dataloader workers every process'.format(nw))

    train_loader = torch.utils.data.DataLoader(train_data_set,  # the plain dataset instance (not the DistributedSampler)
                                               batch_sampler=train_batch_sampler,  # pass the BatchSampler here instead of a plain batch_size
                                               pin_memory=True,  # pinned (page-locked) host memory speeds up host-to-GPU copies
                                               num_workers=nw,  # number of workers
                                               collate_fn=train_data_set.collate_fn)

    val_loader = torch.utils.data.DataLoader(val_data_set,  # the plain dataset instance (not the DistributedSampler)
                                             batch_size=batch_size,  # here we pass batch_size, not a BatchSampler
                                             sampler=val_sampler,  # the DistributedSampler is passed directly, without a BatchSampler;
                                                                   # the point is not shuffling but distributing the validation set
                                                                   # evenly across the devices
                                             pin_memory=True,
                                             num_workers=nw,
                                             collate_fn=val_data_set.collate_fn)
    
    # Instantiate the model and move it to the assigned device.
    # Note: args.device defaults to 'cuda', and init_distributed_mode already called
    # torch.cuda.set_device(args.gpu) for this process, so .to(device) automatically
    # places the model on the GPU assigned to this process.
    model = resnet34(num_classes=num_classes).to(device)

    # Load pre-trained weights if the file exists
    if os.path.exists(weights_path):
        weights_dict = torch.load(weights_path, map_location=device)
        load_weights_dict = {k: v for k, v in weights_dict.items()
                             if model.state_dict()[k].numel() == v.numel()}
        model.load_state_dict(load_weights_dict, strict=False)

        """ When training with multiple GPUs, the initial weights on every device must be
        identical; otherwise the gradients computed during training would not refer to the
        same set of parameters. How do we guarantee this when there are no pre-trained
        weights? We build a temporary file path:
            checkpoint_path = os.path.join(tempfile.gettempdir(), "initial_weights.pt")
        (tempfile.gettempdir() returns the folder for temporary files), save the main
        process's initial weights there:
            if rank == 0:
                torch.save(model.state_dict(), checkpoint_path)
        and finally have every process (main or not) load those weights:
            model.load_state_dict(torch.load(checkpoint_path, map_location=device))
        """
    else:  # no pre-trained weights
        checkpoint_path = os.path.join(tempfile.gettempdir(), "initial_weights.pt")
        # Without pre-trained weights, save the weights in the first process and let the
        # other processes load them, so that the initialization is identical everywhere
        if rank == 0:  # save the initial weights in the main process
            torch.save(model.state_dict(), checkpoint_path)

        dist.barrier()  # wait until every process reaches this point

        # Note: always specify map_location here, otherwise the first GPU ends up holding
        # extra copies and uses more memory.
        # map_location redirects the load; see https://blog.csdn.net/qq_43219379/article/details/123675375
        model.load_state_dict(torch.load(checkpoint_path, map_location=device))

    # Optionally freeze weights
    if args.freeze_layers:
        for name, para in model.named_parameters():
            # Freeze everything except the final fully-connected layer
            # (the FC layer contains no BN, so synchronized BN would be pointless here)
            if "fc" not in name:
                para.requires_grad_(False)
    else:  # weights are not frozen
        # SyncBatchNorm only makes sense for networks that contain BN layers
        if args.syncBN:  # use synchronized BN
            # Training is slower with SyncBatchNorm
            # convert_sync_batchnorm replaces every BN layer, whether it is 1d, 2d or 3d
            model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model).to(device)

    # Wrap the model with torch.nn.parallel.DistributedDataParallel and pass the device id of this process
    model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[args.gpu])

    # optimizer
    pg = [p for p in model.parameters() if p.requires_grad]
    optimizer = optim.SGD(pg, lr=args.lr, momentum=0.9, weight_decay=0.005)
    # Scheduler https://arxiv.org/pdf/1812.01187.pdf
    lf = lambda x: ((1 + math.cos(x * math.pi / args.epochs)) / 2) * (1 - args.lrf) + args.lrf  # cosine
    scheduler = lr_scheduler.LambdaLR(optimizer, lr_lambda=lf)

    for epoch in range(args.epochs):
        """  We mentioned that earlier , Use DistributedSampler It will disrupt the data and distribute the data evenly to each device .  But the data owned by the device is unchanged . After setting this method , Every time DistributedSampler The order of disruption is different , This leads to   Each device has different data  ->  Make every device have access to all samples  """
        train_sampler.set_epoch(epoch)

        mean_loss = train_one_epoch(model=model,
                                    optimizer=optimizer,
                                    data_loader=train_loader,
                                    device=device,
                                    epoch=epoch)

        scheduler.step()

        sum_num = evaluate(model=model,
                           data_loader=val_loader,
                           device=device)

        # val_sampler.total_size is the total number of validation samples, including the padded (duplicated) ones
        acc = sum_num / val_sampler.total_size

        if rank == 0:  # log to TensorBoard only in the main process
            print("[epoch {}] accuracy: {}".format(epoch, round(acc, 3)))
            tags = ["loss", "accuracy", "learning_rate"]
            tb_writer.add_scalar(tags[0], mean_loss, epoch)
            tb_writer.add_scalar(tags[1], acc, epoch)
            tb_writer.add_scalar(tags[2], optimizer.param_groups[0]["lr"], epoch)

            #  Save the weight parameters in the main process 
            torch.save(model.module.state_dict(), "./weights/model-{}.pth".format(epoch))

    # Delete the temporary cache file
    # (when no pre-trained weights are used, a temporary file is created above to keep the
    #  initial weights identical across devices, so it should be removed here)
    if rank == 0 and os.path.exists(weights_path) is False:
        if os.path.exists(checkpoint_path) is True:
            os.remove(checkpoint_path)

    # After training, call cleanup() to destroy the process group and release resources
    cleanup()


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--num_classes', type=int, default=5)
    parser.add_argument('--epochs', type=int, default=30)
    parser.add_argument('--batch-size', type=int, default=16)
    parser.add_argument('--lr', type=float, default=0.001)
    parser.add_argument('--lrf', type=float, default=0.1)
    # Whether to enable SyncBatchNorm
    parser.add_argument('--syncBN', type=bool, default=True)

    #  The root directory where the dataset is located 
    # http://download.tensorflow.org/example_images/flower_photos.tgz
    parser.add_argument('--data-path', type=str, default="/home/wz/data_set/flower_data/flower_photos")

    # resnet34  Official weight download address 
    # https://download.pytorch.org/models/resnet34-333f7ec4.pth
    parser.add_argument('--weights', type=str, default='resNet34.pth',
                        help='initial weights path')
    parser.add_argument('--freeze-layers', type=bool, default=False)

    # Do not modify the following three parameters; they are set automatically
    parser.add_argument('--device', default='cuda', help='device id (i.e. 0 or 0,1 or cpu)')
    # Number of processes to start (processes, not threads); do not set it manually,
    # it is derived from nproc_per_node
    parser.add_argument('--world-size', default=4, type=int,
                        help='number of distributed processes')
    parser.add_argument('--dist-url', default='env://', help='url used to set up distributed training')
    opt = parser.parse_args()

    main(opt)

5.2.2 distributed_utils

import os

import torch
import torch.distributed as dist


def init_distributed_mode(args):
    """
    DDP can be used for:
      1. multi-machine multi-GPU
      2. single-machine multi-GPU

    In the multi-machine multi-GPU case, WORLD_SIZE is the total number of processes across
    all machines (one process per GPU), RANK is the index of this process among all processes,
    and LOCAL_RANK is the index of this process on the current machine.
    Since this article covers the single-machine multi-GPU scenario, here WORLD_SIZE is simply
    the number of GPUs, RANK says which GPU this process uses, and LOCAL_RANK equals RANK.

    Because the script is started with torch.distributed.launch, the --use_env flag is needed
    (args.use_env == True) so that RANK/WORLD_SIZE/LOCAL_RANK are exported as environment variables.
    """
    if 'RANK' in os.environ and 'WORLD_SIZE' in os.environ:  # this is the branch used with torch.distributed.launch
        args.rank = int(os.environ["RANK"])  # read RANK from the environment, cast to int, store in args.rank
        args.world_size = int(os.environ['WORLD_SIZE'])
        args.gpu = int(os.environ['LOCAL_RANK'])
    elif 'SLURM_PROCID' in os.environ:
        args.rank = int(os.environ['SLURM_PROCID'])
        args.gpu = args.rank % torch.cuda.device_count()
    else:
        print('Not using distributed mode')
        args.distributed = False
        return

    args.distributed = True  # mark that distributed training is in use

    # Select the GPU used by the current process (with single-machine multi-GPU, one process is started per GPU)
    torch.cuda.set_device(args.gpu)

    args.dist_backend = 'nccl'  # communication backend; NCCL is recommended for NVIDIA GPUs

    # Print the rank of this process and the distributed init URL
    print('| distributed init (rank {}): {}'.format(args.rank, args.dist_url), flush=True)

    # Create the process group via init_process_group
    dist.init_process_group(backend=args.dist_backend,   # communication backend (NCCL recommended for NVIDIA GPUs)
                            init_method=args.dist_url,  # initialization method (the default "env://")
                            world_size=args.world_size,  # WORLD_SIZE is the same in every process (one process per GPU)
                            rank=args.rank)  # RANK differs per process (with two GPUs: the first has RANK=0, the second RANK=1)

    # barrier() waits until every process has reached this point before continuing
    dist.barrier()


def cleanup():
    dist.destroy_process_group()


def is_dist_avail_and_initialized():
    """ Check whether the distributed environment is supported """
    if not dist.is_available():
        return False
    if not dist.is_initialized():
        return False
    return True


def get_world_size():
    if not is_dist_avail_and_initialized():
        return 1
    return dist.get_world_size()


def get_rank():
    if not is_dist_avail_and_initialized():
        return 0
    return dist.get_rank()


def is_main_process():
    return get_rank() == 0


def reduce_value(value, average=True):
    world_size = get_world_size()
    if world_size < 2:  # single-GPU case: return the value unchanged
        return value

    with torch.no_grad():  # multi-GPU case
        dist.all_reduce(value)  # all_reduce sums `value` across all devices
        # after all_reduce, `value` holds the sum over all devices
        if average:  # optionally average
            value /= world_size  # world_size is the number of GPUs

        return value

5.2.3 train_eval_utils

import sys

from tqdm import tqdm
import torch

from multi_train_utils.distributed_utils import reduce_value, is_main_process


def train_one_epoch(model, optimizer, data_loader, device, epoch):
    model.train()  # put the model into training mode
    loss_function = torch.nn.CrossEntropyLoss()  # define the loss function
    mean_loss = torch.zeros(1).to(device)  # zero tensor of shape [1] on the target device, used for the running mean loss

    # Clear any stale gradients in the optimizer
    optimizer.zero_grad()

    # Print the training progress only in process 0
    if is_main_process():  # check whether this is the main process
        # Wrap data_loader with tqdm so that iterating over it shows a progress bar
        data_loader = tqdm(data_loader, file=sys.stdout)

    # Iterate over data_loader to get each step and its batch of data.
    # The batch size is fixed when data_loader is created, so step = number of images / BS:
    # each step processes one batch (step 1 -> batch 1, step 2 -> batch 2, ...)
    for step, data in enumerate(data_loader):
        images, labels = data  # data holds the preprocessed images and their ground-truth labels

        # Forward pass: get the predictions for this batch
        pred = model(images.to(device))  

        # Compute the loss from the predictions and the ground-truth labels
        loss = loss_function(pred, labels.to(device))

        # Back-propagate the loss
        """ The loss computed here is only the loss of the current batch on the current GPU.
        In the single-machine multi-GPU setting we also want the average loss over all GPUs,
        which is obtained with reduce_value below. """
        loss.backward()

        # Average the loss across GPUs
        """ reduce_value (defined in distributed_utils):
        def reduce_value(value, average=True):
            world_size = get_world_size()
            if world_size < 2:   # single-GPU case
                return value     # return the value unchanged
            with torch.no_grad():
                dist.all_reduce(value)
                if average:
                    value /= world_size  # divide by the number of GPUs before returning
                return value
        """
        loss = reduce_value(loss, average=True)  # this line is a no-op in the single-GPU case

        # Update the running mean of the loss
        mean_loss = (mean_loss * step + loss.detach()) / (step + 1)  # update mean losses

        # Print the mean loss only in process 0
        if is_main_process():  # check whether this is the main process
            data_loader.desc = "[epoch {}] mean loss {}".format(epoch, round(mean_loss.item(), 3))

        # Make sure the loss is finite (it must not be inf/nan)
        if not torch.isfinite(loss):
            print('WARNING: non-finite loss, ending training ', loss)
            sys.exit(1)  # abort training if the loss is not finite

        optimizer.step()  #  The parameter optimizer updates the parameters 
        optimizer.zero_grad()  #  After the parameter optimizer updates the parameters , The gradient needs to be emptied 

    # Wait for all processes to finish their computation
    if device != torch.device("cpu"):
        torch.cuda.synchronize(device)  # wait until all kernels in all streams on this CUDA device have completed
    
    #  Returns the calculated average loss
    return mean_loss.item()


"""  It's used here @torch.no_grad() This decorator modifies the method , You can also use  with torch.no_grad:  This context manager  """
@torch.no_grad()
def evaluate(model, data_loader, device):
    model.eval()  #  Declare model state  -> 1.  close BN; 2. Dropout

    #  Used to store the number of correctly predicted samples 
    sum_num = torch.zeros(1).to(device)

    # Print the validation progress only in process 0
    if is_main_process():
        data_loader = tqdm(data_loader, file=sys.stdout)  # wrap the dataloader only in the main process

    for step, data in enumerate(data_loader):
        images, labels = data
        pred = model(images.to(device))  # get the prediction scores
        pred = torch.max(pred, dim=1)[1]  # take the index of the highest-scoring class
        # torch.eq(tensor, tensor/value) compares two tensors element by element:
        # True where the elements at the same position are equal, False otherwise
        sum_num += torch.eq(pred, labels.to(device)).sum()  # count the correct predictions

    #  Wait for all processes to complete the calculation 
    if device != torch.device("cpu"):
        torch.cuda.synchronize(device)

    # Sum the correct-prediction counts from all GPUs via reduce_value
    sum_num = reduce_value(sum_num, average=False)

    return sum_num.item()
    

6. Supplementary knowledge

6.1 Cosine learning-rate curve

lf = lambda x: ((1 + math.cos(x * math.pi / args.epochs)) / 2) * (1 - args.lrf) + args.lrf

$\mathrm{lr}(\mathrm{epoch}) = \frac{1}{2}\left[1 + \cos\left(\frac{\pi \cdot \mathrm{epoch}}{\mathrm{epochs}}\right)\right] \times (1 - \alpha) + \alpha$

where:

  • $\mathrm{lr}(\mathrm{epoch})$ is the factor for the current epoch; LambdaLR multiplies the base learning rate lr by this value
  • $\mathrm{epoch}$ is the index of the current epoch
  • $\mathrm{epochs}$ is the total number of epochs
  • $\alpha$ is lrf, the factor the learning rate decays to by the end of training (final lr = lr * lrf)

Its curve looks like this:

(Figure: cosine learning-rate schedule over the training epochs)
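
A small sketch that evaluates this schedule with the defaults used by the scripts above (epochs=30, lrf=0.1):

import math

epochs, lrf = 30, 0.1  # same defaults as the training scripts
lf = lambda x: ((1 + math.cos(x * math.pi / epochs)) / 2) * (1 - lrf) + lrf

for epoch in (0, 15, 30):
    # LambdaLR multiplies the base learning rate by this factor
    print(epoch, round(lf(epoch), 3))  # 0 -> 1.0, 15 -> 0.55, 30 -> 0.1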

6.2 How DistributedSampler works

#  Instantiate training data sets 
train_data_set = MyDataSet(images_path=train_images_path,
                           images_class=train_images_label,
                           transform=data_transform["train"])

#  Instantiate validation data sets 
val_data_set = MyDataSet(images_path=val_images_path,
                         images_class=val_images_label,
                         transform=data_transform["val"])

#  Assign training-sample indices to the process of each rank
train_sampler = torch.utils.data.distributed.DistributedSampler(train_data_set)
val_sampler = torch.utils.data.distributed.DistributedSampler(val_data_set)

Let us now take a closer look at torch.utils.data.distributed.DistributedSampler.

(Figure: DistributedSampler shuffling, padding, and interleaved assignment of the sample indices)

Assume the current dataset has 11 samples -> [0, 1, 2, 3, 4, …, 10].

First, DistributedSampler shuffles the data, producing a random index sequence such as [6, 1, 9, 3, …, 7]. Then, based on the number of GPUs (suppose we use 2 GPUs in parallel), DistributedSampler does the following:

  1. The total number of samples is divided by 2 and rounded up: ceil(11 / 2) = 6.
  2. This number is multiplied by the number of GPUs: 6 * 2 = 12.
  3. These 12 indices are the total amount of data required by the 2 GPUs. Since there are only 11 samples, the sequence is padded: indices from the front of the shuffled sequence are copied and appended to the end. Here one index is missing, so the 6 is appended. (If three indices were missing, [6, 1, 9] would be copied and appended.)

Now there are 12 indices, which can be split evenly across the GPU devices.

Finally the indices are handed out. The assignment rule is simple: the indices are distributed to the different devices in an interleaved fashion.


Through DistributedSampler, all the data is divided evenly among the devices. Each device can only use the data assigned to it; it cannot touch the data owned by other devices.
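
A minimal sketch that reproduces this behaviour without launching any processes (DistributedSampler accepts num_replicas and rank explicitly; the 11-sample dataset mirrors the example above):

import torch
from torch.utils.data import TensorDataset
from torch.utils.data.distributed import DistributedSampler

dataset = TensorDataset(torch.arange(11))  # 11 samples, indices 0..10

for rank in range(2):
    sampler = DistributedSampler(dataset, num_replicas=2, rank=rank, shuffle=True)
    sampler.set_epoch(0)  # fix the shuffle order for this epoch
    # total_size is padded up to a multiple of num_replicas (12 here); each rank gets 6 indices
    print(f"rank {rank}: total_size={sampler.total_size}, indices={list(sampler)}")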

6.3 BatchSampler

#  Assign training-sample indices to the process of each rank
train_sampler = torch.utils.data.distributed.DistributedSampler(train_data_set)
val_sampler = torch.utils.data.distributed.DistributedSampler(val_data_set)

# -----------------------------------------------------------

#  Group the sample indices into lists of batch_size elements
#  (BatchSampler post-processes train_sampler)
train_batch_sampler = torch.utils.data.BatchSampler(train_sampler, 
                                                    batch_size, 
                                                    drop_last=True)

(Figure: BatchSampler grouping each device's indices into batches)

Suppose there are 2 GPUs. After DistributedSampler, the data is split evenly between the two devices. Take the first GPU as an example: the indices assigned to it are [6, 9, 10, 1, 8, 7]. If Batch Size = 2, BatchSampler groups each device's indices two at a time.


BatchSampler has one more parameter, drop_last, which works as follows:

If BS=4, only the first 4 indices can be packed into a full batch and the remaining 2 are not enough for another one:

  • If drop_last=True, the remaining 2 indices are discarded.
  • If drop_last=False, the remaining 2 indices are packed into one (smaller) batch (see the sketch after the note below).

Note

  • Generally only the training set's DistributedSampler is further wrapped with BatchSampler; the validation set does not need this treatment.
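
A small sketch of drop_last, feeding BatchSampler a plain list of six indices in place of the DistributedSampler output (the indices are the ones from the example above):

from torch.utils.data import BatchSampler

indices = [6, 9, 10, 1, 8, 7]  # the indices assigned to one device in the example

print(list(BatchSampler(indices, batch_size=4, drop_last=True)))   # [[6, 9, 10, 1]]
print(list(BatchSampler(indices, batch_size=4, drop_last=False)))  # [[6, 9, 10, 1], [8, 7]]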

7. Summary

  1. Printing, saving the model, timing, and similar operations should only be executed in the main process (rank == 0)
  2. Don't forget to wrap the model with DDP
  3. Don't forget to process the dataset with DistributedSampler after loading it, and to post-process that sampler with BatchSampler (for the training set)
  4. Remember to apply reduce_value to the loss
  5. In general, anything that needs to be summed across devices should go through reduce_value
  6. loss.backward() back-propagates directly; there is no need to reduce_value the loss before back-propagating. Each card's loss is different because it is computed on that card's own data; the mean loss is only for display and is not the loss that is actually back-propagated
  7. .to(device) is very convenient; you do not have to use .cuda()
  8. The scalars in TensorBoard are ordered alphabetically by tag name, not by the order in which they were added
  9. If you are after speed, turn off synchronized BN (SyncBN)

Sources

  1. https://www.bilibili.com/video/BV1yt4y1e7sZ?share_source=copy_pc

Copyright notice
This article was written by [Le0v1n]; please include a link to the original when reposting.
https://yzsam.com/2022/177/202206260601000998.html