Source code for ska_oso_scripting.functions.pdm_transforms.common

"""
The pdm_transforms module contains code to transform Project Data Model (PDM)
entities to Configuration Data Model (CDM) entities. The pdm_transforms code
is called by observing scripts to convert the PDM Scheduling Block to the
equivalent CDM configurations, which are then sent to TMC devices to control
the telescope.
"""
import logging
from dataclasses import dataclass
from typing import Union

from ska_oso_pdm import (
    AltAzCoordinates,
    EquatorialCoordinates,
    HorizontalCoordinates,
    SpecialCoordinates,
)
from ska_oso_pdm.sb_definition import (
    Beam,
    DishAllocation,
    EquatorialCoordinatesPST,
    GalacticCoordinates,
    ICRSCoordinates,
    MCCSAllocation,
    ScanDefinition,
    SubArrayLOW,
    Target,
)
from ska_tmc_cdm.messages.skydirection import (
    AltAzField,
    GalacticField,
    ICRSField,
    SkyDirection,
    SolarSystemObject,
    SpecialField,
)
from ska_tmc_cdm.messages.subarray_node.configure.tmc import (
    TMCConfiguration as cdm_TMCConfiguration,
)

LOG = logging.getLogger(__name__)
FORMAT = "%(asctime)-15s %(message)s"

logging.basicConfig(level=logging.INFO, format=FORMAT)

# Not every function in this module should be called externally
__all__ = [
    "convert_tmcconfiguration",
]


[docs] def convert_tmcconfiguration( scan_definition: ScanDefinition, ) -> cdm_TMCConfiguration: """ Convert a PDM ScanDefinition to the equivalent TMC configuration """ if isinstance(scan_definition, ScanDefinition): return cdm_TMCConfiguration(scan_duration=scan_definition.scan_duration_ms) raise TypeError(f"Expected PDM ScanDefinition, got {type(scan_definition)}")
def convert_target(target: Union[Target, Beam]) -> SkyDirection: """ Convert a PDM target or PSS/PST/VLBI beam into the equivalent CDM SkyDirection. @param target: the sky coordinate to convert @raises NotImplementedError: if reference frame is not handled yet """ if isinstance(target, Beam): return _convert_beam(target) else: return _convert_target(target) def _convert_beam(beam: Beam) -> SkyDirection: """ Convert a PDM Beam into the appropriate CDM SkyDirection. @param beam: the PSS/PST/VLBI beam to convert """ coord = beam.beam_coordinate match coord: case ICRSCoordinates() | EquatorialCoordinatesPST(): optionals = dict( pm_c1=coord.pm_ra, pm_c2=coord.pm_dec, parallax=coord.parallax, epoch=coord.epoch, ) optionals = {k: v for k, v in optionals.items() if v not in (0.0, None)} sky_coord = coord.to_sky_coord() # this is hacky - but the best I can do to take into account the changes if beam.beam_name: target_name = beam.beam_name else: target_name = beam.beam_coordinate.target_id return ICRSField( target_name=target_name, attrs=ICRSField.Attrs( c1=sky_coord.ra.degree, c2=sky_coord.dec.degree, **optionals ), ) case GalacticCoordinates(): optionals = dict( pm_c1=coord.pm_l, pm_c2=coord.pm_b, parallax=coord.parallax, epoch=coord.epoch, ) optionals = {k: v for k, v in optionals.items() if v not in (0.0, None)} return GalacticField( target_name=beam.beam_name, attrs=GalacticField.Attrs(c1=coord.l, c2=coord.b, **optionals), ) case _: raise NotImplementedError( f"Reference frame {beam.beam_coordinate.reference_frame} not handled." ) def _convert_target(target: Target) -> SkyDirection: """ Convert a PDM target into the appropriate CDM SkyDirection. TODO: * Handle Galactic and TLE reference frames as/when they are added to the PDM. * Handle proper motion * Handle parallax @param target: the sky coordinate to convert @raises NotImplementedError: if reference frame is not handled yet """ match target.reference_coordinate: case EquatorialCoordinates() | ICRSCoordinates(): kwargs = {} if target.radial_velocity.quantity.value != 0.0: in_si_units = target.radial_velocity.quantity.to("m/s").value kwargs["radial_velocity"] = in_si_units # delegate to Astropy for conversion of ra,dec to floating # point degrees. sky_coord = target.reference_coordinate.to_sky_coord() return ICRSField( target_name=target.target_id, attrs=ICRSField.Attrs( c1=sky_coord.ra.degree, c2=sky_coord.dec.degree, **kwargs ), ) case AltAzCoordinates() | HorizontalCoordinates(): return AltAzField( target_name=target.target_id, attrs=AltAzField.Attrs( # az/el should already be degree floats c1=target.reference_coordinate.az, c2=target.reference_coordinate.el, ), ) case SpecialCoordinates(): return SpecialField( target_name=SolarSystemObject(target.reference_coordinate.name) ) case _: raise NotImplementedError( f"Reference frame {target.reference_frame} not handled." ) def itf_station_name_to_id_mapping(station_id: int) -> str: """ Map station ID to station name. :param value: the integer to map :return: the string name of the station ID """ return f"itf{station_id}" @dataclass class ChunkSpec: first_id: int last_id: int first_cluster: Union[int, None] last_cluster: Union[int, None] labels: str def label_from_id(rid): chunks = [ ChunkSpec(1, 224, None, None, "C"), ChunkSpec(225, 296, 1, 4, "ENS"), ChunkSpec(297, 386, 5, 9, "ESN"), ChunkSpec(387, 512, 10, 16, "ESN"), ] for chunk_range in chunks: if chunk_range.first_id <= rid <= chunk_range.last_id: if not chunk_range.first_cluster: # This range is not in clusters. Just prepend the label to the ID return f"{chunk_range.labels[0]}{rid}" # This range is in clusters. clusters = chunk_range.last_cluster - chunk_range.first_cluster + 1 offset = rid - chunk_range.first_id block, clst_n = divmod(offset // 6, clusters) return f"{chunk_range.labels[block]}{chunk_range.first_cluster+clst_n}-{offset % 6 + 1}" raise ValueError(f"id {rid} did not match any valid chunk specifier") def create_resources(allocation: Union[MCCSAllocation, DishAllocation]) -> list[str]: """ Create resources based on the type of allocation provided. :param allocation: the MCCSAllocation or DishAllocation to use in resource creation """ match allocation: case DishAllocation(): resources = allocation.dish_ids case MCCSAllocation(): station_ids = [ aperture.station_id for beam in allocation.subarray_beams for aperture in beam.apertures ] if allocation.selected_subarray_definition is SubArrayLOW.LOW_ITF: resources = [itf_station_name_to_id_mapping(sid) for sid in station_ids] else: resources = [label_from_id(rid) for rid in station_ids] case _: raise TypeError("Unsupported allocation type provided.") return sorted(resources)