Source code for ska_mid_cbf_mcs.vcc.vcc_device
# -*- coding: utf-8 -*-
#
# This file is part of the Mid.CBF MCS project
#
#
#
# Distributed under the terms of the GPL license.
# See LICENSE.txt for more info.
# """
# Author: James Jiang James.Jiang@nrc-cnrc.gc.ca,
# Herzberg Astronomy and Astrophysics, National Research Council of Canada
# Copyright (c) 2019 National Research Council of Canada
# """
# Vcc TANGO device class
# PROTECTED REGION ID(Vcc.additional_import) ENABLED START #
from __future__ import annotations # allow forward references in type hints
import json
from typing import List, Optional, Tuple
# Tango imports
import tango
from ska_tango_base.commands import BaseCommand, ResponseCommand, ResultCode
from ska_tango_base.control_model import ObsState, PowerMode, SimulationMode
from ska_tango_base.csp.obs.obs_device import CspSubElementObsDevice
from tango import AttrWriteType, DebugIt
from tango.server import attribute, command, device_property, run
# SKA imports
from ska_mid_cbf_mcs.commons.global_enum import const, freq_band_dict
from ska_mid_cbf_mcs.component.component_manager import CommunicationStatus
from ska_mid_cbf_mcs.vcc.vcc_component_manager import VccComponentManager
# PROTECTED REGION END # // Vcc.additional_import
# Names exported by ``from <module> import *``. ``main`` is not visible in
# this excerpt -- presumably the device-server entry point defined further
# down the file (TODO confirm).
__all__ = ["Vcc", "main"]
[docs]class Vcc(CspSubElementObsDevice):
"""
Vcc TANGO device class for the prototype
"""
# -----------------
# Device Properties
# -----------------
# Identifier of this VCC; passed to the component manager as ``vcc_id``.
VccID = device_property(dtype="DevUShort")
# The properties below are strings that create_component_manager() forwards
# to VccComponentManager (as ``talon_lru``, ``vcc_controller``, the
# ``vcc_band`` list and the ``search_window`` list respectively).
TalonLRUAddress = device_property(dtype="str")
VccControllerAddress = device_property(dtype="str")
Band1And2Address = device_property(dtype="str")
Band3Address = device_property(dtype="str")
Band4Address = device_property(dtype="str")
Band5Address = device_property(dtype="str")
SW1Address = device_property(dtype="str")
SW2Address = device_property(dtype="str")
# ----------
# Attributes
# ----------
# Read/write; both accessors delegate to ``component_manager.dish_id``.
dishID = attribute(
    dtype="str",
    access=AttrWriteType.READ_WRITE,
    label="DISH ID",
    doc="DISH ID",
)
# Read/write; stored on the device as ``_subarray_membership``. Writing it
# pushes a change event and deconfigures the component manager.
subarrayMembership = attribute(
    dtype="uint16",
    access=AttrWriteType.READ_WRITE,
    label="subarrayMembership",
    doc="Subarray membership",
)
# Read-only enum; value comes from ``component_manager.frequency_band``.
frequencyBand = attribute(
    dtype="DevEnum",
    access=AttrWriteType.READ,
    label="Frequency band",
    doc="Frequency band; an int in the range [0, 5]",
    enum_labels=["1", "2", "3", "4", "5a", "5b"],
)
# Read-only, at most two floats; value comes from
# ``component_manager.stream_tuning``.
band5Tuning = attribute(
    dtype=("float",),
    max_dim_x=2,
    access=AttrWriteType.READ,
    label="Stream tuning (GHz)",
    doc="Stream tuning (GHz)",
)
# Read-only; value comes from
# ``component_manager.frequency_band_offset_stream1``.
frequencyBandOffsetStream1 = attribute(
    dtype="int",
    access=AttrWriteType.READ,
    label="Frequency band offset (stream 1) (Hz)",
    doc="Frequency band offset (stream 1) (Hz)",
)
# Read-only; value comes from
# ``component_manager.frequency_band_offset_stream2``.
frequencyBandOffsetStream2 = attribute(
    dtype="int",
    access=AttrWriteType.READ,
    label="Frequency band offset (stream 2) (Hz)",
    doc="Frequency band offset (stream 2) (Hz)",
)
# Read/write, at most four floats; both accessors delegate to
# ``component_manager.doppler_phase_correction``.
dopplerPhaseCorrection = attribute(
    dtype=("float",),
    access=AttrWriteType.READ_WRITE,
    max_dim_x=4,
    label="Doppler phase correction coefficients",
    doc="Doppler phase correction coefficients",
)
# Read-only string; value comes from ``component_manager.rfi_flagging_mask``.
rfiFlaggingMask = attribute(
    dtype="str",
    access=AttrWriteType.READ,
    label="RFI Flagging Mask",
    doc="RFI Flagging Mask",
)
# Read-only string; value comes from ``component_manager.delay_model``.
delayModel = attribute(
    dtype="str",
    access=AttrWriteType.READ,
    label="Delay model coefficients",
    doc="Delay model coefficients, given per frequency slice",
)
# Read-only string; value comes from ``component_manager.jones_matrix``.
# NOTE(review): dtype is the builtin ``str`` here but the literal "str" on
# the sibling attributes -- presumably equivalent for Tango; confirm.
jonesMatrix = attribute(
    dtype=str,
    access=AttrWriteType.READ,
    label="Jones Matrix elements",
    doc="Jones Matrix elements, given per frequency slice",
)
# Read-only; value comes from ``component_manager.scan_id``.
scanID = attribute(
    dtype="DevULong",
    access=AttrWriteType.READ,
    label="scanID",
    doc="scan ID",
)
# Read-only; value comes from ``component_manager.config_id``.
configID = attribute(
    dtype="DevString",
    access=AttrWriteType.READ,
    label="config ID",
    doc="config ID",
)
# Read/write and memorized; write_simulationMode() also copies the value
# to ``component_manager.simulation_mode``.
simulationMode = attribute(
    dtype=SimulationMode,
    access=AttrWriteType.READ_WRITE,
    memorized=True,
    doc="Reports the simulation mode of the device. \nSome devices may implement "
    "both modes, while others will have simulators that set simulationMode "
    "to True while the real devices always set simulationMode to False.",
)
# ---------------
# General methods
# ---------------
# PROTECTED REGION ID(Vcc.class_variable) ENABLED START #
def init_command_objects(self: Vcc) -> None:
    """
    Register a command object for every command handled by this device.

    Each command ``<Name>`` is served by the nested class
    ``<Name>Command``. The first group is constructed with
    ``(device, op_state_model, logger)``; the second group additionally
    receives the ``obs_state_model``.
    """
    super().init_command_objects()

    # Commands built from the device, its op state model and its logger.
    op_state_args = (self, self.op_state_model, self.logger)
    for name in (
        "On",
        "Off",
        "Standby",
        "ConfigureBand",
        "UpdateDopplerPhaseCorrection",
        "UpdateDelayModel",
        "UpdateJonesMatrix",
        "ConfigureSearchWindow",
    ):
        command_class = getattr(self, f"{name}Command")
        self.register_command_object(name, command_class(*op_state_args))

    # Commands that additionally need the observing state model.
    obs_state_args = (
        self,
        self.op_state_model,
        self.obs_state_model,
        self.logger,
    )
    for name in (
        "ConfigureScan",
        "Scan",
        "EndScan",
        "Abort",
        "ObsReset",
        "GoToIdle",
    ):
        command_class = getattr(self, f"{name}Command")
        self.register_command_object(name, command_class(*obs_state_args))
