#
# This file is part of the SKA Tango Base project
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE.txt for more info.
"""Base interfaces for SKA Tango devices."""
from __future__ import annotations
import time
from enum import Enum
from typing import Any, cast
import ska_control_model
import tango.server
from packaging import version
from ska_control_model import (
AdminMode,
ControlMode,
HealthState,
SimulationMode,
TestMode,
)
from tango import AttReqType, AttrWriteType, AutoTangoMonitor, DebugIt, DevState
from .._interface_helpers import get_request_type
from ..long_running_commands import LRCReqType
from ..ska_device import SKADevice
from ..software_bus import (
NoValue,
NoValueType,
Signal,
SignalBusMixin,
attribute_from_signal,
listen_to_signal,
)
from ..software_bus._attribute import _DeprecatedAttrSignal
from ..type_hints import (
DevVarLongStringArrayType,
ReadAttrType,
SharingObserverProtocol,
)
from ..utils import _hold_tango_monitor
__all__ = [
"OpStateSignal",
"CommandedStateSignal",
"ControlLevel",
"BaseInterface",
"standard_control_mode",
"standard_simulation_mode",
"standard_test_mode",
]
[docs]
class OpStateSignal(Signal[DevState]):
    """
    Signal carrying the Operational State of a device.

    Only the :py:class:`~tango.DevState` enumerants that are valid SKA
    Operational States may be emitted for this signal.
    """

    def __init__(self) -> None:
        """Create the signal as a stored signal."""
        super().__init__(stored=True)

    def __set__(self, obj: SharingObserverProtocol, value: DevState) -> None:
        """
        Validate ``value`` and emit it on the bus for this signal.

        :param obj: the object the signal is being set on
        :param value: the new Operational State

        :raises ValueError: if ``value`` is ``None``, ``NoValue`` or not a
            valid Operational State.
        """
        if value in (None, NoValue):
            rejected = "NoValue" if value is NoValue else "None"
            raise ValueError(
                f"'{self._relative_name}' does not support emitting {rejected}"
            )

        valid_states = (
            DevState.DISABLE,
            DevState.FAULT,
            DevState.INIT,
            DevState.OFF,
            DevState.ON,
            DevState.ALARM,
            DevState.STANDBY,
            DevState.UNKNOWN,
        )
        if value not in valid_states:
            raise ValueError(f"{value} is not a valid operational state.")

        super().__set__(obj, value)
[docs]
class CommandedStateSignal(Signal[DevState | None]):
    """
    Signal carrying the Commanded State of a device.

    Only ``None``, ``NoValue`` and the :py:class:`~tango.DevState` enumerants
    ``OFF``, ``ON`` and ``STANDBY`` may be emitted for this signal.
    """

    def __init__(self) -> None:
        """Create the signal as a stored signal with an initial value of ``None``."""
        super().__init__(stored=True, initial_value=None)

    def __set__(
        self, obj: SharingObserverProtocol, value: DevState | None | NoValueType
    ) -> None:
        """
        Validate ``value`` and emit it on the bus for this signal.

        :param obj: the object the signal is being set on
        :param value: the new Commanded State

        :raises ValueError: if the value is not a valid Commanded State.
        """
        valid_values = (
            None,
            NoValue,
            DevState.OFF,
            DevState.ON,
            DevState.STANDBY,
        )
        if value not in valid_values:
            raise ValueError(f"{value} is not a valid commanded state.")

        super().__set__(obj, value)
[docs]
class ControlLevel(Enum):
    """
    How much control the Tango device should exert on the system under control.

    This enumeration should not be considered exhaustive as, in the future, we
    may add ``MONITORING_ONLY`` to this enumeration.
    """

    NO_CONTACT = 1
    """The Tango device should make no contact with the system under control."""
    FULL_CONTROL = 2
    """The Tango device should attempt to control the system under control fully."""
# Parsed version of the installed ``tango`` package. It is compared against
# 10.0 / 10.1 in this module to decide whether the standard commands are
# declared on the class or added from ``init_device()``.
_tango_version = version.parse(tango.__version__)
def _make_cmd(meth: Any, doc_in: str) -> Any:
    """
    Wrap ``meth`` in a :py:func:`tango.server.command`.

    The command returns a ``DevVarLongStringArray`` and its "is allowed" check
    is the name-mangled private ``BaseInterface`` method
    ``__is_<meth name>_allowed``.

    :param meth: the method implementing the command
    :param doc_in: input documentation passed to the Tango command

    :return: whatever :py:func:`tango.server.command` returns for ``meth``
    """
    allowed_method_name = f"_BaseInterface__is_{meth.__name__}_allowed"
    command_options = {
        "doc_in": doc_in,
        "dtype_out": "DevVarLongStringArray",
        "doc_out": "[ResultCode][Status message or command ID]",
        "fisallowed": allowed_method_name,
    }
    return tango.server.command(meth, **command_options)
