# Source code for ska_tango_base.future._base_interface

#
# This file is part of the SKA Tango Base project
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE.txt for more info.
"""Base interfaces for SKA Tango devices."""

import sys
import time
import typing

import ska_control_model as scm
import tango

from ..base.base_interface import (
    standard_simulation_mode,
    standard_test_mode,
)
from ..ska_device import SKADevice
from ..software_bus import (
    NoValue,
    SharingObserver,
    Signal,
    SignalBusMixin,
    attribute_from_signal,
    listen_to_signal,
)
from ..software_bus._attribute import LastEmittedValue
from ..type_hints import ReadAttrType, SharingObserverProtocol
from ..utils import _hold_tango_monitor

if sys.version_info >= (3, 11):
    from typing import Unpack
else:
    from typing_extensions import Unpack


__all__ = [
    "OpStateEmitMixin",
    "OpStateSignal",
    "BaseInterface",
    "standard_control_mode",
    "standard_simulation_mode",
    "standard_test_mode",
]


class _BaseOpStateEmitMixin(SharingObserver):
    """Shared plumbing for mixins that emit the operational state on the bus."""

    def on_new_shared_bus(self) -> None:
        """Start with an empty operational state input dictionary on the shared bus."""
        super().on_new_shared_bus()
        # TODO: Implement this cleanly in the software bus API
        setattr(self.shared_bus, "_op_state_input", {})

    def software_fault(self, status: str) -> None:
        """
        Record a software fault in the operational state.

        The message is passed on as the ``fault`` operational state input.

        :param status: the fault message to emit as the device status.
        """
        self._update_op_state(fault=status)

    def _update_op_state(self, **kwargs: typing.Any) -> None:
        """Merge the given inputs into the operational state (subclass hook)."""
        raise NotImplementedError

    @staticmethod
    def _combine_op_state(**kwargs: typing.Any) -> tango.DevState:
        """Reduce the operational state inputs to one state (subclass hook)."""
        raise NotImplementedError

    def _do_update_op_state(
        self, valid_keys: typing.KeysView[str], update: typing.Any
    ) -> None:
        """
        Check, merge and publish an operational state update.

        The update is merged over the inputs already stored on the shared bus,
        the merged inputs are combined into a single state, and both the state
        and the status text are emitted on the bus.

        :param valid_keys: the set of valid key names for this mixin's `TypedDict`.
        :param update: key/value pairs to merge into the operational state.
        :raises TypeError: if ``update`` is empty or contains a key that is not
            in ``valid_keys``.
        """
        owner = type(self).__name__

        if not update:
            raise TypeError(
                f"{owner}._update_op_state() expects at least "
                "one keyword argument."
            )

        unexpected = [name for name in update if name not in valid_keys]
        if unexpected:
            # Report the first offending key, in the order the keys were given.
            raise TypeError(
                f"{owner}._update_op_state() got an "
                f"unexpected keyword argument '{unexpected[0]}'."
            )

        # TODO: Need to hold a lock shared between `SharingObservers` while updating
        previous_input = getattr(self.shared_bus, "_op_state_input")
        merged_input = previous_input | update
        setattr(self.shared_bus, "_op_state_input", merged_input)

        state = self._combine_op_state(**merged_input)

        # A truthy fault message takes precedence over the generic status text.
        fault_message = merged_input.get("fault")
        if fault_message:
            status = fault_message
        else:
            status = f"The device is in {state} state."

        self.shared_bus.emit("._op_state", state, store=True)
        self.shared_bus.emit("._status", status, store=True)


class OpStateInput(typing.TypedDict, total=False):
    """
    Type for operational state input dictionary.

    Every key is optional (``total=False``). The key names double as the set of
    keyword arguments accepted by ``OpStateEmitMixin._update_op_state``.
    """

    # Truthy while the device is initialising; combined into ``DevState.INIT``
    # unless a fault is also present.
    initialising: bool
    # Fault message. When truthy the combined state is ``DevState.FAULT`` and
    # the message is emitted as the device status; ``None`` means no fault.
    fault: str | None


