Source code for ska_low_mccs.station.station_device

#  -*- coding: utf-8 -*
#
# This file is part of the SKA Low MCCS project
#
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""This module implements the MCCS station device."""
# pylint: disable=too-many-lines

from __future__ import annotations

import functools
import importlib.resources
import json
import logging
import sys
from typing import Any, Callable, Final, Optional

import numpy as np
import tango
from ska_control_model import CommunicationStatus, HealthState, PowerState, ResultCode
from ska_tango_base.base import CommandTracker
from ska_tango_base.commands import FastCommand, JsonValidator, SubmittedSlowCommand
from ska_tango_base.obs import SKAObsDevice
from tango.server import attribute, command, device_property

from ska_low_mccs.station.station_component_manager import StationComponentManager
from ska_low_mccs.station.station_health_model import StationHealthModel
from ska_low_mccs.station.station_obs_state_model import StationObsStateModel

DevVarLongStringArrayType = tuple[list[ResultCode], list[Optional[str]]]

__all__ = ["MccsStation", "main"]


# pylint: disable=too-many-instance-attributes,too-many-public-methods
[docs]class MccsStation(SKAObsDevice): """An implementation of a station beam Tango device for MCCS.""" # ----------------- # Device Properties # ----------------- StationId = device_property(dtype=int, default_value=0) FieldStationName = device_property(dtype=str, default_value="") AntennaTrls = device_property(dtype=(str,), default_value=[]) AntennaXs = device_property(dtype=(float,), default_value=[]) AntennaYs = device_property(dtype=(float,), default_value=[]) AntennaZs = device_property(dtype=(float,), default_value=[]) AntennaIDs = device_property(dtype=(int,), default_value=[]) StationCalibratorTrl = device_property(dtype=str, default_value="") SpsStationTrl = device_property(dtype=str, default_value="") RefLatitude = device_property(dtype=float, default_value=0.0) RefLongitude = device_property(dtype=float, default_value=0.0) RefHeight = device_property(dtype=float, default_value=0.0) # --------------- # Initialisation # ---------------
[docs] def __init__(self, *args: Any, **kwargs: Any) -> None: """ Initialise this device object. :param args: positional args to the init :param kwargs: keyword args to the init """ # We aren't supposed to define initialisation methods for Tango # devices; we are only supposed to define an `init_device` method. But # we insist on doing so here, just so that we can define some # attributes, thereby stopping the linters from complaining about # "attribute-defined-outside-init" etc. We still need to make sure that # `init_device` re-initialises any values defined in here. super().__init__(*args, **kwargs) self._health_state: HealthState = HealthState.UNKNOWN self._health_model: StationHealthModel self.component_manager: StationComponentManager self._delay_centre: list[float] self._obs_state_model: StationObsStateModel self._refLatitude: float self._refLongitude: float self._refHeight: float
[docs] def init_device(self: MccsStation) -> None: """ Initialise the device. This is overridden here to change the Tango serialisation model. """ util = tango.Util.instance() util.set_serial_model(tango.SerialModel.NO_SYNC) super().init_device() self._build_state = sys.modules["ska_low_mccs"].__version_info__ self._version_id = sys.modules["ska_low_mccs"].__version__ device_name = f'{str(self.__class__).rsplit(".", maxsplit=1)[-1][0:-2]}' version = f"{device_name} Software Version: {self._version_id}" properties = ( f"Initialised {device_name} device with properties:\n" f"\tStationId: {self.StationId:03}\n" f"\tReferenceLatitude: {self.RefLatitude:03}\n" f"\tReferenceLongitude: {self.RefLongitude:03}\n" f"\tReferenceHeight: {self.RefHeight:03}\n" f"\tFieldStationName: {self.FieldStationName}\n" f"\tStationCalibratorTrl: {self.StationCalibratorTrl}\n" f"\tSpsStationTrl: {self.SpsStationTrl}\n" f"\tAntennaTrls: {self.AntennaTrls}\n" f"\tAntennaXs: {self.AntennaXs}\n" f"\tAntennaYs: {self.AntennaYs}\n" f"\tAntennaZs: {self.AntennaZs}\n" f"\tAntennaIDs: {self.AntennaIDs}\n" ) self.logger.info( "\n%s\n%s\n%s", str(self.GetVersionInfo()), version, properties ) if all( trl == "" for trl in ( self.FieldStationName, self.StationCalibratorTrl, self.SpsStationTrl, ) ) and all(trllist == [] for trllist in self.AntennaTrls): self.logger.warning( "%s initialised with no subdevices. The device will " "report PowerState.ON and CommunicationStatus.ESTABLISHED", device_name, )
def _init_state_model(self: MccsStation) -> None: super()._init_state_model() self._obs_state_model = StationObsStateModel( self.logger, self._update_obs_state ) self._health_state = HealthState.UNKNOWN # InitCommand.do() does this too late. self._health_model = StationHealthModel( self.FieldStationName, self.SpsStationTrl, self.AntennaTrls, self.health_changed, thresholds={ "antenna_degraded": 0.05, "antenna_failed": 0.2, }, ) self.set_change_event("healthState", True, False)
[docs] def create_component_manager( self: MccsStation, ) -> StationComponentManager: """ Create and return a component manager for this device. :return: a component manager for this device. """ # Combine the three antenna location lists into a single array antenna_station_locations = np.array( list(zip(self.AntennaXs, self.AntennaYs, self.AntennaZs)) ) return StationComponentManager( self.StationId, self.RefLatitude, self.RefLongitude, self.RefHeight, self.FieldStationName, self.AntennaTrls, antenna_station_locations, self.AntennaIDs, self.StationCalibratorTrl, self.SpsStationTrl, self.logger, self._communication_state_changed, self._component_state_callback, )
_schema_scan: Final = json.loads( importlib.resources.read_text( "ska_low_mccs.schemas.station", "MccsStation_Scan_3_0.json", ) ) _schema_configure_semi_static: Final = json.loads( importlib.resources.read_text( "ska_low_mccs.schemas.station", "MccsStation_ConfigureSemiStatic_3_0.json", ) ) _schema_track_object: Final = json.loads( importlib.resources.read_text( "ska_low_mccs.schemas.station", "MccsStation_TrackObject_3_0.json", ) )
[docs] def init_command_objects(self: MccsStation) -> None: """Set up the handler objects for Commands.""" super().init_command_objects() for command_name, method_name, schema in [ ( "ApplyConfiguration", "apply_configuration", None, ), ( "ConfigureSemiStatic", "configure_semi_static", self._schema_configure_semi_static, ), ( "ConfigureChannels", "configure_channels", None, ), ( "DeallocateSubarray", "deallocate_subarray", None, ), # ("LoadCalibrationCoefficients", "load_calibration_coefficients"), # ("ApplyCalibration", "apply_calibration"), ( "ApplyPointingDelays", "apply_pointing_delays", None, ), ( "LoadPointingDelays", "load_pointing_delays", None, ), ( "TrackObject", "track_object", self._schema_track_object, ), ("Scan", "scan", self._schema_scan), ("EndScan", "end_scan", None), ("AcquireDataForCalibration", "acquire_data_for_calibration", None), ("StopTracking", "stop_tracking", None), ("StopTrackingAll", "stop_tracking_all", None), ( "Initialise", "initialise", None, ), ]: validator = ( None if schema is None else JsonValidator( command_name, schema, logger=self.logger, ) ) self.register_command_object( command_name, SubmittedSlowCommand( command_name, self._command_tracker, self.component_manager, method_name, callback=None, logger=self.logger, validator=validator, ), ) self.register_command_object( "StartAcquisition", self.StartAcquisitionCommand( self._command_tracker, self.component_manager, callback=None, logger=self.logger, ), ) self.register_command_object( "GetPointingDelays", self.GetPointingDelaysCommand( self.component_manager, logger=self.logger, ), )
[docs] class InitCommand(SKAObsDevice.InitCommand): """ A class for :py:class:`~.MccsStation`'s Init command. The :py:meth:`~.MccsStation.InitCommand.do` method below is called upon :py:class:`~.MccsStation`'s initialisation. """
[docs] def do( self: MccsStation.InitCommand, *args: Any, **kwargs: Any, ) -> tuple[ResultCode, str]: """ Initialise the :py:class:`.MccsStation`. :param args: positional args to the component manager method :param kwargs: keyword args to the component manager method :return: A tuple containing a return code and a string message indicating status. The message is for information purpose only. """ self._device._subarray_id = 0 self._device._refLatitude = 0.0 self._device._refLongitude = 0.0 self._device._refHeight = 0.0 self._device._beam_trls = [] self._device._transient_buffer_trl = "" self._device._delay_centre = [] self._device._calibration_coefficients = [] self._device._is_calibrated = False self._device._calibration_job_id = 0 self._device._daq_job_id = 0 self._device._data_directory = "" self._device._build_state = str( sys.modules["ska_low_mccs"].__version_info__ ) self._device._version_id = sys.modules["ska_low_mccs"].__version__ self._device.set_change_event("beamTrls", True, True) self._device.set_archive_event("beamTrls", True, True) self._device.set_change_event("transientBufferTrl", True, False) self._device.set_archive_event("transientBufferTrl", True, False) self._device.set_change_event("outsideTemperature", True, False) self._device.set_archive_event("outsideTemperature", True, False) self._device.set_change_event("dataReceivedResult", True, False) self._device.set_archive_event("dataReceivedResult", True, False) super().do() return (ResultCode.OK, "Initialisation complete")
[docs] def is_On_allowed(self: MccsStation) -> bool: """ Check if command `Off` is allowed in the current device state. :return: ``True`` if the command is allowed """ return self.get_state() in [ tango.DevState.OFF, tango.DevState.STANDBY, tango.DevState.ON, tango.DevState.UNKNOWN, tango.DevState.FAULT, ]
# ---------- # Callbacks # ---------- def _communication_state_changed( self: MccsStation, communication_state: CommunicationStatus, ) -> None: """ Handle change in communications status between component manager and component. This is a callback hook, called by the component manager when the communications status changes. It is implemented here to drive the op_state. :param communication_state: the status of communications between the component manager and its component. """ action_map = { CommunicationStatus.DISABLED: "component_disconnected", CommunicationStatus.NOT_ESTABLISHED: "component_unknown", CommunicationStatus.ESTABLISHED: None, # wait for a power mode update } action = action_map[communication_state] if action is not None: self.op_state_model.perform_action(action) self._health_model.update_state( communicating=communication_state == CommunicationStatus.ESTABLISHED ) # pylint: disable=too-many-arguments, too-many-branches def _component_state_callback( self: MccsStation, power: Optional[PowerState] = None, fault: Optional[bool] = None, health: Optional[HealthState] = None, trl: Optional[str] = None, is_configured: Optional[bool] = None, outside_temperature: Optional[float] = None, data_received_result: Optional[tuple[str, str]] = None, ) -> None: """ Handle change in the state of the component. This is a callback hook, called by the component manager when the state of the component changes. For the power_state parameter it is implemented here to drive the op_state. For the health parameter it is implemented to update the health attribute and push change events whenever the HealthModel's evaluated health state changes. :param power: An optional parameter with the new power state of the device. :param fault: An optional parameter if the device is entering or exiting a fault state. :param health: An optional parameter with the new health state of the device. :param trl: TRL of the device whose state has changed. None if the device is a station. :param is_configured: An optional flag indicating whether the Station is configured. :param outside_temperature: the outside temperature reported by the field station. :param data_received_result: the dataReceivedResult reported by the SPS station. :raises ValueError: If TRL not found """ if trl is None: power_state_changed_callback = self._component_power_state_changed if power is not None: self._health_model.update_state(fault=fault, power=power) else: self._health_model.update_state(fault=fault) else: device_family = trl.split("/")[1] if device_family == "fieldstation": health_state_changed_callback = functools.partial( self._health_model.field_station_health_changed, trl ) power_state_changed_callback = ( self.component_manager._field_station_power_state_changed ) elif device_family == "antenna": health_state_changed_callback = functools.partial( self._health_model.antenna_health_changed, trl ) power_state_changed_callback = functools.partial( self.component_manager._antenna_power_state_changed, trl ) elif device_family == "spsstation": health_state_changed_callback = functools.partial( self._health_model.sps_station_health_changed, trl ) power_state_changed_callback = ( self.component_manager._sps_station_power_state_changed ) else: raise ValueError( f"Unknown TRL '{trl}', should be None or belong to antenna," " tile, spsstation or fieldstation" ) if power is not None: with self.component_manager.power_state_lock: power_state_changed_callback(power) if health is not None: health_state_changed_callback(health) if is_configured is not None: self._obs_state_model.is_configured_changed(is_configured) if outside_temperature is not None: self._outside_temperature_changed(outside_temperature) if data_received_result is not None: self._data_received_result_changed(data_received_result) def _component_power_state_changed( self: MccsStation, power_state: PowerState, ) -> None: """ Handle change in the power mode of the component. This is a callback hook, called by the component manager when the power mode of the component changes. It is implemented here to drive the op_state. :param power_state: the power mode of the component. """ action_map = { PowerState.OFF: "component_off", PowerState.STANDBY: "component_standby", PowerState.ON: "component_on", PowerState.UNKNOWN: "component_unknown", } self.op_state_model.perform_action(action_map[power_state])
[docs] def health_changed(self: MccsStation, health: HealthState) -> None: """ Handle change in this device's health state. This is a callback hook, called whenever the HealthModel's evaluated health state changes. It is responsible for updating the tango side of things i.e. making sure the attribute is up to date, and events are pushed. :param health: the new health value """ if self._health_state != health: self._health_state = health self.push_change_event("healthState", health)
def _outside_temperature_changed( self: MccsStation, outside_temperature: float ) -> None: """ Handle change in the outside temperature. This passes changes in the outside temperature, which originate from the field station. :param outside_temperature: the outside temperature """ self.push_change_event("outsideTemperature", outside_temperature) def _data_received_result_changed( self: MccsStation, data_received_result: tuple[str, str] ) -> None: """ Handle change in the dataReceivedResult. This passes changes in the dataReceivedResult, which originate from the SPS station. :param data_received_result: the data receieved result """ self.push_change_event("dataReceivedResult", data_received_result) # ---------- # Attributes # ----------
[docs] @attribute( dtype="float", label="refLongitude", ) def refLongitude(self: MccsStation) -> float: """ Return the refLongitude attribute. :return: the WGS84 Longitude of the station reference position """ return self.component_manager.ref_longitude
[docs] @attribute( dtype="float", label="refLatitude", ) def refLatitude(self: MccsStation) -> float: """ Return the refLatitude attribute. :return: the WGS84 Latitude of the station reference position """ return self.component_manager.ref_latitude
[docs] @attribute( dtype="float", label="refHeight", unit="meters", ) def refHeight(self: MccsStation) -> float: """ Return the refHeight attribute. :return: the ellipsoidal height of the station reference position """ return self.component_manager.ref_height
[docs] @attribute( dtype="DevString", format="%s", ) def transientBufferTrl(self: MccsStation) -> str: """ Return the TRL of the TANGO device that managers the transient buffer. :return: the TRL of the TANGO device that managers the transient buffer """ return self._transient_buffer_trl
[docs] @attribute(dtype="DevBoolean") def isCalibrated(self: MccsStation) -> bool: """ Return a flag indicating whether this station is currently calibrated or not. :return: a flag indicating whether this station is currently calibrated or not. """ return self._is_calibrated
[docs] @attribute(dtype="DevBoolean") def isConfigured(self: MccsStation) -> bool: """ Return a flag indicating whether this station is currently configured or not. :return: a flag indicating whether this station is currently configured or not. """ return self.component_manager._is_configured
[docs] @attribute( dtype="DevLong", format="%i", ) def calibrationJobId(self: MccsStation) -> int: """ Return the calibration job id. :return: the calibration job id """ return self._calibration_job_id
[docs] @attribute( dtype="DevLong", format="%i", ) def daqJobId(self: MccsStation) -> int: """ Return the DAQ job id. :return: the DAQ job id """ return self._daq_job_id
[docs] @attribute( dtype="DevString", format="%s", ) def dataDirectory(self: MccsStation) -> str: """ Return the data directory. (the parent directory for all files generated by this station) :return: the data directory """ return self._data_directory
[docs] @attribute( dtype=("DevString",), max_dim_x=8, format="%s", ) def beamTrls(self: MccsStation) -> list[str]: """ Return the TRLs of station beams associated with this station. :return: the TRLs of station beams associated with this station """ return self._beam_trls
@attribute( dtype=("DevFloat",), max_dim_x=2, ) def delayCentre(self: MccsStation) -> list[float]: """ Return the WGS84 position of the delay centre of the station. :todo: WGS84 is a datum. What is the coordinate system? Latitude and longitude? Or is it SUTM50 eastings and northings? Either way, do we need to allow for elevation too? :return: the WGS84 position of the delay centre of the station """ return self._delay_centre
[docs] @delayCentre.write # type: ignore[no-redef] def delayCentre(self: MccsStation, value: list[float]) -> None: """ Set the delay centre of the station. :param value: WGS84 position """ self._delay_centre = value
[docs] @attribute( dtype=("DevFloat",), max_dim_x=512, ) def calibrationCoefficients(self: MccsStation) -> list[float]: """ Return the calibration coefficients for the station. :todo: How big should this array be? 4 complex values (Jones matrix) per channel. This station can have up to 16 tiles of up to 16 antennas, so that is 8 x 16 x 16 = 2048 coefficients per channel. But how many channels? 384 channels, 786432 elements per station (402M for SKA Low) :return: the calibration coefficients """ return self._calibration_coefficients
@attribute( dtype="DevString", format="%s", ) def healthModelParams(self: MccsStation) -> str: """ Get the health params from the health model. :return: the health params """ return json.dumps(self._health_model.health_params)
[docs] @healthModelParams.write # type: ignore[no-redef] def healthModelParams(self: MccsStation, argin: str) -> None: """ Set the params for health transition rules. :param argin: JSON-string of dictionary of health states """ self._health_model.health_params = json.loads(argin) self._health_model.update_health()
[docs] @attribute(dtype=(("DevLong",),), max_dim_y=48, max_dim_x=8) def beamformerTable(self: MccsStation) -> list[list[int]]: """ Return the ids of the channels configured for this beam. :return: channel table """ return self.component_manager.beamformer_table
[docs] @attribute(dtype="DevLong") def numberOfChannels(self: MccsStation) -> int: """ Return the total number of channels in the beamformer. :return: the total number of channels """ return self.component_manager._number_of_channels
[docs] @attribute(dtype="DevBoolean") def isSynchronised(self: MccsStation) -> bool: """ Return true if all tiles in the SpsStation are synchronised. :return: true if all tiles in the SpsStation are synchronised. """ return all( tileprogrammingstate.lower() == "synchronised" for tileprogrammingstate in self.component_manager.tileprogrammingstate )
[docs] @attribute(dtype="DevBoolean") def isInitialised(self: MccsStation) -> bool: """ Return true if all tiles in the SpsStation are initialised. :return: true if all tiles in the SpsStation are initialised. """ return all( tileprogrammingstate.lower() == "initialised" for tileprogrammingstate in self.component_manager.tileprogrammingstate )
[docs] @attribute(dtype="float", label="OutsideTemperature") def outsideTemperature(self: MccsStation) -> Optional[float]: """ Return the OutsideTemperature. :return: the OutsideTemperature. """ return self.component_manager.outside_temperature
[docs] @attribute(dtype="DevString") def healthReport(self: MccsStation) -> str: """ Get the health report. :return: the health report. """ return self._health_model.health_report
[docs] @attribute( dtype=("str",), max_dim_x=2, # Always the last result (unique_id, JSON-encoded result) ) def dataReceivedResult(self: MccsStation) -> tuple[str, str] | None: """ Read the result of the receiving of data. :return: A tuple containing the data mode of transmission and a json string with any additional data about the data such as the file name. """ return self.component_manager.data_received_result
[docs] @attribute(dtype=("float",), max_dim_x=513) def lastPointingDelays(self: MccsStation) -> list[float]: """ Return last pointing delays applied to the tiles. Values are initialised to 0.0 if they haven't been set. These values are in antenna EEP order. :returns: last pointing delays applied to the tiles. """ return self.component_manager.last_pointing_delays
# -------- # Commands # --------
[docs] @command( dtype_in="DevVarDoubleArray", dtype_out="DevVarLongStringArray", ) def LoadPointingDelays( self: MccsStation, argin: np.ndarray, ) -> DevVarLongStringArrayType: """ Set the pointing delay parameters of this Station's Tiles. :param argin: an array containing a beam index followed by antenna delays in antenna EEP order. 1 + 256*2 = 513 elements. :return: A tuple containing a return code and a string message indicating status. The message is for information purpose only. :example: >>> dp = tango.DeviceProxy("low-mccs/station/ci-1") >>> dp.command_inout("LoadPointingDelays", delay_list) """ handler = self.get_command_object("LoadPointingDelays") (return_code, message) = handler(argin) return ([return_code], [message])
[docs] @command( dtype_in="DevString", dtype_out="DevVarLongStringArray", ) def ApplyPointingDelays(self: MccsStation, argin: str) -> DevVarLongStringArrayType: """ Set the pointing delay parameters of this Station's Tiles. :param argin: switch time, in ISO formatted time. Default: now :return: A tuple containing a return code and a string message indicating status. The message is for information purpose only. :example: >>> dp = tango.DeviceProxy("low-mccs/station/ci-1") >>> time_string = switch time as ISO formatted time >>> dp.command_inout("ApplyPointingDelays", time_string) """ handler = self.get_command_object("ApplyPointingDelays") (return_code, message) = handler(argin) return ([return_code], [message])
[docs] @command( dtype_in=("DevLong",), dtype_out="DevVarLongStringArray", ) def ConfigureChannels( self: MccsStation, argin: list[int] ) -> DevVarLongStringArrayType: """ Set the beamformer table entries for a station beam. Entries are defined as a flattened 2D array, for a maximum of 48 entries Each entry is 7 channels long and corresponds to 8 consecutive frequency channels. :param argin: list of channel block description. Elements are: * channel block index: value in range 0:47 for the channel block to set * start_channel - (int) region starting channel, even in range 0 to 510 * beam_index - (int) beam used for this region with range 0 to 47 * subarray_id - (int) Subarray: 0 is reserved for unallocated entry * subarray_logical_channel - (int) logical channel # in the subarray * subarray_beam_id - (int) ID of the subarray beam * substation_id - (int) Substation * aperture_id: ID of the aperture (station*100+substation?) :return: A tuple containing a return code and a string message indicating status. The message is for information purpose only. :example: >>> dp = tango.DeviceProxy("low-mccs/station/ci-1") >>> dp.command_inout("ConfigureChannels", block_table) """ return_code = self.component_manager.configure_channels(argin) message = "ConfigureChannels completed" # handler = self.get_command_object("ConfigureChannels") # (return_code, message) = handler(argin) return ([return_code], [message])
[docs] @command( dtype_in="DevLong", dtype_out="DevVarLongStringArray", ) def DeallocateSubarray( self: MccsStation, subarray_id: int ) -> DevVarLongStringArrayType: """ Deallocates entries relative to a subarray in aggregate tables. :param subarray_id: the ID of the subarray to deallocate :return: A tuple containing a return code and a string message indicating status. The message is for information purpose only. """ return_code = self.component_manager.deallocate_subarray(subarray_id) message = "DeallocateSubarray completed" # handler = self.get_command_object("DeallocateSubarray") # (return_code, message) = handler(subarray_id) return ([return_code], [message])
[docs] @command( dtype_in="DevString", dtype_out="DevVarLongStringArray", ) def ApplyConfiguration( self: MccsStation, transaction_id: str, ) -> DevVarLongStringArrayType: """ Apply the aggregated channel table to this Station's SpsStation. :param transaction_id: transaction id for the configuration :return: A tuple containing a return code and a string message indicating status. The message is for information purpose only. :example: >>> dp = tango.DeviceProxy("low-mccs/station/ci-1") >>> dp.command_inout("ApplyConfiguration") """ handler = self.get_command_object("ApplyConfiguration") (return_code, message) = handler(transaction_id) return ([return_code], [message])
[docs] @command(dtype_in="DevString", dtype_out="DevVarLongStringArray") def TrackObject(self: MccsStation, argin: str) -> DevVarLongStringArrayType: """ Track an object through the sky. :param argin: Configuration parameters encoded in a json string :return: The id of the tracking thread. """ tracking_id = self.component_manager.tracking_id handler = self.get_command_object("TrackObject") (result_code, _) = handler(argin) return ([result_code], [str(tracking_id)])
[docs] @command(dtype_in="DevLong", dtype_out="DevVarLongStringArray") def StopTracking(self: MccsStation, track_id: int) -> DevVarLongStringArrayType: """ Stop tracking an object. :param track_id: The ID of the thread you wish to stop tracking. :return: A tuple containing a return code and a string message indicating status. The message is for information purpose only. """ handler = self.get_command_object("StopTracking") (result_code, unique_id) = handler(track_id) return ([result_code], [unique_id])
[docs] @command(dtype_out="DevVarLongStringArray") def StopTrackingAll(self: MccsStation) -> DevVarLongStringArrayType: """ Stop all tracking. :return: A tuple containing a return code and a string message indicating status. The message is for information purpose only. """ handler = self.get_command_object("StopTrackingAll") (result_code, unique_id) = handler() return ([result_code], [unique_id])
[docs] class GetPointingDelaysCommand(FastCommand): """Class for handling the GetPointingDelays() command.""" SCHEMA: Final = json.loads( importlib.resources.files("ska_low_mccs.schemas.station") .joinpath("MccsStation_GetPointingDelays_3_0.json") .read_text() )
[docs] def __init__( self: MccsStation.GetPointingDelaysCommand, component_manager: StationComponentManager, logger: logging.Logger | None = None, ) -> None: """ Initialise a new instance. :param component_manager: the device to which this command belongs. :param logger: a logger for this command to use. """ self._component_manager = component_manager validator = JsonValidator("GetPointingDelays", self.SCHEMA, logger) super().__init__(logger, validator)
[docs] def do( self: MccsStation.GetPointingDelaysCommand, *args: Any, **kwargs: Any, ) -> np.ndarray: """ Implement :py:meth:`MccsStation.GetPointingDelaysCommand` command. :param args: unspecified positional arguments. This should be empty and is provided for type hinting only :param kwargs: unspecified keyword arguments. This should be empty and is provided for type hinting only :return: json encoded string containing list of dictionaries """ pointing_type = kwargs.get("pointing_type", None) values = kwargs.get("values", None) time_step = kwargs.get("time_step", 10) reference_time = kwargs.get("reference_time", None) return self._component_manager.get_pointing_delays( pointing_type=pointing_type, values=values, time_step=time_step, reference_time=reference_time, )
[docs] @command( dtype_in="DevString", dtype_out="DevVarDoubleArray", ) def GetPointingDelays(self: MccsStation, argin: str) -> np.ndarray: """ Get Pointing Coefficients. :param argin: stringified dict of args :return: The pointing delays as pairs of (delay, delay rate) in EEP order. """ handler = self.get_command_object("GetPointingDelays") return handler(argin)
[docs] @command( dtype_in="DevString", dtype_out="DevVarLongStringArray", ) def ConfigureSemiStatic(self: MccsStation, argin: str) -> DevVarLongStringArrayType: """ Configure semi static information like position of antennas. :param argin: Semi static information :return: A tuple containing a return code and a string message indicating status. The message is for information purpose only. :example: >>> dp = tango.DeviceProxy("low-mccs/station/ci-1") >>> config = json.dumps({ "station": { "StationId": 1, "ref_latitude": 1.0, "ref_longitude": 1.0, "ref_height": 1.0, } }) >>> dp.command_inout("ConfigureSemiStatic", config) """ handler = self.get_command_object("ConfigureSemiStatic") (return_code, message) = handler(argin) return ([return_code], [message])
[docs] @command(dtype_in="DevString", dtype_out="DevVarLongStringArray") def Scan(self: MccsStation, argin: str) -> DevVarLongStringArrayType: """ Start the scan associated with the station_beam. :param argin: Configuration parameters encoded in a json string :return: A tuple containing a return code and a string message indicating status. The message is for information purpose only. """ handler = self.get_command_object("Scan") (result_code, unique_id) = handler(argin) return ([result_code], [unique_id])
[docs] @command(dtype_in="DevLong", dtype_out="DevVarLongStringArray") def EndScan(self: MccsStation, subarray_id: int) -> DevVarLongStringArrayType: """ Stop the current scan associated with the station_beam. :param subarray_id: the subarray for which the command applies :return: A tuple containing a return code and a string message indicating status. The message is for information purpose only. """ handler = self.get_command_object("EndScan") (result_code, unique_id) = handler(subarray_id) return ([result_code], [unique_id])
[docs] @command(dtype_in="DevLong", dtype_out="DevVarLongStringArray") def AcquireDataForCalibration( self: MccsStation, channel: int ) -> DevVarLongStringArrayType: """ Instruct the SpsStation to start acquiring calibration data from the tiles. :param channel: the frequency channel to calibrate for. :return: A tuple containing a return code and a string message indicating status. The message is for information purpose only. """ handler = self.get_command_object("AcquireDataForCalibration") (result_code, unique_id) = handler(channel) return ([result_code], [unique_id])
[docs] class StartAcquisitionCommand(SubmittedSlowCommand): # pylint: disable=line-too-long """ Class for handling the StartAcquisition() command. This command takes as input a JSON string that conforms to the following schema: .. literalinclude:: /../../src/ska_low_mccs/schemas/common/Mccs_StartAcquisition.json :language: json """ # noqa: E501 SCHEMA: Final = json.loads( importlib.resources.read_text( "ska_low_mccs.schemas.common", "Mccs_StartAcquisition.json", ) )
[docs] def __init__( self: MccsStation.StartAcquisitionCommand, command_tracker: CommandTracker, component_manager: StationComponentManager, callback: Optional[Callable] = None, logger: Optional[logging.Logger] = None, ) -> None: """ Initialise a new instance. :param command_tracker: the device's command tracker :param component_manager: the device's component manager :param callback: an optional callback to be called when this command starts and finishes. :param logger: a logger for this command to log with. """ validator = JsonValidator("StartAcquisition", self.SCHEMA, logger) super().__init__( "StartAcquisition", command_tracker, component_manager, "start_acquisition", callback=callback, logger=logger, validator=validator, )
[docs] @command(dtype_in="DevString", dtype_out="DevVarLongStringArray") def StartAcquisition(self: MccsStation, argin: str) -> DevVarLongStringArrayType: """ Start data acquisition. :param argin: json dictionary with optional keywords: * start_time - (ISO UTC time) start time * delay - (int) delay start if StartTime is not specified, default 2s :return: A tuple containing a return code and a string message indicating status. The message is for information purpose only. """ handler = self.get_command_object("StartAcquisition") (return_code, unique_id) = handler(argin) return ([return_code], [unique_id])
[docs] @command(dtype_out="DevVarLongStringArray") def Initialise(self: MccsStation) -> DevVarLongStringArrayType: """ Initialise this station's tiles. :return: A tuple containing a return code and a string message indicating status. The message is for information purpose only. """ handler = self.get_command_object("Initialise") (return_code, unique_id) = handler() return ([return_code], [unique_id])
# ---------- # Run server # ----------
[docs]def main(*args: str, **kwargs: str) -> int: # pragma: no cover """ Entry point for module. :param args: positional arguments :param kwargs: named arguments :return: exit code """ return MccsStation.run_server(args=args or None, **kwargs)
if __name__ == "__main__": main()