# Source code for controller.controller_device

# -*- coding: utf-8 -*-
#
# This file is part of the SKA Low CBF project
#
# Copyright (c) 2024 CSIRO
#
# Distributed under the terms of the CSIRO Open Source Software Licence
# Agreement. See LICENSE for more info.

""" SKA Low CBF

Sub-element controller device for Low.CBF
"""
import json
import os
import time
from collections import deque
from enum import IntEnum
from threading import Condition, Lock, Thread
from typing import List, Union

import tango
from ska_tango_base import SKABaseDevice, SKAController
from ska_tango_base.commands import ResultCode
from ska_tango_base.control_model import AdminMode, HealthState
from tango import AttrQuality, AttrWriteType, Database, DevFailed
from tango.server import attribute, device_property, run

from ska_low_cbf import release
from ska_low_cbf.controller.component_manager import (
    LowCbfControllerComponentManager,
)
from ska_low_cbf.controller.controller import SearchBeamBandwidthMode
from ska_low_cbf.device_proxy import MccsDeviceProxy
from ska_low_cbf.events import EventManager

# Tango naming conventions clash with Python conventions...
# pylint: disable=invalid-name,protected-access,too-few-public-methods
# pylint: disable=pointless-string-statement


__all__ = ["LowCbfController", "main"]


class Evt(IntEnum):
    """Codes identifying the kind of event placed on the callback queue."""

    # processor registered with the Allocator
    PROC_REGISTERED = 0
    # processor assigned to a subarray
    PROC_ALLOCATED = 1
    # processor/connector healthState change
    HEALTH_EVENT = 2
    # processor adminMode change
    PROC_ADMIN_MODE = 3
    # internal_alveo, procDevFqdn attributes
    ALLOCATOR_EVENT = 4


