Executor

This module provides for asynchronous execution of tasks.

class ska_tango_base.executor.executor.TaskExecutor

Bases: TaskExecutorProtocol

An asynchronous executor of tasks.

This task executor provides a default implementation of the TaskExecutorProtocol for use with the LRCMixin.

It supports multiple worker threads for simultaneous LRC (long running command) execution, although in most cases the default of 1 is recommended so that LRCs execute sequentially.

A maximum queue length can also be specified; once the queue is full, further submitted tasks are rejected. The default is 32 and the minimum allowed is 1. In other words, this task executor always has a queue and allows at least 1 LRC to be queued while another is executing.
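The admission policy described above can be illustrated with a small toy model. This is only a sketch of the documented limits, not the TaskExecutor implementation: the class name BoundedQueueModel and the outcome strings "started", "queued" and "rejected" are hypothetical and are not library status values.

```python
from collections import deque


class BoundedQueueModel:
    """Toy model of the documented limits (hypothetical, not the real TaskExecutor)."""

    def __init__(self, max_workers=1, max_queued_tasks=32):
        # Both limits have a documented minimum of 1.
        if max_workers < 1 or max_queued_tasks < 1:
            raise ValueError("max_workers and max_queued_tasks must be at least 1")
        self.max_workers = max_workers
        self.max_queued_tasks = max_queued_tasks
        self.running = []
        self.queued = deque()

    def submit(self, name):
        """Return 'started', 'queued' or 'rejected' (illustrative labels only)."""
        if len(self.running) < self.max_workers:
            self.running.append(name)
            return "started"
        if len(self.queued) < self.max_queued_tasks:
            self.queued.append(name)
            return "queued"
        return "rejected"


# With the minimum queue length, one task can wait while another executes.
model = BoundedQueueModel(max_workers=1, max_queued_tasks=1)
outcomes = [model.submit(f"task-{i}") for i in range(3)]
print(outcomes)
```

With one worker and a queue length of 1, the first task occupies the worker, the second waits in the queue, and the third has nowhere to go.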

__init__(max_workers: int = 1, unhandled_exception_callback: Callable[[Exception], None] | None = None, max_queued_tasks: int = 32) -> None

Initialise a new TaskExecutor instance.

Parameters:
  • max_workers – The maximum number of worker threads (minimum 1). Keep this at the default value so that LRCs execute sequentially, except in special cases.

  • unhandled_exception_callback – Callback to be called when a task raises an unhandled exception.

  • max_queued_tasks – The maximum number of tasks allowed in the queue before further tasks are rejected (minimum 1).

start() -> None

Start the backing thread pool.

This method is called automatically during __init__() and only needs to be called again to recover after shutdown() has been called.

Raises:

ExecutorNotShutdownError – If shutdown() has not already been called.

shutdown() -> None

Abort all outstanding tasks and shut down the backing thread pool.

This should be called when the TaskExecutor object is no longer required. After it has been called, the TaskExecutor is unusable: submit() and abort() raise an ExecutorShutdownError.

start() can be called to reset the TaskExecutor to a usable state.
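The start()/shutdown() lifecycle can be sketched as a two-state model. Everything below is a hypothetical stand-in so that the example runs without ska_tango_base: LifecycleModel is not the real class, and the two exception classes are local substitutes that only share their names with the library exceptions. How the real executor treats a repeated shutdown() is not stated here, so the model simply treats it as harmless.

```python
class ExecutorShutdownError(Exception):
    """Local stand-in for the library exception of the same name."""


class ExecutorNotShutdownError(Exception):
    """Local stand-in for the library exception of the same name."""


class LifecycleModel:
    """Toy model of the documented start()/shutdown() rules (hypothetical)."""

    def __init__(self):
        self._started = False
        self.start()  # mirrors "automatically called during __init__()"

    def start(self):
        if self._started:
            raise ExecutorNotShutdownError("start() requires a prior shutdown()")
        self._started = True

    def shutdown(self):
        # Assumption: a repeated shutdown() is a no-op in this model.
        self._started = False

    def submit(self, name):
        if not self._started:
            raise ExecutorShutdownError("submit")
        return name


model = LifecycleModel()
model.shutdown()

rejected_after_shutdown = False
try:
    model.submit("task")
except ExecutorShutdownError:
    rejected_after_shutdown = True

model.start()  # recover after shutdown()
accepted_after_restart = model.submit("task") == "task"

second_start_refused = False
try:
    model.start()  # no shutdown() in between
except ExecutorNotShutdownError:
    second_start_refused = True
```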

property max_executing_tasks: int

Get the maximum number of simultaneously executing tasks.

Will always be one more than the max_workers given when instantiating the TaskExecutor, to accommodate an abort task.

Returns:

The maximum number of simultaneously executing tasks.

property max_queued_tasks: int

Get the maximum task queue size.

Returns:

The maximum task queue size.

get_input_queue_size() -> int

Return the number of queued command tasks.

Returns:

Number of tasks in the input queue.

submit(func: TaskFunctionType, args: Any | None = None, kwargs: Any | None = None, is_cmd_allowed: Callable[[], bool] | None = None, task_callback: TaskCallbackType | None = None) -> tuple[TaskStatus, str]

Submit a new task.

The is_cmd_allowed callback may raise a CmdNotAllowedError before the task is executed.

Parameters:
  • func – the task function to be executed.

  • args – positional arguments to the task function

  • kwargs – keyword arguments to the task function

  • is_cmd_allowed – sanity check for task execution

  • task_callback – the callback to be called when the status or progress of the task execution changes

Raises:

ExecutorShutdownError – if the executor has been shut down

Returns:

(TaskStatus, message)
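As a rough sketch of the call shape, the helper below passes keyword arguments through submit() and unpacks the returned (status, message) pair. To keep the example runnable without ska_tango_base, it is exercised against FakeExecutor, a hypothetical stand-in that only mimics the documented parameter names. The returned "QUEUED" string and message are invented for illustration, and the way the real executor passes arguments to the task function is not shown here.

```python
from typing import Any, Callable


class FakeExecutor:
    """Hypothetical stand-in mimicking the documented submit() parameters."""

    def __init__(self):
        self.calls = []

    def submit(self, func, args=None, kwargs=None, is_cmd_allowed=None, task_callback=None):
        # Record the request instead of running it; the return value is invented.
        self.calls.append((func, args, kwargs))
        return ("QUEUED", "Task queued")


def submit_and_report(executor: Any, func: Callable[..., Any], **task_kwargs: Any) -> str:
    """Submit func and describe the immediate outcome as 'STATUS: message'."""
    status, message = executor.submit(func, kwargs=task_kwargs)
    # Works for enum members (which have .name) and for plain strings alike.
    status_name = getattr(status, "name", status)
    return f"{status_name}: {message}"


def configure(**kwargs):
    """Placeholder task function; its signature is an assumption."""
    return kwargs


executor = FakeExecutor()
report = submit_and_report(executor, configure, channel=3)
print(report)
```

The status in the returned tuple describes the submission itself, so any later status or progress changes would have to be observed through task_callback.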

abort(task_callback: TaskCallbackType | None = None) -> tuple[TaskStatus, str]

Tell this executor to abort execution.

New submissions will be rejected until the queue is empty and no tasks are still running. Tasks on the queue will be marked as aborted and not run. Tasks already running will be allowed to continue running.

Raises:

ExecutorShutdownError – if the executor has been shut down

Parameters:

task_callback – callback for abort complete

Returns:

tuple of task status & message

static task(task: SimpleTaskFunctionType) -> TaskFunctionType

Apply task executor boilerplate to a task.

Wraps a task function that accepts only a progress callback and abort event with common task executor boilerplate code to transition the task through the task state machine. The task function must also return JSONData, which is used as the result of the task.

The task should regularly call the progress callback to report the progress of the task. The progress is expected to be an integer, and it is recommended to use this integer to represent a percentage of the task progress, i.e. integer values from 0 to 100. At similar intervals the task should check whether the abort event has been set (task_abort_event.is_set()). If is_set() returns True, the task should exit quickly, performing any essential cleanup and then raising TaskAborted.

The task should never raise an exception other than TaskAborted; failure should be reported by a ResultCode in the JSONData.

Parameters:

task – A task within a task factory. It should accept a progress callback and task abort event (threading.Event).

Returns:

The input task wrapped with task executor boilerplate.
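A minimal sketch of a task function written to these guidelines is shown below. It is called directly rather than wrapped with the task() decorator, so that it runs without ska_tango_base. The TaskAborted class is a local stand-in for the library exception, the positional order (progress callback, then abort event) is an assumption based on the description above, and the [0, "message"] result shape is a hypothetical example of JSONData carrying a result code.

```python
import threading
from typing import Any, Callable


class TaskAborted(Exception):
    """Local stand-in for ska_tango_base.executor.executor.TaskAborted."""


def slow_copy(
    progress_callback: Callable[[int], None],
    task_abort_event: threading.Event,
) -> Any:
    """Do four units of work, reporting progress and honouring abort requests."""
    total_steps = 4
    for step in range(total_steps):
        # Check for an abort request at the same interval as progress reports.
        if task_abort_event.is_set():
            # Essential cleanup would go here before exiting.
            raise TaskAborted()
        # ... one unit of real work would happen here ...
        progress_callback((step + 1) * 100 // total_steps)
    # Hypothetical JSONData result: [result code, message].
    return [0, "copy finished"]


# Normal run: progress is reported as integer percentages.
progress = []
result = slow_copy(progress.append, threading.Event())

# Aborted run: the event is already set, so the task exits before doing work.
abort_event = threading.Event()
abort_event.set()
aborted = False
try:
    slow_copy(lambda percent: None, abort_event)
except TaskAborted:
    aborted = True
```

Checking the abort event once per loop iteration bounds how long an abort request can go unnoticed to the duration of one unit of work.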

exception ska_tango_base.executor.executor.TaskAborted

Bases: Exception

Report task abortion during a long running command.

exception ska_tango_base.executor.executor.ExecutorShutdownError

Bases: Exception

The TaskExecutor has been shut down.

__init__(action: str) -> None

Initialise the exception message.

exception ska_tango_base.executor.executor.ExecutorNotShutdownError

Bases: Exception

The TaskExecutor has not been shut down.

__init__() -> None

Initialise the exception message.