class OpStateEmitMixin(_BaseOpStateEmitMixin):
    """
    Mixin for a SKA base device or its component manager.

    Supplies the helper methods used to manage the Operational State of the
    device.
    """

    def _update_op_state(  # type: ignore[override]
        self, **kwargs: Unpack[OpStateInput]
    ) -> None:
        """
        Emit a change in operational state.

        Helper intended to be called by subclasses. Only the keys declared by
        :py:class:`OpStateInput` are accepted.

        :param kwargs: key/values for state.

        :meta public:
        """
        allowed_keys = OpStateInput.__annotations__.keys()
        self._do_update_op_state(allowed_keys, kwargs)

    @staticmethod
    def _combine_op_state(  # type: ignore[override]
        **kwargs: Unpack[OpStateInput],
    ) -> tango.DevState:
        """
        Reduce the operational state inputs to a single `tango.DevState`.

        A truthy ``fault`` wins over a truthy ``initialising``; when neither is
        set the state is ``ON``.

        :param kwargs: key/values for state.
        :return: the combined `tango.DevState`.
        """
        if kwargs.get("fault"):
            return tango.DevState.FAULT
        if kwargs.get("initialising"):
            return tango.DevState.INIT
        return tango.DevState.ON
class OpStateSignal(Signal[tango.DevState]):
    """
    Signal dedicated to the Operational State.

    Only the :py:class:`~tango.DevState` enumerants that are valid SKA
    Operational States may be emitted through this signal.
    """

    def __init__(self) -> None:
        """Initialise the signal with ``stored=True``."""
        super().__init__(stored=True)

    def __set__(self, obj: SharingObserverProtocol, value: tango.DevState) -> None:
        """
        Emit value on the bus for the signal.

        :param obj: the object on which the signal is being set.
        :param value: the operational state to emit.
        :raises ValueError: if the value is ``None``, ``NoValue`` or not a
            valid Operational State.
        """
        if value in (None, NoValue):
            rejected = "NoValue" if value is NoValue else "None"
            raise ValueError(
                f"'{self._relative_name}' does not support emitting {rejected}"
            )

        supported_states = (
            tango.DevState.INIT,
            tango.DevState.ON,
            tango.DevState.OFF,
            tango.DevState.STANDBY,
            tango.DevState.FAULT,
        )
        if value not in supported_states:
            raise ValueError(f"{value} is not a valid operational state.")

        super().__set__(obj, value)
