Source code for ska_pst.stat.hdf5.model

# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""This module defines the model classes when processing HDF5 STAT data."""

from __future__ import annotations

# Names exported by ``from ska_pst.stat.hdf5.model import *``.
# NOTE(review): ``StatFileFormat``, ``get_stat_file_format`` and
# ``HDF5_HEADER_TYPE_V1_1_0`` are defined in this module but are not listed
# here - confirm whether that omission is intentional.
__all__ = [
    "StatisticsData",
    "StatisticsMetadata",
    "HDF5_HEADER_TYPE_V1_0_0",
    "map_hdf5_key",
]

from dataclasses import dataclass
from typing import Dict, List, Literal

import h5py
import nptyping as npt
import numpy as np
from ska_pst.stat.hdf5.consts import (
    FILE_FORMAT_VERSION_1_0_0,
    FILE_FORMAT_VERSION_1_1_0,
    HDF5_BEAM_ID,
    HDF5_BW,
    HDF5_CHAN_FREQ,
    HDF5_EB_ID,
    HDF5_FREQ,
    HDF5_FREQUENCY_BINS,
    HDF5_HISTOGRAM_REBINNED_1D_FREQ_AVG,
    HDF5_HISTOGRAM_REBINNED_1D_FREQ_AVG_RFI_EXCISED,
    HDF5_HISTOGRAM_REBINNED_2D_FREQ_AVG,
    HDF5_HISTOGRAM_REBINNED_2D_FREQ_AVG_RFI_EXCISED,
    HDF5_NBIN_HIST,
    HDF5_NCHAN,
    HDF5_NCHAN_DS,
    HDF5_NDAT_DS,
    HDF5_NDIM,
    HDF5_NPOL,
    HDF5_NREBIN,
    HDF5_NUM_INVALID_PACKETS,
    HDF5_NUM_SAMPLES,
    HDF5_NUM_SAMPLES_RFI_EXCISED,
    HDF5_NUM_SAMPLES_SPECTRUM,
    HDF5_NUM_WEIGHT_SAMPLES,
    HDF5_POLARISATIONS,
    HDF5_SCAN_ID,
    HDF5_START_CHAN,
    HDF5_T_MAX,
    HDF5_T_MIN,
    HDF5_TELESCOPE,
    HDF5_TIMESERIES_BINS,
    HDF5_UTC_START,
    VALID_FILE_FORMAT_VERSIONS,
    Polarisation,
)

# Lookup table from a HDF5 attribute/dataset key to the name of the matching
# model dataclass property. Keys that are not listed here are handled by
# ``map_hdf5_key``, which falls back to the lower-cased HDF5 key.
KEY_MAP: Dict[str, str] = {
    HDF5_BW: "bandwidth_mhz",
    HDF5_FREQ: "frequency_mhz",
    HDF5_NBIN_HIST: "histogram_nbin",
    HDF5_CHAN_FREQ: "channel_freq_mhz",
    HDF5_HISTOGRAM_REBINNED_1D_FREQ_AVG: "rebinned_histogram_1d_freq_avg",
    HDF5_HISTOGRAM_REBINNED_1D_FREQ_AVG_RFI_EXCISED: "rebinned_histogram_1d_freq_avg_rfi_excised",
    HDF5_HISTOGRAM_REBINNED_2D_FREQ_AVG: "rebinned_histogram_2d_freq_avg",
    HDF5_HISTOGRAM_REBINNED_2D_FREQ_AVG_RFI_EXCISED: "rebinned_histogram_2d_freq_avg_rfi_excised",
}


