# -*- coding: utf-8 -*-
#
# This file is part of the SKA Low MCCS project
#
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""This module implements an component manager for an MCCS antenna Tango device."""
from __future__ import annotations
import functools
import json
import logging
import threading
from typing import Callable, Optional
import tango
from ska_control_model import CommunicationStatus, PowerState, ResultCode, TaskStatus
from ska_low_mccs_common.component import DeviceComponentManager
from ska_tango_base.base import check_communicating, check_on
from ska_tango_base.executor import TaskExecutorComponentManager
# Public API of this module: only the top-level component manager is exported.
__all__ = ["AntennaComponentManager"]
# Largest valid logical antenna id; _FieldStationProxy asserts that ids lie in
# the inclusive range 1..NUMBER_OF_ANTENNA_IN_STATION.
NUMBER_OF_ANTENNA_IN_STATION = 256
class _FieldStationProxy(DeviceComponentManager):
    """A proxy to the FieldStation this antenna is a part of."""

    def __init__(
        self: _FieldStationProxy,
        name: str,
        antenna_id: int,
        logger: logging.Logger,
        communication_state_callback: Callable[[CommunicationStatus], None],
        component_state_callback: Callable[..., None],
    ) -> None:
        """
        Initialise a proxy to the FieldStation.

        :param name: the name of the FieldStation
        :param antenna_id: the logical id of this antenna.
        :param logger: the logger to be used by this object.
        :param communication_state_callback: callback to be called when
            the status of the communications channel between the
            component manager and its component changes
        :param component_state_callback: callback to be called when the
            component state changes

        :raises AssertionError: if parameters are out of bounds
        """
        self._power_state_lock = threading.RLock()

        assert (
            0 < antenna_id < NUMBER_OF_ANTENNA_IN_STATION + 1
        ), "The antenna id must be in the range 1-256"
        self._antenna_id = antenna_id

        # True while the antennaPowerStates change-event subscription is active.
        self._antenna_change_registered = False
        # Last known power state supplied to this antenna; None until known.
        self._supplied_power_state: Optional[PowerState] = None

        super().__init__(
            name,
            logger,
            communication_state_callback,
            component_state_callback,  # type: ignore[arg-type]
        )

    def stop_communicating(self: _FieldStationProxy) -> None:
        """Cease communicating with the FieldStation device."""
        super().stop_communicating()
        self._antenna_change_registered = False

    def reset(
        self: _FieldStationProxy, task_callback: Optional[Callable] = None
    ) -> tuple[TaskStatus, str]:
        """
        Reset the antenna; this is not implemented.

        This raises NotImplementedError because the antenna is passive
        hardware and cannot meaningfully be reset.

        :param task_callback: Update task state, defaults to None

        :raises NotImplementedError: because the antenna is passive
            hardware and cannot meaningfully be reset.
        """
        raise NotImplementedError("Antenna cannot be reset.")

    @check_communicating
    def power_on(self: _FieldStationProxy) -> ResultCode | None:
        """
        Tell the FieldStation to power on the port this antenna is attached to.

        :return: a result code, or None if the supplied power state is
            already ON and no command was sent.
        """
        if self._supplied_power_state == PowerState.ON:
            return None
        return self._power_up_antenna()

    def _power_up_antenna(self: _FieldStationProxy) -> ResultCode:
        """
        Invoke PowerOnAntenna on the FieldStation for this antenna id.

        :return: the result code returned by the FieldStation command.
        """
        assert self._proxy is not None  # for the type checker
        ([result_code], _) = self._proxy.PowerOnAntenna(self._antenna_id)
        return result_code

    @check_communicating
    def power_off(self: _FieldStationProxy) -> ResultCode | None:
        """
        Tell the FieldStation to power off the port this antenna is attached to.

        :return: a result code, or None if the supplied power state is
            already OFF and no command was sent.
        """
        if self._supplied_power_state == PowerState.OFF:
            return None
        return self._power_down_antenna()

    def _power_down_antenna(self: _FieldStationProxy) -> ResultCode:
        """
        Invoke PowerOffAntenna on the FieldStation for this antenna id.

        :return: the result code returned by the FieldStation command.
        """
        assert self._proxy is not None  # for the type checker
        ([result_code], _) = self._proxy.PowerOffAntenna(self._antenna_id)
        return result_code

    @property  # type: ignore[misc]
    @check_communicating
    @check_on
    def current(self: _FieldStationProxy) -> float:
        """
        Return the antenna's current as reported by the FieldStation.

        :return: the current of this antenna
        """
        assert self._proxy is not None  # for the type checker
        port_currents = self._proxy.PortsCurrentDraw
        # Antenna ids are 1-based; the attribute is indexed from 0.
        return port_currents[self._antenna_id - 1]

    @property  # type: ignore[misc]
    @check_communicating
    @check_on
    def voltage(self: _FieldStationProxy) -> float:
        """
        Return the antenna's voltage as reported by the FieldStation.

        :raises NotImplementedError: This monitoring point
            is not available
        """
        assert self._proxy is not None  # for the type checker
        raise NotImplementedError  # Do not have this monitoring point.

    @property  # type: ignore[misc]
    @check_communicating
    @check_on
    def temperature(self: _FieldStationProxy) -> float:
        """
        Return the antenna's temperature as reported by FieldStation.

        :raises NotImplementedError: This monitoring point
            is not available
        """
        assert self._proxy is not None  # for the type checker
        raise NotImplementedError  # Do not have this monitoring point.

    def update_supplied_power_state(
        self: _FieldStationProxy,
        supplied_power_state: Optional[PowerState],
    ) -> None:
        """
        Update the supplied power state.

        The component state callback is invoked only when the value
        actually changes and the new value is not None.

        :param supplied_power_state: the supplied power state
        """
        if self._supplied_power_state != supplied_power_state:
            self._supplied_power_state = supplied_power_state
            if (
                self._supplied_power_state is not None
                and self._component_state_callback
            ):
                self._component_state_callback(power=self._supplied_power_state)

    @property
    def supplied_power_state(
        self: _FieldStationProxy,
    ) -> Optional[PowerState]:
        """
        Return the last known power state supplied to this antenna.

        :return: the last recorded supplied power state, or None if no
            state has been recorded yet.
        """
        return self._supplied_power_state

    def _device_state_changed(
        self: _FieldStationProxy,
        event_name: str,
        event_value: tango.DevState,
        event_quality: tango.AttrQuality,
    ) -> None:
        """
        Handle a change in the FieldStation device state.

        :param event_name: name of the event; expected to be "state"
        :param event_value: the new device state
        :param event_quality: the quality of the change event
        """
        assert (
            event_name.lower() == "state"
        ), f"state changed callback called but event_name is {event_name}."

        super()._device_state_changed(event_name, event_value, event_quality)
        if event_value == tango.DevState.OFF:
            # NOTE(review): the cached state is set directly here, so the
            # component state callback is not invoked on this path --
            # confirm that this is intended.
            if self._supplied_power_state != PowerState.OFF:
                self._supplied_power_state = PowerState.OFF

    def subscribe_to_attributes(self: _FieldStationProxy) -> None:
        """Subscribe to attributes of interest."""
        assert self._proxy is not None  # for the type checker
        self._proxy.add_change_event_callback(
            "antennaPowerStates",
            self._antenna_power_state_changed,
            stateless=True,
        )
        self._antenna_change_registered = True

    def _antenna_power_state_changed(
        self: _FieldStationProxy,
        event_name: str,
        event_value: str,
        event_quality: tango.AttrQuality,
    ) -> None:
        """
        Handle change in antenna power state.

        This is a callback that is triggered by an event subscription
        on the Fieldstation device. Malformed events (wrong event name,
        invalid JSON, no entry for this antenna, or a value that is not
        a valid PowerState) are logged and ignored.

        :param event_name: name of the event; will always be
            "antennaPowerStates" for this callback
        :param event_value: the new attribute value, a JSON string that
            is indexed here by the antenna id as a string
        :param event_quality: the quality of the change event
        """
        try:
            powers = json.loads(event_value)
            assert event_name.lower() == "antennapowerstates"
            # Extract this antenna's entry inside the try so that a missing
            # key or an invalid value is logged rather than raised out of
            # the event callback.
            power_state = PowerState(powers[str(self._antenna_id)])
        except Exception as e:  # pylint: disable=broad-except
            self.logger.error(f"Issues with power change event: {repr(e)}")
            return

        if self._component_state_callback is not None:
            self._component_state_callback(
                power=power_state,
                trl=None,  # type: ignore[call-arg]
            )
        self.update_supplied_power_state(power_state)
class _TileProxy(DeviceComponentManager):
    """
    A component manager for an antenna, that proxies through a Tile Tango device.

    Note the semantics: the end goal is the antenna, not the Tile that
    it proxies through.

    For example, the communication status of this component manager
    reflects whether communication has been established all the way to
    the antenna. If we have established communication with the Tile
    Tango device, but the Tile Tango device reports that the Tile is
    turned off, then we have NOT established communication to the
    antenna.

    Power and reset commands are not implemented here. This proxy
    forwards configuration to the Tile and relays the Tile's
    ``adcPower`` readings for this antenna's two channels.
    """

    def __init__(
        self: _TileProxy,
        name: str,
        channels: tuple[int, int],
        logger: logging.Logger,
        communication_state_callback: Callable[[CommunicationStatus], None],
        component_state_callback: Callable[..., None],
    ) -> None:
        """
        Initialise a new instance.

        :param name: the name of the Tile device
        :param channels: the tile ADC channels containing
            Y and X signal from this antenna
        :param logger: the logger to be used by this object.
        :param communication_state_callback: callback to be called when
            the status of the communications channel between the
            component manager and its component changes
        :param component_state_callback: callback to be called when the
            component state changes
        """
        (self._y_channel, self._x_channel) = channels
        # True while the adcPower change-event subscription is active.
        self._adc_rms_change_registered = False
        self._adc_power = (0.0, 0.0)

        super().__init__(
            name,
            logger,
            communication_state_callback,
            component_state_callback,
        )

    def stop_communicating(self: _TileProxy) -> None:
        """Cease communicating with the Tile device."""
        super().stop_communicating()
        self._adc_rms_change_registered = False

    def off(
        self: _TileProxy, task_callback: Optional[Callable] = None
    ) -> tuple[TaskStatus, str]:
        """
        Turn the antenna off; this is not implemented.

        This raises NotImplementedError because the antenna's power state
        is not controlled via the Tile device; it is controlled via the
        FieldStation device.

        :param task_callback: Update task state, defaults to None

        :raises NotImplementedError: because the antenna's power state is
            not controlled via the Tile device; it is controlled via the
            FieldStation device.
        """
        raise NotImplementedError(
            "Antenna power state is not controlled via Tile device."
        )

    def standby(
        self: _TileProxy, task_callback: Optional[Callable] = None
    ) -> tuple[TaskStatus, str]:
        """
        Put the antenna into standby state; this is not implemented.

        This raises NotImplementedError because the antenna has no
        standby state; and because the antenna's power state is not
        controlled via the Tile device; it is controlled via the FieldStation
        device.

        :param task_callback: Update task state, defaults to None

        :raises NotImplementedError: because the antenna's power state is
            not controlled via the Tile device; it is controlled via the
            FieldStation device.
        """
        raise NotImplementedError(
            "Antenna power state is not controlled via Tile device."
        )

    def on(
        self: _TileProxy, task_callback: Optional[Callable] = None
    ) -> tuple[TaskStatus, str]:
        """
        Turn the antenna on; this is not implemented.

        This raises NotImplementedError because the antenna's power state
        is not controlled via the Tile device; it is controlled via the
        FieldStation device.

        :param task_callback: Update task state, defaults to None

        :raises NotImplementedError: because the antenna's power state is
            not controlled via the Tile device; it is controlled via the
            FieldStation device.
        """
        raise NotImplementedError(
            "Antenna power state is not controlled via Tile device."
        )

    def reset(
        self: _TileProxy, task_callback: Optional[Callable] = None
    ) -> tuple[TaskStatus, str]:
        """
        Reset the antenna; this is not implemented.

        This raises NotImplementedError because the antenna is passive
        hardware and cannot meaningfully be reset.

        :param task_callback: callback to be called when the status of
            the command changes

        :raises NotImplementedError: because the antenna is passive
            hardware and cannot meaningfully be reset.
        """
        raise NotImplementedError("Antenna hardware is not resettable.")

    @check_communicating
    def configure(self: _TileProxy, config: str) -> None:
        """
        Configure the device proxy.

        :param config: json string of configuration.
        """
        assert self._proxy is not None  # for the type checker
        assert self._proxy._device is not None  # for the type checker
        self._proxy._device.Configure(config)

    def _device_state_changed(
        self: _TileProxy,
        event_name: str,
        event_value: tango.DevState,
        event_quality: tango.AttrQuality,
    ) -> None:
        """
        Handle a change in the Tile device state.

        The first time the Tile is seen in the ON state since
        communication (re)started, the adcPower change-event callback
        is registered.

        :param event_name: name of the event; expected to be "state"
        :param event_value: the new device state
        :param event_quality: the quality of the change event
        """
        # The message must be an f-string so the offending name is reported.
        assert (
            event_name.lower() == "state"
        ), f"state changed callback called but event_name is {event_name}."

        super()._device_state_changed(event_name, event_value, event_quality)
        if event_value == tango.DevState.ON and not self._adc_rms_change_registered:
            self._register_adc_rms_callback()

    def _register_adc_rms_callback(self: _TileProxy) -> None:
        """Register the change event callback for adcPower."""
        assert self._proxy is not None  # for the type checker
        self._proxy.add_change_event_callback(
            "adcPower",
            self._adc_rms_changed,
            stateless=True,
        )
        self._adc_rms_change_registered = True

    def _adc_rms_changed(
        self: _TileProxy,
        event_name: str,
        event_value: list[float],
        event_quality: tango.AttrQuality,
    ) -> None:
        """
        Handle change in tile ADC rms power.

        This is a callback that is triggered by an event subscription
        on the Tile device.

        :param event_name: name of the event; will always be
            "adcPower" for this callback
        :param event_value: the new attribute value; it is indexed here
            by this antenna's Y and X channel numbers
        :param event_quality: the quality of the change event
        """
        assert event_name.lower() == "adcPower".lower(), (
            "Tile 'adcPower' attribute changed callback called but "
            f"event_name is {event_name}."
        )
        # Report the pair in (Y, X) order, matching the ``channels`` argument.
        adc_rms = (
            event_value[self._y_channel],
            event_value[self._x_channel],
        )
        if self._component_state_callback is not None:
            self._component_state_callback(adc_rms=adc_rms)
# pylint: disable=too-many-instance-attributes
class AntennaComponentManager(TaskExecutorComponentManager):
    """
    A component manager for managing the component of an MCCS antenna Tango device.

    Since there is no way to monitor and control an antenna directly,
    this component manager simply proxies certain commands to the
    FieldStation and/or tile Tango device.
    """

    # pylint: disable=too-many-arguments
    def __init__(
        self: AntennaComponentManager,
        field_station_name: str,
        antenna_id: int,
        tile_trl: str,
        tile_channels: tuple[int, int],
        logger: logging.Logger,
        communication_state_callback: Callable[[CommunicationStatus], None],
        component_state_callback: Callable[..., None],
    ) -> None:
        """
        Initialise a new instance.

        :param field_station_name: the TRL of the Tango device for this
            antenna's FieldStation.
        :param antenna_id: the logical id of this antenna (1-256)
        :param tile_trl: the TRL of the Tango device for this
            antenna's tile.
        :param tile_channels: the tile ADC channels containing
            Y and X signal from this antenna.
        :param logger: a logger for this object to use
        :param communication_state_callback: callback to be called when
            the status of the communications channel between the
            component manager and its component changes
        :param component_state_callback: callback to be called when the
            component state changes
        """
        # Re-entrant: _review_power takes this lock and is itself called
        # from code paths that already hold it.
        self._power_state_lock = threading.RLock()
        self._field_station_power_state = PowerState.UNKNOWN
        # Power state requested by on()/off() that has not yet been acted on.
        self._target_power_state: Optional[PowerState] = None

        self._field_station_communication_state: CommunicationStatus = (
            CommunicationStatus.DISABLED
        )
        self._tile_communication_state: CommunicationStatus = (
            CommunicationStatus.DISABLED
        )

        self._antenna_faulty_via_field_station = False
        self._antenna_faulty_via_tile = False

        self._antenna_id = antenna_id
        self._field_station_name = field_station_name
        self._tile_trl = tile_trl

        # Each proxy reports component state tagged with the TRL it came from.
        self._field_station_proxy = _FieldStationProxy(
            field_station_name,
            antenna_id,
            logger,
            self._field_station_communication_state_changed,
            functools.partial(component_state_callback, trl=field_station_name),
        )
        self._tile_proxy = _TileProxy(
            tile_trl,
            tile_channels,
            logger,
            self._tile_communication_state_changed,
            functools.partial(component_state_callback, trl=tile_trl),
        )

        # super.init with state keys included.
        super().__init__(
            logger,
            communication_state_callback,
            component_state_callback,
            power=None,
            fault=None,
            configuration_changed=None,
            adc_rms=None,
        )

    def start_communicating(self: AntennaComponentManager) -> None:
        """Establish communication with the component, then start monitoring."""
        self._field_station_proxy.start_communicating()
        self._tile_proxy.start_communicating()

    def stop_communicating(self: AntennaComponentManager) -> None:
        """Cease monitoring the component, and break off all communication with it."""
        self._field_station_proxy.stop_communicating()
        self._tile_proxy.stop_communicating()

    def _field_station_communication_state_changed(
        self: AntennaComponentManager,
        communication_state: CommunicationStatus,
    ) -> None:
        """
        Handle a change in status of communication with the field station.

        :param communication_state: the status of communication with
            the field station.
        """
        if communication_state == CommunicationStatus.ESTABLISHED:
            self._field_station_proxy.subscribe_to_attributes()
        self._field_station_communication_state = communication_state
        self._update_joint_communication_state()

    def _tile_communication_state_changed(
        self: AntennaComponentManager,
        communication_state: CommunicationStatus,
    ) -> None:
        """
        Handle a change in status of communication with the antenna via the tile.

        :param communication_state: the status of communication with the
            antenna via the tile.
        """
        self._tile_communication_state = communication_state
        self._update_joint_communication_state()

    def _update_joint_communication_state(
        self: AntennaComponentManager,
    ) -> None:
        """
        Update the status of communication with the antenna.

        The update takes into account communication via both tile and
        Field Station: the joint state is DISABLED or ESTABLISHED only
        when both links agree on that state, and NOT_ESTABLISHED
        otherwise.
        """
        for communication_state in [
            CommunicationStatus.DISABLED,
            CommunicationStatus.ESTABLISHED,
        ]:
            if (
                self._field_station_communication_state == communication_state
                and self._tile_communication_state == communication_state
            ):
                self._update_communication_state(communication_state)
                return
        self._update_communication_state(CommunicationStatus.NOT_ESTABLISHED)

    def _field_station_power_state_changed(
        self: AntennaComponentManager,
        power_state: PowerState,
    ) -> None:
        """
        Handle a change in the power state of the FieldStation itself.

        :param power_state: the new power state of the FieldStation.
        """
        with self._power_state_lock:
            self._field_station_power_state = power_state
            if power_state == PowerState.UNKNOWN:
                self._update_component_state(power=PowerState.UNKNOWN)
            elif power_state in [PowerState.OFF, PowerState.STANDBY]:
                self._update_component_state(power=PowerState.OFF)
            # Otherwise power_state is ON: wait for the antenna power change.
            self._review_power()

    def _antenna_power_state_changed(
        self: AntennaComponentManager,
        antenna_power_state: PowerState,
    ) -> None:
        """
        Handle a change in the power state of the antenna.

        :param antenna_power_state: the new power state of the antenna.
        """
        self._update_component_state(power=antenna_power_state)
        self._review_power()

    def _field_station_component_fault_changed(
        self: AntennaComponentManager,
        faulty: bool,
    ) -> None:
        """
        Handle a change in antenna fault status as reported via the FieldStation.

        The published fault state is the OR of the FieldStation-reported
        and Tile-reported flags, so that a fault clearing on one path
        does not hide a fault still reported on the other.

        :param faulty: whether the antenna is faulting.
        """
        self._antenna_faulty_via_field_station = faulty
        self._update_component_state(
            fault=self._antenna_faulty_via_field_station
            or self._antenna_faulty_via_tile
        )

    def _tile_component_fault_changed(
        self: AntennaComponentManager,
        faulty: bool,
    ) -> None:
        """
        Handle a change in antenna fault status as reported via the tile.

        The published fault state is the OR of the FieldStation-reported
        and Tile-reported flags, so that a fault clearing on one path
        does not hide a fault still reported on the other.

        :param faulty: whether the antenna is faulting.
        """
        self._antenna_faulty_via_tile = faulty
        self._update_component_state(
            fault=self._antenna_faulty_via_field_station
            or self._antenna_faulty_via_tile
        )

    def _update_antenna_configs(
        self: AntennaComponentManager,
        configuration: dict,
    ) -> None:
        """
        Update the config for the antenna device.

        :param configuration: dict containing the config of the device
        """
        if self._component_state_callback is not None:
            self._component_state_callback(configuration_changed=configuration)

    def _update_tile_configs(
        self: AntennaComponentManager,
        configuration: dict,
    ) -> None:
        """
        Forward the tile part of a configuration to the Tile device.

        Nothing is sent when the configuration has no "tile" entry.

        :param configuration: dict containing the config of the device;
            only its "tile" entry is used.
        """
        tile_config = configuration.get("tile")
        if tile_config is not None:
            self._tile_proxy.configure(json.dumps(tile_config))

    def _configure(
        self: AntennaComponentManager,
        antenna_config: dict,
        tile_config: dict,
        task_callback: Optional[Callable] = None,
        task_abort_event: Optional[threading.Event] = None,
    ) -> None:
        """
        Configure the antennas children.

        This sends off configuration commands to all of the devices that
        this antenna manages.

        :param antenna_config: antenna config
        :param tile_config: tile config
        :param task_callback: Update task state, defaults to None
        :param task_abort_event: Abort the task
        """
        if task_callback:
            task_callback(status=TaskStatus.IN_PROGRESS)
        try:
            self._update_antenna_configs(antenna_config)
            self._update_tile_configs(tile_config)
        except ValueError as value_error:
            if task_callback:
                task_callback(
                    status=TaskStatus.FAILED,
                    result=f"Configure command has failed: {repr(value_error)}",
                )
            return

        if task_callback:
            task_callback(
                status=TaskStatus.COMPLETED,
                result="Configure command has completed",
            )

    @property
    def power_state_lock(self: AntennaComponentManager) -> threading.RLock:
        """
        Return the power state lock of this component manager.

        :return: the power state lock of this component manager.
        """
        return self._power_state_lock

    @property
    def power_state(self: AntennaComponentManager) -> Optional[PowerState]:
        """
        Return my power state.

        :return: my power state
        """
        return self._component_state["power"]

    # TODO should the decorator be uncommented
    # @check_communicating
    def off(
        self: AntennaComponentManager, task_callback: Optional[Callable] = None
    ) -> tuple[TaskStatus, str]:
        """
        Submit the off slow task.

        This method returns immediately after it submitted `self._off`
        for execution.

        :param task_callback: Update task state, defaults to None

        :return: task status and message
        """
        return self.submit_task(self._off, task_callback=task_callback)

    def _off(
        self: AntennaComponentManager,
        task_callback: Optional[Callable] = None,
        task_abort_event: Optional[threading.Event] = None,
    ) -> None:
        """
        Turn the antenna off.

        It does so by telling the FieldStation to turn the right antenna off.

        :param task_callback: Update task state, defaults to None
        :param task_abort_event: Check for abort, defaults to None
        """
        # Indicate that the task has started
        if task_callback:
            task_callback(status=TaskStatus.IN_PROGRESS)
        try:
            with self._power_state_lock:
                self._target_power_state = PowerState.OFF
            # TODO should deal with the return code here
            self._review_power()
        except Exception as ex:  # pylint: disable=broad-except
            if task_callback:
                task_callback(status=TaskStatus.FAILED, result=f"Exception: {repr(ex)}")
            return

        # Indicate that the task has completed
        if task_callback:
            task_callback(
                status=TaskStatus.COMPLETED,
                result="Antenna off completed",
            )

    def standby(
        self: AntennaComponentManager, task_callback: Optional[Callable] = None
    ) -> tuple[TaskStatus, str]:
        """
        Put the antenna into standby state; this is not implemented.

        This raises NotImplementedError because the antenna has no
        standby state.

        :param task_callback: Update task state, defaults to None

        :raises NotImplementedError: because the antenna has no standby
            state.
        """
        raise NotImplementedError("Antenna has no standby state.")

    # TODO should the decorator be uncommented
    # @check_communicating
    def on(
        self: AntennaComponentManager, task_callback: Optional[Callable] = None
    ) -> tuple[TaskStatus, str]:
        """
        Submit the on slow task.

        This method returns immediately after it submitted `self._on`
        for execution.

        :param task_callback: Update task state, defaults to None

        :return: task status and message
        """
        return self.submit_task(self._on, task_callback=task_callback)

    def _on(
        self: AntennaComponentManager,
        task_callback: Optional[Callable] = None,
        task_abort_event: Optional[threading.Event] = None,
    ) -> None:
        """
        Turn the antenna on.

        :param task_callback: Update task state, defaults to None
        :param task_abort_event: Check for abort, defaults to None
        """
        # Indicate that the task has started
        if task_callback:
            task_callback(status=TaskStatus.IN_PROGRESS)
        try:
            with self._power_state_lock:
                self._target_power_state = PowerState.ON
            # TODO should deal with the return code here
            self._review_power()
        except Exception as ex:  # pylint: disable=broad-except
            if task_callback:
                task_callback(status=TaskStatus.FAILED, result=f"Exception: {repr(ex)}")
            return

        # Indicate that the task has completed
        if task_callback:
            task_callback(
                status=TaskStatus.COMPLETED,
                result="Antenna on completed",
            )

    def _review_power(self: AntennaComponentManager) -> ResultCode | None:
        """
        Act on any pending target power state, if it can be acted on now.

        :return: None when there is no pending target or it is already
            attained; the value returned by the FieldStation proxy's
            power_on/power_off when a command is issued (which may itself
            be None); or ResultCode.QUEUED when a target is pending but
            the current power state is neither ON nor OFF as required to
            act on it.
        """
        with self._power_state_lock:
            if self._target_power_state is None:
                return None
            if self.power_state == self._target_power_state:
                self._target_power_state = None  # attained without any action needed
                return None
            if (
                self.power_state == PowerState.OFF
                and self._target_power_state == PowerState.ON
            ):
                result_code = self._field_station_proxy.power_on()
                self._target_power_state = None
                return result_code
            if (
                self.power_state == PowerState.ON
                and self._target_power_state == PowerState.OFF
            ):
                result_code = self._field_station_proxy.power_off()
                self._target_power_state = None
                return result_code
            # The target stays pending; a later power state change re-runs this.
            return ResultCode.QUEUED

    def reset(
        self: AntennaComponentManager, task_callback: Optional[Callable] = None
    ) -> tuple[TaskStatus, str]:
        """
        Reset the antenna; this is not implemented.

        This raises NotImplementedError because the antenna is passive
        hardware and cannot meaningfully be reset.

        :param task_callback: Update task state, defaults to None

        :raises NotImplementedError: because the antenna is passive
            hardware and cannot meaningfully be reset.
        """
        raise NotImplementedError("Antenna cannot be reset.")

    @property
    def current(self: AntennaComponentManager) -> float:
        """
        Return the antenna's current.

        :return: the current of this antenna
        """
        return self._field_station_proxy.current

    @property
    def voltage(self: AntennaComponentManager) -> float:
        """
        Return the antenna's voltage.

        :return: the voltage of this antenna
        """
        return self._field_station_proxy.voltage

    @property
    def temperature(self: AntennaComponentManager) -> float:
        """
        Return the antenna's temperature.

        :return: the temperature of this antenna
        """
        return self._field_station_proxy.temperature