# Copyright 2021 MosaicML. All Rights Reserved.
"""Profiler to record system level metrics."""
from __future__ import annotations
import threading
import time
from typing import TYPE_CHECKING, Dict, cast
import psutil
from composer.core.callback import Callback
if TYPE_CHECKING:
from composer.core.state import State
from composer.loggers.logger import Logger
from composer.profiler import Profiler
__all__ = ["SystemProfiler"]
[docs]class SystemProfiler(Callback):
"""The SystemProfiler records system level metrics.
.. note::
The Composer :class:`~composer.trainer.trainer.Trainer` automatically creates an instance of this
:class:`.SystemProfiler` callback whenever any of the System Profiler arguments (``sys_prof_cpu``,
``sys_prof_memory``, ``sys_prof_disk``, or ``sys_prof_net``) are enabled.
When using the Composer :class:`~composer.trainer.trainer.Trainer`, one does not need to directly create an
instance of this :class:`.SystemProfiler` callback.
Args:
profile_cpu (bool): Whether to record cpu statistics (Default: ``True``)
profile_memory (bool): Whether to record memory statistics (Default: ``False``)
profile_disk (bool): Whether to record disk I/O statistics (Default: ``False``)
profile_net (bool): Whether to record network I/O statistics (Default: ``False``)
stats_thread_interval_seconds (float): Interval to record system-level stats, in seconds. (Default: every ``0.5`` seconds)
"""
def __init__(self,
profile_cpu: bool = True,
profile_memory: bool = False,
profile_disk: bool = False,
profile_net: bool = False,
stats_thread_interval_seconds: float = 0.5) -> None:
self.profile_cpu = profile_cpu
self.profile_disk = profile_disk
self.profile_memory = profile_memory
self.profile_net = profile_net
self.stats_thread_interval_seconds = stats_thread_interval_seconds
self.finished_event = threading.Event()
def init(self, state: State, logger: Logger):
del logger # unused
if state.profiler is None:
raise RuntimeError(("The Composer Profiler was not enabled, which is required to use the "
f"{type(self).__name__}. To enable, set the `prof_schedule` argument of the Trainer."))
# Start the stats thread
self.finished_event.clear()
threading.Thread(target=self._stats_thread, daemon=True, args=[state.profiler]).start()
def close(self, state: State, logger: Logger) -> None:
self.finished_event.set()
def _stats_thread(self, profiler: Profiler):
from composer.callbacks import memory_monitor
"""Gathers requested system metrics at :attr:`SystemProfiler.stats_thread_interval_seconds` interval."""
psutil.disk_io_counters.cache_clear()
psutil.net_io_counters.cache_clear()
if self.profile_cpu:
psutil.cpu_percent() # spin it once to clear the default 0.0 value on the first call
while not self.finished_event.isSet():
if self.profile_cpu:
cpu_percent = psutil.cpu_percent()
profiler.marker(name="cpu", categories=["cpu"]).counter({"cpu_percent": cpu_percent})
if self.profile_memory:
cuda_memory_stats = memory_monitor._get_memory_report()
for name, val in cuda_memory_stats.items():
profiler.marker(f"memory/cuda/{name}", categories=["memory"]).counter({name: val})
swap_memory = psutil.swap_memory()
profiler.marker("memory/swap", categories=["memory"]).counter({
"used_gb": swap_memory.used / 2**9,
"free_gb": swap_memory.free / 2**9
})
virtual_memory = psutil.virtual_memory()
profiler.marker("memory/virtual", categories=["memory"]).counter({
"used_gb": virtual_memory.used / 2**9,
"available_gb": virtual_memory.available / 2**9
})
if self.profile_disk:
disk_io_counters = cast(Dict[str, psutil._common.sdiskio], psutil.disk_io_counters(perdisk=True))
for disk_name, disk_stats in disk_io_counters.items():
for field_name in ("read_count", "write_count", "read_bytes", "write_bytes", "read_time",
"write_time", "busy_time"):
profiler.marker(f"disk/{disk_name}/{field_name}",
categories=["disk"]).counter({"field_name": getattr(disk_stats, field_name)})
if self.profile_net:
net_io_counters = cast(Dict[str, psutil._common.snetio], psutil.net_io_counters(pernic=True))
for nic, nic_stats in net_io_counters.items():
profiler.marker(f"network/{nic}/kb_sent",
categories=["net"]).counter({"kb_sent": nic_stats.bytes_sent / 2**3})
profiler.marker(f"network/{nic}/kb_recv",
categories=["net"]).counter({"kb_recv": nic_stats.bytes_recv / 2**3})
time.sleep(self.stats_thread_interval_seconds)