# -*- coding: utf-8 -*-
"""
ICD definitions and related functionality.
The SKA1 MID SDP - CSP Interface Control Document includes definitions,
including SPEAD Item descriptions, and the Items that should make up a SPEAD
heap. This module includes such definitions, plus related functionality for
their usage in python programs.
"""
import enum
import logging
from dataclasses import dataclass
import numpy as np
from astropy.time import Time
from realtime.receive.core.time_utils import unix_as_astropy, unix_to_mjd
logger = logging.getLogger(__name__)
[docs]
class Telescope(enum.Enum):
"""An enumeration of SKA telescopes, there's only two!"""
LOW = "low"
MID = "mid"
[docs]
class ItemID(enum.IntEnum):
"""
IDs of all Items used by both the Low and Mid ICDs
These are defined separately from the actual ItemDescriptions because at
least one ID is shared by both ICDs but with different ItemsDescriptions.
It's also useful to have IDs defined on their own.
"""
TIMESTAMP_COUNT = 0x6000
TIMESTAMP_FRACTION = 0x6001
CHANNEL_ID = 0x6002
CHANNEL_COUNT = 0x6003
POLARISATION_ID = 0x6004
BASELINE_COUNT = 0x6005
PHASE_BIN_ID = 0x6006
PHASE_BIN_COUNT = 0x6007
SCAN_ID = 0x6008
HARDWARE_ID = 0x6009
CORRELATOR_OUTPUT_DATA = 0x600A
STATION_BEAM_ID = 0x600B
SUBARRAY_ID = 0x600C
INTEGRATION_PERIOD = 0x600D
FREQUENCY_RESOLUTION = 0x600E
OUTPUT_RESOLUTION = 0x600F
CHANNEL_FREQUENCY = 0x6010
ZOOM_WINDOW_ID = 0x6011
CBF_FIRMWARE_VERSION = 0x6012
SPS_EPOCH = 0x6013
EPOCH_OFFSET = 0x6014
CBF_SOURCE_ID = 0x6015
[docs]
class ItemSendingContext(enum.Enum):
"""Enumeration of possible contexts when an Item can be sent"""
DATA_HEAP = enum.auto()
"""The item is sent on every data heap."""
SOS_HEAP = enum.auto()
"""The item is sent only on the start-of-stream heap."""
[docs]
@dataclass(frozen=True)
class ItemDescription:
"""
Static information about Items sent through SPEAD as defined on the ICDs.
Note that the item shape is not included here, as item 0x600A (correlator
output data) has a shape that is decided at runtime, while the others are
shapeless.
"""
id: int
"""The ID of this item."""
name: str
"""The name of this item."""
dtype: str | tuple
"""The numpy dtype of this item."""
sent_on: ItemSendingContext
"""When this item is sent (start-of-stream of data heap)."""
[docs]
def cast_value(self, value):
"""
Attempts to cast the given value(s) to the dtype for this
ItemDescription
"""
if np.ndim(value) == 0:
dtype = np.dtype(self.dtype)
return dtype.type(value)
elif isinstance(value, np.ndarray):
return value.astype(self.dtype)
else:
raise ValueError("Unknown type")
def _make_item_descriptions(*descriptions, default_sending_context):
def _with_sending_context(desc):
if len(desc) == 2:
return *desc, default_sending_context
assert len(desc) == 3, desc
return desc
return {
name: ItemDescription(ItemID[name], *_with_sending_context(desc))
for name, *desc in descriptions
}
[docs]
class ICD:
"""Base class for ICDs containing type annotations and docstrings."""
TELESCOPE: Telescope
"""The telescope this ICD applies to."""
Items: enum.Enum
"""Enumeration of all Items in this ICD."""
ITEM_IDS: frozenset[int]
"""The IDs of all Items of this ICD, made available for convenience."""
SENT_ON_SOS_HEAP: frozenset[ItemDescription]
"""
The Items that are sent on the start-of-stream heap, made available for
convenience.
"""
SENT_ON_DATA_HEAP: frozenset[ItemDescription]
"""
The Items that are sent on all data heaps, made available for convenience.
"""
@staticmethod
def _utility_sets(items_enum: enum.Enum):
all_ids = frozenset(item.value.id for item in items_enum)
sent_on_sos_heap = frozenset(
item.value for item in items_enum if item.value.sent_on == ItemSendingContext.SOS_HEAP
)
sent_on_data_heap = frozenset(item.value for item in items_enum) - sent_on_sos_heap
return all_ids, sent_on_sos_heap, sent_on_data_heap
[docs]
@classmethod
def corr_out_data_row_size(cls) -> int:
"""Size in bytes of each row in the CORRELATOR_OUTPUT_DATA Item."""
return np.dtype(list(cls.Items.CORRELATOR_OUTPUT_DATA.value.dtype)).itemsize
[docs]
@classmethod
def corr_out_data_size(cls, num_baselines: int, channels_per_stream: int = 1) -> int:
"""Size in bytes of a CORRELATOR_OUTPUT_DATA Item with the given dimensions."""
return cls.corr_out_data_row_size() * num_baselines * channels_per_stream
[docs]
@classmethod
def data_heap_size(cls, num_baselines: int, channels_per_stream: int = 1) -> int:
"""Size in bytes of a data heap's payload with the given dimensions."""
assert cls.Items.CORRELATOR_OUTPUT_DATA.value in cls.SENT_ON_DATA_HEAP
other_data_heap_items = cls.SENT_ON_DATA_HEAP - {cls.Items.CORRELATOR_OUTPUT_DATA.value}
return cls.corr_out_data_size(num_baselines, channels_per_stream) + sum(
np.dtype(item.dtype).itemsize for item in other_data_heap_items
)
[docs]
class LowICD(ICD):
"""The Low ICD."""
TELESCOPE: Telescope = Telescope.LOW
Items = enum.Enum(
"Items",
qualname="LowICD.Items",
names=_make_item_descriptions(
("CHANNEL_ID", "visibility_channel_id", "<u4"),
("BASELINE_COUNT", "visibility_baseline_count", "<u4"),
("SCAN_ID", "scan_id", "<u8"),
("HARDWARE_ID", "visibility_hardware_id", "<u4"),
("STATION_BEAM_ID", "station_beam_id", "<u2"),
("SUBARRAY_ID", "subarray_id", "<u2"),
("INTEGRATION_PERIOD", "visibility_integration_period", "<f4"),
("FREQUENCY_RESOLUTION", "visibility_frequency_resolution", "<f4"),
("OUTPUT_RESOLUTION", "visibility_output_resolution", "<u1"),
("CHANNEL_FREQUENCY", "visibility_channel_frequency", "<u4"),
("ZOOM_WINDOW_ID", "zoom_window_id", "<u1"),
("CBF_FIRMWARE_VERSION", "cbf_firmware_version", "<u4"),
("CBF_SOURCE_ID", "cbf_source_id", "|S1"),
("SPS_EPOCH", "visibility_sps_epoch", "<u4"),
(
"EPOCH_OFFSET",
"visibility_epoch_offset",
"<u8",
ItemSendingContext.DATA_HEAP,
),
(
"CORRELATOR_OUTPUT_DATA",
"correlator_output_data",
(("TCI", "i1"), ("FD", "u1"), ("VIS", "<c8", 4)),
ItemSendingContext.DATA_HEAP,
),
default_sending_context=ItemSendingContext.SOS_HEAP,
),
)
ITEM_IDS, SENT_ON_SOS_HEAP, SENT_ON_DATA_HEAP = ICD._utility_sets(Items)
assert len(SENT_ON_DATA_HEAP) == 2
# Signal Processing System (SPS) epochs are expressed as the number of
# seconds since 2000-01-01 00:00:00 TAI
_START_OF_2000_TAI_AS_UNIX_TS = 946684768
[docs]
@staticmethod
def unix_to_sps(unix_time: float):
"""
Convert a UNIX timestamp to a SPS timestamp (seconds since start of
2000)
"""
return unix_time - LowICD._START_OF_2000_TAI_AS_UNIX_TS
[docs]
@staticmethod
def sps_to_icd_epoch(sps_time):
"""
Take an SPS timestamp and generate an epoch suitable for transmission
as a LowICD.Items.SPS_EPOCH
"""
return LowICD.Items.SPS_EPOCH.value.cast_value(sps_time)
[docs]
@staticmethod
def calc_icd_offset(sps_time: float, icd_epoch):
"""
Calculate the offset of a SPS timestamp from the given SPS_EPOCH
(as generated by LowICD.sps_to_icd_epoch())
suitable for transmission as a LowICD.Items.SPS_OFFSET
"""
assert sps_time >= icd_epoch, "ICD cannot represent times prior to the epoch"
seconds_since_epoch = sps_time - icd_epoch
return LowICD.Items.EPOCH_OFFSET.value.cast_value(seconds_since_epoch * 1e9)
[docs]
@staticmethod
def icd_to_unix(sps_epoch, epoch_offset):
"""ICD Low timestamp -> fractional secs since UNIX epoch."""
return sps_epoch + epoch_offset / 1e9 + LowICD._START_OF_2000_TAI_AS_UNIX_TS
[docs]
class MidICD(ICD):
"""The Mid ICD."""
TELESCOPE: Telescope = Telescope.MID
Items = enum.Enum(
"Items",
qualname="MidICD.Items",
names=_make_item_descriptions(
("CHANNEL_ID", "visibility_channel_id", "<u4"),
("BASELINE_COUNT", "visibility_baseline_count", "<u4"),
("SCAN_ID", "scan_id", "<u8"),
("HARDWARE_ID", "visibility_hardware_id", "<u4"),
("TIMESTAMP_COUNT", "visibility_timestamp_count", "<u4"),
("TIMESTAMP_FRACTION", "visibility_timestamp_fraction", "<u4"),
("CHANNEL_COUNT", "visibility_channel_count", "<u4"),
("POLARISATION_ID", "visibility_polarisation_id", "<u4"),
("PHASE_BIN_ID", "visibility_phase_bin_id", "<u2"),
("PHASE_BIN_COUNT", "visibility_phase_bin_count", "<u2"),
(
"CORRELATOR_OUTPUT_DATA",
"correlator_output_data",
(
("TCI", "i1"),
("FD", "u1"),
("CCI", "i1"),
("VIS", "<c8", 4),
),
),
default_sending_context=ItemSendingContext.DATA_HEAP,
),
)
ITEM_IDS, SENT_ON_SOS_HEAP, SENT_ON_DATA_HEAP = ICD._utility_sets(Items)
assert len(SENT_ON_SOS_HEAP) == 0
[docs]
@staticmethod
def unix_to_icd(times):
"""Fractional secs since UNIX epoch -> Mid ICD timestamp."""
time_fractions, times = np.modf(times)
times = times.astype("<u4")
time_fractions = (time_fractions * 2**32).astype("<u4")
return times, time_fractions
[docs]
@staticmethod
def icd_to_unix(times, time_fractions):
"""Mid ICD timestamp -> fractional secs since UNIX epoch."""
return times + time_fractions / 2**32
[docs]
def icd_to_ms(vis):
"""
Move visiblity axes from ICD order to MS order. Both Low and Mid use the
same order, hence there's need for a single conversion routine.
"""
return np.moveaxis(vis, [0, 1, 2], [1, 0, 2])
[docs]
def ms_to_icd(vis):
"""
Move visiblity axes from MS order to ICD order. Both Low and Mid use the
same order, hence there's need for a single conversion routine.
"""
return np.moveaxis(vis, [0, 1, 2], [1, 0, 2])
[docs]
class Payload:
"""A payload as specified by the ICD."""
def __init__(self):
self._baseline_count = 0
self._channel_count = 0
self._channel_id = 0
self._correlated_data_fraction = []
self._hardware_id = 0
self._phase_bin_id = 0
self._phase_bin_count = 0
self._polarisation_id = 0
self._scan_id = 0
self._time_centroid_indices = []
self._timestamp: float = 0
self._cci = []
self._visibilities = []
@property
def baseline_count(self):
"""The number of baselines in this payload"""
return self._baseline_count
@baseline_count.setter
def baseline_count(self, baseline_count):
self._baseline_count = baseline_count
@property
def cci(self):
"""The channel centroid index (Mid only)"""
return self._cci
@cci.setter
def cci(self, cci):
self._cci = cci
@property
def channel_count(self):
"""The number of channels contained in this payload"""
return self._channel_count
@channel_count.setter
def channel_count(self, channel_count):
self._channel_count = channel_count
@property
def channel_id(self):
"""The ID of the first channel of this payload"""
return self._channel_id
@channel_id.setter
def channel_id(self, channel_id):
self._channel_id = channel_id
@property
def correlated_data_fraction(self):
"""The fraction of data on this payload that was correlated"""
return self._correlated_data_fraction
@correlated_data_fraction.setter
def correlated_data_fraction(self, correlated_data_fraction):
self._correlated_data_fraction = correlated_data_fraction
@property
def hardware_id(self):
"""The ID of the hardware source of this payload"""
return self._hardware_id
@hardware_id.setter
def hardware_id(self, hardware_id):
self._hardware_id = hardware_id
@property
def phase_bin_id(self):
"""The ID of the first phase bin of this payload"""
return self._phase_bin_id
@phase_bin_id.setter
def phase_bin_id(self, phase_bin_id):
self._phase_bin_id = phase_bin_id
@property
def phase_bin_count(self):
"""The number of phase bins of this payload"""
return self._phase_bin_count
@phase_bin_count.setter
def phase_bin_count(self, phase_bin_count):
self._phase_bin_count = phase_bin_count
@property
def polarisation_id(self):
"""The ID of the polarisation of this payload"""
return self._polarisation_id
@polarisation_id.setter
def polarisation_id(self, polarisation_id):
self._polarisation_id = polarisation_id
@property
def scan_id(self):
"""The ID of the scan of this payload"""
return self._scan_id
@scan_id.setter
def scan_id(self, scan_id):
self._scan_id = scan_id
@property
def time_centroid_indices(self):
"""The time centroids for each visibility of this payload"""
return self._time_centroid_indices
@time_centroid_indices.setter
def time_centroid_indices(self, time_centroids):
self._time_centroid_indices = time_centroids
@property
def timestamp(self) -> float:
"""The timestamp as fractional seconds since UNIX epoch"""
return self._timestamp
@timestamp.setter
def timestamp(self, timestamp):
self._timestamp = timestamp
@property
def visibilities(self):
"""The correlator visibilities of this payload"""
return self._visibilities
@visibilities.setter
def visibilities(self, visibilities):
self._visibilities = visibilities
@property
def mjd_time(self) -> float:
"""The timestamp of the payload in MJD seconds"""
return unix_to_mjd(self._timestamp)
@property
def astropy_time(self) -> Time:
"""The timestamp as an astropy Time object"""
return unix_as_astropy(self._timestamp)