API documentation

This section describes requirements and guidelines.

The receiver is very configurable - with different pluggable receivers (SPEAD2 being the initial implementation) and multiple consumers (measurement set writers, plasma store writers etc). The payload is also selectable - but currently

High-level entrypoint

class realtime.receive.modules.receiver.SdpConfigDbConfig(host: str = '127.0.0.1', port: int = 2379, backend: str = 'etcd3')[source]

Set of options used to establish a connection to the SDP Configuration DB.

backend: str = 'etcd3'

The backend to use

host: str = '127.0.0.1'

The host to connect to.

port: int = 2379

The port to connect to

class realtime.receive.modules.receiver.ScanProviderConfig(execution_block_id: str | None = None, end_scan_max_delay: float | None = 5.0, measurement_set: str | None = None, assign_resources_command: str | None = None, hardcoded_spectral_window_channels: None | str | ChannelRange = None)[source]

Set of options used to build a Scan Provider.

class DataSource(value)[source]

An enumeration of types of data sources used to obtain data.

assign_resources_command: str | None = None

A JSON resource containing an instance of an SDP AssignResources command, or an SDP Execution Block. The JSON document should contain all the keys needed to fully-define a Scan Type.

property data_source: DataSource

The type of data source used by this configuration.

end_scan_max_delay: float | None = 5.0

The maximum time to wait, in seconds, before closing the CBF streams after receiving an EndScan SDP command.

execution_block_id: str | None = None

The ID of an Execution Block in the SDP Configuration Database for which Scan information should be monitored.

hardcoded_spectral_window_channels: None | str | ChannelRange = None

If set, can be used together with measurement_set to force the spectral windows of generated Scans to be a certain channel range.

measurement_set: str | None = None

A Measurement Set where antenna information will be extracted from.

property needs_sdp_config_db

Whether this configuration requires an SDP Config DB client.

class realtime.receive.modules.receiver.TelescopeManagerConfig(execution_block_id: str | None = None, measurement_set: str | None = None, schedblock: str | None = None, telmodel_key: str | None = None, telmodel_source_uris: str | None = None, antenna_layout: str | None = None, hardcoded_num_receptors: int | None = None, hardcoded_auto_corr: bool = True, hardcoded_lower_triangular: bool = True)[source]

Set of options used to build a Telescope Model.

Depending on which options are given, different sources of data are used to read Telescope Manager data. When multiple options are given, execution_block_id takes precedence over measurement_set, which in turn takes precedence over schedblock. On top of that the following combinations are also required/possible:

  • When using schedblock, antenna_layout is also required.

  • When using execution_block_id, either telmodel_key or antenna_layout are required.

  • When using execution_block_id and telmodel_key, telmodel_source_uris can also be given.

class DataSource(value)[source]

An enumeration of types of data sources used to obtain data.

antenna_layout: str | None = None

A JSON resource providing the layout of all antennas/stations in the array. The layouts are defined as part of the Telescope Model and it is recommended that the current layout schema be examined. Required when using schedblock, or when using execution_block_id and not specifying a telmodel_key.

property data_source: DataSource

The type of data source used by this configuration.

execution_block_id: str | None = None

The ID of an Execution Block in the SDP Configuration Database from where antenna information should be read.

hardcoded_auto_corr: bool = True

If hardcoded_num_receptors is given, control whether to create baselines with or without autocorrelation.

hardcoded_lower_triangular: bool = True

If hardcoded_num_receptors is given, control whether baselines are generated in lower or higher triangular order.

hardcoded_num_receptors: int | None = None

If given, create the given number of fake receptors with hardcoded, dummy details.

measurement_set: str | None = None

The measurement Set where antenna information can be extracted from.

property needs_sdp_config_db

Whether this configuration requires an SDP Config DB client.

schedblock: str | None = None

A JSON resource containing an instance of an SDP Scheduling Block, Execution Block or an AssignResources command. The JSON document should contain a resources.receptors key with a list of antenna names. This is useful only for testing.

