API documentation

ICD

ICD definitions and related functionality.

The SKA1 MID SDP - CSP Interface Control Document includes definitions, including SPEAD Item descriptions, and the Items that should make up a SPEAD heap. This module includes such definitions, plus related functionality for their usage in python programs.

class realtime.receive.core.icd.ICD[source]

Base class for ICDs containing type annotations and docstrings.

ITEM_IDS: frozenset[int]

The IDs of all Items of this ICD, made available for convenience.

Items: Enum

Enumeration of all Items in this ICD.

SENT_ON_DATA_HEAP: frozenset[ItemDescription]

The Items that are sent on all data heaps, made available for convenience.

SENT_ON_SOS_HEAP: frozenset[ItemDescription]

The Items that are sent on the start-of-stream heap, made available for convenience.

TELESCOPE: Telescope

The telescope this ICD applies to.

classmethod corr_out_data_row_size() int[source]

Size in bytes of each row in the CORRELATOR_OUTPUT_DATA Item.

classmethod corr_out_data_size(num_baselines: int, channels_per_stream: int = 1) int[source]

Size in bytes of a CORRELATOR_OUTPUT_DATA Item with the given dimensions.

classmethod data_heap_size(num_baselines: int, channels_per_stream: int = 1) int[source]

Size in bytes of a data heap’s payload with the given dimensions.

class realtime.receive.core.icd.ItemDescription(id: int, name: str, dtype: str | tuple, sent_on: ItemSendingContext)[source]

Static information about Items sent through SPEAD as defined on the ICDs.

Note that the item shape is not included here, as item 0x600A (correlator output data) has a shape that is decided at runtime, while the others are shapeless.

cast_value(value)[source]

Attempts to cast the given value(s) to the dtype for this ItemDescription

dtype: str | tuple

The numpy dtype of this item.

id: int

The ID of this item.

name: str

The name of this item.

sent_on: ItemSendingContext

When this item is sent (start-of-stream of data heap).

enum realtime.receive.core.icd.ItemID(value)[source]

IDs of all Items used by both the Low and Mid ICDs

These are defined separately from the actual ItemDescriptions because at least one ID is shared by both ICDs but with different ItemsDescriptions. It’s also useful to have IDs defined on their own.

Member Type:

int

Valid values are as follows:

TIMESTAMP_COUNT = <ItemID.TIMESTAMP_COUNT: 24576>
TIMESTAMP_FRACTION = <ItemID.TIMESTAMP_FRACTION: 24577>
CHANNEL_ID = <ItemID.CHANNEL_ID: 24578>
CHANNEL_COUNT = <ItemID.CHANNEL_COUNT: 24579>
POLARISATION_ID = <ItemID.POLARISATION_ID: 24580>
BASELINE_COUNT = <ItemID.BASELINE_COUNT: 24581>
PHASE_BIN_ID = <ItemID.PHASE_BIN_ID: 24582>
PHASE_BIN_COUNT = <ItemID.PHASE_BIN_COUNT: 24583>
SCAN_ID = <ItemID.SCAN_ID: 24584>
HARDWARE_ID = <ItemID.HARDWARE_ID: 24585>
CORRELATOR_OUTPUT_DATA = <ItemID.CORRELATOR_OUTPUT_DATA: 24586>
STATION_BEAM_ID = <ItemID.STATION_BEAM_ID: 24587>
SUBARRAY_ID = <ItemID.SUBARRAY_ID: 24588>
INTEGRATION_PERIOD = <ItemID.INTEGRATION_PERIOD: 24589>
FREQUENCY_RESOLUTION = <ItemID.FREQUENCY_RESOLUTION: 24590>
OUTPUT_RESOLUTION = <ItemID.OUTPUT_RESOLUTION: 24591>
CHANNEL_FREQUENCY = <ItemID.CHANNEL_FREQUENCY: 24592>
ZOOM_WINDOW_ID = <ItemID.ZOOM_WINDOW_ID: 24593>
CBF_FIRMWARE_VERSION = <ItemID.CBF_FIRMWARE_VERSION: 24594>
SPS_EPOCH = <ItemID.SPS_EPOCH: 24595>
EPOCH_OFFSET = <ItemID.EPOCH_OFFSET: 24596>
CBF_SOURCE_ID = <ItemID.CBF_SOURCE_ID: 24597>
enum realtime.receive.core.icd.ItemSendingContext(value)[source]

Enumeration of possible contexts when an Item can be sent

Valid values are as follows:

DATA_HEAP = <ItemSendingContext.DATA_HEAP: 1>
SOS_HEAP = <ItemSendingContext.SOS_HEAP: 2>
class realtime.receive.core.icd.LowICD[source]

The Low ICD.

