Source code for ska_mid_cbf_mcs.fsp.fsp_device

# -*- coding: utf-8 -*-
#
# This file is part of the Fsp project
#
#
#
# Distributed under the terms of the GPL license.
# See LICENSE.txt for more info.

# """
# Author: James Jiang James.Jiang@nrc-cnrc.gc.ca,
# Herzberg Astronomy and Astrophysics, National Research Council of Canada
# Copyright (c) 2019 National Research Council of Canada
# """

# Fsp Tango device prototype
# Fsp TANGO device class for the prototype
from __future__ import annotations  # allow forward references in type hints

# Additional import
# PROTECTED REGION ID(Fsp.additionnal_import) ENABLED START #
import os
from typing import List, Optional, Tuple

# tango imports
import tango
from ska_tango_base import SKABaseDevice, SKACapability
from ska_tango_base.commands import ResponseCommand, ResultCode
from ska_tango_base.control_model import PowerMode, SimulationMode
from tango import AttrWriteType
from tango.server import attribute, command, device_property, run

from ska_mid_cbf_mcs.component.component_manager import CommunicationStatus
from ska_mid_cbf_mcs.fsp.fsp_component_manager import FspComponentManager

file_path = os.path.dirname(os.path.abspath(__file__))


# PROTECTED REGION END # // Fsp.additionnal_import

__all__ = ["Fsp", "main"]


