# Copyright 2022 MosaicML Composer authors
# SPDX-License-Identifier: Apache-2.0
"""A wrapper for a dataloader to include metrics that apply to a specific dataset."""
from __future__ import annotations
import textwrap
import warnings
from typing import Any, Callable, Dict, Iterable, List, Optional, Union
from composer.core.data_spec import DataSpec, ensure_data_spec
from composer.core.event import Event
from composer.core.state import State
from composer.core.time import Time
from composer.utils import create_interval_scheduler
# Public names exported by a star-import of this module.
__all__ = ['Evaluator', 'ensure_evaluator']
class Evaluator:
    """A wrapper for a dataloader to include metrics that apply to a specific dataset.

    For example, :class:`.CrossEntropyLoss` metric for NLP models.

    .. doctest::

        >>> eval_evaluator = Evaluator(
        ...     label='myEvaluator',
        ...     dataloader=eval_dataloader,
        ...     metric_names=['MulticlassAccuracy']
        ... )
        >>> trainer = Trainer(
        ...     model=model,
        ...     train_dataloader=train_dataloader,
        ...     eval_dataloader=eval_evaluator,
        ...     optimizers=optimizer,
        ...     max_duration='1ep',
        ... )

    Args:
        label (str): Name of the Evaluator.
        dataloader (DataSpec | Iterable | Dict[str, Any]): Iterable that yields batches, a :class:`.DataSpec`
            for evaluation, or a Dict of :class:`.DataSpec` kwargs.
        metric_names: The list of metric names to compute.
            Each value in this list can be a regex string (e.g. "MulticlassAccuracy", "f1" for "BinaryF1Score",
            "Top-." for "Top-1", "Top-2", etc). Each regex string will be matched against the keys of the
            dictionary returned by ``model.get_metrics()``. All matching metrics will be evaluated.

            By default, if left blank, then all metrics returned by ``model.get_metrics()`` will be used.
            A value of ``None`` is stored as an empty list.
        subset_num_batches (int, optional): The maximum number of batches to use for each evaluation. Defaults
            to ``None``, which means that the ``eval_subset_num_batches`` parameter from the :class:`.Trainer`
            will be used. Set to ``-1`` to evaluate the entire ``dataloader``.
        eval_interval (Time | int | str | (State, Event) -> bool, optional): An integer,
            which will be interpreted to be epochs, a str (e.g. ``1ep``, or ``10ba``), a :class:`.Time` object,
            or a callable. Defaults to ``None``, which means that the ``eval_interval`` parameter from the
            :class:`.Trainer` will be used.

            If an integer (in epochs), :class:`.Time` string, or :class:`.Time` instance, the evaluator will be
            run with this frequency. :class:`.Time` strings or :class:`.Time` instances must have units of
            :attr:`.TimeUnit.BATCH` or :attr:`.TimeUnit.EPOCH`.

            Set to ``0`` to disable evaluation.

            If a callable, it should take two arguments (:class:`.State`, :class:`.Event`) and return a bool
            representing whether the evaluator should be invoked. The event will be either
            :attr:`.Event.BATCH_END` or :attr:`.Event.EPOCH_END`.

            When specifying ``eval_interval``, the evaluator(s) are also run at the ``Event.FIT_END`` if it
            doesn't evenly divide the training duration.
        device_eval_microbatch_size (int | str, optional): The number of samples to use for each microbatch when
            evaluating. If set to ``auto``, dynamically decreases device_eval_microbatch_size if microbatch is
            too large for GPU. If None, sets `device_eval_microbatch_size` to per rank batch size.
            (default: ``None``)

    Raises:
        ValueError: If ``metric_names`` is neither ``None`` nor a list, or if
            ``device_eval_microbatch_size='auto'`` is requested while the data spec exposes a
            ``seq_parallel_world_size`` attribute.
    """

    def __init__(
        self,
        *,
        label: str,
        dataloader: Union[DataSpec, Iterable, Dict[str, Any]],
        metric_names: Optional[List[str]] = None,
        subset_num_batches: Optional[int] = None,
        eval_interval: Optional[Union[int, str, Time, Callable[[State, Event], bool]]] = None,
        device_eval_microbatch_size: Optional[Union[int, str]] = None,
    ):
        self.label = label
        self.dataloader = ensure_data_spec(dataloader)

        # Always a list after this point, so readers of the attribute never see ``None`` or a missing attribute.
        self.metric_names = self._validate_metric_names(metric_names)

        self.subset_num_batches = subset_num_batches

        # Define the backing field before going through the property setter below.
        self._eval_interval = None
        self.eval_interval = eval_interval

        self.auto_microbatching = _is_auto_microbatching(device_eval_microbatch_size)
        if self.auto_microbatching and hasattr(self.dataloader, 'seq_parallel_world_size'):
            raise ValueError('`device_eval_microbatch_size="auto"` is not compatible with sequence parallelism.')
        self.device_eval_microbatch_size = _get_initial_device_eval_microbatch_size(
            device_eval_microbatch_size,
            self.auto_microbatching,
            self.dataloader.dataloader,
        )

    @staticmethod
    def _validate_metric_names(metric_names: Optional[List[str]]) -> List[str]:
        """Return ``metric_names`` as a list, substituting an empty list for ``None``.

        Raises:
            ValueError: If ``metric_names`` is not ``None`` and not a ``list``.
        """
        if metric_names is None:
            return []
        if not isinstance(metric_names, list):
            raise ValueError(f'``metric_names`` should be a list of strings, not a {type(metric_names)}')
        return metric_names

    @property
    def eval_interval(self):
        """The stored evaluation schedule.

        Either ``None``, the callable that was assigned, or whatever ``create_interval_scheduler`` returned for a
        non-callable value.
        """
        return self._eval_interval

    @eval_interval.setter
    def eval_interval(self, eval_interval: Optional[Union[int, str, Time, Callable[[State, Event], bool]]]):
        if eval_interval is None:
            self._eval_interval = None
        elif not callable(eval_interval):
            # Non-callable values (int / str / Time per the signature) are converted by the scheduler factory;
            # ``Event.FIT_END`` is passed as the final event.
            self._eval_interval = create_interval_scheduler(
                eval_interval,
                checkpoint_events=False,
                final_events={Event.FIT_END},
            )
        else:
            # A user-supplied predicate is stored unchanged.
            self._eval_interval = eval_interval
