"""
The pdm_transforms module contains code to transform Project Data Model (PDM)
entities to Configuration Data Model (CDM) entities. The pdm_transforms code
is called by observing scripts to convert the PDM Scheduling Block to the
equivalent CDM configurations, which are then sent to TMC devices to control
the telescope.
"""
import logging
from dataclasses import dataclass
from typing import Union
from ska_oso_pdm import (
AltAzCoordinates,
EquatorialCoordinates,
HorizontalCoordinates,
SpecialCoordinates,
)
from ska_oso_pdm.sb_definition import (
Beam,
DishAllocation,
EquatorialCoordinatesPST,
GalacticCoordinates,
ICRSCoordinates,
MCCSAllocation,
ScanDefinition,
SubArrayLOW,
Target,
)
from ska_tmc_cdm.messages.skydirection import (
AltAzField,
GalacticField,
ICRSField,
SkyDirection,
SolarSystemObject,
SpecialField,
)
from ska_tmc_cdm.messages.subarray_node.configure.tmc import (
TMCConfiguration as cdm_TMCConfiguration,
)
# Module-level logger, named after this module's import path.
LOG = logging.getLogger(__name__)
# Log record layout: the timestamp, left-justified and padded to at least 15
# characters, followed by the log message.
FORMAT = "%(asctime)-15s %(message)s"
# NOTE(review): this configures the root logger as a side effect of importing
# the module. logging.basicConfig() does nothing when the root logger already
# has handlers, so an application that set up logging earlier is unaffected.
logging.basicConfig(level=logging.INFO, format=FORMAT)
# Not every function in this module should be called externally
# Only the names listed here are exported by ``from <module> import *``.
__all__ = [
"convert_tmcconfiguration",
]
# Public entry point (listed in __all__).
def convert_tmcconfiguration(
    scan_definition: ScanDefinition,
) -> cdm_TMCConfiguration:
    """
    Build the CDM TMC configuration that corresponds to a PDM ScanDefinition.

    The scan definition's ``scan_duration_ms`` value is handed to the CDM
    object unchanged as its ``scan_duration``.

    @param scan_definition: the PDM ScanDefinition to convert
    @raises TypeError: if the argument is not a PDM ScanDefinition
    """
    # Reject anything that is not a ScanDefinition before touching attributes.
    if not isinstance(scan_definition, ScanDefinition):
        raise TypeError(f"Expected PDM ScanDefinition, got {type(scan_definition)}")

    duration = scan_definition.scan_duration_ms
    return cdm_TMCConfiguration(scan_duration=duration)
def convert_target(target: Union[Target, Beam]) -> SkyDirection:
    """
    Produce the CDM SkyDirection equivalent to a PDM target or to a
    PSS/PST/VLBI beam.

    Beam instances are handled by the beam converter; every other argument is
    treated as a Target.

    @param target: the sky coordinate to convert
    @raises NotImplementedError: if reference frame is not handled yet
    """
    # Choose the converter from the argument type, then delegate to it.
    converter = _convert_beam if isinstance(target, Beam) else _convert_target
    return converter(target)
def _convert_beam(beam: Beam) -> SkyDirection:
    """
    Translate the coordinate of a PDM Beam into a CDM SkyDirection.

    ICRSCoordinates and EquatorialCoordinatesPST become an ICRSField whose
    c1/c2 are the RA/Dec, in degrees, of the coordinate's ``to_sky_coord()``
    result. GalacticCoordinates become a GalacticField built from the
    coordinate's ``l`` and ``b`` values. Proper motion, parallax and epoch are
    forwarded only when they are neither 0.0 nor None.

    @param beam: the PSS/PST/VLBI beam to convert
    @raises NotImplementedError: if the beam coordinate type is not handled
    """
    coord = beam.beam_coordinate

    def drop_unset(**candidates):
        # Keep only the entries holding a value other than 0.0 or None, so
        # those attributes are simply not passed to the CDM Attrs object.
        return {
            key: val for key, val in candidates.items() if val not in (0.0, None)
        }

    if isinstance(coord, (ICRSCoordinates, EquatorialCoordinatesPST)):
        extras = drop_unset(
            pm_c1=coord.pm_ra,
            pm_c2=coord.pm_dec,
            parallax=coord.parallax,
            epoch=coord.epoch,
        )
        position = coord.to_sky_coord()
        # this is hacky - but the best I can do to take into account the changes
        # A beam without a (truthy) name is named after its coordinate's
        # target_id instead.
        name = beam.beam_name or beam.beam_coordinate.target_id
        return ICRSField(
            target_name=name,
            attrs=ICRSField.Attrs(
                c1=position.ra.degree, c2=position.dec.degree, **extras
            ),
        )

    if isinstance(coord, GalacticCoordinates):
        extras = drop_unset(
            pm_c1=coord.pm_l,
            pm_c2=coord.pm_b,
            parallax=coord.parallax,
            epoch=coord.epoch,
        )
        return GalacticField(
            target_name=beam.beam_name,
            attrs=GalacticField.Attrs(c1=coord.l, c2=coord.b, **extras),
        )

    raise NotImplementedError(
        f"Reference frame {beam.beam_coordinate.reference_frame} not handled."
    )
