Source code for ska_tango_base.base.base_device

#
# This file is part of the SKA Tango Base project
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE.txt for more info.
"""
This module implements a generic base model and device for SKA.

It exposes the generic attributes, properties and commands of an SKA device.
"""

from __future__ import annotations

import inspect
import logging
import threading
import traceback
from collections.abc import Callable
from functools import partial
from types import FunctionType, MethodType
from typing import Any, Generic, TypeVar, cast
from warnings import warn

import debugpy
import ska_control_model
from ska_control_model import (
    CommunicationStatus,
    ControlMode,
    HealthState,
    PowerState,
    ResultCode,
    SimulationMode,
    TaskStatus,
    TestMode,
)
from tango import AttReqType, DebugIt, DevState, Except, is_omni_thread
from tango.server import command, device_property

from ..commands import (
    DeviceInitCommand,
    FastCommand,
    SlowCommand,
    SubmittedSlowCommand,
)
from ..faults import GroupDefinitionsError
from ..long_running_commands.mixin import AbstractLRCMixin, _LRCEvent
from ..software_bus import Signal, attribute_from_signal, listen_to_signal
from ..type_hints import (
    CommandTrackerProtocol,
    DevVarLongStringArrayType,
    ReadAttrType,
    TaskCallbackType,
    TaskExecutorProtocol,
)
from ..utils import _hold_tango_monitor, get_groups_from_json
from . import base_interface
from .base_component_manager import BaseComponentManager
from .base_interface import BaseInterface, ControlLevel

__all__ = ["SKABaseDevice", "CommandTracker"]  # noqa: F822


# We do some magic here to avoid importing .command_tracker unless it is
# explicitly asked for, as this module is deprecated.
def __getattr__(name: str) -> Any:
    if name == "CommandTracker":
        from .command_tracker import CommandTracker  # noqa: PLC0415

        return CommandTracker

    raise AttributeError(f"module {__name__!r} has no attribute {name!r}")


ComponentManagerT = TypeVar("ComponentManagerT", bound=BaseComponentManager)

_DEBUGGER_PORT = 5678


