# -*- coding: utf-8 -*-
#
# This file is part of the SKA Low CBF project
#
# Copyright (c) 2024 CSIRO
#
# Distributed under the terms of the CSIRO Open Source Software Licence
# Agreement. See LICENSE for more info.
""" SKA Low CBF
Sub-element controller device for Low.CBf
"""
import json
import os
import time
from collections import deque
from enum import IntEnum
from threading import Condition, Lock, Thread
from typing import List, Union
import tango
from ska_tango_base import SKABaseDevice, SKAController
from ska_tango_base.commands import ResultCode
from ska_tango_base.control_model import AdminMode, HealthState
from tango import AttrQuality, AttrWriteType, Database, DevFailed
from tango.server import attribute, device_property, run
from ska_low_cbf import release
from ska_low_cbf.controller.component_manager import (
LowCbfControllerComponentManager,
)
from ska_low_cbf.controller.controller import SearchBeamBandwidthMode
from ska_low_cbf.device_proxy import MccsDeviceProxy
from ska_low_cbf.events import EventManager
# Tango naming conventions clash with Python conventions...
# pylint: disable=invalid-name,protected-access,too-few-public-methods
# pylint: disable=pointless-string-statement
__all__ = ["LowCbfController", "main"]
class Evt(IntEnum):
    """Event codes queued by the controller's callbacks for its event thread."""

    # a processor registered with the Allocator
    PROC_REGISTERED = 0
    # a processor was assigned to a subarray
    PROC_ALLOCATED = 1
    # processor/connector healthState change
    HEALTH_EVENT = 2
    # processor adminMode change
    PROC_ADMIN_MODE = 3
    # Allocator attribute change (internal_alveo, procDevFqdn)
    ALLOCATOR_EVENT = 4
class LowCbfController(SKAController):
    """
    Tango device acting as the sub-element controller for Low.CBF.

    **Properties:**

    - Device Property
    """

    # pylint: disable=too-many-instance-attributes

    # Device properties: location of the Tango database that the Connector
    # devices are looked up in, and the packet-loss polling period (passed to
    # time.sleep, i.e. seconds).
    ConnectorDbHost = device_property(
        default_value="tango-databaseds", dtype="str"
    )
    ConnectorDbPort = device_property(default_value=10000, dtype="int")
    PacketLossReportInterval = device_property(default_value=10, dtype="int")

    # Attributes
    searchBeamBandwidthMode = attribute(
        label="Search Beam Bandwidth Mode",
        dtype=SearchBeamBandwidthMode,
        access=AttrWriteType.READ_WRITE,
        doc=(
            "Search Beam Bandwidth Mode is configured at sub-element level "
            " and applies for all the instances of the Capability Search "
            "Beams in all sub-arrays.\n\nSupported modes are listed and "
            "described in Table 9-9\n\nTM can change the value of the "
            "parameter Search Beam Bandwidth Mode only when all the "
            "sub-arrays are IDLE."
        ),
    )
@attribute(
    dtype=("DevString",),
    max_dim_x=32,
    label="Subelement Subarrays",
    doc="List of Low.CBF SubArray TANGO Device names.",
)
def subelementSubarrays(self):
    """
    List of Low.CBF SubArray TANGO Device names.

    The decorated function is the attribute's read method, so it has to
    return the value itself; a body without a ``return`` would hand ``None``
    back to Tango.

    :return: the stored subarray device names (``_subelement_subarrays``)
    """
    return self._subelement_subarrays
@attribute(doc="Table (JSON dict) of Low CBF device healthStates.")
def health_table(self) -> str:
    """
    Serialise the controller's table of device health states.

    :return: JSON string built from ``device_health_states``
    """
    health_states = self.component_manager.controller.device_health_states
    return json.dumps(health_states)
# FIXME - requires pytango v9.4+
# @attribute(dtype=(HealthState,), max_dim_x=512)
@attribute(dtype=(int,), max_dim_x=512)
def health_processors(self) -> List[int]:
    """Report the health of every Processor as a list of integers."""
    by_type = self.component_manager.controller.health_by_type
    # FIXME - with pytango 9.4+ the HealthState values could be returned
    # directly instead of being converted to int
    return list(map(int, by_type["LowCbfProcessor"]))
