memo: PyTorch | Data Parallel

Concepts analogous to those in distributed systems 1 (a short sketch of reading these values follows the list):

  • group: the current process group (the world); one job is one group, and a group contains multiple hosts (nodes)
  • world_size: the number of processes participating in the job (across the whole network), i.e. the number of GPUs; the dataset is split into world_size shards
  • rank: the global index of the current process; the identifiers of the processes within a group are consecutive integers from 0 to world_size - 1
  • local_rank: assigns a GPU to a process within the current host (each host can launch multiple processes, all running the same code)
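
A minimal sketch of how these values are read inside one process, assuming the process group has already been initialized and the launcher exports the LOCAL_RANK environment variable (torchrun does; the older torch.distributed.launch passes --local_rank as an argument instead):

import os

import torch
import torch.distributed as dist

# assumes dist.init_process_group(...) has already been called in this process
rank = dist.get_rank()               # global process index, 0 .. world_size-1
world_size = dist.get_world_size()   # total number of processes in the job
local_rank = int(os.environ["LOCAL_RANK"])  # process index within the current host

# bind this process to one GPU of the current host
torch.cuda.set_device(local_rank)
print(f"rank {rank}/{world_size} running on cuda:{local_rank}")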

DP vs DDP

Comparison between DataParallel and DistributedDataParallel 8

  • DataParallel is single-process and multi-threaded, and only works on a single machine (with multiple cards), while DistributedDataParallel is multi-process and works for both single- and multi-machine training.
  • DDP works with model parallelism (see the wrapping sketch below).
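
A rough sketch of the two wrapping calls (the Linear model and the GPU indices are placeholders; the DDP branch additionally assumes a process group has already been initialized in every process):

import torch.distributed as dist
import torch.nn as nn

model = nn.Linear(10, 10).cuda()

# DataParallel: a single process replicates the module over the listed GPUs
# (threads) and splits each input batch across them; single machine only
dp_model = nn.DataParallel(model, device_ids=[0, 1])

# DistributedDataParallel: one process per GPU, also across machines; gradients
# are synchronized between processes during backward
if dist.is_initialized():
    ddp_model = nn.parallel.DistributedDataParallel(model, device_ids=[0])  # device_ids=[local_rank] in practice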

Workflow of torch.distributed

Refer to 2

  1. Create the process group:

    
    torch.distributed.init_process_group(backend='nccl', init_method='env://')
    
  2. If collective communication within only a subset of the group is needed, use new_group to create sub-groups (see the sketch after this list)

  3. Create the DDP object:

    
    ddp_model = torch.nn.parallel.DistributedDataParallel(
         net, device_ids=[args.local_rank], output_device=args.local_rank)
    

    net resides on the GPU specified by local_rank

  4. Create a sampler for the dataset:

    
    train_sampler = DistributedSampler(train_set, num_replicas=world_size, rank=rank)
    

    This ensures that each process's dataloader only loads a specific subset of the whole dataset, with no overlap 3

  5. On each host, launch the processes with the torch.distributed.launch command (if the number of started processes has not yet reached world_size, all processes keep waiting) and start training

  6. When resuming training, add --resume to the command of the first host

  7. Destroy the process group: destroy_process_group()
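
Steps 2 and 7 are not shown in code above; a minimal sketch (the ranks chosen for the sub-group are arbitrary):

import torch.distributed as dist

# step 2 (optional): a sub-group containing only ranks 0 and 1; collectives can
# then be restricted to it through their group= argument,
# e.g. dist.all_reduce(tensor, group=subgroup)
subgroup = dist.new_group(ranks=[0, 1])

# step 7: tear down the default process group at the end of training
dist.destroy_process_group()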

init_method specifies the (URL) address through which each process (host) sends information to the rank=0 process.

  • The TCP method requires specifying the IP address of the rank 0 process (init_method='tcp://10.1.1.20:23456'), and the rank of each process must be set manually.

  • The second method exchanges information through a file shared by all processes in the group; the URL starts with file:// followed by the file path, e.g. init_method=file:///mnt/nfs/sharedfile. The file is not deleted automatically.

  • The third (default) method reads the configuration from the environment variables MASTER_PORT, MASTER_ADDR, WORLD_SIZE, RANK: init_method='env://' 7

  • With TCP initialization and 3 hosts, the command has to be executed 3 times (once per host); a sketch is given below:
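
A minimal sketch of the TCP case, assuming a script named tcp_init_demo.py that takes the rank on the command line (the script name is illustrative; the address is the rank-0 host from the example above):

# tcp_init_demo.py -- run once on each of the 3 hosts:
#   python tcp_init_demo.py --rank 0
#   python tcp_init_demo.py --rank 1
#   python tcp_init_demo.py --rank 2
import argparse

import torch.distributed as dist

parser = argparse.ArgumentParser()
parser.add_argument("--rank", type=int, required=True)
parser.add_argument("--world-size", type=int, default=3)
args = parser.parse_args()

# rank 0 listens on 10.1.1.20:23456; the other ranks connect to it
dist.init_process_group(
    backend="gloo",  # or "nccl" if every host has a GPU
    init_method="tcp://10.1.1.20:23456",
    rank=args.rank,
    world_size=args.world_size,
)
print(f"initialized rank {dist.get_rank()} of {dist.get_world_size()}")
dist.destroy_process_group()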


Multi-node multi-gpu

All you need to do is 4

  1. Create a process group using RANK and WORLD_SIZE (set automatically from the command arguments nproc_per_node, nnodes, and node_rank of torchrun)
  2. Wrap the model with torch.nn.parallel.DistributedDataParallel()
  3. Move the model to the GPU given by LOCAL_RANK
  4. Wrap the dataset with DistributedSampler()

resnet_ddp.py 5:

import argparse

import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler


def train():
    # read hyper params from the command line
    parser = argparse.ArgumentParser()
    parser.add_argument("--local_rank", type=int, default=0,
                        help="Local rank. Necessary for using the torch.distributed.launch utility.")  # specify gpu index
    parser.add_argument("--resume", action="store_true")  # assumed flag, see step 6 above
    parser.add_argument("--model_filepath", type=str, default="resnet_ddp.pth")  # assumed checkpoint path
    args = parser.parse_args()
    local_rank = args.local_rank

    # initialize process group
    torch.distributed.init_process_group(backend="nccl")

    # construct model
    model = torchvision.models.resnet18(pretrained=False)

    # Wrap the model on the GPU assigned to the current process
    device = torch.device(f"cuda:{local_rank}")
    model = model.to(device)
    ddp_model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[local_rank], output_device=local_rank)

    # only save (and restore) the model on the gpu whose local_rank=0
    if args.resume:
        map_location = {"cuda:0": "cuda:{}".format(local_rank)}
        ddp_model.load_state_dict(torch.load(args.model_filepath, map_location=map_location))

    # prepare dataset
    transform = transforms.Compose([transforms.ToTensor()])
    train_set = torchvision.datasets.CIFAR10(root="./data", train=True, download=True, transform=transform)
    # a process will only use its own subset
    train_sampler = DistributedSampler(dataset=train_set)

    train_loader = DataLoader(dataset=train_set, batch_size=128, sampler=train_sampler, num_workers=8)

    # loss function and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.01, momentum=0.9, weight_decay=1e-5)

    # training cycle
    for epoch in range(1000):
        # check accuracy of the model on local_rank=0 (omitted here)

        # reshuffle the shards so each epoch sees the data in a different order
        train_sampler.set_epoch(epoch)

        # train mode
        ddp_model.train()

        # iterate over this process's shard of train_set
        for data in train_loader:
            inputs, labels = data[0].to(device), data[1].to(device)
            optimizer.zero_grad()
            outputs = ddp_model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()


if __name__ == "__main__":
    train()
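
The comment in the listing mentions saving (and restoring) the checkpoint from one process only, but the save call itself is omitted; a sketch of what could sit at the end of each epoch, reusing ddp_model and args.model_filepath from the script above:

        # save the checkpoint from a single process (global rank 0) so it is
        # written exactly once per epoch
        if torch.distributed.get_rank() == 0:
            torch.save(ddp_model.state_dict(), args.model_filepath)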

The training script resnet_ddp.py runs on two nodes; each node has 8 GPUs, and each GPU launches one process.

In the terminal of the first node, execute the following command.

python -m torch.distributed.launch \
--nproc_per_node=8 --nnodes=2 --node_rank=0 \
--master_addr="192.168.0.1" \
--master_port=1234 resnet_ddp.py

Execute the same command, but with a different node_rank, on the second node:

python -m torch.distributed.launch \
--nproc_per_node=8 --nnodes=2 --node_rank=1 \
--master_addr="192.168.0.1" \
--master_port=1234 resnet_ddp.py

Single-node multi-worker

Refer to 6

torchrun --standalone --nnodes=1 \
--nproc_per_node=$NUM_TRAINERS \
YOUR_TRAINING_SCRIPT.py \
(--arg1 ... train script args ...)

(2023-08-27)

Example of launch.json

For running AIM

{
    "version": "0.2.0",
    "configurations": [
        {
            "name": "Python: Current File",
            "type": "python",
            "request": "launch",
            "module": "torch.distributed.launch",
            "console": "internalConsole",
            "justMyCode": true,
            "env": {"CUDA_VISIBLE_DEVICES": "4"},
            "args": [
                "--nproc_per_node", "1", // GPUs
                "--master_port", "29500",

                "tools/train.py",
                "configs/recognition/vit/vitclip_base_diving48.py",
                "--launcher", "pytorch",
                "--test-last",
                "--validate",
                "--cfg-options", "model.backbone.pretrained=openaiclip", 
                               "work_dir=work_dirs_vit/diving48/debug"
            ]
        }
    ]
}

Pass env to args

The settings below in “launch.json” do not work (the ${env:GPUS} and ${env:PORT} substitutions are not filled in from the env block of the same configuration):

"env": {
    "CUDA_VISIBLE_DEVICES": "4",
    "GPUS": "1",
    "PORT": "29500"
},
"args": [
    "--nproc_per_node", "${env:GPUS}",
    "--master_port", "${env:PORT}",

May refer to


Ref


  • 【pytorch distributed】nccl 集合通信(collective communication)

  • (2024-06-02) 【pytorch distributed】accelerate 基本用法(config,launch)数据并行 - 五道口纳什