class LowCbfController(SKAController):
    """
    Sub-element controller device for Low.CBF

    **Properties:**

    - Device Property
    """

    # pylint: disable=too-many-instance-attributes
    # Host and port of the Tango database that _subscribe_to_health queries
    # for LowCbfConnector devices. InitCommand.do also joins them into
    # "<host>:<port>/", the prefix prepended to Connector device names.
    ConnectorDbHost = device_property(
        dtype="str", default_value="tango-databaseds"
    )
    ConnectorDbPort = device_property(dtype="int", default_value=10000)

    # Pause between two packet-loss calculations; the value is passed to
    # time.sleep(), i.e. it is in seconds.
    PacketLossReportInterval = device_property(dtype="int", default_value=10)
    # Attributes
    # Read/write attribute declared without a decorated function; the
    # read_searchBeamBandwidthMode / write_searchBeamBandwidthMode methods of
    # this class provide its value.
    searchBeamBandwidthMode = attribute(
        dtype=SearchBeamBandwidthMode,
        access=AttrWriteType.READ_WRITE,
        label="Search Beam Bandwidth Mode",
        doc=(
            "Search Beam Bandwidth Mode is configured at sub-element level "
            " and applies for all the instances of the Capability Search "
            "Beams in all sub-arrays.\n\nSupported modes are listed and "
            "described in Table 9-9\n\nTM can change the value of the "
            "parameter Search Beam Bandwidth Mode only when all the "
            "sub-arrays are IDLE."
        ),
    )

    @attribute(
        dtype=("DevString",),
        max_dim_x=32,
        label="Subelement Subarrays",
        doc="List of Low.CBF SubArray TANGO Device names.",
    )
    def subelementSubarrays(self):
        """
        List of Low.CBF SubArray TANGO Device names.

        :return: the stored subarray device names
        """
        # The decorated function is the attribute's read method, so it has to
        # hand back a value: a docstring-only body returns None and leaves
        # the attribute without a value on every read.
        return self._subelement_subarrays

    @attribute(doc="Table (JSON dict) of Low CBF device healthStates.")
    def health_table(self) -> str:
        """
        Serialise the health states of all monitored devices.

        :return: JSON string built from the controller's
            ``device_health_states``
        """
        health_states = self.component_manager.controller.device_health_states
        return json.dumps(health_states)

    # FIXME - requires pytango v9.4+
    # @attribute(dtype=(HealthState,), max_dim_x=512)
    @attribute(dtype=(int,), max_dim_x=512)
    def health_processors(self) -> List[int]:
        """Report the health of every Processor as plain integers."""
        by_type = self.component_manager.controller.health_by_type
        return list(map(int, by_type["LowCbfProcessor"]))
        # FIXME - pytango 9.4+
        # return self.component_manager.controller.health_by_type["LowCbfProcessor"]

    # FIXME - requires pytango v9.4+
    # @attribute(dtype=(HealthState,), max_dim_x=32)
    @attribute(dtype=(int,), max_dim_x=32)
    def health_connectors(self) -> List[int]:
        """Report the health of every Connector as plain integers."""
        by_type = self.component_manager.controller.health_by_type
        return list(map(int, by_type["LowCbfConnector"]))
        # FIXME - pytango 9.4+
        # return self.component_manager.controller.health_by_type["LowCbfConnector"]

    @attribute(dtype=str, doc="A JSON array of all Alveos SNs")
    def all_alveos(self) -> str:
        """Serialise the serial numbers of all registered Alveos as JSON."""
        serial_numbers = list(self._registered_alveos)
        return json.dumps(serial_numbers)

    @attribute(dtype=str, doc="A JSON array of available Alveos.")
    def available_alveos(self) -> str:
        """Serialise the list of currently available Alveos as JSON."""
        available = self._get_available_alveos()
        return json.dumps(available)

    @attribute(dtype=float, doc="Percentage of SPS links up.")
    def spsLinkUpPercent(self) -> float:
        """
        Read the SPS link-up percentage quality attribute.

        The value is fetched on every read; TMC has to poll this attribute
        because it is not updated automatically.
        """
        percent = self._get_percent_sps()
        return percent

    @attribute(
        dtype=float, doc="Packet Loss rate as observed by CBF.", rel_change=1
    )
    def cbfPacketLossRate(self) -> tuple[float, float, AttrQuality]:
        """
        Read the CBF Packet Loss Rate.

        The rate is computed as
        PLR = absent SPEAD packets / received SPEAD packets
        The packet counters are 32 bits wide, so the value may occasionally
        be wrong when a counter wraps around.

        The attribute quality is ``ATTR_INVALID`` while no Alveos are
        assigned to any subarray.

        :return: (rate, timestamp, quality)
        """
        if self._assigned_alveos_cnt == 0:
            quality = AttrQuality.ATTR_INVALID
        else:
            quality = AttrQuality.ATTR_VALID
        return self._packet_loss_rate, time.time(), quality

    @attribute(
        dtype=float,
        doc="Packet Corrupted rate as observed by CBF.",
        rel_change=1,
    )
    def cbfPacketCorruptionRate(self) -> tuple[float, float, AttrQuality]:
        """
        Read the CBF Packet Corruption Rate.

        The rate is computed as
        PCR = corrupted packets / received SPEAD packets
        The packet counters are 32 bits wide, so the value may occasionally
        be wrong when a counter wraps around.

        The attribute quality is ``ATTR_INVALID`` while no Alveos are
        assigned to any subarray.

        :return: (rate, timestamp, quality)
        """
        if self._assigned_alveos_cnt == 0:
            quality = AttrQuality.ATTR_INVALID
        else:
            quality = AttrQuality.ATTR_VALID
        return self._packet_corrupted_rate, time.time(), quality

    def _get_percent_sps(self) -> float:
        """Return the percentage of SPS links up, as reported by the Allocator.

        :return: result of the Allocator's ``GetSPSPercent`` command, or 0.0
            when the Allocator cannot be reached or the command fails
        """
        # Need the Allocator for quality attributes
        try:
            allocator_proxy = tango.DeviceProxy(self.allocator_device)
            allocator_proxy.set_timeout_millis(self.AllocatorTimeoutMs)
            # The command itself can fail as well (e.g. timeout); treat that
            # exactly like a failed connection instead of raising.
            return allocator_proxy.GetSPSPercent()
        except tango.DevFailed as err:
            self.logger.warning(f"Cannot read SPS percentage: {err}")
            return 0.0

    def _get_available_alveos(self) -> list[str]:
        """Return a list of Alveo serial numbers which are not allocated
        to any subarrays.
        """
        return [
            sn
            for sn in self._registered_alveos.keys()
            if sn not in self._unavailable_alveos
        ]

    # Properties (value in database)
    # FQDN of the Allocator device: subscribed to for Alveo events during
    # init and queried for the SPS link percentage.
    allocator_device = device_property(
        dtype="DevString",
        default_value="low-cbf/allocator/0",
        doc="Tango device with allocation info",
    )

    # Timeouts in milliseconds applied to the Allocator and Processor proxies.
    AllocatorTimeoutMs = device_property(dtype="int", default_value=10_000)
    ProcessorTimeoutMs = device_property(dtype="int", default_value=30_000)

    # General methods

    # inherited
    def create_component_manager(self):
        """Build the component manager this device delegates to."""
        callbacks = {
            "communication_state_callback": self._communication_state_changed,
            "component_state_callback": self._component_state_changed,
        }
        return LowCbfControllerComponentManager(logger=self.logger, **callbacks)

    def always_executed_hook(self):
        """Method always executed before any TANGO command is executed.

        Intentionally empty: this device does nothing here.
        """

    def delete_device(self):
        """Hook to delete resources allocated in init_device.

        This method allows for any memory or other resources allocated in the
        init_device method to be released.  This method is called by the device
        destructor and by the device Init command.

        Currently it only sets the device state to OFF.
        """
        # NOTE(review): the worker threads started by InitCommand.do are not
        # stopped here and event subscriptions are not removed - confirm
        # whether re-running Init is expected to be safe.
        self.set_state(tango.DevState.OFF)

    # Attributes methods