# FIXME - requires pytango v9.4+
# @attribute(dtype=(HealthState,), max_dim_x=32)
@attribute(dtype=(int,), max_dim_x=32)
def health_connectors(self) -> List[int]:
    """Report the health of every Connector as a list of integers."""
    by_type = self.component_manager.controller.health_by_type
    # FIXME - with pytango 9.4+ the HealthState values could be returned
    # directly instead of being converted to int
    return list(map(int, by_type["LowCbfConnector"]))
@attribute(dtype=str, doc="A JSON array of all Alveos SNs")
def all_alveos(self) -> str:
    """Return the serial numbers of all registered Alveos as a JSON array."""
    serial_numbers = list(self._registered_alveos)
    return json.dumps(serial_numbers)
@attribute(dtype=str, doc="A JSON array of available Alveos.")
def available_alveos(self) -> str:
    """Return the serial numbers of the available Alveos as a JSON array."""
    available = self._get_available_alveos()
    return json.dumps(available)
@attribute(dtype=float, doc="Percentage of SPS links up.")
def spsLinkUpPercent(self) -> float:
    """
    Return the percentage of SPS links that are up.

    The value is fetched on every read; no events are pushed for this
    attribute, so clients (e.g. TMC) have to poll it.
    """
    percent = self._get_percent_sps()
    return percent
@attribute(
    dtype=float, doc="Packet Loss rate as observed by CBF.", rel_change=1
)
def cbfPacketLossRate(self) -> tuple[float, float, AttrQuality]:
    """
    Read the CBF Packet Loss Rate.

    The rate is calculated as

        PLR = absent SPEAD packets / received SPEAD packets

    The packet counters are coded over 32 bits, so values may occasionally
    be erroneous when a counter wraps around.

    :return: ``(rate, timestamp, quality)``; the quality is ``ATTR_INVALID``
        while no Alveos are assigned to any subarray
    """
    if self._assigned_alveos_cnt == 0:
        quality = AttrQuality.ATTR_INVALID
    else:
        quality = AttrQuality.ATTR_VALID
    return self._packet_loss_rate, time.time(), quality
@attribute(
    dtype=float,
    doc="Packet Corrupted rate as observed by CBF.",
    rel_change=1,
)
def cbfPacketCorruptionRate(self) -> tuple[float, float, AttrQuality]:
    """
    Read the CBF Packet Corruption Rate.

    The rate is calculated as

        PCR = corrupted packets / received SPEAD packets

    The packet counters are coded over 32 bits, so values may occasionally
    be erroneous when a counter wraps around.

    :return: ``(rate, timestamp, quality)``; the quality is ``ATTR_INVALID``
        while no Alveos are assigned to any subarray
    """
    if self._assigned_alveos_cnt == 0:
        quality = AttrQuality.ATTR_INVALID
    else:
        quality = AttrQuality.ATTR_VALID
    return self._packet_corrupted_rate, time.time(), quality
def _get_percent_sps(self) -> float:
    """Return the percentage of SPS links that are up.

    The value is obtained from the Allocator device's ``GetSPSPercent``
    command.

    :return: the value reported by the Allocator, or ``0.0`` when the
        Allocator cannot be reached or the command fails
    """
    try:
        allocator_proxy = tango.DeviceProxy(self.allocator_device)
        allocator_proxy.set_timeout_millis(self.AllocatorTimeoutMs)
        # The command call is guarded too: a failure here gets the same
        # 0.0 fallback as a failure to create the proxy.
        return allocator_proxy.GetSPSPercent()
    except tango.DevFailed as err:
        self.logger.warning(f"SPS link percentage unavailable: {err}")
        return 0.0
def _get_available_alveos(self) -> list[str]:
    """List the registered Alveo serial numbers not flagged as unavailable.

    :return: serial numbers, in registration-dict order
    """
    available = []
    for serial in self._registered_alveos:
        if serial in self._unavailable_alveos:
            continue
        available.append(serial)
    return available
# Properties (value in database)

# FQDN of the Allocator device that this controller subscribes to
allocator_device = device_property(
    doc="Tango device with allocation info",
    default_value="low-cbf/allocator/0",
    dtype="DevString",
)
# Timeouts (milliseconds) applied to Allocator and Processor proxies
AllocatorTimeoutMs = device_property(default_value=10_000, dtype="int")
ProcessorTimeoutMs = device_property(default_value=30_000, dtype="int")
# General methods
# inherited
def create_component_manager(self):
    """Build the component manager required by the base device.

    :return: a ``LowCbfControllerComponentManager`` wired to this device's
        logger and state-change callbacks
    """
    manager = LowCbfControllerComponentManager(
        component_state_callback=self._component_state_changed,
        communication_state_callback=self._communication_state_changed,
        logger=self.logger,
    )
    return manager
