Source code for ska_mid_cbf_mcs.controller.controller_device

# -*- coding: utf-8 -*-
#
# This file is part of the SKA Mid.CBF MCS project
#
#
#
# Distributed under the terms of the GPL license.
# See LICENSE.txt for more info.

# Copyright (c) 2019 National Research Council of Canada

"""
CbfController
Sub-element controller device for Mid.CBf
"""

from __future__ import annotations  # allow forward references in type hints

from typing import List, Optional, Tuple

import tango
from ska_tango_base import SKABaseDevice, SKAController
from ska_tango_base.commands import ResponseCommand, ResultCode
from ska_tango_base.control_model import PowerMode, SimulationMode
from tango import AttrWriteType, DebugIt, DevState
from tango.server import attribute, command, device_property, run

from ska_mid_cbf_mcs.component.component_manager import CommunicationStatus
from ska_mid_cbf_mcs.controller.controller_component_manager import (
    ControllerComponentManager,
)
from ska_mid_cbf_mcs.controller.talondx_component_manager import (
    TalonDxComponentManager,
)

__all__ = ["CbfController", "main"]


[docs]class CbfController(SKAController): """ CbfController TANGO device class. Primary point of contact for monitoring and control of Mid.CBF. Implements state and mode indicators, and a set of state transition commmands. """ # PROTECTED REGION ID(CbfController.class_variable) ENABLED START # # PROTECTED REGION END # // CbfController.class_variable # ----------------- # Device Properties # ----------------- CbfSubarray = device_property(dtype=("str",)) VCC = device_property(dtype=("str",)) FSP = device_property(dtype=("str",)) TalonLRU = device_property(dtype=("str",)) TalonBoard = device_property(dtype=("str",)) PowerSwitch = device_property(dtype=("str",)) FsSLIM = device_property(dtype=("str")) VisSLIM = device_property(dtype=("str")) TalonDxConfigPath = device_property(dtype=("str")) HWConfigPath = device_property(dtype=("str")) FsSLIMConfigPath = device_property(dtype=("str")) VisSLIMConfigPath = device_property(dtype=("str")) LruTimeout = device_property(dtype=("str")) # ---------- # Attributes # ---------- commandProgress = attribute( dtype="uint16", label="Command progress percentage", max_value=100, min_value=0, polling_period=3000, abs_change=5, rel_change=2, doc="Percentage progress implemented for commands that result in state/mode transitions for a large \nnumber of components and/or are executed in stages (e.g power up, power down)", ) sysParam = attribute( dtype="str", access=AttrWriteType.READ, label="Dish ID to VCC and frequency offset k mapping", doc="Maps Dish ID to VCC and frequency offset k. The string is in JSON format.", ) sourceSysParam = attribute( dtype="str", access=AttrWriteType.READ, label="The location of the file containing Dish ID to VCC and frequency offset k mapping.", doc="Source and file path to the file to be retrieved through the Telescope Model. The string is in JSON format.", ) simulationMode = attribute( dtype=SimulationMode, access=AttrWriteType.READ_WRITE, memorized=True, doc="Reports the simulation mode of the device. \nSome devices may implement " "both modes, while others will have simulators that set simulationMode " "to True while the real devices always set simulationMode to False.", ) # --------------- # General methods # ---------------
[docs] def init_command_objects(self: CbfController) -> None: """ Sets up the command objects """ super().init_command_objects() device_args = (self, self.op_state_model, self.logger) self.register_command_object("On", self.OnCommand(*device_args)) self.register_command_object("Off", self.OffCommand(*device_args)) self.register_command_object( "Standby", self.StandbyCommand(*device_args) ) self.register_command_object( "InitSysParam", self.InitSysParamCommand(*device_args) )
[docs] def get_num_capabilities( self: CbfController, ) -> None: # self._max_capabilities inherited from SKAController # check first if property exists in DB """Get number of capabilities for _init_Device. If property not found in db, then assign a default amount(197,27,16)""" if self._max_capabilities: return self._max_capabilities else: self.logger.warning("MaxCapabilities device property not defined")
[docs] class InitCommand(SKAController.InitCommand): def _get_num_capabilities( self: CbfController.InitCommand, ) -> None: # self._max_capabilities inherited from SKAController # check first if property exists in DB """Get number of capabilities for _init_Device. If property not found in db, then assign a default amount(197,27,16) """ device = self.target device.write_simulationMode(True) if device._max_capabilities: try: device._count_vcc = device._max_capabilities["VCC"] except KeyError: # not found in DB self.logger.warning( "VCC capabilities not defined; defaulting to 197." ) device._count_vcc = 197 try: device._count_fsp = device._max_capabilities["FSP"] except KeyError: # not found in DB self.logger.warning( "FSP capabilities not defined; defaulting to 27." ) device._count_fsp = 27 try: device._count_subarray = device._max_capabilities[ "Subarray" ] except KeyError: # not found in DB self.logger.warning( "Subarray capabilities not defined; defaulting to 16." ) device._count_subarray = 16 else: self.logger.warning( "MaxCapabilities device property not defined - \ using default value" )
[docs] def do( self: CbfController.InitCommand, ) -> Tuple[ResultCode, str]: """ Stateless hook for device initialisation. :return: A tuple containing a return code and a string message indicating status. The message is for information purpose only. :return: (ResultCode, str) """ self.logger.debug("Entering InitCommand()") message = "Entering InitCommand()" self.logger.info(message) super().do() device = self.target # initialize attribute values device._command_progress = 0 # defines self._count_vcc, self._count_fsp, and self._count_subarray self._get_num_capabilities() # # initialize attribute values device._command_progress = 0 # TODO remove when upgrading base class from 0.11.3 device.set_change_event("healthState", True, True) message = "CbfController Init command completed OK" self.logger.info(message) return (ResultCode.OK, message)
[docs] def always_executed_hook(self: CbfController) -> None: # PROTECTED REGION ID(CbfController.always_executed_hook) ENABLED START # """ Hook to be executed before any command. """
# PROTECTED REGION END # // CbfController.always_executed_hook
[docs] def create_component_manager( self: CbfController, ) -> ControllerComponentManager: """ Create and return a component manager for this device. :return: a component manager for this device. """ self.logger.debug("Entering create_component_manager()") self._communication_status: Optional[CommunicationStatus] = None self._component_power_mode: Optional[PowerMode] = None # Create the Talon-DX component manager and initialize simulation # mode to on self._simulation_mode = SimulationMode.TRUE self._talondx_component_manager = TalonDxComponentManager( talondx_config_path=self.TalonDxConfigPath, hw_config_path=self.HWConfigPath, simulation_mode=self._simulation_mode, logger=self.logger, ) return ControllerComponentManager( get_num_capabilities=self.get_num_capabilities, vcc_fqdns_all=self.VCC, fsp_fqdns_all=self.FSP, subarray_fqdns_all=self.CbfSubarray, talon_lru_fqdns_all=self.TalonLRU, talon_board_fqdns_all=self.TalonBoard, power_switch_fqdns_all=self.PowerSwitch, fs_slim_fqdn=self.FsSLIM, vis_slim_fqdn=self.VisSLIM, lru_timeout=int(self.LruTimeout), talondx_component_manager=self._talondx_component_manager, talondx_config_path=self.TalonDxConfigPath, hw_config_path=self.HWConfigPath, fs_slim_config_path=self.FsSLIMConfigPath, vis_slim_config_path=self.VisSLIMConfigPath, logger=self.logger, push_change_event=self.push_change_event, communication_status_changed_callback=self._communication_status_changed, component_power_mode_changed_callback=self._component_power_mode_changed, component_fault_callback=self._component_fault, )
[docs] def delete_device(self: CbfController) -> None: """Unsubscribe to events, turn all the subarrays, VCCs and FSPs off"""
# PROTECTED REGION ID(CbfController.delete_device) ENABLED START # # PROTECTED REGION END # // CbfController.delete_device # ------------------ # Attributes methods # ------------------
[docs] def read_commandProgress(self: CbfController) -> int: # PROTECTED REGION ID(CbfController.commandProgress_read) ENABLED START # """Return commandProgress attribute: percentage progress implemented for commands that result in state/mode transitions for a large number of components and/or are executed in stages (e.g power up, power down)""" return self._command_progress
# PROTECTED REGION END # // CbfController.commandProgress_read
[docs] def read_sysParam(self: CbfController) -> str: # PROTECTED REGION ID(CbfController.read_sysParam) ENABLED START # """Return the mapping from Dish ID to VCC and frequency offset k. The string is in JSON format.""" return self.component_manager._init_sys_param
# PROTECTED REGION END # // CbfController.sysParam_read
[docs] def read_sourceSysParam(self: CbfController) -> str: # PROTECTED REGION ID(CbfController.read_sourceSysParam) ENABLED START # """Return the location of the json file that contains the mapping from Dish ID to VCC and frequency offset k, to be retrieved using the Telescope Model. """ return self.component_manager._source_init_sys_param
# PROTECTED REGION END # // CbfController.read_sourceSysParam
[docs] def read_dishToVcc(self: CbfController) -> List[str]: # PROTECTED REGION ID(CbfController.dishToVcc_read) ENABLED START # """Return 'dishID:vccID'""" if self.component_manager.dish_utils is None: return [] out_str = [ f"{r}:{v}" for r, v in self.component_manager.dish_utils.dish_id_to_vcc_id.items() ] return out_str
# PROTECTED REGION END # // CbfController.dishToVcc_read
[docs] def read_vccToDish(self: CbfController) -> List[str]: # PROTECTED REGION ID(CbfController.vccToDish_read) ENABLED START # """Return dishToVcc attribute: 'vccID:dishID'""" if self.component_manager.dish_utils is None: return [] out_str = [ f"{v}:{r}" for r, v in self.component_manager.dish_utils.dish_id_to_vcc_id.items() ] return out_str
# PROTECTED REGION END # // CbfController.vccToDish_read
[docs] def write_simulationMode( self: CbfController, value: SimulationMode ) -> None: """ Set the Simulation Mode of the device. :param value: SimulationMode """ super().write_simulationMode(value) self._talondx_component_manager.simulation_mode = value
# -------- # Commands # --------
[docs] class OnCommand(SKABaseDevice.OnCommand): """ A class for the CbfController's On() command. """
[docs] def is_allowed( self: CbfController.OnCommand, raise_if_disallowed=False ) -> bool: """ Determine if OnCommand is allowed. :return: if OnCommand is allowed :rtype: bool """ result = super().is_allowed(raise_if_disallowed) if self.target.get_state() == tango.DevState.ON: result = False return result
[docs] def do( self: CbfController.OnCommand, ) -> Tuple[ResultCode, str]: """ Stateless hook for On() command functionality. :return: A tuple containing a return code and a string message indicating status. The message is for information purpose only. :rtype: (ResultCode, str) """ self.logger.info("Trying ON Command") (result_code, message) = self.target.component_manager.on() if result_code == ResultCode.OK: self.target._component_power_mode_changed(PowerMode.ON) self.logger.info(message) elif result_code == ResultCode.FAILED: self.logger.error(message) return (result_code, message)
[docs] class OffCommand(SKABaseDevice.OffCommand): """ A class for the CbfController's Off() command. """
[docs] def is_allowed( self: CbfController.OffCommand, raise_if_disallowed=False ) -> bool: """ Determine if OffCommand is allowed. :return: if OffCommand is allowed :rtype: bool """ result = super().is_allowed(raise_if_disallowed) if self.target.get_state() == tango.DevState.OFF: result = False return result
[docs] def do( self: CbfController.OffCommand, ) -> Tuple[ResultCode, str]: """ Stateless hook for Off() command functionality. :return: A tuple containing a return code and a string message indicating status. The message is for information purpose only. :rtype: (ResultCode, str) """ if self.is_allowed(): (result_code, message) = self.target.component_manager.off() else: result_code = ResultCode.FAILED message = f"Off command is not allowed when op state is {self.target.op_state_model.op_state}" if result_code == ResultCode.OK: self.target._component_power_mode_changed(PowerMode.OFF) self.logger.info(message) elif result_code == ResultCode.FAILED: self.logger.error(message) return (result_code, message)
[docs] class StandbyCommand(SKABaseDevice.StandbyCommand): """ A class for the CbfController's Standby() command. """
[docs] def do( self: CbfController.StandbyCommand, ) -> Tuple[ResultCode, str]: """ Stateless hook for Standby() command functionality. Turn off subarray, vcc, fsp, turn CbfController to standby :return: A tuple containing a return code and a string message indicating status. The message is for information purpose only. :rtype: (ResultCode, str) """ (result_code, message) = self.target.component_manager.standby() if result_code == ResultCode.OK: self.target._component_power_mode_changed(PowerMode.STANDBY) self.logger.info(message) return (result_code, message)
[docs] class InitSysParamCommand(ResponseCommand): """ A class for the CbfController's InitSysParam() command. """
[docs] def is_allowed(self: CbfController.InitSysParamCommand) -> bool: """ Determine if InitSysParamCommand is allowed (allowed when state is OFF). :return: if InitSysParamCommand is allowed :rtype: bool """ return self.target.op_state_model.op_state == DevState.OFF
[docs] def do( self: CbfController.InitSysParamCommand, argin: str ) -> Tuple[ResultCode, str]: """ This command sets the Dish ID - VCC ID mapping and k values :param argin: the Dish ID - VCC ID mapping and frequency offset (k) in a json string. :return: A tuple containing a return code and a string message indicating status. The message is for information purpose only. :rtype: (ResultCode, str) """ if not self.is_allowed(): return ( ResultCode.FAILED, "InitSysParam command may be called only when state is OFF", ) ( result_code, message, ) = self.target.component_manager.init_sys_param(argin) if result_code == ResultCode.OK: self.logger.info(message) elif result_code == ResultCode.FAILED: self.logger.error(message) return (result_code, message)
[docs] @command( dtype_in="DevString", doc_in="the Dish ID - VCC ID mapping and frequency offset (k) in a json string", dtype_out="DevVarLongStringArray", doc_out="Tuple containing a return code and a string message indicating the status of the command.", ) @DebugIt() def InitSysParam( self: CbfController, argin: str ) -> tango.DevVarLongStringArray: # PROTECTED REGION ID(CbfController.InitSysParam) ENABLED START # handler = self.get_command_object("InitSysParam") return_code, message = handler(argin) return [[return_code], [message]]
# PROTECTED REGION END # // CbfController.InitSysParam # ---------- # Callbacks # ---------- def _communication_status_changed( self: CbfController, communication_status: CommunicationStatus ) -> None: """ Handle change in communications status between component manager and component. This is a callback hook, called by the component manager when the communications status changes. It is implemented here to drive the op_state. :param communication_status: the status of communications between the component manager and its component. """ self._communication_status = communication_status if communication_status == CommunicationStatus.DISABLED: self.op_state_model.perform_action("component_disconnected") elif communication_status == CommunicationStatus.NOT_ESTABLISHED: self.op_state_model.perform_action("component_unknown") def _component_power_mode_changed( self: CbfController, power_mode: PowerMode ) -> None: """ Handle change in the power mode of the component. This is a callback hook, called by the component manager when the power mode of the component changes. It is implemented here to drive the op_state. :param power_mode: the power mode of the component. """ self._component_power_mode = power_mode if self._communication_status == CommunicationStatus.ESTABLISHED: action_map = { PowerMode.OFF: "component_off", PowerMode.STANDBY: "component_standby", PowerMode.ON: "component_on", PowerMode.UNKNOWN: "component_unknown", } self.op_state_model.perform_action(action_map[power_mode]) def _component_fault(self: CbfController, faulty: bool) -> None: """ Handle component fault :param faulty: whether the component has faulted. """ if faulty: self.op_state_model.perform_action("component_fault") self.set_status("The device is in FAULT state.") else: self.set_status("The device has recovered from FAULT state.")
# ---------- # Run server # ---------- def main(args=None, **kwargs): # PROTECTED REGION ID(CbfController.main) ENABLED START # return run((CbfController,), args=args, **kwargs) # PROTECTED REGION END # // CbfController.main if __name__ == "__main__": main()