[docs]class Fsp(SKACapability): """ Fsp TANGO device class for the prototype """ # PROTECTED REGION ID(Fsp.class_variable) ENABLED START # # PROTECTED REGION END # // Fsp.class_variable # ----------------- # Device Properties # ----------------- FspID = device_property(dtype="uint16") FspCorrSubarray = device_property(dtype=("str",)) FspPssSubarray = device_property(dtype=("str",)) FspPstSubarray = device_property(dtype=("str",)) HpsFspControllerAddress = device_property(dtype="str") HpsFspCorrControllerAddress = device_property(dtype="str") # ---------- # Attributes # ---------- functionMode = attribute( dtype="DevEnum", access=AttrWriteType.READ, label="Function mode", doc="Function mode; an int in the range [0, 4]", enum_labels=["IDLE", "CORRELATION", "PSS", "PST", "VLBI"], ) subarrayMembership = attribute( dtype=("uint16",), max_dim_x=16, access=AttrWriteType.READ, label="Subarray membership", doc="Subarray membership", ) scanID = attribute( dtype="DevLong64", label="scanID", doc="scan ID, set when transition to SCANNING is performed", ) configID = attribute( dtype="str", access=AttrWriteType.READ_WRITE, label="Config ID", doc="set when transition to READY is performed", ) jonesMatrix = attribute( dtype="str", access=AttrWriteType.READ, label="Jones Matrix", doc="Jones Matrix, given per frequency slice", ) delayModel = attribute( dtype="str", access=AttrWriteType.READ, label="Delay Model", doc="Differential off-boresight beam delay model", ) timingBeamWeights = attribute( dtype="str", access=AttrWriteType.READ, label="Timing Beam Weights", doc="Amplitude weights used in the tied-array beamforming", ) simulationMode = attribute( dtype=SimulationMode, access=AttrWriteType.READ_WRITE, memorized=True, doc="Reports the simulation mode of the device.", ) # --------------- # General methods # ---------------
[docs] def init_command_objects(self: Fsp) -> None: """ Sets up the command objects """ super().init_command_objects() device_args = (self, self.op_state_model, self.logger) self.register_command_object("On", self.OnCommand(*device_args)) self.register_command_object("Off", self.OffCommand(*device_args)) self.register_command_object( "Standby", self.StandbyCommand(*device_args) ) self.register_command_object( "SetFunctionMode", self.SetFunctionModeCommand(*device_args) ) self.register_command_object( "AddSubarrayMembership", self.AddSubarrayMembershipCommand(*device_args), ) self.register_command_object( "RemoveSubarrayMembership", self.RemoveSubarrayMembershipCommand(*device_args), ) self.register_command_object( "UpdateJonesMatrix", self.UpdateJonesMatrixCommand(*device_args) ) self.register_command_object( "UpdateDelayModel", self.UpdateDelayModelCommand(*device_args) ) self.register_command_object( "UpdateTimingBeamWeights", self.UpdateTimingBeamWeightsCommand(*device_args), )
[docs] def always_executed_hook(self: Fsp) -> None: # PROTECTED REGION ID(Fsp.always_executed_hook) ENABLED START # """Hook to be executed before any commands."""
# PROTECTED REGION END # // Fsp.always_executed_hook
[docs] def create_component_manager(self: Fsp) -> FspComponentManager: """ Create and return a component manager for this device. :return: a component manager for this device. """ self.logger.debug("Entering create_component_manager()") self._communication_status: Optional[CommunicationStatus] = None self._component_power_mode: Optional[PowerMode] = None return FspComponentManager( logger=self.logger, fsp_id=self.FspID, fsp_corr_subarray_fqdns_all=self.FspCorrSubarray, fsp_pss_subarray_fqdns_all=self.FspPssSubarray, fsp_pst_subarray_fqdns_all=self.FspPstSubarray, hps_fsp_controller_fqdn=self.HpsFspControllerAddress, hps_fsp_corr_controller_fqdn=self.HpsFspCorrControllerAddress, push_change_event_callback=self.push_change_event, communication_status_changed_callback=self._communication_status_changed, component_power_mode_changed_callback=self._component_power_mode_changed, component_fault_callback=self._component_fault, )
[docs] def delete_device(self: Fsp) -> None: # PROTECTED REGION ID(Fsp.delete_device) ENABLED START # """Hook to delete device."""
# PROTECTED REGION END # // Fsp.delete_device # ------------------ # Attributes methods # ------------------
[docs] def write_simulationMode(self: Fsp, value: SimulationMode) -> None: """ Set the simulation mode of the device. :param value: SimulationMode """ super().write_simulationMode(value) self.component_manager.simulation_mode = value
[docs] def read_functionMode(self: Fsp) -> tango.DevEnum: # PROTECTED REGION ID(Fsp.functionMode_read) ENABLED START # """ Read the functionMode attribute. :return: a DevEnum representing the mode. :rtype: tango.DevEnum """ return self.component_manager.function_mode
# PROTECTED REGION END # // Fsp.functionMode_read
[docs] def read_subarrayMembership(self: Fsp) -> List[int]: # PROTECTED REGION ID(Fsp.subarrayMembership_read) ENABLED START # """ Read the subarrayMembership attribute. :return: an array of affiliations of the FSP. :rtype: List[int] """ return self.component_manager.subarray_membership
# PROTECTED REGION END # // Fsp.subarrayMembership_read
[docs] def read_scanID(self: Fsp) -> int: # PROTECTED REGION ID(FspCorrSubarray.scanID_read) ENABLED START # """ Read the scanID attribute. :return: the scanID attribute. :rtype: int """ return self._scan_id
# PROTECTED REGION END # // FspCorrSubarray.scanID_read
[docs] def read_configID(self: Fsp) -> str: # PROTECTED REGION ID(Fsp.configID_read) ENABLED START # """ Read the configID attribute. :return: the configID attribute. :rtype: str """ return self._config_id
# PROTECTED REGION END # // Fsp.configID_read
[docs] def write_configID(self: Fsp, value: str) -> None: # PROTECTED REGION ID(Fsp.configID_write) ENABLED START # """ Write the configID attribute. :param value: the configID value. """ self._config_id = value
# PROTECTED REGION END # // Fsp.configID_write
[docs] def read_jonesMatrix(self: Fsp) -> str: # PROTECTED REGION ID(Fsp.jonesMatrix_read) ENABLED START # """ Read the jonesMatrix attribute. :return: the jonesMatrix attribute. :rtype: string """ return self.component_manager.jones_matrix
# PROTECTED REGION END # // Fsp.jonesMatrix_read
[docs] def read_delayModel(self: Fsp) -> str: # PROTECTED REGION ID(Fsp.delayModel_read) ENABLED START # """ Read the delayModel attribute. :return: the delayModel attribute. :rtype: string """ return self.component_manager.delay_model
# PROTECTED REGION END # // Fsp.delayModel_read
[docs] def read_timingBeamWeights(self: Fsp) -> str: # PROTECTED REGION ID(Fsp.timingBeamWeights_read) ENABLED START # """ Read the timingBeamWeights attribute. :return: the timingBeamWeights attribute. :rtype: string """ return self.component_manager.timing_beam_weights
# PROTECTED REGION END # // Fsp.timingBeamWeights_read # -------- # Commands # --------
[docs] class InitCommand(SKACapability.InitCommand): """ A class for the Fsp's init_device() "command". """
[docs] def do( self: Fsp.InitCommand, ) -> Tuple[ResultCode, str]: """ Stateless hook for device initialisation. :return: A tuple containing a return code and a string message indicating status. The message is for information purpose only. :rtype: (ResultCode, str) """ (result_code, message) = super().do() device = self.target # Setting initial simulation mode to True device.write_simulationMode(SimulationMode.TRUE) device._scan_id = 0 device._config_id = "" device.set_change_event("functionMode", True, True) device.set_change_event("subarrayMembership", True, True) # TODO remove when upgrading base class from 0.11.3 device.set_change_event("healthState", True, True) return (result_code, message)
[docs] class OnCommand(SKABaseDevice.OnCommand): """ A class for the Fsp's On() command. """
[docs] def do( self: Fsp.OnCommand, ) -> Tuple[ResultCode, str]: """ Stateless hook for On() command functionality. :return: A tuple containing a return code and a string message indicating status. The message is for information purpose only. :rtype: (ResultCode, str) """ (result_code, message) = self.target.component_manager.on() if result_code == ResultCode.OK: self.target._component_power_mode_changed(PowerMode.ON) return (result_code, message)
[docs] class OffCommand(SKABaseDevice.OffCommand): """ A class for the Fsp's Off() command. """
[docs] def do( self: Fsp.OffCommand, ) -> Tuple[ResultCode, str]: """ Stateless hook for Off() command functionality. :return: A tuple containing a return code and a string message indicating status. The message is for information purpose only. :rtype: (ResultCode, str) """ (result_code, message) = self.target.component_manager.off() if result_code == ResultCode.OK: self.target._component_power_mode_changed(PowerMode.OFF) return (result_code, message)
[docs] class StandbyCommand(SKABaseDevice.StandbyCommand): """ A class for the Fsp's Standby() command. """
[docs] def do( self: Fsp.StandbyCommand, ) -> Tuple[ResultCode, str]: """ Stateless hook for Standby() command functionality. :return: A tuple containing a return code and a string message indicating status. The message is for information purpose only. :rtype: (ResultCode, str) """ (result_code, message) = self.target.component_manager.standby() if result_code == ResultCode.OK: self.target._component_power_mode_changed(PowerMode.STANDBY) return (result_code, message)
[docs] class SetFunctionModeCommand(ResponseCommand): """ A class for the Fsp's SetFunctionMode() command. """
[docs] def do( self: Fsp.SetFunctionModeCommand, argin: str ) -> Tuple[ResultCode, str]: """ Stateless hook for SetFunctionMode() command functionality. :param argin: one of 'IDLE','CORR','PSS-BF','PST-BF', or 'VLBI' :return: A tuple containing a return code and a string message indicating status. The message is for information purpose only. :rtype: (ResultCode, str) """ ( result_code, message, ) = self.target.component_manager.set_function_mode(argin) return (result_code, message)
[docs] @command(dtype_in="str", doc_in="Function mode") def SetFunctionMode(self: Fsp, argin: str) -> None: """ Set the Fsp Function Mode, either IDLE, CORR, PSS-BF, PST-BF, or VLBI If IDLE set the pss, pst, corr and vlbi devicess to DISABLE. OTherwise, turn one of them ON according to argin, and all others DISABLE. :param argin: one of 'IDLE','CORR','PSS-BF','PST-BF', or 'VLBI' """ handler = self.get_command_object("SetFunctionMode") return handler(argin)
[docs] def is_SetFunctionMode_allowed(self: Fsp) -> bool: """ Determine if SetFunctionMode is allowed (allowed if FSP state is ON). :return: if SetFunctionMode is allowed :rtype: bool """ if self.dev_state() == tango.DevState.ON: return True return False
[docs] class AddSubarrayMembershipCommand(ResponseCommand): """ A class for the Fsp's AddSubarrayMembership() command. """
[docs] def do( self: Fsp.AddSubarrayMembershipCommand, argin: int ) -> Tuple[ResultCode, str]: """ Stateless hook for AddSubarrayMembership() command functionality. :param argin: an integer representing the subarray affiliation :return: A tuple containing a return code and a string message indicating status. The message is for information purpose only. :rtype: (ResultCode, str) """ ( result_code, message, ) = self.target.component_manager.add_subarray_membership(argin) return (result_code, message)
[docs] @command(dtype_in="uint16", doc_in="Subarray ID") def AddSubarrayMembership(self: Fsp, argin: str) -> None: """ Add a subarray to the subarrayMembership list. :param argin: an integer representing the subarray affiliation """ handler = self.get_command_object("AddSubarrayMembership") return handler(argin)
[docs] def is_AddSubarrayMembership_allowed(self: Fsp) -> bool: """ Determine if AddSubarrayMembership is allowed (allowed if FSP state is ON). :return: if AddSubarrayMembership is allowed :rtype: bool """ if self.dev_state() == tango.DevState.ON: return True return False
[docs] class RemoveSubarrayMembershipCommand(ResponseCommand): """ A class for the Fsp's RemoveSubarrayMembership() command. """
[docs] def do( self: Fsp.RemoveSubarrayMembershipCommand, argin: int ) -> Tuple[ResultCode, str]: """ Stateless hook for RemoveSubarrayMembership() command functionality. :param argin: an integer representing the subarray affiliation :return: A tuple containing a return code and a string message indicating status. The message is for information purpose only. :rtype: (ResultCode, str) """ ( result_code, message, ) = self.target.component_manager.remove_subarray_membership(argin) return (result_code, message)
[docs] @command(dtype_in="uint16", doc_in="Subarray ID") def RemoveSubarrayMembership(self: Fsp, argin: str) -> None: """ Remove subarray from the subarrayMembership list. If subarrayMembership is empty after removing (no subarray is using this FSP), set function mode to empty. :param argin: an integer representing the subarray affiliation """ handler = self.get_command_object("RemoveSubarrayMembership") return handler(argin)
[docs] def is_RemoveSubarrayMembership_allowed(self: Fsp) -> bool: """ Determine if RemoveSubarrayMembership is allowed (allowed if FSP state is ON). :return: if RemoveSubarrayMembership is allowed :rtype: bool """ if self.dev_state() == tango.DevState.ON: return True return False
# TODO: is this command needed? # If not also remove the get_fsp_corr_config_id method
[docs] @command( dtype_out="DevString", doc_out="returns configID for all the fspCorrSubarray", ) def getConfigID(self: Fsp) -> str: # PROTECTED REGION ID(Fsp.getConfigID) ENABLED START # """ Get the configID for all the fspCorrSubarray :return: the configID :rtype: str """ return self.component_manager.get_fsp_corr_config_id()
# PROTECTED REGION END # // Fsp.getConfigID
[docs] class UpdateJonesMatrixCommand(ResponseCommand): """ A class for the Fsp's UpdateJonesMatrix() command. """
[docs] def do( self: Fsp.UpdateJonesMatrixCommand, argin: str ) -> Tuple[ResultCode, str]: """ Stateless hook for UpdateJonesMatrix() command functionality. :param argin: the jones matrix data :return: A tuple containing a return code and a string message indicating status. The message is for information purpose only. :rtype: (ResultCode, str) """ ( result_code, message, ) = self.target.component_manager.update_jones_matrix(argin) return (result_code, message)
[docs] @command(dtype_in="str", doc_in="Jones Matrix, given per frequency slice") def UpdateJonesMatrix(self: Fsp, argin: str) -> None: """ Update the FSP's jones matrix (serialized JSON object) :param argin: the jones matrix data """ handler = self.get_command_object("UpdateJonesMatrix") return handler(argin)
[docs] def is_UpdateJonesMatrix_allowed(self: Fsp) -> bool: """ Determine if UpdateJonesMatrix is allowed (allowed if FSP state is ON and ObsState is READY OR SCANNINNG). :return: if UpdateJonesMatrix is allowed :rtype: bool """ # TODO implement obsstate in FSP if self.dev_state() == tango.DevState.ON: return True return False
[docs] class UpdateDelayModelCommand(ResponseCommand): """ A class for the Fsp's UpdateDelayModel() command. """
[docs] def do( self: Fsp.UpdateDelayModelCommand, argin: str ) -> Tuple[ResultCode, str]: """ Stateless hook for UpdateDelayModel() command functionality. :param argin: the delay model data :return: A tuple containing a return code and a string message indicating status. The message is for information purpose only. :rtype: (ResultCode, str) """ ( result_code, message, ) = self.target.component_manager.update_delay_model(argin) return (result_code, message)
[docs] @command( dtype_in="str", doc_in="Delay Model, per receptor per polarization per timing beam", ) def UpdateDelayModel(self: Fsp, argin: str) -> None: """ Update the FSP's delay model (serialized JSON object) :param argin: the delay model data """ handler = self.get_command_object("UpdateDelayModel") return handler(argin)
[docs] def is_UpdateDelayModel_allowed(self: Fsp) -> bool: """ Determine if UpdateDelayModelis allowed (allowed if FSP state is ON and ObsState is READY OR SCANNINNG). :return: if UpdateDelayModel is allowed :rtype: bool """ # TODO implement obsstate in FSP if self.dev_state() == tango.DevState.ON: return True return False
[docs] class UpdateTimingBeamWeightsCommand(ResponseCommand): """ A class for the Fsp's UpdateTimingBeamWeights() command. """
[docs] def do( self: Fsp.UpdateTimingBeamWeightsCommand, argin: str ) -> Tuple[ResultCode, str]: """ Stateless hook for UpdateTimingBeamWeights() command functionality. :param argin: the timing beam weight data :return: A tuple containing a return code and a string message indicating status. The message is for information purpose only. :rtype: (ResultCode, str) """ ( result_code, message, ) = self.target.component_manager.update_timing_beam_weights(argin) return (result_code, message)
[docs] @command( dtype_in="str", doc_in="Timing Beam Weights, per beam per receptor per group of 8 channels", ) def UpdateTimingBeamWeights(self: Fsp, argin: str) -> None: """ Update the FSP's timing beam weights (serialized JSON object) :param argin: the timing beam weight data """ handler = self.get_command_object("UpdateTimingBeamWeights") return handler(argin)
[docs] def is_UpdateTimingBeamWeights_allowed(self: Fsp) -> bool: """ Determine if UpdateTimingBeamWeights is allowed (allowed if FSP state is ON and ObsState is READY OR SCANNINNG). :return: if UpdateTimingBeamWeights is allowed :rtype: bool """ # TODO implement obsstate in FSP if self.dev_state() == tango.DevState.ON: return True return False
# ---------- # Callbacks # ---------- def _communication_status_changed( self: Fsp, communication_status: CommunicationStatus ) -> None: """ Handle change in communications status between component manager and component. This is a callback hook, called by the component manager when the communications status changes. It is implemented here to drive the op_state. :param communication_status: the status of communications between the component manager and its component. """ self._communication_status = communication_status if communication_status == CommunicationStatus.DISABLED: self.op_state_model.perform_action("component_disconnected") elif communication_status == CommunicationStatus.NOT_ESTABLISHED: self.op_state_model.perform_action("component_unknown") def _component_power_mode_changed( self: Fsp, power_mode: PowerMode ) -> None: """ Handle change in the power mode of the component. This is a callback hook, called by the component manager when the power mode of the component changes. It is implemented here to drive the op_state. :param power_mode: the power mode of the component. """ self._component_power_mode = power_mode if self._communication_status == CommunicationStatus.ESTABLISHED: action_map = { PowerMode.OFF: "component_off", PowerMode.STANDBY: "component_standby", PowerMode.ON: "component_on", PowerMode.UNKNOWN: "component_unknown", } self.op_state_model.perform_action(action_map[power_mode]) def _component_fault(self: Fsp, faulty: bool) -> None: """ Handle component fault """ if faulty: self.op_state_model.perform_action("component_fault") self.set_status("The device is in FAULT state")
# ---------- # Run server # ---------- def main(args=None, **kwargs): # PROTECTED REGION ID(Fsp.main) ENABLED START # return run((Fsp,), args=args, **kwargs) # PROTECTED REGION END # // Fsp.main if __name__ == "__main__": main()