# 👨‍👩‍👧‍👦 Distributed Training

Composer supports distributed training on multiple devices, whether it be multiple GPUs on a single node or multiple GPUs across multiple nodes.

## Data Parallelism

Composer distributes work across devices via data parallelism only. We choose this to provide the most flexibility to algorithms, which can modify the training loop in complex ways. Data parallelism greatly simplifies model building and memory management: every GPU performs the same work, so inspecting rank zero is sufficient to reason about memory, performance, and other properties.

Within Composer, we have three options for data-parallelism-only execution: PyTorch DDP, PyTorch FSDP, and DeepSpeed ZeRO. We currently default to PyTorch DDP, though DeepSpeed ZeRO can provide better performance and lower memory utilization when configured correctly. We also support PyTorch FSDP, which produces the same results as PyTorch DDP while increasing memory and computational efficiency.

## Usage

To launch a multi-GPU training job, we provide the composer launcher:

# run training on 8 GPUs
composer -n 8 my_training_script.py


Under the hood, this script (source code here) sets the required torch.distributed environment variables, launches the processes, and runs the script on each process.

Note

The batch_size passed to your dataloader should be the per-device minibatch size. We further split this into smaller microbatches with gradient accumulation.
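To make the batch-size terminology concrete, here is a small arithmetic sketch. The specific numbers are illustrative only, not Composer defaults:

```python
# Hypothetical sizes for illustration; these are not Composer defaults.
per_device_batch_size = 256  # what you pass to your DataLoader
world_size = 8               # total number of processes/GPUs
grad_accum = 4               # microbatches per device via gradient accumulation

# The effective optimization batch seen by the optimizer each step:
global_batch_size = per_device_batch_size * world_size

# The size of each microbatch actually run through the model:
microbatch_size = per_device_batch_size // grad_accum

print(global_batch_size)  # 2048
print(microbatch_size)    # 64
```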

For additional configurations of our launcher script, run composer --help.

usage: composer [-h] [--version] [-n NPROC] [--stdout STDOUT]
                [--stderr STDERR] [-v] [-m] [-c] [--world_size WORLD_SIZE]
                [--base_rank BASE_RANK] [--node_rank NODE_RANK]
                training_script ...


### Named Arguments

--version

show program’s version number and exit

-n, --nproc

The number of processes to launch on this node. Overrides env var LOCAL_WORLD_SIZE if specified; otherwise, defaults to max(1, torch.cuda.device_count()).

--stdout

Format string for a filename to dump the STDOUT from the non-local-rank-zero processes. The local rank zero process will be piped through to STDOUT. The available format variables are: ‘{rank}’, ‘{local_rank}’, ‘{world_size}’, ‘{node_rank}’, and ‘{local_world_size}’. If specified, it is recommended to include ‘{rank}’ or ‘{local_rank}’ in the filename so each rank will write to its own file. By default, the STDOUT of the non-local-rank-zero processes is discarded; instead, use the FileLogger within Composer. This logger captures and saves the STDOUT of each process.

--stderr

Format string for a filename to dump the STDERR from the non-local-rank-zero processes. The local rank zero process will be piped through to STDERR. The available format variables are: ‘{rank}’, ‘{local_rank}’, ‘{world_size}’, ‘{node_rank}’, and ‘{local_world_size}’. If specified, it is recommended to include ‘{rank}’ or ‘{local_rank}’ in the filename so each rank will write to its own file. By default, the STDERR of the non-local-rank-zero processes is discarded; instead, use the FileLogger within Composer. This logger captures and saves the STDERR of each process.

-v, --verbose

If set, print verbose messages

Default: False

-m, --module_mode

If set, run the training script as a module instead of as a script. Cannot be used in conjunction with command_mode.

Default: False

-c, --command_mode

If set, run the training script as a command (i.e. without python). Cannot be used in conjunction with module_mode.

Default: False

### Required Arguments

training_script

The path to the training script used to initialize a single training process. Should be followed by any command-line arguments the script should be launched with.

training_script_args

Any arguments for the training script, given in the expected order.

### Multi-Node Arguments

These arguments generally only need to be set when training in a multi-node environment, i.e. when the world_size is bigger than nproc.

--world_size

The total number of processes to launch across all nodes. Setting this to a value greater than nproc indicates a multi-node environment. Overrides env var WORLD_SIZE. Defaults to nproc.

--base_rank

The rank of the lowest ranked process to launch on this node. Specifying a base_rank B and an nproc N will spawn processes with global ranks [B, B+1, … B+N-1]. In a multi-node environment, at least one of base_rank and node_rank must be specified. If only one of base_rank and node_rank is provided, it is assumed that all nodes have the same number of processes, and that the two values are related as node_rank * nproc = base_rank. If this is not the case, both base_rank and node_rank must be provided. Overrides env var BASE_RANK. Defaults to 0 in a single-node environment.

--node_rank

The rank of this node. See base_rank for information on when this must be provided. Overrides env var NODE_RANK. Defaults to 0 in a single-node environment.
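The rank arithmetic described above can be illustrated with a short sketch. This is not Composer's internal code, just the relationship the launcher assumes when only node_rank is given:

```python
# Illustration of the base_rank / node_rank relationship described above
# (not Composer's internal implementation).
nproc = 8      # processes launched per node (assumed uniform across nodes)
node_rank = 2  # rank of this node

# When only node_rank is provided, the launcher assumes:
base_rank = node_rank * nproc

# This node then spawns processes with these global ranks:
global_ranks = list(range(base_rank, base_rank + nproc))
print(global_ranks)  # [16, 17, 18, 19, 20, 21, 22, 23]
```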

--master_addr

The FQDN of the node hosting the C10d TCP store. For single-node operation, this can generally be left as 127.0.0.1. Overrides env var MASTER_ADDR. Defaults to 127.0.0.1 in a single-node environment.

--master_port

The port on the master hosting the C10d TCP store. If you are running multiple trainers on a single node, this generally needs to be unique for each one. Overrides env var MASTER_PORT. Defaults to a random free port in a single-node environment.

## Distributed Properties

Developers may need to access the current rank or world size in a distributed setting. For example, a callback may only want to log something for rank zero. Use our composer.utils.dist module to retrieve this information. The methods are similar to torch.distributed, but also return defaults in a non-distributed setting.

from composer.utils import dist

dist.get_world_size()  # torch.distributed.get_world_size()
dist.get_local_rank()
dist.get_global_rank()  # torch.distributed.get_rank()


For all retrievable properties, see composer.utils.dist.
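The "defaults in a non-distributed setting" behavior can be sketched in plain Python. These are hypothetical stand-in helpers that mimic composer.utils.dist by reading the environment variables the launcher sets, not Composer's actual implementation:

```python
import os

# Hypothetical helpers mimicking composer.utils.dist's fallback behavior:
# read the env vars the launcher sets, or return single-process defaults.
def get_world_size() -> int:
    return int(os.environ.get('WORLD_SIZE', 1))

def get_global_rank() -> int:
    return int(os.environ.get('RANK', 0))

# In a non-distributed run, the defaults make rank-zero guards safe:
if get_global_rank() == 0:
    print('only rank zero logs this')
```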

## Space-time Equivalence

We consider an equivalency principle between distributed training and gradient accumulation. That is, batches can either be parallelized across space (e.g. devices) or across time (e.g. gradient accumulation). Furthermore, the two dimensions are interchangeable – more devices, less gradient accumulation, and vice versa. Our trainer strives to respect this equivalency and ensure identical behavior regardless of the combination of space and time parallelization used.

## Distributed Sampling

When providing a torch.utils.data.Dataset (one that is not a torch.utils.data.IterableDataset) to Composer via a torch.utils.data.DataLoader, a torch.utils.data.distributed.DistributedSampler is necessary to ensure different devices receive different batches. Composer will raise an error if a DistributedSampler is not provided. composer.utils.dist provides a helper function, composer.utils.dist.get_sampler(), that creates a DistributedSampler with the correct parameters.

from composer.utils import dist

sampler = dist.get_sampler(dataset, shuffle=True)



composer.datasets.StreamingDataset is an IterableDataset, so a DistributedSampler is not supported: IterableDatasets need to handle multi-worker training internally. See the IterableDataset [docs](https://pytorch.org/docs/stable/data.html#torch.utils.data.IterableDataset) for more information.
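What a DistributedSampler does can be sketched in pure Python: each rank takes a strided slice of the dataset indices, so devices see disjoint data. This is a simplified illustration, not PyTorch's actual implementation (which also handles shuffling and padding to equal lengths):

```python
# A simplified pure-Python sketch of the index sharding a DistributedSampler
# performs (PyTorch's real sampler also shuffles and pads ranks to equal size).
def shard_indices(dataset_len: int, world_size: int, rank: int) -> list:
    # Each rank takes every world_size-th index, starting at its own rank.
    return list(range(rank, dataset_len, world_size))

# With 8 samples and 2 ranks, each device gets half the data with no overlap:
print(shard_indices(8, 2, 0))  # [0, 2, 4, 6]
print(shard_indices(8, 2, 1))  # [1, 3, 5, 7]
```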

## DeepSpeed

Composer comes with DeepSpeed support, allowing you to leverage their full set of features that make it easier to train large models across (1) any type of GPU and (2) multiple nodes. For more details on DeepSpeed, see their website.

We support optimizer and gradient sharding via DeepSpeed ZeRO stages 1 and 2, respectively. In the future, we’ll support model sharding via ZeRO-3. These methods reduce model state memory by a factor of (1 / the number of data-parallel devices).
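A back-of-the-envelope calculation shows why this factor matters. The numbers below are illustrative assumptions (a hypothetical 1B-parameter model trained with Adam, whose optimizer state is two fp32 tensors per parameter):

```python
# Back-of-the-envelope illustration of the (1 / num_devices) reduction above.
# Assumptions: a hypothetical 1B-parameter model trained with Adam, whose
# optimizer state is two fp32 tensors (momentum + variance) per parameter.
params = 1_000_000_000
bytes_per_param_state = 4 * 2  # fp32 momentum + fp32 variance
devices = 8

# Without sharding, the full optimizer state is replicated on every device:
full_state_gb = params * bytes_per_param_state / 1e9

# ZeRO stage 1 shards the optimizer state across data-parallel devices:
sharded_state_gb = full_state_gb / devices

print(full_state_gb)     # 8.0 GB per device, replicated
print(sharded_state_gb)  # 1.0 GB per device, sharded
```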

To enable DeepSpeed, simply pass in a config as specified in the DeepSpeed docs here.

# run_trainer.py

from composer import Trainer

trainer = Trainer(
    model=model,
    max_duration='160ep',
    device='gpu',
    deepspeed_config={
        "train_batch_size": 2048,
        "fp16": {"enabled": True},
    })


Providing an empty dictionary for deepspeed_config is also valid. The DeepSpeed defaults will be used, and other fields (such as precision) will be inferred from the trainer.

Warning

The deepspeed_config must not conflict with any other parameters passed to the trainer.

Warning

Not all algorithms have been tested with DeepSpeed; please proceed with caution.

## FullyShardedDataParallel (FSDP)

Composer integrates PyTorch’s FullyShardedDataParallel engine with some syntactic sugar to make it easy to write custom models that work with Composer + FSDP.

At a high level, when you use the Composer Trainer, you must pass it a ComposerModel like ComposerGPT that defines certain functions like forward, eval_forward, loss, etc. that are called during the training loop.

Inside that ComposerModel you may have one or many submodules, such as a .model or .language_model or .classifier, that is the actual torch.nn.Module you will be deploying at inference time. In our case, this is the GPT module that we build and attach as ComposerGPT.model.

When you provide an fsdp_config={...} dictionary to the Composer Trainer, then on __init__, the Trainer will attempt to wrap each of the submodules of your ComposerModel with an FSDP auto wrap policy. This wrapping is recursive, so not only is GPT wrapped, but all submodules of GPT may or may not be wrapped too. See the FSDP documentation for more details on how auto wrap policies work.

The full spec and defaults for Composer’s fsdp_config is here:

fsdp_config = {
    'sharding_strategy': str = 'FULL_SHARD' | 'SHARD_GRAD_OP' | 'NO_SHARD', # Default: 'FULL_SHARD'
    'min_params': float, # Default: 1e8
    'cpu_offload': bool = True | False, # Default: False, cpu_offload not supported yet
    'mixed_precision': str = 'FULL' | 'DEFAULT' | 'PURE', # Default: 'DEFAULT'
    # Note: you can explicitly provide a dictionary too
    # 'mixed_precision': dict = {
    #   'param_dtype': 'fp32' | 'fp16' | 'bf16',
    #   'reduce_dtype': 'fp32' | 'fp16' | 'bf16',
    #   'buffer_dtype': 'fp32' | 'fp16' | 'bf16',
    # },
    'backward_prefetch': str = 'BACKWARD_PRE' | 'BACKWARD_POST' | 'NONE', # Default: 'BACKWARD_POST'
    'activation_checkpointing': bool = True | False, # Default: False
    'activation_cpu_offload': bool = True | False, # Default: False
    'verbose': bool = True | False,
}


All values come with defaults and can be optionally defined in the fsdp_config. Most parameters map directly to parameters in the FSDP documentation.

One Composer-specific pattern is that if mixed_precision is provided as a str, then we automatically infer the settings to use from the Trainer’s precision, which is already being used for autocast, and we construct an associated MixedPrecision object for FSDP:

# If mixed_precision = 'full'
mixed_precision = MixedPrecision(
    param_dtype=torch.float32,
    reduce_dtype=torch.float32,
    buffer_dtype=torch.float32,
)

# If mixed_precision = 'default'
mixed_precision = MixedPrecision(
    param_dtype=torch.float32,
    reduce_dtype=autocast_precision,  # Low precision gradient communication
    buffer_dtype=torch.float32,
)

# If mixed_precision = 'pure'
mixed_precision = MixedPrecision(
    param_dtype=autocast_precision,  # Low precision master weights
    reduce_dtype=autocast_precision,  # Low precision gradient communication
    buffer_dtype=autocast_precision,  # Low precision buffers
)


An example code snippet for using FSDP with composer is provided below:

import torch.nn as nn

from composer import Trainer
from composer.models import ComposerModel

class Block(nn.Module):
    ...

class Model(nn.Module):
    def __init__(self, n_layers):
        super().__init__()
        self.blocks = nn.ModuleList([
            Block(...) for _ in range(n_layers)
        ])

    def forward(self, inputs):
        ...

    # FSDP Wrap Function
    def fsdp_wrap_fn(self, module):
        return isinstance(module, Block)

    # Activation Checkpointing Function
    def activation_checkpointing_fn(self, module):
        return isinstance(module, Block)

class MyComposerModel(ComposerModel):

    def __init__(self, n_layers):
        super().__init__()
        self.model = Model(n_layers)
        ...

    def forward(self, batch):
        ...

    def eval_forward(self, batch, outputs=None):
        ...

    def loss(self, outputs, batch):
        ...

    ...

composer_model = MyComposerModel(n_layers=3)

fsdp_config = {
    'sharding_strategy': 'FULL_SHARD',
    'min_params': 1e8,
    'cpu_offload': False,  # Not supported yet
    'mixed_precision': 'DEFAULT',
    'backward_prefetch': 'BACKWARD_POST',
    'activation_checkpointing': False,
    'verbose': True
}

trainer = Trainer(
    model=composer_model,
    fsdp_config=fsdp_config,
    ...
)

trainer.fit()


Warning

As of now, we don’t support CPU offloading for FSDP.

Warning

As of now, default parameters might not provide optimal convergence. Please proceed with caution.

## Composer’s FSDP Auto Wrap Policy

To make auto-wrapping easier on users, Composer uses a custom auto wrap policy that wraps modules according to the following rules:

1. If any module is attributed with module._fsdp_wrap = True | False, that choice will be respected.

2. If the root module (e.g. GPT) defines a function def fsdp_wrap_fn(module: torch.nn.Module) -> bool, then that function will be used to evaluate the root module’s children.

3. If any module has more parameters than fsdp_config['min_params'], it will be wrapped.

These rules are meant to make it easy for users to modify existing models for usage with FSDP. You can either add attributes to modules you want to wrap (#1), define a filter (#2), or make no changes at all and just use the size-based policy via fsdp_config['min_params'] = ... (#3).
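The decision order of the three rules can be sketched with plain Python objects standing in for torch.nn.Module. This is an illustration of the rules as stated above, not Composer's actual policy code; the Stub class and should_wrap helper are hypothetical, and we assume rule #2 is terminal when an fsdp_wrap_fn exists:

```python
# A minimal sketch of the rule order described above (illustration only,
# not Composer's code). Stub and should_wrap are hypothetical names; we
# assume fsdp_wrap_fn, when defined, decides terminally (rule #2).
def should_wrap(module, root, min_params: float) -> bool:
    # Rule 1: an explicit _fsdp_wrap attribute always wins.
    if hasattr(module, '_fsdp_wrap'):
        return bool(module._fsdp_wrap)
    # Rule 2: otherwise defer to the root module's fsdp_wrap_fn, if any.
    if hasattr(root, 'fsdp_wrap_fn'):
        return root.fsdp_wrap_fn(module)
    # Rule 3: otherwise fall back to the size-based policy.
    return module.num_params > min_params

class Stub:
    def __init__(self, num_params):
        self.num_params = num_params

root = Stub(0)  # defines no fsdp_wrap_fn, so rule 3 applies below
big, small = Stub(2e8), Stub(1e6)
print(should_wrap(big, root, min_params=1e8))    # True  (rule 3)
print(should_wrap(small, root, min_params=1e8))  # False (rule 3)
small._fsdp_wrap = True
print(should_wrap(small, root, min_params=1e8))  # True  (rule 1 overrides)
```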

In gpt.py, you can see that we used rule #2 to specify that all GPTBlock modules within GPT should be wrapped. Alternatively, we could have easily attributed each of the blocks with block._fsdp_wrap = True and it would have accomplished the same thing. Whatever style you prefer, it’s up to you!

A very similar auto wrap policy is provided for activation checkpointing, with analogous rule #1 that looks for module._activation_checkpointing = True | False and rule #2 that looks for def activation_checkpointing_fn(module: torch.nn.Module) -> bool.