# -*- coding: utf-8 -*
#
# This file is part of the SKA Low MCCS project
#
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""This module provides a Tango device for a station calibration solver."""
from __future__ import annotations
import importlib.resources
import json
import logging
from typing import Callable, Final, cast
from numpy import ndarray
from ska_control_model import ResultCode
from ska_low_mccs_common import MccsBaseDevice
from ska_tango_base.base import CommandTracker
from ska_tango_base.commands import JsonValidator, SubmittedSlowCommand
from tango.server import command, device_property
from ska_low_mccs.calibration_solver.solver_component_manager import (
StationCalibrationSolverComponentManager,
)
DevVarLongStringArrayType = tuple[list[ResultCode], list[str]]
__all__ = ["StationCalibrationSolverDevice", "main"]
# pylint: disable=too-many-ancestors
[docs]
class StationCalibrationSolverDevice(
MccsBaseDevice[StationCalibrationSolverComponentManager]
):
"""A Tango device for a station calibration solver."""
RootPath = device_property(dtype=str, mandatory=True)
EEPRootPath = device_property(dtype=str, mandatory=True)
[docs]
def init_device(self: StationCalibrationSolverDevice) -> None:
"""Initialise the device."""
super().init_device()
device_name = f'{str(self.__class__).rsplit(".", maxsplit=1)[-1][0:-2]}'
properties = (
f"Initialised {device_name} device with properties:\n"
f"\tRootPath: '{self.RootPath}'\n"
f"\tEEPRootPath : '{self.EEPRootPath}'\n"
)
self.logger.info("\n%s\n%s", device_name, properties)
[docs]
def create_component_manager(
self: StationCalibrationSolverDevice,
) -> StationCalibrationSolverComponentManager:
"""
Create and return a component manager for this device.
:return: a component manager for this device.
"""
return StationCalibrationSolverComponentManager(
self.RootPath,
self.EEPRootPath,
self.logger,
self._communication_state_changed,
self._component_state_changed,
)
[docs]
def init_command_objects(self: StationCalibrationSolverDevice) -> None:
"""Initialise the command handlers for commands supported by this device."""
super().init_command_objects()
self.register_command_object(
"Solve",
StationCalibrationSolverDevice.SolveCommand(
self._command_tracker,
self.component_manager,
callback=None,
logger=self.logger,
),
)
self.register_command_object(
"GetFittedGains",
StationCalibrationSolverDevice.GetFittedGainsCommand(
self._command_tracker,
self.component_manager,
callback=None,
logger=self.logger,
),
)
[docs]
class GetFittedGainsCommand(SubmittedSlowCommand):
"""A class for the GetFittedGainsCommand() command."""
with (
importlib.resources.files("ska_low_mccs.calibration_solver.schemas")
/ "StationCalibrationSolverDevice_GetFittedGains.json"
).open("r") as fp:
SCHEMA: Final = json.loads(fp.read())
[docs]
def __init__(
self: StationCalibrationSolverDevice.GetFittedGainsCommand,
command_tracker: CommandTracker,
component_manager: StationCalibrationSolverComponentManager,
callback: Callable[[bool], None] | None = None,
logger: logging.Logger | None = None,
) -> None:
"""
Initialise a new instance.
:param command_tracker: the device's command tracker
:param component_manager: the device's component manager
:param callback: an optional callback to be called when this
command starts and finishes.
:param logger: a logger for this command to log with.
"""
validator = JsonValidator("GetFittedGains", self.SCHEMA, logger)
super().__init__(
"GetFittedGains",
command_tracker,
component_manager,
"get_fitted_gains",
callback=callback,
logger=logger,
validator=validator,
)
[docs]
class SolveCommand(SubmittedSlowCommand):
"""A class for the Solve() command."""
with (
importlib.resources.files("ska_low_mccs.calibration_solver.schemas")
/ "StationCalibrationSolverDevice_Solve.json"
).open("r") as fp:
SCHEMA: Final = json.loads(fp.read())
[docs]
def __init__(
self: StationCalibrationSolverDevice.SolveCommand,
command_tracker: CommandTracker,
component_manager: StationCalibrationSolverComponentManager,
callback: Callable[[bool], None] | None = None,
logger: logging.Logger | None = None,
) -> None:
"""
Initialise a new instance.
:param command_tracker: the device's command tracker
:param component_manager: the device's component manager
:param callback: an optional callback to be called when this
command starts and finishes.
:param logger: a logger for this command to log with.
"""
validator = JsonValidator("Solve", self.SCHEMA, logger)
super().__init__(
"Solve",
command_tracker,
component_manager,
"solve",
callback=callback,
logger=logger,
validator=validator,
)
[docs]
@command( # type: ignore[misc]
dtype_in="DevString", dtype_out="DevVarLongStringArray"
)
def Solve(
self: StationCalibrationSolverDevice, argin: str
) -> DevVarLongStringArrayType:
"""
Solve for a calibration solution.
:param argin: json dictionary containing the mandatory keys
data_path, solution_path, eep_path, eep_filebase, station_config_path
and optional keys
back_rotation, nside, niter, skymodel
:example:
>>> dp = tango.DeviceProxy("low-mccs/solver/solver")
>>> config = json.dumps({
"data_path": ( # relative to RootPath property
"eb-t0001-20240422-00008/correlation_burst_205_20240422_27281_0.hdf5"),
"solution_path": ( # relative to RootPath property
"eb-t0001-20240422-00008/correlation_burst_205_20240422_27281_0.npy"),
"eep_filebase": "FEKO_AAVS3_vogel_256_elem_50ohm_",
"station_config_path": [
"car:ska-low-aavs3?main",
"instrument/mccs-configuration/aavs3.yaml",
],
"structure_version",
"back_rotation": True,
"nside": 32,
"niter": 200,
"skymodel": "gsm"
}
})
>>> dp.command_inout("Solve", config)
:return: A tuple containing a return code and a string
message indicating status. The message is for
information purpose only.
"""
handler = self.get_command_object("Solve")
(return_code, unique_id) = handler(argin)
return ([return_code], [unique_id])
[docs]
@command( # type: ignore[misc]
dtype_in=("DevLong",), dtype_out="DevVarLongStringArray"
)
def GetFittedGains(
self: StationCalibrationSolverDevice,
channels: list[int],
phases: ndarray,
path: str,
) -> DevVarLongStringArrayType:
"""
Solve for a calibration solution.
:param channels: The range of channels to fit over
:param phases: 2D Numpy array containing phases for every antenna at every
frequency, shape [n_phases,n_ant]
:param path: The path to save the offsets, gradients, cov_matrices and costs
>>> dp = tango.DeviceProxy("low-mccs/solver/solver")
>>> config = json.dumps({
"channels": [50, 51, 52, 53, 54, ... , 100] # Some channels to fit over
"phases": [[0.1, 0.2], [0.3, 0.4]... ] # Some array of phases to be fit over
"path": "some/file/path"
}
})
>>> dp.command_inout("GetFittedGains", config)
:return: A tuple containing a return code and a string
message indicating status. The message is for
information purpose only.
"""
handler = self.get_command_object("GetFittedGains")
(return_code, unique_id) = handler(channels, phases, path)
return ([return_code], [unique_id])
# ----------
# Run server
# ----------
[docs]
def main(*args: str, **kwargs: str) -> int: # pragma: no cover
"""
Entry point for module.
:param args: positional arguments
:param kwargs: named arguments
:return: exit code
"""
return cast(
int, StationCalibrationSolverDevice.run_server(args=args or None, **kwargs)
)
if __name__ == "__main__":
main()