# PROTECTED REGION END # // Vcc.class_variable
def create_component_manager(self: Vcc) -> VccComponentManager:
    """
    Build the component manager for this device.

    The cached communication status and power mode are reset to ``None``
    first; the callbacks handed to the manager read and update them.

    :return: a VccComponentManager wired to this device's properties,
        logger and callbacks.
    """
    self._communication_status: Optional[CommunicationStatus] = None
    self._component_power_mode: Optional[PowerMode] = None

    band_addresses = [
        self.Band1And2Address,
        self.Band3Address,
        self.Band4Address,
        self.Band5Address,
    ]
    search_window_addresses = [self.SW1Address, self.SW2Address]

    manager = VccComponentManager(
        vcc_id=self.VccID,
        talon_lru=self.TalonLRUAddress,
        vcc_controller=self.VccControllerAddress,
        vcc_band=band_addresses,
        search_window=search_window_addresses,
        logger=self.logger,
        push_change_event_callback=self.push_change_event,
        communication_status_changed_callback=self._communication_status_changed,
        component_power_mode_changed_callback=self._component_power_mode_changed,
        component_fault_callback=self._component_fault,
        component_obs_fault_callback=self._component_obsfault,
    )
    return manager
def always_executed_hook(self: Vcc) -> None:
    # PROTECTED REGION ID(Vcc.always_executed_hook) ENABLED START #
    """
    Hook to be executed before any commands.

    Intentionally empty for this device.
    """
    # PROTECTED REGION END # // Vcc.always_executed_hook
def delete_device(self: Vcc) -> None:
    # PROTECTED REGION ID(Vcc.delete_device) ENABLED START #
    """
    Hook to delete device.

    Intentionally empty for this device.
    """
    # PROTECTED REGION END # // Vcc.delete_device
# ---------
# Callbacks
# ---------
def _communication_status_changed(
    self: Vcc, communication_status: CommunicationStatus
) -> None:
    """
    Drive the op_state from a change in communication status.

    Callback hook invoked by the component manager whenever the status of
    its communication with the component changes.

    :param communication_status: the status of communications
        between the component manager and its component.
    """
    self._communication_status = communication_status

    if communication_status == CommunicationStatus.DISABLED:
        self.op_state_model.perform_action("component_disconnected")
        return

    if communication_status == CommunicationStatus.NOT_ESTABLISHED:
        self.op_state_model.perform_action("component_unknown")
        return

    known_power_mode = self._component_power_mode is not None
    if (
        communication_status == CommunicationStatus.ESTABLISHED
        and known_power_mode
    ):
        # Replay the cached power mode now that communication is up.
        self._component_power_mode_changed(self._component_power_mode)
    # Otherwise there is nothing to do yet: wait for a power mode update.
def _component_power_mode_changed(
    self: Vcc, power_mode: PowerMode
) -> None:
    """
    Drive the op_state from a change in the component's power mode.

    Callback hook invoked by the component manager. The power mode is
    always cached; the op_state model is only updated while communication
    with the component is ESTABLISHED.

    :param power_mode: the power mode of the component.
    """
    self._component_power_mode = power_mode

    if self._communication_status != CommunicationStatus.ESTABLISHED:
        return

    actions = {
        PowerMode.OFF: "component_off",
        PowerMode.STANDBY: "component_standby",
        PowerMode.ON: "component_on",
        PowerMode.UNKNOWN: "component_unknown",
    }
    action = actions[power_mode]
    self.op_state_model.perform_action(action)
def _component_fault(self: Vcc, faulty: bool) -> None:
    """
    Handle a component fault notification.

    :param faulty: True when the component has faulted; False when it has
        recovered. Only the faulted case drives the op_state model.
    """
    if not faulty:
        self.set_status("The device has recovered from FAULT state.")
        return

    self.op_state_model.perform_action("component_fault")
    self.set_status("The device is in FAULT state.")
def _component_obsfault(self: Vcc, faulty: bool) -> None:
    """
    Handle notification that the component has obsfaulted.

    Callback hook. The flag is always recorded on the component manager;
    the obs_state model and device status are only updated when ``faulty``
    is true.

    :param faulty: whether the component is in an observing fault.
    """
    self.component_manager.obs_faulty = faulty
    if not faulty:
        return

    self.obs_state_model.perform_action("component_obsfault")
    self.set_status("The device is in FAULT state")
# ------------------
# Attributes methods
# ------------------
def write_simulationMode(self: Vcc, value: SimulationMode) -> None:
    """
    Set the simulation mode of the device.

    The base-class write runs first, then the same value is copied to the
    component manager.

    :param value: SimulationMode
    """
    super().write_simulationMode(value)
    manager = self.component_manager
    manager.simulation_mode = value
def read_dishID(self: Vcc) -> str:
    # PROTECTED REGION ID(Vcc.dishID_read) ENABLED START #
    """
    Read the dishID attribute.

    :return: the DISH ID held by the component manager.
    :rtype: str
    """
    manager = self.component_manager
    return manager.dish_id
    # PROTECTED REGION END # // Vcc.dishID_read
def write_dishID(self: Vcc, value: str) -> None:
    # PROTECTED REGION ID(Vcc.dishID_write) ENABLED START #
    """
    Write the dishID attribute.

    :param value: the DISH ID to store on the component manager.
    """
    manager = self.component_manager
    manager.dish_id = value
    # PROTECTED REGION END # // Vcc.dishID_write
def read_subarrayMembership(self: Vcc) -> int:
    # PROTECTED REGION ID(Vcc.subarrayMembership_read) ENABLED START #
    """
    Read the subarrayMembership attribute.

    :return: the subarray membership stored on the device
        (0 = no affiliation).
    :rtype: int
    """
    membership = self._subarray_membership
    return membership
    # PROTECTED REGION END # // Vcc.subarrayMembership_read
