from __future__ import annotations
import functools
import logging
from astropy import units
from astropy.coordinates.angles import Angle
from deprecated import deprecated
from overrides import override
from realtime.receive.core.channel_range import ChannelRange
from realtime.receive.core.scan import (
Beam,
Channels,
Field,
PhaseDirection,
Polarisations,
Scan,
ScanType,
SpectralWindow,
StokesType,
)
from realtime.receive.modules.scan_providers.eb_state import EbStateScan, UnixTaiTimestamp
from realtime.receive.modules.scan_providers.scan_provider import ScanProvider
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Also used as the maximum size of the LRU cache of Scan objects behind
# FakeScanProvider.get.
MAX_CONCURRENT_SCANS = 2
"""
Maximum number of scans ever expected to be received concurrently. 2 is already
a stretch, but is useful for tests.
"""
def create_fake_scan_type(
    sw_channels: ChannelRange,
    scan_type_id: str = "scan_type_id",
    beam_id: str = "beam_id",
    scan_intents: list | None = None,
    integration_time: float = 1,
    averaging_samples: int = 1,
    averaging_channels: int = 1,
):
    """Build a placeholder ScanType around a single spectral window.

    The spectral window takes its channel count, first channel id and stride
    from ``sw_channels``; ``freq_min`` and ``freq_max`` are fixed to ``1e8``
    and ``2e8``. The resulting scan type holds exactly one ``"visibilities"``
    beam with the XX, XY, YX and YY Stokes types and a field whose phase
    direction has ``ra`` and ``dec`` both set to 0 rad.

    :param sw_channels: channel range describing the spectral window.
    :param scan_type_id: identifier given to the scan type.
    :param beam_id: identifier given to the single beam.
    :param scan_intents: scan intents to attach; ``None`` means an empty list.
    :param integration_time: forwarded unchanged to the ScanType.
    :param averaging_samples: forwarded unchanged to the ScanType.
    :param averaging_channels: forwarded unchanged to the ScanType.
    :return: the assembled ScanType.
    """
    intents = [] if scan_intents is None else scan_intents

    window = SpectralWindow(
        spectral_window_id="spectral_window_id",
        count=sw_channels.count,
        start=sw_channels.start_id,
        freq_min=1e8,
        freq_max=2e8,
        stride=sw_channels.stride,
    )

    # Build the beam's parts one by one, innermost objects first.
    beam_channels = Channels(
        channels_id="channels_id",
        spectral_windows=[window],
    )
    beam_polarisations = Polarisations(
        polarisation_id="polarisation_id",
        correlation_type=[
            StokesType.XX,
            StokesType.XY,
            StokesType.YX,
            StokesType.YY,
        ],
    )
    phase_direction = PhaseDirection(
        ra=Angle([0], units.rad),
        dec=Angle([0], units.rad),
        reference_time="J2000",
    )
    beam_field = Field(field_id="field_id", phase_dir=phase_direction)

    only_beam = Beam(
        beam_id=beam_id,
        function="visibilities",
        visibility_beam_id=1,
        channels=beam_channels,
        polarisations=beam_polarisations,
        field=beam_field,
    )

    return ScanType(
        scan_type_id=scan_type_id,
        beams=[only_beam],
        scan_intents=intents,
        integration_time=integration_time,
        averaging_channels=averaging_channels,
        averaging_samples=averaging_samples,
    )
class FakeScanProvider(ScanProvider):
    """A fake ScanProvider.

    Always returns the same ScanType information.
    """

    def __init__(self, scan_types):
        """Create the provider.

        :param scan_types: passed unchanged to ``ScanProvider.__init__``.
        """
        super().__init__(scan_types)
        self._scan_table: list[Scan] = []
        # Bounded cache of Scan objects, owned by this instance. Decorating
        # the method itself with lru_cache would store the cache on the class
        # and key it on ``self``: every provider would then stay alive for the
        # lifetime of the process and all providers would compete for the
        # same MAX_CONCURRENT_SCANS slots.
        self._cached_scan = functools.lru_cache(MAX_CONCURRENT_SCANS)(self._create_scan)

    @staticmethod
    def from_spectral_window(
        sw_channels: ChannelRange,
        scan_intents: list | None = None,
        integration_time: float = 1,
        averaging_samples: int = 1,
        averaging_channels: int = 1,
    ) -> FakeScanProvider:
        """Create instance using a single SpectralWindow with the given channel configuration.

        All arguments are forwarded to :func:`create_fake_scan_type`; the
        resulting scan type is the only one known to the returned provider.
        """
        return FakeScanProvider(
            [
                create_fake_scan_type(
                    sw_channels,
                    scan_intents=scan_intents,
                    integration_time=integration_time,
                    averaging_samples=averaging_samples,
                    averaging_channels=averaging_channels,
                )
            ]
        )

    @property
    @deprecated
    def _scan_type(self) -> ScanType:
        """Return the single configured scan type.

        Asserts that exactly one scan type is configured.
        """
        assert len(self.scan_types) == 1
        return next(iter(self.scan_types.values()))

    def _create_scan(self, scan_id: int) -> Scan:
        """Build a new Scan for ``scan_id`` using the single configured scan type."""
        return Scan(scan_id, self._scan_type)

    @deprecated
    @override
    def get(self, scan_id: int) -> Scan | None:
        """Return the Scan for ``scan_id``, built from the single scan type.

        Up to ``MAX_CONCURRENT_SCANS`` recently requested scans are cached per
        provider, so repeated calls with the same id return the same object.
        """
        return self._cached_scan(scan_id)

    @override
    def query_scans(
        self, start_time: UnixTaiTimestamp, end_time: UnixTaiTimestamp
    ) -> list[EbStateScan]:
        """Return a one-element list with a fake scan for the requested interval.

        The scan always has ``scan_id=1`` and uses the first configured scan
        type. It starts at ``start_time``.
        """
        scan_type = next(iter(self._scan_types.values()))
        return [
            EbStateScan(
                scan_id=1,
                scan_type_id=scan_type.scan_type_id,
                start_time=start_time,
                # NOTE(review): the reported end is one past the requested
                # end_time -- presumably so the scan covers the whole queried
                # interval; confirm.
                end_time=(end_time + 1),
            )
        ]

    @override
    def query_scan(self, time: UnixTaiTimestamp) -> EbStateScan:
        """Return a fake scan with ``scan_id=1`` starting at ``time``.

        The scan uses the first configured scan type and has no end time
        (``end_time=None``).
        """
        scan_type = next(iter(self._scan_types.values()))
        return EbStateScan(
            scan_id=1,
            scan_type_id=scan_type.scan_type_id,
            start_time=time,
            end_time=None,
        )