Batchlet Plugins

Batchlet log monitoring is implemented using a plugin framework. The default plugins are provided in the ska_sdp_batchlet_plugins package.

Filter Plugins

A filter plugin receives a log line, parses it, matches it against its criteria and, when it matches, creates an event.

Batchlet provides filter plugins such as SKASDPFilter, which filters events from pipeline logs. Filters must be enabled by listing them in the batchlet configuration.

Custom filters can also be added through the configuration. The configuration below enables SKASDPFilter and adds a custom filter loaded from a file:

{
// ...,
  "monitor": {
    "logs": {
      "filter_plugins": [
        {
          "name": "SKASDPFilter",
          "kwargs": {"pipeline": "INST"}
        },
        {
          "name": "custom-filter",
          "path": "/path/to/custom-filter-plugin.py"
        }
      ]
//   ,"consumer_plugins": []
    }
  }
}
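How batchlet resolves these entries internally is not described here. As an illustration only, the sketch below shows one way a plugin framework could turn such an entry into an instance: an entry without a path is looked up in a registry of built-in plugins, while an entry with a path is imported from that file with importlib and the class named by name is instantiated with kwargs. The load_plugin function and the builtins registry are hypothetical, not batchlet's API, and the sketch assumes that name matches the class name defined in the file.

```python
import importlib.util
from pathlib import Path
from typing import Any, Dict


def load_plugin(entry: dict, builtins: Dict[str, type]) -> Any:
    """Instantiate a plugin from a config entry (hypothetical loader).

    entry: {"name": ..., "path": optional file path, "kwargs": optional dict}
    builtins: mapping of built-in plugin names to classes
    """
    name = entry["name"]
    kwargs = entry.get("kwargs", {})
    path = entry.get("path")

    if path is None:
        # No path: treat the name as a built-in plugin
        plugin_cls = builtins[name]
    else:
        # Path given: import the file as a module, then look up the class by name
        spec = importlib.util.spec_from_file_location(Path(path).stem, path)
        if spec is None or spec.loader is None:
            raise ImportError(f"cannot load plugin from {path}")
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        plugin_cls = getattr(module, name)

    return plugin_cls(**kwargs)
```

Loading the file by path, rather than by module name, means a custom plugin does not need to be installed or placed on sys.path.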

SKASDPFilter Plugin

The SKASDPFilter plugin uses the regular expression provided by the SKA Log Message Format documentation. When it receives a log line, it performs the following operations:

  1. Parses the log line using the regular expression.

  2. Extracts the tags from the parsed log.

  3. Matches the tags against the SDP status tags (sdpStatus), such as sdpPhase and sdpFlow in the examples below.

  4. If a match is found, creates and returns an event dictionary.
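The four steps above can be sketched in plain Python as follows. This is not SKASDPFilter's code: the field layout (version, timestamp, level, node, process, function, source, tags, message) is inferred from the example log lines below, and filter_line and parse_tags are hypothetical names. The returned dictionaries mirror the example outputs, with phase updates targeted at CSVFile and flow updates at SDPConfigurationDB.

```python
import re
from typing import Dict, Optional

# Field layout inferred from the example lines:
# version|timestamp|level|node|process|function|source|tags|message
LOG_PATTERN = re.compile(
    r"^(?P<version>\d+)\|(?P<timestamp>[^|]*)\|(?P<level>[^|]*)\|"
    r"(?P<node>[^|]*)\|(?P<process>[^|]*)\|(?P<function>[^|]*)\|"
    r"(?P<source>[^|]*)\|(?P<tags>[^|]*)\|(?P<message>.*)$"
)


def parse_tags(raw: str) -> Dict[str, str]:
    """Turn 'key:value,key:value' into a dictionary."""
    tags = {}
    for item in raw.split(","):
        key, sep, value = item.partition(":")
        if sep:
            tags[key.strip()] = value.strip()
    return tags


def filter_line(line: str, pipeline: str) -> Optional[dict]:
    # 1. Parse the log line with the regular expression
    match = LOG_PATTERN.match(line)
    if match is None:
        return None
    fields = match.groupdict()
    # 2. Extract the tags from the parsed log
    tags = parse_tags(fields["tags"])
    # 3./4. Match the status tags and build the event dictionary
    if "sdpPhase" in tags:
        return {
            "target": "CSVFile",
            "event": {
                "timestamp": fields["timestamp"],
                "pipeline": pipeline,
                "stage": tags["sdpPhase"],
                "event": tags.get("state"),
                "node": fields["node"],
                "process": fields["process"],
                "source": fields["source"],
                "message": fields["message"],
            },
        }
    if "sdpFlow" in tags:
        return {
            "target": "SDPConfigurationDB",
            "flow_name": tags["sdpFlow"],
            "log": {
                "state": tags.get("state"),
                "level": fields["level"],
                "message": fields["message"],
                "timestamp": fields["timestamp"],
                "source": fields["source"],
                "pipeline": pipeline,
            },
        }
    # No SDP status tag: the line is not an event
    return None
```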

For example:

# For phase updates

>>> line = '1|2025-07-02T12:06:49.933Z|INFO|node|MainThread|start|stage.py#1|sdpPhase:PREDICT,state:START|Starting predict stage'
>>> log_filter = SKASDPFilter("INST")
>>> log_filter.filter(line)
{
    'target': 'CSVFile',
    'event': {
        'timestamp': '2025-07-02T12:06:49.933Z',
        'pipeline': 'INST',
        'stage': 'PREDICT',
        'event': 'START',
        'node': 'node',
        'process': 'MainThread',
        'source': 'stage.py#1',
        'message': 'Starting predict stage'
    }
}


# For flow updates

>>> line = '1|2025-07-02T12:06:49.933Z|INFO||MainThread|start|stage.py#1|sdpFlow:mswriter,state:FLOWING|Starting to write data'
>>> log_filter = SKASDPFilter("INST")
>>> log_filter.filter(line)
{
    'target': 'SDPConfigurationDB',
    'flow_name': 'mswriter',
    'log': {
        'state': 'FLOWING',
        'level': 'INFO',
        'message': 'Starting to write data',
        'timestamp': '2025-07-02T12:06:49.933Z',
        'source': 'stage.py#1',
        'pipeline': 'INST'
    }
}

Refer to the API documentation for more details.

Consumer Plugins

A consumer plugin forwards events to their final destination. For instance, the SDPConfigurationDB plugin sends events to the SDP Configuration Database. Batchlet provides two consumer plugins: SDPConfigurationDB and CSVFile.
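The SKASDPFilter outputs shown earlier carry a target key naming a consumer plugin (CSVFile or SDPConfigurationDB). Assuming batchlet uses this key to decide which consumer receives an event (the exact dispatch logic is not described here), routing could look like the sketch below. The dispatch function, the Consumer protocol and the ListConsumer stand-in are all hypothetical.

```python
from typing import Dict, List, Protocol


class Consumer(Protocol):
    def process(self, event: dict) -> None: ...


def dispatch(event: dict, consumers: Dict[str, Consumer]) -> bool:
    """Send a filtered event to the consumer named by its "target" key.

    Returns False when no matching consumer is configured.
    """
    consumer = consumers.get(event.get("target", ""))
    if consumer is None:
        return False
    consumer.process(event)
    return True


class ListConsumer:
    """Stand-in consumer that records the events it receives."""

    def __init__(self) -> None:
        self.events: List[dict] = []

    def process(self, event: dict) -> None:
        self.events.append(event)
```

For example, dispatch({"target": "CSVFile", "event": {...}}, {"CSVFile": ListConsumer()}) hands the event to the CSVFile entry and returns True, while an event whose target has no configured consumer is dropped.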

Custom consumer plugins can be added in the same way as filter plugins, by listing them in the batchlet configuration.

{
// ...,
  "monitor": {
    "logs": {
//   "filter_plugins": ...,
      "consumer_plugins": [
        {
          "name": "SDPConfigurationDB",
          "kwargs": {
            "pb_id": "pb-script-12345678-00001",
            "kind": "data-product",
            "flow_names": [ "mswriter", "gaintable" ],
            "host": "1.2.3.4",
            "port": 2379
          }
        },
        {
          "name": "CSVFile",
          "kwargs": {
            "file_path": "./events.csv"
          }
        }
      ]
    }
  }
}

SDPConfigurationDB Plugin

This plugin processes filtered events and updates the state of the flow named in the log message. The flow key is built from the configured pb_id and kind together with the flow_name obtained from the log. To ensure that the underlying script cannot update flows it is not supposed to, the flow_name from the log is validated against the flow_names given in the configuration: the update is applied only if the logged flow_name appears in that list.
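The validation rule above can be sketched as follows. This is an illustration only: FlowKey and flow_update_for are hypothetical names, the event shape follows the sdpFlow example output from SKASDPFilter, and the actual write to the SDP Configuration Database is left out.

```python
from typing import Iterable, NamedTuple, Optional, Tuple


class FlowKey(NamedTuple):
    """Hypothetical flow key: configured pb_id and kind plus the logged flow name."""
    pb_id: str
    kind: str
    name: str


def flow_update_for(
    event: dict, pb_id: str, kind: str, flow_names: Iterable[str]
) -> Optional[Tuple[FlowKey, str]]:
    """Return (flow key, new state), or None if the flow is not allowed."""
    flow_name = event.get("flow_name")
    # Only flows listed in the configuration may be updated by the script
    if flow_name not in set(flow_names):
        return None
    key = FlowKey(pb_id=pb_id, kind=kind, name=flow_name)
    return key, event["log"]["state"]
```

With the example configuration (flow_names mswriter and gaintable), an sdpFlow:mswriter line would yield an update for the mswriter flow, while a line tagged with any other flow name is ignored.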

See also the SDPConfigurationDB API documentation.

CSVFile Plugin

This plugin writes each filtered event as a row in a CSV file. The path of the CSV file must be provided in the batchlet configuration (the file_path keyword argument).
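A minimal sketch of such a consumer, using Python's csv module, is shown below. It is not the CSVFile implementation: append_event is a hypothetical name, it takes the inner event dictionary (the event value in the SKASDPFilter phase output), the column order is copied from the example events.csv output further down, and the header is written only when the file is new or empty.

```python
import csv
import os

# Column order taken from the example events.csv output
FIELDNAMES = ["timestamp", "pipeline", "stage", "event", "node",
              "process", "source", "message", "core"]


def append_event(file_path: str, event: dict) -> None:
    """Append one event as a CSV row, writing the header first if needed."""
    write_header = not os.path.exists(file_path) or os.path.getsize(file_path) == 0
    with open(file_path, "a", newline="", encoding="utf-8") as handle:
        # restval fills missing columns (e.g. core); extrasaction drops unknown keys
        writer = csv.DictWriter(handle, fieldnames=FIELDNAMES,
                                restval="", extrasaction="ignore")
        if write_header:
            writer.writeheader()
        writer.writerow(event)
```

Opening the file in append mode keeps events from earlier runs, which is why the header check looks at whether the file already has content.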

See also the CSVFile API documentation.

Example

The following example shows batchlet running a pipeline script that emits example logs, with the resulting events written to a CSV file.

Example pipeline

pipeline.py

import logging
from time import sleep
from ska_sdp_batchlet.log_config import LOGGING_CONFIG

from ska_ser_logging import configure_logging

configure_logging(level=logging.INFO, overrides=LOGGING_CONFIG)

logger = logging.getLogger(__name__)

def main():
    sleep(2)
    logger.info("Starting predict stage", extra={"tags": "sdpPhase:PREDICT,state:START"})
    sleep(2)
    logger.info("Finished predict stage", extra={"tags": "sdpPhase:PREDICT,state:FINISHED"})
    sleep(2)

if __name__ == "__main__":
    main()

Note: It is not advisable to add Piper as a dependency only to configure logging. Instead, use the logging configuration from the Piper source code.

Batchlet script

batchlet.sh

#!/usr/bin/env bash

cat <<EOF | batchlet run -
{
  "command": [
    "python",
    "pipeline.py"
  ],
  "monitor": {
    "logs": {
      "filter_plugins": [
        {
          "name": "SKASDPFilter",
          "kwargs": {"pipeline": "INST"}
        }
      ],
      "consumer_plugins": [
        {
          "name": "CSVFile",
          "kwargs": {
            "file_path": "./events.csv"
          }
        }
      ]
    }
  }
}
EOF
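If the configuration is easier to build programmatically, the same JSON can be generated in Python and piped to batchlet run - on standard input, just as the heredoc does. This is a sketch: build_config and run_batchlet are hypothetical helpers, and running it assumes the batchlet executable is on PATH.

```python
import json
import subprocess
from typing import List


def build_config(command: List[str], pipeline: str, csv_path: str) -> str:
    """Return the batchlet configuration used in batchlet.sh as a JSON string."""
    config = {
        "command": command,
        "monitor": {
            "logs": {
                "filter_plugins": [
                    {"name": "SKASDPFilter", "kwargs": {"pipeline": pipeline}}
                ],
                "consumer_plugins": [
                    {"name": "CSVFile", "kwargs": {"file_path": csv_path}}
                ],
            }
        },
    }
    return json.dumps(config, indent=2)


def run_batchlet(config_json: str) -> None:
    """Pipe the configuration to `batchlet run -`, as batchlet.sh does."""
    subprocess.run(["batchlet", "run", "-"], input=config_json, text=True, check=True)
```

Calling run_batchlet(build_config(["python", "pipeline.py"], "INST", "./events.csv")) would be equivalent to running batchlet.sh.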

With these scripts in place, run bash batchlet.sh. Once it completes, inspect the CSV file:

cat ./events.csv

The output is:

timestamp,pipeline,stage,event,node,process,source,message,core
1760419311.278,INST,PREDICT,START,ska-host,MainThread,pipeline.py#13,Starting predict stage,
1760419313.281,INST,PREDICT,FINISHED,ska-host,MainThread,pipeline.py#15,Finished predict stage,
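Because each row records a stage and an event, the CSV is straightforward to post-process. The sketch below (stage_durations and stage_durations_from_file are hypothetical helpers, not part of batchlet) reads the file with csv.DictReader and computes how long each stage ran, assuming the timestamp column holds Unix epoch seconds as in the output above and that each stage logs a START followed by a FINISHED event.

```python
import csv
from typing import Dict, Iterable


def stage_durations(rows: Iterable[dict]) -> Dict[str, float]:
    """Map each stage to its FINISHED minus START timestamp, in seconds."""
    starts: Dict[str, float] = {}
    durations: Dict[str, float] = {}
    for row in rows:
        stage, event, ts = row["stage"], row["event"], float(row["timestamp"])
        if event == "START":
            starts[stage] = ts
        elif event == "FINISHED" and stage in starts:
            durations[stage] = ts - starts.pop(stage)
    return durations


def stage_durations_from_file(file_path: str) -> Dict[str, float]:
    """Read an events CSV file and compute per-stage durations."""
    with open(file_path, newline="", encoding="utf-8") as handle:
        return stage_durations(csv.DictReader(handle))
```

For the output above this gives roughly 2 seconds for PREDICT, consistent with the sleep(2) between the two log calls in pipeline.py.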

Create Custom Plugins

Batchlet provides interfaces for filter and consumer plugins that custom plugins must implement. These interfaces are described in the ska_sdp_batchlet.plugins.plugin API documentation.

Filter Plugin

A custom filter plugin must implement the FilterPlugin interface.

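from ska_sdp_batchlet.plugins import FilterPlugin


def is_matching(line: str, tag_identifier: str) -> bool:
    # Placeholder matching logic: replace with your own parsing of the line
    return tag_identifier in line


def create_event(line: str) -> dict:
    # Placeholder event: shape it to suit the consumer plugin that receives it
    return {"message": line.strip()}


class CustomFilterPlugin(FilterPlugin):
    def __init__(self, tag_identifier: str) -> None:
        super().__init__()
        self.tag_identifier = tag_identifier

    def filter(self, line: str) -> dict | None:
        # Return an event for matching lines, None for everything else
        if is_matching(line, self.tag_identifier):
            return create_event(line)
        return None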

When configuring a custom filter plugin, its path must be provided:

"filter_plugins": [
  {
    "name": "CustomFilterPlugin",
    "kwargs": {
      "tag_identifier": "CustomEvent"
    },
    "path": "custom_plugins/custom_filter_plugin.py"
  }
]

Consumer Plugin

A custom consumer plugin must implement the ConsumerPlugin interface.

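import json
import urllib.request

from ska_sdp_batchlet.plugins import ConsumerPlugin


class CustomConsumerPlugin(ConsumerPlugin):
    def __init__(self, url: str = "http://localhost:8080/events") -> None:
        super().__init__()
        self.url = url  # example HTTP endpoint; replace with your destination

    def process(self, event: dict) -> None:
        # Forward the event as a JSON POST request
        request = urllib.request.Request(
            self.url, data=json.dumps(event).encode("utf-8"), method="POST",
            headers={"Content-Type": "application/json"},
        )
        with urllib.request.urlopen(request):
            pass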

As with custom filter plugins, the path must be provided:

"consumer_plugins": [
  {
    "name": "CustomConsumerPlugin",
    "path": "custom_plugins/custom_consumer_plugin.py"
  }
]