def always_executed_hook(self):
    """Hook run before any TANGO command is executed.

    Intentionally empty for this device.
    """
def delete_device(self):
    """Release resources allocated in init_device.

    Called by the device destructor and by the device Init command. The
    only action taken here is setting the device state to OFF.
    """
    # NOTE(review): the worker threads started in InitCommand.do() loop
    # forever and are not stopped here - confirm whether that is intended.
    off_state = tango.DevState.OFF
    self.set_state(off_state)
# Attributes methods
def read_searchBeamBandwidthMode(self) -> SearchBeamBandwidthMode:
    """
    Read the Search Beam Bandwidth Mode.

    The mode is configured at sub-element level and applies for all the
    instances of the Capability Search Beams in all sub-arrays.

    Supported modes are listed and described in Table 9-9.

    TM can change the value of the parameter Search Beam Bandwidth Mode
    only when all the sub-arrays are IDLE.

    :return: the searchBeamBandwidthMode attribute (SINGLE, DOUBLE)
    """
    return self.component_manager.controller.search_beam_bandwidth_mode
def write_searchBeamBandwidthMode(self, value: SearchBeamBandwidthMode):
    """Set the searchBeamBandwidthMode attribute.

    The value is stored on the component manager's controller.

    :param value: SINGLE, DOUBLE
    """
    self.component_manager.controller.search_beam_bandwidth_mode = value
def read_subelementSubarrays(self):
    """Read the stored subelementSubarrays value.

    :return: the content of ``_subelement_subarrays``
    """
    subarrays = self._subelement_subarrays
    return subarrays
def _update_admin_mode(self: SKABaseDevice, admin_mode: AdminMode) -> None:
    """Override adminMode change callback.

    Adjusts health reporting for the new mode, stores it, pushes
    ``adminMode`` change/archive events and, when the new mode is a
    monitoring mode, refreshes ``healthState`` and makes sure the health
    subscriptions exist.

    :param admin_mode: the ``adminMode`` being switched to
    """
    # pylint: disable=attribute-defined-outside-init,access-member-before-definition
    if admin_mode == self._admin_mode:
        return  # nothing to do
    # healthState reporting is adminMode dependent e.g UNKNOWN while
    # adminMode is OFFLINE; this must run before _admin_mode is updated
    # because _adjust_health compares the current mode with the next one
    self._adjust_health(admin_mode)
    self._admin_mode = admin_mode
    for func in (self.push_change_event, self.push_archive_event):
        func("adminMode", admin_mode)
    if self.is_monitoring_mode():
        current_health = self.component_manager.controller.health_state
        # Compare with the stored health value (as _process_health does),
        # not with the ``healthState`` Tango attribute object.
        if current_health != self._health_state:
            self._update_health_state(current_health)
        if not self._health_subscribed:
            self._subscribe_to_health()
def _adjust_health(self, admin_mode: AdminMode) -> None:
    """Prepare ``healthState`` reporting for an imminent ``adminMode`` change.

    Logs whether ``healthState`` will be tracked in the new mode, and sets
    ``healthState`` to UNKNOWN when leaving a monitoring mode for a
    non-monitoring one (e.g. OFFLINE). Must be called before the
    ``adminMode`` is actually changed.

    :param admin_mode: the ``adminMode`` we are about to transition to
    """
    was_monitoring = self.is_monitoring_mode()
    will_monitor = self.is_monitoring_mode(admin_mode)
    if was_monitoring == will_monitor:
        # no change in monitoring status: nothing to log or adjust
        return
    if will_monitor:
        self.logger.info(
            f"Entering {admin_mode.name} mode: will track healthState"
        )
        return
    # leaving a monitoring mode: health is no longer known
    self.logger.info(
        f"Entering {admin_mode.name} mode: won't track healthState"
    )
    self._update_health_state(HealthState.UNKNOWN)
