import logging
from ska_telmodel._common import split_interface_version
from .. import channel_map
# Module-level logger, registered under the fixed name "ska_telmodel.csp"
LOGGER = logging.getLogger("ska_telmodel.csp")
def get_fsp_channel_offset(csp_config_in: dict) -> int:
    """
    Return the ID of the first channel within an FSP.

    The passed configuration is not inspected; the result is a constant.

    :param csp_config_in: CSP configuration (currently unused)
    :returns: First channel ID, always ``0``
    """
    # NOTE(review): the original author marked this value with "Fixed?" -
    # confirm whether it should ever be derived from the configuration.
    first_channel_id = 0
    return first_channel_id
def get_fsp_output_channel_offset(
    fsp_config: dict, fsp_id: str, fsp_ch_offset: str
) -> int:
    """
    Look up the output channel offset of an FSP, reconstructing it
    from the FSP ID if the configuration does not carry one.

    :param fsp_config: FSP configuration structure
    :param fsp_id: Name of the field holding the FSP ID
    :param fsp_ch_offset: Name of FSP channel offset field
    :returns: The configured offset if present, otherwise
        ``14880 * (FSP ID - 1)``
    """
    if fsp_ch_offset not in fsp_config:
        # No explicit offset: derive it from the FSP ID, so that
        # ID 1 maps to offset 0, ID 2 to 14880 and so on. A warning
        # is logged because the value is a guess.
        derived_offset = 14880 * (fsp_config[fsp_id] - 1)
        LOGGER.warning(
            "FSP lacks output channel offset, using "
            f"{derived_offset}: {fsp_config}"
        )
        return derived_offset

    # Offset is given explicitly - use it as-is
    return fsp_config[fsp_ch_offset]
def add_receive_addresses(
    scan_type: str,
    csp_config: dict,
    scan_receive_addrs: dict,
    csp_interface_version: str,
    sdp_interface_version: str,
) -> dict:
    """
    Add SDP receive addresses into CSP configuration

    Runs the mid-CBF visibility, PSS and PST helpers in that order,
    feeding the configuration returned by each one into the next.

    :param scan_type: Scan type (not used by this function)
    :param csp_config: CSP input configuration
    :param scan_receive_addrs: SDP receive addresses for scan
    :param csp_interface_version: CSP interface version to assume
    :param sdp_interface_version: SDP interface version to assume
    :returns: CSP configuration as returned by the last helper
    """
    # Order matters only in so far as it mirrors the sequence of
    # sub-system sections: CBF visibilities, pulsar search, pulsar timing
    address_adders = (
        add_midcbf_visibility_receive_addresses,
        add_pss_receive_addresses,
        add_pst_receive_addresses,
    )
    for add_addresses in address_adders:
        csp_config = add_addresses(
            csp_config,
            scan_receive_addrs,
            csp_interface_version,
            sdp_interface_version,
        )
    return csp_config
def add_midcbf_visibility_receive_addresses(
    csp_config: dict,
    scan_receive_addrs: dict,
    csp_interface_version: str,
    sdp_interface_version: str,
) -> dict:
    """
    Add SDP visibility receive addresses into mid-cbf configuration

    The host / mac / port channel maps from SDP are split at the FSP
    output channel offsets and each piece is stored on the FSP it
    belongs to. FSP entries are modified in place: besides the output
    maps, a missing channel offset field is filled in as well.

    :param csp_config: CSP input configuration
    :param scan_receive_addrs: SDP receive addresses for scan
    :param csp_interface_version: CSP interface version to assume
    :param sdp_interface_version: SDP interface version to assume
    :returns: The passed CSP configuration, with FSP entries updated.
        Returned unchanged if no visibility beam is found.
    :raises KeyError: if the CBF configuration has no ``fsp`` section
    """

    # CSP: only the major version decides the field naming scheme
    major, _ = split_interface_version(csp_interface_version)

    # Find CBF configuration: nested under "cbf" from version 1.0
    # (ADR-18), otherwise the FSPs live at the top level
    cbf_config = csp_config["cbf"] if "cbf" in csp_config else csp_config

    # Determine applicable field names
    fsp_key = "fsp"
    if major in (0, 1):
        fsp_id = "fspID"
        fsp_ch_offset = "fspChannelOffset"
        maps_to_split = {
            "host": "outputHost",
            "mac": "outputMac",
            "port": "outputPort",
        }
    else:
        fsp_id = "fsp_id"
        fsp_ch_offset = "channel_offset"
        maps_to_split = {
            "host": "output_host",
            "mac": "output_mac",
            "port": "output_port",
        }

    # SDP: from interface version 0.4 onwards the receive addresses
    # are organised per beam, so the visibility beam must be located.
    sdp_major, sdp_minor = split_interface_version(sdp_interface_version)
    if (sdp_major, sdp_minor) >= (0, 4):
        # Take the first visibility beam (for CBF mid, we assume
        # there to only be one.)
        visibility_beam = next(
            (
                beam
                for beam in scan_receive_addrs.values()
                if beam.get("function") == "visibilities"
            ),
            None,
        )
        if visibility_beam is None:
            LOGGER.warning("No visibility beam found in receive addresses!")
            return csp_config
        scan_receive_addrs = visibility_beam

    # Make sure every FSP carries an output channel offset
    fsps = cbf_config[fsp_key]
    for fsp in fsps:
        if fsp_ch_offset not in fsp:
            fsp[fsp_ch_offset] = get_fsp_output_channel_offset(
                fsp, fsp_id, fsp_ch_offset
            )

    # Sort fsps by output channel offset, determining channel groups
    fsps_sorted = sorted(fsps, key=lambda fsp: fsp[fsp_ch_offset])
    fsp_offsets = [fsp[fsp_ch_offset] for fsp in fsps_sorted]
    ch_offset = get_fsp_channel_offset(cbf_config)

    for in_map_name, out_map_name in maps_to_split.items():
        # Split map, if present
        if in_map_name not in scan_receive_addrs:
            continue
        in_map = scan_receive_addrs[in_map_name]
        if in_map:
            # Close the last group one past the first field of the
            # final map entry (presumably its start channel - confirm
            # against the channel map format)
            groups = fsp_offsets + [in_map[-1][0] + 1]
            split_map = channel_map.split_channel_map_at(
                in_map, groups, ch_offset
            )
        else:
            split_map = [[] for _ in fsps_sorted]

        # Add to FSP sections. The groups were derived from the
        # *sorted* offsets, so the pieces must be paired with the FSPs
        # in that same order - pairing with the configuration order
        # would hand maps to the wrong FSP when it is not sorted.
        for fsp, new_map in zip(fsps_sorted, split_map):
            fsp[out_map_name] = new_map

    return csp_config
