# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""Module used to model the CBF/PST interface."""
from __future__ import annotations
__all__ = [
"ChannelisationStage",
"CbfPstConfig",
"RingBufferConfig",
]
import dataclasses
from dataclasses import field
from typing import Any, List, Tuple
import numpy as np
from ska_pst.common.constants import KILO_HERTZ_PER_MEGA_HERTZ, WEIGHTS_NBITS
DEFAULT_NDIM: int = 2
"""
The default number of dimensions of the data coming from the CBF.

PST can handle both real and complex data, but SKA only uses complex data,
which corresponds to ``ndim=2``.
"""

DEFAULT_NPOL: int = 2
"""The default number of polarisations of the data coming from the CBF."""
[docs]@dataclasses.dataclass(kw_only=True, frozen=True)
class RingBufferConfig:
"""
A data class used to store ring buffer (RB) configuration for a given frequency band.
An element within the RB is configured to be large enough to capture all the
channels of the subband for ``packets_per_buffer`` number of packets.
"""
num_buffers: int
"""Number of buffers withing a ring buffer."""
packets_per_buffer: int
"""The number of UDP packets per buffer that a channel needs be recorded for."""
[docs]@dataclasses.dataclass(kw_only=True, frozen=True)
class ChannelisationStage:
"""A data class used to store the configuration for a CBF channelisation stage."""
num_frequency_channels: int
"""Number of channels used in the channelisation stage."""
oversampling_ratio: Tuple[int, int] | List[int]
"""
The oversampling ratio for the channelisation stage.
This is represented as a tuple like ``(4, 3)`` or a list with only 2 elements.
"""
def __post_init__(self: ChannelisationStage) -> None:
"""Perform post init of data class."""
if isinstance(self.oversampling_ratio, list):
assert len(self.oversampling_ratio) == 2, (
"expected oversampling ratio to have a length of exactly 2 "
f"but was {len(self.oversampling_ratio)}"
)
@dataclasses.dataclass(kw_only=True, frozen=True)
class CbfPstConfig:
    """
    A data class used to represent the CBF / PST frequency band constants.

    This class also includes utility methods that can be used to calculate
    values like ``nchan``, and ``bytes_per_sec`` given other values.
    """

    udp_format: str
    """The UDP format for the given frequency band."""

    tsamp: float
    """The time per sample, in microseconds."""

    udp_nchan: int
    """The number of channels in a UDP packet."""

    udp_nsamp: int
    """
    The number of samples per channel in a UDP packet.

    For each of the ``udp_nchan`` channels in a UDP packet there are
    this many samples.
    """

    wt_nsamp: int
    """
    The number of samples per weight per channel in a UDP packet.

    This value should be the same as ``udp_nsamp``
    """

    nbit: int
    """
    The number of bits per sample.

    This value is the number of bits used for a value within a given dimension. For
    complex valued data the total number of bits used for the complex value is
    ``2 * nbit``.
    """

    oversampling_ratio: Tuple[int, int] | List[int]
    """
    The oversampling ratio for the frequency band.

    This is represented as a tuple like ``(4, 3)`` or a list with only 2 elements.
    """

    max_nchan: int
    """The maximum number of channels for the frequency band."""

    rf_freq_minimum_mhz: float
    """The bottom end of the frequency band, in MHz."""

    rf_freq_maximum_mhz: float
    """The top end of the frequency band, in MHz."""

    rf_bw_mhz: float
    """The bandwidth of the band, in MHz."""

    chan_separation_khz: float
    """The frequency separation between the start and end of a channel, in kHz."""

    channel_0_start_freq_mhz: float
    """The effective start frequency of channel 0, in MHz."""

    ring_buffer_config: RingBufferConfig
    """Configuration specific for setting up ring buffers."""

    channelisation_stages: List[ChannelisationStage] = field(default_factory=list)
    """The configuration used in the channelisation of the RF signal by the CBF."""

    ndim: int = DEFAULT_NDIM
    """
    The number of dimensions of the input data.

    If the value is 1, then the input data is real valued. If the value is 2 then the
    input data is complex valued. The default is 2 (i.e. complex valued data).
    """

    npol: int = DEFAULT_NPOL
    """
    The number of polarisations of the input data.

    The default is 2.
    """

    wt_nbit: int = WEIGHTS_NBITS
    """
    The number of bits per weight.

    The default is ``WEIGHTS_NBITS`` (16).
    """

    def __post_init__(self: CbfPstConfig) -> None:
        """Perform post init of data class.

        :raises AssertionError: if ``ndim`` is not 1 or 2, or if ``oversampling_ratio``
            is a list that does not have exactly 2 elements.
        """
        # NOTE: validation deliberately stays as ``assert`` so that the exception type
        # seen by existing callers (``AssertionError``) is unchanged.
        assert self.ndim in {1, 2}, "expected ndim to be 1 or 2"
        if isinstance(self.oversampling_ratio, list):
            assert len(self.oversampling_ratio) == 2, (
                "expected oversampling ratio to have a length of exactly 2 "
                f"but was {len(self.oversampling_ratio)}"
            )

    @property
    def num_channelisation_stages(self: CbfPstConfig) -> int:
        """Get the number of stages of channelisation for the frequency band."""
        return len(self.channelisation_stages)

    @property
    def min_valid_chan(self: CbfPstConfig) -> int:
        """Get the minimum valid channel number for the frequency band."""
        freq_offset_mhz = self.rf_freq_minimum_mhz - self.channel_0_start_freq_mhz
        return int(np.round(freq_offset_mhz / self.chan_separation_mhz))

    @property
    def max_valid_chan(self: CbfPstConfig) -> int:
        """Get the maximum valid channel number for the frequency band."""
        freq_offset_mhz = self.rf_freq_maximum_mhz - self.channel_0_start_freq_mhz
        return int(np.round(freq_offset_mhz / self.chan_separation_mhz))

    @property
    def chan_separation_mhz(self: CbfPstConfig) -> float:
        """
        Get the channel frequency separation, in MHz.

        This is a derived property that converts the ``chan_separation_khz``,
        which is in kHz, to MHz.

        :return: the channel frequency separation, in MHz.
        :rtype: float
        """
        return self.chan_separation_khz / KILO_HERTZ_PER_MEGA_HERTZ

    def apply_nchan_limits(self: CbfPstConfig, nchan: int, min_nchan: int | None = None) -> int:
        """Apply limits to the `nchan` value.

        This method ensures that the following conditions:

            * `self.udp_nchan` <= `nchan` <= `self.max_nchan`
            * `nchan % self.udp_nchan == 0`

        If `min_nchan` is set, then the lower bound of `nchan` is `min_nchan` but the output value is
        still a multiple of `self.udp_nchan`. The lower bound is applied last, so it takes
        precedence if it exceeds the (rounded down) upper bound.

        :param nchan: the proposed NCHAN value that needs to have limits applied.
        :type nchan: int
        :param min_nchan: the requested minimum number of channels, defaults to None.
            If not set (``None`` or ``0``) then `self.udp_nchan` is used.
        :type min_nchan: int | None, optional
        :return: a value that meets the channel limits conditions.
        :rtype: int
        """
        # a falsy min_nchan (None or 0) falls back to one packet's worth of channels
        min_nchan = min_nchan or self.udp_nchan
        if min_nchan % self.udp_nchan != 0:
            # round min_nchan up to the next multiple of udp_nchan
            min_nchan += self.udp_nchan - (min_nchan % self.udp_nchan)

        # round the upper bound and the requested value down to a multiple of udp_nchan
        max_nchan = self.max_nchan
        max_nchan -= max_nchan % self.udp_nchan
        nchan -= nchan % self.udp_nchan

        # min_nchan, max_nchan, and nchan are now all multiples of self.udp_nchan
        return max(min_nchan, min(nchan, max_nchan))

    def nchan_for_data_rate(self: CbfPstConfig, data_rate: float) -> int:
        """Calculate the number of channels given a input data rate.

        The calculation of the input bytes per seconds is given by:

        .. code-block:: python

            data_rate = nchan * npol * nbit * ndim / 8 / (tsamp / 1e6)

        However, this method ensures the output nchan is not greater than
        the ``max_nchan`` value but is also a multiple of ``udp_nchan``.

        :param data_rate: the requested input data rate, in bytes per second.
        :type data_rate: float
        :return: the number of channels needed to get to a data rate close to
            ``data_rate``
        :rtype: int
        """
        # inverse of the data rate formula above; tsamp is in microseconds
        nchan = int(np.around(data_rate / self.npol / self.nbit / self.ndim * 8 / 1_000_000 * self.tsamp))
        return self.apply_nchan_limits(nchan)

    def nchan_for_bandwidth(self: CbfPstConfig, bandwidth_mhz: float, **kwargs: Any) -> int:
        """Get the number of channels closest to the given bandwidth.

        This determines the number of channels the given bandwidth, in MHz, would
        be equivalent to given the ``tsamp`` and ``oversampling_ratio``.

        The output number of channels is given as a multiple of ``udp_nchan``.

        :param bandwidth_mhz: the desired bandwidth, in MHz.
        :type bandwidth_mhz: float
        :param kwargs: additional keyword arguments; accepted but not used by this method.
        :return: the closest number of channels, as a multiple of ``udp_nchan`` that
            span the given bandwidth.
        :rtype: int
        """
        nchan_est = self.tsamp * bandwidth_mhz * self.oversampling_ratio[0] / self.oversampling_ratio[1]
        # snap the estimate to the nearest whole number of UDP packets
        nchan = int(np.round(nchan_est / self.udp_nchan)) * self.udp_nchan
        return self.apply_nchan_limits(nchan)

    def data_rate(self: CbfPstConfig, nchan: int | None = None) -> float:
        """Get the data rate, in bytes/second, for the given number of channels.

        If ``nchan`` is ``None`` then this value will return the maximum data rate based on the
        ``max_nchan`` property.

        :param nchan: the number of channels, defaults to the ``max_nchan`` value of the configuration.
        :type nchan: int | None, optional
        :return: the calculated number of bytes per second.
        :rtype: float
        """
        # explicit None check so that nchan=0 is not silently replaced by max_nchan
        if nchan is None:
            nchan = self.max_nchan
        return nchan * self.npol * self.nbit * self.ndim / 8 * 1_000_000 / self.tsamp

    def bandwidth_mhz(self: CbfPstConfig, nchan: int | None = None) -> float:
        """Get the bandwidth given the number of channels, in MHz, rounded to the nearest hertz.

        If ``nchan`` is ``None`` then this value will return the maximum bandwidth based on the
        ``max_nchan`` property.

        :param nchan: the number of channels, defaults to the ``max_nchan`` value of the configuration.
        :type nchan: int | None, optional
        :return: the calculated bandwidth, in MHz, given the configuration.
        :rtype: float
        """
        # explicit None check so that nchan=0 is not silently replaced by max_nchan
        if nchan is None:
            nchan = self.max_nchan
        bw = nchan / (self.tsamp * self.oversampling_ratio[0] / self.oversampling_ratio[1])
        # 6 decimal places of a MHz value is 1 Hz
        return np.around(bw, 6)

    def centre_freq_mhz(self: CbfPstConfig, start_chan: int = 0, nchan: int | None = None) -> float:
        """Get the centre frequency, in MHz, based on the start channel and the number of channels.

        The centre frequency is rounded to the nearest hertz.

        :param start_chan: the start channel, defaults to 0
        :type start_chan: int, optional
        :param nchan: the number of channels, defaults to the ``max_nchan`` value of the configuration.
        :type nchan: int | None, optional
        :return: the calculated centre frequency given the channel separation, bottom frequency,
            starting channel, and number of channels.
        :rtype: float
        """
        # explicit None check so that nchan=0 is not silently replaced by max_nchan
        if nchan is None:
            nchan = self.max_nchan
        freq_mhz = self.channel_0_start_freq_mhz + (start_chan + nchan / 2) * self.chan_separation_mhz
        # 6 decimal places of a MHz value is 1 Hz
        return np.around(freq_mhz, 6)

    def channel_centre_freq_mhz(self: CbfPstConfig, chan: int = 0) -> float:
        """
        Get the channel centre frequency, in MHz.

        :param chan: the PST fine channel to get the centre frequency for.
        :type chan: int
        :return: the calculated centre frequency for the given channel.
        :rtype: float
        """
        # a single channel is a one-channel-wide range starting at ``chan``
        return self.centre_freq_mhz(start_chan=chan, nchan=1)

    def calculate_channel_range(
        self: CbfPstConfig, bandwidth_mhz: float, centre_freq_mhz: float
    ) -> Tuple[int, int]:
        """Calculate the start and end channel given a centre frequency and bandwidth.

        The total number of channels is calculated from the bandwidth, tsamp and oversampling ratio of
        the frequency band configuration.

        The start channel will be a multiple of the number of channels in a UDP
        packet, relative to the minimum valid channel.

        :param bandwidth_mhz: the required bandwidth of the RF signal, in MHz.
        :type bandwidth_mhz: float
        :param centre_freq_mhz: the centre frequency of the RF signal, in MHz.
        :type centre_freq_mhz: float
        :return: a tuple representing the range of the channels, inclusive of the start channel
            but exclusive of the end channel: [start_chan, end_chan)
        :rtype: Tuple[int, int]
        """
        nchan = min(self.nchan_for_bandwidth(bandwidth_mhz), self.max_nchan)

        # express the requested centre frequency as a (fractional) channel number
        centre_freq_offset_mhz = centre_freq_mhz - self.channel_0_start_freq_mhz
        centre_freq_chan = centre_freq_offset_mhz / self.chan_separation_mhz
        start_chan_est = int(np.round(centre_freq_chan - nchan / 2))

        # Align the start channel down to a packet boundary relative to min_valid_chan.
        # For a positive udp_nchan the modulo is never negative, so no branch is needed.
        start_chan_offset = (start_chan_est - self.min_valid_chan) % self.udp_nchan
        start_chan = start_chan_est - start_chan_offset

        # keep the range inside the valid channels of the band
        start_chan = min(max(self.min_valid_chan, start_chan), self.max_valid_chan)
        end_chan = min(start_chan + nchan, self.max_valid_chan + 1)

        return (start_chan, end_chan)

    def pst_channel_number_for_freq(
        self: CbfPstConfig,
        freq_mhz: float,
        centre_freq_mhz: float,
        bandwidth_mhz: float,
        strict: bool = False,
    ) -> int:
        """
        Get the PST channel number for the given frequency.

        The PST signal processing pipelines count the PST channels based on the
        input RF signal, with channel 0 being ``centre_freq_mhz - bandwidth_mhz / 2``
        with a total number of channels (``nchan``) being ``bandwidth_mhz / chan_separation_mhz``.

        This method is used to find the channel number within the RF signal that
        has a given frequency in MHz. If the frequency is outside of the given
        RF signal range and ``strict`` is False then the channel number is clipped to be
        in the half-open range of ``[0, nchan)``.

        :param freq_mhz: the frequency to get the PST channel number for in MHz
        :type freq_mhz: float
        :param centre_freq_mhz: the centre frequency of the RF signal, in MHz.
        :type centre_freq_mhz: float
        :param bandwidth_mhz: the required bandwidth of the RF signal, in MHz.
        :type bandwidth_mhz: float
        :param strict: test that the channel number resides within the band, default False
        :type strict: bool
        :raises ValueError: if ``strict`` is True and the channel number is outside ``[0, nchan)``.
        :return: the PST channel number
        :rtype: int
        """
        nchan = self.nchan_for_bandwidth(bandwidth_mhz=bandwidth_mhz)
        start_freq_mhz = centre_freq_mhz - bandwidth_mhz / 2
        freq_offset_mhz = freq_mhz - start_freq_mhz
        chan_num = int(np.round(freq_offset_mhz / self.chan_separation_mhz))

        if not strict:
            # np.clip returns a NumPy scalar; convert so both branches return a Python int
            return int(np.clip(chan_num, 0, nchan - 1))

        if not 0 <= chan_num <= nchan - 1:
            raise ValueError(
                f"frequency {freq_mhz} MHz converted to channel {chan_num}: outside valid "
                f"range [0 - {nchan-1}] given {start_freq_mhz=}"
            )
        return chan_num