ska_pst.lmc.util
This module covers common utility classes and functions of PST.LMC.
Functionality provided within this module is:
Validation of JSON requests
Background Task processing
Job processing, including running jobs in parallel or sequentially and on remote TANGO devices
Custom timeout iterator (see TimeoutIterator)
- class ska_pst.lmc.util.AggregateRemoteTask(task_callback: Callable, background: bool = True, logger: Optional[Logger] = None)[source]
Class for aggregation of multiple remote tasks.
This class is used to submit multiple remote tasks that can run in parallel.
Tasks are added to this aggregate by:
task.add_remote_task(device, action=lambda d: d.Action())
Only after all the tasks have been added should this task be submitted.
This task also works out the overall progress and status of the task. The progress is the average of the progress of all the subtasks, with no weighting applied to those progress values.
- add_remote_task(device: PstDeviceProxy, action: Callable[[PstDeviceProxy], Tuple[ska_tango_base.commands.ResultCode, str]]) RemoteTask [source]
Add a remote task to be executed when this task executes.
This can only be called before the task has started.
- Parameters
device – the device proxy that the new remote task relates to.
action – the action to perform on the remote device.
- status: ska_tango_base.executor.TaskStatus
- tasks: List[RemoteTask]
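The unweighted averaging described for AggregateRemoteTask can be illustrated with a minimal sketch. The function name aggregate_progress is hypothetical, and treating a subtask that has not yet reported progress as 0 and using integer division are assumptions, not confirmed behaviour of the class.

```python
from typing import List, Optional


def aggregate_progress(subtask_progress: List[Optional[int]]) -> int:
    """Return the unweighted mean of the subtask progress values.

    Assumption: a subtask that has not reported progress (None) counts as 0.
    """
    if not subtask_progress:
        return 0
    total = sum(p or 0 for p in subtask_progress)
    # Every subtask contributes equally, regardless of how long it runs.
    return total // len(subtask_progress)


print(aggregate_progress([100, 50, None]))  # prints 50
```

Because no weighting is applied, a short subtask that finishes early moves the overall value as much as a long one does.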
- class ska_pst.lmc.util.BackgroundTask(action_fn: Callable[[], None], logger: Logger, frequency: Optional[float] = None, daemon: bool = True, *args: Any, **kwargs: Any)[source]
BackgroundTask.
This class is used for background task processing. Rather than code being littered with creating background threads or asyncio code, this class is used to provide the functionality to start/stop the process.
The task has to be explicitly started. If the task is deleted by the Python runtime it will stop the background processing; it is, however, advised that the user explicitly stops the task.
The task itself does the waiting and looping; the provided action is not expected to do any waiting.
- exception() Optional[Exception] [source]
Return the exception raised during background processing.
If no exception has been raised then None is returned.
- Returns
the exception raised during background processing.
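The start/stop pattern described for BackgroundTask could look roughly like the sketch below. SimpleBackgroundTask is a hypothetical stand-in built on the standard threading module, not the class documented here; interpreting frequency as calls per second and running the action once when no frequency is given are assumptions.

```python
import threading
from typing import Callable, Optional


class SimpleBackgroundTask:
    """Hypothetical stand-in, not ska_pst.lmc.util.BackgroundTask."""

    def __init__(self, action_fn: Callable[[], None], frequency: Optional[float] = None) -> None:
        self._action_fn = action_fn
        # Assumption: frequency is in calls per second; None means one shot.
        self._period = None if frequency is None else 1.0 / frequency
        self._stop_event = threading.Event()
        self._exception: Optional[Exception] = None
        self._thread = threading.Thread(target=self._loop, daemon=True)

    def _loop(self) -> None:
        try:
            while True:
                self._action_fn()
                # The task does the waiting; the action itself never sleeps.
                if self._period is None or self._stop_event.wait(self._period):
                    break
        except Exception as exc:
            # Keep the failure so the owner can inspect it later.
            self._exception = exc

    def run(self) -> None:
        self._thread.start()

    def stop(self) -> None:
        self._stop_event.set()
        self._thread.join()

    def exception(self) -> Optional[Exception]:
        return self._exception


calls = []
task = SimpleBackgroundTask(lambda: calls.append("tick"))
task.run()
task.stop()
print(calls, task.exception())  # prints ['tick'] None
```

Waiting on an Event rather than calling time.sleep lets stop() interrupt the wait immediately instead of blocking for a full period.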
- class ska_pst.lmc.util.BackgroundTaskProcessor(default_logger: Logger)[source]
Class used for submitting and reaping of background tasks.
This class is used to abstract away needing to hold and monitor BackgroundTask objects. This class submits a background task itself to reap completed tasks.
- submit_task(action_fn: Callable, logger: Optional[Logger] = None, frequency: Optional[float] = None) BackgroundTask [source]
Submit a background task to run.
This submits a background task, either a one-shot task or a periodic task. It takes an optional logger, falling back to the default logger of the processor, and an optional frequency.
- Parameters
action_fn – the callable function to run; this must wrap its own callbacks if needed.
logger – an optional logger to use, else the default logger is used.
frequency – an optional parameter for background task to run at a given frequency.
- Returns
the task that was submitted, in case there is a need for external monitoring.
- Return type
BackgroundTask
- class ska_pst.lmc.util.DataClass(*args, **kwargs)[source]
A dataclass that can be converted to a dictionary.
- class ska_pst.lmc.util.PstObservationMode(value)[source]
An enumeration for handling different PST observation modes.
This enum is used to provide logic around a given observation mode, including the ability to get the CSP JSON configuration example string for the current observation mode.
- DYNAMIC_SPECTRUM = 'DYNAMIC_SPECTRUM'
- FLOW_THROUGH = 'FLOW_THROUGH'
- PULSAR_TIMING = 'PULSAR_TIMING'
- VOLTAGE_RECORDER = 'VOLTAGE_RECORDER'
- property csp_scan_config_key: str | None
Get the CSP scan configuration key for given observation mode.
The CSP configuration has specific configuration for a scan when in a particular observation mode. This property can be used to get the key for that configuration.
For VOLTAGE_RECORDER there is no extra configuration and as such this returns a None value.
- Return type
str | None
- csp_scan_example_str() str [source]
Get CSP Scan example string.
This is to be used when calling the ska_telmodel.csp.get_csp_config_example() method to retrieve the correct example CSP JSON data.
- static from_str(value: str) PstObservationMode [source]
Get observation mode from a string.
This is an extension to the Python enum because the PST BDD tests may use ‘voltage recorder’ or ‘pulsar timing’. This converts the string to upper case and replaces spaces with underscores, then tries to get the enum value.
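The string normalisation described for from_str can be sketched as follows. ObservationMode is a hypothetical stand-in that copies only the documented member names; the lookup by member name and the KeyError raised for unknown strings are assumptions about behaviour the source does not spell out.

```python
from enum import Enum


class ObservationMode(Enum):
    """Hypothetical stand-in mirroring the documented member names."""

    DYNAMIC_SPECTRUM = "DYNAMIC_SPECTRUM"
    FLOW_THROUGH = "FLOW_THROUGH"
    PULSAR_TIMING = "PULSAR_TIMING"
    VOLTAGE_RECORDER = "VOLTAGE_RECORDER"

    @staticmethod
    def from_str(value: str) -> "ObservationMode":
        # "voltage recorder" -> "VOLTAGE_RECORDER", then look up by name.
        return ObservationMode[value.upper().replace(" ", "_")]


print(ObservationMode.from_str("voltage recorder"))  # prints ObservationMode.VOLTAGE_RECORDER
```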
- class ska_pst.lmc.util.RemoteTask(device: PstDeviceProxy, action: Callable[[PstDeviceProxy], Tuple[ska_tango_base.commands.ResultCode, str]], task_callback: Callable)[source]
Class to handle calling long-running tasks on remote devices.
This class takes a device proxy and a function (the action) to call when the task is invoked. It also takes a task_callback that is used when the process updates.
This task will subscribe to events on the device proxy of the longRunningCommandStatus and longRunningCommandProgress attributes. When the task is deleted it will unsubscribe from the events too.
This task has to be called (i.e. task()) for the action to be called. This allows for the task to be submitted as a background process/thread.
- command_id: Optional[str] = None
- progress: Optional[int] = None
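The point that a RemoteTask does nothing until it is called can be illustrated with a plain callable object. Everything in this sketch is hypothetical: DeferredRemoteCall and FakeDevice are stand-ins, the callback keyword result is an assumption, and the event subscriptions on longRunningCommandStatus and longRunningCommandProgress are left out entirely.

```python
import threading
from typing import Any, Callable, List, Tuple


class DeferredRemoteCall:
    """Hypothetical stand-in: stores an action and runs it only when called."""

    def __init__(
        self,
        device: Any,
        action: Callable[[Any], Tuple[int, str]],
        task_callback: Callable[..., None],
    ) -> None:
        self.device = device
        self.action = action
        self.task_callback = task_callback

    def __call__(self) -> None:
        # Nothing touches the device until the task object is invoked.
        result = self.action(self.device)
        self.task_callback(result=result)


class FakeDevice:
    """Hypothetical device used only for this illustration."""

    def Action(self) -> Tuple[int, str]:
        return (0, "done")


updates: List[Tuple[int, str]] = []
task = DeferredRemoteCall(
    FakeDevice(),
    action=lambda d: d.Action(),
    task_callback=lambda result: updates.append(result),
)

# Because the task is a plain callable, it can be handed to a thread.
worker = threading.Thread(target=task)
worker.start()
worker.join()
print(updates)  # prints [(0, 'done')]
```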
- class ska_pst.lmc.util.RunState(value)[source]
Enum to represent run state of BackgroundTask.
- ERRORED = 5
The state to represent that the task has errored.
The task can go into this state while starting, running or stopping.
- RUNNING = 3
The state to indicate that the task is running.
This state represents that the task is running successfully.
- STARTING = 2
The state the task is put into when the task is started.
This is an intermediate state. When the method BackgroundTask.run() is called the task goes into a STARTING state. If the task starts successfully then the state goes into RUNNING.
- STOPPED = 1
Background task is stopped and not running.
This is the default state when the task is created.
- STOPPING = 4
The state the task is put into when asked to stop.
This is an intermediate state, used to avoid calling stop multiple times. When the method BackgroundTask.stop() is called the task will go into the STOPPING state.
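The state descriptions above suggest a small transition table, sketched here for illustration. State is a hypothetical copy of the documented values. The STOPPING to STOPPED step and the absence of any transition out of ERRORED are assumptions, since the source does not state them.

```python
from enum import Enum
from typing import Dict, Set


class State(Enum):
    """Hypothetical copy of the documented values, for illustration only."""

    STOPPED = 1
    STARTING = 2
    RUNNING = 3
    STOPPING = 4
    ERRORED = 5


# Transitions read off the descriptions above. STOPPING -> STOPPED and the
# empty set for ERRORED are assumptions; the source does not spell them out.
TRANSITIONS: Dict[State, Set[State]] = {
    State.STOPPED: {State.STARTING},
    State.STARTING: {State.RUNNING, State.ERRORED},
    State.RUNNING: {State.STOPPING, State.ERRORED},
    State.STOPPING: {State.STOPPED, State.ERRORED},
    State.ERRORED: set(),
}


def can_transition(current: State, target: State) -> bool:
    """Return True if the sketch allows moving from current to target."""
    return target in TRANSITIONS[current]


print(can_transition(State.STOPPED, State.STARTING))   # prints True
print(can_transition(State.STOPPING, State.STOPPING))  # prints False
```

The second call shows why an intermediate STOPPING state helps: a repeated stop request can be detected and ignored.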
- class ska_pst.lmc.util.TelescopeFacilityEnum(value)[source]
Enum representing the different telescope facilities within SKAO.
- Low = 1
Used to represent that the functionality is for the SKA-Low facility.
- Mid = 2
Used to represent that the functionality is for the SKA-Mid facility.
- static from_telescope(telescope: str) TelescopeFacilityEnum [source]
Get enum value based on telescope string.
The telescope parameter must be either “SKALow” or “SKAMid”.
- property telescope: str
Get the SKA telescope that the facility enum represents.
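A minimal sketch of the mapping between facility values and telescope strings follows. Facility is a hypothetical stand-in. It assumes the telescope property returns the same “SKALow” and “SKAMid” strings that from_telescope accepts, and that an unknown string raises ValueError; neither is confirmed by the source.

```python
from enum import Enum


class Facility(Enum):
    """Hypothetical stand-in mirroring the documented members."""

    Low = 1
    Mid = 2

    @property
    def telescope(self) -> str:
        # Assumption: the property yields "SKALow" or "SKAMid".
        return f"SKA{self.name}"

    @staticmethod
    def from_telescope(telescope: str) -> "Facility":
        for facility in Facility:
            if facility.telescope == telescope:
                return facility
        # Assumption: an unsupported telescope string is rejected.
        raise ValueError(f"unknown telescope: {telescope!r}")


print(Facility.from_telescope("SKALow"))  # prints Facility.Low
print(Facility.Mid.telescope)             # prints SKAMid
```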
- class ska_pst.lmc.util.TimeoutIterator(iterator: Iterator[T], abort_event: Optional[Event] = None, timeout: float = 0.0, expected_rate: float = 0.0)[source]
An iterator that can timeout.
The implementation of this uses a background thread to get the items and put them on a queue, while the next functionality of this will block
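The thread-plus-queue approach described for TimeoutIterator can be sketched as below. SimpleTimeoutIterator is a hypothetical stand-in, not the documented class: it omits abort_event and expected_rate, and raising the built-in TimeoutError when no item arrives in time is an assumption.

```python
import queue
import threading
import time
from typing import Generic, Iterator, TypeVar

T = TypeVar("T")
_DONE = object()  # sentinel marking the end of the wrapped iterator


class SimpleTimeoutIterator(Generic[T]):
    """Hypothetical stand-in, not ska_pst.lmc.util.TimeoutIterator."""

    def __init__(self, iterator: Iterator[T], timeout: float) -> None:
        self._queue: "queue.Queue[object]" = queue.Queue()
        self._timeout = timeout
        threading.Thread(target=self._fill, args=(iterator,), daemon=True).start()

    def _fill(self, iterator: Iterator[T]) -> None:
        # Producer thread: drain the wrapped iterator onto the queue.
        for item in iterator:
            self._queue.put(item)
        self._queue.put(_DONE)

    def __iter__(self) -> "SimpleTimeoutIterator[T]":
        return self

    def __next__(self) -> T:
        try:
            # Consumer side: block, but only for the configured timeout.
            item = self._queue.get(timeout=self._timeout)
        except queue.Empty:
            raise TimeoutError(f"no item within {self._timeout}s") from None
        if item is _DONE:
            self._queue.put(_DONE)  # keep later next() calls exhausted
            raise StopIteration
        return item  # type: ignore[return-value]


def slow_source() -> Iterator[int]:
    yield 1
    time.sleep(1.0)  # longer than the timeout used below
    yield 2


items = SimpleTimeoutIterator(slow_source(), timeout=0.2)
first = next(items)
try:
    next(items)
    timed_out = False
except TimeoutError:
    timed_out = True
print(first, timed_out)
```

In this sketch an exception raised inside the wrapped iterator ends the producer thread silently, so the consumer sees a timeout; a fuller version would forward the exception through the queue.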
- ska_pst.lmc.util.background_task(func: Wrapped) Wrapped [source]
Return a decorated function that runs tasks in the background.
Use this decorator on methods of a class that contains a BackgroundTaskProcessor in the field _background_task_processor. If the field doesn’t exist then this wrapper will fail with an error about the missing field.
@background_task
def configure_beam(self):
    ...
- Parameters
func – the wrapped function
- Returns
the wrapped function
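One way such a decorator could be written is sketched below. The names ThreadProcessor, background and Component are hypothetical, the processor simply starts one thread per task, and surfacing a missing _background_task_processor field as AttributeError is an assumption about how the real wrapper fails.

```python
import functools
import threading
from typing import Any, Callable, List


class ThreadProcessor:
    """Hypothetical stand-in for a task processor: one thread per task."""

    def __init__(self) -> None:
        self.threads: List[threading.Thread] = []

    def submit_task(self, action_fn: Callable[[], None]) -> threading.Thread:
        thread = threading.Thread(target=action_fn, daemon=True)
        self.threads.append(thread)
        thread.start()
        return thread


def background(func: Callable[..., Any]) -> Callable[..., None]:
    """Sketch of a decorator that defers a method call to the owner's processor."""

    @functools.wraps(func)
    def wrapper(self: Any, *args: Any, **kwargs: Any) -> None:
        # Raises AttributeError if the owner lacks the expected field.
        processor = self._background_task_processor
        processor.submit_task(lambda: func(self, *args, **kwargs))

    return wrapper


class Component:
    def __init__(self) -> None:
        self._background_task_processor = ThreadProcessor()
        self.configured = threading.Event()

    @background
    def configure_beam(self) -> None:
        self.configured.set()


component = Component()
component.configure_beam()  # returns immediately; the body runs on a thread
print(component.configured.wait(timeout=5.0))  # prints True
```

The decorated method returns straight away, so callers that need the outcome have to rely on a callback or an event, as the Component example does.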