"""
The pdm_transforms module contains code to transform Project Data Model (PDM)
entities to Configuration Data Model (CDM) entities. The pdm_transforms code
is called by observing scripts to convert the PDM Scheduling Block to the
equivalent CDM configurations, which are then sent to TMC devices to control
the telescope.
"""
import logging
import ska_tmc_cdm.messages.subarray_node.configure.csp as cdm
from astropy.coordinates import SkyCoord
from ska_oso_pdm._shared.target import EquatorialCoordinates, SolarSystemObject
from ska_oso_pdm.sb_definition import DishAllocation as pdm_DishAllocation
from ska_oso_pdm.sb_definition import DishConfiguration as pdm_DishConfiguration
from ska_oso_pdm.sb_definition import PointingCorrection as pdm_PointingCorrection
from ska_oso_pdm.sb_definition import Target
from ska_oso_pdm.sb_definition.dish.dish_configuration import (
ReceiverBand as pdm_ReceiverBand,
)
from ska_tmc_cdm.messages.central_node.common import (
DishAllocation as cdm_DishAllocation,
)
from ska_tmc_cdm.messages.subarray_node.configure import (
DishConfiguration as cdm_DishConfiguration,
)
from ska_tmc_cdm.messages.subarray_node.configure import (
PointingConfiguration as cdm_PointingConfiguration,
)
from ska_tmc_cdm.messages.subarray_node.configure.core import (
PointingCorrection as cdm_PointingCorrection,
)
from ska_tmc_cdm.messages.subarray_node.configure.core import (
ReceiverBand as cdm_ReceiverBand,
)
from ska_tmc_cdm.messages.subarray_node.configure.core import (
SpecialTarget as cdm_SpecialTarget,
)
from ska_tmc_cdm.messages.subarray_node.configure.core import Target as cdm_Target
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
# Log record layout: the timestamp, left-justified and padded to at least
# 15 characters, followed by the log message.
FORMAT = "%(asctime)-15s %(message)s"
# Runs at import time: configures logging at INFO level using FORMAT.
# NOTE(review): logging.basicConfig has no effect if the root logger already
# has handlers, so an application that configured logging earlier keeps its
# own settings.
logging.basicConfig(level=logging.INFO, format=FORMAT)
# Names exported by ``from <this module> import *``. Not every function in
# this module should be called externally; convert_frequency_band, defined
# below, is not listed here.
__all__ = [
    "convert_pointingconfiguration",
    "convert_dishconfiguration",
    "convert_dishallocation",
]
def convert_frequency_band(
    pdm_val: pdm_ReceiverBand,
) -> cdm.core.ReceiverBand:
    """
    Map a PDM receiver band onto the CDM receiver band of the same name.

    The CDM member is looked up by the *name* of the PDM member, not by its
    value.

    :param pdm_val: PDM receiver band to convert
    :return: CDM receiver band whose member name equals ``pdm_val.name``
    """
    band_name = pdm_val.name
    # Subscript lookup selects the CDM member by name.
    # NOTE(review): presumably ReceiverBand is a standard Enum, in which case
    # a name with no CDM counterpart raises KeyError - confirm.
    cdm_band = cdm.core.ReceiverBand[band_name]
    return cdm_band
def convert_pointingconfiguration(
    target: Target, correction: pdm_PointingCorrection
) -> cdm_PointingConfiguration:
    """
    Build the CDM pointing configuration equivalent to a PDM Target.

    Two kinds of reference coordinate are handled:

    * ``SolarSystemObject``: converted to a CDM special target carrying only
      the object's name.
    * ``EquatorialCoordinates``: converted to a CDM target carrying the RA and
      Dec values, their unit names, the reference frame name and the target
      ID as the target name.

    :param target: PDM Target to convert
    :param correction: PDM pointing correction to apply
    :return: the equivalent CDM PointingConfiguration
    :raises TypeError: if ``target`` is not a PDM Target
    :raises NotImplementedError: if the reference coordinate is neither a
        SolarSystemObject nor EquatorialCoordinates
    """
    if not isinstance(target, Target):
        raise TypeError(f"Expected PDM Target, got {type(target)}")

    if isinstance(target.reference_coordinate, SolarSystemObject):
        special_target = cdm_SpecialTarget(
            target_name=target.reference_coordinate.name,
        )
        # NOTE(review): this branch always forwards the correction, whereas
        # the equatorial branch below omits MAINTAIN. Confirm whether the
        # difference is intentional.
        return cdm_PointingConfiguration(
            target=special_target,
            correction=cdm_PointingCorrection(correction.value),
        )

    if not isinstance(target.reference_coordinate, EquatorialCoordinates):
        raise NotImplementedError(
            f"Expected PDM EquatorialCoordinate, got {type(target.reference_coordinate)}"
        )

    # The PDM coordinate wraps an astropy SkyCoord in its private _coord
    # attribute; the CDM target fields are read straight from it.
    sky_position: SkyCoord = target.reference_coordinate._coord
    right_ascension = sky_position.ra
    declination = sky_position.dec
    ra_value = right_ascension.value
    dec_value = declination.value
    unit_names = (right_ascension.unit.name, declination.unit.name)
    frame_name = sky_position.frame.name
    target_name = target.target_id

    # PI23: TMC is not ready to accept the SS-120 JSON, so omit it if set to
    # the default of MAINTAIN
    cdm_correction = (
        None
        if correction == pdm_PointingCorrection.MAINTAIN
        else cdm_PointingCorrection(correction.value)
    )

    equatorial_target = cdm_Target(
        ra=ra_value,
        dec=dec_value,
        unit=unit_names,
        reference_frame=frame_name,
        target_name=target_name,
    )
    return cdm_PointingConfiguration(
        target=equatorial_target,
        correction=cdm_correction,
    )
def convert_dishconfiguration(
    dish_configuration: pdm_DishConfiguration,
) -> cdm_DishConfiguration:
    """
    Translate a PDM DishConfiguration into a CDM DishConfiguration.

    Only the receiver band is carried over. The CDM receiver band is
    constructed from the *value* of the PDM receiver band.

    :param dish_configuration: PDM dish configuration to convert
    :return: CDM dish configuration with the equivalent receiver band
    :raises TypeError: if ``dish_configuration`` is not a PDM
        DishConfiguration
    """
    if isinstance(dish_configuration, pdm_DishConfiguration):
        band_value = dish_configuration.receiver_band.value
        return cdm_DishConfiguration(receiver_band=cdm_ReceiverBand(band_value))

    raise TypeError(
        f"Expected PDM DishConfiguration, got {type(dish_configuration)}"
    )
def convert_dishallocation(dish_allocation: pdm_DishAllocation) -> cdm_DishAllocation:
    """
    Translate a PDM DishAllocation into the equivalent CDM DishAllocation.

    The receptor IDs are passed through unchanged. No type check is made on
    the argument.

    :param dish_allocation: PDM dish allocation to convert
    :return: CDM dish allocation holding the same receptor IDs
    """
    receptors = dish_allocation.receptor_ids
    return cdm_DishAllocation(receptor_ids=receptors)