def _convert_target(target: Target) -> SkyDirection:
"""
Convert a PDM target into the appropriate CDM SkyDirection.
TODO:
* Handle Galactic and TLE reference frames as/when they are added to
the PDM.
* Handle proper motion
* Handle parallax
@param target: the sky coordinate to convert
@raises NotImplementedError: if reference frame is not handled yet
"""
match target.reference_coordinate:
case EquatorialCoordinates() | ICRSCoordinates():
kwargs = {}
if target.radial_velocity.quantity.value != 0.0:
in_si_units = target.radial_velocity.quantity.to("m/s").value
kwargs["radial_velocity"] = in_si_units
# delegate to Astropy for conversion of ra,dec to floating
# point degrees.
sky_coord = target.reference_coordinate.to_sky_coord()
return ICRSField(
target_name=target.target_id,
attrs=ICRSField.Attrs(
c1=sky_coord.ra.degree, c2=sky_coord.dec.degree, **kwargs
),
)
case AltAzCoordinates() | HorizontalCoordinates():
return AltAzField(
target_name=target.target_id,
attrs=AltAzField.Attrs(
# az/el should already be degree floats
c1=target.reference_coordinate.az,
c2=target.reference_coordinate.el,
),
)
case SpecialCoordinates():
return SpecialField(
target_name=SolarSystemObject(target.reference_coordinate.name)
)
case _:
raise NotImplementedError(
f"Reference frame {target.reference_frame} not handled."
)
def itf_station_name_to_id_mapping(station_id: int) -> str:
    """
    Build the ITF station name for a station ID.

    Despite the function's name, the mapping goes from ID to name: the result
    is the station ID prefixed with ``itf``.

    :param station_id: the integer station ID to map
    :return: the string name of the station ID
    """
    prefix = "itf"
    return f"{prefix}{station_id}"
@dataclass
class ChunkSpec:
    """
    Labelling rule for one contiguous range of numeric IDs.

    When ``first_cluster`` is None the IDs in the range are labelled directly.
    Otherwise the range is split into clusters numbered from ``first_cluster``
    to ``last_cluster``, repeated once per letter in ``labels``.
    """

    # First and last numeric ID covered by this range (both inclusive).
    first_id: int
    last_id: int
    # First and last cluster number used in this range; None when the range
    # is not organised in clusters.
    first_cluster: Union[int, None]
    last_cluster: Union[int, None]
    # Label letters: a single prefix for an unclustered range, otherwise one
    # letter per consecutive block of clusters.
    labels: str


# Number of consecutive IDs that share one cluster label.
_IDS_PER_CLUSTER = 6

# The ID ranges and their labelling rules. Built once at import time rather
# than on every label_from_id() call; treated as read-only.
_CHUNKS = (
    ChunkSpec(1, 224, None, None, "C"),
    ChunkSpec(225, 296, 1, 4, "ENS"),
    ChunkSpec(297, 386, 5, 9, "ESN"),
    ChunkSpec(387, 512, 10, 16, "ESN"),
)


def label_from_id(rid: int) -> str:
    """
    Map a numeric ID in the range 1-512 to its string label.

    IDs 1-224 are labelled with the prefix ``C`` followed by the ID itself,
    e.g. 1 -> ``C1``. Higher IDs are grouped six at a time into clusters and
    labelled ``<letter><cluster>-<position>``, where position runs from 1 to
    6, e.g. 225 -> ``E1-1``, 249 -> ``N1-1`` and 512 -> ``N16-6``.

    :param rid: the numeric ID to convert
    :return: the label for the ID
    :raises ValueError: if the ID lies outside every known range
    """
    for chunk in _CHUNKS:
        if not (chunk.first_id <= rid <= chunk.last_id):
            continue

        if chunk.first_cluster is None:
            # This range is not in clusters. Just prepend the label to the ID
            return f"{chunk.labels[0]}{rid}"

        # This range is in clusters. Count the clusters per letter, then find
        # which letter block and which cluster within it the ID falls into.
        clusters = chunk.last_cluster - chunk.first_cluster + 1
        offset = rid - chunk.first_id
        block, clst_n = divmod(offset // _IDS_PER_CLUSTER, clusters)
        cluster = chunk.first_cluster + clst_n
        position = offset % _IDS_PER_CLUSTER + 1
        return f"{chunk.labels[block]}{cluster}-{position}"

    raise ValueError(f"id {rid} did not match any valid chunk specifier")
def create_resources(allocation: Union[MCCSAllocation, DishAllocation]) -> list[str]:
    """
    Derive the sorted list of resource names described by an allocation.

    A DishAllocation contributes its ``dish_ids``. An MCCSAllocation
    contributes one name per aperture of each subarray beam, derived from the
    aperture's ``station_id``: ITF-style names when the selected subarray
    definition is ``SubArrayLOW.LOW_ITF``, otherwise the label returned by
    ``label_from_id``.

    :param allocation: the MCCSAllocation or DishAllocation to use in resource creation
    :return: the resource names, sorted
    :raises TypeError: if the allocation is of neither supported type
    """
    if isinstance(allocation, DishAllocation):
        return sorted(allocation.dish_ids)

    if not isinstance(allocation, MCCSAllocation):
        raise TypeError("Unsupported allocation type provided.")

    # Gather every station ID first, then pick the naming scheme once.
    station_ids = [
        aperture.station_id
        for sub_beam in allocation.subarray_beams
        for aperture in sub_beam.apertures
    ]
    if allocation.selected_subarray_definition is SubArrayLOW.LOW_ITF:
        to_name = itf_station_name_to_id_mapping
    else:
        to_name = label_from_id

    return sorted(to_name(station_id) for station_id in station_ids)