Source code for composer.profiler.system_profiler

# Copyright 2022 MosaicML Composer authors
# SPDX-License-Identifier: Apache-2.0

"""Profiler to record system level metrics."""

from __future__ import annotations

import threading
import time
from typing import TYPE_CHECKING, Dict, cast

import psutil

from composer.core import Callback

    from composer.core import State
    from composer.loggers import Logger
    from composer.profiler import Profiler

__all__ = ['SystemProfiler']

[docs]class SystemProfiler(Callback): """The SystemProfiler records system level metrics. .. note:: The Composer :class:`.Trainer` automatically creates an instance of this :class:`.SystemProfiler` callback whenever any of the System Profiler arguments (``sys_prof_cpu``, ``sys_prof_memory``, ``sys_prof_disk``, or ``sys_prof_net``) are enabled. When using the Composer :class:`.Trainer`, one does not need to directly create an instance of this :class:`.SystemProfiler` callback. Args: profile_cpu (bool): Whether to record cpu statistics (Default: ``True``) profile_memory (bool): Whether to record memory statistics (Default: ``False``) profile_disk (bool): Whether to record disk I/O statistics (Default: ``False``) profile_net (bool): Whether to record network I/O statistics (Default: ``False``) stats_thread_interval_seconds (float): Interval to record system-level stats, in seconds. (Default: every ``0.5`` seconds) """ def __init__(self, profile_cpu: bool = True, profile_memory: bool = False, profile_disk: bool = False, profile_net: bool = False, stats_thread_interval_seconds: float = 0.5) -> None: self.profile_cpu = profile_cpu self.profile_disk = profile_disk self.profile_memory = profile_memory self.profile_net = profile_net self.stats_thread_interval_seconds = stats_thread_interval_seconds self.finished_event = threading.Event() def init(self, state: State, logger: Logger): del logger # unused if state.profiler is None: raise RuntimeError(('The Composer Profiler was not enabled, which is required to use the ' f'{type(self).__name__}. To enable, set the `prof_schedule` argument of the Trainer.')) # Start the stats thread self.finished_event.clear() threading.Thread(target=self._stats_thread, daemon=True, args=[state.profiler]).start() def close(self, state: State, logger: Logger) -> None: self.finished_event.set() def _stats_thread(self, profiler: Profiler): from composer.callbacks import memory_monitor """Gathers requested system metrics at :attr:`SystemProfiler.stats_thread_interval_seconds` interval.""" psutil.disk_io_counters.cache_clear() psutil.net_io_counters.cache_clear() if self.profile_cpu: psutil.cpu_percent() # spin it once to clear the default 0.0 value on the first call while not self.finished_event.is_set(): if self.profile_cpu: cpu_percent = psutil.cpu_percent() profiler.marker(name='cpu', categories=['cpu']).counter({'cpu_percent': cpu_percent}) if self.profile_memory: cuda_memory_stats = memory_monitor._get_memory_report() for name, val in cuda_memory_stats.items(): profiler.marker(f'memory/cuda/{name}', categories=['memory']).counter({name: val}) swap_memory = psutil.swap_memory() profiler.marker('memory/swap', categories=['memory']).counter({ 'used_gb': swap_memory.used / 2**9, 'free_gb': / 2**9 }) virtual_memory = psutil.virtual_memory() profiler.marker('memory/virtual', categories=['memory']).counter({ 'used_gb': virtual_memory.used / 2**9, 'available_gb': virtual_memory.available / 2**9 }) if self.profile_disk: disk_io_counters = cast( Dict[str, psutil._common.sdiskio], # type: ignore psutil.disk_io_counters(perdisk=True)) for disk_name, disk_stats in disk_io_counters.items(): for field_name in ('read_count', 'write_count', 'read_bytes', 'write_bytes', 'read_time', 'write_time', 'busy_time'): profiler.marker(f'disk/{disk_name}/{field_name}', categories=['disk']).counter({'field_name': getattr(disk_stats, field_name)}) if self.profile_net: net_io_counters = cast( Dict[str, psutil._common.snetio], # type: ignore psutil.net_io_counters(pernic=True)) for nic, nic_stats in net_io_counters.items(): profiler.marker(f'network/{nic}/kb_sent', categories=['net']).counter({'kb_sent': nic_stats.bytes_sent / 2**3}) profiler.marker(f'network/{nic}/kb_recv', categories=['net']).counter({'kb_recv': nic_stats.bytes_recv / 2**3}) time.sleep(self.stats_thread_interval_seconds)