ska_pst.lmc.job

This is used for handling complex jobs used by the PST.BEAM.

class ska_pst.lmc.job.DeviceCommandTask(devices: List[PstDeviceProxy], command: str, command_args: Optional[tuple[Any]] = None, timeout: Optional[float] = None)[source]

A class used to handle a command to be executed on remote devices.

Instances of this class take a list of devices and an action to be performed on a remote device. If more than one device is used this is converted into a ParallelTask that separate instances of this class, one per device. This job is not complete until all the remote devices have completed the action.

command: str

The name of the command to on the device proxies.

command_args: tuple[Any] | None = None

The list of arguments, if any, to use when performing command.

devices: List[PstDeviceProxy]

list of devices to perform action upon.

timeout: float | None = None

The timeout for remote call to complete by.

If not set then the task will block indefinitely until the command completes.

class ska_pst.lmc.job.DeviceCommandTaskContext(*, task: ~typing.Union[~ska_pst.lmc.job.task.SequentialTask, ~ska_pst.lmc.job.task.ParallelTask, ~ska_pst.lmc.job.task.DeviceCommandTask, ~ska_pst.lmc.job.task.NoopTask, ~ska_pst.lmc.job.task.LambdaTask], task_id: str = <factory>, evt: ~threading.Event = <factory>, parent_task_context: ~typing.Optional[~ska_pst.lmc.job.task.TaskContext] = None, result: ~typing.Optional[~typing.Any] = None, exception: ~typing.Optional[Exception] = None, device: ~ska_pst.lmc.device_proxy.PstDeviceProxy, command: str, command_args: ~typing.Optional[tuple[typing.Any]] = None, timeout: ~typing.Optional[float] = None)[source]

A task context class that is used for TANGO Device Proxy command tasks.

This extends from TaskContext to allow storing of task contexts of remote device proxy command tasks.

command: str

The name of the long running command to execute.

command_args: tuple[Any] | None = None

The command arguments to pass through to the command.

device: PstDeviceProxy

The device proxy the command will be executed on.

timeout: float | None = None

The timeout for remote call to complete by.

If not set then the task will block indefinitely until the command completes.

class ska_pst.lmc.job.DeviceCommandTaskExecutor(task_queue: Queue[DeviceCommandTaskContext], max_parallel_workers: int = 4, logger: Optional[Logger] = None)[source]

Class to handle executing and tracking commands on device proxies.

This class uses a queue to receive task commands, while a background thread receives these messages and then executes the commands.

Since the remote commands also run in the background on the device, this class will use a LongRunningCommand to wait on when the command completes.

Clients should submit DeviceCommandTask tasks to the TaskExecutor rather than building up a DeviceCommandTaskContext and sending it to the task queue.

Instances of class and the TaskExecutor class work together by sharing a queue. If creating separate instances of both classes, make sure that queue between them is the same.

start() None[source]

Start the executor.

stop() None[source]

Stop the executor.

class ska_pst.lmc.job.JobContext(*, task: ~typing.Union[~ska_pst.lmc.job.task.SequentialTask, ~ska_pst.lmc.job.task.ParallelTask, ~ska_pst.lmc.job.task.DeviceCommandTask, ~ska_pst.lmc.job.task.NoopTask, ~ska_pst.lmc.job.task.LambdaTask], task_id: str = <factory>, evt: ~threading.Event = <factory>, parent_task_context: ~typing.Optional[~ska_pst.lmc.job.task.TaskContext] = None, result: ~typing.Optional[~typing.Any] = None, exception: ~typing.Optional[Exception] = None, success_callback: ~typing.Optional[~typing.Callable[[...], ~typing.Any]] = None)[source]

A task context class that is used to track the whole submitted job.

This is used by the job executor to track the overall job. The task executor will either throw the exception stored in parent class or will call the success_callback once the job has been marked as complete.

Variables

success_callback (Callback) – an option callback to call when job ends successfully, defaults to None.

success_callback: Optional[Callable[[...], Any]] = None
class ska_pst.lmc.job.LambdaTask(action: Callable[[], None], name: Optional[str] = None)[source]

A class whose operation is to call a lambda.

The lambda doesn’t take any parameters. Any capturing of variables must be done at the call site of the construction of this task.

This allows a task to call something like a task_callback to perform an update.

action: Callable[[], None]
name: str | None = None
class ska_pst.lmc.job.NoopTask[source]

A class as a placeholder for a no-op task.

This is useful when nothing is meant to happen but makes it easier for defining a the overall job where there is a conditional task being created. Using the NoopTask can be inserted and treated like the Python None.

class ska_pst.lmc.job.ParallelTask(subtasks: List[Union[SequentialTask, ParallelTask, DeviceCommandTask, NoopTask, LambdaTask]])[source]

A class used to handle tasks that can be run in parallel.

Instances of this class take a list of tasks that can be all run in parallel. This job is not complete until all the task are complete.

Parameters

tasks (List[Task]) – a list of subtasks to be performed concurrently