ITEM_IDS: frozenset[int] = frozenset({ItemID.CHANNEL_ID, ItemID.BASELINE_COUNT, ItemID.SCAN_ID, ItemID.HARDWARE_ID, ItemID.CORRELATOR_OUTPUT_DATA, ItemID.STATION_BEAM_ID, ItemID.SUBARRAY_ID, ItemID.INTEGRATION_PERIOD, ItemID.FREQUENCY_RESOLUTION, ItemID.OUTPUT_RESOLUTION, ItemID.CHANNEL_FREQUENCY, ItemID.ZOOM_WINDOW_ID, ItemID.CBF_FIRMWARE_VERSION, ItemID.SPS_EPOCH, ItemID.EPOCH_OFFSET, ItemID.CBF_SOURCE_ID})

The IDs of all Items of this ICD, made available for convenience.

enum Items(value)

An enumeration.

Valid values are as follows:

CHANNEL_ID = <Items.CHANNEL_ID: ItemDescription(id=<ItemID.CHANNEL_ID: 24578>, name='visibility_channel_id', dtype='<u4', sent_on=<ItemSendingContext.SOS_HEAP: 2>)>
BASELINE_COUNT = <Items.BASELINE_COUNT: ItemDescription(id=<ItemID.BASELINE_COUNT: 24581>, name='visibility_baseline_count', dtype='<u4', sent_on=<ItemSendingContext.SOS_HEAP: 2>)>
SCAN_ID = <Items.SCAN_ID: ItemDescription(id=<ItemID.SCAN_ID: 24584>, name='scan_id', dtype='<u8', sent_on=<ItemSendingContext.SOS_HEAP: 2>)>
HARDWARE_ID = <Items.HARDWARE_ID: ItemDescription(id=<ItemID.HARDWARE_ID: 24585>, name='visibility_hardware_id', dtype='<u4', sent_on=<ItemSendingContext.SOS_HEAP: 2>)>
STATION_BEAM_ID = <Items.STATION_BEAM_ID: ItemDescription(id=<ItemID.STATION_BEAM_ID: 24587>, name='station_beam_id', dtype='<u2', sent_on=<ItemSendingContext.SOS_HEAP: 2>)>
SUBARRAY_ID = <Items.SUBARRAY_ID: ItemDescription(id=<ItemID.SUBARRAY_ID: 24588>, name='subarray_id', dtype='<u2', sent_on=<ItemSendingContext.SOS_HEAP: 2>)>
INTEGRATION_PERIOD = <Items.INTEGRATION_PERIOD: ItemDescription(id=<ItemID.INTEGRATION_PERIOD: 24589>, name='visibility_integration_period', dtype='<f4', sent_on=<ItemSendingContext.SOS_HEAP: 2>)>
FREQUENCY_RESOLUTION = <Items.FREQUENCY_RESOLUTION: ItemDescription(id=<ItemID.FREQUENCY_RESOLUTION: 24590>, name='visibility_frequency_resolution', dtype='<f4', sent_on=<ItemSendingContext.SOS_HEAP: 2>)>
OUTPUT_RESOLUTION = <Items.OUTPUT_RESOLUTION: ItemDescription(id=<ItemID.OUTPUT_RESOLUTION: 24591>, name='visibility_output_resolution', dtype='<u1', sent_on=<ItemSendingContext.SOS_HEAP: 2>)>
CHANNEL_FREQUENCY = <Items.CHANNEL_FREQUENCY: ItemDescription(id=<ItemID.CHANNEL_FREQUENCY: 24592>, name='visibility_channel_frequency', dtype='<u4', sent_on=<ItemSendingContext.SOS_HEAP: 2>)>
ZOOM_WINDOW_ID = <Items.ZOOM_WINDOW_ID: ItemDescription(id=<ItemID.ZOOM_WINDOW_ID: 24593>, name='zoom_window_id', dtype='<u1', sent_on=<ItemSendingContext.SOS_HEAP: 2>)>
CBF_FIRMWARE_VERSION = <Items.CBF_FIRMWARE_VERSION: ItemDescription(id=<ItemID.CBF_FIRMWARE_VERSION: 24594>, name='cbf_firmware_version', dtype='<u4', sent_on=<ItemSendingContext.SOS_HEAP: 2>)>
CBF_SOURCE_ID = <Items.CBF_SOURCE_ID: ItemDescription(id=<ItemID.CBF_SOURCE_ID: 24597>, name='cbf_source_id', dtype='|S1', sent_on=<ItemSendingContext.SOS_HEAP: 2>)>
SPS_EPOCH = <Items.SPS_EPOCH: ItemDescription(id=<ItemID.SPS_EPOCH: 24595>, name='visibility_sps_epoch', dtype='<u4', sent_on=<ItemSendingContext.SOS_HEAP: 2>)>
EPOCH_OFFSET = <Items.EPOCH_OFFSET: ItemDescription(id=<ItemID.EPOCH_OFFSET: 24596>, name='visibility_epoch_offset', dtype='<u8', sent_on=<ItemSendingContext.DATA_HEAP: 1>)>
CORRELATOR_OUTPUT_DATA = <Items.CORRELATOR_OUTPUT_DATA: ItemDescription(id=<ItemID.CORRELATOR_OUTPUT_DATA: 24586>, name='correlator_output_data', dtype=(('TCI', 'i1'), ('FD', 'u1'), ('VIS', '<c8', 4)), sent_on=<ItemSendingContext.DATA_HEAP: 1>)>
SENT_ON_DATA_HEAP: frozenset[ItemDescription] = frozenset({ItemDescription(id=<ItemID.CORRELATOR_OUTPUT_DATA: 24586>, name='correlator_output_data', dtype=(('TCI', 'i1'), ('FD', 'u1'), ('VIS', '<c8', 4)), sent_on=<ItemSendingContext.DATA_HEAP: 1>), ItemDescription(id=<ItemID.EPOCH_OFFSET: 24596>, name='visibility_epoch_offset', dtype='<u8', sent_on=<ItemSendingContext.DATA_HEAP: 1>)})

