Source code for realtime.receive.modules.consumers.mswriter

# -*- coding: utf-8 -*-


"""Takes data received as SPEAD2 heaps and writes it to a Measurement Set.

This is essentially the same functionality as the OSKAR Python binding
example available at:
https://github.com/OxfordSKA/OSKAR/blob/master/python/examples/spead/receiver/spead_recv.py
"""
import asyncio
import concurrent.futures
import dataclasses
import functools
import logging
from datetime import datetime, timezone
from pathlib import Path
from typing import List, Optional

from overrides import overrides
from realtime.receive.core import Scan, msutils
from realtime.receive.core.common import autocast_fields
from realtime.receive.core.uvw_engine import UVWEngine
from ska_sdp_datamodels.visibility import Visibility

from realtime.receive.modules.consumers.consumer import Consumer
from realtime.receive.modules.tm import TelescopeManager
from realtime.receive.modules.utils.command_executor import CommandFileExecutor

from ._config import Config

logger = logging.getLogger(__name__)


@dataclasses.dataclass
@autocast_fields
class MSWriterConsumerConfig(Config):
    """Settings controlling how the `mswriter` consumer produces its output.

    Fields are declared in a fixed order and must stay that way, because
    the dataclass-generated constructor accepts them positionally.
    """

    name: str = "mswriter"

    output_filename: str = "recv-vis.ms"
    """Name (optionally with a directory) of the Measurement Set to write."""

    max_payloads_per_ms: Optional[int] = None
    """
    Upper bound on the payloads stored in one Measurement Set. Once reached,
    the current Measurement Set is closed and subsequent payloads go to a
    newly opened one. ``None`` means no limit.
    """

    command_template: Optional[List[str]] = None
    """Command run after each Measurement Set has been fully written."""

    timestamp_output: bool = False
    """When true, a timestamp is embedded into each output filename."""
