Source code for ska_low_mccs.calibration_solver.solver_device

# -*- coding: utf-8 -*
#
# This file is part of the SKA Low MCCS project
#
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""This module provides a Tango device for a station calibration solver."""
from __future__ import annotations

import importlib.resources
import json
import threading
from typing import Any, Final, cast

import ska_tango_base as stb
from ska_low_mccs_common import MccsBaseDevice
from tango.server import device_property

from ska_low_mccs.calibration_solver.solver_component_manager import (
    StationCalibrationSolverComponentManager,
)

__all__ = ["StationCalibrationSolverDevice", "main"]


# pylint: disable=too-many-ancestors
[docs] class StationCalibrationSolverDevice( MccsBaseDevice[StationCalibrationSolverComponentManager] ): """A Tango device for a station calibration solver.""" InitCommand = None # type: ignore[assignment] RootPath = device_property(dtype=str, mandatory=True) EEPRootPath = device_property(dtype=str, mandatory=True)
[docs] def init_device(self: StationCalibrationSolverDevice) -> None: """Initialise the device.""" super().init_device() self.component_manager: StationCalibrationSolverComponentManager device_name = f'{str(self.__class__).rsplit(".", maxsplit=1)[-1][0:-2]}' properties = ( f"Initialised {device_name} device with properties:\n" f"\tRootPath: '{self.RootPath}'\n" f"\tEEPRootPath : '{self.EEPRootPath}'\n" ) self.logger.info("\n%s\n%s", device_name, properties) self.init_completed()
[docs] def create_component_manager( self: StationCalibrationSolverDevice, ) -> StationCalibrationSolverComponentManager: """ Create and return a component manager for this device. :return: a component manager for this device. """ return StationCalibrationSolverComponentManager( self.RootPath, self.EEPRootPath, self.logger, self._communication_state_changed, self._component_state_changed, )
with ( importlib.resources.files("ska_low_mccs.calibration_solver.schemas") / "StationCalibrationSolverDevice_Solve.json" ).open("r") as fp: Solve_SCHEMA: Final = json.loads(fp.read())
[docs] @stb.long_running_commands.long_running_command @stb.validators.validate_json_args def Solve( self, data_path: str, solution_path: str, station_config_path: tuple[str, str], **kwargs: Any, ) -> stb.type_hints.TaskFunctionType: """ Solve for a calibration solution. :param data_path: path to the stored data to use for solving for calibration. :param solution_path: the path of the solution to be written. :param station_config_path: a list used to locate configuration from TelModel. :param kwargs: optional keys eep_filebase, back_rotation, nside, niter, skymodel :example: >>> dp = tango.DeviceProxy("low-mccs/solver/solver") >>> config = json.dumps({ "data_path": ( # relative to RootPath property "eb-t0001-20240422-00008/correlation_burst_205_20240422_27281_0.hdf5"), "solution_path": ( # relative to RootPath property "eb-t0001-20240422-00008/correlation_burst_205_20240422_27281_0.npy"), "eep_filebase": "FEKO_AAVS3_vogel_256_elem_50ohm_", "station_config_path": [ "car:ska-low-aavs3?main", "instrument/mccs-configuration/aavs3.yaml", ], "structure_version", "back_rotation": True, "nside": 32, "niter": 200, "skymodel": "gsm" } }) >>> dp.command_inout("Solve", config) :return: A tuple containing a return code and a string message indicating status. The message is for information purpose only. """ def task( task_callback: stb.type_hints.TaskCallbackType, task_abort_event: threading.Event, ) -> None: self.component_manager.solve( data_path, solution_path, station_config_path, **kwargs, task_callback=task_callback, task_abort_event=task_abort_event, ) return task
# ---------- # Run server # ----------
[docs] def main(*args: str, **kwargs: str) -> int: # pragma: no cover """ Entry point for module. :param args: positional arguments :param kwargs: named arguments :return: exit code """ return cast( int, StationCalibrationSolverDevice.run_server(args=args or None, **kwargs) )
if __name__ == "__main__": main()