Source code for ska_mid_cbf_fhs_common.testing.simulation

# -*- coding: utf-8 -*-
#
# This file is part of the ska-mid-cbf-fhs-common project
#
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
#
# Copyright (c) 2025 National Research Council of Canada

from __future__ import annotations

import json
import logging
from functools import partial
from queue import Queue
from threading import Event
from typing import Any, Callable, Optional

from pydantic.v1.utils import deep_update
from ska_control_model import CommunicationStatus, ObsState, PowerState, ResultCode, TaskStatus
from ska_tango_base.base.base_component_manager import TaskCallbackType
from ska_tango_base.executor.executor_component_manager import TaskExecutorComponentManager
from tango import DevState
from tango.server import Device, attribute

from ska_mid_cbf_fhs_common.state_model.fhs_obs_state import FhsObsStateMachine


class OverridesQueue(Queue):
    """FIFO queue of overrides that falls back to a default value when empty.

    Extends the standard ``Queue`` with a non-consuming ``peek`` and makes
    ``get`` return ``default_override`` instead of blocking when nothing is
    queued.
    """

    def __init__(self, default_override: dict[Any, Any]):
        """Create an empty queue.

        Args:
            default_override: value returned by ``get``/``peek`` while the
                queue holds no items
        """
        super().__init__()
        self.default_override = default_override

    def get(self, block: bool = True, timeout: Optional[float] = None):
        """Return the next queued override, or the default override if empty.

        This never blocks: an empty queue yields ``default_override``.

        Args:
            block: accepted only for signature compatibility with
                ``Queue.get`` (``Queue.get_nowait`` calls
                ``self.get(block=False)``); ignored
            timeout: accepted only for signature compatibility; ignored
        """
        # Local import: the module only imports ``Queue`` from ``queue``.
        from queue import Empty

        # A single non-blocking get is atomic, unlike "check qsize, then get",
        # which could block forever if another consumer drained the queue
        # between the two calls.
        try:
            return super().get(block=False)
        except Empty:
            return self.default_override

    def get_default(self):
        """Return set default override"""
        return self.default_override

    def peek(self):
        """Return front value from Queue or default value if Queue is empty"""
        # Hold the queue's own lock so the emptiness check and the read of the
        # front element cannot be interleaved with a concurrent get().
        with self.mutex:
            if self.queue:
                return self.queue[0]
        return self.default_override


class ObservedDict(dict):
    """A dict that notifies subscribers whenever ``update`` is called.

    Only ``update`` triggers the callbacks; other mutating operations
    (item assignment, ``pop``, ...) are inherited from ``dict`` unchanged.
    """

    def __init__(self):
        super().__init__()
        # Zero-argument callables invoked, in subscription order, after every update()
        self.observer_callback_list = []

    def update(self, updates=(), **kwargs):
        """Update values and call all stored callback functions

        Overrides update method of dict class to update subscribers using
        the stored callback functions. Accepts the same call forms as
        ``dict.update`` (a mapping or iterable of pairs and/or keyword items).

        Args:
            updates: mapping or iterable of key/value pairs to merge in
            **kwargs: additional key/value pairs to merge in
        """
        super().update(updates, **kwargs)
        for callback in self.observer_callback_list:
            callback()

    def subscribe(self, callback):
        """Add a callback to stored callback list"""
        self.observer_callback_list.append(callback)


