# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""Module for managing scans recorded by the PST AA0.5 Voltage Recorder."""
from __future__ import annotations
import abc
import logging
import pathlib
import shutil
import time
from typing import Any, Callable
from overrides import EnforceOverrides, final
from ska_control_model import PstProcessingMode
from ska_pst.common.constants import NANOSECONDS_PER_SEC, SCAN_CONFIG_FILE_NAME
from .constants import (
DATA_PRODUCT_FILE_NAME,
DIR_PERMS,
OUTPUT_STATS_FILES,
SCAN_COMPLETED_FILE_NAME,
)
from .dsp_output_file import DspOutputFile
from .file_util import move_files
from .metadata import PstFiles, ScanMetadata, generate_metadata, get_path_total_filesize
from .processing_context import ProcessingContext
# Public API of this module: only the abstract Scan base class is exported.
__all__ = ["Scan"]
[docs]class Scan(EnforceOverrides, metaclass=abc.ABCMeta):
"""Base class for representing PST Scan Data Products, stored on the local file system."""
def __init__(
self: Scan,
*,
ctx: ProcessingContext,
relative_scan_path: pathlib.Path,
pst_processing_mode: PstProcessingMode,
pst_scan_config: dict,
logger: logging.Logger | None = None,
**kwargs: Any,
) -> None:
"""Initialise attributes common to all Scans.
:param local_path: absolute path to where data products will be written by DSP.
:param relative_scan_path: The path of the scan, relative to the local_path.
:param logger: the logger instance to use.
"""
self.logger = logger or logging.getLogger(__name__)
self.ctx = ctx
self.pst_scan_config = pst_scan_config
self.relative_scan_path = relative_scan_path
self.eb_id, self.subsystem_id, self.scan_id = relative_scan_path.parts
self.local_scan_path = ctx.local_path / relative_scan_path
self._scan_config_file = self.local_scan_path / SCAN_CONFIG_FILE_NAME
self._scan_completed_file = self.local_scan_path / SCAN_COMPLETED_FILE_NAME
self._data_product_file = self.staging_scan_path / DATA_PRODUCT_FILE_NAME
# create time of scan is creation time of scan directory
created_time_ns = self.local_scan_path.stat().st_ctime_ns
self._created_time_ns: int = created_time_ns
self._modified_time_ns: int = created_time_ns
self.processing_failed = False
self.transfer_failed = False
self.pst_processing_mode = pst_processing_mode
# Ensure the output staging directory exists
self.staging_scan_path.mkdir(mode=DIR_PERMS, parents=True, exist_ok=True)
@final
def generate_data_product_file(self: Scan) -> None:
"""Generate the ska-data-product.yaml file."""
# ensure the scan is marked as completed
assert self.is_complete(), "generate_data_product_file called when scan is not complete"
assert (
not self.have_files_to_process()
), "generate_data_product_file called when there are unprocessed files."
generate_metadata(
scan_id=self.scan_id,
scan_metadata=self.get_scan_metadata(),
pst_scan_config=self.pst_scan_config,
output_dir=self.staging_scan_path,
)
[docs] @abc.abstractmethod
def have_files_to_process(self: Scan) -> bool:
"""
Return true if there are Scan files to process.
:return: True if the Scan has unprocessed files
:rtype: bool
"""
[docs] @abc.abstractmethod
def process_next_file(self: Scan) -> bool:
"""Process the next unprocessed file if one exists.
:return: True if a file was processed else False
:rtype: bool
"""
@final
def get_stat_filename(self: Scan, data_filename: pathlib.Path) -> pathlib.Path:
    """
    Map a data file to the HDF5 statistics file expected for it.

    The statistics file lives in the scan's ``OUTPUT_STATS_FILES`` sub-directory
    and is named after the data file's stem with an ``.h5`` extension appended.

    :param data_filename: the data file whose statistics file is wanted.
    :type data_filename: pathlib.Path
    :return: the path of the matching statistics file.
    :rtype: pathlib.Path
    """
    stat_name = data_filename.stem + ".h5"
    return self.local_scan_path.joinpath(OUTPUT_STATS_FILES, stat_name)