[docs]
class BaseInterface(SignalBusMixin, SKADevice):
"""
Provides the Tango interface for an SKA device with an Operational State.
This class only provides the Tango interface required for SKA Tango devices
which support an Operational State. It is up to subclasses to override various
abstract methods to provide the appropriate behaviour and to drive the Operational
State Model as appropriate, except for the initial state that is set in
:py:meth:`~init_device`.
The Operational State of an SKA Tango device is exposed as the built-in Tango
device state via :py:meth:`~tango.server.Device.set_state()`. Subclasses are
not expected to call :py:meth:`~tango.server.Device.set_state()` themselves and
should instead change the state by performing the appropriate action on the
:py:class:`~ska_control_model.OpStateModel`, which will additionally ensure that
change and archive events are sent.
The Operational State only supports a subset of the :py:class:`~tango.DevState`
enumeration, with the following interpretations:
- **INIT**: The device is initialising
- **DISABLE**: The device is not currently monitoring or controlling the system
- **UNKNOWN**: The device cannot determine the state of the system
- **OFF**: The system under control is powered off
- **STANDBY**: The system under control is in low-power standby mode
- **ON**: The system under control is powered on
- **ALARM**: The system under control is powered on and some attribute is
raising an alarm. This is typically automatically handled by Tango.
- **FAULT**: The system under control is in fault
The :py:attr:`_op_state` signal
object will raise a :py:class:`ValueError` if set to a :py:class:`~tango.DevState`
value other than those above.
:py:class:`BaseInterface` similarly provides a :py:attr:`_status` signal which will
update the built-in Tango status attribute with
:py:meth:`~tango.server.Device.set_status()` and push change and archive events
whenever set.
In addition to providing the :py:attr:`_op_state` and :py:attr:`_status` signals
:py:class:`BaseInterface` also provides the signals :py:attr:`_admin_mode`,
:py:attr:`_commanded_state` and :py:attr:`_health_state` as well as the
corresponding Tango attributes :py:attr:`adminMode`, :py:attr:`commandedState`
and :py:attr:`healthState`.
The :py:attr:`adminMode` Tango attribute is writable and should not be set by
subclasses of this interface. Subclasses will be notified when a client has
requested the device to stop/start controlling the system under control via the
:py:meth:`change_control_level` method, which they are expected to override.
The :py:attr:`commandedState` and :py:attr:`healthState` should be set by subclasses
as appropriate, using the corresponding signals.
This interface also provides optional state transition commands
:py:meth:`!Off()`, :py:meth:`!Standby()`, :py:meth:`!On()` as well as
implementations of the :py:meth:`is_Off_allowed()`,
:py:meth:`is_Standby_allowed()` and :py:meth:`is_On_allowed()` methods
which respect the Operational State machine.
Subclasses can provide an implementation for these commands by overriding the
:py:meth:`execute_Off()`, :py:meth:`execute_Standby()` and
:py:meth:`execute_On()` methods.
Additionally, the :py:meth:`!Reset()` command is required and the corresponding
:py:meth:`execute_Reset` method must be overridden by all subclasses.
"""
# In the type annotations here we seem to need to include the
# ska_control_model to make autodoc happy...
_admin_mode: Signal[ska_control_model.AdminMode] = Signal[AdminMode](
stored=True, initial_value=AdminMode.OFFLINE
)
"""Signal for the admin mode of the device.
Values are emitted for this signal whenever a client successfully changes to
the adminMode attribute.
Typically, subclasses will not need to listen to this signal and should instead
override :py:meth:`change_control_level`.
:meta public:
"""
admin_mode_model: ska_control_model.AdminModeModel
"""State model used to ensure that admin mode transitions are allowed."""
adminMode: attribute_from_signal = attribute_from_signal(
_admin_mode,
access=AttrWriteType.READ_WRITE,
dtype=AdminMode,
description=(
"The Admin Mode of the device.\n\n"
"It may interpret the current device condition and condition of all "
"managed devices to set this."
),
memorized=True,
hw_memorized=True,
)
"""
Admin mode attribute of the device.
"""
op_state_model: ska_control_model.OpStateModel
"""State model used to ensure that operational state transitions are allowed."""
_op_state: OpStateSignal = OpStateSignal()
"""Signal for the Operational State of the device.
Write to this signal to set the state of the Tango device.
:meta public:
"""
_status: Signal[str] = Signal[str](stored=True)
"""Signal for the status of the device.
Write to this signal to set the status of the Tango device.
:meta public:
"""
_commanded_state: CommandedStateSignal = CommandedStateSignal()
"""Signal for the commanded Operational State of the device.
Write to this signal whenever a command is executed which will result in
a state transition.
:meta public:
"""
commandedState: attribute_from_signal = attribute_from_signal(
_commanded_state,
dtype=str,
to_tango=str,
description=(
"The last commanded Operating State of the device.\n\n"
'Initial string is "None". Only other strings it can change to '
'is "OFF", "STANDBY" or "ON", following the Off(), Standby() or On() '
"commands. If the state transition commands are long running commands "
"the commanded state will only update when the long running command "
"starts executing."
),
)
"""
Attribute for the last commanded Operating State of the device.
This should be set by subclasses of this interface by writing to
:py:attr:`_commanded_state`.
"""
_health_state = _DeprecatedAttrSignal[HealthState]("report_health()", stored=True)
"""Signal for the health state of the device.
Writing to this signal sets the reported health state of the Tango device.
Writing to this signal directly is deprecated. Use the
:meth:`report_health` method instead.
:meta public:
"""
healthState = attribute_from_signal(
_health_state,
dtype=HealthState,
description=(
"Read the Health State of the device.\n\n"
"It interprets the current device condition and condition of "
"all managed devices to set this."
),
)
"""
Health state attribute of the device.
This should be set by subclasses of this interface by calling
:py:meth:`report_health()`.
"""
healthInfo = attribute_from_signal(
    "._health_info",
    dtype=(str,),
    max_dim_x=256,
    # The adjacent literals below are concatenated into ONE string. There must
    # be no trailing comma inside the parentheses, otherwise the description
    # becomes a 1-tuple instead of a string like the other attributes use.
    description=(
        "Read the Health Info of the device.\n\n"
        "It provides an explanation for the currently reported Health State"
    ),
)
"""
A list of reasons why the device has a particular health state.
Should be an empty list when the :attr:`healthState` is ``HealthState.OK``.
This should be set by subclasses of this interface by calling
:py:meth:`report_health()`.
"""
# -----------
# Init device
# -----------
[docs]
def init_device(self) -> None:
    """
    Initialise the tango device after startup.

    Unless the installed tango version is 10.0.x, each of the ``Reset``,
    ``Standby``, ``Off`` and ``On`` commands is added to the device when the
    subclass overrides the matching ``execute_*`` method but has not replaced
    the command method itself. Finally :py:meth:`init_invoked` is called.

    Subclasses overriding `init_device` must call :py:func:`init_completed`
    once initialisation has finished.

    **Example** ::

        class MyDevice(BaseInterface):
            def init_device(self) -> None:
                super().init_device()
                ...
                self.init_completed()
    """
    super().init_device()

    declared_on_class = (
        version.parse("10.0") <= _tango_version < version.parse("10.1")
    )
    if not declared_on_class:
        # (command name, input documentation), in registration order.
        optional_commands = (
            ("Reset", "Reset the device."),
            ("Standby", "Put the device into standby mode."),
            ("Off", "Turn the device off."),
            ("On", "Turn the device on."),
        )
        with AutoTangoMonitor(self):
            cls = type(self)
            for cmd_name, cmd_doc in optional_commands:
                executor = f"execute_{cmd_name}"
                if getattr(cls, executor) != getattr(
                    BaseInterface, executor
                ) and getattr(cls, cmd_name) == getattr(BaseInterface, cmd_name):
                    self.add_command(_make_cmd(getattr(self, cmd_name), cmd_doc))

    self.init_invoked()