def _subscribe_to_health(self):
    """Subscribe to healthState of all Low CBF hardware devices.

    Processors are looked up in the default Tango database, Connectors in
    the database given by the ``ConnectorDbHost``/``ConnectorDbPort``
    properties. The subscribed flag is only set once both lookups succeed.
    """
    try:
        local_db = Database()
    except DevFailed:
        # unit tests won't have the DB deployed and that's ok
        return

    event_manager = self._event_manager
    on_health_change = self._health_callback
    track_device = self.component_manager.controller.add_health_device

    processor_class = "LowCbfProcessor"
    for proc_fqdn in local_db.get_device_exported_for_class(processor_class):
        track_device(processor_class, proc_fqdn)
        self.logger.info(f"SUBSCRIBING to {proc_fqdn}:healthState")
        event_manager.register_callback(
            on_health_change, proc_fqdn, "healthState"
        )

    # Connector device(s) live in their own database
    try:
        connector_db = Database(self.ConnectorDbHost, self.ConnectorDbPort)
    except DevFailed as e:
        # if we got this far this shouldn't be running as unit/CI test
        self.logger.error(f"Connector DB EXCEPTION {e}")
        return

    connector_class = "LowCbfConnector"
    for short_name in connector_db.get_device_exported_for_class(
        connector_class
    ):
        track_device(connector_class, short_name)
        # events need the name qualified with the Connector DB host:port
        fqdn = self.conn_db_prefix + short_name
        self.logger.info(f"SUBSCRIBING to {fqdn}:healthState")
        event_manager.register_callback(on_health_change, fqdn, "healthState")

    # pylint: disable=attribute-defined-outside-init
    self._health_subscribed = True
def subscribe_to_allocator(self, tango_dev_name: str) -> None:
    """
    Subscribe to changes of the Alveo related Allocator attributes
    (internal_alveo, procDevFqdn).

    The subscriptions are registered on an unconnected proxy first and the
    connection is started afterwards.

    :param tango_dev_name: eg 'low-cbf/allocator/0'
    """
    watched_attributes = ("internal_alveo", "procDevFqdn")
    proxy = MccsDeviceProxy(
        tango_dev_name,
        self.logger,
        tango_timeout_ms=self.AllocatorTimeoutMs,
        connect=False,
    )
    for watched in watched_attributes:
        self.logger.info(f"SUBSCRIBE to {watched} from {tango_dev_name}")
        proxy.evt_sub_on_connect(watched, self._allocator_callback)
    proxy.connect(max_time=120)
def _subscribe_to_proc_admin_mode(self, proc_dict):
    """Subscribe to adminMode changes of the given Processors.

    Processors already marked as subscribed in ``_proc_subscribed`` are
    skipped. The subscriptions let us flag ENGINEERING/OFFLINE Alveos as
    unavailable.

    :param proc_dict: e.g {"XFL10NIYKVEU": "low-cbf/processor/0.0.0", ...}
    """
    event_manager = self._event_manager
    on_admin_mode = self._proc_adminmode_callback
    with self._lock:
        for processor_fqdn in proc_dict.values():
            # missing entries and entries flagged False both need subscribing
            if self._proc_subscribed.get(processor_fqdn):
                continue
            self.logger.info(f"SUBSCRIBING to {processor_fqdn}:adminMode")
            event_manager.register_callback(
                on_admin_mode, processor_fqdn, "adminMode"
            )
            self._proc_subscribed[processor_fqdn] = True
def is_monitoring_mode(self, mode: Union[AdminMode, None] = None) -> bool:
    """Tell whether an adminMode is a monitoring mode.

    In monitoring mode we keep track of healthState and report its
    changes. A mode counts as monitoring when it is listed in
    ``_admin_modes_using_health``.

    See RtD https://is.gd/nCuv1q

    :param mode: adminMode we are about to transition to; when not
        supplied the current ``self._admin_mode`` value is checked instead
    :return: True when in monitoring mode; False otherwise
    """
    if mode is None:
        mode = self._admin_mode
    return mode in self._admin_modes_using_health
# ----------
# Commands
# ----------
def init_command_objects(self):
    """
    Set up the command handlers for the commands this device supports.

    Nothing is added here: the work is delegated to the parent class.
    """
    # pylint: disable=useless-super-delegation
    super().init_command_objects()
