"""UDP Protocol Multi-stream SPEAD2 receiver."""
import asyncio
import contextlib
import enum
import functools
import logging
import operator
import time
import warnings
from typing import Any
import spead2.recv.asyncio
from pydantic import dataclasses
from realtime.receive.core import socket_utils
from realtime.receive.core.icd import ICD, ItemID, LowICD, MidICD, Payload, Telescope
from realtime.receive.modules.aggregators.multi_beam_pipeline import MultiBeamPipeline
from realtime.receive.modules.consumers.consumer import Consumer
from realtime.receive.modules.periodic_summary_logger import PeriodicSummaryLogger
from realtime.receive.modules.receivers import Config
from realtime.receive.modules.stats.heap_tracker import HeapTracker
from realtime.receive.modules.stats.models import ReceptionStats
from realtime.receive.modules.stats.receiver_stats_tracker import ReceiverStatsTracker
from realtime.receive.modules.stats.stats_data_queue_worker import (
StatsDataQueueProducer,
StatsDataQueueWorker,
)
logger = logging.getLogger(__name__)
[docs]
def create_stream(io_thread_pool, ring_heaps, max_heaps):
    """
    Build an asyncio-enabled spead2 receive stream.

    :param io_thread_pool: The spead2 thread pool the stream is created on.
    :param ring_heaps: Value given as ``heaps`` to the ring stream
        configuration.
    :param max_heaps: Value given as ``max_heaps`` to the stream
        configuration.
    :returns: The newly created ``spead2.recv.asyncio.Stream``; no readers
        are attached to it here.
    """
    # NOTE(review): stop_on_stop_item=False presumably lets end-of-stream
    # heaps reach the caller instead of stopping the stream -- confirm against
    # the spead2 documentation.
    stream_config = spead2.recv.StreamConfig(max_heaps=max_heaps, stop_on_stop_item=False)
    ring_config = spead2.recv.RingStreamConfig(heaps=ring_heaps, contiguous_only=False)
    return spead2.recv.asyncio.Stream(
        io_thread_pool,
        config=stream_config,
        ring_config=ring_config,
    )
[docs]
def stop_streams(streams: list[spead2.recv.Stream]):
    """
    Call ``stop()`` on each of the given streams, in list order.

    :param streams: The streams to stop.
    """
    for active_stream in streams:
        active_stream.stop()
def _item_size(item):
    """
    Return the size in bytes of ``item``: the per-element size (taken from
    ``item.itemsize_bits``) multiplied by every extent in ``item.shape``.
    """
    # Only whole-byte element sizes are supported
    assert item.itemsize_bits % 8 == 0
    bytes_per_element = item.itemsize_bits // 8
    element_count = 1
    for extent in item.shape:
        element_count = element_count * extent
    return bytes_per_element * element_count
def _get_item_group_size(item_group):
    """Return the combined size in bytes of every item held by ``item_group``."""
    total_bytes = 0
    for item in item_group.values():
        total_bytes += _item_size(item)
    return total_bytes
