import os
from multiprocessing import Pipe, Process, Queue
from multiprocessing.connection import Connection
from ...plugins import ConsumerPlugin, FilterPlugin
from ...plugins.utils import instantiate_plugins, load_plugin
__all__ = ("LogMonitor",)
EOF_MSG = "===DONE===\n"
def _filter_process(
pipe_read: Connection,
output_queue: Queue,
filter_plugins: list[dict],
):
"""
Filter process that reads log lines from the input queue,
applies filter plugins to extract events,
and passes matching events to the output queue.
Terminates when EOF_MSG is received.
Parameters
----------
pipe_read
Read end of the pipe from multiprocessing.Pipe
output_queue
Queue to put filtered events
filter_plugins
Configuration of the filter plugins
"""
filters = instantiate_plugins(FilterPlugin, filter_plugins)
with os.fdopen(os.dup(pipe_read.fileno()), "r") as pipe_readf:
for line in iter(pipe_readf.readline, EOF_MSG):
event = None
for filter_plugin in filters:
if event := filter_plugin.filter(line.strip()):
break
if event:
output_queue.put(event)
print(line, end="")
output_queue.put(EOF_MSG)
def _consumer_process(
input_queue: Queue,
consumer_plugins: list[dict],
):
"""
Consumer process that reads filtered events from the input queue and
passes them to consumer plugins.
Terminates when EOF_MSG is received.
Parameters
----------
input_queue
Queue containing filtered events
consumer_plugins
Configuration of the consumer plugins
"""
consumers = instantiate_plugins(ConsumerPlugin, consumer_plugins)
while True:
event = input_queue.get()
if event == EOF_MSG:
break
for consumer_plugin in consumers:
consumer_plugin.process(event)
[docs]
class LogMonitor:
"""
Monitors logs by utilizing filter and consumer plugins.
This schema of filter_plugins and consumer_plugins is
same as the schema of the input configuration of batchlet.
Parameters
----------
filter_plugins : list of dict, optional
List of dictionaries specifying filter plugins
consumer_plugins : list of dict, optional
List of dictionaries specifying consumer plugins
"""
pipe_read: Connection
"Read end of the pipe, between application and filter process"
pipe_write: Connection
"Write end of the pipe, between application and filter process"
event_queue: Queue
"Queue for filter process -> consumer process"
filter_process: Process
"The filter subprocess"
consumer_process: Process
"The consumer subprocess"
def __init__(
self,
filter_plugins: list[dict],
consumer_plugins: list[dict],
):
self.filter_plugins = filter_plugins
self.consumer_plugins = consumer_plugins
# Ensure that all plugins are loadable
for f in self.filter_plugins:
load_plugin(FilterPlugin, **f)
for c in self.consumer_plugins:
load_plugin(ConsumerPlugin, **c)
def __enter__(self):
"""
Start filter and consumer processes.
This function sets up queue-based communication between:
- The application to be watched (producer of log lines)
- The filter process (consumes log lines, produces events)
- The consumer process (consumes events)
Returns
-------
Self
"""
self.pipe_read, self.pipe_write = Pipe(duplex=False)
self.event_queue = Queue()
self.filter_process = Process(
target=_filter_process,
args=(
self.pipe_read,
self.event_queue,
self.filter_plugins,
),
)
self.filter_process.start()
self.consumer_process = Process(
target=_consumer_process,
args=(
self.event_queue,
self.consumer_plugins,
),
)
self.consumer_process.start()
return self
def __exit__(self, *args):
with self.log_write_stream as f:
f.write(EOF_MSG)
self.consumer_process.join()
self.filter_process.join()
self.event_queue.close()
self.event_queue.join_thread()
self.pipe_read.close()
self.pipe_write.close()
@property
def log_write_stream(self):
"""
Returns a new stream to which another process can
write its stdout/stderr
"""
return os.fdopen(os.dup(self.pipe_write.fileno()), "w")