#
# This file is part of the SKA Tango Base project
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE.txt for more info.
"""
This module implements a mixin class for long running commands.
The mixin requires the class using it to also inherit from the generic
:class:`~ska_tango_base.ska_device.SKADevice` class.
"""
from __future__ import annotations
import copy
import functools
import inspect
import itertools
import json
import typing
import warnings
from enum import Enum, IntEnum
from typing import Any, Callable
from ska_control_model import ResultCode, TaskStatus
from tango import Attribute, DebugIt
from tango.server import attribute, command
from ..executor import TaskExecutor
from ..software_bus import (
Signal,
SignalBusMixin,
attribute_from_signal,
listen_to_signal,
)
from ..type_hints import (
CommandTrackerProtocol,
DevVarLongStringArrayType,
JSONData,
TaskCallbackType,
TaskExecutorProtocol,
TaskFunctionType,
)
from .command_tracker import (
LRC_FINISHED_MAX_LENGTH,
UserLRCAttr,
_CommandTracker,
_LRCEvent,
)
from .common import _SUPPORTED_LRC_PROTOCOL_VERSIONS
__all__ = ["LRCMixin", "LRCReqType", "_LRCEvent"]
_MINIMUM_STATUS_QUEUE_SIZE = 32
[docs]
class LRCReqType(IntEnum):
"""Used to discriminate between command is allowed callers."""
ENQUEUE_REQ = 1
"""Passed to is_<Cmd>_allowed when the task is submitted."""
EXECUTE_REQ = 2
"""Passed to is_<Cmd>_allowed when the task is about to be executed."""
[docs]
class AbstractLRCMixin(SignalBusMixin):
"""
Partial mixin class that adds support for long running commands.
It must be used with :class:`~ska_tango_base.ska_device.SKADevice` or a subclass of
it. It uses a shared bus with a command tracker to manage long running commands and
the related user-facing Tango attributes. A :meth:`.task_executor` must be
implemented for a long running command and the related attributes to behave
correctly.
"""
MIN_SUPPORTED_LRC_PROTOCOL_VERSION = 1
# TODO: Protocol V1 - remove in future
_command_ids_in_queue = Signal[list[str]](stored=True)
_commands_in_queue = Signal[list[str]](stored=True)
_command_statuses = Signal[list[str]](stored=True)
_commands_in_progress = Signal[list[str]](stored=True)
_command_progresses = Signal[list[str]](stored=True)
_command_result = Signal[tuple[str, str]](stored=True)
@classmethod
def __init_subclass__(cls: type[AbstractLRCMixin], **kwargs: Any) -> None:
"""Check that the subclass has a valid minimum LRC protocol version set."""
super().__init_subclass__(**kwargs)
min_support_ver = _SUPPORTED_LRC_PROTOCOL_VERSIONS[0]
max_support_ver = _SUPPORTED_LRC_PROTOCOL_VERSIONS[1]
if hasattr(cls, "MIN_SUPPORTED_LRC_PROTOCOL_VERSION"):
min_set_ver = cls.MIN_SUPPORTED_LRC_PROTOCOL_VERSION
if not isinstance(cls.MIN_SUPPORTED_LRC_PROTOCOL_VERSION, int):
raise TypeError(
f"{cls.__name__}.MIN_SUPPORTED_LRC_PROTOCOL_VERSION must be an "
f"integer, got {type(min_set_ver).__name__}."
)
if not (min_support_ver <= min_set_ver <= max_support_ver):
raise ValueError(
f"{cls.__name__}.MIN_SUPPORTED_LRC_PROTOCOL_VERSION must be between"
f" {min_support_ver} and {max_support_ver}, got {min_set_ver}."
)
# -----------
# Init device
# -----------
[docs]
def on_new_shared_bus(self) -> None:
"""Initialise attributes which depend on signals from the bus."""
super().on_new_shared_bus()
self._command_tracker = _CommandTracker(
self.MIN_SUPPORTED_LRC_PROTOCOL_VERSION,
self._update_commands_in_queue, # TODO: Protocol V1 - remove in future
self._update_command_statuses, # TODO: Protocol V1 - remove in future
self._update_command_progresses, # TODO: Protocol V1 - remove in future
self._update_command_result, # TODO: Protocol V1 - remove in future
)
# Check and set 'max_queued_tasks' and 'max_executing_tasks' properties
if self.task_executor is not None:
assert self.task_executor.max_queued_tasks >= 0, (
"max_queued_tasks property must be equal to or greater than 0."
)
assert self.task_executor.max_executing_tasks >= 1, (
"max_executing_tasks property must be equal to or greater than 1."
)
max_queued_tasks = self.task_executor.max_queued_tasks
max_executing_tasks = self.task_executor.max_executing_tasks
if max_executing_tasks == 1:
warning_msg = (
"'max_executing_tasks' will be required to be at least 2 in a "
"future release of ska-tango-base (found 1). A device must support "
"the 'Abort()' command and at least one other command executing "
"simulanteously."
)
warnings.warn(warning_msg, FutureWarning)
if self.logger:
self.logger.warning(warning_msg)
max_executing_tasks = 2
else:
max_queued_tasks = 1
max_executing_tasks = 1
# Create Tango dynamic attributes which depend
# on the _command_tracker.lrc_event signal
lrc_queue = attribute_from_signal(
"_command_tracker.lrc_queue_signal",
name="lrcQueue",
dtype=(str,),
max_dim_x=max_queued_tasks,
to_tango=functools.partial(
self._get_json_list_of_lrc_attributes,
allowed_keys=["uid", "name", "submitted_time"],
),
)
self.add_attribute(lrc_queue)
lrc_executing = attribute_from_signal(
"_command_tracker.lrc_executing_signal",
name="lrcExecuting",
dtype=(str,),
max_dim_x=max_executing_tasks,
to_tango=functools.partial(
self._get_json_list_of_lrc_attributes,
allowed_keys=[
"uid",
"name",
"submitted_time",
"started_time",
"progress",
],
),
)
self.add_attribute(lrc_executing)
self.set_change_event("_lrcEvent", True)
# ----------------------
# Protocol V1 attributes
# ----------------------
if self.MIN_SUPPORTED_LRC_PROTOCOL_VERSION == 1:
self._max_queued_tasks = max_queued_tasks
self._max_executing_tasks = max_executing_tasks
self._status_queue_size = max(
max_queued_tasks * 2 + max_executing_tasks,
_MINIMUM_STATUS_QUEUE_SIZE,
)
self._command_ids_in_queue = []
self._commands_in_queue = []
self._command_statuses = []
self._commands_ids_in_progress: list[str] = []
self._commands_in_progress = []
self._command_progresses = []
self._command_result = ("", "")
self._create_deprecated_lrc_attributes()
# ----------
# Properties
# ----------
@property
def command_tracker(self) -> CommandTrackerProtocol:
"""
Get the command tracker.
:return: The initialised ``CommandTrackerProtocol`` object.
"""
return self._command_tracker
@property
def task_executor(self) -> TaskExecutorProtocol:
"""
Get the task executor.
:return: The initialised task executor.
"""
raise NotImplementedError(
"'task_executor' property must be implemented by "
f"'{self.__class__.__name__}'. "
"The parent 'AbstractLRCMixin' is an abstract base class."
)
@staticmethod
def _get_json_list_of_lrc_attributes(
lrc_attr: UserLRCAttr, allowed_keys: list[str]
) -> list[str]:
"""
Get a list of JSON formatted strings representing the LRC attribute.
Serialises each key-value pair that's in the allowed_keys list of the LRC's data
dict to a flat JSON dict.
:param lrc_attr: Dict of LRC IDs as keys and their nested CommandData dicts.
:param allowed_keys: List of allowed keys to include from the JSON dicts.
:return: A list of JSON strings containing a serialised info dict for each LRC.
"""
if lrc_attr: # Check for empty dict
return [
json.dumps(
{
"uid": command_id,
**{
key: val.name if isinstance(val, Enum) else val
for key, val in data.items()
if key in allowed_keys
},
}
)
for command_id, data in lrc_attr.items()
]
return []
# ----------
# Attributes
# ----------
@attribute_from_signal(
"_command_tracker.lrc_finished_signal",
max_dim_x=LRC_FINISHED_MAX_LENGTH,
dtype=("str",),
)
def lrcFinished(self, value: UserLRCAttr) -> list[str]:
"""
Read info of the finished long running commands.
:return: a list of info JSON blobs of the finished long running commands.
"""
return self._get_json_list_of_lrc_attributes(
value,
allowed_keys=[
"uid",
"name",
"submitted_time",
"started_time",
"finished_time",
"status",
"result",
"progress",
],
)
@attribute(dtype=("str",), max_dim_x=2)
def _lrcEvent(self) -> list[str]:
"""
Read the long running commands events. Always returns an empty list.
:return: empty list.
"""
return []
@attribute(dtype=("int",), max_dim_x=2)
def lrcProtocolVersions(self) -> tuple[int, int]:
"""
Return supported protocol versions.
:return: A tuple containing the lower and upper bounds of supported long running
command protocol versions.
"""
return (
self.MIN_SUPPORTED_LRC_PROTOCOL_VERSION,
_SUPPORTED_LRC_PROTOCOL_VERSIONS[1],
)
@listen_to_signal("_command_tracker.lrc_event")
def __update_lrc_event(self, value: _LRCEvent) -> None:
"""Update the _lrcEvent attribute."""
event = copy.copy(value)
if event:
command_id = event.pop("command_id")
self.push_change_event("_lrcEvent", [command_id, json.dumps(event)])
# TODO: Remove in future
# ----------------------
# Protocol V1 attributes
# ----------------------
def _create_deprecated_lrc_attributes(self) -> None:
"""
Create attributes for the long running commands.
:raises AssertionError: if max_queued_tasks or max_executing_tasks is not
equal to or greater than 0 or 1 respectively.
"""
if self._status_queue_size > LRC_FINISHED_MAX_LENGTH:
self._command_tracker._lrc_finished_max_length = self._status_queue_size
self._lrcs_in_queue = attribute_from_signal(
"_commands_in_queue",
name="longRunningCommandsInQueue",
dtype=(str,),
max_dim_x=self._status_queue_size,
fget=self._read_longRunningCommandsInQueue,
)
self.add_attribute(self._lrcs_in_queue)
self._lrc_ids_in_queue = attribute_from_signal(
"_command_ids_in_queue",
name="longRunningCommandIDsInQueue",
dtype=(str,),
max_dim_x=self._status_queue_size,
fget=self._read_longRunningCommandIDsInQueue,
)
self.add_attribute(self._lrc_ids_in_queue)
self._lrc_status = attribute_from_signal(
"_command_statuses",
name="longRunningCommandStatus",
dtype=(str,),
max_dim_x=self._status_queue_size * 2,
fget=self._read_longRunningCommandStatus,
)
self.add_attribute(self._lrc_status)
self._lrc_in_progress = attribute_from_signal(
"_commands_in_progress",
name="longRunningCommandInProgress",
dtype=(str,),
max_dim_x=self._max_executing_tasks,
fget=self._read_longRunningCommandInProgress,
)
self.add_attribute(self._lrc_in_progress)
self._lrc_progress = attribute_from_signal(
"_command_progresses",
name="longRunningCommandProgress",
dtype=(str,),
max_dim_x=self._max_executing_tasks * 2,
fget=self._read_longRunningCommandProgress,
)
self.add_attribute(self._lrc_progress)
self._lrc_result = attribute_from_signal(
"_command_result",
name="longRunningCommandResult",
dtype=(str,),
max_dim_x=2,
fget=self._read_longRunningCommandResult,
)
self.add_attribute(self._lrc_result)
def _prune_completed_commands(
self, command_list: list[tuple[str, str]]
) -> list[tuple[str, str]]:
"""
Prune a command list of any lingering completed tasks.
:param command_list: list of (cmd_id, <info>) tuples of commands to prune
:return: the pruned list with the oldest completed commands removed
"""
if len(command_list) <= self._status_queue_size:
# Nothing to do
return command_list
# Determine which commands have completed by looking at the
# current command statuses
completed_commands = [
uid
for (uid, status) in self._command_tracker.command_statuses
if status.name
in [
TaskStatus.ABORTED.name,
TaskStatus.COMPLETED.name,
TaskStatus.REJECTED.name,
TaskStatus.FAILED.name,
]
]
# Create a pruned list by removing the oldest completed commands
number_to_remove = len(command_list) - self._status_queue_size
pruned_list = command_list
prune_candidates = [
(uid, info)
for (uid, info) in sorted(
command_list, key=lambda item: item[0].split(sep="_")[0]
)
if uid in completed_commands
]
for item in prune_candidates[0:number_to_remove]:
# This gets called many times so we
# keep track of which ones we have already logged
if self._command_tracker._evict_command(item[0]):
self.logger.warning(f"Status queue too big: removing item {item[0]}")
pruned_list.remove(item)
return pruned_list
def _update_commands_in_queue(
self, commands_in_queue: list[tuple[str, str]]
) -> None:
if commands_in_queue:
command_ids, command_names = zip(
*self._prune_completed_commands(commands_in_queue)
)
self._commands_in_queue = [
str(command_name) for command_name in command_names
]
self._command_ids_in_queue = [str(command_id) for command_id in command_ids]
else:
self._commands_in_queue = []
self._command_ids_in_queue = []
def _update_command_statuses(
self,
command_statuses: list[tuple[str, TaskStatus]],
) -> None:
statuses = [(uid, status.name) for (uid, status) in command_statuses]
self._command_statuses = [
str(item)
for item in itertools.chain.from_iterable(
self._prune_completed_commands(statuses)
)
]
# Check for commands starting and ending execution
for command_id, status in command_statuses:
if (
status == TaskStatus.IN_PROGRESS
and command_id not in self._commands_ids_in_progress
):
self._update_commands_in_progress(command_id, True)
elif (
status
in [
TaskStatus.ABORTED,
TaskStatus.COMPLETED,
TaskStatus.FAILED,
]
and command_id in self._commands_ids_in_progress
):
self._update_commands_in_progress(command_id, False)
def _update_commands_in_progress(self, command_id: str, in_progress: bool) -> None:
# Pass a reference to a new object for the push events, as this callback can be
# called multiple times before the event is pushed in the tango omni thread.
commands_ids_in_progress = self._commands_ids_in_progress.copy()
if in_progress:
commands_ids_in_progress.append(command_id)
elif command_id in commands_ids_in_progress:
commands_ids_in_progress.remove(command_id)
self._commands_ids_in_progress = commands_ids_in_progress
self._commands_in_progress = [
uid.split("_")[-1] for uid in commands_ids_in_progress
]
def _update_command_progresses(
self,
command_progresses: list[tuple[str, int | str]],
) -> None:
self._command_progresses = [
str(item) for item in itertools.chain.from_iterable(command_progresses)
]
def _update_command_result(
self,
command_id: str,
command_result: JSONData,
) -> None:
self._command_result = (command_id, json.dumps(command_result))
def _read_longRunningCommandsInQueue(self, attr: Attribute) -> None:
"""
Read the long running commands in the queue.
Keep track of which commands are that are currently known about.
Entries are removed `self._command_tracker._removal_time` seconds
after they have finished.
:param attr: Tango attribute being read
"""
warning_msg = (
"'longRunningCommandsInQueue' is deprecated since ska-tango-base "
"1.2.0 and will be removed in a future major release. The client should "
"use the 'ska_tango_base.long_running_commands.invoke_lrc()' method "
"instead, which will automatically subscribe to the correct attribute. "
"The client can check for the queued command(s) in the 'lrcQueue' "
"attribute if needed."
)
warnings.warn(warning_msg, DeprecationWarning)
if self.logger:
self.logger.warning(warning_msg)
attr.set_value_date_quality(*self._lrcs_in_queue.do_read(self))
def _read_longRunningCommandIDsInQueue(self, attr: Attribute) -> None:
"""
Read the IDs of the long running commands in the queue.
Every client that executes a command will receive a command ID as response.
Keep track of IDs currently allocated.
Entries are removed `self._command_tracker._removal_time` seconds
after they have finished.
:param attr: Tango attribute being read
"""
warning_msg = (
"'longRunningCommandIDsInQueue' is deprecated since ska-tango-base "
"1.2.0 and will be removed in a future major release. The client should "
"use the 'ska_tango_base.long_running_commands.invoke_lrc()' method "
"instead, which will automatically subscribe to the correct attribute. "
"The client can check for the queued command(s) in the 'lrcQueue' "
"attribute if needed."
)
warnings.warn(warning_msg, DeprecationWarning)
if self.logger:
self.logger.warning(warning_msg)
attr.set_value_date_quality(*self._lrc_ids_in_queue.do_read(self))
def _read_longRunningCommandStatus(self, attr: Attribute) -> None:
"""
Read the status of the currently executing long running commands.
ID, status pairs of the currently executing commands.
Clients can subscribe to on_change event and wait for the
ID they are interested in.
:param attr: Tango attribute being read
"""
warning_msg = (
"'longRunningCommandStatus' is deprecated since ska-tango-base "
"1.2.0 and will be removed in a future major release. The client should "
"use the 'ska_tango_base.long_running_commands.invoke_lrc()' method "
"instead, which will automatically subscribe to the correct attribute. "
"The client can check for the status of a command in the 'lrcQueue', "
"'lrcExecuting' and 'lrcFinished' attributes if needed."
)
warnings.warn(warning_msg, DeprecationWarning)
if self.logger:
self.logger.warning(warning_msg)
attr.set_value_date_quality(*self._lrc_status.do_read(self))
def _read_longRunningCommandInProgress(self, attr: Attribute) -> None:
"""
Read the name(s) of the currently executing long running command(s).
Name(s) of command and possible abort in progress or empty string(s).
:param attr: Tango attribute being read
"""
warning_msg = (
"'longRunningCommandInProgress' is deprecated since ska-tango-base "
"1.2.0 and will be removed in a future major release. The client should "
"use the 'ska_tango_base.long_running_commands.invoke_lrc()' method "
"instead, which will automatically subscribe to the correct attribute. "
"The client can check for the command(s) in progress in the "
"'lrcExecuting' attribute if needed."
)
warnings.warn(warning_msg, DeprecationWarning)
if self.logger:
self.logger.warning(warning_msg)
attr.set_value_date_quality(*self._lrc_in_progress.do_read(self))
def _read_longRunningCommandProgress(self, attr: Attribute) -> None:
"""
Read the progress of the currently executing long running command(s).
ID, progress of the currently executing command(s).
Clients can subscribe to on_change event and wait
for the ID they are interested in.
:param attr: Tango attribute being read
"""
warning_msg = (
"'longRunningCommandProgress' is deprecated since ska-tango-base "
"1.2.0 and will be removed in a future major release. The client should "
"use the 'ska_tango_base.long_running_commands.invoke_lrc()' method "
"instead, which will automatically subscribe to the correct attribute. "
"The client can check for the progress of a command in the "
"'lrcExecuting' attribute if needed."
)
warnings.warn(warning_msg, DeprecationWarning)
if self.logger:
self.logger.warning(warning_msg)
attr.set_value_date_quality(*self._lrc_progress.do_read(self))
def _read_longRunningCommandResult(self, attr: Attribute) -> None:
"""
Read the result of the completed long running command.
Reports unique_id, json-encoded result.
Clients can subscribe to on_change event and wait for
the ID they are interested in.
:param attr: Tango attribute being read
"""
warning_msg = (
"'longRunningCommandResult' is deprecated since ska-tango-base "
"1.2.0 and will be removed in a future major release. The client should "
"use the 'ska_tango_base.long_running_commands.invoke_lrc()' method "
"instead, which will automatically subscribe to the correct attribute. "
"The client can check for the result of a command in the 'lrcFinished' "
"attribute if needed."
)
warnings.warn(warning_msg, DeprecationWarning)
if self.logger:
self.logger.warning(warning_msg)
attr.set_value_date_quality(*self._lrc_result.do_read(self))
# --------
# Commands
# --------
@command(
dtype_out="DevVarLongStringArray", doc_out="[ResultCode.STARTED][command_id]"
)
@DebugIt()
def Abort(self) -> DevVarLongStringArrayType:
"""
Abort any executing long running command(s) and empty the queue.
Abort is itself a long running command that is executed immediately.
Subclasses should override :py:meth:`execute_Abort()` to change the behaviour
of this command.
:return: A tuple containing ResultCode.STARTED and the unique ID of the command
"""
return self.execute_Abort()
[docs]
def allocate_lrc(
self,
name: str,
*,
started_callback: Callable[..., None] | str | None = None,
completed_callback: Callable[..., None] | str | None = None,
) -> tuple[str, TaskCallbackType]:
"""
Allocate a new `command_id` for a long running command.
The status and progress of the long running command should be updated using the
returned `task_callback` object.
:param name: The name of the command
:param started_callback: Callback passed to the command tracker that is called
when the command transitions to ``IN_PROGRESS`` status. If a `str`, this
will be used to look up a callable on the self. If `None` the default name
of "started_<Cmd>" will be used.
:param completed_callback: Callback passed to the command tracker that is called
when the command transitions to ``COMPLETED`` status. If a `str`, this will
be used to look up a callable on the self. If `None` the default name of
"completed_<Cmd>" will be used.
:returns: (command_id, task_callback)
"""
# Check for command started and completed callbacks
started_cb: Callable[..., None] | str | None = (
started_callback if started_callback is not None else f"started_{name}"
)
if isinstance(started_cb, str):
started_cb = getattr(self, started_cb, None)
completed_cb: Callable[..., None] | str | None = (
completed_callback
if completed_callback is not None
else f"completed_{name}"
)
if isinstance(completed_cb, str):
completed_cb = getattr(self, completed_cb, None)
command_id = self.command_tracker.new_command(name, completed_cb, started_cb)
task_callback = functools.partial(
self.command_tracker.update_command_info, command_id
)
return command_id, task_callback
[docs]
@staticmethod
def convert_submission_result_to_lrc_return(
command_id: str, status: TaskStatus, message: str
) -> DevVarLongStringArrayType:
"""
Convert a submit result to a DevVarLongStringArray.
Example::
@command
def MyCommand(self) -> DevVarLongStringArrayType:
command_id, task_callback = self.allocate_lrc("MyCommand")
status, message = self.submit_lrc_task(
"MyCommand", self.do_my_command, task_callback
)
return self.submit_result_to_lrc_return(
command_id, status, message
)
:param command_id: The command id of the LRC
:param status: The task status returned by submit
:param message: The message returned by submit
:returns: [converted ResultCode], [command_id or message]
"""
match status:
case TaskStatus.QUEUED:
return [ResultCode.QUEUED], [command_id]
case TaskStatus.REJECTED:
return [ResultCode.REJECTED], [message]
case TaskStatus.IN_PROGRESS:
return [ResultCode.STARTED], [message]
case TaskStatus.COMPLETED:
return [ResultCode.OK], [message]
case _:
return [ResultCode.FAILED], [f"Unknown TaskStatus {status.name}"]
[docs]
def submit_lrc_task(
self,
name: str,
task: TaskFunctionType,
*,
args: list[Any] | None = None,
kwargs: dict[str, Any] | None = None,
task_callback: TaskCallbackType | None = None,
fisallowed: Callable[[LRCReqType], bool] | str | None = None,
) -> tuple[TaskStatus, str]:
"""
Submit a long running command task to the task_executor.
The f"is_{name}_allowed" method is looked up and, if found, will be called
with :py:class:`LRCReqType.EXECUTE_REQ
<ska_tango_base.long_running_commands.mixin.LRCReqType>` immediately
before the task is executed.
:param task: task to be executed
:param args: positional arguments to be passed to the task
:param kwargs: keyword arguments to be passed to the task
:param fisallowed: override the is_cmd_allowed method used
:returns: TaskStatus, message
"""
# Check for is_<cmd>_allowed method
allowed_func: Callable[..., bool] | str | None = (
fisallowed if fisallowed is not None else f"is_{name}_allowed"
)
if isinstance(allowed_func, str):
allowed_func = getattr(self, allowed_func, None)
if isinstance(fisallowed, str) and allowed_func is None:
raise AttributeError(f'"{fisallowed}" not found.')
if (
not isinstance(fisallowed, str | None)
and allowed_func is not None
and not inspect.ismethod(allowed_func)
):
allowed_func = functools.partial(allowed_func, self)
is_cmd_allowed: Callable[[], bool] | None
if allowed_func is not None:
def is_cmd_allowed() -> bool:
"""
Check command is allowed.
Invoked when task is popped from LRC queue.
"""
return allowed_func(request_type=LRCReqType.EXECUTE_REQ)
else:
is_cmd_allowed = None
return self.task_executor.submit(
task,
args,
kwargs,
is_cmd_allowed=is_cmd_allowed,
task_callback=task_callback,
)
[docs]
def execute_Abort(self) -> DevVarLongStringArrayType:
"""
Execute the Abort command.
Subclasses should override this to change the behaviour of the
Abort command.
:return: A tuple containing TaskStatus.IN_PROGRESS and a message
"""
command_id, task_callback = self.allocate_lrc("Abort")
status, _ = self.schedule_abort_task(task_callback)
assert status == TaskStatus.IN_PROGRESS
return ([ResultCode.STARTED], [command_id])
[docs]
def schedule_abort_task(
self, task_callback: TaskCallbackType
) -> tuple[TaskStatus, str]:
"""
Schedule an Abort task to begin executing immediately.
Subclasses should override this to change the behaviour of the
Abort command.
:param task_callback: Notified of progress of the abort command.
:return: A tuple containing TaskStatus.IN_PROGRESS and a message
"""
return self.task_executor.abort(task_callback)
@command(dtype_in=str, doc_in="command id", dtype_out=str, doc_out="TaskStatus")
@DebugIt()
def CheckLongRunningCommandStatus(self, argin: str) -> str:
"""
Check the status of a long running command by ID.
:param argin: the command id
:return: command status
"""
return self.execute_CheckLongRunningCommandStatus(argin)
[docs]
def execute_CheckLongRunningCommandStatus(self, argin: str) -> str:
"""
Execute the CheckLongRunningCommandStatus command.
Subclasses should override this to change the behaviour of the
CheckLongRunningCommandStatus command.
:param argin: the command id
:return: command status
"""
enum_status = self.command_tracker.get_command_status(argin)
return TaskStatus(enum_status).name
[docs]
class LRCMixin(AbstractLRCMixin):
"""
Mixin class that adds support for long running commands with a task executor.
It must be used with :class:`~ska_tango_base.ska_device.SKADevice` or a subclass of
it. It uses a shared bus with a command tracker to manage long running commands and
the related user-facing Tango attributes.
"""
MIN_SUPPORTED_LRC_PROTOCOL_VERSION = 2
# ----------------
# Init and cleanup
# ----------------
[docs]
def on_new_shared_bus(self) -> None:
"""Initialise attributes which depend on signals from the bus."""
self._task_executor = self.create_task_executor()
super().on_new_shared_bus()
[docs]
def create_task_executor(self) -> TaskExecutorProtocol:
"""
Create and return a task executor instance.
This factory method can be overridden in a device using the
:py:class:`.LRCMixin` class to use its own task executor.
:return: Task executor instance.
"""
return TaskExecutor(unhandled_exception_callback=self._on_unhandled_exception)
def _on_unhandled_exception(self, exception: Exception) -> None:
"""
Do something when a task raises an unhandled exception.
:param exception: the unhandled exception that was caught.
"""
[docs]
def delete_device(self) -> None:
"""Delete the device."""
with self.allow_internal_threads():
self.task_executor.shutdown()
super().delete_device()
[docs]
def schedule_abort_task(
self, task_callback: TaskCallbackType
) -> tuple[TaskStatus, str]:
"""
Schedule an Abort task to begin executing immediately.
Subclasses should override this to change the behaviour of the
Abort command.
:param task_callback: Notified of progress of the abort command.
:return: A tuple containing TaskStatus.IN_PROGRESS and a message
"""
return typing.cast(
tuple[TaskStatus, str],
AbstractLRCMixin.schedule_abort_task(self, task_callback),
)
# ----------
# Properties
# ----------
@property
def task_executor(self) -> TaskExecutorProtocol:
"""
Get the task executor.
:return: The initialised task executor.
"""
return self._task_executor