"""
The pdm_transforms module contains code to transform Project Data Model (PDM)
entities to Configuration Data Model (CDM) entities. The pdm_transforms code
is called by observing scripts to convert the PDM Scheduling Block to the
equivalent CDM configurations, which are then sent to TMC devices to control
the telescope.
"""
import logging
from collections import Counter
from dataclasses import dataclass
from itertools import zip_longest
from typing import Union
import astropy.units as u
from astropy.coordinates import SkyCoord
from ska_oso_pdm import (
AltAzCoordinates,
GalacticCoordinates,
ICRSCoordinates,
SBDefinition,
SpecialCoordinates,
TelescopeType,
)
from ska_oso_pdm._shared.target import (
TLECoordinates, # TODO: The PDM should not force us to do a deep import like this.
)
from ska_oso_pdm.sb_definition import (
Beam,
CSPConfiguration,
DishAllocation,
FivePointParameters,
MCCSAllocation,
PointedMosaicParameters,
PointingKind,
PointingPatternParameters,
PVTTable,
ScanDefinition,
SinglePointParameters,
SubArrayLOW,
Target,
)
from ska_oso_pdm.sb_definition.mccs.mccs_allocation import SubarrayBeamConfiguration
from ska_tmc_cdm.messages.skydirection import (
AltAzField,
GalacticField,
ICRSField,
SkyDirection,
SolarSystemObject,
SpecialField,
TLEField,
)
from ska_tmc_cdm.messages.subarray_node.configure.tmc import (
TMCConfiguration as cdm_TMCConfiguration,
)
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
# Log record layout: timestamp (left-justified, minimum width 15) followed by
# the log message.
FORMAT = "%(asctime)-15s %(message)s"
# NOTE(review): this call runs at import time, so importing this module
# requests INFO-level logging configuration as a side effect - confirm that
# this is intended for a library module.
logging.basicConfig(level=logging.INFO, format=FORMAT)
# Not every function in this module should be called externally
__all__ = [
    "convert_tmcconfiguration",
]
# Pointing pattern kinds that _pattern_parameters_for accepts as the active
# pattern of a target; any other kind makes it raise NotImplementedError.
SUPPORTED_PATTERNS = (
    PointingKind.SINGLE_POINT,
    PointingKind.FIVE_POINT,
    PointingKind.POINTED_MOSAIC,
    PointingKind.PVT,
)
# Number of decimal places that proper motion values are rounded to in
# convert_target.
decimal_place = 8
def csp_configuration_for_subarray_beam_scan(
    subarray_beam: SubarrayBeamConfiguration, scan_number: int, sbd: SBDefinition
) -> CSPConfiguration:
    """
    Lookup function to resolve the CSP Configuration required for the given
    scan of the given subarray beam.

    The scan at index ``scan_number`` of ``subarray_beam.scan_sequence`` names
    a CSP configuration by ID; that ID is searched for in
    ``sbd.csp_configurations``.

    :param subarray_beam: subarray beam whose scan sequence is consulted
    :param scan_number: index of the scan within the beam's scan sequence
    :param sbd: SBDefinition holding the candidate CSP configurations
    :return: the first CSP configuration whose ``config_id`` matches the
        reference held by the scan
    :raises KeyError: if no CSP configuration in the SBDefinition has the
        referenced ID
    """
    csp_configuration_ref = subarray_beam.scan_sequence[
        scan_number
    ].csp_configuration_ref

    # Search with a sentinel default instead of catching StopIteration, so
    # that the KeyError below is raised outside any ``except`` block and does
    # not carry the internal StopIteration as misleading exception context.
    not_found = object()
    match = next(
        (
            csp_configuration
            for csp_configuration in sbd.csp_configurations
            if csp_configuration.config_id == csp_configuration_ref
        ),
        not_found,
    )
    if match is not_found:
        raise KeyError(
            f"CSP Configuration {csp_configuration_ref} defined in scan {scan_number} for subarray beam {subarray_beam.subarray_beam_id} is not present in the SBDefinition."
        )
    return match
