Source code for ska_pst.lmc.dsp.dsp_util

# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project.
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""Module for providing utility methods of DSP."""

from __future__ import annotations

from typing import Any, Dict

from ska_control_model import PstProcessingMode
from ska_pst.common.constants import BITS_PER_BYTE
from ska_pst.lmc.smrb.smrb_util import generate_data_key, generate_weights_key

from ska_pst.common import CbfPstConfig

__all__ = [
    "calculate_bytes_per_second",
    "calculate_dsp_subband_resources",
    "generate_dsp_scan_request",
]

DEFAULT_REQUANTISATION_SCALE: float = 1.0
"""The default scaling to apply to data before performing quantisation/digitisation."""

DEFAULT_RESCALE_TIMESCALE: float = 0.0
"""
The default timescale (in seconds) for recalculating the rescale.

With a value of 0.0, the rescale calculation is performed on a block of NDAT
based on other configuration and DSPSR will determine how many data samples
to use.
"""

DEFAULT_RESCALE_ALGORITHM: str = "MedianMAD"
"""
The default rescale algorithm to use if not set.

By default, robust statistics will be used to calculate scales and offsets,
where the median is used to estimate the mean and the median absolute deviation
is used to estimate the standard deviation.
"""

DEFAULT_RESCALE_PERIODIC_UPDATE: bool = False
"""
The default value for whether to perform periodic updates of scales and offsets.

