Source code for ska_mid_cbf_mcs.vcc.vcc_component_manager
# -*- coding: utf-8 -*-
#
# This file is part of the SKA Mid.CBF MCS project
#
#
#
# Distributed under the terms of the GPL license.
# See LICENSE.txt for more info.
# Copyright (c) 2019 National Research Council of Canada
"""
VccComponentManager
Sub-element VCC component manager for Mid.CBF
"""
from __future__ import annotations # allow forward references in type hints
import copy
import json
import logging
from typing import Callable, List, Optional, Tuple
# tango imports
import tango
from ska_tango_base.commands import ResultCode
from ska_tango_base.control_model import PowerMode, SimulationMode
from ska_tango_base.csp.obs import CspObsComponentManager
from ska_mid_cbf_mcs.commons.global_enum import const, freq_band_dict
from ska_mid_cbf_mcs.component.component_manager import (
CbfComponentManager,
CommunicationStatus,
)
from ska_mid_cbf_mcs.device_proxy import CbfDeviceProxy
from ska_mid_cbf_mcs.vcc.vcc_band_simulator import VccBandSimulator
from ska_mid_cbf_mcs.vcc.vcc_controller_simulator import VccControllerSimulator
# SKA Specific imports
__all__ = ["VccComponentManager"]
VCC_PARAM_PATH = "mnt/vcc_param/"
[docs]class VccComponentManager(CbfComponentManager, CspObsComponentManager):
"""Component manager for Vcc class."""
@property
def config_id(self: VccComponentManager) -> str:
"""
Configuration ID
:return: the configuration ID
"""
return self._config_id
@config_id.setter
def config_id(self: VccComponentManager, config_id: str) -> None:
"""
Set the configuration ID.
:param config_id: Configuration ID
"""
self._config_id = config_id
@property
def scan_id(self: VccComponentManager) -> int:
"""
Scan ID
:return: the scan ID
"""
return self._scan_id
@scan_id.setter
def scan_id(self: VccComponentManager, scan_id: int) -> None:
"""
Set the scan ID.
:param scan_id: Scan ID
"""
self._scan_id = scan_id
@property
def dish_id(self: VccComponentManager) -> str:
"""
DISH ID
:return: the DISH ID
"""
return self._dish_id
@dish_id.setter
def dish_id(self: VccComponentManager, dish_id: str) -> None:
"""
Set the DISH ID.
:param dish_id: DISH ID
"""
self._dish_id = dish_id
@property
def frequency_band(self: VccComponentManager) -> int:
"""
Frequency Band
:return: the frequency band as the integer index in an array
of frequency band labels: ["1", "2", "3", "4", "5a", "5b"]
"""
return self._frequency_band
@property
def stream_tuning(self: VccComponentManager) -> List[float]:
"""
Band 5 Stream Tuning
:return: the band 5 stream tuning
"""
return self._stream_tuning
@property
def frequency_band_offset_stream1(self: VccComponentManager) -> int:
"""
Frequency Band Offset Stream 1
:return: the frequency band offset for stream 1
"""
return self._frequency_band_offset_stream1
@property
def frequency_band_offset_stream2(self: VccComponentManager) -> int:
"""
Frequency Band Offset Stream 2
:return: the frequency band offset for stream 2, this
is only use when band 5 is active
"""
return self._frequency_band_offset_stream2
@property
def rfi_flagging_mask(self: VccComponentManager) -> str:
"""
RFI Flagging Mask
:return: the RFI flagging mask
"""
return self._rfi_flagging_mask
@property
def jones_matrix(self: VccComponentManager) -> str:
"""
Jones Matrix
:return: the last received Jones matrix
"""
return self._jones_matrix
@property
def delay_model(self: VccComponentManager) -> str:
"""
Delay Model
:return: the last received delay model
"""
return self._delay_model
@property
def doppler_phase_correction(self: VccComponentManager) -> List[float]:
"""
Doppler Phase Correction
:return: the last received Doppler phase correction array
"""
return self._doppler_phase_correction
def __init__(
self: VccComponentManager,
vcc_id: int,
talon_lru: str,
vcc_controller: str,
vcc_band: List[str],
search_window: List[str],
logger: logging.Logger,
push_change_event_callback: Optional[Callable],
communication_status_changed_callback: Callable[
[CommunicationStatus], None
],
component_power_mode_changed_callback: Callable[[PowerMode], None],
component_fault_callback: Callable,
component_obs_fault_callback: Callable,
simulation_mode: SimulationMode = SimulationMode.TRUE,
) -> None:
"""
Initialize a new instance.
:param vcc_id: integer ID of this VCC
:param talon_lru: FQDN of the TalonLRU device
:param vcc_controller: FQDN of the HPS VCC controller device
:param vcc_band: FQDNs of HPS VCC band devices
:param search_window: FQDNs of VCC search windows
:param logger: a logger for this object to use
:param push_change_event_callback: method to call when the base classes
want to send an event
:param communication_status_changed_callback: callback to be
called when the status of the communications channel between the
component manager and its component changes
:param component_power_mode_changed_callback: callback to be called when
the component power mode changes
:param component_fault_callback: callback to be called in event of
component fault (for op state model)
:param component_obs_fault_callback: callback to be called in event of
component fault (for obs state model)
:param simulation_mode: simulation mode identifies if the real VCC HPS
applications or the simulator should be connected
"""
self._logger = logger
self._simulation_mode = simulation_mode
self._vcc_id = vcc_id
self._talon_lru_fqdn = talon_lru
self._vcc_controller_fqdn = vcc_controller
self._vcc_band_fqdn = vcc_band
self._search_window_fqdn = search_window
self.connected = False
self._ready = False
self.obs_faulty = False
self._component_obs_fault_callback = component_obs_fault_callback
# Initialize attribute values
self._dish_id = ""
self._scan_id = 0
self._config_id = ""
self._frequency_band = 0
self._freq_band_name = ""
self._stream_tuning = (0, 0)
self._frequency_band_offset_stream1 = 0
self._frequency_band_offset_stream2 = 0
self._rfi_flagging_mask = ""
self._jones_matrix = ""
self._delay_model = ""
self._doppler_phase_correction = [0 for _ in range(4)]
# Initialize list of band proxies and band -> index translation;
# entry for each of: band 1 & 2, band 3, band 4, band 5
self._band_proxies = []
self._freq_band_index = dict(
zip(freq_band_dict().keys(), [0, 0, 1, 2, 3, 3])
)
self._sw_proxies = []
self._talon_lru_proxy = None
self._vcc_controller_proxy = None
# Create simulators
self._band_simulators = [
VccBandSimulator(vcc_band[0]),
VccBandSimulator(vcc_band[1]),
VccBandSimulator(vcc_band[2]),
VccBandSimulator(vcc_band[3]),
]
self._vcc_controller_simulator = VccControllerSimulator(
vcc_controller,
self._band_simulators[0],
self._band_simulators[1],
self._band_simulators[2],
self._band_simulators[3],
)
super().__init__(
logger=logger,
push_change_event_callback=push_change_event_callback,
communication_status_changed_callback=communication_status_changed_callback,
component_power_mode_changed_callback=component_power_mode_changed_callback,
component_fault_callback=component_fault_callback,
obs_state_model=None,
)
@property
def simulation_mode(self: VccComponentManager) -> SimulationMode:
"""
Get the simulation mode of the component manager.
:return: simulation mode of the component manager
"""
return self._simulation_mode
@simulation_mode.setter
def simulation_mode(
self: VccComponentManager, value: SimulationMode
) -> None:
"""
Set the simulation mode of the component manager.
:param value: value to set simulation mode to
"""
self._simulation_mode = value
[docs] def start_communicating(self: VccComponentManager) -> None:
"""Establish communication with the component, then start monitoring."""
if self.connected:
self._logger.info("Already connected.")
return
super().start_communicating()
try:
self._talon_lru_proxy = CbfDeviceProxy(
fqdn=self._talon_lru_fqdn, logger=self._logger
)
self._sw_proxies = [
CbfDeviceProxy(fqdn=fqdn, logger=self._logger)
for fqdn in self._search_window_fqdn
]
except tango.DevFailed:
self.update_component_fault(True)
self._logger.error("Error in proxy connection")
return
self.connected = True
self.update_communication_status(CommunicationStatus.ESTABLISHED)
self.update_component_power_mode(self._get_power_mode())
self.update_component_fault(False)
[docs] def stop_communicating(self: VccComponentManager) -> None:
"""Stop communication with the component."""
super().stop_communicating()
self.update_component_power_mode(PowerMode.UNKNOWN)
self.connected = False
def _get_power_mode(self: VccComponentManager) -> PowerMode:
"""
Get the power mode of this VCC based on the current power
mode of the LRU this VCC belongs to.
:return: VCC power mode
"""
try:
return self._talon_lru_proxy.LRUPowerMode
except tango.DevFailed:
self._logger.error("Could not connect to Talon LRU device")
self.update_component_fault(True)
return PowerMode.UNKNOWN
[docs] def on(self: VccComponentManager) -> Tuple[ResultCode, str]:
"""
Turn on VCC component. This attempts to establish communication
with the VCC devices on the HPS.
:return: A tuple containing a return code and a string
message indicating status. The message is for
information purpose only.
:rtype: (ResultCode, str)
:raise ConnectionError: if unable to connect to HPS VCC devices
"""
self._logger.info("Entering VccComponentManager.on")
try:
# Try to connect to HPS devices, they should be running at this point
if not self._simulation_mode:
self._logger.info(
"Connecting to HPS VCC controller and band devices"
)
self._vcc_controller_proxy = CbfDeviceProxy(
fqdn=self._vcc_controller_fqdn, logger=self._logger
)
self._band_proxies = [
CbfDeviceProxy(fqdn=fqdn, logger=self._logger)
for fqdn in self._vcc_band_fqdn
]
except tango.DevFailed as df:
self._logger.error(str(df.args[0].desc))
self.update_component_fault(True)
return (ResultCode.FAILED, "Failed to connect to HPS VCC devices")
self._logger.info("Completed VccComponentManager.on")
self.update_component_power_mode(PowerMode.ON)
return (ResultCode.OK, "On command completed OK")
[docs] def off(self: VccComponentManager) -> Tuple[ResultCode, str]:
"""
Turn off VCC component; currently unimplemented.
:return: A tuple containing a return code and a string
message indicating status. The message is for
information purpose only.
:rtype: (ResultCode, str)
"""
self.update_component_power_mode(PowerMode.OFF)
return (ResultCode.OK, "Off command completed OK")
[docs] def standby(self: VccComponentManager) -> Tuple[ResultCode, str]:
"""
Turn VCC component to standby; currently unimplemented.
:return: A tuple containing a return code and a string
message indicating status. The message is for
information purpose only.
:rtype: (ResultCode, str)
"""
self.update_component_power_mode(PowerMode.STANDBY)
return (ResultCode.OK, "Standby command completed OK")
[docs] def configure_band(
self: VccComponentManager, argin: str
) -> Tuple[ResultCode, str]:
"""
Configure the corresponding band. At the HPS level, this reconfigures the
FPGA to the correct bitstream and enables the respective band device. All
other band devices are disabled.
:param argin: the frequency band name
:return: A tuple containing a return code and a string
message indicating status. The message is for
information purpose only.
:rtype: (ResultCode, str)
"""
(result_code, msg) = (ResultCode.OK, "ConfigureBand completed OK.")
try:
band_config = json.loads(argin)
freq_band_name = band_config["frequency_band"]
# Configure the band via the VCC Controller device
self._logger.info(f"Configuring VCC band {freq_band_name}")
frequency_band = freq_band_dict()[freq_band_name]["band_index"]
self._freq_band_name = freq_band_name
if self._simulation_mode:
self._vcc_controller_simulator.ConfigureBand(frequency_band)
else:
self._vcc_controller_proxy.ConfigureBand(frequency_band)
# Set internal params for the configured band
self._logger.info(
f"Configuring internal parameters for VCC band {freq_band_name}"
)
internal_params_file_name = f"{VCC_PARAM_PATH}internal_params_receptor{self._dish_id}_band{freq_band_name}.json"
self._logger.debug(
f"Using parameters stored in {internal_params_file_name}"
)
try:
with open(internal_params_file_name, "r") as f:
json_string = f.read()
except FileNotFoundError:
self._logger.info(
f"Could not find internal parameters file for receptor {self._dish_id}, band {freq_band_name}; using default."
)
with open(
f"{VCC_PARAM_PATH}internal_params_default.json", "r"
) as f:
json_string = f.read()
self._logger.info(f"VCC internal parameters: {json_string}")
args = json.loads(json_string)
args.update({"dish_sample_rate": band_config["dish_sample_rate"]})
args.update(
{"samples_per_frame": band_config["samples_per_frame"]}
)
json_string = json.dumps(args)
idx = self._freq_band_index[self._freq_band_name]
if self._simulation_mode:
self._band_simulators[idx].SetInternalParameters(json_string)
else:
self._band_proxies[idx].SetInternalParameters(json_string)
self._frequency_band = frequency_band
self._push_change_event("frequencyBand", self._frequency_band)
except tango.DevFailed as df:
self._logger.error(str(df.args[0].desc))
self._component_obs_fault_callback(True)
(result_code, msg) = (
ResultCode.FAILED,
"Failed to connect to HPS VCC devices.",
)
except FileNotFoundError:
self._logger.error(
"Could not find default internal parameters file."
)
(result_code, msg) = (
ResultCode.FAILED,
"Missing default internal parameters file.",
)
return (result_code, msg)
[docs] def deconfigure(self: VccComponentManager) -> None:
"""Deconfigure scan configuration parameters."""
self._doppler_phase_correction = [0 for _ in range(4)]
self._jones_matrix = ""
self._delay_model = ""
self._rfi_flagging_mask = ""
self._frequency_band_offset_stream2 = 0
self._frequency_band_offset_stream1 = 0
self._stream_tuning = (0, 0)
self._frequency_band = 0
self._push_change_event("frequencyBand", self._frequency_band)
self._freq_band_name = ""
self._config_id = ""
self._scan_id = 0
if self._ready:
if self._simulation_mode:
self._vcc_controller_simulator.Unconfigure()
else:
try:
pass
# TODO CIP-1850
# self._vcc_controller_proxy.Unconfigure()
except tango.DevFailed as df:
self._logger.error(str(df.args[0].desc))
self._component_obs_fault_callback(True)
self._ready = False
[docs] def configure_scan(
self: VccComponentManager, argin: str
) -> Tuple[ResultCode, str]:
"""
Execute configure scan operation.
:param argin: JSON string with the configure scan parameters
:return: A tuple containing a return code and a string
message indicating status. The message is for
information purpose only.
:rtype: (ResultCode, str)
"""
configuration = json.loads(argin)
self._config_id = configuration["config_id"]
# TODO: The frequency band attribute is optional but
# if not specified the previous frequency band set should be used
# (see Mid.CBF Scan Configuration in ICD). Therefore, the previous frequency
# band value needs to be stored, and if the frequency band is not
# set in the config it should be replaced with the previous value.
freq_band = freq_band_dict()[configuration["frequency_band"]][
"band_index"
]
if self._frequency_band != freq_band:
return (
ResultCode.FAILED,
f"Error in Vcc.ConfigureScan; scan configuration frequency band {freq_band} "
+ f"not the same as enabled band device {self._frequency_band}",
)
if self._frequency_band in [4, 5]:
self._stream_tuning = configuration["band_5_tuning"]
self._frequency_band_offset_stream1 = int(
configuration["frequency_band_offset_stream1"]
)
self._frequency_band_offset_stream2 = int(
configuration["frequency_band_offset_stream2"]
)
if "rfi_flagging_mask" in configuration:
self._rfi_flagging_mask = str(configuration["rfi_flagging_mask"])
else:
self._logger.warning("'rfiFlaggingMask' not given. Proceeding.")
# Send the ConfigureScan command to the HPS
idx = self._freq_band_index[self._freq_band_name]
if self._simulation_mode:
self._band_simulators[idx].ConfigureScan(argin)
else:
try:
self._band_proxies[idx].ConfigureScan(argin)
except tango.DevFailed as df:
self._logger.error(str(df.args[0].desc))
self._component_obs_fault_callback(True)
return (
ResultCode.FAILED,
"Failed to connect to VCC band device",
)
self._ready = True
return (ResultCode.OK, "Vcc ConfigureScanCommand completed OK")
[docs] def scan(
self: VccComponentManager, scan_id: int
) -> Tuple[ResultCode, str]:
"""
Begin scan operation.
:param argin: scan ID integer
:return: A tuple containing a return code and a string
message indicating status. The message is for
information purpose only.
:rtype: (ResultCode, str)
"""
self._logger.info("Starting scan")
self._scan_id = scan_id
# Send the Scan command to the HPS
idx = self._freq_band_index[self._freq_band_name]
if self._simulation_mode:
self._band_simulators[idx].Scan(scan_id)
else:
try:
self._band_proxies[idx].Scan(scan_id)
except tango.DevFailed as df:
self._logger.error(str(df.args[0].desc))
self._component_obs_fault_callback(True)
return (
ResultCode.FAILED,
"Failed to connect to VCC band device",
)
return (ResultCode.STARTED, "Vcc ScanCommand completed OK")
[docs] def end_scan(self: VccComponentManager) -> Tuple[ResultCode, str]:
"""
End scan operation.
:return: A tuple containing a return code and a string
message indicating status. The message is for
information purpose only.
:rtype: (ResultCode, str)
"""
self._logger.info("Ending scan")
# Send the EndScan command to the HPS
idx = self._freq_band_index[self._freq_band_name]
if self._simulation_mode:
self._band_simulators[idx].EndScan()
else:
try:
self._band_proxies[idx].EndScan()
except tango.DevFailed as df:
self._logger.error(str(df.args[0].desc))
self._component_obs_fault_callback(True)
return (
ResultCode.FAILED,
"Failed to connect to VCC band device",
)
return (ResultCode.OK, "Vcc EndScanCommand completed OK")
[docs] def abort(self):
"""Tell the current VCC band device to abort whatever it was doing."""
if self._freq_band_name != "":
idx = self._freq_band_index[self._freq_band_name]
if self._simulation_mode:
self._band_simulators[idx].Abort()
else:
try:
pass
# TODO CIP-1850
# self._vcc_controller_proxy.Unconfigure()
# self._band_proxies[idx].Abort()
except tango.DevFailed as df:
self._logger.error(str(df.args[0].desc))
self._component_obs_fault_callback(True)
return (
ResultCode.FAILED,
"Failed to connect to VCC band device",
)
# If the VCC has been aborted from READY, update accordingly.
if self._ready:
self._ready = False
else:
# if no value for _freq_band_name, assume in IDLE state,
# either from initialization or after deconfigure has been called
self._logger.info(
"Aborting from IDLE; not issuing Abort command to VCC band devices"
)
return (ResultCode.OK, "Vcc Abort command completed OK")
[docs] def obsreset(self):
"""Reset the configuration."""
if self._freq_band_name != "":
idx = self._freq_band_index[self._freq_band_name]
if self._simulation_mode:
self._band_simulators[idx].ObsReset()
else:
try:
pass
# TODO CIP-1850
# self._band_proxies[idx].ObsReset()
except tango.DevFailed as df:
self._logger.error(str(df.args[0].desc))
self._component_obs_fault_callback(True)
return (
ResultCode.FAILED,
"Failed to connect to VCC band device",
)
else:
# if no value for _freq_band_name, assume in IDLE state,
# either from initialization or after deconfigure has been called
self._logger.info(
"Aborted from IDLE; not issuing ObsReset command to VCC band devices"
)
return (ResultCode.OK, "Vcc ObsReset command completed OK")
[docs] def configure_search_window(
self: VccComponentManager, argin: str
) -> Tuple[ResultCode, str]:
"""
Configure a search window by sending parameters from the input(JSON) to
SearchWindow self. This function is called by the subarray after the
configuration has already been validated, so the checks here have been
removed to reduce overhead.
:param argin: JSON string with the search window parameters
:return: A tuple containing a return code and a string
message indicating status. The message is for
information purpose only.
:rtype: (ResultCode, str)
"""
result_code = ResultCode.OK
msg = "ConfigureSearchWindow completed OK"
argin = json.loads(argin)
self._logger.debug(f"vcc argin: {json.dumps(argin)}")
# variable to use as SW proxy
proxy_sw = None
# Configure searchWindowID.
if int(argin["search_window_id"]) == 1:
proxy_sw = self._sw_proxies[0]
elif int(argin["search_window_id"]) == 2:
proxy_sw = self._sw_proxies[1]
self._logger.debug(f"search_window_id == {argin['search_window_id']}")
try:
# Configure searchWindowTuning.
if self._frequency_band in list(
range(4)
): # frequency band is not band 5
proxy_sw.searchWindowTuning = argin["search_window_tuning"]
start_freq_Hz, stop_freq_Hz = [
const.FREQUENCY_BAND_1_RANGE_HZ,
const.FREQUENCY_BAND_2_RANGE_HZ,
const.FREQUENCY_BAND_3_RANGE_HZ,
const.FREQUENCY_BAND_4_RANGE_HZ,
][self._frequency_band]
if (
start_freq_Hz
+ self._frequency_band_offset_stream1
+ const.SEARCH_WINDOW_BW_HZ / 2
<= int(argin["search_window_tuning"])
<= stop_freq_Hz
+ self._frequency_band_offset_stream1
- const.SEARCH_WINDOW_BW_HZ / 2
):
# this is the acceptable range
pass
else:
# log a warning message
log_msg = (
"'searchWindowTuning' partially out of observed band. "
"Proceeding."
)
self._logger.warning(log_msg)
else: # frequency band 5a or 5b (two streams with bandwidth 2.5 GHz)
proxy_sw.searchWindowTuning = argin["search_window_tuning"]
frequency_band_range_1 = (
self._stream_tuning[0] * 10**9
+ self._frequency_band_offset_stream1
- const.BAND_5_STREAM_BANDWIDTH * 10**9 / 2,
self._stream_tuning[0] * 10**9
+ self._frequency_band_offset_stream1
+ const.BAND_5_STREAM_BANDWIDTH * 10**9 / 2,
)
frequency_band_range_2 = (
self._stream_tuning[1] * 10**9
+ self._frequency_band_offset_stream2
- const.BAND_5_STREAM_BANDWIDTH * 10**9 / 2,
self._stream_tuning[1] * 10**9
+ self._frequency_band_offset_stream2
+ const.BAND_5_STREAM_BANDWIDTH * 10**9 / 2,
)
if (
frequency_band_range_1[0]
+ const.SEARCH_WINDOW_BW * 10**6 / 2
<= int(argin["search_window_tuning"])
<= frequency_band_range_1[1]
- const.SEARCH_WINDOW_BW * 10**6 / 2
) or (
frequency_band_range_2[0]
+ const.SEARCH_WINDOW_BW * 10**6 / 2
<= int(argin["search_window_tuning"])
<= frequency_band_range_2[1]
- const.SEARCH_WINDOW_BW * 10**6 / 2
):
# this is the acceptable range
pass
else:
# log a warning message
log_msg = (
"'searchWindowTuning' partially out of observed band. "
"Proceeding."
)
self._logger.warning(log_msg)
# Configure tdcEnable.
proxy_sw.tdcEnable = argin["tdc_enable"]
# if argin["tdc_enable"]:
# proxy_sw.On()
# else:
# proxy_sw.Off()
# Configure tdcNumBits.
if argin["tdc_enable"]:
proxy_sw.tdcNumBits = int(argin["tdc_num_bits"])
# Configure tdcPeriodBeforeEpoch.
if "tdc_period_before_epoch" in argin:
proxy_sw.tdcPeriodBeforeEpoch = int(
argin["tdc_period_before_epoch"]
)
else:
proxy_sw.tdcPeriodBeforeEpoch = 2
log_msg = (
"Search window specified, but 'tdcPeriodBeforeEpoch' not given. "
"Defaulting to 2."
)
self._logger.warning(log_msg)
# Configure tdcPeriodAfterEpoch.
if "tdc_period_after_epoch" in argin:
proxy_sw.tdcPeriodAfterEpoch = int(
argin["tdc_period_after_epoch"]
)
else:
proxy_sw.tdcPeriodAfterEpoch = 22
log_msg = (
"Search window specified, but 'tdcPeriodAfterEpoch' not given. "
"Defaulting to 22."
)
self._logger.warning(log_msg)
# Configure tdcDestinationAddress.
# Note: subarray has translated DISH IDs to VCC IDs in the JSON at this point
if argin["tdc_enable"]:
for tdc_dest in argin["tdc_destination_address"]:
if tdc_dest["receptor_id"] == self._vcc_id:
# TODO: validate input
proxy_sw.tdcDestinationAddress = tdc_dest[
"tdc_destination_address"
]
break
except tango.DevFailed as df:
self._logger.error(str(df.args[0].desc))
self._component_obs_fault_callback(True)
(result_code, msg) = (
ResultCode.FAILED,
"Error configuring search window.",
)
return (result_code, msg)
[docs] def update_doppler_phase_correction(
self: VccComponentManager, argin: str
) -> None:
"""
Update Vcc's doppler phase correction
:param argin: the doppler phase correction JSON string
"""
argin = json.loads(argin)
# Note: subarray has translated DISH IDs to VCC IDs in the JSON at this point
for dopplerDetails in argin:
if dopplerDetails["receptor"] == self._vcc_id:
coeff = dopplerDetails["dopplerCoeff"]
if len(coeff) == 4:
self._doppler_phase_correction = coeff.copy()
else:
log_msg = "Invalid length for 'dopplerCoeff' "
self._logger.error(log_msg)
[docs] def update_delay_model(self: VccComponentManager, argin: str) -> None:
"""
Update Vcc's delay model
:param argin: the delay model JSON string
"""
delay_model_obj = json.loads(argin)
# Find the delay model that applies to this VCC's DISH ID and store it
dm_found = False
# The delay model schema allows for a set of dishes to be included.
# Even though there will only be one entryfor a VCC, there should still
# be a list with a single entry so that the schema is followed.
# Set up the delay model to be a list.
# Note: subarray has translated DISH IDs to VCC IDs in the JSON at this point
list_of_entries = []
for entry in delay_model_obj["receptor_delays"]:
self._logger.debug(
f"Received delay model for VCC {entry['receptor']}"
)
if entry["receptor"] == self._vcc_id:
self._logger.debug("Updating delay model for this VCC")
list_of_entries.append(copy.deepcopy(entry))
self._delay_model = json.dumps(
{"receptor_delays": list_of_entries}
)
dm_found = True
break
if not dm_found:
log_msg = f"Delay Model for VCC (DISH: {self._dish_id}) not found"
self._logger.error(log_msg)
[docs] def update_jones_matrix(self: VccComponentManager, argin: str) -> None:
"""
Update Vcc's jones matrix
:param argin: the jones matrix JSON string
"""
matrix = json.loads(argin)
# Find the Jones matrix that applies to this VCC's DISH ID and store it
jm_found = False
# The Jones matrix schema allows for a set of receptors/dishes to be included.
# Even though there will only be one entry for a VCC, there should still
# be a list with a single entry so that the schema is followed.
# Set up the Jones matrix to be a list.
# Note: subarray has translated DISH IDs to VCC IDs in the JSON at this point
list_of_entries = []
for entry in matrix["jones_matrix"]:
self._logger.debug(
f"Received Jones matrix for VCC {entry['receptor']}"
)
if entry["receptor"] == self._vcc_id:
self._logger.debug("Updating Jones Matrix for this VCC")
list_of_entries.append(copy.deepcopy(entry))
self._jones_matrix = json.dumps(
{"jones_matrix": list_of_entries}
)
jm_found = True
break
if not jm_found:
log_msg = f"Jones matrix for VCC (DISH: {self._dish_id}) not found"
self._logger.error(log_msg)