Utils

This module covers common utility classes and functions of PST.LMC.

Functionality provided within this module is:

  • Validation of JSON requests

  • Background Task processing

  • Job processing, including running jobs in parallel/sequential and on remote Device TANGO devices

  • Custom timeout iterator (see TimeoutIterator)

ska_pst_lmc.util.validate(config: dict, strictness: Strictness = Strictness.Permissive) dict[source]

Validate a dict against the CSP config schema.

Parameters
  • config (dict) – the object that needs to be validated.

  • strictness (Strictness) – used to handle level of Schema strictness.

Returns

an updated dictionary based on schema.validate(), which includes the default values.

Raises

ValueError exception if there is an exception in validation.

class ska_pst_lmc.util.Strictness(value)[source]

Enum for level of strictness for validation.

exception ska_pst_lmc.util.ValidationError(message: str, *args: Any)[source]

Exception representing a validation exception.

Create instance of ValidationError.

class ska_pst_lmc.util.Configuration(dict=None, /, **kwargs)[source]

Configuration.

This class represents a PST-LMC configuration that is sent from the CSP. It is a generic object that is able to validate itself against the ska-telmodel schema for PST.

Creating instances of these from a JSON encoded string should be done via the Configration.from_json() method. To convert the object to a JSON encoded string should be through the to_json() method.

static from_json(json_str: str) Configuration[source]

Create Configuration class from JSON string.

Creates an instance of a Configuration class from as JSON encoded string. This will also validate that the given string matches what is expected, this is performed by calling ska.pst.util.validate().

Parameters

json_str – JSON encode string of a configuration object.

Returns

A Configuration object.

Return type

Configuration

Raises

ValueError is not a valid configuration request.

to_json() str[source]

Serialise the Configuration object to a JSON string.

This is a helper method to serialised the configuration object to a JSON string. This is effectively

json.dumps(self._values)
Returns

a JSON encoded string.

Return type

str

class ska_pst_lmc.util.BackgroundTask(action_fn: Callable[[], None], logger: Logger, frequency: Optional[float] = None, daemon: bool = True, *args: Any, **kwargs: Any)[source]

BackgroundTask.

This class is used for background task processing. Rather than code being littered with creating background threads or asyncio code, this class is used to provide the functionality to start/stop the process.

The task has to be explicitly started. However, if this task is deleted by the Python runtime it will stop the background processing. It is, however, advised to that a user explicitly stops the task.

This task does waiting/looping and the provided action is not expected to do any waiting.

Initialise background task.

Parameters
  • action_fn – action to call during background process. If the frequency is set then this is a per loop callback, else this action will be called once.

  • logger – logger to be used by the task object.

  • frequency – how often to execute the action. If None (the default) then action is only called once, else this task will attempt to sleep for 1/frequency seconds.

  • daemon – run task in a background daemone thread (default is True)

run() None[source]

Run the background task.

If the task is already in a running state then this task will exit immediately.

running() bool[source]

Check if the task is in a running state.

has_errored() bool[source]

Check if the task has errored.

exception() Optional[Exception][source]

Return the exception raised during background processing.

If no exception has been raised then None is returned.

Returns

the exception raised during background processing.

stop() None[source]

Stop the background task.

If the task is not in a running state this will exit immediately.

class ska_pst_lmc.util.BackgroundTaskProcessor(default_logger: Logger)[source]

Class used for submitting and reaping of background tasks.

This class is used to abstract away needing to hold and monitor BackgrounTask objects. This class submits a background task itself to reap completed tasks.

Initialise processor.

Parameters

default_logger (logging.Logger) – the logger that the tasks should use if one is not provided.

submit_task(action_fn: Callable, logger: Optional[Logger] = None, frequency: Optional[float] = None) BackgroundTask[source]

Submit a background task to run.

This will submit a background task, whether it is a one shot task or a periodic task. It can take an optional logger, or the default logger used by the processor is used, and an optional frequency.

Parameters
  • task – the callable function to run, this must wrap its own callbacks if needed.

  • logger – an optional logger to use, else the default logger is used.

  • frequency – an optional parameter for background task to run at a given frequency.

