"""Get a run's logs from the MosaicML platform"""from__future__importannotationsimportbase64importloggingfromconcurrent.futuresimportFuturefromtypingimportAny,Dict,Generator,Optional,Union,overloadimportgqlfromtyping_extensionsimportLiteralfrommcli.api.engine.engineimportMAPIConnectionfrommcli.api.model.runimportRunfrommcli.configimportMCLIConfigfrommcli.utils.utils_message_decodingimportMessageDecoderlogger=logging.getLogger(__name__)QUERY_FUNCTION='getRunLogsV2'VARIABLE_DATA_NAME='getRunLogsInput'QUERY=f"""subscription Subscription(${VARIABLE_DATA_NAME}: GetRunLogsInput!) {{{QUERY_FUNCTION}({VARIABLE_DATA_NAME}: ${VARIABLE_DATA_NAME}) {{ chunk endOffset}}}}"""@overloaddefget_run_logs(run:Union[str,Run],rank:Optional[int]=None,*,node_rank:Optional[int]=None,local_gpu_rank:Optional[int]=None,global_gpu_rank:Optional[int]=None,timeout:Optional[float]=None,future:Literal[False]=False,failed:Optional[bool]=False,resumption:Optional[int]=None,tail:Optional[int]=None,container:Optional[str]=None,)->Generator[str,None,None]:...@overloaddefget_run_logs(run:Union[str,Run],rank:Optional[int]=None,*,node_rank:Optional[int]=None,local_gpu_rank:Optional[int]=None,global_gpu_rank:Optional[int]=None,timeout:Optional[float]=None,future:Literal[True]=True,failed:Optional[bool]=False,resumption:Optional[int]=None,tail:Optional[int]=None,container:Optional[str]=None,)->Generator[Future[str],None,None]:...
def get_run_logs(
    run: Union[str, Run],
    rank: Optional[int] = None,
    *,
    node_rank: Optional[int] = None,
    local_gpu_rank: Optional[int] = None,
    global_gpu_rank: Optional[int] = None,
    timeout: Optional[float] = None,
    future: bool = False,
    failed: Optional[bool] = False,
    resumption: Optional[int] = None,
    tail: Optional[int] = None,
    container: Optional[str] = None,
) -> Union[Generator[str, None, None], Generator[Future[str], None, None]]:
    """Get the current logs for an active or completed run

    Get the current logs for an active or completed run in the MosaicML platform.
    This returns the full logs as a ``str``, as they exist at the time the request is
    made. If you want to follow the logs for an active run line-by-line, use
    :func:`follow_run_logs`.

    Args:
        run (:obj:`str` | :class:`~mcli.api.model.run.Run`): The run to get logs for. If a
            name is provided, the remaining required run details will be queried with
            :func:`~mcli.get_runs`.
        rank (``Optional[int]``): [DEPRECATED, use ``node_rank`` instead] Node rank of a run
            to get logs for. Defaults to the lowest available rank. This will usually be
            rank 0 unless something has gone wrong.
        node_rank (``Optional[int]``): Specifies the node rank within a multi-node run to
            fetch logs for. Defaults to the lowest available rank. Indexing starts from 0.
        local_gpu_rank (``Optional[int]``): Specifies the GPU rank on the specified node to
            fetch logs for. Cannot be used with ``global_gpu_rank``. Indexing starts from 0.
            Note: GPU rank logs are only available for runs using
            `Composer <https://github.com/mosaicml/composer>`_ and/or
            `LLM Foundry <https://github.com/mosaicml/llm-foundry>`_ and MAIN container logs.
        global_gpu_rank (``Optional[int]``): Specifies the global GPU rank to fetch logs for.
            Cannot be used with ``node_rank`` and ``local_gpu_rank``. Indexing starts from 0.
            Note: GPU rank logs are only available for runs using
            `Composer <https://github.com/mosaicml/composer>`_ and/or
            `LLM Foundry <https://github.com/mosaicml/llm-foundry>`_ and MAIN container logs.
        timeout (``Optional[float]``): Time, in seconds, in which the call should complete.
            If the call takes too long, a :exc:`~concurrent.futures.TimeoutError` will be
            raised. If ``future`` is ``True``, this value will be ignored.
        future (``bool``): Return the output as a :class:`~concurrent.futures.Future`. If
            True, the call to :func:`get_run_logs` will return immediately and the request
            will be processed in the background. This takes precedence over the ``timeout``
            argument. To get the log text, use ``return_value.result()`` with an optional
            ``timeout`` argument.
        failed (``bool``): Return the logs of the first failed rank for the provided
            resumption if ``True``. ``False`` by default.
        resumption (``Optional[int]``): Resumption (0-indexed) of a run to get logs for.
            Defaults to the last resumption.
        tail (``Optional[int]``): Number of chars to read from the end of the log. Defaults
            to reading the entire log.
        container (``Optional[str]``): Container name of a run to get logs for. Defaults to
            the MAIN container.

    Returns:
        If future is False:
            The full log text for a run at the time of the request as a :obj:`str`
        Otherwise:
            A :class:`~concurrent.futures.Future` for the log text
    """
    # Convert to strings
    run_name = run.name if isinstance(run, Run) else run

    filters: Dict[str, Any] = {'name': run_name, 'follow': False, 'failed': failed}
    if node_rank is not None and rank is not None:
        raise ValueError('Node rank and rank are both provided and are aliases of each other. '
                         'Please use node rank to specify node rank logs. '
                         'The rank parameter will be deprecated in a future release.')

    # TODO: update after deprecating rank
    if rank is not None:
        filters['nodeRank'] = rank
    if node_rank is not None:
        filters['nodeRank'] = node_rank
    if failed and filters.get('nodeRank') is not None:
        raise ValueError('Node rank and failed cannot be provided together. Please specify one or the other.')
    if local_gpu_rank is not None:
        if container is not None and container != 'MAIN':
            raise ValueError(f'GPU rank logs are not available for {container} container.')
        if global_gpu_rank is not None:
            raise ValueError(
                'Global GPU rank and local GPU rank cannot be provided together. Please specify one or the other.')
        filters['localGpuRank'] = local_gpu_rank
    if global_gpu_rank is not None:
        if container is not None and container != 'MAIN':
            raise ValueError(f'GPU rank logs are not available for {container} container.')
        if failed:
            raise ValueError(
                'Global GPU rank and failed cannot be provided together. Please specify one or the other.')
        if filters.get('nodeRank') is not None:
            raise ValueError(
                'Global GPU rank and node rank cannot be provided together. Please specify one or the other.')
        filters['globalGpuRank'] = global_gpu_rank
    if resumption is not None:
        filters['attemptIndex'] = resumption
    if tail is not None:
        filters['tail'] = tail
    if container is not None:
        filters['containerName'] = container

    cfg = MCLIConfig.load_config()
    cfg.update_entity(filters)

    variables = {VARIABLE_DATA_NAME: filters}

    for message in _get_logs(QUERY, variables, QUERY_FUNCTION):
        if not future:
            try:
                yield message.result(timeout)
            except StopAsyncIteration:
                break
        else:
            yield message
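

# The sketch below illustrates the two calling conventions documented above: with
# ``future=False`` (the default) the generator yields plain ``str`` chunks, while
# ``future=True`` yields ``Future[str]`` objects. The run name and this helper are
# hypothetical and included for illustration only; they are not part of the API.
def _example_get_run_logs(run_name: str = 'example-run-1234') -> None:
    """Minimal usage sketch for :func:`get_run_logs` (illustration only)."""
    # Blocking form: print the last ~2000 characters of the node-0 MAIN-container log
    for chunk in get_run_logs(run_name, node_rank=0, tail=2000):
        print(chunk, end='')

    # Future form: the request is processed in the background; resolve each chunk explicitly
    for log_future in get_run_logs(run_name, future=True):
        print(log_future.result(timeout=60), end='')

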
def follow_run_logs(
    run: Union[str, Run],
    rank: Optional[int] = None,
    *,
    node_rank: Optional[int] = None,
    local_gpu_rank: Optional[int] = None,
    global_gpu_rank: Optional[int] = None,
    timeout: Optional[float] = None,
    future: bool = False,
    resumption: Optional[int] = None,
    tail: Optional[int] = None,
    container: Optional[str] = None,
) -> Union[Generator[str, None, None], Generator[Future[str], None, None]]:
    """Follow the logs for an active or completed run in the MosaicML platform

    This returns a :obj:`generator` of individual log lines, line-by-line, and will wait
    until new lines are produced if the run is still active.

    Args:
        run (:obj:`str` | :class:`~mcli.api.model.run.Run`): The run to get logs for. If a
            name is provided, the remaining required run details will be queried with
            :func:`~mcli.get_runs`.
        rank (``Optional[int]``): Node rank of a run to get logs for. Defaults to the lowest
            available rank. This will usually be rank 0 unless something has gone wrong.
        timeout (``Optional[float]``): Time, in seconds, in which the call should complete.
            If the call takes too long, a :exc:`~concurrent.futures.TimeoutError` will be
            raised. If ``future`` is ``True``, this value will be ignored. A run may take
            some time to generate logs, so you likely do not want to set a timeout.
        future (``bool``): Return the output as a :class:`~concurrent.futures.Future`. If
            True, the call to :func:`follow_run_logs` will return immediately and the
            request will be processed in the background. The generator returned by the
            :class:`~concurrent.futures.Future` will yield a
            :class:`~concurrent.futures.Future` for each new log string returned from the
            cloud. This takes precedence over the ``timeout`` argument. To get the
            generator, use ``return_value.result()`` with an optional ``timeout`` argument
            and ``log_future.result()`` for each new log string.
        resumption (``Optional[int]``): Resumption (0-indexed) of a run to get logs for.
            Defaults to the last resumption.
        tail (``Optional[int]``): Number of chars to read from the end of the log. Defaults
            to reading the entire log.
        container (``Optional[str]``): Container name of a run to get logs for. Defaults to
            the MAIN container.

    Returns:
        If future is False:
            A line-by-line :obj:`Generator` of the logs for a run
        Otherwise:
            A :class:`~concurrent.futures.Future` of a line-by-line generator of the logs
            for a run
    """
    # Convert to strings
    run_name = run.name if isinstance(run, Run) else run

    filters: Dict[str, Any] = {'name': run_name, 'follow': True}
    if node_rank is not None and rank is not None:
        raise ValueError('Node rank and rank are both provided and are aliases of each other. '
                         'Please use node rank to specify node rank logs. '
                         'The rank parameter will be deprecated in a future release.')

    # TODO: update after deprecating rank
    if rank is not None:
        filters['nodeRank'] = rank
    if node_rank is not None:
        filters['nodeRank'] = node_rank
    if local_gpu_rank is not None:
        if container is not None and container != 'MAIN':
            raise ValueError(f'GPU rank logs are not available for {container} container.')
        if global_gpu_rank is not None:
            raise ValueError(
                'Global GPU rank and local GPU rank cannot be provided together. Please specify one or the other.')
        filters['localGpuRank'] = local_gpu_rank
    if global_gpu_rank is not None:
        if container is not None and container != 'MAIN':
            raise ValueError(f'GPU rank logs are not available for {container} container.')
        if filters.get('nodeRank') is not None:
            raise ValueError(
                'Global GPU rank and node rank cannot be provided together. Please specify one or the other.')
        filters['globalGpuRank'] = global_gpu_rank
    if resumption is not None:
        filters['attemptIndex'] = resumption
    if tail is not None:
        filters['tail'] = tail
    if container is not None:
        filters['containerName'] = container

    cfg = MCLIConfig.load_config()
    cfg.update_entity(filters)

    variables = {VARIABLE_DATA_NAME: filters}

    for message in _get_logs(QUERY, variables, QUERY_FUNCTION):
        if not future:
            try:
                yield message.result(timeout)
            except StopAsyncIteration:
                break
        else:
            yield message
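

# Hedged sketch of following an active run line-by-line, as described in the docstring
# above: unlike ``get_run_logs``, the generator stays open and waits for new lines while
# the run is active. The run name and this helper are hypothetical, for illustration
# only, and it assumes the yielded text carries its own newlines.
def _example_follow_run_logs(run_name: str = 'example-run-1234') -> None:
    """Minimal usage sketch for :func:`follow_run_logs` (illustration only)."""
    # Stream decoded log lines from the latest resumption of the MAIN container as they arrive
    for line in follow_run_logs(run_name, container='MAIN'):
        print(line, end='')

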
def _get_logs(query: str, variables: Dict[str, Any], return_key: str) -> Generator[Future[str], None, None]:
    gql_query = gql.gql(query)

    connection = MAPIConnection.get_current_connection()
    decoder = LogsDecoder(return_key=return_key)
    yield from connection.subscribe(
        query=gql_query,
        variables=variables,
        callback=decoder.parse_message,
        retry_callback=decoder.update_offset,
    )


class LogsDecoder(MessageDecoder):
    """Decode log messages and update the read offset"""

    end_offset: int = 0

    def __init__(self, return_key: str):
        self.return_key = return_key
        super().__init__()

    def update_offset(self, variables: Dict[str, Any]) -> Dict[str, Any]:
        resolver_input = variables['getRunLogsInput'] if self.return_key == 'getRunLogsV2' else variables[
            'getInferenceDeploymentLogsInput']

        # We set the offset to read from the end of the last message, or zero if this is the first message
        resolver_input['offset'] = self.end_offset

        # If we have already read some bytes with the tail parameter set, we must have already calculated
        # the tail offset. Thus, we can simply continue reading normally, without the tail parameter.
        if self.end_offset != 0 and 'tail' in resolver_input:
            del resolver_input['tail']
        return variables

    def parse_message(self, data: Dict[str, Any]) -> str:
        """Get the next message from the GraphQL logging subscription"""
        # Convert from base64 string to a bytestring
        message_data = data['getRunLogsV2'] if self.return_key == 'getRunLogsV2' else data[
            'getInferenceDeploymentLogsV2']
        b64_message = message_data['chunk']
        b64_bytes = b64_message.encode('utf8')
        message_bytes = base64.b64decode(b64_bytes)
        self.end_offset = message_data['endOffset']
        return self.decode(message_bytes)
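

# Hedged sketch of what ``LogsDecoder.parse_message`` does with a single subscription
# payload: the ``chunk`` field is a base64-encoded bytestring and ``endOffset`` records
# where the next read should resume (fed back into the query by ``update_offset``). The
# payload and this helper are fabricated for illustration; the decoded text assumes
# ``MessageDecoder.decode`` returns the chunk's UTF-8 text.
def _example_decode_log_chunk() -> str:
    """Decode one fabricated ``getRunLogsV2`` payload (illustration only)."""
    decoder = LogsDecoder(return_key=QUERY_FUNCTION)
    payload = {
        QUERY_FUNCTION: {
            'chunk': base64.b64encode(b'Epoch 1: loss=0.42\n').decode('utf8'),
            'endOffset': 19,
        },
    }
    text = decoder.parse_message(payload)  # expected to be the chunk's UTF-8 text
    assert decoder.end_offset == 19  # subsequent reads resume from this offset via update_offset
    return text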