[docs]
def on_new_shared_bus(self) -> None:
    """
    Enable manual event pushing for state and status.

    After that the state models are created and an initial ``FAILED`` health
    report is made, stating that the implementation has not reported health.
    """
    super().on_new_shared_bus()

    for builtin_attr in ("state", "status"):
        self.set_change_event(builtin_attr, True)
        self.set_archive_event(builtin_attr, True)

    self._init_state_model()

    placeholder_info = ["Device implementation has not provided a health report"]
    self.report_health(HealthState.FAILED, placeholder_info)
[docs]
def _init_state_model(self) -> None:
    """
    Create the admin mode and op state models for the device.

    Each model is given the device logger and the matching ``_update_*``
    method as its callback.

    :meta public:
    """
    logger = self.logger
    self.admin_mode_model = ska_control_model.AdminModeModel(
        logger, self._update_admin_mode
    )
    self.op_state_model = ska_control_model.OpStateModel(logger, self._update_state)
[docs]
def init_invoked(self) -> None:
    """Perform the ``init_invoked`` action on the ``OpStateModel``."""
    action = "init_invoked"
    self.op_state_model.perform_action(action)
[docs]
def init_completed(self) -> None:
    """
    Tell the state machine that initialisation has finished.

    This performs the ``init_completed`` action on the ``OpStateModel`` and
    must be called from your :py:func:`!init_device()` method.

    **Example** ::

        class MyDevice(BaseInterface):
            def init_device(self) -> None:
                super().init_device()
                ...
                self.init_completed()
    """
    action = "init_completed"
    self.op_state_model.perform_action(action)
# -----------------------
# Component state actions
# -----------------------
[docs]
def component_disconnected(self) -> None:
    """Perform the ``component_disconnected`` action on the ``OpStateModel``."""
    action = "component_disconnected"
    self.op_state_model.perform_action(action)
[docs]
def component_unknown(self) -> None:
    """Perform the ``component_unknown`` action on the ``OpStateModel``."""
    action = "component_unknown"
    self.op_state_model.perform_action(action)
[docs]
def component_off(self) -> None:
    """Perform the ``component_off`` action on the ``OpStateModel``."""
    action = "component_off"
    self.op_state_model.perform_action(action)
[docs]
def component_standby(self) -> None:
    """Perform the ``component_standby`` action on the ``OpStateModel``."""
    action = "component_standby"
    self.op_state_model.perform_action(action)
[docs]
def component_on(self) -> None:
    """Perform the ``component_on`` action on the ``OpStateModel``."""
    action = "component_on"
    self.op_state_model.perform_action(action)
[docs]
def component_fault(self) -> None:
    """Perform the ``component_fault`` action on the ``OpStateModel``."""
    action = "component_fault"
    self.op_state_model.perform_action(action)
[docs]
def component_no_fault(self) -> None:
    """Perform the ``component_no_fault`` action on the ``OpStateModel``."""
    action = "component_no_fault"
    self.op_state_model.perform_action(action)
# ---------
# Callbacks
# ---------
[docs]
def _update_state(self, state: DevState, status: str | None = None) -> None:
    """
    Apply a change of op state to the Tango device.

    This method is handed to the op state model as its callback. It logs the
    new state, then writes the ``_op_state`` signal followed by the
    ``_status`` signal.

    :param state: the new state value
    :param status: an optional new status string; when empty or ``None`` a
        default message naming the state is used instead

    :meta public:
    """
    self.logger.debug(f"OpState={DevState(state)!r}")
    self._op_state = state
    if status:
        self._status = status
    else:
        self._status = f"The device is in {state} state."
