"""get_runs SDK for MAPI"""
from __future__ import annotations
from concurrent.futures import Future
from datetime import datetime
from http import HTTPStatus
from typing import List, Optional, Union, cast, overload
from typing_extensions import Literal
from mcli.api.engine.engine import convert_plural_future_to_singleton, get_return_response, run_paginated_mapi_request
from mcli.api.exceptions import MAPIException
from mcli.api.model.cluster_details import ClusterDetails
from mcli.api.model.run import Run, RunType
from mcli.config import MCLIConfig
from mcli.models.common import ObjectList
from mcli.models.gpu_type import GPUType
from mcli.utils.utils_run_status import RunStatus
__all__ = ['get_runs', 'get_run']
# Default value of the ``limit`` argument of ``get_runs``.
DEFAULT_LIMIT: int = 100

# Name of the paginated query field requested from the API.
QUERY_FUNCTION_PAGINATED: str = 'getRunsPaginated'

# Name of the query variable that carries the request input
# (filters, limit, ...) for the paginated query.
VARIABLE_DATA_NAME_PAGINATED: str = 'getRunsPaginatedData'
# Query document used by ``get_runs`` when ``include_details`` is false.
# It asks the ``getRunsPaginated`` field for the pagination state (``cursor``,
# ``hasNextPage``) and, for every run, the summary fields plus its list of
# ``resumptions``.
# NOTE: this is an f-string, so ``{{`` / ``}}`` are literal braces in the
# query text; only the two module-level names are interpolated.
QUERY = f"""
query GetRunsPaginated(${VARIABLE_DATA_NAME_PAGINATED}: GetRunsPaginatedInput!) {{
{QUERY_FUNCTION_PAGINATED}({VARIABLE_DATA_NAME_PAGINATED}: ${VARIABLE_DATA_NAME_PAGINATED}) {{
cursor
hasNextPage
runs {{
id
name
status
createdAt
updatedAt
reason
createdByEmail
priority
maxRetries
preemptible
retryOnSystemFailure
maxDurationSeconds
isDeleted
runType
resumptions {{
clusterName
cpus
gpuType
gpus
nodes
executionIndex
startTime
endTime
status
estimatedEndTime
reason
}}
}}
}}
}}"""
# Query document used by ``get_runs`` when ``include_details`` is true.
# It selects the same per-run summary fields and ``resumptions`` as ``QUERY``
# and additionally a ``details`` sub-selection: image, original run input,
# metadata, last execution id, ``lifecycle``, ``nodes`` and
# ``formattedRunEvents``.
# NOTE: this is an f-string, so ``{{`` / ``}}`` are literal braces in the
# query text; only the two module-level names are interpolated.
QUERY_WITH_DETAILS = f"""
query GetRunsPaginated(${VARIABLE_DATA_NAME_PAGINATED}: GetRunsPaginatedInput!) {{
{QUERY_FUNCTION_PAGINATED}({VARIABLE_DATA_NAME_PAGINATED}: ${VARIABLE_DATA_NAME_PAGINATED}) {{
cursor
hasNextPage
runs {{
id
name
status
createdAt
updatedAt
reason
createdByEmail
priority
maxRetries
preemptible
retryOnSystemFailure
maxDurationSeconds
runType
isDeleted
resumptions {{
clusterName
cpus
gpuType
gpus
nodes
executionIndex
startTime
endTime
status
estimatedEndTime
reason
}}
details {{
image
originalRunInput
metadata
lastExecutionId
lifecycle {{
executionIndex
status
startTime
endTime
reason
}}
nodes {{
rank
name
status
reason
}}
formattedRunEvents {{
resumptionIndex
eventType
eventTime
eventMessage
}}
}}
}}
}}
}}"""
@overload
def get_run(
    run: Union[str, Run],
    *,
    timeout: Optional[float] = 10,
    future: Literal[False] = False,
    include_details: bool = True,
) -> Run:
    ...


@overload
def get_run(
    run: Union[str, Run],
    *,
    timeout: Optional[float] = None,
    future: Literal[True] = True,
    include_details: bool = True,
) -> Future[Run]:
    ...


