# Source code for realtime.receive.modules.scan_providers.fake_scan_provider

from __future__ import annotations

import functools
import logging

from astropy import units
from astropy.coordinates.angles import Angle
from deprecated import deprecated
from overrides import override
from realtime.receive.core.channel_range import ChannelRange
from realtime.receive.core.scan import (
    Beam,
    Channels,
    Field,
    PhaseDirection,
    Polarisations,
    Scan,
    ScanType,
    SpectralWindow,
    StokesType,
)

from realtime.receive.modules.scan_providers.eb_state import EbStateScan, UnixTaiTimestamp
from realtime.receive.modules.scan_providers.scan_provider import ScanProvider

# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Also used further down as the ``maxsize`` of the ``functools.lru_cache``
# that wraps ``FakeScanProvider.get``.
MAX_CONCURRENT_SCANS = 2
"""
Maximum number of scans ever expected to be received concurrently. 2 is already
a stretch, but is useful for tests.
"""


def create_fake_scan_type(
    sw_channels: ChannelRange,
    scan_type_id: str = "scan_type_id",
    beam_id: str = "beam_id",
    scan_intents: list | None = None,
    integration_time: float = 1,
    averaging_samples: int = 1,
    averaging_channels: int = 1,
):
    """
    Build a ``ScanType`` holding one beam with a single spectral window.

    The spectral window takes its channel count, first channel id and stride
    from ``sw_channels``; its id and frequency bounds are fixed placeholders.
    The beam carries the four linear correlation products (XX, XY, YX, YY)
    and a field whose phase direction is RA = Dec = 0 rad.

    :param sw_channels: channel range describing the spectral window.
    :param scan_type_id: id given to the resulting scan type.
    :param beam_id: id given to the single beam.
    :param scan_intents: scan intents to attach; ``None`` means an empty list.
    :param integration_time: passed through to ``ScanType``.
    :param averaging_samples: passed through to ``ScanType``.
    :param averaging_channels: passed through to ``ScanType``.
    :return: the newly created ``ScanType``.
    """
    window = SpectralWindow(
        spectral_window_id="spectral_window_id",
        count=sw_channels.count,
        start=sw_channels.start_id,
        freq_min=1e8,
        freq_max=2e8,
        stride=sw_channels.stride,
    )
    beam_channels = Channels(
        channels_id="channels_id",
        spectral_windows=[window],
    )
    beam_polarisations = Polarisations(
        polarisation_id="polarisation_id",
        correlation_type=[
            StokesType.XX,
            StokesType.XY,
            StokesType.YX,
            StokesType.YY,
        ],
    )
    phase_direction = PhaseDirection(
        ra=Angle([0], units.rad),
        dec=Angle([0], units.rad),
        reference_time="J2000",
    )
    beam = Beam(
        beam_id=beam_id,
        function="visibilities",
        visibility_beam_id=1,
        channels=beam_channels,
        polarisations=beam_polarisations,
        field=Field(field_id="field_id", phase_dir=phase_direction),
    )
    return ScanType(
        scan_type_id=scan_type_id,
        beams=[beam],
        # A fresh list per call avoids sharing one default between scan types.
        scan_intents=[] if scan_intents is None else scan_intents,
        integration_time=integration_time,
        averaging_channels=averaging_channels,
        averaging_samples=averaging_samples,
    )


class FakeScanProvider(ScanProvider):
    """
    A fake ScanProvider.

    Always answers with the same ScanType information, taken from the scan
    types it was constructed with.
    """

    def __init__(self, scan_types):
        """
        Create the provider.

        :param scan_types: forwarded unchanged to ``ScanProvider.__init__``.
        """
        super().__init__(scan_types)
        # Starts empty; nothing in this class reads or appends to it.
        self._scan_table: list[Scan] = []

    @staticmethod
    def from_spectral_window(
        sw_channels: ChannelRange,
        scan_intents: list | None = None,
        integration_time: float = 1,
        averaging_samples: int = 1,
        averaging_channels: int = 1,
    ) -> FakeScanProvider:
        """
        Create an instance using a single SpectralWindow with the given
        channel configuration.

        All arguments are handed to ``create_fake_scan_type``; the resulting
        scan type is the only one the new provider knows about.

        :return: a ``FakeScanProvider`` holding exactly one scan type.
        """
        return FakeScanProvider(
            [
                create_fake_scan_type(
                    sw_channels,
                    scan_intents=scan_intents,
                    integration_time=integration_time,
                    averaging_samples=averaging_samples,
                    averaging_channels=averaging_channels,
                )
            ]
        )

    @property
    @deprecated
    def _scan_type(self) -> ScanType:
        """
        The single scan type held by this provider (deprecated).

        Only meaningful when exactly one scan type is present, which is
        checked with an ``assert``.
        """
        assert len(self.scan_types) == 1
        return next(iter(self.scan_types.values()))

    # NOTE(review): ``functools.lru_cache`` applied to a method stores its
    # cache on the function object, so the MAX_CONCURRENT_SCANS slots are
    # shared by every instance, keyed on ``(self, scan_id)``, and cached
    # instances are kept alive while their entries remain. Left as-is to
    # preserve behaviour (repeated calls return the same ``Scan`` object).
    @deprecated
    @override
    @functools.lru_cache(MAX_CONCURRENT_SCANS)
    def get(self, scan_id: int) -> Scan | None:
        """
        Return a ``Scan`` for ``scan_id`` using the provider's single scan
        type (deprecated).

        :param scan_id: id to give to the returned scan.
        :return: ``Scan(scan_id, self._scan_type)``; results are memoised.
        """
        return Scan(scan_id, self._scan_type)

    @override
    def query_scans(
        self, start_time: UnixTaiTimestamp, end_time: UnixTaiTimestamp
    ) -> list[EbStateScan]:
        """
        Return one fake scan spanning the requested time range.

        The scan always has ``scan_id=1`` and uses the first scan type in
        ``self._scan_types``. Its ``start_time`` is the requested
        ``start_time`` and its ``end_time`` is the requested ``end_time + 1``.

        :param start_time: start of the queried range.
        :param end_time: end of the queried range.
        :return: a one-element list of ``EbStateScan``.
        :raises StopIteration: if the provider holds no scan types.
        """
        scan_type = next(iter(self._scan_types.values()))
        return [
            EbStateScan(
                scan_id=1,
                scan_type_id=scan_type.scan_type_id,
                start_time=start_time,
                end_time=(end_time + 1),
            )
        ]

    @override
    def query_scan(self, time: UnixTaiTimestamp) -> EbStateScan:
        """
        Return a fake, open-ended scan starting at ``time``.

        The scan always has ``scan_id=1``, uses the first scan type in
        ``self._scan_types`` and has ``end_time=None``.

        :param time: timestamp being queried; used as the scan's start time.
        :return: the fake ``EbStateScan``.
        :raises StopIteration: if the provider holds no scan types.
        """
        scan_type = next(iter(self._scan_types.values()))
        return EbStateScan(
            scan_id=1,
            scan_type_id=scan_type.scan_type_id,
            start_time=time,
            end_time=None,
        )