class OverridesQueueDict:
    """Stores one ``OverridesQueue`` per override identifier."""

    def __init__(self, default_overrides: dict[str, Any], logger: logging.Logger):
        """Build a queue for every provided default override.

        Args:
            default_overrides: defaults for each override, keyed by identifier
            logger: logger used to report override activity
        """
        self.default_overrides = default_overrides
        self.queue_dict = self._build_queues(default_overrides)
        self.logger = logger
        self.logger.info(f"Default override values: {self.default_overrides}")

    def _build_queues(self, default_overrides):
        """Return a dict which contains a new OverrideQueue for each provided default override

        Args:
            default_overrides: defaults for each override
        """
        return {key: OverridesQueue(value) for key, value in default_overrides.items()}

    def set_defaults(self, new_default_overrides):
        """Replace queue dict with a new dict which contains a new OverrideQueue for each override

        Every queue is rebuilt from scratch, so overrides that were queued
        before this call are discarded.

        Args:
            new_default_overrides: new defaults for each override
        """
        # Keep the stored defaults in step with the queues built from them.
        self.default_overrides = new_default_overrides
        self.queue_dict = self._build_queues(new_default_overrides)

    def get(self, identifier: str):
        """Return override from specified queue, consuming it if one was queued

        Args:
            identifier: override queue identifier

        Raises:
            KeyError: if no queue exists for ``identifier``
        """
        override = self.queue_dict[identifier].get()
        self.logger.info(f"Popping Override [{identifier}] with value: {override!s}")
        return override

    def peek(self, identifier: str):
        """Return override from specified queue without consuming override

        Args:
            identifier: override queue identifier

        Raises:
            KeyError: if no queue exists for ``identifier``
        """
        return self.queue_dict[identifier].peek()

    def get_all_overrides(self):
        """Return the current (front or default) override of every queue.

        The queues are only peeked: reporting the configuration must not
        consume overrides that are still waiting to be applied.
        """
        return {key: queue.peek() for key, queue in self.queue_dict.items()}

    def update(self, identifier, override):
        """Queue new override

        If value is of type dict, it is deep-merged over the queue's default
        value and the merged dict is queued; any other value is queued as is.

        Args:
            identifier: override queue identifier
            override: override value to queue

        Raises:
            KeyError: if no queue exists for ``identifier``
        """
        self.logger.info(f"Updating Override [{identifier}] to new value: {override!s}")
        queue = self.queue_dict[identifier]
        if isinstance(override, dict):
            override = deep_update(queue.get_default(), override)
        queue.put(override)

    def update_all(self, overrides):
        """Queue all provided overrides

        Args:
            overrides: mapping of override identifier to override value
        """
        for key, value in overrides.items():
            self.update(key, value)


