Source code for realtime.receive.core.icd

# -*- coding: utf-8 -*-
"""
ICD definitions and related functionality.

The SKA1 MID SDP - CSP Interface Control Document includes definitions,
including SPEAD Item descriptions, and the Items that should make up a SPEAD
heap. This module includes such definitions, plus related functionality for
their usage in python programs.
"""
import enum
import logging
from dataclasses import dataclass

import numpy as np
from astropy.time import Time

from realtime.receive.core.time_utils import unix_as_astropy, unix_to_mjd

logger = logging.getLogger(__name__)


[docs] class Telescope(enum.Enum): """An enumeration of SKA telescopes, there's only two!""" LOW = "low" MID = "mid"
[docs] class ItemID(enum.IntEnum): """ IDs of all Items used by both the Low and Mid ICDs These are defined separately from the actual ItemDescriptions because at least one ID is shared by both ICDs but with different ItemsDescriptions. It's also useful to have IDs defined on their own. """ TIMESTAMP_COUNT = 0x6000 TIMESTAMP_FRACTION = 0x6001 CHANNEL_ID = 0x6002 CHANNEL_COUNT = 0x6003 POLARISATION_ID = 0x6004 BASELINE_COUNT = 0x6005 PHASE_BIN_ID = 0x6006 PHASE_BIN_COUNT = 0x6007 SCAN_ID = 0x6008 HARDWARE_ID = 0x6009 CORRELATOR_OUTPUT_DATA = 0x600A STATION_BEAM_ID = 0x600B SUBARRAY_ID = 0x600C INTEGRATION_PERIOD = 0x600D FREQUENCY_RESOLUTION = 0x600E OUTPUT_RESOLUTION = 0x600F CHANNEL_FREQUENCY = 0x6010 ZOOM_WINDOW_ID = 0x6011 CBF_FIRMWARE_VERSION = 0x6012 SPS_EPOCH = 0x6013 EPOCH_OFFSET = 0x6014 CBF_SOURCE_ID = 0x6015
[docs] class ItemSendingContext(enum.Enum): """Enumeration of possible contexts when an Item can be sent""" DATA_HEAP = enum.auto() """The item is sent on every data heap.""" SOS_HEAP = enum.auto() """The item is sent only on the start-of-stream heap."""
[docs] @dataclass(frozen=True) class ItemDescription: """ Static information about Items sent through SPEAD as defined on the ICDs. Note that the item shape is not included here, as item 0x600A (correlator output data) has a shape that is decided at runtime, while the others are shapeless. """ id: int """The ID of this item.""" name: str """The name of this item.""" dtype: str | tuple """The numpy dtype of this item.""" sent_on: ItemSendingContext """When this item is sent (start-of-stream of data heap)."""
[docs] def cast_value(self, value): """ Attempts to cast the given value(s) to the dtype for this ItemDescription """ if np.ndim(value) == 0: dtype = np.dtype(self.dtype) return dtype.type(value) elif isinstance(value, np.ndarray): return value.astype(self.dtype) else: raise ValueError("Unknown type")
def _make_item_descriptions(*descriptions, default_sending_context): def _with_sending_context(desc): if len(desc) == 2: return *desc, default_sending_context assert len(desc) == 3, desc return desc return { name: ItemDescription(ItemID[name], *_with_sending_context(desc)) for name, *desc in descriptions }
[docs] class ICD: """Base class for ICDs containing type annotations and docstrings.""" TELESCOPE: Telescope """The telescope this ICD applies to.""" Items: enum.Enum """Enumeration of all Items in this ICD.""" ITEM_IDS: frozenset[int] """The IDs of all Items of this ICD, made available for convenience.""" SENT_ON_SOS_HEAP: frozenset[ItemDescription] """ The Items that are sent on the start-of-stream heap, made available for convenience. """ SENT_ON_DATA_HEAP: frozenset[ItemDescription] """ The Items that are sent on all data heaps, made available for convenience. """ @staticmethod def _utility_sets(items_enum: enum.Enum): all_ids = frozenset(item.value.id for item in items_enum) sent_on_sos_heap = frozenset( item.value for item in items_enum if item.value.sent_on == ItemSendingContext.SOS_HEAP ) sent_on_data_heap = frozenset(item.value for item in items_enum) - sent_on_sos_heap return all_ids, sent_on_sos_heap, sent_on_data_heap
[docs] @classmethod def corr_out_data_row_size(cls) -> int: """Size in bytes of each row in the CORRELATOR_OUTPUT_DATA Item.""" return np.dtype(list(cls.Items.CORRELATOR_OUTPUT_DATA.value.dtype)).itemsize
[docs] @classmethod def corr_out_data_size(cls, num_baselines: int, channels_per_stream: int = 1) -> int: """Size in bytes of a CORRELATOR_OUTPUT_DATA Item with the given dimensions.""" return cls.corr_out_data_row_size() * num_baselines * channels_per_stream
[docs] @classmethod def data_heap_size(cls, num_baselines: int, channels_per_stream: int = 1) -> int: """Size in bytes of a data heap's payload with the given dimensions.""" assert cls.Items.CORRELATOR_OUTPUT_DATA.value in cls.SENT_ON_DATA_HEAP other_data_heap_items = cls.SENT_ON_DATA_HEAP - {cls.Items.CORRELATOR_OUTPUT_DATA.value} return cls.corr_out_data_size(num_baselines, channels_per_stream) + sum( np.dtype(item.dtype).itemsize for item in other_data_heap_items )
[docs] class LowICD(ICD): """The Low ICD.""" TELESCOPE: Telescope = Telescope.LOW Items = enum.Enum( "Items", qualname="LowICD.Items", names=_make_item_descriptions( ("CHANNEL_ID", "visibility_channel_id", "<u4"), ("BASELINE_COUNT", "visibility_baseline_count", "<u4"), ("SCAN_ID", "scan_id", "<u8"), ("HARDWARE_ID", "visibility_hardware_id", "<u4"), ("STATION_BEAM_ID", "station_beam_id", "<u2"), ("SUBARRAY_ID", "subarray_id", "<u2"), ("INTEGRATION_PERIOD", "visibility_integration_period", "<f4"), ("FREQUENCY_RESOLUTION", "visibility_frequency_resolution", "<f4"), ("OUTPUT_RESOLUTION", "visibility_output_resolution", "<u1"), ("CHANNEL_FREQUENCY", "visibility_channel_frequency", "<u4"), ("ZOOM_WINDOW_ID", "zoom_window_id", "<u1"), ("CBF_FIRMWARE_VERSION", "cbf_firmware_version", "<u4"), ("CBF_SOURCE_ID", "cbf_source_id", "|S1"), ("SPS_EPOCH", "visibility_sps_epoch", "<u4"), ( "EPOCH_OFFSET", "visibility_epoch_offset", "<u8", ItemSendingContext.DATA_HEAP, ), ( "CORRELATOR_OUTPUT_DATA", "correlator_output_data", (("TCI", "i1"), ("FD", "u1"), ("VIS", "<c8", 4)), ItemSendingContext.DATA_HEAP, ), default_sending_context=ItemSendingContext.SOS_HEAP, ), ) ITEM_IDS, SENT_ON_SOS_HEAP, SENT_ON_DATA_HEAP = ICD._utility_sets(Items) assert len(SENT_ON_DATA_HEAP) == 2 # Signal Processing System (SPS) epochs are expressed as the number of # seconds since 2000-01-01 00:00:00 TAI _START_OF_2000_TAI_AS_UNIX_TS = 946684768
[docs] @staticmethod def unix_to_sps(unix_time: float): """ Convert a UNIX timestamp to a SPS timestamp (seconds since start of 2000) """ return unix_time - LowICD._START_OF_2000_TAI_AS_UNIX_TS
[docs] @staticmethod def sps_to_icd_epoch(sps_time): """ Take an SPS timestamp and generate an epoch suitable for transmission as a LowICD.Items.SPS_EPOCH """ return LowICD.Items.SPS_EPOCH.value.cast_value(sps_time)
[docs] @staticmethod def calc_icd_offset(sps_time: float, icd_epoch): """ Calculate the offset of a SPS timestamp from the given SPS_EPOCH (as generated by LowICD.sps_to_icd_epoch()) suitable for transmission as a LowICD.Items.SPS_OFFSET """ assert sps_time >= icd_epoch, "ICD cannot represent times prior to the epoch" seconds_since_epoch = sps_time - icd_epoch return LowICD.Items.EPOCH_OFFSET.value.cast_value(seconds_since_epoch * 1e9)
[docs] @staticmethod def icd_to_unix(sps_epoch, epoch_offset): """ICD Low timestamp -> fractional secs since UNIX epoch.""" return sps_epoch + epoch_offset / 1e9 + LowICD._START_OF_2000_TAI_AS_UNIX_TS
[docs] class MidICD(ICD): """The Mid ICD.""" TELESCOPE: Telescope = Telescope.MID Items = enum.Enum( "Items", qualname="MidICD.Items", names=_make_item_descriptions( ("CHANNEL_ID", "visibility_channel_id", "<u4"), ("BASELINE_COUNT", "visibility_baseline_count", "<u4"), ("SCAN_ID", "scan_id", "<u8"), ("HARDWARE_ID", "visibility_hardware_id", "<u4"), ("TIMESTAMP_COUNT", "visibility_timestamp_count", "<u4"), ("TIMESTAMP_FRACTION", "visibility_timestamp_fraction", "<u4"), ("CHANNEL_COUNT", "visibility_channel_count", "<u4"), ("POLARISATION_ID", "visibility_polarisation_id", "<u4"), ("PHASE_BIN_ID", "visibility_phase_bin_id", "<u2"), ("PHASE_BIN_COUNT", "visibility_phase_bin_count", "<u2"), ( "CORRELATOR_OUTPUT_DATA", "correlator_output_data", ( ("TCI", "i1"), ("FD", "u1"), ("CCI", "i1"), ("VIS", "<c8", 4), ), ), default_sending_context=ItemSendingContext.DATA_HEAP, ), ) ITEM_IDS, SENT_ON_SOS_HEAP, SENT_ON_DATA_HEAP = ICD._utility_sets(Items) assert len(SENT_ON_SOS_HEAP) == 0
[docs] @staticmethod def unix_to_icd(times): """Fractional secs since UNIX epoch -> Mid ICD timestamp.""" time_fractions, times = np.modf(times) times = times.astype("<u4") time_fractions = (time_fractions * 2**32).astype("<u4") return times, time_fractions
[docs] @staticmethod def icd_to_unix(times, time_fractions): """Mid ICD timestamp -> fractional secs since UNIX epoch.""" return times + time_fractions / 2**32
[docs] def icd_to_ms(vis): """ Move visiblity axes from ICD order to MS order. Both Low and Mid use the same order, hence there's need for a single conversion routine. """ return np.moveaxis(vis, [0, 1, 2], [1, 0, 2])
[docs] def ms_to_icd(vis): """ Move visiblity axes from MS order to ICD order. Both Low and Mid use the same order, hence there's need for a single conversion routine. """ return np.moveaxis(vis, [0, 1, 2], [1, 0, 2])
[docs] class Payload: """A payload as specified by the ICD.""" def __init__(self): self._baseline_count = 0 self._channel_count = 0 self._channel_id = 0 self._correlated_data_fraction = [] self._hardware_id = 0 self._phase_bin_id = 0 self._phase_bin_count = 0 self._polarisation_id = 0 self._scan_id = 0 self._time_centroid_indices = [] self._timestamp: float = 0 self._cci = [] self._visibilities = [] @property def baseline_count(self): """The number of baselines in this payload""" return self._baseline_count @baseline_count.setter def baseline_count(self, baseline_count): self._baseline_count = baseline_count @property def cci(self): """The channel centroid index (Mid only)""" return self._cci @cci.setter def cci(self, cci): self._cci = cci @property def channel_count(self): """The number of channels contained in this payload""" return self._channel_count @channel_count.setter def channel_count(self, channel_count): self._channel_count = channel_count @property def channel_id(self): """The ID of the first channel of this payload""" return self._channel_id @channel_id.setter def channel_id(self, channel_id): self._channel_id = channel_id @property def correlated_data_fraction(self): """The fraction of data on this payload that was correlated""" return self._correlated_data_fraction @correlated_data_fraction.setter def correlated_data_fraction(self, correlated_data_fraction): self._correlated_data_fraction = correlated_data_fraction @property def hardware_id(self): """The ID of the hardware source of this payload""" return self._hardware_id @hardware_id.setter def hardware_id(self, hardware_id): self._hardware_id = hardware_id @property def phase_bin_id(self): """The ID of the first phase bin of this payload""" return self._phase_bin_id @phase_bin_id.setter def phase_bin_id(self, phase_bin_id): self._phase_bin_id = phase_bin_id @property def phase_bin_count(self): """The number of phase bins of this payload""" return self._phase_bin_count @phase_bin_count.setter def phase_bin_count(self, phase_bin_count): self._phase_bin_count = phase_bin_count @property def polarisation_id(self): """The ID of the polarisation of this payload""" return self._polarisation_id @polarisation_id.setter def polarisation_id(self, polarisation_id): self._polarisation_id = polarisation_id @property def scan_id(self): """The ID of the scan of this payload""" return self._scan_id @scan_id.setter def scan_id(self, scan_id): self._scan_id = scan_id @property def time_centroid_indices(self): """The time centroids for each visibility of this payload""" return self._time_centroid_indices @time_centroid_indices.setter def time_centroid_indices(self, time_centroids): self._time_centroid_indices = time_centroids @property def timestamp(self) -> float: """The timestamp as fractional seconds since UNIX epoch""" return self._timestamp @timestamp.setter def timestamp(self, timestamp): self._timestamp = timestamp @property def visibilities(self): """The correlator visibilities of this payload""" return self._visibilities @visibilities.setter def visibilities(self, visibilities): self._visibilities = visibilities @property def mjd_time(self) -> float: """The timestamp of the payload in MJD seconds""" return unix_to_mjd(self._timestamp) @property def astropy_time(self) -> Time: """The timestamp as an astropy Time object""" return unix_as_astropy(self._timestamp)