class BaseInterface(OpStateEmitMixin, SignalBusMixin, SKADevice):
    """
    Provides the Tango interface for a base SKA device with an Operational State.

    This class only provides the Tango interface required for SKA Tango devices
    which support an Operational State. It is up to subclasses to override
    various abstract methods to provide the appropriate behaviour and set the
    Operational State as appropriate, except for the initial state that is set
    in :py:meth:`~init_device`.

    The Operational State of an SKA Tango device is exposed as the built-in
    Tango device state via :py:meth:`~tango.server.Device.set_state()`.
    Subclasses are not expected to call
    :py:meth:`~tango.server.Device.set_state()` themselves. Instead, the state
    is driven automatically by calling
    :py:meth:`~.OpStateEmitMixin._update_op_state` with ``initialising`` or
    ``fault`` keyword arguments, which will additionally ensure that change and
    archive events are sent.

    The Operational State of the :py:class:`BaseInterface` only supports a
    subset of the :py:class:`~tango.DevState` enumeration, with the following
    interpretations:

    - **INIT**: The device is initialising. Mandatory initial state for all
      devices.
    - **ON**: The Tango device is communicating with the system under control.
      Mandatory default state after initialisation has finished.
    - **FAULT**: The Tango device encountered a software fault/exception.
      Optional.

    :py:class:`BaseInterface` also provides the signals :py:attr:`_admin_mode`
    and :py:attr:`_health_state` as well as the corresponding Tango attributes
    :py:attr:`adminMode` and :py:attr:`healthState`, as well as
    :py:attr:`healthInfo`.

    The :py:attr:`adminMode` Tango attribute is writable and should not be set
    by subclasses of this interface.

    The :py:attr:`healthState` and :py:attr:`healthInfo` read-only attributes
    should be set by subclasses by calling :py:meth:`report_health()`.
    They are intended to provide information on the overall health of the
    device, based on the state of the component(s) it is monitoring.
    """

    _admin_mode: Signal[scm.AdminMode] = Signal[scm.AdminMode](
        stored=True, initial_value=scm.AdminMode.OFFLINE
    )
    """Signal for the admin mode of the device.

    Values are emitted for this signal whenever a client successfully changes
    the adminMode attribute.

    :meta public:
    """

    admin_mode_model: scm.AdminModeModel
    """State model used to ensure that admin mode transitions are allowed."""

    adminMode: attribute_from_signal = attribute_from_signal(
        _admin_mode,
        access=tango.AttrWriteType.READ_WRITE,
        dtype=scm.AdminMode,
        description=(
            "The Admin Mode of the device.\n\n"
            "It may interpret the current device condition and condition of all "
            "managed devices to set this."
        ),
        memorized=True,
        hw_memorized=True,
    )
    """
    Admin mode attribute of the device.
    """

    _op_state: OpStateSignal = OpStateSignal()
    """Signal for the Operational State of the device.

    Write to this signal to set the state of the Tango device.

    :meta public:
    """

    _status: Signal[str] = Signal[str](stored=True)
    """Signal for the status of the device.

    Write to this signal to set the status of the Tango device.

    :meta public:
    """

    _health_state = LastEmittedValue[scm.HealthState]()
    """Signal for the health state of the device.

    This signal cannot be written to directly.  Use the :meth:`report_health`
    method instead.

    :meta public:
    """

    healthState = attribute_from_signal(
        _health_state,
        dtype=scm.HealthState,
        description=(
            "Read the Health State of the device.\n\n"
            "It interprets the current device condition and condition of "
            "all managed devices to set this."
        ),
    )
    """
    Health state attribute of the device.

    This should be set by subclasses of this interface by calling
    :py:meth:`report_health()`.
    """

    healthInfo = attribute_from_signal(
        "._health_info",
        dtype=(str,),
        max_dim_x=256,
        description=(
            "Read the Health Info of the device.\n\n"
            "It provides an explanation for the currently reported Health State"
        ),
    )
    """
    A list of reasons why the device has a particular health state.

    Should be an empty list when the :attr:`healthState` is ``HealthState.OK``.

    This should be set by subclasses of this interface by calling
    :py:meth:`report_health()`.
    """

    # -----------
    # Init device
    # -----------
    def init_device(self) -> None:
        """
        Initialise the tango device after startup.

        Declares change and archive events for ``state`` and ``status`` and then
        marks the device as initialising.

        Subclasses overriding :py:meth:`init_device` must call
        :py:meth:`init_completed` once initialisation has finished.

        **Example** ::

            class MyDevice(BaseInterface):
                def init_device(self) -> None:
                    super().init_device()
                    ...
                    self.init_completed()
        """
        super().init_device()

        # State and status events are pushed explicitly by the signal listeners
        # below rather than being detected by Tango polling.
        self.set_change_event("state", True)
        self.set_archive_event("state", True)
        self.set_change_event("status", True)
        self.set_archive_event("status", True)

        self.init_invoked()

    def init_invoked(self) -> None:
        """Update operational state to reflect that initialisation has started."""
        self._update_op_state(initialising=True)

    def init_completed(self) -> None:
        """
        Update operational state to reflect that initialisation has completed.

        Must be called from your :py:func:`!init_device()` method.

        **Example** ::

            class MyDevice(BaseInterface):
                def init_device(self) -> None:
                    super().init_device()
                    ...
                    self.init_completed()
        """
        self._update_op_state(initialising=False)

    def on_new_shared_bus(self) -> None:
        """
        Create the admin mode model and publish the initial health report.

        Until the device implementation calls :py:meth:`report_health` itself,
        the device reports ``HealthState.FAILED`` with an explanatory message.
        """
        super().on_new_shared_bus()
        self.admin_mode_model = scm.AdminModeModel(self.logger, self._update_admin_mode)
        self.report_health(
            scm.HealthState.FAILED,
            ["Device implementation has not provided a health report"],
        )

    # -----------------------
    # Component state changes
    # -----------------------
    @listen_to_signal("_op_state")
    def __on_op_state(self, value: tango.DevState) -> None:
        """Update the device state and push its change and archive events."""
        with _hold_tango_monitor(self):
            self.set_state(value)
            self.push_change_event("state")
            self.push_archive_event("state")

    @listen_to_signal("_status")
    def __on_status(self, value: str) -> None:
        """Update the device status and push its change and archive events."""
        with _hold_tango_monitor(self):
            self.set_status(value)
            self.push_change_event("status")
            self.push_archive_event("status")

    def dev_state(self) -> tango.DevState:
        """
        Override the device state method to prevent automatic ALARM state.

        :return: the state last set on the device.
        """
        return self.get_state()

    def dev_status(self) -> str:
        """
        Override the device status method to prevent automatic ALARM state.

        :return: the status string last set on the device.
        """
        return self.get_status()

    # -----------------
    # Attribute methods
    # -----------------
    def _update_admin_mode(self, admin_mode: scm.AdminMode) -> None:
        """
        Update the admin mode of the device.

        This helper method is passed to the admin mode model as a callback, so
        that the model can trigger actions in the Tango device.

        :param admin_mode: the new admin mode value

        :meta public:
        """
        self._admin_mode = admin_mode

    def __read_adminMode(self) -> ReadAttrType[scm.AdminMode]:
        """Dispatch to read method to allow subclasses to override."""
        return self.read_adminMode()

    adminMode.read(__read_adminMode)

    def read_adminMode(self) -> ReadAttrType[scm.AdminMode]:
        """
        Read the admin mode of the device.

        Subclasses can override this to change the behaviour of the
        :py:obj:`adminMode` attribute.

        :return: the value read from the ``adminMode`` attribute.
        """
        return self.__class__.adminMode.do_read(self)

    def __write_adminMode(self, mode: scm.AdminMode) -> None:
        """Dispatch to write method to allow subclasses to override."""
        self.write_adminMode(mode)

    adminMode.write(__write_adminMode)

    def write_adminMode(self, mode: scm.AdminMode) -> None:
        """
        Set the Admin Mode of the device.

        Subclasses can override this to change the behaviour of the
        :py:obj:`adminMode` attribute.

        The requested mode is translated into the matching action on the admin
        mode model, which decides whether the transition is allowed.

        :param mode: Admin Mode of the device.

        :raises ValueError: for unknown adminMode
        :raises StateModelError: for a disallowed transition
        """
        if mode == scm.AdminMode.NOT_FITTED:
            self.admin_mode_model.perform_action("to_notfitted")
        elif mode == scm.AdminMode.OFFLINE:
            self.admin_mode_model.perform_action("to_offline")
        elif mode == scm.AdminMode.ENGINEERING:
            self.admin_mode_model.perform_action("to_engineering")
        elif mode == scm.AdminMode.ONLINE:
            self.admin_mode_model.perform_action("to_online")
        elif mode == scm.AdminMode.RESERVED:
            self.admin_mode_model.perform_action("to_reserved")
        else:
            raise ValueError(f"Unknown adminMode {mode}")

    def __is_adminMode_allowed(self, request_type: tango.AttReqType) -> bool:
        """Dispatch to the public check to allow subclasses to override."""
        return self.is_adminMode_allowed(request_type)

    def is_adminMode_allowed(self, request_type: tango.AttReqType) -> bool:
        """
        Check if the adminMode can be read/written currently.

        This can be overridden by subclasses to restrict when clients can
        access the attribute.

        :param request_type: whether the request is a read or a write.
        :return: always ``True`` in this base implementation.
        """
        return True

    adminMode.fisallowed = __is_adminMode_allowed

    def __read_healthState(self) -> ReadAttrType[scm.HealthState]:
        """Dispatch to read method to allow subclasses to override."""
        return self.read_healthState()

    healthState.read(__read_healthState)

    def __is_healthState_allowed(self, request_type: tango.AttReqType) -> bool:
        """Dispatch to the public check to allow subclasses to override."""
        return self.is_healthState_allowed(request_type)

    def is_healthState_allowed(self, request_type: tango.AttReqType) -> bool:
        """
        Check if the healthState can be read currently.

        This can be overridden by subclasses to restrict when clients can
        access the attribute.

        :param request_type: the type of the attribute request.
        :return: always ``True`` in this base implementation.
        """
        return True

    healthState.fisallowed = __is_healthState_allowed

    def read_healthState(self) -> ReadAttrType[scm.HealthState]:
        """
        Read the health state of the device.

        Subclasses can override this to change the behaviour of the
        :py:obj:`healthState` attribute.

        :return: the value read from the ``healthState`` attribute.
        """
        return self.__class__.healthState.do_read(self)

    def __read_healthInfo(self) -> ReadAttrType[list[str]]:
        """Dispatch to read method to allow subclasses to override."""
        return self.read_healthInfo()

    healthInfo.read(__read_healthInfo)

    def __is_healthInfo_allowed(self, request_type: tango.AttReqType) -> bool:
        """Dispatch to the public check to allow subclasses to override."""
        return self.is_healthInfo_allowed(request_type)

    def is_healthInfo_allowed(self, request_type: tango.AttReqType) -> bool:
        """
        Check if the healthInfo can be read currently.

        This can be overridden by subclasses to restrict when clients can
        access the attribute.

        :param request_type: the type of the attribute request.
        :return: always ``True`` in this base implementation.
        """
        return True

    healthInfo.fisallowed = __is_healthInfo_allowed

    def read_healthInfo(self) -> ReadAttrType[list[str]]:
        """
        Read the health info of the device.

        Subclasses can override this to change the behaviour of the
        :py:obj:`healthInfo` attribute.

        :return: the value read from the ``healthInfo`` attribute.
        """
        return self.__class__.healthInfo.do_read(self)

    def report_health(
        self, health_state: scm.HealthState, health_info: list[str]
    ) -> None:
        """
        Report the health of the device.

        The ``health_info`` should include all the reasons that the current
        device is reporting a ``DEGRADED`` or ``FAILED`` health_state.  Each
        element should be a separate reason and should be as brief as possible,
        while still providing enough information to aid in diagnosis.

        If the ``health_state == OK``, then the ``health_info`` must be an
        empty list.

        ``health_state == UNKNOWN`` is not supported from this interface. Use
        ``health_state == FAILED`` with a descriptive ``health_info`` instead,
        e.g. "component <x> is unreachable".

        Both values are emitted with the same timestamp and a valid quality.

        :param health_state: The overall health state of the device
        :param health_info: A list of reasons for the current overall health state

        :raises ValueError: If ``health_state==OK`` and ``health_info != []``
        :raises ValueError: If ``health_state==UNKNOWN``
        :raises ValueError: If ``health_state!=OK`` and ``health_info`` is empty
        """
        if health_state == scm.HealthState.UNKNOWN:
            raise ValueError(
                "HealthState.UNKNOWN is not supported as a health report. "
                "Use HealthState.FAILED with a descriptive health_info string."
            )

        if health_state == scm.HealthState.OK and health_info:
            raise ValueError("HealthState.OK must have an empty health_info.")

        if health_state != scm.HealthState.OK and not health_info:
            raise ValueError(
                f"HealthState.{health_state.name} must have an non-empty health_info."
            )

        # One timestamp for both emissions so state and info stay paired.
        timestamp = time.time()
        quality = tango.AttrQuality.ATTR_VALID
        self.shared_bus.emit(
            "._health_state", (health_state, timestamp, quality), store=True
        )
        self.shared_bus.emit("._health_info", (health_info, timestamp, quality))
