Source code for ska_pst.testutils.dada.dsp_data_analyser

# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.

"""Module used for analysing DSP artefacts, i.e. the DADA data and weights files written for a scan."""

from __future__ import annotations

# Public names exported by a ``from ... import *`` of this module.
__all__ = [
    "DspDataAnalyser",
    "ScanFileMetadata",
]

import dataclasses
import logging
import math
import pathlib
import subprocess
from typing import Any, Dict, List, Tuple

from ska_control_model import PstProcessingMode
from ska_pst.common.constants import (
    BITS_PER_BYTE,
    MICROSECS_PER_SEC,
    SIZE_OF_FLOAT32_IN_BYTES,
    WEIGHTS_NBITS,
    WEIGHTS_NDIM,
    WEIGHTS_NPOL,
)
from ska_pst.common.dada_file import SECONDS_PER_FILE, DadaFileReader, WeightsFileReader

DADA_FILE_PREFIX_START_IDX: int = 0
"""Start index (inclusive) of the prefix within a DADA file name."""

DADA_FILE_PREFIX_END_IDX: int = 20
"""End index (exclusive) of the prefix within a DADA file name.

Characters from this index up to :py:data:`DADA_FILE_SUFFIX_START_IDX` are not
compared when a weights file is matched to its data file.
"""

DADA_FILE_SUFFIX_START_IDX: int = 36
"""Start index (inclusive) of the suffix within a DADA file name; the suffix runs to the end."""


