- Module code
- realtime.receive.modules.consumers.accumulating_consumer
-
Source code for realtime.receive.modules.consumers.accumulating_consumer
from realtime.receive.core.scan import Scan
from .consumer import Consumer, Visibility
[docs]
class AccumulatingConsumer(Consumer):
    """Consumer that retains everything it is handed.

    Every visibility passed to :meth:`consume` is stored in
    ``visibilities``. The ``scan_number`` of each scan passed to
    :meth:`start_scan` or :meth:`end_scan` is stored in ``scans_started``
    or ``scans_ended`` respectively. Nothing is ever removed from these
    lists by this class.
    """

    def __init__(self, *args, **kwargs):
        """Initialise the parent consumer and create the empty record lists.

        All positional and keyword arguments are forwarded unchanged to the
        parent class initialiser.
        """
        super().__init__(*args, **kwargs)
        # Visibilities, in the order they were passed to consume().
        self.visibilities: list[Visibility] = []
        # Scan numbers, in the order start_scan() / end_scan() were called.
        self.scans_started = []
        self.scans_ended = []

    async def consume(self, visibility: Visibility):
        """Append ``visibility`` to ``self.visibilities``."""
        received = self.visibilities
        received.append(visibility)

    async def start_scan(self, scan: Scan) -> None:
        """Append ``scan.scan_number`` to ``self.scans_started``."""
        started = self.scans_started
        started.append(scan.scan_number)

    async def end_scan(self, scan: Scan) -> None:
        """Append ``scan.scan_number`` to ``self.scans_ended``."""
        ended = self.scans_ended
        ended.append(scan.scan_number)