Distributed Training with Submitit#
Thank you to Yilun Kuang for providing this example!
Composer is compatible with submitit, a lightweight SLURM cluster job management package with a Python API. To run distributed training on SLURM with submitit, the following environment variables need to be specified:
RANK, WORLD_SIZE, LOCAL_RANK, LOCAL_WORLD_SIZE, NODE_RANK, MASTER_ADDR, MASTER_PORT, PYTHONUNBUFFERED
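For concreteness, here are hypothetical values these variables would take for one of the eight processes in a two-node, four-GPU-per-node job (this process is global rank 5, which lives on node 1 as its local rank 1; the hostname is made up):

import os

# Hypothetical values for one of the 8 processes in a 2-node x 4-GPU job.
dist_env_example = {
    "RANK": "5",               # global index of this process (0-7)
    "WORLD_SIZE": "8",         # total processes = 2 nodes x 4 GPUs
    "LOCAL_RANK": "1",         # index of this process within its node (0-3)
    "LOCAL_WORLD_SIZE": "4",   # processes per node
    "NODE_RANK": "1",          # index of this node (0-1)
    "MASTER_ADDR": "node-0",   # hostname of the rank-0 node (hypothetical)
    "MASTER_PORT": "54321",    # any free port, identical across all processes
    "PYTHONUNBUFFERED": "1",   # flush prints immediately so logs stream in real time
}
os.environ.update(dist_env_example)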
In this tutorial, we walk through how to set these environment variables without using the Composer launcher. The example task is standard supervised training of ResNet-18 on CIFAR-10.
Prerequisite#
To start with, let's first install the Composer and submitit libraries:
[ ]:
%pip install mosaicml
%pip install submitit
Prepare Dataset#
We will use a standard PyTorch DataLoader for the Composer training pipeline.
[ ]:
import torch
import torchvision
from torchvision import transforms
from torch.utils.data import DataLoader
[ ]:
def initialize_dataset():
    cifar10_transform = transforms.Compose(
        [transforms.ToTensor(), transforms.Normalize([0.4914, 0.4822, 0.4465], [0.2023, 0.1994, 0.2010])]
    )
    train_data = torchvision.datasets.CIFAR10(
        root=".", train=True, download=True, transform=cifar10_transform
    )
    train_loader = DataLoader(
        dataset=train_data,
        sampler=None,
        batch_size=1024,
        num_workers=10,
        pin_memory=True,
        drop_last=True,
    )
    return train_loader  # standard PyTorch dataloader
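If you want a quick sanity check before touching the cluster (optional, and not part of the original pipeline), pull a single batch locally and confirm its shape:

train_loader = initialize_dataset()
inputs, targets = next(iter(train_loader))
print(inputs.shape, targets.shape)  # expect: torch.Size([1024, 3, 32, 32]) torch.Size([1024])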
Prepare Model#
To use the Composer Trainer, our model must subclass the ComposerModel class.
[ ]:
from typing import Optional, Any
import torchvision
import torch.nn.functional as F
from composer.models import ComposerModel
class ResNet18(ComposerModel):
    def __init__(self):
        super().__init__()
        self.model = torchvision.models.resnet18(num_classes=10)  # CIFAR-10 has 10 classes

    def forward(self, batch):  # batch is the output of the dataloader
        # specify how batches are passed through the model
        inputs, _ = batch
        return self.model(inputs)

    def loss(self, outputs, batch):
        # pass batches and `forward` outputs to the loss
        _, targets = batch
        return F.cross_entropy(outputs, targets)

    def eval_forward(self, batch, outputs: Optional[Any] = None):
        return outputs if outputs is not None else self.forward(batch)
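As a quick local check (a sketch we're adding here, not part of the training pipeline), the wrapped model can be exercised on a dummy batch shaped like a dataloader output:

model = ResNet18()
dummy_batch = (torch.randn(8, 3, 32, 32), torch.randint(0, 10, (8,)))  # (inputs, targets)
outputs = model(dummy_batch)             # dispatches to forward(), which ignores the targets
print(outputs.shape)                     # torch.Size([8, 10])
print(model.loss(outputs, dummy_batch))  # scalar cross-entropy tensor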
For more details about model wrapping, see the ComposerModel documentation.
Next, we initialize a standard Adam optimizer from PyTorch for training:
[ ]:
import torch.optim as optim
def initialize_model_and_optimizer():
    model = ResNet18()
    optimizer = optim.Adam(model.parameters(), lr=0.0001)
    return model, optimizer
Set up Environment Variables#
Before training, we need to set all the necessary environment variables so that the distributed process group initializes correctly. They can be derived from submitit's built-in job attributes, torch utilities, and the environment variables that the SLURM cluster itself sets.
[ ]:
import os
import submitit
import subprocess
[ ]:
def set_up_dist_env():
    # 1. RANK
    job_env = submitit.JobEnvironment()
    global_rank = job_env.global_rank

    # 2. LOCAL_RANK
    local_rank = job_env.local_rank

    # 3. LOCAL_WORLD_SIZE
    ngpus_per_node = torch.cuda.device_count()

    # 4. WORLD_SIZE
    world_size = int(os.getenv("SLURM_NNODES")) * ngpus_per_node

    # 5. NODE_RANK
    node_rank = int(os.getenv("SLURM_NODEID"))

    # 6. MASTER_ADDR
    cmd = "scontrol show hostnames " + os.getenv("SLURM_JOB_NODELIST")
    stdout = subprocess.check_output(cmd.split())
    host_name = stdout.decode().splitlines()[0]

    # 7. MASTER_PORT
    port = 54321

    # Set All the Necessary Environment Variables!
    os.environ["RANK"] = str(global_rank)
    os.environ["LOCAL_RANK"] = str(local_rank)
    os.environ["LOCAL_WORLD_SIZE"] = str(ngpus_per_node)
    os.environ["WORLD_SIZE"] = str(world_size)
    os.environ["NODE_RANK"] = str(node_rank)
    os.environ["MASTER_ADDR"] = host_name
    os.environ["MASTER_PORT"] = str(port)
    os.environ["PYTHONUNBUFFERED"] = "1"
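If the process group hangs or fails to initialize, a simple first debugging step (a sketch we're adding here, not part of the original example) is to have each rank print what it ended up with:

def print_dist_env():
    # Call right after set_up_dist_env() to verify each rank's view of the world.
    keys = ["RANK", "WORLD_SIZE", "LOCAL_RANK", "LOCAL_WORLD_SIZE",
            "NODE_RANK", "MASTER_ADDR", "MASTER_PORT"]
    print({key: os.environ.get(key) for key in keys})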
The above setup is a bare-minimum version of what the Composer launcher does for the distributed training pipeline. Here is a snippet from the Composer launcher source code showing how Composer sets these environment variables; our function does the same thing, with the values derived automatically from the SLURM environment.
# composer/composer/cli/launcher.py
with _patch_env(
    RANK=str(global_rank),
    WORLD_SIZE=str(world_size),
    LOCAL_RANK=str(local_rank),
    LOCAL_WORLD_SIZE=str(nproc),
    NODE_RANK=str(node_rank),
    MASTER_ADDR=master_addr,
    MASTER_PORT=str(master_port),
    PYTHONUNBUFFERED='1',
):
Submit Job to the Cluster#
Here comes the final step. Assume a multi-node setup with two nodes, each with four GPUs. The same set_up_dist_env() function also works on a single node with multiple GPUs. Let's define the submit_job() function:
[ ]:
from composer.trainer import Trainer

def train():
    set_up_dist_env()
    train_dataloader = initialize_dataset()
    model, optimizer = initialize_model_and_optimizer()

    print("Trainer")
    trainer = Trainer(
        model=model,
        optimizers=optimizer,
        train_dataloader=train_dataloader,
        max_duration='10ep',
        device='gpu' if torch.cuda.is_available() else 'cpu',
    )

    print("trainer.fit")
    trainer.fit()

def submit_job():
    slurm_ngpus = 4
    slurm_nnodes = 2
    slurm_timeout = 1024
    workers = 10

    slurm_directory = "."  # "<Your Specified Directory>"
    executor = submitit.AutoExecutor(folder=slurm_directory)

    executor.update_parameters(
        mem_gb=128 * slurm_ngpus,
        gpus_per_node=slurm_ngpus,
        tasks_per_node=slurm_ngpus,
        cpus_per_task=workers,
        nodes=slurm_nnodes,
        timeout_min=slurm_timeout,
        slurm_partition="gpu",
        # see the submitit GitHub repo for details
    )

    job = executor.submit(train)
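The executor.submit() call returns a standard submitit Job object, which is handy for monitoring. For example (an optional sketch):

job = executor.submit(train)   # as in submit_job() above
print(job.job_id)              # the SLURM job id assigned by the scheduler
# job.result() blocks until the job finishes and re-raises any exception
# raised inside train(); job.stdout() and job.stderr() return the captured logs.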
Putting Things Together#
We can now put everything into a single Python file for job submission:
[ ]:
import os
import subprocess
from typing import Optional, Any

import submitit
import torch
import torch.nn.functional as F
import torch.optim as optim
import torchvision
from torchvision import transforms
from torch.utils.data import DataLoader

from composer import Trainer
from composer.models import ComposerModel


class ResNet18(ComposerModel):
    def __init__(self):
        super().__init__()
        self.model = torchvision.models.resnet18(num_classes=10)  # CIFAR-10 has 10 classes

    def forward(self, batch):  # batch is the output of the dataloader
        # specify how batches are passed through the model
        inputs, _ = batch
        return self.model(inputs)

    def loss(self, outputs, batch):
        # pass batches and `forward` outputs to the loss
        _, targets = batch
        return F.cross_entropy(outputs, targets)

    def eval_forward(self, batch, outputs: Optional[Any] = None):
        return outputs if outputs is not None else self.forward(batch)


def initialize_model_and_optimizer():
    model = ResNet18()
    optimizer = optim.Adam(model.parameters(), lr=0.0001)
    return model, optimizer


def initialize_dataset():
    cifar10_transform = transforms.Compose(
        [transforms.ToTensor(), transforms.Normalize([0.4914, 0.4822, 0.4465], [0.2023, 0.1994, 0.2010])]
    )
    train_data = torchvision.datasets.CIFAR10(
        root=".", train=True, download=True, transform=cifar10_transform
    )
    train_loader = DataLoader(
        dataset=train_data,
        sampler=None,
        batch_size=1024,
        num_workers=10,
        pin_memory=True,
        drop_last=True,
    )
    return train_loader  # standard PyTorch dataloader


def set_up_dist_env():
    # 1. RANK
    job_env = submitit.JobEnvironment()
    global_rank = job_env.global_rank

    # 2. LOCAL_RANK
    local_rank = job_env.local_rank

    # 3. LOCAL_WORLD_SIZE
    ngpus_per_node = torch.cuda.device_count()

    # 4. WORLD_SIZE
    world_size = int(os.getenv("SLURM_NNODES")) * ngpus_per_node

    # 5. NODE_RANK
    node_rank = int(os.getenv("SLURM_NODEID"))

    # 6. MASTER_ADDR
    cmd = "scontrol show hostnames " + os.getenv("SLURM_JOB_NODELIST")
    stdout = subprocess.check_output(cmd.split())
    host_name = stdout.decode().splitlines()[0]

    # 7. MASTER_PORT
    port = 54321

    # Set All the Necessary Environment Variables!
    os.environ["RANK"] = str(global_rank)
    os.environ["LOCAL_RANK"] = str(local_rank)
    os.environ["LOCAL_WORLD_SIZE"] = str(ngpus_per_node)
    os.environ["WORLD_SIZE"] = str(world_size)
    os.environ["NODE_RANK"] = str(node_rank)
    os.environ["MASTER_ADDR"] = host_name
    os.environ["MASTER_PORT"] = str(port)
    os.environ["PYTHONUNBUFFERED"] = "1"


def train():
    set_up_dist_env()
    train_dataloader = initialize_dataset()
    model, optimizer = initialize_model_and_optimizer()

    print("Trainer")
    trainer = Trainer(
        model=model,
        optimizers=optimizer,
        train_dataloader=train_dataloader,
        max_duration='10ep',
        device='gpu' if torch.cuda.is_available() else 'cpu',
    )

    print("trainer.fit")
    trainer.fit()


def submit_job():
    slurm_ngpus = 4
    slurm_nnodes = 2
    slurm_timeout = 1024
    workers = 10

    slurm_directory = "."  # "<Your Specified Directory>"
    executor = submitit.AutoExecutor(folder=slurm_directory)

    executor.update_parameters(
        mem_gb=128 * slurm_ngpus,
        gpus_per_node=slurm_ngpus,
        tasks_per_node=slurm_ngpus,
        cpus_per_task=workers,
        nodes=slurm_nnodes,
        timeout_min=slurm_timeout,
        slurm_partition="gpu",
        # see the submitit GitHub repo for details
    )

    job = executor.submit(train)


if __name__ == "__main__":
    submit_job()
Run the above Python script in your command shell and you're all set!