# Copyright 2022 MosaicML Composer authors# SPDX-License-Identifier: Apache-2.0"""Log metrics to slack, using Slack postMessage api."""from__future__importannotationsimportitertoolsimportloggingimportosimportreimporttimefromtypingimportTYPE_CHECKING,Any,Callable,Dict,List,Optional,Sequence,Unionfromcomposer.core.timeimportTime,TimeUnitfromcomposer.loggers.loggerimportLoggerfromcomposer.loggers.logger_destinationimportLoggerDestinationfromcomposer.utilsimportMissingConditionalImportErrorifTYPE_CHECKING:fromcomposer.coreimportStatelog=logging.getLogger(__name__)__all__=['SlackLogger']
[docs]classSlackLogger(LoggerDestination):"""Log metrics to slack, using Slack's postMessage api - https://api.slack.com/methods/chat.postMessage. First export 2 environment variable to use this logger. 1. SLACK_LOGGING_API_KEY: To get app credentials, follow tutorial here - https://api.slack.com/tutorials/tracks/posting-messages-with-curl?app_id_from_manifest=A053W1QCEF2. 2. SLACK_LOGGING_CHANNEL_ID: Channel id to send the message (Open slack channel in web browser to look this up). Next write script to output metrics / hparams / traces to slack channel. See example below. .. code-block:: python trainer = Trainer( model=mnist_model(num_classes=10), train_dataloader=train_dataloader, max_duration='2ep', algorithms=[ LabelSmoothing(smoothing=0.1), CutMix(alpha=1.0), ChannelsLast(), ], loggers=[ SlackLogger( formatter_func=(lambda data: [{ 'type': 'section', 'text': { 'type': 'mrkdwn', 'text': f'*{k}:* {v}' } } for k, v in data.items()]), include_keys=['loss/train/total'], interval_in_seconds=1 ), ], ) trainer.fit() Args: formatter_func ((...) -> Any | None): A formatter function that returns list of blocks to be sent to slack. include_keys (Sequence[str]): A sequence of metric/logs/traces keys to include in the message. log_interval: (int | str | Time): How frequently to log. (default: ``'1ba'``) max_logs_per_message (int)(default:50): Maximum number of logs to send in a single message. Note that no more than 50 items are allowed to send in a single message. If more than 50 items are stored in buffer, the message flushed without waiting the full time interval. """def__init__(self,include_keys:Sequence[str]=(),formatter_func:Optional[Callable[...,List[Dict[str,Any]]]]=None,log_interval:Union[int,str,Time]='1ba',max_logs_per_message:int=50,slack_logging_api_key:Optional[str]=None,channel_id:Optional[str]=None,)->None:try:importslack_sdkself.client=slack_sdk.WebClient()delslack_sdkexceptImportErrorase:raiseMissingConditionalImportError('slack_logger','slack_sdk',None)fromeself.slack_logging_api_key=os.environ.get('SLACK_LOGGING_API_KEY',None)ifslack_logging_api_keyisNoneelseslack_logging_api_keyself.channel_id=os.environ.get('SLACK_LOGGING_CHANNEL_ID',None)ifchannel_idisNoneelsechannel_idifself.slack_logging_api_keyisNone:print('WARNING: SLACK_LOGGING_API_KEY must be set as environment variable')ifself.channel_idisNone:print('WARNING: SLACK_LOGGING_CHANNEL_ID must be set as environment variable')self.formatter_func=formatter_funciflen(include_keys)==0:print('WARNING: The slack logger `include_keys` argument must be a non-empty list of strings.')# Create a regex of all keys to includeself.regex_all_keys='('+')|('.join(include_keys)+')'ifisinstance(log_interval,int):self.log_interval=Time(log_interval,TimeUnit.EPOCH)ifisinstance(log_interval,str):self.log_interval=Time.from_timestring(log_interval)ifself.log_interval.unitnotin(TimeUnit.EPOCH,TimeUnit.BATCH):raiseValueError('The `slack logger log_interval` argument must have units of EPOCH or BATCH.')self.log_dict,self.last_log_time={},time.time()self.max_logs_per_message=min(max_logs_per_message,50)def_log_to_buffer(self,data:Dict[str,Any],**kwargs,# can be used to pass additional arguments to the formatter function (eg for headers)):"""Flush the buffer to slack if the buffer size exceeds max_logs_per_message. Buffer will replace existing keys with updated values if keys exist. Otherwise, add new key-value pairs. If max_logs_per_message is exceeded, flush buffer. Otherwise, wait for the next log_interval (batch end or epoch end) to flush the buffer. """filtered_data={k:vfork,vindata.items()ifre.match(self.regex_all_keys,k)isnotNone}self.log_dict.update(filtered_data)iflen(self.log_dict.keys())>=self.max_logs_per_message:self._flush_logs_to_slack(**kwargs)def_default_log_bold_key_normal_value_pair_with_header(self,data:Dict[str,Any],**kwargs)->List[Dict[str,Any]]:"""Default formatter function if no formatter func is specified. This function will: 1. Log the key-value pairs in bold (key) and normal (value) text. 2. When logging metrics, set the step number as the header of the section. Args: data (Dict[str, Any]): Data to be logged. **kwargs: Additional arguments to be passed to the formatter function (Only supports "header" argument now) Returns: List[Dict[str, str]]: List of blocks to be sent to Slack. """blocks=[{'type':'section','text':{'type':'mrkdwn','text':f'*{k}:* {v}'}}fork,vindata.items()]iflen(blocks)>0and'header'inkwargs:header=kwargs['header']blocks.append({'type':'header','text':{'type':'plain_text','text':f'{header}'}})returnblocksdeflog_metrics(self,metrics:Dict[str,Any],step:Optional[int]=None)->None:self._log_to_buffer(data=metrics,header=step)deflog_hyperparameters(self,hyperparameters:Dict[str,Any]):self._log_to_buffer(data=hyperparameters)deflog_traces(self,traces:Dict[str,Any]):self._log_to_buffer(data=traces)defepoch_end(self,state:State,logger:Logger)->None:cur_epoch=int(state.timestamp.epoch)# epoch gets incremented right before EPOCH_ENDunit=self.log_interval.unitifunit==TimeUnit.EPOCHand(cur_epoch%int(self.log_interval)==0orcur_epoch==1):self._flush_logs_to_slack()defbatch_end(self,state:State,logger:Logger)->None:cur_batch=int(state.timestamp.batch)unit=self.log_interval.unitifunit==TimeUnit.BATCHand(cur_batch%int(self.log_interval)==0orcur_batch==1):self._flush_logs_to_slack()defclose(self,state:State,logger:Logger)->None:self._flush_logs_to_slack()def_flush_logs_to_slack(self,**kwargs)->None:"""Flush buffered metadata to MosaicML. Format slack messages through rich message layouts created using Slack Blocks Kit. See documentation here: https://api.slack.com/messaging/composing/layouts. """inx=0whileinx<len(self.log_dict.keys()):max_log_entries_dict=dict(itertools.islice(self.log_dict.items(),inx,inx+self.max_logs_per_message))self._format_and_send_blocks_to_slack(max_log_entries_dict,**kwargs)inx+=self.max_logs_per_messageself.log_dict={}# reset log_dictdef_format_and_send_blocks_to_slack(self,log_entries:Dict[str,Any],**kwargs,):blocks=self.formatter_func(log_entries,**kwargs)ifself.formatter_funcisnotNoneelseself._default_log_bold_key_normal_value_pair_with_header(log_entries,**kwargs)try:channel_id=self.channel_idslack_logging_key=self.slack_logging_api_keyifchannel_idisNone:raiseTypeError('SLACK_LOGGING_CHANNEL_ID cannot be None.')ifslack_logging_keyisNone:raiseTypeError('SLACK_LOGGING_API_KEY cannot be None')self.client.chat_postMessage(token=f'{self.slack_logging_api_keyifself.slack_logging_api_keyelse""}',channel=channel_id,blocks=blocks,text=f'Logged {len(log_entries)} items to Slack')exceptExceptionase:log.error(f'Error logging to Slack: {e}')