# -*- coding: utf-8 -*-
#
# This file is part of the SKA Low CBF project
#
# Copyright (c) 2021 CSIRO
#
# Distributed under the terms of the CSIRO Open Source Software Licence
# Agreement. See LICENSE for more info.
# pylint: disable=invalid-name,protected-access,too-few-public-methods
# can't do much about these:
# pylint: disable=attribute-defined-outside-init,pointless-string-statement
# pylint: disable=too-many-lines,too-many-ancestors,too-many-public-methods
""" SKA Low CBF
Subarray device for Low.CBF
"""
import json
import os
import time
from collections import defaultdict
from collections.abc import Iterable
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from enum import IntEnum
from functools import partial
from queue import Queue
from threading import Lock, Thread
from typing import Any, Union
import numpy as np
from ska_tango_base import SKABaseDevice, SKASubarray
from ska_tango_base.commands import ResultCode
from ska_tango_base.control_model import AdminMode, HealthState, ObsState
from tango import (
AttrQuality,
AttrWriteType,
Database,
DevFailed,
DeviceProxy,
DevUShort,
EventData,
EventSystemFailed,
EventType,
)
from tango.server import attribute, device_property, run
from ska_low_cbf import release
from ska_low_cbf.device_proxy import MccsDeviceProxy
from ska_low_cbf.subarray.component_manager import (
LowCbfSubarrayComponentManager,
SubarrayTangoIface,
)
__all__ = ["LowCbfSubarray", "main"]
# Third value of the tri-state delay-validity flag reported by delaysValid
# (0 == INVALID, 1 == VALID, 2 == UNUSED): the subarray polynomial is not used
UNUSED = 2
# NOTE(review): MAX_STATIONS / MAX_SPS_CHAN are not referenced in this part of
# the file; presumably upper bounds on stations and SPS channels - confirm.
MAX_STATIONS = 1024
MAX_SPS_CHAN = 384
# Jones-matrix timestamps in Kafka payloads are seconds since 2000-01-01 TAI;
# consumer-side "now" must use the same epoch and time scale to compute age.
# The value below is 2000-01-01T00:00:00 shifted back by the current TAI-UTC
# offset of 37 s. Python datetime arithmetic ignores leap seconds, so
# subtracting this instant from a current UTC time yields TAI-scale seconds
# since the SKA epoch. (The epoch's actual UTC instant is 1999-12-31T23:59:28,
# when TAI-UTC was 32 s; the extra 5 s are leap seconds inserted since.)
# NOTE(review): only exact for UTC times after the latest leap second
# (2017-01-01); must be revised if a new leap second is introduced.
SKA_EPOCH = datetime(1999, 12, 31, 23, 59, 23, tzinfo=timezone.utc)
def utc_to_ska_seconds(utc: datetime) -> float:
    """Express a UTC time as seconds elapsed since ``SKA_EPOCH``.

    :param utc: timezone-aware datetime; a naive datetime cannot be
        subtracted from the timezone-aware ``SKA_EPOCH`` and raises
        ``TypeError``
    :return: (possibly fractional) seconds since the SKA epoch
    """
    elapsed = utc - SKA_EPOCH
    return elapsed.total_seconds()
class Evt(IntEnum):
    """Codes for events received from other Tango devices.

    NOTE(review): value 2 is not assigned; presumably a retired code -
    confirm before reusing it.
    """

    # processor (Alveo) assignment changes
    ALVEOS_RELEASED = 0
    ALVEOS_ASSIGNED = 1
    # delay polynomial summary update
    DELAY_POLY_SUMMARY = 3
    # trigger processorsReadyPercent recalculation
    CALC_PROC_PERCENT = 4
    # trigger processorEthernetLockedPercent recalculation
    CALC_ETHER_PERCENT = 5
    PROCESS_HEALTH = 6
    CALC_HOSTLINK_PERCENT = 7
    # trigger processorFpgaTemperatureMean recalculation
    CALC_TEMPERATURE = 8
    # trigger processorFpgaPowerMean recalculation
    CALC_POWER = 9
    # configuration details
    INTERNAL_ALVEO = 10
@dataclass
class EtherPortStat:
    """Class tracking Processor and Connector Ethernet port status.

    Also holds the Connector event subscription used to receive port-up
    updates for the switch port a processor is routed through.
    """

    sw_name: Union[str, None] = None  # comes from Allocator e.g. "p4_01"
    sw_port: Union[str, None] = None  # e.g. "17/0"
    sw_proxy: Union["DeviceProxy", None] = None
    sw_dev_name: Union[str, None] = None  # e.g. "low-cbf/connector/0"
    subscription_id: Union[int, None] = None
    proc_ether_locked: bool = False
    sw_ether_locked: bool = False

    @property
    def is_subscribed(self) -> bool:
        """
        Check if we are subscribed to Connector port-up events.

        :return: True if subscribed, False otherwise
        """
        return self.subscription_id is not None

    def unsubscribe_connector(self) -> bool:
        """
        Unsubscribe from Connector port updates.

        ``subscription_id`` is cleared afterwards so that ``is_subscribed``
        reports False and a repeated call does not unsubscribe twice.

        :return: True if unsubscribed, False if there was no subscription
            (or no proxy) to cancel
        """
        # same "is not None" test as is_subscribed (not truthiness)
        if self.subscription_id is None or self.sw_proxy is None:
            return False
        self.sw_proxy.unsubscribe_event(self.subscription_id)
        self.subscription_id = None
        return True
@dataclass
class ConnectorPortsUp:
    """
    Maintain port-up status for multiple connector devices.

    Each switch (connector) has its own entry in the `ports_up` dictionary.
    """

    # Port-up status for each participating switch.
    # key: Tango device name (e.g. 'low-cbf/connector/0')
    # value: list of integers; 1 == port is up, otherwise 0
    ports_up: dict[str, list[int]] = field(default_factory=dict)

    def is_up(self, dev_name: str, port: Union[int, str]) -> bool:
        """
        Check if a port is up.

        :param dev_name: connector Tango device (e.g. 'low-cbf/connector/0')
        :param port: 1-based port number (int) or string (e.g. '17/0'; only
            the part before '/' is used)
        :return: True if the port is up; False if it is down, the device is
            unknown, or the port number is outside the reported range
        :raises ValueError: if a string port is not a valid integer
        """
        if isinstance(port, str):
            port = int(port.partition("/")[0])
        # array is 0 based, ports are 1 based
        index = port - 1
        all_ports = self.ports_up.get(dev_name)
        # reject index < 0 explicitly: port 0 would otherwise wrap around
        # to the last list element via negative indexing
        if all_ports is None or not 0 <= index < len(all_ports):
            return False
        return bool(all_ports[index])
# Tuple of exception classes, so it can be used directly in an ``except``
# clause around ``proxy.subscribe_event`` calls.
TANGO_SUB_EXC = (TypeError, EventSystemFailed)
"""Exceptions proxy.subscribe_event can throw"""
class LowCbfSubarray(SKASubarray):
"""
Subarray device for Low.CBF
**Properties:**
- Device Property
ControllerAddress
- Tango address of Low.CBF Controller
- Type:'DevString'
AllocatorAddress
- Tango address of allocator device that will handle assignment of
FPGA resources
- Type:'DevString'
"""
def __init__(self, *args, **kwargs):
    """Create the device, pre-seeding caches used by its handlers.

    Every attribute below is created *before* delegating to the base-class
    constructor.
    NOTE(review): presumably because the base constructor initialises the
    device and may already use them - keep this ordering.
    """
    # SPS statistics caches
    self._sps_stats_nflags_cache = {}
    self._sps_stats_npkts_cache = {}
    self._sps_stats_rms_cache = {}
    self._sps_stats_vchans_cache = {}
    self._sps_stats_update_time = {}
    # SPS statistics push counter; by default 1 push in 10 is archived
    self._sps_stats_push_ctr = 0
    self._sps_stats_push_per_archive = 10
    # SPS statistics attribute values
    self._sps_stats_flags_percent = np.array([], dtype=np.int8)
    self._sps_stats_rms_xpol = np.array([], dtype=np.uint8)
    self._sps_stats_rms_ypol = np.array([], dtype=np.uint8)
    self._sps_stn_sstn = json.dumps([])
    self._sps_bm_frq = json.dumps([])
    # per-processor state, keyed by processor FQDN
    self._proc_stats_mode = {}  # value: stats_mode
    self._proc_delay_stat = {}  # value: station_delay_valid
    self._proc_fpga_temperatures = {}  # value: FPGA temperature (float)
    self._proc_fpga_powers = {}  # value: FPGA power (float)
    # (percentage, quality) of resolved hosts; invalid until known
    not_resolved = (0, AttrQuality.ATTR_INVALID)
    self._sdp_host_ready_percent = not_resolved
    self._pss_host_ready_percent = not_resolved
    self._pst_host_ready_percent = not_resolved
    super().__init__(*args, **kwargs)
# Listed from most to least severe: FAILED, DEGRADED, OK
_health_states = (HealthState.FAILED, HealthState.DEGRADED, HealthState.OK)
"""HealthStates order by severity"""
# Device Properties
# Tango address of the Low.CBF Controller device
ControllerAddress = device_property(
    dtype="DevString", default_value="low-cbf/control/0"
)
# Tango address of the Allocator device (proxied in create_component_manager)
AllocatorAddress = device_property(
    dtype="DevString", default_value="low-cbf/allocator/0"
)
# Tango database host/port prefixed to connector device names when
# subscribing to connector healthState (see _subscribe_to_connector)
ConnectorDbHost = device_property(
    dtype="str", default_value="tango-databaseds"
)
ConnectorDbPort = device_property(dtype="int", default_value=10000)
# Bounds of the SPS RMS range used by the spsRmsInRange attribute
RmsMinValue = device_property(dtype="int", default_value=10)
RmsMaxValue = device_property(dtype="int", default_value=30)
# Tango timeouts in milliseconds. ProcessorTimeoutMs is passed to the
# component manager; AllocatorTimeoutMs to the Allocator proxy.
# NOTE(review): ConnectorTimeoutMs is not referenced in this part of the file.
ProcessorTimeoutMs = device_property(dtype="int", default_value=10_000)
ConnectorTimeoutMs = device_property(dtype="int", default_value=10_000)
AllocatorTimeoutMs = device_property(dtype="int", default_value=10_000)
@attribute(
    label="Stations",
    doc="Report station & substation membership in subarray",
)
def stations(self) -> str:
    """
    Report the ``station`` and ``substation`` members of this subarray.

    :return: string form of the component manager's station list
    """
    membership = self.component_manager.subarray.stations
    return str(membership)
@attribute(dtype=DevUShort)
def delaysValid(self) -> DevUShort:
    """Report this subarray's delay polynomial validity.

    Encoding:

    * 0 - INVALID
    * 1 - VALID
    * 2 - UNUSED

    :return: cached delay validity status
    :rtype: DevUShort
    """
    status = self._delays_valid
    return status
def _update_delays_valid(self, fqdn: str, new_delays: list[int]) -> None:
"Update this subarray specific delay polynomial validity status"
assert len(new_delays) >= self._subarray_id
self.logger.info(f"{new_delays}")
sub_id = self._subarray_id - 1 # subarrays are 1-based
# update this processor's contribution
self._delay_contributors[fqdn] = new_delays[sub_id]
for device, value in self._delay_contributors.items():
self.logger.info(f"{device}: {value}")
new_value = min(self._delay_contributors.values())
# pylint: disable=access-member-before-definition
if self._delays_valid != new_value:
self._delays_valid = new_value
self._push_events("delaysValid", new_value)
@attribute(
    label="Station Beams",
    doc="Report configuration of all Station Beams",
)
def stationBeams(self) -> str:
    """Report the configuration of every Station Beam as a string."""
    beams = self.component_manager.subarray.station_beams
    return str(beams)
@attribute(
    dtype="DevString",
    label="Pulsar Search Beams",
    doc=(
        "Each Pulsar Search Beam is associated with one Station Beam, and "
        "has additional configuration parameters including a delay "
        "polynomial source (supplied via Configure)"
    ),
)
def pssBeams(self) -> str:
    """Report the Pulsar Search Beam configuration as a string."""
    beams = self.component_manager.subarray.pss_beams
    return str(beams)
@attribute(
    label="Pulsar Timing Beams",
    doc=(
        "Each Pulsar Timing Beam is associated with one Station Beam, and "
        "has additional configuration parameters including a delay "
        "polynomial source (supplied via Configure)"
    ),
)
def pstBeams(self) -> str:
    """Report the Pulsar Timing Beam configuration as a string."""
    beams = self.component_manager.subarray.pst_beams
    return str(beams)
@attribute(
    label="Zoom window IDs",
    doc="Report Zooms by window ID",
)
def zooms(self) -> str:
    """Report the configured zoom windows (by window ID) as a string."""
    windows = self.component_manager.subarray.zooms
    return str(windows)
