# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""Module to used for analysing DSP artefacts."""
from __future__ import annotations
__all__ = [
"DspDataAnalyser",
"ScanFileMetadata",
]
import dataclasses
import logging
import math
import pathlib
import subprocess
from typing import Any, Dict, List, Tuple
from ska_control_model import PstProcessingMode
from ska_pst.common.constants import (
BITS_PER_BYTE,
MICROSECS_PER_SEC,
SIZE_OF_FLOAT32_IN_BYTES,
WEIGHTS_NBITS,
WEIGHTS_NDIM,
WEIGHTS_NPOL,
)
from ska_pst.common.dada_file import SECONDS_PER_FILE, DadaFileReader, WeightsFileReader
# Character offsets used to split a DADA file name into the leading part (prefix)
# and the trailing part (suffix) when looking up the weights file that belongs to
# a data file: the name is sliced as [PREFIX_START:PREFIX_END] and [SUFFIX_START:],
# and the characters in between are replaced by a wildcard.
# NOTE(review): presumably the skipped span [20:36) is the part of the file name
# that may differ between a data file and its weights file - confirm against the
# DADA file naming convention.
DADA_FILE_PREFIX_START_IDX = 0
"""Index within the DADA file name of where the prefix starts."""
DADA_FILE_PREFIX_END_IDX = 20
"""Index within the DADA file name of where the prefix ends."""
DADA_FILE_SUFFIX_START_IDX = 36
"""Index within the DADA file name of where the suffix starts."""
[docs]class DspDataAnalyser:
"""Class for analysing files generated by DSP.CORE."""
def __init__(
self: DspDataAnalyser,
scan_config: dict,
local_path: pathlib.Path,
staging_path: pathlib.Path,
dlm_path: pathlib.Path,
subsystem_id: str,
scan_id: int,
eb_id: str | None = None,
logger: logging.Logger | None = None,
) -> None:
"""Create instance of DspDataAnalyser.
:param scan_config: the configuration used for the scan.
:param eb_id: execution block id.
:param subsystem_id: the path indicating the subsystem.
:param local_path: the path where DSP writes data products
:param staging_path: the path where processed DSP products are staged for DLM
:param dlm_path: the path where DLM reads finalised DSP products
:param scan_id: the scan ID to analyse the data for.
:param logger: the logger to use for the system.
"""
self.scan_config = scan_config
self.scan_id = scan_id
self.eb_id = eb_id or scan_config["common"]["eb_id"]
self.subsystem_id = subsystem_id
self.local_scan_path = local_path / self.eb_id / subsystem_id / str(scan_id)
self.staging_scan_path = staging_path / f"{self.eb_id}_{subsystem_id}_{scan_id}"
self.dlm_scan_path = dlm_path / f"{self.eb_id}_{subsystem_id}_{scan_id}"
self.logger = logger or logging.getLogger(__name__)
self.logger.debug(f"{local_path=} {staging_path=} {dlm_path=}")
def get_all_dada_files(self: DspDataAnalyser, dsp_subpath: str) -> List[pathlib.Path]:
    """
    Return a list of all dada files in the local, staging or dlm scan paths and the dsp_subpath.

    The directories that were searched are stored in ``self.dsp_sub_paths`` so
    that callers can report them.

    :param dsp_subpath: subdirectory to look for dada files, i.e. data or weights
    :return: the dada files of the local, staging and dlm directories, in that order.
    """
    scan_roots = (self.local_scan_path, self.staging_scan_path, self.dlm_scan_path)
    self.dsp_sub_paths = [scan_root / dsp_subpath for scan_root in scan_roots]
    return [
        dada_file
        for search_dir in self.dsp_sub_paths
        for dada_file in self.get_dada_files(dada_path=search_dir)
    ]
def get_dada_files(self: DspDataAnalyser, dada_path: pathlib.Path) -> List[pathlib.Path]:
    """Return the ``*.dada`` files found directly inside the given directory.

    :param dada_path: the directory to search, sub-directories are not searched.
    :return: list of matching files, in the order reported by the file system.
    """
    dada_files: List[pathlib.Path] = []
    dada_files.extend(dada_path.glob("*.dada"))
    return dada_files
