ska_pst.lmc.util

This module covers common utility classes and functions of PST.LMC.

Functionality provided within this module is:

  • Validation of JSON requests

  • Background Task processing

  • Job processing, including running jobs in parallel or sequentially and on remote TANGO devices

  • Custom timeout iterator (see TimeoutIterator)

class ska_pst.lmc.util.BackgroundTask(action_fn: Callable[[], None], logger: Logger, frequency: Optional[float] = None, daemon: bool = True, *args: Any, **kwargs: Any)

BackgroundTask.

This class handles background task processing. Rather than littering code with ad hoc background threads or asyncio code, use this class to start and stop the processing.

The task has to be explicitly started. If the task is deleted by the Python runtime, it will stop the background processing. It is, however, advised that the user explicitly stops the task.

The task itself handles the waiting and looping; the provided action is not expected to do any waiting.
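The division of labour described above (the task owns the loop and the waiting, the action just does its work) can be illustrated with a minimal, self-contained sketch. This is not the ska_pst implementation: the class PeriodicRunner, its attributes, and the interval parameter (seconds between calls) are hypothetical names chosen for illustration only.

```python
import threading
from typing import Callable, Optional


class PeriodicRunner:
    """Hypothetical stand-in: owns the loop so the action never has to wait."""

    def __init__(self, action_fn: Callable[[], None], interval: float) -> None:
        self._action_fn = action_fn
        self._interval = interval  # assumed meaning: seconds between calls
        self._stop_event = threading.Event()
        self._thread: Optional[threading.Thread] = None

    def run(self) -> None:
        # Calling run() while already running is a no-op.
        if self._thread is not None and self._thread.is_alive():
            return
        self._stop_event.clear()
        self._thread = threading.Thread(target=self._loop, daemon=True)
        self._thread.start()

    def stop(self) -> None:
        # Calling stop() when not running is a no-op.
        if self._thread is None:
            return
        self._stop_event.set()
        self._thread.join()
        self._thread = None

    def _loop(self) -> None:
        while not self._stop_event.is_set():
            self._action_fn()
            # The runner waits, not the action; wait() returns early on stop().
            self._stop_event.wait(self._interval)


calls = []
runner = PeriodicRunner(lambda: calls.append(1), interval=0.01)
runner.run()
threading.Event().wait(0.1)  # give the loop time for a few iterations
runner.stop()                # explicit stop, as the text above advises
count_after_stop = len(calls)
```

Waiting on a threading.Event rather than sleeping lets stop() interrupt the pause between calls, so the loop exits promptly.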

exception() -> Optional[Exception]

Return the exception raised during background processing.

If no exception has been raised then None is returned.

Returns

the exception raised during background processing.

has_errored() -> bool

Check if the task has errored.

run() -> None

Run the background task.

If the task is already in a running state, this method returns immediately.

running() -> bool

Check if the task is in a running state.

stop() -> None

Stop the background task.

If the task is not in a running state, this method returns immediately.

class ska_pst.lmc.util.BackgroundTaskProcessor(default_logger: Logger)

Class used for submitting and reaping of background tasks.

This class is used to abstract away needing to hold and monitor BackgroundTask objects. This class submits a background task itself to reap completed tasks.

submit_task(action_fn: Callable, logger: Optional[Logger] = None, frequency: Optional[float] = None) -> BackgroundTask

Submit a background task to run.

This submits a background task, either a one-shot task or a periodic task. It accepts an optional logger (the processor's default logger is used otherwise) and an optional frequency.

Parameters
  • action_fn – the callable to run; it must wrap its own callbacks if needed.

  • logger – an optional logger to use, else the default logger is used.

  • frequency – an optional frequency at which the background task is to run.

Returns

the task that was submitted, in case external monitoring is needed.

Return type

BackgroundTask

class ska_pst.lmc.util.DataClass(*args, **kwargs)

A dataclass that can be converted to a dictionary.

class ska_pst.lmc.util.LongRunningCommand(command: str, logger: Optional[Logger] = None)

A class that wraps calling the invoke_lrc() function on a device proxy.

Creating an instance of this class does not execute the command. Instances are callable: calling one executes the long running command and blocks until the command succeeds, the command fails, an exception occurs, or the timeout elapses.

from ska_control_model import ResultCode
from ska_pst.lmc.util import LongRunningCommand

# proxy and scan_configuration are assumed to be defined by the caller.
lrc = LongRunningCommand(command="ConfigureScan")
result = lrc(proxy=proxy, command_args=scan_configuration, timeout=60.0)

if result.result_code != ResultCode.OK:
    ...

class ska_pst.lmc.util.LrcResult(command: str, command_id: str | None, result_code: ska_control_model.ResultCode, result: Optional[Any] = None, exception: Optional[Exception] = None, task_status_events: List[ska_control_model.TaskStatus] = <factory>)

A data class used to represent the overall result of a long running command (LRC).

command: str

The name of the command that the result class relates to.

command_id: str | None

The ID of the remote TANGO command.

exception: Exception | None = None

The exception, if raised, that was captured during the long running command.

result: Optional[Any] = None

The result object of the long running command.

result_code: ska_control_model.ResultCode

The result code of the long running command.

task_status_events: List[ska_control_model.TaskStatus]

A list of TaskStatus that occurred during the long running command.
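The field layout above, including the factory default for task_status_events, can be sketched as a plain dataclass. This is only an illustration under stated assumptions: LrcResultSketch is a hypothetical name, the two enums are small stand-ins for ska_control_model.ResultCode and ska_control_model.TaskStatus with illustrative members and values, and the command ID "cmd-1" is made up.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, List, Optional


class ResultCode(Enum):
    """Stand-in for ska_control_model.ResultCode (subset, illustrative values)."""

    OK = "ok"
    FAILED = "failed"


class TaskStatus(Enum):
    """Stand-in for ska_control_model.TaskStatus (subset, illustrative values)."""

    QUEUED = "queued"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"


@dataclass
class LrcResultSketch:
    """Hypothetical mirror of the documented fields; not the real class."""

    command: str
    command_id: Optional[str]
    result_code: ResultCode
    result: Optional[Any] = None
    exception: Optional[Exception] = None
    # default_factory gives every instance its own list of events.
    task_status_events: List[TaskStatus] = field(default_factory=list)


first = LrcResultSketch("ConfigureScan", "cmd-1", ResultCode.OK)
second = LrcResultSketch("Scan", None, ResultCode.FAILED, exception=RuntimeError("boom"))
first.task_status_events.append(TaskStatus.COMPLETED)
```

Using a factory rather than a shared default list means events recorded on one result never leak into another.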

class ska_pst.lmc.util.RunState(value)

Enum to represent run state of BackgroundTask.

ERRORED = 5

The state to represent that the task has errored.

The task can go into this state while starting, running or stopping.

RUNNING = 3

The state to indicate that the task is running.

This state represents that the task is running successfully.

STARTING = 2

The state the task is put into when the task is started.

This is an intermediate state. When the method BackgroundTask.run() is called the task goes into a STARTING state. If the task starts successfully then the state goes into RUNNING.

STOPPED = 1

Background task is stopped and not running.

This is the default state when the task is created.

STOPPING = 4

The state the task is put into when asked to stop.

This is an intermediate state, used to avoid calling stop multiple times. When the method BackgroundTask.stop() is called the task will go into the STOPPING state.

errored() -> bool

Check if in an errored state.

This only happens if the state is ERRORED.

Returns

is in an errored state

running() -> bool

Check if in a state that represents a running state.

If the state is STARTING or RUNNING then this is considered a running state.

Returns

is in a running state
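The state values and the two predicates documented above can be summarised in a short sketch. The name RunStateSketch is hypothetical and the method bodies are an assumption based only on the descriptions in this section, not a copy of the ska_pst source.

```python
from enum import Enum


class RunStateSketch(Enum):
    """Hypothetical mirror of the documented RunState values."""

    STOPPED = 1
    STARTING = 2
    RUNNING = 3
    STOPPING = 4
    ERRORED = 5

    def running(self) -> bool:
        # STARTING and RUNNING both count as a running state.
        return self in (RunStateSketch.STARTING, RunStateSketch.RUNNING)

    def errored(self) -> bool:
        # Only ERRORED counts as an errored state.
        return self is RunStateSketch.ERRORED


running_states = [state.name for state in RunStateSketch if state.running()]
errored_states = [state.name for state in RunStateSketch if state.errored()]
```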

class ska_pst.lmc.util.StreamingTask(task_name: str, item_generator: Callable[[Event], Generator[T, None, None]], item_handler: Callable[[T], None], exception_handler: Optional[Callable[[Exception], None]] = None, logger: Optional[Logger] = None)

A class used to handle the background task of streamed items of data.

This class is used by background monitoring and health check results for gRPC services. It factors out the common code for starting/stopping the task, streaming the results, and handling the results.

is_running() -> bool

Get whether the task is running or not.

start(abort_event: Optional[Event] = None, **kwargs: Any) -> None

Start the streaming of results from the generator.

Parameters

abort_event (threading.Event | None, optional) – a threading primitive to use to signal stopping of the task, defaults to None

stop() -> None

Stop processing of background items.

This will signal the generator, via the abort event used in start(), to stop streaming items.
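The contract between the item generator, the item handler, and the abort event can be sketched without the library. Everything below is hypothetical: count_up, stream, and handle are illustrative names, and setting the event inside the handler merely stands in for a stop() call from elsewhere.

```python
import threading
from typing import Callable, Generator, List


def count_up(abort_event: threading.Event) -> Generator[int, None, None]:
    """Hypothetical item generator: yields integers until the event is set."""
    value = 0
    while not abort_event.is_set():
        yield value
        value += 1
        # Pause between items; wait() returns immediately once aborted.
        abort_event.wait(0.005)


def stream(
    item_generator: Callable[[threading.Event], Generator[int, None, None]],
    item_handler: Callable[[int], None],
    abort_event: threading.Event,
) -> None:
    """Pass every generated item to the handler until the generator ends."""
    for item in item_generator(abort_event):
        item_handler(item)


received: List[int] = []
abort = threading.Event()


def handle(item: int) -> None:
    received.append(item)
    if len(received) == 3:
        abort.set()  # stands in for a stop() call made from another thread


worker = threading.Thread(target=stream, args=(count_up, handle, abort), daemon=True)
worker.start()
worker.join(timeout=2.0)
```

Because the generator checks the event itself, stopping is cooperative: the stream ends at the next loop iteration rather than being interrupted mid-item.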

class ska_pst.lmc.util.TimeoutIterator(iterator: Iterator[T], abort_event: Optional[Event] = None, timeout: float = 0.0, expected_rate: float = 0.0)

An iterator that can timeout.

The implementation of this uses a background thread to get the items and put them on a queue, while the next functionality of this will block
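The thread-plus-queue approach mentioned above can be illustrated with a small sketch. It is not the ska_pst TimeoutIterator: QueueBackedIterator is a hypothetical name, the abort_event and expected_rate parameters are left out, and raising TimeoutError when no item arrives in time is an assumption about how a timeout might be reported.

```python
import queue
import threading
import time
from typing import Generic, Iterator, TypeVar

T = TypeVar("T")
_DONE = object()  # sentinel marking the end of the wrapped iterator


class QueueBackedIterator(Generic[T]):
    """Hypothetical sketch: a background thread feeds a queue, next() waits."""

    def __init__(self, iterator: Iterator[T], timeout: float) -> None:
        self._queue: "queue.Queue[object]" = queue.Queue()
        self._timeout = timeout
        threading.Thread(target=self._fill, args=(iterator,), daemon=True).start()

    def _fill(self, iterator: Iterator[T]) -> None:
        # Runs in the background thread: drain the source into the queue.
        for item in iterator:
            self._queue.put(item)
        self._queue.put(_DONE)

    def __iter__(self) -> "QueueBackedIterator[T]":
        return self

    def __next__(self) -> T:
        try:
            # Block for at most `timeout` seconds waiting for the next item.
            item = self._queue.get(timeout=self._timeout)
        except queue.Empty:
            raise TimeoutError(f"no item within {self._timeout}s") from None
        if item is _DONE:
            raise StopIteration
        return item  # type: ignore[return-value]


def slow_items() -> Iterator[int]:
    time.sleep(0.5)  # slower than the timeout used below
    yield 1


fast = list(QueueBackedIterator(iter([1, 2, 3]), timeout=1.0))

timed_out = False
try:
    next(QueueBackedIterator(slow_items(), timeout=0.05))
except TimeoutError:
    timed_out = True
```

Moving the blocking call onto a queue is what makes the timeout possible, since a plain next() on the source iterator cannot be interrupted.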

ska_pst.lmc.util.background_task(func: Wrapped) -> Wrapped

Return a decorated function that runs tasks in the background.

Use this decorator on methods of a class that holds a BackgroundTaskProcessor in the field _background_task_processor. If the field doesn’t exist then this wrapper will raise an error about the missing field.

@background_task
def configure_beam(self):
    ...

Parameters

func – the wrapped function

Returns

the wrapped function
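How such a decorator might hand a method call to the processor held in _background_task_processor can be sketched as follows. This is an illustration only: ThreadProcessor is a hypothetical stand-in for BackgroundTaskProcessor, background_task_sketch is not the library's decorator, and returning the submitted thread and failing with AttributeError are assumptions of this sketch.

```python
import functools
import threading
from typing import Any, Callable, List


class ThreadProcessor:
    """Hypothetical stand-in for BackgroundTaskProcessor."""

    def __init__(self) -> None:
        self.threads: List[threading.Thread] = []

    def submit_task(self, action_fn: Callable[[], None]) -> threading.Thread:
        thread = threading.Thread(target=action_fn, daemon=True)
        self.threads.append(thread)
        thread.start()
        return thread


def background_task_sketch(func: Callable[..., None]) -> Callable[..., threading.Thread]:
    @functools.wraps(func)
    def wrapper(self: Any, *args: Any, **kwargs: Any) -> threading.Thread:
        # Raises AttributeError if the instance lacks the expected field.
        processor = self._background_task_processor
        return processor.submit_task(lambda: func(self, *args, **kwargs))

    return wrapper


class Component:
    def __init__(self) -> None:
        self._background_task_processor = ThreadProcessor()
        self.configured = threading.Event()

    @background_task_sketch
    def configure_beam(self) -> None:
        self.configured.set()


class Broken:
    """Has no _background_task_processor field, so the wrapper fails."""

    @background_task_sketch
    def configure_beam(self) -> None:
        pass


component = Component()
task = component.configure_beam()  # returns at once; work runs on a thread
task.join(timeout=1.0)

missing_field_error = False
try:
    Broken().configure_beam()
except AttributeError:
    missing_field_error = True
```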