RemoteUploaderDownloader#

class composer.loggers.RemoteUploaderDownloader(bucket_uri, backend_kwargs=None, file_path_format_string='{remote_file_name}', num_concurrent_uploads=1, upload_staging_folder=None, use_procs=True, num_attempts=3)[source]#

Logger destination that uploads (downloads) files to (from) a remote backend.

This logger destination handles calls to Logger.upload_file() and uploads files to an ObjectStore, such as AWS S3 or Google Cloud Storage. To minimize the performance hit on the training loop, it uploads files in the background.

from composer import Trainer
from composer.loggers import RemoteUploaderDownloader

remote_uploader_downloader = RemoteUploaderDownloader(
    bucket_uri="s3://my-bucket",
)

# Construct the trainer using this logger
trainer = Trainer(
    ...,
    loggers=[remote_uploader_downloader],
)

or

from composer import Trainer
from composer.loggers import RemoteUploaderDownloader

remote_uploader_downloader = RemoteUploaderDownloader(
    bucket_uri="libcloud://my-bucket",
    backend_kwargs={
        'provider': 's3',
        'container': 'my-bucket',
        'provider_kwargs': {
            'key': 'AKIA...',
            'secret': '*********',
            'region': 'ap-northeast-1',
        },
    },
)

# Construct the trainer using this logger
trainer = Trainer(
    ...,
    loggers=[remote_uploader_downloader],
)

or

remote_uploader_downloader = RemoteUploaderDownloader(
    bucket_uri="libcloud://my-gcs-bucket",
    backend_kwargs={
        "provider": "google_storage",
        "container": "my-gcs-bucket",
        "key_environ": "MY_HMAC_ACCESS_ID", # Name of env variable for HMAC access id.
        "secret_environ": "MY_HMAC_SECRET", # Name of env variable for HMAC secret.
    },
)

# Construct the trainer using this logger
trainer = Trainer(
    ...,
    loggers=[remote_uploader_downloader],
)

Note

This callback does not block the training loop while files upload, as the uploading happens in the background. Here are some additional tips for minimizing the performance impact:

  • Set use_procs=True (the default) to use background processes, instead of threads, to perform the file uploads. Processes are recommended to ensure that the GIL does not block the training loop when performing CPU operations on uploaded files (e.g. computing and comparing checksums). Network I/O always occurs in the background.

  • Provide a RAM disk path for the upload_staging_folder parameter. Staging copies of files in RAM will be faster than writing them to disk. However, there must be sufficient excess RAM, or MemoryErrors may be raised; see the sketch after this list.
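
For example, on many Linux systems /dev/shm is a RAM-backed tmpfs mount that can serve as the staging folder. A minimal sketch, assuming that path exists and has enough free space:

remote_uploader_downloader = RemoteUploaderDownloader(
    bucket_uri="s3://my-bucket",
    # '/dev/shm/composer_staging' is an assumed tmpfs path; verify it on your system.
    upload_staging_folder="/dev/shm/composer_staging",
    use_procs=True,  # the default: background processes, not threads
)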

Parameters
  • bucket_uri (str) –

    The remote uri for the bucket to use (e.g. s3://my-bucket).

    As individual ObjectStore instances are not necessarily thread safe, each worker will construct its own ObjectStore instance from remote_backend and backend_kwargs.

  • backend_kwargs (Dict[str, Any]) –

    The keyword arguments to construct the remote backend indicated by bucket_uri.

    As individual ObjectStore instances are not necessarily thread safe, each worker will construct its own ObjectStore instance from remote_backend and backend_kwargs.

  • file_path_format_string (str, optional) –

    A format string used to determine the remote file path (within the specified bucket).

    The following format variables are available:

    Variable               Description
    ---------------------  ----------------------------------------------------------------
    {remote_file_name}     The name of the file being logged.
    {run_name}             The name of the training run. See State.run_name.
    {rank}                 The global rank, as returned by get_global_rank().
    {local_rank}           The local rank of the process, as returned by get_local_rank().
    {world_size}           The world size, as returned by get_world_size().
    {local_world_size}     The local world size, as returned by get_local_world_size().
    {node_rank}            The node rank, as returned by get_node_rank().

    Leading slashes ('/') will be stripped.

    Consider the following example, which subfolders the remote files by their rank:

    >>> remote_uploader_downloader = RemoteUploaderDownloader(..., file_path_format_string='rank_{rank}/{remote_file_name}')
    >>> trainer = Trainer(..., save_latest_filename=None, run_name='foo', loggers=[remote_uploader_downloader])
    >>> trainer.logger.upload_file(
    ...     remote_file_name='bar.txt',
    ...     file_path='path/to/file.txt',
    ... )
    

    Assuming that the process's rank is 0, the remote backend would store the contents of 'path/to/file.txt' at 'rank_0/bar.txt'.

    Default: '{remote_file_name}'

  • num_concurrent_uploads (int, optional) – Maximum number of concurrent uploads. Defaults to 1.

  • upload_staging_folder (str, optional) – A folder to use for staging uploads. If not specified, defaults to using a TemporaryDirectory().

  • use_procs (bool, optional) – Whether to perform file uploads in background processes (as opposed to threads). Defaults to True.

  • num_attempts (int, optional) – For operations that fail with a transient error, the number of attempts to make. Defaults to 3.
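
As a point of reference, a construction that tunes the background-upload behavior might look like the following sketch (the values are illustrative assumptions, not recommendations):

remote_uploader_downloader = RemoteUploaderDownloader(
    bucket_uri="s3://my-bucket",
    num_concurrent_uploads=4,  # allow up to four uploads in flight at once
    num_attempts=5,            # retry transient failures up to five times
)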

can_upload_files()[source]#

Whether the logger supports uploading files.
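
Callers can check this before scheduling an upload. A trivial sketch:

assert remote_uploader_downloader.can_upload_files()  # True for this destination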

get_uri_for_file(remote_file_name)[source]#

Get the object store provider uri for a remote file.

Parameters

remote_file_name (str) – The name of a remote file.

Returns

str – The uri corresponding to the uploaded location of the remote file.
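
For example, if the logger was constructed with bucket_uri="s3://my-bucket" and the default file_path_format_string, the returned uri would resemble the following sketch (the exact value depends on the backend and format string):

uri = remote_uploader_downloader.get_uri_for_file('bar.txt')
# e.g. 's3://my-bucket/bar.txt'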

property remote_backend#

The ObjectStore instance for the main thread.

wait_for_workers()[source]#

Wait for all tasks to be completed.

This is called after fit/eval/predict. If we don't wait, then a worker might not schedule an upload before the interpreter shuts down and garbage collection begins. While post_close logic ensures existing uploads are completed, trying to schedule new uploads during this time will error.
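
If files are uploaded outside of fit/eval/predict, this can also be called manually. A minimal sketch:

# Block until every queued upload has been completed by the worker(s).
remote_uploader_downloader.wait_for_workers()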