- Module code
- realtime.receive.modules.scan_providers.ms_scan_provider
-
Source code for realtime.receive.modules.scan_providers.ms_scan_provider
import logging
import numpy as np
from astropy.time import Time
from realtime.receive.core.ms.ms_utils import MeasurementSet
from realtime.receive.core.scan import Scan, ScanType
from realtime.receive.modules.scan_providers.eb_state import EbStateScan
from realtime.receive.modules.scan_providers.fake_scan_provider import FakeScanProvider
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
MAX_CONCURRENT_SCANS = 2
"""
Maximum number of scans ever expected to be received concurrently. 2 is already
a stretch, but is useful for tests.
"""
# Seconds in one day; the OBSERVATION TIME_RANGE values are divided by this
# before being interpreted as MJD (days) in _calculate_scan_table.
_SECS_PER_DAY = 86400
def _calculate_scan_table(ms: MeasurementSet) -> list[EbStateScan]:
    """
    Build a two-entry scan table from the time range of a Measurement Set.

    The ``TIME_RANGE`` cell of row 0 of the ``OBSERVATION`` subtable is divided
    by the number of seconds per day and the resulting pair is interpreted as
    MJD values. The lower bound is rounded down and the upper bound rounded up
    to whole days before being converted to ``unix_tai``.

    :param ms: the Measurement Set whose ``OBSERVATION`` subtable is read.
    :return: a list with two entries, in this order: scan 1, spanning the
        widened time range, and scan 0, starting where scan 1 ends and having
        no end time.
    """
    # pylint: disable-next=protected-access
    observation = ms._subtable("OBSERVATION")
    mjd_range = observation.getcell("TIME_RANGE", 0) / _SECS_PER_DAY

    # NOTE: conversion losses apparent, widening using floor and ceil helps testing
    begin_tai = Time(np.floor(mjd_range[0]), format="mjd").unix_tai
    finish_tai = Time(np.ceil(mjd_range[1]), format="mjd").unix_tai

    covering_scan = EbStateScan(
        scan_id=1,
        scan_type_id="0",
        start_time=begin_tai,
        end_time=finish_tai,
    )
    # NOTE(review): presumably scan_id 0 marks the period after the observed
    # scan -- confirm against the consumers of this table.
    open_ended_scan = EbStateScan(
        scan_id=0,
        scan_type_id="0",
        start_time=finish_tai,
        end_time=None,
    )
    return [covering_scan, open_ended_scan]
[docs]
class MSScanProvider(FakeScanProvider):
    """A ScanProvider that reads Scan information from an input Measurement Set."""

    def __init__(self, measurement_set: str | MeasurementSet):
        """
        Derive the scan type and scan table from the given Measurement Set.

        :param measurement_set: either a string, which is handed to
            ``MeasurementSet.open`` (presumably the path of the Measurement
            Set) and closed again before this constructor returns, or an
            already opened ``MeasurementSet``, which is left open for the
            caller to manage.
        """
        # Only close what was opened here; a caller-supplied object stays open.
        opened_here = isinstance(measurement_set, str)
        _ms = MeasurementSet.open(measurement_set) if opened_here else measurement_set
        try:
            scan_type = _ms.calculate_scan_type()
            self._scan_table = [
                Scan(eb_scan.scan_id, scan_type) for eb_scan in _calculate_scan_table(_ms)
            ]
        finally:
            # Release the Measurement Set even if reading from it failed, so a
            # failed construction does not leak the opened table.
            if opened_here:
                _ms.close()
        super().__init__([scan_type])

    @property
    def _scan_type(self) -> ScanType:
        """The single scan type this provider was constructed with."""
        # The constructor passes exactly one scan type to the base class.
        assert len(self.scan_types) == 1
        return next(iter(self.scan_types.values()))