class InitCommand(SKAController.InitCommand):
    """Init Command class: sets up state, event configuration and threads."""

    def do(self):
        """
        Initialises the attributes and properties of the LowCbfController.

        Besides plain attribute initialisation this starts the event
        handling and packet-loss threads and subscribes to the Allocator.

        :return: ``(ResultCode.OK, message)``
        """
        super().do()
        device = self._device  # short alias
        device._version_id = release.version
        device._build_state = (
            f"{release.name}, {release.version}, {release.description}"
        )
        device.component_manager.controller.search_beam_bandwidth_mode = (
            SearchBeamBandwidthMode.SINGLE
        )
        device.set_change_event("healthState", True, False)
        device.set_archive_event("healthState", True, False)
        device._event_manager = EventManager(
            device.logger,
            events=["healthState", "adminMode"],
            tango_timeout_ms=device.ProcessorTimeoutMs,
        )
        # A flag indicating we already subscribed to health events
        device._health_subscribed = False
        # Keep track of all registered Alveos - {serial_nr: fqdn}
        device._registered_alveos = {}
        # Keep track of Alveos (serial numbers) that can't be used
        device._unavailable_alveos = set()
        # Number of Alveos assigned to all subarrays
        device._assigned_alveos_cnt = 0
        # adminMode values for which we report changes in healthState
        device._admin_modes_using_health = [AdminMode.ONLINE]
        # environment variable determines if healthState is ignored
        # in ENGINEERING mode
        ignore_engineering = os.getenv(
            "ENGINEERING_MODE_IGNORE_HEALTH", default="false"
        )
        if ignore_engineering.lower() != "true":
            device._admin_modes_using_health.append(AdminMode.ENGINEERING)
        device._spead_pkt_counter = 0
        device._spead_missing_counter = 0
        device._bad_eth_packet_counter = 0
        device._packet_loss_rate = 0.0
        device._packet_corrupted_rate = 0.0
        # Processors whose adminMode attribute we are subscribed to,
        # e.g. {"low-cbf/processor/0.0.0": True}
        device._proc_subscribed = {}
        # A queue to serialise callback events
        device._deque = deque()
        # Synchronise producer/consumer without time.sleep()
        device._callback_cond_var = Condition()
        # Ensure that processor polling thread and event handling thread
        # change/use data (e.g. _proc_proxy) in orderly manner
        device._lock = Lock()
        device._subelement_subarrays = ""
        # dict mapping processor FQDN to device proxy
        device._proc_proxy = {}
        # thread to process callback events; needed as we subscribe to
        # processor's adminMode changes but we're notified about the
        # processor being available in an allocator callback - a potential
        # for chained callbacks can lock up execution of this module.
        # Both workers loop forever, so they are daemon threads: a
        # non-daemon thread that never returns keeps the interpreter alive
        # at shutdown.
        device._thread = Thread(
            target=device._handle_queued_events, daemon=True
        )
        device._thread.start()
        device._plr_thread = Thread(
            target=device._calculate_packet_loss_rate, daemon=True
        )
        device._plr_thread.start()
        # update subscribers on attribute change:
        for attrib in ("all_alveos", "available_alveos"):
            device.set_change_event(attrib, True, False)
            device.set_archive_event(attrib, True, False)
        for attrib in ("cbfPacketLossRate", "cbfPacketCorruptionRate"):
            # Tango can check these updates to see if the value has changed
            device.set_change_event(attrib, True, True)
            device.set_archive_event(attrib, True, True)
        # prefix that qualifies Connector device names with their database
        device.conn_db_prefix = (
            f"{device.ConnectorDbHost}:{device.ConnectorDbPort}/"
        )
        device.logger.info(f"Using Connector DB:{device.conn_db_prefix}")
        device.subscribe_to_allocator(device.allocator_device)
        message = "LowCbfController init complete"
        device.logger.info(message)
        return ResultCode.OK, message
def _process_health(self, fqdn: str, value, quality) -> None:
    """
    Handle a queued health state event from a monitored device.

    The controller's health table is always updated; this device's own
    healthState is only updated while in monitoring mode, and only when the
    aggregated health actually differs from the stored one.

    :param fqdn: Tango device name (any Connector DB prefix is stripped)
    :param value: health state value
    :param quality: attribute quality
    """
    device_name = fqdn.removeprefix(self.conn_db_prefix)
    self.logger.info(f"FROM {device_name} val: {value} Q: {quality}")
    is_valid = quality == AttrQuality.ATTR_VALID
    controller = self.component_manager.controller
    controller.update_health_state(device_name, value, is_valid)
    if not self.is_monitoring_mode():
        # updates reported only while in monitoring mode
        return
    stored_health = self._health_state
    aggregated_health = self.component_manager.controller.health_state
    if aggregated_health != stored_health:
        self._update_health_state(aggregated_health)
