Source code for realtime.receive.modules.aggregators.config

from deprecated import deprecated
from pydantic import dataclasses


@dataclasses.dataclass
class AggregationConfig:
    """
    Options controlling how data aggregation happens in the receiver before
    it's handed over to Consumers.
    """

    time_period: float = 5
    """
    Period, in seconds, after which payloads should be aggregated. If this is a
    non-positive number then aggregation doesn't happen in the background, but
    can still be triggered manually. It is still triggered automatically at
    shutdown regardless.
    """

    num_timestamps_per_aggregation: int = -1
    """
    Number of payloads to aggregate into a single Visibility object. This is
    used to ensure that the Visibility object covers a predictable time span.
    The time_period is approximate and, in the case where multiple receivers
    need to be synchronised, the number of timestamps per aggregation is more
    reliable.

    Behaviour: If this is a positive number, then the aggregation returns a
    visibility from a flush when the number of payloads is equal to this
    number. This is managed via the periodic aggregation task.
    """

    timestamp_tolerance: int = 5
    """
    Number of integration intervals to avoid considering in an aggregation step
    to cater for slower streams that haven't caught up with receiving some of
    their data.
    """

    payloads_per_backoff: int = 0
    """
    This is a *HIGHLY TECHNICAL* option, so only change if you know what you're
    doing.

    Number of payloads to put into the temporary `VisibilityBuilder` object
    during the periodic background aggregation before giving control back to
    the IO loop. Adding payloads into the `VisibilityBuilder` is a CPU-intensive
    task, so if many payloads are being added this can stall the other
    coroutines that are executing in the system concurrently under the same IO
    loop. Yielding back control to the IO loop every now and then allows other
    coroutines to progress at the expense of this final aggregation step taking
    longer.

    Defaults to 0, which means the final value is calculated as a function of
    the number of streams being received, which itself is a proxy for the
    number of coroutines in concurrent execution. If positive, it is taken
    as-is. If negative, there is no backoff.
    """

    backoff_time: float = 0.0
    """
    This is a *HIGHLY TECHNICAL* option, so only change if you know what you're
    doing.

    Time to back off for when yielding back control to the IO loop during
    payload aggregation into a `VisibilityBuilder`. See `payloads_per_backoff`
    for more details. By default we simply yield back without requesting any
    sleeping time in between.
    """

    @property
    @deprecated
    def tolerance(self):
        """
        Number of integration intervals to avoid considering in an aggregation
        step to cater for slower streams that haven't caught up with receiving
        some of their data.
        """
        return self.timestamp_tolerance
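
The backoff behaviour described by `payloads_per_backoff` and `backoff_time` can be illustrated with a minimal, standalone sketch. This is not the receiver's actual implementation: `resolve_payloads_per_backoff`, `add_payloads`, and the `auto_value` parameter are hypothetical names introduced here, and the way the automatic value is derived from the number of streams is left to the caller because the source does not specify it.

```python
import asyncio
from typing import Any, Callable, Iterable, Optional


def resolve_payloads_per_backoff(configured: int, auto_value: int) -> Optional[int]:
    """Hypothetical helper mapping the configured option to an effective value.

    `auto_value` stands in for the value the receiver would derive from the
    number of streams; how it is computed is an assumption left to the caller.
    """
    if configured > 0:
        return configured  # positive: taken as-is
    if configured < 0:
        return None  # negative: no backoff at all
    return auto_value  # zero: use the automatically derived value


async def add_payloads(
    payloads: Iterable[Any],
    add_payload: Callable[[Any], None],
    payloads_per_backoff: Optional[int],
    backoff_time: float = 0.0,
) -> int:
    """Feed payloads to `add_payload`, yielding to the event loop periodically.

    Returns how many times control was handed back to the loop.
    """
    yields = 0
    for count, payload in enumerate(payloads, start=1):
        add_payload(payload)  # stands in for the CPU-bound builder step
        if payloads_per_backoff is not None and count % payloads_per_backoff == 0:
            # asyncio.sleep(0) yields without sleeping; a positive value
            # additionally pauses this coroutine for that many seconds.
            await asyncio.sleep(backoff_time)
            yields += 1
    return yields


if __name__ == "__main__":
    collected: list = []
    per_backoff = resolve_payloads_per_backoff(0, auto_value=4)
    n_yields = asyncio.run(add_payloads(range(10), collected.append, per_backoff))
    print(f"added {len(collected)} payloads, yielded {n_yields} times")
```

With the default `backoff_time` of 0.0 the sketch only yields, which lets other coroutines on the same loop run between batches; a smaller `payloads_per_backoff` yields more often at the cost of a longer aggregation step.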