API Reference
This section describes requirements and guidelines.
The receiver is very configurable - with different pluggable receivers (SPEAD2 being the initial implementation) and multiple consumers (measurement set writers, plasma store writers etc). The payload is also selectable - but currently
High-level entrypoint
- class realtime.receive.modules.receiver.SdpConfigDbConfig(host: str = '127.0.0.1', port: int = 2379, backend: str = 'etcd3')[source]
Set of options used to establish a connection to the SDP Configuration DB.
- backend: str = 'etcd3'
The backend to use
- host: str = '127.0.0.1'
The host to connect to.
- port: int = 2379
The port to connect to
- class realtime.receive.modules.receiver.ScanProviderConfig(execution_block_id: str | None = None, end_scan_max_delay: float | None = 5.0, measurement_set: str | None = None, assign_resources_command: str | None = None, hardcoded_spectral_window_channels: None | str | ChannelRange = None)[source]
Set of options used to build a Scan Provider.
- assign_resources_command: str | None = None
A JSON resource containing an instance of an SDP AssignResources command, or an SDP Execution Block. The JSON document should contain all the keys needed to fully-define a Scan Type.
- property data_source: DataSource
The type of data source used by this configuration.
- end_scan_max_delay: float | None = 5.0
The maximum time to wait, in seconds, before closing the CBF streams after receiving an EndScan SDP command.
- execution_block_id: str | None = None
The ID of an Execution Block in the SDP Configuration Database for which Scan information should be monitored.
- hardcoded_spectral_window_channels: None | str | ChannelRange = None
If set, can be used together with measurement_set to force the spectral windows of generated Scans to be a certain channel range.
- measurement_set: str | None = None
A Measurement Set from which antenna information will be extracted.
- property needs_sdp_config_db
Whether this configuration requires an SDP Config DB client.
- class realtime.receive.modules.receiver.TelescopeManagerConfig(execution_block_id: str | None = None, measurement_set: str | None = None, schedblock: str | None = None, telmodel_key: str | None = None, telmodel_source_uris: str | None = None, antenna_layout: str | None = None, hardcoded_num_receptors: int | None = None, hardcoded_auto_corr: bool = True, hardcoded_lower_triangular: bool = True)[source]
Set of options used to build a Telescope Model.
Depending on which options are given, different sources of data are used to read Telescope Manager data. When multiple options are given, execution_block_id takes precedence over measurement_set, which in turn takes precedence over schedblock. On top of that the following combinations are also required/possible:
- When using schedblock, antenna_layout is also required.
- When using execution_block_id, either telmodel_key or antenna_layout are required.
- When using execution_block_id and telmodel_key, telmodel_source_uris can also be given.
- antenna_layout: str | None = None
A JSON resource providing the layout of all antennas/stations in the array. The layouts are defined as part of the Telescope Model and it is recommended that the current layout schema be examined. Required when using schedblock, or when using execution_block_id and not specifying a telmodel_key.
- property data_source: DataSource
The type of data source used by this configuration.
- execution_block_id: str | None = None
The ID of an Execution Block in the SDP Configuration Database from where antenna information should be read.
- hardcoded_auto_corr: bool = True
If hardcoded_num_receptors is given, control whether to create baselines with or without autocorrelation.
- hardcoded_lower_triangular: bool = True
If hardcoded_num_receptors is given, control whether baselines are generated in lower or upper triangular order.
- hardcoded_num_receptors: int | None = None
If given, create the given number of fake receptors with hardcoded, dummy details.
- measurement_set: str | None = None
The Measurement Set from which antenna information can be extracted.
- property needs_sdp_config_db
Whether this configuration requires an SDP Config DB client.
- schedblock: str | None = None
A JSON resource containing an instance of an SDP Scheduling Block, Execution Block or an AssignResources command. The JSON document should contain a resources.receptors key with a list of antenna names. This is useful only for testing.
- telmodel_key: str | None = None
The key in the SKA Telmodel data where the antenna layout should be read from. If empty, then antenna_layout should be used instead.
- telmodel_source_uris: str | None = None
A comma-separated list of URIs specifying the sources of truth that the SKA Telmodel package will use to retrieve its data.
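The precedence and combination rules described for TelescopeManagerConfig can be sketched as follows. This is a minimal illustration only: TMOptions and pick_data_source are hypothetical names that are not part of the package, only a subset of the fields is modelled, and the hardcoded_* options are ignored.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class TMOptions:
    """Hypothetical stand-in holding a subset of TelescopeManagerConfig fields."""

    execution_block_id: Optional[str] = None
    measurement_set: Optional[str] = None
    schedblock: Optional[str] = None
    telmodel_key: Optional[str] = None
    antenna_layout: Optional[str] = None


def pick_data_source(opts: TMOptions) -> str:
    """Return the name of the option that wins under the documented precedence."""
    if opts.execution_block_id:
        # An Execution Block needs either a telmodel key or an antenna layout.
        if not (opts.telmodel_key or opts.antenna_layout):
            raise ValueError("execution_block_id needs telmodel_key or antenna_layout")
        return "execution_block_id"
    if opts.measurement_set:
        return "measurement_set"
    if opts.schedblock:
        # A scheduling block only lists receptor names, so a layout is required.
        if not opts.antenna_layout:
            raise ValueError("schedblock needs antenna_layout")
        return "schedblock"
    return "none"


# Both options are given: execution_block_id takes precedence.
chosen = pick_data_source(
    TMOptions(execution_block_id="eb-1", telmodel_key="some/key", measurement_set="a.ms")
)
```

The real class exposes this decision through its data_source property; the string names returned here are placeholders for whatever DataSource values the package defines.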
- class realtime.receive.modules.receiver.UVWEngineConfig(engine: str = 'katpoint', disable_astropy_iers_autodownload: bool = False)[source]
Set of options used to build a UVWEngine
- disable_astropy_iers_autodownload: bool = False
Configures astropy to disable the automatic download of the latest IERS data catalogue. This automatic download is otherwise triggered by the UVW calculations performed by the katpoint engine, and blocks the main thread for the duration of the download.
- engine: str = 'katpoint'
The package used to calculate the UVW coordinates from the antenna layout. Supported values are katpoint, measures and casa (alias for measures).
- class realtime.receive.modules.receiver.ReceiverConfig(tm: ~realtime.receive.modules.receiver.TelescopeManagerConfig = <factory>, scan_provider: ~realtime.receive.modules.receiver.ScanProviderConfig = <factory>, uvw_engine: ~realtime.receive.modules.receiver.UVWEngineConfig = <factory>, sdp_config_db: ~realtime.receive.modules.receiver.SdpConfigDbConfig = <factory>, reception: ~realtime.receive.modules.receivers._config.Config = <factory>, consumer: ~realtime.receive.modules.consumers._config.Config = <factory>, aggregation: ~realtime.receive.modules.aggregators.config.AggregationConfig = <factory>)[source]
A full configuration specification to receive ICD data.
- aggregation: AggregationConfig = FieldInfo(annotation=NoneType, required=False, default_factory=AggregationConfig)
The configuration used to control data aggregation features
- consumer: Config = FieldInfo(annotation=NoneType, required=False, default_factory=Config)
The configuration to create a consumer.
- property needs_sdp_config_db
Whether this configuration requires an SDP Config DB client.
- reception: Config = FieldInfo(annotation=NoneType, required=False, default_factory=Config)
The configuration to create a receiver.
- scan_provider: ScanProviderConfig = FieldInfo(annotation=NoneType, required=False, default_factory=ScanProviderConfig)
The configuration to create a ScanProvider.
- sdp_config_db: SdpConfigDbConfig = FieldInfo(annotation=NoneType, required=False, default_factory=SdpConfigDbConfig)
The configuration to connect to the SDP Configuration Database.
- tm: TelescopeManagerConfig = FieldInfo(annotation=NoneType, required=False, default_factory=TelescopeManagerConfig)
The configuration used to create a Telescope Manager.
- uvw_engine: UVWEngineConfig = FieldInfo(annotation=NoneType, required=False, default_factory=UVWEngineConfig)
The configuration to create a UVWEngine.
- async realtime.receive.modules.receiver.receive(config: ReceiverConfig, signal_handlers: dict[int, Callable] | None = None, receiver_ready_evt: Event | None = None)[source]
Coroutine that creates a receiver and all its constituent parts based on configuration options, and runs the reception of data until it finishes. If this coroutine is cancelled while data reception is happening, it will shut down cleanly and not raise a CancelledError, giving callers the option to still get hold of the underlying receiver object.
- Parameters:
config – An object containing all the necessary configuration values to create a working receiver. It should be a ReceiverConfig object, but the old dictionary-based configuration objects still work.
signal_handlers – Signal handlers to install for the duration of the data reception. The callables, if given, should expect two parameters: the signal and a task representing the reception of data.
receiver_ready_evt – An event object that, if given, is set when the receiver is fully up and running, and ready to receive data.
- Returns:
the underlying receiver object through which data was received. It can be queried for stats and other information.
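The cancellation behaviour described for receive() can be illustrated with a self-contained sketch. FakeReceiver and fake_receive are hypothetical stand-ins, not the package's implementation; they only mimic the documented contract that a cancelled reception returns the receiver object instead of raising CancelledError, and that the ready event is set once reception is up.

```python
import asyncio
from typing import Optional


class FakeReceiver:
    """Hypothetical stand-in for the receiver object returned by receive()."""

    def __init__(self) -> None:
        self.received_payloads = 0


async def fake_receive(ready_evt: Optional[asyncio.Event] = None) -> FakeReceiver:
    """Mimics the documented contract: swallow cancellation, return the receiver."""
    receiver = FakeReceiver()
    if ready_evt is not None:
        ready_evt.set()  # signal that reception is up and running
    try:
        while True:
            await asyncio.sleep(0.01)  # pretend a payload arrived
            receiver.received_payloads += 1
    except asyncio.CancelledError:
        pass  # shut down cleanly instead of propagating the cancellation
    return receiver


async def main() -> FakeReceiver:
    ready = asyncio.Event()
    task = asyncio.create_task(fake_receive(ready))
    await ready.wait()          # only proceed once the receiver is ready
    await asyncio.sleep(0.05)   # let some data "arrive"
    task.cancel()
    return await task           # no CancelledError: the receiver is returned


receiver = asyncio.run(main())
```

With the real coroutine the same pattern would apply: create a task, wait on receiver_ready_evt, cancel when done, and await the task to obtain the receiver for its stats.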
Receivers
- class realtime.receive.modules.receivers.Config(method: str = 'spead2_receivers')[source]
Base configuration for all receivers
- method: str = 'spead2_receivers'
The method used to receive
UDP Protocol Multi-stream SPEAD2 receiver.
- class realtime.receive.modules.receivers.spead2_receivers.ItemDescStatus(value)[source]
An enumeration describing the different statuses a stream can be in with respect to having received ICD item descriptors for its data heaps.
- class realtime.receive.modules.receivers.spead2_receivers.Spead2ReceiverPayload[source]
A Payload that updates itself from data coming from spead2 heaps
- property has_all_required_item_descriptors
Whether the payload has all the required SOS item descriptor values.
- class realtime.receive.modules.receivers.spead2_receivers.Spead2ReceptionConfig(method: str = 'spead2_receivers', num_streams: int | None = None, num_channels: int | None = None, continuous_mode: bool | None = None, scans_to_receive: int | None = None, channels_per_stream: int | None = None, stats_receiver_interval: float = 1.0, stats_config: StatsConfig | None = None, data_loss_report_rate: float = 1.0, start_of_stream_report_rate: float | None = None, stream_extremes_report_rate: float | None = None, port_start: int = 41000, bind_hostname: str = '', transport_protocol: TransportProtocols = TransportProtocols.UDP, pcap_file: str = '', readiness_filename: str = '', max_pending_data_heaps: int = 10, test_failure: bool = False, ring_heaps: int = 16, buffer_size: int | None = None, max_packet_size: int = 9200, receiver_threads: int = 1, reset_time_indexing_after_each_scan: bool = False, exit_timeout: float = 10)[source]
Set of options used to build a spead2 network receiver.
- bind_hostname: str = ''
The IP address or hostname of the interface to which to bind for reception.
- buffer_size: int | None = None
The socket buffer size to use. If not given, a default value is calculated based on the default value set by spead2, and the limits imposed by the OS.
- channels_per_stream: int | None = None
DEPRECATED, use num_streams instead.
The number of channels for which data will be sent in a single stream. This is used in the case where multiple ports are required with multiple channels per port. Together with num_channels they define the number of streams to listen for (and therefore the number of ports to open).
0 means that all channels should be received on a single stream.
- continuous_mode: bool | None = None
DEPRECATED, use scans_to_receive instead. Streams are also not normally recreated.
Whether the receiver should re-create the streams and resume receiving data after all end of streams are reached.
- data_loss_report_rate: float = 1.0
The period, in seconds, at which lost data heaps should be reported, if any.
- exit_timeout: float = 10
If non-negative, the maximum amount of time in seconds to wait for the final aggregation to occur before exiting. If it takes longer, the final aggregation is cancelled. If negative, wait indefinitely.
- max_packet_size: int = 9200
The maximum packet size to accept on the streams.
- max_pending_data_heaps: int = 10
The number of data heaps on each stream to accumulate in memory before the start-of-stream heap arrives. Data heaps cannot be processed before the start-of-stream heap arrives, so a finite queue is implemented to keep some of them around. Further data heaps that don’t fit in the queue are dropped and permanently lost.
- method: str = 'spead2_receivers'
The method used to receive
- num_channels: int | None = None
DEPRECATED, use num_streams instead.
The number of channels to receive.
- num_streams: int | None = None
The number of streams this receiver should open.
(Preferred over num_channels and channels_per_stream, which are now deprecated).
- pcap_file: str = ''
Packet capture file to read SPEAD packets from. If set then data reception will be done by reading data from this file instead of from the network. The pcap file should have data for one or more valid SPEAD UDP streams. The num_streams and port_start configuration options are still used to determine how many of these streams to “receive” from the file, with each stream resulting from filtering the pcap file for a different, successive destination UDP port starting at port_start.
- property port_range: range
The range of ports the receiver should bind to
- port_start: int = 41000
The initial port number to which to bind the receiver streams. Successive streams are opened in successive ports after this.
- readiness_filename: str = ''
If given, the name of the file to create (empty) on disk once the receiver has finished setting itself up and is ready to receive data.
- receiver_threads: int = 1
The number of threads allocated to the spead2 I/O thread pool.
- reset_time_indexing_after_each_scan: bool = False
Whether to reset the aggregator’s time indexing when data reception for a scan finishes.
- ring_heaps: int = 16
The number of ring heaps used by each SPEAD stream.
- scans_to_receive: int | None = None
The number of scans to receive data for, after which the reception finishes automatically. If non-positive, reception never finishes automatically. Default is to first inspect the deprecated continuous_mode option if set and obey it, otherwise receive data for a single scan. In the future it will default to receive data indefinitely.
- start_of_stream_report_rate: float | None = None
DEPRECATED, use stream_extremes_report_rate instead.
The period, in seconds, at which start-of-stream heaps should be reported as they arrive.
- stats_config: StatsConfig | None = None
Stats capture configuration. If provided, send stats to the specified dataqueue.
- stats_receiver_interval: float = 1.0
Period of time, in seconds, between publishing of receiver stats to kafka.
- stream_extremes_report_rate: float | None = None
The period, in seconds, at which start-of-stream and end-of-stream heaps should be reported as they arrive. Defaults to 1, unless start_of_stream_report_rate has been given, in which case its value is used. Regardless of where the value of stream_extremes_report_rate is initialised from, it is set to 1 if non-positive.
- test_failure: bool = False
DEPRECATED, has no effect.
- transport_protocol: TransportProtocols = 'udp'
The network transport protocol used by spead2.
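The fallback rules described for stream_extremes_report_rate can be written down as a small function. This is a sketch of the documented behaviour only; resolve_stream_extremes_report_rate is a hypothetical helper name, and the actual resolution inside Spead2ReceptionConfig may be structured differently.

```python
from typing import Optional


def resolve_stream_extremes_report_rate(
    stream_extremes_report_rate: Optional[float] = None,
    start_of_stream_report_rate: Optional[float] = None,
) -> float:
    """Apply the documented defaulting rules for the report period (seconds)."""
    rate = stream_extremes_report_rate
    if rate is None:
        # Fall back to the deprecated option if it was given.
        rate = start_of_stream_report_rate
    if rate is None:
        rate = 1.0
    if rate <= 0:
        # Non-positive values are replaced by 1, whatever their origin.
        rate = 1.0
    return rate


default_rate = resolve_stream_extremes_report_rate()
legacy_rate = resolve_stream_extremes_report_rate(start_of_stream_report_rate=2.5)
```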
- class realtime.receive.modules.receivers.spead2_receivers.StatsConfig(dataqueue_bootstrap_address: str, dataqueue_topic: str)[source]
Configuration for realtime receiver stats.
- class realtime.receive.modules.receivers.spead2_receivers.TransportProtocols(value)[source]
Supported transport protocols for SPEAD reception.
- realtime.receive.modules.receivers.spead2_receivers.create_stream(io_thread_pool, ring_heaps, max_heaps)[source]
Create a spead2 stream for the given options
- class realtime.receive.modules.receivers.spead2_receivers.receiver(config: Spead2ReceptionConfig, multi_beam_pipeline: MultiBeamPipeline)[source]
SPEAD2 receiver.
This class uses the spead2 library to receive multiple streams, each using a single UDP reader. As heaps are received they are given to a single consumer.
This receiver supports UDP multicast addressing. This means that multiple receivers (and therefore consumers) can access the same transmitted stream if they bind to the same multicast IP address and port.
- property aggregated_payloads: int
The number of data payloads that have been received and aggregated.
- config_class
alias of Spead2ReceptionConfig
- property consumer: Consumer
The consumer this receiver finally forwards data to.
- property received_payloads: int
The number of data payloads that have been received.
- async run(ready_event: Event | None = None)[source]
Opens all configured streams for data reception, passing any data payload down for aggregation. This coroutine will run until it gets cancelled, or until the number of scans indicated in the scans_to_receive setting has been received.
- property stats: ReceptionStats
Return the latest receiver statistics.
- property visibilities_consumed: int
The number of visibilities that have been successfully consumed.
- property visibilities_generated: int
The number of visibilities that have been successfully generated.
Aggregation
- class realtime.receive.modules.aggregators.AggregationConfig(time_period: float = 5, num_timestamps_per_aggregation: int = -1, timestamp_tolerance: int = 5, payloads_per_backoff: int = 0, backoff_time: float = 0.0)[source]
Options controlling how data aggregation happens in the receiver before it’s handed over to Consumers.
- backoff_time: float = 0.0
This is a HIGHLY TECHNICAL option, so only change if you know what you’re doing.
Time to back off for when yielding back control to the IO loop during payload aggregation into a VisibilityBuilder. See payloads_per_backoff for more details. By default we simply yield back without requesting any sleeping time in between.
- num_timestamps_per_aggregation: int = -1
Number of payloads to aggregate into a single Visibility object. This is used to ensure that the Visibility object covers a predictable time span. The time_period is approximate, so in the case where multiple receivers need to be synchronised the number of timestamps per aggregation is more reliable. Behaviour: if this is a positive number, then the aggregation returns a visibility from a flush when the number of payloads is equal to this number. This is managed via the periodic aggregation task.
- payloads_per_backoff: int = 0
This is a HIGHLY TECHNICAL option, so only change if you know what you’re doing.
Number of payloads to put into the temporary VisibilityBuilder object during the periodic background aggregation before giving control back to the IO loop. Adding payloads into the VisibilityBuilder is a CPU intensive task, so if many payloads are being added this can stall the other coroutines that are executing in the system concurrently under the same IO loop. Yielding back control to the IO loop every now and then allows other coroutines to progress at the expense of this final aggregation step taking longer.
Defaults to 0, which means the final value is calculated as a function of the number of streams being received, which itself is a proxy for the number of coroutines in concurrent execution. If positive, it is taken as-is. If negative, there is no backoff.
- time_period: float = 5
Period, in seconds, after which payloads should be aggregated. If this is a non-positive number then aggregation doesn’t happen in the background, but can still be triggered manually. It is still triggered automatically at shutdown regardless.
- timestamp_tolerance: int = 5
Number of integration intervals to avoid considering in an aggregation step to cater for slower streams that haven’t caught up with receiving some of their data.
- property tolerance
Number of integration intervals to avoid considering in an aggregation step to cater for slower streams that haven’t caught up with receiving some of their data.
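The interplay between payloads_per_backoff and backoff_time can be sketched with plain asyncio. The function below is a hypothetical illustration, not the package's code: appending to a list stands in for the CPU-intensive addition of a payload to a VisibilityBuilder, and the case where 0 is replaced by a value derived from the number of streams is not modelled (non-positive values simply disable the backoff here).

```python
import asyncio


async def add_payloads_with_backoff(payloads, sink, payloads_per_backoff, backoff_time=0.0):
    """Add payloads to sink, yielding to the IO loop every payloads_per_backoff items."""
    yields = 0
    for count, payload in enumerate(payloads, start=1):
        sink.append(payload)  # stand-in for the CPU-intensive aggregation step
        if payloads_per_backoff > 0 and count % payloads_per_backoff == 0:
            # sleep(0) just yields control; a positive value also waits.
            await asyncio.sleep(backoff_time)
            yields += 1
    return yields


sink = []
yields = asyncio.run(add_payloads_with_backoff(range(10), sink, payloads_per_backoff=4))
```

A smaller payloads_per_backoff gives other coroutines more chances to run, at the cost of the aggregation step itself taking longer.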
- class realtime.receive.modules.aggregators.MultiBeamPipeline(config: AggregationConfig, consumer: Consumer, tm: TelescopeManager, scan_provider: ScanProvider, reception_ranges: dict[str, dict[str, ReceptionRange]])[source]
Aggregates and streams visibility beams of an observation to a consumer.
Detects scan boundaries and sends scan events to the consumers.
Generated Visibility objects are passed to a single consumer on periodic flush cycles from a background asyncio task.
This class builds and manages a collection of BeamPipeline instances per beam to implement the add_payload() interface.
- property added_payloads: int
Number of payloads added to the aggregator.
- property aggregated_payloads: int
Number of payloads that have been aggregated into visibilities.
- async astart()[source]
Start the aggregation system.
This starts the periodic aggregation task if configured.
- async astop()[source]
Stop the aggregation system.
This stops the periodic task and performs a final flush of any remaining payloads.
- property consumer: Consumer
The consumer this aggregation system forwards visibilities to.
- async flush(*, full_flush: bool = False) None[source]
Flush the aggregator and process the results.
- Parameters:
full_flush – If True, flush all payload buffers, else flush only full or timed-out payload buffers. defaults to False.
- reset_time_indexing() None[source]
Reset the internal time indexing used for payload sequencing.
This delegates to the payload aggregator’s reset method.
- property visibilities_consumed: int
Number of visibilities that have been successfully consumed.
- property visibilities_generated: int
Number of visibilities that have been generated.
- class realtime.receive.modules.aggregators.PayloadAggregator(config: AggregationConfig, num_streams: int)[source]
Collects individual payloads and groups them by time window on flushes.
Users should call add_payload() whenever a new ICD payload is received and needs to be stored for later aggregation.
Aggregation itself does not happen in this class. This class prepares the payloads for aggregation by providing them in structured time-based groups when flush() is called. The actual creation of Visibility objects happens in the VisibilityGenerator class. A top-level Aggregator class has been created to manage the complete flow from payload aggregation through visibility generation to consumption.
Aggregation of payloads happens when flush() is called, either periodically or manually. During a flush, the payloads are grouped by time windows based on the configured aggregation rules. Payloads that are incomplete or arrive too late may be dropped, depending on the configured tolerance. See AggregationConfig for more details.
When finished using this class, users should ensure all payloads have been flushed.
Detailed intended usage:
The primary use case for this class is to collect payloads from multiple streams and organize them into time-based groups for later aggregation into Visibility objects.
There is a two dimensional aggregation that is happening here. The first dimension is the time dimension and the second the frequency dimension.
The purpose is to reduce the transaction count between the receiver and the plasma store.
Intended behaviour:
The receiver passes all received payloads into the aggregator which adds them to a stack. A flush is triggered to retrieve the grouped payloads. Only timesteps in the stack that are “complete” are assembled into a visibility object.
TODO:
There are two possible failure modes that need to be addressed. The first is if a timestamp is lost from a stream. There is no error correction for this and the visibility object will be incomplete.
To avoid the receiver being blocked by a stream that is lost the aggregation waits for a certain number of timestamps (tolerance) before it flushes the incomplete visibility object.
- add_payload(payload: Payload) None[source]
Add a single payload to be considered for aggregation in the next Visibility object.
- property added_payloads: int
The number of data payloads that have been added to this object.
- flush(*, full_flush: bool = False) PayloadTimeGroup | None[source]
Flush the aggregated payloads into groups for visibility generation.
Groups payloads into time windows and prepares them for conversion into Visibility objects. May drop payloads that are too old.
- Parameters:
full_flush – If True, flush all payload buffers, else flush only full or timed-out payload buffers. defaults to False.
- reset_time_indexing() None[source]
Reset the internal time indexing used to calculate payload sequence numbers.
NOTE: This shouldn’t be normally called, but can be useful in some situations; e.g., when receiving data for different scans that is tagged with the same timestamps.
- Raises:
RuntimeError – no payloads have been added since the last flush().
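The flushing rules described above (complete time steps are released, incomplete ones are held back until they fall outside the tolerance, and a full flush releases everything) can be modelled with a toy class. ToyAggregator is a hypothetical simplification: it indexes payloads by an integer time step and stream id and returns plain tuples, whereas the real PayloadAggregator works on ICD payloads and returns a PayloadTimeGroup.

```python
from collections import defaultdict


class ToyAggregator:
    """Hypothetical, simplified model of time-based payload grouping."""

    def __init__(self, num_streams, tolerance):
        self.num_streams = num_streams
        self.tolerance = tolerance
        self._by_time = defaultdict(dict)  # time index -> {stream id: payload}

    def add_payload(self, time_index, stream_id, payload):
        self._by_time[time_index][stream_id] = payload

    def flush(self, full_flush=False):
        """Return (time index, payloads) pairs that are ready to be aggregated."""
        if not self._by_time:
            return []
        latest = max(self._by_time)
        flushed = []
        for t in sorted(self._by_time):
            complete = len(self._by_time[t]) == self.num_streams
            timed_out = latest - t >= self.tolerance
            if full_flush or complete or timed_out:
                flushed.append((t, self._by_time.pop(t)))
        return flushed


agg = ToyAggregator(num_streams=2, tolerance=2)
agg.add_payload(0, 0, "a0")
agg.add_payload(0, 1, "b0")
agg.add_payload(1, 0, "a1")           # stream 1 has not delivered step 1 yet
first = agg.flush()                   # only the complete step 0 is released
agg.add_payload(3, 0, "a3")
second = agg.flush()                  # step 1 is now outside the tolerance
third = agg.flush(full_flush=True)    # everything left is released
```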
Consumers
- class realtime.receive.modules.consumers.Config(name: str = 'mswriter')[source]
Common configuration for all consumers
- name: str = 'mswriter'
The fully-qualified name of the consumer class to use. Built-in consumers can be specified by simple module name (e.g., mswriter).
- class realtime.receive.modules.consumers.accumulating_consumer.AccumulatingConsumer(*args, **kwargs)[source]
A consumer that keeps all visibilities around for later inspection.
- async consume(visibility: Visibility)[source]
Asynchronously consumes/processes an incoming Visibility.
- Parameters:
visibility – visibilities dataset
Takes a SPEAD2 HEAP and writes it to a MEASUREMENT SET. This is pretty much the same functionality as presented in the OSKAR python binding example available at: https://github.com/OxfordSKA/OSKAR/blob/master/python/examples/spead/receiver/spead_recv.py
- class realtime.receive.modules.consumers.mswriter.MSWriterConsumer(config: Config, tm: TelescopeManager, uvw_engine: UVWEngine)[source]
A heap consumer that writes incoming data into an MS.
Because data consumption happens inside the event loop we need to defer the data writing to a different thread. We do this by creating a single-threaded executor that we then use to schedule the I/O-heavy MS writing tasks onto.
- async astop()[source]
Called when receiver stream ends.
Users should implement their own astop method. To maintain backwards compatibility this default implementation invokes the old, blocking stop() method.
- config_class
alias of MSWriterConsumerConfig
- async consume(visibility: Visibility)[source]
Asynchronously consumes/processes an incoming Visibility.
- Parameters:
visibility – visibilities dataset
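The technique of deferring blocking writes from the event loop to a single-threaded executor can be sketched as below. ToyWriterConsumer is a hypothetical example, not the MSWriterConsumer implementation: appending to a list stands in for the I/O-heavy Measurement Set write.

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor


class ToyWriterConsumer:
    """Hypothetical consumer that offloads blocking writes to one worker thread."""

    def __init__(self):
        # A single worker keeps writes ordered and off the event loop thread.
        self._executor = ThreadPoolExecutor(max_workers=1)
        self.written = []
        self.writer_threads = set()

    def _blocking_write(self, visibility):
        self.writer_threads.add(threading.get_ident())
        self.written.append(visibility)  # stand-in for the slow MS write

    async def consume(self, visibility):
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(self._executor, self._blocking_write, visibility)

    async def astop(self):
        self._executor.shutdown(wait=True)


async def main():
    consumer = ToyWriterConsumer()
    for vis in ("vis-0", "vis-1", "vis-2"):
        await consumer.consume(vis)
    await consumer.astop()
    return consumer


consumer = asyncio.run(main())
```

Using exactly one worker thread means writes never run concurrently with each other, so the writer itself does not need to be thread-safe.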
- class realtime.receive.modules.consumers.mswriter.MSWriterConsumerConfig(name: str = 'mswriter', output_filename: str = 'recv-vis.ms', max_payloads_per_ms: int | None = None, command_template: list[str] | None = None, timestamp_output: bool = False)[source]
Configuration for the mswriter consumer.
- command_template: list[str] | None = None
Command to execute after a Measurement Set has been written.
- max_payloads_per_ms: int | None = None
Maximum number of payloads to write onto a single Measurement Set. If more payloads are received a new Measurement Set is opened for writing.
- name: str = 'mswriter'
The fully-qualified name of the consumer class to use. Built-in consumers can be specified by simple module name (e.g., mswriter).
- output_filename: str = 'recv-vis.ms'
The filename of the output Measurement Set.
- timestamp_output: bool = False
Whether to add timestamp information to the output filename.
Takes a SPEAD2 HEAP and writes it to an Apache Plasma store. This uses the sdp-dal-prototype API and will fail if that cannot be loaded.
- class realtime.receive.modules.consumers.plasma_writer.InvocationResults(success: int = 0, fail: int = 0, unknown: int = 0)[source]
Summary of RPC call results
- class realtime.receive.modules.consumers.plasma_writer.PlasmaExitedAction(value)[source]
Actions to take when the consumer detects Plasma has exited.
- IGNORE = 2
Ignore the fact that Plasma exited and don’t continue.
- RECONNECT_AND_RETRY = 1
Try reconnecting to Plasma and upon success retry the current operation.
- class realtime.receive.modules.consumers.plasma_writer.PlasmaWriterConfig(name: str = 'plasma_writer', plasma_path: str = '/tmp/plasma', payloads_in_flight: int = 10, remove_old_references_timeout: float = 0.0, wait_for_all_responses_timeout: float = 5.0)[source]
Configuration for the plasma_writer consumer.
- name: str = 'plasma_writer'
The fully-qualified name of the consumer class to use. Built-in consumers can be specified by simple module name (e.g., mswriter).
- payloads_in_flight: int = 10
The maximum number of payloads to keep in flight in the Plasma Store before requesting their results and releasing our local references to them. If more payloads are written to Plasma the oldest references are released.
- plasma_path: str = '/tmp/plasma'
The UNIX socket to connect to.
- remove_old_references_timeout: float = 0.0
The maximum amount of time to wait, in seconds, for a pending response when removing references to old input objects in the Plasma store.
- wait_for_all_responses_timeout: float = 5.0
The maximum amount of time to wait, in seconds, for all pending responses to be read from Plasma when the consumer is shut down.
- class realtime.receive.modules.consumers.plasma_writer.PlasmaWriterConsumer(config: PlasmaWriterConfig, tm: TelescopeManager, uvw_engine: UVWEngine)[source]
A heap consumer that writes incoming data into a Plasma Store.
Because data consumption happens inside the event loop we need to defer the data writing to a different thread. We do this by creating a single-threaded executor that we then use to schedule the access to the Plasma Store.
- async astop()[source]
Called when receiver stream ends.
Users should implement their own astop method. To maintain backwards compatibility this default implementation invokes the old, blocking stop() method.
- config_class
alias of PlasmaWriterConfig
- async consume(visibility: Visibility)[source]
Asynchronously consumes/processes an incoming Visibility.
- Parameters:
visibility – visibilities dataset
- property invocation_results
A summary of the results from RPC invocations.
- property invocations_in_flight
The number of RPC invocations that are still in flight, and for which we haven’t collected their output.
- property num_processors
The number of processors known to this caller.
- async start_scan(scan: Scan) None[source]
Invoke the start_scan RPC procedure on remote processors.
Telescope Managers
some utils to get information
- class realtime.receive.modules.tm.base_tm.TelescopeManager(antennas: Sequence[Antenna] | str, baselines: Baselines)[source]
Telescope Manager containing an immutable telescope model consisting of the antennas/stations composing a subarray.
- property as_configuration: Configuration
self -> SDP Configuration
- property baselines_as_visibility_indices: MultiIndex
Baselines as a MultiIndex suitable for the Visibility class
- property num_baselines: int
The number of baselines used by the current observation
- property num_stations: int
The number of stations used by the current observation
some utils to get information
- class realtime.receive.modules.tm.ms_tm.MeasurementSetTM(ms: MeasurementSet | str)[source]
Telescope Manager that reads its model information from a Measurement Set.
- class realtime.receive.modules.tm.sched_tm.SKATelescopeManager(antenna_layout: str | Sequence[dict], assign_resources_command: ExecutionBlock | dict | str | None = None)[source]
TelescopeManager class that combines information from an AssignResources command and an Antenna Layout to assemble the list of antennas making up a subarray.
- static from_sdp_config(execution_block_id: str, sdp_config: Config, telmodel_key: str | None = None, telmodel_source_uris: list[str] | None = None, antenna_layout: str | Sequence[dict] | None = None)[source]
Construct a telescope manager for the given options, contacting the SDP Configuration Database and using the SKA Telmodel package to obtain all necessary information.
- Parameters:
execution_block_id – The Execution Block ID for which a TM needs to be created.
sdp_config – A client to the SDP Configuration DB.
telmodel_key – The key in the SKA Telmodel data where the antenna layout can be read from. If not given, then antenna_layout must be given.
telmodel_source_uris – An optional list of URIs used by the SKA Telmodel package to read its data from.
antenna_layout – An antenna layout, either as a list of dictionaries or as a URL/filename with a JSON representation of such a list. Used only if telmodel_key is not given.
- realtime.receive.modules.tm.sched_tm.parse_ap_ids(ap_matches: list[Match], indexed_by_id: dict[int, Antenna]) list[Antenna][source]
Convert list of APxxx.yy regex matches into sorted Antenna objects.
- realtime.receive.modules.tm.sched_tm.sort_receptors_by_icd(receptors: list[str], all_antennas: list[Antenna]) list[Antenna][source]
Sorts receptor names and returns the corresponding Antenna objects. For MID, names in the format SKAxxx or MKTxxx are accepted. For LOW, any names provided are accepted.
- Parameters:
receptors – List of receptor names.
all_antennas – List of all Antenna objects.
- Returns:
Sorted list of Antenna objects.
Scan Providers
- class realtime.receive.modules.scan_provider.AssignResourcesScanProvider(assign_resources_command: str)[source]
A ScanProvider that creates Scans based on the contents of an AssignResources command.
An AssignResources command alone doesn’t contain all the data necessary to map Scan IDs to ScanTypes (and the rest of their related metadata). This class works around that limitation by mapping consecutive Scans to consecutive ScanTypes in a round-robin fashion.
- get(scan_id: int) Scan | None[source]
Return the Scan object associated to the given scan ID.
If no scan is known for the given ID, None is returned instead.
- Parameters:
scan_id – The Scan ID
- query_scans(start_time: float, end_time: float) list[EbStateScan][source]
Return the scan definitions that intersect the specified time window.
- Parameters:
start_time – Window start time, as a UTC timestamp or a Unix TAI timestamp.
end_time – Window end time, as a UTC timestamp or a Unix TAI timestamp.
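The round-robin mapping described for AssignResourcesScanProvider can be illustrated with a short sketch. It is not the class's actual code: RoundRobinAssigner and its method names are hypothetical, scan types are represented by plain strings, and it assumes that "consecutive Scans" means scan IDs in the order in which they are first requested.

```python
from typing import Dict, Hashable, List, Sequence


class RoundRobinAssigner:
    """Hypothetical helper that cycles through scan types for new scan IDs."""

    def __init__(self, scan_types: Sequence[Hashable]):
        if not scan_types:
            raise ValueError("at least one scan type is required")
        self._scan_types: List[Hashable] = list(scan_types)
        self._assigned: Dict[int, Hashable] = {}

    def scan_type_for(self, scan_id: int) -> Hashable:
        """Return the scan type for scan_id, assigning the next one if unseen."""
        if scan_id not in self._assigned:
            # The n-th previously unseen scan ID gets scan type n modulo the
            # number of scan types, so the types are reused cyclically.
            index = len(self._assigned) % len(self._scan_types)
            self._assigned[scan_id] = self._scan_types[index]
        return self._assigned[scan_id]


if __name__ == "__main__":
    assigner = RoundRobinAssigner(["science", "calibration"])
    for scan_id in (10, 11, 12, 10):
        print(scan_id, assigner.scan_type_for(scan_id))
```

Remembering earlier assignments means that asking again for a known scan ID returns the same scan type rather than advancing the cycle.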
- class realtime.receive.modules.scan_provider.FakeScanProvider(scan_types)[source]
A Fake ScanProvider.
Always returns the same ScanType information.
- static from_spectral_window(sw_channels: ChannelRange, scan_intents: list | None = None, integration_time: float = 1, averaging_samples: int = 1, averaging_channels: int = 1) FakeScanProvider[source]
Create instance using a single SpectralWindow with the given channel configuration.
- get(scan_id: int) Scan | None[source]
Return the Scan object associated to the given scan ID.
If no scan is known for the given ID, None is returned instead.
- Parameters:
scan_id – The Scan ID
- query_scans(start_time: float, end_time: float) list[EbStateScan][source]
Return the scan definitions that intersect the specified time window.
- Parameters:
start_time – Window start time, as a UTC timestamp or a Unix TAI timestamp.
end_time – Window end time, as a UTC timestamp or a Unix TAI timestamp.
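A provider that always answers with the same scan type information is mostly useful as a test double. The sketch below shows that idea in isolation; StubScan and ConstantScanProvider are hypothetical stand-ins written for this example and do not reproduce the real Scan or FakeScanProvider classes.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class StubScan:
    """Hypothetical stand-in for a Scan: an ID plus a scan type label."""

    scan_id: int
    scan_type: str


class ConstantScanProvider:
    """Hypothetical provider that pairs every scan ID with one scan type."""

    def __init__(self, scan_type: str):
        self._scan_type = scan_type

    def get(self, scan_id: int) -> StubScan:
        # Every scan ID is considered known and maps to the same scan type.
        return StubScan(scan_id=scan_id, scan_type=self._scan_type)
```

Code under test can then be exercised with arbitrary scan IDs without needing a configuration database or a Measurement Set.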
- class realtime.receive.modules.scan_provider.MSScanProvider(measurement_set: str | MeasurementSet)[source]
A ScanProvider that reads Scan information from an input Measurement Set.
- class realtime.receive.modules.scan_provider.ScanProvider(scan_types: Iterable[ScanType])[source]
A class that can return Scan objects for given scan IDs.
- abstract get(scan_id: int) Scan | None[source]
Return the Scan object associated to the given scan ID.
If no scan is known for the given ID, None is returned instead.
- Parameters:
scan_id – The Scan ID
- abstract query_scan(time: float) EbStateScan[source]
Return the scan definition for the specified time.
- abstract query_scans(start_time: float, end_time: float) list[EbStateScan][source]
Return the scan definitions that intersect the specified time window.
- Parameters:
start_time – Window start time, as a UTC timestamp or a Unix TAI timestamp.
end_time – Window end time, as a UTC timestamp or a Unix TAI timestamp.
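What it means for a scan to intersect a time window can be made concrete with a small sketch. It is an assumption-laden illustration, not the behaviour of any concrete ScanProvider: ScanInterval and scans_in_window are hypothetical names, intervals are treated as closed at both ends, and a scan with no end time is assumed to be still running.

```python
from dataclasses import dataclass
from typing import Iterable, List, Optional


@dataclass(frozen=True)
class ScanInterval:
    """Hypothetical record of when a scan started and (optionally) ended."""

    scan_id: int
    start_time: float
    end_time: Optional[float] = None  # None: the scan has not ended yet


def intersects(scan: ScanInterval, window_start: float, window_end: float) -> bool:
    """Return True if the scan overlaps the closed window [start, end]."""
    scan_end = float("inf") if scan.end_time is None else scan.end_time
    # Two closed intervals overlap when each one starts before the other ends.
    return scan.start_time <= window_end and scan_end >= window_start


def scans_in_window(
    scans: Iterable[ScanInterval], start_time: float, end_time: float
) -> List[ScanInterval]:
    """Return the scans overlapping the window, keeping the input order."""
    if end_time < start_time:
        raise ValueError("end_time must not be earlier than start_time")
    return [scan for scan in scans if intersects(scan, start_time, end_time)]
```

Both timestamps must be expressed on the same time scale for the comparison to be meaningful; the sketch does no conversion between UTC and TAI.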
- class realtime.receive.modules.scan_provider.SdpConfigScanProvider(execution_block_id, sdp_config: Config, end_scan_max_delay: float | None = None)[source]
Creates Scan models based on the contents of the SDP configuration database.
For details on the Subarray obsStates and the commands that trigger their transitions see https://developer.skao.int/projects/ska-control-model/en/0.3.1/obs_state.html.
- property end_scan_pending: bool
Whether an EndScan command has been seen but not processed yet.
- get(scan_id: int) Scan | None[source]
Return the Scan object associated to the given scan ID.
If no scan is known for the given ID, None is returned instead.
- Parameters:
scan_id – The Scan ID
- query_scans(start_time: float, end_time: float) list[EbStateScan][source]
Return the scan definitions that intersect the specified time window.
- Parameters:
start_time – Window start time, as a UTC timestamp or a Unix TAI timestamp.
end_time – Window end time, as a UTC timestamp or a Unix TAI timestamp.