Source code for realtime.receive.modules.receivers.spead2_receivers

"""UDP Protocol Multi-stream SPEAD2 receiver."""

import asyncio
import contextlib
import enum
import functools
import logging
import operator
import time
import warnings
from typing import Any

import spead2.recv.asyncio
from pydantic import dataclasses
from realtime.receive.core import socket_utils
from realtime.receive.core.icd import ICD, ItemID, LowICD, MidICD, Payload, Telescope

from realtime.receive.modules.aggregators.multi_beam_pipeline import MultiBeamPipeline
from realtime.receive.modules.consumers.consumer import Consumer
from realtime.receive.modules.periodic_summary_logger import PeriodicSummaryLogger
from realtime.receive.modules.receivers import Config
from realtime.receive.modules.stats.heap_tracker import HeapTracker
from realtime.receive.modules.stats.models import ReceptionStats
from realtime.receive.modules.stats.receiver_stats_tracker import ReceiverStatsTracker
from realtime.receive.modules.stats.stats_data_queue_worker import (
    StatsDataQueueProducer,
    StatsDataQueueWorker,
)

logger = logging.getLogger(__name__)


[docs] def create_stream(io_thread_pool, ring_heaps, max_heaps): """Create a spead2 stream for the given options""" return spead2.recv.asyncio.Stream( io_thread_pool, config=spead2.recv.StreamConfig(max_heaps=max_heaps, stop_on_stop_item=False), ring_config=spead2.recv.RingStreamConfig(heaps=ring_heaps, contiguous_only=False), )
[docs] def stop_streams(streams: list[spead2.recv.Stream]): """Stops all given streams""" for stream in streams: stream.stop()
def _item_size(item): assert item.itemsize_bits % 8 == 0 return item.itemsize_bits // 8 * functools.reduce(operator.mul, item.shape, 1) def _get_item_group_size(item_group): return sum(_item_size(item) for item in item_group.values())
[docs] class Spead2ReceiverPayload(Payload): """A Payload that updates itself from data coming from spead2 heaps""" LOW_CBF_SOURCE_ID = b"L" def __init__(self): super().__init__() self._item_group = spead2.ItemGroup() self.item_group_size = None self._has_all_required_item_descriptors = False @property def _icd(self) -> ICD: return LowICD if self.telescope == Telescope.LOW else MidICD @property def has_all_required_item_descriptors(self): """Whether the payload has all the required SOS item descriptor values.""" return self._has_all_required_item_descriptors
[docs] def set_item_descriptors(self, heap: spead2.recv.Heap) -> Any: """ Updates the SPEAD ItemGroup of this Payload with the ItemDescriptors from `heap`. :param heap: A SPEAD heap. :returns: Whether all required ItemDescriptors this Payload needs have been found. """ updated_items = self._item_group.update(heap) items_by_id = {item.id: item for item in updated_items.values()} self.item_group_size = _get_item_group_size(self._item_group) # CBF Low sends this item to identify itself, otherwise assume Mid self.telescope = Telescope.MID if ItemID.CBF_SOURCE_ID in items_by_id: cbf_source_id = items_by_id[ItemID.CBF_SOURCE_ID].value if cbf_source_id == Spead2ReceiverPayload.LOW_CBF_SOURCE_ID: self.telescope = Telescope.LOW else: logger.warning( "ItemDescriptor 0x%04x (CBF_SOURCE_ID) found, but value " '"%s" != "%s" for Low, assuming "M" for Mid', ItemID.CBF_SOURCE_ID, cbf_source_id.decode(), Spead2ReceiverPayload.LOW_CBF_SOURCE_ID.decode("ascii"), ) self._has_all_required_item_descriptors = set(self._icd.ITEM_IDS).issubset( self._item_group.ids() ) if self._has_all_required_item_descriptors: def get_item(x): return self._item_group[x.value.id] Items = self._icd.Items # cache references to items in item group for faster access on # future updates # pylint: disable=attribute-defined-outside-init self._corr_out_data_item = get_item(Items.CORRELATOR_OUTPUT_DATA) if self.telescope == Telescope.LOW: self._sps_epoch_item = get_item(Items.SPS_EPOCH) self._epoch_offset_item = get_item(Items.EPOCH_OFFSET) # the sdp-cbf-emulator can send multiple channels per stream # even when emulating Low corr_out_data_shape = self._corr_out_data_item.shape self.baseline_count = corr_out_data_shape[-1] if len(corr_out_data_shape) == 2: logger.warning( "Found multi-channel visibilities in SKA Low stream: " "%r, dealing with it", corr_out_data_shape, ) self.channel_count = corr_out_data_shape[0] else: self.channel_count = 1 else: self._baseline_count_item = get_item(Items.BASELINE_COUNT) self._channel_count_item = get_item(Items.CHANNEL_COUNT) self._channel_id_item = get_item(Items.CHANNEL_ID) self._hardware_id_item = get_item(Items.HARDWARE_ID) self._phase_bin_id_item = get_item(Items.PHASE_BIN_ID) self._phase_bin_count_item = get_item(Items.PHASE_BIN_COUNT) self._polarisation_id_item = get_item(Items.POLARISATION_ID) self._scan_id_item = get_item(Items.SCAN_ID) self._timestamp_count_item = get_item(Items.TIMESTAMP_COUNT) self._timestamp_fraction_item = get_item(Items.TIMESTAMP_FRACTION) # Log that the start-of-stream heap has arrived if self.telescope == Telescope.LOW: # Low also sends most values in the SOS heap self.channel_id = get_item(Items.CHANNEL_ID).value self.scan_id = get_item(Items.SCAN_ID).value # TO DO: something similar needs to be done for MID as well self.beam_id = get_item(Items.STATION_BEAM_ID).value self.hardware_id = get_item(Items.HARDWARE_ID).value else: missing_items = set(self._icd.ITEM_IDS) - set(self._item_group.ids()) logger.warning("Missing required ItemDescriptors: %s", missing_items)
[docs] def update(self, heap: spead2.recv.Heap): """ Updates this Payload with the data extracted from the Items in the given heap. :param heap: A SPEAD heap. """ assert ( self._has_all_required_item_descriptors ), "ItemGroup doesn't have all required ItemDescriptors" ig = self._item_group ig.update(heap) if self.telescope == Telescope.MID: self.baseline_count = self._baseline_count_item.value self.channel_count = self._channel_count_item.value self.channel_id = self._channel_id_item.value self.hardware_id = self._hardware_id_item.value self.phase_bin_id = self._phase_bin_id_item.value self.phase_bin_count = self._phase_bin_count_item.value self.polarisation_id = self._polarisation_id_item.value self.scan_id = self._scan_id_item.value self.timestamp = self._icd.icd_to_unix( self._timestamp_count_item.value, self._timestamp_fraction_item.value, ) corr_out_data = self._corr_out_data_item.value self.time_centroid_indices = corr_out_data["TCI"] self.correlated_data_fraction = corr_out_data["FD"] self.cci = corr_out_data["CCI"] self.visibilities = corr_out_data["VIS"] else: self.timestamp = self._icd.icd_to_unix( self._sps_epoch_item.value, self._epoch_offset_item.value, ) corr_out_data = self._corr_out_data_item.value self.time_centroid_indices = corr_out_data["TCI"] self.correlated_data_fraction = corr_out_data["FD"] self.visibilities = corr_out_data["VIS"]
[docs] class ItemDescStatus(enum.IntEnum): """ An enumeration describing the different status in which a stream can be in with respect to having received ICD item descriptors for its data heaps. """ NOT_RECEIVED = 0 VALID = 1 INVALID = 2
[docs] class TransportProtocols(enum.Enum): """Supported transport protocols for SPEAD reception.""" UDP = "udp" TCP = "tcp"
[docs] @dataclasses.dataclass class StatsConfig: """Configuration for realtime receiver stats.""" dataqueue_bootstrap_address: str dataqueue_topic: str
[docs] @dataclasses.dataclass class Spead2ReceptionConfig(Config): """Set of options used to build a spead2 network receiver.""" method: str = "spead2_receivers" num_streams: int | None = None """ The number of streams this receiver should open. (Preferred over :py:attr:`num_channels` and :py:attr:`channels_per_stream` which are now deprecated). """ num_channels: int | None = None """ *DEPRECATED*, use `num_streams` instead. The number of channels to receive. """ continuous_mode: bool | None = None """ *DEPRECATED*: use `scans_to_receive` instead. Streams are also not normally recreated. Whether the receiver should re-create the streams and resume receiving data after all end of streams are reached. """ scans_to_receive: int | None = None """ The number of scans to receive data for, after which the reception finishes automatically. If non-positive, reception never finishes automatically. Default is to first inspect the deprecated continuous_mode option if set and obey it, otherwise receive data for a single scan. In the future it will default to receive data indefinitely. """ channels_per_stream: int | None = None """ *DEPRECATED*, use `num_streams` instead. The number of channels for which data will be sent in a single stream. This is used in the case where multiple ports are required with multiple channels per port. Together with `num_channels` they define the number of streams to listen for (and therefore the number of ports to open). ``0`` means that all channels should be received on a single stream. """ stats_receiver_interval: float = 1.0 """ Period of time, in seconds, between publishing of receiver stats to kafka. """ stats_config: StatsConfig | None = None """ Stats capture configuration. If provided, send stats to the specified dataqueue. """ data_loss_report_rate: float = 1.0 """ The period, in seconds, at which lost data heaps should be reported, if any. """ start_of_stream_report_rate: float | None = None """ *DEPRECATED*, use `stream_extremes_report_rate` instead. The period, in seconds, at which start-of-stream heaps should be reported as they arrive. """ stream_extremes_report_rate: float | None = None """ The period in seconds that start-of-stream and end-of-stream heaps report as they arrive. Defaults to 1, unless start_of_stream_report_rate has been given, in which case its value is used. Regardless of where the value of stream_extremes_report_rate is initialised from, it is set to 1 if non-positive. """ port_start: int = 41000 """ The initial port number to which to bind the receiver streams. Successive streams are opened in successive ports after this. """ bind_hostname: str = "" """ The IP address or hostname of the interface to which to bind for reception. """ transport_protocol: TransportProtocols = TransportProtocols.UDP """The network transport protocol used by spead2.""" pcap_file: str = "" """ Packet capture file to read SPEAD packets from. If set then data reception will be done by reading data from this file instead of from the network. The pcap file should have data for one or more valid SPEAD UDP streams. The ``num_streams`` and ``port_start`` configuration options are still used to determine how many of these streams to "receive" from the file, with each stream resulting from filtering the pcap file for a different, successfive destination UDP port starting at ``port_start``. """ readiness_filename: str = "" """ If given, the name of the file to create (empty) on disk once the receiver has finished setting itself up and is ready to receive data. """ max_pending_data_heaps: int = 10 """ The number of data heaps on each stream to accumulate in memory before the start-of-stream heap arrives. Data heaps cannot be processed before the start-of-stream heap arrives, so a finite queue is implemented to keep some of them around. Further data heaps that don’t fit in the queue are dropped and permanently lost. """ test_failure: bool = False """ DEPRECATED, has no effect. """ ring_heaps: int = 16 """ The number of ring heaps used in by each SPEAD stream. """ buffer_size: int | None = None """ The socket buffer size to use. If not given, a default value is calculated based on the default value set by spead2, and the limits imposed by the OS. """ max_packet_size: int = spead2.recv.Stream.DEFAULT_UDP_MAX_SIZE """ The maximum packet size to accept on the streams. """ receiver_threads: int = 1 """ The number threads allocated to the spead2 I/O thread pool. """ reset_time_indexing_after_each_scan: bool = False """ Whether to reset the aggregator's time indexing when data reception for a scan finishes. """ exit_timeout: float = 10 """ If non-negative, the maximum amount of time in seconds to wait for the final aggregation to occur before exiting. If it takes longer, the final aggregation is cancelled. If negative, wait indefinitely. """ @property def port_range(self) -> range: """The range of ports the receiver should bind to""" return range(self.port_start, self.port_start + self.num_streams) def __post_init__(self): self._adjust_num_streams() self._adjust_buffer_size() self._adjust_stream_extremes_report_rate() self._adjust_scans_to_receive() def _adjust_num_streams(self): if self.num_streams is not None: return if self.num_channels is not None or self.channels_per_stream is not None: warnings.warn( ( "reception.num_channels and reception.channels_per_stream " "are deprecated, use reception.num_streams instead" ), category=DeprecationWarning, ) num_channels = self.num_channels channels_per_stream = self.channels_per_stream if num_channels is None: num_channels = 1 if num_channels == 0: raise ValueError( "Reception num_channels configuration must be the real amount of files" ) if channels_per_stream == 0 or channels_per_stream is None: channels_per_stream = num_channels self.num_streams = num_channels // channels_per_stream else: self.num_streams = 1 def _adjust_buffer_size(self): if self.buffer_size is not None: return default_buffer_size = ( spead2.recv.Stream.DEFAULT_UDP_BUFFER_SIZE if self.transport_protocol == TransportProtocols.UDP else spead2.recv.Stream.DEFAULT_TCP_BUFFER_SIZE ) os_max_buffer_size = socket_utils.max_socket_read_buffer_size( self.transport_protocol.value ) if os_max_buffer_size >= default_buffer_size: self.buffer_size = default_buffer_size else: logger.debug( "Adjusting default reception buffer_size (%d -> %d) to match OS max settings", default_buffer_size, os_max_buffer_size, ) self.buffer_size = os_max_buffer_size def _adjust_stream_extremes_report_rate(self): if self.start_of_stream_report_rate is not None: warnings.warn( ( "reception.start_of_stream_report_rate is deprecated, " "use reception.stream_extremes_report_rate instead" ), category=DeprecationWarning, ) if self.stream_extremes_report_rate is None: self.stream_extremes_report_rate = self.start_of_stream_report_rate if self.stream_extremes_report_rate is None: self.stream_extremes_report_rate = 1 if self.stream_extremes_report_rate <= 0: self.stream_extremes_report_rate = 1 def _adjust_scans_to_receive(self): if self.scans_to_receive is not None: return if self.continuous_mode is not None: warnings.warn( "reception.continuous_mode is deprecated, use reception.scans_to_receive", category=DeprecationWarning, ) self.scans_to_receive = 0 if self.continuous_mode else 1 else: self.scans_to_receive = 1
[docs] class receiver: """SPEAD2 receiver. This class uses the spead2 library to receive a multiple number of streams, each using a single UDP reader. As heaps are received they are given to a single consumer. This receiver supports UDP multicast addressing. This means that multiple receivers (and therefore consumers) can access the same transmitted stream if they bind to the same multicast IP address and port. """ config_class = Spead2ReceptionConfig def __init__( self, config: Spead2ReceptionConfig, multi_beam_pipeline: MultiBeamPipeline, ): self.config = config self._multi_beam_pipeline = multi_beam_pipeline # non-spead2 heap type specific trackers self._dropped_data_tracker = HeapTracker[int]() self._sos_complete_tracker = HeapTracker[int]() self._sos_incomplete_tracker = HeapTracker[int]() self._eos_tracker = HeapTracker[int]() # spead2 tracking added lazily via set_streams self.reception_stats_tracker = ReceiverStatsTracker( sos_complete_tracker=self._sos_complete_tracker, sos_incomplete_tracker=self._sos_incomplete_tracker, eos_tracker=self._eos_tracker, dropped_data_tracker=self._dropped_data_tracker, ) # background thread periodic messaging to data queues self._stats_worker = StatsDataQueueWorker( self.reception_stats_tracker.collect, interval=config.stats_receiver_interval ) # active thread periodic logging ( self._dropped_data_logger, self._sos_complete_logger, self._sos_incomplete_logger, self._eos_heap_logger, ) = ( PeriodicSummaryLogger( logger, message_template, lambda tracker=tracker: tracker.num_heaps_since_snapshot, tracker.record_snapshot, period, ) for message_template, tracker, period in [ ( "Dropped %(summary)s data heaps in the last %(elapsed_time).2f [s] as" " start-of-stream heap with item descriptors has not been received and" " buffer is full", self._dropped_data_tracker, config.data_loss_report_rate, ), ( "Successfully received %(summary)s complete start-of-stream heaps in the" " last %(elapsed_time).2f [s]", self._sos_complete_tracker, config.stream_extremes_report_rate, ), ( "Received %(summary)s incomplete start-of-stream heaps in the last" " %(elapsed_time).2f [s]", self._sos_incomplete_tracker, config.stream_extremes_report_rate, ), ( "Successfully received %(summary)s end-of-stream heaps in the last" " %(elapsed_time).2f [s]", self._eos_tracker, config.stream_extremes_report_rate, ), ] ) if config.stats_config: self._stats_worker.add_producer( StatsDataQueueProducer( dataqueue_server=config.stats_config.dataqueue_bootstrap_address, topic=config.stats_config.dataqueue_topic, ) ) self._reception_loop_interrupt_event = asyncio.Event() # Only a single background flush is executed on the background for simplicity. # Multiple background flushes can be supported if required. self._end_of_scan_aggregation_flush_task: asyncio.Task | None = None @property def stats(self) -> ReceptionStats: """Return the latest receiver statistics.""" return self.reception_stats_tracker.collect() @property def received_payloads(self) -> int: """The number of data payloads that have been received.""" return self._multi_beam_pipeline.added_payloads @property def aggregated_payloads(self) -> int: """The number of data payloads that have been received and aggregated.""" return self._multi_beam_pipeline.aggregated_payloads @property def visibilities_generated(self) -> int: """The number of visibilities that have been successfully generated.""" return self._multi_beam_pipeline.visibilities_generated @property def visibilities_consumed(self) -> int: """The number of visibilities that have been successfully consumed.""" return self._multi_beam_pipeline.visibilities_consumed @property def consumer(self) -> Consumer: """The consumer this receiver finally forwards data to.""" return self._multi_beam_pipeline.consumer def _setup_streams(self, io_thread_pool, config): start_time = time.time() streams = [] def log_stream_creation(stream_type: str): logger.info( "Created %d %s receive streams starting at port %d in %.3f [ms]", config.num_streams, stream_type, config.port_start, (time.time() - start_time) * 1000, stacklevel=2, ) if config.pcap_file: pcap_filter = "udp dst port {port}" for port in config.port_range: stream = create_stream(io_thread_pool, config.ring_heaps, 32) stream.add_udp_pcap_file_reader(config.pcap_file, pcap_filter.format(port=port)) streams.append(stream) log_stream_creation(f"pcap-based (filter='{pcap_filter}')") else: protocol = config.transport_protocol for i, port in enumerate(config.port_range): stream = create_stream(io_thread_pool, config.ring_heaps, 32) add_reader = stream.add_udp_reader if protocol == TransportProtocols.TCP: add_reader = stream.add_tcp_reader try: add_reader( port, bind_hostname=config.bind_hostname, buffer_size=config.buffer_size, max_size=config.max_packet_size, ) except Exception as e: raise RuntimeError(f"Cannot create stream {i} on port {port}") from e streams.append(stream) log_stream_creation(protocol.value) payloads = [Spead2ReceiverPayload() for _ in range(config.num_streams)] self.reception_stats_tracker.set_streams(streams) return streams, payloads
[docs] async def run(self, ready_event: asyncio.Event | None = None): """ Opens all configured streams for data reception, passing any data payload down for aggregation. This coroutine will run until it gets cancelled, or until the number of scans indicated in the ``scans_to_receive`` setting have been received. """ async with self._stats_worker: try: await self._run(ready_event) finally: await self._finish_reception()
async def _run(self, ready_event: asyncio.Event | None): current_task = asyncio.current_task() io_thread_pool = spead2.ThreadPool(threads=self.config.receiver_threads) try: streams, payloads = self._setup_streams(io_thread_pool, self.config) except Exception: # pylint: disable=broad-except io_thread_pool.stop() raise current_task.add_done_callback(lambda _: stop_streams(streams)) current_task.add_done_callback(lambda _: io_thread_pool.stop()) self._signal_ready_to_receive(ready_event) received_scans = 0 while True: self.reception_stats_tracker.reset() streams_stopped_by_end_scan = await self._run_reception_loop(streams, payloads) self._eos_heap_logger.maybe_log(force=True) if streams_stopped_by_end_scan: logger.warning( "%d streams stopped early due to SDP's EndScan", streams_stopped_by_end_scan ) await self._flush_then_schedule_end_of_scan() self.reception_stats_tracker.reception_stopped() if self.config.reset_time_indexing_after_each_scan: self._multi_beam_pipeline.reset_time_indexing() self.reception_stats_tracker.collect().log() received_scans += 1 if ( self.config.scans_to_receive is not None and self.config.scans_to_receive > 0 and received_scans >= self.config.scans_to_receive ): break # tcp requires re-opening the ports, the receiver streams accept() only one sender connection if self.config.transport_protocol == TransportProtocols.TCP: logger.info("Re-creating streams in TCP mode") stop_streams(streams) streams, payloads = self._setup_streams(io_thread_pool, self.config) async def _flush_then_schedule_end_of_scan(self): """Flush immediately then schedule processing with an end of scan. Cancels and drops pending flushes of previous incomplete end of scans. """ logger.info("receiver triggering full flush") if self._end_of_scan_aggregation_flush_task: try: await receiver._acancel(self._end_of_scan_aggregation_flush_task) except Exception: # pylint: disable=broad-exception-caught logger.exception("Unexpected error on full flush") async def process_and_end_scan(): try: await self._multi_beam_pipeline.flush(full_flush=True) finally: logger.info("full flush complete") self._end_of_scan_aggregation_flush_task = asyncio.create_task(process_and_end_scan()) @staticmethod async def _acancel(task: asyncio.Task) -> None: """Cancel and await the completion for the given task if is not done yet. :param task: The task to finish. :raises Exception: Any exception raised by the task other than asyncio.CancelledError. """ if not task.done(): task.cancel() with contextlib.suppress(asyncio.CancelledError): await task async def _finish_reception(self): if (task := self._end_of_scan_aggregation_flush_task) is not None: if self.config.exit_timeout >= 0: try: await asyncio.wait_for(task, self.config.exit_timeout) except asyncio.TimeoutError: logger.warning( "Timed out after %.3f [s] waiting for final aggregation, cancelling", self.config.exit_timeout, ) task.cancel() with contextlib.suppress(asyncio.CancelledError): await task def _signal_ready_to_receive(self, ready_event: asyncio.Event | None = None): readiness_filename = self.config.readiness_filename if readiness_filename: with open(readiness_filename, "wb"): pass logger.debug( "Created %s to signal we are ready to receive data", readiness_filename, ) if ready_event: ready_event.set() logger.info("Ready to receive data") async def _run_reception_loop( self, streams: list[spead2.recv.asyncio.Stream], payloads: list[Spead2ReceiverPayload] ) -> int: """ Receive all spead2 streams until the respective end of stream heaps or the interrupt event. Length of `streams` must match length of `payloads`. :returns: Number of streams that were forecefully interrupted. """ self._reception_loop_interrupt_event.clear() interrupt_task = asyncio.create_task(self._reception_loop_interrupt_event.wait()) receive_tasks = [ self._interruptible_reception_loop(stream_idx, stream, payload, interrupt_task) for stream_idx, (stream, payload) in enumerate(zip(streams, payloads, strict=True)) ] logger.info("(Re-)starting reception loops") try: results = await asyncio.gather(*receive_tasks, return_exceptions=False) finally: await receiver._acancel(interrupt_task) return sum(1 for result in results if result) async def _interruptible_reception_loop( self, stream_index: int, stream: spead2.recv.asyncio.Stream, payload: Spead2ReceiverPayload, interrupt_task: asyncio.Task, ) -> bool: """ Receive a single spead2 stream until the end of scan heap or the interrupt event. :returns: `True` if interrupted. """ reception_loop = asyncio.create_task( self._process_stream_heaps(stream_index, stream, payload) ) done, _pending = await asyncio.wait( [interrupt_task, reception_loop], return_when=asyncio.FIRST_COMPLETED ) await receiver._acancel(reception_loop) return interrupt_task in done async def _process_stream_heaps( self, stream_index: int, stream: spead2.recv.asyncio.Stream, payload: Spead2ReceiverPayload ) -> None: """Read heaps from the stream and processes based on the heap type.""" # pylint: disable=too-many-branches pending_data_heaps: list[spead2.recv.Heap] = [] item_desc_status = ItemDescStatus.NOT_RECEIVED async for heap in stream: now = time.time() self.reception_stats_tracker.reception_started(now) self._dropped_data_logger.maybe_log(timestamp=now) self._sos_complete_logger.maybe_log(timestamp=now) self._sos_incomplete_logger.maybe_log(timestamp=now) # Handle stream control heaps if heap.is_start_of_stream(): if isinstance(heap, spead2.recv.IncompleteHeap): self._sos_incomplete_tracker.record_heap(stream_index) continue payload.set_item_descriptors(heap) if not payload.has_all_required_item_descriptors: item_desc_status = ItemDescStatus.INVALID logger.error( "start-of-stream heap received, " "but doesn't contain all required item descriptors, " "all incoming heaps will be discarded" ) self._sos_incomplete_tracker.record_heap(stream_index) continue item_desc_status = ItemDescStatus.VALID self._sos_complete_tracker.record_heap(stream_index) # These need to be done sequentially because the payload # object is updated with the data from each heap. for pending_heap in pending_data_heaps: await self._process_data_heap(pending_heap, payload) if pending_data_heaps: pending_data_heaps.clear() continue elif heap.is_end_of_stream(): self._eos_tracker.record_heap(stream_index) break # Handle data heaps if isinstance(heap, spead2.recv.IncompleteHeap): continue if item_desc_status == ItemDescStatus.INVALID: continue elif item_desc_status == ItemDescStatus.NOT_RECEIVED: if len(pending_data_heaps) == self.config.max_pending_data_heaps: self._dropped_data_tracker.record_heap(stream_index) else: pending_data_heaps.append(heap) continue await self._process_data_heap(heap, payload) ts = time.time() self._dropped_data_logger.maybe_log(timestamp=ts, force=True) self._sos_complete_logger.maybe_log(timestamp=ts, force=True) self._sos_incomplete_logger.maybe_log(timestamp=ts, force=True) async def _process_data_heap(self, heap: spead2.recv.Heap, payload: Spead2ReceiverPayload): """Update the payload container with a heap and pass to the aggregation layer.""" payload.update(heap) self.reception_stats_tracker.inform_item_group_size(payload.item_group_size) self.reception_stats_tracker.scan_id = int(payload.scan_id) self._multi_beam_pipeline.add_payload(payload)