[docs]
def _update_admin_mode(self, admin_mode: AdminMode) -> None:
    """
    Store a new admin mode by writing it to the ``_admin_mode`` signal.

    This method is handed to the admin mode model as its callback.

    :param admin_mode: the new admin mode value

    :meta public:
    """
    new_mode = admin_mode
    self._admin_mode = new_mode
@listen_to_signal("_op_state")
def __on_op_state(self, value: DevState) -> None:
    """
    Mirror the ``_op_state`` signal onto the built-in Tango state.

    While holding the Tango monitor, the device state is set and change and
    archive events are pushed for ``state``.

    :param value: the state emitted for the signal
    """
    attr_name = "state"
    with _hold_tango_monitor(self):
        self.set_state(value)
        self.push_change_event(attr_name)
        self.push_archive_event(attr_name)
@listen_to_signal("_status")
def __on_status(self, value: str) -> None:
    """
    Mirror the ``_status`` signal onto the built-in Tango status.

    While holding the Tango monitor, the device status is set and change and
    archive events are pushed for ``status``.

    :param value: the status string emitted for the signal
    """
    attr_name = "status"
    with _hold_tango_monitor(self):
        self.set_status(value)
        self.push_change_event(attr_name)
        self.push_archive_event(attr_name)
def __read_adminMode(self) -> ReadAttrType[AdminMode]:
    """Forward to :py:meth:`read_adminMode` so that subclasses can override it."""
    reading = self.read_adminMode()
    return reading
adminMode.read(__read_adminMode)
[docs]
def read_adminMode(self) -> ReadAttrType[AdminMode]:
    """
    Read the admin mode of the device.

    The default implementation delegates to the ``do_read`` method of the
    class-level :py:obj:`adminMode` attribute. Subclasses can override this
    to change the behaviour of the attribute.

    :return: the result of the attribute's ``do_read``
    """
    attr = self.__class__.adminMode
    return attr.do_read(self)
def __write_adminMode(self, mode: AdminMode) -> None:
    """Forward to :py:meth:`write_adminMode` so that subclasses can override it."""
    requested_mode = mode
    self.write_adminMode(requested_mode)
adminMode.write(__write_adminMode)
[docs]
def write_adminMode(self, mode: AdminMode) -> None:
    """
    Set the Admin Mode of the device.

    The matching action is performed on the admin mode model and, for
    ``OFFLINE``, ``ENGINEERING`` and ``ONLINE``, :py:meth:`change_control_level`
    is then called with the corresponding :py:class:`ControlLevel`.

    Subclasses can override this to change the behaviour of the
    :py:obj:`adminMode` attribute.

    :param mode: Admin Mode of the device.

    :raises ValueError: for unknown adminMode
    :raises StateModelError: for a disallowed transition
    """
    # (admin mode, model action, control level to request or None);
    # candidates are compared with ``==`` in this order.
    transitions = (
        (AdminMode.NOT_FITTED, "to_notfitted", None),
        (AdminMode.OFFLINE, "to_offline", ControlLevel.NO_CONTACT),
        (AdminMode.ENGINEERING, "to_engineering", ControlLevel.FULL_CONTROL),
        (AdminMode.ONLINE, "to_online", ControlLevel.FULL_CONTROL),
        (AdminMode.RESERVED, "to_reserved", None),
    )
    for candidate, action, control_level in transitions:
        if mode == candidate:
            self.admin_mode_model.perform_action(action)
            if control_level is not None:
                self.change_control_level(control_level)
            return

    raise ValueError(f"Unknown adminMode {mode}")
[docs]
def change_control_level(self, control_level: ControlLevel) -> None:
    """
    Change how the device is interacting with the system under control.

    Subclasses must override this method to do the following:

    - Stop all communication with the system under control when passed
      :py:const:`ControlLevel.NO_CONTACT`. Once the Tango device has
      stopped communicating with the system under control, the operational
      state must transition to **DISABLE**.
    - Start controlling the system under control when passed
      :py:const:`ControlLevel.FULL_CONTROL`. Once the Tango device has
      started trying to communicate with the system under control, it must
      transition to a state other than **DISABLE**. If the Tango device is
      trying to communicate with the system under control, but is unable
      to, it should transition to operational state **UNKNOWN**.

    Subclasses overriding this method should not assume that the
    :py:class:`ControlLevel` is exhaustive.

    :param control_level: the requested level of control

    :raises NotImplementedError: always, in this base implementation
    """
    owner = self.__class__.__name__
    raise NotImplementedError(
        "'change_control_level' method must be implemented by "
        f"'{owner}'. "
        "The parent 'BaseInterface' is an abstract base class."
    )
def __is_adminMode_allowed(self, request_type: AttReqType) -> bool:
    """Forward to :py:meth:`is_adminMode_allowed` so that subclasses can override it."""
    allowed = self.is_adminMode_allowed(request_type)
    return allowed
[docs]
def is_adminMode_allowed(self, request_type: AttReqType) -> bool:
    """
    Check if the adminMode can be read/written currently.

    The default implementation always allows access. Subclasses can override
    this to restrict when clients can access the attribute.

    :param request_type: the type of attribute request being checked

    :return: ``True``
    """
    always_allowed = True
    return always_allowed