def get_weights_file_for_data_file(self: DspDataAnalyser, data_path: pathlib.Path) -> pathlib.Path | None:
    """Return the weights file that matches the file number count for the corresponding data file.

    The weights file is searched for in the ``weights`` directory that sits next
    to the directory containing the data file. A candidate matches when its name
    has the same prefix and suffix as the data file name.

    :param data_path: path of the data file to find the weights file for.
    :return: the first matching weights file, or None when there is no match.
    """
    file_name = data_path.name
    prefix = file_name[DADA_FILE_PREFIX_START_IDX:DADA_FILE_PREFIX_END_IDX]
    suffix = file_name[DADA_FILE_SUFFIX_START_IDX:]

    # the characters between prefix and suffix are allowed to differ
    weights_dir = data_path.parent.parent / "weights"
    weights_files = list(weights_dir.glob(f"{prefix}*{suffix}"))
    self.logger.debug(
        f"data_path.name={data_path.name} prefix={prefix} suffix={suffix} weights_files={weights_files}"
    )

    if not weights_files:
        return None
    return weights_files[0]
def check_dada_files_exist(
    self: DspDataAnalyser,
    dsp_subpath: str,
) -> None:
    r"""Assert that DADA files exist for a given subpath.

    At least one \*.dada file has to be present in the given dsp_subpath of any of
    the following locations:

        <local_path> / <eb_id> / <subsystem_id> / <scan_id> / dsp_subpath
        <staging_path> / <eb_id>_<subsystem_id>_<scan_id> / dsp_subpath
        <dlm_path> / <eb_id>_<subsystem_id>_<scan_id> / dsp_subpath

    :param dsp_subpath: subdirectory to look for dada files, i.e. data or weights
    :raises AssertionError: if no dada file was found in any of the locations.
    """
    found_files = self.get_all_dada_files(dsp_subpath)
    self.logger.debug(f"check_dsp_files.data_files: {found_files}")
    assert found_files, f"no dada files were found in {self.dsp_sub_paths}"
def check_sinusoid_frequency(self: DspDataAnalyser, expected_frequency: float, eps: float = 0.1) -> None:
    r"""Analyse DSP artefacts.

    This will find data and weights file for the given scan and assert that the frequency from
    data is within `eps` of the expected frequency.

    Each data file is analysed, together with its weights file, by the external
    ``ska_pst_dsp_disk_sine_analyse`` executable and the frequency is parsed from the
    text that the executable writes to stdout.

    :param expected_frequency: the frequency that the analysis is expected to report.
    :param eps: the maximum allowed absolute difference between the reported and expected
        frequency. The reported frequency is rounded to 1 decimal place before the
        comparison, so values much smaller than 0.1 are not meaningful.
    :raises AssertionError: if a weights file is missing, if no analysis output was
        produced at all, or if a reported frequency is not within `eps`.
    :raises Exception: if the frequency cannot be extracted from the analysis output,
        the parsing error is logged and re-raised.
    """
    data_files = self.get_all_dada_files("data")

    analysis_stdout = []
    for data_file in data_files:
        weights_file = self.get_weights_file_for_data_file(data_file)
        assert (
            weights_file and weights_file.exists()
        ), f"Expected weights file {weights_file} to exist for data file {data_file}"

        cmd = ["/usr/local/bin/ska_pst_dsp_disk_sine_analyse", str(data_file), str(weights_file)]
        try:
            completed = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
            analysis_stdout.append(completed.stdout.decode())
        except Exception:
            # best effort: a file that could not be analysed is logged and skipped,
            # the assertion below catches the case where nothing could be analysed
            self.logger.exception("Error in checking sinusoid frequency.")

    # Prevents false positives. If data_files == [], this would be []
    assert analysis_stdout != [], f"no sinusoid analysis output was produced for data files {data_files}"

    for result in analysis_stdout:
        self.logger.debug(f"ska_pst_dsp_disk_sine_analyse returned '{result}'")
        try:
            frequency_str = result.split("found at frequency=")[1].split(" ")[0]
            frequency = round(float(frequency_str), 1)
        except Exception:
            self.logger.exception(f"Error in extracting sinusoid frequency from result='{result}'.")
            raise

        self.logger.debug(
            (f"Frequency rounded off frequency={frequency} " f"expected_frequency={expected_frequency}")
        )
        assert (
            abs(frequency - expected_frequency) < eps
        ), f"expected {frequency} to be within {eps} of {expected_frequency}"
