Batchlet Plugins
Batchlet log monitoring is implemented using a plugin framework. The default plugins are provided in
ska_sdp_batchlet_plugins package.
Filter Plugins
Filter plugin receives the log line, it parses the log, does some matches and creates the event.
Batchlet provides filter plugins such as SKASDPFilter that filters event in pipeline logs. We need to enable filters
by providing them in configuration to batchlet.
We can also add custom filters to batchlet by providing configuration as shown below.
{
// ...,
"monitor": {
"logs": {
"filter_plugins": [
{
"name": "SKASDPFilter",
"kwargs": {"pipeline": "INST"}
},
{
"name": "custom-filter",
"path": "/path/to/custom-filter-plugin.py"
}
]
// ,"consumer_plugins": []
}
}
}
SKASDPFilter Plugin
SKASDPFilter plugin uses regex provided by SKA Log Message Format
documentation. When it receives a log line, it
performs following operations:
Parses the log line using the regex.
Gets the tag from parsed log.
Matches the tags for
sdpStatus.If match found, creates a dictionary and returns.
For example.
# For phase updates
>>> line = '1|2025-07-02T12:06:49.933Z|INFO|node|MainThread|start|stage.py#1|sdpPhase:PREDICT,state:START|Starting predict stage'
>>> filter = SKASDPFilter("INST")
>>> filter.filter(line)
{
'target': 'CSVFile',
'event': {
"timestamp": '2025-07-02T12:06:49.933Z',
"pipeline": 'INST',
"stage": 'PREDICT',
"event": 'START',
"node": 'node',
"process": 'MainThread',
"source": 'stage.py#1',
'message': 'Starting predict stage'
}
}
# For flow updates
>>> line = '1|2025-07-02T12:06:49.933Z|INFO||MainThread|start|stage.py#1|sdpFlow:mswriter,state:FLOWING|Starting to write data'
>>> filter = SKASDPFilter("INST")
>>> filter.filter(line)
{
'target': 'SDPConfigurationDB',
'flow_name': 'mswriter',
'log': {
'state': 'FLOWING',
'level': 'INFO',
'message': 'Starting to write data',
'timestamp': '2025-07-02T12:06:49.933Z',
'source': 'stage.py#1',
'pipeline': 'INST'
}
}
Refer API documentation for more.
Consumer Plugins
Consumer plugin is an implemention which forwards the event to the end destination. For instance, SDPConfigurationDB
plugin sends the event to SDP config database. Batchlet provides consumer plugins SDPConfigurationDB and
CSVFile.
We can add custom consumer plugins similar to filter plugins by adding them in batchlet configuration.
{
// ...,
"monitor": {
"logs": {
// "filter_plugins": ...,
"consumer_plugins": [
{
"name": "SDPConfigurationDB",
"kwargs": {
"pb_id": "pb-script-12345678-00001",
"kind": "data-product",
"flow_names": [ "mswriter", "gaintable" ],
"host": "1.2.3.4",
"port": 2379
}
},
{
"name": "CSVFile",
"kwargs": {
"file_path": "./events.csv"
}
}
]
}
}
}
SDPConfigurationDB Plugin
It processes the filtered event and updates the state of flow obtained from the log message. The flow key is built using
the pb_id, kind and the flow_name obtained from the log. To ensure that the underlying script does not
update flows which it is not supposed to, the flow_name obtained from log is validated against the flow_names
provided in the config. The update is processed only if the flow_name obtained from log is present in the
configuration provided flow_names.
Also refer SDPConfigurationDB API documentation.
CSVFile Plugin
It processes the filtered event and writes the event to a csv file. The path of the csv file should be provided by configuration to batchlet.
Also refer CSVFile API documentation.
Example
Following shows an example where batchlet runs a pipeline script which generates example logs and adds the event to csv file.
Example pipeline
pipeline.py
import logging
from time import sleep
from ska_sdp_batchlet.log_config import LOGGING_CONFIG
from ska_ser_logging import configure_logging
configure_logging(level=logging.INFO, overrides=LOGGING_CONFIG)
logger = logging.getLogger(__name__)
def main():
sleep(2)
logger.info("Starting predict stage", extra={"tags": "sdpPhase:PREDICT,state:START"})
sleep(2)
logger.info("Finished predict stage", extra={"tags": "sdpPhase:PREDICT,state:FINISHED"})
sleep(2)
if __name__ == "__main__":
main()
Note: It is not advisable to use Piper as a dependency just to configure the log. Please use the logging configurations from Piper source code.
Batchlet script
batchlet.sh
#!/usr/bin/env bash
cat <<EOF | batchlet run -
{
"command": [
"python",
"pipeline.py"
],
"monitor": {
"logs": {
"filter_plugins": [
{
"name": "SKASDPFilter",
"kwargs": {"pipeline": "INST"}
}
],
"consumer_plugins": [
{
"name": "CSVFile",
"kwargs": {
"file_path": "./events.csv"
}
}
]
}
}
}
EOF
With these scripts in place, we can run bash batchlet.sh. Once it has run, we can take a look at csv file.
cat ./events.csv
It outputs
timestamp,pipeline,stage,event,node,process,source,message,core
1760419311.278,INST,PREDICT,START,ska-host,MainThread,pipeline.py#13,Starting predict stage,
1760419313.281,INST,PREDICT,FINISHED,ska-host,MainThread,pipeline.py#15,Finished predict stage,
Create Custom plugins
Batchlet provides interfaces for filter and consumer plugins which need to be implemented by custom plugins. These
interfaces are part of the batchlet's ska_sdp_batchlet.plugins.plugin API documentation
Filter Plugin
One needs to implement FilterPlugin interface to custom filter plugin.
from ska_sdp_batchlet.plugins import FilterPlugin
class CustomFilterPlugin(FilterPlugin):
def __init__(self, tag_identifier) -> None:
super().__init__()
self.tag_identifier = tag_identifier
def filter(self, line: str) -> dict | None:
if is_matching(line, self.tag_identifier):
return create_event(line)
return None
While configuring the filter plugin, the path has be to provided.
"filter_plugins": [
{
"name": "CustomFilterPlugin",
"kwargs": {
"tag_identifier": "CustomEvent"
},
"path": "custom_plugins/custom_filter_plugin.py"
}
]
Consumer Plugin
One needs to implement ConsumerPlugin interface to custom consumer plugin.
from ska_sdp_batchlet.plugins import ConsumerPlugin
class CustomConsumerPlugin(ConsumerPlugin):
def __init__(self) -> None:
super().__init__()
def process(self, event: dict) -> None:
# process event
send_http(event)
Similarly to filter custom plugin, the path has be to provided.
"consumer_plugins": [
{
"name": "CustomConsumerPlugin",
"path": "custom_plugins/custom_consumer_plugin.py"
}
]