# Source code for ska_pst.lmc.receive.receive_util

# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project.
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""Module for providing utility methods of RECV."""

from __future__ import annotations

import logging
import math
from typing import Any, cast

from ska_pst.common.constants import DEFAULT_COORD_MODE, DEFAULT_EQUINOX, DEFAULT_TRACKING_MODE, MEGA_HERTZ
from ska_pst.lmc.smrb.smrb_util import generate_data_key, generate_weights_key

from ska_pst.common import CbfPstConfig

# Public API of this module: the names exported by a star-import of the module.
# NOTE(review): generate_recv_configure_scan_request is defined in this module but is
# not listed here - confirm whether that omission is intentional.
__all__ = [
    "calculate_receive_common_resources",
    "calculate_receive_subband_resources",
    "calculate_receive_packet_resources",
]

# Module-level logger, named after this module, used for the warnings emitted below.
_logger = logging.getLogger(__name__)


def generate_recv_configure_scan_request(
    **request_params: Any,
) -> dict:
    """
    Build the RECV.CORE configure scan parameters from a CSP configure scan request.

    The request values are renamed to the keys that RECV.CORE expects and some of them are
    converted to strings. The coordinate mode and tracking mode always use the module
    defaults, while the equinox only falls back to its default when absent from the request.

    :param request_params: the request parameters that are used to configure PST for a scan.
    :type request_params: Any
    :returns: the RECV.CORE parameters to be used in the gRPC request.
    :rtype: dict
    :raises KeyError: if a mandatory request parameter is missing.
    """
    centre_coords: list[float] = request_params["delay_centre"]

    recv_params = dict(
        observer=request_params["observer_id"],
        projid=request_params["project_id"],
        subarray_id=str(request_params["subarray_id"]),
        # the delay centre is sent as a single comma separated string
        delay_centre=",".join([str(coord) for coord in centre_coords]),
        coord_md=DEFAULT_COORD_MODE,
        source=request_params["source"],
        stt_crd1=str(request_params["stt_crd1"]),
        stt_crd2=str(request_params["stt_crd2"]),
        equinox=str(request_params.get("equinox", DEFAULT_EQUINOX)),
        trk_mode=DEFAULT_TRACKING_MODE,
        execution_block_id=request_params["eb_id"],
    )

    # the following fields have been deprecated and in version 3.0 of PST schema will be removed,
    # so they are only forwarded when the request supplies them: (request key, RECV.CORE key)
    deprecated_fields = (
        ("activation_time", "activation_time"),
        ("pointing_id", "pnt_id"),
    )
    for request_key, recv_key in deprecated_fields:
        if request_key in request_params:
            recv_params[recv_key] = request_params[request_key]

    return recv_params


def calculate_receive_packet_resources(
    bandwidth_mhz: float,
    cbf_pst_config: CbfPstConfig,
    **kwargs: Any,
) -> dict:
    """
    Calculate RECV packet values.

    This method has been refactored out of `calculate_receive_common_resources`, as the common
    method did 2 things: a) calculate receive packet specific values, and b) overall scan
    configuration.

    As a sanity check the sample time is also inferred from the bandwidth, the number of
    channels and the oversampling ratio. A warning is logged when the inferred value is not
    within 0.01% of the value held by the CBF/PST configuration; the configured value is the
    one that is returned either way.

    :param bandwidth_mhz: the requested total (critical) bandwidth, in MHz.
    :type bandwidth_mhz: float
    :param cbf_pst_config: the CBF/PST configuration for the current request.
    :type cbf_pst_config: CbfPstConfig
    :param kwargs: any other request parameters. They are accepted so that callers can pass
        a whole request through, but none of them are used by this function.
    :type kwargs: Any
    :return: a dictionary containing parameters that are used by RECV.CORE
    :rtype: dict
    """
    nchan = cbf_pst_config.nchan_for_bandwidth(bandwidth_mhz=bandwidth_mhz)
    oversampling_ratio = cbf_pst_config.oversampling_ratio

    # per-channel bandwidth scaled by the oversampling ratio (numerator / denominator)
    inferred_oversampled_chan_bw = (bandwidth_mhz * oversampling_ratio[0]) / (nchan * oversampling_ratio[1])
    # reciprocal of a bandwidth in MHz; assumes cbf_pst_config.tsamp is in microseconds - TODO confirm
    inferred_tsamp = 1 / inferred_oversampled_chan_bw
    if not math.isclose(cbf_pst_config.tsamp, inferred_tsamp, rel_tol=0.0001):
        _logger.warning(
            f"Inferred tsamp {inferred_tsamp=} is not within 0.01% of expected "
            f"tsamp {cbf_pst_config.tsamp}"
        )

    return {
        "nchan": nchan,
        "bandwidth": bandwidth_mhz,
        "npol": cbf_pst_config.npol,
        "nbits": cbf_pst_config.nbit,
        "ndim": cbf_pst_config.ndim,
        "tsamp": cbf_pst_config.tsamp,
        # the ratio is sent as a "numerator/denominator" string
        "ovrsamp": "/".join(map(str, oversampling_ratio)),
        "udp_format": cbf_pst_config.udp_format,
    }


def calculate_receive_common_resources(
    centre_freq_mhz: float,
    bandwidth_mhz: float,
    cbf_pst_config: CbfPstConfig,
    receiver_id: str,
    receptors: list[str],
    receptor_weights: list[float],
    timing_beam_id: str,
    **kwargs: Any,
) -> dict:
    """
    Calculate the RECV common resources.

    This method has been refactored out of `calculate_receive_subband_resources` as there are
    parameters that are calculated that can be reused in other areas.

    :param centre_freq_mhz: the centre frequency of the scan request, in MHz.
    :type centre_freq_mhz: float
    :param bandwidth_mhz: the bandwidth of the scan request, in MHz.
    :type bandwidth_mhz: float
    :param cbf_pst_config: CBF/PST configuration.
    :type cbf_pst_config: CbfPstConfig
    :param receiver_id: the id/name of the receiver used for the scan request.
    :type receiver_id: str
    :param receptors: the list of receptors the scan is for.
    :type receptors: list[str]
    :param receptor_weights: the relative weights of the receptors.
    :type receptor_weights: list[float]
    :param timing_beam_id: the id of the beam that is being configured
    :type timing_beam_id: str
    :param kwargs: the remaining request parameters. ``feed_polarization``, ``feed_handedness``,
        ``feed_angle``, ``feed_tracking_mode`` and ``feed_position_angle`` are required, while
        ``rfi_frequency_masks`` (a list of 2-element lists) is optional. All of the values are
        also passed on to :py:func:`calculate_receive_packet_resources`.
    :type kwargs: Any
    :return: a dictionary of the common RECV.CORE parameters.
    :rtype: dict
    :raises KeyError: if one of the required feed parameters is missing from ``kwargs``.
    """
    recv_packet_resources = calculate_receive_packet_resources(
        cbf_pst_config=cbf_pst_config,
        bandwidth_mhz=bandwidth_mhz,
        **kwargs,
    )

    # a request may omit the masks or supply an explicit null; both mean "no masks"
    requested_rfi_masks: list[list[float]] = kwargs.get("rfi_frequency_masks") or []

    # need to remove empty or invalid masks but log warning of removing value. The index
    # reported in the warning is the position of the mask within the original request.
    valid_rfi_masks: list[list[float]] = []
    for idx, mask in enumerate(requested_rfi_masks):
        if len(mask) == 2:
            valid_rfi_masks.append(mask)
        else:
            _logger.warning(
                f"rfi_frequency_masks[{idx}] is invalid. Length should be 2 but value is {mask}"
            )

    # flatten the masks into one list, two values per mask, scaled by MEGA_HERTZ
    # (presumably converting the requested values from Hz to MHz - confirm against the schema)
    rfi_frequency_masks: list[float] = [freq / MEGA_HERTZ for mask in valid_rfi_masks for freq in mask]

    # one PST channel number for each value in the flattened list of mask frequencies
    rfi_excised_channels: list[int] = [
        cbf_pst_config.pst_channel_number_for_freq(
            freq, centre_freq_mhz=centre_freq_mhz, bandwidth_mhz=bandwidth_mhz
        )
        for freq in rfi_frequency_masks
    ]

    return {
        "nsubband": 1,
        "udp_nsamp": cbf_pst_config.udp_nsamp,
        "wt_nsamp": cbf_pst_config.wt_nsamp,
        "udp_nchan": cbf_pst_config.udp_nchan,
        "frequency": centre_freq_mhz,
        "frontend": receiver_id,
        # This should come from the Telescope
        "fd_poln": kwargs["feed_polarization"],
        "fd_hand": kwargs["feed_handedness"],
        "fd_sang": kwargs["feed_angle"],
        "fd_mode": kwargs["feed_tracking_mode"],
        "fa_req": kwargs["feed_position_angle"],
        "nant": len(receptors),
        "antennas": ",".join(receptors),
        "ant_weights": ",".join(map(str, receptor_weights)),
        # this is for AA0.5 where we will only have one beam, the default will
        # be the device's configured beam id. Ideally this should come from
        # the configure scan request from CSP.LMC
        "beam_id": timing_beam_id,
        "num_rfi_masks": len(valid_rfi_masks),
        "rfi_frequency_masks": rfi_frequency_masks,
        "rfi_excised_channels": rfi_excised_channels,
        **recv_packet_resources,
    }


def calculate_receive_subband_resources(
    *,
    beam_id: int,
    bandwidth_mhz: float,
    centre_freq_mhz: float,
    data_host: str,
    data_mac: str,
    subband_udp_ports: list[int],
    cbf_pst_config: CbfPstConfig,
    **kwargs: Any,
) -> dict:
    """
    Calculate the RECV resources for all subbands from request.

    This is a common method to map a CSP JSON request to the appropriate
    RECV.CORE parameters. It is also used to calculate the specific subband
    resources.

    :param beam_id: the id of the beam the request is for
    :type beam_id: int
    :param bandwidth_mhz: the total bandwidth for the scan, in MHz
    :type bandwidth_mhz: float
    :param centre_freq_mhz: the centre frequency for the scan, in MHz
    :type centre_freq_mhz: float
    :param data_host: the IP address at which data will be received.
    :type data_host: str
    :param data_mac: the MAC address corresponding to the data_host.
    :type data_mac: str
    :param subband_udp_ports: a list of UDP ports for each of the subbands. Max length is 4
        given there is a maximum of 4 subbands. Currently only the first port is used, and it
        is used for every subband.
    :type subband_udp_ports: list[int]
    :param cbf_pst_config: the CBF/PST configuration for the current request
    :type cbf_pst_config: CbfPstConfig
    :param kwargs: the remaining request parameters, which are passed on unchanged to
        :py:func:`calculate_receive_common_resources`.
    :type kwargs: Any
    :return: a dict of dicts, with "common" and "subbands" as the top level
        keys. The `common` values come from the
        :py:func:`calculate_receive_common_resources` function. The `subbands`
        is a dict of dicts with subband ids as the keys, while the second level
        is the specific parameters. An example response is as follows::

            {
                "common": {
                    "nchan": nchan,
                    "nsubband": 1,
                    ...
                },
                "subbands": {
                    1: {
                        "data_key": "a000",
                        "weights_key": "a010",
                        ...
                    }
                }
            }

    :rtype: dict
    """
    # construct the sub-band workloads, noting a single subband is valid
    workloads = cbf_pst_config.calculate_channel_ranges(
        centre_freq_mhz=centre_freq_mhz,
        bandwidth_mhz=bandwidth_mhz,
    )

    if len(workloads) == 1:
        # with a single sub-band the workload's own values describe the whole scan
        refined_centre_freq_mhz = workloads[0].centre_freq_mhz
        refined_bandwidth_mhz = workloads[0].bandwidth_mhz
    else:
        # when over-lapping sub-bands are used the centre frequency and bandwidth cannot be
        # simply deduced from the workloads.
        refined_centre_freq_mhz = centre_freq_mhz
        refined_bandwidth_mhz = bandwidth_mhz

    # report when the values that will be used differ from what was requested
    if refined_centre_freq_mhz != centre_freq_mhz:
        _logger.warning(f"{refined_centre_freq_mhz=} {centre_freq_mhz=}")

    if refined_bandwidth_mhz != bandwidth_mhz:
        _logger.warning(f"{refined_bandwidth_mhz=} {bandwidth_mhz=}")

    resources: dict[str, Any] = {
        "common": calculate_receive_common_resources(
            cbf_pst_config=cbf_pst_config,
            centre_freq_mhz=refined_centre_freq_mhz,
            bandwidth_mhz=refined_bandwidth_mhz,
            **kwargs,
        ),
        "subbands": {},
    }
    # the common resources default to one sub-band, so record the actual number
    resources["common"]["nsubband"] = len(workloads)

    # TODO(ajameson) subband_udp_ports should use index when supported
    for workload in workloads:
        nchan = workload.end_channel - workload.start_channel
        resources["subbands"][workload.subband_id] = {
            "data_key": generate_data_key(beam_id=beam_id, subband_id=workload.subband_id),
            "weights_key": generate_weights_key(beam_id=beam_id, subband_id=workload.subband_id),
            "bandwidth": workload.bandwidth_mhz,
            "nchan": nchan,
            "frequency": workload.centre_freq_mhz,
            "start_channel": workload.start_channel,
            "end_channel": workload.end_channel,  # using exclusive range
            # the "*_out" values mirror the input values above
            "start_channel_out": workload.start_channel,
            "end_channel_out": workload.end_channel,  # using exclusive range
            "nchan_out": nchan,
            "bandwidth_out": workload.bandwidth_mhz,
            "frequency_out": workload.centre_freq_mhz,
            "data_host": data_host,
            "data_mac": data_mac,
            # every sub-band currently shares the first UDP port, see the TODO above
            "data_port": subband_udp_ports[0],
            "start_centre_freq_mhz": workload.start_channel_centre_freq_mhz,
        }

    return resources