The Items that are sent on all data heaps, made available for convenience.

SENT_ON_SOS_HEAP: frozenset[ItemDescription] = frozenset({ItemDescription(id=<ItemID.BASELINE_COUNT: 24581>, name='visibility_baseline_count', dtype='<u4', sent_on=<ItemSendingContext.SOS_HEAP: 2>), ItemDescription(id=<ItemID.CBF_FIRMWARE_VERSION: 24594>, name='cbf_firmware_version', dtype='<u4', sent_on=<ItemSendingContext.SOS_HEAP: 2>), ItemDescription(id=<ItemID.CBF_SOURCE_ID: 24597>, name='cbf_source_id', dtype='|S1', sent_on=<ItemSendingContext.SOS_HEAP: 2>), ItemDescription(id=<ItemID.CHANNEL_FREQUENCY: 24592>, name='visibility_channel_frequency', dtype='<u4', sent_on=<ItemSendingContext.SOS_HEAP: 2>), ItemDescription(id=<ItemID.CHANNEL_ID: 24578>, name='visibility_channel_id', dtype='<u4', sent_on=<ItemSendingContext.SOS_HEAP: 2>), ItemDescription(id=<ItemID.FREQUENCY_RESOLUTION: 24590>, name='visibility_frequency_resolution', dtype='<f4', sent_on=<ItemSendingContext.SOS_HEAP: 2>), ItemDescription(id=<ItemID.HARDWARE_ID: 24585>, name='visibility_hardware_id', dtype='<u4', sent_on=<ItemSendingContext.SOS_HEAP: 2>), ItemDescription(id=<ItemID.INTEGRATION_PERIOD: 24589>, name='visibility_integration_period', dtype='<f4', sent_on=<ItemSendingContext.SOS_HEAP: 2>), ItemDescription(id=<ItemID.OUTPUT_RESOLUTION: 24591>, name='visibility_output_resolution', dtype='<u1', sent_on=<ItemSendingContext.SOS_HEAP: 2>), ItemDescription(id=<ItemID.SCAN_ID: 24584>, name='scan_id', dtype='<u8', sent_on=<ItemSendingContext.SOS_HEAP: 2>), ItemDescription(id=<ItemID.SPS_EPOCH: 24595>, name='visibility_sps_epoch', dtype='<u4', sent_on=<ItemSendingContext.SOS_HEAP: 2>), ItemDescription(id=<ItemID.STATION_BEAM_ID: 24587>, name='station_beam_id', dtype='<u2', sent_on=<ItemSendingContext.SOS_HEAP: 2>), ItemDescription(id=<ItemID.SUBARRAY_ID: 24588>, name='subarray_id', dtype='<u2', sent_on=<ItemSendingContext.SOS_HEAP: 2>), ItemDescription(id=<ItemID.ZOOM_WINDOW_ID: 24593>, name='zoom_window_id', dtype='<u1', sent_on=<ItemSendingContext.SOS_HEAP: 2>)})

The Items that are sent on the start-of-stream heap, made available for convenience.

TELESCOPE: Telescope = 'low'

The telescope this ICD applies to.

static calc_icd_offset(sps_time: float, icd_epoch)[source]

Calculate the offset of a SPS timestamp from the given SPS_EPOCH (as generated by LowICD.sps_to_icd_epoch()) suitable for transmission as a LowICD.Items.SPS_OFFSET

static icd_to_unix(sps_epoch, epoch_offset)[source]

ICD Low timestamp -> fractional secs since UNIX epoch.

static sps_to_icd_epoch(sps_time)[source]

Take an SPS timestamp and generate an epoch suitable for transmission as a LowICD.Items.SPS_EPOCH

static unix_to_sps(unix_time: float)[source]

Convert a UNIX timestamp to a SPS timestamp (seconds since start of 2000)

class realtime.receive.core.icd.MidICD[source]

The Mid ICD.

ITEM_IDS: frozenset[int] = frozenset({ItemID.TIMESTAMP_COUNT, ItemID.TIMESTAMP_FRACTION, ItemID.CHANNEL_ID, ItemID.CHANNEL_COUNT, ItemID.POLARISATION_ID, ItemID.BASELINE_COUNT, ItemID.PHASE_BIN_ID, ItemID.PHASE_BIN_COUNT, ItemID.SCAN_ID, ItemID.HARDWARE_ID, ItemID.CORRELATOR_OUTPUT_DATA})

