Source code for ska_sdp_mock_dish_devices.mock_dish_leafnode

import argparse
import json
import logging
import threading
from threading import Event

import h5py
import ska_ser_logging
import tango
from overrides import override
from tango import ArgType, AttrDataFormat, DevState, EnsureOmniThread
from tango.server import Device, attribute, command, device_property, run

from ska_sdp_mock_dish_devices.pointing_utils import (
    assert_hdf5_pointing_table,
    stream_hdf5_pointings,
    time_delay_pointings,
)

logger = logging.getLogger(__name__)


class MockDishLeafnode(Device):
    """
    A Mock implementation of ska-tmc-dishleafnode for interfaces used by SDP.

    The ``Scan`` command starts background threads that read pointing values
    from configured HDF5 files and push them as change events on the
    ``desiredPointing`` and ``sourceOffset`` attributes. ``EndScan`` stops
    those threads again.
    """

    mock_desired_paths: str = device_property(
        dtype=ArgType.DevString, default_value=""
    )  # type: ignore
    """
    A json mapping of scan id to HDF5 filepaths adhering to the
    `ska-sdp-datamodels` PointingTable schema for the desiredPointing
    attribute.
    """

    mock_offset_paths: str = device_property(
        dtype=ArgType.DevString, default_value=""
    )  # type: ignore
    """
    A json mapping of scan id to HDF5 filepaths adhering to the
    `ska-sdp-datamodels` PointingTable schema for the sourceOffset
    attribute.
    """

    antenna_id: int = device_property(dtype=ArgType.DevInt, default_value=0)  # type: ignore
    """Index of the antenna to mock in the PointingTable data."""

    time_scale: float = device_property(dtype=ArgType.DevFloat, default_value=1.0)  # type: ignore
    """
    Factor to adjust the cadence of pointing events by. By default,
    events will emit at the same cadence as the data, but this will
    allow events to be emitted at a faster or slower rate.
    """

    # Read method for the ``desiredPointing`` attribute: returns the value
    # most recently stored by the desired-pointing update loop (or the
    # initial zeros set in ``init_device``).
    @attribute(
        name="desiredPointing",
        dtype=ArgType.DevDouble,
        dformat=AttrDataFormat.SPECTRUM,
        max_dim_x=3,
    )
    def desired_pointing(self):
        return self.__desired_pointing

    # Read method for the ``sourceOffset`` attribute: returns the value most
    # recently stored by the source-offset update loop (or the initial zeros
    # set in ``init_device``).
    @attribute(
        name="sourceOffset",
        dtype=ArgType.DevDouble,
        dformat=AttrDataFormat.SPECTRUM,
        max_dim_x=3,
    )
    def source_offset(self):
        return self.__source_offset

    @override
    def init_device(self):
        """Reset attribute values, parse the path mappings and go to OFF."""
        super().init_device()
        self.__desired_pointing = [0.0, 0.0, 0.0]
        self.__source_offset = [0.0, 0.0, 0.0]
        # A single event is shared by both update threads so that EndScan
        # can stop them together.
        self.__stop_event = Event()

        self.__desired_pointing_thread = None
        # An empty property means "no files configured" rather than
        # invalid JSON.
        self.__desired_pointing_config = (
            json.loads(self.mock_desired_paths) if self.mock_desired_paths else {}
        )
        self.set_change_event("desiredPointing", True, False)

        self.__source_offset_thread = None
        self.__source_offset_config = (
            json.loads(self.mock_offset_paths) if self.mock_offset_paths else {}
        )
        self.set_change_event("sourceOffset", True, False)

        self.set_state(DevState.OFF)

    @command(dtype_in=ArgType.DevLong)
    def Scan(self, scan_id: int):
        """Starts the mock tango device emulation.

        For each of the two attributes, a background thread is started only
        if a file path is configured for ``scan_id``; otherwise a message is
        logged and that attribute is left untouched. The device state is set
        to ON in every case.

        Args:
            scan_id (int): scan id corresponding to a configured mock
                data path.

        Raises:
            Exception: whatever ``h5py.File`` raises when a configured
                path cannot be opened; it is not caught here.
        """
        logger.info("MockDishLeafnode %s starting scan %s", self.antenna_id, scan_id)

        # Both update loops share the stop event, so reset it once up front.
        # Otherwise a scan that only has source offsets configured would
        # start with the event still set by the previous EndScan.
        self.__stop_event.clear()

        # Desired Pointing
        desired_pointing_path = self.__desired_pointing_config.get(str(scan_id))
        if desired_pointing_path:
            logger.info(
                "Scan %i reading desired pointings from %s",
                scan_id,
                desired_pointing_path,
            )
            # The file is opened here so that open errors fail the command;
            # the update loop takes ownership and closes it.
            self.__desired_pointing_thread = threading.Thread(
                target=self.__desired_pointing_update_loop,
                args=(h5py.File(desired_pointing_path),),
            )
            self.__desired_pointing_thread.start()
        else:
            logger.error("Scan %i contains no desired pointings", scan_id)

        # Source Offset
        source_offset_path = self.__source_offset_config.get(str(scan_id))
        # Test the path for *this* scan, not the whole mapping: a non-empty
        # mapping may still lack an entry for scan_id.
        if source_offset_path:
            logger.info(
                "Scan %i reading source offsets from %s",
                scan_id,
                source_offset_path,
            )
            self.__source_offset_thread = threading.Thread(
                target=self.__source_offset_update_loop,
                args=(h5py.File(source_offset_path),),
            )
            self.__source_offset_thread.start()
        else:
            logger.warning("Scan %i contains no source offsets", scan_id)

        self.set_state(DevState.ON)

    def is_Scan_allowed(self):
        """Allow ``Scan`` only while the device is in the OFF state."""
        return self.get_state() == DevState.OFF

    @command
    def EndScan(self):
        """
        Stops and resets the tango device emulation.
        """
        # Signal the update loops first, then wait for each running thread.
        self.__stop_event.set()
        if self.__desired_pointing_thread:
            self.__desired_pointing_thread.join()  # type: ignore
            self.__desired_pointing_thread = None
        if self.__source_offset_thread:
            self.__source_offset_thread.join()  # type: ignore
            self.__source_offset_thread = None
        logger.info("MockDishLeafnode %s stopped", self.antenna_id)
        self.set_state(DevState.OFF)

    def is_EndScan_allowed(self):
        """Allow ``EndScan`` only while the device is in the ON state."""
        return self.get_state() == DevState.ON

    def __desired_pointing_update_loop(self, datafile):
        """Thread target: publish desired pointings read from ``datafile``.

        Takes ownership of ``datafile`` and closes it on exit. Any exception
        is logged rather than propagated, since nothing joins on the result.
        """
        # Wrapped in EnsureOmniThread because this runs on a plain Python
        # thread that pushes Tango events.
        with EnsureOmniThread():
            try:
                # Close the HDF5 handle whether the stream ends normally,
                # is stopped, or fails validation.
                with datafile:
                    assert_hdf5_pointing_table(datafile, self.antenna_id)
                    # NOTE(review): time_delay_pointings is presumed to pace
                    # the values and to stop once the event is set — confirm
                    # in pointing_utils.
                    for desired_pointing in time_delay_pointings(
                        stream_hdf5_pointings(datafile, self.antenna_id),
                        self.time_scale,
                        self.__stop_event,
                    ):
                        self.__desired_pointing = desired_pointing
                        self.push_change_event(
                            "desiredPointing", self.__desired_pointing
                        )
            except Exception:
                logger.exception("pointing data exception")

    def __source_offset_update_loop(self, datafile: h5py.File):
        """Thread target: publish source offsets read from ``datafile``.

        Takes ownership of ``datafile`` and closes it on exit. Any exception
        is logged rather than propagated, since nothing joins on the result.
        """
        # Wrapped in EnsureOmniThread because this runs on a plain Python
        # thread that pushes Tango events.
        with EnsureOmniThread():
            try:
                # Close the HDF5 handle whether the stream ends normally,
                # is stopped, or fails validation.
                with datafile:
                    assert_hdf5_pointing_table(datafile, self.antenna_id)
                    # NOTE(review): time_delay_pointings is presumed to pace
                    # the values and to stop once the event is set — confirm
                    # in pointing_utils.
                    for source_offset in time_delay_pointings(
                        stream_hdf5_pointings(datafile, self.antenna_id),
                        self.time_scale,
                        self.__stop_event,
                    ):
                        self.__source_offset = source_offset
                        self.push_change_event("sourceOffset", self.__source_offset)
            except Exception:
                logger.exception("source offset data exception")