def _process_adminmode_change(
    self, fqdn: str, admin_mode, quality: AttrQuality
) -> None:
    """Handle a queued LowCbfProcessor adminMode change.

    Used to maintain the set of unavailable Alveos (primarily for system
    operators): a processor that is ONLINE is removed from the set, any
    other adminMode adds it. When the set changes, subscribers of
    ``available_alveos`` are notified, as change events for that attribute
    are pushed manually.

    :param fqdn: Tango device name e.g. "low-cbf/processor/0.0.0"
    :param admin_mode: AdminMode enumeration `see <https://is.gd/nCuv1q>`_
    :param quality: AttrQuality enumeration `see <https://is.gd/nTVEzR>`_
    """
    self.logger.info(f"PROC {fqdn} adminMode={admin_mode} Q: {quality}")
    # _registered_alveos maps serial number -> FQDN; find the first serial
    # registered for this processor
    serial = next(
        (
            sn
            for sn, dev_name in self._registered_alveos.items()
            if dev_name == fqdn
        ),
        None,
    )
    if serial is None:
        self.logger.error("UNKNOWN Alveo")
        return
    was_unavailable = serial in self._unavailable_alveos
    if admin_mode == AdminMode.ONLINE:
        # remove from "unavailable Alveos" (if applicable)
        self._unavailable_alveos.discard(serial)
    else:
        # no longer ONLINE, flag as unavailable
        self._unavailable_alveos.add(serial)
    if was_unavailable != (serial in self._unavailable_alveos):
        # availability changed: notify subscribers of the new value
        avail_alveos_str = json.dumps(self._get_available_alveos())
        self.push_change_event("available_alveos", avail_alveos_str)
        self.push_archive_event("available_alveos", avail_alveos_str)
    self.logger.info(f"UNAVAILABLE Alveos {self._unavailable_alveos}")
def _process_allocator_event(
self, attr_name: str, evt_json: str, quality: AttrQuality
) -> None:
self.logger.info(f"GOT {attr_name} val: {evt_json} Q: {quality}")
if quality != AttrQuality.ATTR_VALID:
self.logger.warning(f"Invalid quality {quality}")
return
fsp_dict = json.loads(evt_json)
match attr_name.lower():
case "procdevfqdn": # NOTE we get this only once
# expecting evt_json:
# {"XFL10NIYKVEU": "low-cbf/processor/0.0.0", ...}
# pylint: disable=attribute-defined-outside-init
if not fsp_dict: # ignore empty dict
return
self._registered_alveos = fsp_dict
# notify subscribers of value change
alveos_str = json.dumps(list(self._registered_alveos.keys()))
avail_alveos_str = json.dumps(self._get_available_alveos())
for attr, val in (
("all_alveos", alveos_str),
("available_alveos", avail_alveos_str),
):
self.push_change_event(attr, val)
self.push_archive_event(attr, val)
with self._callback_cond_var:
self._deque.append((Evt.PROC_REGISTERED, fsp_dict))
self._callback_cond_var.notify()
case "internal_alveo":
# Expecting evt_json:
# {"XFL1TJCHM3ON": {"fw": ...}, "XFL1E35JVJTQ": {"fw"..}..}
# pylint: disable=attribute-defined-outside-init
# unavailable Alveos are simply those that are already used
self._assigned_alveos_cnt = len(fsp_dict)
self._unavailable_alveos = set(fsp_dict.keys())
# notify subscribers of value change
avail_alveos_str = json.dumps(self._get_available_alveos())
self.push_change_event("available_alveos", avail_alveos_str)
self.push_archive_event("available_alveos", avail_alveos_str)
if self._assigned_alveos_cnt == 0:
# clear _proc_subscribed if there are no Alveos
self.logger.info("No Alveos currently in use")
with self._lock:
self._proc_proxy = {}
for key in self._proc_subscribed:
self._proc_subscribed[key] = False
else:
self._subscribe_to_proc_admin_mode(self._registered_alveos)
valid = self._assigned_alveos_cnt > 0
self._push_counter_attributes(
self._packet_loss_rate, self._packet_corrupted_rate, valid
)
self.logger.info(
f"UNAVAILABLE ALVEOS {self._unavailable_alveos}"
)
case _:
self.logger.warning("UNKNOWN attribute")
# ----------
# Callbacks
# NOTE: do as little as possible in callbacks
# ----------
# pylint: disable=unused-argument
def _health_callback(self, fqdn: str, name: str, value, quality) -> None:
    """
    Queue a health state event from a monitored device.

    Nothing is processed here: the event is appended to the queue and the
    event-handling thread is woken up.

    :param fqdn: Tango device name
    :param name: Tango attribute name (unused)
    :param value: health state value
    :param quality: attribute quality
    """
    event = (Evt.HEALTH_EVENT, (fqdn, value, quality))
    cond_var = self._callback_cond_var
    with cond_var:
        self._deque.append(event)
        cond_var.notify()
