Source code for ska_low_mccs.antenna.antenna_component_manager

# -*- coding: utf-8 -*-
#
# This file is part of the SKA Low MCCS project
#
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""This module implements an component manager for an MCCS antenna Tango device."""
from __future__ import annotations

import functools
import json
import logging
import threading
from typing import Callable, Optional

import tango
from ska_control_model import CommunicationStatus, PowerState, ResultCode, TaskStatus
from ska_low_mccs_common.component import DeviceComponentManager
from ska_tango_base.base import check_communicating, check_on
from ska_tango_base.executor import TaskExecutorComponentManager

__all__ = ["AntennaComponentManager"]

NUMBER_OF_ANTENNA_IN_STATION = 256


class _FieldStationProxy(DeviceComponentManager):
    """A proxy to the FieldStation this antenna is a part off."""

    def __init__(
        self: _FieldStationProxy,
        name: str,
        antenna_id: int,
        logger: logging.Logger,
        communication_state_callback: Callable[[CommunicationStatus], None],
        component_state_callback: Callable[..., None],
    ) -> None:
        """
        Initialise a proxy to the FieldStation.

        :param name: the name of the FieldStation
        :param antenna_id: the logical id of this antenna.
        :param logger: the logger to be used by this object.
        :param communication_state_callback: callback to be called when
            the status of the communications channel between the
            component manager and its component changes
        :param component_state_callback: callback to be called when the
            component state changes
        :raises AssertionError: if parameters are out of bounds
        """
        self._power_state_lock = threading.RLock()
        assert (
            0 < antenna_id < NUMBER_OF_ANTENNA_IN_STATION + 1
        ), "The antenna id must be in the range 1-256"
        self._antenna_id = antenna_id

        self._antenna_change_registered = False
        self._supplied_power_state: Optional[PowerState] = None
        super().__init__(
            name,
            logger,
            communication_state_callback,
            component_state_callback,  # type: ignore[arg-type]
        )

    def stop_communicating(self: _FieldStationProxy) -> None:
        """Cease communicating with the FieldStation device."""
        super().stop_communicating()
        self._antenna_change_registered = False

    def reset(
        self: _FieldStationProxy, task_callback: Optional[Callable] = None
    ) -> tuple[TaskStatus, str]:
        """
        Reset the antenna; this is not implemented.

        This raises NotImplementedError because the antenna is passive
        hardware and cannot meaningfully be reset.

        :param task_callback: Update task state, defaults to None

        :raises NotImplementedError: because the antenna's power state is
            not controlled via the Tile device; it is controlled via the
            FieldStation device.
        """
        raise NotImplementedError("Antenna cannot be reset.")

    @check_communicating
    def power_on(self: _FieldStationProxy) -> ResultCode | None:
        """
        Tell the FieldStation to power on the port this antenna is attached to.

        :return: a result code.
        """
        if self._supplied_power_state == PowerState.ON:
            return None
        return self._power_up_antenna()

    def _power_up_antenna(self: _FieldStationProxy) -> ResultCode:
        assert self._proxy is not None  # for the type checker
        ([result_code], _) = self._proxy.PowerOnAntenna(self._antenna_id)
        return result_code

    @check_communicating
    def power_off(self: _FieldStationProxy) -> ResultCode | None:
        """
        Tell the FieldStation to power off the port this antenna is attached to.

        :return: a result code.
        """
        if self._supplied_power_state == PowerState.OFF:
            return None
        return self._power_down_antenna()

    def _power_down_antenna(self: _FieldStationProxy) -> ResultCode:
        assert self._proxy is not None  # for the type checker
        ([result_code], _) = self._proxy.PowerOffAntenna(self._antenna_id)
        return result_code

    @property  # type: ignore[misc]
    @check_communicating
    @check_on
    def current(self: _FieldStationProxy) -> float:
        """
        Return the antenna's current as reported by the FieldStation.

        :return: the current of this antenna
        """
        assert self._proxy is not None  # for the type checker
        port_currents = self._proxy.PortsCurrentDraw
        return port_currents[self._antenna_id - 1]

    @property  # type: ignore[misc]
    @check_communicating
    @check_on
    def voltage(self: _FieldStationProxy) -> float:
        """
        Return the antenna's voltage as reported by the FieldStation.

        :raises NotImplementedError: This monitoring point
            is not avaliable
        """
        assert self._proxy is not None  # for the type checker

        raise NotImplementedError  # Do not have this monitoring point.

    @property  # type: ignore[misc]
    @check_communicating
    @check_on
    def temperature(self: _FieldStationProxy) -> float:
        """
        Return the antenna's temperature as reported by FieldStation.

        :raises NotImplementedError: This monitoring point
            is not avaliable
        """
        assert self._proxy is not None  # for the type checker
        raise NotImplementedError  # Do not have this monitoring point.

    def update_supplied_power_state(
        self: _FieldStationProxy,
        supplied_power_state: Optional[PowerState],
    ) -> None:
        """
        Update the supplied power state.

        :param supplied_power_state: the supplied power state
        """
        if self._supplied_power_state != supplied_power_state:
            self._supplied_power_state = supplied_power_state
            if (
                self._supplied_power_state is not None
                and self._component_state_callback
            ):
                self._component_state_callback(power=self._supplied_power_state)

    @property
    def supplied_power_state(
        self: _FieldStationProxy,
    ) -> Optional[PowerState]:
        """
        Return the power state of the FieldStation.

        :return: the power state of the FieldStation.
        """
        return self._supplied_power_state

    def _device_state_changed(
        self: _FieldStationProxy,
        event_name: str,
        event_value: tango.DevState,
        event_quality: tango.AttrQuality,
    ) -> None:
        assert (
            event_name.lower() == "state"
        ), f"state changed callback called but event_name is {event_name}."
        super()._device_state_changed(event_name, event_value, event_quality)
        if event_value == tango.DevState.OFF:
            if self._supplied_power_state != PowerState.OFF:
                self._supplied_power_state = PowerState.OFF

    def subscribe_to_attributes(self: _FieldStationProxy) -> None:
        """Subscribe to attributes of interest."""
        assert self._proxy is not None  # for the type checker
        self._proxy.add_change_event_callback(
            "antennaPowerStates",
            self._antenna_power_state_changed,
            stateless=True,
        )
        self._antenna_change_registered = True

    def _antenna_power_state_changed(
        self: _FieldStationProxy,
        event_name: str,
        event_value: str,
        event_quality: tango.AttrQuality,
    ) -> None:
        """
        Handle change in antenna power state.

        This is a callback that is triggered by an event subscription
        on the Fieldstation device.

        :param event_name: name of the event; will always be
            "antennaPowerStates" for this callback
        :param event_value: the new attribute value
        :param event_quality: the quality of the change event
        """
        try:
            powers = json.loads(event_value)
            assert event_name.lower() == "antennapowerstates"
        except Exception as e:  # pylint: disable=broad-except
            self.logger.error(f"Issues with power change event: {repr(e)}")
            return
        if self._component_state_callback is not None:
            self._component_state_callback(
                power=PowerState(powers[str(self._antenna_id)]),
                trl=None,  # type: ignore[call-arg]
            )

        self.update_supplied_power_state(PowerState(powers[str(self._antenna_id)]))


class _TileProxy(DeviceComponentManager):
    """
    A component manager for an antenna, that proxies through a Tile Tango device.

    Note the semantics: the end goal is the antenna, not the Tile that
    it proxies through.

    For example, the communication status of this component manager
    reflects whether communication has been established all the way to
    the antenna. If we have established communication with the Tile
    Tango device, but the Tile Tango device reports that the Tile is
    turned off, then we have NOT established communication to the
    antenna.

    At present it is an unused, unimplemented placeholder.
    """

    def __init__(
        self: _TileProxy,
        name: str,
        channels: tuple[int, int],
        logger: logging.Logger,
        communication_state_callback: Callable[[CommunicationStatus], None],
        component_state_callback: Callable[..., None],
    ) -> None:
        """
        Initialise a new instance.

        :param name: the name of the Tile device
        :param channels: the tile ADC channels containing
            Y and X signal from this antenna
        :param logger: the logger to be used by this object.
        :param communication_state_callback: callback to be called when
            the status of the communications channel between the
            component manager and its component changes
        :param component_state_callback: callback to be called when the
            component state changes
        """
        (self._y_channel, self._x_channel) = channels
        self._adc_rms_change_registered = False
        self._adc_power = (0.0, 0.0)

        super().__init__(
            name,
            logger,
            communication_state_callback,
            component_state_callback,
        )

    def stop_communicating(self: _TileProxy) -> None:
        """Cease communicating with the Tile device."""
        super().stop_communicating()
        self._adc_rms_change_registered = False

    def off(
        self: _TileProxy, task_callback: Optional[Callable] = None
    ) -> tuple[TaskStatus, str]:
        """
        Turn the antenna off; this is not implemented.

        This raises NotImplementedError because the antenna's power state
        is not controlled via the Tile device; it is controlled via the
        FieldStation device.

        :param task_callback: Update task state, defaults to None

        :raises NotImplementedError: because the antenna's power state is
            not controlled via the Tile device; it is controlled via the
            FieldStation device.
        """
        raise NotImplementedError(
            "Antenna power state is not controlled via Tile device."
        )

    def standby(
        self: _TileProxy, task_callback: Optional[Callable] = None
    ) -> tuple[TaskStatus, str]:
        """
        Put the antenna into standby state; this is not implemented.

        This raises NotImplementedError because the antenna has no
        standby state; and because the antenna's power state is not
        controlled via the Tile device; it is controlled via the FieldStation
        device.

        :param task_callback: Update task state, defaults to None

        :raises NotImplementedError: because the antenna's power state is
            not controlled via the Tile device; it is controlled via the
            FieldStation device.
        """
        raise NotImplementedError(
            "Antenna power state is not controlled via Tile device."
        )

    def on(
        self: _TileProxy, task_callback: Optional[Callable] = None
    ) -> tuple[TaskStatus, str]:
        """
        Turn the antenna on; this is not implemented.

        This raises NotImplementedError because the antenna's power state
        is not controlled via the Tile device; it is controlled via the
        FieldStation device.

        :param task_callback: Update task state, defaults to None

        :raises NotImplementedError: because the antenna's power state is
            not controlled via the Tile device; it is controlled via the
            FieldStation device.
        """
        raise NotImplementedError(
            "Antenna power state is not controlled via Tile device."
        )

    def reset(
        self: _TileProxy, task_callback: Optional[Callable] = None
    ) -> tuple[TaskStatus, str]:
        """
        Reset the antenna; this is not implemented.

        This raises NotImplementedError because the antenna is passive
        hardware and cannot meaningfully be reset.

        :param task_callback: callback to be called when the status of
            the command changes

        :raises NotImplementedError: because the antenna's power state is
            not controlled via the Tile device; it is controlled via the
            FieldStation device.
        """
        raise NotImplementedError("Antenna hardware is not resettable.")

    @check_communicating
    def configure(self: _TileProxy, config: str) -> None:
        """
        Configure the device proxy.

        :param config: json string of configuration.
        """
        assert self._proxy is not None  # for the type checker
        assert self._proxy._device is not None  # for the type checker
        self._proxy._device.Configure(config)

    def _device_state_changed(
        self: _TileProxy,
        event_name: str,
        event_value: tango.DevState,
        event_quality: tango.AttrQuality,
    ) -> None:
        assert (
            event_name.lower() == "state"
        ), "state changed callback called but event_name is {event_name}."

        super()._device_state_changed(event_name, event_value, event_quality)
        if event_value == tango.DevState.ON and not self._adc_rms_change_registered:
            self._register_adc_rms_callback()

    def _register_adc_rms_callback(self: _TileProxy) -> None:
        """Register the change event callback for adcPower."""
        assert self._proxy is not None  # for the type checker
        self._proxy.add_change_event_callback(
            "adcPower",
            self._adc_rms_changed,
            stateless=True,
        )
        self._adc_rms_change_registered = True

    def _adc_rms_changed(
        self: _TileProxy,
        event_name: str,
        event_value: list[float],
        event_quality: tango.AttrQuality,
    ) -> None:
        """
        Handle change in tile ADC rms power.

        This is a callback that is triggered by an event subscription
        on the Tile device.

        :param event_name: name of the event; will always be
            "adcPower" for this callback
        :param event_value: the new attribute value
        :param event_quality: the quality of the change event
        """
        assert event_name.lower() == "adcPower".lower(), (
            "Tile 'adcPower' attribute changed callback called but "
            f"event_name is {event_name}."
        )
        adc_rms = (
            event_value[self._y_channel],
            event_value[self._x_channel],
        )
        if self._component_state_callback is not None:
            self._component_state_callback(adc_rms=adc_rms)


# pylint: disable=too-many-instance-attributes
[docs]class AntennaComponentManager(TaskExecutorComponentManager): """ A component manager for managing the component of an MCCS antenna Tango device. Since there is no way to monitor and control an antenna directly, this component manager simply proxies certain commands to the FieldStation and/or tile Tango device. """ # pylint: disable=too-many-arguments
[docs] def __init__( self: AntennaComponentManager, field_station_name: str, antenna_id: int, tile_trl: str, tile_channels: tuple[int, int], logger: logging.Logger, communication_state_callback: Callable[[CommunicationStatus], None], component_state_callback: Callable[..., None], ) -> None: """ Initialise a new instance. :param field_station_name: the TRL of the Tango device for this antenna's FieldStation. :param antenna_id: the logical id of this antenna (1-256) :param tile_trl: the TRL of the Tango device for this antenna's tile. :param tile_channels: the tile ADC channels containing Y and X signal from this antenna. :param logger: a logger for this object to use :param communication_state_callback: callback to be called when the status of the communications channel between the component manager and its component changes :param component_state_callback: callback to be called when the component state changes """ self._power_state_lock = threading.RLock() self._field_station_power_state = PowerState.UNKNOWN self._target_power_state: Optional[PowerState] = None self._field_station_communication_state: CommunicationStatus = ( CommunicationStatus.DISABLED ) self._tile_communication_state: CommunicationStatus = ( CommunicationStatus.DISABLED ) self._antenna_faulty_via_field_station = False self._antenna_faulty_via_tile = False self._antenna_id = antenna_id self._field_station_name = field_station_name self._tile_trl = tile_trl self._field_station_proxy = _FieldStationProxy( field_station_name, antenna_id, logger, self._field_station_communication_state_changed, functools.partial(component_state_callback, trl=field_station_name), ) self._tile_proxy = _TileProxy( tile_trl, tile_channels, logger, self._tile_communication_state_changed, functools.partial(component_state_callback, trl=tile_trl), ) # super.init with state keys included. super().__init__( logger, communication_state_callback, component_state_callback, power=None, fault=None, configuration_changed=None, adc_rms=None, )
[docs] def start_communicating(self: AntennaComponentManager) -> None: """Establish communication with the component, then start monitoring.""" self._field_station_proxy.start_communicating() self._tile_proxy.start_communicating()
[docs] def stop_communicating(self: AntennaComponentManager) -> None: """Cease monitoring the component, and break off all communication with it.""" self._field_station_proxy.stop_communicating() self._tile_proxy.stop_communicating()
def _field_station_communication_state_changed( self: AntennaComponentManager, communication_state: CommunicationStatus, ) -> None: """ Handle a change in status of communication with the field station. :param communication_state: the status of communication with the field station. """ if communication_state == CommunicationStatus.ESTABLISHED: self._field_station_proxy.subscribe_to_attributes() self._field_station_communication_state = communication_state self._update_joint_communication_state() def _tile_communication_state_changed( self: AntennaComponentManager, communication_state: CommunicationStatus, ) -> None: """ Handle a change in status of communication with the antenna via the tile. :param communication_state: the status of communication with the antenna via the tile. """ self._tile_communication_state = communication_state self._update_joint_communication_state() def _update_joint_communication_state( self: AntennaComponentManager, ) -> None: """ Update the status of communication with the antenna. The update takes into account communication via both tile and Field Station. """ for communication_state in [ CommunicationStatus.DISABLED, CommunicationStatus.ESTABLISHED, ]: if ( self._field_station_communication_state == communication_state and self._tile_communication_state == communication_state ): self._update_communication_state(communication_state) return self._update_communication_state(CommunicationStatus.NOT_ESTABLISHED) def _field_station_power_state_changed( self: AntennaComponentManager, power_state: PowerState, ) -> None: with self._power_state_lock: self._field_station_power_state = power_state if power_state == PowerState.UNKNOWN: self._update_component_state(power=PowerState.UNKNOWN) elif power_state in [PowerState.OFF, PowerState.STANDBY]: self._update_component_state(power=PowerState.OFF) else: # power_state is ON, wait for antenna power change pass self._review_power() def _antenna_power_state_changed( self: AntennaComponentManager, antenna_power_state: PowerState, ) -> None: self._update_component_state(power=antenna_power_state) self._review_power() def _field_station_component_fault_changed( self: AntennaComponentManager, faulty: bool, ) -> None: """ Handle a change in antenna fault status as reported via the FieldStation. :param faulty: whether the antenna is faulting. """ self._antenna_faulty_via_field_station = faulty self._update_component_state(fault=self._antenna_faulty_via_field_station) def _tile_component_fault_changed( self: AntennaComponentManager, faulty: bool, ) -> None: """ Handle a change in antenna fault status as reported via the tile. :param faulty: whether the antenna is faulting. """ self._antenna_faulty_via_tile = faulty self._update_component_state(fault=self._antenna_faulty_via_tile) def _update_antenna_configs( self: AntennaComponentManager, configuration: dict, ) -> None: """ Update the config for the antenna device. :param configuration: dict containing the config of the device """ if self._component_state_callback is not None: self._component_state_callback(configuration_changed=configuration) def _update_tile_configs( self: AntennaComponentManager, configuration: dict, ) -> None: """ Update the config for the antenna device. :param configuration: dict containing the config of the device """ tile_config = configuration.get("tile") if tile_config is not None: self._tile_proxy.configure(json.dumps(tile_config))
[docs] @check_communicating def configure( self: AntennaComponentManager, task_callback: Optional[Callable] = None, *, interface: Optional[str] = None, antenna_config: dict, tile_config: dict, ) -> tuple[TaskStatus, str]: """ Submit the configure method. This method returns immediately after it submitted `self._configure` for execution. :param interface: the schema version this is running against. :param antenna_config: antenna config :param tile_config: tile config :param task_callback: Update task state, defaults to None :return: a result code and response string """ return self.submit_task( self._configure, args=[antenna_config, tile_config], task_callback=task_callback, )
def _configure( self: AntennaComponentManager, antenna_config: dict, tile_config: dict, task_callback: Optional[Callable] = None, task_abort_event: Optional[threading.Event] = None, ) -> None: """ Configure the antennas children. This sends off configuration commands to all of the devices that this antenna manages. :param task_callback: Update task state, defaults to None :param task_abort_event: Abort the task :param antenna_config: antenna config :param tile_config: tile config """ if task_callback: task_callback(status=TaskStatus.IN_PROGRESS) try: self._update_antenna_configs(antenna_config) self._update_tile_configs(tile_config) except ValueError as value_error: if task_callback: task_callback( status=TaskStatus.FAILED, result=f"Configure command has failed: {repr(value_error)}", ) return if task_callback: task_callback( status=TaskStatus.COMPLETED, result="Configure command has completed", ) @property def power_state_lock(self: AntennaComponentManager) -> threading.RLock: """ Return the power state lock of this component manager. :return: the power state lock of this component manager. """ return self._power_state_lock @property def power_state(self: AntennaComponentManager) -> Optional[PowerState]: """ Return my power state. :return: my power state """ return self._component_state["power"] # TODO should the decorator be uncommented # @check_communicating
[docs] def off( self: AntennaComponentManager, task_callback: Optional[Callable] = None ) -> tuple[TaskStatus, str]: """ Submit the off slow task. This method returns immediately after it submitted `self._off` for execution. :param task_callback: Update task state, defaults to None :return: task status and message """ return self.submit_task(self._off, task_callback=task_callback)
def _off( self: AntennaComponentManager, task_callback: Optional[Callable] = None, task_abort_event: Optional[threading.Event] = None, ) -> None: """ Turn the antenna off. It does so by telling the FieldStation to turn the right antenna off. :param task_callback: Update task state, defaults to None :param task_abort_event: Check for abort, defaults to None """ # Indicate that the task has started if task_callback: task_callback(status=TaskStatus.IN_PROGRESS) try: with self._power_state_lock: self._target_power_state = PowerState.OFF # TODO should deal with the return code here self._review_power() # pylint: disable=broad-except except Exception as ex: if task_callback: task_callback(status=TaskStatus.FAILED, result=f"Exception: {repr(ex)}") return # Indicate that the task has completed if task_callback: task_callback( status=TaskStatus.COMPLETED, result="Antenna off completed", )
[docs] def standby( self: AntennaComponentManager, task_callback: Optional[Callable] = None ) -> tuple[TaskStatus, str]: """ Put the antenna into standby state; this is not implemented. This raises NotImplementedError because the antenna has no standby state. :param task_callback: Update task state, defaults to None :raises NotImplementedError: because the antenna has no standby state. """ raise NotImplementedError("Antenna has no standby state.")
# TODO should the decorator be uncommented # @check_communicating
[docs] def on( self: AntennaComponentManager, task_callback: Optional[Callable] = None ) -> tuple[TaskStatus, str]: """ Submit the on slow task. This method returns immediately after it submitted `self._on` for execution. :param task_callback: Update task state, defaults to None :return: task status and message """ return self.submit_task(self._on, task_callback=task_callback)
def _on( self: AntennaComponentManager, task_callback: Optional[Callable] = None, task_abort_event: Optional[threading.Event] = None, ) -> None: """ Turn the antenna on. :param task_callback: Update task state, defaults to None :param task_abort_event: Check for abort, defaults to None """ # Indicate that the task has started if task_callback: task_callback(status=TaskStatus.IN_PROGRESS) try: with self._power_state_lock: self._target_power_state = PowerState.ON # TODO should deal with the return code here self._review_power() # pylint: disable=broad-except except Exception as ex: if task_callback: task_callback(status=TaskStatus.FAILED, result=f"Exception: {repr(ex)}") return # Indicate that the task has completed if task_callback: task_callback( status=TaskStatus.COMPLETED, result="Antenna on completed", ) def _review_power(self: AntennaComponentManager) -> ResultCode | None: with self._power_state_lock: if self._target_power_state is None: return None if self.power_state == self._target_power_state: self._target_power_state = None # attained without any action needed return None if ( self.power_state == PowerState.OFF and self._target_power_state == PowerState.ON ): result_code = self._field_station_proxy.power_on() self._target_power_state = None return result_code if ( self.power_state == PowerState.ON and self._target_power_state == PowerState.OFF ): result_code = self._field_station_proxy.power_off() self._target_power_state = None return result_code return ResultCode.QUEUED
[docs] def reset( self: AntennaComponentManager, task_callback: Optional[Callable] = None ) -> tuple[TaskStatus, str]: """ Reset the antenna; this is not implemented. This raises NotImplementedError because the antenna is passive hardware and cannot meaningfully be reset. :param task_callback: Update task state, defaults to None :raises NotImplementedError: because the antenna's power state is not controlled via the Tile device; it is controlled via the FieldStation device. """ raise NotImplementedError("Antenna cannot be reset.")
@property def current(self: AntennaComponentManager) -> float: """ Return the antenna's current. :return: the current of this antenna """ return self._field_station_proxy.current @property def voltage(self: AntennaComponentManager) -> float: """ Return the antenna's voltage. :return: the voltage of this antenna """ return self._field_station_proxy.voltage @property def temperature(self: AntennaComponentManager) -> float: """ Return the antenna's temperature. :return: the temperature of this antenna """ return self._field_station_proxy.temperature