def _load_file_metadata(self: DspDataAnalyser, file_type: str) -> Tuple[Dict[int, ScanFileMetadata], int]:
    """
    Load metadata from all DADA files for the specified file type.

    Every file found in the local, staging and dlm scan paths is opened with a
    DadaFileReader. When more than one file reports the same file number, the one
    read last is kept in the dictionary while the sizes of all files are accumulated.

    :param file_type: The type of files to load metadata for, either 'data' or 'weights'.
    :return: A tuple containing:
        - A dictionary mapping file numbers to ScanFileMetadata instances.
        - The total accumulated data size across all files.
    """
    metadata_by_file_number: Dict[int, ScanFileMetadata] = {}
    accumulated_data_size = 0

    for dada_path in self.get_all_dada_files(dsp_subpath=file_type):
        with DadaFileReader(dada_path, logger=self.logger) as reader:
            accumulated_data_size += reader.data_size
            metadata = ScanFileMetadata(
                name=str(dada_path.resolve()),
                obs_offset=reader.obs_offset,
                file_number=reader.file_number,
                scan_id=int(reader.scan_id),
                data_size=reader.data_size,
                file_size=reader.file_size,
            )
            metadata_by_file_number[metadata.file_number] = metadata

    return metadata_by_file_number, accumulated_data_size
