Source code for ska_pst.lmc.dsp.dsp_util

# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project.
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""Module for providing utility methods of DSP."""

from __future__ import annotations

from typing import Any, Dict

from ska_control_model import PstProcessingMode
from ska_pst.common.constants import BITS_PER_BYTE
from ska_pst.lmc.smrb.smrb_util import generate_data_key, generate_weights_key

from ska_pst.common import CbfPstConfig

# Public API of this module; the underscore-prefixed helper functions are not listed here.
__all__ = [
    "calculate_bytes_per_second",
    "calculate_dsp_subband_resources",
    "generate_dsp_scan_request",
]

# Used as the default for the requantisation ``scale`` parameter when a scan request omits it.
DEFAULT_REQUANTISATION_SCALE: float = 1.0
"""The default scaling to apply to data before performing quantisation/digitisation."""

# Used as the default for the rescale ``timescale`` parameter when a scan request omits it.
DEFAULT_RESCALE_TIMESCALE: float = 0.0
"""
The default timescale (in seconds) for recalculating the rescale.

With a value of 0.0, the rescale calculation is performed on a block of NDAT
based on other configuration and DSPSR will determine how many data samples
to use.
"""

# Used as the default for the rescale ``algorithm`` parameter when a scan request omits it.
DEFAULT_RESCALE_ALGORITHM: str = "MedianMAD"
"""
The default rescale algorithm to use if not set.

By default, robust statistics will be used to calculate scales and offsets,
where the median is used to estimate the mean and the median absolute deviation
is used to estimate the standard deviation.
"""

# Used as the default for the rescale ``periodic_update`` parameter when a scan request omits it.
DEFAULT_RESCALE_PERIODIC_UPDATE: bool = False
"""
The default value for whether to perform periodic updates of scales and offsets.

The default value is ``False`` which means that scales and offsets are constant.
"""

# Default for the ``subint_duration`` argument of the detected filterbank request builder.
DEFAULT_SUBINT_DURATION: float = 10.0
"""
The default value for the length of sub-integration files, in seconds produced
by DSP processing modes.
"""

# Keyed by the ``polarisation_state`` value of the detected filterbank parameters.
POLARISATION_STATE_NPOL_MAP: dict[str, int] = {
    "Intensity": 1,
    "PPQQ": 2,
    "Coherence": 4,
    "Stokes": 4,
    "PP_State": 1,
    "QQ_State": 1,
}
"""
Mapping from Polarisation state to number of output polarisations.

This is used to calculate an estimate of output bytes per second.
"""

# Keyed by the polarisation selection supplied in the flow through parameters;
# a value that is not a key of this mapping results in a ``KeyError`` on lookup.
FT_POLARISATION_OUT_MAP: dict[str, str] = {
    "A": "A",
    "B": "B",
    "X": "A",
    "Y": "B",
    "Both": "Both",
}
"""
Mapping used to map the Flow Through output polarisations to internal PST value.

