import argparse
import json
import logging
import threading
from threading import Event
import h5py
import ska_ser_logging
import tango
from overrides import override
from tango import ArgType, AttrDataFormat, DevState, EnsureOmniThread
from tango.server import Device, attribute, command, device_property, run
from ska_sdp_mock_dish_devices.pointing_utils import (
assert_hdf5_pointing_table,
stream_hdf5_pointings,
time_delay_pointings,
)
logger = logging.getLogger(__name__)
[docs]
class MockDishLeafnode(Device):
"""
A Mock implementation of ska-tmc-dishleafnode for interfaces
used by SDP.
"""
mock_desired_paths: str = device_property(
dtype=ArgType.DevString, default_value=""
) # type: ignore
"""
A json mapping of scan id to HDF5 filepaths adhering to the
`ska-sdp-datamodels` PointingTable schema for the desiredPointing attribute.
"""
mock_offset_paths: str = device_property(
dtype=ArgType.DevString, default_value=""
) # type: ignore
"""
A json mapping of scan id to HDF5 filepaths adhering to the
`ska-sdp-datamodels` PointingTable schema for the sourceOffset attribute.
"""
antenna_id: int = device_property(dtype=ArgType.DevInt, default_value=0) # type: ignore
"""Index of the antenna to mock in the PointingTable data."""
time_scale: float = device_property(dtype=ArgType.DevFloat, default_value=1.0) # type: ignore
"""
Factor to adjust the cadence of pointing events by. By default, events
will emit at the same cadence as the data, but this will allow events to
be emitted at a faster or slower rate.
"""
@attribute(
name="desiredPointing",
dtype=ArgType.DevDouble,
dformat=AttrDataFormat.SPECTRUM,
max_dim_x=3,
)
def desired_pointing(self):
return self.__desired_pointing
@attribute(
name="sourceOffset",
dtype=ArgType.DevDouble,
dformat=AttrDataFormat.SPECTRUM,
max_dim_x=3,
)
def source_offset(self):
return self.__source_offset
[docs]
@override
def init_device(self):
super().init_device()
self.__desired_pointing = [0.0, 0.0, 0.0]
self.__source_offset = [0.0, 0.0, 0.0]
self.__stop_event = Event()
self.__desired_pointing_thread = None
self.__desired_pointing_config = (
json.loads(self.mock_desired_paths) if self.mock_desired_paths else {}
)
self.set_change_event("desiredPointing", True, False)
self.__source_offset_thread = None
self.__source_offset_config = (
json.loads(self.mock_offset_paths) if self.mock_offset_paths else {}
)
self.set_change_event("sourceOffset", True, False)
self.set_state(DevState.OFF)
[docs]
@command(dtype_in=ArgType.DevLong)
def Scan(self, scan_id: int):
"""Starts the mock tango device emulation.
Args:
scan_id (int): scan id corresponding to
a configured mock data path.
Raises:
ValueError: scan id out of range.
"""
logger.info("MockDishLeafnode %s starting scan %s", self.antenna_id, scan_id)
# Desired Pointing
desired_pointing_path = self.__desired_pointing_config.get(str(scan_id))
if desired_pointing_path:
logger.info(
"Scan %i reading desired pointings from %s",
scan_id,
desired_pointing_path,
)
self.__stop_event.clear()
self.__desired_pointing_thread = threading.Thread(
target=self.__desired_pointing_update_loop,
args=(h5py.File(desired_pointing_path),),
)
self.__desired_pointing_thread.start()
else:
logger.error("Scan %i contains no desired pointings", scan_id)
# Source Offset
source_offset_path = self.__source_offset_config.get(str(scan_id))
if self.__source_offset_config:
logger.info(
"Scan %i reading source offsets from %s",
scan_id,
source_offset_path,
)
self.__source_offset_thread = threading.Thread(
target=self.__source_offset_update_loop,
args=(h5py.File(source_offset_path),),
)
self.__source_offset_thread.start()
else:
logger.warning("Scan %i contains no source offsets", scan_id)
self.set_state(DevState.ON)
[docs]
def is_Scan_allowed(self):
return self.get_state() == DevState.OFF
[docs]
@command
def EndScan(self):
"""
Stops and resets the tango device emulation.
"""
self.__stop_event.set()
if self.__desired_pointing_thread:
self.__desired_pointing_thread.join() # type: ignore
self.__desired_pointing_thread = None
if self.__source_offset_thread:
self.__source_offset_thread.join() # type: ignore
self.__source_offset_thread = None
logger.info("MockDishLeafnode %s stopped", self.antenna_id)
self.set_state(DevState.OFF)
[docs]
def is_EndScan_allowed(self):
return self.get_state() == DevState.ON
def __desired_pointing_update_loop(self, datafile):
with EnsureOmniThread():
try:
assert_hdf5_pointing_table(datafile, self.antenna_id)
for desired_pointing in time_delay_pointings(
stream_hdf5_pointings(datafile, self.antenna_id),
self.time_scale,
self.__stop_event,
):
self.__desired_pointing = desired_pointing
self.push_change_event("desiredPointing", self.__desired_pointing)
except Exception:
logging.exception("pointing data exception")
def __source_offset_update_loop(self, datafile: h5py.File):
with EnsureOmniThread():
try:
assert_hdf5_pointing_table(datafile, self.antenna_id)
for source_offset in time_delay_pointings(
stream_hdf5_pointings(datafile, self.antenna_id),
self.time_scale,
self.__stop_event,
):
self.__source_offset = source_offset
self.push_change_event("sourceOffset", self.__source_offset)
except Exception:
logging.exception("source offset data exception")
# ----------
# Run server
# ----------
_TANGO_TO_PYTHON = {
tango.LogLevel.LOG_FATAL: logging.CRITICAL,
tango.LogLevel.LOG_ERROR: logging.ERROR,
tango.LogLevel.LOG_WARN: logging.WARNING,
tango.LogLevel.LOG_INFO: logging.INFO,
tango.LogLevel.LOG_DEBUG: logging.DEBUG,
tango.LogLevel.LOG_OFF: logging.NOTSET,
}
def main():
"""Main function of the MockDishLeafnode module."""
parser = argparse.ArgumentParser(add_help=False)
parser.add_argument("-v", dest="verbose", nargs="?", type=int)
log_level = parser.parse_known_args()[0].verbose
ska_ser_logging.configure_logging(level=_TANGO_TO_PYTHON.get(log_level, logging.INFO))
run(classes=(MockDishLeafnode,))
if __name__ == "__main__":
main()