# Source code for ska_pst.send.scan

# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.

"""Module for managing scans recorded by the PST AA0.5 Voltage Recorder."""
from __future__ import annotations

import abc
import json
import logging
import pathlib
import shutil
import subprocess
import time
from typing import Any, Callable, List, Tuple, Type

from overrides import EnforceOverrides, final
from ska_control_model import PstProcessingMode

from .constants import DIR_PERMS, NANOSECONDS_PER_SEC
from .dsp_output_file import DspOutputFile
from .file_util import move_files
from .metadata_builder import DATA_PRODUCT_FILE_NAME, MetaDataBuilder
from .processing_context import ProcessingContext

__all__ = [
    "Scan",
]

# name of the JSON scan configuration file, found in the local scan directory
SCAN_CONFIG_FILE_NAME = "scan_configuration.json"
# name of the marker file whose presence indicates the scan has been marked as completed
SCAN_COMPLETED_FILE_NAME = "scan_completed"
# sub-directory of the local scan directory where per-file HDF5 output statistics files are written
OUTPUT_STATS_SUBDIR = "output-stats"


class Scan(EnforceOverrides, metaclass=abc.ABCMeta):
    """Base class for representing PST Scan Data Products, stored on the local file system.

    While being recorded and processed a scan lives at
    ``<ctx.local_path>/<eb_id>/<subsystem_id>/<scan_id>``. Processed files are moved to a
    flattened staging directory, ``<ctx.staging_path>/<eb_id>_<subsystem_id>_<scan_id>``, and,
    once the scan has been finalised, on to ``<ctx.dlm_path>/<eb_id>_<subsystem_id>_<scan_id>``.
    Scans that fail processing or transfer are moved to
    ``<ctx.staging_path>/invalid/<eb_id>_<subsystem_id>_<scan_id>``.

    Concrete subclasses must implement ``_finalise_scan``.
    """

    def __init__(
        self: Scan,
        *,
        ctx: ProcessingContext,
        relative_scan_path: pathlib.Path,
        pst_processing_mode: PstProcessingMode,
        logger: logging.Logger | None = None,
        **kwargs: Any,
    ) -> None:
        """Initialise attributes common to all Scans.

        :param ctx: the processing context, providing the local, staging and DLM paths.
        :type ctx: ProcessingContext
        :param relative_scan_path: the path of the scan, relative to ``ctx.local_path``. It must
            have exactly three parts: ``<eb_id>/<subsystem_id>/<scan_id>``.
        :type relative_scan_path: pathlib.Path
        :param pst_processing_mode: the PST processing mode of the scan.
        :param logger: the logger instance to use. Defaults to the module logger.
        :param kwargs: additional keyword arguments, accepted for subclasses; unused here.
        :raises ValueError: if ``relative_scan_path`` does not have exactly three parts.
        :raises OSError: if the local scan directory cannot be stat'ed or the staging
            directory cannot be created.
        """
        self.logger = logger or logging.getLogger(__name__)
        self.ctx = ctx
        self.relative_scan_path = relative_scan_path
        # the relative path is <eb_id>/<subsystem_id>/<scan_id>
        (self.eb_id, self.subsystem_id, self.scan_id) = relative_scan_path.parts
        self.local_scan_path = ctx.local_path / relative_scan_path
        self._scan_config_file = self.local_scan_path / SCAN_CONFIG_FILE_NAME
        self._scan_completed_file = self.local_scan_path / SCAN_COMPLETED_FILE_NAME
        # the data product (metadata) file lives in the staging directory, not the local one
        self._data_product_file = self.staging_scan_path / DATA_PRODUCT_FILE_NAME

        # create time of scan is taken from the scan directory's st_ctime.
        # NOTE: on Unix st_ctime is the time of the last metadata change, not the creation
        # time, so this is only an approximation of when the scan was created there.
        created_time_ns = self.local_scan_path.stat().st_ctime_ns
        self._created_time_ns: int = created_time_ns
        self._modified_time_ns: int = created_time_ns

        self.processing_failed = False
        self.transfer_failed = False
        self._data_files: List[DspOutputFile] = []
        self._weights_files: List[DspOutputFile] = []
        self._config_files: List[DspOutputFile] = []
        self.pst_processing_mode = pst_processing_mode

        # Ensure the output staging directory exists
        self.staging_scan_path.mkdir(mode=DIR_PERMS, parents=True, exist_ok=True)

    @final
    @staticmethod
    def create_scan(
        *,
        ctx: ProcessingContext,
        relative_scan_path: pathlib.Path,
        logger: logging.Logger | None = None,
        **kwargs: Any,
    ) -> Scan | None:
        """
        Construct a Scan sub-class from the processing/observing mode scan_configuration.json file.

        :param ctx: the processing context for the DLM transfer
        :type ctx: ProcessingContext
        :param relative_scan_path: the path of the scan, relative to the ``ctx.local_path``.
        :type relative_scan_path: pathlib.Path
        :param logger: the logger to use, which is also passed on to the constructed Scan.
        :param kwargs: additional keyword arguments passed through to the subclass constructor.
        :return: Subclass of Scan that matches the PST processing mode, or ``None`` if the scan
            configuration file does not exist or the processing mode is not supported.
        :rtype: Scan | None
        :raises KeyError: if ``pst.scan`` is missing from the configuration, or it has neither
            a ``pst_processing_mode`` nor an ``observation_mode`` key.
        :raises json.JSONDecodeError: if the scan configuration file is not valid JSON.
        """
        logger = logger or logging.getLogger(__name__)

        scan_config_file = ctx.local_path / relative_scan_path / SCAN_CONFIG_FILE_NAME
        if not scan_config_file.exists():
            return None

        # JSON is UTF-8; don't depend on the platform's default text encoding
        with open(scan_config_file, encoding="utf-8") as json_file:
            config_dict = json.load(json_file)

        # need to be forward compatible observation_mode -> pst_processing_mode
        scan_config = config_dict["pst"]["scan"]
        try:
            pst_processing_mode = scan_config["pst_processing_mode"]
        except KeyError:
            pst_processing_mode = scan_config["observation_mode"]

        cls: Type[Scan]
        if pst_processing_mode == PstProcessingMode.PULSAR_TIMING.name:
            logger.error("Pulsar Timing Scan type not yet supported")
            return None
        elif pst_processing_mode == PstProcessingMode.DETECTED_FILTERBANK.name:
            logger.error("Detected Filterbank Scan type not yet supported")
            return None
        elif pst_processing_mode == PstProcessingMode.FLOW_THROUGH.name:
            # imported locally, presumably to avoid a circular import with the subclass module
            from .flow_through_scan import FlowThroughScan

            logger.debug(f"Constructing FlowThroughScan mode={pst_processing_mode}")
            cls = FlowThroughScan
        elif pst_processing_mode == PstProcessingMode.VOLTAGE_RECORDER.name:
            from .voltage_recorder_scan import VoltageRecorderScan

            logger.debug(f"Constructing VoltageRecorderScan mode={pst_processing_mode}")
            cls = VoltageRecorderScan
        else:
            logger.error(f"unrecognised PST processing mode: {pst_processing_mode}")
            return None

        # NOTE(review): pst_processing_mode here is the mode *name* string read from the JSON
        # configuration, not a PstProcessingMode member as the constructor's annotation
        # suggests - confirm the subclasses and MetaDataBuilder expect a str.
        return cls(
            ctx=ctx,
            relative_scan_path=relative_scan_path,
            logger=logger,
            pst_processing_mode=pst_processing_mode,
            **kwargs,
        )

    @final
    def generate_data_product_file(self: Scan) -> None:
        """Generate the ska-data-product.yaml file.

        ``MetaDataBuilder`` is given the staging scan directory as its output directory.

        :raises AssertionError: if the scan is not complete or there are unprocessed files.
        """
        # ensure the scan is marked as completed
        assert self.is_complete(), "generate_data_product_file called when scan is not complete"

        # ensure every data/weights pair has already been processed
        unprocessed_file = self.next_unprocessed_file()
        assert (
            unprocessed_file is None
        ), f"generate_data_product_file called when there are unprocessed files. {unprocessed_file}"

        metadata_builder = MetaDataBuilder(
            output_dir=self.staging_scan_path, pst_processing_mode=self.pst_processing_mode
        )
        metadata_builder.generate_metadata()

    @final
    def have_files_to_process(self: Scan) -> bool:
        """
        Return true if there are Scan files to process.

        :return: True if the Scan has unprocessed files
        :rtype: bool
        """
        return self.next_unprocessed_file() is not None

    @final
    def _process_next_file(self: Scan) -> bool:
        """Process the next unprocessed file if one exists.

        :return: True if a file was processed else False
        :rtype: bool
        """
        self.logger.debug("trying to find next unprocessed file")
        unprocessed_file = self.next_unprocessed_file()
        self.logger.debug(f"unprocessed_file={unprocessed_file}")
        if unprocessed_file is None:
            return False

        self.process_file(unprocessed_file)
        return True

    @final
    def get_stat_filename(self: Scan, data_filename: pathlib.Path) -> pathlib.Path:
        """
        Return the expected filename of the HDF statistics file for a data filename.

        :param data_filename: filename of the data file from which to infer the stat filename.
        :type data_filename: pathlib.Path
        :return: stat filename corresponding to the data filename.
        :rtype: pathlib.Path
        """
        return self.local_scan_path / OUTPUT_STATS_SUBDIR / f"{data_filename.stem}.h5"

    @final
    def process(self: Scan) -> None:
        """
        Process the current scan.

        This method is the public interface to process the files for a scan. This will loop over
        the available files and process them, returning early if ``self.ctx.processing_stopped``
        is set after a file has been processed. After processing all the files it will check
        whether the scan is ready to be finalised and perform finalisation on the scan if it is
        ready.

        If any error occurs during the processing it will be logged as a warning and the scan
        will be marked as invalid to avoid having the process keep retrying to process this scan.
        """
        self.logger.debug(f"processing {self}")
        try:
            while self._process_next_file():
                if self.ctx.processing_stopped:
                    self.logger.warning(f"interrupted processing of {self}", exc_info=False)
                    return

            # the check of if scan is ready to be finalised is performed in the finalise method
            self.finalise()
        except Exception:
            self.logger.warning(
                f"Error in processing a scan {self.scan_id}. Marking as invalid", exc_info=True
            )
            self.processing_failed = True
            self._move_to_invalid_dir()

    @final
    def finalise(self: Scan) -> None:
        """
        Finalise the scan if it is marked as completed and has no unprocessed files.

        See ``can_finalise()`` for details about if a Scan can be finalised.

        If no data files were staged for the scan it is marked as invalid (``processing_failed``
        is set to ``True``) and moved to the invalid directory instead of being sent to the DLM.

        Otherwise the input stats and the scan configuration are moved to the staging directory,
        then ``_finalise_scan`` (which subclasses must override) is called, and then the metadata
        file is created. If an exception is raised during this stage the scan will have the
        ``processing_failed`` property marked as ``True`` which will result in the scan being
        marked as invalid and it won't be processed any further.

        Finally this will move all the staged files to the DLM path defined in the processing
        context and then clean up the scan directory. If an exception is raised during this stage
        the scan will have the ``transfer_failed`` property marked as ``True`` which will result
        in the scan being marked as invalid and it won't be processed any further.
        """
        if not self.can_finalise():
            return

        self.logger.info(f"finalising scan {self.scan_id}")

        if self._is_empty_scan():
            self.logger.warning(
                f"Scan {self.scan_id} had no data files. May have been aborted/stopped"
                " before data was received. Not sending to DLM"
            )
            # this will mark scan as invalid
            self.processing_failed = True
            self._move_to_invalid_dir()
            return

        try:
            self.move_input_stats_to_staging()
            move_files(self._scan_config_file, self.staging_scan_path)
            # allow the concrete scan type to do any specific processing such as
            # the flow through scan needing to move the scloffs file
            self._finalise_scan()
            self.generate_data_product_file()
        except Exception:
            self.logger.warning(
                f"Exception occurred while trying to finalise scan {self.scan_id}.", exc_info=True
            )
            # this will mark scan as invalid
            self.processing_failed = True
            self._move_to_invalid_dir()
            return

        try:
            self.logger.info(f"moving scan from {self.staging_scan_path} to {self.ctx.dlm_path}")
            self._move_files_to_dlm_path()
            self.logger.debug(f"deleting scan from {self.ctx.local_path}")
            self.delete_scan()
        except Exception:
            self.logger.warning(
                f"Exception occurred while transferring the finalised scan to "
                f"the DLM path {self.scan_id}.",
                exc_info=True,
            )
            # this will mark scan as invalid
            self.transfer_failed = True
            self._move_to_invalid_dir()

    @abc.abstractmethod
    def _finalise_scan(self: Scan) -> None:
        """
        Conclude any scan-type specific processing on a completed Scan.

        Called by ``finalise`` after the input stats and scan configuration have been moved to
        the staging directory and before the data product file is generated. Subclasses can use
        this to stage any additional files (e.g. the flow through scan's scloffs file) and to
        clean up any interim files found during processing.
        """

    @final
    def _move_files_to_dlm_path(self: Scan) -> None:
        """Move the staged scan files to the scan's DLM output path."""
        move_files(self.staging_scan_path, self.dlm_output_path)

    @final
    def _move_to_invalid_dir(self: Scan) -> None:
        """Move the local and staged files of this scan to the invalid output path, then delete it.

        Any error raised while doing so is logged and not propagated.
        """
        invalid_dir = self.invalid_output_path
        try:
            self.logger.warning(
                f"Scan {self} has been marked as invalid. Moving files to {str(invalid_dir)}"
            )
            invalid_dir.mkdir(mode=DIR_PERMS, parents=True, exist_ok=True)
            move_files(self.local_scan_path, invalid_dir)
            move_files(self.staging_scan_path, invalid_dir)
            self.delete_scan()
        except Exception:
            # this is being overly defensive, but it means no exception escapes when
            # moving the scan to the invalid directory
            self.logger.warning("Error while moving scan to invalid directory.", exc_info=True)

    @final
    def delete_scan(self: Scan) -> None:
        """Delete all the local data files associated with a scan.

        Removes the local scan directory (if it exists), then removes each parent directory
        below ``ctx.local_path`` that has been left empty, stopping at the first non-empty one.
        Errors while removing the scan directory are logged and set ``transfer_failed``.
        """
        self.logger.debug(f"deleting all {self.relative_scan_path}")

        def _log_rmtree_error(_func: Callable, path: str, *args: Any, **kwargs: Any) -> None:
            self.logger.warning(
                f"Error in removing {self.local_scan_path} - {path} raised error", exc_info=True
            )
            self.transfer_failed = True

        # first delete all files in the scan directory - if it exists
        if self.path_exists():
            shutil.rmtree(self.local_scan_path, onerror=_log_rmtree_error)

        # then move up the directory tree to the data_product path, pruning directory if empty
        to_prune = self.local_scan_path.parent
        while to_prune.is_relative_to(self.ctx.local_path):
            delta = to_prune.relative_to(self.ctx.local_path)
            if delta == pathlib.Path("."):
                self.logger.debug("pruned scan_path: stopping prune")
                return
            try:
                # rmdir only succeeds if the directory is empty
                self.logger.debug(f"pruning {to_prune}.rmdir()")
                to_prune.rmdir()
                to_prune = to_prune.parent
            except OSError as exc:
                self.logger.debug(f"found non-empty parent directory, stopping prune: {exc}")
                return

    @final
    def is_recording(self: Scan) -> bool:
        """
        Return true if the scan has not yet been marked as completed.

        :return: flag indicating if the scan is currently recording
        :rtype: bool
        """
        return not self._scan_completed_file.exists()

    @final
    def data_product_file_exists(self: Scan) -> bool:
        """
        Return true if the ska-data-product.yaml file exists.

        :return: flag indicating the data product file exists
        :rtype: bool
        """
        return self._data_product_file.exists()

    @final
    def scan_config_file_exists(self: Scan) -> bool:
        """
        Return true if the scan_configuration.json file exists in the local scan directory.

        :return: flag indicating the scan config file exists
        :rtype: bool
        """
        return self._scan_config_file.exists()

    @final
    def is_complete(self: Scan) -> bool:
        """
        Return true if the scan_completed file exists.

        :return: flag indicating the scan recording is complete
        :rtype: bool
        """
        return self._scan_completed_file.exists()

    @final
    def force_completion(self: Scan) -> None:
        """Create the scan_completed file for the scan, if it does not exist.

        If the file cannot be created the scan is marked as ``processing_failed`` and moved to
        the invalid directory.
        """
        if not self._scan_completed_file.exists():
            try:
                self._scan_completed_file.touch()
            except Exception:
                self.logger.warning(
                    f"Error in trying to force inactive scan {self.scan_id} as complete.", exc_info=True
                )
                self.processing_failed = True
                self._move_to_invalid_dir()

    @final
    def can_finalise(self: Scan) -> bool:
        """
        Return true if the Scan can be finalised.

        A scan is ready to be finalised if a scan completed file exists and all files have
        been processed.

        :return: flag indicating if the scan can be finalised.
        :rtype: bool
        """
        return self.is_complete() and not self.have_files_to_process()

    @final
    def is_valid(self: Scan) -> bool:
        """
        Get whether the scan is still valid or not.

        A valid scan matches the following conditions:

        * file processing hasn't failed
        * file transfer hasn't failed
        * the local scan directory still exists

        :return: flag indicating if the scan is valid
        :rtype: bool
        """
        return self.path_exists() and not self.processing_failed and not self.transfer_failed

    @final
    def is_active(self: Scan, max_scan_age: float) -> bool:
        """
        Return true if the scan is active.

        A scan is considered active if the time since its last modification is less than the
        maximum scan age.

        :param max_scan_age: maximum age of a scan, in seconds, before it is considered inactive
        :type max_scan_age: float
        :return: flag indicating if the scan is active
        :rtype: bool
        """
        return self.age < max_scan_age

    @property
    def age(self: Scan) -> float:
        """
        Return the age of the scan in seconds.

        :return: difference between the current time and the modified time in seconds
        :rtype: float
        """
        return time.time() - self.modified_time_secs

    @property
    def flattened_scan_path(self: Scan) -> str:
        """
        Return the flattened relative scan path from the execution block, subsystem and scan IDs.

        :return: flattened relative scan path, ``<eb_id>_<subsystem_id>_<scan_id>``
        :rtype: str
        """
        return f"{self.eb_id}_{self.subsystem_id}_{self.scan_id}"

    @property
    def staging_scan_path(self: Scan) -> pathlib.Path:
        """
        Get the staging path for the scan.

        :return: the staging path for the scan
        :rtype: pathlib.Path
        """
        return self.ctx.staging_path / self.flattened_scan_path

    @property
    def dlm_output_path(self: Scan) -> pathlib.Path:
        """
        Get the DLM output path for the scan.

        :return: the DLM output path for the scan.
        :rtype: pathlib.Path
        """
        return self.ctx.dlm_path / self.flattened_scan_path

    @property
    def invalid_output_path(self: Scan) -> pathlib.Path:
        """
        Get the output path where an invalid processing would move files to.

        If there is an error while processing a scan all the files will be moved to
        ``$STAGING_PATH/invalid/<flatten_scan_path>``. This way the files are not lost but
        the scan will not attempt to be reprocessed.

        :return: the output path where an invalid processing would move files to.
        :rtype: pathlib.Path
        """
        return self.ctx.staging_path / "invalid" / self.flattened_scan_path

    @property
    def data_product_file(self: Scan) -> pathlib.Path:
        """
        Return the pathlib object of the metadata file.

        :return: pathlib object concerning the metadata file
        :rtype: pathlib.Path
        """
        return self._data_product_file

    @property
    def modified_time_secs(self: Scan) -> float:
        """Get last modified time in seconds."""
        return self._modified_time_ns / NANOSECONDS_PER_SEC

    @final
    def update_modified_time(self: Scan) -> None:
        """Update the last time the scan was processed with the current timestamp."""
        curr_time_ns = time.time_ns()
        self.logger.debug(
            f"updating modified time for scan {self.scan_id} to {curr_time_ns / NANOSECONDS_PER_SEC}"
        )
        self._modified_time_ns = curr_time_ns

    @final
    def path_exists(self: Scan) -> bool:
        """Get whether the full path to scan exists or not."""
        return self.local_scan_path.exists()

    def __repr__(self: Scan) -> str:
        """Get string representation of current scan."""
        type_repr = type(self).__name__
        return (
            f"{type_repr}(eb_id={self.eb_id}, subsystem_id={self.subsystem_id}, "
            f"scan_id={self.scan_id}, modified_time_secs={self.modified_time_secs})"
        )

    @final
    @staticmethod
    def compare_modified(first: Scan, second: Scan) -> int:
        """Compare two scan objects by modified time to allow for sorting.

        This implementation compares 2 scans by modified time, creation time, scan id and
        finally eb id. The scan that was modified the least recently will be ordered before
        scans modified more recently. Comparison by creation time, scan id and eb-id are to
        break ties.

        As the scan modified time can be updated this comparator should not be used to sort
        dictionaries.

        NOTE(review): scan_id and eb_id come from ``pathlib.Path.parts`` and so are compared
        as strings (e.g. ``"10" < "9"``).

        :param first: in the A < B comparison, this parameter is A
        :param second: in the A < B comparison, this parameter is B
        :return: -1 if ``first`` orders before ``second``, 1 if after, otherwise 0
        """

        def _cmp(first_attr: Any, second_attr: Any) -> int:
            if first_attr < second_attr:
                return -1
            if first_attr > second_attr:
                return 1
            return 0

        for attr_name in ["_modified_time_ns", "_created_time_ns", "scan_id", "eb_id"]:
            first_attr = getattr(first, attr_name)
            second_attr = getattr(second, attr_name)
            comp = _cmp(first_attr, second_attr)
            if comp != 0:
                return comp

        return 0

    def update_files(self: Scan) -> None:
        """Check the file system for new data and weights files from VR or FT scans.

        Refreshes the lists of data files (``data/*.dada``) and weights files (``weights/*.dada``)
        in the local scan directory. It also refreshes the config files: the data product file in
        staging and the scan configuration file in the local directory, each only if it exists.
        The scan's modified time is then advanced to the latest modification time of any of
        these files.
        """
        self._data_files = [
            DspOutputFile(data_file, self.ctx.local_path)
            for data_file in sorted(self.local_scan_path.glob("data/*.dada"))
        ]
        self._weights_files = [
            DspOutputFile(weights_file, self.ctx.local_path)
            for weights_file in sorted(self.local_scan_path.glob("weights/*.dada"))
        ]
        self._config_files = []
        if self.data_product_file_exists():
            self._config_files.append(DspOutputFile(self._data_product_file, self.ctx.staging_path))
        if self.scan_config_file_exists():
            self._config_files.append(DspOutputFile(self._scan_config_file, self.ctx.local_path))

        def _update_last_modified_time(files: List[DspOutputFile]) -> None:
            for f in files:
                file_modified_time_ns = f.file_name.stat().st_mtime_ns
                if file_modified_time_ns > self._modified_time_ns:
                    self.logger.debug(
                        f"file {f} has modified file more recent than scan's modified time. "
                        f"Updating scan's modified time to {file_modified_time_ns / NANOSECONDS_PER_SEC}"
                    )
                    self._modified_time_ns = file_modified_time_ns

        for files in [
            self._data_files,
            self._weights_files,
            self._config_files,
        ]:
            _update_last_modified_time(files)

    def get_all_files(self: Scan) -> List[DspOutputFile]:
        """
        Return a list of all data, weights and control files.

        :return: list of all pertinent files for a scan
        :rtype: List[DspOutputFile]
        """
        self.update_files()
        return [*self._data_files, *self._weights_files, *self._config_files]

    def _data_and_weights_file_pairs(
        self: Scan,
    ) -> List[Tuple[DspOutputFile, DspOutputFile]]:
        """
        Pair each data file with the weights file(s) that share its file number.

        Pairs are ordered by data file order and, for a given data file, by weights file order.
        Files without a counterpart of the same file number are omitted.

        :return: list of matched ``(data_file, weights_file)`` tuples
        :rtype: List[Tuple[DspOutputFile, DspOutputFile]]
        """
        # index the weights files by file number once, rather than scanning every weights
        # file for each data file: O(n + m) instead of O(n * m)
        weights_by_number: dict[Any, List[DspOutputFile]] = {}
        for weights_file in self._weights_files:
            weights_by_number.setdefault(weights_file.file_number, []).append(weights_file)

        return [
            (data_file, weights_file)
            for data_file in self._data_files
            for weights_file in weights_by_number.get(data_file.file_number, [])
        ]

    def next_unprocessed_file(
        self: Scan,
    ) -> Tuple[DspOutputFile, DspOutputFile] | None:
        """
        Return data and weights files that have not yet been staged.

        ``process_file`` moves each processed pair out of the local scan directory, so any
        matched pair still found there is unprocessed. In the future some processing of the
        data, weights and scloffs may be performed.

        :return: the first matched pair of data and weights ``*.dada`` files, or ``None`` if
            there is no unprocessed pair.
        :rtype: Tuple[DspOutputFile, DspOutputFile] | None
        """
        # refresh the data, weights and config file lists from the file system
        self.update_files()
        file_pairs = self._data_and_weights_file_pairs()
        if file_pairs:
            self.logger.debug(f"returning {file_pairs[0]}")
            return file_pairs[0]

        return None

    def move_input_stats_to_staging(self: Scan) -> None:
        """Move the input-stats files from local scan path to the staging scan input-stats path."""
        local_input_stats_path = self.local_scan_path / "input-stats"
        staging_input_stats_dir = self.staging_scan_path / "input-stats"

        self.logger.debug(f"input-stats exists: {local_input_stats_path.exists()}")
        if local_input_stats_path.exists():
            move_files(local_input_stats_path, staging_input_stats_dir)

    @final
    def process_file(
        self: Scan,
        unprocessed_file: Tuple[DspOutputFile, DspOutputFile],
    ) -> None:
        """
        Process the pair of data and weights files to generate a stat file.

        Runs ``ska_pst_stat_file_proc`` on the pair. Then it moves the data and weights files,
        and the output stat file if one was produced, into the staging directory. Finally it
        updates the scan's modified time. A failing command is logged but does not stop the
        files from being staged.

        :param unprocessed_file: next file to process
        :type unprocessed_file: Tuple[DspOutputFile, DspOutputFile]
        """
        self.logger.info(f"Processing {unprocessed_file}")
        (data_file, weights_file) = unprocessed_file

        # the stat file that should be produced by the processing
        output_stat_file = self.get_stat_filename(data_file.file_name)
        output_stat_file_dir = output_stat_file.parent
        self.logger.debug(f"ensuring {output_stat_file_dir} exists")
        output_stat_file_dir.mkdir(mode=DIR_PERMS, parents=True, exist_ok=True)

        staging_data_dir = self.staging_scan_path / "data"
        staging_weights_dir = self.staging_scan_path / "weights"
        staging_input_stats_dir = self.staging_scan_path / "input-stats"
        staging_output_stats_dir = self.staging_scan_path / "output-stats"

        # ensure the data, weights and stats sub directories exist
        for staging_dir in [
            staging_data_dir,
            staging_weights_dir,
            staging_input_stats_dir,
            staging_output_stats_dir,
        ]:
            if not staging_dir.exists():
                self.logger.info(f"creating directory: {staging_dir}")
                staging_dir.mkdir(mode=DIR_PERMS, parents=True, exist_ok=True)

        # actual command to execute when the container is setup
        command = [
            "ska_pst_stat_file_proc",
            "-d",
            str(data_file.file_name),
            "-w",
            str(weights_file.file_name),
        ]
        self.logger.info(f"Processing data/{data_file.file_name.name}, weights/{weights_file.file_name.name}")

        # improve subprocess check UDP gen in testutils
        self.logger.debug(f"running command: {command}")
        completed = subprocess.run(
            command,
            cwd=self.local_scan_path,
            shell=False,
            stdin=None,
            capture_output=True,
        )
        if completed.returncode != 0:
            self.logger.warning(f"command {command} failed: {completed.returncode}", exc_info=False)
            # output is captured, so surface stderr to help diagnose the failure
            self.logger.debug(f"command stderr: {completed.stderr.decode(errors='replace')}")

        move_files(data_file.file_name, staging_data_dir)
        move_files(weights_file.file_name, staging_weights_dir)

        # the output stat file may not exist if ska_pst_stat_file_proc failed
        if output_stat_file.exists():
            move_files(output_stat_file, staging_output_stats_dir)

        self.logger.info(f"Processed data/{data_file.file_name.name}, weights/{weights_file.file_name.name}")
        self.update_modified_time()

    def _is_empty_scan(self: Scan) -> bool:
        """
        Check whether the scan is an empty scan.

        This method should only be called after the scan is marked as completed. This method
        will raise an ``AssertionError`` if that precondition doesn't hold.

        A scan is considered empty when no data file has been moved into the staging ``data``
        directory. This could be because a stop scan / abort was called before the CBF sent
        data to PST or that something in PST failed to produce data. In either case SEND is not
        able to produce a valid metadata file and the only files we would have got are the
        scan configuration and the scan completed file.
        """
        assert self.is_complete(), "expected scan to be in completed"
        data_path = self.staging_scan_path / "data"
        data_files = [f for f in data_path.glob("*") if f.is_file()]
        return len(data_files) == 0