""" Run Input """
from __future__ import annotations
import difflib
import logging
import os
import warnings
from dataclasses import asdict, dataclass, field
from http import HTTPStatus
from typing import Any, Dict, List, Optional, Union, get_type_hints
import yaml
from mcli.api.exceptions import MAPIException, MCLIRunConfigValidationError
from mcli.api.schema.generic_model import DeserializableModel
from mcli.utils.utils_config import (BaseSubmissionConfig, ComputeConfig, ComputeTranslation, DependentDeploymentConfig,
EnvVarTranslation, IntegrationTranslation, SchedulingConfig, SchedulingTranslation,
strip_nones)
from mcli.utils.utils_string_functions import clean_run_name, validate_image
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
@dataclass
class FinalRunConfig(DeserializableModel):
    """A finalized run configuration

    This configuration must be complete, with enough details to submit a new run to the
    MosaicML platform.
    """

    integrations: List[Dict[str, Any]]
    env_variables: Dict[str, str]
    parameters: Dict[str, Any]
    image: Optional[str] = None
    name: Optional[str] = None
    parent_name: Optional[str] = None
    cluster: str = ''  # deprecating, use compute['cluster']
    gpu_type: Optional[str] = None  # deprecating, use compute['gpu_type']
    gpu_num: Optional[int] = None  # deprecating, use compute['gpus']
    cpus: Optional[int] = None  # deprecating, use compute['cpus']
    command: str = ''

    # Scheduling parameters - optional for backwards-compatibility
    scheduling: SchedulingConfig = field(default_factory=SchedulingConfig)

    # Compute parameters - optional for backwards-compatibility
    compute: ComputeConfig = field(default_factory=ComputeConfig)

    # User defined metadata
    metadata: Dict[str, Any] = field(default_factory=dict)

    dependent_deployment: Dict[str, Any] = field(default_factory=dict)

    # MAPI (camelCase) key -> dataclass attribute name. Deliberately left
    # unannotated so the dataclass machinery does not treat it as a field.
    _property_translations = {
        'runName': 'name',
        'parentName': 'parent_name',
        'gpuType': 'gpu_type',
        'gpuNum': 'gpu_num',
        'cpus': 'cpus',
        'cluster': 'cluster',
        'image': 'image',
        'integrations': 'integrations',
        'envVariables': 'env_variables',
        'parameters': 'parameters',
        'command': 'command',
        'scheduling': 'scheduling',
        'compute': 'compute',
        'metadata': 'metadata',
        'dependentDeployment': 'dependent_deployment',
    }

    # MAPI keys that may be absent from a response without it being an error.
    _optional_properties = {
        'parentName',
        'scheduling',
        'compute',
        'metadata',
        'dependentDeployment',
    }

    def __str__(self) -> str:
        """Render the configuration as a YAML document."""
        return yaml.safe_dump(asdict(self))

    @classmethod
    def from_mapi_response(cls, response: Dict[str, Any]) -> FinalRunConfig:
        """Build a :class:`FinalRunConfig` from a MAPI response dictionary.

        Args:
            response (Dict[str, Any]): Response keyed by the MAPI names listed in
                ``_property_translations``

        Returns:
            FinalRunConfig: The deserialized configuration

        Raises:
            MAPIException: With status 400 if any key that is not listed in
                ``_optional_properties`` is missing from ``response``
        """
        missing = set(cls._property_translations) - set(response) - cls._optional_properties
        if missing:
            # Sorted so the error text is stable from one run to the next.
            raise MAPIException(
                status=HTTPStatus.BAD_REQUEST,
                message='Missing required key(s) in response to deserialize FinalRunConfig object: '
                f'{", ".join(sorted(missing))}',
            )

        data = {}
        for mapi_key, attr in cls._property_translations.items():
            if mapi_key not in response:
                # This must be an optional property, so skip
                continue
            value = response[mapi_key]
            if attr == 'env_variables':
                value = EnvVarTranslation.from_mapi(value)
            elif attr == 'integrations':
                value = IntegrationTranslation.from_mapi(value)
            elif attr == 'scheduling':
                value = SchedulingTranslation.from_mapi(value)
            elif attr == 'compute':
                value = ComputeTranslation.from_mapi(value)
            # NOTE(review): 'dependent_deployment' is stored untranslated here, whereas
            # RunConfig.from_mapi_response in this file passes it through
            # DependentDeploymentConfig.from_mapi -- confirm whether that is intended.
            data[attr] = value

        return cls(**data)

    @classmethod
    def finalize_config(cls, run_config: RunConfig) -> FinalRunConfig:  # pylint: disable=too-many-statements
        """Create a :class:`~mcli.models.run_config.FinalRunConfig` from the provided
        :class:`~mcli.models.run_config.RunConfig`.

        The input ``run_config`` is read but never modified.

        Args:
            run_config (:class:`~mcli.models.run_config.RunConfig`): The RunConfig to finalize

        Returns:
            :class:`~mcli.models.run_config.FinalRunConfig`: The object created using values from the input

        Raises:
            :class:`~mcli.api.exceptions.MCLIRunConfigValidationError`: If no image is set or the
                image name is rejected by ``validate_image``
        """
        model_as_dict = asdict(run_config)

        # Default a missing cpu count to 0 on the dict copy rather than on the
        # caller's object, so finalizing has no side effect on ``run_config``.
        if model_as_dict.get('cpus') is None:
            model_as_dict['cpus'] = 0

        model_as_dict = strip_nones(model_as_dict)

        # Fill in default initial values for FinalRunConfig
        if 'name' in model_as_dict:
            model_as_dict['name'] = clean_run_name(model_as_dict.get('name'))

        # Coerce an integer gpu_type to its string form.
        if isinstance(model_as_dict.get('gpu_type'), int):
            model_as_dict['gpu_type'] = str(model_as_dict['gpu_type'])

        # Legacy format: a list of {'key': ..., 'value': ...} entries.
        if isinstance(model_as_dict.get('env_variables'), list):
            model_as_dict['env_variables'] = {var['key']: var['value'] for var in model_as_dict['env_variables']}
            warnings.warn('Support for passing env_variables as a list will soon be deprecated. ' +
                          f'Please use a dict instead, eg: {model_as_dict["env_variables"]}')

        image = model_as_dict.get('image')
        if not image:
            raise MCLIRunConfigValidationError('An image name must be provided using the keyword [bold]image[/]')
        if not validate_image(image):
            raise MCLIRunConfigValidationError(f'The image name "{model_as_dict["image"]}" is not valid')

        return cls(**model_as_dict)

    def get_parent_name_from_env(self) -> Optional[str]:
        """Get the parent name from the environment

        Returns:
            Optional[str]: The value of ``RUN_NAME`` when ``MOSAICML_PLATFORM`` is set to
            ``true`` (case-insensitive), otherwise None
        """
        inside_run = os.environ.get('MOSAICML_PLATFORM', 'false').lower() == 'true'
        if not inside_run:
            return None
        return os.environ.get('RUN_NAME')

    def to_create_run_api_input(self) -> Dict[str, Dict[str, Any]]:
        """Convert a run configuration to a proper JSON to pass to MAPI's createRun

        Returns:
            Dict[str, Dict[str, Any]]: The run configuration as a MAPI runInput JSON
        """
        # Invert the MAPI -> attribute mapping to translate in the other direction.
        translations = {attr: mapi_key for mapi_key, attr in self._property_translations.items()}

        translated_input = {}
        for field_name, value in asdict(self).items():
            if value is None:
                continue
            translated_name = translations.get(field_name, field_name)
            if field_name == 'env_variables':
                value = EnvVarTranslation.to_mapi(value)
            elif field_name == 'integrations':
                value = IntegrationTranslation.to_mapi(value)
            elif field_name == 'scheduling':
                value = SchedulingTranslation.to_mapi(value)
            elif field_name == 'compute':
                value = ComputeTranslation.to_mapi(value)
            elif field_name == 'command':
                value = value.strip()
            elif field_name == 'parameters':
                # parameters should be passed as-is, explicitly
                pass
            elif field_name == 'dependent_deployment':
                value = DependentDeploymentConfig.to_mapi(value)
            elif field_name == 'gpu_type' and not value:
                # Empty deprecated value: leave the key out entirely.
                continue
            elif field_name == 'cluster' and not value:
                # Empty deprecated value: leave the key out entirely.
                continue
            elif isinstance(value, dict):
                value = strip_nones(value)
            translated_input[translated_name] = value

        # Automatically set the parentName if mcli is running inside a run
        # NOTE(review): outside of a run this stores None under 'parentName' --
        # confirm MAPI accepts an explicit null there.
        if not translated_input.get('parentName'):
            translated_input['parentName'] = self.get_parent_name_from_env()

        return {
            'runInput': translated_input,
        }
