import copy
import logging
import re
from collections.abc import Sequence
from realtime.receive.core.antenna import Antenna
from realtime.receive.core.antenna_utils import load_antennas
from realtime.receive.core.baselines import Baselines
from realtime.receive.core.common import load_json_resource
from ska_sdp_config import Config, ExecutionBlock
from ska_telmodel.data.frontend import TMData
from realtime.receive.modules.tm.base_tm import TelescopeManager
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Receptor name of the form "APxxx.yy": group 1 captures the three-digit
# station ID and group 2 the two-digit substation ID (see parse_ap_ids).
LOW_AP_PATTERN = re.compile(r"^AP(\d{3})\.(\d{2})$")
# Receptor name of the form "SKAxxx" or "MKTxxx": group 1 captures the prefix
# and group 2 the three digits that follow it.
MID_PATTERN = re.compile(r"^(SKA|MKT)(\d{3})$")
def parse_ap_ids(ap_matches: list[re.Match], indexed_by_id: dict[int, Antenna]) -> list[Antenna]:
    """
    Turn APxxx.yy regex matches into Antenna objects ordered by
    (station ID, substation ID).

    Args:
        ap_matches: Regex matches whose group 1 holds the station ID digits
            and whose group 2 holds the substation ID digits.
        indexed_by_id: Antennas keyed by the integer station ID.

    Returns:
        A list with one deep copy of the matching antenna per input match.
        For a non-zero substation ID the copy's ``station_label`` gets a
        ``.yy`` suffix (substation ID zero-padded to two digits); the
        antennas held in ``indexed_by_id`` are never modified.

    Raises:
        KeyError: If a station ID is not a key of ``indexed_by_id``.
    """
    # Tuples compare element-wise, so this orders by station, then substation.
    ordered_ids = sorted((int(m.group(1)), int(m.group(2))) for m in ap_matches)

    result = []
    for station, substation in ordered_ids:
        # Work on a copy so that relabelling does not leak into the layout.
        entry = copy.deepcopy(indexed_by_id[station])
        if substation:
            entry.station_label = f"{entry.station_label}.{substation:02d}"
        result.append(entry)
    return result
def sort_receptors_by_icd(receptors: list[str], all_antennas: list[Antenna]) -> list[Antenna]:
    """
    Order the given receptor names and return the matching Antenna objects.

    Names are upper-cased before any matching or lookup. Three cases are
    tried in turn:

    1. Every name looks like a MID dish (SKAxxx or MKTxxx): the names are
       sorted as strings and looked up by antenna label.
    2. Every name has the LOW APxxx.yy form: the station/substation IDs
       embedded in the names drive the ordering (see ``parse_ap_ids``).
    3. Anything else: the names are looked up by antenna label and ordered
       by the integer value of the antenna ID.

    Args:
        receptors: List of receptor names.
        all_antennas: List of all Antenna objects.

    Returns:
        Sorted list of Antenna objects.

    Raises:
        ValueError: In case 3, if any name has no antenna with that label.
        KeyError: In cases 1 and 2, if a name or station ID is not present
            in ``all_antennas``.
    """
    names = [receptor.upper() for receptor in receptors]
    by_label = {antenna.label.upper(): antenna for antenna in all_antennas}
    by_id = {antenna.id: antenna for antenna in all_antennas}

    # MID: the Mid CBF SDP ICD requires sorting by DishID. Sorting the string
    # version of Dish IDs yields the same result.
    if all(MID_PATTERN.match(name) for name in names):
        return [by_label[name] for name in sorted(names)]

    # Otherwise we are dealing with LOW. When every name is APxxx.yy, the
    # station/substation IDs in the names define the order.
    low_matches = [LOW_AP_PATTERN.match(name) for name in names]
    if None not in low_matches:
        return parse_ap_ids(low_matches, by_id)

    # Free-form LOW names: they must all be labels known to the layout.
    missing = [name for name in names if name not in by_label]
    if missing:
        raise ValueError(
            f"Unable to find receptor(s) {missing} in the provided layout"
        )

    def antenna_id(name):
        return int(by_label[name].id)

    return [by_label[name] for name in sorted(names, key=antenna_id)]
