- Module code
- ska_sdp_batchlet_plugins.consumers.sdp_configuration_db
-
Source code for ska_sdp_batchlet_plugins.consumers.sdp_configuration_db
from time import time
from ska_sdp_batchlet.plugins import ConsumerPlugin
try:
from ska_sdp_config import Config
from ska_sdp_config.entity.flow import Flow
except ImportError:
pass
# pylint: disable=R0903
[docs]
class SDPConfigurationDB(ConsumerPlugin):
    """
    Consumer plugin that writes flow state updates to the SDP
    configuration database in response to incoming events.

    Only events whose ``"target"`` equals ``"SDPConfigurationDB"`` and
    whose ``"flow_name"`` is one of the configured flow names are acted
    upon; everything else is ignored.
    """

    def __init__(
        self, pb_id: str, kind: str, flow_names: list[str], **config_kwargs
    ):
        """
        Set up the database client and remember which flows to track.

        :param pb_id: value used as ``pb_id`` when building flow keys
            (presumably the processing block ID -- confirm with caller)
        :param kind: value used as ``kind`` when building flow keys
        :param flow_names: names of the flows this consumer updates
        :param config_kwargs: forwarded unchanged to ``Config``
        """
        self.config = Config(**config_kwargs)
        self.flow_names = flow_names
        self.kind = kind
        self.pb_id = pb_id
        super().__init__()

    def process(self, event: dict) -> None:
        """
        Apply a single event to the state of the flow it refers to.

        The event must carry ``"flow_name"`` and a ``"log"`` mapping
        with ``"level"`` and ``"state"`` entries. The flow state's
        ``"status"`` is set to the log's ``"state"``, ``"last_updated"``
        is set to the current ``time()`` value, and the log entry is
        appended to ``"error_messages"`` when its level is ``ERROR`` or
        ``CRITICAL``, otherwise to ``"log"``.

        Nothing is written when the flow has no (or an empty) state.

        :param event: event dictionary received by the consumer
        """
        # Events addressed to another consumer are not ours to handle.
        if event.get("target") != SDPConfigurationDB.__name__:
            return

        name = event["flow_name"]
        if name not in self.flow_names:
            return

        key = Flow.Key(pb_id=self.pb_id, kind=self.kind, name=name)

        entry = event["log"]
        # Errors are collected separately from ordinary log entries.
        bucket = (
            "error_messages"
            if entry["level"] in ("ERROR", "CRITICAL")
            else "log"
        )

        for txn in self.config.txn():
            state = txn.flow.state(key).get()
            if not state:
                # No existing state to update for this flow.
                continue

            state["status"] = entry["state"]
            state["last_updated"] = time()

            entries = state.get(bucket, [])
            entries.append(entry)
            state[bucket] = entries

            txn.flow.state(key).update(state)