memo: PyTorch | Model Parallel

Table of contents

Pytorch-Transformer

Model Parallelism using Transformers and PyTorch

  1. Loading the data

  2. Instantiate a model

  3. Create torch Dataset and DataLoader

    1
    
    class myDataset(torch.utils.data.Dataset):
    

    Split the data into train and val sets:

    1
    2
    
    # Hold out 30% of the IMDB dataframe for validation; fixed seed for reproducibility.
    from sklearn.model_selection import train_test_split
    # BUG FIX: the call was missing its closing parenthesis.
    df_train, df_val = train_test_split(imdb_df, test_size=0.3, random_state=2021)
    

    create DataLoader for train set and val set:

  4. Make a wrapper for the model:

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    
    class MultiGPUClassifier(torch.nn.Module):
        """RoBERTa sequence classifier split across two GPUs (model parallelism).

        The embedding layer lives on cuda:0; the encoder and classification
        head live on cuda:1.  ``forward`` moves activations between the two
        devices explicitly.
        """

        def __init__(self, roberta_model):
            super(MultiGPUClassifier, self).__init__()

            # Embedding layer --> cuda:0
            self.embedding = roberta_model.roberta.embeddings.to('cuda:0')

            # Encoder Layer --> cuda:1
            self.encoder = roberta_model.roberta.encoder.to('cuda:1')

            # Classifier --> cuda:1
            self.classifier = roberta_model.classifier.to('cuda:1')

        def forward(self, input_ids, token_type_ids=None, attention_mask=None, labels=None):
            # Pass the input_ids to cuda:0 since the embedding layer is on cuda:0
            emb_out = self.embedding(input_ids.to('cuda:0'))

            # Move the embedding output to cuda:1, where the encoder lives.
            # BUG FIX: attention_mask was accepted but silently ignored, so
            # padding tokens were attended to.  Forward it to the encoder on
            # the encoder's device.  Callers pass an extended mask of shape
            # (batch, 1, 1, seq_len); NOTE(review): HF encoders expect this
            # extended mask in *additive* form (0 for keep, large negative for
            # pad) -- confirm the caller converts it before relying on it.
            enc_out = self.encoder(
                emb_out.to('cuda:1'),
                attention_mask=None if attention_mask is None else attention_mask.to('cuda:1'),
            )

            classifier_out = self.classifier(enc_out[0])

            return classifier_out

    # Initialize the model
    multi_gpu_roberta = MultiGPUClassifier(roberta_model)
    

    Upon constructing the model, the memory usage can be seen using nvidia-smi.

  5. Create optimizer and loss function for the model:

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    
    from transformers import get_linear_schedule_with_warmup

    EPOCHS = 2    # full passes over the training set
    LR = 1e-5     # small LR, typical for fine-tuning a pretrained transformer

    # transformers.AdamW is deprecated (and removed in recent releases); use the
    # equivalent torch.optim.AdamW.  weight_decay=0.0 matches the old
    # transformers default (torch's default is 0.01).
    optimizer = torch.optim.AdamW(multi_gpu_roberta.parameters(), lr=LR, weight_decay=0.0)

    # One scheduler step is taken per batch, so total steps = batches * epochs.
    total_steps = len(train_data_loader)*EPOCHS

    scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps=0,
                         num_training_steps=total_steps)

    loss_fn = torch.nn.CrossEntropyLoss().to('cuda:1')   # match with the roberta.classifier layer
    
  6. Create a helper function for training the model and returning accuracy and losses:

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    
    def train_model(model, data_loader, loss_fn, optimizer, scheduler, n_examples):
        """Run one training epoch and return (accuracy, mean loss).

        model       -- the multi-GPU classifier (outputs end up on cuda:1)
        data_loader -- yields dicts with 'input_ids', 'attention_mask', 'labels'
        n_examples  -- total number of training examples, for the accuracy denominator
        """
        model = model.train()    # enable dropout / batch-norm training behavior
        losses = []
        correct_predictions = 0

        for d in data_loader:    # take a batch
            input_ids = d['input_ids']
            attention_mask = d['attention_mask']
            # Reshape to (batch, 1, 1, seq_len) -- the extended-mask layout the
            # encoder's forward pass expects.
            reshaped_attention_mask = attention_mask.reshape(d['attention_mask'].shape[0],1,1,d['attention_mask'].shape[1])
            # Targets are compared/lossed on cuda:1 (where model outputs live);
            # move them once instead of twice per batch.
            targets = d['labels'].to('cuda:1')

            outputs = model(input_ids = input_ids, attention_mask = reshaped_attention_mask)
            _, preds = torch.max(outputs, dim=1)

            loss = loss_fn(outputs, targets)

            # BUG FIX: was `correct_prediction` (missing 's') -> NameError at runtime.
            correct_predictions += torch.sum(preds == targets)
            losses.append(loss.item())

            loss.backward()
            # Clip the gradients of the model to prevent exploding gradients using clip_grad_norm
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            optimizer.step() # gradient descent
            scheduler.step() # lr decay
            optimizer.zero_grad()

        return correct_predictions.double() / n_examples, np.mean(losses)
    
  7. Create a helper function for evaluating the model:

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    
    def eval_model(model, data_loader, loss_fn, n_examples):
        """Evaluate the model on a data loader; return (accuracy, mean loss).

        Runs under torch.no_grad() with the model in eval mode, so no
        gradients are computed and dropout is disabled.
        """
        model = model.eval()
        losses = []
        correct_predictions = 0

        with torch.no_grad():
            for d in data_loader:
                input_ids = d['input_ids']
                attention_mask = d['attention_mask']
                # BUG FIX: was `.reshaped(...)` -- no such tensor method; use .reshape.
                # Shape (batch, 1, 1, seq_len) is the extended-mask layout the
                # encoder's forward pass expects.
                reshaped_attention_mask = attention_mask.reshape(d['attention_mask'].shape[0],1,1,d['attention_mask'].shape[1])
                # Move targets once to cuda:1 (where model outputs live).
                targets = d['labels'].to('cuda:1')

                outputs = model(input_ids = input_ids, attention_mask=reshaped_attention_mask)
                _, preds = torch.max(outputs, dim=1)

                loss = loss_fn(outputs, targets)

                correct_predictions += torch.sum(preds == targets)
                losses.append(loss.item())

            return correct_predictions.double() / n_examples, np.mean(losses)
    
  8. Create the training loop and only store the best one:

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    
    from collections import defaultdict

    history = defaultdict(list)   # per-epoch metrics, for plotting later
    best_accuracy = 0

    # %%time  -- BUG FIX: Jupyter cell magic must be the very first line of its
    # own cell; left mid-script it is a syntax error, so it is commented out here.

    for epoch in range(EPOCHS):
        # BUG FIX: the f-string below was missing its closing quote.
        print(f'Epoch {epoch+1}/{EPOCHS}')
        print('-' * 10)

        train_acc, train_loss = train_model(multi_gpu_roberta, train_data_loader, loss_fn, optimizer, scheduler, len(df_train))
        print(f'Train Loss:{train_loss}; Train Accuracy: {train_acc}')

        val_acc, val_loss = eval_model(multi_gpu_roberta, val_data_loader, loss_fn, len(df_val))
        print(f'Val Loss: {val_loss}; Val Accuracy: {val_acc}')

        history['train_acc'].append(train_acc)
        history['train_loss'].append(train_loss)
        history['val_acc'].append(val_acc)
        history['val_loss'].append(val_loss)

        # Checkpoint only the best-performing weights seen so far.
        if val_acc > best_accuracy:
            torch.save(multi_gpu_roberta.state_dict(), 'multi_gpu_roberta_best_model_state.bin')
            best_accuracy = val_acc
    
  9. Visualizing model performance

