# Source code for ska_low_mccs.antenna.antenna_health_model

# -*- coding: utf-8 -*-
#
# This file is part of the SKA Low MCCS project
#
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""An implementation of a health model for an antenna."""
from __future__ import annotations

from typing import Callable, Optional

from ska_control_model import HealthState
from ska_low_mccs_common.health import HealthModel

from .antenna_health_rules import AntennaHealthRules

__all__ = ["AntennaHealthModel"]


class AntennaHealthModel(HealthModel):
    """
    A health model for an antenna.

    The model tracks three inputs -- the health of the FieldStation, the
    health of the tile, and the tile's ADC RMS reading -- and re-evaluates
    the antenna's health whenever any of them changes.
    """

    def __init__(
        self: AntennaHealthModel,
        component_state_callback: Callable[..., None],
        thresholds: Optional[dict[str, float]] = None,
    ) -> None:
        """
        Initialise a new instance.

        :param component_state_callback: callback to be called whenever
            there is a change to this component's state, including the
            health model's evaluated health state.
        :param thresholds: the threshold parameters for the health rules
        """
        # NOTE(review): the state below is assigned *before* the base class
        # initialiser runs, presumably because the base initialiser may
        # already evaluate health -- confirm before reordering.
        self._fieldstation_health: Optional[HealthState] = HealthState.UNKNOWN
        self._tile_health: Optional[HealthState] = HealthState.UNKNOWN
        self._tile_adc_rms: Optional[tuple[float, float]] = None
        self._health_rules = AntennaHealthRules(thresholds)
        super().__init__(component_state_callback)

    def fieldstation_health_changed(
        self: AntennaHealthModel,
        fieldstation_health: Optional[HealthState],
    ) -> None:
        """
        Handle a change in FieldStation health.

        Health is only re-evaluated if the value actually differs from the
        one currently stored.

        :param fieldstation_health: the health state of the FieldStation
        """
        if self._fieldstation_health != fieldstation_health:
            self._fieldstation_health = fieldstation_health
            self.update_health()

    def tile_health_changed(
        self: AntennaHealthModel,
        tile_health: Optional[HealthState],
    ) -> None:
        """
        Handle a change in tile health.

        Health is only re-evaluated if the value actually differs from the
        one currently stored.

        :param tile_health: the health state of the tile
        """
        if self._tile_health != tile_health:
            self._tile_health = tile_health
            self.update_health()

    def tile_adc_rms_changed(
        self: AntennaHealthModel,
        tile_adc_rms: Optional[tuple[float, float]],
    ) -> None:
        """
        Handle a change in tile adc reading.

        Health is only re-evaluated if the value actually differs from the
        one currently stored.

        :param tile_adc_rms: the adc reading from the tile
        """
        if self._tile_adc_rms != tile_adc_rms:
            self._tile_adc_rms = tile_adc_rms
            self.update_health()

    def evaluate_health(
        self: AntennaHealthModel,
    ) -> HealthState:
        """
        Compute overall health of the antenna.

        The health evaluated by the base class is combined with the stored
        FieldStation health, tile health and tile ADC RMS reading. The
        health rules are consulted in the order FAILED, UNKNOWN, DEGRADED,
        OK, and the first state whose rule is satisfied is returned.

        :return: an overall health of the antenna; ``HealthState.UNKNOWN``
            if no rule is satisfied.
        """
        antenna_health = super().evaluate_health()

        # Order matters: the first matching rule decides the result.
        for health in (
            HealthState.FAILED,
            HealthState.UNKNOWN,
            HealthState.DEGRADED,
            HealthState.OK,
        ):
            if self._health_rules.rules[health](
                self._fieldstation_health,
                self._tile_health,
                antenna_health,
                self._tile_adc_rms,
            ):
                return health
        return HealthState.UNKNOWN