"""
Utility functions to create ScanType objects out of different JSON documents
used throughout SDP.
"""
from typing import List, Union
import numpy as np
from astropy import units as u
from astropy.coordinates import Angle
from realtime.receive.core.common import from_dict, load_json_resource
from realtime.receive.core.scan import (
Beam,
Channels,
Field,
PhaseDirection,
Polarisations,
ScanType,
SpectralWindow,
StokesType,
)
[docs]
def parse_scantypes_from_assignres(
    assign_resources_command: Union[dict, str],
) -> List[ScanType]:
    """
    Parse the scan types out of a subarray AssignRes Tango command argument.

    :param assign_resources_command: the command argument, either already
        decoded into a dictionary, or a string that is handed to
        ``load_json_resource`` to obtain that dictionary
    :return: the scan types produced by the parser that matches the
        interface version declared by the command
    """
    command = assign_resources_command
    if isinstance(command, str):
        command = load_json_resource(command)

    # Only a command that explicitly declares the 0.3 interface takes the
    # backwards-compatible path; a missing or different "interface" value is
    # handled by the 0.4 parser.
    declares_interface_03 = (
        "interface" in command
        and command["interface"] == "https://schema.skao.int/ska-sdp-assignres/0.3"
    )
    if not declares_interface_03:
        # From 0.4 on, the scan types are nested under "execution_block".
        return _parse_scantypes_from_execblock_04(command["execution_block"])

    # The 0.3 parser is handed the whole command dictionary.
    return _parse_scantypes_from_execblock_03(command)
[docs]
def parse_scantypes_from_execblock(execution_block: Union[dict, str]) -> List[ScanType]:
    """
    Parse the scan types out of an execution block.

    :param execution_block: the execution block, either already decoded into
        a dictionary, or a string that is handed to ``load_json_resource``
        to obtain that dictionary
    :return: the scan types found in the execution block
    """
    block = execution_block
    if isinstance(block, str):
        block = load_json_resource(block)

    # It is unclear whether execution blocks always declare a schema, so the
    # version is guessed instead: a top-level "fields" entry selects the 0.4
    # parser, anything else falls back to the 0.3 parser. This heuristic is
    # admittedly flaky.
    parser = (
        _parse_scantypes_from_execblock_04
        if "fields" in block
        else _parse_scantypes_from_execblock_03
    )
    return parser(block)
def _parse_scantypes_from_execblock_04(execution_block: Union[dict, str]) -> List[ScanType]:
    """
    Build the ScanType objects described by an SDP Execution Block that
    follows the ska-sdp-assignres/0.4 schema at:
    https://developer.skao.int/projects/ska-telmodel/en/latest/schemas/ska-sdp-assignres.html#sdp-assign-resources-0-4

    Scan types whose ID starts with "." are not returned; their beams only
    act as overrides for the scan types that name them in "derive_from".
    Every returned beam is assembled from up to three layers, each later
    layer overwriting the keys of the earlier ones:

    1. the entry with the same beam ID under the block's "beams" (if any);
    2. the same beam under the "derive_from" scan type (if one is named);
    3. the beam entry of the scan type itself.

    :param execution_block: The Execution Block
    :return: one ScanType per scan type whose ID does not start with "."
    """
    # Lookup tables for the entities that beams refer to by ID.
    polarisations_by_id = {}
    for entry in execution_block["polarisations"]:
        polarisations_id = entry["polarisations_id"]
        corr_types = [StokesType[name] for name in entry["corr_type"]]
        polarisations_by_id[polarisations_id] = Polarisations(polarisations_id, corr_types)

    fields_by_id = {}
    for entry in execution_block["fields"]:
        field_id = entry["field_id"]
        phase_dir = entry["phase_dir"]
        direction = PhaseDirection(
            ra=_extract_coordinate_adr49(phase_dir, "ra"),
            dec=_extract_coordinate_adr49(phase_dir, "dec"),
            reference_time=phase_dir["reference_time"],
            reference_frame=phase_dir["reference_frame"],
        )
        fields_by_id[field_id] = Field(
            field_id,
            direction,
            pointing_fqdn=entry.get("pointing_fqdn", None),
        )

    channels_by_id = {}
    for entry in execution_block["channels"]:
        channels_id = entry["channels_id"]
        windows = [
            from_dict(SpectralWindow, data=window) for window in entry["spectral_windows"]
        ]
        channels_by_id[channels_id] = Channels(channels_id, windows)

    # Raw (potentially partial) beam definitions, keyed by beam ID.
    raw_beams = {}
    for raw_beam in execution_block["beams"]:
        raw_beams[raw_beam["beam_id"]] = raw_beam

    # Beam overrides of the "."-prefixed scan types, keyed by scan type ID.
    template_overrides = {}
    for scan_type in execution_block["scan_types"]:
        if scan_type["scan_type_id"].startswith("."):
            template_overrides[scan_type["scan_type_id"]] = scan_type["beams"]

    def resolve_beam(scan_type: dict, beam_id, own_override: dict) -> Beam:
        """Layer the partial definitions of one beam and build the Beam."""
        merged = {}
        # Layer 1: raw definition, or nothing if the block declares none.
        merged.update(raw_beams.get(beam_id, {}))
        # Layer 2: override inherited through "derive_from", when present.
        if "derive_from" in scan_type:
            merged.update(template_overrides[scan_type["derive_from"]].get(beam_id, {}))
        # Layer 3: the scan type's own override wins over everything else.
        merged.update(own_override)
        return Beam(
            merged["beam_id"],
            merged["function"],
            channels_by_id[merged["channels_id"]],
            polarisations_by_id[merged["polarisations_id"]],
            fields_by_id[merged["field_id"]],
        )

    scan_types = []
    for scan_type in execution_block["scan_types"]:
        scan_type_id = scan_type["scan_type_id"]
        if scan_type_id.startswith("."):
            continue
        beams = [
            resolve_beam(scan_type, beam_id, own_override)
            for beam_id, own_override in scan_type["beams"].items()
        ]
        scan_types.append(ScanType(scan_type_id, beams))
    return scan_types
