# Source code for realtime.receive.core.scan_utils

"""
Utility functions to create ScanType objects out of different JSON documents
used throughout SDP.
"""
from typing import List, Union

import numpy as np
from astropy import units as u
from astropy.coordinates import Angle

from realtime.receive.core.common import from_dict, load_json_resource
from realtime.receive.core.scan import (
    Beam,
    Channels,
    Field,
    PhaseDirection,
    Polarisations,
    ScanType,
    SpectralWindow,
    StokesType,
)


def parse_scantypes_from_assignres(
    assign_resources_command: Union[dict, str],
) -> List[ScanType]:
    """
    Build the scan types described by a subarray AssignRes Tango command
    argument.

    :param assign_resources_command: the command argument, either as a dict
        or as a string, in which case it is first passed through
        ``load_json_resource``
    :return: the scan types parsed from the command
    """
    command = assign_resources_command
    if isinstance(command, str):
        command = load_json_resource(command)

    # Only a command that explicitly declares the 0.3 interface goes through
    # the backwards-compatible parser; anything else is handled as 0.4, where
    # the scan types live under the "execution_block" entry.
    legacy_interface = "https://schema.skao.int/ska-sdp-assignres/0.3"
    declares_legacy = "interface" in command and command["interface"] == legacy_interface
    if not declares_legacy:
        return _parse_scantypes_from_execblock_04(command["execution_block"])
    return _parse_scantypes_from_execblock_03(command)


def parse_scantypes_from_execblock(execution_block: Union[dict, str]) -> List[ScanType]:
    """
    Build the scan types described by an execution block.

    :param execution_block: the execution block, either as a dict or as a
        string, in which case it is first passed through
        ``load_json_resource``
    :return: the scan types parsed from the execution block
    """
    if isinstance(execution_block, str):
        execution_block = load_json_resource(execution_block)

    # Execution blocks may not always declare a schema, so the presence of a
    # "fields" entry is used as a (flaky) marker for the 0.4 layout.
    parser = (
        _parse_scantypes_from_execblock_04
        if "fields" in execution_block
        else _parse_scantypes_from_execblock_03
    )
    return parser(execution_block)


def _parse_scantypes_from_execblock_04(execution_block: Union[dict, str]) -> List[ScanType]:
    """
    Build the ScanType objects described by an SDP Execution Block that
    follows the ska-sdp-assignres/0.4 schema at:
    https://developer.skao.int/projects/ska-telmodel/en/latest/schemas/ska-sdp-assignres.html#sdp-assign-resources-0-4

    Scan types whose ID starts with "." are not returned themselves; their
    beam entries are only used as overrides for the scan types that name
    them in "derive_from".

    :param execution_block: The Execution Block
    :return: one ScanType per scan type entry whose ID does not start with "."
    """
    # Lookup tables keyed by the IDs that beam definitions use to refer to them.
    polarisations_by_id = {}
    for pol_spec in execution_block["polarisations"]:
        pol_id = pol_spec["polarisations_id"]
        stokes = [StokesType[name] for name in pol_spec["corr_type"]]
        polarisations_by_id[pol_id] = Polarisations(pol_id, stokes)

    fields_by_id = {}
    for field_spec in execution_block["fields"]:
        field_id = field_spec["field_id"]
        direction = PhaseDirection(
            ra=_extract_coordinate_adr49(field_spec["phase_dir"], "ra"),
            dec=_extract_coordinate_adr49(field_spec["phase_dir"], "dec"),
            reference_time=field_spec["phase_dir"]["reference_time"],
            reference_frame=field_spec["phase_dir"]["reference_frame"],
        )
        fields_by_id[field_id] = Field(
            field_id,
            direction,
            pointing_fqdn=field_spec.get("pointing_fqdn"),
        )

    channels_by_id = {}
    for channels_spec in execution_block["channels"]:
        channels_id = channels_spec["channels_id"]
        windows = [
            from_dict(SpectralWindow, data=window_spec)
            for window_spec in channels_spec["spectral_windows"]
        ]
        channels_by_id[channels_id] = Channels(channels_id, windows)

    # Raw, potentially partial beam definitions from the top-level "beams" list.
    beam_defaults = {}
    for beam_spec in execution_block["beams"]:
        beam_defaults[beam_spec["beam_id"]] = beam_spec

    # Beam overrides carried by the "."-prefixed scan types.
    template_beams = {}
    for entry in execution_block["scan_types"]:
        entry_id = entry["scan_type_id"]
        if entry_id.startswith("."):
            template_beams[entry_id] = entry["beams"]

    def build_beam(entry: dict, beam_id, beam_override: dict) -> Beam:
        """
        Layer the partial definitions of one beam, later layers overriding
        earlier ones, and resolve the result into a Beam.
        """
        layers = [beam_defaults.get(beam_id, {})]
        if "derive_from" in entry:
            # Looked up per beam, so a scan type without beams never touches
            # the scan type it derives from.
            layers.append(template_beams[entry["derive_from"]].get(beam_id, {}))
        layers.append(beam_override)

        merged = {}
        for layer in layers:
            merged.update(layer)
        return Beam(
            merged["beam_id"],
            merged["function"],
            channels_by_id[merged["channels_id"]],
            polarisations_by_id[merged["polarisations_id"]],
            fields_by_id[merged["field_id"]],
        )

    parsed = []
    for entry in execution_block["scan_types"]:
        entry_id = entry["scan_type_id"]
        if entry_id.startswith("."):
            continue
        beams = [
            build_beam(entry, beam_id, beam_override)
            for beam_id, beam_override in entry["beams"].items()
        ]
        parsed.append(ScanType(entry_id, beams))
    return parsed


