# Copyright 2022 MosaicML Composer authors
# SPDX-License-Identifier: Apache-2.0
"""Check GPU Health during training."""
import logging
import os
from collections import deque
from datetime import datetime
from typing import List, Optional, Tuple
import numpy as np
import torch
from composer.core import Callback, State
from composer.core.time import Timestamp
from composer.loggers import Logger
from composer.utils import MissingConditionalImportError, dist
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Names exported by ``from <module> import *``; only the callback is public.
__all__ = ['HealthChecker']
class HealthChecker(Callback):
    """Checks for GPU health.

    This callback checks for GPU health by tracking and alerting for abnormal
    GPU utilizations.

    For example, if the average utilization during the observation window is
    [30, 30, 45], then the range (45-30=15) would exceed a threshold of 10%.

    Args:
        threshold (float, optional): Threshold of GPU utilization range to
            trigger an alert. Defaults to 10.
        sample_freq (int, optional): Sample frequency in seconds. Default: 5.
        window_size (int, optional): Window size in seconds. HealthChecker will
            check for abnormalities at this frequency. Default: 120.
        wait (int, optional): Seconds to wait for starting to sample. Default: 120.
        slack_webhook_url (str, optional): Slack URL to send alerts. Can also
            be set with the SLACK_WEBHOOK_URL environment variable. Default: None
        test_mode (bool, optional): If True, will send a test alert at the first check
            (only from global rank 0, and only if the metric produced a message).
            Default: False

    Raises:
        MissingConditionalImportError: If a Slack webhook URL is configured but
            ``slack_sdk`` is not installed, or if CUDA is available but
            ``pynvml`` is not installed.
    """

    def __init__(
        self,
        threshold: float = 10,
        sample_freq: int = 5,
        window_size: int = 120,
        wait: int = 120,
        slack_webhook_url: Optional[str] = None,
        test_mode: bool = False,
    ) -> None:
        self.sample_freq = sample_freq
        self.window_size = window_size
        self.wait = wait
        self.slack_webhook_url = slack_webhook_url
        self.test_mode = test_mode

        # An explicit argument wins; otherwise fall back to the environment.
        if not self.slack_webhook_url:
            self.slack_webhook_url = os.environ.get('SLACK_WEBHOOK_URL', None)

        if self.slack_webhook_url:
            # fail fast if missing import
            try:
                import slack_sdk
                del slack_sdk
            except ImportError as e:
                raise MissingConditionalImportError('health_checker', 'slack_sdk', None) from e

        # Wall-clock training time (seconds) of the last sample / last check.
        self.last_sample = 0
        self.last_check = 0

        # Metrics stay empty when NVML is unusable, which turns the callback into a no-op.
        self.metrics = []
        if self._is_available():
            self.metrics.append(GPUUtilization(threshold))

    def init(self, state: State, logger: Logger) -> None:
        """No-op; all setup happens in ``__init__``."""
        pass

    def after_train_batch(self, state: State, logger: Logger):
        """Sample metrics and, once per window, check them and alert if abnormal."""
        if not self.metrics:
            return

        if self._sample(state.timestamp):
            for metric in self.metrics:
                metric.sample()

        if self._check(state.timestamp):
            for metric in self.metrics:
                message, alert = metric.check()
                if self.test_mode and message and dist.get_global_rank() == 0:
                    alert = True
                    message = '[**THIS IS A TEST**]' + message
                # Each metric alerts at most once; ``alerted`` is never reset here.
                if alert and not metric.alerted:
                    self._alert(message, state)
                    metric.alerted = True
                # Start a fresh observation window.
                metric.clear()

    def _sample(self, timestamp: Timestamp) -> bool:
        """Return True when a new sample is due, and record the sample time.

        No samples are taken during the first ``wait`` seconds of training.
        """
        # ``timedelta.seconds`` is only the seconds *component* (0..86399) and
        # wraps every 24 hours; ``total_seconds()`` keeps growing with the run.
        now = timestamp.total_wct.total_seconds()

        if now < self.wait:
            return False

        if now - self.last_sample >= self.sample_freq:
            self.last_sample = now
            return True

        return False

    def _check(self, timestamp: Timestamp) -> bool:
        """Return True when a full window has elapsed since the last check."""
        # See ``_sample`` for why ``total_seconds()`` is used instead of ``.seconds``.
        now = timestamp.total_wct.total_seconds()

        if now - self.last_check >= self.window_size:
            self.last_check = now
            return True
        return False

    def _alert(self, message: str, state: State) -> None:
        """Log ``message`` as a warning and, if configured, post it to Slack.

        The message is prefixed with the current time, run name, node rank and,
        when the ``NODENAME`` environment variable is set, the node name.
        """
        prefix = '[{now}][{run_name}][node_rank={node_rank}]'.format(
            now=datetime.now(),
            run_name=state.run_name,
            node_rank=dist.get_node_rank(),
        )

        node_name = os.environ.get('NODENAME', None)
        if node_name is not None:
            prefix += f'[node={node_name}]'

        message = prefix + ' : ' + message

        log.warning(message)
        if self.slack_webhook_url:
            from slack_sdk.webhook import WebhookClient

            # Alerting is best-effort: a failed delivery must not abort training.
            try:
                client = WebhookClient(url=self.slack_webhook_url)
                client.send(text=message)
            except Exception as e:
                log.warning(f'Failed to send Slack alert: {e}')

    @staticmethod
    def _is_available() -> bool:
        """Return True if CUDA is available and NVML initializes successfully.

        Raises:
            MissingConditionalImportError: If CUDA is available but ``pynvml``
                is not installed.
        """
        if not torch.cuda.is_available():
            return False

        # Import separately so the ``except`` clauses below can safely name pynvml.
        try:
            import pynvml
        except ImportError as e:
            raise MissingConditionalImportError('health_checker', 'pynvml', None) from e

        try:
            pynvml.nvmlInit()  # type: ignore
        except pynvml.NVMLError_LibraryNotFound:  # type: ignore
            log.warning('NVML not found, disabling GPU health checking')
        except Exception as e:
            log.warning(f'Error initializing NVML: {e}')
        else:
            return True

        return False
