ska_pst.lmc.job

This is used for handling complex jobs used by the PST.BEAM.

class ska_pst.lmc.job.DeviceCommandTask(devices: List[PstDeviceProxy], action: Callable[[PstDeviceProxy], ska_tango_base.base.base_device.DevVarLongStringArrayType], command_name: str)[source]

A class used to handle a command to be executed on remote devices.

Instances of this class take a list of devices and an action to be performed on a remote device. If more than one device is used this is converted into a ParallelTask that separate instances of this class, one per device. This job is not complete until all the remote devices have completed the action.

Commands are sent to the DeviceCommandExecutor to allow to be tracked by listening to events on the longRunningCommandResult property.

Parameters
  • devices (List[PstDeviceProxy]) – list of devices to perform action upon.

  • action (Callable[[PstDeviceProxy], DevVarLongStringArrayType]) – the callbable to perform on each device proxy.

action: Callable[[PstDeviceProxy], ska_tango_base.base.base_device.DevVarLongStringArrayType]
command_name: str
devices: List[PstDeviceProxy]
class ska_pst.lmc.job.DeviceCommandTaskContext(*, task: ~typing.Union[~ska_pst.lmc.job.task.SequentialTask, ~ska_pst.lmc.job.task.ParallelTask, ~ska_pst.lmc.job.task.DeviceCommandTask, ~ska_pst.lmc.job.task.NoopTask, ~ska_pst.lmc.job.task.LambdaTask], task_id: str = <factory>, evt: ~threading.Event = <factory>, parent_task_context: ~typing.Optional[~ska_pst.lmc.job.task.TaskContext] = None, result: ~typing.Optional[~typing.Any] = None, exception: ~typing.Optional[Exception] = None, device: ~ska_pst.lmc.device_proxy.PstDeviceProxy, command_name: str, action: ~typing.Callable[[~ska_pst.lmc.device_proxy.PstDeviceProxy], ska_tango_base.base.base_device.DevVarLongStringArrayType])[source]

A task context class that is used for TANGO Device Proxy command tasks.

This extends from TaskContext to allow storing of task contexts of remote device proxy command tasks.

Variables
  • device (DeviceAction) – the device proxy the command will be executed on.

  • action – the action to perform when the task is executed.

  • command_name (str) – the name of the command. This is use for logging/debugging.

action: Callable[[PstDeviceProxy], ska_tango_base.base.base_device.DevVarLongStringArrayType]
command_name: str
device: PstDeviceProxy
class ska_pst.lmc.job.DeviceCommandTaskExecutor(task_queue: Queue, logger: Optional[Logger])[source]

Class to handle executing and tracking commands on device proxies.

This class uses a queue to receive task commands, while a background thread receives these messages and then executes the commands.

Since the remote commands also run in the background on the device, this class needs to subscribe to the longRunningCommandResult property and listen to events of when this changes for a given command. This class was created to allow the normal TaskExecutor not having to worry about all the necessary subscription and event handling.

Clients should submit DeviceCommandTask tasks to the TaskExecutor rather than building up a DeviceCommandTaskContext and sending it to the task queue.

Instances of class and the TaskExecutor class work together by sharing a queue. If creating separate instances of both classes, make sure that queue between them is the same.

start() None[source]

Start the executor.

stop() None[source]

Stop the executor.

class ska_pst.lmc.job.JobContext(*, task: ~typing.Union[~ska_pst.lmc.job.task.SequentialTask, ~ska_pst.lmc.job.task.ParallelTask, ~ska_pst.lmc.job.task.DeviceCommandTask, ~ska_pst.lmc.job.task.NoopTask, ~ska_pst.lmc.job.task.LambdaTask], task_id: str = <factory>, evt: ~threading.Event = <factory>, parent_task_context: ~typing.Optional[~ska_pst.lmc.job.task.TaskContext] = None, result: ~typing.Optional[~typing.Any] = None, exception: ~typing.Optional[Exception] = None, success_callback: ~typing.Optional[~typing.Callable[[...], ~typing.Any]] = None)[source]

A task context class that is used to track the whole submitted job.

This is used by the job executor to track the overall job. The task executor will either throw the exception stored in parent class or will call the success_callback once the job has been marked as complete.

Variables

success_callback (Callback) – an option callback to call when job ends successfully, defaults to None.

success_callback: Optional[Callable[[...], Any]] = None
class ska_pst.lmc.job.LambdaTask(action: Callable[[], None], name: Optional[str] = None)[source]

A class whose operation is to call a lambda.

The lamdba doesn’t take any parameters. Any capturing of variables must be done at the call site of the construction of this task.

This allows a task to call something like a task_callback to perform an update.

action: Callable[[], None]
name: str | None = None
class ska_pst.lmc.job.NoopTask[source]

A class as a placeholder for a no-op task.

This is useful when nothing is meant to happen but makes it easier for defining a the overall job where there is a conditional task being created. Using the NoopTask can be inserted and treated like the Python None.

class ska_pst.lmc.job.ParallelTask(subtasks: List[Union[SequentialTask, ParallelTask, DeviceCommandTask, NoopTask, LambdaTask]])[source]

A class used to handle tasks that can be run in parallel.

Instances of this class take a list of tasks that can be all run in parallel. This job is not complete until all the task are complete.

Parameters

tasks (List[Task]) – a list of subtasks to be performed concurrently

