Source code for ska_oso_tmcsim.cbfnode

"""
Simulates the CBF subarray's processorsReadyPercent attribute, providing input
for the Subarray Quality Monitor (SQM). The SQM is a Transducer device that
subscribes to ``processorsReadyPercent`` and exposes a derived ``readyToScan``
attribute (true when ``processorsReadyPercent == 100``). In this programme
increment (PI30), ``readyToScan`` depends solely on ``processorsReadyPercent``,
but additional inputs will be added in future PIs.
"""

import logging
import threading

import tango
from ska_control_model import ObsState
from tango import AttrWriteType, DevState
from tango.server import Device, attribute, command, device_property

LOG = logging.getLogger(__name__)

# Tango devices set instance attributes in init_device(), not __init__,
# because the Tango device lifecycle calls init_device() on each startup.
# pylint: disable=attribute-defined-outside-init


[docs] def get_cbf_trl(domain: str, subarray_id: int) -> str: """ Get the TRL for a CBF simulator device. :param domain: Tango domain (e.g. "mid-tmc" or "low-tmc") :param subarray_id: Subarray ID :returns: Full TRL for the CBF simulator """ return f"sim-{domain.removesuffix('-tmc')}-cbf/subarray/{subarray_id:02}"
[docs] class CbfSimulator(Device): """ Lightweight CBF subarray simulator that publishes processorsReadyPercent via change events, with runtime control for happy/sad path testing. Auto-triggers: subscribes to SubArrayNode.obsState change events and starts the ramp when obsState transitions to READY, resets when it transitions to IDLE or EMPTY. Explicit StartRamp()/Reset() calls override the auto-trigger. """ SubarrayNodeTRL = device_property(dtype=str) """Full TRL of the SubArrayNode to subscribe to for auto-trigger.""" RampDuration = device_property(dtype=float, default_value=3.0) """Duration in seconds for StartRamp() to ramp from 0 to 100.""" _RAMP_STEP = 5
[docs] def init_device(self): """ Simulate CBF device initialisation. """ Device.init_device(self) self.set_state(DevState.ON) self._processors_ready_percent = 0 self._stop_event = threading.Event() self._ramp_thread = None self._lock = threading.Lock() self._fail_at = None self.set_change_event("processorsReadyPercent", True, False) self.push_change_event("processorsReadyPercent", 0) self._subarray_proxy = None self._obsstate_sub_id = None if self.SubarrayNodeTRL: self._subscribe_to_obsstate()
[docs] def delete_device(self): """ Tear down event subscriptions and cancel any running ramp. """ self._unsubscribe_from_obsstate() with self._lock: self._cancel_ramp()
def _subscribe_to_obsstate(self): """ Subscribe to SubArrayNode obsState change events to auto-trigger the ramp. """ try: self._subarray_proxy = tango.DeviceProxy(self.SubarrayNodeTRL) self._obsstate_sub_id = self._subarray_proxy.subscribe_event( "obsState", tango.EventType.CHANGE_EVENT, self._obsstate_callback, [], True, # stateless — retries internally until the device appears ) LOG.info( "Subscribed to obsState on %s (sub_id=%s)", self.SubarrayNodeTRL, self._obsstate_sub_id, ) except Exception: # pylint: disable=broad-exception-caught LOG.error( "Failed to subscribe to obsState on %s", self.SubarrayNodeTRL, ) def _unsubscribe_from_obsstate(self): """ Unsubscribe from the SubArrayNode obsState change event. """ if self._obsstate_sub_id is not None and self._subarray_proxy is not None: try: self._subarray_proxy.unsubscribe_event(self._obsstate_sub_id) except Exception: # pylint: disable=broad-exception-caught pass self._obsstate_sub_id = None self._subarray_proxy = None def _handle_obsstate(self, obsstate: ObsState, source: str = ""): """ Dispatch obsState transitions to ramp or reset actions. """ if obsstate == ObsState.READY: LOG.info("%sobsState -> READY, auto-starting ramp", source) self._start_ramp_internal() elif obsstate in (ObsState.IDLE, ObsState.EMPTY): LOG.info("%sobsState -> %s, auto-resetting", source, obsstate.name) self._reset_internal() def _obsstate_callback(self, event): """ Tango event callback for SubArrayNode obsState changes. """ if event.err: return if event.attr_value is None or event.attr_value.value is None: return try: obsstate = ObsState(event.attr_value.value) except ValueError: return self._handle_obsstate(obsstate) @attribute(dtype=int, access=AttrWriteType.READ) def processorsReadyPercent(self) -> int: # pylint: disable=invalid-name """ Get the current percentage of processors ready. """ return self._processors_ready_percent @command def StartRamp(self): # pylint: disable=invalid-name """ Start ramping processorsReadyPercent from 0 to 100. """ self._start_ramp_internal() @command def SetFailed(self): # pylint: disable=invalid-name """Cancel the ramp, leaving processorsReadyPercent at its current value.""" with self._lock: self._cancel_ramp() self._fail_at = None @command(dtype_in=int) def SetFailedOnNextRamp(self, percentage: int): # pylint: disable=invalid-name """ Make the next ramp stop at the given percentage instead of reaching 100. Cleared after the ramp stops or by Reset/SetFailed. """ with self._lock: self._fail_at = percentage @command def Reset(self): # pylint: disable=invalid-name """ Cancel any in-progress ramp and reset processorsReadyPercent to 0. """ self._reset_internal() @command(dtype_in=int) def SimulateObsState(self, obsstate_value: int): # pylint: disable=invalid-name """ Simulate receiving an obsState change event for testing. The in-process Tango test context does not deliver change events across devices, so this command exercises the same callback logic directly. """ try: obsstate = ObsState(obsstate_value) except ValueError: return self._handle_obsstate(obsstate, source="Simulated ") def _start_ramp_internal(self): """ Cancel any running ramp and start a new one from 0 to 100. """ with self._lock: self._cancel_ramp() self._processors_ready_percent = 0 self._stop_event.clear() self._ramp_thread = threading.Thread( target=self._ramp_worker, name=f"cbf-ramp-{self.get_name()}", daemon=True, ) self._ramp_thread.start() self.push_change_event("processorsReadyPercent", 0) def _reset_internal(self): """ Cancel any running ramp and reset processorsReadyPercent to 0. """ with self._lock: self._cancel_ramp() self._processors_ready_percent = 0 self._fail_at = None self.push_change_event("processorsReadyPercent", 0) def _cancel_ramp(self): """ Signal the ramp thread to stop and wait for it to exit. """ self._stop_event.set() if self._ramp_thread is not None and self._ramp_thread.is_alive(): self._ramp_thread.join(timeout=self.RampDuration + 1) self._ramp_thread = None self._stop_event.clear() def _ramp_worker(self): """ Increment processorsReadyPercent in steps, pushing change events at each step. """ steps = 100 // self._RAMP_STEP step_duration = self.RampDuration / steps for step in range(1, steps + 1): if self._stop_event.is_set(): return value = min(step * self._RAMP_STEP, 100) if self._fail_at is not None and value >= self._fail_at: self._processors_ready_percent = self._fail_at self.push_change_event("processorsReadyPercent", self._fail_at) self._fail_at = None return self._processors_ready_percent = value self.push_change_event("processorsReadyPercent", value) if step < steps: self._stop_event.wait(timeout=step_duration) if self._stop_event.is_set(): return