Source code for ska_mid_cbf_mcs.fsp.fsp_mode_subarray_device

# -*- coding: utf-8 -*-
#
# This file is part of the FspCorrSubarray project
#
#
#
# Distributed under the terms of the GPL license.
# See LICENSE.txt for more info.

# Copyright (c) 2019 National Research Council of Canada

# """

# """ FspModeSubarray Tango device prototype

# FspModeSubarray TANGO device class for the FspModeSubarray prototype
# Parrent class for the various FSP Modes to inherit
# """
from __future__ import annotations

import os

from ska_control_model import ObsState, ResultCode
from ska_tango_base.base.base_device import DevVarLongStringArrayType
from tango.server import attribute, command, device_property

from ska_mid_cbf_mcs.device.base_device import CbfFastCommand
from ska_mid_cbf_mcs.device.obs_device import CbfObsDevice

file_path = os.path.dirname(os.path.abspath(__file__))

__all__ = ["FspModeSubarray", "main"]


[docs]class FspModeSubarray(CbfObsDevice): """ FspModeSubarray TANGO device class for the FspModeSubarray prototype Abstract class for the various FSP Modes to inherit """ # ----------------- # Device Properties # ----------------- LRCTimeout = device_property(dtype=("str")) # ---------- # Attributes # ---------- @attribute( dtype=("uint16",), max_dim_x=197, doc="Assigned VCC IDs", ) def vccIDs(self: FspModeSubarray) -> list[int]: """ Read the vccIDs attribute; FSP deals with VCC, not DISH (receptor) IDs. :return: the list of assigned VCC IDs :rtype: list[int] """ return self.component_manager.vcc_ids @attribute( dtype="str", doc="The last valid FSP scan configuration sent to HPS." ) def lastHpsScanConfiguration(self: FspModeSubarray) -> str: """ Read the last valid FSP scan configuration of the device sent to HPS. :return: the current last_hps_scan_configuration value """ return self.component_manager.last_hps_scan_configuration @attribute( dtype="str", doc="Differential off-boresight beam delay model", ) def delayModel(self: FspModeSubarray) -> str: """ Read the delayModel attribute. :return: the delayModel attribute. :rtype: string """ return self.component_manager.delay_model # -------------- # Initialization # --------------
[docs] class InitCommand(CbfObsDevice.InitCommand): """ A class for the FspCorrSubarray's init_device() "command". """
[docs] def do( self: FspModeSubarray.InitCommand, *args: any, **kwargs: any, ) -> DevVarLongStringArrayType: """ Stateless hook for device initialisation. :return: A tuple containing a return code and a string message indicating status. The message is for information purpose only. :rtype: (ResultCode, str) """ (result_code, msg) = super().do(*args, **kwargs) self._device.set_change_event("delayModel", True) self._device.set_archive_event("delayModel", True) return (result_code, msg)
[docs] def init_command_objects(self: FspModeSubarray) -> None: """ Sets up the command objects """ super().init_command_objects() self.register_command_object( "UpdateDelayModel", self.UpdateDelayModelCommand( component_manager=self.component_manager, logger=self.logger ), )
# ------------- # Fast Commands # -------------
[docs] class UpdateDelayModelCommand(CbfFastCommand): """ A class for the Fsp's UpdateDelayModel() command. """
[docs] def is_allowed(self: FspModeSubarray.UpdateDelayModelCommand) -> bool: """ Determine if UpdateDelayModel command is allowed. :return: True if command is allowed, otherwise False """ if not self.component_manager.is_communicating: return False obs_state = self.component_manager.obs_state if obs_state not in [ObsState.READY, ObsState.SCANNING]: self.logger.warning( f"Ignoring delay model received in {obs_state} (must be READY or SCANNING)." ) return False return True
[docs] def do( self: FspModeSubarray.UpdateDelayModelCommand, argin: str, ) -> DevVarLongStringArrayType: """ Stateless hook for UpdateDelayModel() command functionality. :param argin: the delay model data :return: A tuple containing a return code and a string message indicating status. The message is for information purpose only. :rtype: (ResultCode, str) """ if self.is_allowed(): return self.component_manager.update_delay_model(argin) return (ResultCode.REJECTED, "UpdateDelayModel not allowed")
[docs] @command( dtype_in="str", dtype_out="DevVarLongStringArray", doc_in="Delay Model, per receptor per polarization per timing beam", ) def UpdateDelayModel( self: FspModeSubarray, argin: str ) -> DevVarLongStringArrayType: """ Update the FSP's delay model (serialized JSON object) :param argin: the delay model data """ command_handler = self.get_command_object("UpdateDelayModel") result_code, message = command_handler(argin) return [[result_code], [message]]
# --------------------- # Long Running Commands # --------------------- # ---------- # Run server # ---------- def main(args=None, **kwargs): raise TypeError( "FspModeSubarray is an Abstract Class; This server cannot be ran." ) if __name__ == "__main__": main()