Long Running Commands Mix-in Class

This module implements a mixin class for long running commands.

The mixin requires the class using it to also inherit from the generic SKADevice class.

class ska_tango_base.long_running_commands.mixin.LRCReqType[source]

Used to discriminate between command is allowed callers.

ENQUEUE_REQ = 1

Passed to is_<Cmd>_allowed when the task is submitted.

EXECUTE_REQ = 2

Passed to is_<Cmd>_allowed when the task is about to be executed.

__new__(value)
class ska_tango_base.long_running_commands.mixin.LRCMixin[source]

Bases: AbstractLRCMixin

Mixin class that adds support for long running commands with a task executor.

It must be used with SKADevice or a subclass of it. It uses a shared bus with a command tracker to manage long running commands and the related user-facing Tango attributes.

lrcQueue: list[str]

Tango attribute returning a table describing queued long running commands.

Each element of the list is a JSON object with the following keys:

  • "uid" - Unique identifier for the command (str)

  • "name" - Name of the command (str)

  • "submitted_time" - Timestamp when the command was submitted (str)

lrcExecuting: list[str]

Tango attribute returning a table describing currently executing long running commands.

Each element of the list is a JSON object with the following keys:

  • "uid" - Unique identifier for the command (str)

  • "name" - Name of the command (str)

  • "submitted_time" - Timestamp when the command was submitted (str)

  • "started_time" - Timestamp when the command was started (str)

  • "progress" - Percentage completion (optional, int)

lrcFinished: list[str]

Tango attribute returning a table describing finished long running commands.

Each element of the list is a JSON object with the following keys:

  • "uid" - Unique identifier for the command (str)

  • "name" - Name of the command (str)

  • "submitted_time" - Timestamp when the command was submitted (str)

  • "started_time" - Timestamp when the command was started (str)

  • "finished_time" - Timestamp when the command finished (str)

  • "result" - JSON encoded result of command (str)

  • "status" - TaskStatus the command finished with (str)

  • "progress" - Percentage completion (optional, int)

on_new_shared_bus() None[source]

Initialise attributes which depend on signals from the bus.

create_task_executor() TaskExecutorProtocol[source]

Create and return a task executor instance.

This factory method can be overridden in a device using the LRCMixin class to use its own task executor.

Returns:

Task executor instance.

delete_device() None[source]

Delete the device.

schedule_abort_task(task_callback: TaskCallbackType) tuple[TaskStatus, str][source]

Schedule an Abort task to begin executing immediately.

Subclasses should override this to change the behaviour of the Abort command.

Parameters:

task_callback – Notified of progress of the abort command.

Returns:

A tuple containing TaskStatus.IN_PROGRESS and a message

property task_executor: TaskExecutorProtocol

Get the task executor.

Returns:

The initialised task executor.

class ska_tango_base.long_running_commands.mixin.AbstractLRCMixin[source]

Bases: SignalBusMixin

Partial mixin class that adds support for long running commands.

It must be used with SKADevice or a subclass of it. It uses a shared bus with a command tracker to manage long running commands and the related user-facing Tango attributes. A task_executor() must be implemented for a long running command and the related attributes to behave correctly.

on_new_shared_bus() None[source]

Initialise attributes which depend on signals from the bus.

Raises:

AssertionError – If ‘self.task_executor.max_queued_tasks’ or ‘self.task_executor.max_executing_tasks’ is not equal to or greater than 0 or 1 respectively.

property command_tracker: CommandTrackerProtocol

Get the command tracker.

Returns:

The initialised CommandTrackerProtocol object.

property task_executor: TaskExecutorProtocol

Get the task executor.

Returns:

The initialised task executor.

lrcProtocolVersions() tuple[int, int]

Return supported protocol versions.

Returns:

A tuple containing the lower and upper bounds of supported long running command protocol versions.

is_Abort_allowed() bool[source]

Return whether the Abort() command may be called currently.

This method can be overridden by subclasses to change when this command is allowed.

Returns:

whether the command may be called in the current device state

Abort() DevVarLongStringArrayType

Abort any executing long running command(s) and empty the queue.

Abort is itself a long running command that is executed immediately.

Subclasses should override execute_Abort() to change the behaviour of this command.

Returns:

A tuple containing ResultCode.STARTED and the unique ID of the command

allocate_lrc(name: str, *, started_callback: Callable[[...], None] | str | None = None, completed_callback: Callable[[...], None] | str | None = None) tuple[str, TaskCallbackType][source]

Allocate a new command_id for a long running command.

The status and progress of the long running command should be updated using the returned task_callback object.

Parameters:
  • name – The name of the command

  • started_callback – Callback passed to the command tracker that is called when the command transitions to IN_PROGRESS status. If a str, this will be used to look up a callable on the self. If None the default name of “started_<Cmd>” will be used.

  • completed_callback – Callback passed to the command tracker that is called when the command transitions to COMPLETED status. If a str, this will be used to look up a callable on the self. If None the default name of “completed_<Cmd>” will be used.

Returns:

(command_id, task_callback)

static convert_submission_result_to_lrc_return(command_id: str, status: TaskStatus, message: str) DevVarLongStringArrayType[source]

Convert a submit result to a DevVarLongStringArray.

Example:

@command
def MyCommand(self) -> DevVarLongStringArrayType:
    command_id, task_callback = self.allocate_lrc("MyCommand")
    status, message = self.submit_lrc_task(
        "MyCommand", self.do_my_command, task_callback
    )
    return self.submit_result_to_lrc_return(
        command_id, status, message
    )
Parameters:
  • command_id – The command id of the LRC

  • status – The task status returned by submit

  • message – The message returned by submit

Returns:

[converted ResultCode], [command_id or message]

submit_lrc_task(name: str, task: TaskFunctionType, *, args: list[Any] | None = None, kwargs: dict[str, Any] | None = None, task_callback: TaskCallbackType | None = None, fisallowed: Callable[[LRCReqType], bool] | str | None = None) tuple[TaskStatus, str][source]

Submit a long running command task to the task_executor.

The f”is_{name}_allowed” method is looked up and, if found, will be called with LRCReqType.EXECUTE_REQ immediately before the task is executed.

Parameters:
  • task – task to be executed

  • args – positional arguments to be passed to the task

  • kwargs – keyword arguments to be passed to the task

  • fisallowed – override the is_cmd_allowed method used

Returns:

TaskStatus, message

execute_Abort() DevVarLongStringArrayType[source]

Execute the Abort command.

Subclasses should override this to change the behaviour of the Abort command.

Returns:

A tuple containing TaskStatus.IN_PROGRESS and a message

schedule_abort_task(task_callback: TaskCallbackType) tuple[TaskStatus, str][source]

Schedule an Abort task to begin executing immediately.

Subclasses should override this to change the behaviour of the Abort command.

Parameters:

task_callback – Notified of progress of the abort command.

Returns:

A tuple containing TaskStatus.IN_PROGRESS and a message

CheckLongRunningCommandStatus(argin: str) str

Check the status of a long running command by ID.

Parameters:

argin – the command id

Returns:

command status

execute_CheckLongRunningCommandStatus(argin: str) str[source]

Execute the CheckLongRunningCommandStatus command.

Subclasses should override this to change the behaviour of the CheckLongRunningCommandStatus command.

Parameters:

argin – the command id

Returns:

command status