[docs]
class Spead2ReceiverPayload(Payload):
    """
    A Payload that updates itself from data coming from spead2 heaps.

    Usage is two-phased: :py:meth:`set_item_descriptors` is given
    start-of-stream heaps until every item descriptor required by the ICD is
    known, after which :py:meth:`update` can be given data heaps.
    """

    LOW_CBF_SOURCE_ID = b"L"

    def __init__(self):
        super().__init__()
        self._item_group = spead2.ItemGroup()
        # Size in bytes of all items in the item group; stays None until
        # set_item_descriptors() is called for the first time
        self.item_group_size = None
        self._has_all_required_item_descriptors = False

    @property
    def _icd(self) -> ICD:
        """The ICD definition matching the currently detected telescope."""
        return LowICD if self.telescope == Telescope.LOW else MidICD

    @property
    def has_all_required_item_descriptors(self):
        """Whether the payload has all the required SOS item descriptor values."""
        return self._has_all_required_item_descriptors

    def set_item_descriptors(self, heap: spead2.recv.Heap) -> bool:
        """
        Updates the SPEAD ItemGroup of this Payload with the ItemDescriptors
        from `heap`.

        As a side effect the telescope is (re-)detected, ``item_group_size``
        is recalculated and, once all required descriptors are known,
        references to the items read by :py:meth:`update` are cached.

        :param heap: A SPEAD heap.
        :returns: Whether all required ItemDescriptors this Payload needs have
          been found.
        """
        updated_items = self._item_group.update(heap)
        items_by_id = {item.id: item for item in updated_items.values()}
        self.item_group_size = _get_item_group_size(self._item_group)

        # CBF Low sends this item to identify itself, otherwise assume Mid
        self.telescope = Telescope.MID
        if ItemID.CBF_SOURCE_ID in items_by_id:
            cbf_source_id = items_by_id[ItemID.CBF_SOURCE_ID].value
            if cbf_source_id == Spead2ReceiverPayload.LOW_CBF_SOURCE_ID:
                self.telescope = Telescope.LOW
            else:
                # The value comes straight off the wire: decode leniently so
                # that an unexpected byte cannot raise from inside a log call
                received_id = (
                    cbf_source_id.decode(errors="replace")
                    if isinstance(cbf_source_id, bytes)
                    else cbf_source_id
                )
                logger.warning(
                    "ItemDescriptor 0x%04x (CBF_SOURCE_ID) found, but value "
                    '"%s" != "%s" for Low, assuming "M" for Mid',
                    ItemID.CBF_SOURCE_ID,
                    received_id,
                    Spead2ReceiverPayload.LOW_CBF_SOURCE_ID.decode("ascii"),
                )

        required_ids = set(self._icd.ITEM_IDS)
        known_ids = self._item_group.ids()
        self._has_all_required_item_descriptors = required_ids.issubset(known_ids)

        if not self._has_all_required_item_descriptors:
            missing_items = required_ids - set(known_ids)
            logger.warning("Missing required ItemDescriptors: %s", missing_items)
            return False

        def get_item(x):
            return self._item_group[x.value.id]

        Items = self._icd.Items

        # cache references to items in item group for faster access on
        # future updates
        # pylint: disable=attribute-defined-outside-init
        self._corr_out_data_item = get_item(Items.CORRELATOR_OUTPUT_DATA)
        if self.telescope == Telescope.LOW:
            self._sps_epoch_item = get_item(Items.SPS_EPOCH)
            self._epoch_offset_item = get_item(Items.EPOCH_OFFSET)

            # the sdp-cbf-emulator can send multiple channels per stream
            # even when emulating Low
            corr_out_data_shape = self._corr_out_data_item.shape
            self.baseline_count = corr_out_data_shape[-1]
            if len(corr_out_data_shape) == 2:
                logger.warning(
                    "Found multi-channel visibilities in SKA Low stream: "
                    "%r, dealing with it",
                    corr_out_data_shape,
                )
                self.channel_count = corr_out_data_shape[0]
            else:
                self.channel_count = 1

            # Low also sends most values in the SOS heap, so they are read
            # here, once, rather than on every data heap
            self.channel_id = get_item(Items.CHANNEL_ID).value
            self.scan_id = get_item(Items.SCAN_ID).value
            # TO DO: something similar needs to be done for MID as well
            self.beam_id = get_item(Items.STATION_BEAM_ID).value
            self.hardware_id = get_item(Items.HARDWARE_ID).value
        else:
            self._baseline_count_item = get_item(Items.BASELINE_COUNT)
            self._channel_count_item = get_item(Items.CHANNEL_COUNT)
            self._channel_id_item = get_item(Items.CHANNEL_ID)
            self._hardware_id_item = get_item(Items.HARDWARE_ID)
            self._phase_bin_id_item = get_item(Items.PHASE_BIN_ID)
            self._phase_bin_count_item = get_item(Items.PHASE_BIN_COUNT)
            self._polarisation_id_item = get_item(Items.POLARISATION_ID)
            self._scan_id_item = get_item(Items.SCAN_ID)
            self._timestamp_count_item = get_item(Items.TIMESTAMP_COUNT)
            self._timestamp_fraction_item = get_item(Items.TIMESTAMP_FRACTION)
        return True

    def update(self, heap: spead2.recv.Heap):
        """
        Updates this Payload with the data extracted from the Items in the given
        heap.

        Must only be called after :py:meth:`set_item_descriptors` has reported
        that all required ItemDescriptors are known.

        :param heap: A SPEAD heap.
        """
        assert (
            self._has_all_required_item_descriptors
        ), "ItemGroup doesn't have all required ItemDescriptors"
        # Refreshes the values of the items cached by set_item_descriptors()
        self._item_group.update(heap)
        if self.telescope == Telescope.MID:
            self.baseline_count = self._baseline_count_item.value
            self.channel_count = self._channel_count_item.value
            self.channel_id = self._channel_id_item.value
            self.hardware_id = self._hardware_id_item.value
            self.phase_bin_id = self._phase_bin_id_item.value
            self.phase_bin_count = self._phase_bin_count_item.value
            self.polarisation_id = self._polarisation_id_item.value
            self.scan_id = self._scan_id_item.value
            self.timestamp = self._icd.icd_to_unix(
                self._timestamp_count_item.value,
                self._timestamp_fraction_item.value,
            )
            corr_out_data = self._corr_out_data_item.value
            self.time_centroid_indices = corr_out_data["TCI"]
            self.correlated_data_fraction = corr_out_data["FD"]
            self.cci = corr_out_data["CCI"]
            self.visibilities = corr_out_data["VIS"]
        else:
            # For Low the remaining metadata was already taken from the
            # start-of-stream heap; only time and correlator output change
            self.timestamp = self._icd.icd_to_unix(
                self._sps_epoch_item.value,
                self._epoch_offset_item.value,
            )
            corr_out_data = self._corr_out_data_item.value
            self.time_centroid_indices = corr_out_data["TCI"]
            self.correlated_data_fraction = corr_out_data["FD"]
            self.visibilities = corr_out_data["VIS"]