def get_run(
    run: Union[str, Run],
    *,
    timeout: Optional[float] = 10,
    future: bool = False,
    include_details: bool = True,
):
    """Get a run that has been launched in the MosaicML platform

    This is a thin wrapper around :func:`get_runs` that filters on a single run
    name and unwraps the one-element result.

    Arguments:
        run: Run on which to get information, given either as its name or as a
            :class:`Run` object (only its ``name`` is used).
        timeout: Time, in seconds, in which the call should complete. If the call
            takes too long, a TimeoutError will be raised. If ``future`` is ``True``, this
            value will be ignored.
        future: Return the output as a :class:`~concurrent.futures.Future`. If True, the
            call to `get_run` will return immediately and the request will be
            processed in the background. This takes precedence over the ``timeout``
            argument. To get the run, use ``return_value.result()``
            with an optional ``timeout`` argument.
        include_details: If true, will fetch detailed information like run input for the run.

    Returns:
        The matching :class:`Run` if ``future`` is False, otherwise a
        :class:`~concurrent.futures.Future` for it.

    Raises:
        MAPIException: If connecting to MAPI, raised when a MAPI communication error occurs.
            Also raised with ``HTTPStatus.NOT_FOUND`` when no run matches.
    """
    runs = cast(Union[List[str], List[Run]], [run])
    error_message = f'Run {run.name if isinstance(run, Run) else run} not found'

    if future:
        res = get_runs(runs=runs, timeout=None, future=True, include_details=include_details, limit=1)
        # The helper turns Future[list] into Future[item]; presumably it reports
        # ``error_message`` when the list is empty -- confirm in engine.py.
        return convert_plural_future_to_singleton(res, error_message)

    # Only the first match is used, so request a single run here as well,
    # mirroring the future branch above.
    res = get_runs(runs=runs, timeout=timeout, future=False, include_details=include_details, limit=1)
    if not res:
        raise MAPIException(HTTPStatus.NOT_FOUND, error_message)
    return res[0]
def _build_date_filter(
    before: Optional[Union[str, datetime]],
    after: Optional[Union[str, datetime]],
) -> dict:
    """Build a ``{'lt': ..., 'gte': ...}`` filter from optional time bounds.

    ``datetime`` bounds are converted with ``astimezone().isoformat()``; string
    bounds are passed through unchanged. Bounds that are falsy are omitted, so
    the result is empty when neither bound is given.
    """
    date_filter = {}
    if before:
        date_filter['lt'] = before.astimezone().isoformat() if isinstance(before, datetime) else before
    if after:
        date_filter['gte'] = after.astimezone().isoformat() if isinstance(after, datetime) else after
    return date_filter


@overload
def get_runs(
    runs: Optional[Union[List[str], List[Run], ObjectList[Run]]] = None,
    *,
    cluster_names: Optional[Union[List[str], List[ClusterDetails], ObjectList[ClusterDetails]]] = None,
    before: Optional[Union[str, datetime]] = None,
    after: Optional[Union[str, datetime]] = None,
    gpu_types: Optional[Union[List[str], List[GPUType]]] = None,
    gpu_nums: Optional[List[int]] = None,
    statuses: Optional[Union[List[str], List[RunStatus]]] = None,
    timeout: Optional[float] = 10,
    future: Literal[False] = False,
    user_emails: Optional[List[str]] = None,
    run_types: Optional[Union[List[str], List[RunType]]] = None,
    include_details: bool = False,
    include_deleted: bool = False,
    ended_before: Optional[Union[str, datetime]] = None,
    ended_after: Optional[Union[str, datetime]] = None,
    limit: Optional[int] = DEFAULT_LIMIT,
) -> ObjectList[Run]:
    ...


@overload
def get_runs(
    runs: Optional[Union[List[str], List[Run], ObjectList[Run]]] = None,
    *,
    cluster_names: Optional[Union[List[str], List[ClusterDetails], ObjectList[ClusterDetails]]] = None,
    before: Optional[Union[str, datetime]] = None,
    after: Optional[Union[str, datetime]] = None,
    gpu_types: Optional[Union[List[str], List[GPUType]]] = None,
    gpu_nums: Optional[List[int]] = None,
    statuses: Optional[Union[List[str], List[RunStatus]]] = None,
    timeout: Optional[float] = None,
    future: Literal[True] = True,
    user_emails: Optional[List[str]] = None,
    run_types: Optional[Union[List[str], List[RunType]]] = None,
    include_details: bool = False,
    include_deleted: bool = False,
    ended_before: Optional[Union[str, datetime]] = None,
    ended_after: Optional[Union[str, datetime]] = None,
    limit: Optional[int] = DEFAULT_LIMIT,
) -> Future[ObjectList[Run]]:
    ...


