Source code for ska_pst.common.cbf_pst_config

# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.

"""Module used to model the CBF/PST interface."""

from __future__ import annotations

__all__ = [
    "ChannelisationStage",
    "CbfPstConfig",
    "RingBufferConfig",
]

import dataclasses
from dataclasses import field
from typing import Any, Tuple

import numpy as np
from ska_pst.common.constants import KILO_HERTZ_PER_HERTZ, KILO_HERTZ_PER_MEGA_HERTZ, WEIGHTS_NBITS

DEFAULT_NDIM = 2
"""
The default number of dimension of the data coming from the CBF.

While PST can handle real and complex data, SKA is using only complex which
is ``ndim=2``
"""

DEFAULT_NPOL = 2
"""The default number of polarisations of the data coming from the CBF."""


[docs]@dataclasses.dataclass(kw_only=True, frozen=True) class RingBufferConfig: """ A data class used to store ring buffer (RB) configuration for a given frequency band. An element within the RB is configured to be large enough to capture all the channels of the subband for ``packets_per_buffer`` number of packets. """ num_buffers: int """Number of buffers withing a ring buffer.""" packets_per_buffer: int """The number of UDP packets per buffer that a channel needs be recorded for."""
[docs]@dataclasses.dataclass(kw_only=True, frozen=True) class ChannelisationStage: """A data class used to store the configuration for a CBF channelisation stage.""" num_frequency_channels: int """Number of channels used in the channelisation stage.""" oversampling_ratio: Tuple[int, int] | list[int] """ The oversampling ratio for the channelisation stage. This is represented as a tuple like ``(4, 3)`` or a list with only 2 elements. """ def __post_init__(self: ChannelisationStage) -> None: """Perform post init of data class.""" if isinstance(self.oversampling_ratio, list): assert len(self.oversampling_ratio) == 2, ( "expected oversampling ratio to have a length of exactly 2 " f"but was {len(self.oversampling_ratio)}" )
@dataclasses.dataclass(kw_only=True, frozen=True) class SubbandConstraint: """A data class used to store constraints on valid sub-band configurations imposed by the CBF.""" max_nchan: int """The maximum number of channels for the sub-band constraint.""" channel_0_start_freq_mhz: float """The effective start frequency in MHz of channel 0 for the sub-band constraint.""" rf_freq_minimum_mhz: float """The minimum RF frequency in MHz for the sub-band constraint.""" rf_freq_maximum_mhz: float """The maximum RF frequency in MHz for the sub-band constraint.""" def matches( self: SubbandConstraint, centre_freq_mhz: float, bandwidth_mhz: float, min_width_mhz: float ) -> bool: """ Return true if the specified band spans this sub-band constraint. :param centre_freq_mhz: the centre frequency of the whole band to be tested in MHz :type centre_freq_mhz: float :param bandwidth_mhz: the bandwidth of the whole band to be tested in MHz :type bandwidth_mhz: float :param min_width_mhz: the minimum width of the sub-band that must be within the whole band to match :type min_width_mhz: float :return: true if the sub-band constraint resides within the specified band :rtype: bool """ freq_min = centre_freq_mhz - bandwidth_mhz / 2 freq_max = centre_freq_mhz + bandwidth_mhz / 2 constraint_min = self.rf_freq_minimum_mhz + min_width_mhz constraint_max = self.rf_freq_maximum_mhz - min_width_mhz return freq_min <= constraint_max and freq_max >= constraint_min @dataclasses.dataclass(kw_only=True, frozen=True) class SubbandWorkload: """A data class used to describe the channel workload of a sub-band.""" subband_id: int """The sub-band identifier for this workload, 1-based.""" start_channel: int """The first channel number of the sub-band workload.""" end_channel: int """The (exclusive) channel that marks the end of the sub-band workload.""" centre_freq_mhz: float """The centre frequency of the sub-band workload in MHz.""" bandwidth_mhz: float """The bandwidth of the sub-band workload in MHz.""" start_channel_centre_freq_mhz: float """The centre frequency of the first channel in the sub-band workload in MHz."""
[docs]@dataclasses.dataclass(kw_only=True, frozen=True) class CbfPstConfig: """ A data class used to represent the CBF / PST frequency band constants. This class also includes utility methods that can be used to calculate values like ``nchan``, and ``bytes_per_sec`` given other values. """ udp_format: str """The UDP format for the given frequency band.""" tsamp: float """The time per sample, in microseconds.""" udp_nchan: int """The number of channels in an UDP packet.""" udp_nsamp: int """ The number of samples per channel in an UDP packet. For each of the ``udp_nchan`` channels in an UDP packet there are this many samples. """ wt_nsamp: int """ The number of samples per weight per channel in an UDP packet. This value should be the same as ``udp_nsamp`` """ nbit: int """ The number of bits per sample. This value is the number of bits used for a value within a given dimension. For complex valued data the total number of bits used for the complex value is ``2 * nbit``. """ oversampling_ratio: Tuple[int, int] | list[int] """ The oversampling ratio for the frequency band. This is represented as a tuple like ``(4, 3)`` or a list with only 2 elements. """ max_nchan: int """The maximum number of channels for the frequency band.""" rf_freq_minimum_mhz: float """The bottom end of the frequency band, in MHz.""" rf_freq_maximum_mhz: float """The top end of the frequency band, in MHz.""" rf_bw_mhz: float """The bandwidth of the band, in MHz.""" chan_separation_khz: float """The frequency separation between the start and end of a channel, in kHz.""" channel_0_start_freq_mhz: float """The effective start frequency of channel 0, in MHz.""" ring_buffer_config: RingBufferConfig """Configuration specific for setting up ring buffers.""" channelisation_stages: list[ChannelisationStage] = field(default_factory=list) """The configuration used in the channelisation of the RF signal by the CBF.""" subband_constraints: dict[int, SubbandConstraint] = field(default_factory=dict) """The constraints on sub-band frequency ranges produced by the CBF.""" ndim: int = DEFAULT_NDIM """ The number of dimensions of the input data. If the value is 1, then the input data is real valued. If the value is 2 then the input data is complex valued. The default is 2 (i.e. complex valued data). """ npol: int = DEFAULT_NPOL """ The number of polarisations of the input data. The default is 2. """ wt_nbit: int = WEIGHTS_NBITS """ The number of bits per weight. The default is 16. """ def __post_init__(self: CbfPstConfig) -> None: """Perform post init of data class.""" assert self.ndim in {1, 2}, "expected ndim to be 1 or 2" if isinstance(self.oversampling_ratio, list): assert len(self.oversampling_ratio) == 2, ( "expected oversampling ratio to have a length of exactly 2 " f"but was {len(self.oversampling_ratio)}" ) @property def critical_sample_tsamp(self: CbfPstConfig) -> float: """ Get the TSAMP for critical sampled data. This is equivalent to ``tsamp * oversampling_ratio``. """ os_factor_nom, os_factor_denom = self.oversampling_ratio return self.tsamp * os_factor_nom / os_factor_denom @property def num_channelisation_stages(self: CbfPstConfig) -> int: """Get the number of stages of channelisation for the frequency band.""" return len(self.channelisation_stages)
[docs] def min_valid_chan(self: CbfPstConfig, subband: int | None = None) -> int: """Get the minimum valid channel number for the frequency band. :param subband: the subband constraint to apply to the max valid channel number, default None :type subband: int | None """ if subband is None: rf_freq_min = self.rf_freq_minimum_mhz channel_0_start_freq = self.channel_0_start_freq_mhz else: rf_freq_min = self.subband_constraints[subband].rf_freq_minimum_mhz channel_0_start_freq = self.subband_constraints[subband].channel_0_start_freq_mhz return int(np.round((rf_freq_min - channel_0_start_freq) / (self.chan_separation_mhz)))
[docs] def max_valid_chan(self: CbfPstConfig, subband: int | None = None) -> int: """Get the maximum valid channel number for the frequency band. :param subband: the subband constraint to apply to the max valid channel number, default None :type subband: int | None """ if subband is None: rf_freq_max = self.rf_freq_maximum_mhz channel_0_start_freq = self.channel_0_start_freq_mhz else: rf_freq_max = self.subband_constraints[subband].rf_freq_maximum_mhz channel_0_start_freq = self.subband_constraints[subband].channel_0_start_freq_mhz return int(np.round((rf_freq_max - channel_0_start_freq) / self.chan_separation_mhz))
@property def chan_separation_mhz(self: CbfPstConfig) -> float: """ Get the channel frequency separation, in MHz. This is a derived property that converts the ``chan_separation_khz``, which is in kHz, to MHz. :return: the channel frequency separation, in MHz. :rtype: float """ return self.chan_separation_khz / KILO_HERTZ_PER_MEGA_HERTZ @property def chan_separation_hz(self: CbfPstConfig) -> float: """ Get the channel frequency separation, in Hz. This is a derived property that converts the ``chan_separation_khz``, which is in kHz, to Hz. :return: the channel frequency separation, in Hz. :rtype: float """ return self.chan_separation_khz * KILO_HERTZ_PER_HERTZ
[docs] def apply_nchan_limits(self: CbfPstConfig, nchan: int, min_nchan: int | None = None) -> int: """Apply limits to the `nchan` value. This method ensures that the following conditions: * `self.udp_nchan` <= `nchan` <= `self.max_nchan` * `nchan % self.udp_nchan == 0` If `min_nchan` is set, then the lower bound of `nchan` is `min_nchan` but the output value is still a multiple of `self.udp_nchan` :param nchan: the proposed NCHAN value that needs to have limits applied. :type nchan: int :param min_nchan: the requested minimum number of channels, defaults to None. If not set then `self.udp_nchan` is used. :type min_nchan: int | None, optional :return: a value that meets the channel limits conditions. :rtype: int """ min_nchan = min_nchan or self.udp_nchan if min_nchan % self.udp_nchan != 0: # round min_nchan up to the next multiple of udp_nchan min_nchan += self.udp_nchan - (min_nchan % self.udp_nchan) max_nchan = self.max_nchan max_nchan -= max_nchan % self.udp_nchan nchan -= nchan % self.udp_nchan # min_nchan, max_nchan, and nchan are now all multiples of self.udp_nchan nchan = max(min_nchan, min(nchan, max_nchan)) return nchan
[docs] def nchan_for_data_rate(self: CbfPstConfig, data_rate: float) -> int: """Calculate the number of channels given a input data rate. The calculation of the input bytes per seconds is given by: .. code-block:: python data_rate = nchan * npol * nbit * ndim / 8 / (tsamp / 1e6) However, this method ensures the output nchan is not greater than the ``max_nchan`` value but is also a multiple of ``udp_nchan``. :param data_rate: the requested input data rate, in bytes per second. :type data_rate: float :return: the number of channels needed to get to a data rate close to ``data_rate`` :rtype: int """ nchan = int(np.around(data_rate / self.npol / self.nbit / self.ndim * 8 / 1_000_000 * self.tsamp)) return self.apply_nchan_limits(nchan)
[docs] def nchan_for_bandwidth(self: CbfPstConfig, bandwidth_mhz: float, **kwargs: Any) -> int: """Get the number of channels closest to the given bandwidth. This determines the number of channels the given bandwidth, in MHz, would be equivalent to given the ``tsamp`` and ``oversampling_ratio``. The output number of channels is given as a multiple of ``udp_nchan``. :param bandwidth_mhz: the desired bandwidth, in MHz. :type bandwidth_mhz: float :return: the closest number of channels, as a multiple of ``udp_nchan`` that span the given bandwidth. :rtype: int """ nchan_est = self.tsamp * bandwidth_mhz * self.oversampling_ratio[0] / self.oversampling_ratio[1] nchan = int(np.round(nchan_est / self.udp_nchan)) * self.udp_nchan return self.apply_nchan_limits(nchan)
[docs] def data_rate(self: CbfPstConfig, nchan: int | None = None) -> float: """Get the data rate, in bytes/second, for the given number of channels. If ``nchan == None`` then this value will return the maximum data rate based on the ``max_nchan`` property. :param nchan: the number of channels, defaults to the ``max_nchan`` value of the configuration. :type nchan: int | None, optional :return: the calculated number of bytes per second. :rtype: float """ nchan = nchan or self.max_nchan return nchan * self.npol * self.nbit * self.ndim / 8 * 1_000_000 / self.tsamp
[docs] def bandwidth_mhz(self: CbfPstConfig, nchan: int | None = None) -> float: """Get the bandwidth given the number of channels, in MHz, rounded to the nearest hertz. If ``nchan == None`` then this value will return the maximum bandwidth based on the ``max_nchan`` property. :param nchan: the number of channels, defaults to the ``max_nchan`` value of the configuration. :type nchan: int | None, optional :return: the calculated bandwidth, in MHz, given the configuration. :rtype: float """ nchan = nchan or self.max_nchan bw = nchan / (self.tsamp * self.oversampling_ratio[0] / self.oversampling_ratio[1]) return np.around(bw, 6)
[docs] def centre_freq_mhz( self: CbfPstConfig, start_chan: int = 0, nchan: int | None = None, subband: int | None = None ) -> float: """Get the centre frequency, in MHz, based on the start channel and the number of channels. The centre frequency is rounded to the nearest hertz. :param start_chan: the start channel, defaults to 0 :type start_chan: int, optional :param nchan: the number of channels, defaults to the ``max_nchan`` value of the configuration. :type nchan: int | None, optional :param subband: subband constraint to use when computing the channel centre frequency :type subband: int | None :return: the calculated centre frequency given the channel separation, bottom frequency, starting channel, and number of channels. :rtype: float """ nchan = nchan or self.max_nchan channel_0_start_freq_mhz = self.channel_0_start_freq_mhz if subband and subband not in self.subband_constraints: raise ValueError(f"{subband=} was not in the constraints={self.subband_constraints.keys()}") if subband in self.subband_constraints: channel_0_start_freq_mhz = self.subband_constraints[subband].channel_0_start_freq_mhz start_freq_mhz = channel_0_start_freq_mhz + (start_chan * self.chan_separation_mhz) nchan_bw_mhz = nchan * self.chan_separation_mhz freq_mhz = start_freq_mhz + nchan_bw_mhz / 2 return np.around(freq_mhz, 6)
[docs] def channel_centre_freq_mhz(self: CbfPstConfig, chan: int = 0, subband: int | None = None) -> float: """ Get the channel centre frequency, in MHz. :param chan: the PST fine channel to get the centre frequency for. :type chan: int, optional :param subband: subband constraint to use when computing the channel centre frequency :type subband: int | None :return: the calculated centre frequency for the given channel. :rtype: float """ return self.centre_freq_mhz(start_chan=chan, nchan=1, subband=subband)
[docs] def calculate_channel_range( self: CbfPstConfig, bandwidth_mhz: float, centre_freq_mhz: float, **kwargs: Any ) -> SubbandWorkload: """Calculate the start and end channel given a centre frequency and bandwidth. The total number of channels is calculated from the bandwidth, tsamp and oversampling ratio of the frequency band configuration. The start channel will be a multiple of the number of channels in a UDP packet, relative to the minimum valid channel. :param bandwidth_mhz: the required bandwidth of the RF signal, in MHz. :type bandwidth_mhz: float :param centre_freq_mhz: the centre frequency of the RF signal, in MHz. :type centre_freq_mhz: float :return: a tuple representing the range of the channels, inclusive of the start channel but exclusive of the end channel: [start_chan, end_chan) :rtype: SubbandWorkload """ nchan = min(self.nchan_for_bandwidth(bandwidth_mhz), self.max_nchan) centre_freq_offset_mhz = centre_freq_mhz - self.channel_0_start_freq_mhz min_freq_offset_mhz = centre_freq_offset_mhz - bandwidth_mhz / 2 start_chan_est = int(np.round(min_freq_offset_mhz / self.chan_separation_mhz)) start_chan_offset = (start_chan_est - self.min_valid_chan()) % self.udp_nchan if start_chan_offset > 0: start_chan = start_chan_est - start_chan_offset else: start_chan = start_chan_est start_chan = min(max(self.min_valid_chan(), start_chan), self.max_valid_chan()) end_chan = min(start_chan + nchan, self.max_valid_chan() + 1) refined_centre_freq_mhz = self.centre_freq_mhz(start_chan, nchan) refined_bandwidth_mhz = self.bandwidth_mhz(nchan) start_channel_centre_freq_mhz = self.channel_centre_freq_mhz(chan=start_chan) return SubbandWorkload( subband_id=1, start_channel=start_chan, end_channel=end_chan, centre_freq_mhz=refined_centre_freq_mhz, bandwidth_mhz=refined_bandwidth_mhz, start_channel_centre_freq_mhz=start_channel_centre_freq_mhz, )
[docs] def calculate_channel_ranges( self: CbfPstConfig, bandwidth_mhz: float, centre_freq_mhz: float, **kwargs: Any ) -> list[SubbandWorkload]: """ Calculate the sub-banded list of start/end channels for given a centre frequency and bandwidth. For SKA Low, a single sub-band is always returned For SKA Mid, in AA0.5 or AA1, each 198 MHz frequency slice has different channel offsets The total number of channels is calculated from the bandwidth, tsamp and oversampling ratio of the frequency band configuration. The start channel will be a multiple of the number of channels in a UDP packet, relative to the minimum valid channel. :param bandwidth_mhz: the required bandwidth of the RF signal, in MHz. :type bandwidth_mhz: float :param centre_freq_mhz: the centre frequency of the RF signal, in MHz. :type centre_freq_mhz: float :return: a list of SubbandWorkloads representing the channel range and RF frequencies for the sub-band :rtype: list[SubbandWorkload] """ if len(self.subband_constraints) <= 1: return [self.calculate_channel_range(bandwidth_mhz, centre_freq_mhz, **kwargs)] freq_minimum_mhz = centre_freq_mhz - bandwidth_mhz / 2 freq_maximum_mhz = centre_freq_mhz + bandwidth_mhz / 2 min_width_mhz = self.chan_separation_mhz * self.udp_nchan / 2 # for each constrained sub-band determine if it is required channel_ranges = [] workload_id = 1 for subband, constraint in self.subband_constraints.items(): # if the constraint spans some of the requested RF band if constraint.matches(centre_freq_mhz, bandwidth_mhz, min_width_mhz): constrained_freq_min_mhz = max(freq_minimum_mhz, constraint.rf_freq_minimum_mhz) constrained_freq_max_mhz = min(freq_maximum_mhz, constraint.rf_freq_maximum_mhz) constrained_bandwidth_mhz = constrained_freq_max_mhz - constrained_freq_min_mhz # determine the channel limits for the constrained sub-band nchan = min(self.nchan_for_bandwidth(constrained_bandwidth_mhz), constraint.max_nchan) # the frequency offset from channel zero's start freq freq_min_offset_mhz = constrained_freq_min_mhz - constraint.channel_0_start_freq_mhz start_chan_est = int(np.round(freq_min_offset_mhz / self.chan_separation_mhz)) start_chan_offset = (start_chan_est - self.min_valid_chan(subband)) % self.udp_nchan if start_chan_offset > 0: start_chan = start_chan_est - start_chan_offset else: start_chan = start_chan_est start_chan = min(max(self.min_valid_chan(subband), start_chan), self.max_valid_chan(subband)) end_chan = min(start_chan + nchan, self.max_valid_chan(subband) + 1) refined_centre_freq_mhz = self.centre_freq_mhz(start_chan, nchan, subband=subband) refined_bandwidth_mhz = self.bandwidth_mhz(nchan) start_channel_centre_freq_mhz = self.channel_centre_freq_mhz(chan=start_chan, subband=subband) channel_ranges.append( SubbandWorkload( subband_id=workload_id, start_channel=start_chan, end_channel=end_chan, centre_freq_mhz=refined_centre_freq_mhz, bandwidth_mhz=refined_bandwidth_mhz, start_channel_centre_freq_mhz=start_channel_centre_freq_mhz, ) ) workload_id += 1 return channel_ranges
[docs] def pst_channel_number_for_freq( self, freq_mhz: float, centre_freq_mhz: float, bandwidth_mhz: float, strict: bool = False ) -> int: """ Get the PST channel number for the given frequency. The PST signal processing pipelines count the PST channels based on the input RF signal, with channel 0 being ``centre_freq_mhz - bandwidth_mhz / 2`` with a total number of channels (``nchan``) being ``bandwidth_mhz / chan_separation_mhz``. This method is used to find the channel number within the RF signal that has a given frequency in MHz. If the frequency is outside of the given RF signal range then the channel number is clipped to be in the exclusive range of ``[0, nchan)``. :param freq_mhz: the frequency to get the PST channel number for in MHz :type freq_mhz: float :param centre_freq_mhz: the centre frequency of the RF signal, in MHz. :type centre_freq_mhz: float :param bandwidth_mhz: the required bandwidth of the RF signal, in MHz. :type bandwidth_mhz: float :param strict: test that the channel number resides within the band, default False :type strict: bool :type bandwidth_mhz: float :return: the PST channel number :rtype: int """ nchan = self.nchan_for_bandwidth(bandwidth_mhz=bandwidth_mhz) start_freq_mhz = centre_freq_mhz - bandwidth_mhz / 2 freq_offset_mhz = freq_mhz - start_freq_mhz chan_num = int(np.round(freq_offset_mhz / self.chan_separation_mhz)) if strict: if not 0 <= chan_num <= nchan - 1: raise ValueError( f"frequency {freq_mhz} MHz converted to channel {chan_num}: outside valid " f"range [0 - {nchan - 1}] given {start_freq_mhz=}" ) else: return np.clip(chan_num, 0, nchan - 1) return chan_num