The IDs of all Items of this ICD, made available for convenience.

enum Items(value)

An enumeration.

Valid values are as follows:

CHANNEL_ID = <Items.CHANNEL_ID: ItemDescription(id=<ItemID.CHANNEL_ID: 24578>, name='visibility_channel_id', dtype='<u4', sent_on=<ItemSendingContext.DATA_HEAP: 1>)>
BASELINE_COUNT = <Items.BASELINE_COUNT: ItemDescription(id=<ItemID.BASELINE_COUNT: 24581>, name='visibility_baseline_count', dtype='<u4', sent_on=<ItemSendingContext.DATA_HEAP: 1>)>
SCAN_ID = <Items.SCAN_ID: ItemDescription(id=<ItemID.SCAN_ID: 24584>, name='scan_id', dtype='<u8', sent_on=<ItemSendingContext.DATA_HEAP: 1>)>
HARDWARE_ID = <Items.HARDWARE_ID: ItemDescription(id=<ItemID.HARDWARE_ID: 24585>, name='visibility_hardware_id', dtype='<u4', sent_on=<ItemSendingContext.DATA_HEAP: 1>)>
TIMESTAMP_COUNT = <Items.TIMESTAMP_COUNT: ItemDescription(id=<ItemID.TIMESTAMP_COUNT: 24576>, name='visibility_timestamp_count', dtype='<u4', sent_on=<ItemSendingContext.DATA_HEAP: 1>)>
TIMESTAMP_FRACTION = <Items.TIMESTAMP_FRACTION: ItemDescription(id=<ItemID.TIMESTAMP_FRACTION: 24577>, name='visibility_timestamp_fraction', dtype='<u4', sent_on=<ItemSendingContext.DATA_HEAP: 1>)>
CHANNEL_COUNT = <Items.CHANNEL_COUNT: ItemDescription(id=<ItemID.CHANNEL_COUNT: 24579>, name='visibility_channel_count', dtype='<u4', sent_on=<ItemSendingContext.DATA_HEAP: 1>)>
POLARISATION_ID = <Items.POLARISATION_ID: ItemDescription(id=<ItemID.POLARISATION_ID: 24580>, name='visibility_polarisation_id', dtype='<u4', sent_on=<ItemSendingContext.DATA_HEAP: 1>)>
PHASE_BIN_ID = <Items.PHASE_BIN_ID: ItemDescription(id=<ItemID.PHASE_BIN_ID: 24582>, name='visibility_phase_bin_id', dtype='<u2', sent_on=<ItemSendingContext.DATA_HEAP: 1>)>
PHASE_BIN_COUNT = <Items.PHASE_BIN_COUNT: ItemDescription(id=<ItemID.PHASE_BIN_COUNT: 24583>, name='visibility_phase_bin_count', dtype='<u2', sent_on=<ItemSendingContext.DATA_HEAP: 1>)>
CORRELATOR_OUTPUT_DATA = <Items.CORRELATOR_OUTPUT_DATA: ItemDescription(id=<ItemID.CORRELATOR_OUTPUT_DATA: 24586>, name='correlator_output_data', dtype=(('TCI', 'i1'), ('FD', 'u1'), ('CCI', 'i1'), ('VIS', '<c8', 4)), sent_on=<ItemSendingContext.DATA_HEAP: 1>)>
SENT_ON_DATA_HEAP: frozenset[ItemDescription] = frozenset({ItemDescription(id=<ItemID.BASELINE_COUNT: 24581>, name='visibility_baseline_count', dtype='<u4', sent_on=<ItemSendingContext.DATA_HEAP: 1>), ItemDescription(id=<ItemID.CHANNEL_COUNT: 24579>, name='visibility_channel_count', dtype='<u4', sent_on=<ItemSendingContext.DATA_HEAP: 1>), ItemDescription(id=<ItemID.CHANNEL_ID: 24578>, name='visibility_channel_id', dtype='<u4', sent_on=<ItemSendingContext.DATA_HEAP: 1>), ItemDescription(id=<ItemID.CORRELATOR_OUTPUT_DATA: 24586>, name='correlator_output_data', dtype=(('TCI', 'i1'), ('FD', 'u1'), ('CCI', 'i1'), ('VIS', '<c8', 4)), sent_on=<ItemSendingContext.DATA_HEAP: 1>), ItemDescription(id=<ItemID.HARDWARE_ID: 24585>, name='visibility_hardware_id', dtype='<u4', sent_on=<ItemSendingContext.DATA_HEAP: 1>), ItemDescription(id=<ItemID.PHASE_BIN_COUNT: 24583>, name='visibility_phase_bin_count', dtype='<u2', sent_on=<ItemSendingContext.DATA_HEAP: 1>), ItemDescription(id=<ItemID.PHASE_BIN_ID: 24582>, name='visibility_phase_bin_id', dtype='<u2', sent_on=<ItemSendingContext.DATA_HEAP: 1>), ItemDescription(id=<ItemID.POLARISATION_ID: 24580>, name='visibility_polarisation_id', dtype='<u4', sent_on=<ItemSendingContext.DATA_HEAP: 1>), ItemDescription(id=<ItemID.SCAN_ID: 24584>, name='scan_id', dtype='<u8', sent_on=<ItemSendingContext.DATA_HEAP: 1>), ItemDescription(id=<ItemID.TIMESTAMP_COUNT: 24576>, name='visibility_timestamp_count', dtype='<u4', sent_on=<ItemSendingContext.DATA_HEAP: 1>), ItemDescription(id=<ItemID.TIMESTAMP_FRACTION: 24577>, name='visibility_timestamp_fraction', dtype='<u4', sent_on=<ItemSendingContext.DATA_HEAP: 1>)})