adminMode.fisallowed = __is_adminMode_allowed
def __read_commandedState(self) -> ReadAttrType[str]:
    """Forward to :py:meth:`read_commandedState` so that subclasses can override it."""
    reading = self.read_commandedState()
    return reading
commandedState.read(__read_commandedState)
def __is_commandedState_allowed(self, request_type: AttReqType) -> bool:
    """Forward to :py:meth:`is_commandedState_allowed` so that subclasses can override it."""
    allowed = self.is_commandedState_allowed(request_type)
    return allowed
[docs]
def is_commandedState_allowed(self, request_type: AttReqType) -> bool:
    """
    Check if the commandedState can be read currently.

    The default implementation always allows access. Subclasses can override
    this to restrict when clients can access the attribute.

    :param request_type: the type of attribute request being checked

    :return: ``True``
    """
    always_allowed = True
    return always_allowed
commandedState.fisallowed = __is_commandedState_allowed
[docs]
def read_commandedState(self) -> ReadAttrType[str]:
    """
    Read the commanded state of the device.

    The default implementation delegates to the ``do_read`` method of the
    class-level :py:obj:`commandedState` attribute. Subclasses can override
    this to change the behaviour of the attribute.

    :return: the result of the attribute's ``do_read``
    """
    attr = self.__class__.commandedState
    return attr.do_read(self)
def __read_healthState(self) -> ReadAttrType[HealthState]:
    """Forward to :py:meth:`read_healthState` so that subclasses can override it."""
    reading = self.read_healthState()
    return reading
healthState.read(__read_healthState)
def __is_healthState_allowed(self, request_type: AttReqType) -> bool:
    """Forward to :py:meth:`is_healthState_allowed` so that subclasses can override it."""
    allowed = self.is_healthState_allowed(request_type)
    return allowed
[docs]
def is_healthState_allowed(self, request_type: AttReqType) -> bool:
    """
    Check if the healthState can be read currently.

    The default implementation always allows access. Subclasses can override
    this to restrict when clients can access the attribute.

    :param request_type: the type of attribute request being checked

    :return: ``True``
    """
    always_allowed = True
    return always_allowed
healthState.fisallowed = __is_healthState_allowed
[docs]
def read_healthState(self) -> ReadAttrType[HealthState]:
    """
    Read the health state of the device.

    The default implementation delegates to the ``do_read`` method of the
    class-level :py:obj:`healthState` attribute. Subclasses can override
    this to change the behaviour of the attribute.

    :return: the result of the attribute's ``do_read``
    """
    attr = self.__class__.healthState
    return attr.do_read(self)
def __read_healthInfo(self) -> ReadAttrType[list[str]]:
    """Forward to :py:meth:`read_healthInfo` so that subclasses can override it."""
    reading = self.read_healthInfo()
    return reading
healthInfo.read(__read_healthInfo)
def __is_healthInfo_allowed(self, request_type: AttReqType) -> bool:
    """Forward to :py:meth:`is_healthInfo_allowed` so that subclasses can override it."""
    allowed = self.is_healthInfo_allowed(request_type)
    return allowed
[docs]
def is_healthInfo_allowed(self, request_type: AttReqType) -> bool:
    """
    Check if the healthInfo can be read currently.

    The default implementation always allows access. Subclasses can override
    this to restrict when clients can access the attribute.

    :param request_type: the type of attribute request being checked

    :return: ``True``
    """
    always_allowed = True
    return always_allowed
healthInfo.fisallowed = __is_healthInfo_allowed
[docs]
def read_healthInfo(self) -> ReadAttrType[list[str]]:
    """
    Read the health info of the device.

    The default implementation delegates to the ``do_read`` method of the
    class-level :py:obj:`healthInfo` attribute. Subclasses can override
    this to change the behaviour of the attribute.

    :return: the result of the attribute's ``do_read``
    """
    attr = self.__class__.healthInfo
    return attr.do_read(self)
[docs]
def report_health(self, health_state: HealthState, health_info: list[str]) -> None:
    """
    Report the health of the device.

    The ``health_info`` should include all the reasons that the current device
    is reporting a ``DEGRADED`` or ``FAILED`` health_state. Each element should
    be a separate reason and should be as brief as possible, while still
    providing enough information to aid in diagnosis.

    If the ``health_state == OK``, then the ``health_info`` must be an empty
    list; for any other state it must not be empty.

    ``health_state == UNKNOWN`` is not supported from this interface. Use
    ``health_state == FAILED`` with a descriptive ``health_info`` instead,
    e.g. "component <x> is unreachable".

    Both values are emitted on the shared bus with the same timestamp and a
    valid attribute quality.

    :param health_state: The overall health state of the device
    :param health_info: A list of reasons for the current overall health state

    :raises ValueError: If ``health_state==UNKNOWN``
    :raises ValueError: If ``health_state==OK`` and ``health_info != []``
    :raises ValueError: If ``health_state!=OK`` and ``health_info`` is empty
    """
    if health_state == HealthState.UNKNOWN:
        raise ValueError(
            "HealthState.UNKNOWN is not supported as a health report. "
            "Use HealthState.FAILED with a descriptive health_info string."
        )

    if health_state == HealthState.OK:
        if health_info:
            raise ValueError("HealthState.OK must have an empty health_info.")
    elif not health_info:
        raise ValueError(
            f"HealthState.{health_state.name} must have an non-empty health_info."
        )

    # One timestamp/quality pair is shared by both emitted values.
    now = time.time()
    valid = tango.AttrQuality.ATTR_VALID
    state_reading = (health_state, now, valid)
    info_reading = (health_info, now, valid)
    self.shared_bus.emit("._health_state", state_reading, store=True)
    self.shared_bus.emit("._health_info", info_reading)