def write_subarrayMembership(self: Vcc, value: int) -> None:
    # PROTECTED REGION ID(Vcc.subarrayMembership_write) ENABLED START #
    """
    Write the subarrayMembership attribute.

    Stores the value, pushes a ``subarrayMembership`` change event and
    then deconfigures the component manager.

    :param value: the subarray membership value (0 = no affiliation).
    """
    entry_message = f"Entering write_subarrayMembership(), value = {value}"
    self.logger.debug(entry_message)

    self._subarray_membership = value
    self.push_change_event("subarrayMembership", value)

    manager = self.component_manager
    manager.deconfigure()
    # PROTECTED REGION END # // Vcc.subarrayMembership_write
def read_frequencyBand(self: Vcc) -> tango.DevEnum:
    # PROTECTED REGION ID(Vcc.frequencyBand_read) ENABLED START #
    """
    Read the frequencyBand attribute.

    :return: the frequency band held by the component manager; the
        attribute's enum labels are ["1", "2", "3", "4", "5a", "5b"].
    :rtype: tango.DevEnum
    """
    manager = self.component_manager
    return manager.frequency_band
    # PROTECTED REGION END # // Vcc.frequencyBand_read
def read_band5Tuning(self: Vcc) -> List[float]:
    # PROTECTED REGION ID(Vcc.band5Tuning_read) ENABLED START #
    """
    Read the band5Tuning attribute.

    :return: the stream tuning (GHz) held by the component manager.
    :rtype: list of float
    """
    manager = self.component_manager
    return manager.stream_tuning
    # PROTECTED REGION END # // Vcc.band5Tuning_read
def read_frequencyBandOffsetStream1(self: Vcc) -> int:
    # PROTECTED REGION ID(Vcc.frequencyBandOffsetStream1_read) ENABLED START #
    """
    Read the frequencyBandOffsetStream1 attribute.

    :return: the stream 1 frequency band offset held by the component
        manager.
    :rtype: int
    """
    manager = self.component_manager
    return manager.frequency_band_offset_stream1
    # PROTECTED REGION END # // Vcc.frequencyBandOffsetStream1_read
def read_frequencyBandOffsetStream2(self: Vcc) -> int:
    # PROTECTED REGION ID(Vcc.frequencyBandOffsetStream2_read) ENABLED START #
    """
    Read the frequencyBandOffsetStream2 attribute.

    :return: the stream 2 frequency band offset held by the component
        manager.
    :rtype: int
    """
    manager = self.component_manager
    return manager.frequency_band_offset_stream2
    # PROTECTED REGION END # // Vcc.frequencyBandOffsetStream2_read
def read_dopplerPhaseCorrection(self: Vcc) -> List[float]:
    # PROTECTED REGION ID(Vcc.dopplerPhaseCorrection_read) ENABLED START #
    """
    Read the dopplerPhaseCorrection attribute.

    :return: the Doppler phase correction coefficients held by the
        component manager.
    :rtype: list of float
    """
    manager = self.component_manager
    return manager.doppler_phase_correction
    # PROTECTED REGION END # // Vcc.dopplerPhaseCorrection_read
def write_dopplerPhaseCorrection(self: Vcc, value: List[float]) -> None:
    # PROTECTED REGION ID(Vcc.dopplerPhaseCorrection_write) ENABLED START #
    """
    Write the dopplerPhaseCorrection attribute.

    :param value: the coefficients to store on the component manager.
    """
    manager = self.component_manager
    manager.doppler_phase_correction = value
    # PROTECTED REGION END # // Vcc.dopplerPhaseCorrection_write
def read_rfiFlaggingMask(self: Vcc) -> str:
    # PROTECTED REGION ID(Vcc.rfiFlaggingMask_read) ENABLED START #
    """
    Read the rfiFlaggingMask attribute.

    :return: the RFI flagging mask held by the component manager.
    :rtype: str/JSON
    """
    manager = self.component_manager
    return manager.rfi_flagging_mask
    # PROTECTED REGION END # // Vcc.rfiFlaggingMask_read
def read_delayModel(self: Vcc) -> str:
    # PROTECTED REGION ID(Vcc.delayModel_read) ENABLED START #
    """
    Read the delayModel attribute.

    :return: the delay model held by the component manager (delay model
        coefficients, given per frequency slice).
    :rtype: str
    """
    manager = self.component_manager
    return manager.delay_model
    # PROTECTED REGION END # // Vcc.delayModel_read
def read_jonesMatrix(self: Vcc) -> str:
    # PROTECTED REGION ID(Vcc.jonesMatrix_read) ENABLED START #
    """
    Read the jonesMatrix attribute.

    :return: the Jones matrix held by the component manager (Jones matrix
        values, given per frequency slice).
    :rtype: str
    """
    manager = self.component_manager
    return manager.jones_matrix
    # PROTECTED REGION END # // Vcc.jonesMatrix_read
def read_scanID(self: Vcc) -> int:
    # PROTECTED REGION ID(Vcc.scanID_read) ENABLED START #
    """
    Read the scanID attribute.

    :return: the scan ID held by the component manager.
    :rtype: int
    """
    manager = self.component_manager
    return manager.scan_id
    # PROTECTED REGION END # // Vcc.scanID_read
def read_configID(self: Vcc) -> str:
    # PROTECTED REGION ID(Vcc.configID_read) ENABLED START #
    """
    Read the configID attribute.

    :return: the configuration ID held by the component manager.
    :rtype: str
    """
    manager = self.component_manager
    return manager.config_id
    # PROTECTED REGION END # // Vcc.configID_read
# --------
# Commands
# --------
class InitCommand(CspSubElementObsDevice.InitCommand):
    """
    A class for the Vcc's init_device() "command".
    """

    def do(
        self: Vcc.InitCommand,
    ) -> Tuple[ResultCode, str]:
        """
        Stateless hook for device initialisation.

        Runs the base-class initialisation, seeds the device's private
        state, enables change events and puts the device in simulation
        mode.

        :return: A tuple containing a return code and a string
            message indicating status. The message is for
            information purpose only.
        :rtype: (ResultCode, str)
        """
        (result_code, msg) = super().do()

        vcc = self.target

        # Make a private copy of the device properties:
        vcc._vcc_id = vcc.VccID

        # initialize attribute values
        initial_values = (
            ("_subarray_membership", 0),
            ("_last_scan_configuration", ""),
            ("_configuring_from_idle", False),
        )
        for attribute_name, initial_value in initial_values:
            setattr(vcc, attribute_name, initial_value)

        vcc.set_change_event("frequencyBand", True, True)

        # Setting initial simulation mode to True
        vcc.write_simulationMode(SimulationMode.TRUE)

        vcc.set_change_event("subarrayMembership", True, True)

        # TODO remove when upgrading base class from 0.11.3
        vcc.set_change_event("healthState", True, True)

        return (result_code, msg)
