# Source code for ska_tango_base.long_running_commands.mixin

#
# This file is part of the SKA Tango Base project
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE.txt for more info.
"""
This module implements a mixin class for long running commands.

The mixin requires the class using it to also inherit from the generic
:class:`~ska_tango_base.ska_device.SKADevice` class.
"""

from __future__ import annotations

import copy
import functools
import inspect
import itertools
import json
import typing
import warnings
from enum import Enum, IntEnum
from typing import Any, Callable

from ska_control_model import ResultCode, TaskStatus
from tango import Attribute, DebugIt
from tango.server import attribute, command

from ..executor import TaskExecutor
from ..software_bus import (
    Signal,
    SignalBusMixin,
    attribute_from_signal,
    listen_to_signal,
)
from ..type_hints import (
    CommandTrackerProtocol,
    DevVarLongStringArrayType,
    JSONData,
    TaskCallbackType,
    TaskExecutorProtocol,
    TaskFunctionType,
)
from .command_tracker import (
    LRC_FINISHED_MAX_LENGTH,
    UserLRCAttr,
    _CommandTracker,
    _LRCEvent,
)
from .common import _SUPPORTED_LRC_PROTOCOL_VERSIONS

# Names exported by ``from ... import *``; ``_LRCEvent`` is re-exported from
# ``.command_tracker``.
__all__ = ["LRCMixin", "LRCReqType", "_LRCEvent"]

# Lower bound for the protocol V1 status queue size: the size computed from the
# task executor limits in ``AbstractLRCMixin.on_new_shared_bus`` is never
# allowed to fall below this value.
_MINIMUM_STATUS_QUEUE_SIZE = 32


class LRCReqType(IntEnum):
    """
    Identify which caller is invoking an ``is_<Cmd>_allowed`` method.

    The value tells the ``is_<Cmd>_allowed`` implementation whether it is
    being consulted when the task is submitted or when the task is about to
    be executed.
    """

    ENQUEUE_REQ = 1
    """Passed to is_<Cmd>_allowed when the task is submitted."""

    EXECUTE_REQ = 2
    """Passed to is_<Cmd>_allowed when the task is about to be executed."""
