Source code for ska_telmodel.csp.config

import logging

from ska_telmodel._common import split_interface_version

from .. import channel_map

LOGGER = logging.getLogger("ska_telmodel.csp")


def get_fsp_channel_offset(csp_config_in: dict) -> int:
    """
    Determines first channel ID within an FSP

    :param csp_config_in: CSP input configuration. Currently not
        consulted; the offset is a fixed value.
    :returns: First channel ID within an FSP (currently always 0)
    """

    # Fixed?
    return 0
def get_fsp_output_channel_offset(
    fsp_config: dict, fsp_id: str, fsp_ch_offset: str
) -> int:
    """
    Determines the FSP channel offset. Either read from the
    dictionary or reconstructed.

    :param fsp_config: FSP configuration structure
    :param fsp_id: Name of the field holding the FSP ID
    :param fsp_ch_offset: Name of FSP channel offset field
    :returns: Output channel offset of the FSP
    :raises KeyError: If neither the channel offset field nor the
        FSP ID field is present in ``fsp_config``
    """

    # Return channel offset if it is set. Presence of the key is what
    # matters here - an explicit offset of 0 is a valid value.
    if fsp_ch_offset in fsp_config:
        return fsp_config[fsp_ch_offset]

    # Otherwise fallback to calculation from FSP ID. FSP IDs are
    # treated as 1-based, each FSP covering a block of 14880 output
    # channels (presumably the per-FSP channel count - TODO confirm).
    fallback_offset = 14880 * (fsp_config[fsp_id] - 1)
    LOGGER.warning(
        "FSP lacks output channel offset, using %s: %s",
        fallback_offset,
        fsp_config,
    )
    return fallback_offset
def add_receive_addresses(
    scan_type: str,
    csp_config: dict,
    scan_receive_addrs: dict,
    csp_interface_version: str,
    sdp_interface_version: str,
) -> dict:
    """
    Add SDP receive addresses into CSP configuration

    Applies the Mid CBF visibility, pulsar search (PSS) and pulsar
    timing (PST) receive address updates, in that order.

    :param scan_type: Scan type. Accepted for interface
        compatibility; not used by this function.
    :param csp_config: CSP input configuration
    :param scan_receive_addrs: SDP receive addresses for scan
    :param csp_interface_version: CSP interface version to assume
    :param sdp_interface_version: SDP interface version to assume
    :returns: New CSP configuration
    """

    # Each step takes the configuration produced by the previous one,
    # so the order of this tuple is the order of application.
    add_steps = (
        add_midcbf_visibility_receive_addresses,
        add_pss_receive_addresses,
        add_pst_receive_addresses,
    )
    for add_step in add_steps:
        csp_config = add_step(
            csp_config,
            scan_receive_addrs,
            csp_interface_version,
            sdp_interface_version,
        )

    return csp_config
def add_midcbf_visibility_receive_addresses(
    csp_config: dict,
    scan_receive_addrs: dict,
    csp_interface_version: str,
    sdp_interface_version: str,
) -> dict:
    """
    Add SDP visibility receive addresses into mid-cbf configuration

    The host / MAC / port channel maps from SDP are split at the FSP
    output channel offsets, and each piece is stored in the FSP entry
    it belongs to. FSP entries lacking a channel offset get one filled
    in. The configuration is updated in place.

    :param csp_config: CSP input configuration
    :param scan_receive_addrs: SDP receive addresses for scan
    :param csp_interface_version: CSP interface version to assume
    :param sdp_interface_version: SDP interface version to assume
    :returns: CSP configuration with receive addresses added (the
        same object as ``csp_config``)
    """

    # CSP: Get major version of the interface
    major, _ = split_interface_version(csp_interface_version)

    # Find CBF configuration
    if "cbf" in csp_config:  # version 1.0 (ADR-18)
        cbf_config = csp_config["cbf"]
    else:
        cbf_config = csp_config

    # Determine applicable field names
    fsp_key = "fsp"
    if major in (0, 1):
        fsp_id = "fspID"
        fsp_ch_offset = "fspChannelOffset"
        maps_to_split = {
            "host": "outputHost",
            "mac": "outputMac",
            "port": "outputPort",
        }
    else:
        fsp_id = "fsp_id"
        fsp_ch_offset = "channel_offset"
        maps_to_split = {
            "host": "output_host",
            "mac": "output_mac",
            "port": "output_port",
        }

    # SDP: Get major and minor version of the interface, find
    # applicable beam if we have a version that supports multiple
    # beams.
    sdp_major, sdp_minor = split_interface_version(sdp_interface_version)
    if sdp_major > 0 or sdp_minor > 3:

        # Find first visibility beam (for CBF mid, we assume there to
        # only be one.)
        found_beam = next(
            (
                beam
                for beam in scan_receive_addrs.values()
                if beam.get("function") == "visibilities"
            ),
            None,
        )
        if found_beam is None:
            LOGGER.warning("No visibility beam found in receive addresses!")
            return csp_config
        scan_receive_addrs = found_beam

    # Collect fsps, adding output offset as applicable
    fsps = cbf_config[fsp_key]
    for fsp in fsps:
        if fsp_ch_offset not in fsp:
            fsp[fsp_ch_offset] = get_fsp_output_channel_offset(
                fsp, fsp_id, fsp_ch_offset
            )

    # Sort fsps by output channel offset, determining channel groups
    fsps_sorted = sorted(fsps, key=lambda fsp: fsp[fsp_ch_offset])
    fsp_offsets = [fsp[fsp_ch_offset] for fsp in fsps_sorted]
    ch_offset = get_fsp_channel_offset(cbf_config)

    for in_map_name, out_map_name in maps_to_split.items():

        # Split map, if present
        if in_map_name not in scan_receive_addrs:
            continue
        in_map = scan_receive_addrs[in_map_name]
        if len(in_map) > 0:
            # Close the last group just past the start channel of the
            # final map entry
            groups = fsp_offsets + [in_map[-1][0] + 1]
            split_map = channel_map.split_channel_map_at(
                in_map, groups, ch_offset
            )
        else:
            split_map = [[] for _ in fsps_sorted]

        # Add to FSP sections. The groups were derived from the FSPs
        # in offset order, so the pieces must be handed out in that
        # same order - not in the order the FSPs appear in the
        # configuration.
        for fsp, new_map in zip(fsps_sorted, split_map):
            fsp[out_map_name] = new_map

    return csp_config