[docs]
class ItemDescStatus(enum.IntEnum):
    """
    The states a stream can be in with respect to the ICD item descriptors
    needed to interpret its data heaps.
    """

    # Initial state: no usable start-of-stream heap has been processed yet
    NOT_RECEIVED = 0
    # A start-of-stream heap provided all the required item descriptors
    VALID = 1
    # A start-of-stream heap arrived without all the required item descriptors
    INVALID = 2
[docs]
class TransportProtocols(enum.Enum):
    """The network transport protocols over which SPEAD data can be received."""

    # Member values are the lowercase protocol names
    UDP = "udp"
    TCP = "tcp"
[docs]
@dataclasses.dataclass
class StatsConfig:
    """Destination to which realtime receiver statistics are published."""

    # Address of the data queue server the statistics producer connects to
    dataqueue_bootstrap_address: str
    # Data queue topic the statistics are published on
    dataqueue_topic: str
[docs]
@dataclasses.dataclass
class Spead2ReceptionConfig(Config):
    """Set of options used to build a spead2 network receiver."""

    method: str = "spead2_receivers"

    num_streams: int | None = None
    """
    The number of streams this receiver should open.
    (Preferred over :py:attr:`num_channels` and :py:attr:`channels_per_stream`
    which are now deprecated).
    """

    num_channels: int | None = None
    """
    *DEPRECATED*, use `num_streams` instead.
    The number of channels to receive.
    """

    continuous_mode: bool | None = None
    """
    *DEPRECATED*: use `scans_to_receive` instead. Streams are also not normally
    recreated.
    Whether the receiver should re-create the streams and resume receiving data
    after all end of streams are reached.
    """

    scans_to_receive: int | None = None
    """
    The number of scans to receive data for, after which the reception finishes
    automatically. If non-positive, reception never finishes automatically.
    Default is to first inspect the deprecated continuous_mode option if set
    and obey it, otherwise receive data for a single scan. In the future it will
    default to receive data indefinitely.
    """

    channels_per_stream: int | None = None
    """
    *DEPRECATED*, use `num_streams` instead.
    The number of channels for which data will be sent in a single stream. This
    is used in the case where multiple ports are required with multiple
    channels per port. Together with `num_channels` they define the number of
    streams to listen for (and therefore the number of ports to open). ``0``
    means that all channels should be received on a single stream.
    """

    stats_receiver_interval: float = 1.0
    """
    Period of time, in seconds, between publishing of receiver stats to kafka.
    """

    stats_config: StatsConfig | None = None
    """
    Stats capture configuration. If provided, send stats to the specified
    dataqueue.
    """

    data_loss_report_rate: float = 1.0
    """
    The period, in seconds, at which lost data heaps should be reported, if
    any.
    """

    start_of_stream_report_rate: float | None = None
    """
    *DEPRECATED*, use `stream_extremes_report_rate` instead.
    The period, in seconds, at which start-of-stream heaps should be reported
    as they arrive.
    """

    stream_extremes_report_rate: float | None = None
    """
    The period, in seconds, at which start-of-stream and end-of-stream heaps are
    reported as they arrive. Defaults to 1, unless start_of_stream_report_rate
    has been given, in which case its value is used. Regardless of where the
    value of stream_extremes_report_rate is initialised from, it is set to 1 if
    non-positive.
    """

    port_start: int = 41000
    """
    The initial port number to which to bind the receiver streams. Successive
    streams are opened in successive ports after this.
    """

    bind_hostname: str = ""
    """
    The IP address or hostname of the interface to which to bind for reception.
    """

    transport_protocol: TransportProtocols = TransportProtocols.UDP
    """The network transport protocol used by spead2."""

    pcap_file: str = ""
    """
    Packet capture file to read SPEAD packets from. If set then data reception
    will be done by reading data from this file instead of from the network.
    The pcap file should have data for one or more valid SPEAD UDP streams. The
    ``num_streams`` and ``port_start`` configuration options are still used to
    determine how many of these streams to "receive" from the file, with each
    stream resulting from filtering the pcap file for a different, successive
    destination UDP port starting at ``port_start``.
    """

    readiness_filename: str = ""
    """
    If given, the name of the file to create (empty) on disk once the receiver
    has finished setting itself up and is ready to receive data.
    """

    max_pending_data_heaps: int = 10
    """
    The number of data heaps on each stream to accumulate in memory before the
    start-of-stream heap arrives. Data heaps cannot be processed before the
    start-of-stream heap arrives, so a finite queue is implemented to keep some
    of them around. Further data heaps that don’t fit in the queue are dropped
    and permanently lost.
    """

    test_failure: bool = False
    """
    DEPRECATED, has no effect.
    """

    ring_heaps: int = 16
    """
    The number of ring heaps used by each SPEAD stream.
    """

    buffer_size: int | None = None
    """
    The socket buffer size to use. If not given, a default value is calculated
    based on the default value set by spead2, and the limits imposed by the OS.
    """

    max_packet_size: int = spead2.recv.Stream.DEFAULT_UDP_MAX_SIZE
    """
    The maximum packet size to accept on the streams.
    """

    receiver_threads: int = 1
    """
    The number of threads allocated to the spead2 I/O thread pool.
    """

    reset_time_indexing_after_each_scan: bool = False
    """
    Whether to reset the aggregator's time indexing when data reception for a
    scan finishes.
    """

    exit_timeout: float = 10
    """
    If non-negative, the maximum amount of time in seconds to wait for the
    final aggregation to occur before exiting. If it takes longer, the final
    aggregation is cancelled. If negative, wait indefinitely.
    """

    @property
    def port_range(self) -> range:
        """The range of ports the receiver should bind to"""
        first_port = self.port_start
        return range(first_port, first_port + self.num_streams)

    def __post_init__(self):
        # Resolve every option whose default depends on other (mostly
        # deprecated) options
        self._adjust_num_streams()
        self._adjust_buffer_size()
        self._adjust_stream_extremes_report_rate()
        self._adjust_scans_to_receive()

    def _adjust_num_streams(self):
        """Derive ``num_streams`` from the deprecated channel options if unset."""
        if self.num_streams is not None:
            return
        if self.num_channels is None and self.channels_per_stream is None:
            self.num_streams = 1
            return

        warnings.warn(
            (
                "reception.num_channels and reception.channels_per_stream "
                "are deprecated, use reception.num_streams instead"
            ),
            category=DeprecationWarning,
        )
        total_channels = 1 if self.num_channels is None else self.num_channels
        if total_channels == 0:
            raise ValueError(
                "Reception num_channels configuration must be the real amount of files"
            )
        # Both None and 0 mean "all channels on a single stream"
        per_stream = self.channels_per_stream
        if per_stream is None or per_stream == 0:
            per_stream = total_channels
        # NOTE(review): integer division results in 0 streams when
        # channels_per_stream > num_channels -- confirm this is intended
        self.num_streams = total_channels // per_stream

    def _adjust_buffer_size(self):
        """Default ``buffer_size`` to spead2's default, capped to the OS maximum."""
        if self.buffer_size is not None:
            return

        if self.transport_protocol == TransportProtocols.UDP:
            spead2_default = spead2.recv.Stream.DEFAULT_UDP_BUFFER_SIZE
        else:
            spead2_default = spead2.recv.Stream.DEFAULT_TCP_BUFFER_SIZE
        os_limit = socket_utils.max_socket_read_buffer_size(self.transport_protocol.value)

        if os_limit < spead2_default:
            logger.debug(
                "Adjusting default reception buffer_size (%d -> %d) to match OS max settings",
                spead2_default,
                os_limit,
            )
            self.buffer_size = os_limit
            return
        self.buffer_size = spead2_default

    def _adjust_stream_extremes_report_rate(self):
        """Resolve ``stream_extremes_report_rate``, honouring its deprecated alias."""
        legacy_rate = self.start_of_stream_report_rate
        if legacy_rate is not None:
            warnings.warn(
                (
                    "reception.start_of_stream_report_rate is deprecated, "
                    "use reception.stream_extremes_report_rate instead"
                ),
                category=DeprecationWarning,
            )

        # The new option wins over the deprecated one; missing or non-positive
        # values fall back to 1
        rate = self.stream_extremes_report_rate
        if rate is None:
            rate = legacy_rate
        if rate is None or rate <= 0:
            rate = 1
        self.stream_extremes_report_rate = rate

    def _adjust_scans_to_receive(self):
        """Resolve ``scans_to_receive``, honouring deprecated ``continuous_mode``."""
        if self.scans_to_receive is not None:
            return
        if self.continuous_mode is None:
            self.scans_to_receive = 1
            return

        warnings.warn(
            "reception.continuous_mode is deprecated, use reception.scans_to_receive",
            category=DeprecationWarning,
        )
        # 0 means "never finish automatically"
        self.scans_to_receive = 0 if self.continuous_mode else 1