telmodel_key: str | None = None

The key in the SKA Telmodel data where the antenna layout should be read from. If empty, then antenna_layout should be used instead.

telmodel_source_uris: str | None = None

A comma-separated list of URIs specifying the sources of truth that the SKA Telmodel package will use to retrieve its data.

class realtime.receive.modules.receiver.UVWEngineConfig(engine: str = 'katpoint', disable_astropy_iers_autodownload: bool = False)[source]

Set of options used to build a UVWEngine

disable_astropy_iers_autodownload: bool = False

Configures astropy to disable the automatically downloading of the latest IERS data catalogue. This automatic download is otherwise triggered by the UVW calculations performed by the katpoint engine, and blocks the main thread for the duration of the download.

engine: str = 'katpoint'

The package used to calculate the UVW coordinates from the antenna layout. Supported values are katpoint, measures and casa (alias for measures).

class realtime.receive.modules.receiver.ReceiverConfig(tm: ~realtime.receive.modules.receiver.TelescopeManagerConfig = <factory>, scan_provider: ~realtime.receive.modules.receiver.ScanProviderConfig = <factory>, uvw_engine: ~realtime.receive.modules.receiver.UVWEngineConfig = <factory>, sdp_config_db: ~realtime.receive.modules.receiver.SdpConfigDbConfig = <factory>, reception: ~realtime.receive.modules.receivers._config.Config = <factory>, consumer: ~realtime.receive.modules.consumers._config.Config = <factory>, aggregation: ~realtime.receive.modules.aggregation.AggregationConfig = <factory>)[source]

A full configuration specification to receive ICD data.

aggregation: AggregationConfig

The configuration used to control data aggregation features

consumer: Config

The configuration to create a consumer.

property needs_sdp_config_db

Whether this configuration requires an SDP Config DB client.

reception: Config

The configuration to create a receiver.

scan_provider: ScanProviderConfig

The configuration to create a ScanProvider.

sdp_config_db: SdpConfigDbConfig

The configuration to connect to the SDP Configuration Database.

tm: TelescopeManagerConfig

The configuration use to create a Telescope Manager.

uvw_engine: UVWEngineConfig

The configuration to create a UVWEngine.

async realtime.receive.modules.receiver.receive(config: ReceiverConfig, signal_handlers: Dict[int, Callable] | None = None, receiver_ready_evt: Event | None = None)[source]

Coroutine that creates a receiver and all its constituent parts based on configuration options, and runs the reception of data until it finishes. If this coroutine is cancelled while data reception is happening, it will shutdown cleanly and not raise a CancelledError, giving callers the option to still get a hold onto the underlying receiver object.

Parameters:
  • config – An object containg all the necessary configuration values to create a working receiver. It should be a ReceiveConfig object, but the old dictionary-based configuration objects still work.

  • signal_handlers – Signal handlers to install for the duration of the data reception. The callables, if given, should expect two parameters: the signal and a task representing the reception of data.

  • receiver_ready_evt – An event object that, if given, is set when the receiver is fully up and running, and ready to receive data.

Returns:

the underlying receiver object through which data was received. It can be queried for stats and other information.

Receivers

class realtime.receive.modules.receivers.Config(method: str = 'spead2_receivers')[source]

Base configuration for all receivers

method: str = 'spead2_receivers'

The method used to receive

UDP Protocol Multi-stream SPEAD2 receiver

class realtime.receive.modules.receivers.spead2_receivers.FixedLowSOSItemValues(scan_id: int, hardware_id: int, baseline_count: int, station_beam_id: int, subarray_id: int, integration_period: float, frequency_resolution: float, output_resolution: int, zoom_window_id: int, cbf_firmware_version: int, cbf_source_id: str)[source]

Values from items send on Low’s start-of-stream heaps that should be fixed across streams. This includes all items except channel ID and frequency.

Objects of this class are used to aggregate information about the streams of SKA Low data being received. In the future we want to expand this aggregation to include the channel ID and frequency range, which isn’t accounted for here, as well as including this information for Mid. For more details see https://jira.skatelescope.org/browse/YAN-1298.