def ensure_evaluator(
    evaluator: Union[Evaluator, DataSpec, Iterable, Dict[str, Any]],
    default_metric_names: List[str],
):
    """Coerce ``evaluator`` into an :class:`.Evaluator`.

    An existing :class:`.Evaluator` is handed back untouched. Any other value is passed as the ``dataloader``
    argument of a new :class:`.Evaluator` labelled ``'eval'``.

    Args:
        evaluator (Evaluator | DataSpec | Iterable | Dict[str, Any]): A dataloader,
            :class:`.DataSpec` instance, dictionary of :class:`.DataSpec` kwargs, or existing evaluator.
        default_metric_names (List[str]): The names of the metrics for the ``evaluator``,
            if a dataloader was specified.

    Returns:
        Evaluator: An evaluator.
    """
    if not isinstance(evaluator, Evaluator):
        # Wrap the dataloader-like input; ``default_metric_names`` is only used on this path.
        evaluator = Evaluator(label='eval', dataloader=evaluator, metric_names=default_metric_names)
    return evaluator
def _is_auto_microbatching(device_eval_microbatch_size: Optional[Union[int, str]]):
    """Report whether automatic eval microbatching was requested.

    Args:
        device_eval_microbatch_size (int | str, optional): The user-supplied microbatch size setting.

    Returns:
        bool: ``True`` when the setting equals the string ``'auto'`` (a warning is emitted in that case),
        ``False`` for every other value.
    """
    auto_requested = device_eval_microbatch_size == 'auto'
    if auto_requested:
        experimental_notice = (
            "Setting `device_eval_microbatch_size='auto'` is an experimental feature which may cause "
            'uncaught Cuda Out of Memory errors. In this case, please manually '
            'set device_eval_microbatch_size explicitly to an integer instead.'
        )
        warnings.warn(experimental_notice)
        return True
    return False
def _get_initial_device_eval_microbatch_size(
    device_eval_microbatch_size: Optional[Union[int, str]],
    auto_microbatching: bool,
    dataloader: Iterable,
) -> int:
    """Resolve the initial value of ``device_eval_microbatch_size``.

    If ``auto_microbatching`` is set, or no explicit size was given, the initial value is the ``batch_size``
    attribute of ``dataloader`` (the per rank batch size). Otherwise an explicit numeric size is returned as-is.

    Args:
        device_eval_microbatch_size (int | str, optional): The user-supplied setting: a number, ``'auto'``,
            or ``None``.
        auto_microbatching (bool): Whether automatic microbatching was requested.
        dataloader (Iterable): The evaluation dataloader; its ``batch_size`` attribute is read when the size
            has to be derived.

    Returns:
        The initial microbatch size.

    Raises:
        AttributeError: If the size must be derived from ``dataloader`` but it has no ``batch_size`` attribute.
        ValueError: If an explicit ``device_eval_microbatch_size`` is given that is not a number.
    """
    if auto_microbatching or device_eval_microbatch_size is None:
        try:
            # NOTE(review): ``getattr`` with a literal name is kept, presumably because ``Iterable`` does not
            # declare ``batch_size`` and direct attribute access would trip static type checkers.
            batch_size = getattr(dataloader, 'batch_size')
        except AttributeError as e:
            if auto_microbatching:
                raise AttributeError(
                    "`device_eval_microbatch_size='auto'` requires the `dataloader` to have a `batch_size` attribute.",
                ) from e
            raise AttributeError(
                '`device_eval_microbatch_size` is not set and `dataloader` does not have a `batch_size` attribute. '
                'Please either set `device_eval_microbatch_size` or `dataloader.batch_size`.',
            ) from e
        return batch_size

    # A tuple of classes is accepted by ``isinstance`` on every Python version, whereas a subscripted
    # ``typing.Union`` raises ``TypeError`` there before Python 3.10.
    if isinstance(device_eval_microbatch_size, (int, float)):
        return device_eval_microbatch_size

    raise ValueError("device_eval_microbatch_size must be an int or ``'auto'``")