[docs]
class receiver:
    """SPEAD2 receiver.

    This class uses the spead2 library to receive a multiple number of streams,
    each using a single reader (UDP, TCP or pcap-file based, depending on the
    configuration). As heaps are received they are given to a single consumer.

    This receiver supports UDP multicast addressing. This means that multiple
    receivers (and therefore consumers) can access the same transmitted stream
    if they bind to the same multicast IP address and port.
    """

    config_class = Spead2ReceptionConfig

    # ``max_heaps`` value used for every stream created by this receiver
    _MAX_HEAPS_PER_STREAM = 32

    def __init__(
        self,
        config: Spead2ReceptionConfig,
        multi_beam_pipeline: MultiBeamPipeline,
    ):
        """
        :param config: The reception options.
        :param multi_beam_pipeline: The pipeline to which received data
            payloads are added, and which is flushed after each scan.
        """
        self.config = config
        self._multi_beam_pipeline = multi_beam_pipeline

        # non-spead2 heap type specific trackers
        self._dropped_data_tracker = HeapTracker[int]()
        self._sos_complete_tracker = HeapTracker[int]()
        self._sos_incomplete_tracker = HeapTracker[int]()
        self._eos_tracker = HeapTracker[int]()

        # spead2 tracking added lazily via set_streams
        self.reception_stats_tracker = ReceiverStatsTracker(
            sos_complete_tracker=self._sos_complete_tracker,
            sos_incomplete_tracker=self._sos_incomplete_tracker,
            eos_tracker=self._eos_tracker,
            dropped_data_tracker=self._dropped_data_tracker,
        )

        # background thread periodic messaging to data queues
        self._stats_worker = StatsDataQueueWorker(
            self.reception_stats_tracker.collect, interval=config.stats_receiver_interval
        )

        # active thread periodic logging
        def make_summary_logger(message_template, tracker, period):
            return PeriodicSummaryLogger(
                logger,
                message_template,
                lambda: tracker.num_heaps_since_snapshot,
                tracker.record_snapshot,
                period,
            )

        self._dropped_data_logger = make_summary_logger(
            "Dropped %(summary)s data heaps in the last %(elapsed_time).2f [s] as"
            " start-of-stream heap with item descriptors has not been received and"
            " buffer is full",
            self._dropped_data_tracker,
            config.data_loss_report_rate,
        )
        self._sos_complete_logger = make_summary_logger(
            "Successfully received %(summary)s complete start-of-stream heaps in the"
            " last %(elapsed_time).2f [s]",
            self._sos_complete_tracker,
            config.stream_extremes_report_rate,
        )
        self._sos_incomplete_logger = make_summary_logger(
            "Received %(summary)s incomplete start-of-stream heaps in the last"
            " %(elapsed_time).2f [s]",
            self._sos_incomplete_tracker,
            config.stream_extremes_report_rate,
        )
        self._eos_heap_logger = make_summary_logger(
            "Successfully received %(summary)s end-of-stream heaps in the last"
            " %(elapsed_time).2f [s]",
            self._eos_tracker,
            config.stream_extremes_report_rate,
        )

        if config.stats_config:
            self._stats_worker.add_producer(
                StatsDataQueueProducer(
                    dataqueue_server=config.stats_config.dataqueue_bootstrap_address,
                    topic=config.stats_config.dataqueue_topic,
                )
            )

        # Setting this event makes the running reception loops return early.
        # NOTE(review): nothing in this class sets it; presumably it is set by
        # code outside of this class -- confirm.
        self._reception_loop_interrupt_event = asyncio.Event()

        # Only a single background flush is executed on the background for simplicity.
        # Multiple background flushes can be supported if required.
        self._end_of_scan_aggregation_flush_task: asyncio.Task | None = None

    @property
    def stats(self) -> ReceptionStats:
        """Return the latest receiver statistics."""
        return self.reception_stats_tracker.collect()

    @property
    def received_payloads(self) -> int:
        """The number of data payloads that have been received."""
        return self._multi_beam_pipeline.added_payloads

    @property
    def aggregated_payloads(self) -> int:
        """The number of data payloads that have been received and aggregated."""
        return self._multi_beam_pipeline.aggregated_payloads

    @property
    def visibilities_generated(self) -> int:
        """The number of visibilities that have been successfully generated."""
        return self._multi_beam_pipeline.visibilities_generated

    @property
    def visibilities_consumed(self) -> int:
        """The number of visibilities that have been successfully consumed."""
        return self._multi_beam_pipeline.visibilities_consumed

    @property
    def consumer(self) -> Consumer:
        """The consumer this receiver finally forwards data to."""
        return self._multi_beam_pipeline.consumer

    def _setup_streams(self, io_thread_pool, config):
        """
        Create one stream, with its reader attached, per configured port,
        together with one payload object per stream.

        If anything fails along the way the streams created so far are stopped
        before the error is re-raised.

        :param io_thread_pool: The spead2 thread pool to create the streams on.
        :param config: The reception options.
        :returns: A ``(streams, payloads)`` tuple of equally long lists.
        :raises RuntimeError: If a network reader cannot be attached to a
            stream.
        """
        start_time = time.time()
        streams = []

        def log_stream_creation(stream_type: str):
            logger.info(
                "Created %d %s receive streams starting at port %d in %.3f [ms]",
                config.num_streams,
                stream_type,
                config.port_start,
                (time.time() - start_time) * 1000,
                # attribute the log record to _setup_streams, not this helper
                stacklevel=2,
            )

        try:
            if config.pcap_file:
                pcap_filter = "udp dst port {port}"
                for port in config.port_range:
                    stream = create_stream(
                        io_thread_pool, config.ring_heaps, self._MAX_HEAPS_PER_STREAM
                    )
                    # Track the stream before attaching its reader so that it
                    # also gets stopped if attaching the reader fails
                    streams.append(stream)
                    stream.add_udp_pcap_file_reader(
                        config.pcap_file, pcap_filter.format(port=port)
                    )
                log_stream_creation(f"pcap-based (filter='{pcap_filter}')")
            else:
                protocol = config.transport_protocol
                for i, port in enumerate(config.port_range):
                    stream = create_stream(
                        io_thread_pool, config.ring_heaps, self._MAX_HEAPS_PER_STREAM
                    )
                    streams.append(stream)
                    add_reader = stream.add_udp_reader
                    if protocol == TransportProtocols.TCP:
                        add_reader = stream.add_tcp_reader
                    try:
                        add_reader(
                            port,
                            bind_hostname=config.bind_hostname,
                            buffer_size=config.buffer_size,
                            max_size=config.max_packet_size,
                        )
                    except Exception as e:
                        raise RuntimeError(f"Cannot create stream {i} on port {port}") from e
                log_stream_creation(protocol.value)

            payloads = [Spead2ReceiverPayload() for _ in range(config.num_streams)]
            self.reception_stats_tracker.set_streams(streams)
        except Exception:
            # Don't leave half of the streams running when setup fails
            stop_streams(streams)
            raise
        return streams, payloads

    async def run(self, ready_event: asyncio.Event | None = None):
        """
        Opens all configured streams for data reception, passing any data payload
        down for aggregation. This coroutine will run until it gets cancelled,
        or until the number of scans indicated in the ``scans_to_receive`` setting
        have been received.

        :param ready_event: If given, an event that is set once the receiver
            is ready to receive data.
        """
        async with self._stats_worker:
            try:
                await self._run(ready_event)
            finally:
                # Runs on normal exit, on errors and on cancellation
                await self._finish_reception()

    async def _run(self, ready_event: asyncio.Event | None):
        """Set up the streams and receive scans until the configured limit."""
        current_task = asyncio.current_task()
        io_thread_pool = spead2.ThreadPool(threads=self.config.receiver_threads)
        try:
            streams, payloads = self._setup_streams(io_thread_pool, self.config)
        except Exception:  # pylint: disable=broad-except
            io_thread_pool.stop()
            raise

        # The lambdas look `streams` up when they are invoked, so streams
        # re-created in TCP mode are the ones that get stopped
        current_task.add_done_callback(lambda _: stop_streams(streams))
        current_task.add_done_callback(lambda _: io_thread_pool.stop())
        self._signal_ready_to_receive(ready_event)

        received_scans = 0
        while True:
            self.reception_stats_tracker.reset()
            streams_stopped_by_end_scan = await self._run_reception_loop(streams, payloads)
            self._eos_heap_logger.maybe_log(force=True)
            if streams_stopped_by_end_scan:
                logger.warning(
                    "%d streams stopped early due to SDP's EndScan", streams_stopped_by_end_scan
                )
            await self._flush_then_schedule_end_of_scan()
            self.reception_stats_tracker.reception_stopped()
            if self.config.reset_time_indexing_after_each_scan:
                self._multi_beam_pipeline.reset_time_indexing()
            self.reception_stats_tracker.collect().log()

            received_scans += 1
            if (
                self.config.scans_to_receive is not None
                and self.config.scans_to_receive > 0
                and received_scans >= self.config.scans_to_receive
            ):
                break

            # tcp requires re-opening the ports, the receiver streams accept() only one
            # sender connection
            if self.config.transport_protocol == TransportProtocols.TCP:
                logger.info("Re-creating streams in TCP mode")
                stop_streams(streams)
                streams, payloads = self._setup_streams(io_thread_pool, self.config)

    async def _flush_then_schedule_end_of_scan(self):
        """Schedule a full flush of the pipeline as a background task.

        Cancels and drops pending flushes of previous incomplete end of scans.
        """
        logger.info("receiver triggering full flush")
        if self._end_of_scan_aggregation_flush_task:
            try:
                await receiver._acancel(self._end_of_scan_aggregation_flush_task)
            except Exception:  # pylint: disable=broad-exception-caught
                # A failure of the previous flush must not prevent the new one
                logger.exception("Unexpected error on full flush")

        async def process_and_end_scan():
            try:
                await self._multi_beam_pipeline.flush(full_flush=True)
            finally:
                logger.info("full flush complete")

        self._end_of_scan_aggregation_flush_task = asyncio.create_task(process_and_end_scan())

    @staticmethod
    async def _acancel(task: asyncio.Task) -> None:
        """Cancel and await the completion for the given task if is not done yet.

        If the task is already done it is still awaited, so that any exception
        it finished with is raised.

        :param task: The task to finish.
        :raises Exception: Any exception raised by the task other than asyncio.CancelledError.
        """
        if not task.done():
            task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await task

    async def _finish_reception(self):
        """
        Wait for the last scheduled full flush, if any, to finish.

        With a non-negative ``exit_timeout`` the flush is cancelled if it
        takes longer than that; with a negative one it is awaited for as long
        as it takes.
        """
        task = self._end_of_scan_aggregation_flush_task
        if task is None:
            return

        exit_timeout = self.config.exit_timeout
        if exit_timeout >= 0:
            try:
                await asyncio.wait_for(task, exit_timeout)
            except asyncio.TimeoutError:
                logger.warning(
                    "Timed out after %.3f [s] waiting for final aggregation, cancelling",
                    exit_timeout,
                )
                task.cancel()
        # Returns immediately if the task has already finished, otherwise
        # (negative timeout) waits indefinitely
        with contextlib.suppress(asyncio.CancelledError):
            await task

    def _signal_ready_to_receive(self, ready_event: asyncio.Event | None = None):
        """Create the readiness file (if configured) and set ``ready_event`` (if given)."""
        readiness_filename = self.config.readiness_filename
        if readiness_filename:
            # Only the existence of the file matters, it is left empty
            with open(readiness_filename, "wb"):
                pass
            logger.debug(
                "Created %s to signal we are ready to receive data",
                readiness_filename,
            )
        if ready_event:
            ready_event.set()
        logger.info("Ready to receive data")

    async def _run_reception_loop(
        self, streams: list[spead2.recv.asyncio.Stream], payloads: list[Spead2ReceiverPayload]
    ) -> int:
        """
        Receive all spead2 streams until the respective end of stream heaps or the interrupt event.

        Length of `streams` must match length of `payloads`.

        :returns: Number of streams that were forcefully interrupted.
        """
        self._reception_loop_interrupt_event.clear()
        # A single task waiting on the interrupt event is shared by all streams
        interrupt_task = asyncio.create_task(self._reception_loop_interrupt_event.wait())
        try:
            receive_tasks = [
                self._interruptible_reception_loop(stream_idx, stream, payload, interrupt_task)
                for stream_idx, (stream, payload) in enumerate(
                    zip(streams, payloads, strict=True)
                )
            ]
            logger.info("(Re-)starting reception loops")
            results = await asyncio.gather(*receive_tasks, return_exceptions=False)
        finally:
            await receiver._acancel(interrupt_task)
        return sum(1 for result in results if result)

    async def _interruptible_reception_loop(
        self,
        stream_index: int,
        stream: spead2.recv.asyncio.Stream,
        payload: Spead2ReceiverPayload,
        interrupt_task: asyncio.Task,
    ) -> bool:
        """
        Receive a single spead2 stream until the end of scan heap or the interrupt event.

        :returns: `True` if interrupted.
        """
        reception_loop = asyncio.create_task(
            self._process_stream_heaps(stream_index, stream, payload)
        )
        try:
            done, _pending = await asyncio.wait(
                [interrupt_task, reception_loop], return_when=asyncio.FIRST_COMPLETED
            )
        finally:
            # asyncio.wait() doesn't cancel what it waits for, so the reception
            # task must be finished off here both when the interrupt wins the
            # race and when this coroutine itself is cancelled. This also
            # re-raises any error the reception task finished with.
            await receiver._acancel(reception_loop)
        return interrupt_task in done

    async def _process_stream_heaps(
        self, stream_index: int, stream: spead2.recv.asyncio.Stream, payload: Spead2ReceiverPayload
    ) -> None:
        """
        Read heaps from the stream and processes based on the heap type.

        Data heaps arriving before a valid start-of-stream heap are buffered
        (up to ``max_pending_data_heaps``) and processed once it arrives.
        Returns when an end-of-stream heap arrives or the stream runs out of
        heaps.
        """
        # pylint: disable=too-many-branches
        pending_data_heaps: list[spead2.recv.Heap] = []
        item_desc_status = ItemDescStatus.NOT_RECEIVED
        async for heap in stream:
            now = time.time()
            self.reception_stats_tracker.reception_started(now)
            self._dropped_data_logger.maybe_log(timestamp=now)
            self._sos_complete_logger.maybe_log(timestamp=now)
            self._sos_incomplete_logger.maybe_log(timestamp=now)

            # Handle stream control heaps
            if heap.is_start_of_stream():
                if isinstance(heap, spead2.recv.IncompleteHeap):
                    self._sos_incomplete_tracker.record_heap(stream_index)
                    continue
                payload.set_item_descriptors(heap)
                if not payload.has_all_required_item_descriptors:
                    item_desc_status = ItemDescStatus.INVALID
                    logger.error(
                        "start-of-stream heap received, "
                        "but doesn't contain all required item descriptors, "
                        "all incoming heaps will be discarded"
                    )
                    self._sos_incomplete_tracker.record_heap(stream_index)
                    continue
                item_desc_status = ItemDescStatus.VALID
                self._sos_complete_tracker.record_heap(stream_index)
                # These need to be done sequentially because the payload
                # object is updated with the data from each heap.
                for pending_heap in pending_data_heaps:
                    await self._process_data_heap(pending_heap, payload)
                pending_data_heaps.clear()
                continue
            if heap.is_end_of_stream():
                self._eos_tracker.record_heap(stream_index)
                break

            # Handle data heaps
            if isinstance(heap, spead2.recv.IncompleteHeap):
                continue
            if item_desc_status == ItemDescStatus.INVALID:
                continue
            if item_desc_status == ItemDescStatus.NOT_RECEIVED:
                # ">=" rather than "==" so that a negative limit doesn't turn
                # into an unbounded buffer
                if len(pending_data_heaps) >= self.config.max_pending_data_heaps:
                    self._dropped_data_tracker.record_heap(stream_index)
                else:
                    pending_data_heaps.append(heap)
                continue
            await self._process_data_heap(heap, payload)

        # Report whatever hasn't been reported yet now that the stream is done
        ts = time.time()
        self._dropped_data_logger.maybe_log(timestamp=ts, force=True)
        self._sos_complete_logger.maybe_log(timestamp=ts, force=True)
        self._sos_incomplete_logger.maybe_log(timestamp=ts, force=True)

    async def _process_data_heap(self, heap: spead2.recv.Heap, payload: Spead2ReceiverPayload):
        """Update the payload container with a heap and pass to the aggregation layer."""
        payload.update(heap)
        self.reception_stats_tracker.inform_item_group_size(payload.item_group_size)
        self.reception_stats_tracker.scan_id = int(payload.scan_id)
        self._multi_beam_pipeline.add_payload(payload)