class GPUUtilization:
    """GPU Utilization Metric.

    Collects per-device utilization readings from NVML on local rank 0 and
    flags the window as abnormal when the spread between the most and least
    utilized device (averaged over the window) exceeds ``threshold``.

    Args:
        threshold: Maximum allowed difference between the highest and lowest
            per-device average utilization before an alert is raised.
    """

    def __init__(self, threshold=10) -> None:
        # Each entry is one reading: a list with one utilization value per device.
        self.samples = deque()
        self.threshold = threshold
        # Set by the owning callback after the first alert has been sent.
        self.alerted = False

    def sample(self) -> None:
        """Record one utilization reading (local rank 0 only)."""
        if dist.get_local_rank() == 0:
            sample = self._sample()
            if sample is not None:
                self.samples.append(sample)

    def _sample(self) -> Optional[List]:
        """Read the utilization of every device visible to NVML.

        Returns:
            A list with one utilization value per device, or ``None`` if any
            NVML query fails.

        Raises:
            MissingConditionalImportError: If ``pynvml`` is not installed.
        """
        try:
            import pynvml
        except ImportError as e:
            raise MissingConditionalImportError('health_checker', 'pynvml', None) from e

        try:
            samples = []
            device_count = pynvml.nvmlDeviceGetCount()
            for i in range(device_count):
                handle = pynvml.nvmlDeviceGetHandleByIndex(i)
                samples.append(pynvml.nvmlDeviceGetUtilizationRates(handle).gpu)
        except pynvml.NVMLError:
            # A partial reading would skew the window average; drop it entirely.
            return None
        return samples

    def check(self) -> Tuple[Optional[str], bool]:
        """Evaluate the samples collected in the current window.

        Returns:
            A ``(message, alert)`` tuple. ``message`` is ``None`` when there is
            nothing to report (not local rank 0, or no samples collected);
            ``alert`` is True only when the utilization spread exceeds the
            threshold.
        """
        # Nothing was collected (check fired before the first sample, NVML
        # errors, or a non-sampling rank): averaging an empty window would
        # only yield NaN plus numpy RuntimeWarnings, so report nothing.
        if not self.samples:
            return None, False

        if dist.get_local_rank() != 0:
            return None, False

        # Average over time (axis 0) to get one value per device.
        average_sample = np.nanmean(list(self.samples), axis=0)
        if np.nanmax(average_sample) - np.nanmin(average_sample) > self.threshold:
            message = f'Abnormal GPU utilizations: {average_sample}'
            return message, True

        message = f':+1: Normal GPU utilizations: {average_sample}'
        return message, False

    def clear(self) -> None:
        """Discard all samples to start a new observation window."""
        self.samples.clear()