Source code for ska_low_mccs.station.station_device

#  -*- coding: utf-8 -*-
#
# This file is part of the SKA Low MCCS project
#
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""This module implements the MCCS station device."""
# pylint: disable=too-many-lines

from __future__ import annotations

import functools
import importlib.resources
import json
import sys
import threading
from typing import Any, Final, Optional, cast

import numpy as np
import ska_tango_base as stb
import tango
from ska_control_model import (
    AdminMode,
    CommunicationStatus,
    HealthState,
    PowerState,
    ResultCode,
    TaskStatus,
)
from ska_control_model.health_rollup import HealthRollup, HealthSummary
from ska_low_mccs_common import MccsBaseDevice
from ska_tango_base.obs import SKAObsDevice
from ska_tango_base.type_hints import DevVarLongStringArrayType
from tango.server import attribute, command, device_property

from ska_low_mccs.station.station_component_manager import StationComponentManager
from ska_low_mccs.station.station_health_model import StationHealthModel
from ska_low_mccs.station.station_obs_state_model import StationObsStateModel

from ..utils import NumpyEncoder, interleaved_float_list_to_complex_list

__all__ = ["MccsStation", "main"]


# pylint: disable=too-many-instance-attributes,too-many-public-methods
# pylint: disable=too-many-ancestors
class MccsStation(MccsBaseDevice, SKAObsDevice):
    """An implementation of a station Tango device for MCCS."""

    # NOTE(review): presumably set to None because this device does its
    # initialisation in ``init_device`` rather than through an init command
    # class -- confirm against the base classes.
    InitCommand = None  # type: ignore[assignment]

    # -----------------
    # Device Properties
    # -----------------
    # Identity of the station and the names/TRLs of the devices it works with.
    # An empty string / empty list means "no such subdevice configured".
    StationId = device_property(dtype=int, default_value=0)
    FieldStationName = device_property(dtype=str, default_value="")
    AntennaTrls = device_property(dtype=(str,), default_value=[])
    # Antenna coordinates; create_component_manager zips the three lists
    # together into one location array.
    AntennaXs = device_property(dtype=(float,), default_value=[])
    AntennaYs = device_property(dtype=(float,), default_value=[])
    AntennaZs = device_property(dtype=(float,), default_value=[])
    AntennaIDs = device_property(dtype=(int,), default_value=[])
    StationCalibratorTrl = device_property(dtype=str, default_value="")
    SpsStationTrl = device_property(dtype=str, default_value="")
    # Station reference position, handed to the component manager.
    RefLatitude = device_property(dtype=float, default_value=0.0)
    RefLongitude = device_property(dtype=float, default_value=0.0)
    RefHeight = device_property(dtype=float, default_value=0.0)
    # Initial value of the calibrationLoadDelay attribute (seconds).
    CalibrationLoadDelay = device_property(dtype=float, default_value=1.0)
    # One flag per antenna slot (256 entries by default, none masked).
    AntennaMasks = device_property(
        dtype=(bool,),
        doc=("Masked antenna indexed by TPM_Idx (i.e tpm_id * 16 + channel // 2). "),
        default_value=[False] * 256,
    )
    UseBeamWeights = device_property(
        dtype=bool,
        default_value=True,
        doc=(
            "True if we want to use the beam weighting in calibration coefficients. "
            "Currently only unit weights are allowed and "
            "masked antennas are zeros accross all channels."
        ),
    )

    # ---------------
    # Initialisation
    # ---------------
def __init__(self, *args: Any, **kwargs: Any) -> None:
    """
    Construct the device object.

    Tango devices are normally set up through ``init_device`` only. This
    constructor exists purely to declare the instance attributes up front,
    which keeps linters from reporting "attribute-defined-outside-init".
    Everything declared here must still be (re-)initialised by
    ``init_device``.

    :param args: positional args to the init
    :param kwargs: keyword args to the init
    """
    # The health-model selector must exist before the superclass
    # initialiser runs.
    self._use_new_health_model: bool = True
    super().__init__(*args, **kwargs)

    # Attributes that receive a concrete starting value here.
    self._health_report: str = ""
    self._health_state: HealthState = HealthState.UNKNOWN

    # Declarations only (no value assigned): health bookkeeping ...
    self._health_thresholds: dict[str, Any]
    self._health_rollup: HealthRollup
    self._health_model: StationHealthModel

    # ... component manager and observation state model ...
    self.component_manager: StationComponentManager
    self._obs_state_model: StationObsStateModel

    # ... and plain station data.
    self._calibration_coefficients: list[float]
    self._beam_trls: list[str]
    self._delay_centre: list[float]
    self._refLatitude: float
    self._refLongitude: float
    self._refHeight: float
    self._failed_pointing_updates: dict[int, int]
def _setup_health_rollup(
    self: MccsStation,
) -> HealthRollup:
    """
    Build a health rollup from the currently stored thresholds.

    Each rollup member is governed by three thresholds:

    * the number of FAILED (or UNKNOWN) sources that cause health to roll
      up to overall FAILED;
    * the number of FAILED (or UNKNOWN) sources that cause health to roll
      up to overall DEGRADED;
    * the number of DEGRADED sources that cause health to roll up to
      overall DEGRADED.

    Members are only added for subdevices that are actually configured
    (non-empty name/TRL, non-empty antenna list).

    :return: a newly constructed health rollup.
    """
    # The "self" entry represents MccsStation-specific health changes,
    # of which there are currently none.
    rollup_members = ["self"]
    thresholds = {"self": (1, 1, 1)}

    # Thresholds are stored keyed by the configured name/TRL, but the
    # rollup members use the fixed family names below.
    if self.FieldStationName != "":
        rollup_members.append("fieldstation")
        thresholds["fieldstation"] = self._health_thresholds[self.FieldStationName]
    if self.SpsStationTrl != "":
        rollup_members.append("spsstation")
        thresholds["spsstation"] = self._health_thresholds[self.SpsStationTrl]
    if len(self.AntennaTrls) > 0:
        rollup_members.append("antennas")
        thresholds["antennas"] = self._health_thresholds["antennas"]

    health_rollup = HealthRollup(
        rollup_members,
        thresholds["self"],
        self._health_changed,
        self._health_summary_changed,
    )

    # NOTE: the station calibrator is not a rollup member; no thresholds
    # are ever registered for it, so there is nothing to define for it.
    if "fieldstation" in thresholds:
        health_rollup.define(
            "fieldstation",
            [self.FieldStationName],
            thresholds["fieldstation"],
        )
    if "spsstation" in thresholds:
        health_rollup.define(
            "spsstation",
            [self.SpsStationTrl],
            thresholds["spsstation"],
        )
    if "antennas" in thresholds:
        health_rollup.define("antennas", self.AntennaTrls, thresholds["antennas"])
    return health_rollup


def _redefine_health_rollup(self: MccsStation) -> None:
    """
    Redefine the health rollup members and thresholds.

    Redefines the health rollup following a change in subdevice thresholds.
    This pulls the old/current healths from the health report, instantiates
    a new health_rollup instance and restores those healthstates. If no
    health report has been produced yet there is nothing to restore and the
    rollup is simply rebuilt.
    """

    def _flatten(nested: dict[str, Any]) -> dict[str, Any]:
        """
        Return a flattened copy of a nested dictionary.

        Where a value is itself a dictionary its entries are merged into
        the result and the parent key is omitted.

        :param nested: the nested dictionary to flatten
        :return: flattened dictionary.
        """
        flat: dict[str, Any] = {}
        for key, value in nested.items():
            if isinstance(value, dict):
                flat.update(_flatten(value))
            else:
                flat[key] = value
        return flat

    # Pull out the old healthstates. The report is an empty string until
    # the first health summary arrives, and json.loads("") would raise.
    old_subdevice_healths: dict[str, Any] = {}
    if self._health_report:
        old_report = json.loads(self._health_report)
        if isinstance(old_report, dict):
            old_subdevice_healths = _flatten(old_report)

    old_online = self._health_rollup.online
    self._health_rollup = self._setup_health_rollup()
    self._health_rollup.online = old_online

    # Restore old healthstates.
    for subdevice, health in old_subdevice_healths.items():
        self._health_rollup.health_changed(subdevice, cast(HealthState, health))
def init_device(self: MccsStation) -> None:
    """
    Initialise the device.

    Resets the instance state, runs the base-class initialisation, logs the
    software version and configured properties, warns if no subdevices are
    configured, and registers the manually pushed change/archive events.
    """
    self._subarray_id = 0
    self._refLatitude = 0.0
    self._refLongitude = 0.0
    self._refHeight = 0.0
    self._beam_trls = []
    self._transient_buffer_trl = ""
    self._delay_centre = []
    self._calibration_coefficients = []
    self._is_calibrated = False
    self._calibration_job_id = 0
    self._daq_job_id = 0
    self._data_directory = ""
    self._failed_pointing_updates = {}

    super().init_device()

    self._build_state = sys.modules["ska_low_mccs"].__version_info__
    self._version_id = sys.modules["ska_low_mccs"].__version__
    device_name = self.__class__.__name__
    version = f"{device_name} Software Version: {self._version_id}"
    properties = (
        f"Initialised {device_name} device with properties:\n"
        f"\tStationId: {self.StationId:03}\n"
        f"\tReferenceLatitude: {self.RefLatitude:03}\n"
        f"\tReferenceLongitude: {self.RefLongitude:03}\n"
        f"\tReferenceHeight: {self.RefHeight:03}\n"
        f"\tFieldStationName: {self.FieldStationName}\n"
        f"\tStationCalibratorTrl: {self.StationCalibratorTrl}\n"
        f"\tSpsStationTrl: {self.SpsStationTrl}\n"
        f"\tAntennaTrls: {self.AntennaTrls}\n"
        f"\tAntennaXs: {self.AntennaXs}\n"
        f"\tAntennaYs: {self.AntennaYs}\n"
        f"\tAntennaZs: {self.AntennaZs}\n"
        f"\tAntennaIDs: {self.AntennaIDs}\n"
        f"\tAntennaMasks: {self.AntennaMasks}\n"
        f"\tCalibrationLoadDelay: {self.CalibrationLoadDelay}\n"
    )
    self.logger.info(
        "\n%s\n%s\n%s", str(self.GetVersionInfo()), version, properties
    )

    # Warn when neither a single-device TRL nor any antenna is configured.
    no_single_subdevices = all(
        trl == ""
        for trl in (
            self.FieldStationName,
            self.StationCalibratorTrl,
            self.SpsStationTrl,
        )
    )
    no_antennas = len(self.AntennaTrls) == 0
    if no_single_subdevices and no_antennas:
        self.logger.warning(
            "%s initialised with no subdevices. The device will "
            "report PowerState.ON and CommunicationStatus.ESTABLISHED",
            device_name,
        )

    # beamTrls is the only attribute registered with the third flag set.
    self.set_change_event("beamTrls", True, True)
    self.set_archive_event("beamTrls", True, True)
    for attribute_name in (
        "transientBufferTrl",
        "outsideTemperature",
        "dataReceivedResult",
        "failedPointingUpdates",
    ):
        self.set_change_event(attribute_name, True, False)
        self.set_archive_event(attribute_name, True, False)

    self.init_completed()
def _init_state_model(self: MccsStation) -> None:
    """
    Set up the observation state model and both health models.

    Extends the base-class state model initialisation with the station obs
    state model, the default health thresholds, the health rollup (new
    model) and the StationHealthModel (old model).
    """
    super()._init_state_model()
    self._obs_state_model = StationObsStateModel(
        self.logger, self._update_obs_state
    )

    antenna_count = len(self.AntennaTrls)
    ten_percent_capped = min(np.ceil(antenna_count * 0.1), 25)  # 10%, max 25.
    five_percent_capped = min(np.ceil(antenna_count * 0.05), 12)  # 5%, max 12.
    single_device_thresholds = (1, 1, 1)
    self._health_thresholds = {
        "antennas": (
            ten_percent_capped,
            five_percent_capped,
            ten_percent_capped,
        ),
        f"{self.FieldStationName}": single_device_thresholds,
        f"{self.SpsStationTrl}": single_device_thresholds,
    }

    # InitCommand.do() does this too late.
    self._health_state = HealthState.UNKNOWN
    self._health_rollup = self._setup_health_rollup()

    old_model_thresholds = {
        "antenna_degraded": 0.05,
        "antenna_failed": 0.2,
    }
    self._health_model = StationHealthModel(
        self.FieldStationName,
        self.SpsStationTrl,
        self.AntennaTrls,
        self._old_health_changed,
        thresholds=old_model_thresholds,
    )
    self.set_change_event("healthState", True, False)


def _update_admin_mode(self: MccsStation, admin_mode: AdminMode) -> None:
    """
    Propagate an admin mode change to the health rollup.

    The rollup is marked online only in ENGINEERING or ONLINE admin mode.

    :param admin_mode: the new admin mode.
    """
    super()._update_admin_mode(admin_mode)
    online_modes = (AdminMode.ENGINEERING, AdminMode.ONLINE)
    self._health_rollup.online = admin_mode in online_modes
def create_component_manager(
    self: MccsStation,
) -> StationComponentManager:
    """
    Build the component manager that this device delegates to.

    :return: a component manager for this device.
    """
    # Merge the separate X, Y and Z lists into one array of per-antenna
    # (x, y, z) triples.
    xyz_triples = list(zip(self.AntennaXs, self.AntennaYs, self.AntennaZs))
    antenna_station_locations = np.array(xyz_triples)

    component_manager = StationComponentManager(
        self.StationId,
        self.RefLatitude,
        self.RefLongitude,
        self.RefHeight,
        self.FieldStationName,
        self.AntennaTrls,
        antenna_station_locations,
        self.AntennaIDs,
        self.StationCalibratorTrl,
        self.SpsStationTrl,
        self.CalibrationLoadDelay,
        self.AntennaMasks,
        self.UseBeamWeights,
        self.logger,
        self._communication_state_changed,
        self._component_state_callback,
        event_serialiser=self._event_serialiser,
    )
    return component_manager
# JSON schemas for command arguments, loaded from the packaged schema files.
Scan_SCHEMA: Final = json.loads(
    importlib.resources.read_text(
        "ska_low_mccs.schemas.station",
        "MccsStation_Scan_3_0.json",
    )
)
ConfigureSemiStatic_SCHEMA: Final = json.loads(
    importlib.resources.read_text(
        "ska_low_mccs.schemas.station",
        "MccsStation_ConfigureSemiStatic_3_0.json",
    )
)
TrackObject_SCHEMA: Final = json.loads(
    importlib.resources.read_text(
        "ska_low_mccs.schemas.station",
        "MccsStation_TrackObject_3_1.json",
    )
)
ApplyConfiguration_SCHEMA: Final = json.loads(
    importlib.resources.read_text(
        "ska_low_mccs.schemas.station",
        "MccsStation_ApplyConfiguration_1_2.json",
    )
)
AcquireDataForCalibration_SCHEMA: Final = json.loads(
    importlib.resources.read_text(
        "ska_low_mccs.schemas.station",
        "MccsStation_AcquireDataForCalibration_1_0.json",
    )
)
TriggerAdcEqualisation_SCHEMA: Final = json.loads(
    importlib.resources.read_text(
        "ska_low_mccs.schemas.station",
        "MccsStation_TriggerAdcEqualisation_1_0.json",
    )
)
# Annotated Final for consistency with the other schema constants.
StartAcquisition_SCHEMA: Final = json.loads(
    importlib.resources.read_text(
        "ska_low_mccs.schemas.common",
        "Mccs_StartAcquisition.json",
    )
)
GetPointingDelays_SCHEMA: Final = json.loads(
    importlib.resources.files("ska_low_mccs.schemas.station")
    .joinpath("MccsStation_GetPointingDelays_3_0.json")
    .read_text()
)
LoadBeamWeights_SCHEMA: Final = json.loads(
    s=importlib.resources.files(package="ska_low_mccs.schemas.station")
    .joinpath("MccsStation_LoadBeamWeights_2_0.json")
    .read_text()
)


# ----------
# Callbacks
# ----------
def _communication_state_changed(
    self: MccsStation,
    communication_state: CommunicationStatus,
) -> None:
    """
    Handle change in communications status between component manager and component.

    This is a callback hook, called by the component manager when the
    communications status changes. It is implemented here to drive the
    op_state.

    :param communication_state: the status of communications
        between the component manager and its component.
    """
    super()._communication_state_changed(communication_state)

    self._health_model.update_state(
        communicating=communication_state == CommunicationStatus.ESTABLISHED
    )
    # Drive the device's own ("self") health entry from the comms status.
    if communication_state == CommunicationStatus.DISABLED:
        self._component_state_callback(trl="self", health=HealthState.UNKNOWN)
    elif communication_state == CommunicationStatus.ESTABLISHED:
        self._component_state_callback(trl="self", health=HealthState.OK)


# pylint: disable=too-many-arguments, too-many-branches
def _component_state_callback(
    self: MccsStation,
    fault: Optional[bool] = None,
    power: Optional[PowerState] = None,
    health: Optional[HealthState] = None,
    trl: Optional[str] = None,
    is_configured: Optional[bool] = None,
    outside_temperature: Optional[float] = None,
    data_received_result: Optional[tuple[str, str]] = None,
    failed_pointing_updates: Optional[dict[int, int]] = None,
    fault_antenna_mapping: Optional[bool] = None,
) -> None:
    """
    Handle change in the state of the component.

    This is a callback hook, called by the component manager when the
    state of the component changes.

    For the power parameter it is implemented here to drive the op_state
    (for the station itself) or to forward the power state to the component
    manager (for subdevices). For the health parameter it feeds both the
    health rollup (new model) and the StationHealthModel (old model).

    :param power: An optional parameter with the new power state of the device.
    :param fault: An optional parameter if the device is entering or
        exiting a fault state.
    :param health: An optional parameter with the new health state of the device.
    :param trl: TRL of the device whose state has changed.
        None (or "self") if the device is a station.
    :param is_configured: An optional flag indicating whether the
        Station is configured.
    :param outside_temperature: the outside temperature reported
        by the field station.
    :param data_received_result: the dataReceivedResult reported
        by the SPS station.
    :param failed_pointing_updates: the number of failed
        pointing updates on a per beam basis.
    :param fault_antenna_mapping: flag for missing/missconfigured
        antenna mappings

    :raises ValueError: If TRL not found
    """
    # Both callbacks start out unset so that a device family without a
    # handler (the station calibrator) cannot hit an unbound local below.
    health_state_changed_callback = None
    power_state_changed_callback = None

    if trl is None or trl == "self":
        power_state_changed_callback = self._component_power_state_changed
        if power is not None:
            self._health_model.update_state(fault=fault, power=power)
        else:
            self._health_model.update_state(fault=fault)
    else:
        # Old health model callback setup. The device family is the second
        # "/"-separated field of the TRL; a TRL without one falls through
        # to the ValueError below rather than raising IndexError.
        trl_parts = trl.split("/")
        device_family = trl_parts[1] if len(trl_parts) > 1 else ""
        if device_family == "fieldstation":
            health_state_changed_callback = functools.partial(
                self._health_model.field_station_health_changed, trl
            )
            power_state_changed_callback = (
                self.component_manager._field_station_power_state_changed
            )
        elif device_family == "antenna":
            health_state_changed_callback = functools.partial(
                self._health_model.antenna_health_changed, trl
            )
            power_state_changed_callback = functools.partial(
                self.component_manager._antenna_power_state_changed, trl
            )
        elif device_family == "spsstation":
            health_state_changed_callback = functools.partial(
                self._health_model.sps_station_health_changed, trl
            )
            power_state_changed_callback = (
                self.component_manager._sps_station_power_state_changed
            )
        elif device_family == "stationcalibrator":
            # No power or old-model health handling for the calibrator.
            pass
        else:
            raise ValueError(
                f"Unknown TRL '{trl}', should be None or belong to antenna,"
                " tile, spsstation or fieldstation"
            )

    if power is not None and power_state_changed_callback is not None:
        power_state_changed_callback(power)

    if health is not None:
        if trl is not None:
            # New health callback.
            self._health_rollup.health_changed(source=trl, health=health)
        if health_state_changed_callback is not None:
            # Old health callback.
            health_state_changed_callback(health)

    if is_configured is not None:
        self._obs_state_model.is_configured_changed(is_configured)

    if outside_temperature is not None:
        self._outside_temperature_changed(outside_temperature)

    if data_received_result is not None:
        self._data_received_result_changed(data_received_result)

    if failed_pointing_updates is not None:
        self._failed_pointing_updates_changed(failed_pointing_updates)

    if fault_antenna_mapping:
        self.set_state(tango.DevState.FAULT)


def _component_power_state_changed(
    self: MccsStation,
    power_state: PowerState,
) -> None:
    """
    Handle change in the power mode of the component.

    This is a callback hook, called by the component manager when the power
    mode of the component changes. It is implemented here to drive the
    op_state.

    :param power_state: the power mode of the component.
    """
    action_map = {
        PowerState.OFF: "component_off",
        PowerState.STANDBY: "component_standby",
        PowerState.ON: "component_on",
        PowerState.UNKNOWN: "component_unknown",
    }

    self.op_state_model.perform_action(action_map[power_state])


def _health_changed(self: MccsStation, health: HealthState) -> None:
    """
    Handle a health change reported by the health rollup (new model).

    The new value is stored only while the new health model is selected.

    NOTE(review): no change event is pushed from here even though
    healthState is registered for manual events -- confirm where the
    event is emitted.

    :param health: the new health value
    """
    if self._use_new_health_model:
        self._health_state = health


def _old_health_changed(self: MccsStation, health: HealthState) -> None:
    """
    Handle a health change reported by the StationHealthModel (old model).

    The new value is stored only while the old health model is selected and
    the value actually differs from the current one.

    NOTE(review): no change event is pushed from here -- confirm where the
    event is emitted.

    :param health: the new health value
    """
    if not self._use_new_health_model:
        if self._health_state != health:
            self._health_state = health


def _health_summary_changed(
    self: MccsStation, health_summary: HealthSummary
) -> None:
    """
    Handle change in this device's health summary.

    This is a callback hook, called whenever this device's evaluated health
    summary changes. The summary is stored as a JSON string in the health
    report.

    :param health_summary: the new health summary
    """
    self._health_report = json.dumps(health_summary)


def _outside_temperature_changed(
    self: MccsStation, outside_temperature: float
) -> None:
    """
    Handle change in the outside temperature.

    This passes changes in the outside temperature, which originate from
    the field station.

    :param outside_temperature: the outside temperature
    """
    self.push_change_event("outsideTemperature", outside_temperature)


def _data_received_result_changed(
    self: MccsStation, data_received_result: tuple[str, str]
) -> None:
    """
    Handle change in the dataReceivedResult.

    This passes changes in the dataReceivedResult, which originate from
    the SPS station.

    :param data_received_result: the data received result
    """
    self.push_change_event("dataReceivedResult", data_received_result)


def _failed_pointing_updates_changed(
    self: MccsStation, failed_pointing_updates: dict[int, int]
) -> None:
    """
    Handle change in the failed pointing updates.

    Stores the new value and pushes it, JSON-encoded, as both a change and
    an archive event.

    :param failed_pointing_updates: the number of failed pointing updates
        on a per beam basis.
    """
    self._failed_pointing_updates = failed_pointing_updates
    # Serialise once; both events carry the same payload.
    payload = json.dumps(failed_pointing_updates)
    self.push_change_event("failedPointingUpdates", payload)
    self.push_archive_event("failedPointingUpdates", payload)


# ----------
# Attributes
# ----------
@attribute(dtype="DevBoolean")
def useNewHealthModel(self: MccsStation) -> bool:
    """
    Return a flag indicating whether this station is using the new health model.

    :return: a flag indicating whether this station is currently using
        the new health model.
    """
    return self._use_new_health_model
@useNewHealthModel.write  # type: ignore[no-redef]
def useNewHealthModel(self: MccsStation, argin: bool) -> None:
    """
    Select which health model this station uses.

    :param argin: True to use the new health model, False to use the
        old one.
    """
    self._use_new_health_model = argin
@attribute(
    dtype="int",
    label="stationID",
)
def stationID(self: MccsStation) -> int:
    """
    Return the station ID.

    The value is taken from the ``StationId`` device property.

    :return: the numerical station ID.
    """
    return self.StationId
@attribute(dtype="float", label="refLongitude")
def refLongitude(self: MccsStation) -> float:
    """
    Report the reference longitude held by the component manager.

    :return: the WGS84 Longitude of the station reference position
    """
    longitude = self.component_manager.ref_longitude
    return longitude
@attribute(dtype="float", label="refLatitude")
def refLatitude(self: MccsStation) -> float:
    """
    Report the reference latitude held by the component manager.

    :return: the WGS84 Latitude of the station reference position
    """
    latitude = self.component_manager.ref_latitude
    return latitude
@attribute(dtype="float", label="refHeight", unit="meters")
def refHeight(self: MccsStation) -> float:
    """
    Report the reference height held by the component manager.

    :return: the ellipsoidal height of the station reference position
    """
    height = self.component_manager.ref_height
    return height
@attribute(dtype="DevString", format="%s")
def activeBankCalibrationId(self: MccsStation) -> str:
    """
    Report the calibration id of the active bank.

    :return: the calibration id currently in use
    """
    active_id = self.component_manager.active_cal_id
    return active_id
@attribute(dtype="DevString", format="%s")
def inactiveBankCalibrationId(self: MccsStation) -> str:
    """
    Report the calibration id of the inactive bank.

    :return: the inactive bank calibration id
    """
    inactive_id = self.component_manager.inactive_cal_id
    return inactive_id
@attribute(dtype="DevString", format="%s")
def latestPreferredJobId(self: MccsStation) -> str:
    """
    Report the most recent preferred calibration job.

    :return: the user_friendly_name of the most recent preferred job,
        or an empty string if none exists.
    """
    preferred_job = self.component_manager.latest_preferred_job_id
    return preferred_job
@attribute(dtype="DevString", format="%s")
def transientBufferTrl(self: MccsStation) -> str:
    """
    Report the TRL of the Tango device that manages the transient buffer.

    :return: the TRL of the Tango device that manages the transient buffer
    """
    buffer_trl = self._transient_buffer_trl
    return buffer_trl
@attribute(dtype="DevBoolean")
def isCalibrated(self: MccsStation) -> bool:
    """
    Report whether this station is currently calibrated.

    :return: True if the station is currently calibrated, else False.
    """
    calibrated = self._is_calibrated
    return calibrated
@attribute(dtype="DevBoolean")
def isConfigured(self: MccsStation) -> bool:
    """
    Report whether this station is currently configured.

    The flag is read from the component manager.

    :return: True if the station is currently configured, else False.
    """
    configured = self.component_manager._is_configured
    return configured
@attribute(dtype="DevLong", format="%i")
def calibrationJobId(self: MccsStation) -> int:
    """
    Report the id of the calibration job.

    :return: the calibration job id
    """
    job_id = self._calibration_job_id
    return job_id
@attribute(dtype="DevLong", format="%i")
def daqJobId(self: MccsStation) -> int:
    """
    Report the id of the DAQ job.

    :return: the DAQ job id
    """
    job_id = self._daq_job_id
    return job_id
@attribute(dtype="DevString", format="%s")
def dataDirectory(self: MccsStation) -> str:
    """
    Report the data directory.

    This is the parent directory for all files generated by this station.

    :return: the data directory
    """
    directory = self._data_directory
    return directory
@attribute(dtype=("DevString",), max_dim_x=8, format="%s")
def beamTrls(self: MccsStation) -> list[str]:
    """
    Report the station beams associated with this station.

    :return: the TRLs of station beams associated with this station
    """
    beam_trls = self._beam_trls
    return beam_trls
@attribute(dtype="DevFloat")
def calibrationLoadDelay(self: MccsStation) -> float:
    """
    Report the load delay, in seconds, used for internal ApplyCalibration calls.

    Value defaults to 1 second.

    :returns: the calibration load delay held by the component manager.
    """
    load_delay = self.component_manager.calibration_load_delay
    return load_delay
@calibrationLoadDelay.write  # type: ignore[no-redef]
def calibrationLoadDelay(self: MccsStation, value: float) -> None:
    """
    Change the load delay used for calls to ApplyCalibration.

    :param value: Number of seconds in the future for the calibration
        to be applied.
    """
    manager = self.component_manager
    manager.calibration_load_delay = value
@attribute(dtype=("DevBoolean",), max_dim_x=256, fisallowed="engineering_to_write")
def antennaMasks(self: MccsStation) -> np.ndarray:
    """
    Return the antennaMasks.

    Access is gated by ``engineering_to_write``.

    :returns: The antenna masks held by the component manager.
    """
    return self.component_manager.antenna_masks
@antennaMasks.write  # type: ignore[no-redef]
def antennaMasks(self: MccsStation, masks: list[bool]) -> None:
    """
    Apply new antenna masks and persist them.

    This must be executed in EngineeringMode. The masks are first written
    to the ``AntennaMasks`` device property in the Tango database and then
    handed to the component manager as a numpy array.

    :param masks: masks to apply
    """
    persisted_properties = {"AntennaMasks": masks}
    database = tango.Database()
    database.put_device_property(self.get_name(), persisted_properties)
    self.component_manager.antenna_masks = np.array(masks)
def engineering_to_write(self: MccsStation, req_type: tango.AttReqType) -> bool:
    """
    Allow attribute writes only while in adminMode ENGINEERING.

    Requests other than writes are always allowed.

    :param req_type: the request type

    :return: True if the request is allowed, i.e. it is not a write
        request or the station is in Engineering mode.
    """
    is_engineering = self._admin_mode == AdminMode.ENGINEERING
    if is_engineering or req_type != tango.AttReqType.WRITE_REQ:
        return True

    reason = "CommandNotAllowed"
    msg = (
        "To write this attribute we must be in adminMode Engineering. "
        f"Station is currently in adminMode {AdminMode(self._admin_mode).name}"
    )
    tango.Except.throw_exception(reason, msg, self.get_name())
    # Only reached if throw_exception returns instead of raising.
    return False
@attribute(
    dtype=(("DevFloat",),),
    max_dim_x=384,  # Logical Channels
    max_dim_y=256,  # Antennas
)
def configuredWeightingBlock(self: MccsStation) -> np.ndarray:
    """
    Report the weighting block that has been configured.

    The configured block may or may not have been written to the hardware;
    see appliedWeightingBlock for the block that was applied.

    Magic number -1 represents that this has not been updated.

    .. warning::
        This is not read from hardware and is not persisted well.
        Please view the TPM calibration_table for the written value.

    :return: The current configured weightingBlock.
    """
    weighting_block = self.component_manager.configured_weighting_block
    return weighting_block
@attribute(
    dtype=(("DevFloat",),),
    max_dim_x=384,  # Logical Channels
    max_dim_y=256,  # Antennas
)
def appliedWeightingBlock(self: MccsStation) -> np.ndarray:
    """
    Report the weighting block that was last applied.

    Magic number -1 represents that this has not been updated.

    .. warning::
        This is not read from hardware and is not persisted well.
        Please view the TPM calibration_table for the written value.

    :return: The last known weightingBlock applied.
    """
    weighting_block = self.component_manager.applied_weighting_block
    return weighting_block
@attribute(dtype=("DevFloat",), max_dim_x=2)
def delayCentre(self: MccsStation) -> list[float]:
    """
    Report the WGS84 position of the delay centre of the station.

    :todo: WGS84 is a datum. What is the coordinate system?
        Latitude and longitude? Or is it SUTM50 eastings and
        northings? Either way, do we need to allow for elevation
        too?

    :return: the WGS84 position of the delay centre of the station
    """
    delay_centre = self._delay_centre
    return delay_centre
@delayCentre.write  # type: ignore[no-redef]
def delayCentre(self: MccsStation, value: list[float]) -> None:
    """
    Store a new delay centre for the station.

    :param value: WGS84 position
    """
    self._delay_centre = value
@attribute(dtype=("DevFloat",), max_dim_x=512)
def calibrationCoefficients(self: MccsStation) -> list[float]:
    """
    Report the calibration coefficients for the station.

    :todo: How big should this array be? 4 complex values (Jones
        matrix) per channel. This station can have up to 16
        tiles of up to 16 antennas, so that is 8 x 16 x 16 = 2048
        coefficients per channel. But how many channels?
        384 channels, 786432 elements per station (402M for SKA Low)

    :return: the calibration coefficients
    """
    coefficients = self._calibration_coefficients
    return coefficients
@attribute(dtype="DevString", format="%s")
def healthThresholds(self: MccsStation) -> str:
    """
    Report the thresholds used by the new health model (health rollup).

    Each entry maps a key ("antennas", or a subdevice name/TRL) to three
    counts, as consumed by the health rollup:

    * number of FAILED (or UNKNOWN) sources that rolls up to FAILED;
    * number of FAILED (or UNKNOWN) sources that rolls up to DEGRADED;
    * number of DEGRADED sources that rolls up to DEGRADED.

    A warning is logged when the old health model is the one in use.

    :return: the health thresholds as a JSON string
    """
    old_model_active = not self._use_new_health_model
    if old_model_active:
        self.logger.warning(
            "These are thresholds used by the new health model. "
            "Old health model is in use. "
            "To see old health model thresholds use healthModelParams."
        )
    serialised = json.dumps(self._health_thresholds)
    return serialised
@healthThresholds.write  # type: ignore[no-redef]
def healthThresholds(self: MccsStation, argin: str) -> None:
    """
    Set the thresholds used by the new health model (health rollup).

    The argument is a JSON dictionary whose keys must already exist in the
    stored thresholds ("antennas", or a configured subdevice name/TRL).
    Unknown keys are logged and ignored. Each value holds three counts:

    * number of FAILED (or UNKNOWN) sources that rolls up to FAILED;
    * number of FAILED (or UNKNOWN) sources that rolls up to DEGRADED;
    * number of DEGRADED sources that rolls up to DEGRADED.

    If at least one threshold was accepted, the health rollup is rebuilt so
    that the new values take effect.

    :param argin: JSON-string of dictionary of health thresholds
    """
    if not self._use_new_health_model:
        self.logger.warning(
            "Old health model is in use. "
            "These thresholds are for the new health model. "
            "Thresholds will be updated but will not be used unless the "
            "new health model is activated. "
            "To update old health model thresholds use healthModelParams."
        )

    thresholds = json.loads(argin)
    thresholds_updated = False
    for key, threshold in thresholds.items():
        if key not in self._health_thresholds:
            self.logger.info(
                "Invalid Key Supplied: %s. Allowed keys: %s",
                key,
                self._health_thresholds.keys(),
            )
            continue
        self._health_thresholds[key] = threshold
        thresholds_updated = True

    # The stored thresholds are keyed by "antennas" and by the configured
    # subdevice names/TRLs, so every accepted key affects the rollup.
    if thresholds_updated:
        self.logger.info("Reconfiguring subdevice health thresholds.")
        self._redefine_health_rollup()

    # If old health model is around, update it too.
    if self._health_model is not None:
        self._health_model.health_params = (
            self._health_model.health_params | self._health_thresholds
        )
@attribute(dtype="DevString", format="%s")
def healthModelParams(self: MccsStation) -> str:
    """
    Report the parameters held by the old health model.

    A warning is logged when the new health model is the one in use.

    :return: JSON-encoded health parameters of the old health model
    """
    if self._use_new_health_model:
        self.logger.warning(
            "These are the thresholds for the old health model. "
            "New health model is currently in use. "
            "To see new health model thresholds use healthThresholds."
        )
    # NOTE(review): the healthThresholds writer guards against
    # self._health_model being None; this reader does not - confirm
    # whether it can be None here.
    legacy_params = self._health_model.health_params
    return json.dumps(legacy_params)
@healthModelParams.write  # type: ignore[no-redef]
def healthModelParams(self: MccsStation, argin: str) -> None:
    """
    Set the params for health transition rules of the old health model.

    The decoded parameters replace ``health_params`` on the old health
    model, after which the model's ``update_health`` is called. A warning
    is logged when the new health model is the one in use.

    :param argin: JSON-string of dictionary of health states
    """
    if self._use_new_health_model:
        self.logger.warning(
            "New health model is in use. "
            # The trailing space below was missing, which fused two
            # sentences together in the logged message.
            "These thresholds are for the old health model. "
            "Thresholds will be updated but will not "
            "be used unless the old health model is activated. "
            "To update new health model thresholds use healthThresholds."
        )
    self._health_model.health_params = json.loads(argin)
    self._health_model.update_health()
@attribute(dtype="DevString", format="%s")
def pointingUpdateTimingHistory(self: MccsStation) -> str:
    """
    Return the pointing update timing history as JSON.

    Holds timing information for the last N pointing updates
    (N is configurable by writing to this attribute, default 10).
    Each record includes:

    - window_start: Start time of the update window
    - window_end: End time of the update window
    - cycle_start: When the calculate/load/apply cycle started
    - cycle_end: When the calculate/load/apply cycle finished
    - load_time: The time supplied to hardware for applying delays
    - time_into_window: Seconds into the window when cycle started
    - time_consumed: Seconds consumed by the update cycle

    Write an integer to this attribute to change the history size.

    :return: JSON string containing timing history
    """
    timing_history = self.component_manager.pointing_update_timing_history
    return timing_history
@pointingUpdateTimingHistory.write  # type: ignore[no-redef]
def pointingUpdateTimingHistory(self: MccsStation, history_size: int) -> None:
    """
    Set how many pointing update timing records are kept in history.

    The value is forwarded unchanged to the component manager.

    :param history_size: Number of timing records to keep (must be > 0)
    """
    # NOTE(review): the attribute is declared as DevString while this
    # writer is annotated as taking an int - confirm the type actually
    # delivered here and whether the component manager converts it.
    manager = self.component_manager
    manager.set_pointing_update_timing_history_size(history_size)
@attribute(dtype=(("DevLong",),), max_dim_y=48, max_dim_x=8)
def beamformerTable(self: MccsStation) -> list[list[int]]:
    """
    Return the ids of the channels configured for this beam.

    The value is read from the component manager's ``beamformer_table``.

    :return: channel table
    """
    channel_table = self.component_manager.beamformer_table
    return channel_table
@attribute(dtype="DevLong")
def numberOfChannels(self: MccsStation) -> int:
    """
    Return the total number of channels in the beamformer.

    The value is read from the component manager's ``_number_of_channels``.

    :return: the total number of channels
    """
    channel_count = self.component_manager._number_of_channels
    return channel_count
@attribute(dtype="DevBoolean")
def isSynchronised(self: MccsStation) -> bool:
    """
    Return true if all tiles in the SpsStation are synchronised.

    Each entry of the component manager's ``tileprogrammingstate`` is
    compared case-insensitively with "synchronised". An empty collection
    yields True.

    :return: true if all tiles in the SpsStation are synchronised.
    """
    for tile_state in self.component_manager.tileprogrammingstate:
        if tile_state.lower() != "synchronised":
            return False
    return True
@attribute(dtype="DevBoolean")
def isInitialised(self: MccsStation) -> bool:
    """
    Return true if all tiles in the SpsStation are initialised.

    Each entry of the component manager's ``tileprogrammingstate`` is
    compared case-insensitively with "initialised". An empty collection
    yields True.

    :return: true if all tiles in the SpsStation are initialised.
    """
    for tile_state in self.component_manager.tileprogrammingstate:
        if tile_state.lower() != "initialised":
            return False
    return True
@stb.long_running_commands.submit_lrc_task
def execute_On(self) -> stb.type_hints.TaskFunctionType:
    """
    Turn the station on.

    :return: the component manager's ``do_on`` callable, handed to the
        ``submit_lrc_task`` decorator as the task to run.
    """
    power_on_task = self.component_manager.do_on
    return power_on_task
@stb.long_running_commands.submit_lrc_task
def execute_Off(self) -> stb.type_hints.TaskFunctionType:
    """
    Turn the station off.

    :return: the component manager's ``do_off`` callable, handed to the
        ``submit_lrc_task`` decorator as the task to run.
    """
    power_off_task = self.component_manager.do_off
    return power_off_task
@attribute(dtype="float", label="OutsideTemperature")
def outsideTemperature(self: MccsStation) -> Optional[float]:
    """
    Return the OutsideTemperature.

    The value is read from the component manager's ``outside_temperature``.

    :return: the OutsideTemperature.
    """
    temperature = self.component_manager.outside_temperature
    return temperature
@attribute(dtype="DevString")
def healthReport(self: MccsStation) -> str:
    """
    Get the health report.

    The report comes from this device's ``_health_report`` when the new
    health model is in use, otherwise from the old health model.

    :return: the health report.
    """
    return (
        self._health_report
        if self._use_new_health_model
        else self._health_model.health_report
    )
@attribute(
    dtype=("str",),
    max_dim_x=2,  # Always the last result (unique_id, JSON-encoded result)
)
def dataReceivedResult(self: MccsStation) -> tuple[str, str] | None:
    """
    Read the result of the receiving of data.

    The value is read from the component manager's ``data_received_result``.

    :return: A tuple containing the data mode of transmission
        and a json string with any additional data about
        the data such as the file name.
    """
    latest_result = self.component_manager.data_received_result
    return latest_result
@attribute(dtype="DevString")
def lastPointingDelays(self: MccsStation) -> str:
    """
    Return last pointing delays applied to the tiles.

    Values are initialised to 0.0 if they haven't been set.
    These values are in antenna EEP order. The delays are obtained from
    the pointing manager and serialised with ``NumpyEncoder``.

    :returns: last pointing delays applied to the tiles.
    """
    pointing_manager = self.component_manager._pointing_manager
    last_delays = pointing_manager._get_last_pointing_delays()
    return json.dumps(last_delays, cls=NumpyEncoder)
@attribute(dtype="DevString")
def trackingStatus(self: MccsStation) -> str:
    """
    Return the tracking status of currently active PointingRequests.

    Beam IDs not listed have no active PointingRequest.

    :returns: JSON-encoded tracking status of active PointingRequests.
    """
    pointing_manager = self.component_manager._pointing_manager
    status = pointing_manager.tracking_status
    return json.dumps(status)
@attribute(dtype="DevString")
def activeBeamInfo(self: MccsStation) -> str:
    """
    Return the configuration information of currently active PointingRequests.

    List information about each active beam. Info includes pointing type,
    values, reference time, scan time, time step, next update time, and
    whether the scan is complete.

    :returns: A json encoded dictionary with beam IDs as keys
        and their information as values.
    """
    pointing_manager = self.component_manager._pointing_manager
    beam_info = pointing_manager.active_beam_info
    return json.dumps(beam_info)
@attribute(dtype="DevString")
def failedPointingUpdates(self: MccsStation) -> str:
    """
    Return the tracking beams which are currently unlocked.

    The value is the JSON encoding of this device's
    ``_failed_pointing_updates``.

    :returns: the tracking beams which are currently unlocked.
    """
    failed_updates = self._failed_pointing_updates
    return json.dumps(failed_updates)
@attribute(dtype=str)
def cspIngestConfig(self: MccsStation) -> str:
    """
    Report the CspIngest configuration in use for this station.

    When the component manager has no SPS station proxy, a fixed
    explanatory string is returned instead.

    :return: json string with CspIngest configuration.
    """
    sps_proxy = self.component_manager._sps_station_proxy
    if sps_proxy is not None:
        return sps_proxy.csp_ingest_config()
    return "No SPS Station associated with this station"
@attribute(dtype=bool)
def additionalDebug(self: MccsStation) -> bool:
    """
    Return whether additional debug information is enabled.

    The flag is read from the component manager's ``additional_debug``.

    :return: True if additional debug information is enabled, False otherwise.
    """
    debug_enabled = self.component_manager.additional_debug
    return debug_enabled
@additionalDebug.write  # type: ignore[no-redef]
def additionalDebug(self: MccsStation, argin: bool) -> None:
    """
    Set whether additional debug information is enabled.

    The flag is stored on the component manager's ``additional_debug``.

    :param argin: True to enable additional debug information, False to disable.
    """
    manager = self.component_manager
    manager.additional_debug = argin
@attribute(dtype=("DevFloat",), max_dim_x=512)
def staticDelays(self: MccsStation) -> list[float]:
    """
    Report the static delays in use for this station.

    An empty list is returned when the component manager has no SPS
    station proxy.

    :return: list of static delays.
    """
    sps_proxy = self.component_manager._sps_station_proxy
    if sps_proxy is not None:
        return sps_proxy.static_delays()
    return []
@attribute(dtype=str)
def daqPath(self: MccsStation) -> str:
    """
    Report the DAQ path in use for this station.

    When the component manager has no SPS station proxy, a fixed
    explanatory string is returned instead.

    :return: DAQ path.
    """
    sps_proxy = self.component_manager._sps_station_proxy
    if sps_proxy is not None:
        return sps_proxy.daq_path()
    return "No SPS Station associated with this station"
# TODO ADR-111: add scanId attribute (int vector, max_dim_x = 17) # to read self.component_manager._scan_id # -------- # Commands # --------
@stb.long_running_commands.long_running_command(dtype_in="DevVarDoubleArray")
def LoadPointingDelays(
    self: MccsStation,
    delays: np.ndarray,
) -> stb.type_hints.TaskFunctionType:
    """
    Set the pointing delay parameters of this Station's Tiles.

    :param delays: an array containing a beam index followed by
        antenna delays in antenna EEP order. 1 + 256*2 = 513 elements.

    :return: the task function; when run it calls the component
        manager's ``load_pointing_delays`` with the supplied delays.

    :example:

    >>> dp = tango.DeviceProxy("low-mccs/station/ci-1")
    >>> dp.command_inout("LoadPointingDelays", delay_list)
    """
    manager = self.component_manager

    def task(
        task_callback: stb.type_hints.TaskCallbackType,
        task_abort_event: threading.Event,
    ) -> None:
        # ``delays`` is captured from the enclosing command call.
        manager.load_pointing_delays(
            task_callback=task_callback,
            task_abort_event=task_abort_event,
            delays=delays,
        )

    return task
@stb.long_running_commands.long_running_command(dtype_in="DevString")
def ApplyPointingDelays(
    self: MccsStation, load_time: str
) -> stb.type_hints.TaskFunctionType:
    """
    Apply the pointing delay parameters of this Station's Tiles.

    :param load_time: switch time, in ISO formatted time. Default: now

    :return: the task function; when run it calls the component
        manager's ``apply_pointing_delays`` with the supplied load time.

    :example:

    >>> dp = tango.DeviceProxy("low-mccs/station/ci-1")
    >>> time_string = switch time as ISO formatted time
    >>> dp.command_inout("ApplyPointingDelays", time_string)
    """
    manager = self.component_manager

    def task(
        task_callback: stb.type_hints.TaskCallbackType,
        task_abort_event: threading.Event,
    ) -> None:
        # ``load_time`` is captured from the enclosing command call.
        manager.apply_pointing_delays(
            task_callback=task_callback,
            task_abort_event=task_abort_event,
            load_time=load_time,
        )

    return task
@command(
    dtype_in=("DevLong",),
    dtype_out="DevVarLongStringArray",
)
def ConfigureChannels(
    self: MccsStation, argin: list[int]
) -> DevVarLongStringArrayType:
    """
    Set the beamformer table entries for a station beam.

    Entries are defined as a flattened 2D array, for a maximum of 48 entries
    Each entry is 7 channels long and corresponds to 8 consecutive frequency
    channels.

    NOTE(review): the element list below names eight values per entry while
    the sentence above says 7 - confirm which is current.

    .. note::
        A call to this method will delete any previous channels
        allocated to this beam before reconfiguration.

    :param argin: list of channel block description. Elements are:

    * channel block index: value in range 0:47 for the channel block to set
    * start_channel - (int) region starting channel, even in range 0 to 510
    * beam_index - (int) beam used for this region with range 0 to 47
    * subarray_id - (int) Subarray: 0 is reserved for unallocated entry
    * subarray_logical_channel - (int) logical channel # in the subarray
    * subarray_beam_id - (int) ID of the subarray beam
    * substation_id - (int) Substation
    * aperture_id:  ID of the aperture (station*100+substation?)

    :return: a pair of single-element lists: the code returned by the
        component manager and a fixed informational message.

    :example:

    >>> dp = tango.DeviceProxy("low-mccs/station/ci-1")
    >>> dp.command_inout("ConfigureChannels", block_table)
    """
    # The message is the same whatever code the component manager returns.
    outcome = self.component_manager.configure_channels(argin)
    return ([outcome], ["ConfigureChannels completed"])
@command(
    dtype_in="DevLong",
    dtype_out="DevVarLongStringArray",
)
def DeallocateSubarray(
    self: MccsStation, subarray_id: int
) -> DevVarLongStringArrayType:
    """
    Deallocate entries relative to a subarray in aggregate tables.

    :param subarray_id: the ID of the subarray to deallocate

    :return: a pair of single-element lists: the code returned by the
        component manager and a fixed informational message.
    """
    # The message is the same whatever code the component manager returns.
    outcome = self.component_manager.deallocate_subarray(subarray_id)
    return ([outcome], ["DeallocateSubarray completed"])
@stb.long_running_commands.long_running_command(dtype_in="DevString")
@stb.validators.validate_json_args
def LoadCalibrationCoefficients(
    self: MccsStation, calibration_id: Optional[str]
) -> stb.type_hints.TaskFunctionType:
    """
    Load Calibration solutions to the station, but does not apply them.

    NOTE for each 384 channels this will select and load a solution
    specified by the selection rules
    https://developer.skao.int/projects/ska-low-mccs
    /en/0.22.0/api/calibration_store/selection_policy/index.html

    TODO This method will load calibration solution for each antenna
    in the station. This will loop round all antenna and load a
    inverse jones for each 384 channels. A unity calibration is loaded
    if no solution is found, this sounds dangerous.

    :param calibration_id: Unique identifier for calibration.

    :return: the task function; when run it calls the component manager's
        ``load_calibration_coefficients`` with ``subarray_id=None``.

    :example:

    >>> dp = tango.DeviceProxy("low-mccs/station/ci-1")
    >>> dp.command_inout("LoadCalibrationCoefficients")
    """
    manager = self.component_manager

    def task(
        task_callback: stb.type_hints.TaskCallbackType,
        task_abort_event: threading.Event,
    ) -> None:
        # The subarray is always passed as None from this command.
        manager.load_calibration_coefficients(
            task_callback=task_callback,
            task_abort_event=task_abort_event,
            calibration_id=calibration_id,
            subarray_id=None,
        )

    return task
@stb.long_running_commands.long_running_command(
    dtype_in="DevString", dtype_out="DevVarLongStringArray"
)
def ApplyCalibration(
    self: MccsStation, load_time: str
) -> stb.type_hints.TaskFunctionType:
    """
    Load the calibration coefficients at the specified time delay.

    :param load_time: load time, in ISO formatted time. If not set, defaults
        to `calibrationLoadDelay` seconds in the future.

    :return: the task function; when run it calls the component
        manager's ``apply_calibration`` with the supplied load time.

    :example:

    >>> dp = tango.DeviceProxy("low-mccs/station/ci-1")
    >>> dp.command_inout("ApplyCalibration", "")
    """
    manager = self.component_manager

    def task(
        task_callback: stb.type_hints.TaskCallbackType,
        task_abort_event: threading.Event,
    ) -> None:
        # ``load_time`` is captured from the enclosing command call.
        manager.apply_calibration(
            task_callback=task_callback,
            task_abort_event=task_abort_event,
            load_time=load_time,
        )

    return task
@stb.long_running_commands.submit_lrc_task
def execute_ApplyConfiguration(
    self: MccsStation,
    **kwargs: Any,
) -> stb.type_hints.TaskFunctionType:
    """
    Apply the aggregated channel table to this Station's SpsStation.

    :param kwargs: containing optional transaction_id,
        calibration_id, subarray_id.

    :return: the task function; when run it forwards ``kwargs`` to the
        component manager's ``apply_configuration``.
    """
    manager = self.component_manager

    def task(
        task_callback: stb.type_hints.TaskCallbackType,
        task_abort_event: threading.Event,
    ) -> None:
        # ``kwargs`` is captured from the enclosing call and passed through.
        manager.apply_configuration(
            task_callback=task_callback,
            task_abort_event=task_abort_event,
            **kwargs,
        )

    return task
@command(dtype_in="DevString", dtype_out="DevVarLongStringArray")
@stb.validators.validate_json_args
def ApplyConfiguration(
    self: MccsStation, **kwargs: Any
) -> DevVarLongStringArrayType:
    """
    Apply the aggregated channel table to this Station's SpsStation.

    The command is rejected, and the reason logged as an error, unless the
    component manager reports the station calibrator power state as ON.
    Otherwise the work is handed to ``execute_ApplyConfiguration``.

    :param kwargs: containing optional transaction_id,
        calibration_id, subarray_id.

    :return: A tuple containing a return code and a string
        message indicating status. The message is for
        information purpose only.

    :example:

    >>> dp = tango.DeviceProxy("low-mccs/station/ci-1")
    >>> dp.command_inout("ApplyConfiguration")
    """
    if self.component_manager._station_calibrator_power_state != PowerState.ON:
        # The two sentences used to be concatenated without a separator
        # ("...or ONUnable to...") and "available" was misspelt.
        station_calibrator_not_ready_message = (
            "Service StationCalibrator is not available or ON. "
            "Unable to apply configuration."
        )
        self.logger.error(station_calibrator_not_ready_message)
        return [ResultCode.REJECTED], [station_calibrator_not_ready_message]
    return self.execute_ApplyConfiguration(**kwargs)
@command(dtype_in="DevString", dtype_out="DevVarLongStringArray")
@stb.validators.validate_json_args
def TrackObject(
    self: MccsStation,
    pointing_type: str,
    values: dict | str,
    reference_time: str,
    **kwargs: Any,
) -> DevVarLongStringArrayType:
    """
    Track an object through the sky.

    :param pointing_type: the type of pointing requested
    :param values: Coordinates for object to be tracked
    :param reference_time: time in which coordinates are equal,
        in ISO8601 formatted astropy.Time time
    :param kwargs: optional kwargs including
        scan_time, station_beam_number, time_step

    :return: A tuple containing a return code and a string message.
        ResultCode.OK if tracking was successfully queued in the pointing
        manager, ResultCode.REJECTED otherwise.
    """
    manager = self.component_manager

    mapping_status, mapping_message = manager.verify_antennas_mapping()
    if mapping_status == TaskStatus.REJECTED:
        return [ResultCode.REJECTED], [mapping_message]

    # NOTE(review): a truthy verify_values result leads to rejection here -
    # confirm that verify_values signals *invalid* input with a truthy value.
    if manager.verify_values(pointing_type, values):
        return [ResultCode.REJECTED], ["pointing values are not valid."]

    outcome = manager.track_object(
        pointing_type=pointing_type,
        values=values,
        reference_time=reference_time,
        **kwargs,
    )
    if outcome != ResultCode.OK:
        return [outcome], ["Failed to queue tracking request"]
    return [ResultCode.OK], ["Tracking started"]
@command(
    dtype_in="DevLong",
    dtype_out="DevBoolean",
)
def BeamformerRunningForSubarray(self: MccsStation, subarray_id: int) -> bool:
    """
    Get Beamformer Running status for a specific subarray.

    The query is answered by the component manager.

    :param subarray_id: subarray_id, in the range 1 to 16

    :return: True if the beamformer is running
    """
    manager = self.component_manager
    is_running = manager.beamformer_running_for_subarray(subarray_id=subarray_id)
    return is_running
@command(
    dtype_in="DevString",
    dtype_out="DevVarDoubleArray",
)
@stb.validators.validate_json_args
def GetPointingDelays(
    self: MccsStation,
    pointing_type: str,
    values: dict | str,
    **kwargs: Any,
) -> np.ndarray:
    """
    Get pointing coefficients.

    All arguments are forwarded to the component manager's
    ``get_pointing_delays``.

    :param pointing_type: The type of pointing requested.
    :param values: The pointing values, either in alt_az or ra_dec.
    :param kwargs: Optional arguments, including:
        - interface: The schema version this is running against.
        - reference_time: Time in which coordinates are equal,
            in ISO8601 formatted astropy.Time.
        - time_step: Duration between each time step in seconds.

    :return: The pointing delays as pairs of (delay, delay rate) in EEP order.
    """
    manager = self.component_manager
    delays = manager.get_pointing_delays(
        pointing_type=pointing_type,
        values=values,
        **kwargs,
    )
    return delays
@command(
    dtype_in="DevString",
    dtype_out="DevVoid",
)
@stb.validators.validate_json_args
def LoadBeamWeights(
    self: MccsStation,
    subarray_beam_id: int,
    antenna_weights: list[float],
) -> None:
    """
    Load beam weights for a specific subarray beam.

    The weights are passed through
    ``interleaved_float_list_to_complex_list`` before being handed to the
    component manager.

    :param subarray_beam_id: The ID of the subarray beam for which to load weights.
    :param antenna_weights: A list of weights for each antenna in the subarray beam.
    """
    complex_weights = interleaved_float_list_to_complex_list(antenna_weights)
    self.component_manager.load_beam_weights(
        subarray_beam_id=subarray_beam_id,
        antenna_weights=complex_weights,
    )
@stb.long_running_commands.long_running_command(dtype_in="DevString")
@stb.validators.validate_json_args
def ConfigureSemiStatic(
    self: MccsStation,
    antenna_config: dict[str, Any],
    station_config: Optional[dict[str, Any]] = None,
    field_station_config: Optional[dict[str, Any]] = None,
) -> stb.type_hints.TaskFunctionType:
    """
    Configure semi static information like position of antennas.

    :param station_config: Configuration specification for
        the station device.
    :param field_station_config: Configuration specification for
        the field station device.
    :param antenna_config: Configuration specification for
        the antenna devices.

    :return: The task to be submitted as an LRC.

    :example:

    >>> dp = tango.DeviceProxy("low-mccs/station/ci-1")
    >>> config = json.dumps({
            "station": {
                "StationId": 1,
                "ref_latitude": 1.0,
                "ref_longitude": 1.0,
                "ref_height": 1.0,
            }
        })
    >>> dp.command_inout("ConfigureSemiStatic", config)

    NOTE(review): the example above uses a top-level "station" key while
    the parameters are named ``*_config`` - confirm the expected schema.
    """
    # TODO: this method is not maintained and it is unclear why it is here.
    # Create a ticket to do something about it.
    manager = self.component_manager

    def task(
        task_callback: stb.type_hints.TaskCallbackType,
        task_abort_event: threading.Event,
    ) -> None:
        # The three configuration sections are captured from the command call.
        manager.configure_semi_static(
            task_callback=task_callback,
            task_abort_event=task_abort_event,
            antenna_config=antenna_config,
            station_config=station_config,
            field_station_config=field_station_config,
        )

    return task
@stb.long_running_commands.long_running_command(dtype_in="DevString")
@stb.validators.validate_json_args
def Scan(
    self: MccsStation,
    subarray_id: int,
    scan_id: int,
    start_time: Optional[str] = None,
    duration: float = 0.0,
    **kwargs: Any,
) -> stb.type_hints.TaskFunctionType:
    """
    Start the scan associated with the station_beam.

    :param subarray_id: The subarray for which the command applies
    :param scan_id: The ID for this scan
    :param start_time: UTC time for begin of scan, None for immediate start
    :param duration: Scan duration in seconds. 0.0 or omitted means forever
    :param kwargs: additional keyword arguments, forwarded unchanged to the
        component manager's ``scan``

    :return: The task to be submitted as an LRC.
    """
    manager = self.component_manager

    def task(
        task_callback: stb.type_hints.TaskCallbackType,
        task_abort_event: threading.Event,
    ) -> None:
        # All scan arguments are captured from the enclosing command call.
        manager.scan(
            task_callback=task_callback,
            task_abort_event=task_abort_event,
            subarray_id=subarray_id,
            scan_id=scan_id,
            start_time=start_time,
            duration=duration,
            **kwargs,
        )

    return task
@stb.long_running_commands.submit_lrc_task
def execute_EndScan(
    self: MccsStation,
    subarray_id: int,
) -> stb.type_hints.TaskFunctionType:
    """
    Execute the EndScan command as a long-running task.

    :param subarray_id: the subarray for which the command applies

    :return: the task function; when run it calls the component
        manager's ``end_scan`` for the given subarray.
    """
    manager = self.component_manager

    def task(
        task_callback: stb.type_hints.TaskCallbackType,
        task_abort_event: threading.Event,
    ) -> None:
        # ``subarray_id`` is captured from the enclosing call.
        manager.end_scan(
            task_callback=task_callback,
            task_abort_event=task_abort_event,
            subarray_id=subarray_id,
        )

    return task
@command(dtype_in="DevLong", dtype_out="DevVarLongStringArray")
def EndScan(
    self: MccsStation,
    subarray_id: int,
) -> DevVarLongStringArrayType:
    """
    Stop the current scan associated with the station_beam.

    Subarray IDs outside the range 1 to 16 (inclusive) are rejected and
    logged as an error; valid IDs are handed to ``execute_EndScan``.

    :param subarray_id: the subarray for which the command applies

    :return: A tuple containing a return code and a string
        message indicating status. The message is for
        information purpose only.
    """
    if not 1 <= subarray_id <= 16:
        self.logger.error("Invalid subarray ID %s", subarray_id)
        # The returned message used to say "subscan ID", contradicting both
        # the check and the log line, which are about the subarray ID.
        return ([ResultCode.REJECTED], ["Invalid subarray ID"])
    return self.execute_EndScan(subarray_id)
@stb.long_running_commands.long_running_command(dtype_in="DevString")
def ConfigureStationForCalibration(
    self: MccsStation,
    argin: str,
) -> stb.type_hints.TaskFunctionType:
    """
    Configure the station for calibration.

    :param argin: JSON-ified argument containing DAQ configuration overrides.

    :return: the task function; when run it passes ``argin`` to the
        component manager's ``configure_station_for_calibration``.
    """
    manager = self.component_manager

    def task(
        task_callback: stb.type_hints.TaskCallbackType,
        task_abort_event: threading.Event,
    ) -> None:
        # ``argin`` is forwarded as received, still JSON-encoded.
        manager.configure_station_for_calibration(
            task_callback=task_callback,
            task_abort_event=task_abort_event,
            argin=argin,
        )

    return task
@stb.long_running_commands.long_running_command(dtype_in="DevLong")
def StopTracking(
    self: MccsStation,
    station_beam_id: int,
) -> stb.type_hints.TaskFunctionType:
    """
    Stop the current tracking thread.

    :param station_beam_id: the beam id whose tracking you wish to stop.

    :return: the task function; when run it calls the component
        manager's ``stop_tracking`` for the given beam.
    """
    manager = self.component_manager

    def task(
        task_callback: stb.type_hints.TaskCallbackType,
        task_abort_event: threading.Event,
    ) -> None:
        # ``station_beam_id`` is captured from the enclosing command call.
        manager.stop_tracking(
            task_callback=task_callback,
            task_abort_event=task_abort_event,
            station_beam_id=station_beam_id,
        )

    return task
@stb.long_running_commands.long_running_command
def StopTrackingAll(
    self: MccsStation,
) -> stb.type_hints.TaskFunctionType:
    """
    Stop all tracking threads.

    :return: The task to be submitted as a LRC; this is the component
        manager's ``stop_tracking_all`` callable.
    """
    stop_all_task = self.component_manager.stop_tracking_all
    return stop_all_task
@stb.long_running_commands.long_running_command
def Initialise(
    self: MccsStation,
) -> stb.type_hints.TaskFunctionType:
    """
    Initialise this station's tiles.

    :return: The task to be submitted as an LRC; this is the component
        manager's ``initialise`` callable.
    """
    initialise_task = self.component_manager.initialise
    return initialise_task
@stb.long_running_commands.submit_lrc_task
def execute_AcquireDataForCalibration(
    self: MccsStation,
    first_channel: int,
    last_channel: int,
    daq_mode: str,
) -> stb.type_hints.TaskFunctionType:
    """
    Execute the AcquireDataForCalibration command as a long-running task.

    :param first_channel: The first channel to acquire data for
    :param last_channel: The last channel to acquire data for
    :param daq_mode: Which correlator to start.

    :return: The task to be submitted as a LRC.
    """
    manager = self.component_manager

    def task(
        task_callback: stb.type_hints.TaskCallbackType,
        task_abort_event: threading.Event,
    ) -> None:
        # The channel range and DAQ mode are captured from the enclosing call.
        manager.acquire_data_for_calibration(
            first_channel=first_channel,
            last_channel=last_channel,
            daq_mode=daq_mode,
            task_callback=task_callback,
            task_abort_event=task_abort_event,
        )

    return task
@command(dtype_in="DevString", dtype_out="DevVarLongStringArray")
@stb.validators.validate_json_args
def AcquireDataForCalibration(
    self: MccsStation,
    first_channel: int,
    last_channel: int,
    daq_mode: str = "TCC",
) -> DevVarLongStringArrayType:
    """
    Instruct the SpsStation to start acquiring calibration data from the tiles.

    The command is rejected when the component manager has no SpsStation
    proxy; otherwise it is handed to ``execute_AcquireDataForCalibration``.

    :param first_channel: The first channel to acquire data for
    :param last_channel: The last channel to acquire data for
    :param daq_mode: Which correlator to start, default TCC.

    :return: A tuple containing a return code and a string
        message indicating status. The message is for
        information purpose only.
    """
    sps_proxy_missing = self.component_manager._sps_station_proxy is None
    if sps_proxy_missing:
        return [ResultCode.REJECTED], ["No SpsStation proxy to collect data."]

    acquisition_args = {
        "first_channel": first_channel,
        "last_channel": last_channel,
        "daq_mode": daq_mode,
    }
    return self.execute_AcquireDataForCalibration(**acquisition_args)
@stb.long_running_commands.long_running_command(dtype_in="DevString")
@stb.validators.validate_json_args
def StartAcquisition(
    self: MccsStation, **kwargs: Any
) -> stb.type_hints.TaskFunctionType:
    """
    Start data acquisition.

    :param kwargs: optional keywords:

        * start_time - (ISO UTC time) start time
        * delay - (int) delay start if StartTime is not specified, default 2s

    :return: The task to be submitted as a LRC.
    """
    manager = self.component_manager

    def task(
        task_callback: stb.type_hints.TaskCallbackType,
        task_abort_event: threading.Event,
    ) -> None:
        # ``kwargs`` is captured from the command call and passed through.
        manager.start_acquisition(
            task_callback=task_callback,
            task_abort_event=task_abort_event,
            **kwargs,
        )

    return task
@command(dtype_in="DevString", dtype_out="DevVarLongStringArray")
def SetCspIngest(self: MccsStation, argin: str) -> tuple[ResultCode, str]:
    """
    Configure link for beam data packets to CSP.

    ``argin`` is handed to the component manager still JSON-encoded.

    :param argin: json dictionary with optional keywords:

        * destination_ip - (string) Destination IP
        * source_port - (int) Source port for integrated data streams
        * destination_port - (int) Destination port for integrated data streams

    :return: whatever the component manager's ``set_csp_ingest`` returns.
    """
    # NOTE(review): the annotation says tuple[ResultCode, str] while
    # dtype_out is DevVarLongStringArray and ResetCspIngest is annotated
    # with DevVarLongStringArrayType - confirm the shape returned here.
    outcome = self.component_manager.set_csp_ingest(argin)
    return outcome
@command(dtype_out="DevVarLongStringArray")
def ResetCspIngest(self: MccsStation) -> DevVarLongStringArrayType:
    """
    Reset link for beam data packets to CSP to defaults.

    :return: whatever the component manager's ``reset_csp_ingest`` returns.
    """
    outcome = self.component_manager.reset_csp_ingest()
    return outcome
@stb.long_running_commands.submit_lrc_task
def execute_TriggerAdcEqualisation(
    self: MccsStation, target_adc: float, **kwargs: Any
) -> stb.type_hints.TaskFunctionType:
    """
    Execute the TriggerAdcEqualisation command as a long-running task.

    :param target_adc: target value for ADC gain.
    :param kwargs: optional parameters include bias.

    :return: The task to be submitted as a LRC.
    """
    manager = self.component_manager

    def task(
        task_callback: stb.type_hints.TaskCallbackType,
        task_abort_event: threading.Event,
    ) -> None:
        # ``target_adc`` and ``kwargs`` are captured from the enclosing call.
        manager.trigger_adc_equalisation(
            task_callback=task_callback,
            task_abort_event=task_abort_event,
            target_adc=target_adc,
            **kwargs,
        )

    return task
@command(dtype_in="DevString", dtype_out="DevVarLongStringArray")
@stb.validators.validate_json_args
def TriggerAdcEqualisation(
    self: MccsStation,
    target_adc: float,
    **kwargs: Any,
) -> DevVarLongStringArrayType:
    """
    Get the equalised ADC values.

    Getting the equalised values takes up to 20 seconds (to get an average to
    avoid spikes). So we trigger the collection and publish to dbmPowers.

    The command is rejected, and the reason logged as an error, when the
    component manager has no SpsStation proxy.

    :param target_adc: the expected average power received by antennas in
        ADU units. Has an input minimum of 0, but in code it is limited to
        4.2e-7 (corresponds to the maximum output of 31.75 dB). There is no
        maximum value, however, 40 ADUs will result in 0 dB with no bias
        and 1600 ADUs will result in 0 dB with the maximum bias allowed
        of 32dB.
    :param kwargs: optional parameters include bias.
        user specified bias in dB added to the antenna preadu levels. Bias
        input value rounded as part of value sanitation and as a result it
        increases in steps of 0.25. Ranges from -32 to 32 with default 0.

    :return: A tuple containing a return code and a string
        message indicating status. The message is for
        information purpose only.

    :example:

    >>> dp = tango.DeviceProxy("mccs/station/001")
    >>> json_arg = json.dumps({"target_adc": "18", "bias": "1"})
    >>> dp.command_inout("TriggerAdcEqualisation", json_arg)
    """
    sps_proxy = self.component_manager._sps_station_proxy
    if sps_proxy is not None:
        return self.execute_TriggerAdcEqualisation(target_adc, **kwargs)

    mssg = "There is no SpsStation proxy to call."
    self.logger.error(mssg)
    return [ResultCode.REJECTED], [mssg]
# ---------- # Run server # ----------
def main(*args: str, **kwargs: str) -> int:  # pragma: no cover
    """
    Entry point for module.

    Runs the ``MccsStation`` device server. An empty ``args`` is passed
    on as ``None``.

    :param args: positional arguments
    :param kwargs: named arguments

    :return: exit code
    """
    server_args = args if args else None
    return MccsStation.run_server(args=server_args, **kwargs)
# Start the device server only when this module is executed as a script.
if __name__ == "__main__":
    main()