def _parse_scantypes_from_execblock_03(
    execution_block: dict,
) -> List[ScanType]:
    """
    Build the ScanType objects described by an SDP Execution Block that
    follows the ska-sdp-assignres/0.3 schema at:
    https://developer.skao.int/projects/ska-telmodel/en/latest/schemas/ska-sdp-assignres.html#sdp-assign-resources-0-3

    ExecBlocks have not historically had a schema, but until AssignResources
    0.3 they have shared most of that schema in practice, which is why the
    parsing of the ExecBlock is based on it.

    Each scan type yields a single beam with ID "vis0", the fixed
    polarisations XX, XY, YX and YY, and an empty reference time. The scan
    type ID doubles as the channels, polarisations and field ID.

    :param execution_block: The Execution Block
    :return: one ScanType per entry under "scan_types"
    """
    # Schema 0.3 does not have separately named:
    #   beam_id
    #   channel_id
    #   field_id
    #   spectral_window_name
    # so those identifiers are synthesised below.
    scan_types = []
    for scan_type in execution_block["scan_types"]:
        scan_type_id = scan_type["scan_type_id"]

        channels_id = str(scan_type_id)
        # Spectral windows are identified by their position in the list.
        spectral_windows = [
            SpectralWindow(
                spectral_window_id=str(position),
                count=window["count"],
                start=window["start"],
                stride=window["stride"],
                freq_min=window["freq_min"],
                freq_max=window["freq_max"],
            )
            for position, window in enumerate(scan_type["channels"])
        ]
        channels = Channels(channels_id, spectral_windows)

        polarisations = Polarisations(
            scan_type_id,
            [StokesType.XX, StokesType.XY, StokesType.YX, StokesType.YY],
        )

        # Phase centre of the observation in J2000/ICRS
        phase_direction = PhaseDirection(
            ra=_extract_coordinate_03(scan_type, "ra"),
            dec=_extract_coordinate_03(scan_type, "dec"),
            reference_time="",
        )
        field = Field(scan_type_id, phase_direction)

        # NOTE(review): the second positional Beam argument receives
        # beam["function"] in the 0.4 parser, whereas here it receives the
        # scan type ID -- confirm this is intended.
        beam = Beam(
            "vis0",
            scan_type_id,
            channels=channels,
            polarisations=polarisations,
            field=field,
        )
        scan_types.append(ScanType(scan_type_id=scan_type_id, beams=[beam]))
    return scan_types
def _extract_coordinate_adr49(phase_dir: dict, key: str) -> Angle:
    """
    Extract a coordinate following the SKA ADR-49 symbols, units and
    sexagesimal representation.

    :param phase_dir: dictionary holding the coordinate under ``key`` or
        under ``f"{key}_str"``
    :param key: name of the coordinate, e.g. "ra" or "dec"
    :return: ``phase_dir[key]`` interpreted in degrees when that entry
        exists; otherwise ``phase_dir[f"{key}_str"]`` interpreted in hours
        for "ra"/"ha" and in degrees for any other key
    """
    # The plain entry takes precedence over its "_str" counterpart.
    if key in phase_dir:
        return Angle(phase_dir[key], u.deg)

    if key in ("ra", "ha"):
        sexagesimal_unit = u.hour
    else:
        sexagesimal_unit = u.deg
    return Angle(phase_dir[f"{key}_str"], sexagesimal_unit)
def _extract_coordinate_03(phase_dir: dict, key: str) -> Angle:
    """
    Deprecated coordinate extraction for ska-sdp-assignres/0.3.

    :param phase_dir: dictionary holding the coordinate under ``key``
    :param key: name of the coordinate; "ra" and "ha" are interpreted in
        hours, any other key in degrees
    :return: the value flattened into a one-dimensional array and wrapped
        in an Angle
    """
    if key in ("ra", "ha"):
        unit = u.hour
    else:
        unit = u.deg
    # reshape(-1) turns scalars and nested sequences alike into a 1-D array.
    flattened = np.array(phase_dir[key]).reshape(-1)
    return Angle(flattened, unit)