Source code for ska_tango_base.long_running_commands.mixin

#
# This file is part of the SKA Tango Base project
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE.txt for more info.
"""
This module implements a mixin class for long running commands.

The mixin requires the class using it to also inherit from the generic
:class:`~ska_tango_base.ska_device.SKADevice` class.
"""

from __future__ import annotations

import copy
import functools
import inspect
import itertools
import json
import typing
import warnings
from enum import Enum, IntEnum
from typing import Any, Callable

from ska_control_model import ResultCode, TaskStatus
from tango import Attribute, DebugIt
from tango.server import attribute, command

from ..executor import TaskExecutor
from ..software_bus import (
    Signal,
    SignalBusMixin,
    attribute_from_signal,
    listen_to_signal,
)
from ..type_hints import (
    CommandTrackerProtocol,
    DevVarLongStringArrayType,
    JSONData,
    TaskCallbackType,
    TaskExecutorProtocol,
    TaskFunctionType,
)
from .command_tracker import (
    LRC_FINISHED_MAX_LENGTH,
    UserLRCAttr,
    _CommandTracker,
    _LRCEvent,
)
from .common import _SUPPORTED_LRC_PROTOCOL_VERSIONS

__all__ = ["LRCMixin", "LRCReqType", "_LRCEvent"]

_MINIMUM_STATUS_QUEUE_SIZE = 32


class LRCReqType(IntEnum):
    """Identify which stage of a command's life is asking ``is_<Cmd>_allowed``."""

    #: Passed to is_<Cmd>_allowed when the task is submitted.
    ENQUEUE_REQ = 1

    #: Passed to is_<Cmd>_allowed when the task is about to be executed.
    EXECUTE_REQ = 2