def __is_Reset_allowed(self) -> bool:
    """Forward to :py:meth:`is_Reset_allowed` so that subclasses can override it."""
    allowed = self.is_Reset_allowed()
    return allowed
[docs]
def is_Reset_allowed(self, request_type: LRCReqType | None = None) -> bool:
    """
    Return whether the :py:meth:`!Reset()` command may be called currently.

    When no ``request_type`` is given and ``get_request_type`` reports an
    ``ENQUEUE_REQ`` for ``"Reset"``, the command is always allowed. Otherwise
    it is allowed unless the device state is ``INIT`` or ``DISABLE``.

    This method can be overridden by subclasses to change when this command
    is allowed.

    :param request_type: the request type being checked, or ``None``

    :return: whether the command may be called in the current device
        state
    """
    if request_type is None:
        if get_request_type(self, "Reset") == LRCReqType.ENQUEUE_REQ:
            return True

    blocked_states = (DevState.INIT, DevState.DISABLE)
    return self.get_state() not in blocked_states
[docs]
def execute_Reset(self) -> DevVarLongStringArrayType:
    """
    Execute the standard Reset command.

    This method must be overridden by a subclass to enable the Tango command,
    otherwise the command will not be part of the device interface. The
    :py:func:`~ska_tango_base.long_running_commands.decorators.submit_lrc_task`
    decorator can be used to make Reset a long running command.

    For a device that directly monitors and controls hardware, this
    command should put that hardware into a known state, for example
    by clearing buffers and loading default values into registers,
    or if necessary even by power-cycling and re-initialising the
    hardware.

    Logical devices should generally implement this command to
    perform a sensible reset of that logical device. For example,
    aborting any current activities and clearing internal state.

    :py:meth:`!Reset()` generally should not change the power state of the
    device or its hardware:

    * If invoking :py:meth:`!Reset()` from **STANDBY** state, the device would
      usually be expected to remain in **STANDBY**.
    * If invoking :py:meth:`!Reset()` from **ON** state, the device would
      usually be expected to remain in **ON**.
    * If invoking :py:meth:`!Reset()` from **FAULT** state, the device would
      usually be expected to transition to **ON** or remain in
      **FAULT**, depending on whether the reset was successful in
      clearing the fault.

    :py:meth:`!Reset()` generally should *not* propagate to subservient
    devices. For example, a subsystem controller device should implement
    :py:meth:`!Reset()` to reset the subsystem as a whole, but that probably
    should not result in all of the subsystem's hardware being
    power-cycled.

    :return: A tuple containing a return code and a string
        message indicating status. The message is for
        information purpose only.

    :raises NotImplementedError: always, in this base implementation
    """
    owner = self.__class__.__name__
    raise NotImplementedError(
        "'execute_Reset' method must be implemented by "
        f"'{owner}'. "
        "The parent 'BaseInterface' is an abstract base class."
    )
[docs]
@DebugIt()
def Reset(self) -> DevVarLongStringArrayType:
    """
    Reset the device.

    This simply delegates to :py:attr:`execute_Reset`; subclasses should
    override that method to change the behaviour of this command.

    :return: A tuple containing a return code and a string
        message indicating status or a command ID. The message
        is for information purpose only.
    """
    outcome = self.execute_Reset()
    return outcome
Reset.__check_is_long_running__ = "execute_Reset"
def __is_Standby_allowed(self) -> bool:
    """Forward to :py:meth:`is_Standby_allowed` so that subclasses can override it."""
    allowed = self.is_Standby_allowed()
    return allowed
[docs]
def is_Standby_allowed(self, request_type: LRCReqType | None = None) -> bool:
    """
    Return whether the :py:meth:`!Standby()` command may be called currently.

    When no ``request_type`` is given and ``get_request_type`` reports an
    ``ENQUEUE_REQ`` for ``"Standby"``, the command is always allowed.
    Otherwise it is allowed unless the device state is ``INIT`` or
    ``DISABLE``.

    This method can be overridden by subclasses to change when this command
    is allowed.

    :param request_type: the request type being checked, or ``None``

    :return: whether the command may be called in the current device
        state
    """
    if request_type is None:
        if get_request_type(self, "Standby") == LRCReqType.ENQUEUE_REQ:
            return True

    blocked_states = (DevState.INIT, DevState.DISABLE)
    return self.get_state() not in blocked_states
[docs]
def execute_Standby(self) -> DevVarLongStringArrayType:
    """
    Execute the standard Standby command.

    This method must be overridden by a subclass to enable the Tango command,
    otherwise the command will not be part of the device interface. The
    :py:func:`~ska_tango_base.long_running_commands.decorators.submit_lrc_task`
    decorator can be used to make Standby a long running command.

    If implemented, this command should put the device in standby
    mode and result in the operational state transitioning to
    ``STANDBY``. The command should not return until the transition
    has occurred.

    :return: A tuple containing a return code and a string
        message indicating status or a command ID. The message
        is for information purpose only.

    :raises NotImplementedError: always, in this base implementation
    """
    owner = self.__class__.__name__
    raise NotImplementedError(
        "'execute_Standby' method must be implemented by "
        f"'{owner}'. "
        "The parent 'BaseInterface' is an abstract base class."
    )