Combining DDP with Model Parallelism

DDP tutorial

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
class ToyMpModel(nn.Module):
    """Toy two-layer net split across two devices (pipeline-style model parallelism)."""

    def __init__(self, dev0, dev1):
        super().__init__()
        self.dev0, self.dev1 = dev0, dev1
        # First linear layer is placed on dev0, second on dev1; the ReLU is
        # stateless so it simply runs wherever its input tensor lives.
        self.net1 = torch.nn.Linear(10, 10).to(dev0)
        self.relu = torch.nn.ReLU()
        self.net2 = torch.nn.Linear(10, 5).to(dev1)

    def forward(self, x):
        # Stage 1 on dev0: move the input there, then linear + ReLU.
        hidden = self.relu(self.net1(x.to(self.dev0)))
        # Stage 2 on dev1: hand the activation over, then the final linear.
        return self.net2(hidden.to(self.dev1))

DDP wraps a multi-GPU model:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def demo_model_parallel(rank, world_size):
    """One DDP process driving a two-device model-parallel ToyMpModel.

    ``rank`` is this process's index; ``world_size`` is the total number of
    GPUs participating.
    """
    print(f"Running DDP with model parallel example on rank {rank}.")
    setup(rank, world_size)  # export env vars and initialize the process group

    # Each rank claims a pair of adjacent devices for its two model shards.
    dev0 = (rank * 2) % world_size
    dev1 = (rank * 2 + 1) % world_size
    ddp_mp_model = DDP(ToyMpModel(dev0, dev1))

    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_mp_model.parameters(), lr=0.001)
    optimizer.zero_grad()

    # The model's final stage lives on dev1, so outputs land there; put the
    # labels on the same device before computing the loss.
    outputs = ddp_mp_model(torch.randn(20, 10))
    labels = torch.randn(20, 5).to(dev1)
    loss_fn(outputs, labels).backward()
    optimizer.step()

    cleanup()