class realtime.receive.modules.receivers.spead2_receivers.ItemDescStatus(value)[source]

An enumeration describing the different status in which a stream can be in with respect to having received ICD item descriptors for its data heaps.

class realtime.receive.modules.receivers.spead2_receivers.LostDataTracker(period)[source]

A class that keeps track of how many data heaps have been lost. When asked to inform about lost data heaps it prints a warning if any have been recorded, then it resets itself for a new count.

record_data_heap_lost()[source]

Increment the number of heaps that have been lost

reset()[source]

Resets the summary after a full period has passed.

summary()[source]

The summary to log periodically, if any. Logged only if it evalutes to True at the moment of reporting.

class realtime.receive.modules.receivers.spead2_receivers.ReceptionStats(total_bytes: int = 0, num_heaps: int = 0, num_incomplete: int = 0, duration: float = 0.0, per_stream_stats: ~typing.List[~typing.Dict[str, int]] = <factory>)[source]

Statistics about the data reception process

class realtime.receive.modules.receivers.spead2_receivers.Spead2ReceiverPayload(start_of_stream_logger)[source]

A Payload that updates itself from data coming from spead2 heaps

set_item_descriptors(heap) bool[source]

Updates the SPEAD ItemGroup of this Payload with the ItemDescriptors from heap.

Parameters:

heap – A SPEAD heap.

Returns:

Whether all required ItemDescriptors this Payload needs have been found.

update(heap)[source]

Updates this Payload with the data extracted from the Items in the given heap.

Parameters:

heap – A SPEAD heap.

class realtime.receive.modules.receivers.spead2_receivers.Spead2ReceptionConfig(method: str = 'spead2_receivers', num_streams: int | None = None, num_channels: int = 1, continuous_mode: bool = False, channels_per_stream: int = 0, stats_receiver_interval: float = 1.0, stats_receiver_kafka_config: str = '', data_loss_report_rate: float = 1.0, start_of_stream_report_rate: float = 1.0, port_start: int = 41000, bind_hostname: str = '', transport_protocol: TransportProtocols = TransportProtocols.UDP, pcap_file: str = '', readiness_filename: str = '', max_pending_data_heaps: int = 10, test_failure: bool = False, ring_heaps: int = 16, buffer_size: int | None = None, max_packet_size: int = spead2.recv.Stream.DEFAULT_UDP_MAX_SIZE, receiver_threads: int = 1, reset_time_indexing_after_each_scan: bool = False)[source]

Set of options used to build a spead2 network receiver

bind_hostname: str = ''

The IP address or hostname of the interface to which to bind for reception.

buffer_size: int | None = None

The socket buffer size to use. If not given, a default value is calculated based on the default value set by spead2, and the limits imposed by the OS.

channels_per_stream: int = 0

DEPRECATED, use num_streams instead.

The number of channels for which data will be sent in a single stream. This is used in the case where multiple ports are required with multiple channels per port. Together with num_channels they define the number of streams to listen for (and therefore the number of ports to open). 0 means that all channels should be received on a single stream.

continuous_mode: bool = False

Whether the receiver should re-create the streams and resume receiving data after all end of streams are reached.

data_loss_report_rate: float = 1.0

The period, in seconds, at which lost data heaps should be reported, if any.

max_packet_size: int

The maximum packet size to accept on the streams.

max_pending_data_heaps: int = 10

The number of data heaps on each stream to accumulate in memory before the start-of-stream heap arrives. Data heaps cannot be processed before the start-of-stream heap arrives, so a finite queue is implemented to keep some of them around. Further data heaps that don’t fit in the queue are dropped and permanently lost.

method: str = 'spead2_receivers'

The method used to receive

num_channels: int = 1

DEPRECATED, use num_streams instead.

The number of channels to receive.

num_streams: int | None = None

The number of streams this receiver should open. This option should be preferred over num_channels and channels_per_stream (which are now deprecated).