class SKATelescopeManager(TelescopeManager):
    """
    TelescopeManager class that combines information from an AssignResources
    command and an Antenna Layout to assemble the list of antennas making up
    a subarray.

    :param antenna_layout: The antenna layout, in whatever form
     `load_antennas` accepts (annotated as a string or a sequence of
     dictionaries).
    :param assign_resources_command: The command naming the receptors of the
     subarray under ``["resources"]["receptors"]``. A string is loaded with
     `load_json_resource`, an `ExecutionBlock` is converted with
     ``model_dump()``, and a dictionary is used as is. If `None`, every
     antenna of the layout is used, ordered by integer antenna ID.
    """

    def __init__(
        self,
        antenna_layout: str | Sequence[dict],
        assign_resources_command: ExecutionBlock | dict | str | None = None,
    ):
        all_antennas = load_antennas(antenna_layout)

        # Normalise the command into a plain dictionary (or leave it as None).
        if isinstance(assign_resources_command, str):
            assign_resources_command = load_json_resource(assign_resources_command)
        if isinstance(assign_resources_command, ExecutionBlock):
            assign_resources_command = assign_resources_command.model_dump()

        if assign_resources_command is None:
            antennas = sorted(all_antennas, key=lambda antenna: int(antenna.id))
        else:
            receptors_list = assign_resources_command["resources"]["receptors"]
            antennas = sort_receptors_by_icd(receptors_list, all_antennas)

        logger.info(
            "Build SKATelescopeModel with %d antennas: %r",
            len(antennas),
            antennas,
        )

        # TODO (rtobar): For the time being we assume Low's baseline ordering,
        # which is well defined in the CBF SDP ICD. Mid's order isn't set in
        # stone, and might change to match Low's.
        baselines = Baselines.generate(len(antennas), autocorr=True, lower_triangular=True)
        super().__init__(antennas, baselines)

    @staticmethod
    def from_sdp_config(
        execution_block_id: str,
        sdp_config: Config,
        telmodel_key: str | None = None,
        telmodel_source_uris: list[str] | None = None,
        antenna_layout: str | Sequence[dict] | None = None,
    ) -> "SKATelescopeManager":
        """
        Construct a telescope manager for the given options, contacting the
        SDP Configuration Database and using the SKA Telmodel package to obtain
        all necessary information.

        :param execution_block_id: The Execution Block ID for which a TM needs
         to be created.
        :param sdp_config: A client to the SDP Configuration DB.
        :param telmodel_key: The key in the SKA Telmodel data where the antenna
         layout can be read from (its ``"receptors"`` entry is used). Only
         consulted when `antenna_layout` is not given or is empty, in which
         case it is required.
        :param telmodel_source_uris: An optional list of URIs used by the SKA
         Telmodel package to read its data from.
        :param antenna_layout: An antenna layout, either as a list of
         dictionaries or as a URL/filename with a JSON representation of such
         list. When given (and non-empty) it takes precedence over
         `telmodel_key`.
        :returns: A telescope manager built from the antenna layout and the
         Execution Block read from the SDP Configuration DB.
        :raises ValueError: If neither `antenna_layout` nor `telmodel_key` is
         given, or if the SDP Configuration DB has no Execution Block with
         the given ID.
        """
        if not antenna_layout:
            # Fail early with a clear message instead of handing a None key
            # to the telmodel lookup.
            if telmodel_key is None:
                raise ValueError("Either antenna_layout or telmodel_key must be given")
            tmdata = TMData(telmodel_source_uris)
            antenna_layout = tmdata.get(telmodel_key).get_dict()["receptors"]

        for txn in sdp_config.txn():
            execution_block = txn.execution_block.get(execution_block_id)
        if not execution_block:
            raise ValueError(f"No EB in SDP config with id='{execution_block_id}'")
        return SKATelescopeManager(antenna_layout, execution_block)