The Items that are sent on all data heaps, made available for convenience.

SENT_ON_SOS_HEAP: frozenset[ItemDescription] = frozenset({})

The Items that are sent on the start-of-stream heap, made available for convenience.

TELESCOPE: Telescope = 'mid'

The telescope this ICD applies to.

static icd_to_unix(times, time_fractions)[source]

Mid ICD timestamp -> fractional secs since UNIX epoch.

static unix_to_icd(times)[source]

Fractional secs since UNIX epoch -> Mid ICD timestamp.

class realtime.receive.core.icd.Payload[source]

A payload as specified by the ICD.

property astropy_time: Time

The timestamp as an astropy Time object

property baseline_count

The number of baselines in this payload

property cci

The channel centroid index (Mid only)

property channel_count

The number of channels contained in this payload

property channel_id

The ID of the first channel of this payload

property correlated_data_fraction

The fraction of data on this payload that was correlated

property hardware_id

The ID of the hardware source of this payload

property mjd_time: float

The timestamp of the payload in MJD seconds

property phase_bin_count

The number of phase bins of this payload

property phase_bin_id

The ID of the first phase bin of this payload

property polarisation_id

The ID of the polarisation of this payload

property scan_id

The ID of the scan of this payload

property time_centroid_indices

The time centroids for each visibility of this payload

property timestamp: float

The timestamp as fractional seconds since UNIX epoch

property visibilities

The correlator visibilities of this payload

enum realtime.receive.core.icd.Telescope(value)[source]

An enumeration of SKA telescopes, there’s only two!

Valid values are as follows:

LOW = <Telescope.LOW: 'low'>
MID = <Telescope.MID: 'mid'>
realtime.receive.core.icd.icd_to_ms(vis)[source]

Move visiblity axes from ICD order to MS order. Both Low and Mid use the same order, hence there’s need for a single conversion routine.

realtime.receive.core.icd.ms_to_icd(vis)[source]

Move visiblity axes from MS order to ICD order. Both Low and Mid use the same order, hence there’s need for a single conversion routine.

Antennas

class realtime.receive.core.antenna.Antenna(interface: str, diameter: float, location: dict, fixed_delays: List[dict], niao: float, station_name: str | None = None, station_label: str | None = None, station_id: int | None = None)[source]

A fixed antenna.

as_KatPointAntenna(ref_location: EarthLocation | None = None)[source]

Return a katpoint.Antenna object for this antenna. There are two types of KatPoint antenna. One is initialised with a delay model and a reference location and this corresponds to an antenna in an array. A single antenna is not initialised with a delay model, and it is initialised at the location of the antenna.

Parameters:

ref_location: EarthLocation object for the reference location. This implicitly

creates an antenna in an array. The delay model will be calculated for this antenna and the reference postion will be used to initialise the antenna.

property dish_diameter

The diameter of this antenna

property fixed_delay_h

Fixed delay for H-POL

property fixed_delay_v

Fixed delay for V-POL

property h

Height of geocentric location

property id

The ID of this antenna

property label

The label of this antenna

property lat

Latitude of geocentric location

property lon

Longitude of geocentric location

property name

The name of this antenna

property pos

A tuple representing the geocentric coordinates of this antenna

property x

X coordinate of geocentric location

property y

Y coordinate of geocentric location

property z

Z coordinate of geocentric location

realtime.receive.core.antenna_utils.load_antennas(antennas: str | Sequence[Dict]) Tuple[Antenna][source]

Loads antennas from a sequence of dictionaries, or a file containing them.

Parameters:

antennas – A sequence of dictionaries, each capable of constructing an Antenna object. If a string is given instead, it is the route to a file with a JSON object whose top-level object has an “antennas” member with such a sequence of dictionaries.

Baselines

class realtime.receive.core.baselines.Baselines(antenna1: Sequence[int], antenna2: Sequence[int])[source]

A sequence of antenna index combinations making up a complete set of antenna baselines.

as_tuples()[source]

Returns the baselines as a list of tuples. The dataclasses.astuples does not seem to work as I need to interleave the antenna1 and antenna2 lists.

static generate(num_stations: int, autocorr: bool, lower_triangular: bool = False)[source]

Generate a Baselines object for the given number of stations and taking into account if autocorrelation is requested.

