Source code for mcli.api.runs.api_update_run

""" Update the data of a run. """
from __future__ import annotations

from concurrent.futures import Future
from typing import Any, Dict, Optional, Union, overload

from typing_extensions import Literal

from mcli.api.engine.engine import get_return_response, run_singular_mapi_request
from mcli.api.model.run import Run
from mcli.config import MCLIConfig

__all__ = ['update_run']

QUERY_FUNCTION = 'updateRun'
VARIABLE_DATA_GET_RUNS = 'getRunsData'
VARIABLE_DATA_UPDATE_RUN = 'updateRunData'
QUERY = f"""
mutation UpdateRun(${VARIABLE_DATA_GET_RUNS}: GetRunsInput!, ${VARIABLE_DATA_UPDATE_RUN}: UpdateRunInput!) {{
  {QUERY_FUNCTION}({VARIABLE_DATA_GET_RUNS}: ${VARIABLE_DATA_GET_RUNS}, {VARIABLE_DATA_UPDATE_RUN}: ${VARIABLE_DATA_UPDATE_RUN}) {{
    id
    name
    createdByEmail
    status
    createdAt
    updatedAt
    reason
    priority
    maxRetries
    preemptible
    retryOnSystemFailure
    runType
    isDeleted
    resumptions {{
        clusterName
        cpus
        gpuType
        gpus
        nodes
        executionIndex
        startTime
        endTime
        status
    }}
  }}
}}"""


@overload
def update_run(run: Union[str, Run],
               update_run_data: Optional[Dict[str, Any]] = None,
               *,
               preemptible: Optional[bool] = None,
               priority: Optional[str] = None,
               max_retries: Optional[int] = None,
               retry_on_system_failure: Optional[bool] = None,
               timeout: Optional[float] = 10,
               future: Literal[False] = False,
               max_duration: Optional[float] = None) -> Run:
    ...


@overload
def update_run(run: Union[str, Run],
               update_run_data: Optional[Dict[str, Any]] = None,
               *,
               preemptible: Optional[bool] = None,
               priority: Optional[str] = None,
               max_retries: Optional[int] = None,
               retry_on_system_failure: Optional[bool] = None,
               timeout: Optional[float] = None,
               future: Literal[True] = True,
               max_duration: Optional[float] = None) -> Future[Run]:
    ...


[docs]def update_run(run: Union[str, Run], update_run_data: Optional[Dict[str, Any]] = None, *, preemptible: Optional[bool] = None, priority: Optional[str] = None, max_retries: Optional[int] = None, retry_on_system_failure: Optional[bool] = None, timeout: Optional[float] = 10, future: bool = False, max_duration: Optional[float] = None): """Update a run's data in the MosaicML platform. Any values that are not specified will not be modified. Args: run (``Optional[str | ``:class:`~mcli.api.model.run.Run` ``]``): A run or run name to update. Using :class:`~mcli.api.model.run.Run` objects is most efficient. See the note below. update_run_data (`Dict[str, Any]`): DEPRECATED: Use the individual named-arguments instead. The data to update the run with. This can include `preemptible`, `priority`, `maxRetries`, and `retryOnSystemFailure` preemptible (bool): Update whether the run can be stopped and re-queued by higher priority jobs; default is False priority (str): Update the default priority of the run from `auto` to `low` or `lowest` max_retries (int): Update the max number of times the run can be retried; default is 0 retry_on_system_failure (bool): Update whether the run should be retried on system failure (i.e. a node failure); default is False timeout (``Optional[float]``): Time, in seconds, in which the call should complete. If the call takes too long, a :exc:`~concurrent.futures.TimeoutError` will be raised. If ``future`` is ``True``, this value will be ignored. max_duration: Update the max time that a run can run for (in hours). future (``bool``): Return the output as a :class:`~concurrent.futures.Future`. If True, the call to :func:`update_run` will return immediately and the request will be processed in the background. This takes precedence over the ``timeout`` argument. To get the list of :class:`~mcli.api.model.run.Run` output, use ``return_value.result()`` with an optional ``timeout`` argument. Raises: MAPIException: Raised if updating the requested run failed Returns: If future is False: Updated :class:`~mcli.api.model.run.Run` object Otherwise: A :class:`~concurrent.futures.Future` for the list """ if not update_run_data: update_run_data = {} if preemptible is not None: update_run_data['preemptible'] = preemptible if priority is not None: update_run_data['priority'] = priority if max_retries is not None: update_run_data['maxRetries'] = max_retries if retry_on_system_failure is not None: update_run_data['retryOnSystemFailure'] = retry_on_system_failure if max_duration is not None: update_run_data['maxDurationSeconds'] = int(3600 * max_duration) variables = { VARIABLE_DATA_GET_RUNS: { 'filters': { 'name': { 'in': [run.name if isinstance(run, Run) else run] }, } }, VARIABLE_DATA_UPDATE_RUN: update_run_data, } cfg = MCLIConfig.load_config() cfg.update_entity(variables[VARIABLE_DATA_GET_RUNS]) response = run_singular_mapi_request( query=QUERY, query_function=QUERY_FUNCTION, return_model_type=Run, variables=variables, ) return get_return_response(response, future=future, timeout=timeout)