Long Running Commands

Many SKA device commands involve actions that are inherently slow or of unpredictable duration. For example, a command might need to interact with hardware, other devices, or other external systems over a network; read from or write to a file system; or perform intensive computation. If a TANGO device blocks while such a command runs, then there is a period of time in which it cannot respond to other requests. Its overall performance declines, and timeouts may even occur.

To address this, the base device provides support for long running commands (LRCs), in the form of an interface and a mechanism for running such commands asynchronously.

Note

Long Running Command: A TANGO command whose execution time is on the order of seconds (the CS Guidelines recommend less than 10 ms). In this context it also means a command that is implemented to execute asynchronously. The terms long running command, slow command and asynchronous command are used interchangeably in this text and in the code base. Where their meanings differ, this is explained; all of them refer to non-blocking calls.

This means that the device returns a response immediately, while the actual task either runs in the background or waits in a queue for the next available worker.

New attributes and commands have been added to the base device to support executing long running TANGO commands asynchronously.

Monitoring Progress of Long Running Commands

In addition to the requirements listed above, the device should provide monitoring points that allow clients to determine when an LRC has been received, is executing, or has completed (successfully or not). An LRC can be in any of the following task states: STAGING, QUEUED, IN_PROGRESS, ABORTED, NOT_FOUND, COMPLETED, REJECTED, FAILED.
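Client code often needs to know whether a state is final, i.e. whether it can stop waiting. The sketch below mirrors the state names listed above in a local enum called `TaskState`; it is a hypothetical stand-in, not the `TaskStatus` enum used by the base device in the code examples below, and the split into terminal and non-terminal states is an assumption based on the state names.

```python
from enum import Enum, auto


class TaskState(Enum):
    """Hypothetical local stand-in for the LRC task states listed above."""

    STAGING = auto()
    QUEUED = auto()
    IN_PROGRESS = auto()
    ABORTED = auto()
    NOT_FOUND = auto()
    COMPLETED = auto()
    REJECTED = auto()
    FAILED = auto()


# Assumption: once a task reaches one of these states it does not change again.
TERMINAL_STATES = frozenset(
    {
        TaskState.ABORTED,
        TaskState.NOT_FOUND,
        TaskState.COMPLETED,
        TaskState.REJECTED,
        TaskState.FAILED,
    }
)


def is_terminal(state_name: str) -> bool:
    """Return True if a client can stop waiting for a task in this state."""
    return TaskState[state_name] in TERMINAL_STATES


print(is_terminal("IN_PROGRESS"))  # False
print(is_terminal("COMPLETED"))  # True
```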

A new set of attributes and commands has been added to the base device to enable monitoring and reporting of the result, status and progress of LRCs.

LRC Attributes

| Attribute | Example value | Description |
|---|---|---|
| longRunningCommandsInQueue | ('StandbyCommand', 'OnCommand', 'OffCommand') | Keeps track of which commands are in the queue |
| longRunningCommandIDsInQueue | ('1636437568.0723004_235210334802782_OnCommand', '1636437789.493874_116219429722764_OffCommand') | Keeps track of the IDs in the queue |
| longRunningCommandStatus | ('1636437568.0723004_235210334802782_OnCommand', 'IN_PROGRESS', '1636437789.493874_116219429722764_OffCommand', 'IN_PROGRESS') | (ID, status) pairs of the currently executing commands |
| longRunningCommandProgress | ('1636437568.0723004_235210334802782_OnCommand', '12', '1636437789.493874_116219429722764_OffCommand', '1') | (ID, progress) pairs of the currently executing commands |
| longRunningCommandResult | ('1636438076.6105473_101143779281769_OnCommand', '0', 'OK') | ID, ResultCode and result of the completed command |
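The status and progress attributes pack several (ID, value) pairs into one flat sequence of strings. A small helper, hypothetical and not part of the base device, can turn such a value into a dictionary keyed by command ID. The example values are the ones from the table above.

```python
from __future__ import annotations

from collections.abc import Sequence


def pairs_to_dict(flat: Sequence[str]) -> dict[str, str]:
    """Turn a flat (id1, value1, id2, value2, ...) sequence into {id: value}."""
    if len(flat) % 2:
        raise ValueError(f"expected an even number of items, got {len(flat)}")
    # Even positions hold command IDs; odd positions hold the paired value.
    return dict(zip(flat[0::2], flat[1::2]))


# Example attribute values copied from the table above.
status = pairs_to_dict(
    (
        "1636437568.0723004_235210334802782_OnCommand", "IN_PROGRESS",
        "1636437789.493874_116219429722764_OffCommand", "IN_PROGRESS",
    )
)
progress = pairs_to_dict(
    (
        "1636437568.0723004_235210334802782_OnCommand", "12",
        "1636437789.493874_116219429722764_OffCommand", "1",
    )
)
# The result attribute is a single (ID, ResultCode, result) triple.
command_id, result_code, message = (
    "1636438076.6105473_101143779281769_OnCommand", "0", "OK"
)

print(status["1636437568.0723004_235210334802782_OnCommand"])  # IN_PROGRESS
print(int(progress["1636437789.493874_116219429722764_OffCommand"]))  # 1
print(int(result_code), message)  # 0 OK
```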

LRC Commands

| Command | Description |
|---|---|
| CheckLongRunningCommandStatus | Check the status of a long running command by ID |
| AbortCommands | Abort the currently executing LRCs and remove all enqueued LRCs |
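A client that cannot use events can poll CheckLongRunningCommandStatus until the command reaches a final state. The helper below is a hypothetical sketch, not part of the base device. It assumes the command returns the task state name as a string (for example "COMPLETED"), and which states count as final is an assumption taken from the state list above. Polling adds latency and load on the device, so subscribing to change events, as described in the next paragraph, is usually preferable.

```python
from __future__ import annotations

import time
from typing import Any

# Assumption: a task in any of these states will not change state again.
FINAL_STATES = frozenset({"ABORTED", "NOT_FOUND", "COMPLETED", "REJECTED", "FAILED"})


def wait_for_command(
    proxy: Any,
    command_id: str,
    timeout: float = 30.0,
    poll_interval: float = 0.5,
) -> str:
    """Poll CheckLongRunningCommandStatus until the task reaches a final state.

    ``proxy`` is expected to behave like a ``tango.DeviceProxy`` for a device
    derived from the base device.
    """
    deadline = time.monotonic() + timeout
    while True:
        status = str(proxy.CheckLongRunningCommandStatus(command_id))
        if status in FINAL_STATES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"{command_id} still {status} after {timeout} s")
        time.sleep(poll_interval)


# Usage against a real device (device name hypothetical; assumes the second
# element returned by the command is the command ID):
#   proxy = tango.DeviceProxy("test/sample/1")
#   return_code, command_id = proxy.VerySlow()
#   print(wait_for_command(proxy, command_id, timeout=60))
```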

In addition to the commands in the table above, several SKA commands in the base device that were previously implemented as blocking commands have been converted to execute as long running (asynchronous) commands, namely Standby, On, Off, Reset and GetVersionInfo.

The device has change events configured for all the LRC attributes, which clients can use to track their requests. Clients are responsible for subscribing to these events to receive changes in command status and results.
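With PyTango, such subscriptions are made with `DeviceProxy.subscribe_event`. The callback below is a sketch; the class `ResultCollector` and its methods are hypothetical. It records every `longRunningCommandResult` event by command ID, so a client can subscribe before invoking its command, as in the sequence diagram, and later wait for the one result it cares about while ignoring the others. It assumes the (ID, ResultCode, result) layout shown in the attribute table.

```python
from __future__ import annotations

import threading
from typing import Any


class ResultCollector:
    """Change-event callback that stores each LRC result by command ID (hypothetical)."""

    def __init__(self) -> None:
        self._results: dict[str, tuple[str, ...]] = {}
        self._cond = threading.Condition()

    def __call__(self, event: Any) -> None:
        # Error events (e.g. a lost connection) carry no usable value.
        if event.err or event.attr_value is None:
            return
        value = event.attr_value.value
        # Assumed layout: (ID, ResultCode, result); skip empty initial values.
        if not value or not value[0]:
            return
        with self._cond:
            self._results[value[0]] = tuple(value[1:])
            self._cond.notify_all()

    def wait_for(self, command_id: str, timeout: float | None = None) -> tuple[str, ...] | None:
        """Block until the result for command_id arrives; return None on timeout."""
        with self._cond:
            self._cond.wait_for(lambda: command_id in self._results, timeout=timeout)
            return self._results.get(command_id)


# Usage with PyTango (device name hypothetical):
#   collector = ResultCollector()
#   proxy = tango.DeviceProxy("test/sample/1")
#   event_id = proxy.subscribe_event(
#       "longRunningCommandResult", tango.EventType.CHANGE_EVENT, collector
#   )
#   _, command_id = proxy.VerySlow()
#   print(collector.wait_for(command_id, timeout=60))
#   proxy.unsubscribe_event(event_id)
```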

UML Illustration

Multiple Clients Invoke Multiple Long Running Commands

@startuml

participant Client2 as c2
participant Client1 as c1
participant SKADevice as d
entity Queue as q
participant Worker as w

== First Client Request ==

c1 -> d: Subscribe to attr to get result notification of LongRunningCommand
c1 -> d : LongRunningCommand
d -> d : Check queue capacity
d -> q : enqueue task LongRunningCommandTask
rnote over q
  Queue:
  LongRunningCommandTask
endrnote
d -> c1 : Response QUEUED LongRunningCommand, Task ID 101
== Second Client Request ==

c2 -> d: Subscribe to attr to get result notification of OtherLongRunningCommand
c2 -> d : OtherLongRunningCommand
d -> d : Check queue capacity
d -> q : enqueue task OtherLongRunningCommandTask
rnote over q
  Queue:
  LongRunningCommandTask
  OtherLongRunningCommandTask
endrnote
d -> c2 : Response QUEUED OtherLongRunningCommand, Task ID 102

== Processing tasks  ==

q -> w : dequeue LongRunningCommandTask
rnote over q
  Queue:
  OtherLongRunningCommandTask
endrnote
activate w

w -> d : LongRunningCommandTask result
deactivate w
d -> d : push_change_event (ID 101) on attr
d <--> c1 : on_change event with result (ID 101, some_result)
d <--> c2 : on_change event with result (ID 101, some_result)
c2 -> c2 : Not interested in 101, ignoring

q -> w : dequeue OtherLongRunningCommandTask
rnote over q
  Queue:
  <empty>
endrnote
activate w

w -> d : OtherLongRunningCommandTask result
deactivate w
d -> d : push_change_event (ID 102) on attr
d <--> c2 : on_change event with result (ID 102, some_result)
d <--> c1 : on_change event with result (ID 102, some_result)
c1 -> c1 : Not interested in 102, ignoring 

@enduml
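The flow in the diagram can be modelled in a few lines of plain Python. This is a toy sketch, not how the base device is implemented: a hypothetical `MiniDevice` class with one bounded `queue.Queue`, a single worker thread, and a list of listeners standing in for change-event subscribers. Rejecting a task when the queue is full is an assumption about what "Check queue capacity" leads to.

```python
from __future__ import annotations

import queue
import threading
from typing import Callable

Listener = Callable[[int, str], None]


class MiniDevice:
    """Toy model of the diagram: one bounded queue, one worker, broadcast results."""

    def __init__(self, capacity: int = 10) -> None:
        self._queue: queue.Queue = queue.Queue(maxsize=capacity)
        self._listeners: list[Listener] = []
        self._next_id = 101
        self._lock = threading.Lock()
        threading.Thread(target=self._worker, daemon=True).start()

    def subscribe(self, listener: Listener) -> None:
        self._listeners.append(listener)

    def submit(self, task: Callable[[], str]) -> tuple[str, int | None]:
        with self._lock:
            task_id = self._next_id
            try:
                # "Check queue capacity" and "enqueue task" in one step.
                self._queue.put_nowait((task_id, task))
            except queue.Full:
                return "REJECTED", None  # assumption: full queue means rejection
            self._next_id += 1
        return "QUEUED", task_id  # the client gets its ID before the task runs

    def _worker(self) -> None:
        while True:
            task_id, task = self._queue.get()  # dequeue in FIFO order
            result = task()  # no error handling: this is a toy
            # Like push_change_event: every subscriber sees every result.
            for listener in list(self._listeners):
                listener(task_id, result)
            self._queue.task_done()

    def join(self) -> None:
        self._queue.join()


device = MiniDevice()
inbox_c1: list[tuple[int, str]] = []
inbox_c2: list[tuple[int, str]] = []
device.subscribe(lambda i, r: inbox_c1.append((i, r)))  # Client1 subscribes
device.subscribe(lambda i, r: inbox_c2.append((i, r)))  # Client2 subscribes

_, id1 = device.submit(lambda: "on done")  # Client1's request
_, id2 = device.submit(lambda: "off done")  # Client2's request
device.join()

# Each client keeps only its own result and ignores the other one.
print([r for i, r in inbox_c1 if i == id1])  # ['on done']
print([r for i, r in inbox_c2 if i == id2])  # ['off done']
```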

How to implement a long running command using the provided executor

A task executor is provided to handle the asynchronous execution of tasks put on the queue. Your component manager will execute tasks asynchronously if it inherits from TaskExecutorComponentManager, which wraps the provided executor. You can also replace the default executor with any other asynchronous mechanism in your component manager.
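To make the mechanics concrete, here is a deliberately small executor in plain Python. It is hypothetical and much simpler than the provided executor, whose API it does not reproduce. Tasks run on a single-worker `concurrent.futures.ThreadPoolExecutor` (the class diagram notes that the real one also uses `ThreadPoolExecutor`), each task receives `task_callback` and `task_abort_event` keyword arguments like `_a_very_slow_method`, and statuses are plain strings instead of `TaskStatus` members.

```python
from __future__ import annotations

import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable


class MiniTaskExecutor:
    """Hypothetical single-worker executor, loosely modelled on the description above."""

    def __init__(self) -> None:
        # One worker keeps tasks in FIFO order, which abort() relies on.
        self._pool = ThreadPoolExecutor(max_workers=1)
        self._abort_event = threading.Event()

    def submit(
        self,
        func: Callable[..., None],
        args: list[Any] | None = None,
        task_callback: Callable[..., None] | None = None,
    ) -> tuple[str, str]:
        callback = task_callback or (lambda **kwargs: None)
        callback(status="QUEUED")
        self._pool.submit(self._run, func, args or [], callback)
        return "QUEUED", "Task queued"  # returns before func has run

    def _run(self, func: Callable[..., None], args: list[Any], callback: Callable[..., None]) -> None:
        if self._abort_event.is_set():
            callback(status="ABORTED")  # aborted while still waiting in the queue
            return
        try:
            func(*args, task_callback=callback, task_abort_event=self._abort_event)
        except Exception as exc:  # report the failure instead of losing it in the pool
            callback(status="FAILED", result=str(exc))

    def abort(self) -> None:
        self._abort_event.set()
        # Queued behind every task submitted so far, so the event is cleared
        # only after those tasks have seen it.
        self._pool.submit(self._abort_event.clear)

    def shutdown(self) -> None:
        self._pool.shutdown(wait=True)
```

With several workers the clearing task could run before earlier tasks have checked the event, which is why this sketch sticks to a single worker.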

Create a component manager

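# Imports used by the SampleComponentManager snippets in this section
from __future__ import annotations

import logging
import time
from threading import Event
from typing import Callable

from ska_control_model import TaskStatus
from ska_tango_base.executor import TaskExecutorComponentManager


class SampleComponentManager(TaskExecutorComponentManager):
    """A sample component manager"""

    def __init__(
        self,
        *args,
        max_workers: int | None = None,
        logger: logging.Logger | None = None,
        **kwargs,
    ):
        """Init SampleComponentManager."""

        # Set up your class

        super().__init__(*args, max_workers=max_workers, logger=logger, **kwargs)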

Add a method that should be executed in a background thread

# class SampleComponentManager

    def _a_very_slow_method(
        self,
        task_callback: Callable,
        task_abort_event: Event,
    ):
        """This is a long running method.

        :param task_callback: Update task state
        :param task_abort_event: Check for abort
        """
        # Indicate that the task has started
        task_callback(status=TaskStatus.IN_PROGRESS)
        for current_iteration in range(100):
            # Update the task progress
            task_callback(progress=current_iteration)

            # Do something
            time.sleep(10)

            # Periodically check that tasks have not been ABORTED
            if task_abort_event.is_set():
                # Indicate that the task has been aborted
                task_callback(status=TaskStatus.ABORTED, result="This task aborted")
                return

        # Indicate that the task has completed
        task_callback(status=TaskStatus.COMPLETED, result="This slow task has completed")
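In `_a_very_slow_method`, an abort requested during `time.sleep(10)` is only noticed when the sleep ends, up to 10 s later. When a step is pure waiting, `threading.Event.wait(timeout)` can replace the sleep, because it returns `True` as soon as the event is set. The standalone variant below illustrates this; its name, its `steps`/`step_duration` parameters and its plain-string statuses (used instead of `TaskStatus` so it runs on its own) are hypothetical.

```python
from __future__ import annotations

import threading
from typing import Callable


def a_responsive_slow_task(
    task_callback: Callable[..., None],
    task_abort_event: threading.Event,
    steps: int = 100,
    step_duration: float = 10.0,
) -> None:
    """Variant of the slow method that reacts to an abort during its waits."""
    task_callback(status="IN_PROGRESS")
    for step in range(steps):
        task_callback(progress=step)
        # Event.wait returns True as soon as the event is set, so an abort
        # interrupts the pause instead of waiting out the full step_duration.
        if task_abort_event.wait(timeout=step_duration):
            task_callback(status="ABORTED", result="This task aborted")
            return
    task_callback(status="COMPLETED", result="This slow task has completed")
```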

Add a method to submit the slow method

# class SampleComponentManager

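    def _a_check(self) -> bool:
        """Return whether the slow method may run (placeholder check)."""
        # Hypothetical placeholder: replace with a real condition,
        # e.g. one based on the component's state.
        return True

    def submit_slow_method(self, task_callback: Callable | None = None):
        """Submit the slow task.

        This method returns immediately after it has submitted
        `self._a_very_slow_method` for execution.

        :param task_callback: Update task state, defaults to None
        :return: the task status and message returned by submit_task
        """
        task_status, response = self.submit_task(
            self._a_very_slow_method,
            args=[],
            is_cmd_allowed=self._a_check,
            task_callback=task_callback,
        )
        return task_status, response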

Create the component manager in your Tango device

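# Imports used by the SampleDevice snippets in this section
# (SampleComponentManager is assumed to be defined in, or imported into, this module)
from ska_tango_base import SKABaseDevice
from ska_tango_base.commands import SubmittedSlowCommand
from tango import DebugIt
from tango.server import command


class SampleDevice(SKABaseDevice):
    """A sample Tango device"""

    def create_component_manager(self):
        """Create a component manager."""
        return SampleComponentManager(
            max_workers=2,
            logger=self.logger,
            communication_state_callback=self._communication_state_changed,
            component_state_callback=self._component_state_changed,
        )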

Init the command object

# class SampleDevice(SKABaseDevice):

    def init_command_objects(self):
        """Initialise the command handlers."""
        super().init_command_objects()

        ...

        self.register_command_object(
            "VerySlow",
            SubmittedSlowCommand(
                "VerySlow",
                self._command_tracker,
                self.component_manager,
                "submit_slow_method",
                callback=None,
                logger=self.logger,
            ),
        )

Create the Tango Command

# class SampleDevice(SKABaseDevice):

    @command(
        dtype_in=None,
        dtype_out="DevVarStringArray",
    )
    @DebugIt()
    def VerySlow(self):
        """A very slow command."""
        handler = self.get_command_object("VerySlow")
        (return_code, message) = handler()
        return f"{return_code}", message

Class diagram

@startuml

class SubmittedSlowCommand {
+ command_tracker
+ component_manager
+ method_name
+ do()
}
note bottom: Uses component_manager\nand command_tracker\nto update task state attributes\nin the Tango device


class _CommandTracker
note bottom: Keeps track of the\ncommand state and progress

class SampleDevice {
- _component_manager
- _command_tracker
- _commands__SlowCommand__
+ ...
+ ...()
}

class SampleDevice extends SKABaseDevice 

class BaseComponentManager

class TaskExecutor {
+ ...
+ submit()
+ abort()
+ ...()
}
note right: Uses `ThreadPoolExecutor` for task execution

class TaskExecutorComponentManager {
+ ...
+ submit_task()
+ abort_commands()
- _task_executor
+ ...()
}

class TaskExecutorComponentManager extends BaseComponentManager

SampleDevice::_component_manager --> TaskExecutorComponentManager
SampleDevice::_command_tracker --> _CommandTracker
SampleDevice::_commands__SlowCommand__ --> SubmittedSlowCommand
TaskExecutorComponentManager::_task_executor --> TaskExecutor


@enduml