Source code for mcli.api.cluster.api_get_clusters

"""get_clusters SDK for MAPI"""
from __future__ import annotations

from concurrent.futures import Future
from http import HTTPStatus
from typing import List, Optional, Union, cast, overload

from typing_extensions import Literal

from mcli.api.engine.engine import convert_plural_future_to_singleton, get_return_response, run_plural_mapi_request
from mcli.api.exceptions import MAPIException
from mcli.api.model.cluster_details import ClusterDetails
from mcli.config import MCLIConfig
from mcli.models.common import ObjectList
from mcli.utils.utils_model import SubmissionType

__all__ = ['get_cluster', 'get_clusters']

QUERY_FUNCTION = 'getClusters'
VARIABLE_DATA_NAME = 'getClustersData'
QUERY = f"""
query GetClusters(${VARIABLE_DATA_NAME}: GetClustersInput!) {{
  {QUERY_FUNCTION}({VARIABLE_DATA_NAME}: ${VARIABLE_DATA_NAME}) {{
    id
    name
    provider
    allowFractional
    allowMultinode
    allowedSubmissionTypes
    isMultiTenant
    reservationType
    schedulerEnabled
    allowedInstances {{
      name
      gpuType
      gpusPerNode
      cpus
      memory
      storage
      gpuNums
      numNodes
      nodes {{
        name
        isAlive
        isSchedulable
        upTime {{
          endDate
          startDate
        }}
      }}
    }}
  }}
}}"""
QUERY_UTILIZATION = f"""
query GetClusters(${VARIABLE_DATA_NAME}: GetClustersInput!) {{
  {QUERY_FUNCTION}({VARIABLE_DATA_NAME}: ${VARIABLE_DATA_NAME}) {{
    id
    name
    provider
    allowedSubmissionTypes
    isMultiTenant
    reservationType
    schedulerEnabled
    utilization {{
      instanceUtils {{
        instance {{
          name
          gpuType
          gpuNums
          gpusPerNode
          cpus
          memory
          storage
          numNodes
          nodes {{
            name
            isAlive
            isSchedulable
            upTime {{
              endDate
              startDate
            }}
          }}
        }}
        clusterId
        gpusUsed
        gpusAvailable
        gpusTotal
      }}
      activeRunsByUser{{
        id
        createdAt
        userName
        runName
        instance
        gpuNum
        gpuType
        estimatedEndTime
        maxDurationSeconds
        startTime
        scheduling {{
          priority
          retryOnSystemFailure
          preemptible
        }}
      }}
      queuedRunsByUser{{
        id
        createdAt
        userName
        runName
        instance
        gpuNum
        gpuType
        reason
        scheduling {{
          priority
          retryOnSystemFailure
          preemptible
        }}
      }}
      activeDeploymentsByUser{{
        id
        createdAt
        userName
        deploymentName
        instance
        gpuNum
      }}
      queuedDeploymentsByUser{{
        id
        createdAt
        userName
        deploymentName
        instance
        gpuNum
      }}
    }}
  }}
}}"""


@overload
def get_cluster(
    cluster: Union[str, ClusterDetails],
    *,
    include_utilization: bool = True,
    include_all: bool = False,
    timeout: Optional[float] = 10,
    future: Literal[False] = False,
) -> ClusterDetails:
    ...


@overload
def get_cluster(
    cluster: Union[str, ClusterDetails],
    *,
    include_utilization: bool = True,
    include_all: bool = False,
    timeout: Optional[float] = None,
    future: Literal[True] = True,
) -> Future[ClusterDetails]:
    ...