Version 3.x of the PST scan configuration uses polarisations 'A', 'B' or 'Both'
but version 4.0 uses 'X', 'Y', 'Both'. However, the migration from the older
values to the new values is not straight forward and this mapping allows for
mapping the new 4.0 back to the older values.  In the future when we do the
migration we would need to change the mapping values here so the old values
map to the new values.
"""


def calculate_dsp_subband_resources(beam_id: int, **kwargs: Any) -> Dict[int, dict]:
    """
    Calculate the digital signal processing (DSP) subband resources for a beam.

    This is a common method to map a CSP JSON request to the appropriate
    DSP parameters. It is also used to calculate the specific subband
    resources.

    The SMRB :py:func:`generate_data_key` and :py:func:`generate_weights_key`
    functions are used to calculate the keys of the data and weights ring
    buffers that the DSP process will read from.

    The result always contains a single entry, for subband 1. Any additional
    keyword arguments are accepted but not used.

    :param beam_id: the numerical id of the beam that this DSP request is for.
    :returns: a dict of dicts, with the top level key being the subband id,
        while the second level is the specific parameters. An example
        response is as follows::

            {
                1: {
                    'data_key': "a000",
                    'weights_key': "a010",
                }
            }
    """
    subband_id = 1
    subband_resources = {
        "data_key": generate_data_key(beam_id=beam_id, subband_id=subband_id),
        "weights_key": generate_weights_key(beam_id=beam_id, subband_id=subband_id),
    }
    return {subband_id: subband_resources}
def generate_dsp_scan_request(
    eb_id: str,
    pst_processing_mode: PstProcessingMode,
    **kwargs: Any,
) -> dict:
    """
    Generate a DSP scan request dictionary.

    The work is delegated to a mode specific builder selected from the given
    ``pst_processing_mode``. Voltage recorder, flow through and detected
    filterbank processing modes are currently supported. The processing mode
    and all extra keyword arguments are forwarded to the selected builder and
    the execution block id is then added to the resulting dictionary under
    the ``execution_block_id`` key.

    :param eb_id: the execution block id of the request.
    :type eb_id: str
    :param pst_processing_mode: the requested PST processing mode.
    :type pst_processing_mode: PstProcessingMode
    :raises AssertionError: raised for any unsupported PST processing modes.
    :return: a DSP scan request dictionary.
    :rtype: dict
    """
    # Pick the builder first, then call it in a single place below.
    if pst_processing_mode == PstProcessingMode.VOLTAGE_RECORDER:
        build_request = _generate_dsp_disk_scan_request
    elif pst_processing_mode == PstProcessingMode.FLOW_THROUGH:
        build_request = _generate_dsp_ft_scan_request
    elif pst_processing_mode == PstProcessingMode.DETECTED_FILTERBANK:
        build_request = _generate_dsp_df_scan_request
    else:
        raise AssertionError(
            "PST currently only supports Voltage Recorder, Flow Through "
            "and Detected Filterbank processing modes."
        )

    scan_request = build_request(pst_processing_mode=pst_processing_mode, **kwargs)
    scan_request["execution_block_id"] = eb_id
    return scan_request
def _get_parameter(
    config: dict, section: str, key: str, fallback: str | None = None, default: Any | None = None
) -> Any:
    """
    Get a parameter value from a configuration that may be sectioned or flat.

    If ``section`` is a key of ``config`` then the value is read from that
    section using ``key``. Otherwise, if a ``fallback`` key is given, the value
    is read from the top level of ``config`` using ``fallback``. In both cases
    ``default`` is used when the key is absent, and ``default`` is also used
    when there is neither a section nor a fallback key.

    :param config: the configuration dictionary to read the value from.
    :param section: the key of the nested section within ``config``.
    :param key: the key of the value within the section.
    :param fallback: the optional top level key to use when ``config`` has no
        ``section`` entry.
    :param default: the value to use when the looked up key is absent.
    :raises AssertionError: raised if the resolved value is ``None``.
    :return: the resolved value, which is never ``None``.
    """
    if section in config:
        value = config[section].get(key, default)
    elif fallback:
        value = config.get(fallback, default)
    else:
        value = default

    # Raise explicitly rather than via an ``assert`` statement: assert statements are removed
    # when Python runs with -O, which would let a missing mandatory value through as ``None``.
    # The exception type and message are the same as the assert produced.
    if value is None:
        raise AssertionError(f"expected {section}/{key} to exist or have a default and/or fallback value")

    return value


def _get_rescale_parameters(config: dict, rescale_time_fallback: str) -> dict:
    """
    Get the rescale parameters from a processing mode specific configuration.

    Values are read from the ``rescale`` section of ``config`` when it exists,
    with the module level defaults used for any value that is not found. Only
    the timescale has a top level fallback key.

    :param config: the processing mode specific parameters of the scan request.
    :param rescale_time_fallback: the top level key of ``config`` to read the
        rescale timescale from when there is no ``rescale`` section.
    :return: a dictionary with the keys ``rescale_timescale``,
        ``rescale_algorithm`` and ``rescale_periodic_update``.
    :rtype: dict
    """
    rescale_timescale = _get_parameter(
        config=config,
        section="rescale",
        key="timescale",
        fallback=rescale_time_fallback,
        default=DEFAULT_RESCALE_TIMESCALE,
    )
    rescale_algorithm = _get_parameter(
        config=config, section="rescale", key="algorithm", default=DEFAULT_RESCALE_ALGORITHM
    )
    rescale_periodic_update = _get_parameter(
        config=config, section="rescale", key="periodic_update", default=DEFAULT_RESCALE_PERIODIC_UPDATE
    )

    return {
        "rescale_timescale": rescale_timescale,
        "rescale_algorithm": rescale_algorithm,
        "rescale_periodic_update": rescale_periodic_update,
    }


def _generate_dsp_disk_scan_request(cbf_pst_config: CbfPstConfig, **kwargs: Any) -> dict:
    """
    Generate the DSP.DISK scan request parameters.

    The method returns a dictionary whose only key is ``bytes_per_second``.

    :param cbf_pst_config: the CBF/PST configuration for the current telescope
        and frequency band of the scan request.
    :param kwargs: the remaining scan request parameters, which are forwarded
        to ``cbf_pst_config.nchan_for_bandwidth`` to get the number of channels.
    :return: the scan request parameters to be sent to DSP.DISK
    :rtype: dict
    """
    nchan_out = cbf_pst_config.nchan_for_bandwidth(**kwargs)
    npol_out = cbf_pst_config.npol
    nbit_out = cbf_pst_config.nbit
    ndim_out = cbf_pst_config.ndim

    return {
        "bytes_per_second": calculate_bytes_per_second(
            nchan_out=nchan_out,
            npol_out=npol_out,
            nbit_out=nbit_out,
            ndim_out=ndim_out,
            tsamp=cbf_pst_config.tsamp,
        ),
    }


def _generate_dsp_ft_scan_request(
    flow_through_params: dict,
    cbf_pst_config: CbfPstConfig,
    **kwargs: Any,
) -> dict:
    """
    Generate the DSP.FT scan request parameters.

    The method returns a dictionary whose keys are:

        * num_bits_out
        * polarisations
        * channels
        * requantisation_scale
        * rescale_timescale
        * rescale_algorithm
        * rescale_periodic_update
        * bytes_per_second

    :param flow_through_params: the flow through mode specific parameters of
        the CSP.LMC scan request.
    :type flow_through_params: dict
    :param cbf_pst_config: the CBF/PST configuration for the current telescope
        and frequency band of the scan request.
    :raises AssertionError: raised if a mandatory parameter cannot be resolved.
    :raises KeyError: raised if the requested polarisations value is not a key
        of ``FT_POLARISATION_OUT_MAP``.
    :return: the scan request parameters to be sent to DSP.FT
    :rtype: dict
    """
    nbit_out = _get_parameter(
        config=flow_through_params, section="requantisation", key="num_bits_out", fallback="num_bits_out"
    )
    polarisations = _get_parameter(
        config=flow_through_params,
        section="channel_polarisation_selection",
        key="polarisations",
        fallback="polarizations",
    )
    # Map polarisation to PST/SKA values
    # before v4.0 of PST we had A, B but now they map to X, Y
    polarisations = FT_POLARISATION_OUT_MAP[polarisations]

    channels = _get_parameter(
        config=flow_through_params,
        section="channel_polarisation_selection",
        key="channels",
        fallback="channels",
    )
    requantisation_scale = _get_parameter(
        config=flow_through_params,
        section="requantisation",
        key="scale",
        fallback="requantization_scale",
        default=DEFAULT_REQUANTISATION_SCALE,
    )

    npol_out = 2 if polarisations == "Both" else 1
    # channels holds the first and last selected channel, with both ends counted
    nchan_out = channels[1] - channels[0] + 1
    ndim_out = cbf_pst_config.ndim

    rescale_parameters = _get_rescale_parameters(
        config=flow_through_params, rescale_time_fallback="requantisation_init_time"
    )

    return {
        "num_bits_out": nbit_out,
        "polarisations": polarisations,
        "channels": channels,
        "requantisation_scale": requantisation_scale,
        "bytes_per_second": calculate_bytes_per_second(
            nchan_out=nchan_out,
            npol_out=npol_out,
            nbit_out=nbit_out,
            ndim_out=ndim_out,
            tsamp=cbf_pst_config.tsamp,
        ),
        **rescale_parameters,
    }


def _generate_dsp_df_scan_request(
    detected_filterbank_params: dict,
    cbf_pst_config: CbfPstConfig,
    subint_duration: float = DEFAULT_SUBINT_DURATION,
    **kwargs: Any,
) -> dict:
    """
    Generate the DSP.DF scan request parameters.

    The method returns a dictionary whose keys are:

        * dispersion_measure
        * rotation_measure (optional)
        * nchan
        * nchan_out
        * tsubint_out
        * polarisation_state
        * num_bits_out
        * time_decimation_factor
        * frequency_decimation_factor
        * requantisation_scale
        * rescale_timescale
        * rescale_algorithm
        * rescale_periodic_update
        * sk_config
        * bytes_per_second

    :param detected_filterbank_params: the detected filterbank mode specific
        parameters of the CSP.LMC scan request.
    :type detected_filterbank_params: dict
    :param cbf_pst_config: the CBF/PST configuration for the current telescope
        and frequency band of the scan request.
    :param subint_duration: the value to use for ``tsubint_out``, defaults to
        ``DEFAULT_SUBINT_DURATION``.
    :type subint_duration: float
    :param kwargs: the remaining scan request parameters, which are forwarded
        to ``cbf_pst_config.nchan_for_bandwidth`` to get the number of input
        channels.
    :raises AssertionError: raised if a mandatory parameter cannot be resolved.
    :raises KeyError: raised if a mandatory detected filterbank parameter is
        missing or the polarisation state is not a key of
        ``POLARISATION_STATE_NPOL_MAP``.
    :return: the scan request parameters to be sent to DSP.DF
    :rtype: dict
    """
    nchan_in = cbf_pst_config.nchan_for_bandwidth(**kwargs)

    nbit_out = _get_parameter(
        config=detected_filterbank_params,
        section="requantisation",
        key="num_bits_out",
        fallback="num_bits_out",
    )
    requantisation_scale = _get_parameter(
        config=detected_filterbank_params,
        section="requantisation",
        key="scale",
        fallback="requantisation_scale",
        default=DEFAULT_REQUANTISATION_SCALE,
    )
    rescale_parameters = _get_rescale_parameters(
        config=detected_filterbank_params, rescale_time_fallback="requantisation_length"
    )

    time_decimation_factor: int = detected_filterbank_params["time_decimation_factor"]
    frequency_decimation_factor: int = detected_filterbank_params["frequency_decimation_factor"]
    nchan_out: int = detected_filterbank_params["output_frequency_channels"]
    polarisation_state = detected_filterbank_params["polarisation_state"]
    npol_out = POLARISATION_STATE_NPOL_MAP[polarisation_state]

    # for detected filterbank the output TSAMP is based on the critical sampled rate
    # as well as the time_decimation_factor. Example if time_decimation_factor=2 we take 2 time samples
    # and add them but the output rate is half, or in other words the output TSAMP is 2 times
    # the input tsamp.
    tsamp_out = time_decimation_factor * cbf_pst_config.critical_sample_tsamp

    bytes_per_second = calculate_bytes_per_second(
        nchan_out=nchan_out,
        npol_out=npol_out,
        nbit_out=nbit_out,
        ndim_out=1,
        tsamp=tsamp_out,
    )

    result = {
        "dispersion_measure": detected_filterbank_params["dispersion_measure"],
        "nchan_out": nchan_out,
        "tsubint_out": subint_duration,
        "polarisation_state": polarisation_state,
        "num_bits_out": nbit_out,
        "time_decimation_factor": time_decimation_factor,
        "frequency_decimation_factor": frequency_decimation_factor,
        "requantisation_scale": requantisation_scale,
        "bytes_per_second": bytes_per_second,
        "nchan": nchan_in,
        "sk_config": detected_filterbank_params.get("sk_config", []),
        **rescale_parameters,
    }

    # the rotation measure is only passed on when the request supplies one
    if "rotation_measure" in detected_filterbank_params:
        result["rotation_measure"] = detected_filterbank_params["rotation_measure"]

    return result
def calculate_bytes_per_second(
    nchan_out: int,
    npol_out: int,
    nbit_out: int,
    ndim_out: int,
    tsamp: float,
    **kwargs: Any,
) -> float:
    """
    Calculate the expected output data rate, in bytes per second.

    The number of bits produced for every time sample is the product of the
    number of channels, polarisations, bits per value and dimensions. This is
    divided by the number of bits in a byte and by the sampling interval in
    seconds. Any additional keyword arguments are accepted but not used.

    :param nchan_out: the number of output channels
    :type nchan_out: int
    :param npol_out: the number of output polarisations
    :type npol_out: int
    :param nbit_out: the bits per value in the output data
    :type nbit_out: int
    :param ndim_out: the number of dimensions of a value in the output data.
        This should be 1 or 2 depending on real or complex data.
    :type ndim_out: int
    :param tsamp: the sampling interval, in microseconds, for each sample of
        data.
    :type tsamp: float
    :return: the expected bytes per second value given output parameters.
    :rtype: float
    """
    bits_per_time_sample = nchan_out * npol_out * nbit_out * ndim_out

    # the sampling interval is supplied in microseconds but the rate is per second
    seconds_per_time_sample = tsamp / 1_000_000

    return bits_per_time_sample / (BITS_PER_BYTE * seconds_per_time_sample)