def target_for_subarray_beam_scan(
    subarray_beam: SubarrayBeamConfiguration, scan_number: int, sbd: SBDefinition
) -> Target:
    """
    Lookup function to resolve the Target required for the given scan
    of the given subarray beam.

    The scan at index ``scan_number`` of ``subarray_beam.scan_sequence`` names
    a target by ID; that ID is searched for in ``sbd.targets``.

    :param subarray_beam: subarray beam whose scan sequence is consulted
    :param scan_number: index of the scan within the beam's scan sequence
    :param sbd: SBDefinition holding the candidate targets
    :return: the first target whose ``target_id`` matches the reference held
        by the scan
    :raises KeyError: if no target in the SBDefinition has the referenced ID
    """
    target_ref = subarray_beam.scan_sequence[scan_number].target_ref

    # A sentinel default keeps the KeyError out of an ``except StopIteration``
    # block, so the raised error has no unrelated exception context attached.
    not_found = object()
    match = next(
        (target for target in sbd.targets if target.target_id == target_ref),
        not_found,
    )
    if match is not_found:
        raise KeyError(
            f"Target {target_ref} defined in scan {scan_number} for subarray beam {subarray_beam.subarray_beam_id} is not present in the SBDefinition."
        )
    return match
def convert_target(
target: Union[Target, Beam], apply_pattern_offsets: bool = False
) -> SkyDirection:
"""
Convert a PDM target or PSS/PST/VLBI beam into the equivalent CDM
SkyDirection.
@param target: the sky coordinate to convert
@param apply_pattern_offsets: flag for converting the Target to a CDM SkyDirection for the AssignResource SDP payload.
Pattern offsets are applied if applicable
@raises NotImplementedError: if reference frame is not handled yet
"""
# Tied array beams by their nature cannot have radial velocities applied
if isinstance(target, Beam):
coord = target.beam_coordinate
target_name = target.beam_name
else:
coord = target.reference_coordinate
target_name = target.name
# Handle TLE coordinates separately - they don't have a sky coordinate
if isinstance(coord, TLECoordinates):
return TLEField(
target_name=target_name,
attrs=TLEField.Attrs(line1=coord.line1, line2=coord.line2),
)
sky_coord = _target_to_skycoord(target, apply_pattern_offsets=apply_pattern_offsets)
match coord:
case ICRSCoordinates():
proper_motions = dict(
pm_c1=round(
coord.to_sky_coord().pm_ra_cosdec.to("arcsec / year").value,
decimal_place,
),
pm_c2=round(
coord.to_sky_coord().pm_dec.to("arcsec / year").value, decimal_place
),
)
optionals = _create_and_filter_optional_kwargs_dict(
target, **proper_motions
)
return ICRSField(
target_name=target_name,
attrs=ICRSField.Attrs(
c1=sky_coord.ra.degree, c2=sky_coord.dec.degree, **optionals
),
)
case GalacticCoordinates():
proper_motions = dict(
pm_c1=round(
coord.to_sky_coord().pm_l_cosb.to("arcsec/year").value,
decimal_place,
),
pm_c2=round(
coord.to_sky_coord().pm_b.to("arcsec/year").value, decimal_place
),
)
optionals = _create_and_filter_optional_kwargs_dict(
target, **proper_motions
)
return GalacticField(
target_name=target_name,
attrs=GalacticField.Attrs(
c1=sky_coord.l.value,
c2=sky_coord.b.value,
**optionals,
),
)
case AltAzCoordinates():
return AltAzField(
target_name=target_name,
attrs=AltAzField.Attrs(c1=sky_coord.az.value, c2=sky_coord.alt.value),
)
case SpecialCoordinates():
return SpecialField(
target_name=SolarSystemObject(target.reference_coordinate.name)
)
case _:
raise NotImplementedError(f"Reference frame {coord.kind} not handled.")
def _create_and_filter_optional_kwargs_dict(
    target: Union[Target, Beam], **kwargs
) -> dict:
    """
    Collect the optional keyword arguments for a CDM field's attributes.

    The result combines the ``parallax`` and ``epoch`` attributes found on the
    coordinate, the supplied ``kwargs`` and the radial velocity in m/s. Any
    entry whose value equals 0.0 or is None is left out.

    :param target: the Target or Beam the optional values are taken from
    :param kwargs: additional candidate entries, e.g. proper motions
    :return: dict holding only the entries with a non-zero, non-None value
    """
    if isinstance(target, Beam):
        # Tied array beams by their nature cannot have radial velocities applied
        coord, radial_velocity = target.beam_coordinate, 0.0
    else:
        coord = target.reference_coordinate
        radial_velocity = target.radial_velocity.quantity.to("m/s").value

    candidates = {
        name: value
        for name, value in coord.__dict__.items()
        if name in ("parallax", "epoch")
    }
    candidates.update(kwargs)
    candidates["radial_velocity"] = radial_velocity

    return {
        name: value for name, value in candidates.items() if value not in (0.0, None)
    }