class OnCommand(CspSubElementObsDevice.OnCommand):
    """
    A class for the Vcc's on command.
    """

    def do(
        self: Vcc.OnCommand,
    ) -> Tuple[ResultCode, str]:
        """
        Stateless hook for On() command functionality.

        Logs the entry and delegates to the component manager's ``on()``.

        :return: A tuple containing a return code and a string
            message indicating status. The message is for
            information purpose only.
        :rtype: (ResultCode, str)
        """
        self.logger.info("Entering Vcc.OnCommand")
        manager = self.target.component_manager
        return manager.on()
class OffCommand(CspSubElementObsDevice.OffCommand):
    """
    A class for the Vcc's off command.
    """

    def do(
        self: Vcc.OffCommand,
    ) -> Tuple[ResultCode, str]:
        """
        Stateless hook for Off() command functionality.

        Delegates to the component manager's ``off()``.

        :return: A tuple containing a return code and a string
            message indicating status. The message is for
            information purpose only.
        :rtype: (ResultCode, str)
        """
        manager = self.target.component_manager
        return manager.off()
class StandbyCommand(CspSubElementObsDevice.StandbyCommand):
    """
    A class for the Vcc's standby command.
    """

    def do(
        self: Vcc.StandbyCommand,
    ) -> Tuple[ResultCode, str]:
        """
        Stateless hook for Standby() command functionality.

        Delegates to the component manager's ``standby()``.

        :return: A tuple containing a return code and a string
            message indicating status. The message is for
            information purpose only.
        :rtype: (ResultCode, str)
        """
        manager = self.target.component_manager
        return manager.standby()
class ConfigureBandCommand(ResponseCommand):
    """
    A class for the Vcc's ConfigureBand() command.

    Turn on the corresponding band device and disable all the others.
    """

    def do(
        self: Vcc.ConfigureBandCommand, argin: str
    ) -> Tuple[ResultCode, str]:
        """
        Stateless hook for ConfigureBand() command functionality.

        Delegates to the component manager's ``configure_band()``.

        :param argin: the band configuration string, passed through
            unchanged.
        :return: A tuple containing a return code and a string
            message indicating status. The message is for
            information purpose only.
        :rtype: (ResultCode, str)
        """
        manager = self.target.component_manager
        return manager.configure_band(argin)
@command(
    dtype_in="DevString",
    doc_in="Band config string.",
    # Declare the output type so the (ResultCode, message) pair built below
    # is actually delivered to the client, as for the other commands.
    dtype_out="DevVarLongStringArray",
    doc_out="A tuple containing a return code and a string message indicating status. "
    "The message is for information purpose only.",
)
@DebugIt()
def ConfigureBand(self, band_config: str) -> Tuple[ResultCode, str]:
    # PROTECTED REGION ID(CspSubElementObsDevice.ConfigureBand) ENABLED START #
    """
    Turn on the corresponding band device and disable all the others.

    :param band_config: json string containing: the frequency band name,
        dish sample rate, and number of samples per frame
    :return: A tuple containing a return code and a string
        message indicating status. The message is for
        information purpose only.
    :rtype: (ResultCode, str)
    """
    # Named ``handler`` to avoid shadowing the ``command`` decorator.
    handler = self.get_command_object("ConfigureBand")
    (result_code, message) = handler(band_config)
    return [[result_code], [message]]
    # PROTECTED REGION END # // CspSubElementObsDevice.ConfigureBand
def _raise_configuration_fatal_error(
    self: Vcc, msg: str, cmd: str
) -> None:
    """
    Throw an error message if ConfigureScan/ConfigureSearchWindow fails.

    The message is logged at error level and then thrown as a Tango
    exception.

    :param msg: the error message
    :param cmd: the failed command
    """
    self.logger.error(msg)

    origin = cmd + " execution"
    tango.Except.throw_exception(
        "Command failed", msg, origin, tango.ErrSeverity.ERR
    )
class ConfigureScanCommand(CspSubElementObsDevice.ConfigureScanCommand):
    """
    A class for the Vcc's ConfigureScan() command.
    """

    def do(
        self: Vcc.ConfigureScanCommand, argin: str
    ) -> Tuple[ResultCode, str]:
        """
        Stateless hook for ConfigureScan() command functionality.

        :param argin: The configuration as JSON formatted string
        :type argin: str
        :return: A tuple containing a return code and a string
            message indicating status. The message is for
            information purpose only.
        :rtype: (ResultCode, str)
        """
        device = self.target

        # By this time, the DISH ID should be set:
        device.logger.debug(f"dishID: {device.component_manager.dish_id}")

        (result_code, msg) = device.component_manager.configure_scan(argin)

        if result_code == ResultCode.OK:
            # store the configuration on command success
            device._last_scan_configuration = argin
            if device._configuring_from_idle:
                device.obs_state_model.perform_action(
                    "component_configured"
                )

        return (result_code, msg)

    def validate_input(
        self: Vcc.ConfigureScanCommand, argin: str
    ) -> Tuple[bool, str]:
        """
        Validate the configuration parameters against allowed values, as needed.

        Every rejection is reported through the returned tuple; malformed
        values (non-object JSON, non-numeric offsets or tunings) are
        rejected rather than raising.

        :param argin: The JSON formatted string with configuration for the device.
        :type argin: 'DevString'
        :return: A tuple containing a boolean and a string message.
        :rtype: (bool, str)
        """
        not_json_object_msg = (
            "Scan configuration object is not a valid JSON object."
            " Aborting configuration."
        )
        try:
            configuration = json.loads(argin)
        except json.JSONDecodeError:
            return (False, not_json_object_msg)
        # Valid JSON that is not an object (e.g. "5" or "null") cannot be
        # looked up by key, so reject it up front.
        if not isinstance(configuration, dict):
            return (False, not_json_object_msg)

        # Validate config_id.
        if "config_id" not in configuration:
            msg = "'configID' attribute is required."
            return (False, msg)

        # Validate frequency_band.
        if "frequency_band" not in configuration:
            msg = "'frequencyBand' attribute is required."
            return (False, msg)

        # Validate frequency_band_offset_stream1/2 (both default to 0).
        max_offset = const.FREQUENCY_SLICE_BW * 10**6 / 2
        for stream in (1, 2):
            raw_offset = configuration.get(
                f"frequency_band_offset_stream{stream}", 0
            )
            try:
                offset = int(raw_offset)
            except (TypeError, ValueError, OverflowError):
                msg = (
                    f"'frequencyBandOffsetStream{stream}' must be an integer "
                    f"(received {raw_offset!r}). Aborting configuration."
                )
                return (False, msg)
            if abs(offset) > max_offset:
                msg = (
                    f"Absolute value of 'frequencyBandOffsetStream{stream}' must be at "
                    "most half of the frequency slice bandwidth. Aborting configuration."
                )
                return (False, msg)

        # Validate frequency_band.
        frequency_band = configuration["frequency_band"]
        valid_freq_bands = ["1", "2", "3", "4", "5a", "5b"]
        if frequency_band not in valid_freq_bands:
            # f-string so a non-string band is reported, not a TypeError.
            msg = f"{frequency_band} not a valid frequency band. Aborting configuration."
            return (False, msg)

        # Validate band_5_tuning if frequency_band is 5a or 5b.
        # band_5_tuning is optional.
        if (
            frequency_band in ["5a", "5b"]
            and "band_5_tuning" in configuration
        ):
            band_5_tuning = configuration["band_5_tuning"]

            # Explicit check instead of ``assert``, which is removed when
            # Python runs with -O.
            if not isinstance(band_5_tuning, list) or len(band_5_tuning) != 2:
                msg = "'band_5_tuning' must be an array of length 2. Aborting configuration."
                return (False, msg)

            try:
                stream_tuning = [float(value) for value in band_5_tuning]
            except (TypeError, ValueError):
                msg = (
                    "Elements in 'band_5_tuning' must be numbers "
                    f"(received {band_5_tuning!r}). Aborting configuration."
                )
                return (False, msg)

            if frequency_band == "5a":
                bounds = const.FREQUENCY_BAND_5a_TUNING_BOUNDS
            else:  # frequency_band == "5b"
                bounds = const.FREQUENCY_BAND_5b_TUNING_BOUNDS

            in_bounds = all(
                bounds[0] <= tuning <= bounds[1] for tuning in stream_tuning
            )
            if not in_bounds:
                msg = (
                    "Elements in 'band_5_tuning must be floats between "
                    + f"{bounds[0]} and "
                    + f"{bounds[1]} "
                    + f"(received {stream_tuning[0]} and {stream_tuning[1]})"
                    + f" for a 'frequencyBand' of {frequency_band}. Aborting configuration."
                )
                return (False, msg)

        return (True, "Configuration validated OK")
