Source code for ska_mid_cbf_mcs.slim.slim_link_component_manager

# -*- coding: utf-8 -*-
#
# This file is part of the SKA Mid.CBF MCS project
#
# Distributed under the terms of the GPL license.
# See LICENSE.txt for more info.

# Copyright (c) 2022 National Research Council of Canada

from __future__ import annotations

import logging
from typing import Callable, Optional

import backoff
import tango
from ska_tango_base.commands import ResultCode
from ska_tango_base.control_model import HealthState, PowerMode, SimulationMode

from ska_mid_cbf_mcs.commons.global_enum import const
from ska_mid_cbf_mcs.component.component_manager import (
    CbfComponentManager,
    CommunicationStatus,
)
from ska_mid_cbf_mcs.device_proxy import CbfDeviceProxy
from ska_mid_cbf_mcs.slim.slim_link_simulator import SlimLinkSimulator


[docs]class SlimLinkComponentManager(CbfComponentManager): """ A component manager for a SLIM link, which is made up of a Tx and Rx device from the ds-slim-tx-rx HPS device server. """ def __init__( self: SlimLinkComponentManager, update_health_state: Callable[[HealthState], None], logger: logging.Logger, push_change_event_callback: Optional[Callable], communication_status_changed_callback: Callable[ [CommunicationStatus], None ], component_power_mode_changed_callback: Callable[[PowerMode], None], component_fault_callback: Callable[[bool], None], simulation_mode: SimulationMode = SimulationMode.TRUE, ) -> None: """ Initialize a new instance. :param update_health_state: method to call when link health state changes :param logger: a logger for this object to use :param push_change_event_callback: callback used when the base classes want to send an event :param communication_status_changed_callback: callback used when the status of the communications channel between the component manager and its component changes :param component_power_mode_changed_callback: callback used when the component power mode changes :param component_fault_callback: callback used in event of component fault """ self.connected = False self._simulation_mode = simulation_mode self._link_name = "" self._tx_device_name = "" self._rx_device_name = "" self._tx_device_proxy = None self._rx_device_proxy = None self._link_enabled = False # True when tx rx are connected self.slim_link_simulator = SlimLinkSimulator( logger=logger, update_health_state=update_health_state ) self._update_health_state = update_health_state super().__init__( logger=logger, push_change_event_callback=push_change_event_callback, communication_status_changed_callback=communication_status_changed_callback, component_power_mode_changed_callback=component_power_mode_changed_callback, component_fault_callback=component_fault_callback, ) @property def tx_device_name(self: SlimLinkComponentManager) -> str: """ The name of the HPS tx device that the link is associated with. :return: the tx device name. :rtype: str """ if self._simulation_mode == SimulationMode.TRUE: return self.slim_link_simulator._tx_device_name return self._tx_device_name @tx_device_name.setter def tx_device_name( self: SlimLinkComponentManager, tx_device_name: str ) -> None: """ Sets the tx device name value. :param tx_device_name: The tx device name. """ if self._simulation_mode == SimulationMode.TRUE: self.slim_link_simulator.tx_device_name = tx_device_name self._tx_device_name = tx_device_name @property def rx_device_name(self: SlimLinkComponentManager) -> str: """ The name of the HPS rx device that the link is associated with. :return: the rx device name. :rtype: str """ if self._simulation_mode == SimulationMode.TRUE: return self.slim_link_simulator._rx_device_name return self._rx_device_name @rx_device_name.setter def rx_device_name( self: SlimLinkComponentManager, rx_device_name: str ) -> None: """ Sets the rx device name value. :param rx_device_name: The rx device name. """ if self._simulation_mode == SimulationMode.TRUE: self.slim_link_simulator.rx_device_name = rx_device_name self._rx_device_name = rx_device_name @property def link_name(self: SlimLinkComponentManager) -> str: """ The name of the SLIM link. :return: the link name. :rtype: str """ if self._simulation_mode == SimulationMode.TRUE: return self.slim_link_simulator._link_name return self._link_name @property def tx_idle_ctrl_word(self: SlimLinkComponentManager) -> int: """ The idle control word set in the tx device. Initially generated in the HPS by hashing the tx device's FQDN. :return: the tx idle control word. :raise Tango exception: if the tx device is not set. :rtype: int """ if self._simulation_mode == SimulationMode.TRUE: return self.slim_link_simulator.tx_idle_ctrl_word if self._tx_device_proxy is None: tango.Except.throw_exception( "SlimLink_tx_idle_ctrl_word", "Tx Rx are not yet connected", "tx_idle_ctrl_word()", ) return self._tx_device_proxy.idle_ctrl_word @property def rx_idle_ctrl_word(self: SlimLinkComponentManager) -> int: """ The last idle control word received in the datastream by the HPS rx device. :return: the rx idle control word. :raise Tango exception: if the rx device is not set. :rtype: int """ if self._simulation_mode == SimulationMode.TRUE: return self.slim_link_simulator.rx_idle_ctrl_word if self._rx_device_proxy is None: tango.Except.throw_exception( "SlimLink_rx_idle_ctrl_word", "Tx Rx are not yet connected", "rx_idle_ctrl_word()", ) return self._rx_device_proxy.idle_ctrl_word @property def bit_error_rate(self: SlimLinkComponentManager) -> float: """ The bit-error rate in 66b-word-errors per second. :return: The bit error rate. :raise Tango exception: if the rx device is not set. :rtype: float """ if self._simulation_mode == SimulationMode.TRUE: return self.slim_link_simulator.bit_error_rate if self._rx_device_proxy is None: tango.Except.throw_exception( "SlimLink_Bit_Error_Rate", "Tx Rx are not yet connected", "bit_error_rate()", ) return self._rx_device_proxy.bit_error_rate @property def rx_debug_alignment_and_lock_status( self: SlimLinkComponentManager, ) -> list[bool]: """ Returns the Debug Alignment and Lock Status flags of the rx HPS device If rx_device_proxy is not connected or tango.DevFailed is caught when accessing rx_debug_alignment_and_lock_status from the rx_device_proxy, returns an empty list :return: Debug Alignment and Lock Status flags of the rx HPS Device :rtype: list[int] """ res = [] # if in simulation mode if self._simulation_mode == SimulationMode.TRUE: return self.slim_link_simulator.rx_debug_alignment_and_lock_status # if the device proxy has not been set if self._rx_device_proxy is None: self._logger.error( "error reading rx_debug_alignment_and_lock_status: Tx Rx are not yet connected" ) return res # catch errors when trying to read from the device proxy try: return self._rx_device_proxy.debug_alignment_and_lock_status except tango.DevFailed as df: self._logger.error( f"error reading rx_debug_alignment_and_lock_status: {df}" ) return res @property def rx_link_occupancy(self: SlimLinkComponentManager) -> float: """ Retrieves and return the link occupancy of the rx device :return: Link Occupancy of the rx Device, defaults to -1.0 if not possible :raise Tango exception: if the rx device is not set. :rtype: float """ res = -1.0 # if in simulation mode if self._simulation_mode == SimulationMode.TRUE: return self.slim_link_simulator.rx_link_occupancy # if the device proxy has not been set if self._rx_device_proxy is None: self._logger.error( "error reading rx_link_occupancy: Tx Rx are not yet connected" ) return res # catch errors when trying to read from the device proxy try: return self._rx_device_proxy.link_occupancy except tango.DevFailed as df: self._logger.error(f"error reading rx_link_occupancy: {df}") return res @property def tx_link_occupancy(self: SlimLinkComponentManager) -> float: """ Retrieves and return the link occupancy of the tx device :return: Link Occupancy of the tx Device, defaults to -1.0 if not possible :raise Tango exception: if the tx device is not set. :rtype: float """ res = -1.0 # if in simulation mode if self._simulation_mode == SimulationMode.TRUE: return self.slim_link_simulator.tx_link_occupancy # if the device proxy has not been set if self._tx_device_proxy is None: self._logger.error( "error reading tx_link_occupancy: Tx Rx are not yet connected" ) return res # catch errors when trying to read from the device proxy try: return self._tx_device_proxy.link_occupancy except tango.DevFailed as df: self._logger.error(f"error reading tx_link_occupancy: {df}") return res @property def simulation_mode(self): """ Get the simulation mode. """ return self._simulation_mode @simulation_mode.setter def simulation_mode(self, value) -> None: """ Set the simulation mode value. :param value: The simulation mode. """ self._simulation_mode = value
[docs] def read_counters( self: SlimLinkComponentManager, ) -> list[int]: """ An array holding the counter values from the HPS tx and rx devices in the order: [0] rx_word_count [1] rx_packet_count [2] rx_idle_word_count [3] rx_idle_error_count [4] rx_block_lost_count [5] rx_cdr_lost_count [6] tx_word_count [7] tx_packet_count [8] tx_idle_word_count :return: The read_counters array. :raise Tango exception: if link is not enabled. :rtype: list[int] """ if self._simulation_mode == SimulationMode.TRUE: return self.slim_link_simulator.read_counters() if ( not self._link_enabled or (self._tx_device_proxy is None) or (self._rx_device_proxy is None) ): tango.Except.throw_exception( "SlimLink_Read_Counters", "Tx Rx are not yet connected", "read_counters()", ) tx_counts = self._tx_device_proxy.read_counters rx_counts = self._rx_device_proxy.read_counters self._logger.debug(f"tx_counts = {tx_counts}") self._logger.debug(f"rx_counts = {rx_counts}") return [ rx_counts[0], rx_counts[1], rx_counts[2], rx_counts[3], rx_counts[4], rx_counts[5], tx_counts[0], tx_counts[1], tx_counts[2], ]
[docs] def start_communicating(self: SlimLinkComponentManager) -> None: """Establish communication with the component, then start monitoring.""" self._logger.debug( "Entering SlimLinkComponentManager.start_communicating()" ) if self.connected: self._logger.info("Already communicating.") return super().start_communicating() self.update_communication_status(CommunicationStatus.ESTABLISHED) self.connected = True
[docs] def stop_communicating(self: SlimLinkComponentManager) -> None: """Stop communication with the component.""" self._logger.debug( "Entering SlimLinkComponentManager.stop_communicating()" ) super().stop_communicating() self.update_component_power_mode(PowerMode.UNKNOWN) self.connected = False
[docs] def connect_slim_tx_rx( self: SlimLinkComponentManager, ) -> tuple[ResultCode, str]: """ Link the HPS tx and rx devices by synchronizing their idle control words and disabling serial loopback. Begin monitoring the Tx and Rx. :return: A tuple containing a return code and a string message indicating status. The message is for information purpose only. :rtype: (ResultCode, str) """ self._logger.debug( "Entering SlimLinkComponentManager.connect_slim_tx_rx() - " + self._tx_device_name + "->" + self._rx_device_name ) if self._simulation_mode == SimulationMode.TRUE: return self.slim_link_simulator.connect_slim_tx_rx() # Tx and Rx device names must be set to create proxies. if self._rx_device_name == "" or self._tx_device_name == "": msg = "Tx or Rx device FQDN have not been set." return (ResultCode.FAILED, msg) try: self._tx_device_proxy = CbfDeviceProxy( fqdn=self._tx_device_name, logger=self._logger ) self._rx_device_proxy = CbfDeviceProxy( fqdn=self._rx_device_name, logger=self._logger ) @backoff.on_exception( backoff.constant, (Exception, tango.DevFailed), max_tries=6, interval=1.5, jitter=None, ) def ping_slim_tx_rx() -> None: """ Attempts to connect to the Talon board for the first time after power-on. :param ip: IP address of the board :param ssh_client: SSH client to use for connection """ self._ping_count += 1 self._tx_device_proxy.ping() self._rx_device_proxy.ping() self._ping_count = 0 ping_slim_tx_rx() self._logger.info( f"Successfully pinged DsSlimTx and DsSlimRx devices after {self._ping_count} tries" ) # Sync the idle ctrl word between Tx and Rx idle_ctrl_word = self.tx_idle_ctrl_word # If Tx's IdleCtrlWord reads as None, regenerate. if idle_ctrl_word is None: idle_ctrl_word = ( hash(self._tx_device_name) & 0x00FFFFFFFFFFFFFF ) self._logger.warning( f"SlimTx idle_ctrl_word could not be read. Regenerating idle_ctrl_word={idle_ctrl_word}." ) self._tx_device_proxy.idle_ctrl_word = idle_ctrl_word self._rx_device_proxy.idle_ctrl_word = idle_ctrl_word self._logger.info( f"Tx idle_ctrl_word: {self._tx_device_proxy.idle_ctrl_word} type: {type(self._tx_device_proxy.idle_ctrl_word)}" ) self._logger.info( f"Rx idle_ctrl_word: {self._rx_device_proxy.idle_ctrl_word} type: {type(self._rx_device_proxy.idle_ctrl_word)}" ) # Take SLIM Rx out of serial loopback self._rx_device_proxy.initialize_connection(False) self.clear_counters() except tango.DevFailed as df: msg = f"Failed to connect Tx Rx for {self._link_name}: {df.args[0].desc}" self._logger.error(msg) self.update_component_fault(True) return (ResultCode.FAILED, msg) self._link_enabled = True self._link_name = f"{self._tx_device_name}->{self._rx_device_name}" return ( ResultCode.OK, f"Connected Tx Rx successfully: {self._link_name}", )
[docs] def verify_connection( self: SlimLinkComponentManager, ) -> tuple[ResultCode, str]: """ Performs a health check on the SLIM link. No check is done if the link is not active; instead, the health state is set to UNKNOWN. :return: A tuple containing a return code and a string message indicating status. The message is for information purpose only. :rtype: (ResultCode, str) """ self._logger.debug( "Entering SlimLinkComponentManager.verify_connection() - " + self._link_name ) if self._simulation_mode == SimulationMode.TRUE: return self.slim_link_simulator.verify_connection() if ( (not self._link_enabled) or (self._tx_device_proxy is None) or (self._rx_device_proxy is None) ): msg = "Tx and Rx devices have not been connected." self._logger.debug(msg) self._update_health_state(HealthState.UNKNOWN) return ResultCode.OK, msg error_msg = "" error_flag = False try: if self.rx_idle_ctrl_word != self.tx_idle_ctrl_word: error_flag = True error_msg += ( "Expected and received idle control word do not match. " ) counters = self.read_counters() if counters[4] != 0: error_flag = True error_msg += "block_lost_count not zero. " if counters[5] != 0: error_flag = True error_msg += "cdr_lost_count not zero. " if self.bit_error_rate > const.BER_PASS_THRESHOLD: error_flag = True error_msg += ( f"bit-error-rate higher than {const.BER_PASS_THRESHOLD}. " ) except tango.DevFailed as df: error_msg = f"verify_connection() failed for {self._link_name}: {df.args[0].desc}" self._logger.error(error_msg) self._update_health_state(HealthState.FAILED) return ResultCode.FAILED, error_msg if error_flag: self._logger.warn( f"Link failed health check for {self._link_name}: {error_msg}" ) self._update_health_state(HealthState.FAILED) return ResultCode.OK, error_msg self._update_health_state(HealthState.OK) return ResultCode.OK, f"Link health check OK: {self._link_name}"
[docs] def disconnect_slim_tx_rx( self: SlimLinkComponentManager, ) -> tuple[ResultCode, str]: """ Stops controlling and monitoring the HPS tx and rx devices. The link becomes inactive. Serial loopback is re-established. :return: A tuple containing a return code and a string message indicating status. The message is for information purpose only. :rtype: (ResultCode, str) """ self._logger.debug( "Entering SlimLinkComponentManager.disconnect_slim_tx_rx() - " + self._link_name ) if self._simulation_mode == SimulationMode.TRUE: return self.slim_link_simulator.disconnect_slim_tx_rx() try: if self._rx_device_proxy is not None: # Put SLIM Rx back in serial loopback rx = self._rx_device_name index = rx.split("/")[2].split("-")[1][2:] mesh = rx.split("/")[2].split("-")[0] rx_arr = rx.split("/") tx = rx_arr[0] + "/" + rx_arr[1] + "/" + mesh + "-tx" + index self._tx_device_name = tx self._tx_device_proxy = CbfDeviceProxy( fqdn=self._tx_device_name, logger=self._logger ) # Sync the idle ctrl word between Tx and Rx idle_ctrl_word = self.tx_idle_ctrl_word self._rx_device_proxy.idle_ctrl_word = idle_ctrl_word self._rx_device_proxy.initialize_connection(True) except tango.DevFailed: result_msg = f"Failed to enable Rx loopback: {self._tx_device_name}->{self._rx_device_name}" self._logger.warn(result_msg) return ResultCode.FAILED, result_msg finally: self._rx_device_proxy = None self._tx_device_proxy = None self._link_name = "" self._link_enabled = False return ( ResultCode.OK, f"Disconnected Tx Rx. {self._rx_device_name} now in serial loopback.", )
[docs] def clear_counters( self: SlimLinkComponentManager, ) -> tuple[ResultCode, str]: """ Clears the HPS tx and rx device's read counters. :return: A tuple containing a return code and a string message indicating status. The message is for information purpose only. :rtype: (ResultCode, str) """ self._logger.debug( "Entering SlimLinkComponentManager.clearCounters() - " + self._link_name ) if self._simulation_mode == SimulationMode.TRUE: return self.slim_link_simulator.clear_counters() if ( (not self._link_enabled) or (self._tx_device_proxy is None) or (self._rx_device_proxy is None) ): msg = "Tx and Rx devices have not been connected." return ResultCode.OK, msg try: self._tx_device_proxy.clear_read_counters() self._rx_device_proxy.clear_read_counters() except tango.DevFailed: result_msg = f"Clearing counters failed: {self._link_name}" self._logger.error(result_msg) return ResultCode.FAILED, result_msg return ResultCode.OK, f"Counters cleared: {self._link_name}"