[docs]@dataclass(kw_only=True, frozen=True) class StatFileFormat: header_dtype: npt.Void """The Numpy structured array data type.""" has_weights: bool """Indicator for whether file format has weights statistics or not.""" has_polarisations: bool """Indicator for whether file format includes the selected polarisations."""
def get_stat_file_format(version: str) -> StatFileFormat:
    """Get the file format description for a HDF5 STAT file format version.

    :param version: the file format version string, which must be one of
        ``VALID_FILE_FORMAT_VERSIONS``.
    :return: the header data type together with the flags stating whether the
        format has weights statistics and the selected polarisations.
    :raises AssertionError: if ``version`` is not a valid file format version.
    """
    # Raise explicitly instead of using an ``assert`` statement: asserts are
    # removed when Python runs with ``-O``, which would let an unknown version
    # silently fall through and be reported as the 1.1.0 format below.
    if version not in VALID_FILE_FORMAT_VERSIONS:
        raise AssertionError(f"expected {version=} to be in {VALID_FILE_FORMAT_VERSIONS}")

    if version == FILE_FORMAT_VERSION_1_0_0:
        return StatFileFormat(
            header_dtype=HDF5_HEADER_TYPE_V1_0_0, has_weights=False, has_polarisations=False
        )

    # Every other valid version uses the 1.1.0 header layout.
    return StatFileFormat(header_dtype=HDF5_HEADER_TYPE_V1_1_0, has_weights=True, has_polarisations=True)
def map_hdf5_key(hdf5_key: str) -> str:
    """Translate a HDF5 attribute/dataset key into a model dataclass property name.

    :param hdf5_key: the key as used in the HDF5 file.
    :return: the entry from ``KEY_MAP`` when the key has one, otherwise the
        lower-cased key.
    """
    if hdf5_key in KEY_MAP:
        return KEY_MAP[hdf5_key]

    # no explicit mapping, the property name is the lower-cased key
    return hdf5_key.lower()
# Element data types used by the fields of the HDF5 header.
string_dt = h5py.string_dtype(encoding="utf-8")
uint32_dt = np.uint32
uint32_array_dt = h5py.vlen_dtype(uint32_dt)
uint64_dt = np.uint64
float_dt = np.float32
double_dt = np.float64
double_array_dt = h5py.vlen_dtype(double_dt)

# Header fields that are part of both the 1.0.0 and the 1.1.0 header data types.
# The order of the entries defines the field order of the structured data type.
_BASE_HDF5_HEADER_TYPE = [
    (HDF5_EB_ID, string_dt),
    (HDF5_TELESCOPE, string_dt),
    (HDF5_SCAN_ID, uint64_dt),
    (HDF5_BEAM_ID, string_dt),
    (HDF5_UTC_START, string_dt),
    (HDF5_T_MIN, double_dt),
    (HDF5_T_MAX, double_dt),
    (HDF5_FREQ, double_dt),
    (HDF5_BW, double_dt),
    (HDF5_START_CHAN, uint32_dt),
    (HDF5_NPOL, uint32_dt),
    (HDF5_NDIM, uint32_dt),
    (HDF5_NCHAN, uint32_dt),
    (HDF5_NCHAN_DS, uint32_dt),
    (HDF5_NDAT_DS, uint32_dt),
    (HDF5_NBIN_HIST, uint32_dt),
    (HDF5_NREBIN, uint32_dt),
    (HDF5_CHAN_FREQ, double_array_dt),
    (HDF5_FREQUENCY_BINS, double_array_dt),
    (HDF5_TIMESERIES_BINS, double_array_dt),
    (HDF5_NUM_SAMPLES, uint32_dt),
    (HDF5_NUM_SAMPLES_RFI_EXCISED, uint32_dt),
    (HDF5_NUM_SAMPLES_SPECTRUM, uint32_array_dt),
    (HDF5_NUM_INVALID_PACKETS, uint32_dt),
]

# Version 1.0.0 header: the base fields only.
HDF5_HEADER_TYPE_V1_0_0 = np.dtype(_BASE_HDF5_HEADER_TYPE)

