# Source code for ska_pst.lmc.receive.receive_util

# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project.
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""Module for providing utility methods of RECV."""

from __future__ import annotations

import logging
import math
from typing import Any, List, cast

from ska_pst.common.constants import MEGA_HERTZ
from ska_pst.lmc.smrb.smrb_util import generate_data_key, generate_weights_key

from ska_pst.common import CbfPstConfig

# Names exported by a wildcard import of this module.
# NOTE(review): generate_recv_configure_scan_request is defined in this module but is
# not listed here - confirm whether that omission is intentional.
__all__ = [
    "calculate_receive_common_resources",
    "calculate_receive_subband_resources",
    "calculate_receive_packet_resources",
]

# Module-level logger, named after this module.
_logger = logging.getLogger(__name__)

# Value written to the "coord_md" field by generate_recv_configure_scan_request.
DEFAULT_COORD_MODE = "J2000"
"""
Default coordinate mode.

Currently only J2000 is supported but in future other modes could be supported.
"""

# Fallback for the "equinox" field when the request does not provide one.
DEFAULT_EQUINOX = 2000.0
"""Default equinox for equatorial/J2000 coordinate mode."""

# Value written to the "trk_mode" field by generate_recv_configure_scan_request.
DEFAULT_TRACKING_MODE = "TRACK"
"""
Default tracking mode.

