Source code for ska_sdp_mock_dish_devices.mock_dish_master

import argparse
import json
import logging
import threading
from threading import Event

import h5py
import ska_ser_logging
import tango
from overrides import override
from tango import ArgType, AttrDataFormat, DevState, EnsureOmniThread
from tango.server import Device, attribute, command, device_property, run

from ska_sdp_mock_dish_devices.pointing_utils import (
    assert_hdf5_pointing_table,
    stream_hdf5_pointings,
    time_delay_pointings,
)

logger = logging.getLogger(__name__)


class MockDishMaster(Device):
    """
    A Mock implementation of ska-sim-dishmaster for interfaces used by SDP.

    The device exposes a single ``achievedPointing`` spectrum attribute.
    ``Scan`` starts a background thread that replays pointings read from an
    HDF5 file and pushes them as change events; ``EndScan`` stops that thread.
    The device state is ``OFF`` when idle and ``ON`` while a scan is active.
    """

    mock_achieved_paths: str = device_property(
        dtype=ArgType.DevString, default_value=""
    )  # type: ignore
    """
    A json mapping of scan id to HDF5 filepaths adhering to the
    `ska-sdp-datamodels` PointingTable schema for the achievedPointing
    attribute.
    """

    antenna_id: int = device_property(dtype=ArgType.DevInt, default_value=0)  # type: ignore
    """Index of the antenna to mock in the PointingTable data."""

    time_scale: float = device_property(dtype=ArgType.DevFloat, default_value=1.0)  # type: ignore
    """
    Factor to adjust the cadence of pointing events by. By default, events
    will emit at the same cadence as the data, but this will allow events
    to be emitted at a faster or slower rate.
    """

    @attribute(
        name="achievedPointing",
        dtype=ArgType.DevDouble,
        dformat=AttrDataFormat.SPECTRUM,
        max_dim_x=3,
    )
    def achieved_pointing(self):
        """Return the most recently replayed pointing (initially all zeros)."""
        return self.__achieved_pointing

    @override
    def init_device(self):
        """Reset the replay state, parse the scan-to-file mapping and go OFF."""
        super().init_device()
        self.__achieved_pointing = [0.0, 0.0, 0.0]
        self.__stop_event = Event()
        self.__achieved_pointing_thread = None
        # An empty property means "no mock data configured" rather than
        # invalid JSON, so it is mapped to an empty configuration.
        self.__achieved_pointing_config = (
            json.loads(self.mock_achieved_paths) if self.mock_achieved_paths else {}
        )
        # Events for achievedPointing are pushed by hand from the replay
        # thread (see PyTango ``set_change_event(name, implemented, detect)``).
        self.set_change_event("achievedPointing", True, False)
        self.set_state(DevState.OFF)

    @command(dtype_in=ArgType.DevLong)
    def Scan(self, scan_id: int):
        """Starts the mock tango device emulation.

        If a data file is configured for ``scan_id`` it is opened and replayed
        on a background thread. If none is configured, an error is logged and
        no thread is started. In both cases the device state becomes ``ON``.

        Args:
            scan_id (int): scan id corresponding to a configured mock data
                path.

        Note:
            The HDF5 file is opened in the command itself, so a failure to
            open it propagates to the caller and the state stays ``OFF``.
        """
        logger.info("MockDishMaster %s starting scan %s", self.antenna_id, scan_id)

        # Achieved Pointing
        # The configuration comes from JSON, so its keys are strings.
        achieved_pointing_path = self.__achieved_pointing_config.get(str(scan_id))
        if achieved_pointing_path:
            logger.info(
                "Scan %i reading achieved pointings from %s",
                scan_id,
                achieved_pointing_path,
            )
            self.__stop_event.clear()
            # Ownership of the open file passes to the replay thread, which
            # closes it when the replay finishes.
            self.__achieved_pointing_thread = threading.Thread(
                target=self.__achieved_pointing_update_loop,
                args=(h5py.File(achieved_pointing_path),),
            )
            self.__achieved_pointing_thread.start()
        else:
            logger.error("Scan %i contains no achieved pointings", scan_id)

        self.set_state(DevState.ON)

    def is_Scan_allowed(self):
        """Allow ``Scan`` only while the device is ``OFF``."""
        return self.get_state() == DevState.OFF

    @command
    def EndScan(self):
        """
        Stops and resets the tango device emulation.

        Signals the replay thread to stop, waits for it to finish (if one was
        started) and sets the device state back to ``OFF``.
        """
        self.__stop_event.set()
        if self.__achieved_pointing_thread:
            self.__achieved_pointing_thread.join()  # type: ignore
            self.__achieved_pointing_thread = None
        logger.info("MockDishMaster %s stopped", self.antenna_id)
        self.set_state(DevState.OFF)

    def is_EndScan_allowed(self):
        """Allow ``EndScan`` only while the device is ``ON``."""
        return self.get_state() == DevState.ON

    def __achieved_pointing_update_loop(self, datafile: h5py.File):
        """Replay pointings from ``datafile`` as achievedPointing change events.

        Runs on the thread started by ``Scan``. Any exception is logged rather
        than raised, and the file is always closed on exit.

        Args:
            datafile: open HDF5 file; closed by this method when it returns.
        """
        # NOTE(review): EnsureOmniThread is presumably required so that Tango
        # calls (push_change_event) are legal from a plain Python thread.
        with EnsureOmniThread():
            try:
                assert_hdf5_pointing_table(datafile, self.antenna_id)
                # NOTE(review): time_delay_pointings presumably paces the
                # stream by time_scale and ends early once the stop event is
                # set - confirm against pointing_utils.
                for achieved_pointing in time_delay_pointings(
                    stream_hdf5_pointings(datafile, self.antenna_id),
                    self.time_scale,
                    self.__stop_event,
                ):
                    self.__achieved_pointing = achieved_pointing
                    self.push_change_event(
                        "achievedPointing", self.__achieved_pointing
                    )
            except Exception:
                # Top-level boundary of the worker thread: log with the
                # module logger instead of letting the thread die silently.
                logger.exception("achieved pointing data exception")
            finally:
                # Release the HDF5 handle opened by Scan; without this every
                # scan would leak an open file.
                datafile.close()
