#
# This file is part of the SKA Tango Base project
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE.txt for more info.
"""Base interfaces for SKA Tango devices."""
import sys
import typing
import ska_control_model as scm
import tango
import tango.server
from packaging import version
from .._interface_helpers import get_request_type
from ..base.base_interface import CommandedStateSignal
from ..long_running_commands import LRCReqType
from ..software_bus import (
attribute_from_signal,
)
from ..type_hints import DevVarLongStringArrayType, ReadAttrType
from ._base_interface import BaseInterface, OpStateInput, _BaseOpStateEmitMixin
if sys.version_info >= (3, 11):
from typing import Unpack
else:
from typing_extensions import Unpack
__all__ = ["PoweredOpStateEmitMixin", "PoweredInterface"]
_tango_version = version.parse(tango.__version__)
def _make_cmd(meth: typing.Any, doc_in: str) -> typing.Any:
return tango.server.command(
meth,
doc_in=doc_in,
dtype_out="DevVarLongStringArray",
doc_out="[ResultCode][Status message or command ID]",
fisallowed=f"_PoweredInterface__is_{meth.__name__}_allowed",
)
class PoweredOpStateInput(OpStateInput, total=False):
"""Type for powered operational state input dictionary."""
power: scm.PowerState
[docs]
class PoweredOpStateEmitMixin(_BaseOpStateEmitMixin):
"""
A mixin class for a SKA powered device or its component manager.
It provides helper methods for managing the Operational State of the device.
"""
[docs]
def component_off(self) -> None:
"""Update device operational state to off."""
self._update_op_state(power=scm.PowerState.OFF)
[docs]
def component_standby(self) -> None:
"""Update device operational state to standby."""
self._update_op_state(power=scm.PowerState.STANDBY)
[docs]
def component_on(self) -> None:
"""Update device operational state to on."""
self._update_op_state(power=scm.PowerState.ON)
[docs]
def _update_op_state(self, **kwargs: Unpack[PoweredOpStateInput]) -> None: # type: ignore[override]
"""
Emit a change in operational state.
This is a helper method for use by subclasses.
:param kwargs: key/values for state.
:meta public:
"""
self._do_update_op_state(PoweredOpStateInput.__annotations__.keys(), kwargs)
@staticmethod
def _combine_op_state(**kwargs: Unpack[PoweredOpStateInput]) -> tango.DevState: # type: ignore[override]
"""
Combine the given operational state values into a single `tango.DevState`.
:param kwargs: key/values for state.
:return: the combined `tango.DevState`.
"""
fault = kwargs.get("fault")
initialising = kwargs.get("initialising")
power_state = kwargs.get("power")
if fault:
op_state = tango.DevState.FAULT
elif initialising:
op_state = tango.DevState.INIT
elif power_state == scm.PowerState.ON:
op_state = tango.DevState.ON
elif power_state == scm.PowerState.STANDBY:
op_state = tango.DevState.STANDBY
else:
op_state = tango.DevState.OFF
return op_state
[docs]
class PoweredInterface(PoweredOpStateEmitMixin, BaseInterface):
"""
Provides the Tango interface for an SKA device that controls a powered component.
This class extends :py:class:`~.future._base_interface.BaseInterface`
with support for power state transitions (ON, OFF, STANDBY) driven by the component
under control. It is up to subclasses to override various abstract methods to
provide the appropriate behaviour and set the Operational State as appropriate,
except for the initial state that is set in :py:meth:`~init_device`.
The Operational State of an SKA Tango device is exposed as the built-in Tango
device state via :py:meth:`~tango.server.Device.set_state()`. Subclasses are
not expected to call :py:meth:`~tango.server.Device.set_state()` themselves.
Instead, the state is driven automatically by calling
:py:meth:`~.PoweredOpStateEmitMixin._update_op_state`
with a ``power`` keyword argument (a :py:class:`~ska_control_model.PowerState`
value), which will additionally ensure that change and archive events are sent.
The Operational State of the :py:class:`PoweredInterface` only supports a subset of
the :py:class:`~tango.DevState` enumeration, with the following interpretations:
- **INIT**: The device is initialising. Mandatory initial state for all devices.
- **ON**: The system under control is powered on. Mandatory default state if the
device is operational.
- **OFF**: The system under control is powered off. Optional.
- **STANDBY**: The system under control is in low-power standby mode. Optional.
- **FAULT**: The system under control is in fault. Optional.
:py:class:`PoweredInterface` also provides the signals
:py:attr:`~.future._base_interface.BaseInterface._admin_mode`,
:py:attr:`_commanded_state` and
:py:attr:`~.future._base_interface.BaseInterface._health_state` as well as the
corresponding Tango attributes
:py:attr:`~.future._base_interface.BaseInterface.adminMode`,
:py:attr:`commandedState`
and :py:attr:`~.future._base_interface.BaseInterface.healthState`,
as well as :py:attr:`~.future._base_interface.BaseInterface.healthInfo`.
The :py:attr:`~.future._base_interface.BaseInterface.adminMode` Tango attribute is
writable and should not be set by subclasses of this interface.
The :py:attr:`commandedState` should be set by subclasses as appropriate, using the
corresponding signal.
The :py:attr:`~.future._base_interface.BaseInterface.healthState` and
:py:attr:`~.future._base_interface.BaseInterface.healthInfo` read-only attributes
should be set by subclasses by calling
:py:meth:`~.future._base_interface.BaseInterface.report_health`.
This interface also provides optional state transition commands
:py:meth:`!Off()`, :py:meth:`!Standby()`, :py:meth:`!On()` as well as
implementations of the :py:meth:`is_Off_allowed()`,
:py:meth:`is_Standby_allowed()` and :py:meth:`is_On_allowed()` methods
which respect the Operational State machine.
Subclasses can provide an implementation for these commands by overriding the
:py:meth:`execute_Off()`, :py:meth:`execute_Standby()` and
:py:meth:`execute_On()` methods.
"""
_commanded_state: CommandedStateSignal = CommandedStateSignal()
"""Signal for the commanded Operational State of the device.
Write to this signal whenever a command is executed which will result in
a state transition.
:meta public:
"""
commandedState: attribute_from_signal = attribute_from_signal(
_commanded_state,
dtype=str,
to_tango=str,
description=(
"The last commanded Operating State of the device.\n\n"
'Initial string is "None". Only other strings it can change to '
'is "OFF", "STANDBY" or "ON", following the Off(), Standby() or On() '
"commands. If the state transition commands are long running commands "
"the commanded state will only update when the long running command "
"starts executing."
),
)
"""
Attribute for the last commanded Operating State of the device.
This should be set by subclasses of this interface by writing to
:py:attr:`_commanded_state`.
"""
# -----------
# Init device
# -----------
[docs]
def init_device(self) -> None:
"""
Initialise the tango device after startup.
Subclasses overriding :py:meth:`init_device` must call
:py:meth:`~.future._base_interface.BaseInterface.init_completed`
once initialisation has finished.
**Example** ::
class MyDevice(PoweredInterface):
def init_device(self) -> None:
super().init_device()
...
self.init_completed()
"""
super().init_device()
if not (version.parse("10.0") <= _tango_version < version.parse("10.1")):
with tango.AutoTangoMonitor(self):
cls = type(self)
if (
cls.execute_Standby != PoweredInterface.execute_Standby
and cls.Standby == PoweredInterface.Standby
):
self.add_command(
_make_cmd(self.Standby, "Put the device into standby mode.")
)
if (
cls.execute_Off != PoweredInterface.execute_Off
and cls.Off == PoweredInterface.Off
):
self.add_command(_make_cmd(self.Off, "Turn the device off."))
# -----------------
# Attribute methods
# -----------------
def __read_commandedState(self) -> ReadAttrType[str]:
"""Dispatch to read method to allow subclasses to override."""
return self.read_commandedState()
commandedState.read(__read_commandedState)
def __is_commandedState_allowed(self, request_type: tango.AttReqType) -> bool:
return self.is_commandedState_allowed(request_type)
[docs]
def is_commandedState_allowed(self, request_type: tango.AttReqType) -> bool:
"""
Check if the commandedState can be read currently.
This can be overridden by subclasses to restrict when clients
can access the attribute.
"""
return True
commandedState.fisallowed = __is_commandedState_allowed
[docs]
def read_commandedState(self) -> ReadAttrType[str]:
"""
Read the commanded state of the device.
Subclasses can override this to change the behaviour of the
:py:obj:`commandedState` attribute.
"""
return self.__class__.commandedState.do_read(self)
# ---------------
# Command methods
# ---------------
def __is_Standby_allowed(self) -> bool:
return self.is_Standby_allowed()
[docs]
def is_Standby_allowed(self, request_type: LRCReqType | None = None) -> bool:
"""
Return whether the :py:meth:`!Standby()` command may be called currently.
This method can be overridden by subclasses to change when this command
is allowed.
:return: whether the command may be called in the current device
state
"""
if (
request_type is None
and get_request_type(self, "Standby") == LRCReqType.ENQUEUE_REQ
):
return True
return self.get_state() not in [tango.DevState.INIT, tango.DevState.DISABLE]
[docs]
def execute_Standby(self) -> DevVarLongStringArrayType:
"""
Execute the standard Standby command.
This method must be overridden by a subclass to enable the Tango command,
otherwise the command will not be part of the device interface. The
:py:func:`~.long_running_commands.decorators.submit_lrc_task`
decorator can be used to make Standby a long running command.
If implemented, this command should put the device in standby
mode and result in the operational state transitioning to
``STANDBY``. The command should not return until the transition
has occurred.
:return: A tuple containing a return code and a string
message indicating status or a command ID. The message
is for information purpose only.
"""
raise NotImplementedError(
"'execute_Standby' method must be implemented by "
f"'{self.__class__.__name__}'. "
"The parent 'PoweredInterface' is an abstract base class."
)
[docs]
@tango.DebugIt()
def Standby(self) -> DevVarLongStringArrayType:
"""
Put the device into standby mode.
Subclasses should override the :py:meth:`execute_Standby` method
to change the behaviour of this command.
:return: A tuple containing a return code and a string
message indicating status or a command ID. The message
is for information purpose only.
"""
return self.execute_Standby()
Standby.__check_is_long_running__ = "execute_Standby"
def __is_Off_allowed(self) -> bool:
return self.is_Off_allowed()
[docs]
def is_Off_allowed(self, request_type: LRCReqType | None = None) -> bool:
"""
Return whether the :py:meth:`!Off()` command may be called currently.
This method can be overridden by subclasses to change when this command
is allowed.
:return: whether the command may be called in the current device
state
"""
if (
request_type is None
and get_request_type(self, "Off") == LRCReqType.ENQUEUE_REQ
):
return True
return self.get_state() not in [tango.DevState.INIT, tango.DevState.DISABLE]
[docs]
def execute_Off(self) -> DevVarLongStringArrayType:
"""
Execute the standard Off command.
This method must be overridden by a subclass to enable the Tango command,
otherwise the command will not be part of the device interface. The
:py:func:`~.long_running_commands.decorators.submit_lrc_task`
decorator can be used to make Off a long running command.
If implemented, this command should turn the device off and
result in the operational state transitioning to
``OFF``. The command should not return until the transition
has occurred.
:return: A tuple containing a return code and a string
message indicating status or a command ID. The message
is for information purpose only.
"""
raise NotImplementedError(
f"'execute_Off' method must be implemented by '{self.__class__.__name__}'. "
"The parent 'PoweredInterface' is an abstract base class."
)
[docs]
@tango.DebugIt()
def Off(self) -> DevVarLongStringArrayType:
"""
Turn the device off.
Subclasses should override the :py:meth:`execute_Off` method
to change the behaviour of this command.
:return: A tuple containing a return code and a string
message indicating status or a command ID. The message
is for information purpose only.
"""
return self.execute_Off()
Off.__check_is_long_running__ = "execute_Off"
def __is_On_allowed(self) -> bool:
return self.is_On_allowed()
[docs]
def is_On_allowed(self, request_type: LRCReqType | None = None) -> bool:
"""
Return whether the :py:meth:`!On()` command may be called currently.
This method can be overridden by subclasses to change when this command
is allowed.
:return: whether the command may be called in the current device
state
"""
if (
request_type is None
and get_request_type(self, "On") == LRCReqType.ENQUEUE_REQ
):
return True
return self.get_state() not in [tango.DevState.INIT, tango.DevState.DISABLE]
[docs]
def execute_On(self) -> DevVarLongStringArrayType:
"""
Execute the standard On command.
This method must be overridden by a subclass to enable the Tango command,
otherwise the command will not be part of the device interface. The
:py:func:`~.long_running_commands.decorators.submit_lrc_task`
decorator can be used to make On a long running command.
If implemented, this command should turn the device on and
result in the operational state transitioning to ``ON``.
The command should not return until the transition has occurred.
:return: A tuple containing a return code and a string
message indicating status or a command ID. The message
is for information purpose only.
"""
raise NotImplementedError(
f"'execute_On' method must be implemented by '{self.__class__.__name__}'. "
"The parent 'PoweredInterface' is an abstract base class."
)
[docs]
@tango.DebugIt()
@tango.server.command(
doc_in="Turn the device on.",
dtype_out="DevVarLongStringArray",
doc_out="[ResultCode][Status message or command ID]",
)
def On(self) -> DevVarLongStringArrayType:
"""
Turn the device on.
Subclasses should override the :py:meth:`execute_On` method
to change the behaviour of this command.
:return: A tuple containing a return code and a string
message indicating status or a command ID. The message
is for information purpose only.
"""
return self.execute_On()
On.__check_is_long_running__ = "execute_On"
if version.parse("10.0") <= _tango_version < version.parse("10.1"):
Standby = _make_cmd(Standby, "Put the device into standby mode.")
Off = _make_cmd(Off, "Turn the device off.")
# ----------
# Run server
# ----------
def main(*args: str, **kwargs: str) -> int:
"""
Entry point for module.
:param args: positional arguments
:param kwargs: named arguments
:return: exit code
"""
return typing.cast(int, PoweredInterface.run_server(args=args or None, **kwargs))
if __name__ == "__main__":
main()