def _compute_file_metrics(self: DspDataAnalyser, scanlen: float, resources: dict, file_type: str) -> dict:
    """
    Compute various size-related metrics for the scan.

    This includes bytes per file, bytes per second, and the number of expected files.

    The metrics are first derived from the data sampling parameters. When
    ``file_type`` is 'weights', only ``bytes_per_second`` and ``bytes_per_file`` are
    rescaled to the weights resolution; ``expected_size``, ``expected_num_files`` and
    ``num_packets_per_file`` keep the values computed from the data parameters.

    :param scanlen: Length of the scan in seconds.
    :param resources: Dictionary containing configuration and resource metadata for UDP generator.
        The keys ``tsamp``, ``nchan``, ``nbits``, ``ndim``, ``npol``, ``resolution``,
        ``udp_nsamp`` and ``pst_processing_mode`` are always read. The keys ``channels``,
        ``polarisations`` and ``num_bits_out`` are read in flow through mode, and
        ``udp_nchan`` and ``wt_nsamp`` are read when file_type is 'weights'.
    :param file_type: Type of file to compute metrics for, either 'data' or 'weights'.
    :return: Dictionary with computed metrics:
        - bytes_per_second
        - bytes_per_file
        - expected_num_files
        - expected_size
        - num_packets_per_file
    :raises KeyError: if a required key is missing from ``resources``.
    """
    tsamp = float(resources["tsamp"])
    nchan = int(resources["nchan"])
    nbit = int(resources["nbits"])
    ndim = int(resources["ndim"])
    npol = int(resources["npol"])
    resolution = int(resources["resolution"])
    udp_nsamp = int(resources["udp_nsamp"])

    # flow through mode overrides the channel count, polarisation count, bit width
    # and resolution that were read from the resources above
    if resources["pst_processing_mode"] == PstProcessingMode.FLOW_THROUGH:
        channels = resources["channels"]
        # channels is used as an inclusive [first, last] pair
        nchan = channels[1] - channels[0] + 1
        polarisations = resources["polarisations"]
        npol = 2 if polarisations == "Both" else 1
        nbit = int(resources["num_bits_out"])
        # to ensure alignment between TFP data and weights, include udp_nsamp in
        # resolution calculation, since weights span 1 packet (i.e. udp_nsamp)
        resolution = (udp_nsamp * nchan * ndim * npol * nbit) // 8

    nbit_per_sample = nchan * nbit * ndim * npol
    # presumably tsamp is expressed in microseconds - TODO confirm
    nsamp_per_second = MICROSECS_PER_SEC / tsamp
    nbit_per_second = nbit_per_sample * nsamp_per_second
    bytes_per_second = nbit_per_second / BITS_PER_BYTE

    # total size for the whole scan, rounded up to a whole multiple of resolution
    expected_size = int(math.ceil(scanlen * bytes_per_second))
    if expected_size % resolution != 0:
        expected_size += resolution - expected_size % resolution

    # size of one full file, also rounded up to a whole multiple of resolution
    bytes_per_file = int(math.floor(bytes_per_second * SECONDS_PER_FILE))
    if (remainder := bytes_per_file % resolution) != 0:
        bytes_per_file += resolution - remainder

    expected_num_files = int(math.ceil(expected_size / bytes_per_file))
    # bytes_per_file is a multiple of resolution here, so this division is exact
    num_packets_per_file = bytes_per_file // resolution

    if file_type == "weights":
        # same value as read at the top of the method
        udp_nsamp = int(resources["udp_nsamp"])
        udp_nchan = int(resources["udp_nchan"])
        wt_nsamp = int(resources["wt_nsamp"])
        nbit = WEIGHTS_NBITS
        # tsamp, ndim and npol are only used by the debug logging below from here on
        tsamp *= udp_nsamp
        ndim = WEIGHTS_NDIM
        npol = WEIGHTS_NPOL
        weights_channel_stride = udp_nsamp // wt_nsamp * nbit // BITS_PER_BYTE
        # NOTE(review): the second term looks like one float32 value per group of
        # udp_nchan channels - confirm against the weights file format
        wt_resolution = nchan * weights_channel_stride + (nchan // udp_nchan) * SIZE_OF_FLOAT32_IN_BYTES
        # rescale the data figures by the ratio of weights to data resolution
        bytes_per_second *= wt_resolution / resolution
        bytes_per_file = num_packets_per_file * wt_resolution
        self.logger.info(f"wt_resolution = {wt_resolution}")

    self.logger.debug(f"nbit = {nbit}")
    self.logger.debug(f"ndim = {ndim}")
    self.logger.debug(f"npol = {npol}")
    self.logger.debug(f"nchan = {nchan}")
    self.logger.debug(f"tsamp = {tsamp}")
    self.logger.debug(f"resolution = {resolution}")

    return {
        "bytes_per_second": bytes_per_second,
        "bytes_per_file": bytes_per_file,
        "expected_num_files": expected_num_files,
        "expected_size": expected_size,
        "num_packets_per_file": num_packets_per_file,
    }
def assert_expected_file_count(
    self: DspDataAnalyser,
    file_type: str,
    expected_num_files: int,
) -> None:
    """
    Assert that the number of files matches the expected count.

    The actual count is the number of distinct file numbers found in the metadata
    of the files of the given type.

    :param file_type: Type of file, either 'data' or 'weights'.
    :param expected_num_files: Number of files expected based on metadata.
    :raises AssertionError: if the number of files found differs from the expected number.
    """
    metadata_by_file_number, _ = self._load_file_metadata(file_type)
    actual_num_files = len(metadata_by_file_number)
    failure_message = f"expected {expected_num_files} files, but found {actual_num_files}"
    assert actual_num_files == expected_num_files, failure_message
def assert_scan_duration_match(
    self: DspDataAnalyser,
    file_type: str,
    scanlen: float,
    bytes_per_second: float,
) -> None:
    """
    Assert that the total data duration matches the expected scan length.

    The recorded duration is the accumulated data size of all files of the given
    type divided by ``bytes_per_second``. It has to be within 1 second of ``scanlen``.

    :param file_type: Type of file, either 'data' or 'weights'.
    :param scanlen: Expected scan length in seconds, an integer number of seconds
        is accepted as well.
    :param bytes_per_second: Expected bytes per second for the file type.
    :raises AssertionError: if the recorded duration differs from the expected scan
        length by 1 second or more.
    """
    _, total_data_size = self._load_file_metadata(file_type)

    # a precision in a format spec is not allowed for int values, so an integer
    # scan length would raise ValueError while building the messages below
    expected_scan_length = float(scanlen)
    recorded_scan_length = total_data_size / bytes_per_second

    self.logger.info(
        f"recorded {recorded_scan_length:0.6}s of data, expected around {expected_scan_length:0.6}s"
    )
    assert (
        abs(recorded_scan_length - expected_scan_length) < 1.0
    ), f"recorded {recorded_scan_length}s of data, expected around {expected_scan_length:0.3}s"
def assert_obs_offsets(
    self: DspDataAnalyser,
    file_type: str,
    expected_num_files: int,
    bytes_per_file: int,
) -> None:
    """
    Assert that the obs_offset for each file matches its expected position.

    File number N is expected to have an obs_offset of ``N * bytes_per_file``.

    :param file_type: Type of file, either 'data' or 'weights'.
    :param expected_num_files: Number of expected files.
    :param bytes_per_file: Expected size of each file in bytes.
    :raises AssertionError: if a file number in ``range(expected_num_files)`` was not
        found, or if the obs_offset of a file is not the expected value.
    """
    files_data, _ = self._load_file_metadata(file_type)

    for file_number in range(expected_num_files):
        # report a missing file as a failed check rather than as an opaque KeyError
        assert file_number in files_data, (
            f"expected {file_type} file number {file_number} to exist, "
            f"but only found file numbers {sorted(files_data)}"
        )
        curr_file = files_data[file_number]
        expected_offset = file_number * bytes_per_file

        self.logger.debug(
            f"\ncurr_file.name={curr_file.name} "
            f"\nfile_number={file_number} "
            f"\nbytes_per_file={bytes_per_file} "
            f"\nobs_offset={curr_file.obs_offset} "
            f"\nexpected_offset={expected_offset}"
        )

        assert curr_file.obs_offset == expected_offset, (
            f"expected obs_offset for file {curr_file.name} is {expected_offset}, "
            f"but got {curr_file.obs_offset}"
        )
def assert_file_data_sizes(
    self: DspDataAnalyser,
    file_type: str,
    expected_num_files: int,
    bytes_per_file: int,
) -> None:
    """
    Assert that each file has the correct data size based on its position.

    Every file except the last one is expected to hold exactly ``bytes_per_file``
    bytes of data. The last file is expected to hold whatever remains of the total
    data size after the preceding full files.

    :param file_type: Type of file, either 'data' or 'weights'.
    :param expected_num_files: Number of expected files.
    :param bytes_per_file: Expected size of each file in bytes.
    :raises AssertionError: if a file number in ``range(expected_num_files)`` was not
        found, or if the data size of a file is not the expected value.
    """
    files_data, total_data_size = self._load_file_metadata(file_type)
    last_file_number = expected_num_files - 1

    for file_number in range(expected_num_files):
        # report a missing file as a failed check rather than as an opaque KeyError
        assert file_number in files_data, (
            f"expected {file_type} file number {file_number} to exist, "
            f"but only found file numbers {sorted(files_data)}"
        )
        curr_file = files_data[file_number]

        if file_number < last_file_number:
            expected_data_size = bytes_per_file
        else:
            # the last file holds the remainder of the accumulated data size
            expected_data_size = total_data_size - file_number * bytes_per_file

        assert curr_file.data_size == expected_data_size, (
            f"expected data_size for file {curr_file.name} is {expected_data_size}B "
            f"but it has {curr_file.data_size}B"
        )
def assert_full_file_integrity(
    self: DspDataAnalyser,
    scanlen: float,
    file_type: str,
    bytes_per_second: float,
    bytes_per_file: int,
    expected_num_files: int,
    **kwargs: Any,
) -> None:
    """
    Perform all integrity checks on the given file type.

    The checks are run in this order: file count, scan duration, obs offsets and
    file data sizes. The first failing check stops the validation.

    :param scanlen: expected scan duration in seconds.
    :type scanlen: float
    :param file_type: Type of file to validate (e.g., 'data', 'weights').
    :type file_type: str
    :param bytes_per_second: expected throughput in bytes per second.
    :type bytes_per_second: float
    :param bytes_per_file: expected size of each file in bytes.
    :type bytes_per_file: int
    :param expected_num_files: number of files expected.
    :type expected_num_files: int
    :param kwargs: any additional keyword arguments are accepted and ignored.
    """
    ordered_checks = (
        (self.assert_expected_file_count, (file_type, expected_num_files)),
        (self.assert_scan_duration_match, (file_type, scanlen, bytes_per_second)),
        (self.assert_obs_offsets, (file_type, expected_num_files, bytes_per_file)),
        (self.assert_file_data_sizes, (file_type, expected_num_files, bytes_per_file)),
    )
    for run_check, check_args in ordered_checks:
        run_check(*check_args)
def check_contiguous_files(
    self: DspDataAnalyser,
    scanlen: float,
    calculated_resources: dict,
    file_type: str,
) -> None:
    """
    Check that the data or weight files generated during the scan are contiguous and consistent.

    The expected file metrics are computed from the calculated resources and are
    then used to run all the file integrity checks.

    :param scanlen: Length of the scan in seconds.
    :param calculated_resources: Dictionary of calculated UDP generator resources.
    :param file_type: The type of files to analyse, either 'data' or 'weights'.
    """
    self.assert_full_file_integrity(
        scanlen=scanlen,
        file_type=file_type,
        **self._compute_file_metrics(scanlen, calculated_resources, file_type),
    )
def check_weights_contain_dropped_packets(
    self: DspDataAnalyser,
    expected_dropped_packets: List[int],
) -> None:
    """Analyse DSP weights files.

    This will parse all weights files in the local, staging or dlm scan paths
    and check that the specified packets are flagged as dropped.

    :param expected_dropped_packets: the packets that must be reported as dropped,
        additional dropped packets in the files are allowed.
    :raises AssertionError: if any of the expected packets is not reported as dropped.
    """
    dropped_packets: List[int] = []
    for weights_path in self.get_all_dada_files("weights"):
        self.logger.debug(f"Opening weights file={weights_path} with WeightsFileReader")
        with WeightsFileReader(
            weights_path, logger=self.logger, unpack_scales=True, unpack_weights=False
        ) as reader:
            dropped_packets.extend(reader.dropped_packets)

    self.logger.info(
        f"Found dropped packets {len(dropped_packets)}, searching for {expected_dropped_packets}"
    )

    # every expected packet has to be present, extra dropped packets are tolerated
    all_expected_found = set(expected_dropped_packets) <= set(dropped_packets)
    assert all_expected_found, f"Expected {expected_dropped_packets} to be within recorded dropped packets"
def check_weights_zeroed(
    self: DspDataAnalyser,
    expected_zeroed_packets: List[int],
) -> None:
    """Analyse DSP weights files for zero weights.

    This will parse all weights files in the local, staging or dlm scan paths
    and check that the specified packets have the weights being zeroed out.

    :param expected_zeroed_packets: the packets that must be reported as zeroed,
        additional zeroed packets in the files are allowed.
    :raises AssertionError: if any of the expected packets is not reported as zeroed.
    """
    zeroed_packets: List[int] = []
    for weights_path in self.get_all_dada_files("weights"):
        self.logger.debug(f"Opening weights file={weights_path} with WeightsFileReader")
        with WeightsFileReader(
            weights_path, logger=self.logger, unpack_scales=True, unpack_weights=True
        ) as reader:
            zeroed_packets.extend(reader.zeroed_packets)

    self.logger.debug(
        f"Found {len(zeroed_packets)} zeroed packets, searching for {expected_zeroed_packets}"
    )

    # every expected packet has to be present, extra zeroed packets are tolerated
    all_expected_found = set(expected_zeroed_packets) <= set(zeroed_packets)
    assert all_expected_found, f"Expected {expected_zeroed_packets} to be within recorded zeroed packets"