Source code for ska_sdp_batchlet.utils.monitor.log

import os
from multiprocessing import Pipe, Process, Queue
from multiprocessing.connection import Connection

from ...plugins import ConsumerPlugin, FilterPlugin
from ...plugins.utils import instantiate_plugins, load_plugin

# Names exported by ``from ... import *``.
__all__ = ("LogMonitor",)

# Sentinel marking the end of the log stream. It is compared against each
# line read from the log pipe and against each item taken from the event
# queue; the trailing newline makes it a complete line for readline().
EOF_MSG = "===DONE===\n"


def _filter_process(
    pipe_read: Connection,
    output_queue: Queue,
    filter_plugins: list[dict],
):
    """
    Filter process that reads log lines from the pipe,
    applies filter plugins to extract events,
    and passes matching events to the output queue.
    Every line read is also echoed to stdout.

    Terminates when EOF_MSG is received or when the pipe reaches
    end-of-file. EOF_MSG is always forwarded to the output queue on the
    way out, even if a plugin raises, so that the reader of the queue is
    never left waiting for a sentinel that will not come.

    Parameters
    ----------
    pipe_read
        Read end of the pipe from multiprocessing.Pipe
    output_queue
        Queue to put filtered events
    filter_plugins
        Configuration of the filter plugins
    """
    try:
        filters = instantiate_plugins(FilterPlugin, filter_plugins)

        # Work on a duplicate of the descriptor so that closing the text
        # wrapper does not close the Connection object itself.
        with os.fdopen(os.dup(pipe_read.fileno()), "r") as pipe_readf:
            # readline() returns "" once all write ends are closed; treat
            # that as end of stream as well, otherwise the loop would spin
            # forever on empty reads.
            # NOTE(review): the sentinel only matches a whole line, so a
            # writer whose last output lacks a trailing newline would fuse
            # with EOF_MSG and go undetected -- confirm writers end lines.
            while (line := pipe_readf.readline()) and line != EOF_MSG:
                stripped = line.strip()

                # First plugin returning a truthy event wins.
                event = None
                for filter_plugin in filters:
                    if event := filter_plugin.filter(stripped):
                        break

                if event:
                    output_queue.put(event)

                # Pass the original line through unchanged.
                print(line, end="")
    finally:
        output_queue.put(EOF_MSG)


def _consumer_process(
    input_queue: Queue,
    consumer_plugins: list[dict],
):
    """
    Consumer process: drain events from the queue and hand each one to
    every consumer plugin, in the order the plugins were configured.
    Returns as soon as EOF_MSG is taken from the queue.

    Parameters
    ----------
    input_queue
        Queue containing filtered events
    consumer_plugins
        Configuration of the consumer plugins
    """
    sinks = instantiate_plugins(ConsumerPlugin, consumer_plugins)

    while True:
        item = input_queue.get()

        # The sentinel ends the stream; it is not passed to the plugins.
        if item == EOF_MSG:
            return

        for sink in sinks:
            sink.process(item)


class LogMonitor:
    """
    Monitors logs by utilizing filter and consumer plugins.

    The schema of filter_plugins and consumer_plugins is the same as the
    schema of the input configuration of batchlet.

    Intended to be used as a context manager: entering starts the filter
    and consumer subprocesses, exiting signals end-of-stream and waits
    for them to finish.

    Parameters
    ----------
    filter_plugins : list of dict
        List of dictionaries specifying filter plugins
    consumer_plugins : list of dict
        List of dictionaries specifying consumer plugins
    """

    pipe_read: Connection
    "Read end of the pipe, between application and filter process"

    pipe_write: Connection
    "Write end of the pipe, between application and filter process"

    event_queue: Queue
    "Queue for filter process -> consumer process"

    filter_process: Process
    "The filter subprocess"

    consumer_process: Process
    "The consumer subprocess"

    def __init__(
        self,
        filter_plugins: list[dict],
        consumer_plugins: list[dict],
    ):
        self.filter_plugins = filter_plugins
        self.consumer_plugins = consumer_plugins

        # Ensure that all plugins are loadable
        for f in self.filter_plugins:
            load_plugin(FilterPlugin, **f)

        for c in self.consumer_plugins:
            load_plugin(ConsumerPlugin, **c)

    def __enter__(self):
        """
        Start filter and consumer processes.

        This function sets up the communication between:

        - The application to be watched (producer of log lines),
          which writes into the pipe
        - The filter process (consumes log lines from the pipe,
          produces events on the queue)
        - The consumer process (consumes events from the queue)

        Returns
        -------
        Self
        """
        self.pipe_read, self.pipe_write = Pipe(duplex=False)
        self.event_queue = Queue()

        self.filter_process = Process(
            target=_filter_process,
            args=(
                self.pipe_read,
                self.event_queue,
                self.filter_plugins,
            ),
        )
        self.filter_process.start()

        self.consumer_process = Process(
            target=_consumer_process,
            args=(
                self.event_queue,
                self.consumer_plugins,
            ),
        )
        self.consumer_process.start()

        return self

    def __exit__(self, *args):
        """
        Signal end-of-stream and wait for both subprocesses.

        Writes EOF_MSG into the pipe, joins the consumer and the filter
        process, then releases the queue and both pipe ends. The release
        step runs even if signalling or joining raises, so no descriptors
        are leaked. Returns None, so an exception raised inside the
        ``with`` body is not suppressed.
        """
        try:
            with self.log_write_stream as f:
                f.write(EOF_MSG)

            self.consumer_process.join()
            self.filter_process.join()
        finally:
            self.event_queue.close()
            self.event_queue.join_thread()
            self.pipe_read.close()
            self.pipe_write.close()

    @property
    def log_write_stream(self):
        """
        Returns a new stream to which another process can write
        its stdout/stderr.

        Each access duplicates the write end of the pipe and wraps it in
        a new text-mode file object; the caller is responsible for
        closing it.
        """
        return os.fdopen(os.dup(self.pipe_write.fileno()), "w")