@final
def process(self: Scan) -> None:
"""
Process the current scan.
This method is the public interface to process the files for a scan. This will
loop over the available files and process them. After processing all the files
it will check whether the scan is ready to be finalised and perform finalisation
on the scan if it is ready.
If any error occurs during the processing it will be logged as a warning and
the scan will be marked as invalid to avoid having the process keep retrying
to process this scan.
:param ctx: the processing context to check if processing should proceed or not.
:type ctx: ProcessingContext
"""
self.logger.debug(f"processing {self}")
try:
while self.process_next_file():
if self.ctx.processing_stopped:
self.logger.warning(f"interrupted processing of {self}", exc_info=False)
return
# the check of if scan is ready to be finalised is performed in the finalise method
self.finalise()
except Exception:
self.logger.warning(
f"Error in processing a scan {self.scan_id}. Marking as invalid", exc_info=True
)
self.processing_failed = True
self._move_to_invalid_dir()
@final
def finalise(self: Scan) -> None:
"""
Finalise the scan if it is marked as completed and has no unprocessed files.
See ``can_finalise()`` for details about if a Scan can be finalised.
The method will call the ``_finalise_scan`` that subclasses must override
such as creating the metadata file. If an exception is raised during
this call the scan will have the ``processing_failed`` property marked
as ``True`` which will result in the scan as being marked as invalid and
won't be processed any further.
After calling ``_finalise_scan`` this will move all the files to the DLM path
defined in the processing context and then clean up the scan directory. If an
exception is raised during this call the scan will have the ``transfer_failed``
property marked as ``True`` which will result in the scan as being marked as
invalid and won't be processed any further.
"""
if not self.can_finalise():
return
self.logger.info(f"finalising scan {self.scan_id}")
if self._is_empty_scan():
self.logger.warning(
f"Scan {self.scan_id} had no data files. May have been aborted/stopped"
" before data was received. Not sending to DLM"
)
# this will mark scan as invalid
self.processing_failed = True
self._move_to_invalid_dir()
return
try:
self.move_input_stats_to_staging()
move_files(self._scan_config_file, self.staging_scan_path)
# allow the concrete scan type to do any specific processing such as
# the flow through scan needing to move the scloffs file
self._finalise_scan()
self.generate_data_product_file()
except Exception:
self.logger.warning(
f"Exception occurred while trying to finalise scan {self.scan_id}.", exc_info=True
)
# this will mark scan as invalid
self.processing_failed = True
self._move_to_invalid_dir()
return
try:
self.logger.info(f"moving scan from {self.staging_scan_path} to {self.ctx.dlm_path}")
self._move_files_to_dlm_path()
self.logger.debug(f"deleting scan from {self.ctx.local_path}")
self.delete_scan()
except Exception:
self.logger.warning(
f"Exception occurred while transferring the finalised scan to "
f"the DLM path {self.scan_id}.",
exc_info=True,
)
# this will mark scan as invalid
self.transfer_failed = True
self._move_to_invalid_dir()
@abc.abstractmethod
def _finalise_scan(self: Scan) -> None:
"""
Conclude any processing on a completed Scan.
Generate the DataProduct file and clean up any interim files found
during processing.
"""
@final
def _move_files_to_dlm_path(self: Scan) -> None:
    """Call ``move_files`` to move the staging scan path to the scan's DLM output path."""
    source = self.staging_scan_path
    destination = self.dlm_output_path
    move_files(source, destination)
@final
def _move_to_invalid_dir(self: Scan) -> None:
    """
    Move the scan's files into its invalid output directory and delete the local scan.

    The invalid directory is created if needed. The local scan directory and the
    staging directory are each handed to ``move_files`` with the invalid directory
    as destination, after which ``delete_scan`` is called. Errors raised while
    doing so are logged and swallowed.
    """
    destination = self.invalid_output_path
    try:
        self.logger.warning(
            f"Scan {self} has been marked as invalid. Moving files to {destination}"
        )
        destination.mkdir(mode=DIR_PERMS, parents=True, exist_ok=True)
        move_files(self.local_scan_path, destination)
        move_files(self.staging_scan_path, destination)
        self.delete_scan()
    except Exception:
        # Deliberately broad: moving a scan aside should not let any exception escape.
        self.logger.warning("Error while moving scan to invalid directory.", exc_info=True)