class SimModeCMBase(TaskExecutorComponentManager):
    """A base class for FHS device simulation mode component managers.

    FHS device instances can make use of component managers that inherit from this class
    and `FhsSimMode`-derived mixins to simulate their own behaviour.

    `SimModeCMBase`-derived simulation mode component managers should initialize to
    default behaviour and supply an interface matching the one expected of the genuine
    component manager that is being replaced; property getter methods are used to return
    `attribute_overrides` values instead of expected attributes, and `functools.partial`
    is used to override LRC `submit_task` methods with `sim_command`, which submits a
    generic LRC task that pulls from `command_overrides` values to generate expected
    behaviour.
    """

    def __init__(
        self,
        *args: Any,
        attr_change_callback: Callable[[str, Any], None] | None = None,
        attr_archive_callback: Callable[[str, Any], None] | None = None,
        **kwargs: Any,
    ) -> None:
        """Initialize default attribute and command overrides.

        Requires only `logger`, `communication_state_callback` and
        `component_state_callback` arguments for `BaseComponentManager` initialization.

        Args:
            *args (:obj:`Any`): positional arguments forwarded to the parent constructor
            attr_change_callback (:obj:`Callable[[str, Any], None] | None`, optional):
                called with (attribute name, value) to push a change event.
            attr_archive_callback (:obj:`Callable[[str, Any], None] | None`, optional):
                called with (attribute name, value) to push an archive event.
            **kwargs (:obj:`Any`): keyword arguments forwarded to the parent constructor
        """
        self.state = DevState.INIT
        # Names of attributes for which change/archive events are pushed by _sim_command
        self._change_event_attrs = set()
        self._archive_event_attrs = set()
        self._attr_change_callback = attr_change_callback
        self._attr_archive_callback = attr_archive_callback

        # Supply operating state machine trigger keywords to initialize _component_state
        # dict in parent constructor
        super().__init__(
            *args,
            invoked_action=None,
            completed_action=None,
            fault=None,
            power=None,
            **kwargs,
        )
        # NOTE(review): state is assigned both before and after the parent constructor;
        # kept as is in case the parent constructor reads or resets it - confirm.
        self.state = DevState.INIT

        # Setup attribute read overrides
        self.enum_attrs = {}
        self.attribute_overrides = ObservedDict()

        # Setup LRC method simulation
        self.command_overrides = ObservedDict()
        self.command_overrides.update(
            {
                "start_communicating": {
                    "power_state": "ON",
                    "communication_state": "ESTABLISHED",
                },
                "stop_communicating": {
                    "power_state": "UNKNOWN",
                    "communication_state": "DISABLED",
                },
            }
        )

        self.command_overrides_queue_dict = OverridesQueueDict(self.command_overrides, self.logger)
        self.attribute_overrides_queue_dict = OverridesQueueDict(self.attribute_overrides, self.logger)

        # Rebuild the override queues whenever the default dicts are updated
        self.attribute_overrides.subscribe(self.update_default_attributes)
        self.command_overrides.subscribe(self.update_default_commands)

    def update_default_commands(self):
        """Rebuild command queue_dict using command_overrides value"""
        self.command_overrides_queue_dict.set_defaults(self.command_overrides)

    def update_default_attributes(self):
        """Rebuild attribute queue_dict using attribute_overrides value"""
        self.attribute_overrides_queue_dict.set_defaults(self.attribute_overrides)

    def get_attribute_override(self, attr_name):
        """Return specified attribute override from queue

        Args:
            attr_name: name of the attribute to return the value of
        """
        return self.attribute_overrides_queue_dict.get(attr_name)

    def get_command_override(self, command):
        """Return specified command override from queue

        Args:
            command: specified command to return the value of
        """
        return self.command_overrides_queue_dict.get(command)

    def _update_component_state(
        self,
        **kwargs: Any,
    ) -> None:
        """Handle a change in component state.

        Overrides BaseComponentManager method to add "invoked" and "completed" actions,
        which should occur regardless of if they have changed value since last update.

        Args:
            **kwargs (:obj:`Any`): key/values for component state
        """
        callback_kwargs = {}

        with self._component_state_lock:
            for key, value in kwargs.items():
                # Actions are always forwarded; other keys only when their value changed
                if key in ["invoked_action", "completed_action"] or self._component_state[key] != value:
                    self._component_state[key] = value
                    callback_kwargs[key] = value
            if callback_kwargs:
                self._push_component_state_update(**callback_kwargs)

    # -------------
    # Communication
    # -------------

    def _apply_communication_power_state(self, command: dict[str, Any]) -> None:
        """Apply the power and communication state named in a command override.

        Args:
            command (:obj:`dict[str, Any]`): command override; the optional keys
                "power_state" and "communication_state" hold `PowerState` and
                `CommunicationStatus` member names respectively.
        """
        power_state = command.get("power_state", None)
        if power_state is not None:
            self.logger.info(f"power_state update {power_state}")
            self._update_component_state(power=PowerState[power_state])

        communication_state = command.get("communication_state", None)
        if communication_state is not None:
            self.logger.info(f"communication_state update {communication_state}")
            self._update_communication_state(communication_state=CommunicationStatus[communication_state])

    def _update_communication_power_state(self, command_name: str) -> None:
        """Update component communication and power state.

        Uses the override currently at the front of the command's queue (or its
        default) without consuming it.

        Args:
            command_name (:obj:`str`): command_overrides key
        """
        self._apply_communication_power_state(self.command_overrides_queue_dict.peek(command_name))

    def start_communicating(self, *args, **kwargs) -> None:
        """Simulate basic start_communicating behaviour.

        Updates communication state and power state.
        """
        self.logger.info("Entering SimModeCMBase.start_communicating")
        self._update_communication_power_state(command_name="start_communicating")

    def stop_communicating(self, *args, **kwargs) -> None:
        """Simulate basic stop_communicating behaviour.

        Updates communication state and power state.
        """
        self.logger.info("Entering SimModeCMBase.stop_communicating")
        self._update_communication_power_state(command_name="stop_communicating")

    # --------
    # Commands
    # --------

    def is_sim_command_allowed(self, command_name: str) -> bool:
        """Check if the simulated command is allowed.

        The override is only peeked, so the check does not consume it.

        Args:
            command_name (:obj:`str`): name of the command to be simulated

        Returns:
            :obj:`bool`: True if the simulated command is allowed or allowance is not
            specified, otherwise False.
        """
        self.logger.info(f"Checking if {command_name} command allowed")
        command = self.command_overrides_queue_dict.peek(command_name)

        allowed = command.get("allowed", None)
        if allowed is not None and not allowed:
            self.logger.error(f"{command_name} not allowed")
            return False

        allowed_states = command.get("allowed_states", None)
        if allowed_states is not None:
            dev_states = [DevState.names[state] for state in allowed_states]
            if self.state not in dev_states:
                self.logger.error(f"{command_name} not allowed in current state {self.state} (permitted states: {allowed_states})")
                return False

        return True

    def _sim_command(
        self,
        command_name: str,
        task_callback: Callable,
        task_abort_event: Event,
    ) -> None:
        """Simulates LRC thread; triggers actions and issues callbacks that occur during
        asynchronous command execution.

        Order of operations is detailed below; note that any step may or may not
        occur based on command overrides set by simOverrides attribute write.

        1. Trigger command "invoked" action.
        2. Set simulated LRC TaskStatus to IN_PROGRESS.
        3. Abort simulated LRC thread if Abort command event is set.
        4. Push attribute change/archive events, then update component state.
        5. Push command return value, set simulated LRC TaskStatus to COMPLETED.
        6. Trigger command "completed" action.

        Args:
            command_name (:obj:`str`): name of the command to be simulated
            task_callback (:obj:`Callable`): Callback function to update task status
            task_abort_event (:obj:`Event`): event checked once, after the task is
                marked IN_PROGRESS, to abort the simulated command
        """
        self.logger.info(f"{command_name} begin")
        # Consume the override that drives this execution of the command
        command = self.command_overrides_queue_dict.get(command_name)

        invoked_action = command.get("invoked_action", None)
        if invoked_action is not None:
            self.logger.info(f"Command invoked action: {invoked_action}")
            self._update_component_state(invoked_action=invoked_action)

        task_callback(status=TaskStatus.IN_PROGRESS)
        if task_abort_event.is_set():
            task_callback(
                status=TaskStatus.ABORTED,
                result=(
                    ResultCode.ABORTED,
                    f"{command_name} command aborted by task executor abort event.",
                ),
            )
            return

        # Push attribute change events if specified
        attr_change_events = command.get("attr_change_events", None)
        if attr_change_events is not None:
            for attr_name, attr_value in attr_change_events.items():
                # Pop value off of queue if unspecified
                if attr_value is None:
                    attr_value = self.get_attribute_override(attr_name)

                # Push event if attribute is configured for change events
                if attr_name in self._change_event_attrs and self._attr_change_callback is not None:
                    self._attr_change_callback(attr_name, attr_value)
                if attr_name in self._archive_event_attrs and self._attr_archive_callback is not None:
                    self._attr_archive_callback(attr_name, attr_value)

        # Apply the states of the override popped above. Peeking the queue again here
        # would read the *next* queued override (or the default) instead of the one
        # this command is executing.
        self._apply_communication_power_state(command)

        task_callback(
            result=(
                ResultCode[command["result_code"]],
                command["message"],
            ),
            status=TaskStatus.COMPLETED,
        )

        completed_action = command.get("completed_action", None)
        if completed_action is not None:
            self.logger.info(f"Command completed action: {completed_action}")
            self._update_component_state(completed_action=completed_action)

        self.logger.info(f"{command_name} end")

    def sim_command(
        self,
        *args: Any,
        task_callback: Callable,
        command_name: str,
        **kwargs: Any,
    ) -> tuple[TaskStatus, str]:
        """Submit simulated command operation method to task executor queue.

        Along with the submitted _sim_command thread, provides the ability to modify
        LRC behaviour asynchronously to the device, thus approaching a truer, more tunable
        simulation of LRCs.

        Args:
            *args (:obj:`Any`): ignored; only logged
            task_callback (:obj:`Callable`): Callback function to update task status
            command_name (:obj:`str`): name of the command to be simulated
            **kwargs (:obj:`Any`): ignored; only logged

        Returns:
            :obj:`tuple[TaskStatus, str]`: A tuple containing a return code and a string
            message indicating status. The message is for information purpose only.
        """
        self.logger.info(f"Submitting simulated {command_name} command task; unused args: {args}; unused kwargs {kwargs}")
        return self.submit_task(
            self._sim_command,
            is_cmd_allowed=partial(self.is_sim_command_allowed, command_name),
            args=[command_name],
            task_callback=task_callback,
        )