@command(
    dtype_in="DevString",
    doc_in="JSON formatted string with the scan configuration.",
    dtype_out="DevVarLongStringArray",
    doc_out="A tuple containing a return code and a string message indicating status. "
    "The message is for information purpose only.",
)
@DebugIt()
def ConfigureScan(self, argin):
    # PROTECTED REGION ID(CspSubElementObsDevice.ConfigureScan) ENABLED START #
    """
    Configure the observing device parameters for the current scan.

    The input is validated first; an invalid configuration is reported
    through ``_raise_configuration_fatal_error``. The configuration is
    remembered as the last scan configuration only when the command
    returns ``ResultCode.OK``.

    :param argin: JSON formatted string with the scan configuration.
    :type argin: 'DevString'
    :return: A tuple containing a return code and a string message indicating status.
        The message is for information purpose only.
    :rtype: (ResultCode, str)
    """
    # This validation is already performed in the CbfSubarray ConfigureScan.
    # TODO: Improve validation (validation should only be done once,
    # most of the validation can be done through a schema instead of manually
    # through functions).
    # Named ``handler`` to avoid shadowing the ``command`` decorator.
    handler = self.get_command_object("ConfigureScan")

    (valid, message) = handler.validate_input(argin)
    if not valid:
        self._raise_configuration_fatal_error(message, "ConfigureScan")
    else:
        # Read by ConfigureScanCommand.do() to decide whether to fire the
        # "component_configured" action.
        self._configuring_from_idle = self._obs_state == ObsState.IDLE

    (result_code, message) = handler(argin)

    # store the configuration on command success only, so a failed
    # request does not overwrite the last applied configuration
    if result_code == ResultCode.OK:
        self._last_scan_configuration = argin

    return [[result_code], [message]]
    # PROTECTED REGION END # // CspSubElementObsDevice.ConfigureScan
class ScanCommand(CspSubElementObsDevice.ScanCommand):
    """
    A class for the Vcc's Scan() command.
    """

    def do(self: Vcc.ScanCommand, argin: int) -> Tuple[ResultCode, str]:
        """
        Stateless hook for Scan() command functionality.

        Delegates to the component manager's ``scan()`` and, when that
        reports ``ResultCode.STARTED``, fires the "component_scanning"
        action on the obs state model.

        :param argin: The scan ID
        :type argin: int
        :return: A tuple containing a return code and a string
            message indicating status. The message is for
            information purpose only.
        :rtype: (ResultCode, str)
        """
        vcc = self.target
        (result_code, msg) = vcc.component_manager.scan(argin)

        scan_started = result_code == ResultCode.STARTED
        if scan_started:
            vcc.obs_state_model.perform_action("component_scanning")

        return (result_code, msg)
@command(
    dtype_in="DevShort",
    doc_in="An integer with the scan ID",
    dtype_out="DevVarLongStringArray",
    doc_out="A tuple containing a return code and a string message indicating status."
    "The message is for information purpose only.",
)
@DebugIt()
def Scan(self, argin):
    # PROTECTED REGION ID(CspSubElementObsDevice.Scan) ENABLED START #
    """
    Start an observing scan.

    :param argin: An integer with the scan ID
    :type argin: 'DevShort'
    :return: A tuple containing a return code and a string message indicating status.
        The message is for information purpose only.
    :rtype: (ResultCode, str)
    """
    scan_handler = self.get_command_object("Scan")
    outcome = scan_handler(argin)
    (return_code, message) = outcome
    return [[return_code], [message]]
class EndScanCommand(CspSubElementObsDevice.EndScanCommand):
    """
    A class for the Vcc's EndScan() command.
    """

    def do(self: Vcc.EndScanCommand) -> Tuple[ResultCode, str]:
        """
        Stateless hook for EndScan() command functionality.

        Delegates to the component manager's ``end_scan()`` and, when
        that reports ``ResultCode.OK``, fires the
        "component_not_scanning" action on the obs state model.

        :return: A tuple containing a return code and a string
            message indicating status. The message is for
            information purpose only.
        :rtype: (ResultCode, str)
        """
        vcc = self.target
        (result_code, msg) = vcc.component_manager.end_scan()

        scan_ended = result_code == ResultCode.OK
        if scan_ended:
            vcc.obs_state_model.perform_action("component_not_scanning")

        return (result_code, msg)