def get_runs(
    runs: Optional[Union[List[str], List[Run], ObjectList[Run]]] = None,
    *,
    cluster_names: Optional[Union[List[str], List[ClusterDetails], ObjectList[ClusterDetails]]] = None,
    before: Optional[Union[str, datetime]] = None,
    after: Optional[Union[str, datetime]] = None,
    gpu_types: Optional[Union[List[str], List[GPUType]]] = None,
    gpu_nums: Optional[List[int]] = None,
    statuses: Optional[Union[List[str], List[RunStatus]]] = None,
    timeout: Optional[float] = 10,
    future: bool = False,
    user_emails: Optional[List[str]] = None,
    run_types: Optional[Union[List[str], List[RunType]]] = None,
    include_details: bool = False,
    include_deleted: bool = False,
    ended_before: Optional[Union[str, datetime]] = None,
    ended_after: Optional[Union[str, datetime]] = None,
    limit: Optional[int] = DEFAULT_LIMIT,
):
    """List runs that have been launched in the MosaicML platform

    The returned list will contain all of the details stored about the requested runs.
    Every filter argument is optional; a filter that is ``None`` or empty is not
    sent, so it does not restrict the result.

    Arguments:
        runs: List of runs on which to get information. Items may be run names or
            :class:`Run` objects (only their ``name`` is used).
        cluster_names: List of cluster names to filter runs. This can be a list of str or
            :class:`ClusterDetails` objects. Only runs submitted to these clusters will be
            returned.
        before: Only runs created strictly before this time will be returned. This
            can be a str in ISO 8601 format (e.g. 2023-03-31T12:23:04.34+05:30)
            or a datetime object.
        after: Only runs created at or after this time will be returned. This can
            be a str in ISO 8601 format (e.g. 2023-03-31T12:23:04.34+05:30)
            or a datetime object.
        gpu_types: List of gpu types to filter runs. This can be a list of str or
            :class:`GPUType` enums. Only runs scheduled on these GPUs will be returned.
        gpu_nums: List of gpu counts to filter runs. Only runs scheduled on this number
            of GPUs will be returned.
        statuses: List of run statuses to filter runs. This can be a list of str or
            :class:`RunStatus` enums (values are upper-cased before being sent).
            Only runs currently in these phases will be returned.
        user_emails: List of user emails to filter runs. Only runs submitted by these
            users will be returned. By default, will return runs submitted by the
            current user. Requires shared runs or admin permission
        run_types: List of run types to filter runs (str or :class:`RunType`; values
            are upper-cased before being sent)

            - 'INTERACTIVE': Runs created with the `mcli interactive` command
            - 'HERO_RUN': Runs created with `is_hero_run` in the metadata
            - 'TRAINING': All other runs
        timeout: Time, in seconds, in which the call should complete. If the call
            takes too long, a TimeoutError will be raised. If ``future`` is ``True``, this
            value will be ignored.
        future: Return the output as a :class:`~concurrent.futures.Future`. If True, the
            call to `get_runs` will return immediately and the request will be
            processed in the background. This takes precedence over the ``timeout``
            argument. To get the list of runs, use ``return_value.result()``
            with an optional ``timeout`` argument.
        include_details: If true, will fetch detailed information like run input for each run.
        include_deleted: If true, will include deleted runs in the response.
        ended_before: Only runs ended strictly before this time will be returned.
            Accepts the same str/datetime forms as ``before``.
        ended_after: Only runs ended at or after this time will be returned.
            Accepts the same str/datetime forms as ``after``.
        limit: Maximum number of runs to return. If None, the latest 100 runs will be returned.

    Returns:
        An :class:`ObjectList` of :class:`Run` if ``future`` is False, otherwise a
        :class:`~concurrent.futures.Future` for that list.

    Raises:
        MAPIException: If connecting to MAPI, raised when a MAPI communication error occurs
    """
    filters = {}

    if runs:
        filters['name'] = {'in': [r.name if isinstance(r, Run) else r for r in runs]}

    created_filter = _build_date_filter(before, after)
    if created_filter:
        filters['createdAt'] = created_filter

    ended_filter = _build_date_filter(ended_before, ended_after)
    if ended_filter:
        filters['endedAt'] = ended_filter

    if statuses:
        filters['status'] = {'in': [s.value.upper() if isinstance(s, RunStatus) else s.upper() for s in statuses]}
    if cluster_names:
        filters['clusterName'] = {'in': [c if isinstance(c, str) else c.name for c in cluster_names]}
    if gpu_types:
        filters['gpuType'] = {'in': [gt.value if isinstance(gt, GPUType) else gt for gt in gpu_types]}
    if gpu_nums:
        filters['gpuNum'] = {'in': gpu_nums}
    if run_types:
        filters['runType'] = {'in': [r.value.upper() if isinstance(r, RunType) else r.upper() for r in run_types]}

    request_data = {
        'filters': filters,
        'includeDeleted': include_deleted,
        'limit': limit,
    }
    variables = {VARIABLE_DATA_NAME_PAGINATED: request_data}

    # Let the local config modify the request in place; judging by the check
    # below it may add an 'entity' entry -- confirm in MCLIConfig.update_entity.
    cfg = MCLIConfig.load_config()
    cfg.update_entity(request_data)

    if user_emails:
        # Keep whatever entity the config set and only attach the email list;
        # a missing or falsy entity is replaced by a fresh dict.
        if request_data.get('entity'):
            request_data['entity']['emails'] = user_emails
        else:
            request_data['entity'] = {'emails': user_emails}

    response = run_paginated_mapi_request(
        query=QUERY_WITH_DETAILS if include_details else QUERY,
        query_function=QUERY_FUNCTION_PAGINATED,
        return_model_type=Run,
        variables=variables,
    )
    return get_return_response(response, future=future, timeout=timeout)