RemoteUploaderDownloader
- class composer.loggers.RemoteUploaderDownloader(bucket_uri, backend_kwargs=None, file_path_format_string='{remote_file_name}', num_concurrent_uploads=1, upload_staging_folder=None, use_procs=True, num_attempts=3)
Logger destination that uploads (downloads) files to (from) a remote backend.
This logger destination handles calls to Logger.upload_file() and uploads files to an ObjectStore, such as AWS S3 or Google Cloud Storage. To minimize the training loop performance hit, it supports background uploads.

    from composer import Trainer
    from composer.loggers import RemoteUploaderDownloader

    remote_uploader_downloader = RemoteUploaderDownloader(
        bucket_uri="s3://my-bucket",
    )

    # Construct the trainer using this logger
    trainer = Trainer(
        ...,
        loggers=[remote_uploader_downloader],
    )

or

    from composer import Trainer
    from composer.loggers import RemoteUploaderDownloader

    remote_uploader_downloader = RemoteUploaderDownloader(
        bucket_uri="libcloud://my-bucket",
        backend_kwargs={
            'provider': 's3',
            'container': 'my-bucket',
            # Keyword arguments forwarded to the libcloud provider
            'provider_kwargs': {
                'key': 'AKIA...',
                'secret': '*********',
                'region': 'ap-northeast-1',
            },
        },
    )

    # Construct the trainer using this logger
    trainer = Trainer(
        ...,
        loggers=[remote_uploader_downloader],
    )

or

    remote_uploader_downloader = RemoteUploaderDownloader(
        bucket_uri="libcloud://my-gcs-bucket",
        backend_kwargs={
            "provider": "google_storage",
            "container": "my-gcs-bucket",
            "key_environ": "MY_HMAC_ACCESS_ID",  # Name of env variable for HMAC access id.
            "secret_environ": "MY_HMAC_SECRET",  # Name of env variable for HMAC secret.
        },
    )

    # Construct the trainer using this logger
    trainer = Trainer(
        ...,
        loggers=[remote_uploader_downloader],
    )

Note
This callback blocks the training loop only while each file is copied to the staging folder; the upload itself happens in the background. Here are some additional tips for minimizing the performance impact:
- Set use_procs=True (the default) to use background processes, instead of threads, to perform the file uploads. Processes are recommended to ensure that the GIL is not blocking the training loop when performing CPU operations on uploaded files (e.g. computing and comparing checksums). Network I/O always occurs in the background.
- Provide a RAM disk path for the upload_staging_folder parameter. Copying files to stage on RAM will be faster than writing to disk. However, there must be sufficient excess RAM, or MemoryErrors may be raised.
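
The stage-then-upload pattern described in the note can be sketched with the standard library alone. This is a minimal illustration, not Composer's implementation: `fake_upload`, `worker`, and `stage_and_enqueue` are hypothetical names, a thread stands in for the background worker, and the "upload" only records the staged file's contents.

```python
import queue
import shutil
import tempfile
import threading
from pathlib import Path


def fake_upload(staged_path: Path, uploaded: list) -> None:
    """Hypothetical stand-in for a slow network upload."""
    uploaded.append(staged_path.read_text())


def worker(tasks: queue.Queue, uploaded: list) -> None:
    """Process staged files in the background until a None sentinel arrives."""
    while True:
        staged_path = tasks.get()
        if staged_path is None:
            tasks.task_done()
            break
        try:
            fake_upload(staged_path, uploaded)
        finally:
            tasks.task_done()


def stage_and_enqueue(src: Path, staging_dir: Path, tasks: queue.Queue) -> Path:
    """Blocking part: copy the file so later edits to src cannot affect the upload."""
    staged_path = staging_dir / src.name
    shutil.copy2(src, staged_path)
    tasks.put(staged_path)
    return staged_path


with tempfile.TemporaryDirectory() as work_dir, tempfile.TemporaryDirectory() as staging:
    src = Path(work_dir) / "checkpoint.txt"
    src.write_text("step 1")

    tasks: queue.Queue = queue.Queue()
    uploaded: list = []
    thread = threading.Thread(target=worker, args=(tasks, uploaded), daemon=True)
    thread.start()

    stage_and_enqueue(src, Path(staging), tasks)
    src.write_text("step 2")  # The training loop may overwrite the file right away.

    tasks.put(None)  # Ask the worker to stop once the queue is drained.
    tasks.join()
    thread.join()
```

Because the worker reads the staged copy rather than the original, the training loop is free to modify or delete the source file as soon as the copy returns. That copy is the only step the caller waits on, which is why a fast staging location helps.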
- Parameters
bucket_uri (str) – The remote uri for the bucket to use (e.g. s3://my-bucket).
As individual ObjectStore instances are not necessarily thread safe, each worker will construct its own ObjectStore instance from remote_backend and backend_kwargs.
backend_kwargs (Dict[str, Any]) – The keyword arguments to construct the remote backend indicated by bucket_uri.
As individual ObjectStore instances are not necessarily thread safe, each worker will construct its own ObjectStore instance from remote_backend and backend_kwargs.
file_path_format_string (str, optional) – A format string used to determine the remote file path (within the specified bucket).
The following format variables are available:

| Variable | Description |
| --- | --- |
| {remote_file_name} | The name of the file being logged. |
| {run_name} | The name of the training run. See State.run_name. |
| {rank} | The global rank, as returned by get_global_rank(). |
| {local_rank} | The local rank of the process, as returned by get_local_rank(). |
| {world_size} | The world size, as returned by get_world_size(). |
| {local_world_size} | The local world size, as returned by get_local_world_size(). |
| {node_rank} | The node rank, as returned by get_node_rank(). |

Leading slashes ('/') will be stripped.
Consider the following example, which subfolders the remote files by their rank:

    >>> remote_uploader_downloader = RemoteUploaderDownloader(..., file_path_format_string='rank_{rank}/{remote_file_name}')
    >>> trainer = Trainer(..., save_latest_filename=None, run_name='foo', loggers=[remote_uploader_downloader])
    >>> trainer.logger.upload_file(
    ...     remote_file_name='bar.txt',
    ...     file_path='path/to/file.txt',
    ... )

Assuming that the process's rank is 0, the remote backend would store the contents of 'path/to/file.txt' at 'rank_0/bar.txt'.
Default: '{remote_file_name}'
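
To make the substitution concrete, here is a minimal sketch of how a format string of this kind could be resolved with plain `str.format`. The helper name `resolve_remote_path` and the hard-coded variable values are assumptions for illustration; Composer fills these values in from the run's state and distributed environment, and its internal helper may differ.

```python
def resolve_remote_path(format_string: str, **variables: object) -> str:
    """Fill in the format variables, then strip any leading slashes."""
    formatted = format_string.format(**variables)
    return formatted.lstrip("/")


# Illustrative values for a single-process run named 'foo'.
example_vars = {
    "remote_file_name": "bar.txt",
    "run_name": "foo",
    "rank": 0,
    "local_rank": 0,
    "world_size": 1,
    "local_world_size": 1,
    "node_rank": 0,
}

# Variables that the format string does not mention are simply ignored.
path = resolve_remote_path("rank_{rank}/{remote_file_name}", **example_vars)

# A leading slash in the format string does not survive.
stripped = resolve_remote_path("/{run_name}/{remote_file_name}", **example_vars)
```

Including `{rank}` (or `{run_name}`) in the format string is a simple way to keep files from different processes or runs from overwriting one another in the same bucket.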
num_concurrent_uploads (int, optional) – Maximum number of concurrent uploads. Defaults to 1.
upload_staging_folder (str, optional) – A folder to use for staging uploads. If not specified, defaults to using a TemporaryDirectory().
use_procs (bool, optional) – Whether to perform file uploads in background processes (as opposed to threads). Defaults to True.
num_attempts (int, optional) – For operations that fail with a transient error, the number of attempts to make. Defaults to 3.
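
num_attempts bounds how many times a transiently failing operation is tried. A minimal sketch of that idea follows. `TransientError`, `call_with_retries`, and the exponential delay are assumptions for illustration, since the text above does not say how Composer classifies errors or spaces out attempts.

```python
import time


class TransientError(Exception):
    """Hypothetical marker for errors that are worth retrying."""


def call_with_retries(operation, num_attempts: int = 3, base_delay: float = 0.0):
    """Call operation() up to num_attempts times, re-raising the last transient error."""
    for attempt in range(1, num_attempts + 1):
        try:
            return operation()
        except TransientError:
            if attempt == num_attempts:
                raise
            # Assumed backoff policy: double the delay after each failure.
            time.sleep(base_delay * 2 ** (attempt - 1))


calls = {"count": 0}


def flaky_upload() -> str:
    """Fails twice with a transient error, then succeeds."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise TransientError("temporary network failure")
    return "ok"


result = call_with_retries(flaky_upload, num_attempts=3)
```

With num_attempts=3, the third call succeeds and its result is returned. Had the operation failed a third time, the error would propagate to the caller.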
- get_uri_for_file(remote_file_name)
Get the object store provider uri for a remote file.
- Parameters
remote_file_name (str) – The name of a remote file.
- Returns
str – The uri corresponding to the uploaded location of the remote file.
- property remote_backend
The ObjectStore instance for the main thread.
- wait_for_workers()
Wait for all tasks to be completed.
This is called after fit/eval/predict. If we don't wait, then a worker might not schedule an upload before the interpreter shuts down and garbage collection begins. While post_close logic ensures existing uploads are completed, trying to schedule new uploads during this time will raise an error.
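
The reason for waiting can be illustrated by analogy with `concurrent.futures`. This is only a sketch of the general hazard, not Composer's worker implementation: `fake_upload` is a hypothetical stand-in, and a `ThreadPoolExecutor` plays the role of the upload workers.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait

completed = []


def fake_upload(name: str) -> str:
    """Hypothetical stand-in for uploading one file."""
    time.sleep(0.01)
    completed.append(name)
    return name


executor = ThreadPoolExecutor(max_workers=1)
futures = [executor.submit(fake_upload, f"file_{i}.txt") for i in range(3)]

# Block until every scheduled upload has finished, then shut down.
done, not_done = wait(futures)
executor.shutdown()

# Scheduling new work after shutdown is rejected.
late_error = None
try:
    executor.submit(fake_upload, "late.txt")
except RuntimeError as error:
    late_error = error
```

Waiting first guarantees that nothing is still pending when shutdown begins. Work submitted afterwards is refused rather than silently dropped, which mirrors the error described above.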