Source code for realtime.receive.modules.consumers.accumulating_consumer

from .consumer import Consumer, Visibility


class AccumulatingConsumer(Consumer):
    """A consumer that keeps all visibilities around for later inspection.

    Nothing is processed or discarded: each visibility handed to
    :meth:`consume` and each scan ID handed to :meth:`start_scan` or
    :meth:`end_scan` is appended, in call order, to a list on the instance.

    Attributes:
        visibilities: Every visibility passed to :meth:`consume`.
        scans_started: Every scan ID passed to :meth:`start_scan`.
        scans_ended: Every scan ID passed to :meth:`end_scan`.
    """

    def __init__(self, *args, **kwargs) -> None:
        """Create the consumer with empty record lists.

        All positional and keyword arguments are forwarded unchanged to the
        parent class initialiser.
        """
        super().__init__(*args, **kwargs)
        self.visibilities: list[Visibility] = []
        # Scan IDs are recorded as received; duplicates are not filtered.
        self.scans_started: list[int] = []
        self.scans_ended: list[int] = []

    async def consume(self, visibility: Visibility) -> None:
        """Record ``visibility`` by appending it to :attr:`visibilities`."""
        self.visibilities.append(visibility)

    async def start_scan(self, scan_id: int) -> None:
        """Record ``scan_id`` by appending it to :attr:`scans_started`."""
        self.scans_started.append(scan_id)

    async def end_scan(self, scan_id: int) -> None:
        """Record ``scan_id`` by appending it to :attr:`scans_ended`."""
        self.scans_ended.append(scan_id)