# -*- coding: utf-8 -*-
#
# This file is part of the Mid.CBF MCS project
#
#
#
# Distributed under the terms of the GPL license.
# See LICENSE.txt for more info.
# Copyright (c) 2019 National Research Council of Canada
from __future__ import annotations
import tango
# Tango imports
from ska_tango_base.base.base_device import DevVarLongStringArrayType
from ska_tango_base.commands import SubmittedSlowCommand
from tango.server import attribute, command, device_property
from ska_mid_cbf_tdc_mcs.device.obs_device import CbfObsDevice
from ska_mid_cbf_tdc_mcs.vcc.vcc_component_manager import VccComponentManager
__all__ = ["Vcc", "main"]
[docs]class Vcc(CbfObsDevice):
"""
Vcc TANGO device class for the prototype
"""
# -----------------
# Device Properties
# -----------------
TalonLRUAddress = device_property(dtype="str")
VccControllerAddress = device_property(dtype="str")
Band1And2Address = device_property(dtype="str")
Band3Address = device_property(dtype="str")
Band4Address = device_property(dtype="str")
Band5Address = device_property(dtype="str")
DefaultVccGain = device_property(dtype="DevFloat")
LRCTimeout = device_property(dtype=("str"))
# ----------
# Attributes
# ----------
@attribute(
dtype="str",
memorized=True,
hw_memorized=True,
doc="VCC's associated DISH ID",
)
def dishID(self: Vcc) -> str:
"""
Read the dishID attribute.
:return: the Vcc's DISH ID.
:rtype: str
"""
return self.component_manager.dish_id
@dishID.write
def dishID(self: Vcc, value: str) -> None:
"""
Write the dishID attribute.
:param value: the dishID value.
"""
self.logger.debug(f"Writing dishID to {value}")
if self.component_manager.dish_id != value:
self.component_manager.dish_id = value
self.push_change_event("dishID", value)
self.push_archive_event("dishID", value)
@attribute(
abs_change=1,
dtype="uint16",
memorized=True,
hw_memorized=True,
doc="Subarray membership",
)
def subarrayMembership(self: Vcc) -> int:
"""
Read the subarrayMembership attribute.
:return: the subarray membership (0 = no affiliation).
:rtype: int
"""
return self._subarray_membership
@subarrayMembership.write
def subarrayMembership(self: Vcc, value: int) -> None:
"""
Write the subarrayMembership attribute.
:param value: the subarray membership value (0 = no affiliation).
"""
self.logger.debug(f"Writing subarrayMembership to {value}")
if self._subarray_membership != value:
self._subarray_membership = value
self.push_change_event("subarrayMembership", value)
self.push_archive_event("subarrayMembership", value)
@attribute(
abs_change=1,
dtype=tango.DevEnum,
enum_labels=["1", "2", "3", "4", "5a", "5b"],
doc=(
"The frequency band observed by the current scan: "
"an enum that can be one of ['1', '2', '3', '4', '5a', '5b']"
),
)
def frequencyBand(self: Vcc) -> tango.DevEnum:
"""
Read the frequencyBand attribute.
:return: the frequency band (being observed by the current scan, one of
["1", "2", "3", "4", "5a", "5b"]).
:rtype: tango.DevEnum
"""
return self.component_manager.frequency_band
@attribute(
dtype="str", doc="The last valid scan configuration sent to HPS."
)
def lastHpsScanConfiguration(self: Vcc) -> str:
"""
Read the last valid scan configuration of the device sent to HPS.
:return: the current last_hps_scan_configuration value
"""
return self.component_manager.last_hps_scan_configuration
@attribute(
dtype=[[float]],
max_dim_x=10,
max_dim_y=2,
doc="list of coarse channel gains for the VCC",
)
def vccGains(self: Vcc) -> list[list[float]]:
"""
Return list of coarse channel gains for the VCC
:return: the list of coarse channel gains
:rtype: list[float]
"""
return self.component_manager.read_vcc_gains()
@attribute(
dtype="float",
memorized=True,
hw_memorized=True,
doc="Manual override for coarse channel gains; when written, applies the same value to all coarse channel gains.",
)
def applyVccGain(self: Vcc) -> float:
"""
Read the VCC gain value to apply.
:return: the VCC Gain value to apply.
:rtype: float
"""
return self.component_manager.apply_vcc_gain
@applyVccGain.write
def applyVccGain(self: Vcc, value: float) -> None:
"""
Write the apply_vcc_gain attribute.
:param value: the apply_vcc_gain value.
"""
self.logger.debug(f"Writing gain to {value}")
self.component_manager.update_vcc_gains(value)
# --------------
# Initialization
# --------------
[docs] def init_command_objects(self: Vcc) -> None:
"""
Sets up the command objects
"""
super().init_command_objects()
self.register_command_object(
"ConfigureBand",
SubmittedSlowCommand(
command_name="ConfigureBand",
command_tracker=self._command_tracker,
component_manager=self.component_manager,
method_name="configure_band",
logger=self.logger,
),
)
[docs] def create_component_manager(self: Vcc) -> VccComponentManager:
return VccComponentManager(
talon_lru=self.TalonLRUAddress,
vcc_controller=self.VccControllerAddress,
vcc_band={
0: self.Band1And2Address,
1: self.Band1And2Address,
2: self.Band3Address,
3: self.Band4Address,
4: self.Band5Address,
5: self.Band5Address,
},
lrc_timeout=int(self.LRCTimeout),
default_vcc_gain=float(self.DefaultVccGain),
logger=self.logger,
attr_change_callback=self.push_change_event,
attr_archive_callback=self.push_archive_event,
health_state_callback=self._update_health_state,
communication_state_callback=self._communication_state_changed,
obs_command_running_callback=self._obs_command_running,
component_state_callback=self._component_state_changed,
admin_mode_callback=self._admin_mode_perform_action,
)
# -------------
# Fast Commands
# -------------
[docs] class InitCommand(CbfObsDevice.InitCommand):
"""
A class for the Vcc's init_device() "command".
"""
[docs] def do(
self: Vcc.InitCommand,
*args: any,
change_event_attrs: list[str] = [],
archive_event_attrs: list[str] = [],
**kwargs: any,
) -> DevVarLongStringArrayType:
"""
Stateless hook for device initialisation.
:return: A tuple containing a return code and a string
message indicating status. The message is for
information purpose only.
:rtype: (ResultCode, str)
"""
# initialize attribute values
self._device._subarray_membership = 0
# Configure particular attributes for change events and all attributes
# for archive events.
return super().do(
*args,
change_event_attrs=change_event_attrs
+ [
"dishID",
"frequencyBand",
"subarrayMembership",
],
archive_event_attrs=archive_event_attrs
+ [
"dishID",
"frequencyBand",
"subarrayMembership",
],
**kwargs,
)
# ---------------------
# Long Running Commands
# ---------------------
# ----------
# Run server
# ----------
def main(args=None, **kwargs):
return Vcc.run_server(args=args or None, **kwargs)
if __name__ == "__main__":
main()