#
# This file is part of the SKA Tango Base project
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE.txt for more info.
"""Interface for devices with an observation state."""
import sys
import typing
import ska_control_model as scm
import tango
from ..obs.obs_interface import standard_obs_mode
from ..software_bus import (
NoValue,
NoValueType,
SharingObserver,
Signal,
attribute_from_signal,
)
from ..type_hints import ReadAttrType, SharingObserverProtocol
from ._base_interface import BaseInterface
if sys.version_info >= (3, 11):
from typing import Unpack
else:
from typing_extensions import Unpack
__all__ = ["ObsStateEmitMixin", "ObsInterface", "standard_obs_mode"]
class ObsStateInput(typing.TypedDict, total=False):
"""Type for observation state input dictionary."""
resourced: bool
configured: bool
scanning: bool
obsfault: bool
command_in_progress: str | None
aborted: bool
[docs]
class ObsStateEmitMixin(SharingObserver):
"""
A mixin class for a SKA subarray device or its component manager.
It provides helper methods for managing the Observation State of the device.
"""
[docs]
def on_new_shared_bus(self) -> None:
"""Initialise the mixin sharing observer."""
super().on_new_shared_bus()
# TODO: Implement this cleanly in the software bus API
setattr(self.shared_bus, "_obs_state_input", {})
[docs]
def _update_obs_state(self, **kwargs: Unpack[ObsStateInput]) -> None:
"""
Emit a change in observation state.
This is a helper method for use by subclasses.
:param kwargs: key/values for state.
:meta public:
"""
if not kwargs:
raise TypeError(
f"{type(self).__name__}._update_obs_state() expects at least "
"one keyword argument."
)
for key in kwargs:
if key not in ObsStateInput.__annotations__.keys():
raise TypeError(
f"{type(self).__name__}._update_obs_state() got an "
f"unexpected keyword argument '{key}'."
)
# TODO: Need to hold a lock shared between `SharingObservers` while updating
updated_dict = getattr(self.shared_bus, "_obs_state_input") | kwargs
setattr(self.shared_bus, "_obs_state_input", updated_dict)
self.shared_bus.emit(
"._obs_state", self._combine_obs_state(updated_dict), store=True
)
@staticmethod
def _combine_obs_state(obs_state_input: ObsStateInput) -> scm.ObsState:
"""
Combine the given observation state dict into a single `ObsState`.
:param obs_state_input: the observation state input dictionary.
:return: the combined `ObsState`.
"""
obsfault = obs_state_input.get("obsfault")
command_in_progress = obs_state_input.get("command_in_progress")
aborted = obs_state_input.get("aborted")
resourced = obs_state_input.get("resourced")
configured = obs_state_input.get("configured")
scanning = obs_state_input.get("scanning")
obs_state = scm.ObsState.EMPTY
if obsfault:
obs_state = scm.ObsState.FAULT
elif command_in_progress in [
"AssignResources",
"ReleaseResources",
"ReleaseAllResources",
]:
obs_state = scm.ObsState.RESOURCING
elif command_in_progress == "Configure":
obs_state = scm.ObsState.CONFIGURING
elif command_in_progress == "Restart":
obs_state = scm.ObsState.RESTARTING
elif command_in_progress == "Abort":
obs_state = scm.ObsState.ABORTING
elif aborted:
obs_state = scm.ObsState.ABORTED
elif resourced:
if configured:
if scanning:
obs_state = scm.ObsState.SCANNING
else:
obs_state = scm.ObsState.READY
else:
obs_state = scm.ObsState.IDLE
return obs_state
[docs]
def component_scanning(self) -> None:
"""Update observation state to reflect that scanning is in progress."""
self._update_obs_state(scanning=True)
[docs]
def component_not_scanning(self) -> None:
"""Update observation state to reflect that scanning is not in progress."""
self._update_obs_state(scanning=False)
[docs]
def component_resourced(self) -> None:
"""Update observation state to reflect that resources are allocated."""
self._update_obs_state(resourced=True)
[docs]
def component_unresourced(self) -> None:
"""Update observation state to reflect that no resources are allocated."""
self._update_obs_state(resourced=False)
[docs]
def component_obsfault(self) -> None:
"""Update observation state to reflect that a fault has occurred."""
self._update_obs_state(obsfault=True)
[docs]
def component_no_obsfault(self) -> None:
"""Revert the observation state fault."""
self._update_obs_state(obsfault=False)
class CommandedObsStateSignal(Signal[scm.ObsState]):
"""
Special signal for the commanded observation state.
Ensures that only stable enumerants from :py:class:`~ska_control_model.ObsState` for
a commanded observation state are emitted for the signal.
"""
def __init__(self) -> None:
"""Initialise the signal."""
super().__init__(stored=True, initial_value=scm.ObsState.EMPTY)
def __set__(
self, obj: SharingObserverProtocol, value: scm.ObsState | None | NoValueType
) -> None:
"""
Emit value on the bus for the signal.
:raises ValueError: if the value is not a stable observation state.
"""
if value not in [
None,
NoValue,
scm.ObsState.EMPTY,
scm.ObsState.IDLE,
scm.ObsState.READY,
scm.ObsState.ABORTED,
]:
raise ValueError(f"{value} is not a stable observation state.")
super().__set__(obj, value)
[docs]
class ObsInterface(ObsStateEmitMixin, BaseInterface):
"""Provides the Tango interface for an SKA device with an Observation State."""
_obs_state: Signal[scm.ObsState] = Signal[scm.ObsState](
stored=True, initial_value=scm.ObsState.EMPTY
)
"""Signal for the observation state of the device.
Write to this signal to report the observation state of the device to Tango clients.
:meta public:
"""
obsState = attribute_from_signal(
_obs_state,
dtype=scm.ObsState,
description="The Observation State of the device.",
)
"""Observation state attribute of the device.
This should be set by subclasses of this interface by writing to
:py:attr:`_obs_state`.
"""
_commanded_obs_state: CommandedObsStateSignal = CommandedObsStateSignal()
"""Signal for the commanded observation state of the device.
Write to this signal whenever a command is executed which will result in
an obs state transition.
:meta public:
"""
commandedObsState = attribute_from_signal(
_commanded_obs_state,
dtype=scm.ObsState,
description="""
The last commanded stable Observation State of the device.
Initial value is EMPTY. The only stable states it can
change to are EMPTY, IDLE, READY or ABORTED, following the start
of any state transition command.
""",
)
"""
Attribute for the last commanded Observation State of the device.
This should be set by subclasses of this interface by writing to
:py:attr:`_commanded_obs_state`.
"""
_config_progress: Signal[int] = Signal[int](stored=True, initial_value=0)
"""Signal for the configuration progress of the device.
Write to this signal to report configuration progress to Tango clients.
:meta public:
"""
configurationProgress = attribute_from_signal(
_config_progress,
dtype="uint16",
unit="%",
max_value=100,
min_value=0,
description="The percentage configuration progress of the device.",
)
"""
Configuration progress attribute of the device.
This should be set by subclasses of this interface by writing to
:py:attr:`_config_progress`.
"""
_config_delay_expected: Signal[int] = Signal[int](stored=True, initial_value=0)
"""Signal for the configuration delay expected of the device.
Write to this signal to report the expected configuration delay to Tango clients.
:meta public:
"""
configurationDelayExpected = attribute_from_signal(
_config_delay_expected,
dtype="uint16",
unit="seconds",
description="The expected configuration delay of the device in seconds.",
)
"""
Configuration delay expected attribute of the device.
This should be set by subclasses of this interface by writing to
:py:attr:`_config_delay_expected`.
"""
# -----------------
# Attribute methods
# -----------------
def __read_obsState(self) -> ReadAttrType[scm.ObsState]:
"""Dispatch to read method to allow subclasses to override."""
return self.read_obsState()
obsState.read(__read_obsState)
[docs]
def read_obsState(self) -> ReadAttrType[scm.ObsState]:
"""
Read the observation state of the device.
Subclasses can override this to change the behaviour of the
:py:obj:`obsState` attribute.
"""
return self.__class__.obsState.do_read(self)
def __is_obsState_allowed(self, request_type: tango.AttReqType) -> bool:
return self.is_obsState_allowed(request_type)
[docs]
def is_obsState_allowed(self, request_type: tango.AttReqType) -> bool:
"""
Check if the obsState can be read currently.
This can be overridden by subclasses to restrict when clients
can access the attribute.
"""
return True
obsState.fisallowed = __is_obsState_allowed
def __read_commandedObsState(self) -> ReadAttrType[scm.ObsState]:
"""Dispatch to read method to allow subclasses to override."""
return self.read_commandedObsState()
commandedObsState.read(__read_commandedObsState)
[docs]
def read_commandedObsState(self) -> ReadAttrType[scm.ObsState]:
"""
Read the commanded observation state of the device.
Subclasses can override this to change the behaviour of the
:py:obj:`commandedObsState` attribute.
"""
return self.__class__.commandedObsState.do_read(self)
def __is_commandedObsState_allowed(self, request_type: tango.AttReqType) -> bool:
return self.is_commandedObsState_allowed(request_type)
[docs]
def is_commandedObsState_allowed(self, request_type: tango.AttReqType) -> bool:
"""
Check if the commandedObsState can be read currently.
This can be overridden by subclasses to restrict when clients
can access the attribute.
"""
return True
commandedObsState.fisallowed = __is_commandedObsState_allowed
def __read_configurationProgress(self) -> ReadAttrType[int]:
"""Dispatch to read method to allow subclasses to override."""
return self.read_configurationProgress()
configurationProgress.read(__read_configurationProgress)
[docs]
def read_configurationProgress(self) -> ReadAttrType[int]:
"""
Read the configuration progress of the device.
Subclasses can override this to change the behaviour of the
:py:obj:`configurationProgress` attribute.
"""
return self.__class__.configurationProgress.do_read(self)
def __is_configurationProgress_allowed(
self, request_type: tango.AttReqType
) -> bool:
return self.is_configurationProgress_allowed(request_type)
[docs]
def is_configurationProgress_allowed(self, request_type: tango.AttReqType) -> bool:
"""
Check if the configurationProgress can be read currently.
This can be overridden by subclasses to restrict when clients
can access the attribute.
"""
return True
configurationProgress.fisallowed = __is_configurationProgress_allowed
def __read_configurationDelayExpected(self) -> ReadAttrType[int]:
"""Dispatch to read method to allow subclasses to override."""
return self.read_configurationDelayExpected()
configurationDelayExpected.read(__read_configurationDelayExpected)
[docs]
def read_configurationDelayExpected(self) -> ReadAttrType[int]:
"""
Read the expected configuration delay of the device.
Subclasses can override this to change the behaviour of the
:py:obj:`configurationDelayExpected` attribute.
"""
return self.__class__.configurationDelayExpected.do_read(self)
def __is_configurationDelayExpected_allowed(
self, request_type: tango.AttReqType
) -> bool:
return self.is_configurationDelayExpected_allowed(request_type)
[docs]
def is_configurationDelayExpected_allowed(
self, request_type: tango.AttReqType
) -> bool:
"""
Check if the configurationDelayExpected can be read currently.
This can be overridden by subclasses to restrict when clients
can access the attribute.
"""
return True
configurationDelayExpected.fisallowed = __is_configurationDelayExpected_allowed