# Override of the ska-tango-base adminMode attribute, so that adminMode cannot
# be changed (e.g. to OFFLINE) while the subarray is resourced; a resourced
# subarray is therefore only ever in adminMode ONLINE or ENGINEERING.
@attribute(
    # dtype=AdminMode, ... FIXME Sphinx doesn't like this
    memorized=True,
    hw_memorized=True,
    access=AttrWriteType.READ_WRITE,
)
def adminMode(self: SKABaseDevice) -> AdminMode:
    """
    Report the device's current Admin Mode.

    :return: the cached Admin Mode of the device
    """
    current_mode = self._admin_mode
    return current_mode
def write_adminMode(self: SKABaseDevice, value: AdminMode) -> None:
    """
    Set the Admin Mode of the device.

    Override of ska-tango-base that refuses the change while the subarray
    is active (adminMode ONLINE or ENGINEERING) and its ``obsState`` is
    neither EMPTY nor IDLE, so it cannot be taken offline while resourced.
    IDLE is allowed because subarrays can have empty assignresources (and
    PTC1 changes adminMode in IDLE).

    Health tracking is adjusted for the new mode before the admin-mode state
    model transition. OFFLINE stops communicating with the component.
    ENGINEERING and ONLINE start communicating and call
    ``_recalculate_health``.

    :param value: Admin Mode of the device.
    :raises ValueError: if the change is blocked by the current obsState,
        or for an unknown adminMode
    """
    active = self._admin_mode in (AdminMode.ONLINE, AdminMode.ENGINEERING)
    if active and not self._is_obs_state((ObsState.IDLE, ObsState.EMPTY)):
        # explicit raise rather than ``assert``: the guard must not be
        # stripped when Python runs with -O
        raise ValueError(
            "Can't change adminMode when obsState not EMPTY or IDLE"
        )
    value = AdminMode(value)  # raises ValueError for unknown values
    self._adjust_health(value)
    if value == AdminMode.NOT_FITTED:
        self.admin_mode_model.perform_action("to_notfitted")
    elif value == AdminMode.OFFLINE:
        # should health monitoring stop here?
        self.admin_mode_model.perform_action("to_offline")
        self.component_manager.stop_communicating()
    elif value == AdminMode.ENGINEERING:
        self.admin_mode_model.perform_action("to_engineering")
        self.component_manager.start_communicating()
        # propagate healthState in case it was updated while OFFLINE
        self._recalculate_health()
    elif value == AdminMode.ONLINE:
        self.admin_mode_model.perform_action("to_online")
        self.component_manager.start_communicating()
        self._recalculate_health()
    elif value == AdminMode.RESERVED:
        self.admin_mode_model.perform_action("to_reserved")
    else:
        raise ValueError(f"Unknown adminMode {value}")
def _adjust_health(self, admin_mode: AdminMode) -> None:
    """Adapt health tracking to an impending ``adminMode`` change.

    Moving from a non-monitoring mode into a monitoring mode (e.g. ONLINE)
    starts health monitoring. Moving the other way (e.g. to OFFLINE) sets
    ``healthState`` to UNKNOWN. Moving between two modes of the same kind
    does nothing.

    :param admin_mode: the ``adminMode`` we are about to transition to
    """
    was_monitoring = bool(self.is_monitoring_mode())
    will_monitor = bool(self.is_monitoring_mode(admin_mode))
    if was_monitoring == will_monitor:
        return
    if will_monitor:
        self.logger.info(
            f"Entering {admin_mode.name} mode: will track healthState"
        )
        self._start_health_monitoring()
    else:
        self.logger.info(
            f"Entering {admin_mode.name} mode: won't track healthState"
        )
        self._update_health_state(HealthState.UNKNOWN)
def _start_health_monitoring(self):
"""Start halth monitoring thread if it's not already running but
not in case NO_HEALTH_ROLLUP enivonment variable is set.
"""
if self._no_health_rollup or self._subscribed_to_alloc:
return
self._thrd = Thread(target=self._subscribe_allocator)
self._thrd.start()
self._subscribe_to_connector()
# TBD: Taranta didn't like the name 'assignedProcessors' (:shrug:)
@attribute(dtype="str", doc="List of processors assigned to subarray")
def assigned_processors(self: SKABaseDevice) -> str:
    """
    Report the processors currently assigned to this subarray.

    :return: JSON-encoded list of processor serial numbers
    """
    serials = [*self._assigned_proc_details]
    return json.dumps(serials)
@attribute(
    dtype=int,
    doc="Percentage of PSS hosts resolved.",
    min_value="0",
    max_value="100",
    unit="%",
)
def pssHostsResolvedPercent(self):
    """Report the percentage of PSS hosts resolved.

    The cached Allocator value and quality are reported only while
    ``obsState`` is READY or SCANNING. In any other state the result is 0
    with ATTR_INVALID quality.

    :return: (percentage, ``time.time()`` timestamp, quality)
    """
    in_obs = self._is_obs_state((ObsState.READY, ObsState.SCANNING))
    cached_value, cached_quality = self._pss_host_ready_percent
    if in_obs:
        return cached_value, time.time(), cached_quality
    return 0, time.time(), AttrQuality.ATTR_INVALID
def _get_percent_pss_resolved(self) -> tuple[int, AttrQuality]:
    """Return the percentage of PSS hosts resolved.

    The Allocator is queried only when all of these hold:

    * this subarray's configuration has ``search_beams``;
    * ``obsState`` is READY or SCANNING;
    * an Allocator proxy is available.

    :return: (percentage, quality); (0, ATTR_INVALID) when not queried
    """
    percentage, quality = 0, AttrQuality.ATTR_INVALID
    if "search_beams" not in self._subarray_config:
        return percentage, quality
    # calculate percentage only if in "ready" state
    if not self._is_obs_state((ObsState.READY, ObsState.SCANNING)):
        return percentage, quality
    if proxy := self._get_allocator_proxy():
        percentage = proxy.GetPSSHostResolved(self._subarray_id)
        quality = AttrQuality.ATTR_VALID
    return percentage, quality
@attribute(
    dtype=int,
    doc="Percentage of PST hosts resolved.",
    min_value="0",
    max_value="100",
    unit="%",
)
def pstHostsResolvedPercent(self):
    """Report the percentage of PST hosts resolved.

    The cached Allocator value and quality are reported only while
    ``obsState`` is READY or SCANNING. In any other state the result is 0
    with ATTR_INVALID quality.

    :return: (percentage, ``time.time()`` timestamp, quality)
    """
    in_obs = self._is_obs_state((ObsState.READY, ObsState.SCANNING))
    cached_value, cached_quality = self._pst_host_ready_percent
    if in_obs:
        return cached_value, time.time(), cached_quality
    return 0, time.time(), AttrQuality.ATTR_INVALID
def _get_percent_pst_resolved(self) -> tuple[int, AttrQuality]:
    """Return percentage of PST hosts resolved.

    The Allocator is queried only when all of these hold:

    * this subarray's configuration has ``timing_beams``;
    * ``obsState`` is READY or SCANNING;
    * an Allocator proxy is available.

    :return: (percentage, quality); (0, ATTR_INVALID) when not queried
    """
    percentage, quality = 0, AttrQuality.ATTR_INVALID
    if "timing_beams" not in self._subarray_config:
        return percentage, quality
    # calculate percentage only if in "ready" state
    if not self._is_obs_state((ObsState.READY, ObsState.SCANNING)):
        return percentage, quality
    if proxy := self._get_allocator_proxy():
        percentage = proxy.GetPSTHostResolved(self._subarray_id)
        quality = AttrQuality.ATTR_VALID
    return percentage, quality
@attribute(
    dtype=int,
    doc="Percentage of SDP hosts resolved.",
    min_value="0",
    max_value="100",
    unit="%",
)
def sdpHostsResolvedPercent(self):
    """Report the percentage of SDP hosts resolved.

    The cached Allocator value and quality are reported only while
    ``obsState`` is READY or SCANNING. In any other state the result is 0
    with ATTR_INVALID quality.

    :return: (percentage, ``time.time()`` timestamp, quality)
    """
    in_obs = self._is_obs_state((ObsState.READY, ObsState.SCANNING))
    cached_value, cached_quality = self._sdp_host_ready_percent
    if in_obs:
        return cached_value, time.time(), cached_quality
    return 0, time.time(), AttrQuality.ATTR_INVALID
def _get_percent_sdp_resolved(self) -> tuple[int, AttrQuality]:
    """Return the percentage of SDP hosts resolved.

    The Allocator is queried only when all of these hold:

    * this subarray's configuration has ``vis`` or ``coarse_zooms``;
    * ``obsState`` is READY or SCANNING;
    * an Allocator proxy is available.

    :return: (percentage, quality); (0, ATTR_INVALID) when not queried
    """
    percentage, quality = 0, AttrQuality.ATTR_INVALID
    config = self._subarray_config
    if "vis" not in config and "coarse_zooms" not in config:
        return percentage, quality
    # calculate percentage only if in "ready" state
    if not self._is_obs_state((ObsState.READY, ObsState.SCANNING)):
        return percentage, quality
    if proxy := self._get_allocator_proxy():
        percentage = proxy.GetSDPHostResolved(self._subarray_id)
        quality = AttrQuality.ATTR_VALID
    return percentage, quality
@attribute(
    dtype="int",
    doc=("Percentage of processors ready for scan."),
    min_value="0",
    max_value="100",
    unit="%",
)
def processorsReadyPercent(
    self: SKABaseDevice,
) -> tuple[int, float, AttrQuality]:
    """
    Report the percentage of processors ready for a subarray scan.

    The value is only meaningful while ``obsState`` is READY or SCANNING.
    In any other state 0 is reported with ATTR_INVALID quality.

    :return: (percentage, ``time.time()`` timestamp, quality)
    """
    if not self._is_obs_state((ObsState.READY, ObsState.SCANNING)):
        return 0, time.time(), AttrQuality.ATTR_INVALID
    percent = self._proc_ready_percent
    return percent, time.time(), AttrQuality.ATTR_VALID
@attribute(
    dtype="int",
    doc=("Percentage of station beam delay subscription ready for scan."),
    min_value="0",
    max_value="100",
    unit="%",
)
def stationBeamDelayReadyPercent(
    self: SKABaseDevice,
) -> tuple[int, float, AttrQuality]:
    """
    Report the percentage of station beam delay subscriptions ready to scan.

    The value is only meaningful while ``obsState`` is READY or SCANNING.
    In any other state 0 is reported with ATTR_INVALID quality.

    :return: (percentage, ``time.time()`` timestamp, quality)
    """
    if not self._is_obs_state((ObsState.READY, ObsState.SCANNING)):
        return 0, time.time(), AttrQuality.ATTR_INVALID
    percent = self._station_beam_delay_ready_percent
    return percent, time.time(), AttrQuality.ATTR_VALID
@attribute(
    dtype="int",
    doc="Percentage of PSS beam delay subscription ready for scan.",
    min_value="0",
    max_value="100",
    unit="%",
)
def pssBeamDelayReadyPercent(
    self: SKABaseDevice,
) -> tuple[int, float, AttrQuality]:
    """
    Report the percentage of PSS beam delay subscriptions ready to scan.

    The value is only meaningful while ``obsState`` is READY or SCANNING.
    In any other state 0 is reported with ATTR_INVALID quality.

    :return: (percentage, ``time.time()`` timestamp, quality)
    """
    if not self._is_obs_state((ObsState.READY, ObsState.SCANNING)):
        return 0, time.time(), AttrQuality.ATTR_INVALID
    percent = self._pss_beam_delay_ready_percent
    return percent, time.time(), AttrQuality.ATTR_VALID
@attribute(
    dtype="int",
    doc="Percentage of PST beam delay subscription ready for scan.",
    min_value="0",
    max_value="100",
    unit="%",
)
def pstBeamDelayReadyPercent(
    self: SKABaseDevice,
) -> tuple[int, float, AttrQuality]:
    """
    Report the percentage of PST beam delay subscriptions ready to scan.

    The value is only meaningful while ``obsState`` is READY or SCANNING.
    In any other state 0 is reported with ATTR_INVALID quality.

    :return: (percentage, ``time.time()`` timestamp, quality)
    """
    if not self._is_obs_state((ObsState.READY, ObsState.SCANNING)):
        return 0, time.time(), AttrQuality.ATTR_INVALID
    percent = self._pst_beam_delay_ready_percent
    return percent, time.time(), AttrQuality.ATTR_VALID
def _is_obs_state(
    self, state: Union[ObsState, Iterable[ObsState]]
) -> bool:
    """Check whether the subarray ``obsState`` matches ``state``.

    :param state: a single state, or an iterable of acceptable states
        e.g. (READY, SCANNING)
    :return: True if ``obsState`` equals ``state`` (single state) or is one
        of its members (iterable), False otherwise
    """
    current = self._obs_state
    if not isinstance(state, Iterable):
        return current == state
    return current in state
@attribute(
    dtype="int",
    doc="Percentage of PST Jones applied (Not yet ready).",
    min_value="0",
    max_value="100",
    unit="%",
)
def pstJonesAppliedPercent(
    self: SKABaseDevice,
) -> tuple[int, float, AttrQuality]:
    """
    Report the percentage of PST Jones matrices applied.

    A value is reported as valid only while ``obsState`` is READY or
    SCANNING and the percentage is known (not ``None``). Otherwise 0 is
    reported with ATTR_INVALID quality.

    :return: (percentage, ``time.time()`` timestamp, quality)
    """
    percent = None
    if self._is_obs_state((ObsState.READY, ObsState.SCANNING)):
        percent = self._pst_jones_applied_percent
    if percent is None:
        return 0, time.time(), AttrQuality.ATTR_INVALID
    return percent, time.time(), AttrQuality.ATTR_VALID