def add_pss_receive_addresses(
    csp_config: dict,
    scan_receive_addrs: dict,
    csp_interface_version: str,
    sdp_interface_version: str,
) -> dict:
    """
    Add SDP receive addresses into pulsar search configuration

    For every PSS beam in the configuration, the SDP beam with function
    "pulsar search" and a matching ``search_beam_id`` is looked up and
    its host / port are written into the beam (in place). Nothing is
    done for CSP interface versions below 2.1 or SDP interface versions
    below 0.4.

    :param csp_config: CSP input configuration
    :param scan_receive_addrs: SDP receive addresses for scan
    :param csp_interface_version: CSP interface version to assume
    :param sdp_interface_version: SDP interface version to assume
    :returns: The passed CSP configuration, with PSS beams updated
    """

    # Check that CSP and SDP versions are new enough
    csp_major, csp_minor = split_interface_version(csp_interface_version)
    if (csp_major, csp_minor) < (2, 1):
        return csp_config
    sdp_major, sdp_minor = split_interface_version(sdp_interface_version)
    if (sdp_major, sdp_minor) < (0, 4):
        return csp_config

    # Go through PSS beams
    for beam in csp_config.get("pss", {}).get("beam", []):
        beam_id = beam["beam_id"]

        # Find matching receive addresses from SDP. If several SDP
        # beams match, the last one wins.
        found_dest_host = None
        found_dest_port = None
        for sdp_beam in scan_receive_addrs.values():
            if (
                sdp_beam.get("function") == "pulsar search"
                and sdp_beam.get("search_beam_id") == beam_id
            ):
                found_dest_host = sdp_beam.get("host")
                found_dest_port = sdp_beam.get("port")

        # Both maps are indexed below, so both must be present and
        # non-empty - otherwise warn and skip instead of crashing
        if not found_dest_host or not found_dest_port:
            LOGGER.warning(
                "Could not find receive addresses for PSS search beam %s!",
                beam_id,
            )
            continue

        # Set in PSS configuration, using the second field of the
        # first entry of each map
        beam["dest_host"] = found_dest_host[0][1]
        beam["dest_port"] = found_dest_port[0][1]

    return csp_config
def add_pst_receive_addresses(
    csp_config: dict,
    scan_receive_addrs: dict,
    csp_interface_version: str,
    sdp_interface_version: str,
) -> dict:
    """
    Add SDP receive addresses into pulsar timing configuration

    The host / port of the first SDP beam with function "pulsar timing"
    are written to ``csp_config["pst"]["scan"]["destination_address"]``
    (in place). Nothing is done for CSP interface versions below 2.2,
    SDP interface versions below 0.4, or when the configuration has no
    ``pst`` / ``scan`` section.

    :param csp_config: CSP input configuration
    :param scan_receive_addrs: SDP receive addresses for scan
    :param csp_interface_version: CSP interface version to assume
    :param sdp_interface_version: SDP interface version to assume
    :returns: The passed CSP configuration, with PST scan updated
    """

    # Check that CSP and SDP versions are new enough, and the PST
    # section is actually populated
    csp_major, csp_minor = split_interface_version(csp_interface_version)
    if (
        (csp_major, csp_minor) < (2, 2)
        or "pst" not in csp_config
        or "scan" not in csp_config["pst"]
    ):
        return csp_config
    sdp_major, sdp_minor = split_interface_version(sdp_interface_version)
    if (sdp_major, sdp_minor) < (0, 4):
        return csp_config

    # There is only one receive address for PST, therefore we only
    # need to find one PST beam
    found_dest_host = None
    found_dest_port = None
    for sdp_beam in scan_receive_addrs.values():
        if sdp_beam.get("function") == "pulsar timing":
            found_dest_host = sdp_beam.get("host")
            found_dest_port = sdp_beam.get("port")
            break

    # Both maps are indexed below, so both must be present and
    # non-empty - otherwise warn and leave the configuration alone
    if not found_dest_host or not found_dest_port:
        LOGGER.warning("Could not find receive addresses for pulsar timing!")
        return csp_config

    # Set destination as [host, port], using the second field of the
    # first entry of each map
    csp_config["pst"]["scan"]["destination_address"] = [
        found_dest_host[0][1],
        found_dest_port[0][1],
    ]
    return csp_config