subtasks: List[Union[SequentialTask, ParallelTask, DeviceCommandTask, NoopTask, LambdaTask]]
class ska_pst.lmc.job.ParallelTaskContext(*, task: ~typing.Union[~ska_pst.lmc.job.task.SequentialTask, ~ska_pst.lmc.job.task.ParallelTask, ~ska_pst.lmc.job.task.DeviceCommandTask, ~ska_pst.lmc.job.task.NoopTask, ~ska_pst.lmc.job.task.LambdaTask], task_id: str = <factory>, evt: ~threading.Event = <factory>, parent_task_context: ~typing.Optional[~ska_pst.lmc.job.task.TaskContext] = None, result: ~typing.Optional[~typing.Any] = None, exception: ~typing.Optional[Exception] = None, subtasks: ~typing.List[~ska_pst.lmc.job.task.TaskContext] = <factory>)[source]

A task context class that tracks subtasks for a parallel task.

This extends from TaskContext to allow storing of task contexts of the subtasks.

This task is only considered completed successfully once on the subtasks have completed successfully. If one of the subtasks fails then this task contexted is to be considered failed.

Variables

subtasks (List[TaskContext]) – a list of TaskContext, one for each subtask.

subtasks: List[TaskContext]
class ska_pst.lmc.job.SequentialTask(subtasks: List[Union[SequentialTask, ParallelTask, DeviceCommandTask, NoopTask, LambdaTask]])[source]

A class used to handle sequential tasks.

Instances of this class take a list of tasks that will all be run in sequentially. This job is not complete until the last job is complete.

Parameters

tasks (List[Task]) – a list of subtasks to be performed sequentially

subtasks: List[Union[SequentialTask, ParallelTask, DeviceCommandTask, NoopTask, LambdaTask]]
class ska_pst.lmc.job.TaskContext(*, task: ~typing.Union[~ska_pst.lmc.job.task.SequentialTask, ~ska_pst.lmc.job.task.ParallelTask, ~ska_pst.lmc.job.task.DeviceCommandTask, ~ska_pst.lmc.job.task.NoopTask, ~ska_pst.lmc.job.task.LambdaTask], task_id: str = <factory>, evt: ~threading.Event = <factory>, parent_task_context: ~typing.Optional[~ska_pst.lmc.job.task.TaskContext] = None, result: ~typing.Optional[~typing.Any] = None, exception: ~typing.Optional[Exception] = None)[source]

A class used to track a task when a job is running.

This is the base class for all other types of TaskContext. This class it used by the job/task executors to keep track of a task and is linked back to a parent task. It stores the result or exception if the task has completed successfully or failed respectively.

Variables
  • task (Task) – the task this context relates to.

  • task_id (str) – the id of the task. If not specified it is a string version of a UUID version 4.

  • evt (threading.Event) – at threading.Event used to signal that this task has finished, either successfully or it failed.

  • parent_task_context (Optional[TaskContext]) – an optional parent task context, default None.

  • result (Optional[Any]) – the result of the task. Until this value is set the task is not considered completed successly.

  • exception – the exception of a failed task. Once this value is set the task is considered to have failed.

property completed: bool

Check if the task has completed.

If the task context has failed or there is a result then this method will return true.

Returns

if the task has completed

Return type

bool

evt: Event
exception: Optional[Exception] = None
property failed: bool

Check if task has failed or not.

Returns

returns True if the context is storing an exception.

Return type

bool

parent_task_context: Optional[TaskContext] = None
result: Optional[Any] = None
signal_complete(result: Optional[Any] = None) None[source]

Signal that the task has completed successfully.

Calling this will mark the task as completed and if any parent task is waiting on the evt to be set will be notified.

Parameters

result (Optional[Any], optional) – the result of the task, if any. This defaults to None

signal_failed(exception: Exception) None[source]

Signal that the task has failed.

Calling this will mark the task as failed and if any parent task is waiting on the evt to be set will be notified.

Parameters

exception (Exception) – an exception that will be sent back to the original caller of the job that signifies that the command failed.

signal_failed_from_str(msg: str) None[source]

Signal that the task has failed.

This takes an error message an raises a RuntimeError and then calls the signal_failed method. This is used by remote device proxies where the result returned is the error message of the remote exception.

Parameters

msg (str) – the error message to convert into a RuntimeError

task: Union[SequentialTask, ParallelTask, DeviceCommandTask, NoopTask, LambdaTask]
task_id: str
class ska_pst.lmc.job.TaskExecutor(job_queue: Optional[Queue[TaskContext]] = None, device_command_task_queue: Optional[Queue[DeviceCommandTaskContext]] = None, max_parallel_workers: int = 4, logger: Optional[Logger] = None)[source]

An executor class that handles requests for tasks.

Jobs are submitted to instances of this class via the submit_job() method or to the global instance of this task executor GLOBAL_JOB_EXECUTOR via the global method submit_job.

Instances of this class are linked with DeviceCommandTaskExecutor that will handle the TANGO logic of subscriptions and dealing with long running commands. Both need to share an instance of a queue.Queue for which this class will send messages to while the DeviceCommandTaskExecutor will read from.

start() None[source]

Start the task executor.

stop() None[source]

Stop the executor.

submit_job(job: Union[SequentialTask, ParallelTask, DeviceCommandTask, NoopTask, LambdaTask], callback: Optional[Callable[[...], Any]] = None) None[source]

Submit a job to be executed.

This is the main method that clients should use to submit jobs to be executed. This will wrap into a TaskContext and put it on the main queue which can then be processed.

Parameters
  • job (Task) – the job to be submitted, it can be a simple DeviceCommandTask or a composite task like a SequentialTask or ParallelTask.

  • callback (Callback, optional) – the callback to notify the job is complete, defaults to None