[docs] class AbstractLRCMixin(SignalBusMixin): """ Partial mixin class that adds support for long running commands. It must be used with :class:`~ska_tango_base.ska_device.SKADevice` or a subclass of it. It uses a shared bus with a command tracker to manage long running commands and the related user-facing Tango attributes. A :meth:`.task_executor` must be implemented for a long running command and the related attributes to behave correctly. """ MIN_SUPPORTED_LRC_PROTOCOL_VERSION = 1 # TODO: Protocol V1 - remove in future _command_ids_in_queue = Signal[list[str]](stored=True) _commands_in_queue = Signal[list[str]](stored=True) _command_statuses = Signal[list[str]](stored=True) _commands_in_progress = Signal[list[str]](stored=True) _command_progresses = Signal[list[str]](stored=True) _command_result = Signal[tuple[str, str]](stored=True) @classmethod def __init_subclass__(cls: type[AbstractLRCMixin], **kwargs: Any) -> None: """Check that the subclass has a valid minimum LRC protocol version set.""" super().__init_subclass__(**kwargs) min_support_ver = _SUPPORTED_LRC_PROTOCOL_VERSIONS[0] max_support_ver = _SUPPORTED_LRC_PROTOCOL_VERSIONS[1] if hasattr(cls, "MIN_SUPPORTED_LRC_PROTOCOL_VERSION"): min_set_ver = cls.MIN_SUPPORTED_LRC_PROTOCOL_VERSION if not isinstance(cls.MIN_SUPPORTED_LRC_PROTOCOL_VERSION, int): raise TypeError( f"{cls.__name__}.MIN_SUPPORTED_LRC_PROTOCOL_VERSION must be an " f"integer, got {type(min_set_ver).__name__}." ) if not (min_support_ver <= min_set_ver <= max_support_ver): raise ValueError( f"{cls.__name__}.MIN_SUPPORTED_LRC_PROTOCOL_VERSION must be between" f" {min_support_ver} and {max_support_ver}, got {min_set_ver}." ) # ----------- # Init device # -----------
[docs] def on_new_shared_bus(self) -> None: """Initialise attributes which depend on signals from the bus.""" super().on_new_shared_bus() self._command_tracker = _CommandTracker( self.MIN_SUPPORTED_LRC_PROTOCOL_VERSION, self._update_commands_in_queue, # TODO: Protocol V1 - remove in future self._update_command_statuses, # TODO: Protocol V1 - remove in future self._update_command_progresses, # TODO: Protocol V1 - remove in future self._update_command_result, # TODO: Protocol V1 - remove in future ) # Check and set 'max_queued_tasks' and 'max_executing_tasks' properties if self.task_executor is not None: assert self.task_executor.max_queued_tasks >= 0, ( "max_queued_tasks property must be equal to or greater than 0." ) assert self.task_executor.max_executing_tasks >= 1, ( "max_executing_tasks property must be equal to or greater than 1." ) max_queued_tasks = self.task_executor.max_queued_tasks max_executing_tasks = self.task_executor.max_executing_tasks if max_executing_tasks == 1: warning_msg = ( "'max_executing_tasks' will be required to be at least 2 in a " "future release of ska-tango-base (found 1). A device must support " "the 'Abort()' command and at least one other command executing " "simulanteously." 
) warnings.warn(warning_msg, FutureWarning) if self.logger: self.logger.warning(warning_msg) max_executing_tasks = 2 else: max_queued_tasks = 1 max_executing_tasks = 1 # Create Tango dynamic attributes which depend # on the _command_tracker.lrc_event signal lrc_queue = attribute_from_signal( "_command_tracker.lrc_queue_signal", name="lrcQueue", dtype=(str,), max_dim_x=max_queued_tasks, to_tango=functools.partial( self._get_json_list_of_lrc_attributes, allowed_keys=["uid", "name", "submitted_time"], ), ) self.add_attribute(lrc_queue) lrc_executing = attribute_from_signal( "_command_tracker.lrc_executing_signal", name="lrcExecuting", dtype=(str,), max_dim_x=max_executing_tasks, to_tango=functools.partial( self._get_json_list_of_lrc_attributes, allowed_keys=[ "uid", "name", "submitted_time", "started_time", "progress", ], ), ) self.add_attribute(lrc_executing) self.set_change_event("_lrcEvent", True) # ---------------------- # Protocol V1 attributes # ---------------------- if self.MIN_SUPPORTED_LRC_PROTOCOL_VERSION == 1: self._max_queued_tasks = max_queued_tasks self._max_executing_tasks = max_executing_tasks self._status_queue_size = max( max_queued_tasks * 2 + max_executing_tasks, _MINIMUM_STATUS_QUEUE_SIZE, ) self._command_ids_in_queue = [] self._commands_in_queue = [] self._command_statuses = [] self._commands_ids_in_progress: list[str] = [] self._commands_in_progress = [] self._command_progresses = [] self._command_result = ("", "") self._create_deprecated_lrc_attributes()
# ---------- # Properties # ---------- @property def command_tracker(self) -> CommandTrackerProtocol: """ Get the command tracker. :return: The initialised ``CommandTrackerProtocol`` object. """ return self._command_tracker @property def task_executor(self) -> TaskExecutorProtocol: """ Get the task executor. :return: The initialised task executor. """ raise NotImplementedError( "'task_executor' property must be implemented by " f"'{self.__class__.__name__}'. " "The parent 'AbstractLRCMixin' is an abstract base class." ) @staticmethod def _get_json_list_of_lrc_attributes( lrc_attr: UserLRCAttr, allowed_keys: list[str] ) -> list[str]: """ Get a list of JSON formatted strings representing the LRC attribute. Serialises each key-value pair that's in the allowed_keys list of the LRC's data dict to a flat JSON dict. :param lrc_attr: Dict of LRC IDs as keys and their nested CommandData dicts. :param allowed_keys: List of allowed keys to include from the JSON dicts. :return: A list of JSON strings containing a serialised info dict for each LRC. """ if lrc_attr: # Check for empty dict return [ json.dumps( { "uid": command_id, **{ key: val.name if isinstance(val, Enum) else val for key, val in data.items() if key in allowed_keys }, } ) for command_id, data in lrc_attr.items() ] return [] # ---------- # Attributes # ---------- @attribute_from_signal( "_command_tracker.lrc_finished_signal", max_dim_x=LRC_FINISHED_MAX_LENGTH, dtype=("str",), ) def lrcFinished(self, value: UserLRCAttr) -> list[str]: """ Read info of the finished long running commands. :return: a list of info JSON blobs of the finished long running commands. """ return self._get_json_list_of_lrc_attributes( value, allowed_keys=[ "uid", "name", "submitted_time", "started_time", "finished_time", "status", "result", "progress", ], ) @attribute(dtype=("str",), max_dim_x=2) def _lrcEvent(self) -> list[str]: """ Read the long running commands events. Always returns an empty list. :return: empty list. 
""" return [] @attribute(dtype=("int",), max_dim_x=2) def lrcProtocolVersions(self) -> tuple[int, int]: """ Return supported protocol versions. :return: A tuple containing the lower and upper bounds of supported long running command protocol versions. """ return ( self.MIN_SUPPORTED_LRC_PROTOCOL_VERSION, _SUPPORTED_LRC_PROTOCOL_VERSIONS[1], ) @listen_to_signal("_command_tracker.lrc_event") def __update_lrc_event(self, value: _LRCEvent) -> None: """Update the _lrcEvent attribute.""" event = copy.copy(value) if event: command_id = event.pop("command_id") self.push_change_event("_lrcEvent", [command_id, json.dumps(event)]) # TODO: Remove in future # ---------------------- # Protocol V1 attributes # ---------------------- def _create_deprecated_lrc_attributes(self) -> None: """ Create attributes for the long running commands. :raises AssertionError: if max_queued_tasks or max_executing_tasks is not equal to or greater than 0 or 1 respectively. """ if self._status_queue_size > LRC_FINISHED_MAX_LENGTH: self._command_tracker._lrc_finished_max_length = self._status_queue_size self._lrcs_in_queue = attribute_from_signal( "_commands_in_queue", name="longRunningCommandsInQueue", dtype=(str,), max_dim_x=self._status_queue_size, fget=self._read_longRunningCommandsInQueue, ) self.add_attribute(self._lrcs_in_queue) self._lrc_ids_in_queue = attribute_from_signal( "_command_ids_in_queue", name="longRunningCommandIDsInQueue", dtype=(str,), max_dim_x=self._status_queue_size, fget=self._read_longRunningCommandIDsInQueue, ) self.add_attribute(self._lrc_ids_in_queue) self._lrc_status = attribute_from_signal( "_command_statuses", name="longRunningCommandStatus", dtype=(str,), max_dim_x=self._status_queue_size * 2, fget=self._read_longRunningCommandStatus, ) self.add_attribute(self._lrc_status) self._lrc_in_progress = attribute_from_signal( "_commands_in_progress", name="longRunningCommandInProgress", dtype=(str,), max_dim_x=self._max_executing_tasks, 
fget=self._read_longRunningCommandInProgress, ) self.add_attribute(self._lrc_in_progress) self._lrc_progress = attribute_from_signal( "_command_progresses", name="longRunningCommandProgress", dtype=(str,), max_dim_x=self._max_executing_tasks * 2, fget=self._read_longRunningCommandProgress, ) self.add_attribute(self._lrc_progress) self._lrc_result = attribute_from_signal( "_command_result", name="longRunningCommandResult", dtype=(str,), max_dim_x=2, fget=self._read_longRunningCommandResult, ) self.add_attribute(self._lrc_result) def _prune_completed_commands( self, command_list: list[tuple[str, str]] ) -> list[tuple[str, str]]: """ Prune a command list of any lingering completed tasks. :param command_list: list of (cmd_id, <info>) tuples of commands to prune :return: the pruned list with the oldest completed commands removed """ if len(command_list) <= self._status_queue_size: # Nothing to do return command_list # Determine which commands have completed by looking at the # current command statuses completed_commands = [ uid for (uid, status) in self._command_tracker.command_statuses if status.name in [ TaskStatus.ABORTED.name, TaskStatus.COMPLETED.name, TaskStatus.REJECTED.name, TaskStatus.FAILED.name, ] ] # Create a pruned list by removing the oldest completed commands number_to_remove = len(command_list) - self._status_queue_size pruned_list = command_list prune_candidates = [ (uid, info) for (uid, info) in sorted( command_list, key=lambda item: item[0].split(sep="_")[0] ) if uid in completed_commands ] for item in prune_candidates[0:number_to_remove]: # This gets called many times so we # keep track of which ones we have already logged if self._command_tracker._evict_command(item[0]): self.logger.warning(f"Status queue too big: removing item {item[0]}") pruned_list.remove(item) return pruned_list def _update_commands_in_queue( self, commands_in_queue: list[tuple[str, str]] ) -> None: if commands_in_queue: command_ids, command_names = zip( 
*self._prune_completed_commands(commands_in_queue) ) self._commands_in_queue = [ str(command_name) for command_name in command_names ] self._command_ids_in_queue = [str(command_id) for command_id in command_ids] else: self._commands_in_queue = [] self._command_ids_in_queue = [] def _update_command_statuses( self, command_statuses: list[tuple[str, TaskStatus]], ) -> None: statuses = [(uid, status.name) for (uid, status) in command_statuses] self._command_statuses = [ str(item) for item in itertools.chain.from_iterable( self._prune_completed_commands(statuses) ) ] # Check for commands starting and ending execution for command_id, status in command_statuses: if ( status == TaskStatus.IN_PROGRESS and command_id not in self._commands_ids_in_progress ): self._update_commands_in_progress(command_id, True) elif ( status in [ TaskStatus.ABORTED, TaskStatus.COMPLETED, TaskStatus.FAILED, ] and command_id in self._commands_ids_in_progress ): self._update_commands_in_progress(command_id, False) def _update_commands_in_progress(self, command_id: str, in_progress: bool) -> None: # Pass a reference to a new object for the push events, as this callback can be # called multiple times before the event is pushed in the tango omni thread. 
commands_ids_in_progress = self._commands_ids_in_progress.copy() if in_progress: commands_ids_in_progress.append(command_id) elif command_id in commands_ids_in_progress: commands_ids_in_progress.remove(command_id) self._commands_ids_in_progress = commands_ids_in_progress self._commands_in_progress = [ uid.split("_")[-1] for uid in commands_ids_in_progress ] def _update_command_progresses( self, command_progresses: list[tuple[str, int | str]], ) -> None: self._command_progresses = [ str(item) for item in itertools.chain.from_iterable(command_progresses) ] def _update_command_result( self, command_id: str, command_result: JSONData, ) -> None: self._command_result = (command_id, json.dumps(command_result)) def _read_longRunningCommandsInQueue(self, attr: Attribute) -> None: """ Read the long running commands in the queue. Keep track of which commands are that are currently known about. Entries are removed `self._command_tracker._removal_time` seconds after they have finished. :param attr: Tango attribute being read """ warning_msg = ( "'longRunningCommandsInQueue' is deprecated since ska-tango-base " "1.2.0 and will be removed in a future major release. The client should " "use the 'ska_tango_base.long_running_commands.invoke_lrc()' method " "instead, which will automatically subscribe to the correct attribute. " "The client can check for the queued command(s) in the 'lrcQueue' " "attribute if needed." ) warnings.warn(warning_msg, DeprecationWarning) if self.logger: self.logger.warning(warning_msg) attr.set_value_date_quality(*self._lrcs_in_queue.do_read(self)) def _read_longRunningCommandIDsInQueue(self, attr: Attribute) -> None: """ Read the IDs of the long running commands in the queue. Every client that executes a command will receive a command ID as response. Keep track of IDs currently allocated. Entries are removed `self._command_tracker._removal_time` seconds after they have finished. 
:param attr: Tango attribute being read """ warning_msg = ( "'longRunningCommandIDsInQueue' is deprecated since ska-tango-base " "1.2.0 and will be removed in a future major release. The client should " "use the 'ska_tango_base.long_running_commands.invoke_lrc()' method " "instead, which will automatically subscribe to the correct attribute. " "The client can check for the queued command(s) in the 'lrcQueue' " "attribute if needed." ) warnings.warn(warning_msg, DeprecationWarning) if self.logger: self.logger.warning(warning_msg) attr.set_value_date_quality(*self._lrc_ids_in_queue.do_read(self)) def _read_longRunningCommandStatus(self, attr: Attribute) -> None: """ Read the status of the currently executing long running commands. ID, status pairs of the currently executing commands. Clients can subscribe to on_change event and wait for the ID they are interested in. :param attr: Tango attribute being read """ warning_msg = ( "'longRunningCommandStatus' is deprecated since ska-tango-base " "1.2.0 and will be removed in a future major release. The client should " "use the 'ska_tango_base.long_running_commands.invoke_lrc()' method " "instead, which will automatically subscribe to the correct attribute. " "The client can check for the status of a command in the 'lrcQueue', " "'lrcExecuting' and 'lrcFinished' attributes if needed." ) warnings.warn(warning_msg, DeprecationWarning) if self.logger: self.logger.warning(warning_msg) attr.set_value_date_quality(*self._lrc_status.do_read(self)) def _read_longRunningCommandInProgress(self, attr: Attribute) -> None: """ Read the name(s) of the currently executing long running command(s). Name(s) of command and possible abort in progress or empty string(s). :param attr: Tango attribute being read """ warning_msg = ( "'longRunningCommandInProgress' is deprecated since ska-tango-base " "1.2.0 and will be removed in a future major release. 
The client should " "use the 'ska_tango_base.long_running_commands.invoke_lrc()' method " "instead, which will automatically subscribe to the correct attribute. " "The client can check for the command(s) in progress in the " "'lrcExecuting' attribute if needed." ) warnings.warn(warning_msg, DeprecationWarning) if self.logger: self.logger.warning(warning_msg) attr.set_value_date_quality(*self._lrc_in_progress.do_read(self)) def _read_longRunningCommandProgress(self, attr: Attribute) -> None: """ Read the progress of the currently executing long running command(s). ID, progress of the currently executing command(s). Clients can subscribe to on_change event and wait for the ID they are interested in. :param attr: Tango attribute being read """ warning_msg = ( "'longRunningCommandProgress' is deprecated since ska-tango-base " "1.2.0 and will be removed in a future major release. The client should " "use the 'ska_tango_base.long_running_commands.invoke_lrc()' method " "instead, which will automatically subscribe to the correct attribute. " "The client can check for the progress of a command in the " "'lrcExecuting' attribute if needed." ) warnings.warn(warning_msg, DeprecationWarning) if self.logger: self.logger.warning(warning_msg) attr.set_value_date_quality(*self._lrc_progress.do_read(self)) def _read_longRunningCommandResult(self, attr: Attribute) -> None: """ Read the result of the completed long running command. Reports unique_id, json-encoded result. Clients can subscribe to on_change event and wait for the ID they are interested in. :param attr: Tango attribute being read """ warning_msg = ( "'longRunningCommandResult' is deprecated since ska-tango-base " "1.2.0 and will be removed in a future major release. The client should " "use the 'ska_tango_base.long_running_commands.invoke_lrc()' method " "instead, which will automatically subscribe to the correct attribute. " "The client can check for the result of a command in the 'lrcFinished' " "attribute if needed." 
) warnings.warn(warning_msg, DeprecationWarning) if self.logger: self.logger.warning(warning_msg) attr.set_value_date_quality(*self._lrc_result.do_read(self)) # -------- # Commands # -------- @command( dtype_out="DevVarLongStringArray", doc_out="[ResultCode.STARTED][command_id]" ) @DebugIt() def Abort(self) -> DevVarLongStringArrayType: """ Abort any executing long running command(s) and empty the queue. Abort is itself a long running command that is executed immediately. Subclasses should override :py:meth:`execute_Abort()` to change the behaviour of this command. :return: A tuple containing ResultCode.STARTED and the unique ID of the command """ return self.execute_Abort()
[docs] def allocate_lrc( self, name: str, *, started_callback: Callable[..., None] | str | None = None, completed_callback: Callable[..., None] | str | None = None, ) -> tuple[str, TaskCallbackType]: """ Allocate a new `command_id` for a long running command. The status and progress of the long running command should be updated using the returned `task_callback` object. :param name: The name of the command :param started_callback: Callback passed to the command tracker that is called when the command transitions to ``IN_PROGRESS`` status. If a `str`, this will be used to look up a callable on the self. If `None` the default name of "started_<Cmd>" will be used. :param completed_callback: Callback passed to the command tracker that is called when the command transitions to ``COMPLETED`` status. If a `str`, this will be used to look up a callable on the self. If `None` the default name of "completed_<Cmd>" will be used. :returns: (command_id, task_callback) """ # Check for command started and completed callbacks started_cb: Callable[..., None] | str | None = ( started_callback if started_callback is not None else f"started_{name}" ) if isinstance(started_cb, str): started_cb = getattr(self, started_cb, None) completed_cb: Callable[..., None] | str | None = ( completed_callback if completed_callback is not None else f"completed_{name}" ) if isinstance(completed_cb, str): completed_cb = getattr(self, completed_cb, None) command_id = self.command_tracker.new_command(name, completed_cb, started_cb) task_callback = functools.partial( self.command_tracker.update_command_info, command_id ) return command_id, task_callback
[docs] @staticmethod def convert_submission_result_to_lrc_return( command_id: str, status: TaskStatus, message: str ) -> DevVarLongStringArrayType: """ Convert a submit result to a DevVarLongStringArray. Example:: @command def MyCommand(self) -> DevVarLongStringArrayType: command_id, task_callback = self.allocate_lrc("MyCommand") status, message = self.submit_lrc_task( "MyCommand", self.do_my_command, task_callback ) return self.submit_result_to_lrc_return( command_id, status, message ) :param command_id: The command id of the LRC :param status: The task status returned by submit :param message: The message returned by submit :returns: [converted ResultCode], [command_id or message] """ match status: case TaskStatus.QUEUED: return [ResultCode.QUEUED], [command_id] case TaskStatus.REJECTED: return [ResultCode.REJECTED], [message] case TaskStatus.IN_PROGRESS: return [ResultCode.STARTED], [message] case TaskStatus.COMPLETED: return [ResultCode.OK], [message] case _: return [ResultCode.FAILED], [f"Unknown TaskStatus {status.name}"]
[docs] def submit_lrc_task( self, name: str, task: TaskFunctionType, *, args: list[Any] | None = None, kwargs: dict[str, Any] | None = None, task_callback: TaskCallbackType | None = None, fisallowed: Callable[[LRCReqType], bool] | str | None = None, ) -> tuple[TaskStatus, str]: """ Submit a long running command task to the task_executor. The f"is_{name}_allowed" method is looked up and, if found, will be called with :py:class:`LRCReqType.EXECUTE_REQ <ska_tango_base.long_running_commands.mixin.LRCReqType>` immediately before the task is executed. :param task: task to be executed :param args: positional arguments to be passed to the task :param kwargs: keyword arguments to be passed to the task :param fisallowed: override the is_cmd_allowed method used :returns: TaskStatus, message """ # Check for is_<cmd>_allowed method allowed_func: Callable[..., bool] | str | None = ( fisallowed if fisallowed is not None else f"is_{name}_allowed" ) if isinstance(allowed_func, str): allowed_func = getattr(self, allowed_func, None) if isinstance(fisallowed, str) and allowed_func is None: raise AttributeError(f'"{fisallowed}" not found.') if ( not isinstance(fisallowed, str | None) and allowed_func is not None and not inspect.ismethod(allowed_func) ): allowed_func = functools.partial(allowed_func, self) is_cmd_allowed: Callable[[], bool] | None if allowed_func is not None: def is_cmd_allowed() -> bool: """ Check command is allowed. Invoked when task is popped from LRC queue. """ return allowed_func(request_type=LRCReqType.EXECUTE_REQ) else: is_cmd_allowed = None return self.task_executor.submit( task, args, kwargs, is_cmd_allowed=is_cmd_allowed, task_callback=task_callback, )
[docs] def execute_Abort(self) -> DevVarLongStringArrayType: """ Execute the Abort command. Subclasses should override this to change the behaviour of the Abort command. :return: A tuple containing TaskStatus.IN_PROGRESS and a message """ command_id, task_callback = self.allocate_lrc("Abort") status, _ = self.schedule_abort_task(task_callback) assert status == TaskStatus.IN_PROGRESS return ([ResultCode.STARTED], [command_id])
[docs] def schedule_abort_task( self, task_callback: TaskCallbackType ) -> tuple[TaskStatus, str]: """ Schedule an Abort task to begin executing immediately. Subclasses should override this to change the behaviour of the Abort command. :param task_callback: Notified of progress of the abort command. :return: A tuple containing TaskStatus.IN_PROGRESS and a message """ return self.task_executor.abort(task_callback)
@command(dtype_in=str, doc_in="command id", dtype_out=str, doc_out="TaskStatus") @DebugIt() def CheckLongRunningCommandStatus(self, argin: str) -> str: """ Check the status of a long running command by ID. :param argin: the command id :return: command status """ return self.execute_CheckLongRunningCommandStatus(argin)
[docs] def execute_CheckLongRunningCommandStatus(self, argin: str) -> str: """ Execute the CheckLongRunningCommandStatus command. Subclasses should override this to change the behaviour of the CheckLongRunningCommandStatus command. :param argin: the command id :return: command status """ enum_status = self.command_tracker.get_command_status(argin) return TaskStatus(enum_status).name
class LRCMixin(AbstractLRCMixin):
    """
    Long running command mixin that comes with its own task executor.

    Use it together with :class:`~ska_tango_base.ska_device.SKADevice` or one
    of its subclasses. Long running commands and the user-facing Tango
    attributes related to them are managed through a command tracker on a
    shared bus.
    """

    MIN_SUPPORTED_LRC_PROTOCOL_VERSION = 2

    # ----------------
    # Init and cleanup
    # ----------------
    def on_new_shared_bus(self) -> None:
        """Create the task executor, then initialise the bus-dependent attributes."""
        executor = self.create_task_executor()
        self._task_executor = executor
        super().on_new_shared_bus()

    def create_task_executor(self) -> TaskExecutorProtocol:
        """
        Build the task executor used by this device.

        Devices using :py:class:`.LRCMixin` may override this factory method
        to supply their own task executor.

        :return: Task executor instance.
        """
        on_exception = self._on_unhandled_exception
        return TaskExecutor(unhandled_exception_callback=on_exception)

    def _on_unhandled_exception(self, exception: Exception) -> None:
        """
        Handle an unhandled exception raised by a task.

        This is handed to the task executor as its
        ``unhandled_exception_callback``; this implementation does nothing.

        :param exception: the unhandled exception that was caught.
        """

    def delete_device(self) -> None:
        """Shut down the task executor and delete the device."""
        with self.allow_internal_threads():
            executor = self.task_executor
            executor.shutdown()
        super().delete_device()

    def schedule_abort_task(
        self, task_callback: TaskCallbackType
    ) -> tuple[TaskStatus, str]:
        """
        Schedule an Abort task to begin executing immediately.

        Override this in a subclass to change how the Abort command behaves.

        :param task_callback: Notified of progress of the abort command.

        :return: A tuple containing TaskStatus.IN_PROGRESS and a message
        """
        outcome = AbstractLRCMixin.schedule_abort_task(self, task_callback)
        return typing.cast(tuple[TaskStatus, str], outcome)

    # ----------
    # Properties
    # ----------
    @property
    def task_executor(self) -> TaskExecutorProtocol:
        """
        Get the task executor.

        :return: The task executor created in :meth:`on_new_shared_bus`.
        """
        return self._task_executor