"""
The pdm_transforms module contains code to transform Project Data Model (PDM)
entities to Configuration Data Model (CDM) entities. The pdm_transforms code
is called by observing scripts to convert the PDM Scheduling Block to the
equivalent CDM configurations, which are then sent to TMC devices to control
the telescope.
"""
import logging
from typing import Optional
import ska_oso_pdm.sb_definition.csp as pdm_csp
from ska_oso_pdm._shared.target import EquatorialCoordinates, HorizontalCoordinates
from ska_oso_pdm.sb_definition import MCCSAllocation, Target
from ska_oso_pdm.sb_definition.mccs.mccs_allocation import (
SubarrayBeamConfiguration as pdm_SubarrayBeamConfiguration,
)
from ska_tmc_cdm.messages.central_node.mccs import (
ApertureConfiguration as cdm_ApertureConfiguration,
)
from ska_tmc_cdm.messages.central_node.mccs import MCCSAllocate as cdm_MCCSAllocate
from ska_tmc_cdm.messages.central_node.mccs import (
SubArrayBeamsConfiguration as cdm_SubArrayBeamsConfiguration,
)
from ska_tmc_cdm.messages.subarray_node.configure.mccs import (
MCCSConfiguration as cdm_MCCSConfiguration,
)
from ska_tmc_cdm.messages.subarray_node.configure.mccs import (
SubarrayBeamAperatures as cdm_SubarrayBeamAperatures,
)
from ska_tmc_cdm.messages.subarray_node.configure.mccs import (
SubarrayBeamConfiguration as cdm_SubarrayBeamConfiguration,
)
from ska_tmc_cdm.messages.subarray_node.configure.mccs import (
SubarrayBeamSkyCoordinates as cdm_SubarrayBeamSkyCoordinates,
)
from ska_oso_scripting.functions.pdm_transforms.csp import _calculate_logical_bands
CDM_MCC_SCHEMA_URI = "https://schema.skao.int/ska-low-mccs-controller-allocate/3.0"
LOG = logging.getLogger(__name__)
FORMAT = "%(asctime)-15s %(message)s"
logging.basicConfig(level=logging.INFO, format=FORMAT)
# Not every function in this module should be called externally
__all__ = [
"convert_mccs_allocation",
"convert_mccs_configuration",
"get_aperture_id",
"get_subarray_beam_ids",
"get_station_ids",
"get_allocation_apertures",
"get_sky_coordinates_based_on_reference_coordinates",
]
def get_aperture_id(station_id: int, substation_id: int) -> str:
return f"AP{station_id:03}.{substation_id:02}"
[docs]
def get_subarray_beam_ids(allocation: MCCSAllocation) -> list[int]:
"""
Get the subarray beam ids array from the MCC Allocation
"""
if not isinstance(allocation, MCCSAllocation):
raise TypeError(f"Expected PDM MCCSAllocation, got {type(allocation)}")
subarray_beam_ids = [beam.subarray_beam_id for beam in allocation.subarray_beams]
return subarray_beam_ids
[docs]
def get_station_ids(allocation: MCCSAllocation) -> list[list[int]]:
"""
Get the station ids from the MCC Allocation
"""
if not isinstance(allocation, MCCSAllocation):
raise TypeError(f"Expected PDM MCCSAllocation, got {type(allocation)}")
station_ids = [
[aperture.station_id for aperture in subarray_beam.apertures]
for subarray_beam in allocation.subarray_beams
]
return station_ids
[docs]
def get_allocation_apertures(
subarray_beam: cdm_SubArrayBeamsConfiguration,
) -> list[cdm_ApertureConfiguration]:
"""
Get the apertures from one of the subArray beam of MCC Allocation
"""
apertures = [
cdm_ApertureConfiguration(
station_id=aperture.station_id,
aperture_id=get_aperture_id(aperture.station_id, aperture.substation_id),
)
for aperture in subarray_beam.apertures
]
return apertures
[docs]
def convert_mccs_allocation(
allocation: MCCSAllocation,
) -> cdm_MCCSAllocate:
"""
Convert a PDM Low MCCSAllocation to a CDM MCCSAllocate instance.
"""
if not isinstance(allocation, MCCSAllocation):
raise TypeError(f"Expected PDM MCCSAllocation, got {type(allocation)}")
cdm_subarray_beams = [
cdm_SubArrayBeamsConfiguration(
subarray_beam_id=subarray_beam.subarray_beam_id,
number_of_channels=subarray_beam.number_of_channels,
apertures=get_allocation_apertures(subarray_beam),
)
for subarray_beam in allocation.subarray_beams
]
cdm_allocation = cdm_MCCSAllocate(
interface=CDM_MCC_SCHEMA_URI,
subarray_beams=cdm_subarray_beams,
)
return cdm_allocation
[docs]
def get_sky_coordinates_based_on_reference_coordinates(
target: Target,
) -> cdm_SubarrayBeamSkyCoordinates:
"""
Get the sky coordinates based on target reference_coordinates
target is an instance of HorizontalCoordinates", set reference_frame to "topocentric" and pass c1=az, c2=el
target is an instance of EquatorialCoordinates", set reference_frame to "ICRS" and pass c1=ra, c2=dec
"""
if not isinstance(target, Target):
raise TypeError(f"Expected PDM Target, got {type(target)}")
if isinstance(target.reference_coordinate, HorizontalCoordinates):
return cdm_SubarrayBeamSkyCoordinates(
reference_frame="horizontal",
c1=target.reference_coordinate.az,
c2=target.reference_coordinate.el,
)
if isinstance(target.reference_coordinate, EquatorialCoordinates):
return cdm_SubarrayBeamSkyCoordinates(
reference_frame="ICRS",
c1=target.reference_coordinate._coord.ra.degree,
c2=target.reference_coordinate._coord.dec.degree,
)
# Catch any unsupported reference_coordinate
raise TypeError(
f"Expected reference_coordinate, got {type(target.reference_coordinate)}"
)
def get_config_apertures(
subarray_beam: pdm_SubarrayBeamConfiguration,
) -> list[cdm_SubarrayBeamAperatures]:
"""
Get the apertures from one of the subArray beam of MCC Allocation
"""
apertures = [
cdm_SubarrayBeamAperatures(
weighting_key_ref=aperture.weighting_key,
aperture_id=get_aperture_id(aperture.station_id, aperture.substation_id),
)
for aperture in subarray_beam.apertures
]
return apertures
[docs]
def convert_mccs_configuration(
allocation: MCCSAllocation,
lowcbf: Optional[pdm_csp.LowCBFConfiguration],
target: Target,
) -> cdm_MCCSConfiguration:
"""
Convert PDM Low SB TargetBeamConfiguration list to a CDM MCCSConfiguration.
Other SB elements required are the Target list and
SubarrayBeamConfiguration list, which are referenced by the
TargetBeamConfigurations.
The MCCSAllocation is also needed for its list of station_ids.
:param mccs_allocation: The PDM MCCSAllocation
:param lowcbf: The low cfg configuration
:param targets: The PDM Target list
:return: the required CDM MCCSConfiguration
"""
mccs_configuration = cdm_MCCSConfiguration(
subarray_beam_configs=[
cdm_SubarrayBeamConfiguration(
update_rate=0.0,
logical_bands=_calculate_logical_bands(lowcbf),
apertures=get_config_apertures(subarray_beam),
sky_coordinates=get_sky_coordinates_based_on_reference_coordinates(
target
),
subarray_beam_id=subarray_beam.subarray_beam_id,
)
for subarray_beam in allocation.subarray_beams
],
)
return mccs_configuration