@attribute(
    dtype="int",
    doc="Percentage of PSS Jones applied (Not yet ready).",
    min_value="0",
    max_value="100",
    unit="%",
)
def pssJonesAppliedPercent(
    self: SKABaseDevice,
) -> tuple[int, float, AttrQuality]:
    """
    Get a percentage of PSS Jones applied.

    The value is meaningful only when ``obsState`` is READY or SCANNING and
    the percentage is known, so we also return Tango attribute quality.
    This matches ``pstJonesAppliedPercent``: an unknown (``None``) value is
    reported as 0 with ATTR_INVALID quality, instead of ``None`` flagged as
    ATTR_VALID.

    :return: tuple
        * percentage: rounded integer in the range [0, 100], 0 when invalid
        * time: float
        * quality: AttrQuality
    """
    # adjust Tango quality attribute depending on obsState
    ready = self._is_obs_state((ObsState.READY, ObsState.SCANNING))
    valid = ready and self._pss_jones_applied_percent is not None
    value = self._pss_jones_applied_percent if valid else 0
    quality = AttrQuality.ATTR_VALID if valid else AttrQuality.ATTR_INVALID
    return value, time.time(), quality
@attribute(
    dtype="int",
    doc="Age of applied PST Jones matrices (Not yet ready).",
    min_value="0",
    unit="seconds",
)
def pstJonesAge(
    self: SKABaseDevice,
) -> tuple[int, float, AttrQuality]:
    """
    Report the age of the most recently applied PST Jones matrices.

    A value is reported as valid only while ``obsState`` is READY or
    SCANNING and the age is known (not ``None``). Otherwise 0 is reported
    with ATTR_INVALID quality.

    :return: (age in seconds, ``time.time()`` timestamp, quality)
    """
    age = None
    if self._is_obs_state((ObsState.READY, ObsState.SCANNING)):
        age = self._pst_jones_age
    if age is None:
        return 0, time.time(), AttrQuality.ATTR_INVALID
    return age, time.time(), AttrQuality.ATTR_VALID
@attribute(
    dtype="int",
    doc="Age of applied PSS Jones matrices (Not yet ready)",
    min_value="0",
    unit="seconds",
)
def pssJonesAge(
    self: SKABaseDevice,
) -> tuple[int, float, AttrQuality]:
    """
    Get the age of latest PSS Jones applied.

    The value is meaningful only when ``obsState`` is READY or SCANNING and
    the age is known, so we also return Tango attribute quality. This
    matches ``pstJonesAge``: an unknown (``None``) age is reported as 0 with
    ATTR_INVALID quality, instead of ``None`` flagged as ATTR_VALID.

    :return: tuple
        * age: age of the PSS Jones Matrix, 0 when invalid
        * time: float
        * quality: AttrQuality
    """
    # adjust Tango quality attribute depending on obsState
    ready = self._is_obs_state((ObsState.READY, ObsState.SCANNING))
    valid = ready and self._pss_jones_age is not None
    value = self._pss_jones_age if valid else 0
    quality = AttrQuality.ATTR_VALID if valid else AttrQuality.ATTR_INVALID
    return value, time.time(), quality
@attribute(
    dtype="int",
    doc="Average of SPS RMS signal across all frequencies and stations.",
    unit="%",
)
def spsRmsInRange(
    self: SKABaseDevice,
) -> tuple[int, float, AttrQuality]:
    """
    Report the percentage of SPS RMS values inside the configured range.

    The range comes from the ``RmsMinValue`` and ``RmsMaxValue`` device
    properties, and covers all frequencies and stations. The value is only
    meaningful while ``obsState`` is READY or SCANNING. In any other state
    0 is reported with ATTR_INVALID quality.

    :return: (percentage in range, ``time.time()`` timestamp, quality)
    """
    if not self._is_obs_state((ObsState.READY, ObsState.SCANNING)):
        return 0, time.time(), AttrQuality.ATTR_INVALID
    in_range = self._sps_rms_in_range
    return in_range, time.time(), AttrQuality.ATTR_VALID
@attribute(
    dtype=int,
    doc="Average of percentage of flagged signal across all frequencies and stations",
    min_value=0,
    unit="%",
)
def spsFlagsPercentMean(
    self: SKABaseDevice,
) -> tuple[int, float, AttrQuality]:
    """
    Report the mean flagged-signal percentage over frequencies and stations.

    The value is only meaningful while ``obsState`` is READY or SCANNING.
    In any other state 0 is reported with ATTR_INVALID quality.

    :return: (mean flag percentage, ``time.time()`` timestamp, quality)
    """
    if not self._is_obs_state((ObsState.READY, ObsState.SCANNING)):
        return 0, time.time(), AttrQuality.ATTR_INVALID
    mean_flags = self._sps_flags_percent_mean
    return mean_flags, time.time(), AttrQuality.ATTR_VALID
# override of CspSubElementObsDevice _component_state_changed
def _component_state_changed_low(
self,
fault=None,
power=None,
resourced=None,
configured=None,
scanning=None,
obsfault=None, # new here
): # pylint: disable=too-many-arguments
"""Enhanced obsState callback to allow obsState=FAULT"""
# Most state is handled by calling ska-tango-base BaseComponentManager
self._component_state_changed(
fault=fault,
power=power,
resourced=resourced,
configured=configured,
scanning=scanning,
)
# But our new state variable is handled here
if obsfault is not None:
if obsfault:
self.obs_state_model.perform_action("component_obsfault")
else:
pass # (no state change needed when obsfault is cleared)
# General methods
def create_component_manager(self):
    """Create Subarray component manager.

    The subarray ID is the third '/'-separated part of this device's Tango
    name (e.g. ``low-cbf/subarray/1`` -> 1). The manager gets an Allocator
    proxy and a Tango interface exposing the SPS statistics updater and the
    adminMode getter.
    """
    name_parts = self.get_name().split("/")
    subarray_id = int(name_parts[2])
    allocator_proxy = MccsDeviceProxy(
        self.AllocatorAddress,
        self.logger,
        connect=True,
        tango_timeout_ms=self.AllocatorTimeoutMs,
    )
    iface = SubarrayTangoIface(
        update_attr_sps_stats=self.sps_stats_update,
        get_admin_mode=self.get_admin_mode,
    )
    return LowCbfSubarrayComponentManager(
        logger=self.logger,
        subarray_id=subarray_id,
        communication_state_callback=self._communication_state_changed,
        component_state_callback=self._component_state_changed_low,
        tango_iface=iface,
        allocator=allocator_proxy,
        processor_timeout_ms=self.ProcessorTimeoutMs,
    )
def get_admin_mode(self) -> int:
    """
    Callback giving the component manager access to ``adminMode``.

    Passed to ``SubarrayTangoIface`` as its ``get_admin_mode`` callback.

    :return: the adminMode the subarray is currently in
    """
    mode = self._admin_mode
    return mode
def always_executed_hook(self):
    """Hook run before any TANGO command is executed; a no-op here."""
    return None
def delete_device(self):
    """Hook for releasing resources allocated in init_device.

    Tango calls it from the device destructor and from the device Init
    command. This implementation does nothing.
    """
    return None
# Commands
def init_command_objects(self):
    """
    Set up the command handler objects for this device's commands.

    Delegates entirely to the parent class.
    """
    # pylint: disable=useless-super-delegation
    return super().init_command_objects()
class InitCommand(SKASubarray.InitCommand):
    """Init command for LowCbfSubarray.

    Extends the ska-tango-base subarray Init command with Low.CBF-specific
    device state.
    """

    def do(self):
        """
        Initialise the attributes and properties of the LowCbfSubarray.

        Runs the base-class ``do`` first, then:

        * sets up per-device state (processor bookkeeping, health,
          delay-validity and readiness tracking);
        * enables manually-pushed change/archive events;
        * derives the subarray ID from the device name;
        * starts the Tango event processing thread.

        :return: tuple of (ResultCode.OK, status message)
        """
        super().do()
        self._device._version_id = release.version
        self._device._build_state = (
            f"{release.name}, {release.version}, {release.description}"
        )
        # Events to manually be pushed
        self._device.set_change_event("healthState", True, False)
        self._device._proc_sn_to_fqdn = {}
        """All known Processors { serial number: Processor FQDN }"""
        self._device._assigned_proc_details = {}
        """Serial numbers and routing details of Processors assigned to
        this subarray e.g.
        { "XFL10NIYKVEU"": {"name": "p4_01", "port": "17/0"}, ...}
        """
        self._device._proc_subscriptions = {}
        """Dict of processor subscription IDs (for later unsubscribe)
        { "<alveo_sn>": ( proxy, [ subscription_ids ] ) }
        """
        self._device._subscribed_fqdn = []
        """FQDNs of Processors we are subscribed to"""
        # defaultdict: a fresh EtherPortStat is created on first access
        self._device._eth_port_stat = defaultdict(EtherPortStat)
        """A dictionary keeping track about Ethernet port status.
        key: processor fqdn; value: EtherPortStat dataclass
        """
        self._device._conn_port = ConnectorPortsUp()
        """Keeps track which Connector ports are up.
        An array of integers for each port; value: 1 == port is up
        """
        self._device._ether_locked_percent = 0
        """Percentage of Ethernet ports ready for scan."""
        self._device._proc_thread = None
        """Thread used to subscribe to processors healthState as they get
        assigned to this subarray"""
        # TBD expose this as an attribute:
        self._device._component_health = {}
        """A dictionary of health for constituent components; e.g.
        {
        "low-cbf/processor/0" : OK,
        "low-cbf/processor/1" : DEGRADED,
        "low-cbf/connector/0" : OK
        }
        """
        self._device._callback_lock = Lock()
        """Serialise callbacks from different devices"""
        # no processor has contributed yet, so delays start as UNUSED
        self._device._delays_valid = UNUSED
        """Overall indicator if all delays in subarray are valid"""
        self._device._delay_contributors = {}
        """FQDN: delay_valid map - how much each processor contributes to
        overall subarray's 'delaysValid' state"""
        self._device._proc_ready_percent = 0
        """Percentage of processors ready for scan"""
        self._device._station_beam_delay_ready_percent = 0
        """Percentage of station beam delay subscriptions ready for scan"""
        self._device._pss_beam_delay_ready_percent = 0
        """Percentage of pss beam delay subscriptions ready for scan"""
        self._device._pst_beam_delay_ready_percent = 0
        """Percentage of pst beam delay subscriptions ready for scan"""
        self._device._pst_jones_applied_percent = 0
        """Percentage of PST jones matrices applied"""
        self._device._pss_jones_applied_percent = 0
        """Percentage of PSS jones matrices applied"""
        self._device._pst_jones_age = 0
        """Age of latest PST jones matrices"""
        self._device._pss_jones_age = 0
        """Age of latest PSS jones matrices"""
        self._device._pss_jones_information = {}
        """Information for the latest PSS jones matrices"""
        self._device._pst_jones_information = {}
        """Information for the latest PST jones matrices"""
        self._device._sps_rms_in_range = 0
        """Percentage of RMS values within range for this subarray"""
        self._device._sps_flags_percent_mean = 0
        """Mean flag percentage value for this subarray"""
        self._device._proc_temperature_mean = 0.0
        """Mean FPGA temperature across assigned processors (degrees C)"""
        self._device._proc_power_mean = 0.0
        """Mean FPGA power across assigned processors (Watts)"""
        self._device._configured_ok = False
        """Subarray configuration went ok."""
        self._device._subarray_config = {}
        """This subarray configuration (from the Allocator)"""
        self._device._allocator_proxy = None
        """Tango ProxyDevice for Allocator"""
        # A few more attributes that could be pushed on change however
        # at present they come from the component_manager so needs more work
        # eg: stations, stationbeams, pssbeams, pstbeams, zooms
        # For each attribute below, declare change and archive events as
        # implemented by the device (pushed manually; 2nd arg True) with
        # detect=False (3rd arg), i.e. Tango does not verify the event
        # criteria when they are pushed.
        for attribute_name in (
            "state",
            "status",
            "adminMode",
            "healthState",
            "controlMode",
            "simulationMode",
            "testMode",
            "obsState",
            "obsMode",
            "configurationProgress",
            "configurationDelayExpected",
            "delaysValid",
            "assigned_processors",
            "processorsReadyPercent",
            "stationBeamDelayReadyPercent",
            "pssBeamDelayReadyPercent",
            "pstBeamDelayReadyPercent",
            "pssHostsResolvedPercent",
            "pstHostsResolvedPercent",
            "sdpHostsResolvedPercent",
            "pstJonesAppliedPercent",
            "pssJonesAppliedPercent",
            "pssJonesAge",
            "pstJonesAge",
            "processorEthernetLockedPercent",
            "spsRmsInRange",
            "spsFlagsPercentMean",
            "sps_stats_rms_xpol",
            "sps_stats_rms_ypol",
            "sps_stats_flag_percent",
            "sps_beam_freqs",
            "sps_station_substations",
            "processorFpgaTemperatureMean",
            "processorFpgaPowerMean",
        ):
            self._device.set_change_event(attribute_name, True, False)
            self._device.set_archive_event(attribute_name, True, False)
        my_name = self._device.get_name()
        # is there a better way to find my_id ?
        # (uses the last '/'-separated part of the device name)
        self._device._subarray_id = int(my_name.split("/")[-1])
        self._device.logger.info(
            f"{my_name} ID {self._device._subarray_id}"
        )
        self._device._subscribed_to_alloc = False
        # adminModes in which health is used: always ONLINE, plus
        # ENGINEERING unless ENGINEERING_MODE_IGNORE_HEALTH is "true"
        # (case-insensitive)
        self._device._admin_modes_using_health = [AdminMode.ONLINE]
        value = os.getenv(
            "ENGINEERING_MODE_IGNORE_HEALTH", default="false"
        )
        if value.lower() != "true":
            self._device._admin_modes_using_health.append(
                AdminMode.ENGINEERING
            )
        # Event queue plus a worker thread running the device's
        # _tango_event_completion loop.
        # NOTE(review): the thread starts before _no_health_rollup is set
        # below - confirm _tango_event_completion does not read it early.
        self._device._event_q = Queue()
        eventloop_fun = self._device._tango_event_completion
        self._device._event_thread = Thread(target=eventloop_fun)
        self._device._event_thread.start()
        # Use env. variable to determine if health rollup is used
        # i.e. monitor external devices (proc, connector) health state;
        # only the variable's presence matters, not its value
        env_var = "NO_HEALTH_ROLLUP"
        self._device._no_health_rollup = os.getenv(env_var) is not None
        message = "LowCbfSubarray init complete"
        self._device.logger.info(message)
        # presumably marks the Init command as finished for the
        # ska-tango-base command machinery - confirm
        self._completed()
        return ResultCode.OK, message