Parameters:
  • num_stations – The number of stations

  • autocorr – Whether autocorrelated baselines should be accounted for.

  • lower_triangular – Whether to generate baselines in lower-triangular order

See:

realtime.receive.core.baseline_utils.baseline_indices()

Baseline-related functions

realtime.receive.core.baseline_utils.are_autocorrelated_baselines(num_stations: int, num_baselines: int)[source]

Returns True if the number of baselines corresponds to an autocorrelated configuration for the given number of stations.

Parameters:
  • num_stations – The number of stations

  • num_baselines – The number of baselines

realtime.receive.core.baseline_utils.baseline_indices(num_stations: int, lower_triangular: bool, autocorr: bool = True) tuple[tuple[int, int]][source]

Generates baseline indices for the given number of stations.

Parameters:
  • num_stations – Number of stations to generate baselines for.

  • lower_triangular – Whether baseline indices should be generated as if they were laid in a lower or upper triangular matrix.

  • autocorr – Whether to generate indices for autocorrelation.

Returns:

A tuple of 2-tuples containing the station1/station2 indices making up each baseline.

realtime.receive.core.baseline_utils.baselines(stations: int | Sequence, autocorr: bool)[source]

Return the number of baselines for the given number of stations, depending on whether stations are autocorrelated or not.

Parameters:
  • stations – The number of stations, or a sequence of stations, in which case its length is used.

  • autocorr – Whether autocorrelated baselines should be accounted for.

realtime.receive.core.baseline_utils.generate_baselines(antennas: Sequence[AntennaT] | Reversible[AntennaT], lower_triangular: bool, autocorr: bool = True) tuple[tuple[AntennaT, AntennaT]][source]

Generates baselines for the given elements.

Baselines are generated differently depending on how antennas are traversed. The lower_triangular argument controls this, with a True value indicating a sequencing that goes in row-major order through the following matrix:

A1|
0:| 0
1:| 1 2
2:| 3 4 5
3:| 6 7 8 9
--+--------
A2| 0 1 2 3

while a False value indicates the following sequencing:

A1|
0:| 0 1 2 3
1:|   4 5 6
2:|     7 8
3:|       9
--+--------
A2| 0 1 2 3

autocorr indicates whether baseline indices for autocorrelation (i.e., the diagonal in the matrices above) should be generated or not.

Parameters:
  • antennas – Antennas to generate baselines for.

  • lower_triangular – Whether baseline indices should be generated as if they were laid in a lower or upper triangular matrix.

  • autocorr – Whether to generate indices for autocorrelation.

Returns:

A tuple of 2-tuples containing the antennas making up each baseline.

realtime.receive.core.baseline_utils.validate_antenna_and_baseline_counts(num_stations, num_baselines)[source]

Check if the given number of baselines corresponds to a valid number of baselines for the given number of stations, and raise an exception if it doesn’t.

Parameters:
  • num_stations – The number of stations

  • num_baselines – The number of baselines

Scans

These classes are used to describe a Scan and its components. Scans are the atomic pieces of an observation, and they have important metadata associated to them.

class realtime.receive.core.scan.SpectralWindow(spectral_window_id: str, count: int, start: int, freq_min: float, freq_max: float, stride: int = 1)[source]

A spectral window specifying channel numbers and a frequency range

as_channel_range()[source]

Construct a ChannelRange from this SpectralWindow

property channel_bandwidth

The amount of bandwidth (in Hz) that each channel will have

property channel_width

The gap (in Hz) between the center points of each channel

count: int

Number of channels

freq_max: float

The upper bound of the frequency of the last channel in Hz

freq_min: float

The lower bound of the frequency of the first channel in Hz

property frequencies

The center points (in Hz) of each channel

frequencies_for_range(channel_range: ChannelRange)[source]

Return the frequencies of the channels that the specified range refers to. channel_range must be a valid subrange of this spectral window.

spectral_window_id: str

ID of this spectral window

start: int

First channel

stride: int = 1

Stride in channel numbers

try_as_channel_range(start_id: int | None = None, count: int | None = None)[source]

Construct a ChannelRange from this SpectralWindow, adding the specified constraints. If this isn’t possible, return None.

Parameters:
  • start_id – The first channel of the created range. Defaults to the channel at the start of the SpectralWindow

  • count – The number of channels in the new range. Defaults to the number of channels in the spectral window after start_id

class realtime.receive.core.scan.Channels(channels_id: str, spectral_windows: Sequence[SpectralWindow])[source]

A named collection of channels, expressed as spectral windows

channels_id: str

ID of this collection of channels

property num_channels

Number of channels in all child spectral windows, given they are all the same. Raises an error if they are not.

spectral_windows: Sequence[SpectralWindow]

Spectral windows making up this collection of channels

class realtime.receive.core.scan.PhaseDirection(ra: Angle, dec: Angle, reference_time: str, reference_frame: str = 'icrs')[source]

A phase direction

as_SkyCoord()[source]

Convert to astropy SkyCoord

dec: Angle

Declination polinomial

ra: Angle

Right Ascension polinomial

