Source code for ska_oso_scripting.pdm_transforms.dish

"""
The pdm_transforms module contains code to transform Project Data Model (PDM)
entities to Configuration Data Model (CDM) entities. The pdm_transforms code
is called by observing scripts to convert the PDM Scheduling Block to the
equivalent CDM configurations, which are then sent to TMC devices to control
the telescope.
"""
import logging

import astropy.units as u
from astropy.coordinates import SkyCoord
from ska_oso_pdm._shared.target import TLECoordinates
from ska_oso_pdm.sb_definition import (
    AltAzCoordinates,
    DishAllocation,
    GalacticCoordinates,
    ICRSCoordinates,
    PointingCorrection,
    PointingKind,
    SpecialCoordinates,
    Target,
)
from ska_oso_pdm.sb_definition.csp.midcbf import MidCBFConfiguration, ReceiverBand
from ska_tmc_cdm.messages.central_node.common import (
    DishAllocation as cdm_DishAllocation,
)
from ska_tmc_cdm.messages.skydirection import GalacticField as cdm_GalacticField
from ska_tmc_cdm.messages.skydirection import ICRSField as cdm_ICRSField
from ska_tmc_cdm.messages.skydirection import SkyDirection as cdm_SkyDirection
from ska_tmc_cdm.messages.skydirection import SpecialField as cdm_SpecialField
from ska_tmc_cdm.messages.skydirection import TLEField as cdm_TLEField
from ska_tmc_cdm.messages.subarray_node.configure import (
    DishConfiguration as cdm_DishConfiguration,
)
from ska_tmc_cdm.messages.subarray_node.configure import (
    PointingConfiguration as cdm_PointingConfiguration,
)
from ska_tmc_cdm.messages.subarray_node.configure.core import (
    PointingCorrection as cdm_PointingCorrection,
)
from ska_tmc_cdm.messages.subarray_node.configure.core import (
    ReceiverBand as cdm_ReceiverBand,
)
from ska_tmc_cdm.messages.subarray_node.configure.receptorgroup import (
    FixedTrajectory as cdm_FixedTrajectory,
)
from ska_tmc_cdm.messages.subarray_node.configure.receptorgroup import (
    Projection as cdm_Projection,
)
from ska_tmc_cdm.messages.subarray_node.configure.receptorgroup import (
    ProjectionAlignment,
    ProjectionType,
)
from ska_tmc_cdm.messages.subarray_node.configure.receptorgroup import (
    PVTTrajectory as cdm_PVTTrajectory,
)
from ska_tmc_cdm.messages.subarray_node.configure.receptorgroup import (
    ReceptorGroup as cdm_ReceptorGroup,
)
from ska_tmc_cdm.messages.subarray_node.configure.receptorgroup import (
    Trajectory as cdm_Trajectory,
)

LOG = logging.getLogger(__name__)
FORMAT = "%(asctime)-15s %(message)s"

logging.basicConfig(level=logging.INFO, format=FORMAT)

# Not every function in this module should be called externally
__all__ = [
    "convert_pointingconfiguration",
    "create_dishconfiguration",
    "convert_dishallocation",
]