# this will break in v11 of ska-tango-base
# (and we will get a whole new way of handling resources...)
# device.resource_manager._key = "lowcbf"
#
# try:
# allocator = DeviceProxy(device.AllocatorAddress)
# device.subarray = Subarray(allocator, device._connect)
# # dirty hack to make resource manager have a link to subarray
# # object (for use in assign command)
# device.resource_manager.subarray = device.subarray
#
# message = (
# f"LowCbfSubarray init complete"
# f", using allocator {device.AllocatorAddress}"
# )
# return ResultCode.OK, message
# except Exception as e:
# message = "Exception connecting to Allocator"
# device.logger.error(f"{message}\n{e}")
# return ResultCode.FAILED, message
# Inherited partially-implemented commands from ska-tango-base.subarray
# ==== Command ==== ==CommandObject== == method ==
# AssignResources "AssignResources" "assign"
# ReleaseResources "ReleaseResources" "release"
# Configure - replaced in ska-tango-base.csp.subarray -
# ReleaseAllResources "ReleaseAllResources" "release"
# Scan "Scan" "scan"
# EndScan "EndScan" "end_scan"
# End - replaced in ska-tango-base.csp.subarray -
# Abort "Abort" abort (not submitted)
# ObsReset "ObsReset" "obsreset"
# Restart "Restart" "restart"
# Inherited partially-implemented command from ska-tango-base:csp.subarray
# ConfigureScan "ConfigureScan"
# Configure - calls ConfigureScan
# GoToIdle "GoToIdle"
# End - calls GoToIdle
def _subscribe_to_connector(self):
    """Subscribe to healthState attribute changes on all connectors.

    Connector device names are read from the connector Tango database
    (``_get_conn_db``) and addressed as
    ``<ConnectorDbHost>:<ConnectorDbPort>/<device name>``. The health
    callback is bound with the bare device name so the event format stays
    compatible with the processor subscriptions.

    A connector that cannot be reached or subscribed to is logged and
    skipped; the remaining connectors are still processed.
    """
    if (db := self._get_conn_db()) is None:
        return
    prefix = f"{self.ConnectorDbHost}:{self.ConnectorDbPort}/"
    dev_names = db.get_device_exported_for_class("LowCbfConnector")
    for fqdn in dev_names:
        full_path = prefix + fqdn
        try:
            proxy = DeviceProxy(full_path)
        except DevFailed as e:
            self.logger.warning("SUBSCR. EXCEPTION %s %s", full_path, e)
            continue
        self.logger.info("SUBSCRIBING to %s", full_path)
        try:
            proxy.subscribe_event(
                "healthState",
                EventType.CHANGE_EVENT,
                # keep format compatible with processor subscription:
                partial(self._health_callback, fqdn),
            )
        except DevFailed as e:
            # EventSystemFailed derives from DevFailed; one bad connector
            # must not prevent subscribing to the others
            self.logger.warning("SUBSCR. EXCEPTION %s %s", full_path, e)
def _flush_alveos(self):
    """Purge Alveo details on ReleaseResources event.

    For every processor we are subscribed to, its delays are marked as
    unused and its health entry is dropped. Connector
    ``diagnostics_port_up`` subscriptions are removed, per-processor state
    and caches are reset, and all processor attribute subscriptions are
    unsubscribed. A failure to unsubscribe one event is logged and does
    not abort the rest of the flush.
    """
    # pylint: disable=access-member-before-definition
    self.logger.info("flush all Alveos")
    # snapshot the keys: the processor subscription thread also edits them
    for sn in list(self._proc_subscriptions):
        if sn not in self._proc_sn_to_fqdn:
            continue
        fqdn = self._proc_sn_to_fqdn[sn]
        # this processor's delays are no longer used but we won't get
        # its events as we are unsubscribing from them
        delays_summary = [UNUSED] * self._subarray_id
        self._update_delays_valid(fqdn, delays_summary)
        self._component_health.pop(fqdn, None)
    self._delay_contributors = {}
    # unsubscribe from Connector diagnostics_port_up updates
    for item in self._eth_port_stat.values():
        if item.unsubscribe_connector():
            self.logger.info("Unsubscribed port %s", item.sw_port)
    self._assigned_proc_details = {}
    self._push_events("assigned_processors", "[]")
    self._subscribed_fqdn = []
    # Keep track of processor/connector Ethernet port status
    self._eth_port_stat = defaultdict(EtherPortStat)
    self._proc_stats_mode = {}
    # the FPGA callbacks write these dicts under the same lock
    with self._callback_lock:
        self._proc_fpga_temperatures.clear()
        self._proc_fpga_powers.clear()
    # Unsubscribe from all processor tango attributes
    for serial in list(self._proc_subscriptions):
        entry = self._proc_subscriptions.pop(serial, None)
        if entry is None:
            # already dropped by the processor subscription thread
            continue
        dev_proxy, sub_ids = entry
        for sub_id in sub_ids:
            try:
                dev_proxy.unsubscribe_event(sub_id)
            except DevFailed as exc:
                # keep flushing: stopping here would leave stale state
                self.logger.warning(
                    "Unsubscribe %s from %s failed: %s", sub_id, serial, exc
                )
        del dev_proxy
    alveo_serials = list(self._proc_subscriptions.keys())
    txt = f"Monitor Processor health: {alveo_serials}"
    self.logger.info(txt)
def _enqueue_event(
    self, event: Evt, data: Union[None, Any] = None
) -> None:
    """Queue ``event`` with its optional ``data`` for the event thread.

    Events raised before the queue has been created are dropped silently.
    """
    try:
        event_queue = self._event_q
    except AttributeError:
        # obsState change could call this before things are initialised
        return
    event_queue.put((event, data))
def is_monitoring_mode(self, mode: Union[AdminMode, None] = None) -> bool:
    """Tell whether an adminMode is one in which health is monitored.

    While monitoring, healthState is tracked and its changes reported.
    Monitoring modes are those held in ``self._admin_modes_using_health``
    (ONLINE and, depending on configuration, ENGINEERING); NOT_FITTED,
    OFFLINE and RESERVED are not monitoring modes.
    See RtD https://is.gd/nCuv1q

    :param mode: adminMode about to be entered; when omitted the current
        ``self._admin_mode`` is checked instead
    :return: True for a monitoring mode, False otherwise
    """
    if mode is None:
        mode = self._admin_mode
    return mode in self._admin_modes_using_health
# ----------
# Callbacks
# ----------
def _fqdn_callback(self, name: str, jsonstr: str, quality) -> None:
"""Gets called when processor fqdn list gets updated in Allocator"""
# pylint: disable=unused-argument
self.logger.info(f"from ALLOCATOR {name} {jsonstr}")
procs = json.loads(jsonstr)
self._proc_sn_to_fqdn = procs
def _internal_alveo_callback(self, attr_name: str, jsonstr: str, quality):
    """Allocator event: mapping of Alveo serial numbers to subarrays.

    Parsing is deferred to the event thread (``Evt.INTERNAL_ALVEO``) so
    this Tango callback returns immediately.
    """
    # pylint: disable=unused-argument
    queued = (Evt.INTERNAL_ALVEO, jsonstr)
    self._enqueue_event(*queued)
def _process_internal_alveo(self, jsonstr: str):
"""Process Subarray configuration from Allocator.internal_avleo."""
if not (cfg_dict := json.loads(jsonstr)):
self._enqueue_event(Evt.ALVEOS_RELEASED)
return
log = self.logger
log.info(f"ALLOCATOR CFG {jsonstr}")
# handle ONLY configuration relevant to THIS subarray
# probably the bit: "sa_id": 1 in cfg
assigned_procs = {}
for k, v in cfg_dict.items():
# check expected dict keys
for mandatory_key in ("regs", "switch"):
if mandatory_key not in v:
log.warning("MISSING %s key in %s", mandatory_key, k)
return
# NOTE PST and CORR have different key names
sub_key_names = ("sa_id", "subarray_id")
for subarray in v["regs"]:
if not subarray:
# skip empty register dicts representing unused subarray capacity
continue
if all(_ not in subarray for _ in sub_key_names):
log.warning("MISSING subarray ID")
continue
# get the subarray number using one of possible dict keys
for sub_key in sub_key_names:
if sub_key in subarray:
subarray_id = subarray[sub_key]
break
if self._subarray_id == subarray_id:
# bundle Alveo S/N and associated switch port together
evt_data = {k: v["switch"]}
assigned_procs.update(evt_data)
# 15-Jan-2024: formats are messy:
# -------------------------------
# for CORR:
# {"XFL1TJCHM3ON": {
# "fw": "vis:0.0.2-main.e4b5ad79",
# "regs": [
# {
# "sa_id": 4,
# "stns": [[345, 1], [350, 1], [352, 1], [355, 1], [431, 1], [434, 1]],
# "sa_bm": [[1, 140, 0, 27648, 192, 24, 0]]
# }
# ]}}
# for PST:
# {"XFL1XCRTUC22 ": {
# "fw": "pst:0.0.20-dev.7c13dd33",
# "regs": [
# {
# "subarray_id": 4,
# "stn_bm_id": 1,
# "stns": [[2, 1], [3, 1], [345, 1], [350, 1], [352, 1], [355, 1], [431, 1], [434, 1 ]], "freq_ids": [140, 141, 142, 143, 144, 145, 146, 147], "pst_bm_ids": [15]}, {}, {}]}}
# publish change in assigned processors (if any)
if assigned_procs != self._assigned_proc_details:
self._enqueue_event(Evt.ALVEOS_ASSIGNED, assigned_procs)
def _internal_subarray_callback(self, fqdn: str, jsonstr: str, quality):
"""Allocator event with subarray configuration details.
:param jsonstr: dict with
* key = ``subarray_id`` number
* value = subarray definition (not used here)
Warning: This is a Tango event callback. Don't do any work here
"""
# pylint: disable=unused-argument
configured_subarrays = json.loads(jsonstr)
# NOTE: in transport subarray ID was converted to string
sub_id_str = str(self._subarray_id)
config_ok = sub_id_str in configured_subarrays
self.logger.info("subarray configured ok: %s", config_ok)
self._subarray_config = (
configured_subarrays[sub_id_str] if config_ok else {}
)
# update dependencies if changed
# pylint: disable=access-member-before-definition
if self._configured_ok != config_ok:
self._configured_ok = config_ok
self._enqueue_event(Evt.CALC_PROC_PERCENT)
self._enqueue_event(Evt.CALC_ETHER_PERCENT)
def _health_callback(self, fqdn: str, evt: EventData) -> None:
    """Tango callback for LowCbfProcessor/LowCbfConnector healthState.

    Nothing is processed here; the (fqdn, event) pair is queued for the
    event thread as ``Evt.PROCESS_HEALTH``.
    """
    payload = (fqdn, evt)
    self._enqueue_event(Evt.PROCESS_HEALTH, payload)