def add_pss_receive_addresses(
    csp_config: dict,
    scan_receive_addrs: dict,
    csp_interface_version: str,
    sdp_interface_version: str,
) -> dict:
    """
    Add SDP pulsar search receive addresses into pulsar search
    configuration

    Does nothing for CSP interface versions before 2.1 or SDP
    interface versions before 0.4. PSS beams without usable receive
    addresses are skipped with a warning. The configuration is updated
    in place.

    :param csp_config: CSP input configuration
    :param scan_receive_addrs: SDP receive addresses for scan
    :param csp_interface_version: CSP interface version to assume
    :param sdp_interface_version: SDP interface version to assume
    :returns: CSP configuration with receive addresses added (the
        same object as ``csp_config``)
    """

    # Check that CSP and SDP versions are new enough
    csp_major, csp_minor = split_interface_version(csp_interface_version)
    if (csp_major, csp_minor) < (2, 1):
        return csp_config
    sdp_major, sdp_minor = split_interface_version(sdp_interface_version)
    if (sdp_major, sdp_minor) < (0, 4):
        return csp_config

    # Go through PSS beams
    for beam in csp_config.get("pss", {}).get("beam", []):
        beam_id = beam["beam_id"]

        # Find matching receive addresses from SDP. If several SDP
        # beams match, the last one wins.
        found_dest_host = None
        found_dest_port = None
        for sdp_beam in scan_receive_addrs.values():
            if (
                sdp_beam.get("function") == "pulsar search"
                and sdp_beam.get("search_beam_id") == beam_id
            ):
                found_dest_host = sdp_beam.get("host")
                found_dest_port = sdp_beam.get("port")

        # Both maps are indexed below, so a missing or empty host *or*
        # port map means there is no usable address for this beam.
        if not found_dest_host or not found_dest_port:
            LOGGER.warning(
                "Could not find receive addresses for PSS search beam %s!",
                beam_id,
            )
            continue

        # Set in PSS configuration: second element of the first map
        # entry (presumably [start channel, value] - TODO confirm)
        beam["dest_host"] = found_dest_host[0][1]
        beam["dest_port"] = found_dest_port[0][1]

    return csp_config
def add_pst_receive_addresses(
    csp_config: dict,
    scan_receive_addrs: dict,
    csp_interface_version: str,
    sdp_interface_version: str,
) -> dict:
    """
    Add SDP pulsar timing receive addresses into pulsar timing
    configuration

    Does nothing for CSP interface versions before 2.2, SDP interface
    versions before 0.4, or if the configuration has no ``pst`` /
    ``scan`` section. The configuration is updated in place.

    :param csp_config: CSP input configuration
    :param scan_receive_addrs: SDP receive addresses for scan
    :param csp_interface_version: CSP interface version to assume
    :param sdp_interface_version: SDP interface version to assume
    :returns: CSP configuration with receive addresses added (the
        same object as ``csp_config``)
    """

    # Check that CSP and SDP versions are new enough, and the PST
    # section is actually populated
    csp_major, csp_minor = split_interface_version(csp_interface_version)
    if (
        (csp_major, csp_minor) < (2, 2)
        or "pst" not in csp_config
        or "scan" not in csp_config["pst"]
    ):
        return csp_config
    sdp_major, sdp_minor = split_interface_version(sdp_interface_version)
    if (sdp_major, sdp_minor) < (0, 4):
        return csp_config

    # There is only one receive address for PST, therefore we only
    # need to find one PST beam
    found_dest_host = None
    found_dest_port = None
    for sdp_beam in scan_receive_addrs.values():
        if sdp_beam.get("function") == "pulsar timing":
            found_dest_host = sdp_beam.get("host")
            found_dest_port = sdp_beam.get("port")
            break

    # Both maps are indexed below, so a missing or empty host *or*
    # port map means there is no usable address.
    if not found_dest_host or not found_dest_port:
        LOGGER.warning("Could not find receive addresses for pulsar timing!")
        return csp_config

    # Set: second element of the first map entry for host and port
    # (presumably [start channel, value] - TODO confirm)
    csp_config["pst"]["scan"]["destination_address"] = [
        found_dest_host[0][1],
        found_dest_port[0][1],
    ]

    return csp_config