# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project.
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""Module for providing utility methods of RECV."""
from __future__ import annotations
import logging
import math
from typing import Any, cast
from ska_pst.common.constants import DEFAULT_COORD_MODE, DEFAULT_EQUINOX, DEFAULT_TRACKING_MODE, MEGA_HERTZ
from ska_pst.lmc.smrb.smrb_util import generate_data_key, generate_weights_key
from ska_pst.common import CbfPstConfig
# Names exported by a wildcard import of this module.
# NOTE(review): generate_recv_configure_scan_request is defined in this module
# but is not listed here - confirm whether that omission is intentional.
__all__ = [
"calculate_receive_common_resources",
"calculate_receive_subband_resources",
"calculate_receive_packet_resources",
]
# Module-level logger, named after this module, used for the warnings emitted below.
_logger = logging.getLogger(__name__)
def generate_recv_configure_scan_request(
    **request_params: Any,
) -> dict:
    """
    Build the RECV.CORE configure scan parameters from a CSP request.

    The request values are copied into a new dictionary under the key names
    used by RECV.CORE. The coordinate mode and tracking mode are always set
    to the module defaults, and the equinox falls back to the default when
    the request does not provide one.

    The keys ``delay_centre``, ``observer_id``, ``project_id``, ``subarray_id``,
    ``source``, ``stt_crd1``, ``stt_crd2`` and ``eb_id`` are required. The keys
    ``equinox``, ``activation_time`` and ``pointing_id`` are optional.

    :param request_params: the request parameters used to configure PST for a scan.
    :type request_params: Any
    :returns: the RECV.CORE parameters to be used in the gRPC request.
    :rtype: dict
    :raises KeyError: if one of the required request parameters is missing.
    """
    delay_centre: list[float] = request_params["delay_centre"]

    recv_params = dict(
        observer=request_params["observer_id"],
        projid=request_params["project_id"],
        subarray_id=str(request_params["subarray_id"]),
        # RECV.CORE takes the delay centre as a comma separated string
        delay_centre=",".join(str(value) for value in delay_centre),
        coord_md=DEFAULT_COORD_MODE,
        source=request_params["source"],
        stt_crd1=str(request_params["stt_crd1"]),
        stt_crd2=str(request_params["stt_crd2"]),
        equinox=str(request_params.get("equinox", DEFAULT_EQUINOX)),
        trk_mode=DEFAULT_TRACKING_MODE,
        execution_block_id=request_params["eb_id"],
    )

    # the following fields have been deprecated and in version 3.0 of PST schema will be removed.
    # Each pair is (key in the request, key in the RECV.CORE parameters).
    deprecated_fields = (
        ("activation_time", "activation_time"),
        ("pointing_id", "pnt_id"),
    )
    for request_key, recv_key in deprecated_fields:
        if request_key in request_params:
            recv_params[recv_key] = request_params[request_key]

    return recv_params
def calculate_receive_packet_resources(
    bandwidth_mhz: float,
    cbf_pst_config: CbfPstConfig,
    **kwargs: Any,
) -> dict:
    """
    Calculate the RECV parameters that describe the received packets.

    Most values are read straight from ``cbf_pst_config``; the number of
    channels is obtained from its ``nchan_for_bandwidth`` method. As a sanity
    check a sampling time is inferred from the bandwidth, the number of
    channels and the oversampling ratio, and a warning is logged if it
    differs from the configured ``tsamp`` by more than 0.01%. The configured
    ``tsamp`` is what is returned, regardless of the outcome of that check.

    :param bandwidth_mhz: the requested total (critical) bandwidth, in MHz.
    :type bandwidth_mhz: float
    :param cbf_pst_config: the CBF/PST configuration for the current request.
    :type cbf_pst_config: CbfPstConfig
    :param kwargs: additional keyword arguments, accepted but not used here.
    :type kwargs: Any
    :return: a dictionary containing parameters that are used by RECV.CORE
    :rtype: dict
    """
    nchan = cbf_pst_config.nchan_for_bandwidth(bandwidth_mhz=bandwidth_mhz)
    npol = cbf_pst_config.npol
    nbits = cbf_pst_config.nbit
    oversampling_ratio = cbf_pst_config.oversampling_ratio

    # per-channel bandwidth implied by the request; its reciprocal is the sampling time
    inferred_chan_bw = (bandwidth_mhz * oversampling_ratio[0]) / (nchan * oversampling_ratio[1])
    inferred_tsamp = 1 / inferred_chan_bw

    # only a diagnostic: a mismatch is reported but does not change the result
    tsamp_as_expected = math.isclose(cbf_pst_config.tsamp, inferred_tsamp, rel_tol=0.0001)
    if not tsamp_as_expected:
        _logger.warning(
            f"Inferred tsamp {inferred_tsamp=} is not within 0.01% of expected "
            f"tsamp {cbf_pst_config.tsamp}"
        )

    packet_resources = dict(
        nchan=nchan,
        bandwidth=bandwidth_mhz,
        npol=npol,
        nbits=nbits,
        ndim=cbf_pst_config.ndim,
        tsamp=cbf_pst_config.tsamp,
        # the ratio is sent as "numerator/denominator"
        ovrsamp="/".join(str(part) for part in oversampling_ratio),
        udp_format=cbf_pst_config.udp_format,
    )
    return packet_resources
def calculate_receive_common_resources(
    centre_freq_mhz: float,
    bandwidth_mhz: float,
    cbf_pst_config: CbfPstConfig,
    receiver_id: str,
    receptors: list[str],
    receptor_weights: list[float],
    timing_beam_id: str,
    **kwargs: Any,
) -> dict:
    """
    Calculate the RECV.CORE parameters that are common to all sub-bands.

    The result combines the packet parameters from
    :py:func:`calculate_receive_packet_resources` with the receptor, feed,
    beam and RFI mask parameters of the scan.

    The feed parameters are read from ``kwargs`` and are required:
    ``feed_polarization``, ``feed_handedness``, ``feed_angle``,
    ``feed_tracking_mode`` and ``feed_position_angle``. An optional
    ``rfi_frequency_masks`` entry in ``kwargs`` is a list of two element
    masks; any mask that does not have exactly two values is dropped and a
    warning is logged.

    :param centre_freq_mhz: the centre frequency of the scan request, in MHz.
    :type centre_freq_mhz: float
    :param bandwidth_mhz: the bandwidth of the scan request, in MHz.
    :type bandwidth_mhz: float
    :param cbf_pst_config: CBF/PST configuration.
    :type cbf_pst_config: CbfPstConfig
    :param receiver_id: the id/name of the receiver used for the scan request.
    :type receiver_id: str
    :param receptors: the list of receptors the scan is for.
    :type receptors: list[str]
    :param receptor_weights: the relative weights of the receptors.
    :type receptor_weights: list[float]
    :param timing_beam_id: the id of the beam that is being configured
    :type timing_beam_id: str
    :param kwargs: the feed parameters, the optional RFI masks and any other
        request values; all of them are also forwarded to
        :py:func:`calculate_receive_packet_resources`.
    :type kwargs: Any
    :return: a dictionary of the common RECV.CORE parameters.
    :rtype: dict
    :raises KeyError: if one of the required feed parameters is missing from ``kwargs``.
    """
    recv_packet_resources = calculate_receive_packet_resources(
        bandwidth_mhz=bandwidth_mhz,
        cbf_pst_config=cbf_pst_config,
        **kwargs,
    )

    requested_masks = cast(list[list[float]], kwargs.get("rfi_frequency_masks", []))

    # need to remove empty or invalid masks but log warning of removing value.
    request_rfi_frequency_masks: list[list[float]] = []
    for position, candidate in enumerate(requested_masks):
        if len(candidate) != 2:
            _logger.warning(f"rfi_frequency_masks[{position}] is invalid. Length should be 2 but value is {candidate}")
            continue
        request_rfi_frequency_masks.append(candidate)

    # flatten the masks into a single list, scaling each value by MEGA_HERTZ
    # (presumably converting from Hz to MHz - confirm against the request schema)
    rfi_frequency_masks: list[float] = []
    for fm in request_rfi_frequency_masks:
        rfi_frequency_masks.extend(f / MEGA_HERTZ for f in fm)

    assert len(rfi_frequency_masks) == 2 * len(
        request_rfi_frequency_masks
    ), f"Expected {len(rfi_frequency_masks)=} to twice of {len(request_rfi_frequency_masks)=}"

    # one channel number for every mask frequency, in the same order
    rfi_excised_channels: list[int] = []
    for mask_freq in rfi_frequency_masks:
        channel_number = cbf_pst_config.pst_channel_number_for_freq(
            mask_freq, centre_freq_mhz=centre_freq_mhz, bandwidth_mhz=bandwidth_mhz
        )
        rfi_excised_channels.append(channel_number)

    assert len(rfi_excised_channels) == len(
        rfi_frequency_masks
    ), f"Expected {len(rfi_excised_channels)=} to be {len(rfi_frequency_masks)=}"

    common_resources = {
        "nsubband": 1,
        "udp_nsamp": cbf_pst_config.udp_nsamp,
        "wt_nsamp": cbf_pst_config.wt_nsamp,
        "udp_nchan": cbf_pst_config.udp_nchan,
        "frequency": centre_freq_mhz,
        "frontend": receiver_id,
        # This should come from the Telescope
        "fd_poln": kwargs["feed_polarization"],
        "fd_hand": kwargs["feed_handedness"],
        "fd_sang": kwargs["feed_angle"],
        "fd_mode": kwargs["feed_tracking_mode"],
        "fa_req": kwargs["feed_position_angle"],
        "nant": len(receptors),
        "antennas": ",".join(receptors),
        "ant_weights": ",".join(str(weight) for weight in receptor_weights),
        # this is for AAO.5 where we will only have one beam, the default will
        # be the device's configured beam id. Ideally this should come from
        # the configure scan request from CSP.LMC
        "beam_id": timing_beam_id,
        "num_rfi_masks": len(request_rfi_frequency_masks),
        "rfi_frequency_masks": rfi_frequency_masks,
        "rfi_excised_channels": rfi_excised_channels,
    }
    # the packet parameters are added last
    common_resources.update(recv_packet_resources)
    return common_resources
def calculate_receive_subband_resources(
    *,
    beam_id: int,
    bandwidth_mhz: float,
    centre_freq_mhz: float,
    data_host: str,
    data_mac: str,
    subband_udp_ports: list[int],
    cbf_pst_config: CbfPstConfig,
    **kwargs: Any,
) -> dict:
    """
    Calculate the RECV resources, common and per sub-band, for a request.

    The request is split into sub-band workloads using
    ``cbf_pst_config.calculate_channel_ranges``. The common parameters are
    then calculated with :py:func:`calculate_receive_common_resources` and
    one set of specific parameters is generated for each workload.

    When there is exactly one workload, its centre frequency and bandwidth
    are used for the common parameters, and a warning is logged for each
    value that differs from the requested one. Otherwise the requested
    values are used unchanged.

    :param beam_id: the id of the beam the request is for
    :type beam_id: int
    :param bandwidth_mhz: the total bandwidth for the scan, in MHz
    :type bandwidth_mhz: float
    :param centre_freq_mhz: the centre frequency for the scan, in MHz
    :type centre_freq_mhz: float
    :param data_host: the IP address at which data will be received.
    :type data_host: str
    :param data_mac: the MAC address corresponding to the data_host.
    :type data_mac: str
    :param subband_udp_ports: a list of UDP ports for each of the subbands.
        Max length is 4 given there is a maximum of 4 subbands. Currently
        only the first port is used, for every sub-band.
    :type subband_udp_ports: list[int]
    :param cbf_pst_config: the CBF/PST configuration for the current request
    :type cbf_pst_config: CbfPstConfig
    :param kwargs: the remaining arguments, forwarded to
        :py:func:`calculate_receive_common_resources`.
    :type kwargs: Any
    :return: a dict of dicts, with "common" and "subbands" as the top level
        keys. The `common` values come from the :py:func:`calculate_receive_common_resources`
        function, with `nsubband` set to the number of workloads. The `subbands`
        value is a dict of dicts with subband ids as the keys, while the second
        level is the specific parameters. An example response is as follows::

            {
                "common": {
                    "nchan": nchan,
                    "nsubband": 1,
                    ...
                },
                "subbands": {
                    1: {
                        "data_key": "a000",
                        "weights_key": "a010",
                        ...
                    }
                }
            }

    :rtype: dict
    """
    # construct the sub-band workloads, noting a single subband is valid
    workloads = cbf_pst_config.calculate_channel_ranges(
        centre_freq_mhz=centre_freq_mhz,
        bandwidth_mhz=bandwidth_mhz,
    )
    num_workloads = len(workloads)

    # when over-lapping sub-bands are used the centre frequency and bandwidth cannot be
    # simply deduced from the workloads, so start from the requested values and only
    # refine them when there is a single workload.
    refined_centre_freq_mhz = centre_freq_mhz
    refined_bandwidth_mhz = bandwidth_mhz
    if num_workloads == 1:
        only_workload = workloads[0]
        refined_centre_freq_mhz = only_workload.centre_freq_mhz
        refined_bandwidth_mhz = only_workload.bandwidth_mhz

    if refined_centre_freq_mhz != centre_freq_mhz:
        _logger.warning(f"{refined_centre_freq_mhz=} {centre_freq_mhz=}")
    if refined_bandwidth_mhz != bandwidth_mhz:
        _logger.warning(f"{refined_bandwidth_mhz=} {bandwidth_mhz=}")

    common = calculate_receive_common_resources(
        cbf_pst_config=cbf_pst_config,
        centre_freq_mhz=refined_centre_freq_mhz,
        bandwidth_mhz=refined_bandwidth_mhz,
        **kwargs,
    )
    # replace the default set by the common calculation with the real number of sub-bands
    common["nsubband"] = num_workloads

    subbands: dict = {}
    # TODO(ajameson) subband_udp_ports should use index when supported
    for workload in workloads:
        # channel ranges are exclusive of the end channel
        nchan = workload.end_channel - workload.start_channel
        subband_resources = {
            "data_key": generate_data_key(beam_id=beam_id, subband_id=workload.subband_id),
            "weights_key": generate_weights_key(beam_id=beam_id, subband_id=workload.subband_id),
            "bandwidth": workload.bandwidth_mhz,
            "nchan": nchan,
            "frequency": workload.centre_freq_mhz,
            "start_channel": workload.start_channel,
            "end_channel": workload.end_channel,  # using exclusive range
            # the output values are the same as the input values
            "start_channel_out": workload.start_channel,
            "end_channel_out": workload.end_channel,  # using exclusive range
            "nchan_out": nchan,
            "bandwidth_out": workload.bandwidth_mhz,
            "frequency_out": workload.centre_freq_mhz,
            "data_host": data_host,
            "data_mac": data_mac,
            "data_port": subband_udp_ports[0],
            "start_centre_freq_mhz": workload.start_channel_centre_freq_mhz,
        }
        subbands[workload.subband_id] = subband_resources

    return {"common": common, "subbands": subbands}