[docs] def convert_pointingconfiguration( target: Target, correction: PointingCorrection, dish_allocation: DishAllocation, wrap_sector: int | None = None, ) -> cdm_PointingConfiguration: """ Convert a PDM Target to the equivalent TMC configuration. """ if not isinstance(target, Target): raise TypeError(f"Expected PDM Target, got {type(target)}") elif not isinstance(correction, PointingCorrection): raise TypeError(f"Expected PDM PointingCorrection, got {type(correction)}") # PI23: TMC is not ready to accept the SS-120 JSON, so omit it if set to # the default of MAINTAIN if correction == PointingCorrection.MAINTAIN: cdm_correction = None else: cdm_correction = cdm_PointingCorrection(correction.value) return cdm_PointingConfiguration( groups=_create_receptorgroups( target, dish_allocation.dish_ids, dish_allocation.reference_dishes ), correction=cdm_correction, wrap_sector=wrap_sector, )
def _create_receptorgroups( target: Target, all_dishes: frozenset[str], reference_dishes: frozenset[str] | None = None, ) -> list[cdm_ReceptorGroup]: """ Create receptor groups for pointing configuration. For holography (when reference_dishes are present), creates two groups: - Group 1: Reference dishes staring at the field (no trajectory) - Group 2: Scanning dishes with PVT trajectory and SSN projection If all dishes are reference dishes, only creates one group. If no reference dishes, creates a single group with standard trajectory. :param target: PDM Target object :param all_dishes: List of all dish IDs in the allocation :param reference_dishes: Optional list of reference dish IDs for holography """ field = _create_field(target) reference_dishes = reference_dishes or frozenset() # Validate reference_dishes are subset of all_dishes. This should be validated already # in the SB, but it's cheap to confirm again. invalid_refs = reference_dishes - all_dishes if invalid_refs: raise ValueError( f"Reference dish{'es' if len(invalid_refs) > 1 else ''} " f"{','.join(sorted(invalid_refs))} not in dish allocation {','.join(sorted(all_dishes))}" ) groups = [] # No reference dishes - use standard single-group behavior if not reference_dishes: trajectory = _create_trajectory(target) projection = _create_projection(target, trajectory) groups.append( cdm_ReceptorGroup( field=field, trajectory=trajectory, projection=projection, ) ) return groups # Calculate scanning dishes scanning_dishes = [d for d in all_dishes if d not in reference_dishes] # Group 1: Reference dishes stare at the field (no trajectory/projection) groups.append( cdm_ReceptorGroup( receptors=sorted(reference_dishes), field=field, ) ) # Group 2: Scanning dishes with PVT trajectory and SSN projection (only if there are scanning dishes) if scanning_dishes: trajectory = _create_trajectory(target) projection = _create_projection(target, trajectory) groups.append( cdm_ReceptorGroup( receptors=sorted(scanning_dishes), field=field, trajectory=trajectory, projection=projection, ) ) return groups def _create_field(target: Target) -> cdm_SkyDirection: ref_coord = target.reference_coordinate name = target.target_id if target.radial_velocity: radial_velocity = target.radial_velocity.quantity.to("m / s").value else: radial_velocity = 0.0 match ref_coord: case SpecialCoordinates(): field = cdm_SpecialField( target_name=ref_coord.name, ) case TLECoordinates(): field = cdm_TLEField( target_name=target.name, attrs=cdm_TLEField.Attrs(line1=ref_coord.line1, line2=ref_coord.line2), ) case ICRSCoordinates(): coord: SkyCoord = ref_coord.to_sky_coord() attrs = cdm_ICRSField.Attrs( c1=coord.ra.value, c2=coord.dec.value, radial_velocity=radial_velocity, epoch=2000.0, ) field = cdm_ICRSField( target_name=name, attrs=attrs, ) case GalacticCoordinates(): optionals = { "pm_c1": ref_coord.pm_l, "pm_c2": ref_coord.pm_b, "parallax": ref_coord.parallax, "radial_velocity": radial_velocity, "epoch": ref_coord.epoch, } optionals = {k: v for k, v in optionals.items() if v not in (0.0, None)} attrs = cdm_GalacticField.Attrs(c1=ref_coord.l, c2=ref_coord.b, **optionals) field = cdm_GalacticField(target_name=name, attrs=attrs) case _: raise NotImplementedError(f"{type(ref_coord)} not currently supported") return field def _create_trajectory(target: Target) -> cdm_Trajectory: pointing_pattern = target.pointing_pattern.active pointing_pattern_params = next( p for p in target.pointing_pattern.parameters if p.kind is pointing_pattern ) match pointing_pattern: case PointingKind.SINGLE_POINT: attrs = cdm_FixedTrajectory.Attrs( x=pointing_pattern_params.offset_x_arcsec, y=pointing_pattern_params.offset_y_arcsec, ) trajectory = cdm_FixedTrajectory( attrs=attrs, ) case PointingKind.FIVE_POINT: attrs = cdm_FixedTrajectory.Attrs(x=0.0, y=0.0) trajectory = cdm_FixedTrajectory(attrs=attrs) case PointingKind.POINTED_MOSAIC: offsets = pointing_pattern_params.offsets[0] attrs = cdm_FixedTrajectory.Attrs(x=offsets.x, y=offsets.y) trajectory = cdm_FixedTrajectory(attrs=attrs) case PointingKind.PVT: attrs = cdm_PVTTrajectory.Attrs( time_offsets=pointing_pattern_params.time_offsets, x_offsets=[ q.to(u.arcsec).value for q in pointing_pattern_params.x_offsets ], y_offsets=[ q.to(u.arcsec).value for q in pointing_pattern_params.y_offsets ], x_velocities=[ q.to(u.arcsec / u.s).value for q in pointing_pattern_params.x_velocities ], y_velocities=[ q.to(u.arcsec / u.s).value for q in pointing_pattern_params.y_velocities ], slewing=pointing_pattern_params.slewing, ) trajectory = cdm_PVTTrajectory(attrs=attrs) case _: raise NotImplementedError( f"{type(pointing_pattern)} not currently supported" ) return trajectory def _create_projection(target: Target, trajectory: cdm_Trajectory) -> cdm_Projection: """ Create projection based on trajectory type and target reference frame. For PVTTrajectory (holography), use SSN projection with AltAz alignment. For five-point scans, use TAN projection with AltAz alignment so the initial configure matches the follow-on offset configures. For AltAz reference frame targets, use AltAz alignment. For other cases, use default ICRS projection. """ is_pvt_trajectory = isinstance(trajectory, cdm_PVTTrajectory) if is_pvt_trajectory: return cdm_Projection( name="SSN", alignment=ProjectionAlignment.ALTAZ, ) is_five_point_target = target.pointing_pattern.active == PointingKind.FIVE_POINT if is_five_point_target: return cdm_Projection( name=ProjectionType.TAN, alignment=ProjectionAlignment.ALTAZ, ) is_altaz_target = isinstance(target.reference_coordinate, AltAzCoordinates) if is_altaz_target: return cdm_Projection(alignment=ProjectionAlignment.ALTAZ) return cdm_Projection(alignment=ProjectionAlignment.ICRS)
[docs] def create_dishconfiguration( midcbf_configuration: MidCBFConfiguration, ) -> cdm_DishConfiguration: """ Convert a PDM Dish configuration to a CDM Dish Configuration """ if not isinstance(midcbf_configuration, MidCBFConfiguration): raise TypeError( f"Expected PDM MidCBFConfiguration, got {type(midcbf_configuration)}" ) pdm_frequency_band = midcbf_configuration.frequency_band band5_downconversion_subband = None if pdm_frequency_band == ReceiverBand.BAND_5B: band5_downconversion_subband = str(midcbf_configuration.band5b_subband.value) return cdm_DishConfiguration( receiver_band=cdm_ReceiverBand(pdm_frequency_band.value), band5_downconversion_subband=band5_downconversion_subband, )
[docs] def convert_dishallocation(dish_allocation: DishAllocation) -> cdm_DishAllocation: """ Convert a PDM DishAllocation to the equivalent CDM DishAllocation. """ return cdm_DishAllocation(receptor_ids=dish_allocation.dish_ids)