@dataclass
class RunConfig(BaseSubmissionConfig):
    """A run configuration for the MosaicML platform

    Values in here are not yet validated and some required values may be missing.
    On attempting to create the run, a bad config will raise a MapiException with a 400 status code.

    Required args:
        - name (`str`): User-defined name of the run
        - image (`str`): Docker image (e.g. `mosaicml/composer`)
        - command (`str`): Command to use when a run starts
        - compute (:class:`~mcli.ComputeConfig` or `Dict[str, Any]`): Compute configuration. Typically
          a subset of the following fields will be required:

            - `cluster` (`str`): Name of cluster to use
            - `instance` (`str`): Name of instance to use
            - `gpu_type` (`str`): Name of gpu type to use
            - `gpus` (`int`): Number of GPUs to use
            - `cpus` (`int`): Number of CPUs to use
            - `nodes` (`int`): Number of nodes to use

          See `mcli get clusters` for a list of available clusters and instances

    Optional args:
        - parameters (`Dict[str, Any]`): Parameters to mount into the environment
        - scheduling (:class:`~mcli.SchedulingConfig` or `Dict[str, Any]`): Scheduling configuration

            - `priority` (`str`): Priority of the run (default `auto` with options `low` and `lowest`)
            - `preemptible` (`bool`): Whether the run is preemptible (default False)
            - `retry_on_system_failure` (`bool`): Whether the run should be retried on system failure (default False)
            - `max_retries` (`int`): Maximum number of retries (default 0)
            - `max_duration` (`float`): Maximum duration of the run in hours (default None)
              Run will be automatically stopped after this duration has elapsed.

        - integrations (`List[Dict[str, Any]]`): List of integrations. See integration documentation for more details:
          https://docs.mosaicml.com/projects/mcli/en/latest/resources/integrations/index.html
        - env_variables (`Dict[str, str]`): Dictionary of environment variables to set in the run

            - key (`str`): Name of the environment variable
            - value (`str`): Value of the environment variable

        - metadata (`Dict[str, Any]`): Arbitrary metadata to attach to the run
    """

    # Field order is part of the positional constructor signature; do not reorder.
    name: Optional[str] = None
    parent_name: Optional[str] = None
    image: Optional[str] = None
    gpu_type: Optional[str] = None  # deprecated, use compute['gpu_type']
    gpu_num: Optional[int] = None  # deprecated, use compute['gpus']
    cpus: Optional[int] = None  # deprecated, use compute['cpus']
    cluster: Optional[str] = None  # deprecated, use compute['cluster']
    scheduling: SchedulingConfig = field(default_factory=SchedulingConfig)
    compute: ComputeConfig = field(default_factory=ComputeConfig)
    parameters: Dict[str, Any] = field(default_factory=dict)
    integrations: List[Dict[str, Any]] = field(default_factory=list)
    env_variables: Union[Dict[str, str], List[Dict[str, str]]] = field(default_factory=dict)
    metadata: Dict[str, Any] = field(default_factory=dict)
    command: str = ''
    dependent_deployment: Dict[str, Any] = field(default_factory=dict)

    # False -> emit warnings in __post_init__; anything else silences the
    # deprecated-field and unknown-field warnings. Reset to None once
    # __post_init__ has run.
    _suppress_deprecation_warnings: Optional[bool] = False

    # MAPI (camelCase) key -> dataclass attribute name. Deliberately left
    # unannotated so the dataclass machinery does not treat it as a field.
    _property_translations = {
        'runName': 'name',
        'parentName': 'parent_name',
        'gpuNum': 'gpu_num',
        'gpuType': 'gpu_type',
        'cpus': 'cpus',
        'cluster': 'cluster',
        'image': 'image',
        'integrations': 'integrations',
        'envVariables': 'env_variables',
        'parameters': 'parameters',
        'command': 'command',
        'compute': 'compute',
        'scheduling': 'scheduling',
        'metadata': 'metadata',
        'dependentDeployment': 'dependent_deployment',
    }

    _required_display_properties = {'name', 'image', 'command', 'compute'}

    @classmethod
    def from_mapi_response(cls, response: Dict[str, Any]) -> RunConfig:
        """Build a :class:`RunConfig` from a MAPI response dictionary.

        Keys absent from ``response`` are skipped. Values found in the deprecated
        top-level fields are moved into ``compute`` unless ``compute`` already has
        the corresponding key. Warnings are always suppressed on this path.

        Args:
            response (Dict[str, Any]): Response keyed by the MAPI names listed in
                ``_property_translations``

        Returns:
            RunConfig: The deserialized configuration
        """
        data = {}
        for mapi_key, attr in cls._property_translations.items():
            if mapi_key not in response:
                # This must be an optional property, so skip
                continue
            value = response[mapi_key]
            if attr == 'env_variables':
                value = EnvVarTranslation.from_mapi(value)
            elif attr == 'integrations':
                value = IntegrationTranslation.from_mapi(value)
            elif attr == 'scheduling':
                value = SchedulingTranslation.from_mapi(value)
            elif attr == 'compute':
                value = ComputeTranslation.from_mapi(value)
            elif attr == 'dependent_deployment':
                value = DependentDeploymentConfig.from_mapi(value)
            data[attr] = value

        # Convert deprecated fields to new format
        compute = data.get('compute', {})
        if data.get('gpu_type') is not None and 'gpu_type' not in compute:
            compute['gpu_type'] = data['gpu_type']
            data['gpu_type'] = None
        if data.get('gpu_num') is not None and 'gpus' not in compute:
            compute['gpus'] = data['gpu_num']
            data['gpu_num'] = None
        # Unlike the other legacy fields this is a truthiness test, so a cpus
        # value of 0 is left where it is.
        # NOTE(review): presumably 0 means "unset" here -- confirm.
        if data.get('cpus') and 'cpus' not in compute:
            compute['cpus'] = data['cpus']
            data['cpus'] = None
        if data.get('cluster') is not None and 'cluster' not in compute:
            compute['cluster'] = data['cluster']
            data['cluster'] = None
        data['compute'] = compute

        # Always suppress warnings when deserializing from MAPI response
        data['_suppress_deprecation_warnings'] = True

        return cls(**data)

    @staticmethod
    def _unknown_field_lines(section: str, present_keys, expected_keys) -> List[str]:
        """Return one warning line per key of ``present_keys`` missing from ``expected_keys``.

        Lines are produced in sorted key order and carry a "did you mean" hint
        when difflib finds a close match among ``expected_keys``.
        """
        lines = []
        for key in sorted(set(present_keys) - set(expected_keys)):
            line = f" - '{section}.{key}'"
            match_candidate = difflib.get_close_matches(key, expected_keys, n=1, cutoff=0.5)
            if match_candidate:
                line += f" (did you mean '{match_candidate[0]}'?)"
            lines.append(line)
        return lines

    def __post_init__(self):
        """Normalize legacy input and warn about deprecated or unknown fields."""
        # Legacy format: a list of {'key': ..., 'value': ...} entries.
        if isinstance(self.env_variables, list):
            self.env_variables = {var['key']: var['value'] for var in self.env_variables}
            if not self._suppress_deprecation_warnings:
                warnings.warn('Support for passing env_variables as a list will soon be deprecated. ' +
                              f'Please use a dict instead, eg: {self.env_variables}')

        # Everything below warns only when the flag is exactly False; a flag of
        # None (the value this method leaves behind) therefore stays silent.
        should_warn = self._suppress_deprecation_warnings is False

        if should_warn:
            deprecated_fields = (
                ('cluster', 'compute.cluster'),
                ('gpu_type', 'compute.gpu_type'),
                ('gpu_num', 'compute.gpus'),
                ('cpus', 'compute.cpus'),
            )
            for old_name, new_name in deprecated_fields:
                if getattr(self, old_name) is not None:
                    warnings.warn(f'Field "{old_name}" is deprecated. Please use "{new_name}" instead.')

        # Verify types in the compute and scheduling configs and warn for any unknown fields.
        # Soft-warn to maintain backwards compatibility with existing run configs.
        if should_warn:
            unknown_lines = []
            if self.scheduling is not None:
                expected_scheduling_keys = set(get_type_hints(SchedulingConfig).keys())
                # TODO(MCLOUD-2742): We deprecated `retryOnSystemFailure` in MAPI in favor of `watchdogEnabled` but it
                # is not yet deprecated in MCLI. We should remove this once we deprecate `retryOnSystemFailure` in MCLI
                # and add `watchdog_enabled` to SchedulingConfig.
                expected_scheduling_keys.add('watchdogEnabled')
                unknown_lines += self._unknown_field_lines('scheduling', self.scheduling.keys(),
                                                           expected_scheduling_keys)

            if self.compute is not None:
                expected_compute_keys = set(get_type_hints(ComputeConfig).keys())
                unknown_lines += self._unknown_field_lines('compute', self.compute.keys(), expected_compute_keys)

            if unknown_lines:
                warnings.warn('Encountered unknown fields in run configuration:\n' + '\n'.join(unknown_lines))

        self._suppress_deprecation_warnings = None  # so it won't be printed