# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""Module containing class for managing the processing context of SEND DLM process."""
from __future__ import annotations
import dataclasses
import pathlib
import threading
from ska_pst.common import TelescopeConfig
@dataclasses.dataclass
class ProcessingContext:
    """
    The class used to manage passing around the processing context of the SEND DLM process.

    This class captures common properties that are used between the ``.dlm_transfer.DlmTransfer``,
    ``.scan_manager.ScanManager`` and ``.scan.Scan`` classes. This class also has a
    ``threading.Event`` used to signal when processing should stop. Before processing a scan
    or a scan file the context should be checked to see if processing can proceed by doing
    the following:

    .. code-block:: python

        if ctx.processing_stopped:
            return
    """

    subsystem_id: str
    """
    The ID of the SKA subsystem used in the path of the generated scan output files.

    For PST the only valid values are ``pst-low`` and ``pst-mid``.
    """

    telescope_config: TelescopeConfig
    """
    The telescope that SEND is deployed in.

    This allows getting specific telescope configuration and for converting a CSP-LMC scan
    configuration into a PST internal representation.
    """

    local_path: pathlib.Path
    """The path of where PST processing will write files to that need to be processed by SEND."""

    staging_path: pathlib.Path
    """The path that SEND uses to stage files ready to be moved to the DLM path."""

    dlm_path: pathlib.Path
    """The path to where SEND should put completed scan directories for the DLM agent to process."""

    # Excluded from the generated ``__repr__``: the event carries no useful
    # information when the context is printed or logged.
    stop_processing_evt: threading.Event = dataclasses.field(repr=False)
    """The threading event used to signal that processing should stop."""

    scan_timeout: float = dataclasses.field(default=300.0)
    """The time out, in seconds, to mark a scan as being inactive."""

    @property
    def processing_stopped(self: ProcessingContext) -> bool:
        """
        Check if processing has been requested to stop.

        :return: whether the ``stop_processing_evt`` has been set or not.
        :rtype: bool
        """
        return self.stop_processing_evt.is_set()

    def stop_processing(self: ProcessingContext) -> None:
        """
        Signal that processing should stop by setting ``stop_processing_evt``.

        Calling this method will mean any clients waiting for the event to
        be set will be notified. After being set the ``processing_stopped``
        property will return ``True``.
        """
        self.stop_processing_evt.set()

    def wait_for(self: ProcessingContext, timeout: float | None = None) -> bool:
        """
        Wait for the ``stop_processing_evt`` to be set or until a timeout has been reached.

        This is a utility method to make the code cleaner to wait for the threading event
        to have been set.

        .. code-block:: python

            while not ctx.wait_for(timeout=1.0):
                # perform something

        versus

        .. code-block:: python

            while not ctx.stop_processing_evt.wait(timeout=1.0):
                # perform something

        :param timeout: how long to wait in seconds before returning false; ``None``
            (the default) blocks until the event is set.
        :type timeout: float | None, optional
        :return: whether the ``threading.Event`` has been set or not.
        :rtype: bool
        """
        return self.stop_processing_evt.wait(timeout=timeout)