def _process_health(self, fqdn: str, evt: EventData) -> None:
    """Record a processor/connector healthState change and react to it.

    The reported value is stored in ``self._component_health``. The
    overall health is then recalculated. For processors only (FQDN
    contains ``/processor/``), processorsReadyPercent is re-evaluated.

    :param fqdn: device whose healthState changed
    :param evt: health event details
    """
    if self.is_evt_error(evt, "_process_health"):
        return
    reported = evt.attr_value
    attr_name, health = reported.name, reported.value
    from_processor = "/processor/" in fqdn
    self.logger.info(f"HEALTH {fqdn} {attr_name}: {health}")
    self._component_health[fqdn] = health
    # _recalculate_health only propagates while in a monitoring adminMode
    self._recalculate_health()
    if from_processor:
        self._calc_proc_scan_quality()
def _recalculate_health(self):
    """Roll component health up into this subarray's healthState.

    Does nothing outside monitoring adminModes. The rolled-up value is
    the first state in ``self._health_states`` that any component
    reports, or UNKNOWN if none does. The device healthState is only
    updated (and propagated) when it changes.
    """
    if not self.is_monitoring_mode():
        return
    reported = self._component_health.values()
    rolled_up = next(
        (state for state in self._health_states if state in reported),
        HealthState.UNKNOWN,
    )
    if rolled_up != self._health_state:
        self._update_health_state(rolled_up)
def _conn_port_up_cb(self, tango_evt: EventData) -> None:
    """
    Tango callback for a connector's ``diagnostics_port_up`` change.

    The new value is stored in ``self._conn_port.ports_up`` under the
    connector's device name. An Ethernet-locked percentage recalculation
    is then queued for the event thread.

    :param tango_evt: The Tango event data

    Warning: This is a Tango event callback. Don't do any work here
    """
    if self.is_evt_error(tango_evt, "_conn_port_up_cb"):
        return
    connector_name = tango_evt.device.name()
    ports_up = tango_evt.attr_value.value
    self.logger.info("%s ETHERNET %s", connector_name, ports_up)
    self._conn_port.ports_up[connector_name] = ports_up
    self._enqueue_event(Evt.CALC_ETHER_PERCENT)
def _calc_proc_scan_quality(self) -> None:
    """Reevaluate percentage of processors ready for scan.

    A processor counts as ready when its latest ``stats_mode`` has a
    truthy ``"ready"`` entry and its healthState is OK. When the subarray
    is not configured, or ``obsState`` is not READY, SCANNING or
    CONFIGURING, 0 is reported with INVALID quality. With no assigned
    processors, 0 is reported with VALID quality.

    Side effects:

    * update self._proc_ready_percent, see processorsReadyPercent attribute
    * generate event for subscribers
    """

    def notify_not_ready(valid: bool) -> None:
        """Notify subscribers processors are not ready.

        Even though processors are not ready (for whatever reason)
        subscribers need to know about the new state. Nothing is pushed
        when 0 is already the reported value.

        :param valid: True if attr. quality is VALID, False otherwise
        """
        if self._proc_ready_percent == 0:
            return
        self._proc_ready_percent = 0
        self._push_events(
            "processorsReadyPercent", self._proc_ready_percent, valid
        )

    self.logger.info("Chk proc percent in %s", self._obs_state)
    if not self._configured_ok:
        self.logger.info("subarray not configured")
        notify_not_ready(valid=False)
        return
    if not self._is_obs_state(
        (ObsState.READY, ObsState.SCANNING, ObsState.CONFIGURING)
    ):
        notify_not_ready(valid=False)
        return  # nothing to do
    proc_total_count = len(self._assigned_proc_details)
    if proc_total_count < 1:
        notify_not_ready(valid=True)
        return
    proc_ok_count = 0
    for sn in self._assigned_proc_details:
        if (fqdn := self._proc_sn_to_fqdn.get(sn)) is None:
            self.logger.warning("NO fqdn for %s", sn)
            continue
        # check PROC FW loaded + is healthy
        stats_mode = self._proc_stats_mode.get(fqdn)
        if stats_mode is None or not stats_mode.get("ready"):
            continue
        if self._component_health.get(fqdn) == HealthState.OK:
            proc_ok_count += 1
    # Integer floor, so 100 means every processor is ready. round(x + 0.5)
    # rounded twice: 1 of 4 ready gave 26 and 199 of 200 gave 100.
    new_ready_percent = 100 * proc_ok_count // proc_total_count
    # pylint: disable=access-member-before-definition
    self.logger.info(
        "processorsReadyPerc: new:%d, old:%d, count_ok:%d; count_all: %d",
        new_ready_percent,
        self._proc_ready_percent,
        proc_ok_count,
        proc_total_count,
    )
    if new_ready_percent != self._proc_ready_percent:
        self._proc_ready_percent = new_ready_percent
        self._push_events("processorsReadyPercent", new_ready_percent)
def _calc_ether_port_percent(self) -> None:
    """
    Recalculate the overall percentage of locked Ethernet ports.

    A switch<-->Alveo link counts as locked when both its processor side
    and its switch side are up. The result is pushed as a Tango change
    and archive event on processorEthernetLockedPercent whenever it
    changes.

    The attribute is reported as INVALID in any of these cases:

    * ``obsState`` is not READY, SCANNING or CONFIGURING
    * the subarray is not configured
    * no link is tracked

    This avoids leaving a stale percentage published.
    """

    def notify_not_locked() -> None:
        """Notify subscribers Ethernet ports are not ready.

        Even though Ethernet ports are not ready (e.g no longer READY
        state) subscribers need to know about the new (changed) state.
        Nothing is pushed when INVALID is already being reported.
        """
        if self._ether_locked_percent is None:
            return
        self._ether_locked_percent = None
        self._push_events("processorEthernetLockedPercent", 0, valid=False)

    self.logger.info("Chk Ether percent in %s", self._obs_state)
    if not self._is_obs_state(
        (ObsState.READY, ObsState.SCANNING, ObsState.CONFIGURING)
    ):
        notify_not_locked()
        return
    if not self._configured_ok:
        notify_not_locked()
        return
    # the number of Ethernet links we track
    if (count_all := len(self._eth_port_stat)) == 0:
        notify_not_locked()
        return
    # determine if corresponding switch side Ethernet port(s) are up
    for stat in self._eth_port_stat.values():
        dev, port = stat.sw_dev_name, stat.sw_port
        stat.sw_ether_locked = self._conn_port.is_up(dev, port)
        # this should be so rare that it's worth logging:
        if not stat.sw_ether_locked:
            self.logger.warning("SW:%s PORT:%s NOT LOCKED", dev, port)
    # the number of switch<-->Alveo links that are up on both sides:
    count_ok = sum(
        1
        for stat in self._eth_port_stat.values()
        if stat.proc_ether_locked and stat.sw_ether_locked
    )
    # Integer floor, so 100 means every link is locked. round(x + 0.5)
    # rounded twice and over-reported.
    percent = 100 * count_ok // count_all
    self.logger.info(
        "NEW ETHER percent: %d, count_ok:%d, count_all:%d",
        percent,
        count_ok,
        count_all,
    )
    if self._ether_locked_percent != percent:
        self._ether_locked_percent = percent
        self._push_events("processorEthernetLockedPercent", percent)
def _update_obs_state(self, obs_state: ObsState) -> None:
    """Extend the SKAObsDevice hook to react to ``obsState`` changes.

    After the base class has recorded the new state, recalculations of
    the attributes that depend on ``obsState`` are queued.
    """
    super()._update_obs_state(obs_state)
    dependent_calcs = (
        Evt.CALC_PROC_PERCENT,
        Evt.CALC_ETHER_PERCENT,
        Evt.CALC_HOSTLINK_PERCENT,
    )
    for calc_evt in dependent_calcs:
        self._enqueue_event(calc_evt)
def _poly_delay_callback(self, fqdn: str, tango_evt) -> None:
    """Tango callback for a processor's ``subarrayDelaysSummary``.

    The (fqdn, value) pair is queued for the event thread as
    ``Evt.DELAY_POLY_SUMMARY``.
    """
    if not self.is_evt_error(tango_evt, "_poly_delay_callback"):
        summary = tango_evt.attr_value.value
        self._enqueue_event(Evt.DELAY_POLY_SUMMARY, (fqdn, summary))
# ----------
# Threads
# ----------
def _subscribe_allocator(self):
    """A short lived thread to subscribe to Allocator attributes.

    These attributes tell us which Alveo cards were assigned to this
    subarray. Callbacks are registered for subscription on connect and
    the proxy is then connected. On success ``self._subscribed_to_alloc``
    is set; a ``DevFailed`` is logged instead.
    """
    allocator = MccsDeviceProxy(
        self.AllocatorAddress,
        self.logger,
        connect=False,
        tango_timeout_ms=self.AllocatorTimeoutMs,
    )
    callbacks_by_attr = {
        "procDevFqdn": self._fqdn_callback,
        "internal_alveo": self._internal_alveo_callback,
        "internal_subarray": self._internal_subarray_callback,
        "ip_to_resolve": self._ip_to_resolve_callback,
    }
    for attr_name, callback in callbacks_by_attr.items():
        allocator.evt_sub_on_connect(attr_name, callback)
    try:
        allocator.connect()
    except DevFailed as e:
        self.logger.error("_subscribe_allocator failed: %s", e)
    else:
        self._subscribed_to_alloc = True
def _ip_to_resolve_callback(self, attr: str, data, qual: AttrQuality):
    """Allocator callback for the ``ip_to_resolve`` attribute.

    The payload itself is ignored. A host-resolution percentage
    recalculation is queued for the event thread.
    """
    # pylint: disable=unused-argument
    recalc = Evt.CALC_HOSTLINK_PERCENT
    self._enqueue_event(recalc)
def _calc_hostlink_percent(self):
    """Fetch percentage of ARP resolved hosts from Allocator.

    The SDP, PSS and PST figures are all fetched first. Each one that
    differs from its cached value is cached, and its first element is
    pushed as a change and archive event. Quality is VALID in
    READY/SCANNING and INVALID otherwise.
    """
    targets = (
        (
            "sdpHostsResolvedPercent",
            "_sdp_host_ready_percent",
            self._get_percent_sdp_resolved,
        ),
        (
            "pssHostsResolvedPercent",
            "_pss_host_ready_percent",
            self._get_percent_pss_resolved,
        ),
        (
            "pstHostsResolvedPercent",
            "_pst_host_ready_percent",
            self._get_percent_pst_resolved,
        ),
    )
    fetched = [(attr, cache, fetch()) for attr, cache, fetch in targets]
    if self._is_obs_state((ObsState.READY, ObsState.SCANNING)):
        quality = AttrQuality.ATTR_VALID
    else:
        quality = AttrQuality.ATTR_INVALID
    timestamp = time.time()
    pending = {}
    for attr, cache, latest in fetched:
        if getattr(self, cache) != latest:
            setattr(self, cache, latest)
            pending[attr] = (latest[0], timestamp, quality)
    for attr, payload in pending.items():
        self.logger.info("%s CHANGED: %s", attr, payload[0])
        self.push_change_event(attr, *payload)
        self.push_archive_event(attr, *payload)
def _tango_event_completion(self):
    """A thread to decouple (nontrivial) Tango event processing from
    callback in which it was reported.

    Takes ``(event, data)`` tuples from ``self._event_q`` forever. An
    exception raised while handling one event is logged and the loop
    carries on, so a single bad event cannot stop all later event
    processing. ``task_done`` is called for every dequeued item.
    """
    while True:
        event, data = self._event_q.get()
        try:
            self._handle_queued_event(event, data)
        except Exception:  # pylint: disable=broad-exception-caught
            # top-level boundary of this thread: log and keep running
            self.logger.exception("Failed to handle event %s", event)
        finally:
            self._event_q.task_done()

def _handle_queued_event(self, event, data) -> None:
    """Run the work associated with one dequeued event.

    :param event: an ``Evt`` member
    :param data: event-specific payload (may be None)
    """
    if event == Evt.ALVEOS_RELEASED:
        # resources are released: flush all references to Alveos
        self._flush_alveos()
        self._recalculate_health()
        self.clear_sps_stats_cache()
        self._calc_proc_scan_quality()
        self._calc_ether_port_percent()
    elif event == Evt.ALVEOS_ASSIGNED:
        self._on_alveos_assigned(data)
    elif event == Evt.DELAY_POLY_SUMMARY:
        self._update_delays_valid(*data)
    elif event == Evt.CALC_PROC_PERCENT:
        self._calc_proc_scan_quality()
    elif event == Evt.CALC_ETHER_PERCENT:
        self._calc_ether_port_percent()
    elif event == Evt.PROCESS_HEALTH:
        self._process_health(*data)
    elif event == Evt.CALC_HOSTLINK_PERCENT:
        self._calc_hostlink_percent()
    elif event == Evt.CALC_TEMPERATURE:
        self._calc_temperature_mean()
    elif event == Evt.CALC_POWER:
        self._calc_power_mean()
    elif event == Evt.INTERNAL_ALVEO:
        self._process_internal_alveo(data)
    else:
        self.logger.error("Unknown event %s", event)