if __name__ == "__main__":
    # Model parallelism over device pairs needs at least two visible GPUs.
    n_gpus = torch.cuda.device_count()
    assert n_gpus >= 2, f"Requires at least 2 GPUs to run, but got {n_gpus}"
    run_demo(demo_model_parallel, n_gpus)

Apply Model Parallel to Existing Modules

SINGLE-MACHINE MODEL PARALLEL BEST PRACTICES ResNet50

nn.Sequential

GNT model.py

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
class GNTModel(object):
 __init__(self,args)
 ├─ self.net_coarse = GNT().to(device)
   └─ __init__()
      ├─ self.rgbfeat_fc = nn.Sequential(Linear, ReLU, Linear)
      ├─ for i in range(args.trans_depth):
        ├─ self.view_trans.append( Transformer2D() ) # ModuleList()
          ├─ self.ff = FeedForward(dim, ff_hid_dim, ff_dp_rate)
            ├─ self.fc1 = nn.Linear(dim, ff_hid_dim)
            ├─ self.fc2 = nn.Linear(ff_hid_dim, dim)
            ├─ self.dp = nn.Dropout(ff_dp_rate)
            └─ self.activ = nn.ReLU()
          
          └─ self.attn = Attention2D(dim,attn_dp_rate)
             ├─ self.q_fc = nn.Linear(dim, dim, bias=False)
             ├─ self.k_fc = nn.Linear(dim, dim, bias=False)
             ├─ self.v_fc = nn.Linear(dim, dim, bias=False)
             ├─ self.pos_fc = nn.Sequential(Linear, ReLU, Linear)
             ├─ self.attn_fc = nn.Sequential(Linear, ReLU, Linear)
             ├─ self.out_fc = nn.Linear(dim, dim)
             └─ self.dp = nn.Dropout(dp_rate)
        
        ├─ self.ray_trans.append( Transformer() )    # nn.ModuleList()
          ├─ self.ff = FeedForward(dim, ff_hid_dim, ff_dp_rate)
          └─ self.attn = Attention(dim, n_heads, attn_dp_rate, attn_mode, pos_dim)
             ├─ if attn_mode in ["qk","gate"]:
             ├─ if attn_mode in ["pos", "gate"]:
             ├─ if attn_mode == "gate":
             ├─ self.v_fc = nn.Linear(dim, dim, bias=False)
             ├─ self.out_fc = nn.Linear(dim, dim)
             ├─ self.dp = nn.Dropout(dp_rate)
             ├─ self.n_heads = n_heads
             └─ self.attn_mode = attn_mode
        
        └─ if i % 2 == 0: self.q_fc.append( nn.Sequential())
           └─ else: self.q_fc.append(nn.Identity())  # nn.ModuleList()
      
      ├─ self.pos_enc = Embedder()    # 21x3=63 funcs
      └─ self.view_enc = Embedder()   # 21x3=63 funcs
 
 ├─ self.net_fine = GNT().to(device)
 ├─ self.feature_net = ResUNet(coarse_out_ch, fine_out_ch, single_net).to(device)
   ├─ self.conv1 = nn.Conv2d()
   ├─ self.bn1
   ├─ self.relu
   ├─ self.layer1 = self._make_layer(block,planes,blocks,stride,dilate)
     ├─ norm_layer
     ├─ previous_dilation
     ├─ if dilate: self.dilation *= stride
     ├─ if stride !=1 or inplanes differs from outchanl * expansion:
       └─ downsample = nn.Sequential(conv1x1(), norm_layer())
     
     ├─ layers.append(BasicBlock(self.inplanes, planes, stride,downsample,))   # list 
       ├─ norm_layer
       ├─ self.conv1
       ├─ self.bn1
       ├─ self.relu
       ├─ self.conv2
       ├─ self.bn2
       ├─ self.downsample
       └─ self.stride
     
     ├─ self.inplanes = planes * block.expansion
     └─ for _ in range(1,blocks):
        └─ layers.append(BasicBlock(self.inplanes,planes,))
           ├─ norm_layer
           ├─ self.conv1
           ├─ self.bn1
           ├─ self.relu
           ├─ self.conv2
           ├─ self.bn2
           ├─ self.downsample
           └─ self.stride
   
   ├─ self.layer2
   ├─ self.layer3
   ├─ self.upconv3
   ├─ self.iconv3
   ├─ self.upconv2
   ├─ self.iconv2
   └─ self.out_conv
 
 ├─ learnable_params
 ├─ self.optimizer = torch.optim.Adam()
 ├─ self.scheduler
 ├─ self.start_step
 └─ if args.distributed:
    ├─ self.net_coarse = torch.nn.parallel.DDP
    ├─ self.feature_net = torch.nn.parallel.DDP
    └─ self.net_fine = torch.nn.parallel.DDP

 switch_to_eval(self):
 └─ render_kwargs_train{network_query_fn, model, ...}
 switch_to_train(self):
 save_model(self, filename):
 load_model(self, filename):
 load_from_ckpt(self, out_folder):
   ()
  ├─ h

In a new file: model_parallel.py

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
class GNTModel(object):
 __init__(self, args, load_opt=True, load_schedular=True):
 ├─ device = ['cuda:0','cuda:1','cuda:2','cuda:3']
 ├─ self.net_coarse = GNT(..., device)  # transformer_network_parallel.py
    ├─ __init__(self, args, in_feat_ch=...)
       ├─ super(GNT, self).__init__()
       ├─ self.rgbfeat_fc = nn.Sequential().to(device[0])
       ├─ 












In train.py:

1
from gnt.model_parallel import GNTModel

Pytorch-DDP-RPC

Invited Talk: PyTorch Distributed (DDP, RPC) - By Facebook Research Scientist Shen Li (YouTube)

(DDG search: tensorflow model split distributed parallel)

Built with Hugo
Theme Stack designed by Jimmy