class MSWriterConsumer(Consumer):
    """
    A heap consumer that writes incoming data into an MS.

    Because data consumption happens inside the event loop we need to defer
    the data writing to a different thread. We do this by creating a
    single-threaded executor onto which all I/O-heavy MS operations
    (opening, writing rows, closing) are scheduled, so none of them block
    the event loop.
    """

    config_class = MSWriterConsumerConfig

    @overrides
    def __init__(
        self,
        config: Config,
        tm: TelescopeManager,
        uvw_engine: UVWEngine,
    ):
        super().__init__(config, tm, uvw_engine)
        # A single worker serialises every MS operation, so the writer is
        # never used from two threads at the same time.
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
        # Number of payloads written into the currently open MS.
        self.received_payloads = 0
        self._command_executor = None
        self.mswriter = None
        # Created lazily from within the running event loop (first consume()
        # or astop() call). It serialises all use of ``self.mswriter``.
        self._mswriter_creation_lock = None
        if config.command_template:
            self._command_executor = CommandFileExecutor(config.command_template)

    def _generate_output_path(self) -> str:
        """Return the path of the next Measurement Set to write.

        With ``timestamp_output`` enabled, a UTC timestamp is inserted between
        the stem and the suffix of ``output_filename``, keeping its directory
        (e.g. ``out/recv-vis.ms`` -> ``out/recv-vis.2024-01-01T00:00:00Z.ms``).
        Otherwise ``output_filename`` is returned unchanged.
        """
        if not self.config.timestamp_output:
            return self.config.output_filename
        path = Path(self.config.output_filename)
        timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
        # NOTE(review): second resolution; two MSs opened within the same
        # second get the same name.
        return str(path.parent / f"{path.stem}.{timestamp}{path.suffix}")

    def _get_lock(self) -> asyncio.Lock:
        """Return the writer lock, creating it on first use inside the loop."""
        if self._mswriter_creation_lock is None:
            self._mswriter_creation_lock = asyncio.Lock()
        return self._mswriter_creation_lock

    async def _open_mswriter(self, scan: Scan):
        """Open a new MS on the executor and store it in ``self.mswriter``."""
        output_path = self._generate_output_path()
        logger.info("Writing to %s", output_path)
        self.mswriter = await asyncio.get_running_loop().run_in_executor(
            self.executor,
            functools.partial(
                msutils.MSWriter,
                output_path,
                scan,
                self.tm.get_antennas(),
                baselines=self.tm.get_baselines(),
            ),
        )

    @overrides
    async def consume(self, visibility: Visibility):
        """Write every payload carried by ``visibility`` into the current MS.

        An MS is opened if none is currently open. When
        ``max_payloads_per_ms`` is set, the current MS is closed as soon as
        that many payloads have been written to it. If more payloads of
        ``visibility`` remain, a new MS is opened for them.
        """
        meta = visibility.attrs["meta"]
        # One sequence number per payload, parallel to visibility.time.data.
        payload_seq_numbers = meta["payload_seq_numbers"]
        scan: Scan = meta["scan"]
        loop = asyncio.get_running_loop()
        max_payloads = self.config.max_payloads_per_ms

        # The lock is held for the whole call, not only for writer creation.
        # Otherwise a concurrent call could write rows into a writer that
        # this call is closing.
        async with self._get_lock():
            if self.mswriter is None:
                await self._open_mswriter(scan)

            all_uvws = self._get_uvw(visibility, scan)
            # TODO (rtobar): determine which beam this payload corresponds to
            # (probably based on channel id?)
            beam = scan.scan_type.beams[0]
            sw = beam.channels.spectral_windows[0]

            for vis, uvw, payload_seq_no, payload_time in zip(
                visibility.vis.data,
                all_uvws,
                payload_seq_numbers,
                visibility.time.data,
            ):
                if self.mswriter is None:
                    # The previous MS was closed mid-batch (max reached).
                    await self._open_mswriter(scan)
                await loop.run_in_executor(
                    self.executor,
                    functools.partial(
                        self.mswriter.write_data_row,
                        scan_id=scan.scan_number,
                        beam=beam,
                        sw=sw,
                        payload_seq_no=payload_seq_no,
                        mjd_time=payload_time,
                        # NOTE(review): interval/exposure are hard-coded to 1
                        # (presumably seconds) -- confirm against the actual
                        # integration time.
                        interval=1,
                        exposure=1,
                        uvw=uvw,
                        vis=vis,
                    ),
                )
                self.received_payloads += 1

                # Close the output MS once the per-MS limit is reached.
                if max_payloads is not None and self.received_payloads >= max_payloads:
                    logger.info("Max payloads received")
                    # Closing the MS (and possibly waiting on the command
                    # executor) is blocking I/O: keep it off the event loop.
                    await loop.run_in_executor(self.executor, self._finish_writing)
                    self.received_payloads = 0

    def _finish_writing(self):
        """Close the current MS and schedule the post-write command on it.

        Performs blocking I/O; callers run it on ``self.executor``.

        :raises RuntimeError: if no MS is currently open.
        """
        if self.mswriter is None:
            raise RuntimeError("mswriter doesn't exist")
        output_path = self.mswriter.ms.name
        self.mswriter.close()
        self.mswriter = None
        logger.info("Finished writing %s", output_path)
        if self._command_executor:
            self._command_executor.schedule(output_path)
            # if writing to the same output file, wait until the
            # command executor finishes before overwriting. Timestamp
            # output option does not perform overwriting.
            # NOTE(review): later MSs are scheduled on this same executor
            # after stop(); presumably CommandFileExecutor supports that.
            if not self.config.timestamp_output:
                self._command_executor.stop()

    def _stop(self):
        """Finish any open MS and stop the command executor (blocking)."""
        if self.mswriter is not None:
            self._finish_writing()
        if self._command_executor is not None:
            self._command_executor.stop()

    async def astop(self):
        """Close any open MS and stop the command executor.

        Takes the writer lock first, so an in-flight ``consume`` finishes
        writing before the MS is closed.
        """
        async with self._get_lock():
            await asyncio.get_running_loop().run_in_executor(self.executor, self._stop)