Source code for ska_mid_dish_manager.component_managers.spfrx_cm

"""Specialization for SPFRx functionality."""

import logging
import threading
from typing import Any, Callable, Optional

import tango
from ska_control_model import AdminMode, HealthState

from ska_mid_dish_manager.component_managers.tango_device_cm import TangoDeviceComponentManager
from ska_mid_dish_manager.models.dish_enums import Band, SPFRxCapabilityStates, SPFRxOperatingMode


[docs]class MonitorPing(threading.Thread): """A thread that executes SPFRx's MonitorPing command at a specified interval.""" PING_ERROR_LOG_REPEAT = 5
[docs] def __init__( self, logger: logging.Logger, interval: float, stop_event: threading.Event, device_fqdn: str, ): """Initialize the MonitorPing thread. :param logger: Logger to use for logging. :param interval: Time interval in seconds between function calls. :param stop_event: Event to signal when the thread should stop. :param device_fqdn: FQDN of the SPFRx device. """ super().__init__(name="MonitorPingThread") self._logger = logger self._interval = interval self._stop_event = stop_event self._spfrx_trl = device_fqdn self._log_counter = 0 self._device_proxy = None
[docs] def run(self) -> None: """Execute the function at regular intervals until the stop event is set.""" while not self._stop_event.is_set(): self._execute_monitor_ping() # Wait for the next interval or until stopped self._stop_event.wait(self._interval)
def _create_device_proxy(self) -> None: """Create the Tango DeviceProxy if not already created.""" if self._device_proxy is None: try: self._device_proxy = tango.DeviceProxy(self._spfrx_trl) except tango.DevFailed: pass def _execute_monitor_ping(self) -> None: """Execute MonitorPing on the SPFRx controller. self.execute_command is not used to prevent spam logs about MonitorPing. """ error_msg = { "type_error": f"DeviceProxy to {self._spfrx_trl} failed for MonitorPing", "other_errors": f"Failed to execute MonitorPing on {self._spfrx_trl}", } with tango.EnsureOmniThread(): self._create_device_proxy() try: self._device_proxy.command_inout("MonitorPing", None) # type: ignore except Exception: if self._log_counter < self.PING_ERROR_LOG_REPEAT: if self._device_proxy is None: self._logger.error(error_msg["type_error"]) else: self._logger.error(error_msg["other_errors"]) self._log_counter += 1
[docs]class SPFRxComponentManager(TangoDeviceComponentManager): """Specialization for SPFRx functionality.""" _MONITOR_PING_INTERVAL = 3 # Constant for ping interval in seconds
[docs] def __init__( self, tango_device_fqdn: str, logger: logging.Logger, state_update_lock: threading.Lock, *args: Any, communication_state_callback: Optional[Callable] = None, component_state_callback: Optional[Callable] = None, **kwargs: Any, ): monitored_attr_names = ( "operatingMode", "dataFiberCheck", "configuredBand", "healthState", "b1CapabilityState", "b2CapabilityState", "b3CapabilityState", "b4CapabilityState", "b5aCapabilityState", "b5bCapabilityState", "kValue", "noisediodemode", "periodicnoisediodepars", "pseudorandomnoisediodepars", # "adminMode", TODO: Wait for SPFRx to implement adminMode "attenuation1polhx", "attenuation1polvy", "attenuation2polhx", "attenuation2polvy", "attenuationpolhx", "attenuationpolvy", "isklocked", "spectralinversion", ) super().__init__( tango_device_fqdn, logger, monitored_attr_names, *args, communication_state_callback=communication_state_callback, component_state_callback=component_state_callback, quality_monitored_attributes=( "attenuation1polhx", "attenuation1polvy", "attenuation2polhx", "attenuation2polvy", "attenuationpolhx", "attenuationpolvy", "noisediodemode", ), **kwargs, ) self._monitor_ping_thread: Optional[MonitorPing] = None self._communication_state_lock = state_update_lock self._component_state_lock = state_update_lock self._ping_thread_stop_event = threading.Event()
def _stop_ping_thread(self) -> None: """Stop the periodic MonitorPing thread if it is running.""" if self._monitor_ping_thread and self._monitor_ping_thread.is_alive(): self._ping_thread_stop_event.set() self._monitor_ping_thread.join() # Clean up self._ping_thread_stop_event.clear() self._monitor_ping_thread = None def _start_ping_thread(self) -> None: """Start the MonitorPing thread.""" self._stop_ping_thread() # Ensure any existing ping thread is stopped self._monitor_ping_thread = MonitorPing( self.logger, self._MONITOR_PING_INTERVAL, self._ping_thread_stop_event, self._tango_device_fqdn, ) self._monitor_ping_thread.start() def _update_component_state(self, **kwargs: Any) -> None: """Update component state with proper enum conversion.""" enum_conversion = { "operatingmode": SPFRxOperatingMode, "healthstate": HealthState, "configuredband": Band, "b1capabilitystate": SPFRxCapabilityStates, "b2capabilitystate": SPFRxCapabilityStates, "b3capabilitystate": SPFRxCapabilityStates, "b4capabilitystate": SPFRxCapabilityStates, "b5acapabilitystate": SPFRxCapabilityStates, "b5bcapabilitystate": SPFRxCapabilityStates, "adminmode": AdminMode, } for attr, enum_ in enum_conversion.items(): if attr in kwargs: try: kwargs[attr] = enum_(kwargs[attr]) except ValueError: self.logger.warning(f"Invalid value for {attr} during enum conversion.") super()._update_component_state(**kwargs)
[docs] def start_communicating(self) -> None: """Start communication and initiate the periodic ping.""" super().start_communicating() self.logger.debug("Starting MonitorPing thread.") self._start_ping_thread()
[docs] def stop_communicating(self) -> None: """Stop communication and stop the periodic ping.""" self.logger.debug("Stopping MonitorPing thread.") self._stop_ping_thread() super().stop_communicating()