reference_frame: str = 'icrs'

Reference frame in which coordinates are given

reference_time: str

Reference time for RA/Dec polinomials

enum realtime.receive.core.scan.StokesType(value)[source]

A type of stoke for correlations.

The values correspond to the casacore::Stokes::StokesTypes enumeration for consistency and ease of use when writing them into a Measurement Set.

Member Type:

int

Valid values are as follows:

RR = <StokesType.RR: 5>
RL = <StokesType.RL: 6>
LR = <StokesType.LR: 7>
LL = <StokesType.LL: 8>
XX = <StokesType.XX: 9>
XY = <StokesType.XY: 10>
YX = <StokesType.YX: 11>
YY = <StokesType.YY: 12>
class realtime.receive.core.scan.Polarisations(polarisation_id: str, correlation_type: Sequence[StokesType])[source]

A named collection of correlation types

correlation_type: Sequence[StokesType]

The correlation types

property num_pols

Number of polarisations

polarisation_id: str

ID of this collection of correlation types

class realtime.receive.core.scan.Field(field_id: str, phase_dir: PhaseDirection, pointing_fqdn: str | None = None)[source]

A named field with one or many coordinates

field_id: str

ID of this field

phase_dir: PhaseDirection

The phase direction where this field can be found

pointing_fqdn: str | None = None

The FQDN of the Tango attribute where live pointing information can be retrieved from

class realtime.receive.core.scan.Beam(beam_id: str, function: str, channels: Channels, polarisations: Polarisations, field: Field, search_beam_id: int | None = None, timing_beam_id: int | None = None, vlbi_beam_id: int | None = None)[source]

A beam configuration, containing channels, polarisations and a field

beam_id: str

ID of this beam

channels: Channels

The channels this beam is configured with

field: Field

The field this beam is pointing to

function: str

The functional purpose of this beam

polarisations: Polarisations

The polarisations this beam is configured with

class realtime.receive.core.scan.ScanType(scan_type_id: str, beams: Sequence[Beam])[source]

A scan type, consisting on one or more beam configurations

beams: Sequence[Beam]

The beams making up this scan type

property num_channels

N_f, the number of frequency channels across all beams. Valid only if all beams declare the same number of channels, raises an error otherwise.

property num_pols

N_p, The number of poloarizations across all beams. Valid only if all beams declare the same number of channels, raises an error otherwise.

scan_type_id: str

ID of this scan type

class realtime.receive.core.scan.Scan(scan_number: int, scan_type: ScanType)[source]

A scan, identified by a number and associated to a scan type

scan_number: int

The scan number

scan_type: ScanType

The scan type associated to this scan

Utility functions to create ScanType objects out of different JSON documents used throughout SDP.

realtime.receive.core.scan_utils.parse_scantypes_from_assignres(assign_resources_command: dict | str) List[ScanType][source]

Parses scan types from a subarray AssignRes Tango command argument

realtime.receive.core.scan_utils.parse_scantypes_from_execblock(execution_block: dict | str) List[ScanType][source]

Parse scan types from an execution block (as a dict or raw json)

UVW Calculation and Cache

These methods are related to the calculation and storage of UVWs.

class realtime.receive.core.uvw_engine.UVWEngine(antennas: Sequence[Antenna], baselines: Sequence[tuple] | None = None, swap_baselines=False, include_autos=True, lower_triangular=False, position_frame='ITRF', direction_frame='J2000')[source]

This is a container class that holds a cache of UVW objects. On instantiation, it will take the minimal amount of information required to form the UVWObject THis is the Antennas, the Phase direction and the Time. It will then create an empty UVWObject of the correct dimension.

After construction the interface is essentially just an update to the time - or the direction.

The other states can be changed. But there is not an external interface to that.

I reasoned that if you want to change a reference frame or the antennas then you should probably create a new Engine. steve-ord

baseline_indices()[source]

Return the baseline indices

static get_antenna_uvw(antenna_location: ndarray, epoch: Time, ra: Angle, dec: Angle, position_frame='itrf', epoch_frame='J2000', swap_baselines=False) ndarray[source]

Return the geocentric uvw vector for the the given position at the given epoch.

Note: This actually just calls get_uvw_J2000 with the geocentre as the other antenna and this antenna as the reference position.

Note: This is the per antenna calculation that you probably want

Parameters:
  • antenna_location (list) – position of the antenna (vector geocentric XYZ in m)

  • epoch (astropy.Time) – Time for which to calculate the UVW

  • ra (astropy.Angle) – Right Ascension (J2000)

  • dec (astropy.Angle) – Declination (J2000)

  • position_frame (str) – The frame of the input positions (generally WGS84 or ITRF) we are using casacore for this and these are the two frames supported

  • epoch_frame (str) – Whether you want a catalog (J2000) frome or epoch of date (default: ‘J2000’)

  • swap_baselines (Bool) – (xyposB - xyposA) is assumed - if the reverse is required set this to True

Returns:

The geocentric UVW baseline

Return type:

numpy.ndarray [u,v,w]

