# composer.utils.dist

Helper methods for torch.distributed.

To use torch.distributed, launch your training script with the composer launcher for distributed training. For example, the following command launches an eight-process training run.

```shell
composer -n 8 path/to/train.py
```


The composer launcher will automatically configure the following environment variables, which are required for distributed training:

• RANK: The global rank of the process, which should be in [0, WORLD_SIZE - 1].

• LOCAL_RANK: The local rank for the process, which should be in [0, LOCAL_WORLD_SIZE - 1].

• NODE_RANK: The rank of the node.

• WORLD_SIZE: The total number of processes.

• LOCAL_WORLD_SIZE: The number of processes on the current node.

• MASTER_ADDR: The hostname for the rank-zero process.

• MASTER_PORT: The port for the rank-zero process.

If none of these environment variables are set, this module will safely assume a single-rank configuration, where:

```shell
RANK=0
LOCAL_RANK=0
NODE_RANK=0
WORLD_SIZE=1
LOCAL_WORLD_SIZE=1
```
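A minimal sketch of this fallback logic in plain Python (`get_dist_config` is a hypothetical helper for illustration, not part of composer):

```python
import os

# Hypothetical helper sketching the fallback described above: if RANK is
# unset, assume the single-rank defaults; otherwise read all variables.
def get_dist_config():
    keys = ("RANK", "LOCAL_RANK", "NODE_RANK", "WORLD_SIZE", "LOCAL_WORLD_SIZE")
    if "RANK" not in os.environ:
        return {"RANK": 0, "LOCAL_RANK": 0, "NODE_RANK": 0,
                "WORLD_SIZE": 1, "LOCAL_WORLD_SIZE": 1}
    return {key: int(os.environ[key]) for key in keys}
```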


Functions

• all_gather – Collects a Tensor from each rank.

• all_gather_object – Collect a pickleable object from each rank and return a list of these objects indexed by rank.

• all_reduce – Reduce a tensor by applying the reduce_operation.

• barrier – Synchronizes all processes.

• broadcast – Broadcasts the tensor to the whole group.

• broadcast_object_list – Broadcasts picklable objects in object_list to the whole group.

• get_global_rank – Returns the global rank of the current process, which is in [0, WORLD_SIZE - 1].

• get_local_rank – Returns the local rank for the current process, which is in [0, LOCAL_WORLD_SIZE - 1].

• get_local_world_size – Returns the local world size, which is the number of processes for the current node.

• get_node_rank – Returns the node rank.

• get_sampler – Constructs a DistributedSampler for a dataset.

• get_world_size – Returns the world size, which is the number of processes participating in this training run.

• initialize_dist – Initialize the default PyTorch distributed process group.

• is_available – Returns whether PyTorch was built with distributed support.

• is_initialized – Returns whether PyTorch distributed is initialized.

• run_local_rank_zero_first – Context manager to hold all non-zero ranks until rank zero completes.
composer.utils.dist.all_gather(tensor)

Collects a Tensor from each rank.

Parameters

tensor (Tensor) – Tensor from each rank to be gathered.

Returns

Sequence[Tensor] – A sequence of tensors indexed by rank.

composer.utils.dist.all_gather_object(obj)

Collect a pickleable object from each rank and return a list of these objects indexed by rank.

Parameters

obj (TObj) – Object to be gathered.

Returns

List[TObj] – A list of objects indexed by rank.

composer.utils.dist.all_reduce(tensor, reduce_operation='SUM')

Reduce a tensor by applying the reduce_operation.

All ranks get the same, bitwise-identical result.

Parameters
• tensor (Tensor) – Tensor to reduce. The function operates in-place.

• reduce_operation (str, optional) –

The reduction operation (default: SUM).

Valid options are:
• SUM

• PRODUCT

• MIN

• MAX

• BAND

• BOR

• BXOR

Returns

None – tensor is modified in-place.
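The reduction semantics can be sketched in plain Python, with lists standing in for tensors and one list per simulated rank (`simulated_all_reduce` is illustrative only, not a composer API):

```python
# Illustrative sketch of all_reduce: every rank contributes its buffer,
# and every rank ends up with the identical element-wise reduction.
def simulated_all_reduce(rank_tensors, reduce_operation="SUM"):
    ops = {
        "SUM": sum,
        "MIN": min,
        "MAX": max,
    }
    op = ops[reduce_operation]
    # element-wise reduction across ranks
    reduced = [op(column) for column in zip(*rank_tensors)]
    # in-place: every rank's buffer is overwritten with the same result
    for t in rank_tensors:
        t[:] = reduced

tensors = [[1, 2], [3, 4], [5, 6]]  # one buffer per simulated rank
simulated_all_reduce(tensors)
# every rank now holds [9, 12]
```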

composer.utils.dist.barrier()

Synchronizes all processes.

This function blocks until all processes reach this function.

composer.utils.dist.broadcast(tensor, src)

Broadcasts the tensor to the whole group.

tensor must have the same number of elements in all processes participating in the collective. See torch.distributed.broadcast().

Parameters
• tensor (Tensor) – Data to be sent if src is the rank of current process, and tensor to be used to save received data otherwise.

• src (int) – Source rank
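The semantics can be sketched with plain Python lists standing in for tensors (`simulated_broadcast` is illustrative only, not a composer API):

```python
# Illustrative sketch of broadcast: the src rank's buffer overwrites
# every other rank's buffer.
def simulated_broadcast(rank_buffers, src=0):
    for rank, buf in enumerate(rank_buffers):
        if rank != src:
            buf[:] = rank_buffers[src]

buffers = [[7, 8], [0, 0], [0, 0]]  # one buffer per simulated rank
simulated_broadcast(buffers, src=0)
# every rank now holds [7, 8]
```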

composer.utils.dist.broadcast_object_list(object_list, src=0)

Broadcasts picklable objects in object_list to the whole group.

Similar to broadcast(), but Python objects can be passed in. Note that all objects in object_list must be picklable in order to be broadcast.

Parameters
• object_list (List[Any]) – List of input objects to broadcast. Each object must be picklable. Only objects on the src rank will be broadcast, but each rank must provide lists of equal sizes.

• src (int, optional) – Source rank (default: 0)

Returns

None – object_list will be modified in-place and set to the values of object_list from the src rank.

composer.utils.dist.get_global_rank()

Returns the global rank of the current process, which is in [0, WORLD_SIZE - 1].

Returns

int – The global rank.

composer.utils.dist.get_local_rank()

Returns the local rank for the current process, which is in [0, LOCAL_WORLD_SIZE - 1].

Returns

int – The local rank.

composer.utils.dist.get_local_world_size()

Returns the local world size, which is the number of processes for the current node.

Returns

int – The local world size.

composer.utils.dist.get_node_rank()

Returns the node rank.

For example, if there are 2 nodes, and 2 ranks per node, then global ranks 0-1 will have a node rank of 0, and global ranks 2-3 will have a node rank of 1.

Returns

int – The node rank, starting at 0.
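Assuming ranks are assigned contiguously per node (as the composer launcher does), the example above reduces to simple integer arithmetic (illustrative only, not the composer implementation):

```python
# Node rank for a contiguous rank layout: integer-divide the global rank
# by the number of ranks per node.
def node_rank(global_rank, local_world_size):
    return global_rank // local_world_size

# 2 nodes, 2 ranks per node: global ranks 0-1 -> node 0, ranks 2-3 -> node 1
ranks = [node_rank(r, local_world_size=2) for r in range(4)]
# ranks == [0, 0, 1, 1]
```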

composer.utils.dist.get_sampler(dataset, *, drop_last, shuffle)

Constructs a DistributedSampler for a dataset.

The DistributedSampler assumes that each rank has a complete copy of the dataset. It ensures that each rank sees a unique shard for each epoch containing len(dataset) / get_world_size() samples.

Note

If the dataset is already sharded by rank, use a SequentialSampler or RandomSampler.

Parameters
• dataset (Dataset) – The dataset.

• drop_last (bool) – Whether to drop the last batch.

• shuffle (bool) – Whether to shuffle the dataset.

Returns

torch.utils.data.distributed.DistributedSampler – The sampler.
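The sharding that DistributedSampler performs can be approximated in a few lines of plain Python; the real sampler also pads or drops samples when len(dataset) is not divisible by the world size, which this sketch ignores:

```python
# Each rank takes every world_size-th index, offset by its own rank,
# so shards are disjoint and together cover the whole dataset.
def shard_indices(dataset_len, rank, world_size):
    return list(range(rank, dataset_len, world_size))

# 8 samples across 2 ranks:
shard0 = shard_indices(8, rank=0, world_size=2)  # [0, 2, 4, 6]
shard1 = shard_indices(8, rank=1, world_size=2)  # [1, 3, 5, 7]
```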

composer.utils.dist.get_world_size()

Returns the world size, which is the number of processes participating in this training run.

Returns

int – The world size.

composer.utils.dist.initialize_dist(backend, timeout)

Initialize the default PyTorch distributed process group.

This function assumes that the following environment variables are set:

• RANK: The global rank of the process, which should be in [0, WORLD_SIZE - 1].

• LOCAL_RANK: The local rank for the process, which should be in [0, LOCAL_WORLD_SIZE - 1].

• NODE_RANK: The rank of the node.

• WORLD_SIZE: The total number of processes.

• LOCAL_WORLD_SIZE: The number of processes on the current node.

• MASTER_ADDR: The hostname for the rank-zero process.

• MASTER_PORT: The port for the rank-zero process.

If none of the environment variables are set, this function will assume a single-rank configuration and initialize the default process group using a torch.distributed.HashStore store.

Parameters
• backend (str) – The distributed backend to use. Should be gloo for CPU training, or nccl for GPU training.

• timeout (timedelta) – The timeout for operations executed against the process group.

composer.utils.dist.is_available()

Returns whether PyTorch was built with distributed support.

Returns

bool – Whether PyTorch distributed support is available.

composer.utils.dist.is_initialized()

Returns whether PyTorch distributed is initialized.

Returns

bool – Whether PyTorch distributed is initialized.

composer.utils.dist.run_local_rank_zero_first()

Context manager to hold all non-zero ranks until rank zero completes.
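The hold-and-release pattern can be sketched on a single machine with threads standing in for ranks and threading.Barrier standing in for dist.barrier() (purely illustrative, not the composer implementation):

```python
import threading
from contextlib import contextmanager

NUM_RANKS = 3
barrier = threading.Barrier(NUM_RANKS)  # stands in for dist.barrier()
order = []
lock = threading.Lock()

@contextmanager
def local_rank_zero_first(rank):
    # Non-zero ranks block at the barrier until rank zero arrives,
    # which it only does after finishing its own body.
    if rank != 0:
        barrier.wait()
    yield
    if rank == 0:
        barrier.wait()

def worker(rank):
    with local_rank_zero_first(rank):
        with lock:
            order.append(rank)  # e.g. download a dataset exactly once

threads = [threading.Thread(target=worker, args=(r,)) for r in range(NUM_RANKS)]
for t in threads:
    t.start()
for t in threads:
    t.join()
# order[0] == 0: rank zero always runs its body first
```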