def _allocator_callback(
    self, attr_name: str, evt_json: str, quality: AttrQuality
) -> None:
    """
    Queue an Allocator event triggered by a new Alveo discovery or a state
    change (e.g. assigned to a subarray).

    Nothing is processed here: the event is appended to the queue and the
    event-handling thread is woken up.

    :param attr_name: attribute name (procDevFqdn, internal_alveo)
    :param evt_json: event details as JSON string
    :param quality: ATTR_VALID when OK, `see <https://is.gd/nTVEzR>`_
    """
    event = (Evt.ALLOCATOR_EVENT, (attr_name, evt_json, quality))
    cond_var = self._callback_cond_var
    with cond_var:
        self._deque.append(event)
        cond_var.notify()
def _proc_adminmode_callback(
    self, fqdn: str, name: str, admin_mode, quality: AttrQuality
) -> None:
    """Queue a LowCbfProcessor adminMode change.

    Nothing is processed here: the event is appended to the queue and the
    event-handling thread is woken up.

    :param fqdn: Tango device name e.g. "low-cbf/processor/0.0.0"
    :param name: Tango attribute name e.g. "adminMode" (unused)
    :param admin_mode: AdminMode enumeration `see <https://is.gd/nCuv1q>`_
    :param quality: AttrQuality enumeration `see <https://is.gd/nTVEzR>`_
    """
    event = (Evt.PROC_ADMIN_MODE, (fqdn, admin_mode, quality))
    cond_var = self._callback_cond_var
    with cond_var:
        self._deque.append(event)
        cond_var.notify()
