Source code for ska_pst.send.scan_manager

# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.

"""Module class for managing scans of recorded by the PST AA0.5 Voltage Recorder."""

from __future__ import annotations

import functools
import logging
import pathlib
from typing import List, Set

from ska_pst.common.constants import SKA_PST_SUBSYSTEM_IDS

from .processing_context import ProcessingContext
from .scan import Scan
from .scan_factory import create_scan

__all__ = [
    "ScanManager",
]


[docs]class ScanManager: """Class that manages the processing of recorded scans.""" def __init__( self: ScanManager, *, ctx: ProcessingContext, logger: logging.Logger | None = None, ) -> None: """ Initialise a ScanManager object. :param ctx: the processing context for the DLM transfer :type ctx: ProcessingContext :param logger: the logger instance to use, defaults to None :type logger: logging.Logger | None, optional """ assert ctx.local_path.exists() and ctx.local_path.is_dir() assert ctx.staging_path.exists() and ctx.staging_path.is_dir() assert ctx.subsystem_id in SKA_PST_SUBSYSTEM_IDS self._scans: List[Scan] = [] self.logger = logger or logging.getLogger(__name__) self._ctx = ctx self._refresh_scans() def __iter__(self: ScanManager) -> ScanManager: """ Get instance as an iterator. This will mark force complete inactive scans This allows for following code .. code-block:: python for scan in scan_manager: # handle scan :return: self :rtype: ScanManager """ self._force_complete_inactive_scans() return self def __next__(self: ScanManager) -> Scan: """ Get the next available scan that should be processed. If there are no scans available or the processing context is marked as stopped, then the iteration will stop. :return: the next available scan to be processed. :rtype: Scan :raises StopIteration: if there are no more scans to process or the processing context has been stopped. """ scan = self._next_unprocessed_scan() if scan is None or self._ctx.processing_stopped: raise StopIteration() return scan def _next_unprocessed_scan(self: ScanManager) -> Scan | None: """ Return the next unprocessed scan stored in the local_path. :return: the older scan currently stored in the data product path, or None if empty :rtype: Scan | None """ self._refresh_scans() curr_active_scans = self.active_scans curr_inactive_scans = self.inactive_scans scan: Scan | None = None if curr_active_scans: scan = curr_active_scans[0] self.logger.debug(f"active scan {scan} found {scan.age=}") elif curr_inactive_scans: scan = self.inactive_scans[0] self.logger.debug(f"no active scans found, but an inactive scan {scan} found {scan.age=}") else: self.logger.debug("no unprocessed scans found") return scan def _refresh_scans(self: ScanManager) -> None: """Update the list of scans.""" # use a set not a list all_scan_rel_paths = {s.relative_scan_path for s in self._scans} for rel_scan_path in self.relative_scan_paths: if rel_scan_path not in all_scan_rel_paths: scan = create_scan( ctx=self._ctx, relative_scan_path=rel_scan_path, logger=self.logger, ) if scan: self.logger.debug(f"adding new scan {rel_scan_path}") self._scans.append(scan) else: self.logger.debug(f"ignoring unsupported scan {rel_scan_path}") # remove deleted and invalid scans from the list for scan in self._scans: if scan.relative_scan_path not in self.relative_scan_paths: self.logger.debug(f"removing scan at {scan.relative_scan_path}") self._scans.remove(scan) # sort the scans by the last processing time # typically there would only be one scan with unprocessed files self._scans.sort(key=functools.cmp_to_key(Scan.compare_modified)) @property def relative_scan_paths(self: ScanManager) -> Set[pathlib.Path]: """ Return a current set of the relative scan paths stored in the local_path. :return: the list of relative scan paths. :rtype: List[pathlib.Path]. """ return {x.resolve().relative_to(self._ctx.local_path) for x in self.scan_paths} @property def scan_paths(self: ScanManager) -> Set[pathlib.Path]: """ Return a set of the current full scan paths stored in the local_path. The expected path of scans are <eb_id>/<subsystem_id>/<scan_id> where eb_id is the execution block id. :return: the list of full scan paths. :rtype: List[pathlib.Path]. """ return {p for p in self._ctx.local_path.glob(f"eb-*/{self._ctx.subsystem_id}/*") if p.is_dir()} @property def active_scans(self: ScanManager) -> List[Scan]: """Get scans that have been updated within the last scan_timeout.""" return [s for s in self._scans if s.is_active(self._ctx.scan_timeout) and s.is_valid()] @property def inactive_scans(self: ScanManager) -> List[Scan]: """Get scans that have been not updated within the last scan_timeout.""" return [s for s in self._scans if not s.is_active(self._ctx.scan_timeout) and s.is_valid()] def _force_complete_inactive_scans(self: ScanManager) -> None: """ Refresh scan list and force completion on any inactive scans. This will only mark inactive scans as completed if there are no active scans. """ self._refresh_scans() if self.active_scans: return for scan in self.inactive_scans: self.logger.debug(f"marking inactive scan {scan.scan_id} completed") # note - force completion will not throw an Exception scan.force_completion()