- Module code
- realtime.receive.modules.scan_providers.scan_provider
-
Source code for realtime.receive.modules.scan_providers.scan_provider
"""Base class definition for all scan providers."""
import abc
import logging
from collections.abc import Iterable
from contextlib import AbstractAsyncContextManager
from typing import TypeAlias
from deprecated import deprecated
from overrides import override
from realtime.receive.core.scan import Scan, ScanType
from realtime.receive.modules.scan_providers.eb_state import EbStateScan
logger = logging.getLogger(__name__)
UnixTaiTimestamp: TypeAlias = float
"""Floating-point TAI seconds from Unix epoch."""
[docs]
class ScanProvider(AbstractAsyncContextManager):
"""A class that can return Scan objects for given scan IDs."""
def __init__(self, scan_types: Iterable[ScanType]):
self._scan_types: dict[str, ScanType] = {
scan_type.scan_type_id: scan_type for scan_type in scan_types
}
[docs]
@abc.abstractmethod
@deprecated(reason="use query_scans")
def get(self, scan_id: int) -> Scan | None:
"""Return the Scan object associated to the given scan ID.
If no scan is known for the given ID, None is returned instead
:param scan_id: The Scan ID
"""
[docs]
@abc.abstractmethod
def query_scans(
self, start_time: UnixTaiTimestamp, end_time: UnixTaiTimestamp
) -> list[EbStateScan]:
"""Return a table scan definitions intersection with the specified time window.
:param start_time: window start time as utc timestamp or unix tai timestamp.
:param end_time: window end time as utc timestamp or unix tai timestamp.
"""
[docs]
@abc.abstractmethod
def query_scan(self, time: UnixTaiTimestamp) -> EbStateScan:
"""Return a table scan definition for the specified time."""
@property
def scan_types(self) -> dict[str, ScanType]:
"""Dictionary of all ScanTypes known to this scan provider keyed by ScanType ID."""
return self._scan_types
[docs]
async def aclose(self):
"""Release all underlying resources."""
@override
async def __aenter__(self):
return self
@override
async def __aexit__(self, typ, value, traceback):
await self.aclose()