def _target_to_skycoord(
    target: Union[Target, Beam], apply_pattern_offsets: bool = False
) -> SkyCoord:
    """
    Build a SkyCoord from a PDM Target or Beam.

    :param target: the Target or Beam to convert
    :param apply_pattern_offsets: when true and ``target`` is not a Beam, the
        coordinate is produced by ``_apply_pattern_to_coord`` instead of being
        taken directly from the reference coordinate
    :return: the resulting SkyCoord
    """
    # Beams never go through the pointing pattern handling.
    if isinstance(target, Beam):
        return target.beam_coordinate.to_sky_coord()
    if apply_pattern_offsets:
        return _apply_pattern_to_coord(target)
    return target.reference_coordinate.to_sky_coord()
def _apply_pattern_to_coord(target: Target) -> SkyCoord:
    """
    Get target phase centre transformed to match target pointing pattern.

    Single point patterns shift the reference coordinate by their x/y offset,
    pointed mosaics shift it by their first offset, and five point and PVT
    patterns return the reference coordinate unchanged. Offsets are applied
    as arcseconds.

    :param target: Target to process
    :return: the reference coordinate, offset where the pattern requires it
    :raises NotImplementedError: if coordinate type or pattern is unsupported
    """
    reference = target.reference_coordinate.to_sky_coord()
    params = _pattern_parameters_for(target)

    if isinstance(params, SinglePointParameters):
        x_arcsec = params.offset_x_arcsec
        y_arcsec = params.offset_y_arcsec
    elif isinstance(params, (FivePointParameters, PVTTable)):
        # No offset is applied for these patterns.
        return reference
    elif isinstance(params, PointedMosaicParameters):
        # Only the first mosaic offset is used.
        first_offset = params.offsets[0]
        x_arcsec = first_offset.x
        y_arcsec = first_offset.y
    else:
        raise NotImplementedError(
            f"Unhandled pointing pattern parameters for SDP: {params.kind}"
        )

    return reference.spherical_offsets_by(x_arcsec * u.arcsec, y_arcsec * u.arcsec)
def _pattern_parameters_for(target: Target) -> PointingPatternParameters:
    """
    Get the active pointing pattern parameters for a target.

    :param target: Target whose pointing pattern is inspected
    :return: the parameters whose kind equals the active pointing pattern
    :raises NotImplementedError: if the active pointing pattern is not a
        supported type.
    """
    active_kind = target.pointing_pattern.active

    # Only the kinds listed in SUPPORTED_PATTERNS can be converted for SDP;
    # raise an exception for anything else
    if active_kind not in SUPPORTED_PATTERNS:
        raise NotImplementedError(
            f"Unhandled pointing pattern parameters for SDP: {active_kind}"
        )

    # PDM guarantees one pattern params per type so should be safe to index
    # the parameter instances by their kind
    params_by_kind = {}
    for params in target.pointing_pattern.parameters:
        params_by_kind[params.kind] = params
    return params_by_kind[active_kind]