def _on_alveos_assigned(self, data: dict) -> None:
    """Record the Alveos assigned to this subarray and start monitoring.

    :param data: ``{"alveo_SN": {"name": "p4_01", "port": "17/0"}, ...}``
    """
    self._assigned_proc_details = data
    # keep track of which sw. port is associated with which Alveo
    for sn, sw in data.items():
        if (fqdn := self._proc_sn_to_fqdn.get(sn)) is None:
            # the serial -> FQDN map may not contain this serial (yet)
            self.logger.warning("NO fqdn for %s", sn)
            continue
        if fqdn not in self._eth_port_stat:
            sw_name, port = sw["name"], sw["port"]
            self.logger.info(
                "ASSOC %s with sw:%s port:%s", fqdn, sw_name, port
            )
            self._eth_port_stat[fqdn] = EtherPortStat(sw_name, port)
    proc_sn_str = json.dumps(list(data.keys()))
    self._push_events("assigned_processors", proc_sn_str)
    # ensure the thread handling Processor attribute subscriptions is
    # alive (restart it if it has exited)
    # pylint: disable=access-member-before-definition
    proc_count = len(self._assigned_proc_details)
    if proc_count > 0 and (
        not self._proc_thread or not self._proc_thread.is_alive()
    ):
        subscription_loop = self._check_subscribe_processors
        self._proc_thread = Thread(target=subscription_loop)
        self._proc_thread.start()
def _check_subscribe_processors(self):
    """
    Update subscriptions to processor Tango attribute changes.

    As allocator assigns and removes processors used by this subarray,
    subscriptions to attrs of new processors need to be created and
    subscriptions to processors no longer in use deleted. The connector
    ``diagnostics_port_up`` attribute for each assigned processor's switch
    port is also subscribed to.

    The check repeats every second, forever. An unexpected exception in
    one pass is logged and the next pass still runs, so the thread does
    not die silently.
    """
    # Processor attribute name, event callback to be called with fqdn & event.
    proc_attr_callback = (
        ("subarrayDelaysSummary", self._poly_delay_callback),
        ("healthState", self._health_callback),
        # Receive SPS statistics from processors used by subarray
        ("stats_sps_npkts", self._update_sps_stats_npkts),
        ("stats_sps_nflags", self._update_sps_stats_nflags),
        ("stats_sps_rms", self._update_sps_stats_rms),
        ("stats_sps_vchans", self._update_sps_stats_vchans),
        ("stats_mode", self._update_stats_mode),
        ("stationDelayValid", self._beam_delay_callback),
        # TODO: needs to implement those attributes in processor
        ("stats_jones_update", self._update_stats_pst_jones),
        # ("stats_pss_jones", self._update_stats_pss_jones),
        ("stats_ethernet_status", self._update_proc_ether_port),
        # Receive hardware monitoring from processors used by subarray
        ("hardware_fpga_temperature", self._update_fpga_temperature),
        ("hardware_fpga_power", self._update_fpga_power),
    )
    while True:
        time.sleep(1)
        try:
            # one snapshot per pass; the event thread replaces this dict
            assigned_procs = self._assigned_proc_details
            changes = self._subscribe_new_processors(
                assigned_procs, proc_attr_callback
            )
            changes |= self._unsubscribe_old_processors(assigned_procs)
            if changes:
                subscribed_procs = list(self._proc_subscriptions.keys())
                txt = f"Monitor Processor health: {subscribed_procs}"
                self.logger.info(txt)
            self._subscribe_switch_ports(assigned_procs)
        except Exception:  # pylint: disable=broad-exception-caught
            # top-level boundary of this thread: log and retry next pass
            self.logger.exception("Processor subscription pass failed")

def _subscribe_new_processors(self, assigned_procs, proc_attr_callback):
    """Subscribe to the attributes of newly assigned processors.

    :param assigned_procs: Alveo serial numbers assigned to this subarray
    :param proc_attr_callback: (attribute name, callback) pairs; each
        callback is bound with the processor FQDN
    :return: True if at least one processor was subscribed
    """
    changes = False
    new_serials = [
        sn for sn in assigned_procs if sn not in self._proc_subscriptions
    ]
    for sn in new_serials:
        if sn not in self._proc_sn_to_fqdn:
            self.logger.info("CAN'T FIND %s", sn)
            continue
        # Get Tango proxy for new processor
        fqdn = self._proc_sn_to_fqdn[sn]
        try:
            new_proxy = DeviceProxy(fqdn)
            new_proxy.set_timeout_millis(self.ProcessorTimeoutMs)
        except DevFailed:
            # Processors may be unresponsive if loading firmware
            continue
        # Subscribe to every attribute listed in proc_attr_callback
        sub_ids = []
        try:
            for attr_name, cbk in proc_attr_callback:
                sub_ids.append(
                    new_proxy.subscribe_event(
                        attr_name,
                        EventType.CHANGE_EVENT,
                        partial(cbk, fqdn),
                        stateless=True,
                    )
                )
        except EventSystemFailed:
            # Tango docs lie about this exception never occurring
            self._unsubscribe_proc_events(new_proxy, sub_ids, sn)
            continue
        # Success with subscription
        self._proc_subscriptions[sn] = (new_proxy, sub_ids)
        self._subscribed_fqdn.append(fqdn)
        changes = True
    return changes

def _unsubscribe_old_processors(self, assigned_procs):
    """Drop the subscriptions of processors that are no longer assigned.

    :param assigned_procs: Alveo serial numbers assigned to this subarray
    :return: True if at least one processor was unsubscribed
    """
    changes = False
    stale = [
        sn for sn in list(self._proc_subscriptions) if sn not in assigned_procs
    ]
    for sn in stale:
        # _flush_alveos (event thread) may already have popped it
        entry = self._proc_subscriptions.pop(sn, None)
        if entry is None:
            continue
        old_proxy, sub_ids = entry
        self._unsubscribe_proc_events(old_proxy, sub_ids, sn)
        # the serial -> FQDN map or the FQDN list may have been reset
        fqdn = self._proc_sn_to_fqdn.get(sn)
        if fqdn in self._subscribed_fqdn:
            self._subscribed_fqdn.remove(fqdn)
        changes = True
    return changes

def _unsubscribe_proc_events(self, proxy, sub_ids, serial) -> None:
    """Unsubscribe ``sub_ids`` from ``proxy``, logging failures.

    :param proxy: processor DeviceProxy
    :param sub_ids: subscription IDs returned by ``subscribe_event``
    :param serial: Alveo serial number, used in log messages
    """
    for sub_id in sub_ids:
        try:
            proxy.unsubscribe_event(sub_id)
        except DevFailed as exc:
            self.logger.warning(
                "Unsubscribe %s from %s failed: %s", sub_id, serial, exc
            )

def _subscribe_switch_ports(self, assigned_procs) -> None:
    """Subscribe to connector port_up for each assigned processor's port.

    :param assigned_procs:
        ``{"XFL10NIYKVEU": {"name": "p4_01", "port": "17/0"}, ...}``
    """
    for sn, item in assigned_procs.items():
        fqdn = self._proc_sn_to_fqdn.get(sn)
        # .get(): indexing the defaultdict would create a bogus entry
        if (eth_stat := self._eth_port_stat.get(fqdn)) is None:
            continue
        if eth_stat.is_subscribed:
            continue
        switch_name, port = item["name"], item["port"]
        proxy = self._get_switch_proxy(switch_name)
        if proxy is None:
            self.logger.warning("Failed to get switch %s proxy", switch_name)
            continue
        try:
            # Tango event can arrive really fast so be ready:
            eth_stat.sw_proxy = proxy
            eth_stat.sw_dev_name = proxy.name()
            eth_stat.subscription_id = proxy.subscribe_event(
                "diagnostics_port_up",
                EventType.CHANGE_EVENT,
                self._conn_port_up_cb,
            )
        except TANGO_SUB_EXC as exc:
            eth_stat.sw_proxy = None
            self.logger.warning(
                "Failed to subscribe to %s %s %s", switch_name, port, exc
            )
            continue
        self.logger.info("Subscribed to %s port %s", switch_name, port)
def _update_sps_stats_npkts(self, fqdn: str, tango_evt) -> None:
"""
Callback receives npkts statistics from a processor.
Warning: This is a tango event callback. Don't do any work here
"""
if self.is_evt_error(tango_evt, "_update_sps_stats_npkts"):
return
self._sps_stats_npkts_cache[fqdn] = tango_evt.attr_value.value
def _update_sps_stats_nflags(self, fqdn: str, tango_evt) -> None:
"""
Callback receives nflags statistics from a processor.
Warning: This is a tango event callback. Don't do any work here
"""
if self.is_evt_error(tango_evt, "_update_sps_stats_nflags"):
return
self._sps_stats_nflags_cache[fqdn] = tango_evt.attr_value.value
def _update_sps_stats_rms(self, fqdn: str, tango_evt) -> None:
"""
Callback receives rms statistics from a processor.
Warning: This is a tango event callback. Don't do any work here
"""
if self.is_evt_error(tango_evt, "_update_sps_stats_rms"):
return
np_rms = tango_evt.attr_value.value
self._sps_stats_rms_cache[fqdn] = np_rms.T # fix Tango to NP order
def _update_stats_mode(self, fqdn: str, tango_evt) -> None:
    """
    Store the JSON ``stats_mode`` published by processor ``fqdn``.

    The decoded value goes into ``self._proc_stats_mode`` (its
    ``"ready"`` entry feeds processorsReadyPercent). Processor and
    Ethernet percentage recalculations are then queued.

    Warning: This is a Tango event callback. Don't do any work here
    """
    if self.is_evt_error(tango_evt, "_update_stats_mode"):
        return
    raw_mode = tango_evt.attr_value.value
    self.logger.info("stats_mode from %s: %s", fqdn, raw_mode)
    self._proc_stats_mode[fqdn] = json.loads(raw_mode)
    for follow_up in (Evt.CALC_PROC_PERCENT, Evt.CALC_ETHER_PERCENT):
        self._enqueue_event(follow_up)
def _update_fpga_temperature(self, fqdn: str, tango_evt) -> None:
    """
    Record processor ``fqdn``'s ``hardware_fpga_temperature`` reading.

    The cache is written under ``self._callback_lock``. The mean is then
    recalculated on the event thread (``Evt.CALC_TEMPERATURE``).

    Warning: This is a Tango event callback. Don't do any work here
    """
    if not self.is_evt_error(tango_evt, "_update_fpga_temperature"):
        reading = tango_evt.attr_value.value
        with self._callback_lock:
            self._proc_fpga_temperatures[fqdn] = reading
        self._enqueue_event(Evt.CALC_TEMPERATURE)
def _update_fpga_power(self, fqdn: str, tango_evt) -> None:
    """
    Record processor ``fqdn``'s ``hardware_fpga_power`` reading.

    The cache is written under ``self._callback_lock``. The mean is then
    recalculated on the event thread (``Evt.CALC_POWER``).

    Warning: This is a Tango event callback. Don't do any work here
    """
    if not self.is_evt_error(tango_evt, "_update_fpga_power"):
        reading = tango_evt.attr_value.value
        with self._callback_lock:
            self._proc_fpga_powers[fqdn] = reading
        self._enqueue_event(Evt.CALC_POWER)
def _calc_float_mean(
    self, cache: dict, state_attr: str, tango_attr: str
) -> None:
    """Recalculate the mean of a per-processor float cache, push on change.

    Used by ``_calc_temperature_mean`` and ``_calc_power_mean``.

    * In READY/SCANNING, the mean is taken over the cached values of
      subscribed processors and pushed with VALID quality. Nothing
      happens when no such values exist.
    * In any other state, 0.0 is pushed with INVALID quality.

    Events are pushed only when the stored value changes.

    :param cache: per-FQDN value dict (e.g. ``_proc_fpga_temperatures``)
    :param state_attr: name of the instance variable holding current mean
    :param tango_attr: Tango attribute name to push events on
    """
    # pylint: disable=access-member-before-definition
    if self._is_obs_state((ObsState.READY, ObsState.SCANNING)):
        with self._callback_lock:
            samples = [
                reading
                for name, reading in cache.items()
                if name in self._subscribed_fqdn
            ]
        if not samples:
            return
        mean = sum(samples) / len(samples)
        quality = AttrQuality.ATTR_VALID
    else:
        mean = 0.0
        quality = AttrQuality.ATTR_INVALID
    if mean == getattr(self, state_attr):
        return
    setattr(self, state_attr, mean)
    payload = (mean, time.time(), quality)
    self.push_change_event(tango_attr, *payload)
    self.push_archive_event(tango_attr, *payload)
