How to implement a long running command

The SKA long running command (LRC) machinery allows clients to schedule long running tasks on an SKA Tango device and receive updates on each task's progress using the LRC protocol. To support this, an SKA Tango device needs to provide, for each long running command it implements, a task to be scheduled and a regular Tango command to do the scheduling.

This guide explains the recommended way to implement an LRC using ska-tango-base >=1.4.0, with or without a component manager. It is intended for users implementing new devices. Please refer to the 1.4 migration guide for how to migrate existing devices that use the deprecated SKA command objects.

Step 1: Create a task to fulfil the LRC

The first step to implement an LRC is to create a task method which performs the resource-intensive/blocking work of the command. The task should report its status/progress via a callback and periodically check whether it has been requested to abort. It is recommended to use the TaskExecutor.task() decorator to apply some of this boilerplate code for you, but it is also possible to manage the task status yourself if needed.

Tip

A task can be a method on the Tango device class or another object such as a component manager, or even a free function.

Using the task decorator

The TaskExecutor.task() decorator wraps a method that accepts a progress callback and an abort event, adding the common task executor boilerplate that transitions the task through the task state machine. The decorator passes the progress callback and abort event as the keyword arguments progress_callback and task_abort_event. Any additional arguments are passed through to the wrapped function unchanged. The method should return the result of the task as JSON-encodable JSONData; some recommendations follow below. A partial example:

import threading

import ska_control_model as scm
import ska_tango_base as stb
...
@stb.executor.TaskExecutor.task
def very_slow_task(
    ...
    progress_callback: stb.type_hints.ProgressCallbackType,
    task_abort_event: threading.Event | None = None,
) -> stb.type_hints.JSONData:
    ...
    # result[0] is the ResultCode; result[1] is a message for the client
    return (scm.ResultCode.OK, "Task completed")
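To make the decorator's role more concrete, the following stdlib-only sketch shows the kind of boilerplate such a decorator applies: mark the task IN_PROGRESS, inject a progress callback, translate an abort into ABORTED, and report the returned result on completion. It is not the ska-tango-base implementation; `toy_task`, the local `TaskStatus` enum and `TaskAborted` class are hypothetical stand-ins, and the real decorator's callback arguments may differ.

```python
import enum
import functools
import threading
from typing import Any, Callable


class TaskStatus(enum.Enum):
    """Hypothetical stand-in for ska_control_model.TaskStatus (subset)."""

    IN_PROGRESS = enum.auto()
    COMPLETED = enum.auto()
    ABORTED = enum.auto()


class TaskAborted(Exception):
    """Hypothetical stand-in for the exception a task raises when aborted."""


def toy_task(func: Callable[..., Any]) -> Callable[..., None]:
    """Wrap ``func`` with task-status boilerplate (illustrative only)."""

    @functools.wraps(func)
    def wrapper(
        *args: Any,
        task_callback: Callable[..., None],
        task_abort_event: threading.Event,
        **kwargs: Any,
    ) -> None:
        task_callback(status=TaskStatus.IN_PROGRESS)
        try:
            result = func(
                *args,
                # Forward integer progress values through the task callback
                progress_callback=lambda progress: task_callback(progress=progress),
                task_abort_event=task_abort_event,
                **kwargs,
            )
        except TaskAborted:
            task_callback(status=TaskStatus.ABORTED)
            return
        # Any other exception propagates to whatever is running the task
        task_callback(status=TaskStatus.COMPLETED, result=result)

    return wrapper


@toy_task
def very_slow_task(
    *, progress_callback: Callable[[int], None], task_abort_event: threading.Event
) -> tuple[str, str]:
    for percent in (0, 50, 100):
        if task_abort_event.is_set():
            raise TaskAborted()
        progress_callback(percent)
    return ("OK", "Task completed")


updates: list[dict[str, Any]] = []
very_slow_task(
    task_callback=lambda **kw: updates.append(kw),
    task_abort_event=threading.Event(),
)
print(updates)
```

The wrapped function only deals with its own work, progress and abort checks; every status transition lives in one place, which is the main benefit of using a decorator for this.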

Task dos and don’ts

There are a few things a task method should do to ensure it behaves correctly as an LRC task:

Important guidelines

Reporting progress

The task should regularly call the progress_callback to report the progress of the task. The progress is expected to be an integer, and it is recommended to use this integer to represent a percentage from 0 to 100. How to interpret the task progress should be well documented for clients invoking the LRC.
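A minimal sketch of mapping work done onto the recommended integer percentage, assuming the work can be split into a known number of items. `process_items` is a hypothetical helper; in a real task the callback would be the injected progress_callback.

```python
from typing import Callable, Sequence


def process_items(
    items: Sequence[str],
    progress_callback: Callable[[int], None],
) -> int:
    """Process ``items`` one by one, reporting integer percentage progress."""
    total = len(items)
    progress_callback(0)  # report that work has started
    for done, item in enumerate(items, start=1):
        ...  # do the real (slow) work on ``item`` here
        # Integer division keeps the value an int in the range 0..100
        progress_callback(100 * done // total)
    return total


reported: list[int] = []
process_items(["a", "b", "c"], reported.append)
print(reported)  # [0, 33, 66, 100]
```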

Task result

The task result is the return value of the task method decorated with TaskExecutor.task(). It is recommended to always include a ResultCode to tell clients whether the task completed successfully. Ideally, the ResultCode should be the first element of the result, so that clients can access it as result[0], consistent with the task results produced by ska-tango-base. A client should then be able to determine the type of result[1] from the value of result[0].

If your task can complete “partially successfully”, consider using multiple result codes to provide more details. For example, if your task coordinates multiple subordinate devices, you might provide a result such as the following:

(scm.ResultCode.OK, {
    "total_success": False,
    "device_responses": [
        (scm.ResultCode.OK, "OK"),
        (scm.ResultCode.FAILED, "Not enough quux available"),
        ...
    ]
})
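As a sketch of how such a result could be assembled, the hypothetical `summarise` helper below derives `total_success` from subordinate responses and checks that the result is JSON-encodable. The local `ResultCode` IntEnum is an assumption standing in for ska-control-model's ResultCode; its numeric values here are chosen only for illustration.

```python
import enum
import json


class ResultCode(enum.IntEnum):
    """Hypothetical stand-in for ska_control_model.ResultCode (subset)."""

    OK = 0
    FAILED = 3


def summarise(
    device_responses: list[tuple[ResultCode, str]],
) -> tuple[ResultCode, dict]:
    """Build an overall task result from subordinate device responses."""
    total_success = all(code == ResultCode.OK for code, _ in device_responses)
    # The overall ResultCode says the task itself ran to completion; the
    # details tell the client how each subordinate device fared.
    return (
        ResultCode.OK,
        {
            "total_success": total_success,
            "device_responses": device_responses,
        },
    )


result = summarise(
    [(ResultCode.OK, "OK"), (ResultCode.FAILED, "Not enough quux available")]
)
encoded = json.dumps(result)  # the result must be JSON-encodable
print(encoded)
```

Because an IntEnum is an int, it encodes as a plain number, so clients decoding the JSON see integer result codes.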

Aborting

The task should check at regular intervals whether the abort event has been set. If task_abort_event.is_set() returns True, the task should quickly perform only essential cleanup and raise TaskAborted.
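A minimal sketch of the abort-check pattern, assuming the work can be broken into short chunks. `TaskAborted` here is a local stand-in for the ska-tango-base exception and `chunked_work` is hypothetical.

```python
import threading
import time


class TaskAborted(Exception):
    """Hypothetical stand-in for the ska-tango-base TaskAborted exception."""


def chunked_work(n_chunks: int, task_abort_event: threading.Event) -> int:
    """Do ``n_chunks`` units of work, checking for an abort between units."""
    completed = 0
    for _ in range(n_chunks):
        if task_abort_event.is_set():
            # Only essential cleanup belongs here, then stop quickly.
            raise TaskAborted()
        time.sleep(0.01)  # placeholder for one short unit of real work
        completed += 1
    return completed


event = threading.Event()
print(chunked_work(3, event))
event.set()
try:
    chunked_work(3, event)
except TaskAborted:
    print("aborted before doing any more work")
```

The size of each chunk bounds how long an abort request can go unnoticed, so long blocking calls should be split up or given timeouts where possible.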

Handling exceptions

Do not raise exceptions (other than TaskAborted) from your task method to indicate normal task failures. Instead, use the TaskStatus and ResultCode to communicate the outcome of the task. Raising exceptions should be reserved for abnormal failures (i.e. bugs).
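As a hedged illustration of this distinction, the sketch below reports an anticipated failure through its result instead of raising. The `move_to` task body and the local `ResultCode` enum are hypothetical stand-ins, not part of ska-tango-base or ska-control-model.

```python
import enum


class ResultCode(enum.Enum):
    """Hypothetical stand-in for ska_control_model.ResultCode (subset)."""

    OK = "OK"
    FAILED = "FAILED"


def move_to(position: float, limit: float) -> tuple[ResultCode, str]:
    """Hypothetical task body: an expected failure is a result, not an error."""
    if abs(position) > limit:
        # A normal, anticipated failure: report it in the task result.
        return ResultCode.FAILED, f"Position {position} exceeds limit {limit}"
    ...  # perform the move; a bug here would raise and count as abnormal
    return ResultCode.OK, "Move completed"


print(move_to(5.0, 10.0))
print(move_to(50.0, 10.0))
```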

Managing the task status yourself

If your task needs more flexibility, you can manage the task status yourself through the task_callback() instead of using the TaskExecutor.task() decorator. However, when managing the task status manually, take care to ensure that it follows the state machine defined by the LRC protocol. When writing a task without the TaskExecutor.task() decorator, the task should return None and report its result via the task_callback().

Briefly, the task status state machine requires the following. At the start of your task method, the task status must be set to TaskStatus.IN_PROGRESS via the task_callback(). While the task runs, you should report its progress, also via the task_callback(), and periodically check the abort event as explained above. Before your task method returns, it must set the task status to either TaskStatus.COMPLETED or TaskStatus.ABORTED as appropriate, and provide a task result via the task_callback().

See Long Running Command tasks for details about the task status state machine.

import collections.abc
import threading

import ska_control_model as scm
...
def very_slow_task(
    ...
    task_callback: collections.abc.Callable,
    task_abort_event: threading.Event,
) -> None:
    # Indicate that the task has started
    task_callback(status=scm.TaskStatus.IN_PROGRESS)
    for step in ...:  # or a while loop, depending on the work
        # Update the task progress
        task_callback(progress=...)

        # Do something that takes long to complete
        ...

        # Periodically check whether an abort has been requested
        if task_abort_event.is_set():
            # Indicate that the task has been aborted
            task_callback(
                status=scm.TaskStatus.ABORTED,
                result=(scm.ResultCode.ABORTED, "Task aborted"),
            )
            return

    # Indicate that the task has completed
    task_callback(
        status=scm.TaskStatus.COMPLETED,
        result=(scm.ResultCode.OK, "Task completed"),
    )
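When managing the status manually, it can help to unit test the sequence of callback calls. The following stdlib-only sketch records task_callback() calls and checks the rules listed above: the first status is IN_PROGRESS, the last is COMPLETED or ABORTED, and the final update carries a result. `RecordingCallback`, `manual_task` and the local `TaskStatus` enum are hypothetical stand-ins, not ska-tango-base test utilities.

```python
import enum
import threading
from typing import Any


class TaskStatus(enum.Enum):
    """Hypothetical stand-in for ska_control_model.TaskStatus (subset)."""

    IN_PROGRESS = enum.auto()
    COMPLETED = enum.auto()
    ABORTED = enum.auto()


class RecordingCallback:
    """Hypothetical test helper that records task_callback() calls."""

    def __init__(self) -> None:
        self.calls: list[dict[str, Any]] = []

    def __call__(self, **kwargs: Any) -> None:
        self.calls.append(kwargs)

    def check_lifecycle(self) -> None:
        """Assert the start and end rules of the task status state machine."""
        statuses = [c["status"] for c in self.calls if "status" in c]
        assert statuses[0] is TaskStatus.IN_PROGRESS, "must start IN_PROGRESS"
        assert statuses[-1] in (TaskStatus.COMPLETED, TaskStatus.ABORTED)
        assert "result" in self.calls[-1], "final update must carry a result"


def manual_task(
    task_callback: RecordingCallback, task_abort_event: threading.Event
) -> None:
    task_callback(status=TaskStatus.IN_PROGRESS)
    for percent in (0, 50, 100):
        task_callback(progress=percent)
        if task_abort_event.is_set():
            task_callback(
                status=TaskStatus.ABORTED, result=("ABORTED", "Task aborted")
            )
            return
    task_callback(status=TaskStatus.COMPLETED, result=("OK", "Task completed"))


cb = RecordingCallback()
manual_task(cb, threading.Event())
cb.check_lifecycle()
print(cb.calls[-1])
```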

Step 2: Decide on a concurrency mechanism

Once you have a task to schedule, you need to decide where to schedule it, i.e. which concurrency mechanism to use. This decision applies to the SKA Tango device as a whole rather than to each long running command. The long running command infrastructure provided by ska-tango-base requires an object implementing the TaskExecutorProtocol as the concurrency mechanism.

If you are working on a device inheriting from any of the base device classes that subclass SKABaseDevice, the device will, by default, expect the TaskExecutorProtocol object to be provided by the component manager. To fulfil this, the component manager must inherit from TaskExecutorComponentManager, which provides a TaskExecutor for queuing and executing asynchronous tasks. Alternatively, a subclass of SKABaseDevice can provide a TaskExecutor directly by inheriting from LRCMixin, in which case a component manager is not required for LRCs (i.e. it becomes optional).

It is possible to implement long running commands using a different concurrency mechanism. The LRCMixin allows providing other objects that implement the TaskExecutorProtocol by overriding create_task_executor(), but this is not covered in this guide.
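For intuition about what such a concurrency mechanism does, the following stdlib-only sketch queues tasks on a single background thread and hands each one a callback and a shared abort event. It is hypothetical: `ToyExecutor` and its `submit()`/`abort()` signatures are assumptions and do not reproduce TaskExecutorProtocol, so it is not suitable for returning from create_task_executor() as is.

```python
import concurrent.futures
import threading
from typing import Any, Callable


class ToyExecutor:
    """Hypothetical single-worker executor; not the TaskExecutorProtocol API."""

    def __init__(self) -> None:
        # One worker thread, so tasks run one at a time in submission order
        self._pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
        self._abort_event = threading.Event()

    def submit(
        self, func: Callable[..., Any], task_callback: Callable[..., None]
    ) -> concurrent.futures.Future:
        """Queue ``func``; it is called with the callback and the abort event."""
        return self._pool.submit(
            func, task_callback=task_callback, task_abort_event=self._abort_event
        )

    def abort(self) -> None:
        """Ask running and queued tasks to stop at their next abort check."""
        self._abort_event.set()

    def shutdown(self) -> None:
        self._pool.shutdown(wait=True)


def quick_task(
    task_callback: Callable[..., None], task_abort_event: threading.Event
) -> None:
    task_callback(status="IN_PROGRESS")
    task_callback(status="COMPLETED", result=("OK", "done"))


updates: list[dict[str, Any]] = []
executor = ToyExecutor()
future = executor.submit(quick_task, lambda **kw: updates.append(kw))
future.result(timeout=5)  # wait for the background task to finish
executor.shutdown()
print(updates)
```

A production executor also has to clear the abort event once aborting is finished, reject or drain queued tasks, and track task IDs; those concerns are omitted here.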

Recommendation

Have your device inherit from LRCMixin, unless your component manager needs access to the task executor; in that case, have your component manager inherit from TaskExecutorComponentManager instead.

Step 3: Implement a Tango command to schedule the task

The next step is to implement a Tango command to submit the task to the TaskExecutorProtocol object your device uses. This step differs slightly for a user-defined command versus so-called standard SKA commands already defined in the base classes.

User-defined LRC

For a user-defined LRC, use the long_running_command() decorator to create a Tango command method that submits the task. For example, for a device using a component manager:

import ska_tango_base as stb
...
class MyDevice(stb.SKABaseDevice[MyComponentManager]):
    ...
    @stb.long_running_commands.long_running_command
    def VerySlow(self) -> stb.type_hints.TaskFunctionType:
        """Long running command."""
        return self.component_manager.very_slow_task

If you are working on a device constructed modularly with the LRCMixin:

import ska_tango_base as stb
...
class MyDevice(stb.long_running_commands.LRCMixin, stb.SKADevice):
    ...
    @stb.long_running_commands.long_running_command
    def VerySlow(self) -> stb.type_hints.TaskFunctionType:
        """Long running command."""
        return self.very_slow_task

Tip

The long_running_command() decorator accepts the same parameters as tango.command(), so you can specify input argument types, descriptions, etc. as needed.
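The key idea in both examples is that the command method only returns the task; the decorator is responsible for submitting it. The following stdlib-only sketch illustrates that division of labour. `toy_long_running_command` and `ToyDevice` are hypothetical, and the toy returns a plain command ID string, which is an assumption rather than the real Tango command return value.

```python
import functools
import itertools
import threading
from typing import Any, Callable

TaskFunction = Callable[..., None]
_command_ids = itertools.count(1)


def toy_long_running_command(
    method: Callable[..., TaskFunction],
) -> Callable[..., str]:
    """Hypothetical: turn a task-returning method into a submitting command."""

    @functools.wraps(method)
    def command(self: Any, *args: Any, **kwargs: Any) -> str:
        task = method(self, *args, **kwargs)  # the method only returns the task
        command_id = f"{next(_command_ids)}_{method.__name__}"
        worker = threading.Thread(
            target=task,
            kwargs={
                "task_callback": functools.partial(self.report, command_id),
                "task_abort_event": self.abort_event,
            },
        )
        self.workers.append(worker)
        worker.start()  # run the task in the background
        return command_id  # the client gets an ID immediately

    return command


class ToyDevice:
    """Hypothetical device with just enough state for the decorator."""

    def __init__(self) -> None:
        self.abort_event = threading.Event()
        self.updates: list[tuple[str, dict[str, Any]]] = []
        self.workers: list[threading.Thread] = []

    def report(self, command_id: str, **kwargs: Any) -> None:
        self.updates.append((command_id, kwargs))

    def very_slow_task(
        self, task_callback: Callable[..., None], task_abort_event: threading.Event
    ) -> None:
        task_callback(status="COMPLETED", result=("OK", "Task completed"))

    @toy_long_running_command
    def VerySlow(self) -> TaskFunction:
        """Long running command."""
        return self.very_slow_task


device = ToyDevice()
command_id = device.VerySlow()
for worker in device.workers:
    worker.join()
print(command_id, device.updates)
```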

Standard SKA command as an LRC

The BaseInterface and SubarrayInterface classes (which are base classes for SKABaseDevice and SKASubarray respectively) define a set of standard SKA commands whose behaviour subclasses are expected to provide by overriding the corresponding execute_<Cmd>() methods. These commands can be made long running commands using the submit_lrc_task() decorator.

Tip

If your device is a subarray device that must also implement the standard subarray commands, its component manager can inherit from both SubarrayComponentManager and TaskExecutorComponentManager.

import ska_tango_base as stb
...
class MySubarrayComponentManager(
    stb.subarray.SubarrayComponentManager, stb.executor.TaskExecutorComponentManager
):
    """A subarray component manager."""
    ...

When a standard SKA command needs to be an LRC, implement its behaviour in a task that is returned by the relevant execute_<Cmd>() method, and decorate that execute_<Cmd>() method with the submit_lrc_task() decorator. For example, here the component manager method do_on is returned as the task to be submitted for a long running On() command:

import threading

import ska_tango_base as stb
...
class MyComponentManager(stb.executor.TaskExecutorComponentManager):
    ...
    def do_on(
        self,
        task_callback: stb.type_hints.TaskCallbackType,
        task_abort_event: threading.Event,
    ) -> None:
        # Implement the long running task here
        ...
        return

class MyDevice(stb.SKABaseDevice[MyComponentManager]):
    ...
    @stb.long_running_commands.submit_lrc_task
    def execute_On(self) -> stb.type_hints.TaskFunctionType:
        return self.component_manager.do_on

Important notes

Optional commands

The standard SKA commands provided by BaseInterface, namely On(), Off(), Standby() and Reset(), are optional and are only included in the Tango interface if the corresponding execute_<Cmd>() methods are overridden. However, for backwards compatibility, SKABaseDevice always includes these commands in its interface. A subclass of SKABaseDevice can remove a command by assigning it to None. For example:

import ska_tango_base as stb
...
class MyDevice(stb.SKASubarray[MyComponentManager]):
    Reset = None
    ...
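The `Reset = None` idiom relies on ordinary Python attribute lookup: the subclass attribute shadows the inherited method. The sketch below shows only these Python semantics with plain classes; how ska-tango-base and PyTango then build the Tango interface from the class is not modelled, and `exposed_commands` is a hypothetical helper.

```python
class Base:
    def Reset(self) -> str:
        return "reset"

    def Off(self) -> str:
        return "off"


class MyDevice(Base):
    Reset = None  # shadows the inherited Reset method


def exposed_commands(cls: type, candidates: tuple[str, ...]) -> list[str]:
    """Hypothetical: list the candidate names that are still callable on cls."""
    return [name for name in candidates if callable(getattr(cls, name, None))]


print(exposed_commands(Base, ("Reset", "Off")))  # ['Reset', 'Off']
print(exposed_commands(MyDevice, ("Reset", "Off")))  # ['Off']
```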

ObsState management

When using the SubarrayInterface or its child SKASubarray, the obs_state_model actions to transition the ObsState are performed automatically at the start and end of each command when using the submit_lrc_task() or long_running_command() decorators.

However, the component_<x>() actions must still be called explicitly at the appropriate time by the task implementing the LRC. When using a component manager, these component_<x>() methods are automatically called when _update_component_state() is called with the appropriate arguments.

The following device does not use a component manager and overrides the AssignResources() command, using the TaskExecutor.task() decorator to simplify the task implementation. The task calls component_resourced() to inform the observation state machine that the AssignResources() command successfully assigned resources:

import threading

import ska_control_model as scm
import ska_tango_base as stb
...
class MyDevice(stb.subarray.SubarrayInterface):
    ...
    AssignResources_SCHEMA: dict[str, stb.type_hints.JSONData] = {
        "$schema": "https://json-schema.org/draft/2020-12/schema",
        "$id": "https://skao.int/ska-tango-base/AssignResources.json",
        "title": "MyDevice AssignResources schema",
        "description": "Schema for MyDevice AssignResources command",
        "type": "object",
        "properties": {
            "resources": {
                "description": "Resources to assign",
                "type": "array",
                "items": {"type": "string"},
            },
        },
        "required": ["resources"],
    }

    @stb.long_running_commands.submit_lrc_task
    @stb.validators.validate_json_args
    def execute_AssignResources(
        self, resources: list[str]
    ) -> stb.type_hints.TaskFunctionType:

        @stb.executor.TaskExecutor.task
        def task(
            progress_callback: stb.type_hints.ProgressCallbackType,
            task_abort_event: threading.Event,
        ) -> tuple[scm.ResultCode, str]:
            ...
            self.component_resourced()
            return scm.ResultCode.OK, "AssignResources successful"

        return task
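To see why the task must still call component_resourced() itself, consider the much-simplified toy model below, loosely modelled on the AssignResources transitions. `ToyObsStateModel`, `run_assign` and the action names are hypothetical simplifications, not the ska-tango-base observation state model; the point is only that the automatic end-of-command action depends on what the task reported.

```python
from typing import Callable


class ToyObsStateModel:
    """Hypothetical, much-simplified model of the AssignResources transitions."""

    def __init__(self) -> None:
        self.obs_state = "EMPTY"
        self._resourced = False

    def perform_action(self, action: str) -> None:
        if action == "assign_invoked":
            self.obs_state = "RESOURCING"
        elif action == "component_resourced":
            self._resourced = True  # the task reports what really happened
        elif action == "assign_completed":
            # The end-of-command action relies on the component action above
            self.obs_state = "IDLE" if self._resourced else "EMPTY"
        else:
            raise ValueError(f"Unknown action {action!r}")


def run_assign(
    model: ToyObsStateModel, task: Callable[[ToyObsStateModel], None]
) -> None:
    """Stand-in for what the decorators do around the task."""
    model.perform_action("assign_invoked")  # automatic, at the start
    task(model)
    model.perform_action("assign_completed")  # automatic, at the end


good = ToyObsStateModel()
run_assign(good, lambda m: m.perform_action("component_resourced"))
forgot = ToyObsStateModel()
run_assign(forgot, lambda m: None)
print(good.obs_state, forgot.obs_state)  # IDLE EMPTY
```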

Customising standard SKA command behaviour

If you need more control over the behaviour of a standard SKA command, you can look at using the mark_long_running() decorator and allocate_lrc() method. See the 1.4 migration guide for more details.

Step 4: Handling command arguments

If your command accepts arguments, you need to wrap your task method in a closure, or use functools.partial(), to capture the arguments the task will use. In addition, the validate_json_args() decorator can turn a method that accepts multiple keyword arguments into one that accepts a single JSON string. For example, the following overrides the behaviour of the AssignResources() command, which accepts its resources argument in a JSON-encoded object:

import threading

import ska_tango_base as stb
...
class MyDevice(stb.SKASubarray[MyComponentManager]):
    ...
    AssignResources_SCHEMA: dict[str, stb.type_hints.JSONData] = {
        "$schema": "https://json-schema.org/draft/2020-12/schema",
        "$id": "https://skao.int/ska-tango-base/ReferenceSkaSubarray_AssignResources.json",
        "title": "ska-tango-base ReferenceSkaSubarray AssignResources schema",
        "description": "Schema for ska-tango-base ReferenceSkaSubarray AssignResources command",
        "type": "object",
        "properties": {
            "resources": {
                "description": "Resources to assign",
                "type": "array",
                "items": {"type": "string"},
            },
        },
        "required": ["resources"],
    }

    @stb.long_running_commands.submit_lrc_task
    @stb.validators.validate_json_args
    def execute_AssignResources(
        self, resources: list[str]
    ) -> stb.type_hints.TaskFunctionType:
        def task(
            task_callback: stb.type_hints.TaskCallbackType,
            task_abort_event: threading.Event,
        ) -> None:
            self.component_manager.do_assign(
                set(resources),
                task_callback=task_callback,
                task_abort_event=task_abort_event,
            )

        return task
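The closure above and functools.partial() are interchangeable ways of capturing the command arguments. The following stdlib-only sketch compares the two; `do_assign` is a hypothetical stand-in for the component manager method.

```python
import functools
import threading
from typing import Any, Callable


def do_assign(
    resources: set[str],
    task_callback: Callable[..., None],
    task_abort_event: threading.Event,
) -> None:
    """Hypothetical component manager method that needs an extra argument."""
    task_callback(status="COMPLETED", result=("OK", sorted(resources)))


def make_task_closure(resources: list[str]) -> Callable[..., None]:
    # The inner function closes over ``resources`` from the command call
    def task(task_callback: Callable[..., None], task_abort_event: threading.Event) -> None:
        do_assign(
            set(resources),
            task_callback=task_callback,
            task_abort_event=task_abort_event,
        )

    return task


def make_task_partial(resources: list[str]) -> Callable[..., None]:
    # Pre-bind the extra argument, leaving the executor's keyword arguments
    return functools.partial(do_assign, set(resources))


results: dict[str, list[dict[str, Any]]] = {}
for factory in (make_task_closure, make_task_partial):
    seen: list[dict[str, Any]] = []
    task = factory(["b", "a"])
    task(task_callback=lambda **kw: seen.append(kw), task_abort_event=threading.Event())
    results[factory.__name__] = seen
print(results)
```

One subtle difference: the partial converts `resources` to a set when the command is called, whereas the closure does so only when the task runs, which matters if the captured list could be mutated in between.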

Warning

The validate_json_args() decorator must always be applied first, i.e. placed innermost, directly above the method definition, so that the LRC machinery can catch validation errors and reject the task.
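Python applies stacked decorators from the bottom up, so the innermost decorator wraps the method first. The toy sketch below shows why the order matters: with validation innermost, a validation error is raised while the outer decorator is building the task, so the outer decorator can catch it and reject the request. `toy_validate_json_args`, `toy_submit` and the returned tuples are hypothetical and do not reproduce the ska-tango-base decorators.

```python
import functools
import json
from typing import Any, Callable

submitted: list[str] = []


def toy_validate_json_args(method: Callable[..., Any]) -> Callable[..., Any]:
    """Hypothetical: parse one JSON string into keyword arguments."""

    @functools.wraps(method)
    def wrapper(self: Any, argin: str) -> Any:
        kwargs = json.loads(argin)  # raises ValueError on malformed JSON
        if not isinstance(kwargs, dict) or "resources" not in kwargs:
            raise ValueError("'resources' is required")
        return method(self, **kwargs)

    return wrapper


def toy_submit(method: Callable[..., Any]) -> Callable[..., tuple[str, str]]:
    """Hypothetical: reject if building the task fails, otherwise submit it."""

    @functools.wraps(method)
    def wrapper(self: Any, *args: Any) -> tuple[str, str]:
        try:
            task = method(self, *args)
        except ValueError as exc:
            return ("REJECTED", str(exc))
        submitted.append(task())  # stand-in for queuing the task
        return ("QUEUED", method.__name__)

    return wrapper


class ToyDevice:
    @toy_submit  # applied last (outermost)
    @toy_validate_json_args  # applied first (innermost)
    def AssignResources(self, resources: list[str]) -> Callable[[], str]:
        return lambda: f"assign {resources}"


dev = ToyDevice()
ok = dev.AssignResources('{"resources": ["a"]}')
bad = dev.AssignResources('{"wrong": 1}')
print(ok, bad, submitted)
```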

Step 5: Optionally add an is-allowed method

A method named is_<Cmd>_allowed() is automatically looked up on the device and called when the command's task is dequeued. If the method is omitted, the task is assumed to always be allowed. An is_<Cmd>_allowed() method must have a request_type argument that defaults to LRCReqType.ENQUEUE_REQ, because Tango automatically calls it with no arguments when the command is received and its task is enqueued. For example:

class MyDevice(...):
    ...
    def is_VerySlow_allowed(
        self,
        request_type: stb.long_running_commands.LRCReqType
        | None = stb.long_running_commands.LRCReqType.ENQUEUE_REQ,
    ) -> bool:
        """Is 'VerySlow' command allowed.

        :return: True if the 'VerySlow' command can be executed
        """
        ...
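The following stdlib-only sketch shows how the default argument lets one method serve both checks. The local `LRCReqType` enum is a stand-in, and its `DEQUEUE_REQ` member is a hypothetical name (check the real LRCReqType for its members); the split between a cheap enqueue check and a state check at dequeue time is only an illustrative policy.

```python
from __future__ import annotations

import enum


class LRCReqType(enum.Enum):
    """Hypothetical stand-in for ska-tango-base's LRCReqType."""

    ENQUEUE_REQ = enum.auto()
    DEQUEUE_REQ = enum.auto()  # hypothetical member name


class ToyDevice:
    def __init__(self) -> None:
        self.busy = False
        self.queue_full = False

    def is_VerySlow_allowed(
        self, request_type: LRCReqType | None = LRCReqType.ENQUEUE_REQ
    ) -> bool:
        if request_type is LRCReqType.ENQUEUE_REQ:
            # Called with no arguments when the command is received
            return not self.queue_full
        # Called again just before the queued task would run
        return not self.busy


dev = ToyDevice()
dev.busy = True
print(dev.is_VerySlow_allowed())  # True: enqueuing is allowed
print(dev.is_VerySlow_allowed(LRCReqType.DEQUEUE_REQ))  # False: device busy
```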

Note

The is-allowed methods for all standard SKA commands are already implemented in the relevant base classes and are designed to work with both long running and fast commands. The submit_lrc_task() decorator marks the execute_<Cmd>() methods as long running commands, and the default is-allowed implementations use this marking to decide when to check whether execution is allowed.

Step 6: Optionally implement the unhandled exception callback

If your device inherits from SKABaseDevice or LRCMixin, or your component manager inherits from TaskExecutorComponentManager, you should implement the _on_unhandled_exception() method. It is called when the TaskExecutor catches an unhandled exception raised during the execution of an LRC. Such an exception implies a bug in the device code, so the callback should be used to notify users of it. In the following example, a component manager implements the callback to put the device into the FAULT state.

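import ska_tango_base as stb
...
class MyComponentManager(stb.executor.TaskExecutorComponentManager):
    ...
    def _on_unhandled_exception(self, exception: Exception) -> None:
        # An unhandled exception indicates a bug: report it by entering FAULT
        self._update_component_state(fault=True)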
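For intuition about where this hook sits, the following stdlib-only sketch runs tasks on a background thread and routes any uncaught exception to `_on_unhandled_exception()`. `ToyComponentManager` and its `submit()`/`_run()` methods are hypothetical stand-ins; the real TaskExecutor's internals are not reproduced here.

```python
import concurrent.futures
from typing import Callable


class ToyComponentManager:
    """Hypothetical stand-in showing where an unhandled-exception hook fits."""

    def __init__(self) -> None:
        self.fault = False
        self._pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)

    def _update_component_state(self, fault: bool) -> None:
        self.fault = fault

    def _on_unhandled_exception(self, exception: Exception) -> None:
        # An uncaught exception means a bug: surface it, here as a fault
        self._update_component_state(fault=True)

    def _run(self, task: Callable[[], object]) -> None:
        try:
            task()
        except Exception as exc:  # catch-all only to route bugs to the hook
            self._on_unhandled_exception(exc)

    def submit(self, task: Callable[[], object]) -> concurrent.futures.Future:
        return self._pool.submit(self._run, task)

    def shutdown(self) -> None:
        self._pool.shutdown(wait=True)


cm = ToyComponentManager()
cm.submit(lambda: 1 / 0).result(timeout=5)  # a buggy task
cm.shutdown()
print(cm.fault)  # True
```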