Source code for ska_tango_base.base.base_interface

#
# This file is part of the SKA Tango Base project
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE.txt for more info.
"""Base interfaces for SKA Tango devices."""

from __future__ import annotations

from enum import Enum
from typing import Any

import ska_control_model
import tango.server
from packaging import version
from ska_control_model import (
    AdminMode,
    ControlMode,
    HealthState,
    SimulationMode,
    TestMode,
)
from tango import AttrWriteType, AutoTangoMonitor, DebugIt, DevState

from .._interface_helpers import get_request_type
from ..long_running_commands import LRCReqType
from ..ska_device import SKADevice
from ..software_bus import (
    Signal,
    SignalBusMixin,
    attribute_from_signal,
    listen_to_signal,
)
from ..type_hints import (
    DevVarLongStringArrayType,
    ReadAttrType,
    SharingObserverProtocol,
)

__all__ = [
    "OpStateSignal",
    "CommandedStateSignal",
    "ControlLevel",
    "BaseInterface",
    "standard_control_mode",
    "standard_simulation_mode",
    "standard_test_mode",
]


[docs] class OpStateSignal(Signal[DevState]): """ Special signal for Operational State. Ensures that only valid enumerants from :py:class:`~tango.DevState` for the SKA Operational State are emitted for the signal. """
[docs] def __init__(self) -> None: """Initialise the signal.""" super().__init__(stored=True)
def __set__(self, obj: SharingObserverProtocol, value: DevState) -> None: """ Emit value on the bus for the signal. :raises ValueError: if the value is not a valid Operational State. """ if value not in [ DevState.DISABLE, DevState.FAULT, DevState.INIT, DevState.OFF, DevState.ON, DevState.ALARM, DevState.STANDBY, DevState.UNKNOWN, ]: raise ValueError(f"{value} is not a valid operational state.") super().__set__(obj, value)
[docs] class CommandedStateSignal(Signal[DevState | None]): """ Special signal for the Commanded State. Ensures that only valid enumerants from :py:class:`~tango.DevState` for the Commanded State are emitted for the signal. """
[docs] def __init__(self) -> None: """Initialise the signal.""" super().__init__(stored=True, initial_value=None)
def __set__(self, obj: SharingObserverProtocol, value: DevState) -> None: """ Emit value on the bus for the signal. :raises ValueError: if the value is not a valid Operational State. """ if value is not None and value not in [ DevState.OFF, DevState.ON, DevState.STANDBY, ]: raise ValueError(f"{value} is not a valid commanded state.") super().__set__(obj, value)
[docs] class ControlLevel(Enum): """ How much control the Tango device should exert on the system under control. This enumeration should not be consider exhaustive as, in the future, we may add ``MONITORING_ONLY`` to this enumeration. """ NO_CONTACT = 1 """The Tango device should make no contact with the system under control.""" FULL_CONTROL = 2 """The Tango device should attempt to control the system under control fully."""
_tango_version = version.parse(tango.__version__) def _make_cmd(meth: Any) -> Any: return tango.server.command( meth, dtype_out="DevVarLongStringArray", doc_out="[ResultCode][message or command id]", )
[docs] class BaseInterface(SignalBusMixin, SKADevice): """ Provides the Tango interface for an SKA device with an Operational State. This class only provides the Tango interface required for SKA Tango devices which support an Operational State. It is up to subclasses to override various abstract methods to provide the appropriate behaviour and to drive the Operational State Model as appropriate, except for the initial state that is set in :py:meth:`~init_device`. The Operational State of an SKA Tango device is exposed as the built-in Tango device state via :py:meth:`~tango.server.Device.set_state()`. Subclasses are not expected to call :py:meth:`~tango.server.Device.set_state()` themselves and should instead change the state by performing the appropriate action on the :py:class:`~ska_control_model.OpStateModel`, which will additionally ensure that change and archive events are sent. The Operational State only supports a subset of the :py:class:`~tango.DevState` enumeration, with the following interpretations: - **INIT**: The device is initialising - **DISABLE**: The device is not currently monitoring or controlling the system - **UNKNOWN**: The device cannot determine the state of the system - **OFF**: The system under control is powered off - **STANDBY**: The system under control is in low-power standby mode - **ON**: The system under control is powered on - **ALARM**: The system under control is powered on and some attribute is raising an alarm. This is typically automatically handled by Tango. - **FAULT**: The system under control is in fault The :py:attr:`_op_state` signal object will raise a :py:class:`ValueError` if set to a :py:class:`~tango.DevState` value other than those above. :py:class:`BaseInterface` similarly provides a :py:attr:`_status` signal which will update the built-in Tango status attribute with :py:meth:`~tango.server.Device.set_status()` and push change and archive events whenever set. In addition to providing the :py:attr:`_op_state` and :py:attr:`_status` signals :py:class:`BaseInterface` also provides the signals :py:attr:`_admin_mode`, :py:attr:`_commanded_state` and :py:attr:`_health_state` as well as the corresponding Tango attributes :py:attr:`adminMode`, :py:attr:`commandedState` and :py:attr:`healthState`. The :py:attr:`adminMode` Tango attribute is writable and should not be set by subclasses of this interface. Subclasses will be notified when a client has requested the device to stop/start controlling the system under control via the :py:meth:`change_control_level` method, which they are expected to override. The :py:attr:`commandedState` and :py:attr:`healthState` should be set by subclasses as appropriate, using the corresponding signals. This interface also provides optional state transition commands :py:meth:`!Off()`, :py:meth:`!Standby()`, :py:meth:`!On()` as well as implementations of the :py:meth:`is_Off_allowed()`, :py:meth:`is_Standby_allowed()` and :py:meth:`is_On_allowed()` methods which respect the Operational State machine. Subclasses can provide an implementation for these commands by overriding the :py:meth:`execute_Off()`, :py:meth:`execute_Standby()` and :py:meth:`execute_On()` methods. Additionally, the :py:meth:`!Reset()` command is required and the corresponding :py:meth:`execute_Reset` method must be overridden by all subclasses. """ # In the type annotations here we seem to need to include the # ska_control_model to make autodoc happy... _admin_mode: Signal[ska_control_model.AdminMode] = Signal[AdminMode]( stored=True, initial_value=AdminMode.OFFLINE ) """Signal for the admin mode of the device. Values are emitted for this signal whenever a client successfully changes to the adminMode attribute. Typically, subclasses will not need to listen to this signal and should instead override :py:meth:`change_control_level`. :meta public: """ admin_mode_model: ska_control_model.AdminModeModel """State model used to ensure that admin mode transitions are allowed.""" adminMode: attribute_from_signal = attribute_from_signal( _admin_mode, access=AttrWriteType.READ_WRITE, dtype=AdminMode, description=( "The Admin Mode of the device. " "It may interpret the current device condition and condition of all " "managed devices to set this. Most possibly an aggregate attribute." ), memorized=True, hw_memorized=True, ) """ Admin mode attribute of the device. """ op_state_model: ska_control_model.OpStateModel """State model used to ensure that operational state transitions are allowed.""" _op_state: OpStateSignal = OpStateSignal() """Signal for the Operational State of the device. Write to this signal to set the state of the Tango device. :meta public: """ _status: Signal[str] = Signal[str](stored=True) """Signal for the status of the device. Write to this signal to set the status of the Tango device. :meta public: """ _commanded_state: CommandedStateSignal = CommandedStateSignal() """Signal for the commanded Operational State of the device. Write to this signal whenever a command is executed which will result in a state transition. :meta public: """ commandedState: attribute_from_signal = attribute_from_signal( _commanded_state, dtype=str, to_tango=str, description=( "The last commanded Operating State of the device. " 'Initial string is "None". Only other strings it can change to ' 'is "OFF", "STANDBY" or "ON", following the Off(), Standby() or On() ' "commands. If the state transition commands are long running commands " "the commanded state will only update when the long running command " "starts executing." ), ) """ Attribute for the last commanded Operating State of the device. This should be set by subclasses of this interface by writing to :py:attr:`_commanded_state`. """ _health_state = Signal[HealthState](stored=True, initial_value=HealthState.UNKNOWN) """Signal for the health state of the device. Write to this signal to set the reported health state of the Tango device. :meta public: """ healthState = attribute_from_signal( _health_state, dtype=HealthState, description=( "Read the Health State of the device. " "It interprets the current device condition and condition of " "all managed devices to set this. Most possibly an aggregate attribute. " ), ) """ Health state attribute of the device. This should be set by subclasses of this interface by writing to :py:attr:`_health_state`. """ # ----------- # Init device # -----------
[docs] def init_device(self) -> None: """ Initialise the tango device after startup. Subclasses overriding `init_device` must call :py:func:`init_completed` once initialisation has finished. **Example** :: class MyDevice(BaseInterface): def init_device(self) -> None: super().init_device() ... self.init_completed() """ super().init_device() if not (version.parse("10.0") <= _tango_version < version.parse("10.1")): with AutoTangoMonitor(self): if ( type(self).execute_Reset != BaseInterface.execute_Reset and self.Reset is not None ): self.add_command(_make_cmd(self.Reset)) if ( type(self).execute_Standby != BaseInterface.execute_Standby and self.Standby is not None ): self.add_command(_make_cmd(self.Standby)) if ( type(self).execute_Off != BaseInterface.execute_Off and self.Off is not None ): self.add_command(_make_cmd(self.Off)) if ( type(self).execute_On != BaseInterface.execute_On and self.On is not None ): self.add_command(_make_cmd(self.On)) self.init_invoked()
[docs] def on_new_shared_bus(self) -> None: """Enable manual event pushing for state and status.""" super().on_new_shared_bus() self.set_change_event("state", True) self.set_archive_event("state", True) self.set_change_event("status", True) self.set_archive_event("status", True) self._init_state_model()
[docs] def _init_state_model(self) -> None: """ Initialise the state model for the device. :meta public: """ def _admin_mode_cb(mode: AdminMode) -> None: self._admin_mode = mode self.admin_mode_model = ska_control_model.AdminModeModel( self.logger, _admin_mode_cb ) def _op_state_cb(state: DevState) -> None: self.logger.debug(f"OpState={repr(DevState(state))}") self._op_state = state self._status = f"The device is in {state} state." self.op_state_model = ska_control_model.OpStateModel(self.logger, _op_state_cb)
[docs] def init_invoked(self) -> None: """Perform 'init_invoked' action on the ``OpStateModel``.""" self.op_state_model.perform_action("init_invoked")
[docs] def init_completed(self) -> None: """ Notify the state machine that initialisation has completed. Must be called from your :py:func:`!init_device()` method. **Example** :: class MyDevice(BaseInterface): def init_device(self) -> None: super().init_device() ... self.init_completed() """ self.op_state_model.perform_action("init_completed")
[docs] def component_disconnected(self) -> None: """Perform disconnected action on the ``OpStateModel``.""" self.op_state_model.perform_action("component_disconnected")
[docs] def component_unknown(self) -> None: """Perform unknown state action on the ``OpStateModel``.""" self.op_state_model.perform_action("component_unknown")
[docs] def component_off(self) -> None: """Perform off action on the ``OpStateModel``.""" self.op_state_model.perform_action("component_off")
[docs] def component_standby(self) -> None: """Perform cstandby action on the ``OpStateModel``.""" self.op_state_model.perform_action("component_standby")
[docs] def component_on(self) -> None: """Perform on action on the ``OpStateModel``.""" self.op_state_model.perform_action("component_on")
[docs] def component_fault(self) -> None: """Perform fault action on the ``OpStateModel``.""" self.op_state_model.perform_action("component_fault")
[docs] def component_no_fault(self) -> None: """Perform fault stopped action on the ``OpStateModel``.""" self.op_state_model.perform_action("component_no_fault")
@listen_to_signal("_op_state") def __on_op_state(self, value: DevState) -> None: """Update the device state.""" self.set_state(value) self.push_change_event("state") self.push_archive_event("state") @listen_to_signal("_status") def __on_status(self, value: str) -> None: self.set_status(value) self.push_change_event("status") self.push_archive_event("status") def __read_adminMode( self, ) -> ReadAttrType[AdminMode]: """Dispatch to read method to allow subclasses to override.""" return self.read_adminMode() adminMode.read(__read_adminMode)
[docs] def read_adminMode(self) -> ReadAttrType[AdminMode]: """ Read the admin mode of the device. Subclasses can override this to change the behaviour of the :py:obj:`adminMode` attribute. """ return self.__class__.adminMode.do_read(self)
def __write_adminMode(self, mode: AdminMode) -> None: """Dispatch to write method to allow subclasses to override.""" self.write_adminMode(mode) adminMode.write(__write_adminMode)
[docs] def write_adminMode(self, mode: AdminMode) -> None: """ Set the Admin Mode of the device. Subclasses can override this to change the behaviour of the :py:obj:`adminMode` attribute. :param value: Admin Mode of the device. :raises ValueError: for unknown adminMode :raises StateModelError: for a disallowed transition """ if mode == AdminMode.NOT_FITTED: self.admin_mode_model.perform_action("to_notfitted") elif mode == AdminMode.OFFLINE: self.admin_mode_model.perform_action("to_offline") self.change_control_level(ControlLevel.NO_CONTACT) elif mode == AdminMode.ENGINEERING: self.admin_mode_model.perform_action("to_engineering") self.change_control_level(ControlLevel.FULL_CONTROL) elif mode == AdminMode.ONLINE: self.admin_mode_model.perform_action("to_online") self.change_control_level(ControlLevel.FULL_CONTROL) elif mode == AdminMode.RESERVED: self.admin_mode_model.perform_action("to_reserved") else: raise ValueError(f"Unknown adminMode {mode}")
[docs] def change_control_level(self, control_level: ControlLevel) -> None: """ Change how the device is interacting with the system under control. Subclasses must override this method to do the following: - Stop all communication with the system under control when passed :py:const:`ControlLevel.NO_CONTACT`. Once the Tango device has stopped communicating with the system under control, the operational state must transition to **DISABLE**. - Start controlling the system under control when passed :py:const:`ControlLevel.FULL_CONTROL`. Once the Tango device has started trying to communicating with the system under control, it must transition to a state other than **DISABLE**. If the Tango device is trying to communicate with the system under control, but unable to, it should transition to operational state **UNKNOWN**. Subclasses overriding this method should not assume that the :py:class:`ControlLevel` is exhaustive. """ raise NotImplementedError( "'change_control_level' method must be implemented by " f"'{self.__class__.__name__}'. " "The parent 'BaseInterface' is an abstract base class." )
def __read_commandedState(self) -> ReadAttrType[str]: """Dispatch to read method to allow subclasses to override.""" return self.read_commandedState() commandedState.read(__read_commandedState)
[docs] def read_commandedState(self) -> ReadAttrType[str]: """ Read the commanded state of the device. Subclasses can override this to change the behaviour of the :py:obj:`commandedState` attribute. """ return self.__class__.commandedState.do_read(self)
def __read_healthState(self) -> ReadAttrType[HealthState]: """Dispatch to read method to allow subclasses to override.""" return self.read_healthState() healthState.read(__read_healthState)
[docs] def read_healthState(self) -> ReadAttrType[HealthState]: """ Read the health state of the device. Subclasses can override this to change the behaviour of the :py:obj:`healthState` attribute. """ return self.__class__.healthState.do_read(self)
[docs] def is_Reset_allowed(self, request_type: LRCReqType | None = None) -> bool: """ Return whether the :py:meth:`!Reset()` command may be called currently. :return: whether the command may be called in the current device state """ if ( request_type is None and get_request_type(self, "Reset") == LRCReqType.ENQUEUE_REQ ): return True return self.get_state() not in [DevState.INIT, DevState.DISABLE]
[docs] def execute_Reset(self) -> DevVarLongStringArrayType: """ Execute the standard Reset command. This method must be overridden by a subclass to enable the Tango command, otherwise the command will not be part of the device interface. The :py:func:`~ska_tango_base.long_running_commands.decorators.submit_lrc_task` decorator can be used to make Reset a long running command. For a device that directly monitors and controls hardware, this command should put that hardware into a known state, for example by clearing buffers and loading default values into registers, or if necessary even by power-cycling and re-initialising the hardware. Logical devices should generally implement this command to perform a sensible reset of that logical device. For example, aborting any current activities and clearing internal state. :py:meth:`!Reset()` generally should not change the power state of the device or its hardware: * If invoking :py:meth:`!Reset()` from **STANDBY** state, the device would usually be expected to remain in **STANDBY**. * If invoking :py:meth:`!Reset()` from **ON** state, the device would usually be expected to remain in **ON**. * If invoking :py:meth:`!Reset()` from **FAULT** state, the device would usually be expected to transition to **ON** or remain in **FAULT**, depending on whether the reset was successful in clearing then fault. :py:meth:`!Reset()` generally should *not* propagate to subservient devices. For example, a subsystem controller device should implement :py:meth`!Reset()` to reset the subsystem as a whole, but that probably should not result in all of the subsystem's hardware being power-cycled. :return: A tuple containing a return code and a string message indicating status. The message is for information purpose only. """ raise NotImplementedError( "'execute_Reset' method must be implemented by " f"'{self.__class__.__name__}'. " "The parent 'BaseInterface' is an abstract base class." )
[docs] @DebugIt() def Reset(self) -> DevVarLongStringArrayType: """ Reset the device. Subclasses should override the :py:attr:`execute_Reset` method to change the behaviour of this command. :return: A tuple containing a return code and a string message indicating status or a command ID. The message is for information purpose only. """ return self.execute_Reset()
Reset.__check_is_long_running__ = "execute_Reset"
[docs] def is_Standby_allowed(self, request_type: LRCReqType | None = None) -> bool: """ Return whether the :py:meth:`!Standby()` command may be called currently. :return: whether the command may be called in the current device state """ if ( request_type is None and get_request_type(self, "Standby") == LRCReqType.ENQUEUE_REQ ): return True return self.get_state() not in [DevState.INIT, DevState.DISABLE]
[docs] def execute_Standby(self) -> DevVarLongStringArrayType: """ Execute the standard Standby command. This method must be overridden by a subclass to enable the Tango command, otherwise the command will not be part of the device interface. The :py:func:`~ska_tango_base.long_running_commands.decorators.submit_lrc_task` decorator can be used to make Reset a long running command. If implemented, this command should put the device in standby mode and result in the operational state transitioning to ``STANDBY``. The command should not return until the transition has occurred. :return: A tuple containing a return code and a string message indicating status or a command ID. The message is for information purpose only. """ raise NotImplementedError( "'execute_Standby' method must be implemented by " f"'{self.__class__.__name__}'. " "The parent 'BaseInterface' is an abstract base class." )
[docs] @DebugIt() def Standby(self) -> DevVarLongStringArrayType: """ Put the device into standby mode. Subclasses should override the :py:attr:`execute_Standby` method to change the behaviour of this command. :return: A tuple containing a return code and a string message indicating status or a command ID. The message is for information purpose only. """ return self.execute_Standby()
Standby.__check_is_long_running__ = "execute_Standby"
[docs] def is_Off_allowed(self, request_type: LRCReqType | None = None) -> bool: """ Return whether the :py:meth:`!Off()` command may be called currently. :return: whether the command may be called in the current device state """ if ( request_type is None and get_request_type(self, "Off") == LRCReqType.ENQUEUE_REQ ): return True return self.get_state() not in [DevState.INIT, DevState.DISABLE]
[docs] def execute_Off(self) -> DevVarLongStringArrayType: """ Execute the standard Off command. This method must be overridden by a subclass to enable the Tango command, otherwise the command will not be part of the device interface. The :py:func:`~ska_tango_base.long_running_commands.decorators.submit_lrc_task` decorator can be used to make Reset a long running command. If implemented, this command should turn the device off and result in the operational state transitioning to ``OFF``. The command should not return until the transition has occurred. :return: A tuple containing a return code and a string message indicating status or a command ID. The message is for information purpose only. """ raise NotImplementedError( f"'execute_Off' method must be implemented by '{self.__class__.__name__}'. " "The parent 'BaseInterface' is an abstract base class." )
[docs] @DebugIt() def Off(self) -> DevVarLongStringArrayType: """ Turn the device off. Subclasses should override the :py:attr:`execute_Off` method to change the behaviour of this command. :return: A tuple containing a return code and a string message indicating status or a command ID. The message is for information purpose only. """ return self.execute_Off()
Off.__check_is_long_running__ = "execute_Off"
[docs] def is_On_allowed(self, request_type: LRCReqType | None = None) -> bool: """ Return whether the :py:meth:`!On()` command may be called currently. :return: whether the command may be called in the current device state :return: A tuple containing a return code and a string message indicating status or a command ID. The message is for information purpose only. """ if ( request_type is None and get_request_type(self, "On") == LRCReqType.ENQUEUE_REQ ): return True return self.get_state() not in [DevState.INIT, DevState.DISABLE]
[docs] def execute_On(self) -> DevVarLongStringArrayType: """ Execute the standard On command. This method must be overridden by a subclass to enable the Tango command, otherwise the command will not be part of the device interface. The :py:func:`~ska_tango_base.long_running_commands.decorators.submit_lrc_task` decorator can be used to make Reset a long running command. If implemented, this command should turn the device on and result in the operational state transitioning to ``ON`` or ``ALARM``. The command should not return until the transition has occurred. :return: A tuple containing a return code and a string message indicating status or a command ID. The message is for information purpose only. """ raise NotImplementedError( f"'execute_On' method must be implemented by '{self.__class__.__name__}'. " "The parent 'BaseInterface' is an abstract base class." )
[docs] @DebugIt() def On(self) -> DevVarLongStringArrayType: """ Turn the device on. Subclasses should override the :py:attr:`execute_On` method to change the behaviour of this command. :return: A tuple containing a return code and a string message indicating status or a command ID. The message is for information purpose only. """ return self.execute_On()
On.__check_is_long_running__ = "execute_On" if version.parse("10.0") <= _tango_version < version.parse("10.1"): Reset = _make_cmd(Reset) On = _make_cmd(On) Standby = _make_cmd(Standby) Off = _make_cmd(Off)
[docs] def standard_control_mode( doc: str | None = None, **kwargs: Any, ) -> tuple[Signal[ControlMode], attribute_from_signal]: """ Return a signal and attribute for the controlMode optional attribute. The control mode of the device are REMOTE, LOCAL Tango Device accepts only from a ‘local’ client and ignores commands and queries received from TM or any other ‘remote’ clients. The Local clients has to release LOCAL control before REMOTE clients can take control again. :return: signal, attribute tuple """ signal = Signal[ControlMode](stored=True, initial_value=ControlMode.REMOTE) if doc is None: doc = ( "The control mode of the device are REMOTE, LOCAL " "Tango Device accepts only from a ‘local’ client and ignores commands and " "queries received from TM or any other ‘remote’ clients. The Local clients " "has to release LOCAL control before REMOTE clients can take control again." ) attr = attribute_from_signal( signal, name="controlMode", dtype=ControlMode, doc=doc, write_to_signal=True, memorized=True, hw_memorized=True, **kwargs, ) return signal, attr
[docs] def standard_simulation_mode( doc: str | None = None, **kwargs: Any, ) -> tuple[Signal[SimulationMode], attribute_from_signal]: """ Return a signal and attribute for the simulationMode optional attribute. When TRUE the device is using a simulator. """ signal = Signal[SimulationMode](stored=True, initial_value=SimulationMode.FALSE) if doc is None: doc = "When TRUE the device is using a simulator" attr = attribute_from_signal( signal, doc=doc, name="simulationMode", dtype=SimulationMode, write_to_signal=True, memorized=True, hw_memorized=True, **kwargs, ) return signal, attr
[docs] def standard_test_mode( doc: str | None = None, **kwargs: Any, ) -> tuple[Signal[TestMode], attribute_from_signal]: """ Return a signal and attribute for the testMode optional attribute. If TEST the device is using testing logic. """ signal = Signal[TestMode](stored=True, initial_value=TestMode.NONE) if doc is None: doc = "If TEST the device is using testing logic" attr = attribute_from_signal( signal, doc=doc, name="testMode", dtype=TestMode, write_to_signal=True, memorized=True, hw_memorized=True, **kwargs, ) return signal, attr