Tip

This tutorial is available as a Jupyter notebook.

Thank you to Yilun Kuang for providing this example!

🕹️ Distributed Training with Submitit

Composer is compatible with submitit, a lightweight SLURM cluster job management package with a Python API. To run distributed training on SLURM with submitit, the following environment variables need to be specified:

RANK, WORLD_SIZE, LOCAL_RANK, LOCAL_WORLD_SIZE, NODE_RANK, MASTER_ADDR, MASTER_PORT, PYTHONUNBUFFERED

In this tutorial, we walk through how to set up these environment variables without using the Composer launcher. The example task is standard supervised training of ResNet-18 on CIFAR-10.
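
To make the roles of the rank-related variables concrete, here is a minimal sketch in plain Python. It assumes one process per GPU and the common convention that global ranks are numbered node by node; the helper name `rank_layout` and the two-node, four-GPU example are illustrative and not part of Composer or submitit.

```python
def rank_layout(num_nodes, gpus_per_node):
    """Return the rank-related variables for every process in the job.

    Assumption: one process per GPU, global ranks numbered node by node.
    """
    world_size = num_nodes * gpus_per_node
    layout = []
    for node_rank in range(num_nodes):
        for local_rank in range(gpus_per_node):
            layout.append({
                "RANK": node_rank * gpus_per_node + local_rank,
                "WORLD_SIZE": world_size,
                "LOCAL_RANK": local_rank,
                "LOCAL_WORLD_SIZE": gpus_per_node,
                "NODE_RANK": node_rank,
            })
    return layout


# Illustrative job shape: two nodes with four GPUs each.
for proc in rank_layout(num_nodes=2, gpus_per_node=4):
    print(proc)
```

MASTER_ADDR and MASTER_PORT are not per-process values: every process is given the same address and port so that all of them can reach the same rendezvous point.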

Prerequisites

First, let's install the Composer and submitit libraries:

[ ]:
%pip install mosaicml
%pip install submitit

Prepare Dataset

We will use a standard PyTorch DataLoader for the Composer training pipeline.

[ ]:
import torch
import torchvision
from torchvision import transforms
from torch.utils.data import DataLoader
[ ]:
def initialize_dataset():
    cifar10_transform = transforms.Compose(
        [transforms.ToTensor(), transforms.Normalize([0.4914, 0.4822, 0.4465], [0.2023, 0.1994, 0.2010]),]
    )
    train_data = torchvision.datasets.CIFAR10(
        root="./data", train=True, download=True, transform=cifar10_transform
    )
    train_loader = DataLoader(
        dataset=train_data,
        sampler=None,
        batch_size=1024,
        num_workers=10,
        pin_memory=True,
        drop_last=True,
    )

    return train_loader  # standard pytorch dataloader
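
As a quick check on what these loader settings imply, the sketch below computes how many batches one pass over the data yields. It relies on the CIFAR-10 training split containing 50,000 images; the helper name `batches_per_epoch` is hypothetical and only mirrors the usual `drop_last` behaviour.

```python
import math


def batches_per_epoch(num_samples, batch_size, drop_last):
    """Number of batches a DataLoader-style iterator yields per pass."""
    if drop_last:
        # The incomplete final batch is discarded.
        return num_samples // batch_size
    # The incomplete final batch is kept.
    return math.ceil(num_samples / batch_size)


num_train = 50_000  # size of the CIFAR-10 training split
batch_size = 1024   # same value as in initialize_dataset()

full = batches_per_epoch(num_train, batch_size, drop_last=True)
dropped = num_train - full * batch_size
print(full, dropped)  # 48 848
```

These counts apply to each process that iterates the loader as defined above.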

Prepare Model

To use the Composer Trainer, our model must subclass the ComposerModel class.

[ ]:
from typing import Optional, Any

import torchvision
import torch.nn.functional as F
from composer.models import ComposerModel

class ResNet18(ComposerModel):
    def __init__(self):
        super().__init__()
        self.model = torchvision.models.resnet18()

    def forward(self, batch): # batch is the output of the dataloader
        # specify how batches are passed through the model
        inputs, _ = batch
        return self.model(inputs)

    def loss(self, outputs, batch):
        # pass batches and `forward` outputs to the loss
        _, targets = batch
        return F.cross_entropy(outputs, targets)

    def eval_forward(self, batch, outputs: Optional[Any] = None):
        return outputs if outputs is not None else self.forward(batch)

For more details about model wrapping, see 🛻 ComposerModel.

Next, we initialize a standard Adam optimizer from PyTorch for training:

[ ]:
import torch.optim as optim

def initialize_model_and_optimizer():
    model = ResNet18()
    optimizer = optim.Adam(model.parameters(), lr=0.0001)
    return model, optimizer

Set up Environment Variables

Before training, we need to set up all the necessary environment variables to correctly initialize the distributed training process group. The environment variables can be set using submitit built-in attributes, torch methods, and existing environment variables generated by the SLURM cluster.

[ ]:
import os
import submitit
import subprocess
[ ]:
def set_up_dist_env():
    # 1. RANK
    job_env = submitit.JobEnvironment()
    global_rank = job_env.global_rank

    # 2. LOCAL_RANK
    local_rank = job_env.local_rank

    # 3. LOCAL_WORLD_SIZE
    ngpus_per_node = torch.cuda.device_count()

    # 4. WORLD_SIZE
    world_size = int(os.getenv("SLURM_NNODES")) * ngpus_per_node

    # 5. NODE_RANK
    node_rank = int(os.getenv("SLURM_NODEID"))

    # 6. MASTER_ADDR
    cmd = "scontrol show hostnames " + os.getenv("SLURM_JOB_NODELIST")
    stdout = subprocess.check_output(cmd.split())
    host_name = stdout.decode().splitlines()[0]

    # 7. MASTER_PORT
    port = 54321

    # Set All the Necessary Environment Variables!
    os.environ["RANK"] = str(global_rank)
    os.environ["LOCAL_RANK"] = str(local_rank)
    os.environ["LOCAL_WORLD_SIZE"] = str(ngpus_per_node)
    os.environ["WORLD_SIZE"] = str(world_size)
    os.environ["NODE_RANK"] = str(node_rank)
    os.environ["MASTER_ADDR"] = host_name
    os.environ["MASTER_PORT"] = str(port)
    os.environ["PYTHONUNBUFFERED"] = "1"

The set_up_dist_env() function above is a bare-minimum version of what the Composer launcher does to make the distributed training pipeline work. The snippet below, taken from the Composer launcher source code, shows how Composer sets the same environment variables. Our function does the same thing, except that the values are derived automatically from the SLURM environment.

# composer/composer/cli/launcher.py

with _patch_env(
        RANK=str(global_rank),
        WORLD_SIZE=str(world_size),
        LOCAL_RANK=str(local_rank),
        LOCAL_WORLD_SIZE=str(nproc),
        NODE_RANK=str(node_rank),
        MASTER_ADDR=master_addr,
        MASTER_PORT=str(master_port),
        PYTHONUNBUFFERED='1',
):
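
Since a missing variable typically only surfaces once the process group is initialized, a small sanity check can help. The following is a sketch, not part of Composer or submitit: the helper name `missing_dist_vars` is hypothetical, and the hostname `node001` in the example is a made-up placeholder.

```python
import os

# The eight variables listed at the top of this tutorial.
REQUIRED_DIST_VARS = (
    "RANK", "WORLD_SIZE", "LOCAL_RANK", "LOCAL_WORLD_SIZE",
    "NODE_RANK", "MASTER_ADDR", "MASTER_PORT", "PYTHONUNBUFFERED",
)


def missing_dist_vars(env=None):
    """Return the names of required variables that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_DIST_VARS if not env.get(name)]


# Example with a plain dict standing in for os.environ (values are placeholders).
example_env = {
    "RANK": "0", "WORLD_SIZE": "8", "LOCAL_RANK": "0", "LOCAL_WORLD_SIZE": "4",
    "NODE_RANK": "0", "MASTER_ADDR": "node001", "MASTER_PORT": "54321",
}
print(missing_dist_vars(example_env))  # ['PYTHONUNBUFFERED']
```

One way to use it would be to call `missing_dist_vars()` right after `set_up_dist_env()` and raise an error if the returned list is not empty.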

Submit Job to the Cluster

For the final step, assume a multi-node setup with two nodes and four GPUs per node. The same set_up_dist_env() function should also work on a single node with multiple GPUs.

Let's define the train() and submit_job() functions:

[ ]:
from composer.trainer import Trainer

def train():
    set_up_dist_env()
    train_dataloader = initialize_dataset()
    model, optimizer = initialize_model_and_optimizer()

    print("Trainer")
    trainer = Trainer(
        model=model,
        optimizers=optimizer,
        train_dataloader=train_dataloader,
        max_duration='10ep',
        device='gpu' if torch.cuda.is_available() else 'cpu',
    )
    print("trainer.fit")
    trainer.fit()

def submit_job():
    slurm_ngpus = 4
    slurm_nnodes = 2
    slurm_timeout = 1024
    workers = 10

    slurm_directory = "." # "<Your Specified Directory>"
    executor = submitit.AutoExecutor(folder=slurm_directory)

    executor.update_parameters(
            mem_gb=128*slurm_ngpus,
            gpus_per_node=slurm_ngpus,
            tasks_per_node=slurm_ngpus,
            cpus_per_task=workers,
            nodes=slurm_nnodes,
            timeout_min=slurm_timeout,
            slurm_partition="gpu",
            # see submitit github repo for details
    )

    job = executor.submit(train)
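
To see what this request adds up to, the arithmetic can be sketched in plain Python. The helper name `summarize_request` is hypothetical; it only combines the values passed to `update_parameters` above and does not call submitit or SLURM.

```python
def summarize_request(nodes, gpus_per_node, tasks_per_node, cpus_per_task,
                      mem_gb, timeout_min):
    """Derive job-level totals from the per-node and per-task settings."""
    hours, minutes = divmod(timeout_min, 60)
    return {
        "total_tasks": nodes * tasks_per_node,            # one training process per task
        "total_gpus": nodes * gpus_per_node,
        "cpus_per_node": tasks_per_node * cpus_per_task,
        "mem_gb": mem_gb,
        "time_limit": f"{hours}h{minutes:02d}m",
    }


# Same values as in submit_job().
slurm_ngpus, slurm_nnodes, slurm_timeout, workers = 4, 2, 1024, 10
summary = summarize_request(
    nodes=slurm_nnodes,
    gpus_per_node=slurm_ngpus,
    tasks_per_node=slurm_ngpus,
    cpus_per_task=workers,
    mem_gb=128 * slurm_ngpus,
    timeout_min=slurm_timeout,
)
print(summary)
```

With one task per GPU, the eight tasks match the WORLD_SIZE that set_up_dist_env() computes as the node count times the GPUs visible on each node, provided every node exposes four GPUs.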

Putting Things Together

We can now put everything into a single Python file for job submission:

[ ]:
import os
import submitit
import subprocess

import torchvision
import torch.optim as optim
import torch.nn.functional as F

import torch
from torchvision import transforms
from torch.utils.data import DataLoader

from composer.models import ComposerModel
from composer import Trainer

class ResNet18(ComposerModel):
    def __init__(self):
        super().__init__()
        self.model = torchvision.models.resnet18()

    def forward(self, batch): # batch is the output of the dataloader
        # specify how batches are passed through the model
        inputs, _ = batch
        return self.model(inputs)

    def loss(self, outputs, batch):
        # pass batches and `forward` outputs to the loss
        _, targets = batch
        return F.cross_entropy(outputs, targets)

def initialize_model_and_optimizer():
    model = ResNet18()
    optimizer = optim.Adam(model.parameters(), lr=0.0001)
    return model, optimizer

def initialize_dataset():
    cifar10_transform = transforms.Compose(
        [transforms.ToTensor(), transforms.Normalize([0.4914, 0.4822, 0.4465], [0.2023, 0.1994, 0.2010]),]
    )
    train_data = torchvision.datasets.CIFAR10(
        root="./data", train=True, download=True, transform=cifar10_transform
    )
    train_loader = DataLoader(
        dataset=train_data,
        sampler=None,
        batch_size=1024,
        num_workers=10,
        pin_memory=True,
        drop_last=True,
    )

    return train_loader  # standard pytorch dataloader

def set_up_dist_env():
    # 1. RANK
    job_env = submitit.JobEnvironment()
    global_rank = job_env.global_rank

    # 2. LOCAL_RANK
    local_rank = job_env.local_rank

    # 3. LOCAL_WORLD_SIZE
    ngpus_per_node = torch.cuda.device_count()

    # 4. WORLD_SIZE
    world_size = int(os.getenv("SLURM_NNODES")) * ngpus_per_node

    # 5. NODE_RANK
    node_rank = int(os.getenv("SLURM_NODEID"))

    # 6. MASTER_ADDR
    cmd = "scontrol show hostnames " + os.getenv("SLURM_JOB_NODELIST")
    stdout = subprocess.check_output(cmd.split())
    host_name = stdout.decode().splitlines()[0]

    # 7. MASTER_PORT
    port = 54321

    # Set All the Necessary Environment Variables!
    os.environ["RANK"] = str(global_rank)
    os.environ["LOCAL_RANK"] = str(local_rank)
    os.environ["LOCAL_WORLD_SIZE"] = str(ngpus_per_node)
    os.environ["WORLD_SIZE"] = str(world_size)
    os.environ["NODE_RANK"] = str(node_rank)
    os.environ["MASTER_ADDR"] = host_name
    os.environ["MASTER_PORT"] = str(port)
    os.environ["PYTHONUNBUFFERED"] = "1"

def train():
    set_up_dist_env()
    train_dataloader = initialize_dataset()
    model, optimizer = initialize_model_and_optimizer()

    print("Trainer")
    trainer = Trainer(
        model=model,
        optimizers=optimizer,
        train_dataloader=train_dataloader,
        max_duration='10ep',
        device='gpu' if torch.cuda.is_available() else 'cpu',
    )
    print("trainer.fit")
    trainer.fit()

def submit_job():
    slurm_ngpus = 4
    slurm_nnodes = 2
    slurm_timeout = 1024
    workers = 10

    slurm_directory = "." # "<Your Specified Directory>"
    executor = submitit.AutoExecutor(folder=slurm_directory)

    executor.update_parameters(
            mem_gb=128*slurm_ngpus,
            gpus_per_node=slurm_ngpus,
            tasks_per_node=slurm_ngpus,
            cpus_per_task=workers,
            nodes=slurm_nnodes,
            timeout_min=slurm_timeout,
            slurm_partition="gpu",
            # see submitit github repo for details
    )

    job = executor.submit(train)

if __name__ == "__main__":
    submit_job()

Run the above Python script from your command shell and you're all set!