[docs]def get_cluster( cluster: Union[str, ClusterDetails], *, include_utilization: bool = True, include_all: bool = False, timeout: Optional[float] = 10, future: bool = False, ): """Gets a cluster available in the MosaicML platform Arguments: cluster (:class:`~mcli.api.model.cluster_details.ClusterDetails`): :class:`~mcli.api.model.cluster_details.ClusterDetails` object or cluster name string to get. include_utilization (``bool``): Include information on how the cluster is currently being utilized timeout (``Optional[float]``): Time, in seconds, in which the call should complete. If the run creation takes too long, a :exc:`~concurrent.futures.TimeoutError` will be raised. If ``future`` is ``True``, this value will be ignored. future (``bool``): Return the output as a :class:`~concurrent.futures.Future`. If True, the call to :func:`get_cluster` will return immediately and the request will be processed in the background. This takes precedence over the ``timeout`` argument. To get the :class:`~mcli.models.cluster_details.ClusterDetails` output, use ``return_value.result()`` with an optional ``timeout`` argument. Raises: ``MAPIException``: If connecting to MAPI, raised when a MAPI communication error occurs """ clusters = cast(Union[List[str], List[ClusterDetails]], [cluster]) error_message = f"Cluster {cluster if isinstance(cluster, str) else cluster.name} not found" if future: res = get_clusters( clusters=clusters, timeout=None, future=True, include_utilization=include_utilization, include_all=include_all, ) return convert_plural_future_to_singleton(res, error_message) res = get_clusters( clusters=clusters, timeout=timeout, future=False, include_utilization=include_utilization, include_all=include_all, ) if not res: raise MAPIException(HTTPStatus.NOT_FOUND, error_message) return res[0]
@overload def get_clusters( clusters: Optional[Union[List[str], List[ClusterDetails], ObjectList[ClusterDetails]]] = None, *, include_utilization: bool = False, include_all: bool = False, timeout: Optional[float] = 10, future: Literal[False] = False, submission_type_filter: Optional[SubmissionType] = None, ) -> ObjectList[ClusterDetails]: ... @overload def get_clusters( clusters: Optional[Union[List[str], List[ClusterDetails], ObjectList[ClusterDetails]]] = None, *, include_utilization: bool = False, include_all: bool = False, timeout: Optional[float] = None, future: Literal[True] = True, submission_type_filter: Optional[SubmissionType] = None, ) -> Future[ObjectList[ClusterDetails]]: ...
[docs]def get_clusters( clusters: Optional[Union[List[str], List[ClusterDetails], ObjectList[ClusterDetails]]] = None, *, include_utilization: bool = False, include_all: bool = False, timeout: Optional[float] = 10, future: bool = False, submission_type_filter: Optional[SubmissionType] = None, ): """Get clusters available in the MosaicML platform Arguments: clusters (:class:`~mcli.ClusterDetails`): List of :class:`~mcli.ClusterDetails` objects or cluster name strings to get. include_utilization (``bool``): Include information on how the cluster is currently being utilized timeout (``Optional[float]``): Time, in seconds, in which the call should complete. If the run creation takes too long, a :exc:`~concurrent.futures.TimeoutError` will be raised. If ``future`` is ``True``, this value will be ignored. future (``bool``): Return the output as a :class:`~concurrent.futures.Future`. If True, the call to :func:`get_clusters` will return immediately and the request will be processed in the background. This takes precedence over the ``timeout`` argument. To get the :class:`~mcli.models.cluster_details.ClusterDetails` output, use ``return_value.result()`` with an optional ``timeout`` argument. Raises: ``MAPIException``: If connecting to MAPI, raised when a MAPI communication error occurs """ filters = {} if clusters: cluster_names = [c if isinstance(c, str) else c.name for c in clusters] filters['name'] = {'in': cluster_names} if submission_type_filter: filters['allowedSubmissionTypes'] = {'has': submission_type_filter.name} variables = { VARIABLE_DATA_NAME: { 'filters': filters, 'includeAll': include_all, }, } cfg = MCLIConfig.load_config() cfg.update_entity(variables[VARIABLE_DATA_NAME]) # Small hack because getClusters incorrectly fills in user's id as default # when only organizationIds is applied if variables[VARIABLE_DATA_NAME].get('entity') and variables[VARIABLE_DATA_NAME]['entity'].get('userIds') is None: variables[VARIABLE_DATA_NAME]['entity']['userIds'] = [] response = run_plural_mapi_request( query=QUERY_UTILIZATION if include_utilization else QUERY, query_function=QUERY_FUNCTION, return_model_type=ClusterDetails, variables=variables, ) return get_return_response(response, future=future, timeout=timeout)