class ObsResetCommand(CspSubElementObsDevice.ObsResetCommand):
    """A class for the VCC's ObsReset command."""

    def do(self):
        """
        Stateless hook for ObsReset() command functionality.

        Delegates to the component manager's ``obsreset()``.

        :return: A tuple containing a return code and a string
            message indicating status. The message is for
            information purpose only.
        :rtype: (ResultCode, str)
        """
        manager = self.target.component_manager
        return manager.obsreset()
class AbortCommand(CspSubElementObsDevice.AbortCommand):
    """A class for the VCC's Abort command."""

    def do(self):
        """
        Stateless hook for Abort() command functionality.

        Delegates to the component manager's ``abort()``.

        :return: A tuple containing a return code and a string
            message indicating status. The message is for
            information purpose only.
        :rtype: (ResultCode, str)
        """
        manager = self.target.component_manager
        return manager.abort()
class GoToIdleCommand(CspSubElementObsDevice.GoToIdleCommand):
    """
    A class for the Vcc's GoToIdle command.
    """

    def do(
        self: Vcc.GoToIdleCommand,
    ) -> Tuple[ResultCode, str]:
        """
        Stateless hook for GoToIdle() command functionality.

        The component manager is always deconfigured; the
        "component_unconfigured" action is only fired when the device is
        not already IDLE.

        :return: A tuple containing a return code and a string
            message indicating status. The message is for
            information purpose only.
        :rtype: (ResultCode, str)
        """
        self.logger.debug("Entering GoToIdleCommand()")

        vcc = self.target

        # Reset all values intialized in InitCommand.do():
        vcc.component_manager.deconfigure()

        if vcc._obs_state != ObsState.IDLE:
            vcc.obs_state_model.perform_action("component_unconfigured")
            return (ResultCode.OK, "GoToIdle command completed OK")

        return (
            ResultCode.OK,
            "GoToIdle command completed OK. Device already IDLE",
        )
class UpdateDopplerPhaseCorrectionCommand(BaseCommand):
    """
    A class for the Vcc's UpdateDopplerPhaseCorrection() command.

    Update Vcc's doppler phase correction.
    """

    def is_allowed(self: Vcc.UpdateDopplerPhaseCorrectionCommand) -> bool:
        """
        Determine if UpdateDopplerPhaseCorrection is allowed
        (allowed when Devstate is ON and ObsState is READY OR SCANNING).

        :return: if UpdateDopplerPhaseCorrection is allowed
        :rtype: bool
        """
        return (
            self.target.get_state() == tango.DevState.ON
            and self.target._obs_state
            in [
                ObsState.READY,
                ObsState.SCANNING,
            ]
        )

    def do(
        self: Vcc.UpdateDopplerPhaseCorrectionCommand, argin: str
    ) -> None:
        """
        Stateless hook for UpdateDopplerPhaseCorrection() command functionality.

        The update is skipped without error when the command is not
        allowed.

        :param argin: the doppler phase correction JSON
        """
        if not self.is_allowed():
            return

        manager = self.target.component_manager
        manager.update_doppler_phase_correction(argin)
@command(
    dtype_in="DevString",
    doc_in="JSON formatted string with the Doppler phase correction model.",
)
@DebugIt()
def UpdateDopplerPhaseCorrection(self, argin: str):
    # PROTECTED REGION ID(CspSubElementObsDevice.UpdateDopplerPhaseCorrection) ENABLED START #
    """
    Update Vcc's doppler phase correction.

    :param argin: JSON formatted string with the Doppler phase
        correction model, passed to the registered command object.
    """
    handler = self.get_command_object("UpdateDopplerPhaseCorrection")
    handler(argin)
    # PROTECTED REGION END # // CspSubElementObsDevice.UpdateDopplerPhaseCorrection
class UpdateDelayModelCommand(BaseCommand):
    """
    A class for the Vcc's UpdateDelayModel() command.

    Update Vcc's delay model.
    """

    def is_allowed(self: Vcc.UpdateDelayModelCommand) -> bool:
        """
        Determine if UpdateDelayModel is allowed
        (allowed when Devstate is ON and ObsState is READY OR SCANNING).

        :return: if UpdateDelayModel is allowed
        :rtype: bool
        """
        return (
            self.target.get_state() == tango.DevState.ON
            and self.target._obs_state
            in [
                ObsState.READY,
                ObsState.SCANNING,
            ]
        )

    def do(self: Vcc.UpdateDelayModelCommand, argin: str) -> None:
        """
        Stateless hook for UpdateDelayModel() command functionality.

        The update is skipped without error when the command is not
        allowed.

        :param argin: the delay model JSON
        """
        if not self.is_allowed():
            return

        manager = self.target.component_manager
        manager.update_delay_model(argin)
@command(
    dtype_in="DevString",
    doc_in="JSON formatted string with the delay model.",
)
@DebugIt()
def UpdateDelayModel(self, argin: str):
    # PROTECTED REGION ID(CspSubElementObsDevice.UpdateDelayModel) ENABLED START #
    """
    Update Vcc's delay model.

    :param argin: JSON formatted string with the delay model, passed to
        the registered command object.
    """
    handler = self.get_command_object("UpdateDelayModel")
    handler(argin)
    # PROTECTED REGION END # // CspSubElementObsDevice.UpdateDelayModel
class UpdateJonesMatrixCommand(BaseCommand):
    """
    A class for the Vcc's UpdateJonesMatrix() command.

    Update Vcc's Jones matrix.
    """

    def is_allowed(self: Vcc.UpdateJonesMatrixCommand) -> bool:
        """
        Determine if UpdateJonesMatrix is allowed
        (allowed when Devstate is ON and ObsState is READY OR SCANNING).

        :return: if UpdateJonesMatrix is allowed
        :rtype: bool
        """
        return (
            self.target.get_state() == tango.DevState.ON
            and self.target._obs_state
            in [
                ObsState.READY,
                ObsState.SCANNING,
            ]
        )

    def do(self: Vcc.UpdateJonesMatrixCommand, argin: str) -> None:
        """
        Stateless hook for UpdateJonesMatrix() command functionality.

        The update is skipped without error when the command is not
        allowed.

        :param argin: the Jones Matrix JSON
        """
        if not self.is_allowed():
            return

        manager = self.target.component_manager
        manager.update_jones_matrix(argin)
@command(
    dtype_in="DevString",
    # Previously described the delay model (copy-paste from
    # UpdateDelayModel); this command takes the Jones matrix.
    doc_in="JSON formatted string with the Jones matrix.",
)
@DebugIt()
def UpdateJonesMatrix(self, argin: str):
    # PROTECTED REGION ID(CspSubElementObsDevice.UpdateJonesMatrix) ENABLED START #
    """
    Update Vcc's Jones matrix.

    :param argin: JSON formatted string with the Jones matrix, passed to
        the registered command object.
    """
    self.get_command_object("UpdateJonesMatrix")(argin)
    # PROTECTED REGION END # // CspSubElementObsDevice.UpdateJonesMatrix