[docs]@dataclasses.dataclass(kw_only=True) class ScanFileMetadata: """A data class to store the metadata of a scan file.""" name: str obs_offset: int file_number: int data_size: int file_size: int scan_id: int
class DspDataAnalyser:
    """Class for analysing files generated by DSP.CORE.

    DADA files of a scan are searched for in three places: the local DSP output
    directory, the staging directory and the DLM directory (see
    :py:meth:`check_dada_files_exist` for the exact layout).
    """

    def __init__(
        self: DspDataAnalyser,
        scan_config: dict,
        local_path: pathlib.Path,
        staging_path: pathlib.Path,
        dlm_path: pathlib.Path,
        subsystem_id: str,
        scan_id: int,
        eb_id: str | None = None,
        logger: logging.Logger | None = None,
    ) -> None:
        """Create instance of DspDataAnalyser.

        :param scan_config: the configuration used for the scan. When ``eb_id`` is
            not given (or is empty), ``scan_config["common"]["eb_id"]`` is used.
        :param local_path: the path where DSP writes data products
        :param staging_path: the path where processed DSP products are staged for DLM
        :param dlm_path: the path where DLM reads finalised DSP products
        :param subsystem_id: the path indicating the subsystem.
        :param scan_id: the scan ID to analyse the data for.
        :param eb_id: execution block id.
        :param logger: the logger to use for the system.
        """
        self.scan_config = scan_config
        self.scan_id = scan_id
        self.eb_id = eb_id or scan_config["common"]["eb_id"]
        self.subsystem_id = subsystem_id
        self.local_scan_path = local_path / self.eb_id / subsystem_id / str(scan_id)
        self.staging_scan_path = staging_path / f"{self.eb_id}_{subsystem_id}_{scan_id}"
        self.dlm_scan_path = dlm_path / f"{self.eb_id}_{subsystem_id}_{scan_id}"
        self.logger = logger or logging.getLogger(__name__)
        self.logger.debug(f"{local_path=} {staging_path=} {dlm_path=}")

    def get_all_dada_files(self: DspDataAnalyser, dsp_subpath: str) -> List[pathlib.Path]:
        """Return all DADA files below ``dsp_subpath`` in the local, staging and DLM scan paths.

        The three scan paths are searched in that order. As a side effect the
        searched directories are stored in ``self.dsp_sub_paths`` so that callers
        such as :py:meth:`check_dada_files_exist` can report where they looked.

        :param dsp_subpath: subdirectory to look for DADA files, i.e. ``data`` or ``weights``.
        :return: the DADA files found, grouped by scan path in search order.
        """
        self.dsp_sub_paths = [
            scan_path / dsp_subpath
            for scan_path in (self.local_scan_path, self.staging_scan_path, self.dlm_scan_path)
        ]
        dada_files: List[pathlib.Path] = []
        for dada_path in self.dsp_sub_paths:
            dada_files.extend(self.get_dada_files(dada_path=dada_path))
        return dada_files

    def get_dada_files(self: DspDataAnalyser, dada_path: pathlib.Path) -> List[pathlib.Path]:
        """Return the ``*.dada`` files directly inside ``dada_path`` (non-recursive).

        :param dada_path: directory to search.
        :return: the matching files, empty if there are none.
        """
        return list(dada_path.glob("*.dada"))

    def get_weights_file_for_data_file(self: DspDataAnalyser, data_path: pathlib.Path) -> pathlib.Path | None:
        """Return the weights file that corresponds to the given data file.

        The weights file is searched for in the ``weights`` directory that sits next
        to the data file's directory. It must share the data file's name prefix
        (``[DADA_FILE_PREFIX_START_IDX:DADA_FILE_PREFIX_END_IDX]``) and suffix
        (``[DADA_FILE_SUFFIX_START_IDX:]``); the characters in between may differ.

        :param data_path: path of a data DADA file.
        :return: the matching weights file, or ``None`` if there is none. If several
            files match, the lexicographically smallest path is returned so that the
            choice is deterministic.
        """
        weights_path = data_path.parent.parent / "weights"
        prefix = data_path.name[DADA_FILE_PREFIX_START_IDX:DADA_FILE_PREFIX_END_IDX]
        suffix = data_path.name[DADA_FILE_SUFFIX_START_IDX:]
        # glob() order depends on the filesystem; sort to make the pick reproducible
        weights_files = sorted(weights_path.glob(f"{prefix}*{suffix}"))
        self.logger.debug(
            f"data_path.name={data_path.name} prefix={prefix} suffix={suffix} weights_files={weights_files}"
        )
        return weights_files[0] if weights_files else None

    def check_dada_files_exist(
        self: DspDataAnalyser,
        dsp_subpath: str,
    ) -> None:
        r"""Assert that DADA files exist for a given subpath.

        This will check that \*.dada files exist in the given dsp_subpath. Files are expected
        to be found at one of the following locations:

            <local_path> / <eb_id> / <subsystem_id> / <scan_id> / dsp_subpath
            <staging_path> / <eb_id>_<subsystem_id>_<scan_id> / dsp_subpath
            <dlm_path> / <eb_id>_<subsystem_id>_<scan_id> / dsp_subpath

        :param dsp_subpath: subdirectory to check, i.e. ``data`` or ``weights``.
        :raises AssertionError: if no DADA file is found in any of the locations.
        """
        dada_files = self.get_all_dada_files(dsp_subpath)
        self.logger.debug(f"check_dsp_files.data_files: {dada_files}")
        assert len(dada_files) > 0, f"no dada files were found in {self.dsp_sub_paths}"

    @staticmethod
    def _parse_sine_frequency(result: str) -> float:
        """Extract the frequency reported by ``ska_pst_dsp_disk_sine_analyse``.

        The token that follows ``found at frequency=`` (up to the next whitespace)
        is parsed as a float and rounded to one decimal place.

        :param result: the standard output of the analysis tool.
        :return: the reported frequency rounded to one decimal place.
        :raises IndexError: if the marker or the value after it is missing.
        :raises ValueError: if the value is not a valid float.
        """
        frequency_str = result.split("found at frequency=")[1].split()[0]
        return round(float(frequency_str), 1)

    def check_sinusoid_frequency(self: DspDataAnalyser, expected_frequency: float, eps: float = 0.1) -> None:
        r"""Assert that the sinusoid in every data file is at the expected frequency.

        For each data file of the scan, the matching weights file is located and both
        are passed to ``ska_pst_dsp_disk_sine_analyse``. The frequency reported by the
        tool, rounded to one decimal place, must be within ``eps`` of
        ``expected_frequency``. A failure to run the tool for a file is logged and that
        file is skipped; at least one file must produce output.

        :param expected_frequency: the frequency the sinusoid is expected at.
        :param eps: the maximum allowed absolute difference from ``expected_frequency``.
        :raises AssertionError: if a weights file is missing, no analysis output was
            produced, or a reported frequency is outside the tolerance.
        """
        data_files = self.get_all_dada_files("data")
        analysis_stdout: List[str] = []
        for data_file in data_files:
            weights_file = self.get_weights_file_for_data_file(data_file)
            assert (
                weights_file and weights_file.exists()
            ), f"Expected weights file {weights_file} to exist for data file {data_file}"

            cmd = ["/usr/local/bin/ska_pst_dsp_disk_sine_analyse", str(data_file), str(weights_file)]
            try:
                proc = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, check=False)
                analysis_stdout.append(proc.stdout.decode())
            except Exception:
                # best effort: log and move on to the next data file
                self.logger.exception("Error in checking sinusoid frequency.")

        # Prevents false positives. If data_files == [], this would be []
        assert analysis_stdout != [], f"no ska_pst_dsp_disk_sine_analyse output for data files {data_files}"

        for result in analysis_stdout:
            self.logger.debug(f"ska_pst_dsp_disk_sine_analyse returned '{result}'")
            try:
                frequency = self._parse_sine_frequency(result)
            except Exception:
                self.logger.exception(f"Error in extracting sinusoid frequency from result='{result}'.")
                raise

            self.logger.debug(
                f"Frequency rounded off frequency={frequency} expected_frequency={expected_frequency}"
            )
            assert (
                abs(frequency - expected_frequency) < eps
            ), f"expected {frequency} to be within {eps} of {expected_frequency}"

    def _load_file_metadata(self: DspDataAnalyser, file_type: str) -> Tuple[Dict[int, ScanFileMetadata], int]:
        """
        Load metadata from all DADA files for the specified file type.

        If several files share a file number (e.g. the same file is present in more
        than one scan path), the last one read wins in the returned mapping, while
        every file still contributes to the total data size.

        :param file_type: The type of files to load metadata for, either 'data' or 'weights'.
        :return: A tuple containing:
            - A dictionary mapping file numbers to ScanFileMetadata instances.
            - The total accumulated data size across all files.
        """
        total_data_size = 0
        files_data: Dict[int, ScanFileMetadata] = {}
        for dada_file in self.get_all_dada_files(dsp_subpath=file_type):
            with DadaFileReader(dada_file, logger=self.logger) as reader:
                total_data_size += reader.data_size
                file_data = ScanFileMetadata(
                    name=str(dada_file.resolve()),
                    obs_offset=reader.obs_offset,
                    file_number=reader.file_number,
                    scan_id=int(reader.scan_id),
                    data_size=reader.data_size,
                    file_size=reader.file_size,
                )
                files_data[file_data.file_number] = file_data
        return files_data, total_data_size

    def _compute_file_metrics(self: DspDataAnalyser, scanlen: float, resources: dict, file_type: str) -> dict:
        """
        Compute various size-related metrics for the scan.

        This includes bytes per file, bytes per second, and the number of expected files.
        Sizes are rounded up to a whole multiple of the data resolution. For weights
        files the data-derived rates are rescaled by the ratio of the weights
        resolution to the data resolution.

        :param scanlen: Length of the scan in seconds.
        :param resources: Dictionary containing configuration and resource metadata for UDP generator.
        :param file_type: Type of file to compute metrics for, either 'data' or 'weights'.
        :return: Dictionary with computed metrics:
            - bytes_per_second
            - bytes_per_file
            - expected_num_files
            - expected_size
            - num_packets_per_file
        """
        tsamp = float(resources["tsamp"])
        nchan = int(resources["nchan"])
        nbit = int(resources["nbits"])
        ndim = int(resources["ndim"])
        npol = int(resources["npol"])
        resolution = int(resources["resolution"])
        udp_nsamp = int(resources["udp_nsamp"])

        if resources["pst_processing_mode"] == PstProcessingMode.FLOW_THROUGH:
            # assumes "channels" holds an inclusive [first, last] channel range - TODO confirm
            channels = resources["channels"]
            nchan = channels[1] - channels[0] + 1
            polarisations = resources["polarisations"]
            npol = 2 if polarisations == "Both" else 1
            nbit = int(resources["num_bits_out"])
            # to ensure alignment between TFP data and weights, include udp_nsamp in
            # resolution calculation, since weights span 1 packet (i.e. udp_nsamp)
            resolution = (udp_nsamp * nchan * ndim * npol * nbit) // BITS_PER_BYTE

        nbit_per_sample = nchan * nbit * ndim * npol
        nsamp_per_second = MICROSECS_PER_SEC / tsamp
        nbit_per_second = nbit_per_sample * nsamp_per_second
        bytes_per_second = nbit_per_second / BITS_PER_BYTE

        # round the total scan size up to a whole number of resolution blocks
        expected_size = int(math.ceil(scanlen * bytes_per_second))
        if (remainder := expected_size % resolution) != 0:
            expected_size += resolution - remainder

        # likewise each file holds a whole number of resolution blocks
        bytes_per_file = int(math.floor(bytes_per_second * SECONDS_PER_FILE))
        if (remainder := bytes_per_file % resolution) != 0:
            bytes_per_file += resolution - remainder

        expected_num_files = int(math.ceil(expected_size / bytes_per_file))
        num_packets_per_file = bytes_per_file // resolution

        if file_type == "weights":
            udp_nchan = int(resources["udp_nchan"])
            wt_nsamp = int(resources["wt_nsamp"])
            nbit = WEIGHTS_NBITS
            # tsamp, ndim and npol are only updated for the debug logging below
            tsamp *= udp_nsamp
            ndim = WEIGHTS_NDIM
            npol = WEIGHTS_NPOL
            weights_channel_stride = udp_nsamp // wt_nsamp * nbit // BITS_PER_BYTE
            wt_resolution = nchan * weights_channel_stride + (nchan // udp_nchan) * SIZE_OF_FLOAT32_IN_BYTES
            bytes_per_second *= wt_resolution / resolution
            bytes_per_file = num_packets_per_file * wt_resolution
            self.logger.info(f"wt_resolution = {wt_resolution}")

        self.logger.debug(f"nbit = {nbit}")
        self.logger.debug(f"ndim = {ndim}")
        self.logger.debug(f"npol = {npol}")
        self.logger.debug(f"nchan = {nchan}")
        self.logger.debug(f"tsamp = {tsamp}")
        self.logger.debug(f"resolution = {resolution}")

        return {
            "bytes_per_second": bytes_per_second,
            "bytes_per_file": bytes_per_file,
            "expected_num_files": expected_num_files,
            "expected_size": expected_size,
            "num_packets_per_file": num_packets_per_file,
        }

    def assert_expected_file_count(
        self: DspDataAnalyser,
        file_type: str,
        expected_num_files: int,
    ) -> None:
        """
        Assert that the number of files matches the expected count.

        :param file_type: Type of file, either 'data' or 'weights'.
        :param expected_num_files: Number of files expected based on metadata.
        :raises AssertionError: if the number of distinct file numbers differs.
        """
        files_data, _ = self._load_file_metadata(file_type)
        actual_num_files = len(files_data)
        assert (
            actual_num_files == expected_num_files
        ), f"expected {expected_num_files} files, but found {actual_num_files}"

    def assert_scan_duration_match(
        self: DspDataAnalyser,
        file_type: str,
        scanlen: float,
        bytes_per_second: float,
    ) -> None:
        """
        Assert that the total data duration matches the expected scan length.

        The recorded duration (total data size / bytes_per_second) must be within
        one second of ``scanlen``.

        :param file_type: Type of file, either 'data' or 'weights'.
        :param scanlen: Expected scan length in seconds.
        :param bytes_per_second: Expected bytes per second for the file type.
        :raises AssertionError: if the durations differ by one second or more.
        """
        _, total_data_size = self._load_file_metadata(file_type)
        recorded_scan_length = total_data_size / bytes_per_second
        self.logger.info(f"recorded {recorded_scan_length:0.6}s of data, expected around {scanlen:0.6}s")
        assert (
            abs(recorded_scan_length - scanlen) < 1.0
        ), f"recorded {recorded_scan_length}s of data, expected around {scanlen:0.3}s"

    def assert_obs_offsets(
        self: DspDataAnalyser,
        file_type: str,
        expected_num_files: int,
        bytes_per_file: int,
    ) -> None:
        """
        Assert that the obs_offset for each file matches its expected position.

        File ``n`` is expected to start at ``n * bytes_per_file``.

        :param file_type: Type of file, either 'data' or 'weights'.
        :param expected_num_files: Number of expected files.
        :param bytes_per_file: Expected size of each file in bytes.
        :raises AssertionError: if a file number is missing or has the wrong offset.
        """
        files_data, _ = self._load_file_metadata(file_type)
        for file_number in range(expected_num_files):
            # report a missing file as a test failure rather than a bare KeyError
            assert file_number in files_data, (
                f"expected a {file_type} file with file_number={file_number}, "
                f"but only found file numbers {sorted(files_data)}"
            )
            curr_file = files_data[file_number]
            expected_offset = file_number * bytes_per_file
            self.logger.debug(
                f"\ncurr_file.name={curr_file.name} "
                f"\nfile_number={file_number} "
                f"\nbytes_per_file={bytes_per_file} "
                f"\nobs_offset={curr_file.obs_offset} "
                f"\nexpected_offset={expected_offset}"
            )
            assert curr_file.obs_offset == expected_offset, (
                f"expected obs_offset for file {curr_file.name} is {expected_offset}, "
                f"but got {curr_file.obs_offset}"
            )

    def assert_file_data_sizes(
        self: DspDataAnalyser,
        file_type: str,
        expected_num_files: int,
        bytes_per_file: int,
    ) -> None:
        """
        Assert that each file has the correct data size based on its position.

        Every file but the last must hold exactly ``bytes_per_file`` bytes; the last
        one holds whatever remains of the total recorded data.

        :param file_type: Type of file, either 'data' or 'weights'.
        :param expected_num_files: Number of expected files.
        :param bytes_per_file: Expected size of each file in bytes.
        :raises AssertionError: if a file number is missing or has the wrong size.
        """
        files_data, total_data_size = self._load_file_metadata(file_type)
        for file_number in range(expected_num_files):
            # report a missing file as a test failure rather than a bare KeyError
            assert file_number in files_data, (
                f"expected a {file_type} file with file_number={file_number}, "
                f"but only found file numbers {sorted(files_data)}"
            )
            curr_file = files_data[file_number]
            if file_number < expected_num_files - 1:
                expected_data_size = bytes_per_file
            else:
                expected_data_size = total_data_size - file_number * bytes_per_file
            assert curr_file.data_size == expected_data_size, (
                f"expected data_size for file {curr_file.name} is {expected_data_size}B "
                f"but it has {curr_file.data_size}B"
            )

    def assert_full_file_integrity(
        self: DspDataAnalyser,
        scanlen: float,
        file_type: str,
        bytes_per_second: float,
        bytes_per_file: int,
        expected_num_files: int,
        **kwargs: Any,
    ) -> None:
        """
        Perform all integrity checks on the given file type.

        :param scanlen: expected scan duration in seconds.
        :param file_type: Type of file to validate, either 'data' or 'weights'.
        :param bytes_per_second: expected throughput in bytes per second.
        :param bytes_per_file: expected size of each file in bytes.
        :param expected_num_files: number of files expected.
        :param kwargs: further metrics (e.g. ``expected_size`` and
            ``num_packets_per_file`` from :py:meth:`_compute_file_metrics`) are
            accepted and ignored.
        :raises AssertionError: if any of the individual checks fails.
        """
        self.assert_expected_file_count(file_type, expected_num_files)
        self.assert_scan_duration_match(file_type, scanlen, bytes_per_second)
        self.assert_obs_offsets(file_type, expected_num_files, bytes_per_file)
        self.assert_file_data_sizes(file_type, expected_num_files, bytes_per_file)

    def check_contiguous_files(
        self: DspDataAnalyser,
        scanlen: float,
        calculated_resources: dict,
        file_type: str,
    ) -> None:
        """
        Check that the data or weight files generated during the scan are contiguous and consistent.

        :param scanlen: Length of the scan in seconds.
        :param calculated_resources: Dictionary of calculated UDP generator resources.
        :param file_type: The type of files to analyse, either 'data' or 'weights'.
        :raises AssertionError: if any integrity check fails.
        """
        file_metrics = self._compute_file_metrics(scanlen, calculated_resources, file_type)
        self.assert_full_file_integrity(
            scanlen=scanlen,
            file_type=file_type,
            **file_metrics,
        )

    def check_weights_contain_dropped_packets(
        self: DspDataAnalyser,
        expected_dropped_packets: List[int],
    ) -> None:
        """Analyse DSP weights files.

        This will parse all weights files in the local, staging or dlm scan paths
        and check that the specified packets are flagged as dropped.

        :param expected_dropped_packets: packets that must be reported as dropped.
        :raises AssertionError: if any expected packet is not reported as dropped.
        """
        dropped_packets: List[int] = []
        for weights_file in self.get_all_dada_files("weights"):
            self.logger.debug(f"Opening weights file={weights_file} with WeightsFileReader")
            with WeightsFileReader(
                weights_file, logger=self.logger, unpack_scales=True, unpack_weights=False
            ) as reader:
                dropped_packets.extend(reader.dropped_packets)

        self.logger.info(
            f"Found dropped packets {len(dropped_packets)}, searching for {expected_dropped_packets}"
        )
        assert set(expected_dropped_packets).issubset(
            set(dropped_packets)
        ), f"Expected {expected_dropped_packets} to be within recorded dropped packets"

    def check_weights_zeroed(
        self: DspDataAnalyser,
        expected_zeroed_packets: List[int],
    ) -> None:
        """Analyse DSP weights files for zero weights.

        This will parse all weights files in the local, staging or dlm scan paths
        and check that the specified packets have the weights being zeroed out.

        :param expected_zeroed_packets: packets whose weights must all be zero.
        :raises AssertionError: if any expected packet is not reported as zeroed.
        """
        zeroed_packets: List[int] = []
        for weights_file in self.get_all_dada_files("weights"):
            self.logger.debug(f"Opening weights file={weights_file} with WeightsFileReader")
            with WeightsFileReader(
                weights_file, logger=self.logger, unpack_scales=True, unpack_weights=True
            ) as reader:
                zeroed_packets.extend(reader.zeroed_packets)

        self.logger.debug(
            f"Found {len(zeroed_packets)} zeroed packets, searching for {expected_zeroed_packets}"
        )
        assert set(expected_zeroed_packets).issubset(
            set(zeroed_packets)
        ), f"Expected {expected_zeroed_packets} to be within recorded zeroed packets"