subtasks: List[Union[SequentialTask, ParallelTask, DeviceCommandTask, NoopTask, LambdaTask]]
class ska_pst.lmc.job.ParallelTaskContext(*, task: ~typing.Union[~ska_pst.lmc.job.task.SequentialTask, ~ska_pst.lmc.job.task.ParallelTask, ~ska_pst.lmc.job.task.DeviceCommandTask, ~ska_pst.lmc.job.task.NoopTask, ~ska_pst.lmc.job.task.LambdaTask], task_id: str = <factory>, evt: ~threading.Event = <factory>, parent_task_context: ~typing.Optional[~ska_pst.lmc.job.task.TaskContext] = None, result: ~typing.Optional[~typing.Any] = None, exception: ~typing.Optional[Exception] = None, subtasks: ~typing.List[~ska_pst.lmc.job.task.TaskContext] = <factory>)[source]

A task context class that tracks subtasks for a parallel task.

This extends from TaskContext to allow storing of task contexts of the subtasks.

This task is only considered completed successfully once on the subtasks have completed successfully. If one of the subtasks fails then this task context is to be considered failed.

Variables

subtasks (List[TaskContext]) – a list of TaskContext, one for each subtask.

subtasks: List[TaskContext]
class ska_pst.lmc.job.SequentialTask(subtasks: List[Union[SequentialTask, ParallelTask, DeviceCommandTask, NoopTask, LambdaTask]])[source]

A class used to handle sequential tasks.

Instances of this class take a list of tasks that will all be run in sequentially. This job is not complete until the last job is complete.

Parameters

tasks (List[Task]) – a list of subtasks to be performed sequentially

subtasks: List[Union[SequentialTask, ParallelTask, DeviceCommandTask, NoopTask, LambdaTask]]
class ska_pst.lmc.job.TaskContext(*, task: ~typing.Union[~ska_pst.lmc.job.task.SequentialTask, ~ska_pst.lmc.job.task.ParallelTask, ~ska_pst.lmc.job.task.DeviceCommandTask, ~ska_pst.lmc.job.task.NoopTask, ~ska_pst.lmc.job.task.LambdaTask], task_id: str = <factory>, evt: ~threading.Event = <factory>, parent_task_context: ~typing.Optional[~ska_pst.lmc.job.task.TaskContext] = None, result: ~typing.Optional[~typing.Any] = None, exception: ~typing.Optional[Exception] = None)[source]

A class used to track a task when a job is running.

This is the base class for all other types of TaskContext. This class it used by the job/task executors to keep track of a task and is linked back to a parent task. It stores the result or exception if the task has completed successfully or failed respectively.

Variables
  • task (Task) – the task this context relates to.

  • task_id (str) – the id of the task. If not specified it is a string version of a UUID version 4.

  • evt (threading.Event) – at threading.Event used to signal that this task has finished, either successfully or it failed.

  • parent_task_context (Optional[TaskContext]) – an optional parent task context, default None.

  • result (Optional[Any]) – the result of the task. Until this value is set the task is not considered completed successfully.

  • exception – the exception of a failed task. Once this value is set the task is considered to have failed.

property completed: bool

Check if the task has completed.

If the task context has failed or there is a result then this method will return true.

Returns

if the task has completed

Return type

bool

evt: Event
exception: Optional[Exception] = None
property failed: bool

Check if task has failed or not.

Returns

returns True if the context is storing an exception.

Return type

bool

parent_task_context: Optional[TaskContext] = None
result: Optional[Any] = None
signal_complete(result: Optional[Any] = None) None[source]

Signal that the task has completed successfully.

Calling this will mark the task as completed and if any parent task is waiting on the evt to be set will be notified.

Parameters

result (Optional[Any], optional) – the result of the task, if any. This defaults to None

signal_failed(exception: Exception) None[source]

Signal that the task has failed.

Calling this will mark the task as failed and if any parent task is waiting on the evt to be set will be notified.

Parameters

exception (Exception) – an exception that will be sent back to the original caller of the job that signifies that the command failed.

signal_failed_from_str(msg: str) None[source]

Signal that the task has failed.

This takes an error message an raises a RuntimeError and then calls the signal_failed method. This is used by remote device proxies where the result returned is the error message of the remote exception.

Parameters

msg (str) – the error message to convert into a RuntimeError

task: Union[SequentialTask, ParallelTask, DeviceCommandTask, NoopTask, LambdaTask]
task_id: str
class ska_pst.lmc.job.TaskExecutor(job_queue: Optional[Queue[TaskContext]] = None, device_command_task_queue: Optional[Queue[DeviceCommandTaskContext]] = None, max_parallel_workers: int = 4, logger: Optional[Logger] = None)[source]

An executor class that handles requests for tasks.

Jobs are submitted to instances of this class via the submit_job() method or to the global instance of this task executor GLOBAL_JOB_EXECUTOR via the global method submit_job.

Instances of this class are linked with DeviceCommandTaskExecutor that will handle the TANGO logic of subscriptions and dealing with long running commands. Both need to share an instance of a queue.Queue for which this class will send messages to while the DeviceCommandTaskExecutor will read from.

start() None[source]

Start the task executor.

stop() None[source]

Stop the executor.

submit_job(job: Union[SequentialTask, ParallelTask, DeviceCommandTask, NoopTask, LambdaTask], callback: Optional[Callable[[...], Any]] = None) None[source]

Submit a job to be executed.

This is the main method that clients should use to submit jobs to be executed. This will wrap into a TaskContext and put it on the main queue which can then be processed.

Parameters
  • job (Task) – the job to be submitted, it can be a simple DeviceCommandTask or a composite task like a SequentialTask or ParallelTask.

  • callback (Callback, optional) – the callback to notify the job is complete, defaults to None