class ConfigureSearchWindowCommand(ResponseCommand):
    """
    A class for the Vcc's ConfigureSearchWindow() command.

    Configure a search window by sending parameters from the input (JSON) to
    the SearchWindow device.
    This function is called by the subarray after the configuration has
    already been validated.
    """

    def is_allowed(self: Vcc.ConfigureSearchWindowCommand) -> bool:
        """
        Determine if ConfigureSearchWindow is allowed
        (allowed if DevState is ON and ObsState is CONFIGURING or READY).

        :return: if ConfigureSearchWindow is allowed
        :rtype: bool
        """
        device = self.target
        if device.get_state() == tango.DevState.ON and device._obs_state in (
            ObsState.CONFIGURING,
            ObsState.READY,
        ):
            return True
        return False

    def validate_input(
        self: Vcc.ConfigureSearchWindowCommand, argin: str
    ) -> Tuple[bool, str]:
        """
        Validate a search window configuration.

        Every validation failure is reported through the return value; the
        method does not raise for malformed or out-of-range configuration
        content that it checks for.

        :param argin: JSON object with the search window parameters
        :return: ``(True, message)`` when the configuration is valid,
            otherwise ``(False, reason)``
        :rtype: (bool, str)
        """
        self.logger.debug(f"Validating argin: {argin}")

        # try to deserialize input string to a JSON object
        invalid_json_msg = (
            "Search window configuration object is not a valid JSON object."
        )
        try:
            config = json.loads(argin)
        except json.JSONDecodeError:  # argument not a valid JSON object
            return (False, invalid_json_msg)
        # Valid JSON that is not an object (e.g. a list or a number) cannot
        # carry the keyed parameters checked below.
        if not isinstance(config, dict):
            return (False, invalid_json_msg)

        # The order matters: later checks read keys (e.g. 'tdc_enable')
        # whose presence is established by earlier ones.
        validators = (
            self._validate_search_window_id,
            self._validate_search_window_tuning,
            self._validate_tdc_enable,
            self._validate_tdc_num_bits,
            self._validate_tdc_periods,
            self._validate_tdc_destination_address,
        )
        for validator in validators:
            error = validator(config)
            if error is not None:
                return (False, error)

        self.logger.debug("Search window validation complete")
        return (True, "Search window validated.")

    @staticmethod
    def _not_given(key: str) -> str:
        """Build the standard message for a missing configuration key."""
        return f"Search window specified, but '{key}' not given."

    def _validate_search_window_id(
        self: Vcc.ConfigureSearchWindowCommand, config: dict
    ) -> Optional[str]:
        """Check 'search_window_id'; return an error message or None."""
        if "search_window_id" not in config:
            return self._not_given("search_window_id")
        sw_id = config["search_window_id"]
        if sw_id not in [1, 2]:  # searchWindowID not in valid range
            return (
                f"'search_window_id' must be one of [1, 2] (received {sw_id})."
            )
        self.logger.debug(f"Validated search_window_id: {sw_id}")
        return None

    def _validate_search_window_tuning(
        self: Vcc.ConfigureSearchWindowCommand, config: dict
    ) -> Optional[str]:
        """Check 'search_window_tuning'; return an error message or None."""
        if "search_window_tuning" not in config:
            return self._not_given("search_window_tuning")
        if "frequency_band" not in config:
            return self._not_given("frequency_band")

        freq_band_name = config["frequency_band"]
        if freq_band_name in ["5a", "5b"]:
            # frequency band 5a or 5b (two streams with bandwidth 2.5 GHz)
            error = self._validate_band_5_tuning(config)
        else:
            error = self._validate_band_1_to_4_tuning(config, freq_band_name)
        if error is not None:
            return error

        self.logger.debug(
            "Validated search_window_tuning: "
            f"{json.dumps(config['search_window_tuning'])}"
        )
        return None

    @staticmethod
    def _tuning_as_int(config: dict) -> Optional[int]:
        """Return 'search_window_tuning' as an int, or None if not convertible."""
        try:
            return int(config["search_window_tuning"])
        except (TypeError, ValueError):
            return None

    def _validate_band_1_to_4_tuning(
        self: Vcc.ConfigureSearchWindowCommand,
        config: dict,
        freq_band_name: str,
    ) -> Optional[str]:
        """Check the tuning against bands 1-4; return an error message or None."""
        band_names = ["1", "2", "3", "4"]
        if freq_band_name not in band_names:
            return (
                "'frequency_band' must be one of "
                "['1', '2', '3', '4', '5a', '5b'] "
                f"(received {freq_band_name})."
            )
        band_index = band_names.index(freq_band_name)
        # Explicit replacement for the former `assert`, which is stripped
        # when Python runs with -O: the shared band table must agree with
        # the index used to pick the frequency range below.
        if freq_band_dict()[freq_band_name]["band_index"] != band_index:
            return (
                f"Band index lookup for 'frequency_band' {freq_band_name} "
                "is inconsistent."
            )
        if "frequency_band_offset_stream1" not in config:
            return self._not_given("frequency_band_offset_stream1")

        start_freq_Hz, stop_freq_Hz = [
            const.FREQUENCY_BAND_1_RANGE_HZ,
            const.FREQUENCY_BAND_2_RANGE_HZ,
            const.FREQUENCY_BAND_3_RANGE_HZ,
            const.FREQUENCY_BAND_4_RANGE_HZ,
        ][band_index]
        self.target.logger.debug(f"start_freq_Hz = {start_freq_Hz}")
        self.target.logger.debug(f"stop_freq_Hz = {stop_freq_Hz}")

        tuning = self._tuning_as_int(config)
        if tuning is None:
            return (
                "'search_window_tuning' must be convertible to an integer "
                f"(received {config['search_window_tuning']})."
            )
        offset = config["frequency_band_offset_stream1"]
        if not (start_freq_Hz + offset <= tuning <= stop_freq_Hz + offset):
            return "'search_window_tuning' must be within observed band."
        return None

    def _validate_band_5_tuning(
        self: Vcc.ConfigureSearchWindowCommand, config: dict
    ) -> Optional[str]:
        """Check the tuning against band 5a/5b; return an error message or None."""
        if "band_5_tuning" not in config:
            return self._not_given("band_5_tuning")
        band_5_tuning = config["band_5_tuning"]
        if band_5_tuning == [0, 0]:
            # band 5 tuning not specified in configuration: nothing to check
            return None

        offset_keys = (
            "frequency_band_offset_stream1",
            "frequency_band_offset_stream2",
        )
        for key in offset_keys:
            if key not in config:
                return self._not_given(key)

        tuning = self._tuning_as_int(config)
        if tuning is None:
            return (
                "'search_window_tuning' must be convertible to an integer "
                f"(received {config['search_window_tuning']})."
            )

        half_bandwidth = const.BAND_5_STREAM_BANDWIDTH * 10**9 / 2
        stream_centres = (
            band_5_tuning[0] * 10**9 + config[offset_keys[0]],
            band_5_tuning[1] * 10**9 + config[offset_keys[1]],
        )
        within_a_stream = any(
            centre - half_bandwidth <= tuning <= centre + half_bandwidth
            for centre in stream_centres
        )
        if not within_a_stream:
            # Reported through the return value like every other failure
            # (this branch used to throw a Tango exception directly).
            msg = "'searchWindowTuning' must be within observed band."
            self.target.logger.error(msg)
            return msg
        return None

    def _validate_tdc_enable(
        self: Vcc.ConfigureSearchWindowCommand, config: dict
    ) -> Optional[str]:
        """Check 'tdc_enable'; return an error message or None."""
        if "tdc_enable" not in config:
            return self._not_given("tdc_enable")
        tdc_enable = config["tdc_enable"]
        if tdc_enable not in [True, False]:
            # The key is present but invalid; the previous message wrongly
            # claimed it was "not given".
            return f"'tdc_enable' must be a boolean (received {tdc_enable})."
        self.logger.debug("Validated tdcEnable")
        return None

    def _validate_tdc_num_bits(
        self: Vcc.ConfigureSearchWindowCommand, config: dict
    ) -> Optional[str]:
        """Check 'tdc_num_bits' when TDC is enabled; return an error or None."""
        if config["tdc_enable"]:
            if "tdc_num_bits" not in config:
                return (
                    "Search window specified with TDC enabled, "
                    "but 'tdcNumBits' not given."
                )
            tdc_num_bits = config["tdc_num_bits"]
            if tdc_num_bits not in [2, 4, 8]:
                return (
                    "'tdcNumBits' must be one of [2, 4, 8] "
                    f"(received {tdc_num_bits})."
                )
        self.logger.debug("Validated tdcNumBits")
        return None

    def _validate_tdc_periods(
        self: Vcc.ConfigureSearchWindowCommand, config: dict
    ) -> Optional[str]:
        """Check the optional TDC period fields; return an error or None."""
        period_fields = (
            ("tdc_period_before_epoch", "tdcPeriodBeforeEpoch"),
            ("tdc_period_after_epoch", "tdcPeriodAfterEpoch"),
        )
        for key, label in period_fields:
            if key in config:
                period = config[key]
                try:
                    is_positive = period > 0
                except TypeError:
                    # e.g. a string or null: not comparable with 0
                    is_positive = False
                if not is_positive:
                    return (
                        f"'{label}' must be a positive integer "
                        f"(received {period})."
                    )
            self.logger.debug(f"Validated {label}")
        return None

    def _validate_tdc_destination_address(
        self: Vcc.ConfigureSearchWindowCommand, config: dict
    ) -> Optional[str]:
        """Check 'tdc_destination_address' when TDC is enabled."""
        if config["tdc_enable"]:
            try:
                for receptor in config["tdc_destination_address"]:
                    if (
                        receptor["receptor_id"]
                        == self.target.component_manager.dish_id
                    ):
                        # TODO: validate tdc_destination_address
                        break
            except KeyError:
                # tdcDestinationAddress not given or receptor_id not in
                # tdcDestinationAddress
                return (
                    "Search window specified with TDC enabled, but 'tdcDestinationAddress' "
                    "not given or missing receptors."
                )
        self.logger.debug("Validated tdcDestinationAddress")
        return None

    def do(
        self: Vcc.ConfigureSearchWindowCommand, argin: str
    ) -> Tuple[ResultCode, str]:
        """
        Stateless hook for ConfigureSearchWindow() command functionality.

        :param argin: JSON object with the search window parameters
        :return: A tuple containing a return code and a string
            message indicating status. The message is for
            information purpose only.
        :rtype: (ResultCode, str)
        """
        self.logger.debug(f"argin: {argin}")
        return self.target.component_manager.configure_search_window(argin)