# Version 1.1.0 header: the base fields followed by the weight sample count
# and the polarisations string.
HDF5_HEADER_TYPE_V1_1_0 = np.dtype(
    _BASE_HDF5_HEADER_TYPE
    + [
        (HDF5_NUM_WEIGHT_SAMPLES, uint32_dt),
        (HDF5_POLARISATIONS, string_dt),
    ]
)
@dataclass(kw_only=True, frozen=True)
class StatisticsMetadata:
    """Data class modelling the metadata of a HDF5 STAT data file.

    Instances are immutable and all fields must be passed as keyword arguments.
    """

    file_format_version: str = FILE_FORMAT_VERSION_1_1_0
    """The format version of the HDF5 STAT file. Default is ``1.1.0``."""

    eb_id: str
    """The execution block id the file relates to."""

    telescope: str
    """The telescope the data were collected for. Should be SKALow or SKAMid."""

    scan_id: int
    """The scan id for the generated data file."""

    beam_id: str
    """The beam id for the generated data file."""

    utc_start: str
    """The UTC ISO formatted start time of the scan, to the nearest second."""

    t_min: float
    """The time offset, in seconds, from the UTC start time to the start of the file."""

    t_max: float
    """The time offset, in seconds, from the UTC start time to the end of the file."""

    frequency_mhz: float
    """The centre frequency for the data as a whole."""

    bandwidth_mhz: float
    """The bandwidth of the data."""

    start_chan: int
    """The starting channel number."""

    npol: int
    """Number of polarisations."""

    ndim: int
    """Number of dimensions in the data (should be 2 for complex data)."""

    nchan: int
    """Number of channels in the data."""

    nchan_ds: int
    """The number of frequency bins in the spectrogram data."""

    ndat_ds: int
    """The number of temporal bins in the spectrogram and timeseries data."""

    histogram_nbin: int
    """The number of bins in the histogram data."""

    nrebin: int
    """Number of bins to use for rebinned histograms."""

    channel_freq_mhz: npt.NDArray[Literal["NChan"], npt.Float64]
    """The centre frequencies of each channel (MHz)."""

    timeseries_bins: npt.NDArray[Literal["NTimeBin"], npt.Float64]
    """The timestamp offsets for each temporal bin."""

    frequency_bins: npt.NDArray[Literal["NFreqBin"], npt.Float64]
    """The frequency bins used for the spectrogram attribute (MHz)."""

    num_samples: int
    """The total number of samples used to calculate the sample statistics."""

    num_samples_rfi_excised: int
    """
    The total number of samples used to calculate the sample statistics,
    except those flagged for RFI.
    """

    num_samples_spectrum: npt.NDArray[Literal["NChan"], npt.UInt32]
    """The number of samples, per channel, used to calculate the sample statistics."""

    num_invalid_packets: int
    """The number of invalid packets received while calculating the statistics."""

    num_weight_samples: int = 0
    """The number of samples used to calculate the weight statistics."""

    has_weights: bool = False
    """Indicator of whether weights are included in the statistics or not."""

    polarisations: str = "Both"
    """
    A string representation of the polarisations.

    Values are either ``A``, ``B`` or ``Both``.
    """

    @property
    def end_chan(self: StatisticsMetadata) -> int:
        """Get the last channel that the header is for."""
        first_chan, channel_count = self.start_chan, self.nchan
        return first_chan + channel_count - 1

    @property
    def polarisations_list(self: StatisticsMetadata) -> List[Polarisation]:
        """
        Get the polarisations of the STAT HDF5 file as a list.

        For version 1.0.0 it is assumed both Pol A and Pol B are valid. However,
        since version 1.1.0 and Flow Through stats the output stats could be for
        Pol A, Pol B or both.
        """
        selected = self.polarisations
        return Polarisation.from_string(selected)
