Source code for ska_oso_scripting.functions.pdm_transforms.dish

"""
The pdm_transforms module contains code to transform Project Data Model (PDM)
entities to Configuration Data Model (CDM) entities. The pdm_transforms code
is called by observing scripts to convert the PDM Scheduling Block to the
equivalent CDM configurations, which are then sent to TMC devices to control
the telescope.
"""
import logging
from typing import Union

from astropy.coordinates import SkyCoord
from ska_oso_pdm.sb_definition import (
    CoordinateKind,
    DishAllocation,
    EquatorialCoordinates,
    GalacticCoordinates,
    ICRSCoordinates,
    PointingCorrection,
    PointingKind,
    SolarSystemObject,
    Target,
)
from ska_oso_pdm.sb_definition.csp.midcbf import MidCBFConfiguration
from ska_tmc_cdm.messages.central_node.common import (
    DishAllocation as cdm_DishAllocation,
)
from ska_tmc_cdm.messages.skydirection import GalacticField as cdm_GalacticField
from ska_tmc_cdm.messages.skydirection import ICRSField as cdm_ICRSField
from ska_tmc_cdm.messages.skydirection import SkyDirection as cdm_SkyDirection
from ska_tmc_cdm.messages.skydirection import SpecialField as cdm_SpecialField
from ska_tmc_cdm.messages.subarray_node.configure import (
    DishConfiguration as cdm_DishConfiguration,
)
from ska_tmc_cdm.messages.subarray_node.configure import (
    PointingConfiguration as cdm_PointingConfiguration,
)
from ska_tmc_cdm.messages.subarray_node.configure.core import (
    PointingCorrection as cdm_PointingCorrection,
)
from ska_tmc_cdm.messages.subarray_node.configure.core import (
    ReceiverBand as cdm_ReceiverBand,
)
from ska_tmc_cdm.messages.subarray_node.configure.core import (
    SpecialTarget as cdm_SpecialTarget,
)
from ska_tmc_cdm.messages.subarray_node.configure.core import Target as cdm_Target
from ska_tmc_cdm.messages.subarray_node.configure.receptorgroup import (
    FixedTrajectory as cdm_FixedTrajectory,
)
from ska_tmc_cdm.messages.subarray_node.configure.receptorgroup import (
    Projection as cdm_Projection,
)
from ska_tmc_cdm.messages.subarray_node.configure.receptorgroup import (
    ProjectionAlignment,
)
from ska_tmc_cdm.messages.subarray_node.configure.receptorgroup import (
    ReceptorGroup as cdm_ReceptorGroup,
)
from ska_tmc_cdm.messages.subarray_node.configure.receptorgroup import (
    Trajectory as cdm_Trajectory,
)

from ska_oso_scripting import WORKAROUNDS

# Module-level logger, named after this module's import path.
LOG = logging.getLogger(__name__)
# Log record layout: timestamp (left-justified, padded to 15 characters)
# followed by the log message.
FORMAT = "%(asctime)-15s %(message)s"

# NOTE(review): this configures the root logger as a side effect of importing
# this module (INFO level, FORMAT layout). Confirm this is intended for a
# library module rather than being left to the calling application.
logging.basicConfig(level=logging.INFO, format=FORMAT)

# Not every function in this module should be called externally: only the
# names listed here are exported by ``from ... import *``. The ``_create_*``
# helpers are implementation details of convert_pointingconfiguration.
__all__ = [
    "convert_pointingconfiguration",
    "create_dishconfiguration",
    "convert_dishallocation",
]


def convert_pointingconfiguration(
    target: Target, correction: PointingCorrection
) -> cdm_PointingConfiguration:
    """
    Convert a PDM Target to the equivalent TMC configuration

    :param target: PDM Target to convert
    :param correction: PDM PointingCorrection to apply to the pointing
    :return: CDM PointingConfiguration holding the (deprecated) CDM target,
        the receptor groups and, unless it is MAINTAIN, the correction
    :raises TypeError: if target is not a PDM Target or correction is not a
        PDM PointingCorrection
    """
    # Validate both arguments up front; target is checked first.
    if not isinstance(target, Target):
        raise TypeError(f"Expected PDM Target, got {type(target)}")
    if not isinstance(correction, PointingCorrection):
        raise TypeError(f"Expected PDM PointingCorrection, got {type(correction)}")

    # PI23: TMC is not ready to accept the SS-120 JSON, so omit it if set to
    # the default of MAINTAIN
    cdm_correction = (
        None
        if correction == PointingCorrection.MAINTAIN
        else cdm_PointingCorrection(correction.value)
    )

    legacy_target = _create_target(target)
    receptor_groups = _create_receptorgroups(target)
    return cdm_PointingConfiguration(
        target=legacy_target,
        groups=receptor_groups,
        correction=cdm_correction,
    )
