# -*- coding: utf-8 -*
#
# This file is part of the SKA Low MCCS project
#
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""This module provides a Tango device for a station calibration solver."""
from __future__ import annotations
import importlib.resources
import json
import threading
from typing import Any, Final, cast
import ska_tango_base as stb
from ska_low_mccs_common import MccsBaseDevice
from tango.server import device_property
from ska_low_mccs.calibration_solver.solver_component_manager import (
StationCalibrationSolverComponentManager,
)
__all__ = ["StationCalibrationSolverDevice", "main"]
# pylint: disable=too-many-ancestors
[docs]
class StationCalibrationSolverDevice(
MccsBaseDevice[StationCalibrationSolverComponentManager]
):
"""A Tango device for a station calibration solver."""
InitCommand = None # type: ignore[assignment]
RootPath = device_property(dtype=str, mandatory=True)
EEPRootPath = device_property(dtype=str, mandatory=True)
[docs]
def init_device(self: StationCalibrationSolverDevice) -> None:
"""Initialise the device."""
super().init_device()
self.component_manager: StationCalibrationSolverComponentManager
device_name = f'{str(self.__class__).rsplit(".", maxsplit=1)[-1][0:-2]}'
properties = (
f"Initialised {device_name} device with properties:\n"
f"\tRootPath: '{self.RootPath}'\n"
f"\tEEPRootPath : '{self.EEPRootPath}'\n"
)
self.logger.info("\n%s\n%s", device_name, properties)
self.init_completed()
[docs]
def create_component_manager(
self: StationCalibrationSolverDevice,
) -> StationCalibrationSolverComponentManager:
"""
Create and return a component manager for this device.
:return: a component manager for this device.
"""
return StationCalibrationSolverComponentManager(
self.RootPath,
self.EEPRootPath,
self.logger,
self._communication_state_changed,
self._component_state_changed,
)
with (
importlib.resources.files("ska_low_mccs.calibration_solver.schemas")
/ "StationCalibrationSolverDevice_Solve.json"
).open("r") as fp:
Solve_SCHEMA: Final = json.loads(fp.read())
[docs]
@stb.long_running_commands.long_running_command
@stb.validators.validate_json_args
def Solve(
self,
data_path: str,
solution_path: str,
station_config_path: tuple[str, str],
**kwargs: Any,
) -> stb.type_hints.TaskFunctionType:
"""
Solve for a calibration solution.
:param data_path: path to the stored data to use for solving for calibration.
:param solution_path: the path of the solution to be written.
:param station_config_path: a list used to locate configuration from TelModel.
:param kwargs: optional keys eep_filebase, back_rotation, nside, niter, skymodel
:example:
>>> dp = tango.DeviceProxy("low-mccs/solver/solver")
>>> config = json.dumps({
"data_path": ( # relative to RootPath property
"eb-t0001-20240422-00008/correlation_burst_205_20240422_27281_0.hdf5"),
"solution_path": ( # relative to RootPath property
"eb-t0001-20240422-00008/correlation_burst_205_20240422_27281_0.npy"),
"eep_filebase": "FEKO_AAVS3_vogel_256_elem_50ohm_",
"station_config_path": [
"car:ska-low-aavs3?main",
"instrument/mccs-configuration/aavs3.yaml",
],
"structure_version",
"back_rotation": True,
"nside": 32,
"niter": 200,
"skymodel": "gsm"
}
})
>>> dp.command_inout("Solve", config)
:return: A tuple containing a return code and a string
message indicating status. The message is for
information purpose only.
"""
def task(
task_callback: stb.type_hints.TaskCallbackType,
task_abort_event: threading.Event,
) -> None:
self.component_manager.solve(
data_path,
solution_path,
station_config_path,
**kwargs,
task_callback=task_callback,
task_abort_event=task_abort_event,
)
return task
# ----------
# Run server
# ----------
[docs]
def main(*args: str, **kwargs: str) -> int: # pragma: no cover
"""
Entry point for module.
:param args: positional arguments
:param kwargs: named arguments
:return: exit code
"""
return cast(
int, StationCalibrationSolverDevice.run_server(args=args or None, **kwargs)
)
if __name__ == "__main__":
main()