# -*- coding: utf-8 -*
#
# This file is part of the SKA Low MCCS project
#
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""An implementation of a health model for an antenna."""
from __future__ import annotations
from typing import Callable, Optional
from ska_control_model import HealthState
from ska_low_mccs_common.health import HealthModel
from .antenna_health_rules import AntennaHealthRules
__all__ = ["AntennaHealthModel"]
[docs]class AntennaHealthModel(HealthModel):
"""A health model for an antenna."""
[docs] def __init__(
self: AntennaHealthModel,
component_state_callback: Callable[..., None],
thresholds: Optional[dict[str, float]] = None,
) -> None:
"""
Initialise a new instance.
:param component_state_callback: callback to be called whenever
there is a change to this component's state, including the health
model's evaluated health state.
:param thresholds: the threshold parameters for the health rules
"""
self._fieldstation_health: Optional[HealthState] = HealthState.UNKNOWN
self._tile_health: Optional[HealthState] = HealthState.UNKNOWN
self._tile_adc_rms: Optional[tuple[float, float]] = None
self._health_rules = AntennaHealthRules(thresholds)
super().__init__(component_state_callback)
[docs] def fieldstation_health_changed(
self: AntennaHealthModel,
fieldstation_health: Optional[HealthState],
) -> None:
"""
Handle a change in FieldStation health.
:param fieldstation_health: the health state of the FieldStation
"""
if self._fieldstation_health != fieldstation_health:
self._fieldstation_health = fieldstation_health
self.update_health()
[docs] def tile_health_changed(
self: AntennaHealthModel,
tile_health: Optional[HealthState],
) -> None:
"""
Handle a change in tile health.
:param tile_health: the health state of the tile
"""
if self._tile_health != tile_health:
self._tile_health = tile_health
self.update_health()
[docs] def tile_adc_rms_changed(
self: AntennaHealthModel,
tile_adc_rms: Optional[tuple[float, float]],
) -> None:
"""
Handle a change in tile adc reading.
:param tile_adc_rms: the adc reading from the tile
"""
if self._tile_adc_rms != tile_adc_rms:
self._tile_adc_rms = tile_adc_rms
self.update_health()
[docs] def evaluate_health(
self: AntennaHealthModel,
) -> HealthState:
"""
Compute overall health of the station.
The overall health is based on the fault and communication
status of the station overall, together with the health of the
FieldStation, antennas and tiles that it manages.
This implementation simply sets the health of the station to the
health of its least healthy component.
:return: an overall health of the station
"""
antenna_health = super().evaluate_health()
for health in [
HealthState.FAILED,
HealthState.UNKNOWN,
HealthState.DEGRADED,
HealthState.OK,
]:
if self._health_rules.rules[health](
self._fieldstation_health,
self._tile_health,
antenna_health,
self._tile_adc_rms,
):
return health
return HealthState.UNKNOWN