Source code for ska_telmodel.pst.low_examples

"""
Examples specifically for SKALow
"""

import copy
from typing import Any, Callable, Dict

from .._common import split_interface_version
from .version import PST_CONFIG_VER2_4, PST_CONFIG_VER2_5

LOW_PST_CONFIGURE_SCAN_FT_2_5 = {
    "interface": "https://schema.skao.int/ska-pst-configure/2.5",
    "common": {
        "config_id": "sbi-mvp01-20240101-00001-flow-through",
        "subarray_id": 1,
        "eb_id": "eb-e111-20240101-87391",
        "frequency_band": "low",
    },
    "pst": {
        "scan": {
            "activation_time": "2024-01-01T22:55:55Z",
            "timing_beam_id": "1",
            "bits_per_sample": 32,
            "num_of_polarizations": 2,
            "udp_nsamp": 32,
            "wt_nsamp": 32,
            "udp_nchan": 24,
            "num_frequency_channels": 432,
            "centre_frequency": 200000000.0,
            "total_bandwidth": 1562500.0,
            "observation_mode": "FLOW_THROUGH",
            "observer_id": "jdoe",
            "project_id": "project1",
            "pointing_id": "pointing1",
            "source": "J1921+2153",
            "itrf": [5109360.133, 2006852.586, -3238948.127],
            "receiver_id": "receiver3",
            "feed_polarization": "CIRC",
            "feed_handedness": 1,
            "feed_angle": 1.234,
            "feed_tracking_mode": "FA",
            "feed_position_angle": 10.0,
            "oversampling_ratio": [4, 3],
            "coordinates": {
                "equinox": 2000.0,
                "ra": "19:21:44.815",
                "dec": "21:53:02.400",
            },
            "max_scan_length": 20000.0,
            "subint_duration": 30.0,
            "receptors": ["receptor1", "receptor2"],
            "receptor_weights": [0.4, 0.6],
            "num_rfi_frequency_masks": 0,
            "rfi_frequency_masks": [],
            "destination_address": ["192.168.178.26", 9021],
            "num_channelization_stages": 2,
            "channelization_stages": [
                {
                    "num_filter_taps": 1,
                    "filter_coefficients": [1.0],
                    "num_frequency_channels": 1024,
                    "oversampling_ratio": [32, 27],
                },
                {
                    "num_filter_taps": 1,
                    "filter_coefficients": [1.0],
                    "num_frequency_channels": 256,
                    "oversampling_ratio": [4, 3],
                },
            ],
            "ft": {
                "num_bits_out": 4,
                "channels": [0, 100],
                "polarizations": "Both",
                "requantisation_scale": 1.0,
                "requantisation_init_time": 1.0,
            },
        },
    },
}
LOW_PST_CONFIGURE_SCAN_DS_2_4 = {
    "interface": "https://schema.skao.int/ska-pst-configure/2.4",
    "common": {
        "config_id": "sbi-dynamic_spectrum",
        "subarray_id": 1,
        "eb_id": "eb-x449-20231105-34696",
        "frequency_band": "low",
    },
    "pst": {
        "scan": {
            "activation_time": "2022-01-19T23:07:45Z",
            "timing_beam_id": "1",
            "bits_per_sample": 32,
            "num_of_polarizations": 2,
            "udp_nsamp": 32,
            "wt_nsamp": 32,
            "udp_nchan": 24,
            "num_frequency_channels": 432,
            "centre_frequency": 100000000.0,
            "total_bandwidth": 1562500.0,
            "observation_mode": "DYNAMIC_SPECTRUM",
            "observer_id": "jdoe",
            "project_id": "project1",
            "pointing_id": "pointing1",
            "source": "J1921+2153",
            "itrf": [5109360.133, 2006852.586, -3238948.127],
            "receiver_id": "receiver3",
            "feed_polarization": "CIRC",
            "feed_handedness": 1,
            "feed_angle": 1.234,
            "feed_tracking_mode": "FA",
            "feed_position_angle": 10.0,
            "oversampling_ratio": [4, 3],
            "coordinates": {
                "equinox": 2000.0,
                "ra": "19:21:44.815",
                "dec": "21:53:02.400",
            },
            "max_scan_length": 13000.2,
            "subint_duration": 30.0,
            "receptors": ["receptorX, receptorY"],
            "receptor_weights": [0.4, 0.6],
            "num_rfi_frequency_masks": 0,
            "rfi_frequency_masks": [],
            "destination_address": ["192.168.178.26", 9021],
            "num_channelization_stages": 2,
            "channelization_stages": [
                {
                    "num_filter_taps": 1,
                    "filter_coefficients": [1.0],
                    "num_frequency_channels": 1024,
                    "oversampling_ratio": [32, 27],
                },
                {
                    "num_filter_taps": 1,
                    "filter_coefficients": [1.0],
                    "num_frequency_channels": 256,
                    "oversampling_ratio": [4, 3],
                },
            ],
            "ds": {
                "dispersion_measure": 100.0,
                "output_frequency_channels": 1,
                "stokes_parameters": "Q",
                "num_bits_out": 16,
                "time_decimation_factor": 10,
                "frequency_decimation_factor": 4,
                "requantisation_scale": 1.0,
                "requantisation_length": 1.0,
            },
        },
    },
}
LOW_PST_CONFIGURE_SCAN_PT_2_4 = {
    "interface": "https://schema.skao.int/ska-pst-configure/2.4",
    "common": {
        "config_id": "sbi-20240215-00001-pulsar_timing",
        "subarray_id": 1,
        "eb_id": "eb-x449-20231105-34696",
        "frequency_band": "low",
    },
    "pst": {
        "scan": {
            "activation_time": "2024-02-15T23:07:45Z",
            "timing_beam_id": "1",
            "bits_per_sample": 32,
            "num_of_polarizations": 2,
            "udp_nsamp": 32,
            "wt_nsamp": 32,
            "udp_nchan": 24,
            "num_frequency_channels": 432,
            "centre_frequency": 100000000.0,
            "total_bandwidth": 1562500.0,
            "observation_mode": "PULSAR_TIMING",
            "observer_id": "jdoe",
            "project_id": "project1",
            "pointing_id": "pointing1",
            "source": "J1921+2153",
            "itrf": [5109360.133, 2006852.586, -3238948.127],
            "receiver_id": "receiver3",
            "feed_polarization": "CIRC",
            "feed_handedness": 1,
            "feed_angle": 1.234,
            "feed_tracking_mode": "FA",
            "feed_position_angle": 10.0,
            "oversampling_ratio": [4, 3],
            "coordinates": {
                "equinox": 2000.0,
                "ra": "19:21:44.815",
                "dec": "21:53:02.400",
            },
            "max_scan_length": 10000.5,
            "subint_duration": 30.0,
            "receptors": ["receptorX", "receptorY"],
            "receptor_weights": [0.4, 0.6],
            "num_rfi_frequency_masks": 0,
            "rfi_frequency_masks": [],
            "destination_address": ["192.168.178.26", 9021],
            "num_channelization_stages": 2,
            "channelization_stages": [
                {
                    "num_filter_taps": 1,
                    "filter_coefficients": [1.0],
                    "num_frequency_channels": 1024,
                    "oversampling_ratio": [32, 27],
                },
                {
                    "num_filter_taps": 1,
                    "filter_coefficients": [1.0],
                    "num_frequency_channels": 256,
                    "oversampling_ratio": [4, 3],
                },
            ],
            "pt": {
                "dispersion_measure": 100.0,
                "rotation_measure": 0.0,
                "ephemeris": "",
                "pulsar_phase_predictor": "",
                "output_frequency_channels": 1,
                "output_phase_bins": 64,
                "num_sk_config": 1,
                "sk_config": [
                    {
                        "sk_range": [0.8, 0.9],
                        "sk_integration_limit": 100,
                        "sk_excision_limit": 25.0,
                    }
                ],
                "target_snr": 0.0,
            },
        },
    },
}
LOW_PST_CONFIGURE_SCAN_FT_2_4 = {
    "interface": "https://schema.skao.int/ska-pst-configure/2.4",
    "common": {
        "config_id": "sbi-mvp01-20240325-00001-flow_through",
        "subarray_id": 1,
        "eb_id": "eb-b521-20240325-0010",
        "frequency_band": "low",
    },
    "pst": {
        "scan": {
            "activation_time": "2024-03-25T22:01:11Z",
            "timing_beam_id": "1",
            "bits_per_sample": 32,
            "num_of_polarizations": 2,
            "udp_nsamp": 32,
            "wt_nsamp": 32,
            "udp_nchan": 24,
            "num_frequency_channels": 432,
            "centre_frequency": 100000000.0,
            "total_bandwidth": 1562500.0,
            "observation_mode": "FLOW_THROUGH",
            "observer_id": "jdoe",
            "project_id": "project1",
            "pointing_id": "pointing1",
            "source": "J1921+2153",
            "itrf": [5109360.133, 2006852.586, -3238948.127],
            "receiver_id": "receiver3",
            "feed_polarization": "CIRC",
            "feed_handedness": 1,
            "feed_angle": 1.234,
            "feed_tracking_mode": "FA",
            "feed_position_angle": 10.0,
            "oversampling_ratio": [4, 3],
            "coordinates": {
                "equinox": 2000.0,
                "ra": "19:21:44.815",
                "dec": "21:53:02.400",
            },
            "max_scan_length": 20000.0,
            "subint_duration": 30.0,
            "receptors": ["receptorZ", "receptorW"],
            "receptor_weights": [0.4, 0.6],
            "num_rfi_frequency_masks": 0,
            "rfi_frequency_masks": [],
            "destination_address": ["192.168.178.26", 9021],
            "num_channelization_stages": 2,
            "channelization_stages": [
                {
                    "num_filter_taps": 1,
                    "filter_coefficients": [1.0],
                    "num_frequency_channels": 1024,
                    "oversampling_ratio": [32, 27],
                },
                {
                    "num_filter_taps": 1,
                    "filter_coefficients": [1.0],
                    "num_frequency_channels": 256,
                    "oversampling_ratio": [4, 3],
                },
            ],
            "ft": {
                "num_bits_out": 32,
                "num_channels": 1,
                "channels": [1],
                "requantisation_scale": 1.0,
                "requantisation_length": 1.0,
            },
        },
    },
}
LOW_PST_CONFIGURE_SCAN_VR_2_4 = {
    "interface": "https://schema.skao.int/ska-pst-configure/2.4",
    "common": {
        "config_id": "sbi-mvp01-20240111-voltage_recorder",
        "subarray_id": 1,
        "eb_id": "eb-x321-20240111-10012",
        "frequency_band": "low",
    },
    "pst": {
        "scan": {
            "activation_time": "2024-01-11T23:11:17Z",
            "bits_per_sample": 32,
            "timing_beam_id": "1",
            "num_of_polarizations": 2,
            "udp_nsamp": 32,
            "wt_nsamp": 32,
            "udp_nchan": 24,
            "num_frequency_channels": 432,
            "centre_frequency": 100000000.0,
            "total_bandwidth": 1562500.0,
            "observation_mode": "VOLTAGE_RECORDER",
            "observer_id": "jdoe",
            "project_id": "project1",
            "pointing_id": "pointing1",
            "source": "J1921+2153",
            "itrf": [5109360.133, 2006852.586, -3238948.127],
            "receiver_id": "receiver3",
            "feed_polarization": "LIN",
            "feed_handedness": 1,
            "feed_angle": 1.234,
            "feed_tracking_mode": "FA",
            "feed_position_angle": 10.0,
            "oversampling_ratio": [4, 3],
            "coordinates": {
                "equinox": 2000.0,
                "ra": "19:21:44.815",
                "dec": "21:53:02.400",
            },
            "max_scan_length": 20000.0,
            "subint_duration": 30.0,
            "receptors": ["SKA001", "SKA036"],
            "receptor_weights": [0.4, 0.6],
            "num_channelization_stages": 2,
            "channelization_stages": [
                {
                    "num_filter_taps": 1,
                    "filter_coefficients": [1.0],
                    "num_frequency_channels": 1024,
                    "oversampling_ratio": [32, 27],
                },
                {
                    "num_filter_taps": 1,
                    "filter_coefficients": [1.0],
                    "num_frequency_channels": 256,
                    "oversampling_ratio": [4, 3],
                },
            ],
        },
    },
}