def _create_target(target: Target) -> Union[cdm_Target, cdm_SpecialTarget, None]: """ This creates the now deprecated CDM Target from a PDM Target """ ref_coord = target.reference_coordinate match ref_coord: case SolarSystemObject(): return cdm_SpecialTarget(target_name=target.reference_coordinate.name) case EquatorialCoordinates() | ICRSCoordinates(): if ref_coord is EquatorialCoordinates(): LOG.warning( "EquatorialCoordinates is deprecated. Please use ICRSCoordinate instead" ) coord: SkyCoord = target.reference_coordinate.to_sky_coord() raw_ra = coord.ra.value raw_dec = coord.dec.value radec_units = (coord.ra.unit.name, coord.dec.unit.name) frame = coord.frame.name name = target.target_id return cdm_Target( ra=raw_ra, dec=raw_dec, unit=radec_units, reference_frame=frame, target_name=name, ) case _: return None def _create_receptorgroups(target: Target) -> list[cdm_ReceptorGroup]: if WORKAROUNDS.disable_pointing_groups: return [] groups = [ cdm_ReceptorGroup( field=_create_field(target), trajectory=_create_trajectory(target), projection=_create_projection(target), ) ] return groups def _create_field(target: Target) -> cdm_SkyDirection: ref_coord = target.reference_coordinate name = target.target_id if target.radial_velocity: radial_velocity = target.radial_velocity.quantity.to("m / s").value else: radial_velocity = 0.0 match ref_coord: case SolarSystemObject(): field = cdm_SpecialField( target_name=ref_coord.name, ) case EquatorialCoordinates() | ICRSCoordinates(): if ref_coord.kind is CoordinateKind.EQUATORIAL: LOG.warning( "EquatorialCoordinates is deprecated. 
Please use ICRSCoordinate instead" ) coord: SkyCoord = ref_coord.to_sky_coord() attrs = cdm_ICRSField.Attrs( c1=coord.ra.value, c2=coord.dec.value, radial_velocity=radial_velocity, epoch=2000.0, ) field = cdm_ICRSField( target_name=name, attrs=attrs, ) case GalacticCoordinates(): optionals = dict( pm_c1=ref_coord.pm_l, pm_c2=ref_coord.pm_b, parallax=ref_coord.parallax, radial_velocity=radial_velocity, epoch=ref_coord.epoch, ) optionals = {k: v for k, v in optionals.items() if v not in (0.0, None)} attrs = cdm_GalacticField.Attrs(c1=ref_coord.l, c2=ref_coord.b, **optionals) field = cdm_GalacticField(target_name=name, attrs=attrs) case _: raise NotImplementedError(f"{type(ref_coord)} not currently supported") return field def _create_trajectory(target: Target) -> cdm_Trajectory: pointing_pattern = target.pointing_pattern.active pointing_pattern_params = next( p for p in target.pointing_pattern.parameters if p.kind is pointing_pattern ) match pointing_pattern: case PointingKind.SINGLE_POINT: attrs = cdm_FixedTrajectory.Attrs( x=pointing_pattern_params.offset_x_arcsec, y=pointing_pattern_params.offset_y_arcsec, ) trajectory = cdm_FixedTrajectory( attrs=attrs, ) case PointingKind.FIVE_POINT: attrs = cdm_FixedTrajectory.Attrs(x=0.0, y=0.0) trajectory = cdm_FixedTrajectory(attrs=attrs) case PointingKind.POINTED_MOSAIC: offsets = pointing_pattern_params.offsets[0] attrs = cdm_FixedTrajectory.Attrs(x=offsets.x, y=offsets.y) trajectory = cdm_FixedTrajectory(attrs=attrs) case _: raise NotImplementedError( f"{type(pointing_pattern)} not currently supported" ) return trajectory def _create_projection(target: Target) -> cdm_Projection: ref_coord = target.reference_coordinate if ref_coord.kind is not CoordinateKind.ALTAZ: projection = cdm_Projection(alignment=ProjectionAlignment.ICRS) else: projection = cdm_Projection(alignment=ProjectionAlignment.ALTAZ) return projection
def create_dishconfiguration(
    midcbf_configuration: MidCBFConfiguration,
) -> cdm_DishConfiguration:
    """
    Convert a PDM Dish configuration to a CDM Dish Configuration

    :param midcbf_configuration: PDM MidCBFConfiguration supplying the
        frequency band
    :return: CDM DishConfiguration whose receiver band is built from the
        value of the PDM frequency band
    :raises TypeError: if midcbf_configuration is not a PDM
        MidCBFConfiguration
    """
    if isinstance(midcbf_configuration, MidCBFConfiguration):
        # The CDM receiver band is constructed from the PDM band's value.
        band_value = midcbf_configuration.frequency_band.value
        receiver_band = cdm_ReceiverBand(band_value)
        return cdm_DishConfiguration(receiver_band=receiver_band)

    raise TypeError(
        f"Expected PDM MidCBFConfiguration, got {type(midcbf_configuration)}"
    )
def convert_dishallocation(dish_allocation: DishAllocation) -> cdm_DishAllocation:
    """
    Convert a PDM DishAllocation to the equivalent CDM DishAllocation.

    :param dish_allocation: PDM DishAllocation to convert
    :return: CDM DishAllocation whose receptor_ids are the PDM dish_ids
    """
    # The PDM dish IDs are passed through unchanged as the CDM receptor IDs.
    receptor_ids = dish_allocation.dish_ids
    return cdm_DishAllocation(receptor_ids=receptor_ids)