Source code for ska_sdp_batchlet_plugins.consumers.sdp_configuration_db

from time import time

from ska_sdp_batchlet.plugins import ConsumerPlugin

try:
    from ska_sdp_config import Config
    from ska_sdp_config.entity.flow import Flow
except ImportError:
    pass


# pylint: disable=R0903
[docs] class SDPConfigurationDB(ConsumerPlugin): """ Consumer to connect to the SDP configuration database to update the flow states based on the received events. """ def __init__( self, pb_id: str, kind: str, flow_names: list[str], **config_kwargs ): self.config = Config(**config_kwargs) self.pb_id = pb_id self.kind = kind self.flow_names = flow_names super().__init__()
[docs] def process(self, event: dict) -> None: """ Processes the event dictionary and updates flow states in SDP Config DB. """ if event.get("target") != SDPConfigurationDB.__name__: return flow_name = event["flow_name"] if flow_name not in self.flow_names: return flow_id = Flow.Key( pb_id=self.pb_id, kind=self.kind, name=flow_name, ) log = event["log"] if log["level"] in ["ERROR", "CRITICAL"]: flow_log_key = "error_messages" else: flow_log_key = "log" for txn in self.config.txn(): flow_state = txn.flow.state(flow_id).get() if flow_state: flow_state["status"] = log["state"] flow_state["last_updated"] = time() flow_log = flow_state.get(flow_log_key, []) flow_log.append(log) flow_state[flow_log_key] = flow_log txn.flow.state(flow_id).update(flow_state)