def convert_tmcconfiguration(
    scan_definition: ScanDefinition,
) -> cdm_TMCConfiguration:
    """
    Convert a PDM ScanDefinition to the equivalent TMC configuration.

    :param scan_definition: the PDM scan definition to convert
    :return: a CDM TMCConfiguration whose ``scan_duration`` is set from the
        ``scan_duration_ms`` of the scan definition
    :raises TypeError: if ``scan_definition`` is not a ScanDefinition
    """
    if not isinstance(scan_definition, ScanDefinition):
        raise TypeError(f"Expected PDM ScanDefinition, got {type(scan_definition)}")
    return cdm_TMCConfiguration(scan_duration=scan_definition.scan_duration_ms)
def itf_station_name_to_id_mapping(station_id: int) -> str:
    """
    Build the ITF station name for a station ID.

    :param station_id: the integer to map
    :return: the station ID prefixed with ``itf``, e.g. ``itf1`` for 1
    """
    prefix = "itf"
    return f"{prefix}{station_id}"
@dataclass
class ChunkSpec:
first_id: int
last_id: int
first_cluster: Union[int, None]
last_cluster: Union[int, None]
labels: str
def label_from_id(rid):
    """
    Translate a numeric ID into its label.

    IDs 1-224 are labelled ``C<id>``. IDs 225-512 are grouped in clusters of
    six and labelled ``<letter><cluster>-<position>``, where the position
    counts from 1 within the cluster. For example 1 gives ``C1``, 225 gives
    ``E1-1``, 296 gives ``S4-6`` and 512 gives ``N16-6``.

    :param rid: the ID to translate
    :return: the label for the ID
    :raises ValueError: if the ID is not inside any of the known ranges
    """
    known_ranges = (
        ChunkSpec(1, 224, None, None, "C"),
        ChunkSpec(225, 296, 1, 4, "ENS"),
        ChunkSpec(297, 386, 5, 9, "ESN"),
        ChunkSpec(387, 512, 10, 16, "ESN"),
    )
    spec = next(
        (
            candidate
            for candidate in known_ranges
            if candidate.first_id <= rid <= candidate.last_id
        ),
        None,
    )
    if spec is None:
        raise ValueError(f"id {rid} did not match any valid chunk specifier")

    if spec.first_cluster is None:
        # This range is not in clusters. Just prepend the label to the ID
        return f"{spec.labels[0]}{rid}"

    # This range is in clusters: every run of six consecutive IDs is one
    # cluster, and the cluster numbers repeat once per label letter.
    cluster_count = spec.last_cluster - spec.first_cluster + 1
    cluster_index, position = divmod(rid - spec.first_id, 6)
    letter_index, cluster_offset = divmod(cluster_index, cluster_count)
    return f"{spec.labels[letter_index]}{spec.first_cluster + cluster_offset}-{position + 1}"
def max_number_of_pst_beams(sbd: SBDefinition) -> int:
    """
    Return the peak number of PST beams needed at any one time.

    For SKA Low, the scans at the same position in the scan sequences of all
    subarray beams form one scan slot; the PST beams of their targets are
    summed per slot and the largest sum is returned. For SKA Mid, the largest
    PST beam count of any scan in the dish allocation's scan sequence is
    returned. In both cases 0 is returned when there are no scans.

    :param sbd: the SBDefinition to inspect
    :return: the peak PST beam count
    """
    targets_by_id = {t.target_id: t for t in sbd.targets}

    def _pst_beam_count(target: Target) -> int:
        # A target without tied array beams, or without PST beams, counts as 0.
        beams = target.tied_array_beams
        if beams and beams.pst_beams:
            return len(beams.pst_beams)
        return 0

    telescope = sbd.telescope
    if telescope == TelescopeType.SKA_LOW:
        scan_sequences = [
            beam.scan_sequence for beam in sbd.mccs_allocation.subarray_beams
        ]
        # zip_longest pads the shorter sequences with None; those padding
        # entries contribute nothing to the slot total.
        beams_per_slot = [
            sum(
                _pst_beam_count(targets_by_id[scan_def.target_ref])
                for scan_def in scan_slot
                if scan_def is not None
            )
            for scan_slot in zip_longest(*scan_sequences)
        ]
        return max(beams_per_slot, default=0)
    if telescope == TelescopeType.SKA_MID:
        beams_per_scan = [
            _pst_beam_count(targets_by_id[scan_def.target_ref])
            for scan_def in sbd.dish_allocations.scan_sequence
        ]
        return max(beams_per_scan, default=0)
    # NOTE(review): any other telescope value falls through and yields None
    # despite the ``int`` return annotation - confirm whether that can occur.