class SimModeObsCMBase(SimModeCMBase):
    """Simulation mode component manager base for FHS observing devices.

    Adds an observing state, a default ``GoToIdle`` command override and
    obs-state-aware command allowance checks on top of ``SimModeCMBase``.
    """

    def __init__(
        self,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """Set up the default obs device attribute and command overrides."""
        # The extra keywords are the observing state machine triggers; passing them
        # to the parent constructor initializes them in the _component_state dict.
        super().__init__(*args, configured=None, scanning=None, resourced=None, obsfault=None, **kwargs)

        self.obs_state = ObsState.IDLE

        # Default behaviour of the simulated GoToIdle LRC
        go_to_idle_override = {
            "allowed": True,
            "allowed_states": ["ON"],
            "allowed_obs_states": ["READY"],
            "result_code": "OK",
            "message": "GoToIdle completed OK",
            "invoked_action": "DECONFIGURE_INVOKED",
            "completed_action": "DECONFIGURE_COMPLETED",
        }
        self.command_overrides.update({"GoToIdle": go_to_idle_override})

        # Expose go_to_idle as an instance attribute bound to the generic simulated command
        self.__dict__["go_to_idle"] = partial(self.sim_command, command_name="GoToIdle")

    def is_sim_command_allowed(self, command_name: str) -> bool:
        """Check if the simulated command is allowed, including ObsState checks.

        Args:
            command_name (:obj:`str`): name of the command to be simulated

        Returns:
            :obj:`bool`: True if the simulated command is allowed or allowance is not
            specified, otherwise False.
        """
        override = self.command_overrides_queue_dict.peek(command_name)

        # Base checks ("allowed" flag and device state) come first
        if not super().is_sim_command_allowed(command_name):
            return False

        allowed_obs_states = override.get("allowed_obs_states", None)
        if allowed_obs_states is None:
            # No obs state restriction specified
            return True

        permitted = [ObsState[name] for name in allowed_obs_states]
        if self.obs_state in permitted:
            return True

        self.logger.error(f"{command_name} not allowed in current obs state {self.obs_state} (permitted states: {allowed_obs_states})")
        return False

    def abort_commands(
        self,
        task_callback: TaskCallbackType | None = None,
    ) -> tuple[TaskStatus, str]:
        """Abort all tasks queued & running.

        Signals the abort "invoked" action before delegating to the parent
        implementation and the abort "completed" action once it has returned.

        Args:
            task_callback (:obj:`TaskCallbackType | None`, optional): callback to be
                called whenever the status of the task changes. Default is None.

        Returns:
            :obj:`tuple[TaskStatus, str]`: the value returned by the parent
            ``abort_commands``.
        """
        self.logger.info("abort_commands invoked")
        self._update_component_state(invoked_action=FhsObsStateMachine.ABORT_INVOKED)
        outcome = super().abort_commands(task_callback)
        self._update_component_state(completed_action=FhsObsStateMachine.ABORT_COMPLETED)
        return outcome
class FhsSimMode(Device):
    """A base simulation mode mixin class for Mid.CBF FHS devices.

    Adds the simOverrides attribute which allows for overriding device behaviour when
    in simulation mode when used alongside SimModeCMBase-derived component managers.
    """

    @attribute(
        dtype=str,
        doc="Attribute value overrides (JSON dict)",
    )  # type: ignore[misc]
    def simOverrides(self) -> str:
        """Read the current override configuration.

        Returns:
            :obj:`str`: JSON-encoded dictionary with the keys "attributes" and
            "commands".
        """
        return json.dumps(
            {
                "attributes": self.component_manager.attribute_overrides_queue_dict.get_all_overrides(),
                "commands": self.component_manager.command_overrides_queue_dict.get_all_overrides(),
            }
        )

    @simOverrides.write  # type: ignore[no-redef, misc]
    def simOverrides(self, value_str: str) -> None:
        """Write new override configuration.

        Uses `pydantic.v1.utils.deep_update` to only update behaviour specified in the
        provided dictionary. Invalid input is logged and ignored.

        Args:
            value_str (:obj:`str`): JSON-encoded dict of overrides, with the optional
                keys "commands" and "attributes".
        """
        if not self.simulation_mode:
            self.logger.error("Cannot override device behaviour in SimulationMode.FALSE.")
            return

        self.logger.info(f"Received new value for simOverrides: {value_str}")
        try:
            value_dict = json.loads(value_str)
        except json.JSONDecodeError as je:
            self.logger.error(f"{je}")
            return

        # Valid JSON is not necessarily an object (e.g. `5` or `"attributes"`); the
        # key lookups below would otherwise raise TypeError.
        if not isinstance(value_dict, dict):
            self.logger.error(f"simOverrides must be a JSON object, got {type(value_dict).__name__}")
            return

        if "commands" in value_dict:
            self.component_manager.command_overrides_queue_dict.update_all(value_dict["commands"])
        else:
            self.logger.info("No command overrides provided")

        if "attributes" in value_dict:
            enum_attrs = self.component_manager.enum_attrs
            attribute_queues = self.component_manager.attribute_overrides_queue_dict
            for attr_name, value in value_dict["attributes"].items():
                # Convert to enum value if enum attribute
                if attr_name in enum_attrs and isinstance(value, str):
                    value = enum_attrs[attr_name][value]

                # Queue the attribute override
                attribute_queues.update(attr_name, value)
        else:
            self.logger.info("No attribute overrides provided")
class FhsObsSimMode(FhsSimMode):
    """A base simulation mode mixin class for Mid.CBF FHS observing devices."""

    def _component_state_changed(
        self,
        invoked_action: Optional[str] = None,
        completed_action: Optional[str] = None,
        fault: Optional[bool] = None,
        power: Optional[PowerState] = None,
    ) -> None:
        """Callback for component manager to perform obs state model action.

        Performs the "invoked" action (if any), forwards `fault` and `power` to the
        next `_component_state_changed` implementation in the MRO, then performs the
        "completed" action (if any).

        Args:
            invoked_action (:obj:`Optional[str]`): name of the `FhsObsStateMachine`
                attribute holding the action to perform before the state update.
            completed_action (:obj:`Optional[str]`): name of the `FhsObsStateMachine`
                attribute holding the action to perform after the state update.
            fault (:obj:`Optional[bool]`): forwarded to the parent implementation.
            power (:obj:`Optional[PowerState]`): forwarded to the parent implementation.
        """
        if invoked_action is not None:
            self.obs_state_model.perform_action(getattr(FhsObsStateMachine, invoked_action))

        # Delegate with super(): calling self._component_state_changed here would
        # re-enter this very method and recurse until RecursionError.
        super()._component_state_changed(fault=fault, power=power)

        if completed_action is not None:
            self.obs_state_model.perform_action(getattr(FhsObsStateMachine, completed_action))