Source code for mcli.api.inference_deployments.api_get_inference_deployment_logs

""" Get an inference deployment's logs from the MosaicML platform"""
from __future__ import annotations

from concurrent.futures import Future
from typing import Any, Dict, Generator, Optional, Union, overload

from typing_extensions import Literal

from mcli.api.model.inference_deployment import InferenceDeployment
from mcli.api.runs.api_get_run_logs import _get_logs
from mcli.config import MCLIConfig

QUERY_FUNCTION = 'getInferenceDeploymentLogsV2'
VARIABLE_DATA_NAME = 'getInferenceDeploymentLogsInput'
QUERY = f"""
subscription Subscription(${VARIABLE_DATA_NAME}: GetInferenceDeploymentLogsInput!) {{
    {QUERY_FUNCTION}({VARIABLE_DATA_NAME}: ${VARIABLE_DATA_NAME}) {{
        chunk
        endOffset
    }}
}}"""


@overload
def get_inference_deployment_logs(
    deployment: Union[str, InferenceDeployment],
    *,
    restart: Optional[int] = None,
    timeout: Optional[float] = None,
    future: Literal[False] = False,
    failed: Optional[bool] = False,
    follow: bool = False,
    tail: Optional[int] = None,
) -> Generator[str, None, None]:
    ...


@overload
def get_inference_deployment_logs(
    deployment: Union[str, InferenceDeployment],
    *,
    restart: Optional[int] = None,
    timeout: Optional[float] = None,
    future: Literal[True] = True,
    failed: Optional[bool] = False,
    follow: bool = False,
    tail: Optional[int] = None,
) -> Generator[Future[str], None, None]:
    ...


[docs]def get_inference_deployment_logs( deployment: Union[str, InferenceDeployment], *, restart: Optional[int] = None, timeout: Optional[float] = None, future: bool = False, failed: Optional[bool] = False, follow: bool = False, tail: Optional[int] = None, ) -> Union[Generator[str, None, None], Generator[Future[str], None, None]]: """Get the current logs for an active or completed inference deployment Get the current logs for an active or completed inference deployment in the MosaicML platform. This returns the full logs as a ``str``, as they exist at the time the request is made. Args: deployment (:obj:`str` | :class:`~mcli.api.model.inference_deployment.InferenceDeployment`): The inference deployment to get logs for. If a name is provided, the remaining required deployment details will be queried with :func:`~mcli.get_inference_deployments`. restart (``Optional[int]``): Which restart of a inference deployment to get logs for. Defaults to the most recent deployment restart. timeout (``Optional[float]``): Time, in seconds, in which the call should complete. If the the call takes too long, a :exc:`~concurrent.futures.TimeoutError` will be raised. If ``future`` is ``True``, this value will be ignored. future (``bool``): Return the output as a :class:`~concurrent.futures.Future` . If True, the call to :func:`get_inference_deployment_logs` will return immediately and the request will be processed in the background. This takes precedence over the ``timeout`` argument. To get the log text, use ``return_value.result()`` with an optional ``timeout`` argument. failed (``bool``): Return the logs of the latest failed deployment if ``True``. ``False`` by default. follow (``bool``): Returns the logs of the inference deployment as they are produced if ``True``. Defaults to ``False``. Returns: If future is False: The full log text for a inference deployment at the time of the request as a :obj:`str` Otherwise: A :class:`~concurrent.futures.Future` for the log text """ # Convert to strings deployment_name = deployment.name if isinstance(deployment, InferenceDeployment) else deployment filters: Dict[str, Any] = {'name': deployment_name, 'failed': failed, 'follow': follow} if restart: filters['restartCount'] = restart if tail is not None: filters['tail'] = tail variables = {VARIABLE_DATA_NAME: filters} cfg = MCLIConfig.load_config() cfg.update_entity(variables[VARIABLE_DATA_NAME], set_user=False) for message in _get_logs(QUERY, variables, QUERY_FUNCTION): if not future: try: yield message.result(timeout) except StopAsyncIteration: break else: yield message