Source code for realtime.receive.modules.scan_providers.ms_scan_provider

import logging

import numpy as np
from astropy.time import Time
from realtime.receive.core.ms.ms_utils import MeasurementSet
from realtime.receive.core.scan import Scan, ScanType

from realtime.receive.modules.scan_providers.eb_state import EbStateScan
from realtime.receive.modules.scan_providers.fake_scan_provider import FakeScanProvider

logger = logging.getLogger(__name__)

MAX_CONCURRENT_SCANS = 2
"""
Maximum number of scans ever expected to be received concurrently. 2 is already
a stretch, but is useful for tests.
"""


# Number of seconds in one day. _calculate_scan_table divides the OBSERVATION
# TIME_RANGE values by this before interpreting them as MJD (i.e. days).
_SECS_PER_DAY = 86400


def _calculate_scan_table(ms: MeasurementSet) -> list[EbStateScan]:
    """
    Build a two-entry scan table from the time range of a Measurement Set.

    The ``TIME_RANGE`` cell of row 0 of the ``OBSERVATION`` subtable is divided
    by the number of seconds per day and interpreted as MJD. The resulting
    interval is widened to whole days (start floored, end ceiled) and converted
    to ``unix_tai`` values.

    :param ms: the Measurement Set to read the observation time range from.
    :return: a list with two entries: scan ID 1, spanning the widened time
        range, followed by scan ID 0, which starts where scan 1 ends and has
        no end time.
    """
    # pylint: disable-next=protected-access
    observation = ms._subtable("OBSERVATION")
    mjd_range = observation.getcell("TIME_RANGE", 0) / _SECS_PER_DAY

    # NOTE: conversion losses apparent, widening using floor and ceil helps testing
    range_start = Time(np.floor(mjd_range[0]), format="mjd").unix_tai
    # The end of the first entry is also the start of the second one.
    range_end = Time(np.ceil(mjd_range[1]), format="mjd").unix_tai

    bounded_entry = EbStateScan(
        scan_id=1,
        scan_type_id="0",
        start_time=range_start,
        end_time=range_end,
    )
    open_ended_entry = EbStateScan(
        scan_id=0,
        scan_type_id="0",
        start_time=range_end,
        end_time=None,
    )
    return [bounded_entry, open_ended_entry]


class MSScanProvider(FakeScanProvider):
    """
    A ScanProvider that reads Scan information from an input Measurement Set.

    :param measurement_set: either an already-open ``MeasurementSet`` or a
        string that is handed to ``MeasurementSet.open``. When a string is
        given, the Measurement Set is opened by the constructor and closed
        again before it returns, even if reading from it fails. An object
        passed in by the caller is never closed here.
    """

    def __init__(self, measurement_set: str | MeasurementSet):
        # Only close what we opened ourselves; a caller-supplied object stays
        # under the caller's control.
        opened_here = isinstance(measurement_set, str)
        _ms = MeasurementSet.open(measurement_set) if opened_here else measurement_set
        try:
            scan_type = _ms.calculate_scan_type()
            self._scan_table = [
                Scan(eb_scan.scan_id, scan_type) for eb_scan in _calculate_scan_table(_ms)
            ]
        finally:
            # Release the Measurement Set even when one of the reads above
            # raises, so a failed construction does not leak the open table.
            if opened_here:
                _ms.close()
        super().__init__([scan_type])

    @property
    def _scan_type(self) -> ScanType:
        """
        The single scan type held by this provider.

        Exactly one entry is expected in ``self.scan_types``
        (presumably populated by the parent class from the list given to
        ``super().__init__``; confirm against FakeScanProvider).
        """
        assert len(self.scan_types) == 1
        return next(iter(self.scan_types.values()))