# Copyright 2022 MosaicML Composer authors
# SPDX-License-Identifier: Apache-2.0
"""Profiler to collect :mod:`torch` performance metrics during training."""
from __future__ import annotations
import json
import logging
import os
import textwrap
from typing import TYPE_CHECKING, Optional, OrderedDict
import torch.cuda
import torch.profiler
from packaging import version
from torch.profiler.profiler import ProfilerAction as TorchProfilerAction
from composer.core.callback import Callback
from composer.loggers import Logger
from composer.profiler.profiler_action import ProfilerAction
from composer.utils import (
FORMAT_NAME_WITH_DIST_AND_TIME_TABLE,
FORMAT_NAME_WITH_DIST_TABLE,
dist,
ensure_folder_is_empty,
format_name_with_dist,
format_name_with_dist_and_time,
)
if TYPE_CHECKING:
from composer.core import State
__all__ = ['TorchProfiler']
log = logging.getLogger(__name__)
class TorchProfiler(Callback):  # noqa: D101
__doc__ = f"""Profile the execution using the :class:`PyTorch Profiler <torch.profiler.profile>`.
Profiling results are stored in TensorBoard format in the directory specified by ``folder``.
.. note::
The Composer :class:`.Trainer` automatically creates an instance of this
:class:`.TorchProfiler` callback whenever any of the PyTorch Profiler arguments
(``torch_prof_record_shapes``, ``torch_prof_profile_memory``, ``torch_prof_with_stack``, or
``torch_prof_with_flops``) are enabled.
When using the Composer :class:`.Trainer`, one does not need to directly create an
instance of this :class:`.TorchProfiler` callback.
To view profiling results, run::
pip install tensorboard torch_tb_profiler
tensorboard --logdir path/to/torch/trace_folder
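The following is an illustrative sketch, not the canonical setup: it assumes this version of the
Composer :class:`.Trainer` accepts the ``prof_schedule`` argument (referenced by this callback's
error message) and the ``torch_prof_*`` arguments listed in the note above, and that
:func:`composer.profiler.cyclic_schedule` is available::

    from composer import Trainer
    from composer.profiler import cyclic_schedule

    # ``model`` and ``train_dataloader`` stand in for your own objects; the
    # ``prof_schedule`` and ``torch_prof_*`` arguments are assumed to exist on
    # this version of the Trainer, as described in the note above.
    trainer = Trainer(
        model=model,
        train_dataloader=train_dataloader,
        max_duration='2ep',
        prof_schedule=cyclic_schedule(wait=1, warmup=1, active=4, repeat=1),
        torch_prof_record_shapes=True,
        torch_prof_profile_memory=True,
    )
    trainer.fit()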
.. note::
See :doc:`profiler` for additional usage details on :class:`torch.profiler.profile`.
.. note::
Enabling shape and stack tracing results in additional overhead.
When ``record_shapes=True`` is specified, the profiler will temporarily hold references to tensors which
may prevent certain optimizations that depend on the reference count and can introduce extra tensor copies.
Args:
folder (str, optional): Format string for the folder containing the Torch Profiler trace files.
Defaults to ``'{{run_name}}/torch_traces'``.
The following format variables are available:
{textwrap.indent(FORMAT_NAME_WITH_DIST_TABLE, prefix=' ')}
For example, if the ``run_name`` is ``'awesome_training_run'``, and the default ``folder`` of
``'{{run_name}}/torch_traces'`` is used, Torch Profiler traces will be stored in
``'awesome_training_run/torch_traces'``.
filename (str, optional): A format string describing how to name Torch Profiler trace files.
Defaults to ``'rank{{rank}}.{{batch}}.pt.trace.json'``.
At the end of each batch where :meth:`~composer.profiler.Profiler.get_action` returns
:attr:`~composer.profiler.profiler_action.ProfilerAction.ACTIVE_AND_SAVE`, trace files are saved
approximately to ``{{folder.format(...)}}/{{filename.format(...)}}``.
The following format variables are available:
{textwrap.indent(FORMAT_NAME_WITH_DIST_AND_TIME_TABLE, prefix=' ')}
Consider the following scenario, where:
* The :attr:`~.State.run_name` is ``'awesome-training-run'``.
* The default ``folder='{{run_name}}/torch_traces'`` is used.
* The default ``filename='rank{{rank}}.{{batch}}.pt.trace.json'`` is used.
* The current epoch count is ``1``.
* The current batch count is ``42``.
Each rank (process) will save traces to::
awesome-training-run/torch_traces/rank0.42.pt.trace.json
awesome-training-run/torch_traces/rank1.42.pt.trace.json
awesome-training-run/torch_traces/rank2.42.pt.trace.json
...
remote_file_name (str, optional): Format string for a Torch Profiler trace file's remote file name.
Defaults to ``'{{run_name}}/torch_traces/rank{{rank}}.{{batch}}.pt.trace.json'``.
Whenever a trace file is saved, it is also uploaded as a file according to this format string.
The same format variables as for ``filename`` are available.
.. seealso:: :doc:`Uploading Files</trainer/file_uploading>` for notes on file uploading.
Leading slashes (``'/'``) will be stripped.
To disable uploading trace files, set this parameter to ``None``.
memory_filename (str, optional): A format string describing how to name Torch Profiler memory trace files.
Defaults to None. An example memory_filename is ``'rank{{rank}}.{{batch}}.pt.trace.memory.html'``.
At the end of each batch where :meth:`~composer.profiler.Profiler.get_action` returns
:attr:`~composer.profiler.profiler_action.ProfilerAction.ACTIVE_AND_SAVE`, trace files are saved
approximately to ``{{folder.format(...)}}/{{memory_filename.format(...)}}``.
The following format variables are available:
{textwrap.indent(FORMAT_NAME_WITH_DIST_AND_TIME_TABLE, prefix=' ')}
Consider the following scenario, where:
* The :attr:`~.State.run_name` is ``'awesome-training-run'``.
* The default ``folder='{{run_name}}/torch_traces'`` is used.
* ``memory_filename='rank{{rank}}.{{batch}}.pt.trace.memory.html'`` is used.
* The current epoch count is ``1``.
* The current batch count is ``42``.
Each rank (process) will save traces to::
awesome-training-run/torch_traces/rank0.42.pt.trace.memory.html
awesome-training-run/torch_traces/rank1.42.pt.trace.memory.html
awesome-training-run/torch_traces/rank2.42.pt.trace.memory.html
...
memory_remote_file_name (str, optional): Format string for a Torch Profiler memory trace file's remote file name.
Defaults to ``'{{run_name}}/torch_memory_traces/rank{{rank}}.{{batch}}.pt.trace.memory.html'``.
Whenever a trace file is saved, it is also uploaded as a file according to this format string.
The same format variables as for ``filename`` are available.
.. seealso:: :doc:`Uploading Files</trainer/file_uploading>` for notes on file uploading.
Leading slashes (``'/'``) will be stripped.
To disable uploading trace files, set this parameter to ``None``.
overwrite (bool, optional): Whether to overwrite existing Torch Profiler traces. Defaults to False.
If False, then the trace folder as determined by ``folder`` must be empty.
use_gzip (bool, optional): Whether to use gzip for the trace. Defaults to False.
If True, ``'.gz'`` will be appended to ``filename`` and ``remote_file_name``
(if they do not already end in ``'.gz'``).
record_shapes (bool, optional): Whether to record tensor shapes. Defaults to False.
profile_memory (bool, optional): Whether to profile memory. Defaults to True.
with_stack (bool, optional): Whether to record stack info. Defaults to False.
with_flops (bool, optional): Whether to estimate flops for operators. Defaults to True.
num_traces_to_keep (int, optional): The number of trace files to keep locally. Defaults to -1.
If set to -1, all trace files are kept locally.
After a trace has been saved and uploaded, the oldest traces are removed until
``num_traces_to_keep`` traces remain. This parameter only controls how many traces are kept locally;
traces are not deleted from remote file systems.
It can be useful to set this parameter to ``0`` when using a remote file uploader such as the
:class:`.RemoteUploaderDownloader`. This combination will minimize local
disk usage by deleting trace files immediately after they have been uploaded to the object store.
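A minimal sketch of that combination (the bucket URI is hypothetical, and the Composer
Profiler must still be enabled, e.g. via ``prof_schedule``, for this callback to run)::

    from composer.loggers import RemoteUploaderDownloader
    from composer.profiler import TorchProfiler

    # Hypothetical bucket: traces are uploaded there, and with
    # ``num_traces_to_keep=0`` each local trace file is removed once uploaded.
    uploader = RemoteUploaderDownloader(bucket_uri='s3://my-profiling-bucket')
    torch_profiler = TorchProfiler(num_traces_to_keep=0)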
Attributes:
saved_traces (list[tuple[Timestamp, list[pathlib.Path]]]): The trace timestamps and filepaths.
This list contains tuples of the save timestamp and the trace filepaths.
This list will have at most ``num_traces_to_keep`` entries. The latest trace
will be at the end.
The index of a filepath in each list corresponds to the global rank of the process that wrote that file.
Each filepath is valid only on the process's (rank's) node.
"""
def __init__(
self,
folder: str = '{run_name}/torch_traces',
filename: str = 'rank{rank}.{batch}.pt.trace.json',
remote_file_name: Optional[str] = '{run_name}/torch_traces/rank{rank}.{batch}.pt.trace.json',
memory_filename: Optional[str] = None,
memory_remote_file_name: Optional[str] = (
'{run_name}/torch_memory_traces/'
'rank{rank}.{batch}.pt.trace.memory.html'
),
overwrite: bool = False,
use_gzip: bool = False,
record_shapes: bool = False,
profile_memory: bool = True,
with_stack: bool = False,
with_flops: bool = True,
num_traces_to_keep: int = -1,
) -> None:
self.overwrite = overwrite
self.folder = folder
if use_gzip:
if not filename.endswith('.gz'):
filename += '.gz'
self.filename = filename
if use_gzip:
if remote_file_name is not None and not remote_file_name.endswith('.gz'):
remote_file_name += '.gz'
self.remote_file_name = remote_file_name
if memory_filename is not None:
assert memory_filename.endswith('.html'), f'memory_filename must end with .html, got {memory_filename}'
self.memory_filename = memory_filename
if memory_remote_file_name is not None:
assert memory_remote_file_name.endswith(
'.html',
), f'memory_remote_file_name must end with .html, got {memory_remote_file_name}'
self.memory_remote_file_name = memory_remote_file_name
self.record_shapes = record_shapes
self.profile_memory = profile_memory
self.with_stack = with_stack
self.with_flops = with_flops
self.num_traces_to_keep = num_traces_to_keep
self.saved_traces = OrderedDict()
self.profiler: Optional[torch.profiler.profile] = None
def init(self, state: State, logger: Logger) -> None:
if state.profiler is None:
raise RuntimeError((
'The Composer Profiler was not enabled, which is required to use the '
f'{type(self).__name__}. To enable, set the `prof_schedule` argument of the Trainer.'
))
folder_name = format_name_with_dist(self.folder, state.run_name)
os.makedirs(folder_name, exist_ok=True)
if not self.overwrite:
ensure_folder_is_empty(folder_name)
dist.barrier()
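# Adapt the Composer Profiler schedule to the torch profiler: ignore torch's own step
# counter and derive the action from the Composer training timestamp instead.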
def scheduler_fn(torch_profiler_step: int) -> TorchProfilerAction:
del torch_profiler_step # the torch profiler step is unused. Using the composer timestamp instead.
assert state.profiler is not None
composer_profiler_action = state.profiler.schedule(state)
if composer_profiler_action == ProfilerAction.ACTIVE_AND_SAVE:
return TorchProfilerAction.RECORD_AND_SAVE
if composer_profiler_action == ProfilerAction.ACTIVE:
return TorchProfilerAction.RECORD
if composer_profiler_action == ProfilerAction.WARMUP:
return TorchProfilerAction.WARMUP
assert composer_profiler_action == ProfilerAction.SKIP, f'unexpected action: {composer_profiler_action}'
return TorchProfilerAction.NONE
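# Invoked by the torch profiler when a trace is ready (RECORD_AND_SAVE): export the Chrome
# trace, optionally the memory timeline, upload them if configured, and prune old local traces.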
def handler_fn(prof: torch.profiler.profiler.profile):
assert state.profiler is not None
timestamp = state.timestamp
log.info(f'PyTorch Chrome trace profiler enabled: {self.filename if self.filename else False}')
trace_file_name = os.path.join(
folder_name,
format_name_with_dist_and_time(self.filename, run_name=state.run_name, timestamp=timestamp),
)
trace_file_dirname = os.path.dirname(trace_file_name)
if trace_file_dirname:
os.makedirs(trace_file_dirname, exist_ok=True)
prof.export_chrome_trace(trace_file_name)
state.profiler.record_chrome_json_trace_file(trace_file_name)
if self.remote_file_name is not None:
trace_remote_file_name = format_name_with_dist_and_time(
self.remote_file_name,
run_name=state.run_name,
timestamp=timestamp,
)
trace_remote_file_name = trace_remote_file_name.lstrip('/')
logger.upload_file(
remote_file_name=trace_remote_file_name,
file_path=trace_file_name,
overwrite=self.overwrite,
)
log.info(
f'PyTorch memory timeline profiler enabled: {self.memory_filename if self.memory_filename else False}',
)
if self.memory_filename is not None:
if version.parse(torch.__version__) > version.parse('2.1.0.dev'): # type: ignore
# memory timeline profiling is only supported in torch v2.1.0-rc1 or higher
memory_trace_file_name = os.path.join(
folder_name,
format_name_with_dist_and_time(
self.memory_filename,
run_name=state.run_name,
timestamp=timestamp,
),
)
log.debug(f'Saving memory trace to {memory_trace_file_name}')
memory_trace_file_dirname = os.path.dirname(memory_trace_file_name)
if memory_trace_file_dirname:
os.makedirs(memory_trace_file_dirname, exist_ok=True)
from composer.profiler.utils import export_memory_timeline_html
export_memory_timeline_html(
prof,
memory_trace_file_name,
torch.cuda.current_device(), # type: ignore
)
log.debug(f'Memory trace upload target: {self.memory_remote_file_name}')
if self.memory_remote_file_name is not None:
memory_trace_remote_file_name = format_name_with_dist_and_time(
self.memory_remote_file_name,
run_name=state.run_name,
timestamp=timestamp,
)
memory_trace_remote_file_name = memory_trace_remote_file_name.lstrip('/')
log.debug(
f'Uploading memory trace to {memory_trace_remote_file_name} from {memory_trace_file_name}',
)
logger.upload_file(
remote_file_name=memory_trace_remote_file_name,
file_path=memory_trace_file_name,
overwrite=self.overwrite,
)
else:
log.warning('Memory timeline profiling requires PyTorch 2.1.0 or later. Skipping memory trace.')
if self.num_traces_to_keep >= 0:
while len(self.saved_traces) > self.num_traces_to_keep:
# self.saved_traces is an ordered dict, so the zeroth item will be the oldest trace
timestamp, filepaths = next(iter(self.saved_traces.items()))
if dist.get_global_rank() < len(filepaths):
# Remove this rank's trace file
os.remove(filepaths[dist.get_global_rank()])
del self.saved_traces[timestamp]
self.profiler = torch.profiler.profile(
schedule=scheduler_fn,
on_trace_ready=handler_fn,
record_shapes=self.record_shapes,
profile_memory=self.profile_memory,
with_stack=self.with_stack,
with_flops=self.with_flops,
)
self.profiler.__enter__()
def batch_end(self, state: State, logger: Logger) -> None:
del state, logger # unused
assert self.profiler is not None
self.profiler.add_metadata_json('global_rank', json.dumps(dist.get_global_rank()))
self.profiler.step()
def batch_start(self, state: State, logger: Logger) -> None:
del state # unused
assert self.profiler is not None
logger.log_traces({'profiler/state': self.profiler.current_action.name})
def close(self, state: State, logger: Logger) -> None:
del state, logger # unused
if self.profiler is not None and self.profiler.profiler is not None:
log.info(self.profiler.key_averages().table(sort_by='cpu_time_total', row_limit=20))
if self.profile_memory:
log.info(self.profiler.key_averages().table(sort_by='self_cpu_memory_usage', row_limit=20))
if torch.profiler.ProfilerActivity.CUDA in self.profiler.activities:
log.info(self.profiler.key_averages().table(sort_by='cuda_time_total', row_limit=20))
if self.profile_memory:
log.info(self.profiler.key_averages().table(sort_by='self_cuda_memory_usage', row_limit=20))
self.profiler.__exit__(None, None, None)
self.profiler = None