# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""Module class for managing scans of recorded by the PST AA0.5 Voltage Recorder."""
from __future__ import annotations
import functools
import logging
import pathlib
from typing import List, Set
from ska_pst.common.constants import SKA_PST_SUBSYSTEM_IDS
from .processing_context import ProcessingContext
from .scan import Scan
from .scan_factory import create_scan
__all__ = [
"ScanManager",
]
[docs]class ScanManager:
"""Class that manages the processing of recorded scans."""
def __init__(
self: ScanManager,
*,
ctx: ProcessingContext,
logger: logging.Logger | None = None,
) -> None:
"""
Initialise a ScanManager object.
:param ctx: the processing context for the DLM transfer
:type ctx: ProcessingContext
:param logger: the logger instance to use, defaults to None
:type logger: logging.Logger | None, optional
"""
assert ctx.local_path.exists() and ctx.local_path.is_dir()
assert ctx.staging_path.exists() and ctx.staging_path.is_dir()
assert ctx.subsystem_id in SKA_PST_SUBSYSTEM_IDS
self._scans: List[Scan] = []
self.logger = logger or logging.getLogger(__name__)
self._ctx = ctx
self._refresh_scans()
def __iter__(self: ScanManager) -> ScanManager:
"""
Get instance as an iterator.
This will mark force complete inactive scans
This allows for following code
.. code-block:: python
for scan in scan_manager:
# handle scan
:return: self
:rtype: ScanManager
"""
self._force_complete_inactive_scans()
return self
def __next__(self: ScanManager) -> Scan:
"""
Get the next available scan that should be processed.
If there are no scans available or the processing context
is marked as stopped, then the iteration will stop.
:return: the next available scan to be processed.
:rtype: Scan
:raises StopIteration: if there are no more scans to process or
the processing context has been stopped.
"""
scan = self._next_unprocessed_scan()
if scan is None or self._ctx.processing_stopped:
raise StopIteration()
return scan
def _next_unprocessed_scan(self: ScanManager) -> Scan | None:
"""
Return the next unprocessed scan stored in the local_path.
:return: the older scan currently stored in the data product path, or None if empty
:rtype: Scan | None
"""
self._refresh_scans()
curr_active_scans = self.active_scans
curr_inactive_scans = self.inactive_scans
scan: Scan | None = None
if curr_active_scans:
scan = curr_active_scans[0]
self.logger.debug(f"active scan {scan} found {scan.age=}")
elif curr_inactive_scans:
scan = self.inactive_scans[0]
self.logger.debug(f"no active scans found, but an inactive scan {scan} found {scan.age=}")
else:
self.logger.debug("no unprocessed scans found")
return scan
def _refresh_scans(self: ScanManager) -> None:
"""Update the list of scans."""
# use a set not a list
all_scan_rel_paths = {s.relative_scan_path for s in self._scans}
for rel_scan_path in self.relative_scan_paths:
if rel_scan_path not in all_scan_rel_paths:
scan = create_scan(
ctx=self._ctx,
relative_scan_path=rel_scan_path,
logger=self.logger,
)
if scan:
self.logger.debug(f"adding new scan {rel_scan_path}")
self._scans.append(scan)
else:
self.logger.debug(f"ignoring unsupported scan {rel_scan_path}")
# remove deleted and invalid scans from the list
for scan in self._scans:
if scan.relative_scan_path not in self.relative_scan_paths:
self.logger.debug(f"removing scan at {scan.relative_scan_path}")
self._scans.remove(scan)
# sort the scans by the last processing time
# typically there would only be one scan with unprocessed files
self._scans.sort(key=functools.cmp_to_key(Scan.compare_modified))
@property
def relative_scan_paths(self: ScanManager) -> Set[pathlib.Path]:
"""
Return a current set of the relative scan paths stored in the local_path.
:return: the list of relative scan paths.
:rtype: List[pathlib.Path].
"""
return {x.resolve().relative_to(self._ctx.local_path) for x in self.scan_paths}
@property
def scan_paths(self: ScanManager) -> Set[pathlib.Path]:
"""
Return a set of the current full scan paths stored in the local_path.
The expected path of scans are <eb_id>/<subsystem_id>/<scan_id> where eb_id
is the execution block id.
:return: the list of full scan paths.
:rtype: List[pathlib.Path].
"""
return {p for p in self._ctx.local_path.glob(f"eb-*/{self._ctx.subsystem_id}/*") if p.is_dir()}
@property
def active_scans(self: ScanManager) -> List[Scan]:
"""Get scans that have been updated within the last scan_timeout."""
return [s for s in self._scans if s.is_active(self._ctx.scan_timeout) and s.is_valid()]
@property
def inactive_scans(self: ScanManager) -> List[Scan]:
"""Get scans that have been not updated within the last scan_timeout."""
return [s for s in self._scans if not s.is_active(self._ctx.scan_timeout) and s.is_valid()]
def _force_complete_inactive_scans(self: ScanManager) -> None:
"""
Refresh scan list and force completion on any inactive scans.
This will only mark inactive scans as completed if there are no active
scans.
"""
self._refresh_scans()
if self.active_scans:
return
for scan in self.inactive_scans:
self.logger.debug(f"marking inactive scan {scan.scan_id} completed")
# note - force completion will not throw an Exception
scan.force_completion()