def standard_control_mode(
    doc: str | None = None,
    **kwargs: typing.Any,
) -> tuple[Signal[scm.ControlMode], attribute_from_signal]:
    """
    Build the signal and attribute for the optional ``controlMode`` attribute.

    When set to ``ControlMode.LOCAL``, the Tango device accepts commands only
    from a local client and ignores commands and queries received from TM or
    any other remote clients. The local client must release LOCAL control
    before remote clients can take control again.

    Unpack the returned tuple into class-level signal and attribute
    declarations, for example::

        _control_mode, controlMode = standard_control_mode()

    The attribute is writable and memorized. Pass ``doc`` to supply a
    device-specific description in place of the default one.

    :param doc: Optional override for the Tango attribute description.
    :param kwargs: Additional keyword arguments forwarded to
        :py:func:`~ska_tango_base.software_bus.attribute_from_signal`.
    :return: A ``(signal, attribute)`` tuple.
    """
    description = doc
    if description is None:
        description = (
            "The control mode of the device.\n\n"
            "A Tango Device accepts only from a 'local' client and ignores commands "
            "and queries received from TM or any other 'remote' clients. The local "
            "client has to release LOCAL control before REMOTE clients can take "
            "control again."
        )

    control_mode_signal = Signal[scm.ControlMode](
        stored=True, initial_value=scm.ControlMode.REMOTE
    )
    control_mode_attr = attribute_from_signal(
        control_mode_signal,
        name="controlMode",
        dtype=scm.ControlMode,
        doc=description,
        write_to_signal=True,
        memorized=True,
        hw_memorized=True,
        **kwargs,
    )
    return control_mode_signal, control_mode_attr
# ----------
# Run server
# ----------


def main(*args: str, **kwargs: str) -> int:
    """
    Run the :py:class:`BaseInterface` device server.

    :param args: positional arguments; ``None`` is passed on when none are given
    :param kwargs: named arguments forwarded to ``run_server``
    :return: exit code
    """
    server_args = args if args else None
    exit_code = BaseInterface.run_server(args=server_args, **kwargs)
    return typing.cast(int, exit_code)


if __name__ == "__main__":
    main()