Source code for realtime.receive.modules.aggregators.config
from deprecated import deprecated
from pydantic import dataclasses


@dataclasses.dataclass
class AggregationConfig:
    """
    Options controlling how data aggregation happens in the receiver before
    it's handed over to Consumers.
    """

    time_period: float = 5
    """
    Period, in seconds, after which payloads should be aggregated. If this is
    a non-positive number then aggregation doesn't happen in the background,
    but can still be triggered manually. Aggregation is always triggered
    automatically at shutdown.
    """

    num_timestamps_per_aggregation: int = -1
    """
    Number of payloads to aggregate into a single Visibility object. This
    ensures that each Visibility object spans a predictable amount of time.
    The time_period is only approximate, so when multiple receivers need to be
    synchronised the number of timestamps per aggregation is more reliable.

    Behaviour: if this is a positive number, the aggregation returns a
    Visibility from a flush when the number of payloads is equal to this
    number. This is managed via the periodic aggregation task.
    """

    timestamp_tolerance: int = 5
    """
    Number of integration intervals to avoid considering in an aggregation step
    to cater for slower streams that haven't caught up with receiving some of
    their data.
    """

    payloads_per_backoff: int = 0
    """
    This is a *HIGHLY TECHNICAL* option, so only change it if you know what
    you're doing.

    Number of payloads to put into the temporary `VisibilityBuilder` object
    during the periodic background aggregation before giving control back to
    the IO loop. Adding payloads into the `VisibilityBuilder` is a
    CPU-intensive task, so if many payloads are being added this can stall the
    other coroutines that are executing concurrently under the same IO loop.
    Yielding control back to the IO loop every now and then allows other
    coroutines to progress, at the expense of this final aggregation step
    taking longer.

    Defaults to 0, which means the final value is calculated as a function of
    the number of streams being received, which itself is a proxy for the number
    of coroutines in concurrent execution. If positive, it is taken as-is.
    If negative, there is no backoff.
    """

    backoff_time: float = 0.0
    """
    This is a *HIGHLY TECHNICAL* option, so only change it if you know what
    you're doing.

    Time to back off for when yielding control back to the IO loop during
    payload aggregation into a `VisibilityBuilder`. See `payloads_per_backoff`
    for more details. By default we simply yield back without requesting any
    sleeping time in between.
    """

    @property
    @deprecated
    def tolerance(self):
        """
        Deprecated alias for `timestamp_tolerance`.

        Number of integration intervals to avoid considering in an aggregation
        step to cater for slower streams that haven't caught up with receiving
        some of their data.
        """
        return self.timestamp_tolerance
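
The interaction between `payloads_per_backoff` and `backoff_time` described in the docstrings above can be illustrated with a small, standard-library-only sketch. It is not the receiver's actual implementation: `BackoffOptions`, `effective_payloads_per_backoff` and `add_payloads` are hypothetical names, the list `sink` stands in for the `VisibilityBuilder`, and the formula used when the option is 0 is a placeholder, since this module does not show how the real value is derived from the number of streams.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class BackoffOptions:
    """Hypothetical stand-in mirroring two fields of AggregationConfig."""

    payloads_per_backoff: int = 0
    backoff_time: float = 0.0


def effective_payloads_per_backoff(options, num_streams):
    """Resolve how many payloads to add before yielding to the IO loop."""
    if options.payloads_per_backoff > 0:
        # Positive values are taken as-is.
        return options.payloads_per_backoff
    if options.payloads_per_backoff < 0:
        # Negative values disable the backoff entirely.
        return None
    # 0: derive the value from the number of streams. ASSUMPTION: this
    # formula is a placeholder, the real one is not part of this module.
    return max(1, 1000 // max(1, num_streams))


async def add_payloads(payloads, options, num_streams, sink):
    """Add payloads to `sink`, yielding periodically; return the yield count."""
    batch = effective_payloads_per_backoff(options, num_streams)
    yields = 0
    for count, payload in enumerate(payloads, start=1):
        # Stand-in for the CPU-intensive step of adding to a VisibilityBuilder.
        sink.append(payload)
        if batch is not None and count % batch == 0:
            # asyncio.sleep(0) yields control without actually sleeping,
            # matching the default backoff_time of 0.0.
            await asyncio.sleep(options.backoff_time)
            yields += 1
    return yields


if __name__ == "__main__":
    collected = []
    options = BackoffOptions(payloads_per_backoff=4)
    num_yields = asyncio.run(add_payloads(range(10), options, 8, collected))
    print(num_yields, len(collected))
```

With `payloads_per_backoff=4` and ten payloads, the loop gives control back to the IO loop after the fourth and eighth payloads. A smaller value lets other coroutines run more often but lengthens the aggregation step, which is the trade-off the docstring describes.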