How to implement a long running command

Decide on a concurrency mechanism

You will first need to decide how your long running command is going to be fulfilled asynchronously. A reasonable default choice is to use the TaskExecutorComponentManager class included in ska-tango-base which provides a mechanism for queuing and executing asynchronous tasks. This is the choice we will make for the rest of this guide.

It is possible to implement long running commands using a different concurrency mechanism. Just follow the steps below replacing the use of submit_task() with your mechanism of choice.
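As a rough illustration of what a different mechanism might look like, the sketch below uses a standard-library thread pool. `ThreadPoolSubmitter`, its `submit()` method and the local `TaskStatus` enum are hypothetical stand-ins written for this example; they are not part of ska-tango-base, and a real replacement would also need to handle queue limits and is-allowed checks.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from enum import Enum


class TaskStatus(Enum):
    """Simplified local stand-in for the real TaskStatus enum."""

    QUEUED = "QUEUED"
    IN_PROGRESS = "IN_PROGRESS"
    COMPLETED = "COMPLETED"
    ABORTED = "ABORTED"


class ThreadPoolSubmitter:
    """Hypothetical alternative to submit_task(), backed by a thread pool."""

    def __init__(self):
        # A single worker runs tasks one at a time, in submission order.
        self._executor = ThreadPoolExecutor(max_workers=1)
        self.abort_event = threading.Event()

    def submit(self, func, task_callback):
        """Queue func for execution and return without waiting for it."""
        task_callback(status=TaskStatus.QUEUED)
        self._executor.submit(
            func, task_callback=task_callback, task_abort_event=self.abort_event
        )
        return TaskStatus.QUEUED, "Task queued"

    def shutdown(self):
        """Block until all queued tasks have finished."""
        self._executor.shutdown(wait=True)


def quick_task(task_callback, task_abort_event):
    """Task method following the same callback protocol as this guide."""
    task_callback(status=TaskStatus.IN_PROGRESS)
    if task_abort_event.is_set():
        task_callback(status=TaskStatus.ABORTED)
        return
    task_callback(status=TaskStatus.COMPLETED, result="done")


updates = []
submitter = ThreadPoolSubmitter()
status, message = submitter.submit(quick_task, lambda **kw: updates.append(kw))
submitter.shutdown()
```

The task method itself is unchanged by the choice of mechanism: it still receives a task_callback and a task_abort_event and reports its own status.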

Create a component manager

You must subclass TaskExecutorComponentManager if you want to use this concurrency mechanism.

class SampleComponentManager(TaskExecutorComponentManager):
    """A sample component manager"""

    def __init__(
        self,
        *args,
        logger: logging.Logger | None = None,
        **kwargs,
    ):
        """Init SampleComponentManager."""

        # Set up your class

        super().__init__(*args, logger=logger, **kwargs)

Tip

If your device is a subarray device and must implement the default Subarray commands, you can inherit from both SubarrayComponentManager and TaskExecutorComponentManager. For example:

class SampleSubarrayComponentManager(SubarrayComponentManager, TaskExecutorComponentManager):
    """A sample subarray component manager"""
    # ...

Add a task method to fulfil the long running command

At the start of your task method you must update the task status to be TaskStatus.IN_PROGRESS via the task_callback. During the execution of your task you may update the task progress via the task_callback, and you should periodically check the task_abort_event to see if the client has requested to abort the task.

Before your task method returns it must update the task status to be either TaskStatus.COMPLETED or TaskStatus.ABORTED as appropriate, and provide a task result via the task_callback.

If your task method raises an exception, the task executor will treat this as an abnormal failure (i.e. a bug) and set the task status to TaskStatus.FAILED and provide a result (ResultCode.FAILED, <message>). To report a normal failure, set the task status to TaskStatus.COMPLETED and use the task result to communicate the failure.
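The difference between the two kinds of failure can be sketched as follows. Here `run_task` is a hypothetical stand-in that only mimics the executor behaviour described above (it is not the ska-tango-base implementation), the enums are simplified local copies, and the failure messages are invented for illustration.

```python
from enum import Enum


class TaskStatus(Enum):
    """Simplified local stand-in for the real TaskStatus enum."""

    IN_PROGRESS = "IN_PROGRESS"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"


class ResultCode(Enum):
    """Simplified local stand-in for the real ResultCode enum."""

    OK = "OK"
    FAILED = "FAILED"


def run_task(task, task_callback):
    """Hypothetical executor: report FAILED if the task raises."""
    try:
        task(task_callback)
    except Exception as exc:  # abnormal failure, i.e. a bug in the task
        task_callback(
            status=TaskStatus.FAILED,
            result=(ResultCode.FAILED, f"Unhandled exception: {exc}"),
        )


def task_with_normal_failure(task_callback):
    """An expected failure: COMPLETED status, failing result."""
    task_callback(status=TaskStatus.IN_PROGRESS)
    task_callback(
        status=TaskStatus.COMPLETED,
        result=(ResultCode.FAILED, "Device did not respond"),
    )


def task_with_bug(task_callback):
    """An unexpected exception escapes the task method."""
    task_callback(status=TaskStatus.IN_PROGRESS)
    raise ValueError("unexpected state")


normal_updates, bug_updates = [], []
run_task(task_with_normal_failure, lambda **kw: normal_updates.append(kw))
run_task(task_with_bug, lambda **kw: bug_updates.append(kw))
```

In both cases the client sees a failing ResultCode in the result, but only the second case ends in TaskStatus.FAILED.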

See Long Running Command tasks for details about the task status state machine.

# class SampleComponentManager

    def _a_very_slow_method(
        self: SampleComponentManager,
        logger: logging.Logger,
        task_callback: Callable,
        task_abort_event: Event,
    ):
        """This is a long running method

        :param logger: logger
        :param task_callback: Callback used to update the task status, progress and result
        :param task_abort_event: Event that is set when the client requests an abort
        """
        # Indicate that the task has started
        task_callback(status=TaskStatus.IN_PROGRESS)
        for current_iteration in range(100):
            # Update the task progress
            task_callback(progress=current_iteration)

            # Do something
            time.sleep(10)

            # Periodically check that tasks have not been ABORTED
            if task_abort_event.is_set():
                # Indicate that the task has been aborted
                task_callback(status=TaskStatus.ABORTED, result=(ResultCode.ABORTED, "This task aborted"))
                return

        # Indicate that the task has completed
        task_callback(status=TaskStatus.COMPLETED, result=(ResultCode.OK, "This slow task has completed"))
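
One way to check that a task method follows this protocol is to call it directly with a recording callback and an abort event that you control. The sketch below uses a shortened, hypothetical task (`slow_task`, with no sleeping) and simplified local copies of the enums so that it runs without ska-tango-base installed.

```python
import threading
from enum import Enum


class TaskStatus(Enum):
    """Simplified local stand-in for the real TaskStatus enum."""

    IN_PROGRESS = "IN_PROGRESS"
    COMPLETED = "COMPLETED"
    ABORTED = "ABORTED"


class ResultCode(Enum):
    """Simplified local stand-in for the real ResultCode enum."""

    OK = "OK"
    ABORTED = "ABORTED"


def slow_task(task_callback, task_abort_event, iterations=3):
    """Shortened version of the task method, without the sleep."""
    task_callback(status=TaskStatus.IN_PROGRESS)
    for current_iteration in range(iterations):
        task_callback(progress=current_iteration)
        if task_abort_event.is_set():
            task_callback(
                status=TaskStatus.ABORTED,
                result=(ResultCode.ABORTED, "This task aborted"),
            )
            return
    task_callback(
        status=TaskStatus.COMPLETED,
        result=(ResultCode.OK, "This slow task has completed"),
    )


def run_and_record(abort_requested):
    """Run slow_task synchronously and return every callback update."""
    updates = []
    abort_event = threading.Event()
    if abort_requested:
        abort_event.set()
    slow_task(lambda **kw: updates.append(kw), abort_event)
    return updates


normal_run = run_and_record(abort_requested=False)
aborted_run = run_and_record(abort_requested=True)
```

The recorded updates can then be compared against the expected sequence: IN_PROGRESS first, then progress updates, then exactly one final status with a result.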

Guidelines for task methods

Task progress

There is no mechanism for a client to be notified of the maximum value that the task progress can take, so it is recommended that this maximum be statically known, for example by using 0 to 100 to represent the percentage completed. How to interpret the task progress should be well documented for clients invoking the LRC.
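A minimal sketch of reporting progress on a fixed 0 to 100 scale is shown below. The helper `percent_complete` and the recording `task_callback` are hypothetical names used only for this example.

```python
def percent_complete(steps_done: int, total_steps: int) -> int:
    """Map a step count onto the fixed range 0 to 100."""
    if total_steps <= 0:
        raise ValueError("total_steps must be positive")
    # Clamp so that the reported value never leaves the documented range.
    clamped = max(0, min(steps_done, total_steps))
    return (100 * clamped) // total_steps


reported = []


def task_callback(**kwargs):
    """Recording stand-in for the callback passed to a task method."""
    reported.append(kwargs)


total = 8
for done in range(total + 1):
    task_callback(progress=percent_complete(done, total))
```

With this approach the client never needs to know how many internal steps the task has, only that progress is a percentage.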

Task result

It is recommended to always include a ResultCode to indicate to clients if the task has completed successfully or not. Ideally, this ResultCode should be accessed with result[0] to fit in with task results provided by ska-tango-base. A client should know the type of result[1] based on the value of result[0].

If your task can complete “partially successfully”, consider using multiple ResultCodes to provide more details. For example, if your task coordinates multiple subordinate devices, you might provide a result such as the following:

(ResultCode.OK, {
    "total_success": False,
    "device_responses":[
        (ResultCode.OK, "OK"),
        (ResultCode.FAILED, "Not enough quux available"),
        ...
    ]
})
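
On the client side, a result of this shape could be unpacked along the following lines. This is only a sketch: `failure_messages` is a hypothetical helper, the enum is a simplified local copy, and the assumption that a non-OK top-level result carries a plain message in result[1] is made for illustration and is not mandated by ska-tango-base.

```python
from enum import Enum


class ResultCode(Enum):
    """Simplified local stand-in for the real ResultCode enum."""

    OK = "OK"
    FAILED = "FAILED"


def failure_messages(result):
    """Return the failure messages contained in a task result."""
    code, payload = result
    if code is not ResultCode.OK:
        # Assumption: non-OK results carry a plain message as payload.
        return [str(payload)]
    # For OK results the payload is the dictionary of device responses.
    return [
        message
        for sub_code, message in payload["device_responses"]
        if sub_code is not ResultCode.OK
    ]


partial_result = (
    ResultCode.OK,
    {
        "total_success": False,
        "device_responses": [
            (ResultCode.OK, "OK"),
            (ResultCode.FAILED, "Not enough quux available"),
        ],
    },
)
failures = failure_messages(partial_result)
```

Note how the value of result[0] determines how result[1] is interpreted, as recommended above.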

Optionally add an “is-allowed” method

If the is-allowed method is omitted it will be assumed that the task is always allowed.

# class SampleComponentManager

    def _is_a_very_slow_method_allowed(
        self: SampleComponentManager,
    ):
        """ is _a_very_slow_method allowed

        :return: True if the very slow method can be executed
        """
        return True

Warning

Do not confuse this is-allowed method with the Tango is_cmd_allowed callback. This is-allowed method returns True if the task can be executed at the point it is dequeued. The Tango is_cmd_allowed callback returns True if the task can be enqueued in the first place.

Notably, the is-allowed method might return False when the task is enqueued, but by the time the task has been dequeued it returns True because other LRCs have been completed in the meantime.
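The timing difference can be illustrated with a toy, single-threaded queue. `TinyQueue` is a hypothetical stand-in written for this example and does not reflect how the task executor is implemented; in particular, what it records for a task that is not allowed is invented for illustration.

```python
from collections import deque


class TinyQueue:
    """Hypothetical queue that checks is-allowed only at dequeue time."""

    def __init__(self):
        self._tasks = deque()
        self.log = []

    def enqueue(self, name, func, is_allowed=None):
        """Add a task; the is-allowed check is deliberately not run here."""
        self._tasks.append((name, func, is_allowed))

    def run_all(self):
        """Dequeue tasks in order, checking is-allowed just before each."""
        while self._tasks:
            name, func, is_allowed = self._tasks.popleft()
            if is_allowed is not None and not is_allowed():
                self.log.append((name, "not allowed"))
                continue
            func()
            self.log.append((name, "executed"))


state = {"powered_on": False}


def power_on():
    state["powered_on"] = True


def scan():
    pass


def is_scan_allowed():
    return state["powered_on"]


task_queue = TinyQueue()
task_queue.enqueue("On", power_on)
# At this point the scan would not be allowed to execute...
allowed_at_enqueue = is_scan_allowed()
task_queue.enqueue("Scan", scan, is_allowed=is_scan_allowed)
# ...but by the time it is dequeued, "On" has already completed.
task_queue.run_all()
```

Here the check evaluated at enqueue time gives a different answer from the one evaluated at dequeue time, which is why the two checks should not be conflated.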

Implement the command to submit the asynchronous task for execution

The next step is to implement the Tango command itself, by writing a method which submits the asynchronous task for execution. If your LRC implements one of the standard commands defined by either SKABaseDevice or SKASubarray (On, AssignResources, etc.), then this method should override the corresponding method of your component manager base class. For example, if you are implementing the On command, you should override the unimplemented BaseComponentManager.on method.

If you are inheriting from TaskExecutorComponentManager, you can use the TaskExecutorComponentManager.submit_task method to submit a task for execution, as illustrated below. If not, you will need to supply your own concurrency mechanism to schedule the task.

# class SampleComponentManager

    def submit_slow_method(self, task_callback: Callable | None = None):
        """Submit the slow task.

        This method returns immediately after it has submitted
        `self._a_very_slow_method` for execution.

        :param task_callback: Update task state, defaults to None
        """
        task_status, response = self.submit_task(
            self._a_very_slow_method, args=[self.logger],
            is_cmd_allowed=self._is_a_very_slow_method_allowed,
            task_callback=task_callback
        )
        return task_status, response

Initialise and register the command object

When you are implementing an LRC specific to your device, as we are doing for this example, you need to register a command object in your override of init_command_objects(). For an LRC that is submitted to the input queue, this command object must be an instance of SubmittedSlowCommand or of one of its subclasses.

If your LRC implements one of the standard commands defined by either SKABaseDevice or SKASubarray, the base classes have already created the command object for you. You do not have to re-register the command object unless you wish to override the default command object.

# class SampleDevice(SKABaseDevice):

    def init_command_objects(self):
        """Initialise the command handlers."""
        super().init_command_objects()

        ...

        self.register_command_object(
            "VerySlow",
            SubmittedSlowCommand(
                "VerySlow",
                self._command_tracker,
                self.component_manager,
                "submit_slow_method",
                callback=None,
                logger=self.logger,
            ),
        )

Create the Tango Command to initiate the LRC

Similarly, if your LRC implements one of the standard commands defined by either SKABaseDevice or SKASubarray, you are not required to create a Tango command as the base classes will have done this for you.

# class SampleDevice(SKABaseDevice):

    @command(
        dtype_in=None,
        dtype_out="DevVarStringArray",
    )
    @DebugIt()
    def VerySlow(self):
        """A very slow command."""
        handler = self.get_command_object("VerySlow")
        (return_code, message) = handler()
        return f"{return_code}", message