def _calc_temperature_mean(self) -> None:
"""Recalculate mean FPGA temperature across assigned processors."""
self._calc_float_mean(
self._proc_fpga_temperatures,
"_proc_temperature_mean",
"processorFpgaTemperatureMean",
)
def _calc_power_mean(self) -> None:
"""Recalculate mean FPGA power across assigned processors."""
self._calc_float_mean(
self._proc_fpga_powers,
"_proc_power_mean",
"processorFpgaPowerMean",
)
def _beam_delay_callback(self, fqdn: str, tango_evt) -> None:
    """
    Tango callback for a processor's ``stationDelayValid`` attribute.

    The decoded JSON for ``fqdn`` is stored in ``self._proc_delay_stat``.
    Per-processor valid-delay percentages for this subarray are then
    recomputed (``_calculate_valid_delay_percentages``) and combined into
    averages weighted by beam count, for station beams, PSS beams and PST
    beams.

    While in READY or SCANNING, each of stationBeamDelayReadyPercent,
    pssBeamDelayReadyPercent and pstBeamDelayReadyPercent is pushed when
    its value changes.

    NOTE(review): unlike the other callbacks, this does its computation on
    the Tango callback thread instead of queueing an event.
    NOTE(review): the station percentage uses round() while PSS/PST use
    int() truncation - confirm this difference is intended.
    """
    # pylint: disable=access-member-before-definition
    if self.is_evt_error(tango_evt, "_beam_delay_callback"):
        return
    value_str = tango_evt.attr_value.value
    self.logger.info("stationDelayValid from %s: %s", fqdn, value_str)
    value = json.loads(value_str)
    sub_id = self._subarray_id
    expected_fqdn = self._subscribed_fqdn
    self._proc_delay_stat[fqdn] = value
    # {fqdn: stats} for every subscribed processor (template if missing)
    various_delay_percentage = self._calculate_valid_delay_percentages(
        self._proc_delay_stat,
        sub_id,
        expected_fqdn,
    )
    # weighted sums of per-processor percentages ...
    beam_valid_delay_percentage = 0
    pst_valid_delay_percentage = 0
    pss_valid_delay_percentage = 0
    # ... and the sums of the weights used
    total_nb_beam = 0
    total_nb_pst = 0
    total_nb_pss = 0
    # largest per-processor counts, used as the weight of processors that
    # reported no beams (e.g. zero-filled template entries)
    delay_items = [
        delay_results["total_beams"]
        for delay_results in various_delay_percentage.values()
    ]
    max_nb_beam = max(delay_items) if delay_items else 0
    delay_items = [
        delay_results["total_pst"]
        for delay_results in various_delay_percentage.values()
    ]
    max_nb_pst = max(delay_items) if delay_items else 0
    delay_items = [
        delay_results["total_pss"]
        for delay_results in various_delay_percentage.values()
    ]
    max_nb_pss = max(delay_items) if delay_items else 0
    for delay_results in various_delay_percentage.values():
        # a processor without station beams (its percentage is 0) gets
        # the largest beam count as weight, so it lowers the average
        nb_beam = (
            delay_results["total_beams"]
            if delay_results["total_beams"] != 0
            else max_nb_beam
        )
        # same for PST/PSS, but only when the processor has neither
        # PST/PSS beams nor station beams
        nb_pst = (
            max_nb_pst
            if delay_results["total_pst"] == 0
            and delay_results["total_beams"] == 0
            else delay_results["total_pst"]
        )
        nb_pss = (
            max_nb_pss
            if delay_results["total_pss"] == 0
            and delay_results["total_beams"] == 0
            else delay_results["total_pss"]
        )
        total_nb_beam += nb_beam
        total_nb_pst += nb_pst
        total_nb_pss += nb_pss
        beam_valid_delay_percentage += (
            nb_beam * delay_results["beam_valid_delay_percentage"]
        )
        pst_valid_delay_percentage += (
            nb_pst * delay_results["pst_valid_delay_percentage"]
        )
        pss_valid_delay_percentage += (
            nb_pss * delay_results["pss_valid_delay_percentage"]
        )
    nb_fqdn = len(expected_fqdn)
    # all three default to 0 when no processor is subscribed
    station_beam_delay_ready_percent = 0
    pss_beam_delay_ready_percent = 0
    pst_beam_delay_ready_percent = 0
    if nb_fqdn > 0:
        station_beam_delay_ready_percent = (
            round(beam_valid_delay_percentage / total_nb_beam)
            if total_nb_beam > 0
            else 0
        )
        pss_beam_delay_ready_percent = (
            int(pss_valid_delay_percentage / total_nb_pss)
            if total_nb_pss > 0
            else 0
        )
        pst_beam_delay_ready_percent = (
            int(pst_valid_delay_percentage / total_nb_pst)
            if total_nb_pst > 0
            else 0
        )
    # only publish while READY or SCANNING, and only on change
    ready = self._is_obs_state(ObsState.READY)
    scanning = self._is_obs_state(ObsState.SCANNING)
    if ready or scanning:
        if (
            station_beam_delay_ready_percent
            != self._station_beam_delay_ready_percent
        ):
            self._station_beam_delay_ready_percent = (
                station_beam_delay_ready_percent
            )
            self._push_events(
                "stationBeamDelayReadyPercent",
                station_beam_delay_ready_percent,
            )
        if (
            pss_beam_delay_ready_percent
            != self._pss_beam_delay_ready_percent
        ):
            self._pss_beam_delay_ready_percent = (
                pss_beam_delay_ready_percent
            )
            self._push_events(
                "pssBeamDelayReadyPercent", pss_beam_delay_ready_percent
            )
        if (
            pst_beam_delay_ready_percent
            != self._pst_beam_delay_ready_percent
        ):
            self._pst_beam_delay_ready_percent = (
                pst_beam_delay_ready_percent
            )
            self._push_events(
                "pstBeamDelayReadyPercent", pst_beam_delay_ready_percent
            )
    self.logger.info(f"Delay Total: {various_delay_percentage}")
