Executor
This module provides for asynchronous execution of tasks.
- class ska_tango_base.executor.executor.TaskExecutor
Bases: TaskExecutorProtocol
An asynchronous executor of tasks.
This task executor provides a default implementation of the TaskExecutorProtocol for use with the LRCMixin. It supports multiple worker threads for simultaneous LRC execution, although the default of 1 is recommended for sequential execution in most cases.
A maximum queue length can also be specified after which more submitted tasks will be rejected. The default is 32, with an allowed minimum of 1. In other words, this task executor will always have a queue and allow at least 1 LRC to be queued while another is busy executing.
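The queue limit described above can be pictured with a minimal sketch. This is not the TaskExecutor implementation: `BoundedQueueSketch`, its `run_next()` helper, and the status strings `"QUEUED"` and `"REJECTED"` are hypothetical names chosen for illustration, and only the standard library is used.

```python
import queue
from typing import Any, Callable


class BoundedQueueSketch:
    """Hypothetical stand-in showing a bounded task queue (not the real class)."""

    def __init__(self, max_queued_tasks: int = 32) -> None:
        # Mirror the documented minimum of 1: there is always room for one task.
        if max_queued_tasks < 1:
            raise ValueError("max_queued_tasks must be at least 1")
        self._queue: "queue.Queue[Callable[[], Any]]" = queue.Queue(
            maxsize=max_queued_tasks
        )

    def submit(self, func: Callable[[], Any]) -> str:
        """Queue the task, or reject it if the queue is already full."""
        try:
            self._queue.put_nowait(func)
        except queue.Full:
            return "REJECTED"  # placeholder status string
        return "QUEUED"  # placeholder status string

    def run_next(self) -> Any:
        """Take the oldest queued task off the queue and run it."""
        func = self._queue.get_nowait()
        return func()


sketch = BoundedQueueSketch(max_queued_tasks=1)
first = sketch.submit(lambda: "done")           # fits in the queue
second = sketch.submit(lambda: "never queued")  # queue is full, so rejected
result = sketch.run_next()                      # frees one queue slot
third = sketch.submit(lambda: "queued later")   # accepted again
```

With a queue length of 1, the second submission is rejected while the first is still waiting, and submissions are accepted again once a slot has been freed.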
- __init__(max_workers: int = 1, unhandled_exception_callback: Callable[[Exception], None] | None = None, max_queued_tasks: int = 32) -> None
Initialise a new TaskExecutor instance.
- Parameters:
max_workers – The maximum number of worker threads (minimum 1). Keep this at the default value, except in special cases, so that LRCs execute sequentially.
unhandled_exception_callback – Callback to be called when a task raises an unhandled exception.
max_queued_tasks – The maximum number of tasks allowed in the queue before further tasks are rejected (minimum 1).
- start() -> None
Start the backing thread pool.
This function is automatically called during __init__() and only needs to be called in order to recover if shutdown() has already been called.
- Raises:
ExecutorNotShutdownError – If shutdown() has not already been called.
- shutdown() -> None
Abort all outstanding tasks and shut down the backing thread pool.
This should be called when the TaskExecutor object is no longer required. After this has been called, the TaskExecutor is unusable, with submit() or abort() raising an ExecutorShutdownError. start() can be called to reset the TaskExecutor to a usable state.
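The start/shutdown lifecycle can be sketched as follows. This is an illustration built on `concurrent.futures.ThreadPoolExecutor` (Python 3.9 or later for `cancel_futures`), not the library's code: `LifecycleSketch` is a hypothetical class, the two exception classes are local stand-ins that only borrow the documented names, and treating a repeated `shutdown()` as a no-op is an assumption.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable, Optional


class ExecutorShutdownError(Exception):
    """Local stand-in for the library exception of the same name."""


class ExecutorNotShutdownError(Exception):
    """Local stand-in for the library exception of the same name."""


class LifecycleSketch:
    """Hypothetical stand-in showing the start/shutdown state changes."""

    def __init__(self, max_workers: int = 1) -> None:
        self._max_workers = max_workers
        self._pool: Optional[ThreadPoolExecutor] = None
        self.start()  # started automatically, as the documentation describes

    def start(self) -> None:
        if self._pool is not None:
            raise ExecutorNotShutdownError("shutdown() has not been called")
        self._pool = ThreadPoolExecutor(max_workers=self._max_workers)

    def shutdown(self) -> None:
        # Assumption: calling shutdown() twice is harmless in this sketch.
        if self._pool is not None:
            self._pool.shutdown(wait=True, cancel_futures=True)
            self._pool = None

    def submit(self, func: Callable[[], Any]) -> "Future[Any]":
        if self._pool is None:
            raise ExecutorShutdownError("executor has been shut down")
        return self._pool.submit(func)


executor = LifecycleSketch()
before = executor.submit(lambda: 1 + 1).result()

executor.shutdown()
rejected = False
try:
    executor.submit(lambda: None)
except ExecutorShutdownError:
    rejected = True  # unusable until start() is called again

executor.start()
recovered = executor.submit(lambda: "ok").result()
executor.shutdown()
```

The sketch only covers the state transitions: usable after construction, unusable after `shutdown()`, and usable again after `start()`.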
- property max_executing_tasks: int
Get the maximum number of simultaneously executing tasks.
This is always one more than the max_workers value given when instantiating the TaskExecutor, to accommodate an abort task.
- Returns:
The maximum number of simultaneously executing tasks.
- property max_queued_tasks: int
Get the maximum task queue size.
- Returns:
The maximum task queue size.
- get_input_queue_size() -> int
Return the number of queued command tasks.
- Returns:
Number of tasks in the input queue.
- submit(func: TaskFunctionType, args: Any | None = None, kwargs: Any | None = None, is_cmd_allowed: Callable[[], bool] | None = None, task_callback: TaskCallbackType | None = None) -> tuple[TaskStatus, str]
Submit a new task.
The is_cmd_allowed callback may raise a CmdNotAllowedError before the task is to be executed.
- Parameters:
func – the task function to be executed.
args – positional arguments to the task function
kwargs – keyword arguments to the task function
is_cmd_allowed – sanity check for task execution
task_callback – the callback to be called when the status or progress of the task execution changes
- Raises:
ExecutorShutdownError – if the executor has been shut down
- Returns:
(TaskStatus, message)
- abort(task_callback: TaskCallbackType | None = None) -> tuple[TaskStatus, str]
Tell this executor to abort execution.
New submissions will be rejected until the queue is empty and no tasks are still running. Tasks on the queue will be marked as aborted and not run. Tasks already running will be allowed to continue running.
- Raises:
ExecutorShutdownError – if the executor has been shut down
- Parameters:
task_callback – callback for abort complete
- Returns:
tuple of task status & message
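The abort behaviour described above (queued tasks are marked as aborted and never run, while a task that has already started is left to continue) can be traced with a small single-threaded sketch. `AbortSketch`, its methods, and the status strings are hypothetical and exist only to make the bookkeeping visible; the real executor runs tasks on worker threads and reports status through callbacks.

```python
from collections import deque
from typing import Callable, Deque, Dict, Tuple


class AbortSketch:
    """Hypothetical stand-in tracking task status across an abort."""

    def __init__(self) -> None:
        self._pending: Deque[Tuple[str, Callable[[], None]]] = deque()
        self.statuses: Dict[str, str] = {}

    def submit(self, name: str, func: Callable[[], None]) -> None:
        self._pending.append((name, func))
        self.statuses[name] = "QUEUED"

    def start_next(self) -> Tuple[str, Callable[[], None]]:
        """Move the oldest queued task to the running state."""
        name, func = self._pending.popleft()
        self.statuses[name] = "IN_PROGRESS"
        return name, func

    def finish(self, name: str, func: Callable[[], None]) -> None:
        """Run the task to completion."""
        func()
        self.statuses[name] = "COMPLETED"

    def abort(self) -> None:
        # Queued tasks are marked as aborted and are never run.
        while self._pending:
            name, _ = self._pending.popleft()
            self.statuses[name] = "ABORTED"


ran = []
sketch = AbortSketch()
for task_name in ("a", "b", "c"):
    sketch.submit(task_name, lambda n=task_name: ran.append(n))

running = sketch.start_next()  # "a" is now in progress
sketch.abort()                 # "b" and "c" are still queued, so they are aborted
sketch.finish(*running)        # "a" is allowed to continue to completion
```

After this sequence only task "a" has run; "b" and "c" end in the aborted state without being executed.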
- static task(task: SimpleTaskFunctionType) -> TaskFunctionType
Apply task executor boilerplate to a task.
Wraps a task function that accepts only a progress callback and an abort event with common task executor boilerplate code to transition the task through the task state machine. The task function must also return JSONData, which is used as the result of the task.
The task should regularly call the progress callback to report the progress of the task. The progress is expected to be an integer, and it is recommended to use this integer to represent a percentage of the task progress, i.e. integer values from 0 to 100. At similar intervals the task should check whether the abort event has been set (task_abort_event.is_set()). If the is_set() method returns True, the task should quickly exit, performing any essential cleanup, and raising TaskAborted.
The task should never throw an exception, other than TaskAborted, as failure should be reported by a ResultCode in the JSONData.
- Parameters:
task – A task within a task factory. It should accept a progress callback and task abort event (threading.Event).
- Returns:
The input task wrapped with task executor boilerplate.
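A task function following the guidance above might look like the sketch below. It runs without the library installed: `TaskAborted` is a local stand-in for the exception documented on this page, `copy_chunks` is a hypothetical task, and the returned `[0, "..."]` list is only a placeholder, since the exact shape of the JSONData and the ResultCode values are not shown in this section.

```python
import threading
from typing import Callable, List


class TaskAborted(Exception):
    """Local stand-in for ska_tango_base.executor.executor.TaskAborted."""


def copy_chunks(
    progress_callback: Callable[[int], None],
    task_abort_event: threading.Event,
) -> list:
    """Hypothetical task: process four chunks, reporting percent progress."""
    total_chunks = 4
    for chunk in range(total_chunks):
        # Check for an abort request at the same interval as progress reports.
        if task_abort_event.is_set():
            # Any essential cleanup would go here before exiting.
            raise TaskAborted()
        # ... one unit of real work would happen here ...
        progress_callback((chunk + 1) * 100 // total_chunks)
    # Placeholder result: stands in for JSONData carrying a ResultCode.
    return [0, "Copied all chunks"]


reported: List[int] = []
abort_event = threading.Event()

# Normal run: progress is reported as integer percentages.
result = copy_chunks(reported.append, abort_event)

# Aborted run: the event is already set, so the task exits before doing work.
abort_event.set()
aborted = False
try:
    copy_chunks(reported.append, abort_event)
except TaskAborted:
    aborted = True
```

With the library available, a function of this shape would presumably be passed through the task() wrapper documented above before being submitted, so that the state transitions are handled by the wrapper rather than by the task itself.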
- exception ska_tango_base.executor.executor.TaskAborted
Bases: Exception
Report task abortion during a long-running command.