[docs] class SKABaseDevice( AbstractLRCMixin, BaseInterface, Generic[ComponentManagerT], ): """ A generic base device for SKA with support for long running commands. :class:`~ska_tango_base.base.base_device.SKABaseDevice` inherits from the :class:`~ska_tango_base.base.base_interface.BaseInterface` and :class:`~ska_tango_base.long_running_commands.mixin.AbstractLRCMixin`, and expects a component manager to be provided by implementing the :meth:`~ska_tango_base.base.base_device.SKABaseDevice.create_component_manager` method. """ MIN_SUPPORTED_LRC_PROTOCOL_VERSION = 1 _component_manager: ComponentManagerT | None _global_debugger_listening = False _global_debugger_allocated_port = 0 @property def component_manager(self) -> ComponentManagerT: """Get the component manager.""" if self._component_manager is None: self._component_manager = self.create_component_manager() return self._component_manager @component_manager.setter def component_manager(self, cm: ComponentManagerT) -> None: """Set component manager.""" self._component_manager = cm @property def task_executor(self) -> TaskExecutorProtocol: """ Get the task executor. :return: The initialised task executor. """ return cast( TaskExecutorProtocol, getattr(self.component_manager, "_task_executor", None), )
[docs] class InitCommand(DeviceInitCommand): """ A class for the SKABaseDevice's init_device() "command". .. warning :: InitCommand is deprecated and should be set to :code:`None` to acknowledge the deprecation. Override :py:func:`!init_device()` directly instead. The overridden :py:func:`!init_device` method must call :py:func:`~ska_tango_base.base.base_device.SKABaseDevice.init_completed()` once initialisation has finished. **Example** :: class MyDevice(SKABaseDevice): InitCommand = None def init_device(self) -> None: super().init_device() self.init_completed() """
[docs] def do( self: SKABaseDevice.InitCommand, *args: Any, **kwargs: Any, ) -> tuple[ResultCode, str]: """ Stateless hook for device initialisation. :param args: positional arguments to this do method :param kwargs: keyword arguments to this do method :return: A tuple containing a return code and a string message indicating status. The message is for information purpose only. """ message = "SKABaseDevice Init command completed OK" self.logger.info(message) self._completed() return (ResultCode.OK, message)
# ----------------- # Device Properties # ----------------- SkaLevel = device_property( dtype="int16", doc=( "Indication of importance of the device in the SKA hierarchy to support " "drill-down navigation: 1..6, with 1 highest." ), default_value=4, ) """ Device property. Indication of importance of the device in the SKA hierarchy to support drill-down navigation: 1..6, with 1 highest. """ GroupDefinitions = device_property( dtype=("str",), doc=( "Each string in the list is a JSON serialised dict defining the " "``group_name``, ``devices`` and ``subgroups`` in the group. A Tango Group " "object is created for each item in the list, according to the hierarchy " "defined. This provides easy access to the managed devices in bulk, or " "individually." ), ) """ Device property. Each string in the list is a JSON serialised dict defining the ``group_name``, ``devices`` and ``subgroups`` in the group. A Tango Group object is created for each item in the list, according to the hierarchy defined. This provides easy access to the managed devices in bulk, or individually. The general format of the list is as follows, with optional ``devices`` and ``subgroups`` keys:: [ {"group_name": "<name>", "devices": ["<dev name>", ...]}, {"group_name": "<name>", "devices": ["<dev name>", "<dev name>", ...], "subgroups" : [{<nested group>}, {<nested group>}, ...]}, ... ] For example, a hierarchy of racks, servers and switches:: [ {"group_name": "servers", "devices": ["elt/server/1", "elt/server/2", "elt/server/3", "elt/server/4"]}, {"group_name": "switches", "devices": ["elt/switch/A", "elt/switch/B"]}, {"group_name": "pdus", "devices": ["elt/pdu/rackA", "elt/pdu/rackB"]}, {"group_name": "racks", "subgroups": [ {"group_name": "rackA", "devices": ["elt/server/1", "elt/server/2", "elt/switch/A", "elt/pdu/rackA"]}, {"group_name": "rackB", "devices": ["elt/server/3", "elt/server/4", "elt/switch/B", "elt/pdu/rackB"], "subgroups": []} ]} ] """ # ----------- # Init device # -----------
[docs] def init_device(self) -> None: """ Initialise the tango device after startup. Subclasses overriding :func:`init_device` should set :code:`InitCommand = None` and call :py:func:`init_completed` once initialisation has finished. **Example** :: class MyDevice(SKABaseDevice): InitCommand = None def init_device(self) -> None: super().init_device() self.init_completed() """ try: self._component_manager = None super().init_device() with self.allow_internal_threads(): self.shared_bus.wait_for_thread() try: # create Tango Groups dict, according to property self.logger.debug(f"Groups definitions: {self.GroupDefinitions}") self.groups = get_groups_from_json(self.GroupDefinitions) self.logger.info(f"Groups loaded: {sorted(self.groups.keys())}") except GroupDefinitionsError: self.logger.debug(f"No Groups loaded for device: {self.get_name()}") if self.InitCommand is not None: warning_msg = ( "The InitCommand object is deprecated and will be removed " "in the next major release. Set `InitCommand = None` and override " "init_device instead" ) warn(warning_msg, DeprecationWarning) self.InitCommand( self, logger=self.logger, )() self.init_command_objects() self._methods_patched_for_debugger = False except Exception as exc: # Deliberately catching all exceptions here, because an uncaught # exception would take our execution thread down. if hasattr(self, "logger"): self.logger.exception("init_device() failed.") else: traceback.print_exc() print(f"ERROR: init_device failed, and no logger: {exc}.") self._update_state( DevState.FAULT, "The device is in FAULT state - init_device failed.", )
[docs] def init_completed(self) -> None: """ Notify the state machine that initialisation has completed. Must be called from your :py:func:`!init_device()` method when you have set :code:`InitCommand = None`. **Example** :: class MyDevice(SKABaseDevice): InitCommand = None def init_device(self) -> None: super().init_device() self.init_completed() """ super().init_completed()
[docs] def on_new_shared_bus(self) -> None: """Create component manager.""" self.__tango_pending_operations = 0 self.__tango_pending_operations_lock = threading.Lock() super().on_new_shared_bus()
[docs] def delete_device(self) -> None: """Cleanup the component manager.""" # We try our best to clean up, but it is important we do not # throw from delete_device(). try: with self.allow_internal_threads(): self.component_manager.cleanup() except Exception: pass super().delete_device()
# --------- # Listeners # --------- # TODO(tri): Add option to disable this? Really this should be being # set by the command... @listen_to_signal("_command_tracker.lrc_event") def __on_lrc_event(self, event: _LRCEvent) -> None: if "status" in event and event["status"] == TaskStatus.IN_PROGRESS: command_name = event["command_id"].split("_")[-1] self._update_commanded_state(command_name) def _update_commanded_state(self, command_name: str) -> None: if command_name in ["On", "Off", "Standby"]: self._commanded_state = getattr(DevState, command_name.upper()) # TODO(tri): Is this logic for Reset really correct? Surely, # calling Reset does not change the commanded state... elif command_name == "Reset": current_state = self.get_state() if current_state in [DevState.FAULT, DevState.ALARM]: self._commanded_state = DevState.ON else: self._commanded_state = current_state
[docs] def change_control_level(self, control_level: ControlLevel) -> None: """Stop/start communicating with the component.""" if control_level == ControlLevel.FULL_CONTROL: self.component_manager.start_communicating() elif control_level == ControlLevel.NO_CONTACT: self.component_manager.stop_communicating()
# --------- # Callbacks # --------- def _update_state(self, state: DevState, status: str | None = None) -> None: """ Perform Tango operations in response to a change in op state. This helper method is passed to the op state model as a callback, so that the model can trigger actions in the Tango device. :param state: the new state value :param status: an optional new status string """ self.logger.debug(f"OpState={repr(DevState(state))}") # To maintain backwards compatibility, we by-pass the OpStateSignal # object and just emit the op state signal directly. This means we # accept all DevStates for this _update_state function. It is then # opt-in to use the _op_state signal object and have the values checked # as valid OpState values. self.shared_bus.emit("._op_state", state) self._status = status or f"The device is in {state} state."
[docs] def _update_health_state( self: SKABaseDevice[ComponentManagerT], health_state: HealthState ) -> None: """ Update the healthState of the device and push events. :param health_state: the new healthState value :meta public: """ self._health_state = health_state
[docs] def _communication_state_changed( self: SKABaseDevice[ComponentManagerT], communication_state: CommunicationStatus ) -> None: """ Update the device about communication state to the component. Subclasses should ensure that this is called by the ``communication_state_changed`` callback passed to their component manager. :param communicate_state: The new communicate state :meta public: """ action_map = { CommunicationStatus.DISABLED: "component_disconnected", CommunicationStatus.NOT_ESTABLISHED: "component_unknown", CommunicationStatus.ESTABLISHED: None, # wait for a component state update } action = action_map[communication_state] if action is not None: getattr(self, action)()
[docs] def _component_state_changed( self: SKABaseDevice[ComponentManagerT], fault: bool | None = None, power: PowerState | None = None, ) -> None: """ Update the device about the component's state. Subclasses should ensure that this is called by the ``component_state_callback`` callback passed to their component manager. :param fault: The new fault state of the device, or None if unchanged :param power: The new power state of the device, or None if unchanged :meta public: """ if power is not None: action_map = { PowerState.UNKNOWN: None, PowerState.OFF: "component_off", PowerState.STANDBY: "component_standby", PowerState.ON: "component_on", } action = action_map[power] if action is not None: getattr(self, action)() if fault is not None: if fault: self.component_fault() else: self.component_no_fault()
# --------------- # General methods # ---------------
[docs] def create_component_manager( self: SKABaseDevice[ComponentManagerT], ) -> ComponentManagerT: """ Create and return a component manager for this device. :raises NotImplementedError: for no implementation """ raise NotImplementedError( "'create_component_manager' method must be implemented by " f"'{self.__class__.__name__}'. " "The parent 'SKABaseDevice' is an abstract base class." )
def _register_default_command_object( self: SKABaseDevice[ComponentManagerT], command_name: str, command_object: FastCommand[Any] | SlowCommand[Any], ) -> None: self._default_command_objects[command_name] = command_object
[docs] def register_command_object( self: SKABaseDevice[ComponentManagerT], command_name: str, command_object: FastCommand[Any] | SlowCommand[Any], ) -> None: """ Register an object as a handler for a command. .. warning :: Command objects are deprecated. Directly provide the Tango command instead, or override :py:func:`!execute_<cmd>()`. :param command_name: name of the command for which the object is being registered :param command_object: the object that will handle invocations of the given command """ if command_name == "Abort": warning_msg = ( "SKA command objects are deprecated and will be removed " "in the next major release. Please override the schedule_abort_task() " "method to change the behaviour of the 'Abort' command." ) warn(warning_msg, DeprecationWarning, stacklevel=2) elif command_name in self._default_command_objects: self._warn_builtin_command_override(command_name) else: warning_msg = ( "SKA command objects are deprecated and will be removed " f"in the next major release. Directly provide the '{command_name}' " "command using the @command/@long_running_command dectorator instead." ) warn(warning_msg, DeprecationWarning, stacklevel=2) self._command_objects[command_name] = command_object
[docs] def get_command_object( self: SKABaseDevice[ComponentManagerT], command_name: str ) -> FastCommand[Any] | SlowCommand[Any]: """ Return the command object (handler) for a given command. .. warning :: Command objects are deprecated. Directly provide the Tango command instead, or override :py:func:`!execute_<cmd>()`. :param command_name: name of the command for which a command object (handler) is sought :return: the registered command object (handler) for the command :raises KeyError: if the command cannot be found. """ if command_name in self._default_command_objects: self._warn_builtin_command_override(command_name) if command_name in self._default_command_objects: return self._command_objects.get( command_name, self._default_command_objects[command_name] ) return self._command_objects[command_name]
def _builtin_command_is_optional(self, command_name: str) -> bool: return command_name in ["On", "Standby", "Off", "Reset"] def _warn_builtin_command_override( self, command_name: str, *, stacklevel: int = 3 ) -> None: if command_name in self._command_objects: return warning_msg = ( "SKA command objects are deprecated and will be removed " "in the next major release. Please override the " f"'execute_{command_name}' method, to change the behaviour " f"of '{command_name}'. Use the @submit_lrc_task " "decorator if it should be long running." ) if self._builtin_command_is_optional(command_name): warning_msg += ( f" Alternatively, set `{command_name} = None` at class scope " f"to disable the '{command_name}' command." ) warn(warning_msg, DeprecationWarning, stacklevel=stacklevel) def _get_overridden_command( self: SKABaseDevice[ComponentManagerT], command_name: str, ) -> FastCommand[Any] | SlowCommand[Any] | None: return self._command_objects.get(command_name, None)
[docs] def init_command_objects(self: SKABaseDevice[ComponentManagerT]) -> None: """Register command objects (handlers) for this device's commands.""" self._default_command_objects: dict[ str, FastCommand[Any] | SlowCommand[Any] ] = {} self._command_objects: dict[str, FastCommand[Any] | SlowCommand[Any]] = {} for command_name, method_name in [ ("Off", "off"), ("Standby", "standby"), ("On", "on"), ("Reset", "reset"), ]: self._register_default_command_object( command_name, SubmittedSlowCommand( command_name, self.command_tracker, self.component_manager, method_name, logger=None, ), ) self._register_default_command_object( "Abort", self.AbortCommand( self.command_tracker, self.component_manager, None, self.logger ), ) # TODO: Deprecated command, remove in future release self._register_default_command_object( "AbortCommands", self.AbortCommandsCommand(self.component_manager, self.logger), ) self._register_default_command_object( "CheckLongRunningCommandStatus", self.CheckLongRunningCommandStatusCommand( self.command_tracker, self.logger ), ) self._register_default_command_object( "DebugDevice", self.DebugDeviceCommand(self, logger=self.logger), )
# ---------- # Attributes # ---------- _control_mode: Signal[ska_control_model.ControlMode] """ Signal for the control mode of the device. Values are emitted for this signal whenever a client successfully changes to the controlMode attribute. :meta public: """ controlMode: attribute_from_signal """ Control mode attribute of the device. The control mode of the device is either REMOTE or LOCAL. Tango Device accepts only from a ‘local’ client and ignores commands and queries received from TM or any other ‘remote’ clients. The Local clients has to release LOCAL control before REMOTE clients can take control again. """ _control_mode, controlMode = base_interface.standard_control_mode() def __read_controlMode(self) -> ReadAttrType[ControlMode]: """Dispatch to read method to allow subclasses to override.""" return self.read_controlMode() controlMode.read(__read_controlMode)
[docs] def read_controlMode(self) -> ReadAttrType[ControlMode]: """ Read the control mode of the device. Subclasses can override this to change the behaviour of the :py:obj:`controlMode` attribute. """ return self.__class__.controlMode.do_read(self)
def __write_controlMode(self, mode: ControlMode) -> None: """Dispatch to write method to allow subclasses to override.""" return self.write_controlMode(mode) controlMode.write(__write_controlMode)
[docs] def write_controlMode(self, mode: ControlMode) -> None: """ Write the control mode of the device. Subclasses can override this to change the behaviour of the :py:obj:`controlMode` attribute. """ self.__class__.controlMode.do_write(self, mode)
def __is_controlMode_allowed(self, request_type: AttReqType) -> bool: return self.is_controlMode_allowed(request_type)
[docs] def is_controlMode_allowed(self, request_type: AttReqType) -> bool: """ Check if the controlMode can be read/written currently. This can be overridden by subclasses to restrict when clients can access the attribute. """ return True
controlMode.fisallowed = __is_controlMode_allowed _simulation_mode: Signal[ska_control_model.SimulationMode] """ Signal for the simulation mode of the device. Values are emitted for this signal whenever a client successfully changes to the simulationMode attribute. :meta public: """ simulationMode: attribute_from_signal """ Simulation mode attribute of the device. Some devices may implement both modes, while others will have simulators that set simulationMode to True while the real devices always set simulationMode to False. """ _simulation_mode, simulationMode = base_interface.standard_simulation_mode() def __read_simulationMode(self) -> ReadAttrType[SimulationMode]: """Dispatch to read method to allow subclasses to override.""" return self.read_simulationMode() simulationMode.read(__read_simulationMode)
[docs] def read_simulationMode(self) -> ReadAttrType[SimulationMode]: """ Read the simulation mode of the device. Subclasses can override this to change the behaviour of the :py:obj:`simulationMode` attribute. """ return self.__class__.simulationMode.do_read(self)
def __write_simulationMode(self, mode: SimulationMode) -> None: """Dispatch to write method to allow subclasses to override.""" return self.write_simulationMode(mode) simulationMode.write(__write_simulationMode)
[docs] def write_simulationMode(self, mode: SimulationMode) -> None: """ Write the simulation mode of the device. Subclasses can override this to change the behaviour of the :py:obj:`simulationMode` attribute. """ self.__class__.simulationMode.do_write(self, mode)
def __is_simulationMode_allowed(self, request_type: AttReqType) -> bool: return self.is_simulationMode_allowed(request_type)
[docs] def is_simulationMode_allowed(self, request_type: AttReqType) -> bool: """ Check if the simulationMode can be read/written currently. This can be overridden by subclasses to restrict when clients can access the attribute. """ return True
simulationMode.fisallowed = __is_simulationMode_allowed _test_mode: Signal[ska_control_model.TestMode] """ Signal for the test mode of the device. Values are emitted for this signal whenever a client successfully changes to the testMode attribute. :meta public: """ testMode: attribute_from_signal """ Test mode attribute of the device. Either no test mode or an indication of the test mode. """ _test_mode, testMode = base_interface.standard_test_mode() def __read_testMode(self) -> ReadAttrType[TestMode]: """Dispatch to read method to allow subclasses to override.""" return self.read_testMode() testMode.read(__read_testMode)
[docs] def read_testMode(self) -> ReadAttrType[TestMode]: """ Read the test mode of the device. Subclasses can override this to change the behaviour of the :py:obj:`testMode` attribute. """ return self.__class__.testMode.do_read(self)
def __write_testMode(self, mode: TestMode) -> None: """Dispatch to write method to allow subclasses to override.""" return self.write_testMode(mode) testMode.write(__write_testMode)
[docs] def write_testMode(self, mode: TestMode) -> None: """ Write the test mode of the device. Subclasses can override this to change the behaviour of the :py:obj:`testMode` attribute. """ self.__class__.testMode.do_write(self, mode)
def __is_testMode_allowed(self, request_type: AttReqType) -> bool: return self.is_testMode_allowed(request_type)
[docs] def is_testMode_allowed(self, request_type: AttReqType) -> bool: """ Check if the testMode can be read/written currently. This can be overridden by subclasses to restrict when clients can access the attribute. """ return True
testMode.fisallowed = __is_testMode_allowed # -------- # Commands # -------- Reset: Callable[[BaseInterface], DevVarLongStringArrayType] | None
[docs] def execute_Reset( self: SKABaseDevice[ComponentManagerT], ) -> DevVarLongStringArrayType: """ Reset the device. The default implementation of this command uses the deprecated command objects to delegate the behaviour to the "reset" method of the component manager. To avoid the deprecation warnings clients should override this method. :ref:`migrate-1-4-cmd-obj` provides guidelines for doing this. See :py:func:`BaseInterface.execute_Reset <ska_tango_base.base.base_interface.BaseInterface.execute_Reset>` for the expected behaviour of the :py:meth`!Reset()` command. :return: A tuple containing a return code and a string message indicating status. The message is for information purpose only. """ handler = self.get_command_object("Reset") result_code, unique_id = handler() return ([result_code], [unique_id])
Standby: Callable[[BaseInterface], DevVarLongStringArrayType] | None
[docs] def execute_Standby( self: SKABaseDevice[ComponentManagerT], ) -> DevVarLongStringArrayType: """ Put the device into standby mode. The default implementation of this command uses the deprecated command objects to delegate the behaviour to the "standby" method of the component manager. To avoid the deprecation warnings clients should override this method. :ref:`migrate-1-4-cmd-obj` provides guidelines for doing this. :return: A tuple containing a return code and a string message indicating status. The message is for information purpose only. """ handler = self.get_command_object("Standby") result_code, unique_id = handler() return ([result_code], [unique_id])
Off: Callable[[BaseInterface], DevVarLongStringArrayType] | None
[docs] def execute_Off( self: SKABaseDevice[ComponentManagerT], ) -> DevVarLongStringArrayType: """ Turn the device off. The default implementation of this command uses the deprecated command objects to delegate the behaviour to the "off" method of the component manager. To avoid the deprecation warnings clients should override this method. :ref:`migrate-1-4-cmd-obj` provides guidelines for doing this. :return: A tuple containing a return code and a string message indicating status. The message is for information purpose only. """ handler = self.get_command_object("Off") result_code, unique_id = handler() return ([result_code], [unique_id])
On: Callable[[BaseInterface], DevVarLongStringArrayType] | None
[docs] def execute_On(self: SKABaseDevice[ComponentManagerT]) -> DevVarLongStringArrayType: """ Turn device on. The default implementation of this command uses the deprecated command objects to delegate the behaviour to the "on" method of the component manager. To avoid the deprecation warnings clients should override this method. :ref:`migrate-1-4-cmd-obj` provides guidelines for doing this. :return: A tuple containing a return code and a string message indicating status. The message is for information purpose only. """ handler = self.get_command_object("On") result_code, unique_id = handler() return ([result_code], [unique_id])
[docs] class AbortCommand(SlowCommand[tuple[ResultCode, str]]): """A class for SKASubarray's Abort() command."""
[docs] def __init__( self: SKABaseDevice.AbortCommand, command_tracker: CommandTrackerProtocol, component_manager: BaseComponentManager, callback: Callable[[bool], None] | None, logger: logging.Logger | None = None, ) -> None: """ Initialise a new AbortCommand instance. :param command_tracker: the device's command tracker :param component_manager: the device's component manager :param callback: callback to be called when this command starts and finishes :param logger: a logger for this command object to use """ self._command_tracker = command_tracker self._component_manager = component_manager super().__init__(callback=callback, logger=logger)
[docs] def do( self: SKABaseDevice.AbortCommand, *args: Any, **kwargs: Any, ) -> tuple[ResultCode, str]: """ Stateless hook for Abort() command functionality. :param args: positional arguments to the command. This command does not take any, so this should be empty. :param kwargs: keyword arguments to the command. This command does not take any, so this should be empty. :return: A tuple containing a return code and a string message indicating status. The message is for information purpose only. """ command_id = self._command_tracker.new_command( "Abort", completed_callback=self._completed ) status, _ = self._component_manager.abort( partial(self._command_tracker.update_command_info, command_id) ) assert status == TaskStatus.IN_PROGRESS return ResultCode.STARTED, command_id
[docs] def execute_Abort(self) -> DevVarLongStringArrayType: """ Abort any executing long running command(s) and empty the queue. This override is provided for backwards compatible with the deprecated SKA command objects. Subclasses should override :py:meth:`schedule_abort_task` to change the behaviour of the :py:meth:`!Abort()` command. :ref:`migrate-1-4-cmd-obj` provides guidelines for overriding commands without using the SKA command objects. :return: A tuple containing a result code and the unique ID of the command """ handler = self._get_overridden_command("Abort") if handler is not None: (result_code, message) = handler() return ([result_code], [message]) return super().execute_Abort()
[docs] def schedule_abort_task( self: SKABaseDevice[ComponentManagerT], task_callback: TaskCallbackType ) -> tuple[TaskStatus, str]: """ Schedule an Abort task to begin executing immediately. Subclasses should override this to change the behaviour of the :py:meth:`!Abort()` command. :param task_callback: Notified of progress of the abort command. :return: A tuple containing TaskStatus.IN_PROGRESS and a message """ return self.component_manager.abort(task_callback)
# TODO: Deprecated command, remove in future release
[docs] class AbortCommandsCommand(SlowCommand[tuple[ResultCode, str]]): """The command class for the AbortCommand command."""
[docs] def __init__( self: SKABaseDevice.AbortCommandsCommand, component_manager: ComponentManagerT, logger: logging.Logger | None = None, ) -> None: """ Initialise a new AbortCommandsCommand instance. :param component_manager: contains the queue manager which manages the worker thread and the LRC attributes :param logger: the logger to be used by this Command. If not provided, then a default module logger will be used. """ self._abort_commands_method_overridden = False if isinstance(component_manager, BaseComponentManager): if ( type(component_manager).abort_commands != BaseComponentManager.abort_commands ): self._abort_commands_method_overridden = True elif hasattr(component_manager, "abort_commands"): self._abort_commands_method_overridden = True if self._abort_commands_method_overridden: warning_msg = ( "'abort_commands' is deprecated and will be removed in the " "next major release. Either rename 'abort_commands' in your " "component manager to 'abort_tasks' or override the " "'SKABaseDevice.Abort' command instead." ) warn(warning_msg, DeprecationWarning) self._component_manager = component_manager super().__init__(None, logger=logger)
[docs] def do( self: SKABaseDevice.AbortCommandsCommand, *args: Any, **kwargs: Any, ) -> tuple[ResultCode, str]: """ Abort long running commands. Abort the currently executing LRC and remove all enqueued LRCs. :param args: positional arguments to this do method :param kwargs: keyword arguments to this do method :return: A tuple containing a return code and a string message indicating status. The message is for information purpose only. """ if self._abort_commands_method_overridden: self._component_manager.abort_commands() else: self._component_manager.abort_tasks() return (ResultCode.STARTED, "Aborting commands")
# TODO: Deprecated command, remove in future release @command( doc_in=( "Empty out long running commands in queue.\n\n" "**DEPRECATED**: A client should call the tracked 'Abort' long running " "command instead." ), dtype_out="DevVarLongStringArray", doc_out="[ResultCode.STARTED]['Aborting commands']", ) @DebugIt() def AbortCommands( self: SKABaseDevice[ComponentManagerT], ) -> DevVarLongStringArrayType: """ Empty out long running commands in queue. :return: A tuple containing a return code and a string message indicating status. The message is for information purpose only. """ warning_msg = ( "'AbortCommands' is deprecated and will be removed in the next major " "release. The client should call the tracked 'Abort' long running command " "instead." ) warn(warning_msg, DeprecationWarning) self.logger.warning(warning_msg) handler = self.get_command_object("AbortCommands") (return_code, message) = handler() return ([return_code], [message])
[docs] class CheckLongRunningCommandStatusCommand(FastCommand[str]): """The command class for the CheckLongRunningCommandStatus command."""
[docs] def __init__( self: SKABaseDevice.CheckLongRunningCommandStatusCommand, command_tracker: CommandTrackerProtocol, logger: logging.Logger | None = None, ) -> None: """ Initialise a new CheckLongRunningCommandStatusCommand instance. :param command_tracker: command tracker :param logger: the logger to be used by this Command. If not provided, then a default module logger will be used. """ self._command_tracker = command_tracker super().__init__(logger=logger)
[docs] def do( self: SKABaseDevice.CheckLongRunningCommandStatusCommand, *args: Any, **kwargs: Any, ) -> str: """ Determine the status of the command ID passed in, if any. - Check `command_result` to see if it's finished. - Check `command_status` to see if it's in progress - Check `command_ids_in_queue` to see if it's queued :param args: positional arguments to this do method. There should be only one: the command_id. :param kwargs: keyword arguments to this do method :return: The string of the TaskStatus """ command_id = args[0] enum_status = self._command_tracker.get_command_status(command_id) return TaskStatus(enum_status).name
[docs] def execute_CheckLongRunningCommandStatus( self: SKABaseDevice[ComponentManagerT], argin: str ) -> str: """ Check the status of a long running command by ID. :param argin: the command id :return: command status """ handler = cast( Callable[[str], str], self._get_overridden_command("CheckLongRunningCommandStatus"), ) if handler is not None: return handler(argin) return super().execute_CheckLongRunningCommandStatus(argin)
[docs] class DebugDeviceCommand(FastCommand[int]): """A class for the SKABaseDevice's DebugDevice() command."""
[docs] def __init__( self: SKABaseDevice.DebugDeviceCommand, device: SKABaseDevice[ComponentManagerT], logger: logging.Logger | None = None, ) -> None: """ Initialise a new instance. :param device: the device to which this command belongs. :param logger: a logger for this command to use. """ self._device = device super().__init__(logger)
[docs] def do( self: SKABaseDevice.DebugDeviceCommand, *args: Any, **kwargs: Any, ) -> int: """ Stateless hook for device DebugDevice() command. Starts the ``debugpy`` debugger listening for remote connections (via Debugger Adaptor Protocol), and patches all methods so that they can be debugged. If the debugger is already listening, additional execution of this command will trigger a breakpoint. :param args: positional arguments to this do method :param kwargs: keyword arguments to this do method :return: The TCP port the debugger is listening on. """ if not SKABaseDevice._global_debugger_listening: allocated_port = self.start_debugger_and_get_port(_DEBUGGER_PORT) SKABaseDevice._global_debugger_listening = True SKABaseDevice._global_debugger_allocated_port = allocated_port if not self._device._methods_patched_for_debugger: self.monkey_patch_all_methods_for_debugger() self._device._methods_patched_for_debugger = True else: self.logger.warning("Triggering debugger breakpoint...") debugpy.breakpoint() return cast(int, SKABaseDevice._global_debugger_allocated_port) # type: ignore[redundant-cast]
[docs] def start_debugger_and_get_port( self: SKABaseDevice.DebugDeviceCommand, port: int ) -> int: """ Start the debugger and return the allocated port. :param port: port to listen on :return: allocated port """ self.logger.warning("Starting debugger...") interface, allocated_port = cast( tuple[str, int], debugpy.listen(("0.0.0.0", port)) ) self.logger.warning( f"Debugger listening on {interface}:{allocated_port}. Performance may " "be degraded." ) return allocated_port
[docs] def monkey_patch_all_methods_for_debugger( self: SKABaseDevice.DebugDeviceCommand, ) -> None: """Monkeypatch methods that need to be patched for the debugger.""" all_methods = self.get_all_methods() patched = [] for owner, name, method in all_methods: if self.method_must_be_patched_for_debugger(owner, method): self.patch_method_for_debugger(owner, name, method) patched.append( f"{owner} {method.__func__.__qualname__} in " f"{method.__func__.__module__}" ) self.logger.info("Patched %s of %s methods", len(patched), len(all_methods)) self.logger.debug("Patched methods: %s", sorted(patched))
[docs] def get_all_methods( self: SKABaseDevice.DebugDeviceCommand, ) -> list[tuple[object, str, Any]]: """ Return a list of the device's methods. :return: list of device methods """ methods: list[tuple[Any, str, Any]] = [] for name, method in inspect.getmembers(self._device, inspect.ismethod): methods.append((self._device, name, method)) for command_object in self._device._command_objects.values(): for name, method in inspect.getmembers( command_object, inspect.ismethod ): methods.append((command_object, name, method)) for command_object in self._device._default_command_objects.values(): for name, method in inspect.getmembers( command_object, inspect.ismethod ): methods.append((command_object, name, method)) return methods
[docs] @staticmethod def method_must_be_patched_for_debugger( owner: object, method: MethodType ) -> bool: """ Determine if methods are worth debugging. The goal is to find all the user's Python methods, but not the lower level PyTango device and Boost extension methods. The `types.FunctionType` check excludes the Boost methods. :param owner: owner :param method: the name :return: True if the method contains more than the skipped modules. """ skip_module_names = [ "tango.device_server", "tango.server", "logging", ] skip_owner_types = [SKABaseDevice.DebugDeviceCommand] return ( isinstance(method.__func__, FunctionType) and method.__func__.__module__ not in skip_module_names and type(owner) not in skip_owner_types )
[docs] def patch_method_for_debugger( self: SKABaseDevice.DebugDeviceCommand, owner: object, name: str, method: Callable[..., Any], ) -> None: """ Ensure method calls trigger the debugger. Most methods in a device are executed by calls from threads spawned by the cppTango layer. These threads are not known to Python, so we have to explicitly inform the debugger about them. :param owner: owner :param name: the name :param method: method """ def debug_thread_wrapper( orig_method: Callable[..., Any], *args: Any, **kwargs: Any ) -> Any: debugpy.debug_this_thread() return orig_method(*args, **kwargs) patched_method = partial(debug_thread_wrapper, method) setattr(owner, name, patched_method)
@command( doc_in="Enable remote debugging of this device.", dtype_out="DevUShort", doc_out="The TCP port the debugger is listening on.", ) @DebugIt() def DebugDevice(self: SKABaseDevice[ComponentManagerT]) -> int: """ Enable remote debugging of this device. To modify behaviour for this command, modify the do() method of the command class: :py:class:`.DebugDeviceCommand`. :return: the port the debugger is listening on """ command_object = cast(Callable[[], int], self.get_command_object("DebugDevice")) return command_object()
[docs] def set_state(self: SKABaseDevice[ComponentManagerT], state: DevState) -> None: """ Set the device server state. This is dependent on whether the set state call has been actioned from a native python thread or a tango omni thread :param state: the new device state """ self._submit_tango_operation("set_state", state)
[docs] def set_status(self: SKABaseDevice[ComponentManagerT], status: str) -> None: """ Set the device server status string. This is dependent on whether the set status call has been actioned from a native python thread or a tango omni thread :param status: the new device status """ self._submit_tango_operation("set_status", status)
[docs] def push_change_event( self: SKABaseDevice[ComponentManagerT], name: str, *args: Any ) -> None: """ Push a device server change event. This is dependent on whether the push_change_event call has been actioned from a native python thread or a tango omni thread This is an "overloaded" function which can be called with multiple signatures supported. These are dispatched based on the types passed. In the overloads below `Scalar` refers to any data type that can be converted to a tango scalar. `Any` refers to `Scalar | Sequence[Scalar] | Sequence[Sequence[Scalar]]`. - push_change_event(self, name: str) Push a device server change event for the "state" or "status". Raises a tango.DevFailed if name is not "state" or "status". - push_change_event(self, name: str, expection: DevFailed) Push a device server change event for an attribute with an exception. exception: exception to send to client - push_change_event(self, name: str, data: Any) Push a device server change event for an attribute. data: value to send to client - push_change_event(self, name: str, str_data: str, data: bytes | str) Push a device server change event for an encoded attribute. str_data: encoding format for data data: encoded data to send - push_change_event(self, name: str, data: Any, timestamp: float, quality: tango.AttrQuality) Push a device server change event for an attribute with timestamp and quality. data: value to send timestamp: unix timestamp quality: quality of attribute - push_change_event(self, name: str, str_data: str, data: bytes | str, timestamp: double, quality: tango.AttrQuality) Push a device server change event for a encoded attribute with timestamp and quality. str_data: encoding format for data data: encoded data to send timestamp: unix timestamp quality: quality of attribute :param name: the attribute name :param args: the arguments to dispatch on """ # noqa: E501 self._submit_tango_operation("push_change_event", name, *args)
[docs] def push_archive_event( self: SKABaseDevice[ComponentManagerT], name: str, *args: Any ) -> None: """ Push a device server archive event. This is dependent on whether the push_archive_event call has been actioned from a native python thread or a tango omnithread. This is an "overloaded" function which can be called with multiple signatures supported. These are dispatched based on the types passed. In the overloads below `Scalar` refers to any data type that can be converted to a tango scalar. `Any` refers to `Scalar | Sequence[Scalar] | Sequence[Sequence[Scalar]]`. - push_archive_event(self, name: str) Push a device server archive event for the "state" or "status". Raises a DevFailed if name is not "state" or "status". - push_archive_event(self, name: str, expection: DevFailed) Push a device server archive event for an attribute with an exception. exception: exception to send to client - push_archive_event(self, name: str, data: Any) Push a device server archive event for an attribute. data: value to send to client - push_archive_event(self, name: str, str_data: str, data: bytes | str) Push a device server archive event for an encoded attribute. str_data: encoding format for data data: encoded data to send - push_archive_event(self, name: str, data: Any, timestamp: float, quality: tango.AttrQuality) Push a device server archive event for an attribute with timestamp and quality. data: value to send timestamp: unix timestamp quality: quality of attribute - push_archive_event(self, name: str, str_data: str, data: bytes | str, timestamp: double, quality: tango.AttrQuality) Push a device server archive event for a encoded attribute with timestamp and quality. str_data: encoding format for data data: encoded data to send timestamp: unix timestamp quality: quality of attribute :param name: the attribute name :param args: the arguments to dispatch on """ # noqa: E501 has_data_arg = len(args) > 0 is_state_or_status = name.lower() in ["state", "status"] # Work around pytango#589. Note this is only required for if not is_state_or_status and not has_data_arg: desc = ( "push_archive_event without a data parameter is only allowed" + " for state and status attributes" ) Except.throw_exception( "PyDs_InvalidCall", desc, "SKABaseDevice.push_archive_event" ) self._submit_tango_operation("push_archive_event", name, *args)
[docs] def add_attribute( self: SKABaseDevice[ComponentManagerT], *args: Any, **kwargs: Any ) -> None: """ Add a device attribute. This is dependent on whether the push_archive_event call has been actioned from a native python thread or a tango omni thread :param args: positional args :param kwargs: keyword args """ self._submit_tango_operation("add_attribute", *args, *kwargs)
[docs] def set_change_event( self: SKABaseDevice[ComponentManagerT], name: str, implemented: bool, detect: bool = True, ) -> None: """ Set an attribute's change event. This is dependent on whether the push_archive_event call has been actioned from a native python thread or a tango omni thread :param name: name of the attribute :param implemented: whether the device pushes change events :param detect: whether the Tango layer should verify the change event property """ self._submit_tango_operation("set_change_event", name, implemented, detect)
__tango_operation = Signal[tuple[str, tuple[Any, ...], dict[str, Any]]]() def _submit_tango_operation( self: SKABaseDevice[ComponentManagerT], command_name: str, *args: Any, **kwargs: Any, ) -> None: # We do not care if threads race checking the __tango_pending_operations # here. It is up to users to synchronise their calls to tango # operations between threads if not is_omni_thread() or self.__tango_pending_operations != 0: warning_msg = ( "Calling Tango operations from a non-omnithread is deprecated. " "Use tango.EnsureOmniThread at the start of threads to make them " "an omnithread. Alterntaively, use a listener with the SignalBus." ) warn(warning_msg, DeprecationWarning, stacklevel=3) # Again, we do not care about race conditions with incrementing the # operations count here as it is up to users to worry about synchronising # tango operations between threads. We are only using a lock here # to ensure that we don't miss an operation when two threads # race to increment the counter. # # Note: We cannot hold the operations_lock while we emit the signal # here as this might lead to a deadlock if the signal thread waits # on the lock and the queue is full. self.__tango_operation = (command_name, args, kwargs) with self.__tango_pending_operations_lock: self.__tango_pending_operations += 1 return getattr(super(), command_name)(*args, **kwargs) @listen_to_signal(__tango_operation) def __handle_tango_operation( self: SKABaseDevice[ComponentManagerT], value: tuple[str, tuple[Any, ...], dict[str, Any]], ) -> None: (command_name, args, kwargs) = value try: with _hold_tango_monitor(self): getattr(super(), command_name)(*args, **kwargs) finally: with self.__tango_pending_operations_lock: self.__tango_pending_operations -= 1
# ---------- # Run server # ---------- def main(*args: str, **kwargs: str) -> int: """ Entry point for module. :param args: positional arguments :param kwargs: named arguments :return: exit code """ def _create_component_manager( self: SKABaseDevice[BaseComponentManager], ) -> BaseComponentManager: return BaseComponentManager(self.logger) SKABaseDevice.create_component_manager = _create_component_manager # type: ignore return cast(int, SKABaseDevice.run_server(args=args or None, **kwargs)) if __name__ == "__main__": main()