"""
Simulates the CBF subarray's processorsReadyPercent attribute, providing input
for the Subarray Quality Monitor (SQM). The SQM is a Transducer device that
subscribes to ``processorsReadyPercent`` and exposes a derived ``readyToScan``
attribute (true when ``processorsReadyPercent == 100``). In this programme
increment (PI30), ``readyToScan`` depends solely on ``processorsReadyPercent``,
but additional inputs will be added in future PIs.
"""
import logging
import threading
import tango
from ska_control_model import ObsState
from tango import AttrWriteType, DevState
from tango.server import Device, attribute, command, device_property
LOG = logging.getLogger(__name__)
# Tango devices set instance attributes in init_device(), not __init__,
# because the Tango device lifecycle calls init_device() on each startup.
# pylint: disable=attribute-defined-outside-init
[docs]
def get_cbf_trl(domain: str, subarray_id: int) -> str:
"""
Get the TRL for a CBF simulator device.
:param domain: Tango domain (e.g. "mid-tmc" or "low-tmc")
:param subarray_id: Subarray ID
:returns: Full TRL for the CBF simulator
"""
return f"sim-{domain.removesuffix('-tmc')}-cbf/subarray/{subarray_id:02}"
[docs]
class CbfSimulator(Device):
"""
Lightweight CBF subarray simulator that publishes processorsReadyPercent
via change events, with runtime control for happy/sad path testing.
Auto-triggers: subscribes to SubArrayNode.obsState change events and starts
the ramp when obsState transitions to READY, resets when it transitions to
IDLE or EMPTY. Explicit StartRamp()/Reset() calls override the
auto-trigger.
"""
SubarrayNodeTRL = device_property(dtype=str)
"""Full TRL of the SubArrayNode to subscribe to for auto-trigger."""
RampDuration = device_property(dtype=float, default_value=3.0)
"""Duration in seconds for StartRamp() to ramp from 0 to 100."""
_RAMP_STEP = 5
[docs]
def init_device(self):
"""
Simulate CBF device initialisation.
"""
Device.init_device(self)
self.set_state(DevState.ON)
self._processors_ready_percent = 0
self._stop_event = threading.Event()
self._ramp_thread = None
self._lock = threading.Lock()
self._fail_at = None
self.set_change_event("processorsReadyPercent", True, False)
self.push_change_event("processorsReadyPercent", 0)
self._subarray_proxy = None
self._obsstate_sub_id = None
if self.SubarrayNodeTRL:
self._subscribe_to_obsstate()
[docs]
def delete_device(self):
"""
Tear down event subscriptions and cancel any running ramp.
"""
self._unsubscribe_from_obsstate()
with self._lock:
self._cancel_ramp()
def _subscribe_to_obsstate(self):
"""
Subscribe to SubArrayNode obsState change events to auto-trigger the ramp.
"""
try:
self._subarray_proxy = tango.DeviceProxy(self.SubarrayNodeTRL)
self._obsstate_sub_id = self._subarray_proxy.subscribe_event(
"obsState",
tango.EventType.CHANGE_EVENT,
self._obsstate_callback,
[],
True, # stateless — retries internally until the device appears
)
LOG.info(
"Subscribed to obsState on %s (sub_id=%s)",
self.SubarrayNodeTRL,
self._obsstate_sub_id,
)
except Exception: # pylint: disable=broad-exception-caught
LOG.error(
"Failed to subscribe to obsState on %s",
self.SubarrayNodeTRL,
)
def _unsubscribe_from_obsstate(self):
"""
Unsubscribe from the SubArrayNode obsState change event.
"""
if self._obsstate_sub_id is not None and self._subarray_proxy is not None:
try:
self._subarray_proxy.unsubscribe_event(self._obsstate_sub_id)
except Exception: # pylint: disable=broad-exception-caught
pass
self._obsstate_sub_id = None
self._subarray_proxy = None
def _handle_obsstate(self, obsstate: ObsState, source: str = ""):
"""
Dispatch obsState transitions to ramp or reset actions.
"""
if obsstate == ObsState.READY:
LOG.info("%sobsState -> READY, auto-starting ramp", source)
self._start_ramp_internal()
elif obsstate in (ObsState.IDLE, ObsState.EMPTY):
LOG.info("%sobsState -> %s, auto-resetting", source, obsstate.name)
self._reset_internal()
def _obsstate_callback(self, event):
"""
Tango event callback for SubArrayNode obsState changes.
"""
if event.err:
return
if event.attr_value is None or event.attr_value.value is None:
return
try:
obsstate = ObsState(event.attr_value.value)
except ValueError:
return
self._handle_obsstate(obsstate)
@attribute(dtype=int, access=AttrWriteType.READ)
def processorsReadyPercent(self) -> int: # pylint: disable=invalid-name
"""
Get the current percentage of processors ready.
"""
return self._processors_ready_percent
@command
def StartRamp(self): # pylint: disable=invalid-name
"""
Start ramping processorsReadyPercent from 0 to 100.
"""
self._start_ramp_internal()
@command
def SetFailed(self): # pylint: disable=invalid-name
"""Cancel the ramp, leaving processorsReadyPercent at its current value."""
with self._lock:
self._cancel_ramp()
self._fail_at = None
@command(dtype_in=int)
def SetFailedOnNextRamp(self, percentage: int): # pylint: disable=invalid-name
"""
Make the next ramp stop at the given percentage instead of reaching 100.
Cleared after the ramp stops or by Reset/SetFailed.
"""
with self._lock:
self._fail_at = percentage
@command
def Reset(self): # pylint: disable=invalid-name
"""
Cancel any in-progress ramp and reset processorsReadyPercent to 0.
"""
self._reset_internal()
@command(dtype_in=int)
def SimulateObsState(self, obsstate_value: int): # pylint: disable=invalid-name
"""
Simulate receiving an obsState change event for testing.
The in-process Tango test context does not deliver change events across
devices, so this command exercises the same callback logic directly.
"""
try:
obsstate = ObsState(obsstate_value)
except ValueError:
return
self._handle_obsstate(obsstate, source="Simulated ")
def _start_ramp_internal(self):
"""
Cancel any running ramp and start a new one from 0 to 100.
"""
with self._lock:
self._cancel_ramp()
self._processors_ready_percent = 0
self._stop_event.clear()
self._ramp_thread = threading.Thread(
target=self._ramp_worker,
name=f"cbf-ramp-{self.get_name()}",
daemon=True,
)
self._ramp_thread.start()
self.push_change_event("processorsReadyPercent", 0)
def _reset_internal(self):
"""
Cancel any running ramp and reset processorsReadyPercent to 0.
"""
with self._lock:
self._cancel_ramp()
self._processors_ready_percent = 0
self._fail_at = None
self.push_change_event("processorsReadyPercent", 0)
def _cancel_ramp(self):
"""
Signal the ramp thread to stop and wait for it to exit.
"""
self._stop_event.set()
if self._ramp_thread is not None and self._ramp_thread.is_alive():
self._ramp_thread.join(timeout=self.RampDuration + 1)
self._ramp_thread = None
self._stop_event.clear()
def _ramp_worker(self):
"""
Increment processorsReadyPercent in steps, pushing change events at each step.
"""
steps = 100 // self._RAMP_STEP
step_duration = self.RampDuration / steps
for step in range(1, steps + 1):
if self._stop_event.is_set():
return
value = min(step * self._RAMP_STEP, 100)
if self._fail_at is not None and value >= self._fail_at:
self._processors_ready_percent = self._fail_at
self.push_change_event("processorsReadyPercent", self._fail_at)
self._fail_at = None
return
self._processors_ready_percent = value
self.push_change_event("processorsReadyPercent", value)
if step < steps:
self._stop_event.wait(timeout=step_duration)
if self._stop_event.is_set():
return