def _calculate_valid_delay_percentages(
self, data, subarray_id, expected_fqdns
):
"""
Calculate valid delay percentage using information from the processors.
"""
results = {}
for fqdn_name in expected_fqdns:
if fqdn_name in data:
subarrays = data[fqdn_name]
subarray = next(
(
sa
for sa in subarrays
if sa["subarray_id"] == subarray_id
),
None,
)
if not subarray:
results[fqdn_name] = {
"subarray_id": subarray_id,
"beam_valid_delay_percentage": 0.0,
"pst_valid_delay_percentage": 0.0,
"pss_valid_delay_percentage": 0.0,
"total_beams": 0,
"valid_beams": 0,
"total_pst": 0,
"valid_pst": 0,
"total_pss": 0,
"valid_pss": 0,
"note": f"Subarray {subarray_id} not found in {fqdn_name}. Template used.",
}
continue
beams = subarray["beams"]
total_beams = len(beams)
valid_beams = sum(
1 for beam in beams if beam.get("valid_delay", False)
)
total_pst = 0
valid_pst = 0
for beam in beams:
if "pst" in beam:
total_pst += len(beam["pst"])
valid_pst += sum(
1
for pst_device in beam["pst"]
if pst_device.get("valid_delay", False)
)
total_pss = 0
valid_pss = 0
for beam in beams:
if "pss" in beam:
total_pss += len(beam["pss"])
valid_pss += sum(
1
for pss_device in beam["pss"]
if pss_device.get("valid_delay", False)
)
beam_valid_percentage = (
(valid_beams / total_beams * 100) if total_beams > 0 else 0
)
pst_valid_percentage = (
(valid_pst / total_pst * 100) if total_pst > 0 else 0
)
pss_valid_percentage = (
(valid_pss / total_pss * 100) if total_pss > 0 else 0
)
results[fqdn_name] = {
"subarray_id": subarray_id,
"beam_valid_delay_percentage": beam_valid_percentage,
"pst_valid_delay_percentage": pst_valid_percentage,
"pss_valid_delay_percentage": pss_valid_percentage,
"total_beams": total_beams,
"valid_beams": valid_beams,
"total_pst": total_pst,
"valid_pst": valid_pst,
"total_pss": total_pss,
"valid_pss": valid_pss,
}
else:
# Template for missing FQDMs
results[fqdn_name] = {
"subarray_id": subarray_id,
"beam_valid_delay_percentage": 0.0,
"pst_valid_delay_percentage": 0.0,
"pss_valid_delay_percentage": 0.0,
"total_beams": 0,
"valid_beams": 0,
"total_pst": 0,
"valid_pst": 0,
"total_pss": 0,
"valid_pss": 0,
"note": f"{fqdn_name} not found in input data. Template used.",
}
return results
def _update_stats_pst_jones(self, fqdn: str, tango_evt) -> None:
if self.is_evt_error(tango_evt, "_update_stats_pst_jones"):
return
value_str = tango_evt.attr_value.value
self.logger.info("stats_pst_jones from %s: %s", fqdn, value_str)
value = json.loads(value_str)
sub_id = self._subarray_id
expected_fqdn = self._subscribed_fqdn
self._pst_jones_information[fqdn] = value
if "timing_beams" in self._subarray_config:
total_beams = len(self.component_manager.subarray.pst_beams)
various_delay_percentage = self._calculate_jones_information(
self._pst_jones_information, sub_id, expected_fqdn, total_beams
)
total_beams_reported = 0
average_age = 0
for values in various_delay_percentage.values():
total_beams_reported += values["total_pst_beams_with_time"]
nb_beam_with_age = 0
for age in values["avg_age_per_pst_beam"].values():
nb_beam_with_age += 1
average_age += age
# if we don't know we assume a 1-hour age for the beam jones
average_age += (total_beams - nb_beam_with_age) * 3600
divisor = total_beams * len(expected_fqdn)
pst_jones_applied_percent = (
round(100 * total_beams_reported / divisor)
if divisor
else None
)
pst_jones_age = round(average_age / divisor) if divisor else None
ready = self._is_obs_state(ObsState.READY)
scanning = self._is_obs_state(ObsState.SCANNING)
if ready or scanning:
if (
pst_jones_applied_percent
!= self._pst_jones_applied_percent
):
self._pst_jones_applied_percent = pst_jones_applied_percent
valid = pst_jones_applied_percent is not None
self._push_events(
"pstJonesAppliedPercent",
pst_jones_applied_percent,
valid,
)
if pst_jones_age != self._pst_jones_age:
self._pst_jones_age = pst_jones_age
valid = pst_jones_age is not None
self._push_events("pstJonesAge", pst_jones_age, valid)
def _push_events(self, attr_name: str, value: Any, valid: bool = True):
"""
Helper method to push change and archive events at once.
Simplifies client code when both events are fired.
Push evens as usual unless ``valid`` argument is ``False`` in which
case set the ``AttrQuality`` to ``ATTR_INVALID`` and value is set to 0
(which is probably irrelevant).
:param attr_name: name of the attribute
:param value: value of the attribute
:param valid: True if attribute quality is OK, False otherwise
"""
if valid:
self.push_change_event(attr_name, value)
self.push_archive_event(attr_name, value)
else:
evt = (0, time.time(), AttrQuality.ATTR_INVALID)
self.push_change_event(attr_name, *evt)
self.push_archive_event(attr_name, *evt)
def _calculate_jones_information(
self, data, subarray_id, expected_fqdns, total_beams
):
"""
Calculate the various stats related to the Jones.
"""
# k_time in Kafka payloads is seconds since SKA_EPOCH,
# so anchor "now" to the same epoch and scale.
current_time = utc_to_ska_seconds(datetime.now(timezone.utc))
results = {}
for fqdn in expected_fqdns:
if fqdn in data:
entries = data[fqdn]
subarray_entries = [
entry
for entry in entries
if entry["subarray_id"] == subarray_id
]
if not subarray_entries:
results[fqdn] = {
"subarray_id": subarray_id,
"avg_age_per_pst_beam": {},
"percentage_with_kafka_time": 0.0,
"total_pst_beams_with_time": 0,
"note": f"Subarray {subarray_id} not found in {fqdn}. Template used.",
}
continue
# Collect all pst_beam_ids and their kafka_time
pst_beam_times = {}
pst_beams_with_time = set()
for entry in subarray_entries:
kafka_times = entry.get("kafka_time", [])
for kafka_entry in kafka_times:
pst_beam_id = kafka_entry["pst_beam_id"]
k_time = kafka_entry["time"]
age = max(0, current_time - k_time) # Age in seconds
if pst_beam_id not in pst_beam_times:
pst_beam_times[pst_beam_id] = []
pst_beam_times[pst_beam_id].append(age)
pst_beams_with_time.add(pst_beam_id)
# Calculate average age for each pst_beam_id
avg_ages = {}
for pst_beam_id, ages in pst_beam_times.items():
avg_ages[pst_beam_id] = sum(ages) / len(ages)
# Calculate percentage of pst_beams with kafka_time
percentage_with_time = (
round((len(pst_beams_with_time) / total_beams * 100))
if total_beams > 0
else 0
)
results[fqdn] = {
"subarray_id": subarray_id,
"avg_age_per_pst_beam": avg_ages,
"percentage_with_kafka_time": percentage_with_time,
"total_pst_beams_with_time": len(pst_beams_with_time),
}
else:
# Template for missing FQDNs
results[fqdn] = {
"subarray_id": subarray_id,
"avg_age_per_pst_beam": {},
"percentage_with_kafka_time": 0.0,
"total_pst_beams_with_time": 0,
"note": f"{fqdn} not found in input data. Template used.",
}
return results
def _update_stats_pss_jones(self, fqdn: str, tango_evt) -> None:
if self.is_evt_error(tango_evt, "_update_stats_pss_jones"):
return
value_str = tango_evt.attr_value.value
self.logger.info("stats_pss_jones from %s: %s", fqdn, value_str)
# TODO: update the local version of PSS Jones when we are ready with processor
# FIXME is this ready?
def _update_sps_stats_vchans(self, fqdn: str, tango_evt) -> None:
"""
Callback receives vchans statistics from a processor.
Warning: This is a tango event callback. Don't do any work here
"""
if self.is_evt_error(tango_evt, "_update_sps_stats_vchans"):
return
self._sps_stats_vchans_cache[fqdn] = tango_evt.attr_value.value
# Processors send stats_vchans last so this evt completes an update
self._sps_stats_update_time[fqdn] = time.time()
# Spawn thread because this is running in a Tango event notify thread
thr = Thread(
target=self.component_manager.announce_sps_stats,
args=(
self._sps_stats_nflags_cache,
self._sps_stats_npkts_cache,
self._sps_stats_rms_cache,
self._sps_stats_vchans_cache,
self._sps_stats_update_time,
),
)
thr.start()
def _get_conn_db(self) -> Union[Database, None]:
"""
Get Tango database for Connector device.
:return: Tango Database object, or None if the connection failed
"""
try:
db = Database(self.ConnectorDbHost, self.ConnectorDbPort)
return db
except DevFailed as e:
# CI/CD k8s-test won't have the connector deployed so bail out
self.logger.warning(f"DB EXCEPTION {e}")
return None
def _get_switch_proxy(self, sw_name: str) -> Union[DeviceProxy, None]:
"""
Find a LowCbfConnector device that corresponds to a switch name.
Iterates through all exported LowCbfConnector devices and checks
their ``switchName`` attribute.
:param sw_name: The name of the switch to find (e.g. 'p4_01')
:return: A DeviceProxy for the connector, or None if not found
"""
if (db := self._get_conn_db()) is None:
return None
prefix = f"{self.ConnectorDbHost}:{self.ConnectorDbPort}/"
dev_names = db.get_device_exported_for_class("LowCbfConnector")
for dev in dev_names:
fqdn = prefix + dev
try:
proxy = DeviceProxy(fqdn)
proxy.set_timeout_millis(self.ConnectorTimeoutMs)
name_str = proxy.switchName
# conn.switchName typically: '{"Name": "p4_01"}'
inst_name = json.loads(name_str)
if "Name" in inst_name and sw_name == inst_name["Name"]:
return proxy
except DevFailed:
self.logger.warning("failed to get proxy for %s", fqdn)
continue
return None
def _get_allocator_proxy(self) -> Union[DeviceProxy, None]:
"""
Get Tango DeviceProxy for Allocator device.
:return: A DeviceProxy for the Allocator, or None if not found
"""
# pylint: disable=access-member-before-definition,undefined-variable
if self._allocator_proxy:
return self._allocator_proxy
try:
proxy = DeviceProxy(self.AllocatorAddress)
proxy.set_timeout_millis(self.AllocatorTimeoutMs)
self._allocator_proxy = proxy
return proxy
except DevFailed:
self.logger.error("failed to get proxy %s", self.AllocatorAddress)
return None
def _update_proc_ether_port(self, fqdn: str, evt: EventData) -> None:
"""
Handle a change event for a processor's Ethernet port status.
Updates the internal state and triggers a recalculation of the
overall Ethernet locked percentage.
:param fqdn: FQDN of the processor device
:param evt: The Tango event data
Warning: This is a tango event callback. Don't do any work here
"""
if self.is_evt_error(evt, "_update_proc_ether_port"):
return
# we get an integer: 1 == Ether port locked, 0 == not locked
status = bool(evt.attr_value.value)
self.logger.info("%s ETHERNET locked: %s", fqdn, status)
with self._callback_lock:
self._eth_port_stat[fqdn].proc_ether_locked = status
self._enqueue_event(Evt.CALC_ETHER_PERCENT)
def _switch_port_to_proc_fqdn(
self, name: str, port: str
) -> Union[str, None]:
"""
Map a switch name and port back to a processor FQDN.
:param name: Switch name
:param port: Switch port
:return: FQDN of the processor connected to this port, or None
"""
for fqdn, stat in self._eth_port_stat.items():
if stat.sw_name == name and stat.sw_port == port:
return fqdn
return None
def clear_sps_stats_cache(self):
"""Erase cache of SPS stats, and clear attribute values"""
self._sps_stats_nflags_cache = {}
self._sps_stats_npkts_cache = {}
self._sps_stats_rms_cache = {}
self._sps_stats_vchans_cache = {}
self._sps_stats_update_time = {}
self._sps_stats_push_ctr = 0
self.sps_stats_update(
[],
[],
np.asarray([], dtype=np.uint8),
np.asarray([], dtype=np.uint8),
np.asarray([], dtype=np.int8),
)
def sps_stats_update(
self, stn_sstns, bm_freqs, np_rms_xpol, np_rms_ypol, np_flags
): # pylint: disable=too-many-arguments
"""Update all the SPS statistics attributes"""
# save attribute value data so it can be read anytime
self._sps_bm_frq = json.dumps(bm_freqs)
self._sps_stn_sstn = json.dumps(stn_sstns)
# Tango C order vs Numpy order for the following
self._sps_stats_rms_xpol = np_rms_xpol.T
self._sps_stats_rms_ypol = np_rms_ypol.T
self._sps_stats_flags_percent = np_flags.T
try:
self.logger.info(
f"Calculating range between {self.RmsMinValue} and {self.RmsMaxValue}"
)
total_x_in_range = np.sum(
(np_rms_xpol >= self.RmsMinValue)
& (np_rms_xpol <= self.RmsMaxValue)
)
total_y_in_range = np.sum(
(np_rms_ypol >= self.RmsMinValue)
& (np_rms_ypol <= self.RmsMaxValue)
)
total_x_and_y = np.sum((np_rms_xpol >= 0))
if total_x_and_y > 0:
self._sps_rms_in_range = round(
(total_x_in_range + total_y_in_range)
/ (2 * total_x_and_y)
* 100
)
else:
self._sps_rms_in_range = 0
except ValueError:
self.logger.error("Error computing % RMS in range")
self._sps_rms_in_range = 0
try:
if np_flags.size > 0:
self._sps_flags_percent_mean = round(np.mean(np_flags))
else:
self._sps_flags_percent_mean = 0
except ValueError:
self.logger.error("Error computing SPS flags mean")
self._sps_flags_percent_mean = 0
# update all the SPS-related tango attributes
self.push_change_event("sps_beam_freqs", self._sps_bm_frq)
self.push_change_event("sps_station_substations", self._sps_stn_sstn)
self.push_change_event("sps_stats_rms_xpol", self._sps_stats_rms_xpol)
self.push_change_event("sps_stats_rms_ypol", self._sps_stats_rms_ypol)
self.push_change_event(
"sps_stats_flag_percent", self._sps_stats_flags_percent
)
self.push_change_event("spsRmsInRange", self._sps_rms_in_range)
self.push_change_event(
"spsFlagsPercentMean", self._sps_flags_percent_mean
)
# Archive 1 in N (=sps_stats_push_per_archive)
self._sps_stats_push_ctr += 1
if self._sps_stats_push_ctr < self._sps_stats_push_per_archive:
self.logger.info("SPS statistics attributes updated")
return
self._sps_stats_push_ctr = 0
self.push_archive_event("sps_beam_freqs", self._sps_bm_frq)
self.push_archive_event("sps_station_substations", self._sps_stn_sstn)
self.push_archive_event("sps_stats_rms_xpol", self._sps_stats_rms_xpol)
self.push_archive_event("sps_stats_rms_ypol", self._sps_stats_rms_ypol)
self.push_archive_event(
"sps_stats_flag_percent", self._sps_stats_flags_percent
)
self.push_archive_event("spsRmsInRange", self._sps_rms_in_range)
self.push_archive_event(
"spsFlagsPercentMean", self._sps_flags_percent_mean
)
self.logger.info("SPS statistics attributes updated & archived")
def is_evt_error(self, tango_evt: EventData, cb_name: str) -> bool:
"""Utility function - check if Tango event has failed.
Log warning message with error details
:param tango_event: Tango event object
:param cb_name: name of the callback function handling event
:return: True if event failed, False otherwise
"""
if tango_evt.err:
txt = f"{cb_name} got failed event: stack{tango_evt.errors}"
self.logger.warning(txt)
return True
return False
@attribute(
dtype=((np.uint8,),),
max_dim_x=MAX_STATIONS,
max_dim_y=MAX_SPS_CHAN,
doc="SPS Xpol RMS levels",
)
def sps_stats_rms_xpol(self):
"""Return latest SPS X-polarisation RMS statistics
:return: 2D array (stations * channels) of ``np.uint8``
"""
return self._sps_stats_rms_xpol
@attribute(
dtype=((np.uint8,),),
max_dim_x=MAX_STATIONS,
max_dim_y=MAX_SPS_CHAN,
doc="SPS Ypol RMS levels",
)
def sps_stats_rms_ypol(self):
"""Return latest SPS Y-polarisation RMS statistics
:return: 2D array (stations * channels) of ``np.uint8``
"""
return self._sps_stats_rms_ypol
@attribute(
dtype=((np.uint8,),),
max_dim_x=MAX_STATIONS,
max_dim_y=MAX_SPS_CHAN,
doc="SPS flagging percent (0-100, 0xff if no data)",
)
def sps_stats_flag_percent(self):
"""Return latest SPS flagging statistics
:return: 2D array (stations * channels) of ``np.uint8``; range: [0, 100]%, 255 if no data
"""
return self._sps_stats_flags_percent
@attribute(dtype=str, doc="SPS stats (station, substation) order")
def sps_station_substations(self) -> str:
"""Return station/substation order used for SPS stats"""
return self._sps_stn_sstn
@attribute(dtype=str, doc="SPS stats (beam_id, freq_id) order")
def sps_beam_freqs(self) -> str:
"""
:return: beam/freq order used for SPS stats
"""
return self._sps_bm_frq
@attribute(
dtype=int,
doc="SPS statistics archive interval: 1 in N updates",
access=AttrWriteType.READ_WRITE,
)
def sps_archive_interval(self) -> int:
"""Set/get archive interval (1 in N updates)"""
return self._sps_stats_push_per_archive
def write_sps_archive_interval(self, value: int):
"""Change archive interval"""
self._sps_stats_push_per_archive = value
self._sps_stats_push_ctr = value # force archive immediately
@attribute(
dtype=int,
min_value="0",
max_value="100",
unit="%",
doc="Percentage of Ethernet ports (switch and proc.) ready for scan",
)
def processorEthernetLockedPercent(self) -> tuple[int, float, AttrQuality]:
"""
Get a percentage of Ethernet ports (processor/switch) ready for scan.
The value is meaningful only when ``obsState`` is READY or SCANNING so
we also return Tango attribute quality.
:return: tuple
* percentage: rounded integer in the range [0, 100], None when ATTR_INVALID
* time: float
* quality: AttrQuality
"""
ready = self._is_obs_state((ObsState.READY, ObsState.SCANNING))
value = self._ether_locked_percent if ready else 0
quality = AttrQuality.ATTR_VALID if ready else AttrQuality.ATTR_INVALID
return value, time.time(), quality
@attribute(
dtype=float,
unit="degC",
doc=(
"Mean FPGA core temperature across all processors "
"assigned to this subarray (degrees C). "
"Meaningful only in READY or SCANNING obsState."
),
)
def processorFpgaTemperatureMean(
self,
) -> tuple[float, float, AttrQuality]:
"""
Get the mean FPGA temperature across all assigned processors.
The value is meaningful only when ``obsState`` is READY or SCANNING so
we also return Tango attribute quality.
:return: tuple
* value: mean temperature in degrees C
* time: float
* quality: AttrQuality
"""
ready = self._is_obs_state((ObsState.READY, ObsState.SCANNING))
value = self._proc_temperature_mean if ready else 0.0
quality = AttrQuality.ATTR_VALID if ready else AttrQuality.ATTR_INVALID
return value, time.time(), quality
@attribute(
dtype=float,
unit="W",
doc=(
"Mean FPGA power consumption across all processors "
"assigned to this subarray (Watts). "
"Meaningful only in READY or SCANNING obsState."
),
)
def processorFpgaPowerMean(
self,
) -> tuple[float, float, AttrQuality]:
"""
Get the mean FPGA power across all assigned processors.
The value is meaningful only when ``obsState`` is READY or SCANNING so
we also return Tango attribute quality.
:return: tuple
* value: mean power in Watts
* time: float
* quality: AttrQuality
"""
ready = self._is_obs_state((ObsState.READY, ObsState.SCANNING))
value = self._proc_power_mean if ready else 0.0
quality = AttrQuality.ATTR_VALID if ready else AttrQuality.ATTR_INVALID
return value, time.time(), quality
# Run server
def main(args=None, **kwargs):
    """
    Main function of the LowCbfSubarray module.

    Runs the Tango device server hosting :class:`LowCbfSubarray`.

    :param args: command-line arguments passed through to ``run``
    :param kwargs: extra keyword arguments passed through to ``run``
    :return: whatever ``run`` returns
    """
    server_classes = (LowCbfSubarray,)
    return run(server_classes, args=args, **kwargs)


if __name__ == "__main__":
    main()