[docs]
@DebugIt()
def Standby(self) -> DevVarLongStringArrayType:
    """
    Put the device into standby mode.

    This simply delegates to :py:attr:`execute_Standby`; subclasses should
    override that method to change the behaviour of this command.

    :return: A tuple containing a return code and a string
        message indicating status or a command ID. The message
        is for information purpose only.
    """
    outcome = self.execute_Standby()
    return outcome
Standby.__check_is_long_running__ = "execute_Standby"
def __is_Off_allowed(self) -> bool:
    """Forward to :py:meth:`is_Off_allowed` so that subclasses can override it."""
    allowed = self.is_Off_allowed()
    return allowed
[docs]
def is_Off_allowed(self, request_type: LRCReqType | None = None) -> bool:
    """
    Return whether the :py:meth:`!Off()` command may be called currently.

    When no ``request_type`` is given and ``get_request_type`` reports an
    ``ENQUEUE_REQ`` for ``"Off"``, the command is always allowed. Otherwise
    it is allowed unless the device state is ``INIT`` or ``DISABLE``.

    This method can be overridden by subclasses to change when this command
    is allowed.

    :param request_type: the request type being checked, or ``None``

    :return: whether the command may be called in the current device
        state
    """
    if request_type is None:
        if get_request_type(self, "Off") == LRCReqType.ENQUEUE_REQ:
            return True

    blocked_states = (DevState.INIT, DevState.DISABLE)
    return self.get_state() not in blocked_states
[docs]
def execute_Off(self) -> DevVarLongStringArrayType:
    """
    Execute the standard Off command.

    This method must be overridden by a subclass to enable the Tango command,
    otherwise the command will not be part of the device interface. The
    :py:func:`~ska_tango_base.long_running_commands.decorators.submit_lrc_task`
    decorator can be used to make Off a long running command.

    If implemented, this command should turn the device off and
    result in the operational state transitioning to
    ``OFF``. The command should not return until the transition
    has occurred.

    :return: A tuple containing a return code and a string
        message indicating status or a command ID. The message
        is for information purpose only.

    :raises NotImplementedError: always, in this base implementation
    """
    owner = self.__class__.__name__
    raise NotImplementedError(
        f"'execute_Off' method must be implemented by '{owner}'. "
        "The parent 'BaseInterface' is an abstract base class."
    )
[docs]
@DebugIt()
def Off(self) -> DevVarLongStringArrayType:
    """
    Turn the device off.

    This simply delegates to :py:attr:`execute_Off`; subclasses should
    override that method to change the behaviour of this command.

    :return: A tuple containing a return code and a string
        message indicating status or a command ID. The message
        is for information purpose only.
    """
    outcome = self.execute_Off()
    return outcome
Off.__check_is_long_running__ = "execute_Off"
def __is_On_allowed(self) -> bool:
    """Forward to :py:meth:`is_On_allowed` so that subclasses can override it."""
    allowed = self.is_On_allowed()
    return allowed
[docs]
def is_On_allowed(self, request_type: LRCReqType | None = None) -> bool:
    """
    Return whether the :py:meth:`!On()` command may be called currently.

    When no ``request_type`` is given and ``get_request_type`` reports an
    ``ENQUEUE_REQ`` for ``"On"``, the command is always allowed. Otherwise
    it is allowed unless the device state is ``INIT`` or ``DISABLE``.

    This method can be overridden by subclasses to change when this command
    is allowed.

    :param request_type: the request type being checked, or ``None``

    :return: whether the command may be called in the current device
        state
    """
    if request_type is None:
        if get_request_type(self, "On") == LRCReqType.ENQUEUE_REQ:
            return True

    blocked_states = (DevState.INIT, DevState.DISABLE)
    return self.get_state() not in blocked_states
[docs]
def execute_On(self) -> DevVarLongStringArrayType:
    """
    Execute the standard On command.

    This method must be overridden by a subclass to enable the Tango command,
    otherwise the command will not be part of the device interface. The
    :py:func:`~ska_tango_base.long_running_commands.decorators.submit_lrc_task`
    decorator can be used to make On a long running command.

    If implemented, this command should turn the device on and
    result in the operational state transitioning to
    ``ON`` or ``ALARM``. The command should not return until
    the transition has occurred.

    :return: A tuple containing a return code and a string
        message indicating status or a command ID. The message
        is for information purpose only.

    :raises NotImplementedError: always, in this base implementation
    """
    owner = self.__class__.__name__
    raise NotImplementedError(
        f"'execute_On' method must be implemented by '{owner}'. "
        "The parent 'BaseInterface' is an abstract base class."
    )
[docs]
@DebugIt()
def On(self) -> DevVarLongStringArrayType:
    """
    Turn the device on.

    This simply delegates to :py:attr:`execute_On`; subclasses should
    override that method to change the behaviour of this command.

    :return: A tuple containing a return code and a string
        message indicating status or a command ID. The message
        is for information purpose only.
    """
    outcome = self.execute_On()
    return outcome
