# -*- coding: utf-8 -*
#
# This file is part of the SKA Low MCCS project
#
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""This module implements an antenna Tango device for MCCS."""
from __future__ import annotations
import importlib # allow forward references in type hints
import json
import sys
from typing import Any, Callable, Final, Optional
import tango
from jsonschema import ValidationError, validate
from ska_control_model import (
CommunicationStatus,
HealthState,
PowerState,
ResultCode,
SimulationMode,
)
from ska_tango_base.base import SKABaseDevice
from ska_tango_base.commands import (
DeviceInitCommand,
JsonValidator,
SubmittedSlowCommand,
)
from tango.server import attribute, command, device_property
from ska_low_mccs.antenna.antenna_component_manager import AntennaComponentManager
from ska_low_mccs.antenna.antenna_health_model import AntennaHealthModel
__all__ = ["MccsAntenna", "main"]
DevVarLongStringArrayType = tuple[list[ResultCode], list[Optional[str]]]
# pylint: disable=too-many-public-methods, too-many-instance-attributes
[docs]class MccsAntenna(SKABaseDevice):
"""An implementation of an antenna Tango device for MCCS."""
# -----------------
# Device Properties
# -----------------
FieldStationName = device_property(dtype=str, mandatory=True)
AntennaId = device_property(dtype=int, mandatory=True)
TileName = device_property(dtype=str, mandatory=True)
TileYChannel = device_property(dtype=int, mandatory=True)
TileXChannel = device_property(dtype=int, mandatory=True)
Xdisplacement = device_property(dtype=float, default_value=0.0)
Ydisplacement = device_property(dtype=float, default_value=0.0)
Zdisplacement = device_property(dtype=float, default_value=0.0)
# ---------------
# Initialisation
# ---------------
[docs] def __init__(self, *args: Any, **kwargs: Any) -> None:
"""
Initialise this device object.
:param args: positional args to the init
:param kwargs: keyword args to the init
"""
# We aren't supposed to define initialisation methods for Tango
# devices; we are only supposed to define an `init_device` method. But
# we insist on doing so here, just so that we can define some
# attributes, thereby stopping the linters from complaining about
# "attribute-defined-outside-init" etc. We still need to make sure that
# `init_device` re-initialises any values defined in here.
super().__init__(*args, **kwargs)
self._health_state: HealthState = HealthState.UNKNOWN
self._health_model: AntennaHealthModel
self.component_manager: AntennaComponentManager
self._antennaId: int
[docs] def init_device(self: MccsAntenna) -> None:
"""
Initialise the device.
This is overridden here to change the Tango serialisation model.
"""
util = tango.Util.instance()
util.set_serial_model(tango.SerialModel.NO_SYNC)
super().init_device()
self._build_state = sys.modules["ska_low_mccs"].__version_info__
self._version_id = sys.modules["ska_low_mccs"].__version__
device_name = f'{str(self.__class__).rsplit(".", maxsplit=1)[-1][0:-2]}'
version = f"{device_name} Software Version: {self._version_id}"
properties = (
f"Initialised {device_name} device with properties:\n"
f"\tAntennaId: {self.AntennaId}\n"
f"\tFieldStationName: {self.FieldStationName}\n"
f"\tTileName: {self.TileName}\n"
f"\tTileYChannel: {self.TileYChannel}\n"
f"\tTileXChannel: {self.TileYChannel}\n"
f"\txDisplacement: {self.Xdisplacement}\n"
f"\tyDisplacement: {self.Ydisplacement}\n"
f"\tzDisplacement: {self.Zdisplacement}\n"
)
self.logger.info(
"\n%s\n%s\n%s", str(self.GetVersionInfo()), version, properties
)
def _init_state_model(self: MccsAntenna) -> None:
super()._init_state_model()
self._health_state = HealthState.UNKNOWN # InitCommand.do() does this too late.
self._health_model = AntennaHealthModel(self._component_state_callback)
self.set_change_event("healthState", True, False)
[docs] def create_component_manager(
self: MccsAntenna,
) -> AntennaComponentManager:
"""
Create and return a component manager for this device.
:return: a component manager for this device.
"""
return AntennaComponentManager(
self.FieldStationName,
self.AntennaId,
self.TileName,
(self.TileYChannel, self.TileXChannel),
self.logger,
self._communication_state_callback,
self._component_state_callback,
)
_schema_configure: Final = json.loads(
importlib.resources.read_text(
"ska_low_mccs.schemas.antenna",
"MccsAntenna_Configure_3_0.json",
)
)
[docs] def init_command_objects(self: MccsAntenna) -> None:
"""Initialise the command handlers for commands supported by this device."""
super().init_command_objects()
for command_name, method_name, schema in [
("Configure", "configure", self._schema_configure),
]:
validator = (
None
if schema is None
else JsonValidator(
command_name,
schema,
logger=self.logger,
)
)
self.register_command_object(
command_name,
SubmittedSlowCommand(
command_name,
self._command_tracker,
self.component_manager,
method_name,
callback=None,
logger=None,
validator=validator,
),
)
[docs] class InitCommand(DeviceInitCommand):
"""Class that implements device initialisation for the MCCS antenna device."""
[docs] def do(
self: MccsAntenna.InitCommand,
*args: Any,
**kwargs: Any,
) -> tuple[ResultCode, str]:
"""
Stateless hook for device initialisation.
Initialises the attributes and properties of the
:py:class:`.MccsAntenna`.
:param args: positional args to the component manager method
:param kwargs: keyword args to the component manager method
:return: A tuple containing a return code and a string
message indicating status. The message is for
information purpose only.
"""
self._device._antennaId = 0
self._device._gain = 0.0
self._device._rms = 0.0
self._device._xPolarisationFaulty = False
self._device._yPolarisationFaulty = False
self._device._xDisplacement = 0.0
self._device._yDisplacement = 0.0
self._device._zDisplacement = 0.0
self._device._timestampOfLastSpectrum = ""
self._device._logicalAntennaId = 0
self._device._xPolarisationScalingFactor = [0]
self._device._yPolarisationScalingFactor = [0]
self._device._calibrationCoefficient = [0.0]
self._device._pointingCoefficient = [0.0]
self._device._spectrumX = [0.0]
self._device._spectrumY = [0.0]
self._device._position = [0.0]
self._device._delays = [0.0]
self._device._delayRates = [0.0]
self._device._bandpassCoefficient = [0.0]
self._device._first = True
self._device._altitude = 0.0
self._device._fieldNodeLatitude = 0.0
self._device._fieldNodeLongitude = 0.0
event_names = [
"voltage",
"temperature",
"xPolarisationFaulty",
"yPolarisationFaulty",
]
for name in event_names:
self._device.set_change_event(name, True, True)
self._device.set_archive_event(name, True, True)
return (ResultCode.OK, "Init command completed OK")
# --------------
# Callback hooks
# --------------
def _communication_state_callback(
self: MccsAntenna,
communication_state: CommunicationStatus,
) -> None:
"""
Handle change in communications status between component manager and component.
This is a callback hook, called by the component manager when
the communications status changes. It is implemented here to
drive the op_state.
:param communication_state: the status of communications between
the component manager and its component.
"""
action_map = {
CommunicationStatus.DISABLED: "component_disconnected",
CommunicationStatus.NOT_ESTABLISHED: "component_unknown",
CommunicationStatus.ESTABLISHED: None, # wait for a power mode update
}
action = action_map[communication_state]
if action is not None:
self.op_state_model.perform_action(action)
self._health_model.is_communicating(
communication_state == CommunicationStatus.ESTABLISHED
)
def _component_state_callback( # pylint: disable=too-many-arguments
self: MccsAntenna,
fault: Optional[bool] = None,
power: Optional[PowerState] = None,
health: Optional[HealthState] = None,
trl: Optional[str] = None,
configuration_changed: Optional[dict[str, Any]] = None,
adc_rms: Optional[tuple[float, float]] = None,
) -> None:
"""
Handle change in the state of the component.
This is a callback hook, called by the component manager when
the state of the component changes.
:param fault: An optional flag if the device is entering or
exiting a fault state.
:param power: An optional parameter with the new power state of
the device.
:param health: An optional parameter with the new health state
of the device.
:param trl: TRL of the device whose state has changed.
None if the device is an antenna.
:param configuration_changed: An optional parameter with the new
configuration of the device.
:param adc_rms: An optional parameter with an adc reading from a Tile.
:raises ValueError: if the TRL is unknown
"""
# if trl is None:
health_state_changed_callback: (
Callable[[HealthState], None] | Callable[[HealthState | None], None]
) = self._health_changed
power_state_changed_callback = self._component_power_state_changed
if trl is not None:
device_family = trl.split("/")[1]
if device_family == "fieldstation":
health_state_changed_callback = (
self._health_model.fieldstation_health_changed
)
power_state_changed_callback = (
self.component_manager._field_station_power_state_changed
)
elif device_family == "tile":
if adc_rms is not None:
self._health_model.tile_adc_rms_changed(adc_rms)
health_state_changed_callback = self._health_model.tile_health_changed
# power_state_changed_callback = functools.partial(
# self.component_manager._tile_power_state_changed, trl
# )
else:
raise ValueError(
f"unknown trl '{trl}', should be None or belong to "
"tile or fieldstation"
)
if fault is not None:
if fault:
self.op_state_model.perform_action("component_fault")
else:
if self.component_manager.power_state:
power_state_changed_callback(self.component_manager.power_state)
self._health_model.component_fault(fault)
if health is not None:
health_state_changed_callback(health)
if power is not None:
with self.component_manager.power_state_lock:
power_state_changed_callback(power)
if configuration_changed is not None:
assert isinstance(configuration_changed, dict)
self._configure_antenna(configuration_changed)
def _component_power_state_changed(
self: MccsAntenna,
power_state: PowerState,
) -> None:
"""
Handle change in the power mode of the component.
This is a callback hook, called by the component manager when
the power mode of the component changes. It is implemented here
to drive the op_state.
:param power_state: the power mode of the component.
"""
action_map = {
PowerState.OFF: "component_off",
PowerState.STANDBY: "component_standby",
PowerState.ON: "component_on",
PowerState.UNKNOWN: "component_unknown",
}
self.op_state_model.perform_action(action_map[power_state])
def _health_changed(self: MccsAntenna, health: HealthState) -> None:
"""
Handle change in this device's health state.
This is a callback hook, called whenever the HealthModel's
evaluated health state changes. It is responsible for updating
the tango side of things i.e. making sure the attribute is up to
date, and events are pushed.
:param health: the new health value
"""
if self._health_state == health:
return
self._health_state = health
self.push_change_event("healthState", health)
def _configure_antenna(self: MccsAntenna, config: dict) -> None:
"""
Configure the antenna attributes.
:param config: the configuration settings for this antenna.
"""
antenna_config_schema = {
"type": "object",
"properties": {
"antennaId": {"type": "number"},
"xDisplacement": {"type": "number"},
"yDisplacement": {"type": "number"},
"zDisplacement": {"type": "number"},
},
}
try:
validate(instance=config, schema=antenna_config_schema)
self._antennaId = config.get("antennaId", self._antennaId)
self.Xdisplacement = config.get("xDisplacement", self.Xdisplacement)
self.Ydisplacement = config.get("yDisplacement", self.Ydisplacement)
self.Zdisplacement = config.get("zDisplacement", self.Zdisplacement)
except ValidationError as error:
self.logger.error(
"Failed to configure the device due to invalid schema: +%s",
str(error),
)
# ----------
# Attributes
# ----------
@attribute(
dtype=SimulationMode,
memorized=True,
hw_memorized=True,
)
def simulationMode(self):
"""
Return the simulation mode of this device.
This overrides the base class as the antenna device cannot be
put into simulation mode TRUE
:return: the simulation mode of this device
"""
return SimulationMode.FALSE
# pylint: disable=arguments-differ
[docs] @simulationMode.write # type: ignore [no-redef]
def simulationMode(self: MccsAntenna, value: SimulationMode) -> None:
"""
Set the simulation mode of this device.
:param value: the new simulation mode
:raises ValueError: because this device cannot be put into simulation mode.
"""
if value == SimulationMode.TRUE:
raise ValueError("MccsAntenna cannot be put into simulation mode.")
[docs] @attribute(dtype="int", label="AntennaID")
def antennaId(self: MccsAntenna) -> int:
"""
Return the antenna ID attribute.
:return: antenna ID
"""
return self.component_manager._antenna_id
[docs] @attribute(dtype="float", label="gain")
def gain(self: MccsAntenna) -> float:
"""
Return the gain attribute.
:return: the gain
"""
return self._gain
[docs] @attribute(dtype="float", label="rms")
def rms(self: MccsAntenna) -> float:
"""
Return the measured RMS of the antenna.
:return: the measured rms
"""
return self._rms
[docs] @attribute(
dtype="float",
label="voltage",
unit="volts",
abs_change=0.05,
min_value=2.5,
max_value=5.5,
min_alarm=2.75,
max_alarm=5.45,
)
def voltage(self: MccsAntenna) -> float:
"""
Return the voltage attribute.
:return: the voltage
"""
return self.component_manager.voltage
[docs] @attribute(dtype="float", label="current", unit="amperes")
def current(self: MccsAntenna) -> float:
"""
Return the current attribute.
:return: the current
"""
return self.component_manager.current
[docs] @attribute(dtype="float", label="temperature", unit="DegC")
def temperature(self: MccsAntenna) -> float:
"""
Return the temperature attribute.
:return: the temperature
"""
return self.component_manager.temperature
[docs] @attribute(dtype="bool", label="xPolarisationFaulty")
def xPolarisationFaulty(self: MccsAntenna) -> bool:
"""
Return the xPolarisationFaulty attribute.
:return: the x-polarisation faulty flag
"""
return self._xPolarisationFaulty
[docs] @attribute(dtype="bool", label="yPolarisationFaulty")
def yPolarisationFaulty(self: MccsAntenna) -> bool:
"""
Return the yPolarisationFaulty attribute.
:return: the y-polarisation faulty flag
"""
return self._yPolarisationFaulty
[docs] @attribute(dtype="float", label="fieldNodeLongitude")
def fieldNodeLongitude(self: MccsAntenna) -> float:
"""
Return the fieldNodeLongitude attribute.
:return: the Longitude of field node centre
"""
return self._fieldNodeLongitude
[docs] @attribute(dtype="float", label="fieldNodeLatitude")
def fieldNodeLatitude(self: MccsAntenna) -> float:
"""
Return the fieldNodeLatitude attribute.
:return: the Latitude of field node centre
"""
return self._fieldNodeLatitude
[docs] @attribute(dtype="float", label="altitude", unit="meters")
def altitude(self: MccsAntenna) -> float:
"""
Return the altitude attribute.
:return: the altitude of the antenna
"""
return self._altitude
[docs] @attribute(dtype="float", label="xDisplacement", unit="meters")
def xDisplacement(self: MccsAntenna) -> float:
"""
Return the horizontal displacement east attribute.
:return: the horizontal displacement eastwards from station
reference position
"""
return self.Xdisplacement
[docs] @attribute(dtype="float", label="yDisplacement", unit="meters")
def yDisplacement(self: MccsAntenna) -> float:
"""
Return the horizontal displacement north attribute.
:return: the horizontal displacement northwards from station
reference position
"""
return self.Ydisplacement
[docs] @attribute(dtype="float", label="zDisplacement", unit="meters")
def zDisplacement(self: MccsAntenna) -> float:
"""
Return the vertical displacement attribute.
:return: the vertical displacement upwards from station
reference position
"""
return self.Zdisplacement
[docs] @attribute(dtype="str", label="timestampOfLastSpectrum")
def timestampOfLastSpectrum(self: MccsAntenna) -> str:
"""
Return the timestampOfLastSpectrum attribute.
:return: the timestamp of the last spectrum
"""
return self._timestampOfLastSpectrum
[docs] @attribute(dtype="int", label="logicalAntennaId")
def logicalAntennaId(self: MccsAntenna) -> int:
"""
Return the logical antenna ID attribute.
:return: the logical antenna ID
"""
return self._logicalAntennaId
[docs] @attribute(dtype=("int",), max_dim_x=100, label="xPolarisationScalingFactor")
def xPolarisationScalingFactor(self: MccsAntenna) -> list[int]:
"""
Return the logical antenna ID attribute.
:return: the x polarisation scaling factor
"""
return self._xPolarisationScalingFactor
[docs] @attribute(dtype=("int",), max_dim_x=100, label="yPolarisationScalingFactor")
def yPolarisationScalingFactor(self: MccsAntenna) -> list[int]:
"""
Return the yPolarisationScalingFactor attribute.
:return: the y polarisation scaling factor
"""
return self._yPolarisationScalingFactor
[docs] @attribute(dtype=("float",), max_dim_x=100, label="calibrationCoefficient")
def calibrationCoefficient(self: MccsAntenna) -> list[float]:
"""
Get the Calibration coefficients.
The coefficients to be applied for the next frequency channel in
the calibration cycle.
:return: the calibration coefficients
"""
return self._calibrationCoefficient
[docs] @attribute(dtype=("float",), max_dim_x=100)
def pointingCoefficient(self: MccsAntenna) -> list[float]:
"""
Return the pointingCoefficient attribute.
:return: the pointing coefficients
"""
return self._pointingCoefficient
[docs] @attribute(dtype=("float",), max_dim_x=100, label="spectrumX")
def spectrumX(self: MccsAntenna) -> list[float]:
"""
Return the spectrumX attribute.
:return: x spectrum
"""
return self._spectrumX
[docs] @attribute(dtype=("float",), max_dim_x=100, label="spectrumY")
def spectrumY(self: MccsAntenna) -> list[float]:
"""
Return the spectrumY attribute.
:return: y spectrum
"""
return self._spectrumY
[docs] @attribute(dtype=("float",), max_dim_x=100, label="position")
def position(self: MccsAntenna) -> list[float]:
"""
Return the position attribute.
:return: positions
"""
return self._position
[docs] @attribute(dtype=("float",), max_dim_x=100, label="delays")
def delays(self: MccsAntenna) -> list[float]:
"""
Return the delays attribute.
:return: delay for each beam
"""
return self._delays
[docs] @attribute(dtype=("float",), max_dim_x=100, label="delayRates")
def delayRates(self: MccsAntenna) -> list[float]:
"""
Return the delayRates attribute.
:return: delay rate for each beam
"""
return self._delayRates
[docs] @attribute(dtype=("float",), max_dim_x=100, label="bandpassCoefficient")
def bandpassCoefficient(self: MccsAntenna) -> list[float]:
"""
Return the bandpassCoefficient attribute.
:return: bandpass coefficients
"""
return self._bandpassCoefficient
[docs] @attribute(dtype="bool", label="first")
def first(self: MccsAntenna) -> bool:
"""
Return the first attribute.
:return: the first faulty flag
"""
return self._first
@attribute(
dtype="DevString",
format="%s",
)
def healthModelParams(self: MccsAntenna) -> str:
"""
Get the health params from the health model.
:return: the health params
"""
return json.dumps(self._health_model.health_params)
[docs] @healthModelParams.write # type: ignore[no-redef]
def healthModelParams(self: MccsAntenna, argin: str) -> None:
"""
Set the params for health transition rules.
:param argin: JSON-string of dictionary of health states
"""
self._health_model.health_params = json.loads(argin)
# --------
# Commands
# --------
[docs] def is_On_allowed(self: MccsAntenna) -> bool:
"""
Check if command `On` is allowed in the current device state.
:return: ``True`` if the command is allowed
"""
return self.get_state() in [
tango.DevState.OFF,
tango.DevState.STANDBY,
tango.DevState.ON,
tango.DevState.UNKNOWN,
tango.DevState.FAULT,
]
# ----------
# Run server
# ----------
[docs]def main(*args: str, **kwargs: str) -> int: # pragma: no cover
"""
Entry point for module.
:param args: positional arguments
:param kwargs: named arguments
:return: exit code
"""
return MccsAntenna.run_server(args=args or None, **kwargs)
if __name__ == "__main__":
main()