import argparse
import json
import logging
import threading
from threading import Event
import h5py
import ska_ser_logging
import tango
from overrides import override
from tango import ArgType, AttrDataFormat, DevState, EnsureOmniThread
from tango.server import Device, attribute, command, device_property, run
from ska_sdp_mock_dish_devices.pointing_utils import (
assert_hdf5_pointing_table,
stream_hdf5_pointings,
time_delay_pointings,
)
logger = logging.getLogger(__name__)
[docs]
class MockDishMaster(Device):
"""
A Mock implementation of ska-sim-dishmaster for interfaces
used by SDP.
"""
mock_achieved_paths: str = device_property(
dtype=ArgType.DevString, default_value=""
) # type: ignore
"""
A json mapping of scan id to HDF5 filepaths adhering to the
`ska-sdp-datamodels` PointingTable schema for the achievedPointing attribute.
"""
antenna_id: int = device_property(dtype=ArgType.DevInt, default_value=0) # type: ignore
"""Index of the antenna to mock in the PointingTable data."""
time_scale: float = device_property(dtype=ArgType.DevFloat, default_value=1.0) # type: ignore
"""
Factor to adjust the cadence of pointing events by. By default, events
will emit at the same cadence as the data, but this will allow events to
be emitted at a faster or slower rate.
"""
@attribute(
name="achievedPointing",
dtype=ArgType.DevDouble,
dformat=AttrDataFormat.SPECTRUM,
max_dim_x=3,
)
def achieved_pointing(self):
return self.__achieved_pointing
[docs]
@override
def init_device(self):
super().init_device()
self.__achieved_pointing = [0.0, 0.0, 0.0]
self.__stop_event = Event()
self.__achieved_pointing_thread = None
self.__achieved_pointing_config = (
json.loads(self.mock_achieved_paths) if self.mock_achieved_paths else {}
)
self.set_change_event("achievedPointing", True, False)
self.set_state(DevState.OFF)
[docs]
@command(dtype_in=ArgType.DevLong)
def Scan(self, scan_id: int):
"""Starts the mock tango device emulation.
Args:
scan_id (int): scan id corresponding to
a configured mock data path.
Raises:
ValueError: scan id out of range.
"""
logger.info("MockDishMaster %s starting scan %s", self.antenna_id, scan_id)
# Achieved Pointing
achieved_pointing_path = self.__achieved_pointing_config.get(str(scan_id))
if achieved_pointing_path:
logger.info(
"Scan %i reading achieved pointings from %s",
scan_id,
achieved_pointing_path,
)
self.__stop_event.clear()
self.__achieved_pointing_thread = threading.Thread(
target=self.__achieved_pointing_update_loop,
args=(h5py.File(achieved_pointing_path),),
)
self.__achieved_pointing_thread.start()
else:
logger.error("Scan %i contains no achieved pointings", scan_id)
self.set_state(DevState.ON)
[docs]
def is_Scan_allowed(self):
return self.get_state() == DevState.OFF
[docs]
@command
def EndScan(self):
"""
Stops and resets the tango device emulation.
"""
self.__stop_event.set()
if self.__achieved_pointing_thread:
self.__achieved_pointing_thread.join() # type: ignore
self.__achieved_pointing_thread = None
logger.info("MockDishMaster %s stopped", self.antenna_id)
self.set_state(DevState.OFF)
[docs]
def is_EndScan_allowed(self):
return self.get_state() == DevState.ON
def __achieved_pointing_update_loop(self, datafile: h5py.File):
with EnsureOmniThread():
try:
assert_hdf5_pointing_table(datafile, self.antenna_id)
for achieved_pointing in time_delay_pointings(
stream_hdf5_pointings(datafile, self.antenna_id),
self.time_scale,
self.__stop_event,
):
self.__achieved_pointing = achieved_pointing
self.push_change_event("achievedPointing", self.__achieved_pointing)
except Exception:
logging.exception("achieved pointing data exception")
# ----------
# Run server
# ----------
_TANGO_TO_PYTHON = {
tango.LogLevel.LOG_FATAL: logging.CRITICAL,
tango.LogLevel.LOG_ERROR: logging.ERROR,
tango.LogLevel.LOG_WARN: logging.WARNING,
tango.LogLevel.LOG_INFO: logging.INFO,
tango.LogLevel.LOG_DEBUG: logging.DEBUG,
tango.LogLevel.LOG_OFF: logging.NOTSET,
}
def main():
"""Main function of the MockDishMaster module."""
parser = argparse.ArgumentParser(add_help=False)
parser.add_argument("-v", dest="verbose", nargs="?", type=int)
log_level = parser.parse_known_args()[0].verbose
ska_ser_logging.configure_logging(level=_TANGO_TO_PYTHON.get(log_level, logging.INFO))
run(classes=(MockDishMaster,))
if __name__ == "__main__":
main()