[docs] class AbstractLRCMixin(SignalBusMixin): """ Partial mixin class that adds support for long running commands. It must be used with :class:`~ska_tango_base.ska_device.SKADevice` or a subclass of it. It uses a shared bus with a command tracker to manage long running commands and the related user-facing Tango attributes. A :meth:`.task_executor` must be implemented for a long running command and the related attributes to behave correctly. """ MIN_SUPPORTED_LRC_PROTOCOL_VERSION = 1 # TODO: Protocol V1 - remove in future _command_ids_in_queue = Signal[list[str]](stored=True) _commands_in_queue = Signal[list[str]](stored=True) _command_statuses = Signal[list[str]](stored=True) _commands_in_progress = Signal[list[str]](stored=True) _command_progresses = Signal[list[str]](stored=True) _command_result = Signal[tuple[str, str]](stored=True) @classmethod def __init_subclass__(cls: type[AbstractLRCMixin], **kwargs: Any) -> None: """Check that the subclass has a valid minimum LRC protocol version set.""" super().__init_subclass__(**kwargs) min_support_ver = _SUPPORTED_LRC_PROTOCOL_VERSIONS[0] max_support_ver = _SUPPORTED_LRC_PROTOCOL_VERSIONS[1] if hasattr(cls, "MIN_SUPPORTED_LRC_PROTOCOL_VERSION"): min_set_ver = cls.MIN_SUPPORTED_LRC_PROTOCOL_VERSION if not isinstance(cls.MIN_SUPPORTED_LRC_PROTOCOL_VERSION, int): raise TypeError( f"{cls.__name__}.MIN_SUPPORTED_LRC_PROTOCOL_VERSION must be an " f"integer, got {type(min_set_ver).__name__}." ) if not (min_support_ver <= min_set_ver <= max_support_ver): raise ValueError( f"{cls.__name__}.MIN_SUPPORTED_LRC_PROTOCOL_VERSION must be between" f" {min_support_ver} and {max_support_ver}, got {min_set_ver}." ) # ----------- # Init device # -----------
[docs] def on_new_shared_bus(self) -> None: """ Initialise attributes which depend on signals from the bus. :raises AssertionError: If 'self.task_executor.max_queued_tasks' or 'self.task_executor.max_executing_tasks' is not equal to or greater than 0 or 1 respectively. """ super().on_new_shared_bus() self._command_tracker = _CommandTracker( self.MIN_SUPPORTED_LRC_PROTOCOL_VERSION, self._update_commands_in_queue, # TODO: Protocol V1 - remove in future self._update_command_statuses, # TODO: Protocol V1 - remove in future self._update_command_progresses, # TODO: Protocol V1 - remove in future self._update_command_result, # TODO: Protocol V1 - remove in future logger=self.logger, ) # Check and set 'max_queued_tasks' and 'max_executing_tasks' properties if self.task_executor is None: try: max_queued_tasks = self.component_manager.max_queued_tasks max_executing_tasks = self.component_manager.max_executing_tasks except AttributeError: max_queued_tasks = 1 max_executing_tasks = 2 else: max_queued_tasks = self.task_executor.max_queued_tasks max_executing_tasks = self.task_executor.max_executing_tasks assert max_queued_tasks >= 0, ( "'max_queued_tasks' property must be equal to or greater than 0." ) assert max_executing_tasks >= 1, ( "'max_executing_tasks' property must be equal to or greater than 1." ) if max_queued_tasks == 0: warning_msg = ( "Omitting the 'lrcQueue' attribute is currently not supported, even " "when 'max_queued_tasks' is 0. Creating the attribute with max_dim_x=1." ) warnings.warn(warning_msg, UserWarning) if max_executing_tasks == 1: warning_msg = ( "'max_executing_tasks' will be required to be at least 2 in a " "future release of ska-tango-base (found 1). A device must support " "the 'Abort()' command and at least one other command executing " "simultaneously." 
) warnings.warn(warning_msg, FutureWarning) if self.logger: self.logger.warning(warning_msg) max_executing_tasks = 2 # Create Tango dynamic attributes which depend # on the _command_tracker.lrc_event signal lrc_queue = attribute_from_signal( "_command_tracker.lrc_queue_signal", name="lrcQueue", doc="A list of info JSON blobs of the commands in queue.", dtype=(str,), max_dim_x=max(max_queued_tasks, 1), to_tango=functools.partial( self._get_json_list_of_lrc_attributes, allowed_keys=["uid", "name", "submitted_time"], ), ) self.add_attribute(lrc_queue) lrc_executing = attribute_from_signal( "_command_tracker.lrc_executing_signal", name="lrcExecuting", doc="A list of info JSON blobs of the currently executing commands.", dtype=(str,), max_dim_x=max_executing_tasks, to_tango=functools.partial( self._get_json_list_of_lrc_attributes, allowed_keys=[ "uid", "name", "submitted_time", "started_time", "progress", ], ), ) self.add_attribute(lrc_executing) self.set_change_event("_lrcEvent", True) # ---------------------- # Protocol V1 attributes # ---------------------- if self.MIN_SUPPORTED_LRC_PROTOCOL_VERSION == 1: self._max_executing_tasks = max_executing_tasks self._status_queue_size = max( max_queued_tasks * 2 + max_executing_tasks, _MINIMUM_STATUS_QUEUE_SIZE, ) self._command_ids_in_queue = [] self._commands_in_queue = [] self._command_statuses = [] self._commands_ids_in_progress: list[str] = [] self._commands_in_progress = [] self._command_progresses = [] self._command_result = ("", "") self._create_deprecated_lrc_attributes()
# ---------- # Properties # ---------- @property def command_tracker(self) -> CommandTrackerProtocol: """ Get the command tracker. :return: The initialised ``CommandTrackerProtocol`` object. """ return self._command_tracker @property def task_executor(self) -> TaskExecutorProtocol: """ Get the task executor. :return: The initialised task executor. """ raise NotImplementedError( "'task_executor' property must be implemented by " f"'{self.__class__.__name__}'. " "The parent 'AbstractLRCMixin' is an abstract base class." ) @staticmethod def _get_json_list_of_lrc_attributes( lrc_attr: UserLRCAttr, allowed_keys: list[str] ) -> list[str]: """ Get a list of JSON formatted strings representing the LRC attribute. Serialises each key-value pair that's in the allowed_keys list of the LRC's data dict to a flat JSON dict. :param lrc_attr: Dict of LRC IDs as keys and their nested CommandData dicts. :param allowed_keys: List of allowed keys to include from the JSON dicts. :return: A list of JSON strings containing a serialised info dict for each LRC. """ if lrc_attr: # Check for empty dict return [ json.dumps( { "uid": command_id, **{ key: val.name if isinstance(val, Enum) else val for key, val in data.items() if key in allowed_keys }, } ) for command_id, data in lrc_attr.items() ] return [] # ---------- # Attributes # ---------- @attribute_from_signal( "_command_tracker.lrc_finished_signal", max_dim_x=LRC_FINISHED_MAX_LENGTH, dtype=("str",), doc="A list of info JSON blobs of the finished long running commands.", ) def lrcFinished(self, value: UserLRCAttr) -> list[str]: """ Read info of the finished long running commands. :return: a list of info JSON blobs of the finished long running commands. 
""" return self._get_json_list_of_lrc_attributes( value, allowed_keys=[ "uid", "name", "submitted_time", "started_time", "finished_time", "status", "result", "progress", ], ) @attribute(dtype=("str",), max_dim_x=2) def _lrcEvent(self) -> list[str]: """ Read the long running commands events. Always returns an empty list. :return: empty list. """ return [] @attribute(dtype=("int",), max_dim_x=2) def lrcProtocolVersions(self) -> tuple[int, int]: """ Return supported protocol versions. :return: A tuple containing the lower and upper bounds of supported long running command protocol versions. """ return ( self.MIN_SUPPORTED_LRC_PROTOCOL_VERSION, _SUPPORTED_LRC_PROTOCOL_VERSIONS[1], ) @listen_to_signal("_command_tracker.lrc_event") def __update_lrc_event(self, value: _LRCEvent) -> None: """Update the _lrcEvent attribute.""" event = copy.copy(value) if event: command_id = event.pop("command_id") self.push_change_event("_lrcEvent", [command_id, json.dumps(event)]) # TODO: Remove in future # ---------------------- # Protocol V1 attributes # ---------------------- def _create_deprecated_lrc_attributes(self) -> None: """Create attributes for the long running commands.""" if self._status_queue_size > LRC_FINISHED_MAX_LENGTH: self._command_tracker._lrc_finished_max_length = self._status_queue_size self._lrcs_in_queue = attribute_from_signal( "_commands_in_queue", name="longRunningCommandsInQueue", dtype=(str,), max_dim_x=self._status_queue_size, fget=self._read_longRunningCommandsInQueue, ) self.add_attribute(self._lrcs_in_queue) self._lrc_ids_in_queue = attribute_from_signal( "_command_ids_in_queue", name="longRunningCommandIDsInQueue", dtype=(str,), max_dim_x=self._status_queue_size, fget=self._read_longRunningCommandIDsInQueue, ) self.add_attribute(self._lrc_ids_in_queue) self._lrc_status = attribute_from_signal( "_command_statuses", name="longRunningCommandStatus", dtype=(str,), max_dim_x=self._status_queue_size * 2, fget=self._read_longRunningCommandStatus, ) 
self.add_attribute(self._lrc_status) self._lrc_in_progress = attribute_from_signal( "_commands_in_progress", name="longRunningCommandInProgress", dtype=(str,), max_dim_x=self._max_executing_tasks, fget=self._read_longRunningCommandInProgress, ) self.add_attribute(self._lrc_in_progress) self._lrc_progress = attribute_from_signal( "_command_progresses", name="longRunningCommandProgress", dtype=(str,), max_dim_x=self._max_executing_tasks * 2, fget=self._read_longRunningCommandProgress, ) self.add_attribute(self._lrc_progress) self._lrc_result = attribute_from_signal( "_command_result", name="longRunningCommandResult", dtype=(str,), max_dim_x=2, fget=self._read_longRunningCommandResult, ) self.add_attribute(self._lrc_result) def _prune_completed_commands( self, command_list: list[tuple[str, str]] ) -> list[tuple[str, str]]: """ Prune a command list of any lingering completed tasks. :param command_list: list of (cmd_id, <info>) tuples of commands to prune :return: the pruned list with the oldest completed commands removed """ if len(command_list) <= self._status_queue_size: # Nothing to do return command_list # Determine which commands have completed by looking at the # current command statuses completed_commands = [ uid for (uid, status) in self._command_tracker.command_statuses if status.name in [ TaskStatus.ABORTED.name, TaskStatus.COMPLETED.name, TaskStatus.REJECTED.name, TaskStatus.FAILED.name, ] ] # Create a pruned list by removing the oldest completed commands number_to_remove = len(command_list) - self._status_queue_size pruned_list = command_list prune_candidates = [ (uid, info) for (uid, info) in sorted( command_list, key=lambda item: item[0].split(sep="_")[0] ) if uid in completed_commands ] for item in prune_candidates[0:number_to_remove]: # This gets called many times so we # keep track of which ones we have already logged if self._command_tracker._evict_command(item[0]): self.logger.warning(f"Status queue too big: removing item {item[0]}") 
pruned_list.remove(item) return pruned_list def _update_commands_in_queue( self, commands_in_queue: list[tuple[str, str]] ) -> None: if commands_in_queue: command_ids, command_names = zip( *self._prune_completed_commands(commands_in_queue) ) self._commands_in_queue = [ str(command_name) for command_name in command_names ] self._command_ids_in_queue = [str(command_id) for command_id in command_ids] else: self._commands_in_queue = [] self._command_ids_in_queue = [] def _update_command_statuses( self, command_statuses: list[tuple[str, TaskStatus]], ) -> None: statuses = [(uid, status.name) for (uid, status) in command_statuses] self._command_statuses = [ str(item) for item in itertools.chain.from_iterable( self._prune_completed_commands(statuses) ) ] # Check for commands starting and ending execution for command_id, status in command_statuses: if ( status == TaskStatus.IN_PROGRESS and command_id not in self._commands_ids_in_progress ): self._update_commands_in_progress(command_id, True) elif ( status in [ TaskStatus.ABORTED, TaskStatus.COMPLETED, TaskStatus.FAILED, ] and command_id in self._commands_ids_in_progress ): self._update_commands_in_progress(command_id, False) def _update_commands_in_progress(self, command_id: str, in_progress: bool) -> None: # Pass a reference to a new object for the push events, as this callback can be # called multiple times before the event is pushed in the tango omni thread. 
commands_ids_in_progress = self._commands_ids_in_progress.copy() if in_progress: commands_ids_in_progress.append(command_id) elif command_id in commands_ids_in_progress: commands_ids_in_progress.remove(command_id) self._commands_ids_in_progress = commands_ids_in_progress self._commands_in_progress = [ uid.split("_")[-1] for uid in commands_ids_in_progress ] def _update_command_progresses( self, command_progresses: list[tuple[str, int | str]], ) -> None: self._command_progresses = [ str(item) for item in itertools.chain.from_iterable(command_progresses) ] def _update_command_result( self, command_id: str, command_result: JSONData, ) -> None: self._command_result = (command_id, json.dumps(command_result)) def _read_longRunningCommandsInQueue(self, attr: Attribute) -> None: """ Read the long running commands in the queue. Keep track of which commands are that are currently known about. Entries are removed ``self._command_tracker._removal_time`` seconds after they have finished. **DEPRECATED**: A client can check for the queued command(s) in 'lrcQueue'. :param attr: Tango attribute being read """ warning_msg = ( "'longRunningCommandsInQueue' is deprecated since ska-tango-base " "1.2.0 and will be removed in a future major release. The client should " "use the 'ska_tango_base.long_running_commands.invoke_lrc()' method " "instead, which will automatically subscribe to the correct attribute. " "The client can check for the queued command(s) in the 'lrcQueue' " "attribute if needed." ) warnings.warn(warning_msg, DeprecationWarning) if self.logger: self.logger.warning(warning_msg) attr.set_value_date_quality(*self._lrcs_in_queue.do_read(self)) def _read_longRunningCommandIDsInQueue(self, attr: Attribute) -> None: """ Read the IDs of the long running commands in the queue. Every client that executes a command will receive a command ID as response. Keep track of IDs currently allocated. 
Entries are removed ``self._command_tracker._removal_time`` seconds after they have finished. **DEPRECATED**: A client can check for the queued command(s) in 'lrcQueue'. :param attr: Tango attribute being read """ warning_msg = ( "'longRunningCommandIDsInQueue' is deprecated since ska-tango-base " "1.2.0 and will be removed in a future major release. The client should " "use the 'ska_tango_base.long_running_commands.invoke_lrc()' method " "instead, which will automatically subscribe to the correct attribute. " "The client can check for the queued command(s) in the 'lrcQueue' " "attribute if needed." ) warnings.warn(warning_msg, DeprecationWarning) if self.logger: self.logger.warning(warning_msg) attr.set_value_date_quality(*self._lrc_ids_in_queue.do_read(self)) def _read_longRunningCommandStatus(self, attr: Attribute) -> None: """ Read the status of the currently executing long running commands. ID, status pairs of the currently executing commands. Clients can subscribe to on change event and wait for the ID they are interested in. **DEPRECATED**: A client can check for the status of a command in the 'lrcQueue', 'lrcExecuting' and 'lrcFinished' attributes. :param attr: Tango attribute being read """ warning_msg = ( "'longRunningCommandStatus' is deprecated since ska-tango-base " "1.2.0 and will be removed in a future major release. The client should " "use the 'ska_tango_base.long_running_commands.invoke_lrc()' method " "instead, which will automatically subscribe to the correct attribute. " "The client can check for the status of a command in the 'lrcQueue', " "'lrcExecuting' and 'lrcFinished' attributes if needed." ) warnings.warn(warning_msg, DeprecationWarning) if self.logger: self.logger.warning(warning_msg) attr.set_value_date_quality(*self._lrc_status.do_read(self)) def _read_longRunningCommandInProgress(self, attr: Attribute) -> None: """ Read the name(s) of the currently executing long running command(s). 
Name(s) of command and possible abort in progress or empty string(s). **DEPRECATED**: A client can check for the command(s) in progress in 'lrcExecuting'. :param attr: Tango attribute being read """ warning_msg = ( "'longRunningCommandInProgress' is deprecated since ska-tango-base " "1.2.0 and will be removed in a future major release. The client should " "use the 'ska_tango_base.long_running_commands.invoke_lrc()' method " "instead, which will automatically subscribe to the correct attribute. " "The client can check for the command(s) in progress in the " "'lrcExecuting' attribute if needed." ) warnings.warn(warning_msg, DeprecationWarning) if self.logger: self.logger.warning(warning_msg) attr.set_value_date_quality(*self._lrc_in_progress.do_read(self)) def _read_longRunningCommandProgress(self, attr: Attribute) -> None: """ Read the progress of the currently executing long running command(s). ID, progress of the currently executing command(s). Clients can subscribe to on change event and wait for the ID they are interested in. **DEPRECATED**: A client can check the progress of command(s) in 'lrcExecuting'. :param attr: Tango attribute being read """ warning_msg = ( "'longRunningCommandProgress' is deprecated since ska-tango-base " "1.2.0 and will be removed in a future major release. The client should " "use the 'ska_tango_base.long_running_commands.invoke_lrc()' method " "instead, which will automatically subscribe to the correct attribute. " "The client can check for the progress of a command in the " "'lrcExecuting' attribute if needed." ) warnings.warn(warning_msg, DeprecationWarning) if self.logger: self.logger.warning(warning_msg) attr.set_value_date_quality(*self._lrc_progress.do_read(self)) def _read_longRunningCommandResult(self, attr: Attribute) -> None: """ Read the result of the completed long running command. Reports unique ID and json-encoded result. Clients can subscribe to on change event and wait for the ID they are interested in. 
**DEPRECATED**: A client can check for the result of a command in 'lrcFinished'. :param attr: Tango attribute being read """ warning_msg = ( "'longRunningCommandResult' is deprecated since ska-tango-base " "1.2.0 and will be removed in a future major release. The client should " "use the 'ska_tango_base.long_running_commands.invoke_lrc()' method " "instead, which will automatically subscribe to the correct attribute. " "The client can check for the result of a command in the 'lrcFinished' " "attribute if needed." ) warnings.warn(warning_msg, DeprecationWarning) if self.logger: self.logger.warning(warning_msg) attr.set_value_date_quality(*self._lrc_result.do_read(self)) # -------- # Commands # -------- def __is_Abort_allowed(self) -> bool: return self.is_Abort_allowed()
[docs] def is_Abort_allowed(self) -> bool: """ Return whether the :py:meth:`!Abort()` command may be called currently. This method can be overridden by subclasses to change when this command is allowed. :return: whether the command may be called in the current device state """ return True
@command( dtype_out="DevVarLongStringArray", doc_in="Abort any executing long running command(s) and empty the queue.", doc_out="[ResultCode.STARTED][Command ID]", fisallowed="_AbstractLRCMixin__is_Abort_allowed", ) @DebugIt() def Abort(self) -> DevVarLongStringArrayType: """ Abort any executing long running command(s) and empty the queue. Abort is itself a long running command that is executed immediately. Subclasses should override :py:meth:`execute_Abort()` to change the behaviour of this command. :return: A tuple containing ResultCode.STARTED and the unique ID of the command """ return self.execute_Abort()
[docs] def allocate_lrc( self, name: str, *, started_callback: Callable[..., None] | str | None = None, completed_callback: Callable[..., None] | str | None = None, ) -> tuple[str, TaskCallbackType]: """ Allocate a new `command_id` for a long running command. The status and progress of the long running command should be updated using the returned `task_callback` object. :param name: The name of the command :param started_callback: Callback passed to the command tracker that is called when the command transitions to ``IN_PROGRESS`` status. If a `str`, this will be used to look up a callable on the self. If `None` the default name of "started_<Cmd>" will be used. :param completed_callback: Callback passed to the command tracker that is called when the command transitions to ``COMPLETED`` status. If a `str`, this will be used to look up a callable on the self. If `None` the default name of "completed_<Cmd>" will be used. :returns: (command_id, task_callback) """ # Check for command started and completed callbacks started_cb: Callable[..., None] | str | None = ( started_callback if started_callback is not None else f"started_{name}" ) if isinstance(started_cb, str): started_cb = getattr(self, started_cb, None) completed_cb: Callable[..., None] | str | None = ( completed_callback if completed_callback is not None else f"completed_{name}" ) if isinstance(completed_cb, str): completed_cb = getattr(self, completed_cb, None) command_id = self.command_tracker.new_command(name, completed_cb, started_cb) task_callback = functools.partial( self.command_tracker.update_command_info, command_id ) return command_id, task_callback
[docs] @staticmethod def convert_submission_result_to_lrc_return( command_id: str, status: TaskStatus, message: str ) -> DevVarLongStringArrayType: """ Convert a submit result to a DevVarLongStringArray. Example:: @command def MyCommand(self) -> DevVarLongStringArrayType: command_id, task_callback = self.allocate_lrc("MyCommand") status, message = self.submit_lrc_task( "MyCommand", self.do_my_command, task_callback ) return self.submit_result_to_lrc_return( command_id, status, message ) :param command_id: The command id of the LRC :param status: The task status returned by submit :param message: The message returned by submit :returns: [converted ResultCode], [command_id or message] """ match status: case TaskStatus.QUEUED: return [ResultCode.QUEUED], [command_id] case TaskStatus.REJECTED: return [ResultCode.REJECTED], [message] case TaskStatus.IN_PROGRESS: return [ResultCode.STARTED], [message] case TaskStatus.COMPLETED: return [ResultCode.OK], [message] case _: return [ResultCode.FAILED], [f"Unknown TaskStatus {status.name}"]
[docs] def submit_lrc_task( self, name: str, task: TaskFunctionType, *, args: list[Any] | None = None, kwargs: dict[str, Any] | None = None, task_callback: TaskCallbackType | None = None, fisallowed: Callable[[LRCReqType], bool] | str | None = None, ) -> tuple[TaskStatus, str]: """ Submit a long running command task to the task_executor. The f"is_{name}_allowed" method is looked up and, if found, will be called with :py:class:`LRCReqType.EXECUTE_REQ <ska_tango_base.long_running_commands.mixin.LRCReqType>` immediately before the task is executed. :param task: task to be executed :param args: positional arguments to be passed to the task :param kwargs: keyword arguments to be passed to the task :param fisallowed: override the is_cmd_allowed method used :returns: TaskStatus, message """ # Check for is_<cmd>_allowed method allowed_func: Callable[..., bool] | str | None = ( fisallowed if fisallowed is not None else f"is_{name}_allowed" ) if isinstance(allowed_func, str): allowed_func = getattr(self, allowed_func, None) if isinstance(fisallowed, str) and allowed_func is None: raise AttributeError(f'"{fisallowed}" not found.') if ( not isinstance(fisallowed, str | None) and allowed_func is not None and not inspect.ismethod(allowed_func) ): allowed_func = functools.partial(allowed_func, self) is_cmd_allowed: Callable[[], bool] | None if allowed_func is not None: def is_cmd_allowed() -> bool: """ Check command is allowed. Invoked when task is popped from LRC queue. """ return allowed_func(request_type=LRCReqType.EXECUTE_REQ) else: is_cmd_allowed = None return self.task_executor.submit( task, args, kwargs, is_cmd_allowed=is_cmd_allowed, task_callback=task_callback, )
[docs] def execute_Abort(self) -> DevVarLongStringArrayType: """ Execute the Abort command. Subclasses should override this to change the behaviour of the Abort command. :return: A tuple containing TaskStatus.IN_PROGRESS and a message """ command_id, task_callback = self.allocate_lrc("Abort") status, _ = self.schedule_abort_task(task_callback) assert status == TaskStatus.IN_PROGRESS return ([ResultCode.STARTED], [command_id])
[docs] def schedule_abort_task( self, task_callback: TaskCallbackType ) -> tuple[TaskStatus, str]: """ Schedule an Abort task to begin executing immediately. Subclasses should override this to change the behaviour of the Abort command. :param task_callback: Notified of progress of the abort command. :return: A tuple containing TaskStatus.IN_PROGRESS and a message """ return self.task_executor.abort(task_callback)
@command( dtype_in=str, doc_in="Check the status of a long running command by ID.", dtype_out=str, doc_out="Status of the asynchronous task.", ) @DebugIt() def CheckLongRunningCommandStatus(self, argin: str) -> str: """ Check the status of a long running command by ID. :param argin: the command id :return: command status """ return self.execute_CheckLongRunningCommandStatus(argin)
[docs] def execute_CheckLongRunningCommandStatus(self, argin: str) -> str: """ Execute the CheckLongRunningCommandStatus command. Subclasses should override this to change the behaviour of the CheckLongRunningCommandStatus command. :param argin: the command id :return: command status """ enum_status = self.command_tracker.get_command_status(argin) return TaskStatus(enum_status).name
class LRCMixin(AbstractLRCMixin):
    """
    Long running command mixin that owns its own task executor.

    Use it together with :class:`~ska_tango_base.ska_device.SKADevice` or one
    of its subclasses. Long running commands and the user-facing Tango
    attributes that describe them are managed through a command tracker on a
    shared bus.
    """

    MIN_SUPPORTED_LRC_PROTOCOL_VERSION = 1

    # ----------------
    # Init and cleanup
    # ----------------
    def on_new_shared_bus(self) -> None:
        """Create the task executor, then set up the bus-dependent attributes."""
        # The parent implementation reads 'task_executor', so the executor
        # has to exist before it runs.
        self._task_executor = self.create_task_executor()
        super().on_new_shared_bus()

    def create_task_executor(self) -> TaskExecutorProtocol:
        """
        Build the task executor used by this device.

        A device using :py:class:`.LRCMixin` can override this factory method
        to supply its own task executor.

        :return: Task executor instance.
        """
        executor = TaskExecutor(
            unhandled_exception_callback=self._on_unhandled_exception
        )
        return executor

    def _on_unhandled_exception(self, exception: Exception) -> None:
        """
        React to an exception that a task raised and nobody handled.

        This implementation does nothing.

        :param exception: the unhandled exception that was caught.
        """

    def delete_device(self) -> None:
        """Shut the task executor down, then delete the device."""
        with self.allow_internal_threads():
            self.task_executor.shutdown()
        super().delete_device()

    def schedule_abort_task(
        self, task_callback: TaskCallbackType
    ) -> tuple[TaskStatus, str]:
        """
        Start an Abort task straight away.

        Override this in a subclass to change what the Abort command does.

        :param task_callback: Notified of progress of the abort command.
        :return: A tuple containing TaskStatus.IN_PROGRESS and a message
        """
        outcome = AbstractLRCMixin.schedule_abort_task(self, task_callback)
        return typing.cast(  # type: ignore[redundant-cast]
            tuple[TaskStatus, str],
            outcome,
        )

    # ----------
    # Properties
    # ----------
    @property
    def task_executor(self) -> TaskExecutorProtocol:
        """
        Return the executor created in :py:meth:`on_new_shared_bus`.

        :return: The initialised task executor.
        """
        return self._task_executor