# ----------
# Run server
# ----------

# Translation from Tango log levels to the Python ``logging`` levels used when
# configuring logging in ``main``.
# NOTE(review): LOG_OFF maps to logging.NOTSET, which in the standard library
# does not silence a logger - confirm this is the intended behaviour.
_TANGO_TO_PYTHON = {
    tango.LogLevel.LOG_FATAL: logging.CRITICAL,
    tango.LogLevel.LOG_ERROR: logging.ERROR,
    tango.LogLevel.LOG_WARN: logging.WARNING,
    tango.LogLevel.LOG_INFO: logging.INFO,
    tango.LogLevel.LOG_DEBUG: logging.DEBUG,
    tango.LogLevel.LOG_OFF: logging.NOTSET,
}


def main():
    """Main function of the MockDishMaster module.

    Reads the optional ``-v`` verbosity from the command line, configures
    logging at the matching Python level (``INFO`` when the value is absent
    or not in the table) and then runs the ``MockDishMaster`` device server.
    """
    # Help is disabled and only known arguments are parsed, so every other
    # command-line argument is ignored by this parser.
    cli = argparse.ArgumentParser(add_help=False)
    cli.add_argument("-v", dest="verbose", nargs="?", type=int)
    known_args, _unparsed = cli.parse_known_args()

    python_level = _TANGO_TO_PYTHON.get(known_args.verbose, logging.INFO)
    ska_ser_logging.configure_logging(level=python_level)

    run(classes=(MockDishMaster,))


if __name__ == "__main__":
    main()