Source code for ska_pst.send.scan

# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.

"""Module class for managing scans recorded by the PST AA0.5 Voltage Recorder."""

from __future__ import annotations

import abc
import logging
import pathlib
import shutil
import time
from typing import Any, Callable

from overrides import EnforceOverrides, final
from ska_control_model import PstProcessingMode
from ska_pst.common.constants import NANOSECONDS_PER_SEC, SCAN_CONFIG_FILE_NAME

from .constants import (
    DATA_PRODUCT_FILE_NAME,
    DIR_PERMS,
    OUTPUT_STATS_FILES,
    SCAN_COMPLETED_FILE_NAME,
)
from .dsp_output_file import DspOutputFile
from .file_util import move_files
from .metadata import PstFiles, ScanMetadata, generate_metadata, get_path_total_filesize
from .processing_context import ProcessingContext

# Public API of this module: only the abstract ``Scan`` base class is exported.
__all__ = [
    "Scan",
]


class Scan(EnforceOverrides, metaclass=abc.ABCMeta):
    """Base class for representing PST Scan Data Products, stored on the local file system."""

    def __init__(
        self: Scan,
        *,
        ctx: ProcessingContext,
        relative_scan_path: pathlib.Path,
        pst_processing_mode: PstProcessingMode,
        pst_scan_config: dict,
        logger: logging.Logger | None = None,
        **kwargs: Any,
    ) -> None:
        """Initialise attributes common to all Scans.

        :param ctx: the processing context. The scan reads the ``local_path``, ``staging_path``
            and ``dlm_path`` roots and the ``processing_stopped`` flag from it.
        :param relative_scan_path: the path of the scan, relative to ``ctx.local_path``. It must
            have exactly three parts: ``<eb_id>/<subsystem_id>/<scan_id>``.
        :param pst_processing_mode: the PST processing mode of the scan.
        :param pst_scan_config: the PST scan configuration, passed through when generating the
            data product metadata file.
        :param logger: the logger instance to use, defaults to this module's logger.
        :param kwargs: additional keyword arguments; accepted but not used by this base class.
        :raises ValueError: if ``relative_scan_path`` does not have exactly three parts.
        :raises OSError: if the local scan directory cannot be stat'ed or the staging directory
            cannot be created.
        """
        self.logger = logger or logging.getLogger(__name__)
        self.ctx = ctx
        self.pst_scan_config = pst_scan_config

        self.relative_scan_path = relative_scan_path
        # the IDs must be set before any of the path properties (e.g. staging_scan_path) are used
        self.eb_id, self.subsystem_id, self.scan_id = relative_scan_path.parts

        self.local_scan_path = ctx.local_path / relative_scan_path
        self._scan_config_file = self.local_scan_path / SCAN_CONFIG_FILE_NAME
        self._scan_completed_file = self.local_scan_path / SCAN_COMPLETED_FILE_NAME
        self._data_product_file = self.staging_scan_path / DATA_PRODUCT_FILE_NAME

        # create time of scan is creation time of scan directory
        # NOTE: on POSIX systems st_ctime_ns is the time of the last metadata change of the
        # directory, which is only an approximation of its creation time.
        created_time_ns = self.local_scan_path.stat().st_ctime_ns
        self._created_time_ns: int = created_time_ns
        self._modified_time_ns: int = created_time_ns

        self.processing_failed = False
        self.transfer_failed = False
        self.pst_processing_mode = pst_processing_mode

        # Ensure the output staging directory exists
        self.staging_scan_path.mkdir(mode=DIR_PERMS, parents=True, exist_ok=True)

    @final
    def generate_data_product_file(self: Scan) -> None:
        """Generate the ska-data-product.yaml file.

        :raises AssertionError: if the scan is not marked as completed or there are still
            unprocessed files.
        """
        # Explicit raises are used rather than ``assert`` statements so that the precondition
        # checks are still enforced when Python runs with optimisations enabled (``-O``).
        if not self.is_complete():
            raise AssertionError("generate_data_product_file called when scan is not complete")
        if self.have_files_to_process():
            raise AssertionError("generate_data_product_file called when there are unprocessed files.")

        generate_metadata(
            scan_id=self.scan_id,
            scan_metadata=self.get_scan_metadata(),
            pst_scan_config=self.pst_scan_config,
            output_dir=self.staging_scan_path,
        )

    @abc.abstractmethod
    def have_files_to_process(self: Scan) -> bool:
        """
        Return true if there are Scan files to process.

        :return: True if the Scan has unprocessed files
        :rtype: bool
        """

    @abc.abstractmethod
    def process_next_file(self: Scan) -> bool:
        """Process the next unprocessed file if one exists.

        :return: True if a file was processed else False
        :rtype: bool
        """

    @abc.abstractmethod
    def get_scan_metadata(self: Scan) -> ScanMetadata:
        """
        Get the metadata for the scan.

        This is specific for the type of a Scan. This allows for the scan type to expect
        the values from the PST scan config, such as the number of output channels or
        polarisations.

        :return: the metadata for the scan
        :rtype: ScanMetadata
        """

    @final
    def get_stat_filename(self: Scan, data_filename: pathlib.Path) -> pathlib.Path:
        """
        Return the expected filename of the HDF statistics file for a data filename.

        :param data_filename: filename of the data file from which to infer the stat filename.
        :type data_filename: pathlib.Path
        :return: stat filename corresponding to the data filename.
        :rtype: pathlib.Path
        """
        return self.local_scan_path / OUTPUT_STATS_FILES / f"{data_filename.stem}.h5"

    @final
    def process(self: Scan) -> None:
        """
        Process the current scan.

        This method is the public interface to process the files for a scan. This will loop over
        the available files and process them. Between files it checks the ``processing_stopped``
        flag of the processing context and returns early, without finalising, if it is set.

        After processing all the files it will check whether the scan is ready to be finalised
        and perform finalisation on the scan if it is ready.

        If any error occurs during the processing it will be logged as a warning and the scan
        will be marked as invalid to avoid having the process keep retrying to process this scan.
        """
        self.logger.debug(f"processing {self}")
        try:
            while self.process_next_file():
                if self.ctx.processing_stopped:
                    self.logger.warning(f"interrupted processing of {self}", exc_info=False)
                    return

            # the check of if scan is ready to be finalised is performed in the finalise method
            self.finalise()
        except Exception:
            self.logger.warning(
                f"Error in processing a scan {self.scan_id}. Marking as invalid", exc_info=True
            )
            self.processing_failed = True
            self._move_to_invalid_dir()

    @final
    def finalise(self: Scan) -> None:
        """
        Finalise the scan if it is marked as completed and has no unprocessed files.

        See ``can_finalise()`` for details about if a Scan can be finalised.

        A scan without any data files is treated as invalid and is not sent to the DLM.

        The method will call the ``_finalise_scan`` that subclasses must override and then
        generate the metadata file. If an exception is raised during this step the scan will
        have the ``processing_failed`` property marked as ``True`` which will result in the
        scan as being marked as invalid and won't be processed any further.

        After that this will move all the files to the DLM path defined in the processing
        context and then clean up the scan directory. If an exception is raised during this
        step the scan will have the ``transfer_failed`` property marked as ``True`` which will
        result in the scan as being marked as invalid and won't be processed any further.
        """
        if not self.can_finalise():
            return

        self.logger.info(f"finalising scan {self.scan_id}")

        if self._is_empty_scan():
            self.logger.warning(
                f"Scan {self.scan_id} had no data files. May have been aborted/stopped"
                " before data was received. Not sending to DLM"
            )
            # this will mark scan as invalid
            self.processing_failed = True
            self._move_to_invalid_dir()
            return

        try:
            self.move_input_stats_to_staging()
            move_files(self._scan_config_file, self.staging_scan_path)

            # allow the concrete scan type to do any specific processing such as
            # the flow through scan needing to move the scloffs file
            self._finalise_scan()
            self.generate_data_product_file()
        except Exception:
            self.logger.warning(
                f"Exception occurred while trying to finalise scan {self.scan_id}.", exc_info=True
            )
            # this will mark scan as invalid
            self.processing_failed = True
            self._move_to_invalid_dir()
            return

        try:
            self.logger.info(f"moving scan from {self.staging_scan_path} to {self.ctx.dlm_path}")
            self._move_files_to_dlm_path()
            self.logger.debug(f"deleting scan from {self.ctx.local_path}")
            self.delete_scan()
        except Exception:
            self.logger.warning(
                f"Exception occurred while transferring the finalised scan to "
                f"the DLM path {self.scan_id}.",
                exc_info=True,
            )
            # this will mark scan as invalid
            self.transfer_failed = True
            self._move_to_invalid_dir()

    @abc.abstractmethod
    def _finalise_scan(self: Scan) -> None:
        """
        Conclude any scan type specific processing on a completed Scan.

        This is called by ``finalise()`` after the input-stats and the scan configuration file
        have been moved to the staging path and before the data product file is generated.
        Implementations should clean up or move any interim files found during processing.
        """

    @final
    def _move_files_to_dlm_path(self: Scan) -> None:
        """Move the staged files of the scan to the DLM output path."""
        move_files(self.staging_scan_path, self.dlm_output_path)

    @final
    def _move_to_invalid_dir(self: Scan) -> None:
        """Move the local and staged files of the scan to the invalid output path.

        Any exception raised while moving the files is logged and suppressed.
        """
        invalid_dir = self.invalid_output_path
        try:
            self.logger.warning(f"Scan {self} has been marked as invalid. Moving files to {invalid_dir}")
            invalid_dir.mkdir(mode=DIR_PERMS, parents=True, exist_ok=True)
            move_files(self.local_scan_path, invalid_dir)
            move_files(self.staging_scan_path, invalid_dir)
            self.delete_scan()
        except Exception:
            # this is being overly defensive but this will mean no exception escapes when
            # moving scan to the invalid directory
            self.logger.warning("Error while moving scan to invalid directory.", exc_info=True)

    @final
    def delete_scan(self: Scan) -> None:
        """Delete all the local data files associated with a scan.

        After removing the scan directory, the empty parent directories up to (but excluding)
        the local path of the processing context are pruned. An error while removing the scan
        directory is logged and marks the scan with ``transfer_failed``.
        """
        self.logger.debug(f"deleting all {self.relative_scan_path}")

        def _log_rmtree_error(_func: Callable, path: str, *args: Any, **kwargs: Any) -> None:
            # called by shutil.rmtree while the exception is being handled, so exc_info is available
            self.logger.warning(
                f"Error in removing {self.local_scan_path} - {path} raised error", exc_info=True
            )
            self.transfer_failed = True

        # first delete all files in the scan directory - if it exists
        if self.path_exists():
            shutil.rmtree(self.local_scan_path, onerror=_log_rmtree_error)

        # then move up the directory tree to the data_product path, pruning directory if empty
        to_prune = self.local_scan_path.parent
        while to_prune.is_relative_to(self.ctx.local_path):
            delta = to_prune.relative_to(self.ctx.local_path)
            if delta == pathlib.Path("."):
                # reached the root local path, which must never be removed
                self.logger.debug("pruned scan_path: stopping prune")
                return

            try:
                # remove the directory, if it is empty
                self.logger.debug(f"pruning {to_prune}.rmdir()")
                to_prune.rmdir()
                to_prune = to_prune.parent
            except OSError as exc:
                self.logger.debug(f"found non-empty parent directory, stopping prune: {exc}")
                return

    @final
    def is_recording(self: Scan) -> bool:
        """
        Return true if the scan has not yet been marked as completed.

        :return: flag indicating if the scan is currently recording
        :rtype: bool
        """
        return not self._scan_completed_file.exists()

    @final
    def data_product_file_exists(self: Scan) -> bool:
        """
        Return true if the ska-data-product.yaml file exists.

        :return: flag indicating the data product file exists
        :rtype: bool
        """
        return self._data_product_file.exists()

    @final
    def scan_config_file_exists(self: Scan) -> bool:
        """
        Return true if the scan-config.json file exists.

        :return: flag indicating the scan config file exists
        :rtype: bool
        """
        return self._scan_config_file.exists()

    @final
    def is_complete(self: Scan) -> bool:
        """
        Return true if the scan_completed file exists.

        :return: flag indicating the scan recording is complete
        :rtype: bool
        """
        return self._scan_completed_file.exists()

    @final
    def force_completion(self: Scan) -> None:
        """Create the scan_completed file for the scan, if it does not exist.

        If the file cannot be created the scan is marked as ``processing_failed`` and moved
        to the invalid output path.
        """
        if not self._scan_completed_file.exists():
            try:
                self._scan_completed_file.touch()
            except Exception:
                self.logger.warning(
                    f"Error in trying to force inactive scan {self.scan_id} as complete.", exc_info=True
                )
                self.processing_failed = True
                self._move_to_invalid_dir()

    @final
    def can_finalise(self: Scan) -> bool:
        """
        Return true if the Scan can be finalised.

        A scan is ready to be finalised if a scan completed file exists and all files
        have been processed.

        :return: flag indicating if the scan can be finalised.
        :rtype: bool
        """
        return self.is_complete() and not self.have_files_to_process()

    @final
    def is_valid(self: Scan) -> bool:
        """
        Get whether the scan is still valid or not.

        A valid scan matches the following conditions:

            * file processing hasn't failed
            * file transfer hasn't failed
            * the file directory still exists

        :return: flag indicating if the scan is valid
        :rtype: bool
        """
        return self.path_exists() and not self.processing_failed and not self.transfer_failed

    @final
    def is_active(self: Scan, max_scan_age: float) -> bool:
        """
        Return true if the scan is active.

        A scan is considered active if the time since its last modification is less than
        the maximum scan age.

        :param max_scan_age: maximum age, in seconds, of a scan before it is considered inactive
        :type max_scan_age: float
        :return: flag indicating if the scan is active
        :rtype: bool
        """
        return self.age < max_scan_age

    @property
    def age(self: Scan) -> float:
        """
        Return the age of the scan in seconds.

        :return: difference between the current time and the modified time in seconds
        :rtype: float
        """
        return time.time() - self.modified_time_secs

    @property
    def flattened_scan_path(self: Scan) -> str:
        """
        Return the flattened relative scan path from the execution block, subsystem and scan IDs.

        :return: flattened relative scan path
        :rtype: str
        """
        return f"{self.eb_id}_{self.subsystem_id}_{self.scan_id}"

    @property
    def staging_scan_path(self: Scan) -> pathlib.Path:
        """
        Get the staging path for the scan.

        :return: the staging path for the scan
        :rtype: pathlib.Path
        """
        return self.ctx.staging_path / self.flattened_scan_path

    @property
    def dlm_output_path(self: Scan) -> pathlib.Path:
        """
        Get the DLM output path for the scan.

        :return: the DLM output path for the scan.
        :rtype: pathlib.Path
        """
        return self.ctx.dlm_path / self.flattened_scan_path

    @property
    def invalid_output_path(self: Scan) -> pathlib.Path:
        """
        Get the output path where an invalid processing would move files to.

        If there is an error while processing a scan all the files will be moved to
        ``$STAGING_PATH/invalid/<flatten_scan_path>``. This way the files are not lost
        but the scan will not attempt to be reprocessed.

        :return: the output path where an invalid processing would move files to.
        :rtype: pathlib.Path
        """
        return self.ctx.staging_path / "invalid" / self.flattened_scan_path

    @property
    def data_product_file(self: Scan) -> pathlib.Path:
        """
        Return the pathlib object of the metadata file.

        :return: pathlib object concerning the metadata file
        :rtype: pathlib.Path
        """
        return self._data_product_file

    @property
    def modified_time_secs(self: Scan) -> float:
        """Get last modified time in seconds."""
        return self._modified_time_ns / NANOSECONDS_PER_SEC

    @final
    def update_modified_time(self: Scan) -> None:
        """Update the last time the scan was processed with the current timestamp."""
        curr_time_ns = time.time_ns()
        self.logger.debug(
            f"updating modified time for scan {self.scan_id} to {curr_time_ns / NANOSECONDS_PER_SEC}"
        )
        self._modified_time_ns = curr_time_ns

    @final
    def path_exists(self: Scan) -> bool:
        """Get whether the full path to scan exists or not."""
        return self.local_scan_path.exists()

    def __repr__(self: Scan) -> str:
        """Get string representation of current scan."""
        type_repr = type(self).__name__
        return (
            f"{type_repr}(eb_id={self.eb_id}, subsystem_id={self.subsystem_id}, "
            f"scan_id={self.scan_id}, modified_time_secs={self.modified_time_secs})"
        )

    @final
    @staticmethod
    def compare_modified(first: Scan, second: Scan) -> int:
        """Compare two scan objects by modified time to allow for sorting.

        This implementation compares 2 scans by modified time, creation time, scan id and
        finally eb id. The scan that was modified the least recently will be ordered before
        scans modified more recently. Comparison by creation time, scan id and eb-id are
        to break ties.

        As the scan modified time can be updated this comparator should not be used to sort
        dictionaries.

        :param first: in the A < B comparison, this parameter is A
        :param second: in the A < B comparison, this parameter is B
        :return: -1 if ``first`` orders before ``second``, 1 if it orders after, else 0
        :rtype: int
        """

        def _cmp(first_attr: Any, second_attr: Any) -> int:
            if first_attr < second_attr:
                return -1
            if first_attr > second_attr:
                return 1
            return 0

        # attributes are compared in priority order; the first difference decides the ordering
        for attr_name in ["_modified_time_ns", "_created_time_ns", "scan_id", "eb_id"]:
            first_attr = getattr(first, attr_name)
            second_attr = getattr(second, attr_name)
            comp = _cmp(first_attr, second_attr)
            if comp != 0:
                return comp

        return 0

    def _update_last_modified_time(self: Scan, files: list[DspOutputFile]) -> None:
        """Advance the scan's modified time to the most recent modified time of the given files.

        :param files: the files whose modification times are checked.
        """
        for f in files:
            file_modified_time_ns = f.file_name.stat().st_mtime_ns
            if file_modified_time_ns > self._modified_time_ns:
                self.logger.debug(
                    f"file {f} has modified time more recent than scan's modified time. "
                    f"Updating scan's modified time to {file_modified_time_ns / NANOSECONDS_PER_SEC}"
                )
                self._modified_time_ns = file_modified_time_ns

    def move_input_stats_to_staging(self: Scan) -> None:
        """Move the input-stats files from local scan path to the staging scan input-stats path."""
        local_input_stats_path = self.local_scan_path / "input-stats"
        staging_input_stats_dir = self.staging_scan_path / "input-stats"
        self.logger.debug(f"input-stats exists: {local_input_stats_path.exists()}")
        if local_input_stats_path.exists():
            move_files(local_input_stats_path, staging_input_stats_dir)

    def _is_empty_scan(self: Scan) -> bool:
        """
        Check whether the scan is an empty scan.

        This method should only be called after the scan is marked as completed. This
        method will raise an ``AssertionError`` if that precondition doesn't hold.

        A scan is considered empty when no data was produced. This could be because
        a stop scan / abort was called before the CBF sent data to PST or that something
        in PST failed to produce data. In either case SEND is not able to produce a
        valid metadata file and the only files we would have got are the scan configuration
        and the scan completed file.

        :return: True if there are no files in the ``data`` directory of the staging scan path
        :rtype: bool
        :raises AssertionError: if the scan is not marked as completed
        """
        # an explicit raise keeps the documented AssertionError even when running with ``-O``
        if not self.is_complete():
            raise AssertionError("expected scan to be in completed")

        data_path = self.staging_scan_path / "data"
        # short-circuits on the first regular file found
        return not any(f.is_file() for f in data_path.glob("*"))

    def _get_scan_files_metadata(self: Scan, files_definitions: dict[str, str]) -> list[PstFiles]:
        """Get the file metadata entries for the non-empty file types in the staging scan path.

        :param files_definitions: mapping of file type, which is used as the sub-directory name
            under the staging scan path, to the description of that file type.
        :return: an entry for each file type whose total file size is greater than zero. File
            types for which the size can't be obtained are logged and skipped.
        :rtype: list[PstFiles]
        """
        scan_files: list[PstFiles] = []
        for file_type, description in files_definitions.items():
            try:
                path = self.staging_scan_path / file_type
                total_size = get_path_total_filesize(path)
                if total_size > 0:
                    scan_files.append(
                        PstFiles(
                            description=description,
                            path=file_type,
                            size=total_size,
                            status="done",
                        )
                    )
            except Exception:
                # logger.exception already records the active exception's traceback
                self.logger.exception(f"Error in obtaining {file_type} files.")

        return scan_files