Source code for ska_low_mccs.calibration_solver.solver_device

# -*- coding: utf-8 -*-
#
# This file is part of the SKA Low MCCS project
#
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""This module provides a Tango device for a station calibration solver."""
from __future__ import annotations

import importlib.resources
import json
import logging
from typing import Callable, Final, cast

from numpy import ndarray
from ska_control_model import ResultCode
from ska_low_mccs_common import MccsBaseDevice
from ska_tango_base.base import CommandTracker
from ska_tango_base.commands import JsonValidator, SubmittedSlowCommand
from tango.server import command, device_property

from ska_low_mccs.calibration_solver.solver_component_manager import (
    StationCalibrationSolverComponentManager,
)

DevVarLongStringArrayType = tuple[list[ResultCode], list[str]]

__all__ = ["StationCalibrationSolverDevice", "main"]


# pylint: disable=too-many-ancestors
class StationCalibrationSolverDevice(
    MccsBaseDevice[StationCalibrationSolverComponentManager]
):
    """
    A Tango device for a station calibration solver.

    The device registers two commands, ``Solve`` and ``GetFittedGains``.
    Each takes a single JSON string as input; the string is checked by a
    ``JsonValidator`` built from a schema file shipped in
    ``ska_low_mccs.calibration_solver.schemas`` and is then passed to a
    ``SubmittedSlowCommand`` bound to the component manager.
    """

    # Root path handed to the component manager; the Solve example documents
    # ``data_path`` and ``solution_path`` as being relative to it.
    RootPath = device_property(dtype=str, mandatory=True)
    # Second root path handed to the component manager.
    # NOTE(review): presumably the root of the EEP files - confirm.
    EEPRootPath = device_property(dtype=str, mandatory=True)

    def init_device(self: StationCalibrationSolverDevice) -> None:
        """Initialise the device and log the properties it was started with."""
        super().init_device()

        device_name = self.__class__.__name__
        properties = (
            f"Initialised {device_name} device with properties:\n"
            f"\tRootPath: '{self.RootPath}'\n"
            f"\tEEPRootPath : '{self.EEPRootPath}'\n"
        )
        self.logger.info("\n%s\n%s", device_name, properties)

    def create_component_manager(
        self: StationCalibrationSolverDevice,
    ) -> StationCalibrationSolverComponentManager:
        """
        Create and return a component manager for this device.

        The manager is built from the ``RootPath`` and ``EEPRootPath`` device
        properties, the device logger, and the device's communication-state
        and component-state callbacks.

        :return: a component manager for this device.
        """
        return StationCalibrationSolverComponentManager(
            self.RootPath,
            self.EEPRootPath,
            self.logger,
            self._communication_state_changed,
            self._component_state_changed,
        )

    def init_command_objects(self: StationCalibrationSolverDevice) -> None:
        """Initialise the command handlers for commands supported by this device."""
        super().init_command_objects()

        command_classes = (
            ("Solve", StationCalibrationSolverDevice.SolveCommand),
            ("GetFittedGains", StationCalibrationSolverDevice.GetFittedGainsCommand),
        )
        for command_name, command_class in command_classes:
            self.register_command_object(
                command_name,
                command_class(
                    self._command_tracker,
                    self.component_manager,
                    callback=None,
                    logger=self.logger,
                ),
            )

    class GetFittedGainsCommand(SubmittedSlowCommand):
        """A class for the GetFittedGainsCommand() command."""

        # The schema is read once, when the class body is executed.
        SCHEMA: Final = json.loads(
            (
                importlib.resources.files("ska_low_mccs.calibration_solver.schemas")
                / "StationCalibrationSolverDevice_GetFittedGains.json"
            ).read_text(encoding="utf-8")
        )

        def __init__(
            self: StationCalibrationSolverDevice.GetFittedGainsCommand,
            command_tracker: CommandTracker,
            component_manager: StationCalibrationSolverComponentManager,
            callback: Callable[[bool], None] | None = None,
            logger: logging.Logger | None = None,
        ) -> None:
            """
            Initialise a new instance.

            :param command_tracker: the device's command tracker
            :param component_manager: the device's component manager
            :param callback: an optional callback to be called when this
                command starts and finishes.
            :param logger: a logger for this command to log with.
            """
            validator = JsonValidator("GetFittedGains", self.SCHEMA, logger)
            super().__init__(
                "GetFittedGains",
                command_tracker,
                component_manager,
                "get_fitted_gains",
                callback=callback,
                logger=logger,
                validator=validator,
            )

    class SolveCommand(SubmittedSlowCommand):
        """A class for the Solve() command."""

        # The schema is read once, when the class body is executed.
        SCHEMA: Final = json.loads(
            (
                importlib.resources.files("ska_low_mccs.calibration_solver.schemas")
                / "StationCalibrationSolverDevice_Solve.json"
            ).read_text(encoding="utf-8")
        )

        def __init__(
            self: StationCalibrationSolverDevice.SolveCommand,
            command_tracker: CommandTracker,
            component_manager: StationCalibrationSolverComponentManager,
            callback: Callable[[bool], None] | None = None,
            logger: logging.Logger | None = None,
        ) -> None:
            """
            Initialise a new instance.

            :param command_tracker: the device's command tracker
            :param component_manager: the device's component manager
            :param callback: an optional callback to be called when this
                command starts and finishes.
            :param logger: a logger for this command to log with.
            """
            validator = JsonValidator("Solve", self.SCHEMA, logger)
            super().__init__(
                "Solve",
                command_tracker,
                component_manager,
                "solve",
                callback=callback,
                logger=logger,
                validator=validator,
            )

    @command(  # type: ignore[misc]
        dtype_in="DevString", dtype_out="DevVarLongStringArray"
    )
    def Solve(
        self: StationCalibrationSolverDevice, argin: str
    ) -> DevVarLongStringArrayType:
        """
        Solve for a calibration solution.

        :param argin: json dictionary containing the mandatory keys
            data_path, solution_path, eep_path, eep_filebase,
            station_config_path and optional keys back_rotation, nside,
            niter, skymodel. The ``Solve`` JSON schema is the authoritative
            definition of the accepted keys.

        :example:

        >>> dp = tango.DeviceProxy("low-mccs/solver/solver")
        >>> config = json.dumps({
        ...     "data_path": (  # relative to RootPath property
        ...         "eb-t0001-20240422-00008/"
        ...         "correlation_burst_205_20240422_27281_0.hdf5"
        ...     ),
        ...     "solution_path": (  # relative to RootPath property
        ...         "eb-t0001-20240422-00008/"
        ...         "correlation_burst_205_20240422_27281_0.npy"
        ...     ),
        ...     "eep_filebase": "FEKO_AAVS3_vogel_256_elem_50ohm_",
        ...     "station_config_path": [
        ...         "car:ska-low-aavs3?main",
        ...         "instrument/mccs-configuration/aavs3.yaml",
        ...     ],
        ...     # Placeholder value: consult the Solve schema.
        ...     "structure_version": "<version>",
        ...     "back_rotation": True,
        ...     "nside": 32,
        ...     "niter": 200,
        ...     "skymodel": "gsm",
        ... })
        >>> dp.command_inout("Solve", config)

        :return: A tuple containing a return code and a string
            message indicating status. The message is for
            information purpose only.
        """
        handler = self.get_command_object("Solve")
        (return_code, unique_id) = handler(argin)
        return ([return_code], [unique_id])

    @command(  # type: ignore[misc]
        dtype_in="DevString", dtype_out="DevVarLongStringArray"
    )
    def GetFittedGains(
        self: StationCalibrationSolverDevice, argin: str
    ) -> DevVarLongStringArrayType:
        """
        Submit a fit over the supplied phases for the given channels.

        A Tango command receives a single input argument, so the three
        inputs are supplied together as one JSON string, which the
        registered command handler validates against the
        ``GetFittedGains`` schema.

        :param argin: json dictionary containing the keys

            * ``channels``: the range of channels to fit over
            * ``phases``: 2D array containing phases for every antenna at
              every frequency, shape [n_phases, n_ant]
            * ``path``: the path to save the offsets, gradients,
              cov_matrices and costs

            The ``GetFittedGains`` JSON schema is the authoritative
            definition of the accepted keys.

        :example:

        >>> dp = tango.DeviceProxy("low-mccs/solver/solver")
        >>> config = json.dumps({
        ...     "channels": [50, 51, 52, 53, 54],  # some channels to fit over
        ...     "phases": [[0.1, 0.2], [0.3, 0.4]],  # phases to be fit over
        ...     "path": "some/file/path",
        ... })
        >>> dp.command_inout("GetFittedGains", config)

        :return: A tuple containing a return code and a string
            message indicating status. The message is for
            information purpose only.
        """
        handler = self.get_command_object("GetFittedGains")
        (return_code, unique_id) = handler(argin)
        return ([return_code], [unique_id])
# ----------
# Run server
# ----------
def main(*args: str, **kwargs: str) -> int:  # pragma: no cover
    """
    Entry point for module.

    Forwards the given arguments to
    ``StationCalibrationSolverDevice.run_server``. When no positional
    arguments are supplied, ``args=None`` is passed instead of an empty
    tuple.

    :param args: positional arguments
    :param kwargs: named arguments

    :return: exit code
    """
    server_args = args if args else None
    exit_code = StationCalibrationSolverDevice.run_server(
        args=server_args, **kwargs
    )
    return cast(int, exit_code)
if __name__ == "__main__":
    # Start the device server only when this module is executed as a script.
    main()