# Source code for ska_pst.common.cbf_pst_config

# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.

"""Module used to model the CBF/PST interface."""

from __future__ import annotations

# Public names exported by this module when a caller uses ``import *``.
__all__ = [
    "ChannelisationStage",
    "CbfPstConfig",
    "RingBufferConfig",
]

import dataclasses
from dataclasses import field
from typing import Any, List, Tuple

import numpy as np
from ska_pst.common.constants import KILO_HERTZ_PER_MEGA_HERTZ, WEIGHTS_NBITS

# Module-level defaults; used further down as the default values of the
# ``CbfPstConfig.ndim`` and ``CbfPstConfig.npol`` fields respectively.
DEFAULT_NDIM = 2
"""
The default number of dimension of the data coming from the CBF.

While PST can handle real and complex data, SKA is using only complex which
is ``ndim=2``
"""

DEFAULT_NPOL = 2
"""The default number of polarisations of the data coming from the CBF."""


[docs]@dataclasses.dataclass(kw_only=True, frozen=True) class RingBufferConfig: """ A data class used to store ring buffer (RB) configuration for a given frequency band. An element within the RB is configured to be large enough to capture all the channels of the subband for ``packets_per_buffer`` number of packets. """ num_buffers: int """Number of buffers withing a ring buffer.""" packets_per_buffer: int """The number of UDP packets per buffer that a channel needs be recorded for."""
[docs]@dataclasses.dataclass(kw_only=True, frozen=True) class ChannelisationStage: """A data class used to store the configuration for a CBF channelisation stage.""" num_frequency_channels: int """Number of channels used in the channelisation stage.""" oversampling_ratio: Tuple[int, int] | List[int] """ The oversampling ratio for the channelisation stage. This is represented as a tuple like ``(4, 3)`` or a list with only 2 elements. """ def __post_init__(self: ChannelisationStage) -> None: """Perform post init of data class.""" if isinstance(self.oversampling_ratio, list): assert len(self.oversampling_ratio) == 2, ( "expected oversampling ratio to have a length of exactly 2 " f"but was {len(self.oversampling_ratio)}" )
@dataclasses.dataclass(kw_only=True, frozen=True)
class CbfPstConfig:
    """
    A data class used to represent the CBF / PST frequency band constants.

    This class also includes utility methods that can be used to calculate
    values like ``nchan``, and ``bytes_per_sec`` given other values.

    Instances are keyword-only and frozen.  ``ndim`` and a list-valued
    ``oversampling_ratio`` are validated on construction.
    """

    udp_format: str
    """The UDP format for the given frequency band."""

    tsamp: float
    """The time per sample, in microseconds."""

    udp_nchan: int
    """The number of channels in an UDP packet."""

    udp_nsamp: int
    """
    The number of samples per channel in an UDP packet.

    For each of the ``udp_nchan`` channels in an UDP packet there are this many samples.
    """

    wt_nsamp: int
    """
    The number of samples per weight per channel in an UDP packet.

    This value should be the same as ``udp_nsamp``
    """

    nbit: int
    """
    The number of bits per sample.

    This value is the number of bits used for a value within a given dimension.
    For complex valued data the total number of bits used for the complex value
    is ``2 * nbit``.
    """

    oversampling_ratio: Tuple[int, int] | List[int]
    """
    The oversampling ratio for the frequency band.

    This is represented as a tuple like ``(4, 3)`` or a list with only 2 elements.
    """

    max_nchan: int
    """The maximum number of channels for the frequency band."""

    rf_freq_minimum_mhz: float
    """The bottom end of the frequency band, in MHz."""

    rf_freq_maximum_mhz: float
    """The top end of the frequency band, in MHz."""

    rf_bw_mhz: float
    """The bandwidth of the band, in MHz."""

    chan_separation_khz: float
    """The frequency separation between the start and end of a channel, in kHz."""

    channel_0_start_freq_mhz: float
    """The effective start frequency of channel 0, in MHz."""

    ring_buffer_config: RingBufferConfig
    """Configuration specific for setting up ring buffers."""

    channelisation_stages: List[ChannelisationStage] = field(default_factory=list)
    """The configuration used in the channelisation of the RF signal by the CBF."""

    ndim: int = DEFAULT_NDIM
    """
    The number of dimensions of the input data.

    If the value is 1, then the input data is real valued. If the value is 2 then
    the input data is complex valued. The default is 2 (i.e. complex valued data).
    """

    npol: int = DEFAULT_NPOL
    """
    The number of polarisations of the input data.

    The default is 2.
    """

    wt_nbit: int = WEIGHTS_NBITS
    """
    The number of bits per weight.

    The default is 16.
    """

    def __post_init__(self: CbfPstConfig) -> None:
        """
        Validate the configuration once the fields have been set.

        :raises AssertionError: if ``ndim`` is not 1 or 2, or if
            ``oversampling_ratio`` is a list whose length is not exactly 2.
        """
        # The checks raise explicitly rather than using ``assert`` so that they
        # are not stripped when Python runs with ``-O``.  The exception type
        # and messages are the same as the assert statements would produce.
        if self.ndim not in {1, 2}:
            raise AssertionError("expected ndim to be 1 or 2")

        ratio = self.oversampling_ratio
        if isinstance(ratio, list) and len(ratio) != 2:
            raise AssertionError(
                "expected oversampling ratio to have a length of exactly 2 " f"but was {len(ratio)}"
            )

    def _channel_number_at(self: CbfPstConfig, freq_mhz: float) -> int:
        """Return the nearest channel number for ``freq_mhz``, counted from channel 0's start."""
        offset_mhz = freq_mhz - self.channel_0_start_freq_mhz
        return int(np.round(offset_mhz / (self.chan_separation_mhz)))

    def _nchan_or_max(self: CbfPstConfig, nchan: int | None) -> int:
        """Return ``nchan``, falling back to ``max_nchan`` only when it is ``None``."""
        return self.max_nchan if nchan is None else nchan

    @property
    def num_channelisation_stages(self: CbfPstConfig) -> int:
        """Get the number of stages of channelisation for the frequency band."""
        return len(self.channelisation_stages)

    @property
    def min_valid_chan(self: CbfPstConfig) -> int:
        """Get the minimum valid channel number for the frequency band."""
        return self._channel_number_at(self.rf_freq_minimum_mhz)

    @property
    def max_valid_chan(self: CbfPstConfig) -> int:
        """Get the maximum valid channel number for the frequency band."""
        return self._channel_number_at(self.rf_freq_maximum_mhz)

    @property
    def chan_separation_mhz(self: CbfPstConfig) -> float:
        """
        Get the channel frequency separation, in MHz.

        This is a derived property that converts the ``chan_separation_khz``,
        which is in kHz, to MHz.

        :return: the channel frequency separation, in MHz.
        :rtype: float
        """
        return self.chan_separation_khz / KILO_HERTZ_PER_MEGA_HERTZ

    def apply_nchan_limits(self: CbfPstConfig, nchan: int, min_nchan: int | None = None) -> int:
        """Apply limits to the `nchan` value.

        This method ensures that the following conditions:

            * `self.udp_nchan` <= `nchan` <= `self.max_nchan`
            * `nchan % self.udp_nchan == 0`

        If `min_nchan` is set, then the lower bound of `nchan` is `min_nchan`
        but the output value is still a multiple of `self.udp_nchan`.  When the
        rounded-up lower bound exceeds the rounded-down upper bound, the lower
        bound wins.

        :param nchan: the proposed NCHAN value that needs to have limits applied.
        :type nchan: int
        :param min_nchan: the requested minimum number of channels, defaults to None.
            If not set (or 0) then `self.udp_nchan` is used.
        :type min_nchan: int | None, optional
        :return: a value that meets the channel limits conditions.
        :rtype: int
        """
        udp_nchan = self.udp_nchan

        # A falsy min_nchan (None or 0) deliberately falls back to udp_nchan so
        # that the documented lower bound of one full UDP packet always holds.
        lower = min_nchan or udp_nchan
        remainder = lower % udp_nchan
        if remainder != 0:
            # round the lower bound up to the next multiple of udp_nchan
            lower += udp_nchan - remainder

        # round the upper bound and the proposed value down to a multiple of udp_nchan
        upper = self.max_nchan - (self.max_nchan % udp_nchan)
        nchan -= nchan % udp_nchan

        # lower, upper, and nchan are now all multiples of udp_nchan
        return max(lower, min(nchan, upper))

    def nchan_for_data_rate(self: CbfPstConfig, data_rate: float) -> int:
        """Calculate the number of channels given a input data rate.

        The calculation of the input bytes per seconds is given by:

        .. code-block:: python

            data_rate = nchan * npol * nbit * ndim / 8 / (tsamp / 1e6)

        However, this method ensures the output nchan is not greater than
        the ``max_nchan`` value but is also a multiple of ``udp_nchan``.

        :param data_rate: the requested input data rate, in bytes per second.
        :type data_rate: float
        :return: the number of channels needed to get to a data rate close to ``data_rate``
        :rtype: int
        """
        nchan = int(np.around(data_rate / self.npol / self.nbit / self.ndim * 8 / 1_000_000 * self.tsamp))
        return self.apply_nchan_limits(nchan)

    def nchan_for_bandwidth(self: CbfPstConfig, bandwidth_mhz: float, **kwargs: Any) -> int:
        """Get the number of channels closest to the given bandwidth.

        This determines the number of channels the given bandwidth, in MHz, would
        be equivalent to given the ``tsamp`` and ``oversampling_ratio``. The output
        number of channels is given as a multiple of ``udp_nchan``.

        :param bandwidth_mhz: the desired bandwidth, in MHz.
        :type bandwidth_mhz: float
        :param kwargs: additional keyword arguments are accepted but are not
            used by this implementation.
        :return: the closest number of channels, as a multiple of ``udp_nchan`` that
            span the given bandwidth.
        :rtype: int
        """
        nchan_est = self.tsamp * bandwidth_mhz * self.oversampling_ratio[0] / self.oversampling_ratio[1]
        # snap the estimate to the nearest whole number of UDP packets
        nchan = int(np.round(nchan_est / self.udp_nchan)) * self.udp_nchan
        return self.apply_nchan_limits(nchan)

    def data_rate(self: CbfPstConfig, nchan: int | None = None) -> float:
        """Get the data rate, in bytes/second, for the given number of channels.

        If ``nchan`` is ``None`` then this value will return the maximum data rate
        based on the ``max_nchan`` property.  An explicit ``nchan=0`` is honoured
        and yields a data rate of 0.

        :param nchan: the number of channels, defaults to the ``max_nchan`` value
            of the configuration.
        :type nchan: int | None, optional
        :return: the calculated number of bytes per second.
        :rtype: float
        """
        nchan = self._nchan_or_max(nchan)
        return nchan * self.npol * self.nbit * self.ndim / 8 * 1_000_000 / self.tsamp

    def bandwidth_mhz(self: CbfPstConfig, nchan: int | None = None) -> float:
        """Get the bandwidth given the number of channels, in MHz, rounded to the nearest hertz.

        If ``nchan`` is ``None`` then this value will return the maximum bandwidth
        based on the ``max_nchan`` property.  An explicit ``nchan=0`` is honoured.

        :param nchan: the number of channels, defaults to the ``max_nchan`` value
            of the configuration.
        :type nchan: int | None, optional
        :return: the calculated bandwidth, in MHz, given the configuration.
        :rtype: float
        """
        nchan = self._nchan_or_max(nchan)
        bw = nchan / (self.tsamp * self.oversampling_ratio[0] / self.oversampling_ratio[1])
        # 6 decimal places of a MHz value is a resolution of 1 Hz
        return np.around(bw, 6)

    def centre_freq_mhz(self: CbfPstConfig, start_chan: int = 0, nchan: int | None = None) -> float:
        """Get the centre frequency, in MHz, based on the start channel and the number of channels.

        The centre frequency is rounded to the nearest hertz.

        :param start_chan: the start channel, defaults to 0
        :type start_chan: int, optional
        :param nchan: the number of channels, defaults to the ``max_nchan`` value
            of the configuration.  An explicit ``nchan=0`` is honoured.
        :type nchan: int | None, optional
        :return: the calculated centre frequency given the channel separation,
            bottom frequency, starting channel, and number of channels.
        :rtype: float
        """
        nchan = self._nchan_or_max(nchan)
        freq_mhz = self.channel_0_start_freq_mhz + (start_chan + nchan / 2) * self.chan_separation_mhz
        # 6 decimal places of a MHz value is a resolution of 1 Hz
        return np.around(freq_mhz, 6)

    def channel_centre_freq_mhz(self: CbfPstConfig, chan: int = 0) -> float:
        """
        Get the channel centre frequency, in MHz.

        :param chan: the PST fine channel to get the centre frequency for.
        :type chan: int
        :return: the calculated centre frequency for the given channel.
        :rtype: float
        """
        return self.centre_freq_mhz(start_chan=chan, nchan=1)

    def calculate_channel_range(
        self: CbfPstConfig, bandwidth_mhz: float, centre_freq_mhz: float
    ) -> Tuple[int, int]:
        """Calculate the start and end channel given a centre frequency and bandwidth.

        The total number of channels is calculated from the bandwidth, tsamp and
        oversampling ratio of the frequency band configuration. The start channel
        will be a multiple of the number of channels in a UDP packet, relative to
        the minimum valid channel.

        :param bandwidth_mhz: the required bandwidth of the RF signal, in MHz.
        :type bandwidth_mhz: float
        :param centre_freq_mhz: the centre frequency of the RF signal, in MHz.
        :type centre_freq_mhz: float
        :return: a tuple representing the range of the channels, inclusive of the
            start channel but exclusive of the end channel: [start_chan, end_chan)
        :rtype: Tuple[int, int]
        """
        nchan = min(self.nchan_for_bandwidth(bandwidth_mhz), self.max_nchan)
        min_valid_chan = self.min_valid_chan
        max_valid_chan = self.max_valid_chan

        # fractional channel index of the requested centre frequency
        centre_freq_offset_mhz = centre_freq_mhz - self.channel_0_start_freq_mhz
        centre_freq_chan = centre_freq_offset_mhz / (self.chan_separation_mhz)

        # step the estimated start back onto a UDP packet boundary, where the
        # boundaries are counted from the minimum valid channel
        start_chan = int(np.round(centre_freq_chan - nchan / 2))
        start_chan_offset = (start_chan - min_valid_chan) % self.udp_nchan
        if start_chan_offset > 0:
            start_chan -= start_chan_offset

        # keep the range inside the valid channels of the band
        start_chan = min(max(min_valid_chan, start_chan), max_valid_chan)
        end_chan = min(start_chan + nchan, max_valid_chan + 1)

        return (start_chan, end_chan)

    def pst_channel_number_for_freq(
        self: CbfPstConfig,
        freq_mhz: float,
        centre_freq_mhz: float,
        bandwidth_mhz: float,
        strict: bool = False,
    ) -> int:
        """
        Get the PST channel number for the given frequency.

        The PST signal processing pipelines count the PST channels based on the
        input RF signal, with channel 0 being ``centre_freq_mhz - bandwidth_mhz / 2``.
        The total number of channels (``nchan``) is obtained from
        ``nchan_for_bandwidth(bandwidth_mhz)``.

        This method is used to find the channel number within the RF signal that
        has a given frequency in MHz. When ``strict`` is false and the frequency is
        outside of the given RF signal range then the channel number is clipped to
        be in the half-open range ``[0, nchan)``.

        :param freq_mhz: the frequency to get the PST channel number for in MHz
        :type freq_mhz: float
        :param centre_freq_mhz: the centre frequency of the RF signal, in MHz.
        :type centre_freq_mhz: float
        :param bandwidth_mhz: the required bandwidth of the RF signal, in MHz.
        :type bandwidth_mhz: float
        :param strict: test that the channel number resides within the band, default False
        :type strict: bool
        :raises ValueError: if ``strict`` is true and the channel number falls
            outside ``[0, nchan)``.
        :return: the PST channel number
        :rtype: int
        """
        nchan = self.nchan_for_bandwidth(bandwidth_mhz=bandwidth_mhz)
        start_freq_mhz = centre_freq_mhz - bandwidth_mhz / 2
        freq_offset_mhz = freq_mhz - start_freq_mhz
        chan_num = int(np.round(freq_offset_mhz / self.chan_separation_mhz))

        if strict:
            if not 0 <= chan_num <= nchan - 1:
                raise ValueError(
                    f"frequency {freq_mhz} MHz converted to channel {chan_num}: outside valid "
                    f"range [0 - {nchan-1}] given {start_freq_mhz=}"
                )
            return chan_num

        # Clamp with built-in min/max so a plain ``int`` is returned; np.clip
        # would hand back a NumPy integer scalar here.
        return min(max(chan_num, 0), nchan - 1)