# ----------
# threads
# ----------
def _handle_queued_events(self):
"""A thread handling queued up callback events. Blocks on _callback
condition variable - a producer will unblock it.
"""
# map event code to the method handling it
event_to_handler_map = {
Evt.HEALTH_EVENT: self._process_health,
Evt.PROC_ADMIN_MODE: self._process_adminmode_change,
Evt.ALLOCATOR_EVENT: self._process_allocator_event,
}
cv = self._callback_cond_var
while True:
# keep the condition variable context small in order to avoid
# race condition
with cv:
while len(self._deque) == 0:
cv.wait()
item = self._deque.popleft()
if not isinstance(item, tuple):
self.logger.error(f"ERROR need tuple, got {item}")
continue
key, val = item
match key:
case Evt.PROC_REGISTERED:
# don't subscribe to auto-registered pretend processors
if "ALLOW_AUTO_REGISTER_PROCESSORS" not in os.environ:
self.logger.info(f"need to register {val}")
self._subscribe_to_proc_admin_mode(val)
case Evt.HEALTH_EVENT | Evt.PROC_ADMIN_MODE | Evt.ALLOCATOR_EVENT:
event_to_handler_map[key](*val)
case _:
self.logger.warning(f"UNKNOWN event {key}")
# FIXME convert this to event subscription instead of polling in PI#31
def _calculate_packet_loss_rate(self):
    """
    Thread body that periodically computes packet loss/corruption rates.

    Every ``PacketLossReportInterval`` seconds, while Alveos are assigned
    to subarrays, the packet counters of all subscribed Processors are
    summed, the increase since the previous cycle is turned into the loss
    and corruption rates, and change/archive events are pushed. When no
    SPEAD packets were received in a cycle both rates are reported as 1.0.
    """
    # pylint: disable=access-member-before-definition,attribute-defined-outside-init
    retries = 3
    retry_delay_s = 0.4

    def read_counter(proxy: tango.DeviceProxy, attr: str) -> int:
        """
        Read a processor counter with retries.

        :param proxy: Tango DeviceProxy for the processor.
        :param attr: Name of the attribute to read.
        :return: The counter value, or 0 if reading failed after retries.
        """
        for _ in range(retries):
            try:
                return proxy.read_attribute(attr).value
            except DevFailed:
                self.logger.error(f"reading {attr} failed")
                time.sleep(retry_delay_s)
        return 0

    def rate(increase: int, received_increase: int) -> float:
        """Ratio of ``increase`` to received packets; 1.0 if none received."""
        if received_increase == 0:
            return 1.0
        return max(increase / received_increase, 0)

    while True:
        # do nothing if no Alveos are assigned to subarrays
        if self._assigned_alveos_cnt == 0:
            time.sleep(self.PacketLossReportInterval)
            continue

        # Update our collection of proxies
        with self._lock:
            # Re-read the dict every cycle: the event thread replaces
            # self._proc_proxy when no Alveos are in use, so a reference
            # captured once would go stale.
            proxies = self._proc_proxy
            active_alveos = {
                fqdn
                for fqdn, subscribed in self._proc_subscribed.items()
                if subscribed
            }
            for fqdn in active_alveos.difference(proxies):
                self.logger.info(f"NEW processor {fqdn}")
                try:
                    proxy = tango.DeviceProxy(fqdn)
                    proxy.set_timeout_millis(self.ProcessorTimeoutMs)
                except DevFailed as err:
                    # not stored, so creation is retried next cycle
                    self.logger.warning(f"no proxy for {fqdn}: {err}")
                    continue
                proxies[fqdn] = proxy
            for fqdn in set(proxies).difference(active_alveos):
                proxies.pop(fqdn, None)
            # snapshot so the counters can be read outside the lock
            current_proxies = list(proxies.values())

        # Sum counters from all Processors
        spead_pkt_counter = 0
        spead_missing_counter = 0
        bad_eth_packet_counter = 0
        for proxy in current_proxies:
            spead_pkt_counter += read_counter(
                proxy, "stats_spead_pkt_counter"
            )
            spead_missing_counter += read_counter(
                proxy, "stats_spead_missing_counter"
            )
            bad_eth_packet_counter += read_counter(
                proxy, "stats_bad_eth_packet_counter"
            )

        # a total lower than last cycle's is clamped to a zero increase
        increase_spead_pkt = max(
            0, spead_pkt_counter - self._spead_pkt_counter
        )
        increase_spead_missing = max(
            0, spead_missing_counter - self._spead_missing_counter
        )
        increase_bad_eth_packet = max(
            0, bad_eth_packet_counter - self._bad_eth_packet_counter
        )
        self._spead_pkt_counter = spead_pkt_counter
        self._spead_missing_counter = spead_missing_counter
        self._bad_eth_packet_counter = bad_eth_packet_counter

        self._packet_loss_rate = rate(
            increase_spead_missing, increase_spead_pkt
        )
        self._packet_corrupted_rate = rate(
            increase_bad_eth_packet, increase_spead_pkt
        )
        self._push_counter_attributes(
            self._packet_loss_rate, self._packet_corrupted_rate
        )
        time.sleep(self.PacketLossReportInterval)
def _push_counter_attributes(
    self, loss_rate: float, corrupted: float, valid: bool = True
):
    """
    Push change and archive events for the packet loss and corruption rates.

    Both attributes are pushed with the same timestamp and quality.

    :param loss_rate: Calculated packet loss rate.
    :param corrupted: Calculated packet corruption rate.
    :param valid: whether the ``AttrQuality`` is considered valid.
    """
    timestamp = time.time()
    if valid:
        quality = AttrQuality.ATTR_VALID
    else:
        quality = AttrQuality.ATTR_INVALID
    rates = {
        "cbfPacketLossRate": loss_rate,
        "cbfPacketCorruptionRate": corrupted,
    }
    for attr_name, rate in rates.items():
        self.push_change_event(attr_name, rate, timestamp, quality)
        self.push_archive_event(attr_name, rate, timestamp, quality)
# Run server
def main(args=None, **kwargs):
    """Run the Tango device server hosting ``LowCbfController``.

    :param args: command line arguments forwarded to ``run``
    :param kwargs: further keyword arguments forwarded to ``run``
    :return: whatever ``run`` returns
    """
    device_classes = (LowCbfController,)
    return run(device_classes, args=args, **kwargs)


if __name__ == "__main__":
    main()