# -*- coding: utf-8 -*-
"""Reads data off a MS and creates SPEAD2 packets"""
import argparse
import asyncio
import contextlib
import dataclasses
import enum
import functools
import logging
import os
import signal
import sys
import warnings
from typing import Callable, Dict, Optional, Union
import ska_sdp_config
import ska_ser_logging
import yaml
from astropy.utils import iers
from realtime.receive.core.channel_range import ChannelRange
from realtime.receive.core.common import autocast_fields, from_dict
from realtime.receive.core.config import augment_config
from realtime.receive.core.katpoint_uvw_engine import KatpointUVWEngine
from realtime.receive.core.uvw_engine import UVWEngine
from realtime.receive.modules import aggregation as aggregation_mod
from realtime.receive.modules import consumers
from realtime.receive.modules.tm.base_tm import TelescopeManager
from . import receivers
from .scan_providers import (
AssignResourcesScanProvider,
HardcodedScanProvider,
MSScanProvider,
ScanProvider,
SdpConfigScanProvider,
)
from .tm import HardcodedTelescopeManager, MeasurementSetTM, SKATelescopeManager
logger = logging.getLogger(__name__)
[docs]
@dataclasses.dataclass
@autocast_fields
class SdpConfigDbConfig:
"""
Set of options used to establish a connection to the SDP Configuration DB.
"""
host: str = "127.0.0.1"
"""The host to connect to."""
port: int = 2379
"""The port to connect to"""
backend: str = "etcd3"
"""The backend to use"""
[docs]
@dataclasses.dataclass
class ScanProviderConfig:
"""Set of options used to build a Scan Provider."""
execution_block_id: Optional[str] = None
"""
The ID of an Execution Block in the SDP Configuration Database for which
Scan information should be monitored."""
end_scan_max_delay: Optional[float] = 5.0
"""
The maximum time to wait, in seconds, before closing the CBF streams after receiving
an EndScan SDP command.
"""
measurement_set: Optional[str] = None
"""A Measurement Set where antenna information will be extracted from."""
assign_resources_command: Optional[str] = None
"""
A JSON resource containing an instance of an SDP AssignResources command,
or an SDP Execution Block. The JSON document should contain all the keys
needed to fully-define a Scan Type.
"""
hardcoded_spectral_window_channels: Union[None, str, ChannelRange] = None
"""
If set, can be used together with `measurement_set` to force the spectral
windows of generated Scans to be a certain channel range.
"""
[docs]
class DataSource(enum.Enum):
"""An enumeration of types of data sources used to obtain data."""
SDP_CONFIG_DB = enum.auto()
ASSIGN_RESOURCES_FILE = enum.auto()
MS = enum.auto()
HARDCODED = enum.auto()
@property
def data_source(self) -> DataSource:
"""The type of data source used by this configuration."""
if self.execution_block_id:
return ScanProviderConfig.DataSource.SDP_CONFIG_DB
if self.measurement_set:
return ScanProviderConfig.DataSource.MS
if self.assign_resources_command:
return ScanProviderConfig.DataSource.ASSIGN_RESOURCES_FILE
if self.hardcoded_spectral_window_channels:
return ScanProviderConfig.DataSource.HARDCODED
raise ValueError("Invalid ScanProvider configuration, see documentation for details")
@property
def needs_sdp_config_db(self):
"""Whether this configuration requires an SDP Config DB client."""
return self.data_source == ScanProviderConfig.DataSource.SDP_CONFIG_DB
def __post_init__(self):
channels = self.hardcoded_spectral_window_channels
if isinstance(channels, str):
self.hardcoded_spectral_window_channels = ChannelRange.from_str(channels)
def create_scan_provider(
config: ScanProviderConfig, sdp_config_db: ska_sdp_config.Config | None
) -> ScanProvider:
"""Create a ScanProvider from the given config"""
match config.data_source:
case ScanProviderConfig.DataSource.SDP_CONFIG_DB:
logger.info("Reading scan types from SDP configuration database")
assert config.execution_block_id
assert sdp_config_db
scan_provider = SdpConfigScanProvider(
config.execution_block_id,
sdp_config_db,
config.end_scan_max_delay,
)
case ScanProviderConfig.DataSource.MS:
logger.info("Reading scan types from input MS")
assert config.measurement_set
scan_provider = MSScanProvider(config.measurement_set)
case ScanProviderConfig.DataSource.ASSIGN_RESOURCES_FILE:
logger.info("Reading scan types from assign resources command file")
assert config.assign_resources_command
scan_provider = AssignResourcesScanProvider(config.assign_resources_command)
case ScanProviderConfig.DataSource.HARDCODED:
assert config.hardcoded_spectral_window_channels
scan_provider = HardcodedScanProvider.from_spectral_window(
config.hardcoded_spectral_window_channels
)
return scan_provider
[docs]
@dataclasses.dataclass
@autocast_fields
class TelescopeManagerConfig:
"""
Set of options used to build a Telescope Model.
Depending on which options are given, different sources of data are used to
read Telescope Manager data. When multiple options are given,
``execution_block_id`` takes precedence over ``measurement_set``, which in
turn takes precedence over ``schedblock``.
On top of that the following combinations are also required/possible:
* When using ``schedblock``, ``antenna_layout`` is also required.
* When using ``execution_block_id``, either ``telmodel_key`` or
``antenna_layout`` are required.
* When using ``execution_block_id`` and ``telmodel_key``,
``telmodel_source_uris`` can also be given.
"""
execution_block_id: Optional[str] = None
"""
The ID of an Execution Block in the SDP Configuration Database from where
antenna information should be read.
"""
measurement_set: Optional[str] = None
"""
The measurement Set where antenna information can be extracted from.
"""
schedblock: Optional[str] = None
"""
A JSON resource containing an instance of an SDP Scheduling Block,
Execution Block or an `AssignResources` command. The JSON document should
contain a `resources.receptors` key with a list of antenna names. This is
useful only for testing.
"""
telmodel_key: Optional[str] = None
"""
The key in the SKA Telmodel data where the antenna layout should be read
from. If empty, then ``antenna_layout`` should be used instead.
"""
telmodel_source_uris: Optional[str] = None
"""
A comma-separated list of URIs specifying the sources of truth that the SKA
Telmodel package will use to retrieve its data.
"""
antenna_layout: Optional[str] = None
"""
A JSON resource providing the layout of all antennas/stations in the array.
The layouts are defined as part of the Telescope Model and it is
recommended that the current layout schema be examined. Required when using
``schedblock``, or when using ``execution_block_id`` and *not* specifying a
``telmodel_key``.
"""
hardcoded_num_receptors: Optional[int] = None
"""
If given, create the given number of fake receptors with hardcoded, dummy
details.
"""
hardcoded_auto_corr: bool = True
"""
If ``hardcoded_num_receptors`` is given, control whether to create
baselines with or without autocorrelation.
"""
hardcoded_lower_triangular: bool = True
"""
If ``hardcoded_num_receptors`` is given, control whether baselines are
generated in lower or higher triangular order.
"""
[docs]
class DataSource(enum.Enum):
"""An enumeration of types of data sources used to obtain data."""
SCHEDBLOCK = enum.auto()
ANTENNA_LAYOUT_FILE = enum.auto()
TELMODEL = enum.auto()
HARDCODED = enum.auto()
MS = enum.auto()
@property
def data_source(self) -> DataSource:
"""The type of data source used by this configuration."""
if self.schedblock and self.antenna_layout:
return TelescopeManagerConfig.DataSource.SCHEDBLOCK
if self.execution_block_id and self.antenna_layout:
return TelescopeManagerConfig.DataSource.ANTENNA_LAYOUT_FILE
if self.execution_block_id and self.telmodel_key:
return TelescopeManagerConfig.DataSource.TELMODEL
if self.measurement_set:
return TelescopeManagerConfig.DataSource.MS
if self.hardcoded_num_receptors is not None:
return TelescopeManagerConfig.DataSource.HARDCODED
raise ValueError("Invalid TelescopeModel configuration, see documentation for details")
@property
def needs_sdp_config_db(self):
"""Whether this configuration requires an SDP Config DB client."""
return self.data_source in (
TelescopeManagerConfig.DataSource.ANTENNA_LAYOUT_FILE,
TelescopeManagerConfig.DataSource.TELMODEL,
)
def create_tm(
config: TelescopeManagerConfig, sdp_config_db: ska_sdp_config.Config | None
) -> TelescopeManager:
"""Create a telescope manager from the given config"""
# Testing for schedblock first allows us to still use our workaround in the
# SDP integration tests where we copy the AR command into the pod
match config.data_source:
case TelescopeManagerConfig.DataSource.SCHEDBLOCK:
logger.info("Reading antenna list from SchedBlock file (OBSOLETE)")
assert config.antenna_layout
assert config.schedblock
tm = SKATelescopeManager(config.antenna_layout, config.schedblock)
case TelescopeManagerConfig.DataSource.ANTENNA_LAYOUT_FILE:
logger.info(
"Reading antenna list from SDP configuration database with fixed antenna layout (OBSOLETE)"
)
assert config.antenna_layout
assert config.execution_block_id
assert sdp_config_db
tm = SKATelescopeManager.from_sdp_config(
config.execution_block_id,
sdp_config_db,
antenna_layout=config.antenna_layout,
)
case TelescopeManagerConfig.DataSource.TELMODEL:
logger.info("Reading antenna list from SDP configuration database and SKA Telmodel")
telmodel_source_uris = []
if config.telmodel_source_uris:
telmodel_source_uris = config.telmodel_source_uris.split(",")
assert config.execution_block_id
assert config.telmodel_key
assert sdp_config_db
tm = SKATelescopeManager.from_sdp_config(
config.execution_block_id,
sdp_config_db,
telmodel_key=config.telmodel_key,
telmodel_source_uris=telmodel_source_uris,
)
case TelescopeManagerConfig.DataSource.HARDCODED:
assert config.hardcoded_num_receptors is not None
tm = HardcodedTelescopeManager(
config.hardcoded_num_receptors,
config.hardcoded_auto_corr,
config.hardcoded_lower_triangular,
)
case TelescopeManagerConfig.DataSource.MS:
logger.info("Reading antennas from MeasurementSet")
tm = MeasurementSetTM(config.measurement_set)
return tm
[docs]
@dataclasses.dataclass
@autocast_fields
class UVWEngineConfig:
"""Set of options used to build a UVWEngine"""
engine: str = "katpoint"
"""
The package used to calculate the UVW coordinates from the antenna layout.
Supported values are `katpoint`, `measures` and `casa` (alias for
`measures`).
"""
disable_astropy_iers_autodownload: bool = False
"""
Configures astropy to disable the automatically downloading of the latest
IERS data catalogue. This automatic download is otherwise triggered by the
UVW calculations performed by the `katpoint` engine, and blocks the main
thread for the duration of the download.
"""
def create_uvw_engine(config: UVWEngineConfig, tm: TelescopeManager) -> UVWEngine:
"""Create a UVWEngine from the given config and antennas"""
engine = config.engine.lower()
antennas = tm.get_antennas()
baselines = tm.get_baselines().as_tuples()
logger.info("Using UVW engine selection: %s", engine)
if engine in ("measures", "casa"):
return UVWEngine(antennas, baselines=baselines)
elif engine == "katpoint":
if config.disable_astropy_iers_autodownload:
logger.info("Disabled automatic download of astropy IERS download")
iers.conf.auto_download = False
return KatpointUVWEngine(antennas, baselines=baselines)
else:
raise ValueError("uvw_engine, if given, must be katpoint, measures or casa")
def _transfer_missing_options_from(
config, src_group_name, tgt_group_name, *transfers, no_warnings=False
):
if tgt_group_name not in config:
config[tgt_group_name] = {}
src_group = config[src_group_name]
tgt_group = config[tgt_group_name]
for src_key, tgt_key in transfers:
if tgt_key not in tgt_group and src_key in src_group:
if not no_warnings:
warnings.warn(
(
f"Usage of {src_group_name}.{src_key} is deprecated, "
f"use {tgt_group_name}.{tgt_key} instead"
),
category=DeprecationWarning,
)
tgt_group[tgt_key] = src_group[src_key]
def _handle_missing_groups(config):
for group in (
"aggregation",
"reception",
"telescope_model",
"sdp_config_db",
"uvw",
"consumer",
):
if group not in config:
config[group] = {}
def _handle_convenience_options(config):
if "general" not in config:
return
_transfer_missing_options_from(
config,
"general",
"telescope_model",
("measurement_set", "measurement_set"),
no_warnings=True,
)
_transfer_missing_options_from(
config,
"general",
"scan_provider",
("measurement_set", "measurement_set"),
no_warnings=True,
)
def _handle_deprecated_options(config):
# TM options
_transfer_missing_options_from(
config,
"reception",
"telescope_model",
("datamodel", "measurement_set"),
("schedblock", "schedblock"),
("layout", "antenna_layout"),
("execution_block_id", "execution_block_id"),
)
# SDP Config options
_transfer_missing_options_from(
config,
"reception",
"sdp_config_db",
("sdp_config_backend", "backend"),
("sdp_config_host", "host"),
("sdp_config_port", "port"),
)
# Scan provider options
_transfer_missing_options_from(
config,
"reception",
"scan_provider",
("execution_block_id", "execution_block_id"),
("datamodel", "measurement_set"),
("schedblock", "assign_resources_command"),
)
# UVW Engine options
_transfer_missing_options_from(
config,
"reception",
"uvw",
("uvw_engine", "engine"),
(
"disable_astropy_iers_autodownload",
"disable_astropy_iers_autodownload",
),
)
# Consumer options
_transfer_missing_options_from(
config,
"reception",
"consumer",
("consumer", "name"),
("outputfilename", "output_filename"),
("max_payloads", "max_payloads_per_ms"),
("command_template", "command_template"),
("timestamp_output", "timestamp_output"),
("plasma_path", "plasma_path"),
("payloads_in_flight", "payloads_in_flight"),
)
# Receiver options
_transfer_missing_options_from(
config,
"reception",
"reception",
("receiver_port_start", "port_start"),
("transport_proto", "transport_protocol"),
)
[docs]
@dataclasses.dataclass
class ReceiverConfig:
"""A full configuration specification to receive ICD data."""
tm: TelescopeManagerConfig = dataclasses.field(default_factory=TelescopeManagerConfig)
"""The configuration use to create a Telescope Manager."""
scan_provider: ScanProviderConfig = dataclasses.field(default_factory=ScanProviderConfig)
"""The configuration to create a ScanProvider."""
uvw_engine: UVWEngineConfig = dataclasses.field(default_factory=UVWEngineConfig)
"""The configuration to create a UVWEngine."""
sdp_config_db: SdpConfigDbConfig = dataclasses.field(default_factory=SdpConfigDbConfig)
"""The configuration to connect to the SDP Configuration Database."""
reception: receivers.Config = dataclasses.field(default_factory=receivers.Config)
"""The configuration to create a receiver."""
consumer: consumers.Config = dataclasses.field(default_factory=consumers.Config)
"""The configuration to create a consumer."""
aggregation: aggregation_mod.AggregationConfig = dataclasses.field(
default_factory=aggregation_mod.AggregationConfig
)
"""The configuration used to control data aggregation features"""
@property
def needs_sdp_config_db(self):
"""Whether this configuration requires an SDP Config DB client."""
return self.tm.needs_sdp_config_db or self.scan_provider.needs_sdp_config_db
def _to_receiver_config(dict_config: dict) -> ReceiverConfig:
_handle_missing_groups(dict_config)
_handle_convenience_options(dict_config)
_handle_deprecated_options(dict_config)
config = ReceiverConfig()
config.sdp_config_db = from_dict(SdpConfigDbConfig, dict_config["sdp_config_db"])
config.uvw_engine = from_dict(UVWEngineConfig, dict_config["uvw"])
config.scan_provider = from_dict(ScanProviderConfig, dict_config["scan_provider"])
config.tm = from_dict(TelescopeManagerConfig, dict_config["telescope_model"])
config.aggregation = from_dict(aggregation_mod.AggregationConfig, dict_config["aggregation"])
# These are dynamic depending on the selected receiver/consumer, so
# they need a bit of care
for (
group_name,
module,
) in (
("reception", receivers),
("consumer", consumers),
):
group_config = module.create_config(**dict_config[group_name])
setattr(config, f"{group_name}", group_config)
logger.info("Loaded receiver configuration: %r", config)
return config
def create_sdp_config_db(
config: ReceiverConfig,
) -> contextlib.AbstractContextManager[ska_sdp_config.Config | None]:
"""
Creates an SDP Config DB client, if required, and returns it as a context
manager for automatic connection closing. If no SDP Config DB client is
required the context manager returns ``None``.
"""
if not config.needs_sdp_config_db:
return contextlib.nullcontext()
return ska_sdp_config.Config(**dataclasses.asdict(config.sdp_config_db))
[docs]
async def receive(
config: ReceiverConfig,
signal_handlers: Optional[Dict[int, Callable]] = None,
receiver_ready_evt: Optional[asyncio.Event] = None,
):
"""
Coroutine that creates a receiver and all its constituent parts based on
configuration options, and runs the reception of data until it finishes.
If this coroutine is cancelled while data reception is happening, it will
shutdown cleanly and *not* raise a `CancelledError`, giving callers the
option to still get a hold onto the underlying receiver object.
:param config: An object containg all the necessary configuration values to
create a working receiver. It should be a ReceiveConfig object, but the old
dictionary-based configuration objects still work.
:param signal_handlers: Signal handlers to install for the duration of the
data reception. The callables, if given, should expect two parameters: the
signal and a task representing the reception of data.
:param receiver_ready_evt: An event object that, if given, is set when the
receiver is fully up and running, and ready to receive data.
:return: the underlying receiver object through which data was received.
It can be queried for stats and other information.
"""
signal_handlers = signal_handlers or {}
with create_sdp_config_db(config) as sdp_config_db:
tm = create_tm(config.tm, sdp_config_db)
uvw_engine = create_uvw_engine(config.uvw_engine, tm)
consumer = consumers.create(config.consumer, tm, uvw_engine)
scan_provider = create_scan_provider(config.scan_provider, sdp_config_db)
scan_provider.add_scan_lifecycle_handler(consumer)
aggregator = aggregation_mod.PayloadAggregator(
config.aggregation, consumer, scan_provider, tm
)
receiver = receivers.create(config.reception, aggregator, scan_provider)
scan_provider.add_scan_lifecycle_handler(receiver)
async with scan_provider, consumer, aggregator:
reception = asyncio.create_task(receiver.run(receiver_ready_evt))
loop = asyncio.get_running_loop()
for signo, handler in signal_handlers.items():
handle_signal = functools.partial(handler, signo, reception)
loop.add_signal_handler(signo, handle_signal)
try:
await reception
except asyncio.CancelledError:
logger.info("Reception cancelled, returning early")
finally:
for signo in signal_handlers:
loop.remove_signal_handler(signo)
logging.info(
"Payloads received/aggregated: %d / %d, Visibilities generated/consumed: %d / %d",
receiver.received_payloads,
receiver.aggregated_payloads,
receiver.visibilities_generated,
receiver.visibilities_consumed,
)
return receiver
def _config_from_yaml(config_file_path: str):
if not config_file_path:
return {}
with open(config_file_path, "rb") as config_file:
return yaml.load(config_file, yaml.SafeLoader)
def main():
"""Start a receiver from the command line"""
if not sys.warnoptions:
warnings.simplefilter("always", DeprecationWarning)
if os.path.basename(sys.argv[0]) == "emu-recv":
warnings.warn(
"emu-recv is deprecated in favour of vis-receive",
DeprecationWarning,
)
parser = argparse.ArgumentParser(description="Receives from a UDP socket")
parser.add_argument(
"-c",
"--config",
help="The configuration file to load (YAML format), default is empty.",
default="",
type=_config_from_yaml,
)
parser.add_argument(
"-o",
"--option",
help="Additional configuration options in the form of category.name=value",
action="append",
)
parser.add_argument(
"-v",
"--verbose",
help="If set, more verbose output will be produced",
action="store_true",
)
args = parser.parse_args()
logging_level = logging.DEBUG if args.verbose else logging.INFO
ska_ser_logging.configure_logging(level=logging_level)
dict_config = args.config
if args.option:
augment_config(dict_config, args.option)
config = _to_receiver_config(dict_config)
def stop_reception(signo, reception):
logger.info("%s received, stopping reception", signo)
reception.cancel()
signal_handlers = {
signal.SIGTERM: stop_reception,
signal.SIGINT: stop_reception,
}
loop = asyncio.new_event_loop()
loop.run_until_complete(receive(config, signal_handlers))
loop.close()
logger.info("Thanks for using our vis-receiver, come back again :-)")
if __name__ == "__main__":
main()