def _parse_scantypes_from_execblock_03(
    execution_block: dict,
) -> List[ScanType]:
    """
    Build the ScanType objects described by an SDP Execution Block that
    follows the ska-sdp-assignres/0.3 schema at:
    https://developer.skao.int/projects/ska-telmodel/en/latest/schemas/ska-sdp-assignres.html#sdp-assign-resources-0-3

    ExecBlocks have not historically had a schema of their own, but up to
    AssignResources 0.3 they shared most of that schema in practice, which
    is why the ExecBlock is parsed according to it.

    :param execution_block: The Execution Block
    :return: one ScanType, holding a single "vis0" beam, per scan type entry
    """
    # Schema 0.3 has no separately named beam_id, channel_id, field_id or
    # spectral_window_name, so the scan type ID (or the position of the
    # channel in its list) stands in for them.
    parsed = []
    for entry in execution_block["scan_types"]:
        windows = [
            SpectralWindow(
                spectral_window_id=str(position),
                count=channel_spec["count"],
                start=channel_spec["start"],
                stride=channel_spec["stride"],
                freq_min=channel_spec["freq_min"],
                freq_max=channel_spec["freq_max"],
            )
            for position, channel_spec in enumerate(entry["channels"])
        ]
        channels = Channels(str(entry["scan_type_id"]), windows)
        polarisations = Polarisations(
            entry["scan_type_id"],
            [StokesType.XX, StokesType.XY, StokesType.YX, StokesType.YY],
        )
        # Phase centre of the observation in J2000/ICRS
        direction = PhaseDirection(
            ra=_extract_coordinate_03(entry, "ra"),
            dec=_extract_coordinate_03(entry, "dec"),
            reference_time="",
        )
        field = Field(entry["scan_type_id"], direction)
        beam = Beam(
            "vis0",
            entry["scan_type_id"],
            channels=channels,
            polarisations=polarisations,
            field=field,
        )
        parsed.append(ScanType(scan_type_id=entry["scan_type_id"], beams=[beam]))
    return parsed


def _extract_coordinate_adr49(phase_dir: dict, key: str) -> Angle:
    """
    Extract a coordinate following the SKA ADR-49 symbols, units and
    sexagesimal representation.

    The value stored under ``key`` is read as degrees when present;
    otherwise the value under ``"<key>_str"`` is read, in hours for "ra" and
    "ha" and in degrees for any other key.
    """
    if key in phase_dir:
        return Angle(phase_dir[key], u.deg)
    if key in ("ra", "ha"):
        sexagesimal_unit = u.hour
    else:
        sexagesimal_unit = u.deg
    return Angle(phase_dir[f"{key}_str"], sexagesimal_unit)


def _extract_coordinate_03(phase_dir: dict, key: str) -> Angle:
    """
    Deprecated coordinate extraction for ska-sdp-assignres/0.3.

    The value under ``key`` is flattened into a 1-D array and read in hours
    for "ra" and "ha", and in degrees for any other key.
    """
    flattened = np.array(phase_dir[key]).reshape(-1)
    if key in ("ra", "ha"):
        unit = u.hour
    else:
        unit = u.deg
    return Angle(flattened, unit)