Source code for ska_pst.send.processing_context

# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.

"""Module containing class for managing the processing context of SEND DLM process."""

from __future__ import annotations

import dataclasses
import pathlib
import threading

from ska_pst.common import TelescopeConfig


@dataclasses.dataclass
class ProcessingContext:
    """
    The class used to manage passing around the processing context of the SEND DLM process.

    This class captures common properties that are used between the
    ``.dlm_transfer.DlmTransfer``, ``.scan_manager.ScanManager`` and
    ``.scan.Scan`` classes.

    This class also has a ``threading.Event`` used to signal when processing
    should stop. Before processing a scan or a scan file the context should be
    checked to see if processing can proceed by doing the following:

    .. code-block:: python

        if ctx.processing_stopped:
            return
    """

    subsystem_id: str
    """
    The ID of the SKA subsystem used in the path of the generated scan output files.

    For PST the only valid values are ``pst-low`` and ``pst-mid``.
    """

    telescope_config: TelescopeConfig
    """
    The telescope that SEND is deployed in.

    This allows getting specific telescope configuration and for converting a
    CSP-LMC scan configuration into a PST internal representation.
    """

    local_path: pathlib.Path
    """The path of where PST processing will write files to that need to be processed by SEND."""

    staging_path: pathlib.Path
    """The path that SEND uses to stage files ready to be moved to the DLM path."""

    dlm_path: pathlib.Path
    """The path to where SEND should put completed scan directories for the DLM agent to process."""

    # Required field (no default); excluded from the generated ``__repr__``.
    stop_processing_evt: threading.Event = dataclasses.field(repr=False)
    """The threading event used to signal that processing should stop."""

    scan_timeout: float = dataclasses.field(default=300.0)
    """The time out, in seconds, to mark a scan as being inactive."""

    @property
    def processing_stopped(self: ProcessingContext) -> bool:
        """
        Check if processing has been requested to stop.

        :return: whether the ``stop_processing_evt`` has been set or not.
        :rtype: bool
        """
        return self.stop_processing_evt.is_set()

    def stop_processing(self: ProcessingContext) -> None:
        """
        Set the ``stop_processing_evt``.

        Calling this method will mean any clients waiting for the event to be
        set will be notified. After being set the ``processing_stopped``
        property will return ``True``.
        """
        self.stop_processing_evt.set()

    def wait_for(self: ProcessingContext, timeout: float | None = None) -> bool:
        """
        Wait for the ``stop_processing_evt`` to be set or until a timeout has been reached.

        This is a utility method to make the code cleaner to wait for the
        threading event to have been set.

        .. code-block:: python

            while not ctx.wait_for(timeout=1000):
                # perform something

        versus

        .. code-block:: python

            while not ctx.stop_processing_evt.wait(timeout=1000):
                # perform something

        :param timeout: how long to wait in seconds before returning false,
            defaults to None
        :type timeout: float | None, optional
        :return: whether the ``threading.Event`` has been set or not.
        :rtype: bool
        """
        return self.stop_processing_evt.wait(timeout=timeout)