get_uvw(time: Time, phase_direction: PhaseDirection, direction_frame=None, position_frame=None, swap_baselines=None) ndarray | None[source]

This method given a time it updates the individual antenna UVW, THen updates the baseline UVW. It first checks the current state machine against the proposed state. It may not have to update the UVW at all. :param time: The time as an astropy.Time object :param phase_direction: The direction as a realtime.receiver.core.PhaseDirection object :param position_frame: the frame of the antenna positions :param swap_baselines: are the baselines defined as ant1-ant2 or ant2-ant1 :param direction_frame: What frame of reference is the look direction in. Note

We do not compare equivelance across frames. So if either the direction frame or the direction itself change it is considered a state change - even if the resulting UVW would be the same.

num_baselines()[source]

Return the number of baselines

num_entries()[source]

Number of UVW calculations that are stored

num_stations()[source]

Return the number of stations

class realtime.receive.core.katpoint_uvw_engine.KatpointUVWEngine(antennas: Sequence[Antenna], baselines: Sequence[tuple] | None = None, swap_baselines=False, include_autos=True, lower_triangular=False, position_frame='ITRF', direction_frame='J2000')[source]

A derived class that uses katpoint to determine the UVW of the antennas. One complication is that katpoint prefers topocentric antenna reference positions. You may not that the UVWEngine creates a geocentric antenna UVW so all baselines can be generated by subtractions. This will need another reference position

static get_antenna_uvw(antenna_location: array, epoch: Time, ra: Angle, dec: Angle, position_frame='itrf', epoch_frame='icrs', swap_baselines=False) ndarray[source]

Internal class that uses the Engine to determine the UVW This is specific to the Katpoint engine.

This must have a reference position. For the basic engine this is the geocentre - so we replicate it here

This overloads a more general method. There are some fixed assumptions here: The position is geocentric (in metres) The RA and DEC are ICRS

Parameters:
  • antenna_location (list) – position of the antenna (vector geocentric XYZ in m)

  • epoch (astropy.Time) – Time for which to calculate the UVW

  • ra (astropy.Angle) – Right Ascension (ICRS)

  • dec (astropy.Angle) – Declination (ICRS)

  • position_frame (str) – The frame of the input positions (currently assumes geocentric itrf)

  • epoch_frame (str) – NOT USED.

  • swap_baselines (Bool) – NOT USED.

Returns:

The geocentric UVW baseline

Return type:

numpy.ndarray [u,v,w]

Measurement Set access

class realtime.receive.core.msutils.MeasurementSet(name: str, mode: Mode, scan: Scan | None = None, antennas: Sequence[Antenna] | None = None, baselines: Baselines | None = None, plasma_socket: str | None = None)[source]

A simple wrapper around a Measurement Set. Writing is finalized during _t destructor.

class realtime.receive.core.msutils.MSWriter(output_filename: str, scan: Scan, antennas: Sequence[Antenna], baselines: Baselines | None = None, plasma_socket: str | None = None)[source]

A class that handles the writing of data into a new MeasurementSet.

Utilities

Common utility routines

realtime.receive.core.common.autocast_fields(cls)[source]

An annotation for dataclasses to automatically transform given field values into their declared types. In order to be successful the ctor for an annotated type T must accept values with the given type (e.g., int() accepts str objects, so fields of type int can be initialised with str values).

None values are left unset. "None" strings are casted to None. Boolean fields are casted using strtobool.

realtime.receive.core.common.from_dict(cls, data: dict)[source]

Constructs an object of the given type from a dictionary using the dictionary keys/value pairs that match parameter names of the type’s constructor.

Parameters:
  • cls – The type of the object to build

  • data – A dictionary where parameters for construction will be extracted from

realtime.receive.core.common.load_json_resource(json_resource_url: str) dict[source]

Load the given JSON resource.

Parameters:

json_resource_url (str) – The json resource location, which can either be a filename or a URL.

realtime.receive.core.common.strtobool(val)[source]

Convert a string representation of truth to true (1) or false (0).

True values are ‘y’, ‘yes’, ‘t’, ‘true’, ‘on’, and ‘1’; false values are ‘n’, ‘no’, ‘f’, ‘false’, ‘off’, and ‘0’. Raises ValueError if ‘val’ is anything else.

Adapted from distutils.util.strtobool, which will disappear in Python 3.12.

realtime.receive.core.common.untar(path, target_dir=None)[source]

Extracts the given file from a similarly-named tarfile into the given directory.

Parameters:
  • path – The file to extract. The corresponding tarfile should have the same name, suffixed with .tar.gz.

  • target_dir – The directory where the tarfile will be extracted. If not given the dirname of path is used.

realtime.receive.core.time_utils.mjd_to_unix(mjd)[source]

MJD secs -> fractional secs since UNIX epoch.

realtime.receive.core.time_utils.unix_as_astropy(time) Time[source]

UNIX timestamp -> astropy.time.Time

realtime.receive.core.time_utils.unix_to_mjd(times)[source]

Fractional secs since UNIX epoch -> MJD secs.