[docs] def read_searchBeamBandwidthMode(self) -> SearchBeamBandwidthMode: """ Search Beam Bandwidth Mode is configured at sub-element level and applies for all the instances of the Capability Search Beams in all sub-arrays. Supported modes are listed and described in Table 9-9 TM can change the value of the parameter Search Beam Bandwidth Mode only when all the sub-arrays are IDLE. :return: the searchBeamBandwidthMode attribute (SINGLE, DOUBLE) """ return self.component_manager.controller.search_beam_bandwidth_mode
[docs] def write_searchBeamBandwidthMode(self, value: SearchBeamBandwidthMode): """Set the searchBeamBandwidthMode attribute. :param value: SINGLE, DOUBLE """ self.component_manager.controller.search_beam_bandwidth_mode = value
def read_subelementSubarrays(self): """Return the subelementSubarrays attribute.""" return self._subelement_subarrays def _update_admin_mode(self: SKABaseDevice, admin_mode: AdminMode) -> None: """Override adminMode change callback.""" # pylint: disable=attribute-defined-outside-init,access-member-before-definition if admin_mode == self._admin_mode: return # nothing to do # healthState reporting is adminMode dependent e.g UNKNOWN while # adminMode is OFFLINE self._adjust_health(admin_mode) self._admin_mode = admin_mode for func in (self.push_change_event, self.push_archive_event): func("adminMode", admin_mode) if self.is_monitoring_mode(): current_health = self.component_manager.controller.health_state if current_health != self.healthState: self._update_health_state(current_health) if not self._health_subscribed: self._subscribe_to_health() def _adjust_health(self, admin_mode: AdminMode) -> None: """Set ``healthState`` to UNKNOWN when switching to e.g. OFFLINE MODE Add log entry about ``healthState`` tracking. This is called just before ``adminMode`` change. 
:param admin_mode: the ``adminMode`` we are about to transition to """ monitoring_now = self.is_monitoring_mode() monitoring_next = self.is_monitoring_mode(admin_mode) if not monitoring_now and monitoring_next: msg = f"Entering {admin_mode.name} mode: will track healthState" self.logger.info(msg) # if currently in a monitoring mode but won't be after this change # change the healthState to UNKNOWN elif monitoring_now and not monitoring_next: msg = f"Entering {admin_mode.name} mode: won't track healthState" self.logger.info(msg) self._update_health_state(HealthState.UNKNOWN) def _subscribe_to_health(self): """Subscribe to healthState of all Low CBF hardware devices.""" try: db = Database() except DevFailed: # unit tests won't have the DB deployed and that's ok return device_type = "LowCbfProcessor" # few short aliases: em = self._event_manager cbk_fn = self._health_callback add_health_device = self.component_manager.controller.add_health_device for fqdn in db.get_device_exported_for_class(device_type): add_health_device(device_type, fqdn) self.logger.info(f"SUBSCRIBING to {fqdn}:healthState") em.register_callback(cbk_fn, fqdn, "healthState") # for Connector device(s) we need to consult it's own db try: db = Database(self.ConnectorDbHost, self.ConnectorDbPort) except DevFailed as e: # if we got this far this shouldn't be running as unit/CI test self.logger.error(f"Connector DB EXCEPTION {e}") return device_type = "LowCbfConnector" for device_name in db.get_device_exported_for_class(device_type): add_health_device(device_type, device_name) fqdn = self.conn_db_prefix + device_name self.logger.info(f"SUBSCRIBING to {fqdn}:healthState") em.register_callback(cbk_fn, fqdn, "healthState") # pylint: disable=attribute-defined-outside-init self._health_subscribed = True def subscribe_to_allocator(self, tango_dev_name: str) -> None: """ Subscribe to Alveo related allocator attribute (internal_alveo, procDevFqdn) changes. 
:param tango_dev_name: eg 'low-cbf/allocator/0' """ alloc_proxy = MccsDeviceProxy( tango_dev_name, self.logger, connect=False, tango_timeout_ms=self.AllocatorTimeoutMs, ) for attr_name in ("internal_alveo", "procDevFqdn"): self.logger.info(f"SUBSCRIBE to {attr_name} from {tango_dev_name}") alloc_proxy.evt_sub_on_connect(attr_name, self._allocator_callback) alloc_proxy.connect(max_time=120) def _subscribe_to_proc_admin_mode(self, proc_dict): """Subscribe to Processor adminMode changes so we can flag ENGINEERING/OFFLINE Alveos as unavailable :param proc_dict: e.g {"XFL10NIYKVEU": "low-cbf/processor/0.0.0", ...} """ def is_subscribed_to(fqdn: str) -> bool: """Determine if we are already subscribed to the processor""" return ( fqdn in self._proc_subscribed and self._proc_subscribed[fqdn] ) em = self._event_manager with self._lock: for fqdn in proc_dict.values(): if is_subscribed_to(fqdn): continue self.logger.info(f"SUBSCRIBING to {fqdn}:adminMode") cb_func = self._proc_adminmode_callback em.register_callback(cb_func, fqdn, "adminMode") self._proc_subscribed[fqdn] = True def is_monitoring_mode(self, mode: Union[AdminMode, None] = None) -> bool: """Determine if we are in monitoring mode In monitoring mode we keep track of healthState and report its changes. ONLINE and ENGINEERING admin modes are considered monitoring while NOT_FITTED, OFFLINE and RESERVED are not. See RtD https://is.gd/nCuv1q :param mode: adminMode we are about to transition to; when not supplied use the current ``self._admin_mode`` value instead :return: True when in monitoring mode; False otherwise """ value = self._admin_mode if mode is None else mode return value in self._admin_modes_using_health # ---------- # Commands # ---------- def init_command_objects(self): """ Initialises the command handlers for commands supported by this device. 
""" # pylint: disable=useless-super-delegation super().init_command_objects() class InitCommand(SKAController.InitCommand): """Init Command class""" def do(self): """ Initialises the attributes and properties of the LowCbfController. """ super().do() self._device._version_id = release.version self._device._build_state = ( f"{release.name}, {release.version}, {release.description}" ) self._device.component_manager.controller.search_beam_bandwidth_mode = ( SearchBeamBandwidthMode.SINGLE ) self._device.set_change_event("healthState", True, False) self._device.set_archive_event("healthState", True, False) self._device._event_manager = EventManager( self._device.logger, events=["healthState", "adminMode"], tango_timeout_ms=self._device.ProcessorTimeoutMs, ) self._device._health_subscribed = False """A flag indicating we already subscribed to health events""" # Keep track of all registed Alveos - {serial_nr: fqdn} self._device._registered_alveos = {} # Keep track of Alveos (serial numbers) that can't be used self._device._unavailable_alveos = set() self._device._assigned_alveos_cnt: int = 0 """Number of Alveos assigned to all subarrays.""" self._device._admin_modes_using_health = [AdminMode.ONLINE] """adminMode values for which we report changes in healthState""" # environment variable determines if healthState is ignored # in ENGINEERING mode ignore_engineering = os.getenv( "ENGINEERING_MODE_IGNORE_HEALTH", default="false" ) if ignore_engineering.lower() != "true": self._device._admin_modes_using_health.append( AdminMode.ENGINEERING ) self._device._spead_pkt_counter = 0 self._device._spead_missing_counter = 0 self._device._bad_eth_packet_counter = 0 self._device._packet_loss_rate = 0.0 self._device._packet_corrupted_rate = 0.0 self._device._proc_subscribed: dict[str, bool] = {} """Keep track of processors to which adminMode attribute we are subscribed dict: { "low-cbf/processor/0.0.0": True }, """ self._device._deque = deque() """A queue to serialise callback events""" 
# Synchronise producer/consumer without time.sleep() self._device._callback_cond_var = Condition() self._device._lock = Lock() """Ensure that processor polling thread and event handling thread change/use data (e.g. _proc_proxy) in orderly manner""" # thread to process callback events; needed as we subscribe to # processor's adminMode changes but we're notified about the # processor being available in an allocator callback - a potential # for chained callbacks can lock up execution of this module self._device._subelement_subarrays = "" self._device._proc_proxy: dict[str, tango.DeviceProxy] = {} """dict mapping processor FQDN to device proxy""" self._device._thread = Thread( target=self._device._handle_queued_events ) self._device._thread.start() self._device._plr_thread = Thread( target=self._device._calculate_packet_loss_rate ) self._device._plr_thread.start() # update subscribers on attribute change: for attrib in ( "all_alveos", "available_alveos", ): self._device.set_change_event(attrib, True, False) self._device.set_archive_event(attrib, True, False) for attrib in ( "cbfPacketLossRate", "cbfPacketCorruptionRate", ): # Tango can check these updates to see if the value has changed self._device.set_change_event(attrib, True, True) self._device.set_archive_event(attrib, True, True) pre = f"{self._device.ConnectorDbHost}:{self._device.ConnectorDbPort}/" self._device.conn_db_prefix = pre self._device.logger.info( f"Using Connector DB:{self._device.conn_db_prefix}" ) self._device.subscribe_to_allocator(self._device.allocator_device) message = "LowCbfController init complete" self._device.logger.info(message) return ResultCode.OK, message def _process_health(self, fqdn: str, value, quality) -> None: """ Process a health state event from a monitored device. Updates the internal health state table and, if in monitoring mode, updates this device's health state if it has changed. 
:param fqdn: Tango device name :param value: health state value :param quality: attribute quality """ fqdn = fqdn.removeprefix(self.conn_db_prefix) self.logger.info(f"FROM {fqdn} val: {value} Q: {quality}") self.component_manager.controller.update_health_state( fqdn, value, quality == AttrQuality.ATTR_VALID ) # updates reported only while in monitoring mode if self.is_monitoring_mode(): previous_health = self._health_state new_health = self.component_manager.controller.health_state if new_health != previous_health: self._update_health_state(new_health) def _process_adminmode_change( self, fqdn: str, admin_mode, quality: AttrQuality ) -> None: """Called by LowCbfProcessor when adminMode changes. Used to maintain a list of available/not-available Alveos (primarily for system operators). :param fqdn: Tango device name e.g. "low-cbf/processor/0.0.0" :param name: Tango attribute name e.g. "adminMode" :param admin_mode: AdminMode enumeration `see <https://is.gd/nCuv1q>`_ :param quality: AttrQuality enumeration `see <https://is.gd/nTVEzR>`_ """ self.logger.info(f"PROC {fqdn} adminMode={admin_mode} Q: {quality}") if fqdn not in self._registered_alveos.values(): self.logger.error("UNKNOWN Alveo") return if admin_mode == AdminMode.ONLINE: # remove from "unavailable Alveos" list (if applicable) for sn, dev_name in self._registered_alveos.items(): if dev_name == fqdn: self._unavailable_alveos.discard(sn) break else: # no longer ONLINE, flag as unavailable for sn, dev_name in self._registered_alveos.items(): if dev_name == fqdn: self._unavailable_alveos.add(sn) break self.logger.info(f"UNAVAILABLE Alveos {self._unavailable_alveos}") def _process_allocator_event( self, attr_name: str, evt_json: str, quality: AttrQuality ) -> None: self.logger.info(f"GOT {attr_name} val: {evt_json} Q: {quality}") if quality != AttrQuality.ATTR_VALID: self.logger.warning(f"Invalid quality {quality}") return fsp_dict = json.loads(evt_json) match attr_name.lower(): case "procdevfqdn": # NOTE we get 
this only once # expecting evt_json: # {"XFL10NIYKVEU": "low-cbf/processor/0.0.0", ...} # pylint: disable=attribute-defined-outside-init if not fsp_dict: # ignore empty dict return self._registered_alveos = fsp_dict # notify subscribers of value change alveos_str = json.dumps(list(self._registered_alveos.keys())) avail_alveos_str = json.dumps(self._get_available_alveos()) for attr, val in ( ("all_alveos", alveos_str), ("available_alveos", avail_alveos_str), ): self.push_change_event(attr, val) self.push_archive_event(attr, val) with self._callback_cond_var: self._deque.append((Evt.PROC_REGISTERED, fsp_dict)) self._callback_cond_var.notify() case "internal_alveo": # Expecting evt_json: # {"XFL1TJCHM3ON": {"fw": ...}, "XFL1E35JVJTQ": {"fw"..}..} # pylint: disable=attribute-defined-outside-init # unavailable Alveos are simply those that are already used self._assigned_alveos_cnt = len(fsp_dict) self._unavailable_alveos = set(fsp_dict.keys()) # notify subscribers of value change avail_alveos_str = json.dumps(self._get_available_alveos()) self.push_change_event("available_alveos", avail_alveos_str) self.push_archive_event("available_alveos", avail_alveos_str) if self._assigned_alveos_cnt == 0: # clear _proc_subscribed if there are no Alveos self.logger.info("No Alveos currently in use") with self._lock: self._proc_proxy = {} for key in self._proc_subscribed: self._proc_subscribed[key] = False else: self._subscribe_to_proc_admin_mode(self._registered_alveos) valid = self._assigned_alveos_cnt > 0 self._push_counter_attributes( self._packet_loss_rate, self._packet_corrupted_rate, valid ) self.logger.info( f"UNAVAILABLE ALVEOS {self._unavailable_alveos}" ) case _: self.logger.warning("UNKNOWN attribute") # ---------- # Callbacks # NOTE: do as little as possible in callbacks # ---------- # pylint: disable=unused-argument def _health_callback(self, fqdn: str, name: str, value, quality) -> None: """ Process a health state event from a monitored device. 
Updates the internal health state table and, if in monitoring mode, updates this device's health state if it has changed. :param fqdn: Tango device name :param value: health state value :param quality: attribute quality """ with self._callback_cond_var: self._deque.append((Evt.HEALTH_EVENT, (fqdn, value, quality))) self._callback_cond_var.notify() def _allocator_callback( self, attr_name: str, evt_json: str, quality: AttrQuality ) -> None: """ Handle allocator events triggred by a new Alveo discovery or state change (e.g. assigned to a subarray) :param attr_name: attribute name (procDevFqdn, internal_alveo) :param evt_json: event details as JSON string :param quality: ATTR_VALID when OK, `see <https://is.gd/nTVEzR>`_ """ with self._callback_cond_var: self._deque.append( (Evt.ALLOCATOR_EVENT, (attr_name, evt_json, quality)) ) self._callback_cond_var.notify() def _proc_adminmode_callback( self, fqdn: str, name: str, admin_mode, quality: AttrQuality ) -> None: """Called by LowCbfProcessor when adminMode changes. :param fqdn: Tango device name e.g. "low-cbf/processor/0.0.0" :param name: Tango attribute name e.g. "adminMode" :param admin_mode: AdminMode enumeration `see <https://is.gd/nCuv1q>`_ :param quality: AttrQuality enumeration `see <https://is.gd/nTVEzR>`_ """ with self._callback_cond_var: self._deque.append( (Evt.PROC_ADMIN_MODE, (fqdn, admin_mode, quality)) ) self._callback_cond_var.notify() # ---------- # threads # ---------- def _handle_queued_events(self): """A thread handling queued up callback events. Blocks on _callback condition variable - a producer will unblock it. 
""" # map event code to the method handling it event_to_handler_map = { Evt.HEALTH_EVENT: self._process_health, Evt.PROC_ADMIN_MODE: self._process_adminmode_change, Evt.ALLOCATOR_EVENT: self._process_allocator_event, } cv = self._callback_cond_var while True: # keep the condition variable context small in order to avoid # race condition with cv: while len(self._deque) == 0: cv.wait() item = self._deque.popleft() if not isinstance(item, tuple): self.logger.error(f"ERROR need tuple, got {item}") continue key, val = item match key: case Evt.PROC_REGISTERED: # don't subscribe to auto-registered pretend processors if "ALLOW_AUTO_REGISTER_PROCESSORS" not in os.environ: self.logger.info(f"need to register {val}") self._subscribe_to_proc_admin_mode(val) case Evt.HEALTH_EVENT | Evt.PROC_ADMIN_MODE | Evt.ALLOCATOR_EVENT: event_to_handler_map[key](*val) case _: self.logger.warning(f"UNKNOWN event {key}") # FIXME convert this to event subscription instead of polling in PI#31 def _calculate_packet_loss_rate(self): """ A thread function to retrieve packet loss rate. """ # pylint: disable=access-member-before-definition,attribute-defined-outside-init def read_counter(proxy: tango.DeviceProxy, attr: str) -> int: """ Read a processor counter with retries. :param proxy: Tango DeviceProxy for the processor. :param attr: Name of the attribute to read. :return: The counter value, or 0 if reading failed after retries. 
""" RETRIES = 3 for _ in range(RETRIES): try: return proxy.read_attribute(attr).value except DevFailed: self.logger.error(f"reading {attr} failed") time.sleep(0.4) return 0 proxies = self._proc_proxy # alias while True: # do nothing if no Alveos are assigned to subarrays if self._assigned_alveos_cnt == 0: time.sleep(self.PacketLossReportInterval) continue spead_pkt_counter = 0 spead_missing_counter = 0 bad_eth_packet_counter = 0 # Update our collection of proxies with self._lock: active_alveos = { fqdn for fqdn, subscribed in self._proc_subscribed.items() if subscribed } new_subscriptions = active_alveos - set(proxies.keys()) for fqdn in new_subscriptions: self.logger.info(f"NEW processor {fqdn}") try: proxies[fqdn] = tango.DeviceProxy(fqdn) proxies[fqdn].set_timeout_millis( self.ProcessorTimeoutMs ) except DevFailed: continue obsolete_subscriptions = set(proxies.keys()) - active_alveos for fqdn in obsolete_subscriptions: proxies.pop(fqdn, None) # Sum counters from all Processors for proxy in proxies.values(): spead_pkt_counter += read_counter( proxy, "stats_spead_pkt_counter" ) spead_missing_counter += read_counter( proxy, "stats_spead_missing_counter" ) bad_eth_packet_counter += read_counter( proxy, "stats_bad_eth_packet_counter" ) increase_spead_pkt = max( 0, spead_pkt_counter - self._spead_pkt_counter ) increase_spead_missing = max( 0, spead_missing_counter - self._spead_missing_counter ) increase_bad_eth_packet = max( 0, bad_eth_packet_counter - self._bad_eth_packet_counter ) self._spead_pkt_counter = spead_pkt_counter self._spead_missing_counter = spead_missing_counter self._bad_eth_packet_counter = bad_eth_packet_counter self._packet_loss_rate = ( max(increase_spead_missing / increase_spead_pkt, 0) if increase_spead_pkt != 0 else 1.0 ) self._packet_corrupted_rate = ( max(increase_bad_eth_packet / increase_spead_pkt, 0) if increase_spead_pkt != 0 else 1.0 ) self._push_counter_attributes( self._packet_loss_rate, self._packet_corrupted_rate ) 
time.sleep(self.PacketLossReportInterval) def _push_counter_attributes( self, loss_rate: float, corrupted: float, valid: bool = True ): """ Push change and archive events for packet loss and corruption rates. :param loss_rate: Calculated packet loss rate. :param corrupted: Calculated packet corruption rate. :param valid: whether the ``AttrQuality`` is considered valid. """ now = time.time() qual = AttrQuality.ATTR_VALID if valid else AttrQuality.ATTR_INVALID for attr, value in ( ("cbfPacketLossRate", loss_rate), ("cbfPacketCorruptionRate", corrupted), ): data = (value, now, qual) self.push_change_event(attr, *data) self.push_archive_event(attr, *data) # Run server def main(args=None, **kwargs): """Main function of the LowCbfController module.""" return run((LowCbfController,), args=args, **kwargs) if __name__ == "__main__": main()