Returns

the task that was submit if there is a need for external monitoring.

Return type

BackgroundTask

ska_pst_lmc.util.background_task(func: Wrapped) Wrapped[source]

Return a decorated function that runs tasks in a background.

Use this decorator on methods of a class that contains a BackgroundTaskProcessor in the field _background_task_processor. If the field doesn’t exist than this wrapper will error with the field not existing.

@background_task
def configure_beam(self):
    ...
Parameters

func – the wrapped function

Returns

the wrapped function

class ska_pst_lmc.util.RunState(value)[source]

Enum to represent run state of BackgroundTask.

STOPPED = 1

Background task is stopped and not running.

This is the default state when the task is created.

STARTING = 2

The state the task is put into when the task is started.

This is an intermediate state. When the method BackgroundTask.run() is called the task goes into a STARTING state. If the task starts successfully then the state goes into RUNNING.

RUNNING = 3

The state to indicate that the task is running.

This state represents that the task is running successfully.

STOPPING = 4

The state the task is put into when asked to stop.

This is an intermediate state, used to avoid calling stop mulitple times. When the method BackgroundTask.stop() is called the task will go in to the STOPPING state.

ERRORED = 5

The state to represent that the task has errored.

The task can go into this state while starting, running or stopping.

running() bool[source]

Check if in a state that represents a running state.

If the state is STARTING or RUNNING then this is considered as running state.

Returns

is in a running state

errored() bool[source]

Check if in an errored state.

This only happens if the state is ERRORED.

Returns

is in a errored state

class ska_pst_lmc.util.RemoteTask(device: PstDeviceProxy, action: Callable[[PstDeviceProxy], Tuple[ska_tango_base.commands.ResultCode, str]], task_callback: Callable)[source]

Class to handle calling long run tasks on remote devices.

This class takes instances of a device proxy and function/action to call when this task is called. It also takes a task_callback to be used when the process updates.

This task will subscribe to events on the device proxy of the longRunningCommandStatus and longRunningCommandProgress attributes. When the task is deleted it will unsubscribe from the events too.

This task has to be called (i.e. task()) for the action to be called. This allows for the task to be submitted as a background process/thread.

Initialise the task.

Parameters
  • device – the device proxy that the task is for.

  • action – the action to perfom when called.

  • task_callback – the callback to use when the task needs to update it’s progress or status.

class ska_pst_lmc.util.AggregateRemoteTask(task_callback: Callable, background: bool = True, logger: Optional[Logger] = None)[source]

Class for aggregation of multiple remote tasks.

This class is used to submit multiple remotes tasks that can occur in parallel.

Tasks are added to this aggregate by:

task.add_remote_task(
    device,
    action=lambda d: d.Action(),
)

Only after all the tasks have been added should this task be submitted.

This task will also work out the progress and overall status of the task. In the case of progress it an average of the progress of all the subtasks but there is no weighting of those progress values.

Initialise aggregate task.

Parameters
  • task_callback – the callback to use when the task needs to update it’s progress or status.

  • background – whether to submit the task in the background or not. The default value is True.

  • logger – the logger to use for logging from this task.

add_remote_task(device: PstDeviceProxy, action: Callable[[PstDeviceProxy], Tuple[ska_tango_base.commands.ResultCode, str]]) RemoteTask[source]

Add a remote task to be executed when this task executes.

This can only be called while the task hasn’t been already started.

Parameters
  • device – the device proxy that the new remote task relates to.

  • action – the action to perfom on the remote device.

class ska_pst_lmc.util.TimeoutIterator(iterator: Iterator[T], abort_event: Optional[Event] = None, timeout: float = 0.0, expected_rate: float = 0.0)[source]

An iterator that can timeout.

The implementation of this uses a background thread to get the items and put them on a queue, while the next functionality of this will block

Initialise iterator.

class ska_pst_lmc.util.TelescopeFacilityEnum(value)[source]

Enum representing the different telescope facilities within SKAO.

Low = 1

Used to present that the functionality is for the SKA-Low facility.

Mid = 2

Used to present that the functionality is for the SKA-Mid facility.