# Source code for ska_sdp_scripting.queue_connector

# pylint: disable=protected-access
"""
Common functions used to configure the QueueConnector
"""

import logging
from time import time
from typing import Optional

from ska_sdp_config import Config
from ska_sdp_config.config import Transaction

# Initialise logging
LOG = logging.getLogger("ska_sdp_scripting")
# Timeout (in seconds) passed to wait_for_queue_connector_state when
# tearing down the QueueConnector.
TIMEOUT_SEC = 60.0


def configure_queue_connector(
    txn: Transaction, qc_config: dict, pb_id: str, eb_id: str
) -> str:
    """
    Write the QueueConnector configuration into the configuration DB.

    The target path is built from the subarray ID of the execution
    block and the processing block ID. If an entry already exists at
    that path it is updated, otherwise it is created.

    :param txn: Transaction
    :param qc_config: QueueConnector configuration
    :param pb_id: Processing block ID
    :param eb_id: Execution block ID
    :return: Configuration path string
    """
    LOG.info("Configuring queue connector")

    execution_block = txn.get_execution_block(eb_id)
    subarray_id = execution_block["subarray_id"]
    component = f"lmc-queueconnector-{subarray_id:02}"
    config_path = f"/component/{component}/owner/{pb_id}"
    LOG.info("The QueueConnector config path: %s", config_path)

    # Write to a single group path: create the entry when it is absent,
    # overwrite it otherwise.
    already_present = txn._get(config_path) is not None
    write = txn._update if already_present else txn._create
    write(config_path, qc_config)

    return config_path
def tear_down_queue_connector(config: Config, qc_config_path: str) -> None:
    """
    Remove the QueueConnector configuration and wait for it to wind down.

    The configuration entry is deleted (non-recursively), after which
    this function waits, for at most ``TIMEOUT_SEC`` seconds, until no
    state is reported under the configuration path any more.

    :param config: SDP configuration client
    :param qc_config_path: QueueConnector config path
    """
    for txn in config.txn():
        txn._delete(qc_config_path, recurse=False)

    # An expected state of None matches the case where the state entry
    # is absent (or carries no "state" value).
    wait_for_queue_connector_state(
        config=config,
        qc_config_path=qc_config_path,
        expected_state=None,
        timeout=TIMEOUT_SEC,
    )
def wait_for_queue_connector_state(
    config: Config,
    qc_config_path: str,
    expected_state: Optional[str],
    timeout: float,
) -> bool:
    """
    Function to wait for queue connector state.

    Watches ``<qc_config_path>/state`` and returns once its ``state``
    value equals ``expected_state``. While the state is ``FAULT`` the
    fault message and stacktrace found under ``<qc_config_path>/fault``
    are logged and waiting continues.

    :param config: SDP configuration client
    :param qc_config_path: Path of the QueueConnector configuration (str)
    :param expected_state: Expected state the QueueConnector needs
        to reach (str). ``None`` matches a missing state entry or one
        without a ``state`` value.
    :param timeout: How much time to elapse before TimeoutError
    :return: True once the expected state has been observed
    :raises TimeoutError: if the expected state has not been observed
        within ``timeout`` seconds
    """
    state_path = f"{qc_config_path}/state"
    fault_path = f"{qc_config_path}/fault"
    is_ready = False
    timeout_time = time() + timeout

    for watcher in config.watcher(timeout=timeout):
        for txn in watcher.txn():
            state_dict = txn._get(state_path)
            state = state_dict.get("state") if state_dict is not None else None
            LOG.info("Got LMC Queue Connector state: %s", state)

            if state == "FAULT":
                # A missing fault entry is treated like an empty one so
                # that both lookups below yield None.
                fault_dict = txn._get(fault_path) or {}
                LOG.warning(
                    "Got LMC Queue Connector fault message: %s",
                    fault_dict.get("message"),
                )
                LOG.debug(
                    "Got LMC Queue Connector fault stacktrace: %s",
                    fault_dict.get("stacktrace"),
                )
            elif state == expected_state:
                is_ready = True

        # Check for success before the deadline: a state that was reached
        # on the very iteration that crosses the deadline is still a
        # success and must not be reported as a timeout.
        if is_ready:
            break
        if time() > timeout_time:
            raise TimeoutError(
                f"Timeout waiting for {state_path} "
                f"to reach {expected_state} state"
            )

    return True