Currently only TRACK is supported but other modes could be supported in the future.
"""

def generate_recv_configure_scan_request(
    **request_params: Any,
) -> dict:
    """
    Build the RECV.CORE configure scan parameters from a CSP configure scan request.

    Two request layouts are understood:

    * deprecated PST schema versions (< 3.0), recognised by the presence of a ``source`` key,
      where the coordinates are read from ``coordinates["ra"]`` / ``coordinates["dec"]``; and
    * PST schema 3.0+, which uses the SKA sky_direction format under the ``target`` key.

    The delay centre is taken from ``itrf`` when that key is present, otherwise from
    ``delay_centre``. The deprecated ``activation_time`` and ``pointing_id`` fields are copied
    into the result only when the request supplies them.

    :param request_params: the request parameters used to configure PST for a scan.
    :type request_params: Any
    :returns: the RECV.CORE parameters to be used in the gRPC request.
    :rtype: dict
    :raises KeyError: if a mandatory request parameter is missing.
    """
    delay_centre: List[float] = (
        request_params["itrf"] if "itrf" in request_params else request_params["delay_centre"]
    )

    if "source" not in request_params:
        # PST schema 3.0+: the target is described using the SKA sky_direction format.
        sky_direction = request_params["target"]
        source_name = sky_direction["target_name"]
        direction_attrs = sky_direction["attrs"]
        first_coord = direction_attrs["c1"]
        second_coord = direction_attrs["c2"]
        equinox = direction_attrs.get("epoch", DEFAULT_EQUINOX)
    else:
        # Deprecated PST schema versions (i.e. < 3.0).
        source_name = request_params["source"]
        coordinates = request_params["coordinates"]
        first_coord = coordinates["ra"]
        second_coord = coordinates["dec"]
        equinox = coordinates.get("equinox", DEFAULT_EQUINOX)

    recv_request = {
        "observer": request_params["observer_id"],
        "projid": request_params["project_id"],
        "subarray_id": str(request_params["subarray_id"]),
        "delay_centre": ",".join(str(component) for component in delay_centre),
        "coord_md": DEFAULT_COORD_MODE,
        "source": source_name,
        "equinox": str(equinox),
        "stt_crd1": str(first_coord),
        "stt_crd2": str(second_coord),
        "trk_mode": DEFAULT_TRACKING_MODE,
        "scanlen_max": int(request_params["max_scan_length"]),
        "execution_block_id": request_params["eb_id"],
    }

    # These fields are deprecated and will be removed in version 3.0 of the PST schema,
    # so they are only forwarded when the request still carries them.
    deprecated_fields = (
        ("activation_time", "activation_time"),
        ("pointing_id", "pnt_id"),
    )
    for request_key, recv_key in deprecated_fields:
        if request_key in request_params:
            recv_request[recv_key] = request_params[request_key]

    return recv_request


def calculate_receive_packet_resources(
    bandwidth_mhz: float,
    cbf_pst_config: CbfPstConfig,
    **kwargs: Any,
) -> dict:
    """
    Calculate RECV packet values.

    This method has been refactored out of `calculate_receive_common_resources` because the
    common method did 2 things: a) calculate receive packet specific values, and b) overall
    scan configuration.

    As a sanity check, a sampling interval is inferred from the requested bandwidth, the number
    of channels and the oversampling ratio. A warning is logged when it differs from the
    configured ``tsamp`` by more than 0.01%; the configured value is always the one returned.

    :param bandwidth_mhz: the requested total (critical) bandwidth, in MHz.
    :type bandwidth_mhz: float
    :param cbf_pst_config: the CBF/PST configuration for the current request.
    :type cbf_pst_config: CbfPstConfig
    :param kwargs: any other request parameters. They are accepted so that callers can forward
        a full request, but they are not used by this function.
    :type kwargs: Any
    :return: a dictionary containing parameters that are used by RECV.CORE
    :rtype: dict
    """
    nchan = cbf_pst_config.nchan_for_bandwidth(bandwidth_mhz=bandwidth_mhz)
    npol = cbf_pst_config.npol
    nbits = cbf_pst_config.nbit
    oversampling_ratio = cbf_pst_config.oversampling_ratio

    # Per-channel bandwidth scaled by the oversampling ratio (numerator / denominator); its
    # reciprocal is the sampling interval implied by the request.
    # NOTE(review): with the bandwidth in MHz this is presumably in microseconds - confirm
    # against the units of CbfPstConfig.tsamp.
    inferred_critical_chan_bw = (bandwidth_mhz * oversampling_ratio[0]) / (nchan * oversampling_ratio[1])
    inferred_tsamp = 1 / inferred_critical_chan_bw

    # rel_tol=0.0001 is the 0.01% tolerance quoted in the warning message.
    if not math.isclose(cbf_pst_config.tsamp, inferred_tsamp, rel_tol=0.0001):
        _logger.warning(
            f"Inferred tsamp {inferred_tsamp=} is not within 0.01% of expected "
            f"tsamp {cbf_pst_config.tsamp}"
        )

    return {
        "nchan": nchan,
        "bandwidth": bandwidth_mhz,
        "npol": npol,
        "nbits": nbits,
        "ndim": cbf_pst_config.ndim,
        "tsamp": cbf_pst_config.tsamp,
        "ovrsamp": "/".join(map(str, oversampling_ratio)),
        "udp_format": cbf_pst_config.udp_format,
    }


def calculate_receive_common_resources(
    centre_freq_mhz: float,
    bandwidth_mhz: float,
    cbf_pst_config: CbfPstConfig,
    receiver_id: str,
    receptors: List[str],
    receptor_weights: List[float],
    timing_beam_id: str,
    **kwargs: Any,
) -> dict:
    """
    Calculate the RECV common resources.

    This method has been refactored out of `calculate_receive_subband_resources` as there are
    parameters that are calculated that can be reused in other areas.

    The result combines the scan-wide values computed here with the packet values returned by
    `calculate_receive_packet_resources`.

    :param centre_freq_mhz: the centre frequency of the scan request, in MHz.
    :type centre_freq_mhz: float
    :param bandwidth_mhz: the bandwidth of the scan request, in MHz.
    :type bandwidth_mhz: float
    :param cbf_pst_config: CBF/PST configuration.
    :type cbf_pst_config: CbfPstConfig
    :param receiver_id: the id/name of the receiver used for the scan request.
    :type receiver_id: str
    :param receptors: the list of receptors the scan is for.
    :type receptors: List[str]
    :param receptor_weights: the relative weights of the receptors.
    :type receptor_weights: List[float]
    :param timing_beam_id: the id of the beam that is being configured
    :type timing_beam_id: str
    :param kwargs: the remaining request parameters. The keys ``feed_polarization``,
        ``feed_handedness``, ``feed_angle``, ``feed_tracking_mode`` and ``feed_position_angle``
        are required. ``rfi_frequency_masks`` is optional and, when given, is a list of
        2-element lists; entries of any other length are dropped with a warning. All of
        ``kwargs`` is also forwarded to `calculate_receive_packet_resources`.
    :type kwargs: Any
    :return: a dictionary of the common RECV.CORE parameters.
    :rtype: dict
    :raises KeyError: if one of the required ``feed_*`` parameters is missing.
    """
    recv_packet_resources = calculate_receive_packet_resources(
        cbf_pst_config=cbf_pst_config,
        bandwidth_mhz=bandwidth_mhz,
        **kwargs,
    )

    # A missing key and an explicit ``None`` are both treated as "no RFI masks requested".
    request_rfi_frequency_masks: List[List[float]] = cast(
        List[List[float]], kwargs.get("rfi_frequency_masks") or []
    )

    # need to remove empty or invalid masks but log warning of removing value.
    def _check_valid_rfi_mask(idx: int, mask: List[float]) -> bool:
        if len(mask) == 2:
            return True

        _logger.warning(f"rfi_frequency_masks[{idx}] is invalid. Length should be 2 but value is {mask}")
        return False

    request_rfi_frequency_masks = [
        fm for (idx, fm) in enumerate(request_rfi_frequency_masks) if _check_valid_rfi_mask(idx, fm)
    ]

    # Flatten the 2-element masks into a single list, scaling each value by MEGA_HERTZ.
    # NOTE(review): presumably this converts request values in Hz to MHz - confirm against
    # the configure scan schema.
    rfi_frequency_masks: List[float] = [f / MEGA_HERTZ for fm in request_rfi_frequency_masks for f in fm]
    assert len(rfi_frequency_masks) == 2 * len(
        request_rfi_frequency_masks
    ), f"Expected {len(rfi_frequency_masks)=} to twice of {len(request_rfi_frequency_masks)=}"

    # One PST channel number per flattened mask frequency.
    rfi_excised_channels: List[int] = [
        cbf_pst_config.pst_channel_number_for_freq(
            f, centre_freq_mhz=centre_freq_mhz, bandwidth_mhz=bandwidth_mhz
        )
        for f in rfi_frequency_masks
    ]
    assert len(rfi_excised_channels) == len(
        rfi_frequency_masks
    ), f"Expected {len(rfi_excised_channels)=} to be {len(rfi_frequency_masks)=}"

    return {
        "nsubband": 1,
        "udp_nsamp": cbf_pst_config.udp_nsamp,
        "wt_nsamp": cbf_pst_config.wt_nsamp,
        "udp_nchan": cbf_pst_config.udp_nchan,
        "frequency": centre_freq_mhz,
        "frontend": receiver_id,
        # This should come from the Telescope
        "fd_poln": kwargs["feed_polarization"],
        "fd_hand": kwargs["feed_handedness"],
        "fd_sang": kwargs["feed_angle"],
        "fd_mode": kwargs["feed_tracking_mode"],
        "fa_req": kwargs["feed_position_angle"],
        "nant": len(receptors),
        "antennas": ",".join(receptors),
        "ant_weights": ",".join(map(str, receptor_weights)),
        # this is for AAO.5 where we will only have one beam, the default will
        # be the device's configured beam id. Ideally this should come from
        # the configure scan request from CSP.LMC
        "beam_id": timing_beam_id,
        "num_rfi_masks": len(request_rfi_frequency_masks),
        "rfi_frequency_masks": rfi_frequency_masks,
        "rfi_excised_channels": rfi_excised_channels,
        # Unpacked last, so a packet value would win over any same-named key above.
        **recv_packet_resources,
    }


def calculate_receive_subband_resources(
    *,
    beam_id: int,
    bandwidth_mhz: float,
    centre_freq_mhz: float,
    data_host: str,
    data_mac: str,
    subband_udp_ports: List[int],
    cbf_pst_config: CbfPstConfig,
    **kwargs: Any,
) -> dict:
    """
    Calculate the RECV resources for all subbands from request.

    This is a common method to map a CSP JSON request to the appropriate RECV.CORE parameters.
    It is also used to calculate the specific subband resources.

    The requested centre frequency and bandwidth are first mapped to a channel range by the
    CBF/PST configuration, and the centre frequency and bandwidth are then recomputed from
    that range. The recomputed ("refined") values are the ones used in the result; a warning
    is logged for each one that differs from the requested value.

    Only subband 1 is generated, and it uses the first entry of ``subband_udp_ports``.

    :param beam_id: the id of the beam the request is for
    :type beam_id: int
    :param bandwidth_mhz: the total bandwidth for the scan, in MHz
    :type bandwidth_mhz: float
    :param centre_freq_mhz: the centre frequency for the scan, in MHz
    :type centre_freq_mhz: float
    :param data_host: the IP address at which data will be received.
    :type data_host: str
    :param data_mac: the MAC address corresponding to the data_host.
    :type data_mac: str
    :param subband_udp_ports: a list of UDP ports for each of the subbands. Max length is 4
        given there is a maximum of 4 subbands.
    :type subband_udp_ports: List[int]
    :param cbf_pst_config: the CBF/PST configuration for the current request
    :type cbf_pst_config: CbfPstConfig
    :param kwargs: the remaining request parameters, forwarded unchanged to
        :py:func:`calculate_receive_common_resources`.
    :type kwargs: Any
    :return: a dict of dicts, with "common" and "subbands" as the top level keys. The `common`
        values come from the :py:func:`calculate_receive_common_resources` function. The
        `subbands` is a dict of dicts with subband ids as the keys, while the second level is
        the specific parameters. An example response is as follows::

            {
                "common": {
                    "nchan": nchan,
                    "nsubband": 1,
                    ...
                },
                "subbands": {
                    1: {
                        "data_key": "a000",
                        "weights_key": "a010",
                        ...
                    }
                }
            }

    :rtype: dict
    """
    (start_chan, end_chan) = cbf_pst_config.calculate_channel_range(
        centre_freq_mhz=centre_freq_mhz,
        bandwidth_mhz=bandwidth_mhz,
    )
    # end_chan is exclusive, so the difference is the channel count.
    nchan = end_chan - start_chan

    refined_centre_freq_mhz = cbf_pst_config.centre_freq_mhz(start_chan, nchan)
    refined_bandwidth_mhz = cbf_pst_config.bandwidth_mhz(nchan)

    # Report any difference between what was requested and what the channel range gives.
    if refined_centre_freq_mhz != centre_freq_mhz:
        _logger.warning(f"{refined_centre_freq_mhz=} {centre_freq_mhz=}")

    if refined_bandwidth_mhz != bandwidth_mhz:
        _logger.warning(f"{refined_bandwidth_mhz=} {bandwidth_mhz=}")

    return {
        "common": calculate_receive_common_resources(
            cbf_pst_config=cbf_pst_config,
            centre_freq_mhz=refined_centre_freq_mhz,
            bandwidth_mhz=refined_bandwidth_mhz,
            **kwargs,
        ),
        "subbands": {
            1: {
                "data_key": generate_data_key(beam_id=beam_id, subband_id=1),
                "weights_key": generate_weights_key(beam_id=beam_id, subband_id=1),
                "bandwidth": refined_bandwidth_mhz,
                "nchan": nchan,
                "frequency": refined_centre_freq_mhz,
                "start_channel": start_chan,
                "end_channel": end_chan,  # using exclusive range
                "start_channel_out": start_chan,
                "end_channel_out": end_chan,  # using exclusive range
                "nchan_out": nchan,
                "bandwidth_out": refined_bandwidth_mhz,
                "frequency_out": refined_centre_freq_mhz,
                "data_host": data_host,
                "data_mac": data_mac,
                "data_port": subband_udp_ports[0],
                "start_centre_freq_mhz": cbf_pst_config.channel_centre_freq_mhz(chan=start_chan),
            },
        },
    }