ska_pst.lmc.job
This module handles complex jobs used by PST.BEAM.
- class ska_pst.lmc.job.DeviceCommandTask(devices: List[PstDeviceProxy], action: Callable[[PstDeviceProxy], ska_tango_base.base.base_device.DevVarLongStringArrayType], command_name: str)[source]
A class used to handle a command to be executed on remote devices.
Instances of this class take a list of devices and an action to be performed on each remote device. If more than one device is given, the task is converted into a ParallelTask made up of separate instances of this class, one per device. The job is not complete until all the remote devices have completed the action.
Commands are sent to the DeviceCommandTaskExecutor so that they can be tracked by listening to events on the longRunningCommandResult property.
- Parameters
devices (List[PstDeviceProxy]) – list of devices to perform action upon.
action (Callable[[PstDeviceProxy], DevVarLongStringArrayType]) – the callable to perform on each device proxy.
- action: Callable[[PstDeviceProxy], ska_tango_base.base.base_device.DevVarLongStringArrayType]
- command_name: str
- devices: List[PstDeviceProxy]
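The one-task-per-device conversion described above can be sketched with stand-in types. This is a minimal illustration under stated assumptions, not the library's implementation: the dataclasses below only mirror the documented field names, plain strings stand in for PstDeviceProxy instances, and the `expand` helper and the device names are hypothetical.

```python
from dataclasses import dataclass
from typing import Callable, List, Union


# Stand-in types for illustration only; the real classes live in ska_pst.lmc.job.
@dataclass
class DeviceCommandTask:
    devices: List[str]  # the real class holds PstDeviceProxy instances
    action: Callable[[str], object]
    command_name: str


@dataclass
class ParallelTask:
    subtasks: List[DeviceCommandTask]


def expand(task: DeviceCommandTask) -> Union[DeviceCommandTask, ParallelTask]:
    """Hypothetical helper: split a multi-device task into one task per device."""
    if len(task.devices) <= 1:
        return task
    return ParallelTask(
        subtasks=[
            DeviceCommandTask([device], task.action, task.command_name)
            for device in task.devices
        ]
    )


task = DeviceCommandTask(
    devices=["recv", "smrb", "dsp"],  # hypothetical device names
    action=lambda device: f"On sent to {device}",
    command_name="On",
)
expanded = expand(task)
```

Each subtask carries the same action and command name, so the parallel task finishes only when every single-device task has finished.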
- class ska_pst.lmc.job.DeviceCommandTaskContext(*, task: ~typing.Union[~ska_pst.lmc.job.task.SequentialTask, ~ska_pst.lmc.job.task.ParallelTask, ~ska_pst.lmc.job.task.DeviceCommandTask, ~ska_pst.lmc.job.task.NoopTask, ~ska_pst.lmc.job.task.LambdaTask], task_id: str = <factory>, evt: ~threading.Event = <factory>, parent_task_context: ~typing.Optional[~ska_pst.lmc.job.task.TaskContext] = None, result: ~typing.Optional[~typing.Any] = None, exception: ~typing.Optional[Exception] = None, device: ~ska_pst.lmc.device_proxy.PstDeviceProxy, command_name: str, action: ~typing.Callable[[~ska_pst.lmc.device_proxy.PstDeviceProxy], ska_tango_base.base.base_device.DevVarLongStringArrayType])[source]
A task context class that is used for TANGO Device Proxy command tasks.
This extends TaskContext to allow storing the task contexts of remote device proxy command tasks.
- Variables
device (PstDeviceProxy) – the device proxy the command will be executed on.
action – the action to perform when the task is executed.
command_name (str) – the name of the command. This is used for logging/debugging.
- action: Callable[[PstDeviceProxy], ska_tango_base.base.base_device.DevVarLongStringArrayType]
- command_name: str
- device: PstDeviceProxy
- class ska_pst.lmc.job.DeviceCommandTaskExecutor(task_queue: Queue, logger: Optional[Logger])[source]
Class to handle executing and tracking commands on device proxies.
This class uses a queue to receive task commands, while a background thread receives these messages and then executes the commands.
Since the remote commands also run in the background on the device, this class needs to subscribe to the longRunningCommandResult property and listen for events when it changes for a given command. This class exists so that the normal TaskExecutor does not have to deal with the necessary subscription and event handling.
Clients should submit DeviceCommandTask tasks to the TaskExecutor rather than building up a DeviceCommandTaskContext and sending it to the task queue.
Instances of this class and the TaskExecutor class work together by sharing a queue. If creating separate instances of both classes, make sure that the queue between them is the same.
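The shared-queue arrangement described above follows the usual producer/consumer pattern. The sketch below uses only the standard library and is not the library's code: plain callables stand in for DeviceCommandTaskContext items, and the `None` sentinel used to stop the thread is an assumption made for this example.

```python
import queue
import threading
from typing import Callable, List, Optional

# Stand-in for the queue shared between the two executors.
task_queue: "queue.Queue[Optional[Callable[[], None]]]" = queue.Queue()
completed: List[str] = []


def worker() -> None:
    """Background loop: take work items off the shared queue and run them."""
    while True:
        item = task_queue.get()
        if item is None:  # sentinel used in this sketch to stop the thread
            break
        item()


thread = threading.Thread(target=worker, daemon=True)
thread.start()

# The producer side (the role TaskExecutor plays) puts work on the same queue.
task_queue.put(lambda: completed.append("On"))
task_queue.put(lambda: completed.append("ConfigureScan"))
task_queue.put(None)
thread.join(timeout=5)
```

If the producer and the consumer were given different queue instances, the items would never reach the background thread, which is why both executors must share one queue.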
- class ska_pst.lmc.job.JobContext(*, task: ~typing.Union[~ska_pst.lmc.job.task.SequentialTask, ~ska_pst.lmc.job.task.ParallelTask, ~ska_pst.lmc.job.task.DeviceCommandTask, ~ska_pst.lmc.job.task.NoopTask, ~ska_pst.lmc.job.task.LambdaTask], task_id: str = <factory>, evt: ~threading.Event = <factory>, parent_task_context: ~typing.Optional[~ska_pst.lmc.job.task.TaskContext] = None, result: ~typing.Optional[~typing.Any] = None, exception: ~typing.Optional[Exception] = None, success_callback: ~typing.Optional[~typing.Callable[[...], ~typing.Any]] = None)[source]
A task context class that is used to track the whole submitted job.
This is used by the job executor to track the overall job. The task executor will either raise the exception stored in the parent class or call the success_callback once the job has been marked as complete.
- Variables
success_callback (Callback) – an optional callback to call when the job ends successfully, defaults to None.
- success_callback: Optional[Callable[[...], Any]] = None
- class ska_pst.lmc.job.LambdaTask(action: Callable[[], None], name: Optional[str] = None)[source]
A class whose operation is to call a lambda.
The lambda doesn’t take any parameters. Any capturing of variables must be done at the call site of the construction of this task.
This allows a task to call something like a task_callback to perform an update.
- action: Callable[[], None]
- name: str | None = None
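Because the action takes no parameters, any value it needs has to be bound when the task is constructed. The following sketch shows one way to do that with default arguments. The `LambdaTask` dataclass here is a stand-in that mirrors the documented fields, and `task_callback` and the status strings are hypothetical.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional


@dataclass
class LambdaTask:  # stand-in mirroring the documented fields
    action: Callable[[], None]
    name: Optional[str] = None


updates: List[str] = []


def task_callback(status: str) -> None:
    """Hypothetical callback that records a status update."""
    updates.append(status)


# Values are bound at construction time via default arguments, so each task
# keeps its own `status` even though the loop variable changes afterwards.
tasks = [
    LambdaTask(action=lambda status=status: task_callback(status), name=status)
    for status in ("IN_PROGRESS", "COMPLETED")
]

for task in tasks:
    task.action()
```

Without the `status=status` default, both lambdas would read the loop variable when called and report the same final value.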
- class ska_pst.lmc.job.NoopTask[source]
A class as a placeholder for a no-op task.
This is useful when nothing is meant to happen, and it makes defining the overall job easier when a task is created conditionally. A NoopTask can be inserted and treated like the Python None.
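A conditional step in a job definition might look like the sketch below. The classes are stand-ins that mirror the documented names, and `build_job` and its `needs_reset` flag are hypothetical.

```python
from dataclasses import dataclass, field
from typing import Callable, List, Optional


@dataclass
class NoopTask:  # stand-in: carries no data and does nothing
    pass


@dataclass
class LambdaTask:  # stand-in mirroring the documented fields
    action: Callable[[], None]
    name: Optional[str] = None


@dataclass
class SequentialTask:  # stand-in holding an ordered list of subtasks
    subtasks: List[object] = field(default_factory=list)


def build_job(needs_reset: bool) -> SequentialTask:
    """Hypothetical job builder with one optional step."""
    # The placeholder keeps the job shape the same whether or not a reset runs.
    reset_task = (
        LambdaTask(action=lambda: None, name="reset") if needs_reset else NoopTask()
    )
    return SequentialTask(
        subtasks=[reset_task, LambdaTask(action=lambda: None, name="configure")]
    )


job_with_reset = build_job(True)
job_without_reset = build_job(False)
```

Both jobs have the same number of subtasks, so the code that assembles the job needs no special case for the skipped step.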
- class ska_pst.lmc.job.ParallelTask(subtasks: List[Union[SequentialTask, ParallelTask, DeviceCommandTask, NoopTask, LambdaTask]])[source]
A class used to handle tasks that can be run in parallel.
Instances of this class take a list of tasks that can all be run in parallel. This job is not complete until all the tasks are complete.
- Parameters
subtasks (List[Task]) – a list of subtasks to be performed concurrently
- subtasks: List[Union[SequentialTask, ParallelTask, DeviceCommandTask, NoopTask, LambdaTask]]
- class ska_pst.lmc.job.ParallelTaskContext(*, task: ~typing.Union[~ska_pst.lmc.job.task.SequentialTask, ~ska_pst.lmc.job.task.ParallelTask, ~ska_pst.lmc.job.task.DeviceCommandTask, ~ska_pst.lmc.job.task.NoopTask, ~ska_pst.lmc.job.task.LambdaTask], task_id: str = <factory>, evt: ~threading.Event = <factory>, parent_task_context: ~typing.Optional[~ska_pst.lmc.job.task.TaskContext] = None, result: ~typing.Optional[~typing.Any] = None, exception: ~typing.Optional[Exception] = None, subtasks: ~typing.List[~ska_pst.lmc.job.task.TaskContext] = <factory>)[source]
A task context class that tracks subtasks for a parallel task.
This extends TaskContext to allow storing the task contexts of the subtasks.
This task is only considered to have completed successfully once all the subtasks have completed successfully. If one of the subtasks fails then this task context is considered to have failed.
- Variables
subtasks (List[TaskContext]) – a list of TaskContext, one for each subtask.
- subtasks: List[TaskContext]
- class ska_pst.lmc.job.SequentialTask(subtasks: List[Union[SequentialTask, ParallelTask, DeviceCommandTask, NoopTask, LambdaTask]])[source]
A class used to handle sequential tasks.
Instances of this class take a list of tasks that will all be run sequentially. This job is not complete until the last task is complete.
- Parameters
subtasks (List[Task]) – a list of subtasks to be performed sequentially
- subtasks: List[Union[SequentialTask, ParallelTask, DeviceCommandTask, NoopTask, LambdaTask]]
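Sequential and parallel tasks can be nested to describe a whole job. The sketch below illustrates only the completion rules stated above (sequential subtasks run in order, a parallel task waits for all of its subtasks). The classes are stand-ins, and the recursive `run` function is a hypothetical runner based on a thread pool, not how TaskExecutor is implemented.

```python
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from typing import Callable, List, Union


@dataclass
class LambdaTask:  # stand-in
    action: Callable[[], None]


@dataclass
class ParallelTask:  # stand-in
    subtasks: List["Task"]


@dataclass
class SequentialTask:  # stand-in
    subtasks: List["Task"]


Task = Union[LambdaTask, ParallelTask, SequentialTask]


def run(task: Task) -> None:
    """Hypothetical runner showing the completion rules only."""
    if isinstance(task, LambdaTask):
        task.action()
    elif isinstance(task, SequentialTask):
        for subtask in task.subtasks:  # each subtask finishes before the next starts
            run(subtask)
    else:
        with ThreadPoolExecutor(max_workers=4) as pool:
            # list() waits for every subtask and re-raises any exception
            list(pool.map(run, task.subtasks))


log: List[str] = []
job = SequentialTask(
    subtasks=[
        LambdaTask(lambda: log.append("start")),
        ParallelTask(
            subtasks=[
                LambdaTask(lambda: log.append("a")),
                LambdaTask(lambda: log.append("b")),
            ]
        ),
        LambdaTask(lambda: log.append("end")),
    ]
)
run(job)
```

The order of "a" and "b" in the log is not guaranteed, but both always appear between "start" and "end".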
- class ska_pst.lmc.job.TaskContext(*, task: ~typing.Union[~ska_pst.lmc.job.task.SequentialTask, ~ska_pst.lmc.job.task.ParallelTask, ~ska_pst.lmc.job.task.DeviceCommandTask, ~ska_pst.lmc.job.task.NoopTask, ~ska_pst.lmc.job.task.LambdaTask], task_id: str = <factory>, evt: ~threading.Event = <factory>, parent_task_context: ~typing.Optional[~ska_pst.lmc.job.task.TaskContext] = None, result: ~typing.Optional[~typing.Any] = None, exception: ~typing.Optional[Exception] = None)[source]
A class used to track a task when a job is running.
This is the base class for all other types of TaskContext. This class is used by the job/task executors to keep track of a task and is linked back to a parent task. It stores the result or exception if the task has completed successfully or failed respectively.
- Variables
task (Task) – the task this context relates to.
task_id (str) – the id of the task. If not specified it is a string version of a UUID version 4.
evt (threading.Event) – a threading.Event used to signal that this task has finished, either successfully or with a failure.
parent_task_context (Optional[TaskContext]) – an optional parent task context, default None.
result (Optional[Any]) – the result of the task. Until this value is set the task is not considered to have completed successfully.
exception – the exception of a failed task. Once this value is set the task is considered to have failed.
- property completed: bool
Check if the task has completed.
If the task context has failed or there is a result then this property is True.
- Returns
if the task has completed
- Return type
bool
- evt: Event
- exception: Optional[Exception] = None
- property failed: bool
Check if task has failed or not.
- Returns
returns True if the context is storing an exception.
- Return type
bool
- parent_task_context: Optional[TaskContext] = None
- result: Optional[Any] = None
- signal_complete(result: Optional[Any] = None) None [source]
Signal that the task has completed successfully.
Calling this marks the task as completed and notifies any parent task that is waiting on evt to be set.
- Parameters
result (Optional[Any], optional) – the result of the task, if any. This defaults to None
- signal_failed(exception: Exception) None [source]
Signal that the task has failed.
Calling this marks the task as failed and notifies any parent task that is waiting on evt to be set.
- Parameters
exception (Exception) – an exception that will be sent back to the original caller of the job that signifies that the command failed.
- signal_failed_from_str(msg: str) None [source]
Signal that the task has failed.
This takes an error message, converts it into a RuntimeError, and then calls the signal_failed method. This is used for remote device proxies, where the result returned is the error message of the remote exception.
- Parameters
msg (str) – the error message to convert into a RuntimeError
- task: Union[SequentialTask, ParallelTask, DeviceCommandTask, NoopTask, LambdaTask]
- task_id: str
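The signalling behaviour documented above can be sketched with a small stand-in class. `SketchTaskContext` is hypothetical: it mirrors the documented attribute and method names but leaves out the `task`, `parent_task_context` and `completed` members, and it should not be read as the real implementation.

```python
import threading
import uuid
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class SketchTaskContext:
    """Stand-in that mirrors the documented names; not the real class."""

    task_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    evt: threading.Event = field(default_factory=threading.Event)
    result: Optional[Any] = None
    exception: Optional[Exception] = None

    @property
    def failed(self) -> bool:
        # True once an exception has been stored
        return self.exception is not None

    def signal_complete(self, result: Optional[Any] = None) -> None:
        self.result = result
        self.evt.set()  # wakes anything waiting on the event

    def signal_failed(self, exception: Exception) -> None:
        self.exception = exception
        self.evt.set()

    def signal_failed_from_str(self, msg: str) -> None:
        self.signal_failed(RuntimeError(msg))


ok = SketchTaskContext()
worker = threading.Thread(target=lambda: ok.signal_complete(result="done"))
worker.start()
finished = ok.evt.wait(timeout=5)  # a parent would block here until notified
worker.join()

bad = SketchTaskContext()
bad.signal_failed_from_str("remote device error")
```

In both cases the event is set, so a waiting parent wakes up and then inspects `failed`, `result` or `exception` to find out what happened.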
- class ska_pst.lmc.job.TaskExecutor(job_queue: Optional[Queue[TaskContext]] = None, device_command_task_queue: Optional[Queue[DeviceCommandTaskContext]] = None, max_parallel_workers: int = 4, logger: Optional[Logger] = None)[source]
An executor class that handles requests for tasks.
Jobs are submitted to instances of this class via the submit_job() method, or to the global instance of this task executor, GLOBAL_JOB_EXECUTOR, via the global method submit_job.
Instances of this class are linked with a DeviceCommandTaskExecutor that handles the TANGO logic of subscriptions and dealing with long-running commands. Both need to share an instance of a queue.Queue, which this class sends messages to and the DeviceCommandTaskExecutor reads from.
- submit_job(job: Union[SequentialTask, ParallelTask, DeviceCommandTask, NoopTask, LambdaTask], callback: Optional[Callable[[...], Any]] = None) None [source]
Submit a job to be executed.
This is the main method that clients should use to submit jobs to be executed. This will wrap the job in a TaskContext and put it on the main queue, where it can then be processed.
- Parameters
job (Task) – the job to be submitted, it can be a simple DeviceCommandTask or a composite task like a SequentialTask or ParallelTask.
callback (Callback, optional) – the callback used to notify that the job is complete, defaults to None