# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project.
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""
Module for handling tasks and the task context for background jobs.
This module provides for 3 types of tasks: :py:class:`SequentialTask`,
:py:class:`ParallelTask`, and :py:class:`DeviceCommandTask`. A job
that can be submitted to for background processing can be any of these
types of tasks. The `SequentialTask` and `ParallelTask` task classes
are composite task classes that have subtasks that can be of any type
of task.
This module also provides a hierarchy of :py:class:`TaskContext` classes.
These are used within the task executors to keep track for the tasks,
and their subtasks. A task context may have an optional parent task context
is used to send information back to the parent and ultimately back up to
job the that was submitted.
"""
from __future__ import annotations
import threading
import uuid
from dataclasses import dataclass, field
from typing import Any, Callable, List, Optional, Union
from ska_pst.lmc.device_proxy import PstDeviceProxy
from ska_pst.lmc.util.callback import Callback
[docs]@dataclass
class NoopTask:
"""
A class as a placeholder for a no-op task.
This is useful when nothing is meant to happen but
makes it easier for defining a the overall job
where there is a conditional task being created. Using
the `NoopTask` can be inserted and treated like the Python
`None`.
"""
[docs]@dataclass
class LambdaTask:
"""
A class whose operation is to call a lambda.
The lambda doesn't take any parameters. Any capturing of variables must be done at the call site of the
construction of this task.
This allows a task to call something like a task_callback to perform an update.
"""
action: Callable[[], None] = field(repr=False)
name: str | None = field(default=None)
[docs]@dataclass
class SequentialTask:
"""
A class used to handle sequential tasks.
Instances of this class take a list of tasks that will all be run in sequentially. This job is not
complete until the last job is complete.
:param tasks: a list of subtasks to be performed sequentially
:type tasks: List[Task]
"""
subtasks: List[Task]
[docs]@dataclass
class ParallelTask:
"""
A class used to handle tasks that can be run in parallel.
Instances of this class take a list of tasks that can be all run in parallel. This job is not complete
until all the task are complete.
:param tasks: a list of subtasks to be performed concurrently
:type tasks: List[Task]
"""
subtasks: List[Task]
[docs]@dataclass
class DeviceCommandTask:
"""
A class used to handle a command to be executed on remote devices.
Instances of this class take a list of devices and an action to
be performed on a remote device. If more than one device is used this
is converted into a :py:class:`ParallelTask` that separate instances
of this class, one per device. This job is not complete until all
the remote devices have completed the action.
"""
devices: List[PstDeviceProxy]
"""list of devices to perform action upon."""
command: str
"""The name of the command to on the device proxies."""
command_args: tuple[Any] | None = None
"""The list of arguments, if any, to use when performing command."""
timeout: float | None = None
"""
The timeout for remote call to complete by.
If not set then the task will block indefinitely until the command completes.
"""
Task = Union[SequentialTask, ParallelTask, DeviceCommandTask, NoopTask, LambdaTask]
"""Type alias for the different sorts of tasks."""
[docs]@dataclass(kw_only=True)
class TaskContext:
"""
A class used to track a task when a job is running.
This is the base class for all other types of `TaskContext`. This
class it used by the job/task executors to keep track of a task
and is linked back to a parent task. It stores the result or exception
if the task has completed successfully or failed respectively.
:ivar task: the task this context relates to.
:vartype task: Task
:ivar task_id: the id of the task. If not specified it is a string version
of a UUID version 4.
:vartype task_id: str
:ivar evt: at :py:class:`threading.Event` used to signal that this task
has finished, either successfully or it failed.
:vartype evt: threading.Event
:ivar parent_task_context: an optional parent task context, default None.
:vartype parent_task_context: Optional[TaskContext]
:ivar result: the result of the task. Until this value is set the task
is not considered completed successfully.
:vartype result: Optional[Any]
:ivar exception: the exception of a failed task. Once this value is set
the task is considered to have failed.
"""
task: Task
task_id: str = field(default_factory=lambda: str(uuid.uuid4()))
evt: threading.Event = field(default_factory=threading.Event, repr=False)
parent_task_context: Optional[TaskContext] = field(default=None, repr=False)
result: Optional[Any] = None
exception: Optional[Exception] = None
[docs] def signal_complete(self: TaskContext, result: Optional[Any] = None) -> None:
"""
Signal that the task has completed successfully.
Calling this will mark the task as completed and if any parent task is
waiting on the `evt` to be set will be notified.
:param result: the result of the task, if any. This defaults to None
:type result: Optional[Any], optional
"""
self.result = result
self.evt.set()
[docs] def signal_failed(self: TaskContext, exception: Exception) -> None:
"""
Signal that the task has failed.
Calling this will mark the task as failed and if any parent task is
waiting on the `evt` to be set will be notified.
:param exception: an exception that will be sent back to the original
caller of the job that signifies that the command failed.
:type exception: Exception
"""
self.exception = exception
self.evt.set()
[docs] def signal_failed_from_str(self: TaskContext, msg: str) -> None:
"""
Signal that the task has failed.
This takes an error message an raises a `RuntimeError` and then
calls the `signal_failed` method. This is used by remote device proxies
where the result returned is the error message of the remote exception.
:param msg: the error message to convert into a `RuntimeError`
:type msg: str
"""
try:
raise RuntimeError(msg)
except Exception as e:
self.signal_failed(exception=e)
@property
def failed(self: TaskContext) -> bool:
"""
Check if task has failed or not.
:return: returns True if the context is storing an exception.
:rtype: bool
"""
return self.exception is not None
@property
def completed(self: TaskContext) -> bool:
"""
Check if the task has completed.
If the task context has failed or there is a result then this method will return true.
:return: if the task has completed
:rtype: bool
"""
return self.evt.is_set() and not self.failed
[docs]@dataclass(kw_only=True)
class ParallelTaskContext(TaskContext):
"""
A task context class that tracks subtasks for a parallel task.
This extends from :py:class:`TaskContext` to allow storing of
task contexts of the subtasks.
This task is only considered completed successfully once on the
subtasks have completed successfully. If one of the subtasks fails
then this task context is to be considered failed.
:ivar subtasks: a list of `TaskContext`, one for each subtask.
:vartype subtasks: List[TaskContext]
"""
subtasks: List[TaskContext] = field(default_factory=list)
[docs]@dataclass(kw_only=True)
class DeviceCommandTaskContext(TaskContext):
"""
A task context class that is used for TANGO Device Proxy command tasks.
This extends from :py:class:`TaskContext` to allow storing of
task contexts of remote device proxy command tasks.
"""
device: PstDeviceProxy
"""The device proxy the command will be executed on."""
command: str
"""The name of the long running command to execute."""
command_args: tuple[Any] | None = None
"""The command arguments to pass through to the command."""
timeout: float | None = None
"""
The timeout for remote call to complete by.
If not set then the task will block indefinitely until the command completes.
"""
[docs]@dataclass(kw_only=True)
class JobContext(TaskContext):
"""
A task context class that is used to track the whole submitted job.
This is used by the job executor to track the overall job. The task
executor will either throw the exception stored in parent class or
will call the `success_callback` once the job has been marked as complete.
:ivar success_callback: an option callback to call when job ends successfully,
defaults to None.
:vartype success_callback: Callback
"""
success_callback: Callback = field(default=None, repr=False)