# pylint: disable=protected-access
"""
Common functions used to configure the QueueConnector
"""
import logging
from time import time
from typing import Optional

from ska_sdp_config import Config
from ska_sdp_config.config import Transaction
# Initialise logging
# Module-level logger shared by the functions in this file; it is obtained
# under the fixed name "ska_sdp_scripting" rather than this module's name.
LOG = logging.getLogger("ska_sdp_scripting")
# Time limit, in seconds, that tear_down_queue_connector() passes as the
# ``timeout`` when waiting for the QueueConnector state to disappear.
TIMEOUT_SEC = 60.0
[docs]
def tear_down_queue_connector(config: Config, qc_config_path: str) -> None:
    """
    Remove the QueueConnector configuration and wait for its state to vanish.

    The entry stored at ``qc_config_path`` is deleted (non-recursively), after
    which the function blocks until no state is reported under that path any
    more, for at most ``TIMEOUT_SEC`` seconds.

    :param config: SDP configuration client
    :param qc_config_path: QueueConnector config path
    :raises TimeoutError: propagated from wait_for_queue_connector_state()
        when the state has not disappeared in time
    """
    # Transaction loop: the body is re-run for every transaction the
    # configuration client hands out.
    for txn in config.txn():
        txn._delete(qc_config_path, recurse=False)

    # An expected state of ``None`` means "no state entry is present".
    wait_for_queue_connector_state(
        config=config,
        qc_config_path=qc_config_path,
        expected_state=None,
        timeout=TIMEOUT_SEC,
    )
[docs]
def wait_for_queue_connector_state(
    config: Config,
    qc_config_path: str,
    expected_state: Optional[str],
    timeout: float,
) -> bool:
    """
    Block until the QueueConnector reports the expected state.

    The value stored under ``<qc_config_path>/state`` is read every time the
    configuration watcher wakes up. Whenever the state reads ``"FAULT"``, the
    fault message (warning level) and stack trace (debug level) found under
    ``<qc_config_path>/fault`` are logged and the wait continues.

    :param config: SDP configuration client
    :param qc_config_path: Path of the QueueConnector configuration (str)
    :param expected_state: Expected state the
        QueueConnector needs to reach (str). ``None`` means "no state is
        present", i.e. the state entry is missing or has no ``"state"`` key.
    :param timeout: How much time (in seconds) to elapse before TimeoutError
    :returns: ``True`` once the expected state has been observed
    :raises TimeoutError: if ``timeout`` seconds have passed and the expected
        state has still not been observed
    """
    state_path = f"{qc_config_path}/state"
    fault_path = f"{qc_config_path}/fault"
    deadline = time() + timeout

    for watcher in config.watcher(timeout=timeout):
        is_ready = False
        for txn in watcher.txn():
            # Start every attempt from scratch: the transaction loop may run
            # its body more than once and only the last read should count.
            is_ready = False

            state_dict = txn._get(state_path)
            state = state_dict.get("state") if state_dict is not None else None
            LOG.info("Got LMC Queue Connector state: %s", state)

            if state == "FAULT":
                # A fault never counts as "ready"; report what is known
                # about it and keep waiting.
                fault_dict = txn._get(fault_path)
                if fault_dict is not None:
                    fault_message = fault_dict.get("message")
                    fault_stacktrace = fault_dict.get("stacktrace")
                else:
                    fault_message = None
                    fault_stacktrace = None
                LOG.warning(
                    "Got LMC Queue Connector fault message: %s", fault_message
                )
                LOG.debug(
                    "Got LMC Queue Connector fault stacktrace: %s",
                    fault_stacktrace,
                )
            elif state == expected_state:
                is_ready = True

        # Check for success before the deadline, so that a state reached on
        # the very last wake-up is not misreported as a timeout.
        if is_ready:
            break

        if time() > deadline:
            raise TimeoutError(
                f"Timeout waiting for {state_path} "
                f"to reach {expected_state} state"
            )

    return True