@command(
    dtype_in="DevString",
    doc_in="JSON formatted string with the search window configuration.",
    dtype_out="DevVarLongStringArray",
    doc_out="A tuple containing a return code and a string message indicating status. "
    "The message is for information purpose only.",
)
@DebugIt()
def ConfigureSearchWindow(self, argin):
    # PROTECTED REGION ID(CspSubElementObsDevice.ConfigureScan) ENABLED START #
    """
    Configure the observing device parameters for a search window.

    The input is first checked with the command object's
    ``validate_input``; a failed check is handed to
    ``_raise_configuration_fatal_error``. The command object is then
    invoked with the same input.

    :param argin: JSON formatted string with the search window configuration.
    :type argin: 'DevString'

    :return: A tuple containing a return code and a string message indicating status.
        The message is for information purpose only.
    :rtype: (ResultCode, str)
    """
    # This validation is already performed in the CbfSubarray ConfigureScan.
    # TODO: Improve validation (validation should only be done once,
    # most of the validation can be done through a schema instead of manually
    # through functions).
    handler = self.get_command_object("ConfigureSearchWindow")
    self.logger.debug(f"argin: {argin}")

    is_valid, validation_message = handler.validate_input(argin)
    self.logger.debug(
        f"ConfigureSearchWindow validation result: {validation_message}"
    )
    if not is_valid:
        self._raise_configuration_fatal_error(
            validation_message, "ConfigureSearchWindow"
        )

    result_code, result_message = handler(argin)
    self.logger.debug(f"ConfigureSearchWindow result: {result_message}")
    return [[result_code], [result_message]]
    # PROTECTED REGION END #    //  CspSubElementObsDevice.ConfigureScan
# ----------
# Run server
# ----------
def main(args=None, **kwargs):
    # PROTECTED REGION ID(Vcc.main) ENABLED START #
    """
    Run the device server for the Vcc device class.

    :param args: forwarded to ``tango.server.run`` as ``args``
    :param kwargs: additional keyword arguments forwarded to ``run``
    :return: whatever ``run`` returns
    """
    device_classes = (Vcc,)
    return run(device_classes, args=args, **kwargs)
    # PROTECTED REGION END #    //  Vcc.main


# Start the device server only when this module is executed as a script.
if __name__ == "__main__":
    main()