# -*- coding: utf-8 -*-
#
# This file is part of the ska-mid-cbf-fhs-common project
#
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
#
# Copyright (c) 2025 National Research Council of Canada
from __future__ import annotations
import json
import logging
from functools import partial
from queue import Queue
from threading import Event
from typing import Any, Callable, Optional
from pydantic.v1.utils import deep_update
from ska_control_model import CommunicationStatus, ObsState, PowerState, ResultCode, TaskStatus
from ska_tango_base.base.base_component_manager import TaskCallbackType
from ska_tango_base.executor.executor_component_manager import TaskExecutorComponentManager
from tango import DevState
from tango.server import Device, attribute
from ska_mid_cbf_fhs_common.state_model.fhs_obs_state import FhsObsStateMachine
class OverridesQueue(Queue):
# Standard Queue with peek method and default values when # of items == 0
def __init__(self, default_override: dict[Any, Any]):
super().__init__()
self.default_override = default_override
def get(self):
"""Return queued override or default override if empty
Overrides Queue method to get queued values
Args:
default_override: default override to return if queue is empty
"""
if self.qsize() < 1:
return self.default_override
return super().get()
def get_default(self):
"""Return set default override"""
return self.default_override
def peek(self):
"""Return front value from Queue or default value if Queue is empty"""
if self.qsize() < 1:
return self.default_override
return self.queue[0]
class ObservedDict(dict):
"""Add subscription to standard dict, will run all provided callbacks when dict is updated"""
def __init__(self):
super().__init__()
self.observer_callback_list = []
def update(self, updates):
"""Update values and call all stored callback functions
Overrides update method of dict class to update subscribers using
the stored callback functions.
Args:
updates: dictionary values to update
"""
super().update(updates)
for callback in self.observer_callback_list:
callback()
def subscribe(self, callback):
"""Add a callback to stored callback list"""
self.observer_callback_list.append(callback)
class OverridesQueueDict:
# Stores OverridesQueues for every provided override
def __init__(self, default_overrides: dict[str, Any], logger: logging.Logger):
self.default_overrides = default_overrides
self.queue_dict = self._build_queues(default_overrides)
self.logger = logger
self.logger.info(f"Default override values: {self.default_overrides}")
def _build_queues(self, default_overrides):
"""Return a dict which contains a new OverrideQueue for each provided default override
Args:
default_overrides: defaults for each override
"""
queue_dict = dict()
for key, value in default_overrides.items():
queue_dict[key] = OverridesQueue(value)
return queue_dict
def set_defaults(self, new_default_overrides):
"""Replace queue dict with a new dict which contains a new OverrideQueue for each override
Args:
new_default_overrides: new defaults for each override
"""
self.queue_dict = self._build_queues(new_default_overrides)
def get(self, identifier: str):
"""Return override from specified queue
Args:
identifier: override queue identifier
"""
override = self.queue_dict[identifier].get()
self.logger.info(f"Popping Override [{identifier}] with value: {str(override)}")
return override
def peek(self, identifier: str):
"""Return override from specified queue without consuming override
Args:
identifier: override queue identifier
"""
return self.queue_dict[identifier].peek()
def get_all_overrides(self):
"""Return all overrides from queues"""
item_dict = {}
for key, value in self.queue_dict.items():
item_dict[key] = value.get()
return item_dict
def update(self, identifier, override):
"""Queue new override
If value is of type dict, deep update the queue dict using the default values
Args:
identifier: override queue identifier
override: override value to queue
"""
self.logger.info(f"Updating Override [{identifier}] to new value: {str(override)}")
if isinstance(override, dict):
original_dict = self.queue_dict[identifier].get_default()
new_dict = deep_update(original_dict, override)
self.queue_dict[identifier].put(new_dict)
else:
self.queue_dict[identifier].put(override)
def update_all(self, overrides):
"""Queue all provided overrides
Args:
overrides: overrides to update
"""
for key, value in overrides.items():
self.update(key, value)
[docs]
class SimModeCMBase(TaskExecutorComponentManager):
"""A base class for FHS device simulation mode component managers.
FHS device instances can make use component managers that inherit from this
class and `FhsSimMode`-derived mixins to simulate their own behaviour.
`SimModeCMBase`-derived simulation mode component managers should initialize
to default behaviour and supply an interface matching the one expected of the
genuine component manager that is being replaced; property getter methods are
used to return `attribute_overrides` values instead of expected attributes, and
`functools.partial` is used to override LRC `submit_task` methods with `sim_command`,
which submits a generic LRC task that pulls from `command_overrides` values
to generate expected behaviour.
"""
def __init__(
self,
*args: Any,
attr_change_callback: Callable[[str, Any], None] | None = None,
attr_archive_callback: Callable[[str, Any], None] | None = None,
**kwargs: Any,
) -> None:
"""Initialize default attribute and command overrides.
Requires only `logger`, `communication_state_callback` and `component_state_callback`
arguments for `BaseComponentManager` initialization.
"""
self.state = DevState.INIT
self._change_event_attrs = set()
self._archive_event_attrs = set()
self._attr_change_callback = attr_change_callback
self._attr_archive_callback = attr_archive_callback
# Supply operating state machine trigger keywords to initialize _component_state
# dict in parent constructor
super().__init__(
*args,
invoked_action=None,
completed_action=None,
fault=None,
power=None,
**kwargs,
)
self.state = DevState.INIT
# Setup attribute read overrides
self.enum_attrs = {}
self.attribute_overrides = ObservedDict()
# Setup LRC method simulation
self.command_overrides = ObservedDict()
self.command_overrides.update(
{
"start_communicating": {
"power_state": "ON",
"communication_state": "ESTABLISHED",
},
"stop_communicating": {
"power_state": "UNKNOWN",
"communication_state": "DISABLED",
},
}
)
self.command_overrides_queue_dict = OverridesQueueDict(self.command_overrides, self.logger)
self.attribute_overrides_queue_dict = OverridesQueueDict(self.attribute_overrides, self.logger)
self.attribute_overrides.subscribe(self.update_default_attributes)
self.command_overrides.subscribe(self.update_default_commands)
[docs]
def update_default_commands(self):
"""Rebuild command queue_dict using command_overrides value"""
self.command_overrides_queue_dict.set_defaults(self.command_overrides)
[docs]
def update_default_attributes(self):
"""Rebuild attribute queue_dict using command_overrides value"""
self.attribute_overrides_queue_dict.set_defaults(self.attribute_overrides)
[docs]
def get_attribute_override(self, attr_name):
"""Return specified attribute override from queue
Args:
attribute: specified attribute to return the value of
"""
return self.attribute_overrides_queue_dict.get(attr_name)
[docs]
def get_command_override(self, command):
"""Return specified command override from queue
Args:
command: specified command to return the value of
"""
return self.command_overrides_queue_dict.get(command)
def _update_component_state(
self,
**kwargs: Any,
) -> None:
"""Handle a change in component state.
Overrides BaseComponentManager method to add "invoked" and "completed" actions,
which should occur regardless of if they have changed value since last update.
Args:
**kwargs (:obj:`Any`): key/values for component state
"""
callback_kwargs = {}
with self._component_state_lock:
for key, value in kwargs.items():
if key in ["invoked_action", "completed_action"] or self._component_state[key] != value:
self._component_state[key] = value
callback_kwargs[key] = value
if callback_kwargs:
self._push_component_state_update(**callback_kwargs)
# -------------
# Communication
# -------------
def _update_communication_power_state(self, command_name: str) -> None:
"""Update component communication and power state.
Args:
command_name (:obj:`str`): command_overrides key
"""
command = self.command_overrides_queue_dict.peek(command_name)
power_state = command.get("power_state", None)
if power_state is not None:
self.logger.info(f"power_state update {power_state}")
self._update_component_state(power=PowerState[power_state])
communication_state = command.get("communication_state", None)
if communication_state is not None:
self.logger.info(f"communication_state update {communication_state}")
self._update_communication_state(communication_state=CommunicationStatus[communication_state])
[docs]
def start_communicating(self, *args, **kwargs) -> None:
"""Simulate basic start_communicating behaviour.
Updates communication state and power state.
"""
self.logger.info("Entering SimModeCMBase.start_communicating")
self._update_communication_power_state(command_name="start_communicating")
[docs]
def stop_communicating(self, *args, **kwargs) -> None:
"""Simulate basic stop_communicating behaviour.
Updates communication state and power state.
"""
self.logger.info("Entering SimModeCMBase.stop_communicating")
self._update_communication_power_state(command_name="stop_communicating")
# --------
# Commands
# --------
[docs]
def is_sim_command_allowed(self, command_name: str) -> bool:
"""Check if the simulated command is allowed.
Args:
command_name (:obj:`str`): name of the command to be simulated
Returns:
:obj:`bool`: True if the simulated command is allowed or allowance is not specified,
otherwise False.
"""
self.logger.info(f"Checking if {command_name} command allowed")
command = self.command_overrides_queue_dict.peek(command_name)
allowed = command.get("allowed", None)
if allowed is not None and not allowed:
self.logger.error(f"{command_name} not allowed")
return False
allowed_states = command.get("allowed_states", None)
if allowed_states is not None:
dev_states = [DevState.names[state] for state in allowed_states]
if self.state not in dev_states:
self.logger.error(f"{command_name} not allowed in current state {self.state} (permitted states: {allowed_states})")
return False
return True
def _sim_command(
self,
command_name: str,
task_callback: Callable,
task_abort_event: Event,
) -> None:
"""Simulates LRC thread; triggers actions and issues callbacks that occur
during asynchronous command execution.
Order of operations is detailed below; note that any step may or may not
occur based on command overrides set by simOverrides attribute write.
1. Trigger command "invoked" action.
2. Set simulated LRC TaskStatus to IN_PROGRESS.
3. Abort simulated LRC thread if Abort command event is set.
4. Update component state.
5. Push command return value, set simulated LRC TaskStatus to COMPLETED.
6. Trigger command "completed" action.
Args:
command_name (:obj:`str`): name of the command to be simulated
task_callback (:obj:`Callable`): Callback function to update task status
"""
self.logger.info(f"{command_name} begin")
command = self.command_overrides_queue_dict.get(command_name)
invoked_action = command.get("invoked_action", None)
if invoked_action is not None:
self.logger.info(f"Command invoked action: {invoked_action}")
self._update_component_state(invoked_action=invoked_action)
task_callback(status=TaskStatus.IN_PROGRESS)
if task_abort_event.is_set():
task_callback(
status=TaskStatus.ABORTED,
result=(
ResultCode.ABORTED,
f"{command_name} command aborted by task executor abort event.",
),
)
return
# Push attribute change events if specified
attr_change_events = command.get("attr_change_events", None)
if attr_change_events is not None:
for attr_name, attr_value in attr_change_events.items():
# Pop value off of queue if unspecified
if attr_value is None:
attr_value = self.get_attribute_override(attr_name)
# Push event if attribute is configured for change events
if attr_name in self._change_event_attrs and self._attr_change_callback is not None:
self._attr_change_callback(attr_name, attr_value)
if attr_name in self._archive_event_attrs and self._attr_archive_callback is not None:
self._attr_archive_callback(attr_name, attr_value)
self._update_communication_power_state(command_name=command_name)
task_callback(
result=(
ResultCode[command["result_code"]],
command["message"],
),
status=TaskStatus.COMPLETED,
)
completed_action = command.get("completed_action", None)
if completed_action is not None:
self.logger.info(f"Command completed action: {completed_action}")
self._update_component_state(completed_action=completed_action)
self.logger.info(f"{command_name} end")
[docs]
def sim_command(
self,
*args: Any,
task_callback: Callable,
command_name: str,
**kwargs: Any,
) -> tuple[TaskStatus, str]:
"""Submit simulated command operation method to task executor queue.
Along with the submitted _sim_command thread, provides the ability to
modify LRC behaviour asynchronously to the device, thus approaching a truer,
more tunable simulation of LRCs.
Args:
task_callback (:obj:`Callable`): Callback function to update task status
command_name (:obj:`str`): name of the command to be simulated
Returns:
:obj:`tuple[TaskStatus, str]`: A tuple containing a return code and a string
message indicating status. The message is for
information purpose only.
"""
self.logger.info(f"Submitting simulated {command_name} command task; unused args: {args}; unused kwargs {kwargs}")
return self.submit_task(
self._sim_command,
is_cmd_allowed=partial(self.is_sim_command_allowed, command_name),
args=[command_name],
task_callback=task_callback,
)
[docs]
class SimModeObsCMBase(SimModeCMBase):
"""A base class for FHS observing device simulation mode component managers."""
def __init__(
self,
*args: Any,
**kwargs: Any,
) -> None:
"""Initialize default obs device attribute and command overrides."""
# Supply operating state machine trigger keywords to initialize _component_state
# dict in parent constructor
super().__init__(
*args,
configured=None,
scanning=None,
resourced=None,
obsfault=None,
**kwargs,
)
self.obs_state = ObsState.IDLE
# Setup LRC method simulation
self.command_overrides.update(
{
"GoToIdle": {
"allowed": True,
"allowed_states": ["ON"],
"allowed_obs_states": ["READY"],
"result_code": "OK",
"message": "GoToIdle completed OK",
"invoked_action": "DECONFIGURE_INVOKED",
"completed_action": "DECONFIGURE_COMPLETED",
},
}
)
self.__dict__["go_to_idle"] = partial(self.sim_command, command_name="GoToIdle")
[docs]
def is_sim_command_allowed(self, command_name: str) -> bool:
"""Check if the simulated command is allowed. Adds ObsState checks.
Args:
command_name (:obj:`str`): name of the command to be simulated
Returns:
:obj:`bool`: True if the simulated command is allowed or allowance is not specified,
otherwise False.
"""
command = self.command_overrides_queue_dict.peek(command_name)
if not super().is_sim_command_allowed(command_name):
return False
allowed_obs_states = command.get("allowed_obs_states", None)
if allowed_obs_states is not None:
obs_states = [ObsState[obs_state] for obs_state in allowed_obs_states]
if self.obs_state not in obs_states:
self.logger.error(f"{command_name} not allowed in current obs state {self.obs_state} (permitted states: {allowed_obs_states})")
return False
return True
[docs]
def abort_commands(
self,
task_callback: TaskCallbackType | None = None,
) -> tuple[TaskStatus, str]:
"""Abort all tasks queued & running.
Args:
task_callback (:obj:`TaskCallbackType | None`, optional): callback to be called whenever the status
of the task changes. Default is None.
"""
self.logger.info("abort_commands invoked")
self._update_component_state(invoked_action=FhsObsStateMachine.ABORT_INVOKED)
result = super().abort_commands(task_callback)
self._update_component_state(completed_action=FhsObsStateMachine.ABORT_COMPLETED)
return result
[docs]
class FhsSimMode(Device):
"""A base simulation mode mixin class for Mid.CBF FHS devices.
Adds the simOverrides attribute which allows for overriding device behaviour
when in simulation mode when used alongside SimModeCMBase-derived component
managers.
"""
@attribute(
dtype=str,
doc="Attribute value overrides (JSON dict)",
) # type: ignore[misc]
def simOverrides(self) -> str:
"""Read the current override configuration.
Returns:
:obj:`str`: JSON-encoded dictionary
"""
return json.dumps(
{
"attributes": self.component_manager.attribute_overrides_queue_dict.get_all_overrides(),
"commands": self.component_manager.command_overrides_queue_dict.get_all_overrides(),
}
)
@simOverrides.write # type: ignore[no-redef, misc]
def simOverrides(self, value_str: str) -> None:
"""Write new override configuration. Uses `pydantic.v1.utils.deep_update` to
only update behaviour specified in the provided dictionary.
Args:
value_str (:obj:`str`): JSON-encoded dict of overrides
"""
if not self.simulation_mode:
self.logger.error("Cannot override device behaviour in SimulationMode.FALSE.")
return
self.logger.info(f"Received new value for simOverrides: {value_str}")
try:
value_dict = json.loads(value_str)
except json.JSONDecodeError as je:
self.logger.error(f"{je}")
return
if "commands" in value_dict:
value = value_dict["commands"]
self.component_manager.command_overrides_queue_dict.update_all(value_dict["commands"])
else:
self.logger.info("No command overrides provided")
if "attributes" in value_dict:
for attr_name, value in value_dict["attributes"].items():
# Convert to enum value if enum attribute
if attr_name in self.component_manager.enum_attrs and isinstance(value, str):
value = self.component_manager.enum_attrs[attr_name][value]
# Update attribute
self.component_manager.attribute_overrides_queue_dict.update(attr_name, value)
# Update attribute if value has changed
if self.component_manager.attribute_overrides_queue_dict.peek(attr_name) == value:
continue
else:
self.logger.info("No attribute overrides provided")
[docs]
class FhsObsSimMode(FhsSimMode):
"""A base simulation mode mixin class for Mid.CBF FHS observing devices."""
def _component_state_changed(
self,
invoked_action: Optional[str] = None,
completed_action: Optional[str] = None,
fault: Optional[bool] = None,
power: Optional[PowerState] = None,
) -> None:
"""Callback for component manager to perform obs state model action."""
if invoked_action is not None:
self.obs_state_model.perform_action(getattr(FhsObsStateMachine, invoked_action))
self._component_state_changed(fault=fault, power=power)
if completed_action is not None:
self.obs_state_model.perform_action(getattr(FhsObsStateMachine, completed_action))