def get_station_id_counts(
    subarray_beam: SubarrayBeamConfiguration,
) -> dict[int, int]:
    """
    Count how often each station_id occurs among the apertures of the
    subarray beam.

    The count tells whether a full station is being used (count=1) or
    substations are (count>1).

    :param subarray_beam: the subarray beam whose apertures are counted
    :return: mapping of station_id to its number of occurrences
    """
    station_ids = [aperture.station_id for aperture in subarray_beam.apertures]
    occurrences = Counter(station_ids)
    return dict(occurrences)
def get_aperture_id(
station_id: int,
substation_id: int,
station_id_counts: dict[int, int] | None = None,
) -> str:
"""Returns a string with the APXXX.YY format for the aperture ID"""
if station_id_counts and station_id_counts.get(station_id, 0) > 1:
return f"AP{station_id:03}.{substation_id:02}"
return f"AP{station_id:03}.00"
def get_aperture_ids(mccs_allocation: MCCSAllocation) -> list[str]:
    """
    Returns a list in the format ["AP350.00", "AP344.01", "AP344.02", "AP352.00"]
    The `.00` suffix means the full station is being used.

    For the LOW_ITF subarray definition the ITF station names are returned
    instead of aperture IDs.

    :param mccs_allocation: the MCCS allocation to read the apertures from
    :return: one entry per aperture of the first subarray beam
    """
    # The stations must be the same for each subarray beam, so we can just get the stations from the first
    first_beam = mccs_allocation.subarray_beams[0]

    if mccs_allocation.selected_subarray_definition is SubArrayLOW.LOW_ITF:
        itf_station_ids = [aperture.station_id for aperture in first_beam.apertures]
        return [itf_station_name_to_id_mapping(sid) for sid in itf_station_ids]

    counts = get_station_id_counts(first_beam)
    aperture_ids = []
    for aperture in first_beam.apertures:
        aperture_ids.append(
            get_aperture_id(aperture.station_id, aperture.substation_id, counts)
        )
    return aperture_ids
def get_station_labels(mccs_allocation: MCCSAllocation) -> list[str]:
    """
    Returns the Low station labels in the format ["S7-6", "S8-5"].
    If the mccs_allocation contains a station more than once due to substations being used, those will be deduplicated.

    For the LOW_ITF subarray definition the ITF station names are returned
    instead.

    :param mccs_allocation: the MCCS allocation to read the stations from
    :return: the sorted, deduplicated labels
    """
    # The stations must be the same for each subarray beam, so we can just get the stations from the first
    first_beam = mccs_allocation.subarray_beams[0]
    station_ids = [aperture.station_id for aperture in first_beam.apertures]

    is_itf = mccs_allocation.selected_subarray_definition is SubArrayLOW.LOW_ITF
    to_label = itf_station_name_to_id_mapping if is_itf else label_from_id

    unique_labels = {to_label(station_id) for station_id in station_ids}
    return sorted(unique_labels)
def get_dish_ids(dish_allocation: DishAllocation) -> list[str]:
    """
    Returns a sorted list of the Mid Dish IDs.

    :param dish_allocation: the dish allocation holding the dish IDs
    :return: a new list with the dish IDs in ascending order; the dish IDs of
        the allocation are left untouched
    """
    ordered_ids = list(dish_allocation.dish_ids)
    ordered_ids.sort()
    return ordered_ids