# ----------
# Run server
# ----------

# Translation table from Tango log levels to Python ``logging`` levels.
_TANGO_TO_PYTHON = {
    tango.LogLevel.LOG_OFF: logging.NOTSET,
    tango.LogLevel.LOG_DEBUG: logging.DEBUG,
    tango.LogLevel.LOG_INFO: logging.INFO,
    tango.LogLevel.LOG_WARN: logging.WARNING,
    tango.LogLevel.LOG_ERROR: logging.ERROR,
    tango.LogLevel.LOG_FATAL: logging.CRITICAL,
}


def main():
    """Configure logging from the ``-v`` option and run the device server.

    Only ``-v`` is interpreted here; every other command-line argument is
    ignored by this parser. A missing or unmapped verbosity value falls
    back to ``logging.INFO``.
    """
    # Help is disabled so this parser never intercepts ``-h``.
    cli = argparse.ArgumentParser(add_help=False)
    cli.add_argument("-v", dest="verbose", nargs="?", type=int)
    known_args, _unparsed = cli.parse_known_args()

    python_level = _TANGO_TO_PYTHON.get(known_args.verbose, logging.INFO)
    ska_ser_logging.configure_logging(level=python_level)

    run(classes=(MockDishLeafnode,))


if __name__ == "__main__":
    main()