[docs]@dataclass(kw_only=True, frozen=True) class StatisticsData: """A data class that represents the statistics loaded from the HDF5 file.""" # pylint: disable=line-too-long mean_frequency_avg: npt.NDArray[Literal["NPol, NDim"], npt.Float32] "The mean of the data for each polarisation and dimension, averaged over all channels." mean_frequency_avg_rfi_excised: npt.NDArray[Literal["NPol, NDim"], npt.Float32] "The mean of the data for each polarisation and dimension, averaged over all channels, expect those flagged for RFI." # noqa: E501 variance_frequency_avg: npt.NDArray[Literal["NPol, NDim"], npt.Float32] "The variance of the data for each polarisation and dimension, averaged over all channels." variance_frequency_avg_rfi_excised: npt.NDArray[Literal["NPol, NDim"], npt.Float32] "The variance of the data for each polarisation and dimension, averaged over all channels, expect those flagged for RFI." # noqa: E501 mean_spectrum: npt.NDArray[Literal["NPol, NDim, NChan"], npt.Float32] "The mean of the data for each polarisation, dimension and channel." variance_spectrum: npt.NDArray[Literal["NPol, NDim, NChan"], npt.Float32] "The variance of the data for each polarisation, dimension and channel." mean_spectral_power: npt.NDArray[Literal["NPol, NChan"], npt.Float32] "Mean power spectra of the data for each polarisation and channel." max_spectral_power: npt.NDArray[Literal["NPol, NChan"], npt.Float32] "Maximum power spectra of the data for each polarisation and channel." histogram_1d_freq_avg: npt.NDArray[Literal["NPol, NDim, NBin"], npt.UInt32] "Histogram of the input data integer states for each polarisation and dimension, averaged over all channels." # noqa: E501 histogram_1d_freq_avg_rfi_excised: npt.NDArray[Literal["NPol, NDim, NBin"], npt.UInt32] "Histogram of the input data integer states for each polarisation and dimension, averaged over all channels, expect those flagged for RFI." 
# noqa: E501 rebinned_histogram_2d_freq_avg: npt.NDArray[Literal["NPol, NRebin, NRebin"], npt.UInt32] "Rebinned 2D histogram of the input data integer states for each polarisation, averaged over all channels." # noqa: E501 rebinned_histogram_2d_freq_avg_rfi_excised: npt.NDArray[Literal["NPol, NRebin, NRebin"], npt.UInt32] "Rebinned 2D histogram of the input data integer states for each polarisation, averaged over all channels, expect those flagged for RFI." # noqa: E501 rebinned_histogram_1d_freq_avg: npt.NDArray[Literal["NPol, NDim, NRebin"], npt.UInt32] "Rebinned histogram of the input data integer states for each polarisation and dimension, averaged over all channels." # noqa: E501 rebinned_histogram_1d_freq_avg_rfi_excised: npt.NDArray[Literal["NPol, NDim, NRebin"], npt.UInt32] "Rebinned histogram of the input data integer states for each polarisation and dimension, averaged over all channels, expect those flagged for RFI." # noqa: E501 num_clipped_samples_spectrum: npt.NDArray[Literal["NPol, NDim, NChan"], npt.UInt32] "Number of clipped input samples (maximum level) for each polarisation, dimension and channel." num_clipped_samples: npt.NDArray[Literal["NPol, NDim"], npt.UInt32] "Number of clipped input samples (maximum level) for each polarisation, dimension, averaged over all channels." # noqa: E501 num_clipped_samples_rfi_excised: npt.NDArray[Literal["NPol, NDim"], npt.UInt32] "Number of clipped input samples (maximum level) for each polarisation, dimension, averaged over all channels, except those flagged for RFI." # noqa: E501 spectrogram: npt.NDArray[Literal["NPol, NFreqBin, NTimeBin"], npt.Float32] "Spectrogram of the data for each polarisation, averaged a configurable number of temporal and spectral bins (default ~1000)." # noqa: E501 timeseries: npt.NDArray[Literal["NPol, NTimeBin, 3"], npt.Float32] "Time series of the data for each polarisation, rebinned in time to ntime_bins, averaged over all frequency channels." 
# noqa: E501 timeseries_rfi_excised: npt.NDArray[Literal["NPol, NTimeBin, 3"], npt.Float32] "Time series of the data for each polarisation, re-binned in time." min_weights: npt.NDArray[Literal["NChan"], npt.Float32] "The minimum of the weights for each channel." max_weights: npt.NDArray[Literal["NChan"], npt.Float32] "The maximum of the weights for each channel." mean_weights: npt.NDArray[Literal["NChan"], npt.Float32] "The mean of the weights for each channel."
# pylint: enable=line-too-long