def _get_pt_example(*args: Any, **kwargs: Any) -> dict:
    return LOW_PST_CONFIGURE_SCAN_PT_2_4


def _get_vr_example(*args: Any, **kwargs: Any) -> dict:
    return LOW_PST_CONFIGURE_SCAN_VR_2_4


def _get_ft_example(version: str, *args: Any, **kwargs: Any) -> dict:
    (major, minor) = split_interface_version(version)

    if (major, minor) >= (2, 5):
        return LOW_PST_CONFIGURE_SCAN_FT_2_5

    return LOW_PST_CONFIGURE_SCAN_FT_2_4


def _get_ds_example(*args: Any, **kwargs: Any) -> dict:
    return LOW_PST_CONFIGURE_SCAN_DS_2_4


SCAN_TYPE_MAP: Dict[str, Callable[..., dict]] = {
    "pst_scan_vr": _get_vr_example,
    "pst_scan_pt": _get_pt_example,
    "pst_scan_ft": _get_ft_example,
    "pst_scan_ds": _get_ds_example,
}


[docs]def get_low_pst_config_example(version: str, scan_type: str = None) -> dict: """Generate examples for SKALow PST configuration strings :param version: Version URI of configuration format :param scan: Includes SDP receive addresses for a scan? `None` means that this is "template" configuration as passed to TMC. Valid parameters: pst_scan_vr, pst_scan_pt, pst_scan_ft, and pst_scan_ds """ effective_scan_type = scan_type if effective_scan_type and effective_scan_type.startswith("low_"): effective_scan_type = effective_scan_type[4:] (major_version, minor_version) = split_interface_version(version) if (major_version, minor_version) >= (2, 4): config = copy.deepcopy(SCAN_TYPE_MAP[effective_scan_type](version)) if (major_version, minor_version) == (2, 4): config["interface"] = PST_CONFIG_VER2_4 elif (major_version, minor_version) == (2, 5): config["interface"] = PST_CONFIG_VER2_5 return config raise ValueError( f"Could not generate example for schema {version} and {scan_type=}!" )