Source code for realtime.receive.modules.receiver

# -*- coding: utf-8 -*-
"""Reads data off a MS and creates SPEAD2 packets"""

import argparse
import asyncio
import contextlib
import dataclasses
import enum
import functools
import logging
import os
import signal
import sys
import warnings
from typing import Callable, Dict, Optional, Union

import ska_sdp_config
import ska_ser_logging
import yaml
from astropy.utils import iers
from realtime.receive.core.channel_range import ChannelRange
from realtime.receive.core.common import autocast_fields, from_dict
from realtime.receive.core.config import augment_config
from realtime.receive.core.katpoint_uvw_engine import KatpointUVWEngine
from realtime.receive.core.uvw_engine import UVWEngine

from realtime.receive.modules import aggregation as aggregation_mod
from realtime.receive.modules import consumers
from realtime.receive.modules.tm.base_tm import TelescopeManager

from . import receivers
from .scan_providers import (
    AssignResourcesScanProvider,
    HardcodedScanProvider,
    MSScanProvider,
    ScanProvider,
    SdpConfigScanProvider,
)
from .tm import HardcodedTelescopeManager, MeasurementSetTM, SKATelescopeManager

logger = logging.getLogger(__name__)


[docs] @dataclasses.dataclass @autocast_fields class SdpConfigDbConfig: """ Set of options used to establish a connection to the SDP Configuration DB. """ host: str = "127.0.0.1" """The host to connect to.""" port: int = 2379 """The port to connect to""" backend: str = "etcd3" """The backend to use"""
[docs] @dataclasses.dataclass class ScanProviderConfig: """Set of options used to build a Scan Provider.""" execution_block_id: Optional[str] = None """ The ID of an Execution Block in the SDP Configuration Database for which Scan information should be monitored.""" end_scan_max_delay: Optional[float] = 5.0 """ The maximum time to wait, in seconds, before closing the CBF streams after receiving an EndScan SDP command. """ measurement_set: Optional[str] = None """A Measurement Set where antenna information will be extracted from.""" assign_resources_command: Optional[str] = None """ A JSON resource containing an instance of an SDP AssignResources command, or an SDP Execution Block. The JSON document should contain all the keys needed to fully-define a Scan Type. """ hardcoded_spectral_window_channels: Union[None, str, ChannelRange] = None """ If set, can be used together with `measurement_set` to force the spectral windows of generated Scans to be a certain channel range. """
[docs] class DataSource(enum.Enum): """An enumeration of types of data sources used to obtain data.""" SDP_CONFIG_DB = enum.auto() ASSIGN_RESOURCES_FILE = enum.auto() MS = enum.auto() HARDCODED = enum.auto()
@property def data_source(self) -> DataSource: """The type of data source used by this configuration.""" if self.execution_block_id: return ScanProviderConfig.DataSource.SDP_CONFIG_DB if self.measurement_set: return ScanProviderConfig.DataSource.MS if self.assign_resources_command: return ScanProviderConfig.DataSource.ASSIGN_RESOURCES_FILE if self.hardcoded_spectral_window_channels: return ScanProviderConfig.DataSource.HARDCODED raise ValueError("Invalid ScanProvider configuration, see documentation for details") @property def needs_sdp_config_db(self): """Whether this configuration requires an SDP Config DB client.""" return self.data_source == ScanProviderConfig.DataSource.SDP_CONFIG_DB def __post_init__(self): channels = self.hardcoded_spectral_window_channels if isinstance(channels, str): self.hardcoded_spectral_window_channels = ChannelRange.from_str(channels)
def create_scan_provider( config: ScanProviderConfig, sdp_config_db: ska_sdp_config.Config | None ) -> ScanProvider: """Create a ScanProvider from the given config""" match config.data_source: case ScanProviderConfig.DataSource.SDP_CONFIG_DB: logger.info("Reading scan types from SDP configuration database") assert config.execution_block_id assert sdp_config_db scan_provider = SdpConfigScanProvider( config.execution_block_id, sdp_config_db, config.end_scan_max_delay, ) case ScanProviderConfig.DataSource.MS: logger.info("Reading scan types from input MS") assert config.measurement_set scan_provider = MSScanProvider(config.measurement_set) case ScanProviderConfig.DataSource.ASSIGN_RESOURCES_FILE: logger.info("Reading scan types from assign resources command file") assert config.assign_resources_command scan_provider = AssignResourcesScanProvider(config.assign_resources_command) case ScanProviderConfig.DataSource.HARDCODED: assert config.hardcoded_spectral_window_channels scan_provider = HardcodedScanProvider.from_spectral_window( config.hardcoded_spectral_window_channels ) return scan_provider
[docs] @dataclasses.dataclass @autocast_fields class TelescopeManagerConfig: """ Set of options used to build a Telescope Model. Depending on which options are given, different sources of data are used to read Telescope Manager data. When multiple options are given, ``execution_block_id`` takes precedence over ``measurement_set``, which in turn takes precedence over ``schedblock``. On top of that the following combinations are also required/possible: * When using ``schedblock``, ``antenna_layout`` is also required. * When using ``execution_block_id``, either ``telmodel_key`` or ``antenna_layout`` are required. * When using ``execution_block_id`` and ``telmodel_key``, ``telmodel_source_uris`` can also be given. """ execution_block_id: Optional[str] = None """ The ID of an Execution Block in the SDP Configuration Database from where antenna information should be read. """ measurement_set: Optional[str] = None """ The measurement Set where antenna information can be extracted from. """ schedblock: Optional[str] = None """ A JSON resource containing an instance of an SDP Scheduling Block, Execution Block or an `AssignResources` command. The JSON document should contain a `resources.receptors` key with a list of antenna names. This is useful only for testing. """ telmodel_key: Optional[str] = None """ The key in the SKA Telmodel data where the antenna layout should be read from. If empty, then ``antenna_layout`` should be used instead. """ telmodel_source_uris: Optional[str] = None """ A comma-separated list of URIs specifying the sources of truth that the SKA Telmodel package will use to retrieve its data. """ antenna_layout: Optional[str] = None """ A JSON resource providing the layout of all antennas/stations in the array. The layouts are defined as part of the Telescope Model and it is recommended that the current layout schema be examined. Required when using ``schedblock``, or when using ``execution_block_id`` and *not* specifying a ``telmodel_key``. """ hardcoded_num_receptors: Optional[int] = None """ If given, create the given number of fake receptors with hardcoded, dummy details. """ hardcoded_auto_corr: bool = True """ If ``hardcoded_num_receptors`` is given, control whether to create baselines with or without autocorrelation. """ hardcoded_lower_triangular: bool = True """ If ``hardcoded_num_receptors`` is given, control whether baselines are generated in lower or higher triangular order. """
[docs] class DataSource(enum.Enum): """An enumeration of types of data sources used to obtain data.""" SCHEDBLOCK = enum.auto() ANTENNA_LAYOUT_FILE = enum.auto() TELMODEL = enum.auto() HARDCODED = enum.auto() MS = enum.auto()
@property def data_source(self) -> DataSource: """The type of data source used by this configuration.""" if self.schedblock and self.antenna_layout: return TelescopeManagerConfig.DataSource.SCHEDBLOCK if self.execution_block_id and self.antenna_layout: return TelescopeManagerConfig.DataSource.ANTENNA_LAYOUT_FILE if self.execution_block_id and self.telmodel_key: return TelescopeManagerConfig.DataSource.TELMODEL if self.measurement_set: return TelescopeManagerConfig.DataSource.MS if self.hardcoded_num_receptors is not None: return TelescopeManagerConfig.DataSource.HARDCODED raise ValueError("Invalid TelescopeModel configuration, see documentation for details") @property def needs_sdp_config_db(self): """Whether this configuration requires an SDP Config DB client.""" return self.data_source in ( TelescopeManagerConfig.DataSource.ANTENNA_LAYOUT_FILE, TelescopeManagerConfig.DataSource.TELMODEL, )
def create_tm( config: TelescopeManagerConfig, sdp_config_db: ska_sdp_config.Config | None ) -> TelescopeManager: """Create a telescope manager from the given config""" # Testing for schedblock first allows us to still use our workaround in the # SDP integration tests where we copy the AR command into the pod match config.data_source: case TelescopeManagerConfig.DataSource.SCHEDBLOCK: logger.info("Reading antenna list from SchedBlock file (OBSOLETE)") assert config.antenna_layout assert config.schedblock tm = SKATelescopeManager(config.antenna_layout, config.schedblock) case TelescopeManagerConfig.DataSource.ANTENNA_LAYOUT_FILE: logger.info( "Reading antenna list from SDP configuration database with fixed antenna layout (OBSOLETE)" ) assert config.antenna_layout assert config.execution_block_id assert sdp_config_db tm = SKATelescopeManager.from_sdp_config( config.execution_block_id, sdp_config_db, antenna_layout=config.antenna_layout, ) case TelescopeManagerConfig.DataSource.TELMODEL: logger.info("Reading antenna list from SDP configuration database and SKA Telmodel") telmodel_source_uris = [] if config.telmodel_source_uris: telmodel_source_uris = config.telmodel_source_uris.split(",") assert config.execution_block_id assert config.telmodel_key assert sdp_config_db tm = SKATelescopeManager.from_sdp_config( config.execution_block_id, sdp_config_db, telmodel_key=config.telmodel_key, telmodel_source_uris=telmodel_source_uris, ) case TelescopeManagerConfig.DataSource.HARDCODED: assert config.hardcoded_num_receptors is not None tm = HardcodedTelescopeManager( config.hardcoded_num_receptors, config.hardcoded_auto_corr, config.hardcoded_lower_triangular, ) case TelescopeManagerConfig.DataSource.MS: logger.info("Reading antennas from MeasurementSet") tm = MeasurementSetTM(config.measurement_set) return tm
[docs] @dataclasses.dataclass @autocast_fields class UVWEngineConfig: """Set of options used to build a UVWEngine""" engine: str = "katpoint" """ The package used to calculate the UVW coordinates from the antenna layout. Supported values are `katpoint`, `measures` and `casa` (alias for `measures`). """ disable_astropy_iers_autodownload: bool = False """ Configures astropy to disable the automatically downloading of the latest IERS data catalogue. This automatic download is otherwise triggered by the UVW calculations performed by the `katpoint` engine, and blocks the main thread for the duration of the download. """
def create_uvw_engine(config: UVWEngineConfig, tm: TelescopeManager) -> UVWEngine: """Create a UVWEngine from the given config and antennas""" engine = config.engine.lower() antennas = tm.get_antennas() baselines = tm.get_baselines().as_tuples() logger.info("Using UVW engine selection: %s", engine) if engine in ("measures", "casa"): return UVWEngine(antennas, baselines=baselines) elif engine == "katpoint": if config.disable_astropy_iers_autodownload: logger.info("Disabled automatic download of astropy IERS download") iers.conf.auto_download = False return KatpointUVWEngine(antennas, baselines=baselines) else: raise ValueError("uvw_engine, if given, must be katpoint, measures or casa") def _transfer_missing_options_from( config, src_group_name, tgt_group_name, *transfers, no_warnings=False ): if tgt_group_name not in config: config[tgt_group_name] = {} src_group = config[src_group_name] tgt_group = config[tgt_group_name] for src_key, tgt_key in transfers: if tgt_key not in tgt_group and src_key in src_group: if not no_warnings: warnings.warn( ( f"Usage of {src_group_name}.{src_key} is deprecated, " f"use {tgt_group_name}.{tgt_key} instead" ), category=DeprecationWarning, ) tgt_group[tgt_key] = src_group[src_key] def _handle_missing_groups(config): for group in ( "aggregation", "reception", "telescope_model", "sdp_config_db", "uvw", "consumer", ): if group not in config: config[group] = {} def _handle_convenience_options(config): if "general" not in config: return _transfer_missing_options_from( config, "general", "telescope_model", ("measurement_set", "measurement_set"), no_warnings=True, ) _transfer_missing_options_from( config, "general", "scan_provider", ("measurement_set", "measurement_set"), no_warnings=True, ) def _handle_deprecated_options(config): # TM options _transfer_missing_options_from( config, "reception", "telescope_model", ("datamodel", "measurement_set"), ("schedblock", "schedblock"), ("layout", "antenna_layout"), ("execution_block_id", "execution_block_id"), ) # SDP Config options _transfer_missing_options_from( config, "reception", "sdp_config_db", ("sdp_config_backend", "backend"), ("sdp_config_host", "host"), ("sdp_config_port", "port"), ) # Scan provider options _transfer_missing_options_from( config, "reception", "scan_provider", ("execution_block_id", "execution_block_id"), ("datamodel", "measurement_set"), ("schedblock", "assign_resources_command"), ) # UVW Engine options _transfer_missing_options_from( config, "reception", "uvw", ("uvw_engine", "engine"), ( "disable_astropy_iers_autodownload", "disable_astropy_iers_autodownload", ), ) # Consumer options _transfer_missing_options_from( config, "reception", "consumer", ("consumer", "name"), ("outputfilename", "output_filename"), ("max_payloads", "max_payloads_per_ms"), ("command_template", "command_template"), ("timestamp_output", "timestamp_output"), ("plasma_path", "plasma_path"), ("payloads_in_flight", "payloads_in_flight"), ) # Receiver options _transfer_missing_options_from( config, "reception", "reception", ("receiver_port_start", "port_start"), ("transport_proto", "transport_protocol"), )
[docs] @dataclasses.dataclass class ReceiverConfig: """A full configuration specification to receive ICD data.""" tm: TelescopeManagerConfig = dataclasses.field(default_factory=TelescopeManagerConfig) """The configuration use to create a Telescope Manager.""" scan_provider: ScanProviderConfig = dataclasses.field(default_factory=ScanProviderConfig) """The configuration to create a ScanProvider.""" uvw_engine: UVWEngineConfig = dataclasses.field(default_factory=UVWEngineConfig) """The configuration to create a UVWEngine.""" sdp_config_db: SdpConfigDbConfig = dataclasses.field(default_factory=SdpConfigDbConfig) """The configuration to connect to the SDP Configuration Database.""" reception: receivers.Config = dataclasses.field(default_factory=receivers.Config) """The configuration to create a receiver.""" consumer: consumers.Config = dataclasses.field(default_factory=consumers.Config) """The configuration to create a consumer.""" aggregation: aggregation_mod.AggregationConfig = dataclasses.field( default_factory=aggregation_mod.AggregationConfig ) """The configuration used to control data aggregation features""" @property def needs_sdp_config_db(self): """Whether this configuration requires an SDP Config DB client.""" return self.tm.needs_sdp_config_db or self.scan_provider.needs_sdp_config_db
def _to_receiver_config(dict_config: dict) -> ReceiverConfig: _handle_missing_groups(dict_config) _handle_convenience_options(dict_config) _handle_deprecated_options(dict_config) config = ReceiverConfig() config.sdp_config_db = from_dict(SdpConfigDbConfig, dict_config["sdp_config_db"]) config.uvw_engine = from_dict(UVWEngineConfig, dict_config["uvw"]) config.scan_provider = from_dict(ScanProviderConfig, dict_config["scan_provider"]) config.tm = from_dict(TelescopeManagerConfig, dict_config["telescope_model"]) config.aggregation = from_dict(aggregation_mod.AggregationConfig, dict_config["aggregation"]) # These are dynamic depending on the selected receiver/consumer, so # they need a bit of care for ( group_name, module, ) in ( ("reception", receivers), ("consumer", consumers), ): group_config = module.create_config(**dict_config[group_name]) setattr(config, f"{group_name}", group_config) logger.info("Loaded receiver configuration: %r", config) return config def create_sdp_config_db( config: ReceiverConfig, ) -> contextlib.AbstractContextManager[ska_sdp_config.Config | None]: """ Creates an SDP Config DB client, if required, and returns it as a context manager for automatic connection closing. If no SDP Config DB client is required the context manager returns ``None``. """ if not config.needs_sdp_config_db: return contextlib.nullcontext() return ska_sdp_config.Config(**dataclasses.asdict(config.sdp_config_db))
[docs] async def receive( config: ReceiverConfig, signal_handlers: Optional[Dict[int, Callable]] = None, receiver_ready_evt: Optional[asyncio.Event] = None, ): """ Coroutine that creates a receiver and all its constituent parts based on configuration options, and runs the reception of data until it finishes. If this coroutine is cancelled while data reception is happening, it will shutdown cleanly and *not* raise a `CancelledError`, giving callers the option to still get a hold onto the underlying receiver object. :param config: An object containg all the necessary configuration values to create a working receiver. It should be a ReceiveConfig object, but the old dictionary-based configuration objects still work. :param signal_handlers: Signal handlers to install for the duration of the data reception. The callables, if given, should expect two parameters: the signal and a task representing the reception of data. :param receiver_ready_evt: An event object that, if given, is set when the receiver is fully up and running, and ready to receive data. :return: the underlying receiver object through which data was received. It can be queried for stats and other information. """ signal_handlers = signal_handlers or {} with create_sdp_config_db(config) as sdp_config_db: tm = create_tm(config.tm, sdp_config_db) uvw_engine = create_uvw_engine(config.uvw_engine, tm) consumer = consumers.create(config.consumer, tm, uvw_engine) scan_provider = create_scan_provider(config.scan_provider, sdp_config_db) scan_provider.add_scan_lifecycle_handler(consumer) aggregator = aggregation_mod.PayloadAggregator( config.aggregation, consumer, scan_provider, tm ) receiver = receivers.create(config.reception, aggregator, scan_provider) scan_provider.add_scan_lifecycle_handler(receiver) async with scan_provider, consumer, aggregator: reception = asyncio.create_task(receiver.run(receiver_ready_evt)) loop = asyncio.get_running_loop() for signo, handler in signal_handlers.items(): handle_signal = functools.partial(handler, signo, reception) loop.add_signal_handler(signo, handle_signal) try: await reception except asyncio.CancelledError: logger.info("Reception cancelled, returning early") finally: for signo in signal_handlers: loop.remove_signal_handler(signo) logging.info( "Payloads received/aggregated: %d / %d, Visibilities generated/consumed: %d / %d", receiver.received_payloads, receiver.aggregated_payloads, receiver.visibilities_generated, receiver.visibilities_consumed, ) return receiver
def _config_from_yaml(config_file_path: str): if not config_file_path: return {} with open(config_file_path, "rb") as config_file: return yaml.load(config_file, yaml.SafeLoader) def main(): """Start a receiver from the command line""" if not sys.warnoptions: warnings.simplefilter("always", DeprecationWarning) if os.path.basename(sys.argv[0]) == "emu-recv": warnings.warn( "emu-recv is deprecated in favour of vis-receive", DeprecationWarning, ) parser = argparse.ArgumentParser(description="Receives from a UDP socket") parser.add_argument( "-c", "--config", help="The configuration file to load (YAML format), default is empty.", default="", type=_config_from_yaml, ) parser.add_argument( "-o", "--option", help="Additional configuration options in the form of category.name=value", action="append", ) parser.add_argument( "-v", "--verbose", help="If set, more verbose output will be produced", action="store_true", ) args = parser.parse_args() logging_level = logging.DEBUG if args.verbose else logging.INFO ska_ser_logging.configure_logging(level=logging_level) dict_config = args.config if args.option: augment_config(dict_config, args.option) config = _to_receiver_config(dict_config) def stop_reception(signo, reception): logger.info("%s received, stopping reception", signo) reception.cancel() signal_handlers = { signal.SIGTERM: stop_reception, signal.SIGINT: stop_reception, } loop = asyncio.new_event_loop() loop.run_until_complete(receive(config, signal_handlers)) loop.close() logger.info("Thanks for using our vis-receiver, come back again :-)") if __name__ == "__main__": main()