Source code for composer.loggers.file_logger

# Copyright 2021 MosaicML. All Rights Reserved.

"""Logs to a file."""

from __future__ import annotations

import os
import queue
import sys
import textwrap
from typing import Any, Callable, Dict, Optional, TextIO

from composer.core.state import State
from composer.loggers.logger import Logger, LogLevel, format_log_data_value
from composer.loggers.logger_destination import LoggerDestination
from composer.utils.file_helpers import FORMAT_NAME_WITH_DIST_TABLE, format_name_with_dist

__all__ = ["FileLogger"]


class FileLogger(LoggerDestination):
    __doc__ = f"""Log data to a file.

    Example usage:
        .. testcode::

            from composer.loggers import FileLogger, LogLevel
            from composer.trainer import Trainer

            file_logger = FileLogger(
                filename="{{run_name}}/logs-rank{{rank}}.txt",
                buffer_size=1,
                log_level=LogLevel.BATCH,
                log_interval=2,
                flush_interval=50
            )

            trainer = Trainer(
                ...,
                loggers=[file_logger]
            )

        .. testcleanup::

            import os

            trainer.engine.close()

            path = os.path.join(trainer.logger.run_name, "logs-rank0.txt")
            try:
                os.remove(file_logger.filename)
            except FileNotFoundError:
                pass

    Example output::

        [FIT][batch=2]: {{ "logged_metric": "logged_value", }}
        [EPOCH][batch=2]: {{ "logged_metric": "logged_value", }}
        [BATCH][batch=2]: {{ "logged_metric": "logged_value", }}
        [EPOCH][batch=3]: {{ "logged_metric": "logged_value", }}

    Args:
        filename (str, optional): Format string for the filename.

            The following format variables are available:

            {textwrap.indent(FORMAT_NAME_WITH_DIST_TABLE, prefix='            ')}

            .. note::

                When training with multiple devices (i.e. GPUs), ensure that ``'{{rank}}'`` appears in the
                format. Otherwise, multiple processes may attempt to write to the same file.

            Consider the following example when using the default value of '{{run_name}}/logs-rank{{rank}}.txt':

            >>> file_logger = FileLogger(filename='{{run_name}}/logs-rank{{rank}}.txt')
            >>> trainer = Trainer(loggers=[file_logger], run_name='my-awesome-run')
            >>> file_logger.filename
            'my-awesome-run/logs-rank0.txt'

            Default: `'{{run_name}}/logs-rank{{rank}}.txt'`

        artifact_name (str, optional): Format string for the logfile's artifact name.

            The logfile will be periodically logged (according to the ``flush_interval``) as a file artifact.
            The artifact name will be determined by this format string.

            .. seealso:: :meth:`~composer.loggers.logger.Logger.log_file_artifact` for file artifact logging.

            The same format variables as for ``filename`` are available. Setting this parameter to ``None``
            (the default) uses the same format string as ``filename``. It is sometimes helpful to deviate
            from this default. For example, when ``filename`` contains an absolute path, it is recommended
            to set this parameter explicitly, so the absolute path does not appear in any artifact stores
            (see the example following this argument list).

            Leading slashes (``'/'``) will be stripped.

            Default: ``None`` (which uses the same format string as ``filename``)
        capture_stdout (bool, optional): Whether to also write ``stdout`` output to the logfile at
            ``filename``. (default: ``True``)
        capture_stderr (bool, optional): Whether to also write ``stderr`` output to the logfile at
            ``filename``. (default: ``True``)
        buffer_size (int, optional): Buffer size. See :py:func:`open`.
            Default: ``1`` for line buffering.
        log_level (LogLevel, optional): :class:`~.logger.LogLevel` (i.e. unit of resolution) at
            which to record. Default: :attr:`~.LogLevel.EPOCH`.
        log_interval (int, optional): Frequency to print logs. If ``log_level`` is
            :attr:`~.LogLevel.EPOCH`, logs will only be recorded every n epochs. If ``log_level`` is
            :attr:`~.LogLevel.BATCH`, logs will be printed every n batches. Otherwise, if ``log_level`` is
            :attr:`~.LogLevel.FIT`, this parameter is ignored, as calls at the :attr:`~.LogLevel.FIT` log
            level are always recorded. Default: ``1``.
        flush_interval (int, optional): How frequently to flush the log to the file, relative to the
            ``log_level``. For example, if the ``log_level`` is :attr:`~.LogLevel.EPOCH`, then the logfile
            will be flushed every n epochs. If the ``log_level`` is :attr:`~.LogLevel.BATCH`, then the
            logfile will be flushed every n batches. Default: ``100``.
        overwrite (bool, optional): Whether to overwrite an existing logfile at ``filename``. If ``False``,
            the logfile must not already exist. Default: ``False``.
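
    When ``filename`` contains an absolute path, the description of ``artifact_name`` above recommends
    passing an explicit, relative ``artifact_name`` so the absolute path does not leak into artifact
    stores. A minimal illustrative sketch (the paths here are hypothetical)::

        from composer.loggers import FileLogger

        file_logger = FileLogger(
            filename="/tmp/{{run_name}}/logs-rank{{rank}}.txt",
            artifact_name="{{run_name}}/logs-rank{{rank}}.txt",
        )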
""" def __init__( self, filename: str = "{run_name}/logs-rank{rank}.txt", artifact_name: Optional[str] = None, *, capture_stdout: bool = True, capture_stderr: bool = True, buffer_size: int = 1, log_level: LogLevel = LogLevel.EPOCH, log_interval: int = 1, flush_interval: int = 100, overwrite: bool = False, ) -> None: self.filename_format = filename if artifact_name is None: artifact_name = filename.replace(os.path.sep, '/') self.artifact_name_format = artifact_name self.buffer_size = buffer_size self.log_level = log_level self.log_interval = log_interval self.flush_interval = flush_interval self.is_batch_interval = False self.is_epoch_interval = False self.file: Optional[TextIO] = None self.overwrite = overwrite, self._queue: queue.Queue[str] = queue.Queue() self._original_stdout_write = sys.stdout.write self._original_stderr_write = sys.stderr.write self._run_name = None if capture_stdout: sys.stdout.write = self._get_new_writer("[stdout]: ", self._original_stdout_write) if capture_stderr: sys.stderr.write = self._get_new_writer("[stderr]: ", self._original_stderr_write) def _get_new_writer(self, prefix: str, original_writer: Callable[[str], int]): """Returns a writer that intercepts calls to the ``original_writer``.""" def new_write(s: str) -> int: self.write(prefix, s) return original_writer(s) return new_write @property def filename(self) -> str: """The filename for the logfile.""" if self._run_name is None: raise RuntimeError("The run name is not set. The engine should have been set on Event.INIT") name = format_name_with_dist(self.filename_format, run_name=self._run_name) return name @property def artifact_name(self) -> str: """The artifact name for the logfile.""" if self._run_name is None: raise RuntimeError("The run name is not set. The engine should have been set on Event.INIT") name = format_name_with_dist(self.artifact_name_format, run_name=self._run_name) name.lstrip("/") return name def batch_start(self, state: State, logger: Logger) -> None: self.is_batch_interval = (int(state.timer.batch) + 1) % self.log_interval == 0 def epoch_start(self, state: State, logger: Logger) -> None: self.is_epoch_interval = (int(state.timer.epoch) + 1) % self.log_interval == 0 # Flush any log calls that occurred during INIT or FIT_START self._flush_file(logger) def _will_log(self, log_level: LogLevel) -> bool: if log_level == LogLevel.FIT: return True # fit is always logged if log_level == LogLevel.EPOCH: if self.log_level < LogLevel.EPOCH: return False if self.log_level > LogLevel.EPOCH: return True return self.is_epoch_interval if log_level == LogLevel.BATCH: if self.log_level < LogLevel.BATCH: return False if self.log_level > LogLevel.BATCH: return True return self.is_batch_interval raise ValueError(f"Unknown log level: {log_level}") def log_data(self, state: State, log_level: LogLevel, data: Dict[str, Any]): if not self._will_log(log_level): return data_str = format_log_data_value(data) self.write( f'[{log_level.name}][batch={int(state.timer.batch)}]: ', data_str + "\n", ) def init(self, state: State, logger: Logger) -> None: del state # unused self._run_name = logger.run_name if self.file is not None: raise RuntimeError("The file logger is already initialized") file_dirname = os.path.dirname(self.filename) if file_dirname: os.makedirs(file_dirname, exist_ok=True) mode = 'w+' if self.overwrite else 'x+' self.file = open(self.filename, mode, buffering=self.buffer_size) self._flush_queue() def batch_end(self, state: State, logger: Logger) -> None: assert self.file is not None if self.log_level 
== LogLevel.BATCH and int(state.timer.batch) % self.flush_interval == 0: self._flush_file(logger) def eval_start(self, state: State, logger: Logger) -> None: # Flush any log calls that occurred during INIT when using the trainer in eval-only mode self._flush_file(logger) def epoch_end(self, state: State, logger: Logger) -> None: if self.log_level > LogLevel.EPOCH or self.log_level == LogLevel.EPOCH and int( state.timer.epoch) % self.flush_interval == 0: self._flush_file(logger)
    def write(self, prefix: str, s: str):
        """Write to the logfile.

        .. note::

            If the ``write`` occurs before the :attr:`~composer.core.event.Event.INIT` event,
            the write will be enqueued, as the file is not yet open.

        Args:
            prefix (str): A prefix for each line in the logfile.
            s (str): The string to write. Each line will be prefixed with ``prefix``.
        """
        formatted_lines = []
        for line in s.splitlines(True):
            if line == os.linesep:
                # If it's an empty line, don't print the prefix
                formatted_lines.append(line)
            else:
                formatted_lines.append(f"{prefix}{line}")
        formatted_s = ''.join(formatted_lines)
        if self.file is None:
            self._queue.put_nowait(formatted_s)
        else:
            # Flush the queue, so all prints will be in order
            self._flush_queue()

            # Then, write to the file
            print(formatted_s, file=self.file, flush=False, end='')
    def _flush_queue(self):
        while True:
            try:
                s = self._queue.get_nowait()
            except queue.Empty:
                break
            print(s, file=self.file, flush=False, end='')

    def _flush_file(self, logger: Logger) -> None:
        assert self.file is not None

        self._flush_queue()

        self.file.flush()
        os.fsync(self.file.fileno())
        logger.file_artifact(LogLevel.FIT, self.artifact_name, self.file.name, overwrite=True)

    def close(self, state: State, logger: Logger) -> None:
        del state  # unused
        if self.file is not None:
            sys.stdout.write = self._original_stdout_write
            sys.stderr.write = self._original_stderr_write
            self._flush_file(logger)
            self.file.close()
            self.file = None
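

# The block below is an illustrative sketch, not part of the library: it only runs when this module is
# executed directly. The run name "my-run" and the "[note]: " prefix are invented for the example.
if __name__ == "__main__":
    # Resolve the default filename format string the same way the `filename` property does above. On rank 0
    # of a single-process run this prints "my-run/logs-rank0.txt".
    print(format_name_with_dist("{run_name}/logs-rank{rank}.txt", run_name="my-run"))

    # Writes made before Event.INIT are enqueued because the file is not yet open; each line is prefixed
    # (here with "[note]: ") and written out once `init` opens the file and flushes the queue.
    example_logger = FileLogger(capture_stdout=False, capture_stderr=False)
    example_logger.write("[note]: ", "this line is queued until init() opens the logfile\n")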