Aggregation
A single receiver can listen for data coming from multiple streams, each associated with different frequency channels. Additionally, data can be sent at different integration intervals, ranging from 0.14 to 1.4 seconds. To scale efficiently, the receiver therefore aggregates the individual blocks of data it receives across time and frequency before handing them over to the Consumers.
Aggregation happens as follows:
Aggregation is triggered periodically (with a configurable period). When the trigger occurs, the different reception streams may have received unequal amounts of data.
In the example above, four streams have been receiving data before aggregation is triggered. Some streams have gaps at the beginning, middle and/or end of the full time interval.
Individual data chunks are put together into a single Visibility object, which spans from [t_first, t_last]. t_first is always set to the minimum timestamp of any data chunk. t_last is calculated as follows:
The maximum timestamp t_max of any data chunk is first calculated.
A (configurable) tolerance is used to determine the minimum last time t_last_min, defined as a number of time steps before t_max. This tolerance gives the different streams room to finish receiving data for time steps that might have been received by other streams. For instance, in the example below only S1 has received data for T7.
If all streams have data past t_last_min, the last time they all have data for is used as t_last. In the example above, a Visibility object is created with all data chunks from [T0, T4], even though the configured tolerance initially pointed at T3 as the potential last time.
The resulting Visibility object will flag missing data chunks by setting the flags dataset to a non-zero value. In the example above, the data chunk corresponding to stream S1 and time T0 will be flagged as missing.
Data chunks that have been aggregated are removed, and reception continues until the next aggregation occurs.
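The calculation of t_first and t_last described above can be sketched in a few lines of Python. This is only an illustration, not the receiver's actual code: the function names, the dictionary layout and the use of integer time-step indices in place of real timestamps are all assumptions made for the example. In particular, it assumes that "the last time they all have data for" means the earliest of the per-stream latest timestamps. The sample data is hypothetical and only loosely modelled on the scenario described above.

```python
def aggregation_window(chunks, tolerance):
    """Return (t_first, t_last) for the next aggregation, or None if empty.

    chunks: dict mapping a stream name to the set of integer time steps for
            which that stream has received a data chunk (hypothetical layout).
    tolerance: number of time steps before t_max (assumed non-negative).
    """
    received = {s: ts for s, ts in chunks.items() if ts}
    if not received:
        return None
    t_first = min(min(ts) for ts in received.values())
    t_max = max(max(ts) for ts in received.values())
    t_last_min = t_max - tolerance
    # Assumption: the last time all streams have data for is the earliest
    # of the per-stream latest time steps.
    common_last = min(max(ts) for ts in received.values())
    # If every stream is already past t_last_min, extend up to common_last.
    t_last = max(t_last_min, common_last)
    return t_first, t_last


def missing_flags(chunks, t_first, t_last):
    """Per stream, 1 marks a missing chunk in [t_first, t_last], 0 a present one."""
    return {
        stream: [0 if t in times else 1 for t in range(t_first, t_last + 1)]
        for stream, times in chunks.items()
    }


# Hypothetical reception state: S1 misses T0 but is the only one with T7.
chunks = {
    "S1": {1, 2, 3, 4, 5, 6, 7},
    "S2": {0, 1, 2, 3, 4},
    "S3": {0, 1, 3, 4, 5},
    "S4": {0, 1, 2, 3, 4, 5, 6},
}
window = aggregation_window(chunks, tolerance=4)
flags = missing_flags(chunks, *window)
```

With a tolerance of 4 time steps, t_last_min falls on T3, but since every stream has data up to T4 the window is extended to [T0, T4]. The S1 entry for T0 and the S3 entry for T2 end up flagged as missing.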
Data chunks with timestamps lower than or equal to the previous t_last (i.e., late arrivals) are fully discarded. In the example below a Visibility is first produced with data chunks between [T5, T7] (using a tolerance of 0). This doesn’t include the data chunk for S4 and time T7 though, which arrives after that first Visibility is built. When the next Visibility is built, data chunks between [T8, T10] are aggregated, and the data chunk for S4 and time T7 is discarded.
Time indexing can be explicitly reset (i.e., t will start at T0 again) if no payloads have been added since the last flush (see PayloadAggregator.reset_time_indexing). This can allow payloads with old timestamps to be aggregated, even though data for those timestamps has already been processed.
When reception finishes for a scan (i.e., all streams have received their end-of-stream heaps), or when the reception process finishes, a full flush of any remaining data chunks is immediately performed, aggregating them into a single Visibility object.
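The handling of late arrivals and the time indexing reset can be illustrated with a toy model. The ToyAggregator class below is hypothetical and is not the real PayloadAggregator: its method signatures, the integer time steps, and the choice to raise an error when a reset is requested after new payloads have been added are all assumptions made for the sake of the example.

```python
class ToyAggregator:
    """Toy model of late-arrival handling (not the real PayloadAggregator)."""

    def __init__(self):
        self.pending = set()       # (stream, time step) pairs awaiting aggregation
        self.prev_t_last = None    # t_last of the previous aggregation, if any
        self.discarded = []        # late arrivals dropped so far
        self._added_since_flush = False

    def add(self, stream, t):
        self.pending.add((stream, t))
        self._added_since_flush = True

    def flush(self, t_last):
        """Aggregate all pending chunks up to t_last, dropping late arrivals."""
        late = {
            c for c in self.pending
            if self.prev_t_last is not None and c[1] <= self.prev_t_last
        }
        window = {c for c in self.pending - late if c[1] <= t_last}
        self.discarded.extend(sorted(late))
        self.pending -= late | window
        self.prev_t_last = t_last
        self._added_since_flush = False
        return sorted(window)

    def reset_time_indexing(self):
        # Assumption: refusing the reset is modelled here as an exception.
        if self._added_since_flush:
            raise RuntimeError("payloads were added since the last flush")
        self.prev_t_last = None


agg = ToyAggregator()
for stream in ("S1", "S2", "S3"):
    for t in (5, 6, 7):
        agg.add(stream, t)
for t in (5, 6):
    agg.add("S4", t)
first = agg.flush(t_last=7)        # built without the S4 chunk for T7

agg.add("S4", 7)                   # late arrival
for stream in ("S1", "S2", "S3", "S4"):
    for t in (8, 9, 10):
        agg.add(stream, t)
second = agg.flush(t_last=10)      # S4/T7 is discarded, not aggregated

agg.reset_time_indexing()          # allowed: nothing added since the flush
agg.add("S1", 0)
third = agg.flush(t_last=0)        # an old time step is accepted again
```

In this run the first aggregation contains 11 chunks, the second contains the 12 chunks for [T8, T10], and the only discarded chunk is the one for S4 and T7. After the reset, a chunk for T0 is aggregated normally.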
See the Configuration options section to understand how to configure aggregation.