pcap_file: str = ''

Packet capture file to read SPEAD packets from. If set then data reception will be done by reading data from this file instead of from the network. The pcap file should have data for one or more valid SPEAD UDP streams. The num_streams and port_start configuration options are still used to determine how many of these streams to “receive” from the file, with each stream resulting from filtering the pcap file for a different, successfive destination UDP port starting at port_start.

port_start: int = 41000

The initial port number to which to bind the receiver streams. Successive streams are opened in successive ports after this.

readiness_filename: str = ''

If given, the name of the file to create (empty) on disk once the receiver has finished setting itself up and is ready to receive data.

receiver_threads: int = 1

The number threads allocated to the spead2 I/O thread pool.

reset_time_indexing_after_each_scan: bool = False

Whether to reset the aggregator’s time indexing when data reception for a scan finishes.

ring_heaps: int = 16

The number of ring heaps used in by each SPEAD stream.

start_of_stream_report_rate: float = 1.0

The period, in seconds, at which start-of-stream heaps should be reported as they arrive.

stats_receiver_interval: float = 1.0

Period of time, in seconds, between publishing of receiver stats to kafka.

stats_receiver_kafka_config: str = ''

Kafka endpoint (of the form <host>[:<port>]:<topic> where receiver statistics should be sent to. If empty no statistics are sent.

test_failure: bool = False

DEPRECATED, has no effect.

transport_protocol: TransportProtocols = 'udp'

The network transport protocol used by spead2.

class realtime.receive.modules.receivers.spead2_receivers.StartOfStreamLogger(period)[source]

A class that keeps track of how many start-of-stream (SOS) heaps have been successfully received within a period of time. When asked to inform about these heaps it returns either the count of SOS heaps for Mid, or a summary of the item values of the SOS heaps for Low.

record_low_start_of_stream(item_values: FixedLowSOSItemValues)[source]

Records that a Low SOS heap has been received, keeping track of the subset of values sent in that heap that should be fixed across streams.

record_mid_start_of_stream()[source]

Records that a Mid SOS heap has been received.

reset()[source]

Resets the summary after a full period has passed.

summary()[source]

The summary to log periodically, if any. Logged only if it evalutes to True at the moment of reporting.

class realtime.receive.modules.receivers.spead2_receivers.StatsTracker(streams)[source]

A class that keeps track of reception statistics

collect() ReceptionStats[source]

Collect the current receiver statistics

property duration

If in progress, how many seconds since data started being received If stopped, how many seconds data was received for.

inform_item_group_size(item_group_size)[source]

Specify the size of item groups

reception_started(start_time)[source]

Specify the time reception started

reception_stopped()[source]

Specify that reception has now stopped

class realtime.receive.modules.receivers.spead2_receivers.TransportProtocols(value)[source]

Supported transport protocols for SPEAD reception.

realtime.receive.modules.receivers.spead2_receivers.create_stream(io_thread_pool, ring_heaps, max_heaps)[source]

Create a spead2 stream for the given options

realtime.receive.modules.receivers.spead2_receivers.incomplete_heaps(stat)[source]

Calculate the number of incomplete heaps

realtime.receive.modules.receivers.spead2_receivers.log_stats(stats: ReceptionStats)[source]

Log the given reception statistics

class realtime.receive.modules.receivers.spead2_receivers.receiver(config: Spead2ReceptionConfig, aggregator: PayloadAggregator, data_reception_handler: DataReceptionHandler | None = None)[source]

SPEAD2 receiver

This class uses the spead2 library to receive a multiple number of streams, each using a single UDP reader. As heaps are received they are given to a single consumer.

This receiver supports UDP multicast addressing. This means that multiple receivers (and therefore consumers) can access the same transmitted stream if they bind to the same multicast IP address and port.

property aggregated_payloads: int

The number of data payloads that have been received and aggregated.

config_class

alias of Spead2ReceptionConfig

property consumer: Consumer

The consumer this receiver finally forwards data to

async end_scan(scan_id: int) None[source]

Called when an SDP scan has ended

property received_payloads: int

The number of data payloads that have been received.

async run(ready_event: Event | None = None)[source]

Receive all heaps, passing them to the consumer

async start_scan(scan_id: int) None[source]

Called when an SDP scan has started

property stats

Return the latest receiver statistics

property visibilities_consumed: int

The number of visibilities that have been successfully consumed.

property visibilities_generated: int

The number of visibilities that have been successfully generated.

Aggregation

class realtime.receive.modules.aggregation.AggregationConfig(time_period: float = 5, integration_interval_tolerance: int = 5, payloads_per_backoff: int = 20, backoff_time: float = 0.0)[source]

Options controlling how data aggregation happens in the receiver before it’s handed over to Consumers.

backoff_time: float = 0.0

This is a HIGHLY TECHNICAL option, so only change if you know what you’re doing.

Time to back off for when yielding back control to the IO loop during payload aggregation into a VisibilityBuilder. See payloads_per_backoff for more details. By default we simply yield back without requesting any sleeping time in between.

integration_interval_tolerance: int = 5

Number of integration intervals to avoid considering in an aggregation step to cater for slower streams that haven’t caught up with receiving some of their data.

payloads_per_backoff: int = 20

This is a HIGHLY TECHNICAL option, so only change if you know what you’re doing.

Number of payloads to put into the temporary VisibilityBuilder object during the periodic background aggregation before giving control back to the IO loop. Adding payloads into the VisibilityBuilder is a CPU intensive task, so if many payloads are being added this can stall the other coroutines that are executing in the system concurrently under the same IO loop. Yielding back control to the IO loop every now and then allows other coroutines to progress at the expense of this final aggregation step taking longer.

time_period: float = 5

Period, in seconds, after which payloads should be aggregated. If this is a non-positive number then aggregation doesn’t happen in the background, but can still be triggered manually. It is still triggered automatically at shutdown regardless.

property tolerance

Shorthand for self.integration_interval_tolerance

class realtime.receive.modules.aggregation.PayloadAggregator(config: AggregationConfig, consumer: Consumer, scan_provider: ScanProvider, tm: TelescopeManager, num_streams: int | None = None, first_channel_id: int | None = None)[source]

A class that takes individual payloads and aggregates them following the given settings. Aggregated data is put together into a Visiblity object, and given to a Consumer object to consume.

Users need to call add_payload every time a new ICD payload that needs to be aggregated is found.

Aggregation happens asynchronously and periodically. Users might also want to trigger the aggregation of any pending payloads and the consumption of the resulting Visibility objects, if any, by calling the flush method.

Finally, the aclose method should be invoked when users are done using this class, which ensures all background tasks are completed, and a final flush of pending payloads. This behavior is also available when using objects of this class as an asynchronous context manager.

add_payload(payload: Payload) None[source]

Add a single payload to be considered for aggregation in the next Visibility object.

Parameters:

payload – The payload to add

async add_payloads_and_flush(payloads: Sequence[Payload], **flush_kwargs) None[source]

Adds all payloads and tries to immediately create a Visibility.

property added_payloads: int

The number of data payloads that have been added to this object.

property aggregated_payloads: int

The number of data payloads that have been aggregated into Visibility objects.

async astart()[source]

Start the background aggregation task, if required.

async astop() None[source]

Finish all background tasks and flush all pending payloads.

property consumer: Consumer

The consumer this aggregator forwards visibilities to

async flush(ignore_tolerance=True) None[source]

Create and consume Visibility objects for any payloads pending aggregation. If a background aggregation is currently taking place, we wait for it to finish before running a new one.

Parameters:

ignore_tolerance – Whether to ignore the tolerance settings, in which case all data recieved until now will be included in the resulting Visibility.

inform_num_streams(num_streams: int) None[source]

Inform this object what the total number of streams that are being received are.

reset_time_indexing() None[source]

Reset the internal time indexing used to calculate payload sequence numbers. This shouldn’t be normally called, but can be useful in some situations; e.g., when receiving data for different scans that is tagged with the same timestamps.

This method can only be called if no payloads have been called since the last flush; otherwise a RuntimeError is raised.

property visibilities_consumed: int

The number of visibilities that have been successfully consumed.

property visibilities_flagged_fraction: float

The faction of data flagged as missing in the last generated Visibility.

property visibilities_generated: int

The number of visibilities that have been generated.

Consumers

class realtime.receive.modules.consumers.Config(name: str = 'mswriter')[source]

Common configuration for all consumers

name: str = 'mswriter'

The fully-qualified name of the consumer class to use. Built-in consumers can be specified by simple module name (e.g., mswriter).

class realtime.receive.modules.consumers.accumulating_consumer.AccumulatingConsumer(*args, **kwargs)[source]

A consumer that keeps all visibilities around for later inspection.

async consume(visibility: Visibility)[source]

Asyncronously consumes/processes an incoming Visibility.

Parameters:

visibility – visibilities dataset

async end_scan(scan_id: int) None[source]

Called when an SDP scan has ended

async start_scan(scan_id: int) None[source]

Called when an SDP scan has started

Takes a SPEAD2 HEAP and writes it to a MEASUREMENT SET. This is pretty much the same functionality as presented in the OSKAR python binding example available at: https://github.com/OxfordSKA/OSKAR/blob/master/python/examples/spead/receiver/spead_recv.py

class realtime.receive.modules.consumers.mswriter.MSWriterConsumer(config: Config, tm: TelescopeManager, uvw_engine: UVWEngine)[source]

A heap consumer that writes incoming data into an MS.

Because data consumption happens inside the event loop we need to defer the data writing to a different thread. We do this by creating a single-threaded executor that we then use to schedule the I/O-heavy MS writing tasks onto.

async astop()[source]

Called when receiver stream ends.

Users should implement their own astop method. To maintain backwards compatibility this default implementation invokes the old, blocking stop() method.

config_class

alias of MSWriterConsumerConfig

async consume(visibility: Visibility)[source]

Asyncronously consumes/processes an incoming Visibility.

Parameters:

visibility – visibilities dataset

class realtime.receive.modules.consumers.mswriter.MSWriterConsumerConfig(name: str = 'mswriter', output_filename: str = 'recv-vis.ms', max_payloads_per_ms: int | None = None, command_template: List[str] | None = None, timestamp_output: bool = False)[source]

Configuration for the mswriter consumer.

command_template: List[str] | None = None

Command to execute after a Measurement Set has been written.

max_payloads_per_ms: int | None = None

Maximum number of payloads to write onto a single Measurement Set. If more payloads are received a new Measurement Set is opened for writing.

name: str = 'mswriter'

The fully-qualified name of the consumer class to use. Built-in consumers can be specified by simple module name (e.g., mswriter).

output_filename: str = 'recv-vis.ms'

The filename of the output Measurement Set.

timestamp_output: bool = False

Whether to add timestamp information to the output filename.

Takes a SPEAD2 HEAP and writes it to an apache plasma store. This uses the sdp-dal-prototype API and will fail if that cannot be loaded

class realtime.receive.modules.consumers.plasma_writer.InvocationResults(success: int = 0, fail: int = 0, unknown: int = 0)[source]

Summary of RPC call results

add(result: int)[source]

Add a single result, which signals either success or failure

class realtime.receive.modules.consumers.plasma_writer.PlasmaWriterConfig(name: str = 'plasma_writer', plasma_path: str = '/tmp/plasma', payloads_in_flight: int = 10, remove_old_references_timeout: float = 0.0, wait_for_all_responses_timeout: float = 5.0)[source]

Configuration for the plasma_writer consumer.

name: str = 'plasma_writer'

The fully-qualified name of the consumer class to use. Built-in consumers can be specified by simple module name (e.g., mswriter).

payloads_in_flight: int = 10

The maximum number of payloads to keep in flight in the Plasma Store before requesting their results and releasing our local references to them. If more payloads are written to Plasma the oldest references are relesed.

plasma_path: str = '/tmp/plasma'

The UNIX socket to connect to.

remove_old_references_timeout: float = 0.0

The maximum amount of time to wait, in seconds, for a pending response when removing references to old input objects in the Plasma store.

wait_for_all_responses_timeout: float = 5.0

The maximum amount of time to wait, in seconds, for all pending responses to be read from Plasma when the consumer is shut down.

class realtime.receive.modules.consumers.plasma_writer.PlasmaWriterConsumer(config: PlasmaWriterConfig, tm: TelescopeManager, uvw_engine: UVWEngine)[source]

A heap consumer that writes incoming data into a Plasma Store.

Because data consumption happens inside the event loop we need to defer the data writing to a different thread. We do this by creating a single-threaded executor that we then use to schedule the access to the Plasma Store.

async astop()[source]

Called when receiver stream ends.

Users should implement their own astop method. To maintain backwards compatibility this default implementation invokes the old, blocking stop() method.

config_class

alias of PlasmaWriterConfig

async consume(visibility: Visibility)[source]

Asyncronously consumes/processes an incoming Visibility.

Parameters:

visibility – visibilities dataset

async end_scan(scan_id: int) None[source]

Invoke the end_scan RPC procedure on remote processors.

find_processors()[source]

Search the caller for processors

property invocation_results

A summary of the results from RPC invocations

property invocations_in_flight

The number of RPC invocations that are still in flight, and for which we haven’t collected their output.

property num_processors

The number of processors known to this caller

async start_scan(scan_id: int) None[source]

Invoke the start_scan RPC procedure on remote processors.

async wait_for_all_responses() bool[source]

Wait for the responses for all in-flight invocations.

Returns:

whether there was a timeout while waiting for all responses.

async wait_for_oldest_reponse(timeout)[source]

Waits for the response of the oldest RPC call issued by this consumer, and removes references to its inputs and output objects so they can be freed by plasma.

Telescope Managers

some utils to get information

class realtime.receive.modules.tm.base_tm.TelescopeManager(antennas: Sequence[Antenna] | str, baselines: Baselines)[source]

Telescope Manager containing an immutable telescope model consisting on the antennas/stations composing a subarray.

property as_configuration: Configuration

self -> SDP Configuration

property baselines_as_visibility_indices: MultiIndex

Baselines as a MultiIndex suitable for the Visibility class

get_antennas() Sequence[Antenna][source]

Returns all antennas

get_baselines() Baselines[source]

Returns all baselines

property num_baselines: int

The number of baselines used by the current observation

property num_stations: int

The number of stations used by the current observation

some utils to get information

class realtime.receive.modules.tm.ms_tm.MeasurementSetTM(ms: MeasurementSet | str)[source]

Telescope Manager that reads its model information from a Measurement Set.

class realtime.receive.modules.tm.sched_tm.SKATelescopeManager(antenna_layout: str | Sequence[Dict], assign_resources_command: str | dict | None = None)[source]

TelescopeManager class that combines information from an AssignResources command and an Antenna Layout to assemble the list of antennas making up a subarray.

static from_sdp_config(execution_block_id: str, sdp_config: Config, telmodel_key: str | None = None, telmodel_source_uris: list[str] | None = None, antenna_layout: str | Sequence[dict] | None = None)[source]

Construct a telescope manager for the given options, contacting the SDP Configuration Database and using the SKA Telmodel package to obtain all necessary information.

Parameters:
  • execution_block_id – The Execution Block ID for which a TM needs to be created.

  • sdp_config_db – A client to the SDP Configuration DB.

  • telmodel_key – The key in the SKA Telmodel data where the antenna layout can be read from. If not given, then antenna_layout must be given.

  • telmodel_source_uris – An optional list of URIs used by the SKA Telmodel package to read its data from.

  • antenna_layout – An antenna layout, either a a list of dictionaries or as a URL/filename with a JSON representation of such list. Used only if telmodel_key is not given.

Scan Providers