Source code for ska_mid_cbf_mcs.talon_lru.talon_lru_component_manager

# -*- coding: utf-8 -*-
#
# This file is part of the SKA Mid.CBF MCS project
#
# Distributed under the terms of the GPL license.
# See LICENSE.txt for more info.

# Copyright (c) 2019 National Research Council of Canada

from __future__ import annotations

import concurrent.futures
import logging
from typing import Callable, List, Optional, Tuple

import tango
from ska_tango_base.commands import ResultCode
from ska_tango_base.control_model import AdminMode, PowerMode, SimulationMode
from tango import DevState

from ska_mid_cbf_mcs.component.component_manager import (
    CbfComponentManager,
    CommunicationStatus,
)
from ska_mid_cbf_mcs.device_proxy import CbfDeviceProxy


[docs]class TalonLRUComponentManager(CbfComponentManager): """A component manager for the TalonLRU device.""" def __init__( self: TalonLRUComponentManager, talons: List[str], pdus: List[str], pdu_outlets: List[str], pdu_cmd_timeout: int, logger: logging.Logger, push_change_event_callback: Optional[Callable], communication_status_changed_callback: Callable[ [CommunicationStatus], None ], component_power_mode_changed_callback: Callable[[PowerMode], None], component_fault_callback: Callable[[bool], None], check_power_mode_callback: Callable, ) -> None: """ Initialise a new instance. :param talons: FQDNs of the Talon DX board :param pdus: FQDNs of the power switch devices :param pdu_outlets: IDs of the PDU outlets :param logger: a logger for this object to use :param push_change_event_callback: method to call when the base classes want to send an event :param communication_status_changed_callback: callback to be called when the status of the communications channel between the component manager and its component changes :param component_power_mode_changed_callback: callback to be called when the component power mode changes :param component_fault_callback: callback to be called in event of component fault :param check_power_mode_callback: callback to be called in event of power switch simulationMode change """ self.connected = False # Get the device proxies of all the devices we care about # TODO: the talondx_board proxies are not currently used for anything # as the mirroring device on the HPS has not yet been created self._talons = talons self._pdus = pdus self._pdu_outlets = pdu_outlets self._pdu_cmd_timeout = pdu_cmd_timeout self.pdu1_power_mode = PowerMode.UNKNOWN self.pdu2_power_mode = PowerMode.UNKNOWN self._proxy_talondx_board1 = None self._proxy_talondx_board2 = None self._proxy_power_switch1 = None self._proxy_power_switch2 = None self.simulation_mode = SimulationMode.TRUE self._simulation_mode_events = [None, None] self._check_power_mode_callback = check_power_mode_callback super().__init__( logger=logger, push_change_event_callback=push_change_event_callback, communication_status_changed_callback=communication_status_changed_callback, component_power_mode_changed_callback=component_power_mode_changed_callback, component_fault_callback=component_fault_callback, ) # ------------- # Communication # -------------
[docs] def start_communicating(self: TalonLRUComponentManager) -> None: """ Establish communication with the component, then start monitoring. """ if self.connected: self._logger.info("Already communicating.") return super().start_communicating() if len(self._talons) < 2: self._logger.error("Expect two Talon board FQDNs") tango.Except.throw_exception( "TalonLRU_TalonBoardFailed", "Two FQDNs for Talon Boards are needed for the LRU", "start_communicating()", ) self._proxy_talondx_board1 = self.get_device_proxy( "mid_csp_cbf/talon_board/" + self._talons[0] ) self._proxy_talondx_board2 = self.get_device_proxy( "mid_csp_cbf/talon_board/" + self._talons[1] ) # Needs Admin mode == ONLINE to run ON command self._proxy_talondx_board1.adminMode = AdminMode.ONLINE self._proxy_talondx_board2.adminMode = AdminMode.ONLINE self._proxy_power_switch1 = self.get_device_proxy( "mid_csp_cbf/power_switch/" + self._pdus[0] ) if self._pdus[1] == self._pdus[0]: self._proxy_power_switch2 = self._proxy_power_switch1 else: self._proxy_power_switch2 = self.get_device_proxy( "mid_csp_cbf/power_switch/" + self._pdus[1] ) if (self._proxy_power_switch1 is None) and ( self._proxy_power_switch2 is None ): self.update_communication_status( CommunicationStatus.NOT_ESTABLISHED ) self.update_component_fault(True) self._logger.error("Both power switches failed to connect.") return # Subscribe to simulationMode change event and increase the access # timeout of the power switch proxies, since the HTTP connection # timeout must be >3s. if self._proxy_power_switch1 is not None: # TEMP: increase timeout to 30s until LRU2 is switched over to the ITF PDU # to handle the observed slowness in the PSI PDU self._proxy_power_switch1.set_timeout_millis( self._pdu_cmd_timeout * 1000 ) self._simulation_mode_events[ 0 ] = self._proxy_power_switch1.add_change_event_callback( "simulationMode", self._check_power_mode_callback, stateless=True, ) self.pdu1_power_mode = ( self._proxy_power_switch1.GetOutletPowerMode( self._pdu_outlets[0] ) ) if self._proxy_power_switch1.numOutlets == 0: self.pdu1_power_mode = PowerMode.UNKNOWN # Set the power switch 1's simulation mode self._proxy_power_switch1.adminMode = AdminMode.OFFLINE self._proxy_power_switch1.simulationMode = self.simulation_mode self._proxy_power_switch1.adminMode = AdminMode.ONLINE if self._proxy_power_switch2 is not None: if self._pdus[1] != self._pdus[0]: # TEMP: increase timeout to 30s until LRU2 is switched over to the ITF PDU # to handle the observed slowness in the PSI PDU self._proxy_power_switch2.set_timeout_millis( self._pdu_cmd_timeout * 1000 ) self._simulation_mode_events[ 1 ] = self._proxy_power_switch2.add_change_event_callback( "simulationMode", self._check_power_mode_callback, stateless=True, ) self.pdu2_power_mode = ( self._proxy_power_switch2.GetOutletPowerMode( self._pdu_outlets[1] ) ) if self._proxy_power_switch2.numOutlets == 0: self.pdu2_power_mode = PowerMode.UNKNOWN # Set the power switch 2's simulation mode self._proxy_power_switch2.adminMode = AdminMode.OFFLINE self._proxy_power_switch2.simulationMode = self.simulation_mode self._proxy_power_switch2.adminMode = AdminMode.ONLINE self.connected = True self.update_communication_status(CommunicationStatus.ESTABLISHED) self.update_component_power_mode(PowerMode.OFF)
[docs] def stop_communicating(self: TalonLRUComponentManager) -> None: """Stop communication with the component.""" super().stop_communicating() if self._simulation_mode_events[0]: self._proxy_power_switch1.remove_event( "simulationMode", self._simulation_mode_events[0] ) self._simulation_mode_events[0] = None if self._simulation_mode_events[1]: self._proxy_power_switch2.remove_event( "simulationMode", self._simulation_mode_events[1] ) self._simulation_mode_events[1] = None self.connected = False
[docs] def get_device_proxy( self: TalonLRUComponentManager, fqdn: str ) -> CbfDeviceProxy | None: """ Attempt to get a device proxy of the specified device. :param fqdn: FQDN of the device to connect to :return: CbfDeviceProxy to the device or None if no connection was made """ try: self._logger.info(f"Attempting connection to {fqdn} device") device_proxy = CbfDeviceProxy( fqdn=fqdn, logger=self._logger, connect=False ) device_proxy.connect(max_time=0) # Make one attempt at connecting return device_proxy except tango.DevFailed as df: for item in df.args: self._logger.error( f"Failed connection to {fqdn} device: {item.reason}" ) self.update_component_fault(True) return None
# --------------- # General methods # ---------------
[docs] def check_power_mode( self: TalonLRUComponentManager, state: DevState ) -> None: """ Get the power mode of both PDUs and check that it is consistent with the current device state. :param state: device operational state """ self._update_power_mode() expected_power_mode = self._get_expected_power_mode(state) if expected_power_mode is None: return # Check the power mode of each outlet matches expected for i, power_mode in enumerate( [self.pdu1_power_mode, self.pdu2_power_mode], start=1 ): if power_mode != expected_power_mode: self._logger.error( f"Power connection {i} expected power mode: ({expected_power_mode})," f" actual power mode: ({power_mode})" )
# Temporary fix to avoid redeploying MCS (CIP-1561) # PDU outlet state mismatch is logged but fault is not triggered # self.update_component_fault(True) def _update_power_mode(self: TalonLRUComponentManager) -> None: """ Check and update current PowerMode states of both PDUs. """ self.pdu1_power_mode = self._get_power_mode( self._proxy_power_switch1, self._pdu_outlets[0] ) self.pdu2_power_mode = self._get_power_mode( self._proxy_power_switch2, self._pdu_outlets[1] ) if (self._pdus[1] == self._pdus[0]) and ( self._pdu_outlets[1] == self._pdu_outlets[0] ): self.pdu2_power_mode = self.pdu1_power_mode def _get_power_mode( self: TalonLRUComponentManager, proxy_power_switch, outlet ) -> PowerMode: """ Get the power mode of the specified outlet from the power switch. :params: proxy_power_switch: the power switch proxy :params: outlet: the outlet to get the power mode of """ if ( proxy_power_switch is not None and proxy_power_switch.numOutlets != 0 ): return proxy_power_switch.GetOutletPowerMode(outlet) else: return PowerMode.UNKNOWN def _get_expected_power_mode( self: TalonLRUComponentManager, state: DevState ): """ Get the expected power mode based on given device state. :param state: device operational state :return: the expected PowerMode """ if state in [DevState.INIT, DevState.OFF]: return PowerMode.OFF elif state == DevState.ON: return PowerMode.ON else: # In other device states, we don't know what the expected power # mode should be. Don't check it. return None # --------------- # Command methods # ---------------
[docs] def on( self: TalonLRUComponentManager, ) -> Tuple[ResultCode, str]: """ Turn on the TalonLRU and its subordinate devices :return: A tuple containing a return code and a string message indicating status. The message is for information purpose only. :rtype: (ResultCode, str) """ if not self.connected: log_msg = "Attempted ON sequence without connected proxies" self._logger.error(log_msg) self.update_component_fault(True) return (ResultCode.FAILED, log_msg) self._update_power_mode() # Power on both outlets result1 = ResultCode.FAILED if self.pdu1_power_mode == PowerMode.ON: self._logger.info("PDU 1 is already on.") result1 = ResultCode.OK elif self._proxy_power_switch1 is not None: result1 = self._proxy_power_switch1.TurnOnOutlet( self._pdu_outlets[0] )[0][0] if result1 == ResultCode.OK: self.pdu1_power_mode = PowerMode.ON self._logger.info("PDU 1 successfully turned on.") result2 = ResultCode.FAILED if ( self._pdus[1] == self._pdus[0] and self._pdu_outlets[1] == self._pdu_outlets[0] ): self._logger.info("PDU 2 is not used.") result2 = result1 elif self.pdu2_power_mode == PowerMode.ON: self._logger.info("PDU 2 is already on.") result2 = ResultCode.OK elif self._proxy_power_switch2 is not None: result2 = self._proxy_power_switch2.TurnOnOutlet( self._pdu_outlets[1] )[0][0] if result2 == ResultCode.OK: self.pdu2_power_mode = PowerMode.ON self._logger.info("PDU 2 successfully turned on.") # Start monitoring talon board telemetries and fault status # This can fail if HPS devices are not deployed to the # board, but it's okay to continue. try: self._proxy_talondx_board1.On() except tango.DevFailed as df: self._logger.warn( f"Talon board {self._talons[0]} ON command failed: {df}" ) try: self._proxy_talondx_board2.On() except tango.DevFailed as df: self._logger.warn( f"Talon board {self._talons[1]} ON command failed: {df}" ) # Determine what result code to return if result1 == ResultCode.FAILED and result2 == ResultCode.FAILED: self.update_component_fault(True) return (ResultCode.FAILED, "Failed to turn on both outlets") elif result1 == ResultCode.FAILED or result2 == ResultCode.FAILED: self.update_component_power_mode(PowerMode.ON) return ( ResultCode.OK, "Only one outlet successfully turned on", ) else: self.update_component_power_mode(PowerMode.ON) return (ResultCode.OK, "Both outlets successfully turned on")
[docs] def off( self: TalonLRUComponentManager, ) -> Tuple[ResultCode, str]: """ Turn off the TalonLRU and its subordinate devices :return: A tuple containing a return code and a string message indicating status. The message is for information purpose only. :rtype: (ResultCode, str) """ if not self.connected: log_msg = "Proxies not connected" self._logger.error(log_msg) self.update_component_fault(True) return (ResultCode.FAILED, log_msg) # Power off both outlets result1 = ResultCode.FAILED if self._proxy_power_switch1 is not None: result1 = self._proxy_power_switch1.TurnOffOutlet( self._pdu_outlets[0] )[0][0] if result1 == ResultCode.OK: self.pdu1_power_mode = PowerMode.OFF self._logger.info("PDU 1 successfully turned off.") result2 = ResultCode.FAILED if self._proxy_power_switch2 is not None: if ( self._pdus[1] == self._pdus[0] and self._pdu_outlets[1] == self._pdu_outlets[0] ): self._logger.info("PDU 2 is not used.") result2 = result1 else: result2 = self._proxy_power_switch2.TurnOffOutlet( self._pdu_outlets[1] )[0][0] if result2 == ResultCode.OK: self.pdu2_power_mode = PowerMode.OFF self._logger.info("PDU 2 successfully turned off.") # Stop monitoring talon board telemetries and fault status talondx_board_proxies_by_id = { 1: self._proxy_talondx_board1, 2: self._proxy_talondx_board2, } with concurrent.futures.ThreadPoolExecutor() as executor: futures = [ executor.submit( self._turn_off_boards, board_id, proxy_talondx_board ) for board_id, proxy_talondx_board in talondx_board_proxies_by_id.items() ] results = [f.result() for f in futures] for result_code, msg in results: if result_code == ResultCode.FAILED: return ( ResultCode.FAILED, f"Failed to turn off Talon board: {msg}", ) elif result_code == ResultCode.OK: self._logger.info( f"Talon board successfully turned off: {msg}" ) else: self._logger.warn( f"Talon board turned off with unexpected result code {result_code}: {msg}" ) # Determine what result code to return if result1 == ResultCode.FAILED and result2 == ResultCode.FAILED: self.update_component_fault(True) return (ResultCode.FAILED, "Failed to turn off both outlets") elif result1 == ResultCode.FAILED or result2 == ResultCode.FAILED: self.update_component_fault(True) return ( ResultCode.FAILED, "Only one outlet successfully turned off", ) else: self.update_component_power_mode(PowerMode.OFF) return (ResultCode.OK, "Both outlets successfully turned off")
def _turn_off_boards( self: TalonLRUComponentManager, board_id, talondx_board_proxy ): try: talondx_board_proxy.Off() except tango.DevFailed as df: return ( ResultCode.FAILED, f"_turn_off_boards FAILED on Talon board {board_id}: {df}", ) return ( ResultCode.OK, f"_turn_off_boards completed OK on Talon board {board_id}", )
[docs] def standby( self: TalonLRUComponentManager, ) -> Tuple[ResultCode, str]: """ Turn the TalonLRU into low power standby mode :return: A tuple containing a return code and a string message indicating status. The message is for information purpose only. :rtype: (ResultCode, str) """ return (ResultCode.OK, "TalonLRU Standby command completed OK")