The default value is ``False`` which means that scales and offsets are constant.
"""


[docs]def calculate_dsp_subband_resources(beam_id: int, **kwargs: Any) -> Dict[int, dict]: """ Calculate the digital signal processing (DSP) resources from request. This is a common method to map a CSP JSON request to the appropriate DSP parameters. It is also used to calculate the specific subband resources. This uses the SMRB :py:func:`generate_data_key`, :py:func:`generate_weights_key` functions to calculate the keys for the data and weight ring buffers that the DSP process will read from. :param beam_id: the numerical id of the beam that this DSP request is for. :returns: a dict of dicts, with the top level key being the subband id, while the second level is the specific parameters. An example would response is as follows:: { 1: { 'data_key': "a000", 'weights_key': "a010", } } """ return { 1: { "data_key": generate_data_key(beam_id=beam_id, subband_id=1), "weights_key": generate_weights_key(beam_id=beam_id, subband_id=1), } }
[docs]def generate_dsp_scan_request( eb_id: str, pst_processing_mode: PstProcessingMode, max_scan_length: float = 0.0, **kwargs: Any, ) -> dict: """ Generate a DSP scan request dictionary. This method delegates to a specific implementation depending on the given ``pst_processing_mode``. This method currently only supports voltage recorder and flow through processing modes. :param eb_id: the execution block id of the request. :type eb_id: str :param pst_processing_mode: the requested PST processing mode. :type pst_processing_mode: PstProcessingMode :param max_scan_length: the configured maximum scan length, defaults to 0.0 :type max_scan_length: float, optional :raises AssertionError: raised for any unsupported PST processing modes. :return: a DSP scan request dictionary. :rtype: dict """ if pst_processing_mode == PstProcessingMode.VOLTAGE_RECORDER: request = _generate_dsp_disk_scan_request(pst_processing_mode=pst_processing_mode, **kwargs) elif pst_processing_mode == PstProcessingMode.FLOW_THROUGH: request = _generate_dsp_ft_scan_request(pst_processing_mode=pst_processing_mode, **kwargs) else: raise AssertionError( "PST currently only supports Voltage Recorder and Flow Through processing modes." ) request["execution_block_id"] = eb_id request["scanlen_max"] = max_scan_length return request
def _generate_dsp_disk_scan_request(cbf_pst_config: CbfPstConfig, **kwargs: Any) -> dict: """ Generate the DSP.DISK scan request parameters. The method returns a dictionary whose only key is ``bytes_per_second``. :param cbf_pst_config: the CBF/PST configuration for the current telescope and frequency band of the the scan request. :return: the scan request parameters to be sent to DSP.DISK :rtype: dict """ nchan_out = cbf_pst_config.nchan_for_bandwidth(**kwargs) npol_out = cbf_pst_config.npol nbit_out = cbf_pst_config.nbit ndim_out = cbf_pst_config.ndim return { "bytes_per_second": calculate_bytes_per_second( nchan_out=nchan_out, npol_out=npol_out, nbit_out=nbit_out, ndim_out=ndim_out, tsamp=cbf_pst_config.tsamp, ), } def _generate_dsp_ft_scan_request( flow_through_params: dict, cbf_pst_config: CbfPstConfig, **kwargs: Any, ) -> dict: """ Generate the DSP.FT scan request parameters. The method returns a dictionary whose keys are: * num_bits_out * polarisations * channels * requantisation_scale * rescale_timescale * rescale_algorithm * rescale_periodic_update * bytes_per_second :param flow_through_params: the flow through mode specific parameters of the CSP.LMC scan request. :type flow_through_params: dict :param cbf_pst_config: the CBF/PST configuration for the current telescope and frequency band of the the scan request. :return: the scan request parameters to be sent to DSP.FT :rtype: dict """ def _get_parameter( section: str, key: str, fallback: str | None = None, default: Any | None = None ) -> Any: if section in flow_through_params: value = flow_through_params[section].get(key, default) elif fallback: value = flow_through_params.get(fallback, default) else: value = default assert value is not None, f"expected {section}/{key} to exist or have a default and/or fallback value" return value nbit_out = _get_parameter(section="requantisation", key="num_bits_out", fallback="num_bits_out") polarisations = _get_parameter( section="channel_polarisation_selection", key="polarisations", fallback="polarizations" ) channels = _get_parameter(section="channel_polarisation_selection", key="channels", fallback="channels") requantisation_scale = _get_parameter( section="requantisation", key="scale", fallback="requantization_scale", default=DEFAULT_REQUANTISATION_SCALE, ) rescale_timescale = _get_parameter( section="rescale", key="timescale", fallback="requantisation_init_time", default=DEFAULT_RESCALE_TIMESCALE, ) rescale_algorithm = _get_parameter(section="rescale", key="algorithm", default=DEFAULT_RESCALE_ALGORITHM) rescale_periodic_update = _get_parameter( section="rescale", key="periodic_update", default=DEFAULT_RESCALE_PERIODIC_UPDATE ) npol_out = 2 if polarisations == "Both" else 1 nchan_out = channels[1] - channels[0] + 1 ndim_out = cbf_pst_config.ndim return { "num_bits_out": nbit_out, "polarisations": polarisations, "channels": channels, "requantisation_scale": requantisation_scale, "rescale_timescale": rescale_timescale, "rescale_algorithm": rescale_algorithm, "rescale_periodic_update": rescale_periodic_update, "bytes_per_second": calculate_bytes_per_second( nchan_out=nchan_out, npol_out=npol_out, nbit_out=nbit_out, ndim_out=ndim_out, tsamp=cbf_pst_config.tsamp, ), }
[docs]def calculate_bytes_per_second( nchan_out: int, npol_out: int, nbit_out: int, ndim_out: int, tsamp: float, **kwargs: Any, ) -> float: """ Calculate the expected bytes per second value given output parameters. :param nchan_out: the number of output channels :type nchan_out: int :param npol_out: the number of output polarisations :type npol_out: int :param nbit_out: the bits per value in the output data :type nbit_out: int :param ndim_out: the number of dimensions of a value in the output data. This should be 1 or 2 depending on real or complex data. :type ndim_out: int :param tsamp: the sampling interval, in microseconds, for each sample of data. :type tsamp: float :return: the expected bytes per second value given output parameters. :rtype: float """ # convert sampling interval in microsecond to sampling interval in seconds tsamp_s = tsamp / 1_000_000 return (nchan_out * npol_out * nbit_out * ndim_out) / (BITS_PER_BYTE * tsamp_s)