@final
def delete_scan(self: Scan) -> None:
"""Delete all the local data files associated with a scan."""
self.logger.debug(f"deleting all {self.relative_scan_path}")
def _log_rmtree_error(_func: Callable, path: str, *args: Any, **kwargs: Any) -> None:
self.logger.warning(
f"Error in removing {self.local_scan_path} - {path} raised error", exc_info=True
)
self.transfer_failed = True
# first delete all files in the scan directory - if it exist
if self.path_exists():
shutil.rmtree(self.local_scan_path, onerror=_log_rmtree_error)
# then move up the directory tree to the data_product path, pruning directory if empty
to_prune = self.local_scan_path.parent
while to_prune.is_relative_to(self.ctx.local_path):
delta = to_prune.relative_to(self.ctx.local_path)
if delta == pathlib.Path("."):
self.logger.debug("pruned scan_path: stopping prune")
return
try:
# remove the directory, if it is empty
self.logger.debug(f"pruning {to_prune}.rmdir()")
to_prune.rmdir()
to_prune = to_prune.parent
except OSError as exc:
self.logger.debug(f"found non-empty parent directory, stopping prune: {exc}")
return
@final
def is_recording(self: Scan) -> bool:
"""
Return true is the scan been not yet been marked as completed.
:return: flag indicating if the scan is currently recording
:rtype: bool
"""
return not self._scan_completed_file.exists()
@final
def data_product_file_exists(self: Scan) -> bool:
"""
Return true if the ska-data-product.yaml file exists.
:return: flag indicating the data product file exists
:rtype: bool
"""
return self._data_product_file.exists()
@final
def scan_config_file_exists(self: Scan) -> bool:
"""
Return true if the scan-config.json file exists.
:return: flag indicating the scan config file exists
:rtype: bool
"""
return self._scan_config_file.exists()
@final
def is_complete(self: Scan) -> bool:
"""
Return true if the scan_completed file exists.
:return: flag indicating the scan recording is complete
:rtype: bool
"""
return self._scan_completed_file.exists()
@final
def force_completion(self: Scan) -> None:
"""Create the scan_completed file for the scan, if it does not exist."""
if not self._scan_completed_file.exists():
try:
self._scan_completed_file.touch()
except Exception:
self.logger.warning(
f"Error in trying to force inactive scan {self.scan_id} as complete.", exc_info=True
)
self.processing_failed = True
self._move_to_invalid_dir()
@final
def can_finalise(self: Scan) -> bool:
"""
Return true if the Scan can be finalised.
A scan is ready to be finalised if a scan completed file exists and
all files have been processed.
:return: flag indicating if the scan can be finalised.
:rtype: bool
"""
return self.is_complete() and not self.have_files_to_process()
@final
def is_valid(self: Scan) -> bool:
"""
Get whether the scan is still valid or not.
A valid scan matches the following conditions:
* file processing hasn't failed
* file transfer hasn't failed
* the file directory still exists
:return: flag indicating if the scan is valid
:rtype: bool
"""
return self.path_exists() and not self.processing_failed and not self.transfer_failed
@final
def is_active(self: Scan, max_scan_age: float) -> bool:
"""
Return true if the scan is active.
An scan is considered active if the time since it's last modification is less
than the maximum scan age.
:param max_scan_age: maximum age of a scan before it is considered inactive
:type max_scan_age: float
:return: flag indicating if the scan is valid
:rtype: bool
"""
return self.age < max_scan_age
@property
def age(self: Scan) -> float:
"""
Return the age of the scan in seconds.
:return: difference between the current time and the modified time in seconds
:rtype: float
"""
return time.time() - self.modified_time_secs
@property
def flattened_scan_path(self: Scan) -> str:
"""
Return the flattened relative scan path from the execution block, subsystem and scan IDs.
:return: flattened relative scan path
:rtype: str
"""
return f"{self.eb_id}_{self.subsystem_id}_{self.scan_id}"
@property
def staging_scan_path(self: Scan) -> pathlib.Path:
"""
Get the staging path for the scan.
:return: the staging path for the scan
:rtype: pathlib.Path
"""
return self.ctx.staging_path / self.flattened_scan_path
@property
def dlm_output_path(self: Scan) -> pathlib.Path:
"""
Get the DLM output path for the scan.
:return: the DLM output path for the scan.
:rtype: pathlib.Path
"""
return self.ctx.dlm_path / self.flattened_scan_path
@property
def invalid_output_path(self: Scan) -> pathlib.Path:
"""
Get the output path where an invalid processing would move files to.
If there is an error while processing a scan all the files will be moved
to ``$STAGING_PATH/invalid/<flatten_scan_path>``. This way the files
are not lost but the scan will not attempt to be reprocessed.
:return: the output path where an invalid processing would move files to.
:rtype: pathlib.Path
"""
return self.ctx.staging_path / "invalid" / self.flattened_scan_path
@property
def data_product_file(self: Scan) -> pathlib.Path:
"""
Return the pathlib object of the metadata file.
:return: pathlib object concerning the metadata file
:rtype: pathlib.Path
"""
return self._data_product_file
@property
def modified_time_secs(self: Scan) -> float:
    """
    Get the scan's last modified time in seconds.

    :return: the stored nanosecond timestamp converted to seconds
    :rtype: float
    """
    nanoseconds = self._modified_time_ns
    return nanoseconds / NANOSECONDS_PER_SEC
