Batchlet Plugins ================ Batchlet log monitoring is implemented using a plugin framework. The default plugins are provided in :py:class:`ska_sdp_batchlet_plugins` package. Filter Plugins -------------- Filter plugin receives the log line, it parses the log, does some matches and creates the event. Batchlet provides filter plugins such as ``SKASDPFilter`` that filters event in pipeline logs. We need to enable filters by providing them in configuration to batchlet. We can also add custom filters to batchlet by providing configuration as shown below. .. code:: json { // ..., "monitor": { "logs": { "filter_plugins": [ { "name": "SKASDPFilter", "kwargs": {"pipeline": "INST"} }, { "name": "custom-filter", "path": "/path/to/custom-filter-plugin.py" } ] // ,"consumer_plugins": [] } } } ``SKASDPFilter`` Plugin ~~~~~~~~~~~~~~~~~~~~~~~ ``SKASDPFilter`` plugin uses regex provided by `SKA Log Message Format documentation `__. When it receives a log line, it performs following operations: 1. Parses the log line using the regex. 2. Gets the tag from parsed log. 3. Matches the tags for ``sdpStatus``. 4. If match found, creates a dictionary and returns. For example. .. code:: python # For phase updates >>> line = '1|2025-07-02T12:06:49.933Z|INFO|node|MainThread|start|stage.py#1|sdpPhase:PREDICT,state:START|Starting predict stage' >>> filter = SKASDPFilter("INST") >>> filter.filter(line) { 'target': 'CSVFile', 'event': { "timestamp": '2025-07-02T12:06:49.933Z', "pipeline": 'INST', "stage": 'PREDICT', "event": 'START', "node": 'node', "process": 'MainThread', "source": 'stage.py#1', 'message': 'Starting predict stage' } } # For flow updates >>> line = '1|2025-07-02T12:06:49.933Z|INFO||MainThread|start|stage.py#1|sdpFlow:mswriter,state:FLOWING|Starting to write data' >>> filter = SKASDPFilter("INST") >>> filter.filter(line) { 'target': 'SDPConfigurationDB', 'flow_name': 'mswriter', 'log': { 'state': 'FLOWING', 'level': 'INFO', 'message': 'Starting to write data', 'timestamp': '2025-07-02T12:06:49.933Z', 'source': 'stage.py#1', 'pipeline': 'INST' } } Refer `API `__ documentation for more. Consumer Plugins ---------------- Consumer plugin is an implemention which forwards the event to the end destination. For instance, ``SDPConfigurationDB`` plugin sends the event to SDP config database. Batchlet provides consumer plugins ``SDPConfigurationDB`` and ``CSVFile``. We can add custom consumer plugins similar to filter plugins by adding them in batchlet configuration. .. code:: json { // ..., "monitor": { "logs": { // "filter_plugins": ..., "consumer_plugins": [ { "name": "SDPConfigurationDB", "kwargs": { "pb_id": "pb-script-12345678-00001", "kind": "data-product", "flow_names": [ "mswriter", "gaintable" ], "host": "1.2.3.4", "port": 2379 } }, { "name": "CSVFile", "kwargs": { "file_path": "./events.csv" } } ] } } } ``SDPConfigurationDB`` Plugin ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ It processes the filtered event and updates the state of flow obtained from the log message. The flow key is built using the ``pb_id``, ``kind`` and the ``flow_name`` obtained from the log. To ensure that the underlying script does not update flows which it is not supposed to, the ``flow_name`` obtained from log is validated against the ``flow_names`` provided in the config. The update is processed only if the ``flow_name`` obtained from log is present in the configuration provided ``flow_names``. Also refer :py:class:`~ska_sdp_batchlet_plugins.consumers.sdp_configuration_db.SDPConfigurationDB` API documentation. ``CSVFile`` Plugin ~~~~~~~~~~~~~~~~~~ It processes the filtered event and writes the event to a csv file. The path of the csv file should be provided by configuration to batchlet. Also refer :py:class:`~ska_sdp_batchlet_plugins.consumers.csv_file.CSVFile` API documentation. Example ------- Following shows an example where batchlet runs a pipeline script which generates example logs and adds the event to csv file. Example pipeline ~~~~~~~~~~~~~~~~ ``pipeline.py`` .. code:: python import logging from time import sleep from ska_sdp_batchlet.log_config import LOGGING_CONFIG from ska_ser_logging import configure_logging configure_logging(level=logging.INFO, overrides=LOGGING_CONFIG) logger = logging.getLogger(__name__) def main(): sleep(2) logger.info("Starting predict stage", extra={"tags": "sdpPhase:PREDICT,state:START"}) sleep(2) logger.info("Finished predict stage", extra={"tags": "sdpPhase:PREDICT,state:FINISHED"}) sleep(2) if __name__ == "__main__": main() .. **Note**: It is not advisable to use Piper as a dependency just to configure the log. Please use the logging configurations from Piper `source code `__. Batchlet script ~~~~~~~~~~~~~~~ ``batchlet.sh`` .. code:: sh #!/usr/bin/env bash cat < None: super().__init__() self.tag_identifier = tag_identifier def filter(self, line: str) -> dict | None: if is_matching(line, self.tag_identifier): return create_event(line) return None While configuring the filter plugin, the path has be to provided. .. code:: json "filter_plugins": [ { "name": "CustomFilterPlugin", "kwargs": { "tag_identifier": "CustomEvent" }, "path": "custom_plugins/custom_filter_plugin.py" } ] Consumer Plugin ~~~~~~~~~~~~~~~ One needs to implement ``ConsumerPlugin`` interface to custom consumer plugin. .. code:: python from ska_sdp_batchlet.plugins import ConsumerPlugin class CustomConsumerPlugin(ConsumerPlugin): def __init__(self) -> None: super().__init__() def process(self, event: dict) -> None: # process event send_http(event) Similarly to filter custom plugin, the path has be to provided. .. code:: json "consumer_plugins": [ { "name": "CustomConsumerPlugin", "path": "custom_plugins/custom_consumer_plugin.py" } ]