How to implement a long running command
Decide on a concurrency mechanism
You will first need to decide how your long running command is going to be
fulfilled asynchronously. A reasonable default choice is to use the
TaskExecutorComponentManager
class included in ska-tango-base which provides a mechanism for queuing and
executing asynchronous tasks. This is the choice we will make for the rest
of this guide.
It is possible to implement long running commands using a different
concurrency mechanism. To do so, follow the steps below, replacing each use of
submit_task()
with your mechanism of choice.
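As a rough illustration of what "a different concurrency mechanism" could look like, the sketch below queues tasks on a single-worker ThreadPoolExecutor from the Python standard library. The submit() helper, the slow_task() function and the plain-string statuses are hypothetical stand-ins invented for this example; they are not part of ska-tango-base. The only thing carried over from this guide is that the task receives task_callback and task_abort_event as keyword arguments.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# Hypothetical replacement for submit_task(): one worker thread runs the
# queued tasks one at a time, in submission order.
_executor = ThreadPoolExecutor(max_workers=1)
_abort_event = threading.Event()


def submit(func, task_callback):
    """Queue func for execution and return immediately."""
    # Plain strings stand in for the library's TaskStatus values.
    task_callback(status="QUEUED")
    return _executor.submit(
        func, task_callback=task_callback, task_abort_event=_abort_event
    )


def slow_task(task_callback, task_abort_event):
    """A task method with the same keyword arguments used in this guide."""
    task_callback(status="IN_PROGRESS")
    if task_abort_event.is_set():
        task_callback(status="ABORTED")
        return
    task_callback(status="COMPLETED", result="done")


updates = []
future = submit(slow_task, lambda **kwargs: updates.append(kwargs))
future.result()  # block here only so the example can inspect the updates
_executor.shutdown()
```

After the task has run, updates holds the QUEUED, IN_PROGRESS and COMPLETED notifications in that order.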
Create a component manager
You must subclass
TaskExecutorComponentManager
if you want to use this concurrency mechanism.
class SampleComponentManager(TaskExecutorComponentManager):
    """A sample component manager."""

    def __init__(
        self,
        *args,
        logger: logging.Logger | None = None,
        **kwargs,
    ):
        """Init SampleComponentManager."""
        # Set up your class
        super().__init__(*args, logger=logger, **kwargs)
Tip
If your device is a subarray device and must implement the default Subarray
commands, you can inherit from both
SubarrayComponentManager
and
TaskExecutorComponentManager.
For example:
class SampleSubarrayComponentManager(SubarrayComponentManager, TaskExecutorComponentManager):
    """A sample subarray component manager"""
    # ...
Add a task method to fulfil the long running command
At the start of your task method you must update the task status to be
TaskStatus.IN_PROGRESS via the
task_callback. During the execution of your task you may update the task
progress via the task_callback, and you should periodically check
the task_abort_event to see if the client has requested to abort
the task.
Before your task method returns it must update the task status to be either
TaskStatus.COMPLETED or
TaskStatus.ABORTED as
appropriate, and provide a task result via the task_callback.
If your task method raises an exception, the task executor will treat this as an
abnormal failure (i.e. a bug) and set the task status to
TaskStatus.FAILED and provide a
result (ResultCode.FAILED, <message>). To report a normal failure, set the
task status to TaskStatus.COMPLETED
and use the task result to communicate the failure.
See Long Running Command tasks for details about the task status state machine.
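The difference between a normal failure and an abnormal one can be sketched as follows. The TaskStatus and ResultCode enums here are minimal stand-ins defined only so the example runs on its own; in real code you would use the ones provided by the library. The configure_task() and buggy_task() functions are hypothetical, and run_like_executor() is only an imitation of the behaviour described above, not the library's executor.

```python
from enum import Enum, auto


class TaskStatus(Enum):  # stand-in, not the library enum
    IN_PROGRESS = auto()
    COMPLETED = auto()
    FAILED = auto()


class ResultCode(Enum):  # stand-in, not the library enum
    OK = auto()
    FAILED = auto()


def configure_task(settings, task_callback):
    """Hypothetical task that can fail in an expected way."""
    task_callback(status=TaskStatus.IN_PROGRESS)
    if "frequency" not in settings:
        # Normal failure: the task still COMPLETED; the result carries the outcome.
        task_callback(
            status=TaskStatus.COMPLETED,
            result=(ResultCode.FAILED, "Missing 'frequency'"),
        )
        return
    task_callback(status=TaskStatus.COMPLETED, result=(ResultCode.OK, "Configured"))


def buggy_task(task_callback):
    """Hypothetical task containing a bug."""
    raise RuntimeError("oops")


def run_like_executor(task, *args, task_callback):
    """Imitate the described behaviour: an exception becomes TaskStatus.FAILED."""
    try:
        task(*args, task_callback=task_callback)
    except Exception as exc:
        task_callback(
            status=TaskStatus.FAILED,
            result=(ResultCode.FAILED, f"Unhandled exception: {exc}"),
        )


normal, abnormal = [], []
run_like_executor(configure_task, {}, task_callback=lambda **kw: normal.append(kw))
run_like_executor(buggy_task, task_callback=lambda **kw: abnormal.append(kw))
```

In this sketch the expected failure ends with TaskStatus.COMPLETED and a ResultCode.FAILED result, while the bug ends with TaskStatus.FAILED.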
# class SampleComponentManager
    def _a_very_slow_method(
        self: SampleComponentManager,
        logger: logging.Logger,
        task_callback: Callable,
        task_abort_event: Event,
    ):
        """This is a long running method.

        :param logger: logger
        :param task_callback: callback used to update the task state
        :param task_abort_event: event to check for an abort request
        """
        # Indicate that the task has started
        task_callback(status=TaskStatus.IN_PROGRESS)
        for current_iteration in range(100):
            # Update the task progress
            task_callback(progress=current_iteration)

            # Do something
            time.sleep(10)

            # Periodically check that tasks have not been ABORTED
            if task_abort_event.is_set():
                # Indicate that the task has been aborted
                task_callback(status=TaskStatus.ABORTED, result=(ResultCode.ABORTED, "This task aborted"))
                return

        # Indicate that the task has completed
        task_callback(status=TaskStatus.COMPLETED, result=(ResultCode.OK, "This slow task has completed"))
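A task that sleeps between iterations only notices an abort request once the sleep has finished. One possible variation, assuming task_abort_event is a standard threading.Event, is to wait on the event itself, since Event.wait() returns True as soon as the event is set. The interruptible_task() function, its steps and step_seconds parameters, and the plain-string statuses below are hypothetical and exist only for this sketch.

```python
import threading


def interruptible_task(task_callback, task_abort_event, steps=100, step_seconds=10.0):
    """Hypothetical task that reacts to an abort without waiting out a sleep."""
    # Plain strings stand in for the library's TaskStatus values.
    task_callback(status="IN_PROGRESS")
    for current_iteration in range(steps):
        task_callback(progress=current_iteration)
        # wait() doubles as the delay: it returns True immediately if the
        # event is set, or False once the timeout has elapsed.
        if task_abort_event.wait(timeout=step_seconds):
            task_callback(status="ABORTED", result="This task aborted")
            return
    task_callback(status="COMPLETED", result="This slow task has completed")


abort_event = threading.Event()
abort_event.set()  # simulate a client that has already requested an abort
seen = []
interruptible_task(lambda **kwargs: seen.append(kwargs), abort_event)
```

Because the event is already set in this usage example, the task reports ABORTED during its first iteration instead of sleeping for ten seconds.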
Guidelines for task methods
Task progress
There is no mechanism for notifying a client of the maximum value that the task progress can take, so it is recommended that this maximum be statically known, for example by using 0 to 100 to represent the percentage completed. The way to interpret the task progress should be well documented for clients invoking the LRC.
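One way to keep the maximum statically known is to convert whatever internal step count the task uses into a percentage before reporting it. The percent_complete() helper below is a hypothetical name introduced for this sketch, and the list-appending task_callback is a stand-in for the real callback.

```python
def percent_complete(steps_done: int, total_steps: int) -> int:
    """Map an internal step count onto the fixed range 0 to 100."""
    if total_steps <= 0:
        raise ValueError("total_steps must be positive")
    # Clamp so that the reported value can never leave the documented range.
    clamped = min(max(steps_done, 0), total_steps)
    return (100 * clamped) // total_steps


reported = []


def task_callback(**kwargs):
    """Stand-in callback that records what a client would be told."""
    reported.append(kwargs)


total = 8
for done in range(1, total + 1):
    task_callback(progress=percent_complete(done, total))
```

With this approach a client only needs to know that progress is a percentage, regardless of how many steps a particular task takes.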
Task result
It is recommended to always include a ResultCode to
indicate to clients whether the task has completed successfully. Ideally, this
ResultCode should be accessible as
result[0], to fit in with the task results provided by ska-tango-base.
A client should be able to determine the type of result[1] from the value of
result[0].
If your task can complete “partially successfully”, consider using multiple
ResultCode values to provide more detail. For
example, if your task coordinates multiple subordinate devices, you might
provide a result such as the following:
(ResultCode.OK, {
    "total_success": False,
    "device_responses": [
        (ResultCode.OK, "OK"),
        (ResultCode.FAILED, "Not enough quux available"),
        ...
    ]
})
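A result of this shape could be assembled and read along the following lines. The ResultCode enum is a minimal stand-in so that the example is self-contained, and combine_responses() is a hypothetical helper; the layout of the result simply follows the example above.

```python
from enum import Enum, auto


class ResultCode(Enum):  # stand-in, not the library enum
    OK = auto()
    FAILED = auto()


def combine_responses(device_responses):
    """Wrap per-device (ResultCode, message) pairs in a single task result."""
    total_success = all(code is ResultCode.OK for code, _ in device_responses)
    return (
        ResultCode.OK,
        {
            "total_success": total_success,
            "device_responses": list(device_responses),
        },
    )


result = combine_responses(
    [
        (ResultCode.OK, "OK"),
        (ResultCode.FAILED, "Not enough quux available"),
    ]
)

# Client side: result[0] tells the client that result[1] is the summary dict.
failures = [
    message
    for code, message in result[1]["device_responses"]
    if code is ResultCode.FAILED
]
```

Here the client inspects result[0] first and then picks the failed devices out of result[1].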
Optionally add an “is-allowed” method
If the is-allowed method is omitted it will be assumed that the task is always allowed.
# class SampleComponentManager
    def _is_a_very_slow_method_allowed(
        self: SampleComponentManager,
    ):
        """Check whether _a_very_slow_method is allowed.

        :return: True if the very slow method can be executed
        """
        return True
Warning
Do not confuse this is-allowed method with the Tango is_cmd_allowed
callback. This is-allowed method returns True if the task can be
executed at the point it is dequeued. The Tango is_cmd_allowed
callback returns True if the task can be enqueued in the first place.
Notably, the is-allowed method might return False when the task is
enqueued, but by the time the task has been dequeued it returns True
because other LRCs have been completed in the meantime.
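The timing difference described in this warning can be illustrated with a toy queue. Everything in the sketch below (SketchManager, its methods and the deque-based loop) is hypothetical and greatly simplified; it is not how the library's task executor is implemented, and what the toy does with a disallowed task is also an assumption.

```python
from collections import deque


class SketchManager:
    """Hypothetical component manager with one piece of state."""

    def __init__(self):
        self.powered_on = False
        self.log = []

    def _turn_on(self):
        self.powered_on = True
        self.log.append("on: ran")

    def _scan(self):
        self.log.append("scan: ran")

    def _is_scan_allowed(self):
        # Scanning only makes sense once the component is powered on.
        return self.powered_on


manager = SketchManager()
queue = deque()

# Enqueue "on" first, then "scan" while the component is still off.
queue.append((manager._turn_on, None))
allowed_at_enqueue = manager._is_scan_allowed()
queue.append((manager._scan, manager._is_scan_allowed))

# The is-allowed method is evaluated only when each task is dequeued.
while queue:
    task, is_allowed = queue.popleft()
    if is_allowed is None or is_allowed():
        task()
    else:
        manager.log.append("rejected")
```

The scan task would not have been allowed at the moment it was queued, but it runs successfully because the "on" task ahead of it has completed by the time it is dequeued.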
Implement the command to submit the asynchronous task for execution
The next step is to implement the Tango command itself, by writing a method which
submits the asynchronous task for execution. If your LRC implements one of the
standard commands defined by either SKABaseDevice
or SKASubarray (On,
AssignResources, etc.), then this method should override the corresponding
method of your component manager base class. For example, if you are implementing
the On command, you should override the unimplemented BaseComponentManager.on method.
If you are inheriting from TaskExecutorComponentManager,
you can use the TaskExecutorComponentManager.submit_task
method to submit a task for execution, as illustrated below. If not, you will need to
supply your own concurrency mechanism to schedule the task.
# class SampleComponentManager
    def submit_slow_method(self, task_callback: Callable | None = None):
        """Submit the slow task.

        This method returns immediately after it has submitted
        `self._a_very_slow_method` for execution.

        :param task_callback: Update task state, defaults to None
        """
        task_status, response = self.submit_task(
            self._a_very_slow_method,
            # Supplies the positional `logger` parameter of the task method
            args=[self.logger],
            is_cmd_allowed=self._is_a_very_slow_method_allowed,
            task_callback=task_callback,
        )
        return task_status, response
Initialise and register the command object
When you are implementing an LRC specific to your device, as we are doing for
this example, then you need to register a command object in your override of
init_command_objects().
This command object must be a subclass of
SubmittedSlowCommand for an LRC that is
submitted to the input queue.
If your LRC implements one of the standard commands defined by either
SKABaseDevice or
SKASubarray, the base classes
have already created the command object for you. You do not have to
re-register the command object unless you wish to override the default command
object.
# class SampleDevice(SKABaseDevice):
    def init_command_objects(self):
        """Initialise the command handlers."""
        super().init_command_objects()

        ...

        self.register_command_object(
            "VerySlow",
            SubmittedSlowCommand(
                "VerySlow",
                self._command_tracker,
                self.component_manager,
                "submit_slow_method",
                callback=None,
                logger=self.logger,
            ),
        )
Create the Tango Command to initiate the LRC
Similarly, if your LRC implements one of the standard commands defined by either
SKABaseDevice or
SKASubarray, you are not
required to create a Tango command as the base classes will have done this for
you.
# class SampleDevice(SKABaseDevice):
    @command(
        dtype_in=None,
        dtype_out="DevVarStringArray",
    )
    @DebugIt()
    def VerySlow(self):
        """A very slow command."""
        handler = self.get_command_object("VerySlow")
        (return_code, message) = handler()
        return f"{return_code}", message