@final
def update_modified_time(self: Scan) -> None:
    """Record the current wall-clock time (in nanoseconds) as the scan's last modified time."""
    now_ns = time.time_ns()
    now_secs = now_ns / NANOSECONDS_PER_SEC
    self.logger.debug(f"updating modified time for scan {self.scan_id} to {now_secs}")
    self._modified_time_ns = now_ns
@final
def path_exists(self: Scan) -> bool:
"""Get whether the full path to scan exists or not."""
return self.local_scan_path.exists()
def __repr__(self: Scan) -> str:
"""Get string representation of current scan."""
type_repr = type(self).__name__
return (
f"{type_repr}(eb_id={self.eb_id}, subsystem_id={self.subsystem_id}, "
f"scan_id={self.scan_id}, modified_time_secs={self.modified_time_secs})"
)
@final
@staticmethod
def compare_modified(first: Scan, second: Scan) -> int:
"""Compare two scan objects by modified time to allow for sorting.
This implementation compares 2 scans by modified time, creation time, scan id and
finally eb id. The scan that was modified the least recently will be ordered before
scans modified more recently. Comparison by creation time, scan id and eb-id are
to break ties.
As the scan modified time can be updated this comparator should not be used
to sort dictionaries.
:param first: in the A < B comparison, this parameter is A
:param second: in the A < B comparison, this parameter is B
"""
def _cmp(first_attr: Any, second_attr: Any) -> int:
if first_attr < second_attr:
return -1
if first_attr > second_attr:
return 1
return 0
for attr_name in ["_modified_time_ns", "_created_time_ns", "scan_id", "eb_id"]:
first_attr = getattr(first, attr_name)
second_attr = getattr(second, attr_name)
comp = _cmp(first_attr, second_attr)
if comp != 0:
return comp
return 0
def _update_last_modified_time(self: Scan, files: list[DspOutputFile]) -> None:
for f in files:
file_modified_time_ns = f.file_name.stat().st_mtime_ns
if file_modified_time_ns > self._modified_time_ns:
self.logger.debug(
f"file {f} has modified time more recent than scan's modified time. "
f"Updating scan's modified time to {file_modified_time_ns / NANOSECONDS_PER_SEC}"
)
self._modified_time_ns = file_modified_time_ns
def _is_empty_scan(self: Scan) -> bool:
"""
Check whether the scan is an empty scan.
This method should only be called after the scan is marked as completed.
This method will raise an ``AssertionError`` if that precondition doesn't hold.
A scan is considered empty when no data was produced. This could be because
a stop scan / abort was called before the CBF sent data to PST or that
something in PST failed to produce data. In either case SEND is not able
to produce a valid metadata file and the only files we would have got are
the scan configuration and the scan completed file.
"""
assert self.is_complete(), "expected scan to be in completed"
data_path = self.staging_scan_path / "data"
data_files = [f for f in data_path.glob("*") if f.is_file()]
return len(data_files) == 0
def _get_scan_files_metadata(self: Scan, files_definitions: dict[str, str]) -> list[PstFiles]:
    """
    Build ``PstFiles`` metadata entries for the scan's staged file groups.

    For each file type, the total size of ``staging_scan_path / file_type`` is
    computed and an entry is created only when that size is positive. An error
    for one file type is logged and that file type is skipped.

    :param files_definitions: mapping of staging sub-path (file type) to the
        description recorded for it.
    :return: the metadata entries for file types that contain data.
    """
    entries: list[PstFiles] = []
    for file_type, description in files_definitions.items():
        try:
            total_size = get_path_total_filesize(self.staging_scan_path / file_type)
            if total_size > 0:
                entries.append(
                    PstFiles(
                        description=description,
                        path=file_type,
                        size=total_size,
                        status="done",
                    )
                )
        except Exception:
            # logger.exception already attaches the traceback
            self.logger.exception(f"Error in obtaining {file_type} files.")
    return entries