On.__check_is_long_running__ = "execute_On"
# For tango 10.0.x the four standard commands are declared here, at class
# definition time, by rebinding each method to its command wrapper. For any
# other tango version this branch is skipped and init_device() adds the
# commands with add_command() instead.
if version.parse("10.0") <= _tango_version < version.parse("10.1"):
    Reset = _make_cmd(Reset, "Reset the device.")
    On = _make_cmd(On, "Turn the device on.")
    Standby = _make_cmd(Standby, "Put the device into standby mode.")
    Off = _make_cmd(Off, "Turn the device off.")
[docs]
def standard_control_mode(
    doc: str | None = None,
    **kwargs: Any,
) -> tuple[Signal[ControlMode], attribute_from_signal]:
    """
    Build the signal/attribute pair for the optional ``controlMode`` attribute.

    When set to ``ControlMode.LOCAL``, the Tango device accepts commands only
    from a local client and ignores commands and queries received from TM or
    any other remote clients. The local client must release LOCAL control
    before remote clients can take control again.

    Unpack the returned tuple into class-level signal and attribute
    declarations, for example::

        _control_mode, controlMode = standard_control_mode()

    The signal is stored and starts as ``ControlMode.REMOTE``. The attribute
    writes to the signal and is memorized. Pass ``doc`` to provide a
    device-specific description.

    :param doc: Optional override for the Tango attribute description.
    :param kwargs: Additional keyword arguments forwarded to
        :py:func:`~ska_tango_base.software_bus.attribute_from_signal`.

    :return: A ``(signal, attribute)`` tuple.
    """
    default_doc = (
        "The control mode of the device.\n\n"
        "A Tango Device accepts only from a 'local' client and ignores commands "
        "and queries received from TM or any other 'remote' clients. The local "
        "client has to release LOCAL control before REMOTE clients can take "
        "control again."
    )
    description = default_doc if doc is None else doc

    control_mode_signal = Signal[ControlMode](
        stored=True, initial_value=ControlMode.REMOTE
    )
    control_mode_attr = attribute_from_signal(
        control_mode_signal,
        name="controlMode",
        dtype=ControlMode,
        doc=description,
        write_to_signal=True,
        memorized=True,
        hw_memorized=True,
        **kwargs,
    )
    return control_mode_signal, control_mode_attr
[docs]
def standard_simulation_mode(
    doc: str | None = None,
    **kwargs: Any,
) -> tuple[Signal[SimulationMode], attribute_from_signal]:
    """
    Build the signal/attribute pair for the optional ``simulationMode`` attribute.

    When ``SimulationMode.TRUE``, the device is using a simulator instead of
    real hardware.

    Unpack the returned tuple into class-level signal and attribute
    declarations, for example::

        _simulation_mode, simulationMode = standard_simulation_mode()

    The signal is stored and starts as ``SimulationMode.FALSE``. The attribute
    writes to the signal and is memorized. Pass ``doc`` to provide a
    device-specific description.

    :param doc: Optional override for the Tango attribute description.
    :param kwargs: Additional keyword arguments forwarded to
        :py:func:`~ska_tango_base.software_bus.attribute_from_signal`.

    :return: A ``(signal, attribute)`` tuple.
    """
    description = "When TRUE the device is using a simulator." if doc is None else doc

    simulation_mode_signal = Signal[SimulationMode](
        stored=True, initial_value=SimulationMode.FALSE
    )
    simulation_mode_attr = attribute_from_signal(
        simulation_mode_signal,
        doc=description,
        name="simulationMode",
        dtype=SimulationMode,
        write_to_signal=True,
        memorized=True,
        hw_memorized=True,
        **kwargs,
    )
    return simulation_mode_signal, simulation_mode_attr
[docs]
def standard_test_mode(
    doc: str | None = None,
    **kwargs: Any,
) -> tuple[Signal[TestMode], attribute_from_signal]:
    """
    Build the signal/attribute pair for the optional ``testMode`` attribute.

    When ``TestMode.TEST``, the device substitutes its normal operating logic
    with testing/stub logic, which is useful for integration testing without
    real hardware.

    Unpack the returned tuple into class-level signal and attribute
    declarations, for example::

        _test_mode, testMode = standard_test_mode()

    The signal is stored and starts as ``TestMode.NONE``. The attribute writes
    to the signal and is memorized. Pass ``doc`` to provide a device-specific
    description.

    :param doc: Optional override for the Tango attribute description.
    :param kwargs: Additional keyword arguments forwarded to
        :py:func:`~ska_tango_base.software_bus.attribute_from_signal`.

    :return: A ``(signal, attribute)`` tuple.
    """
    description = "If TEST the device is using testing logic." if doc is None else doc

    test_mode_signal = Signal[TestMode](stored=True, initial_value=TestMode.NONE)
    test_mode_attr = attribute_from_signal(
        test_mode_signal,
        doc=description,
        name="testMode",
        dtype=TestMode,
        write_to_signal=True,
        memorized=True,
        hw_memorized=True,
        **kwargs,
    )
    return test_mode_signal, test_mode_attr
# ----------
# Run server
# ----------
def main(*args: str, **kwargs: str) -> int:
    """
    Run the ``BaseInterface`` device server; entry point for the module.

    :param args: positional arguments; passed as ``args`` to ``run_server``,
        or ``None`` when no positional arguments are given
    :param kwargs: named arguments forwarded to ``run_server``

    :return: exit code
    """
    server_args = args or None
    exit_code = BaseInterface.run_server(args=server_args, **kwargs)
    return cast(int, exit_code)
if __name__ == "__main__":
main()