# -*- coding: utf-8 -*-
#
# This file is part of the Fsp project
#
#
#
# Distributed under the terms of the GPL license.
# See LICENSE.txt for more info.
# """
# Copyright (c) 2019 National Research Council of Canada
# """
from __future__ import annotations
from typing import Any
import tango
from ska_tango_base.base.base_device import DevVarLongStringArrayType
from ska_tango_base.commands import SubmittedSlowCommand
from tango.server import attribute, command, device_property
from ska_mid_cbf_tdc_mcs.device.base_device import CbfDevice
from ska_mid_cbf_tdc_mcs.fsp.fsp_component_manager import FspComponentManager
__all__ = ["Fsp", "main"]
[docs]class Fsp(CbfDevice):
"""
Fsp TANGO device class for the prototype
"""
# -----------------
# Device Properties
# -----------------
FspCorrSubarray = device_property(dtype=("str",))
FspPstSubarray = device_property(dtype=("str",))
HpsFspControllerAddress = device_property(dtype="str")
# ----------
# Attributes
# ----------
@attribute(
abs_change=1,
dtype="DevEnum",
doc="Function mode; an int in the range [0, 4]",
enum_labels=["IDLE", "CORRELATION", "PSS", "PST", "VLBI"],
)
def functionMode(self: Fsp) -> tango.DevEnum:
"""
Read the functionMode attribute.
:return: a DevEnum representing the mode.
:rtype: tango.DevEnum
"""
return self.component_manager.function_mode
@attribute(
abs_change=1,
dtype=[int],
max_dim_x=16,
doc="Subarray membership",
)
def subarrayMembership(self: Fsp) -> list[int]:
"""
Read the subarrayMembership attribute.
:return: an array of affiliations of the FSP.
:rtype: list[int]
"""
return self.component_manager.subarray_membership
# ---------------
# General methods
# ---------------
[docs] def init_command_objects(self: Fsp) -> None:
"""
Sets up the command objects
"""
super().init_command_objects()
for command_name, method_name in [
("SetFunctionMode", "set_function_mode"),
("AddSubarrayMembership", "add_subarray_membership"),
("RemoveSubarrayMembership", "remove_subarray_membership"),
]:
self.register_command_object(
command_name,
SubmittedSlowCommand(
command_name=command_name,
command_tracker=self._command_tracker,
component_manager=self.component_manager,
method_name=method_name,
logger=self.logger,
),
)
[docs] def create_component_manager(self: Fsp) -> FspComponentManager:
"""
Create and return a component manager for this device.
:return: a component manager for this device.
"""
self.logger.debug("Entering create_component_manager()")
return FspComponentManager(
fsp_id=self.DeviceID,
all_fsp_corr_subarray_fqdn=self.FspCorrSubarray,
all_fsp_pst_subarray_fqdn=self.FspPstSubarray,
hps_fsp_controller_fqdn=self.HpsFspControllerAddress,
logger=self.logger,
attr_change_callback=self.push_change_event,
attr_archive_callback=self.push_archive_event,
health_state_callback=self._update_health_state,
communication_state_callback=self._communication_state_changed,
component_state_callback=self._component_state_changed,
admin_mode_callback=self._admin_mode_perform_action,
)
# -------------
# Fast Commands
# -------------
[docs] class InitCommand(CbfDevice.InitCommand):
"""
A class for the Fsp's init_device() "command".
"""
[docs] def do(
self: Fsp.InitCommand,
*args: Any,
change_event_attrs: list[str] = [],
archive_event_attrs: list[str] = [],
**kwargs: Any,
) -> DevVarLongStringArrayType:
"""
Stateless hook for device initialisation.
:return: A tuple containing a return code and a string
message indicating status. The message is for
information purpose only.
:rtype: (ResultCode, str)
"""
self._device.set_change_event("functionMode", True)
self._device.set_archive_event("functionMode", True)
self._device.set_change_event("subarrayMembership", True)
self._device.set_archive_event("subarrayMembership", True)
# Configure particular attributes for change events and all attributes
# for archive events.
return super().do(
*args,
change_event_attrs=change_event_attrs
+ ["functionMode", "subarrayMembership"],
archive_event_attrs=archive_event_attrs
+ ["functionMode", "subarrayMembership"],
**kwargs,
)
# ---------------------
# Long Running Commands
# ---------------------
[docs] def is_SetFunctionMode_allowed(self: Fsp) -> bool:
"""
Returns True; SetFunctionMode must be allowed in DISABLED state for AA0.5 approach (CIP-2550)
"""
return True
[docs] @command(
dtype_in="str",
dtype_out="DevVarLongStringArray",
doc_in="FSP function mode",
)
def SetFunctionMode(
self: Fsp, function_mode: str
) -> DevVarLongStringArrayType:
"""
Set the FSP function mode to either IDLE, CORR, PSS-BF, PST-BF, or VLBI.
If IDLE, set the PSS, PST, CORR, and VLBI devices to DISABLE. Else,
turn ON the target function_mode, and DISABLE all others.
:param argin: one of 'IDLE','CORR','PSS-BF','PST-BF', or 'VLBI'
:return: A tuple containing a return code and a string
message indicating status. The message is for
information purpose only.
:rtype: DevVarLongStringArrayType
"""
command_handler = self.get_command_object(
command_name="SetFunctionMode"
)
result_code, command_id = command_handler(function_mode)
return [[result_code], [command_id]]
[docs] @command(
dtype_in="uint16",
dtype_out="DevVarLongStringArray",
doc_in="Subarray ID",
)
def AddSubarrayMembership(
self: Fsp, sub_id: int
) -> DevVarLongStringArrayType:
"""
Add a subarray to the subarrayMembership list.
:param argin: an integer representing the subarray affiliation
"""
command_handler = self.get_command_object(
command_name="AddSubarrayMembership"
)
result_code, command_id = command_handler(sub_id)
return [[result_code], [command_id]]
[docs] @command(
dtype_in="uint16",
dtype_out="DevVarLongStringArray",
doc_in="Subarray ID",
)
def RemoveSubarrayMembership(
self: Fsp, sub_id: int
) -> DevVarLongStringArrayType:
"""
Remove subarray from the subarrayMembership list.
If subarrayMembership is empty after removing
(no subarray is using this FSP), set function mode to empty.
:param argin: an integer representing the subarray affiliation
"""
command_handler = self.get_command_object(
command_name="RemoveSubarrayMembership"
)
result_code, command_id = command_handler(sub_id)
return [[result_code], [command_id]]
# ----------
# Run server
# ----------
def main(args=None, **kwargs):
return Fsp.run_server(args=args, **kwargs)
if __name__ == "__main__":
main()