Source code for ska_mid_cbf_mcs.subarray.subarray_component_manager
# -*- coding: utf-8 -*-
#
# This file is part of the SKA Mid.CBF MCS project
#
#
#
# Distributed under the terms of the GPL license.
# See LICENSE.txt for more info.
# Copyright (c) 2019 National Research Council of Canada
"""
CbfSubarray
Sub-element subarray device for Mid.CBF
"""
from __future__ import annotations # allow forward references in type hints
import copy
import json
import logging
import sys
from threading import Lock, Thread
from typing import Any, Callable, Dict, List, Optional, Tuple
# Tango imports
import tango
from ska_tango_base.commands import ResultCode
from ska_tango_base.control_model import (
AdminMode,
ObsState,
PowerMode,
SimulationMode,
)
from ska_tango_base.csp.subarray.component_manager import (
CspSubarrayComponentManager,
)
from ska_telmodel.schema import validate as telmodel_validate
from tango import AttrQuality
from ska_mid_cbf_mcs.attribute_proxy import CbfAttributeProxy
from ska_mid_cbf_mcs.commons.dish_utils import DISHUtils
from ska_mid_cbf_mcs.commons.global_enum import (
FspModes,
const,
freq_band_dict,
mhz_to_hz,
vcc_oversampling_factor,
)
from ska_mid_cbf_mcs.component.component_manager import (
CbfComponentManager,
CommunicationStatus,
)
from ska_mid_cbf_mcs.component.util import check_communicating
# SKA imports
from ska_mid_cbf_mcs.device_proxy import CbfDeviceProxy
from ska_mid_cbf_mcs.group_proxy import CbfGroupProxy
[docs]class CbfSubarrayComponentManager(
CbfComponentManager, CspSubarrayComponentManager
):
"""A component manager for the CbfSubarray class."""
@property
def config_id(self: CbfSubarrayComponentManager) -> str:
"""Return the configuration ID."""
return self._config_id
@property
def scan_id(self: CbfSubarrayComponentManager) -> int:
"""Return the scan ID."""
return self._scan_id
@property
def subarray_id(self: CbfSubarrayComponentManager) -> int:
"""Return the subarray ID."""
return self._subarray_id
@property
def frequency_band(self: CbfSubarrayComponentManager) -> int:
"""Return the frequency band."""
return self._frequency_band
@property
def dish_ids(self: CbfSubarrayComponentManager) -> List[str]:
"""Return the DISH/receptor ID list."""
return self._dish_ids
@property
def vcc_ids(self: CbfSubarrayComponentManager) -> List[int]:
"""Return the VCC ID list."""
return self._vcc_ids
def __init__(
self: CbfSubarrayComponentManager,
subarray_id: int,
controller: str,
vcc: List[str],
fsp: List[str],
fsp_corr_sub: List[str],
fsp_pss_sub: List[str],
fsp_pst_sub: List[str],
talon_board: List[str],
logger: logging.Logger,
simulation_mode: SimulationMode,
push_change_event_callback: Optional[Callable],
component_resourced_callback: Callable[[bool], None],
component_configured_callback: Callable[[bool], None],
component_scanning_callback: Callable[[bool], None],
communication_status_changed_callback: Callable[
[CommunicationStatus], None
],
component_power_mode_changed_callback: Callable[[PowerMode], None],
component_fault_callback: Callable,
component_obs_fault_callback: Callable,
) -> None:
"""
Initialise a new instance.
:param subarray_id: ID of subarray
:param controller: FQDN of controller device
:param vcc: FQDNs of subordinate VCC devices
:param fsp: FQDNs of subordinate FSP devices
:param fsp_corr_sub: FQDNs of subordinate FSP CORR subarray devices
:param fsp_pss_sub: FQDNs of subordinate FSP PSS-BF subarray devices
:param fsp_pst_sub: FQDNs of subordinate FSP PST-BF devices
:param talon_board: FQDNs of talon board devices
:param logger: a logger for this object to use
:param push_change_event_callback: method to call when the base classes
want to send an event
:param component_resourced_callback: callback to be called when
the component resource status changes
:param component_configured_callback: callback to be called when
the component configuration status changes
:param component_scanning_callback: callback to be called when
the component scanning status changes
:param communication_status_changed_callback: callback to be
called when the status of the communications channel between the
component manager and its component changes
:param component_power_mode_changed_callback: callback to be called when
the component power mode changes
:param component_fault_callback: callback to be called in event of
component fault (for op state model)
:param component_obs_fault_callback: callback to be called in event of
component fault (for obs state model)
"""
self._logger = logger
self._simulation_mode = simulation_mode
self._logger.info("Entering CbfSubarrayComponentManager.__init__)")
self._dish_utils = None
self._component_op_fault_callback = component_fault_callback
self._component_obs_fault_callback = component_obs_fault_callback
self._subarray_id = subarray_id
self._fqdn_controller = controller
self._fqdn_vcc = vcc
self._fqdn_fsp = fsp
self._fqdn_fsp_corr_subarray_device = fsp_corr_sub
self._fqdn_fsp_pss_subarray_device = fsp_pss_sub
self._fqdn_fsp_pst_subarray_device = fsp_pst_sub
self._fqdn_talon_board_device = talon_board
# set to determine if resources are assigned
self._resourced = False
# set to determine if ready to receive subscribed parameters;
# also indicates whether component is currently configured
self._ready = False
self.connected = False
self.obs_faulty = False
# initialize attribute values
self._sys_param_str = ""
self._dish_ids = []
self._vcc_ids = []
self._frequency_band = 0
self._config_id = ""
self._scan_id = 0
# store list of fsp configurations being used for each function mode
self._corr_config = []
self._pss_config = []
self._pst_config = []
# store list of fsp being used for each function mode
self._corr_fsp_list = []
self._pss_fsp_list = []
self._pst_fsp_list = []
self._latest_scan_config = ""
# TODO
# self._output_links_distribution = {"configID": ""}
# self._published_output_links = False
# self._last_received_vis_destination_address = "{}"
self._last_received_delay_model = "{}"
self._last_received_jones_matrix = "{}"
self._last_received_timing_beam_weights = "{}"
self._mutex_delay_model_config = Lock()
self._mutex_jones_matrix_config = Lock()
self._mutex_beam_weights_config = Lock()
# store the subscribed telstate events as event_ID:attribute_proxy key:value pairs
self._events_telstate = {}
# for easy device-reference
self._frequency_band_offset_stream1 = 0
self._frequency_band_offset_stream2 = 0
self._stream_tuning = [0, 0]
# device proxy for easy reference to CBF controller
self._proxy_cbf_controller = None
self._controller_max_capabilities = {}
self._count_vcc = 0
self._count_fsp = 0
# proxies to subordinate devices
self._proxies_vcc = []
self._proxies_assigned_vcc = {}
self._proxies_fsp = []
self._proxies_fsp_corr_subarray_device = []
self._proxies_fsp_pss_subarray_device = []
self._proxies_fsp_pst_subarray_device = []
self._proxies_talon_board_device = []
# group proxies to subordinate devices
# Note: VCC connected both individual and in group
self._group_vcc = None
self._group_fsp = None
self._group_fsp_corr_subarray = None
self._group_fsp_pss_subarray = None
self._group_fsp_pst_subarray = None
self._component_resourced_callback = component_resourced_callback
self._component_configured_callback = component_configured_callback
self._component_scanning_callback = component_scanning_callback
super().__init__(
logger=logger,
push_change_event_callback=push_change_event_callback,
communication_status_changed_callback=communication_status_changed_callback,
component_power_mode_changed_callback=component_power_mode_changed_callback,
component_fault_callback=component_fault_callback,
obs_state_model=None,
)
[docs] def start_communicating(self: CbfSubarrayComponentManager) -> None:
"""Establish communication with the component, then start monitoring."""
self._logger.info(
"Entering CbfSubarrayComponentManager.start_communicating"
)
if self.connected:
self._logger.info("Already connected.")
return
super().start_communicating()
try:
if self._proxy_cbf_controller is None:
self._proxy_cbf_controller = CbfDeviceProxy(
fqdn=self._fqdn_controller, logger=self._logger
)
self._controller_max_capabilities = dict(
pair.split(":")
for pair in self._proxy_cbf_controller.get_property(
"MaxCapabilities"
)["MaxCapabilities"]
)
self._count_vcc = int(self._controller_max_capabilities["VCC"])
self._count_fsp = int(self._controller_max_capabilities["FSP"])
self._fqdn_vcc = self._fqdn_vcc[: self._count_vcc]
self._fqdn_fsp = self._fqdn_fsp[: self._count_fsp]
self._fqdn_fsp_corr_subarray_device = (
self._fqdn_fsp_corr_subarray_device[: self._count_fsp]
)
self._fqdn_fsp_pss_subarray_device = (
self._fqdn_fsp_pss_subarray_device[: self._count_fsp]
)
self._fqdn_fsp_pst_subarray_device = (
self._fqdn_fsp_pst_subarray_device[: self._count_fsp]
)
if len(self._proxies_vcc) == 0:
self._proxies_vcc = [
CbfDeviceProxy(fqdn=fqdn, logger=self._logger)
for fqdn in self._fqdn_vcc
]
if len(self._proxies_fsp) == 0:
self._proxies_fsp = [
CbfDeviceProxy(fqdn=fqdn, logger=self._logger)
for fqdn in self._fqdn_fsp
]
if len(self._proxies_fsp_corr_subarray_device) == 0:
for fqdn in self._fqdn_fsp_corr_subarray_device:
proxy = CbfDeviceProxy(fqdn=fqdn, logger=self._logger)
self._proxies_fsp_corr_subarray_device.append(proxy)
if len(self._proxies_fsp_pss_subarray_device) == 0:
for fqdn in self._fqdn_fsp_pss_subarray_device:
proxy = CbfDeviceProxy(fqdn=fqdn, logger=self._logger)
self._proxies_fsp_pss_subarray_device.append(proxy)
if len(self._proxies_fsp_pst_subarray_device) == 0:
for fqdn in self._fqdn_fsp_pst_subarray_device:
proxy = CbfDeviceProxy(fqdn=fqdn, logger=self._logger)
self._proxies_fsp_pst_subarray_device.append(proxy)
if len(self._proxies_talon_board_device) == 0:
for fqdn in self._fqdn_talon_board_device:
proxy = CbfDeviceProxy(fqdn=fqdn, logger=self._logger)
self._proxies_talon_board_device.append(proxy)
if self._group_vcc is None:
self._group_vcc = CbfGroupProxy(
name="VCC", logger=self._logger
)
if self._group_fsp is None:
self._group_fsp = CbfGroupProxy(
name="FSP", logger=self._logger
)
if self._group_fsp_corr_subarray is None:
self._group_fsp_corr_subarray = CbfGroupProxy(
name="FSP Subarray Corr", logger=self._logger
)
if self._group_fsp_pss_subarray is None:
self._group_fsp_pss_subarray = CbfGroupProxy(
name="FSP Subarray Pss", logger=self._logger
)
if self._group_fsp_pst_subarray is None:
self._group_fsp_pst_subarray = CbfGroupProxy(
name="FSP Subarray Pst", logger=self._logger
)
for proxy in self._proxies_fsp_corr_subarray_device:
proxy.adminMode = AdminMode.ONLINE
for proxy in self._proxies_fsp_pss_subarray_device:
proxy.adminMode = AdminMode.ONLINE
for proxy in self._proxies_fsp_pst_subarray_device:
proxy.adminMode = AdminMode.ONLINE
except tango.DevFailed as dev_failed:
self.update_component_power_mode(PowerMode.UNKNOWN)
self.update_communication_status(
CommunicationStatus.NOT_ESTABLISHED
)
self._component_op_fault_callback(True)
raise ConnectionError("Error in proxy connection.") from dev_failed
self.connected = True
self.update_communication_status(CommunicationStatus.ESTABLISHED)
self.update_component_power_mode(PowerMode.OFF)
self._component_op_fault_callback(False)
[docs] def stop_communicating(self: CbfSubarrayComponentManager) -> None:
"""Stop communication with the component."""
self._logger.info(
"Entering CbfSubarrayComponentManager.stop_communicating"
)
super().stop_communicating()
for proxy in self._proxies_fsp_corr_subarray_device:
proxy.adminMode = AdminMode.OFFLINE
for proxy in self._proxies_fsp_pss_subarray_device:
proxy.adminMode = AdminMode.OFFLINE
for proxy in self._proxies_fsp_pst_subarray_device:
proxy.adminMode = AdminMode.OFFLINE
self.connected = False
self.update_component_power_mode(PowerMode.UNKNOWN)
[docs] @check_communicating
def on(self: CbfSubarrayComponentManager) -> None:
for proxy in self._proxies_fsp_corr_subarray_device:
proxy.On()
for proxy in self._proxies_fsp_pss_subarray_device:
proxy.On()
for proxy in self._proxies_fsp_pst_subarray_device:
proxy.On()
self.update_component_power_mode(PowerMode.ON)
[docs] @check_communicating
def off(self: CbfSubarrayComponentManager) -> None:
for proxy in self._proxies_fsp_corr_subarray_device:
proxy.Off()
for proxy in self._proxies_fsp_pss_subarray_device:
proxy.Off()
for proxy in self._proxies_fsp_pst_subarray_device:
proxy.Off()
self.update_component_power_mode(PowerMode.OFF)
[docs] @check_communicating
def standby(self: CbfSubarrayComponentManager) -> None:
self._logger.warning(
"Operating state Standby invalid for CbfSubarray."
)
[docs] def update_sys_param(
self: CbfSubarrayComponentManager, sys_param_str: str
) -> None:
self._logger.debug(f"Received sys param: {sys_param_str}")
self._sys_param_str = sys_param_str
sys_param = json.loads(sys_param_str)
self._dish_utils = DISHUtils(sys_param)
self._logger.info(
"Updated DISH ID to VCC ID and frequency offset k mapping"
)
@check_communicating
def _doppler_phase_correction_event_callback(
self: CbfSubarrayComponentManager,
fqdn: str,
name: str,
value: Any,
quality: AttrQuality,
) -> None:
"""
Callback for dopplerPhaseCorrection change event subscription.
:param fqdn: attribute FQDN
:param name: attribute name
:param value: attribute value
:param quality: attribute quality
"""
# TODO: investigate error in this callback (subarray logs)
if value is not None:
try:
self._group_vcc.write_attribute(
"dopplerPhaseCorrection", value
)
log_msg = f"Value of {name} is {value}"
self._logger.debug(log_msg)
except Exception as e:
self._logger.error(str(e))
else:
self._logger.warning(f"None value for {fqdn}")
@check_communicating
def _delay_model_event_callback(
self: CbfSubarrayComponentManager,
fqdn: str,
name: str,
value: Any,
quality: AttrQuality,
) -> None:
""" "
Callback for delayModel change event subscription.
:param fqdn: attribute FQDN
:param name: attribute name
:param value: attribute value
:param quality: attribute quality
"""
self._logger.debug("Entering _delay_model_event_callback()")
if value is not None:
if not self._ready:
log_msg = f"Ignoring delay model (obsState not correct). Delay model being passed in is: {value}"
self._logger.warning(log_msg)
return
try:
self._logger.info("Received delay model update.")
if value == self._last_received_delay_model:
log_msg = "Ignoring delay model (identical to previous)."
self._logger.warning(log_msg)
return
self._last_received_delay_model = value
delay_model_json = json.loads(value)
# Validate delay_model_json against the telescope model
self._logger.info(
f"Attempting to validate the following json against the telescope model: {delay_model_json}"
)
try:
telmodel_validate(
version=delay_model_json["interface"],
config=delay_model_json,
strictness=1,
)
self._logger.info("Delay model is valid!")
except ValueError as e:
msg = f"Delay model validation against the telescope model failed with the following exception:\n {str(e)}."
self.raise_update_delay_model_fatal_error(msg)
# pass DISH ID as VCC ID integer to FSPs and VCCs
for delay_detail in delay_model_json["receptor_delays"]:
dish_id = delay_detail["receptor"]
delay_detail[
"receptor"
] = self._dish_utils.dish_id_to_vcc_id[dish_id]
t = Thread(
target=self._update_delay_model,
args=(json.dumps(delay_model_json),),
)
t.start()
except Exception as e:
self._logger.error(str(e))
else:
self._logger.warning(f"None value for {fqdn}")
def _update_delay_model(
self: CbfSubarrayComponentManager, model: str
) -> None:
"""
Update FSP and VCC delay models.
:param destination_type: type of device to send the delay model to
:param epoch: system time of delay model reception
:param model: delay model
"""
# This method is always called on a separate thread
self._logger.debug("CbfSubarray._update_delay_model")
log_msg = f"Updating delay model ...{model}"
self._logger.info(log_msg)
data = tango.DeviceData()
data.insert(tango.DevString, model)
# we lock the mutex, forward the configuration, then immediately unlock it
self._mutex_delay_model_config.acquire()
self._group_vcc.command_inout("UpdateDelayModel", data)
self._group_fsp.command_inout("UpdateDelayModel", data)
self._mutex_delay_model_config.release()
@check_communicating
def _jones_matrix_event_callback(
self: CbfSubarrayComponentManager,
fqdn: str,
name: str,
value: Any,
quality: AttrQuality,
) -> None:
""" "
Callback for jonesMatrix change event subscription.
:param fqdn: attribute FQDN
:param name: attribute name
:param value: attribute value
:param quality: attribute quality
"""
self._logger.debug("CbfSubarray._jones_matrix_event_callback")
if value is not None:
if not self._ready:
log_msg = "Ignoring Jones matrix (obsState not correct)."
self._logger.warning(log_msg)
return
try:
self._logger.info("Received Jones Matrix update.")
if value == self._last_received_jones_matrix:
log_msg = "Ignoring Jones matrix (identical to previous)."
self._logger.warning(log_msg)
return
self._last_received_jones_matrix = value
jones_matrix_all = json.loads(value)
for jones_matrix in jones_matrix_all["jones_matrix"]:
# pass DISH ID as VCC ID integer to FSPs and VCCs
for matrix in jones_matrix["matrix_details"]:
dish_id = matrix["receptor"]
matrix[
"receptor"
] = self._dish_utils.dish_id_to_vcc_id[dish_id]
t = Thread(
target=self._update_jones_matrix,
args=(json.dumps(jones_matrix),),
)
t.start()
except Exception as e:
self._logger.error(str(e))
else:
self._logger.warning(f"None value for {fqdn}")
def _update_jones_matrix(
self: CbfSubarrayComponentManager, matrix: str
) -> None:
"""
Update FSP and VCC Jones matrices.
:param destination_type: type of device to send the delay model to
:param epoch: system time of delay model reception
:param matrix: Jones matrix value
"""
# This method is always called on a separate thread
self._logger.debug("CbfSubarray._update_jones_matrix")
log_msg = f"Updating Jones Matrix {matrix}"
self._logger.info(log_msg)
data = tango.DeviceData()
data.insert(tango.DevString, matrix)
# we lock the mutex, forward the configuration, then immediately unlock it
self._mutex_jones_matrix_config.acquire()
self._group_vcc.command_inout("UpdateJonesMatrix", data)
self._group_fsp.command_inout("UpdateJonesMatrix", data)
self._mutex_jones_matrix_config.release()
@check_communicating
def _timing_beam_weights_event_callback(
self: CbfSubarrayComponentManager,
fqdn: str,
name: str,
value: Any,
quality: AttrQuality,
) -> None:
""" "
Callback for beamWeights change event subscription.
:param fqdn: attribute FQDN
:param name: attribute name
:param value: attribute value
:param quality: attribute quality
"""
self._logger.debug("CbfSubarray._timing_beam_weights_event_callback")
if value is not None:
if not self._ready:
log_msg = (
"Ignoring timing beam weights (obsState not correct)."
)
self._logger.warning(log_msg)
return
try:
self._logger.info("Received timing beam weights update.")
if value == self._last_received_timing_beam_weights:
log_msg = (
"Ignoring timing beam weights (identical to previous)."
)
self._logger.warning(log_msg)
return
self._last_received_timing_beam_weights = value
timing_beam_weights = json.loads(value)
# pass DISH ID as VCC ID integer to FSPs and VCCs
for weights in timing_beam_weights[
"timing_beam_weights_details"
]:
dish_id = weights["receptor"]
weights["receptor"] = self._dish_utils.dish_id_to_vcc_id[
dish_id
]
t = Thread(
target=self._update_timing_beam_weights,
args=(
int(timing_beam_weights["epoch"]),
json.dumps(
timing_beam_weights[
"timing_beam_weights_details"
]
),
),
)
t.start()
except Exception as e:
self._logger.error(str(e))
else:
self._logger.warning(f"None value for {fqdn}")
def _update_timing_beam_weights(
self: CbfSubarrayComponentManager, weights: str
) -> None:
"""
Update FSP beam weights.
:param destination_type: type of device to send the delay model to
:param epoch: system time of delay model reception
:param weights: beam weights value
"""
# This method is always called on a separate thread
self._logger.debug("CbfSubarray._update_timing_beam_weights")
log_msg = f"Updating timing beam weights {weights}"
self._logger.info(log_msg)
data = tango.DeviceData()
data.insert(tango.DevString, weights)
# we lock the mutex, forward the configuration, then immediately unlock it
self._mutex_beam_weights_config.acquire()
self._group_fsp.command_inout("UpdateTimingBeamWeights", data)
self._mutex_beam_weights_config.release()
[docs] def validate_ip(self: CbfSubarrayComponentManager, ip: str) -> bool:
"""
Validate IP address format.
:param ip: IP address to be evaluated
:return: whether or not the IP address format is valid
:rtype: bool
"""
splitip = ip.split(".")
if len(splitip) != 4:
return False
for ipparts in splitip:
if not ipparts.isdigit():
return False
ipval = int(ipparts)
if ipval < 0 or ipval > 255:
return False
return True
[docs] def raise_configure_scan_fatal_error(
self: CbfSubarrayComponentManager, msg: str
) -> Tuple[ResultCode, str]:
"""
Raise fatal error in ConfigureScan execution
:param msg: error message
:return: A tuple containing a return code and a string
message indicating status. The message is for
information purpose only.
:rtype: (ResultCode, str)
"""
self._component_obs_fault_callback(True)
self._logger.error(msg)
tango.Except.throw_exception(
"Command failed",
msg,
"ConfigureScan execution",
tango.ErrSeverity.ERR,
)
[docs] def raise_update_delay_model_fatal_error(
self: CbfSubarrayComponentManager, msg: str
) -> Tuple[ResultCode, str]:
"""
Raise fatal error in UpdateDelayModel execution
:param msg: error message
:return: A tuple containing a return code and a string
message indicating status. The message is for
information purpose only.
:rtype: (ResultCode, str)
"""
self._component_obs_fault_callback(True)
self._logger.error(msg)
tango.Except.throw_exception(
"Command failed",
msg,
"UpdateDelayModel execution",
tango.ErrSeverity.ERR,
)
@check_communicating
def _deconfigure(
self: CbfSubarrayComponentManager,
) -> None:
"""Completely deconfigure the subarray; all initialization performed
by by the ConfigureScan command must be 'undone' here."""
# component_manager._deconfigure is invoked by GoToIdle, ConfigureScan,
# ObsReset and Restart here in the CbfSubarray
if self._ready:
if self._group_fsp.get_size() > 0:
# change FSP subarray membership
data = tango.DeviceData()
data.insert(tango.DevUShort, self._subarray_id)
self._logger.debug(data)
self._group_fsp.command_inout("RemoveSubarrayMembership", data)
self._group_fsp.remove_all()
for group in [
self._group_fsp_corr_subarray,
self._group_fsp_pss_subarray,
self._group_fsp_pst_subarray,
]:
if group.get_size() > 0:
group.remove_all()
try:
# unsubscribe from TMC events
for event_id in list(self._events_telstate.keys()):
self._events_telstate[event_id].remove_event(event_id)
del self._events_telstate[event_id]
except tango.DevFailed:
self._component_obs_fault_callback(True)
# reset all private data to their initialization values:
self._pst_fsp_list = []
self._pss_fsp_list = []
self._corr_fsp_list = []
self._pst_config = []
self._pss_config = []
self._corr_config = []
self._scan_id = 0
self._config_id = ""
self._frequency_band = 0
self._last_received_delay_model = "{}"
self._last_received_jones_matrix = "{}"
self._last_received_timing_beam_weights = "{}"
[docs] def go_to_idle(
self: CbfSubarrayComponentManager,
) -> Tuple[ResultCode, str]:
"""
Send subarray from READY to IDLE.
:return: A tuple containing a return code and a string
message indicating status. The message is for
information purpose only.
:rtype: (ResultCode, str)
"""
self._deconfigure()
# issue GoToIdle to assigned VCCs
if self._group_vcc.get_size() > 0:
results = self._group_vcc.command_inout("GoToIdle")
self._logger.info("Results from VCC GoToIdle:")
for res in results:
self._logger.info(res.get_data())
self.update_component_configuration(False)
return (ResultCode.OK, "GoToIdle command completed OK")
[docs] @check_communicating
def validate_input(
self: CbfSubarrayComponentManager, argin: str
) -> Tuple[bool, str]:
"""
Validate scan configuration.
:param argin: The configuration as JSON formatted string.
:return: A tuple containing a boolean indicating if the configuration
is valid and a string message. The message is for information
purpose only.
:rtype: (bool, str)
"""
# try to deserialize input string to a JSON object
try:
full_configuration = json.loads(argin)
common_configuration = copy.deepcopy(full_configuration["common"])
configuration = copy.deepcopy(full_configuration["cbf"])
except json.JSONDecodeError: # argument not a valid JSON object
msg = f"Scan configuration object is not a valid JSON object. Aborting configuration. argin is: {argin}"
return (False, msg)
# Validate dopplerPhaseCorrSubscriptionPoint.
if "doppler_phase_corr_subscription_point" in configuration:
try:
attribute_proxy = CbfAttributeProxy(
fqdn=configuration[
"doppler_phase_corr_subscription_point"
],
logger=self._logger,
)
attribute_proxy.ping()
except (
tango.DevFailed
): # attribute doesn't exist or is not set up correctly
msg = (
f"Attribute {configuration['doppler_phase_corr_subscription_point']}"
" not found or not set up correctly for "
"'dopplerPhaseCorrSubscriptionPoint'. Aborting configuration."
)
return (False, msg)
# Validate delayModelSubscriptionPoint.
if "delay_model_subscription_point" in configuration:
try:
attribute_proxy = CbfAttributeProxy(
fqdn=configuration["delay_model_subscription_point"],
logger=self._logger,
)
attribute_proxy.ping()
except (
tango.DevFailed
): # attribute doesn't exist or is not set up correctly
msg = (
f"Attribute {configuration['delay_model_subscription_point']}"
" not found or not set up correctly for "
"'delayModelSubscriptionPoint'. Aborting configuration."
)
return (False, msg)
# Validate jonesMatrixSubscriptionPoint.
if "jones_matrix_subscription_point" in configuration:
try:
attribute_proxy = CbfAttributeProxy(
fqdn=configuration["jones_matrix_subscription_point"],
logger=self._logger,
)
attribute_proxy.ping()
except (
tango.DevFailed
): # attribute doesn't exist or is not set up correctly
msg = (
f"Attribute {configuration['jones_matrix_subscription_point']}"
" not found or not set up correctly for "
"'jonesMatrixSubscriptionPoint'. Aborting configuration."
)
return (False, msg)
# Validate beamWeightsSubscriptionPoint.
if "timing_beam_weights_subscription_point" in configuration:
try:
attribute_proxy = CbfAttributeProxy(
fqdn=configuration[
"timing_beam_weights_subscription_point"
],
logger=self._logger,
)
attribute_proxy.ping()
except (
tango.DevFailed
): # attribute doesn't exist or is not set up correctly
msg = (
f"Attribute {configuration['timing_beam_weights_subscription_point']}"
" not found or not set up correctly for "
"'beamWeightsSubscriptionPoint'. Aborting configuration."
)
return (False, msg)
for dish_id, proxy in self._proxies_assigned_vcc.items():
if proxy.State() != tango.DevState.ON:
msg = f"VCC {self._proxies_vcc.index(proxy) + 1} is not ON. Aborting configuration."
return (False, msg)
# Validate searchWindow.
if "search_window" in configuration:
# check if searchWindow is an array of maximum length 2
if len(configuration["search_window"]) > 2:
msg = (
"'searchWindow' must be an array of maximum length 2. "
"Aborting configuration."
)
return (False, msg)
for sw in configuration["search_window"]:
if sw["tdc_enable"]:
for receptor in sw["tdc_destination_address"]:
dish = receptor["receptor_id"]
if dish not in self._dish_ids:
msg = (
f"'searchWindow' DISH ID {dish} "
+ "not assigned to subarray. Aborting configuration."
)
return (False, msg)
else:
pass
# Validate fsp.
for fsp in configuration["fsp"]:
try:
# Validate fsp_id.
if int(fsp["fsp_id"]) in list(range(1, self._count_fsp + 1)):
fsp_id = int(fsp["fsp_id"])
fsp_proxy = self._proxies_fsp[fsp_id - 1]
else:
msg = (
f"'fsp_id' must be an integer in the range [1, {self._count_fsp}]."
" Aborting configuration."
)
return (False, msg)
# Validate functionMode.
valid_function_modes = [
"IDLE",
"CORR",
"PSS-BF",
"PST-BF",
"VLBI",
]
try:
function_mode_value = valid_function_modes.index(
fsp["function_mode"]
)
except ValueError:
return (
False,
f"{fsp['function_mode']} is not a valid FSP function mode.",
)
fsp_function_mode = fsp_proxy.functionMode
if fsp_function_mode not in [
FspModes.IDLE.value,
function_mode_value,
]:
msg = f"FSP {fsp_id} currently set to function mode {valid_function_modes.index(fsp_function_mode)}, \
cannot be used for {fsp['function_mode']} \
until it is returned to IDLE."
return (False, msg)
# TODO - why add these keys to the fsp dict - not good practice!
# TODO - create a new dict from a deep copy of the fsp dict.
fsp["frequency_band"] = common_configuration["frequency_band"]
if "frequency_band_offset_stream1" in configuration:
fsp["frequency_band_offset_stream1"] = configuration[
"frequency_band_offset_stream1"
]
if "frequency_band_offset_stream2" in configuration:
fsp["frequency_band_offset_stream2"] = configuration[
"frequency_band_offset_stream2"
]
if fsp["frequency_band"] in ["5a", "5b"]:
fsp["band_5_tuning"] = common_configuration[
"band_5_tuning"
]
# CORR #
if fsp["function_mode"] == "CORR":
# dishes may not be specified in the
# configuration at all, or the list may be empty
if "receptors" in fsp and len(fsp["receptors"]) > 0:
self._logger.debug(
f"List of receptors: {self._dish_ids}"
)
for dish in fsp["receptors"]:
if dish not in self._dish_ids:
msg = (
f"Receptor {dish} does not belong to "
f"subarray {self._subarray_id}."
)
self._logger.error(msg)
return (False, msg)
else:
msg = (
"'receptors' not specified for Fsp CORR config."
"Per ICD all receptors allocated to subarray are used"
)
self._logger.info(msg)
frequencyBand = freq_band_dict()[fsp["frequency_band"]][
"band_index"
]
# Validate frequencySliceID.
# TODO: move these to consts
# See for ex. Fig 8-2 in the Mid.CBF DDD
num_frequency_slices = [4, 5, 7, 12, 26, 26]
if int(fsp["frequency_slice_id"]) in list(
range(1, num_frequency_slices[frequencyBand] + 1)
):
pass
else:
msg = (
"'frequencySliceID' must be an integer in the range "
f"[1, {num_frequency_slices[frequencyBand]}] "
f"for a 'frequencyBand' of {fsp['frequency_band']}."
)
self._logger.error(msg)
return (False, msg)
# Validate zoom_factor.
if int(fsp["zoom_factor"]) in list(range(7)):
pass
else:
msg = "'zoom_factor' must be an integer in the range [0, 6]."
# this is a fatal error
self._logger.error(msg)
return (False, msg)
# Validate zoomWindowTuning.
if (
int(fsp["zoom_factor"]) > 0
): # zoomWindowTuning is required
if "zoom_window_tuning" in fsp:
if fsp["frequency_band"] not in [
"5a",
"5b",
]: # frequency band is not band 5
frequencyBand = [
"1",
"2",
"3",
"4",
"5a",
"5b",
].index(fsp["frequency_band"])
frequency_band_start = [
*map(
lambda j: j[0] * 10**9,
[
const.FREQUENCY_BAND_1_RANGE,
const.FREQUENCY_BAND_2_RANGE,
const.FREQUENCY_BAND_3_RANGE,
const.FREQUENCY_BAND_4_RANGE,
],
)
][frequencyBand] + fsp[
"frequency_band_offset_stream1"
]
frequency_slice_range = (
frequency_band_start
+ (fsp["frequency_slice_id"] - 1)
* const.FREQUENCY_SLICE_BW
* 10**6,
frequency_band_start
+ fsp["frequency_slice_id"]
* const.FREQUENCY_SLICE_BW
* 10**6,
)
if (
frequency_slice_range[0]
<= int(fsp["zoom_window_tuning"]) * 10**3
<= frequency_slice_range[1]
):
pass
else:
msg = "'zoomWindowTuning' must be within observed frequency slice."
self._logger.error(msg)
return (False, msg)
# frequency band 5a or 5b (two streams with bandwidth 2.5 GHz)
else:
if common_configuration["band_5_tuning"] == [
0,
0,
]: # band5Tuning not specified
pass
else:
# TODO: these validations of BW range are done many times
# in many places - use a common function; also may be possible
# to do them only once (ex. for band5Tuning)
frequency_slice_range_1 = (
fsp["band_5_tuning"][0] * 10**9
+ fsp["frequency_band_offset_stream1"]
- const.BAND_5_STREAM_BANDWIDTH
* 10**9
/ 2
+ (fsp["frequency_slice_id"] - 1)
* const.FREQUENCY_SLICE_BW
* 10**6,
fsp["band_5_tuning"][0] * 10**9
+ fsp["frequency_band_offset_stream1"]
- const.BAND_5_STREAM_BANDWIDTH
* 10**9
/ 2
+ fsp["frequency_slice_id"]
* const.FREQUENCY_SLICE_BW
* 10**6,
)
frequency_slice_range_2 = (
fsp["band_5_tuning"][1] * 10**9
+ fsp["frequency_band_offset_stream2"]
- const.BAND_5_STREAM_BANDWIDTH
* 10**9
/ 2
+ (fsp["frequency_slice_id"] - 1)
* const.FREQUENCY_SLICE_BW
* 10**6,
fsp["band_5_tuning"][1] * 10**9
+ fsp["frequency_band_offset_stream2"]
- const.BAND_5_STREAM_BANDWIDTH
* 10**9
/ 2
+ fsp["frequency_slice_id"]
* const.FREQUENCY_SLICE_BW
* 10**6,
)
if (
frequency_slice_range_1[0]
<= int(fsp["zoom_window_tuning"])
* 10**3
<= frequency_slice_range_1[1]
) or (
frequency_slice_range_2[0]
<= int(fsp["zoom_window_tuning"])
* 10**3
<= frequency_slice_range_2[1]
):
pass
else:
msg = "'zoomWindowTuning' must be within observed frequency slice."
self._logger.error(msg)
return (False, msg)
else:
msg = "FSP specified, but 'zoomWindowTuning' not given."
self._logger.error(msg)
return (False, msg)
# Validate integrationTime.
if int(fsp["integration_factor"]) in list(
range(
const.MIN_INT_TIME,
10 * const.MIN_INT_TIME + 1,
const.MIN_INT_TIME,
)
):
pass
else:
msg = (
"'integrationTime' must be an integer in the range"
f" [1, 10] multiplied by {const.MIN_INT_TIME}."
)
self._logger.error(msg)
return (False, msg)
# Validate fspChannelOffset
try:
if "channel_offset" in fsp:
if int(fsp["channel_offset"]) >= 0:
pass
# TODO has to be a multiple of 14880
else:
msg = "fspChannelOffset must be greater than or equal to zero"
self._logger.error(msg)
return (False, msg)
except (TypeError, ValueError):
msg = "fspChannelOffset must be an integer"
self._logger.error(msg)
return (False, msg)
# validate outputlink
# check the format
try:
for element in fsp["output_link_map"]:
(int(element[0]), int(element[1]))
except (TypeError, ValueError, IndexError):
msg = "'outputLinkMap' format not correct."
self._logger.error(msg)
return (False, msg)
# Validate channelAveragingMap.
if "channel_averaging_map" in fsp:
try:
# validate dimensions
for i in range(
0, len(fsp["channel_averaging_map"])
):
assert (
len(fsp["channel_averaging_map"][i]) == 2
)
# validate averaging factor
for i in range(
0, len(fsp["channel_averaging_map"])
):
# validate channel ID of first channel in group
if (
int(fsp["channel_averaging_map"][i][0])
== i
* const.NUM_FINE_CHANNELS
/ const.NUM_CHANNEL_GROUPS
):
pass # the default value is already correct
else:
msg = (
f"'channelAveragingMap'[{i}][0] is not the channel ID of the "
f"first channel in a group (received {fsp['channel_averaging_map'][i][0]})."
)
self._logger.error(msg)
return (False, msg)
# validate averaging factor
if int(fsp["channel_averaging_map"][i][1]) in [
0,
1,
2,
3,
4,
6,
8,
]:
pass
else:
msg = (
f"'channelAveragingMap'[{i}][1] must be one of "
f"[0, 1, 2, 3, 4, 6, 8] (received {fsp['channel_averaging_map'][i][1]})."
)
self._logger.error(msg)
return (False, msg)
except (
TypeError,
AssertionError,
): # dimensions not correct
msg = (
"channel Averaging Map dimensions not correct"
)
self._logger.error(msg)
return (False, msg)
# TODO: validate destination addresses: outputHost, outputPort
# PSS-BF #
# TODO currently only CORR function mode is supported outside of Mid.CBF MCS
if fsp["function_mode"] == "PSS-BF":
if int(fsp["search_window_id"]) in [1, 2]:
pass
else: # searchWindowID not in valid range
msg = (
"'searchWindowID' must be one of [1, 2] "
f"(received {fsp['search_window_id']})."
)
return (False, msg)
if len(fsp["search_beam"]) <= 192:
for searchBeam in fsp["search_beam"]:
if 1 > int(searchBeam["search_beam_id"]) > 1500:
# searchbeamID not in valid range
msg = (
"'searchBeamID' must be within range 1-1500 "
f"(received {searchBeam['search_beam_id']})."
)
return (False, msg)
for (
fsp_pss_subarray_proxy
) in self._proxies_fsp_pss_subarray_device:
searchBeamID = (
fsp_pss_subarray_proxy.searchBeamID
)
fsp_id = fsp_pss_subarray_proxy.get_property(
"FspID"
)["FspID"][0]
if searchBeamID is None:
pass
else:
for search_beam_ID in searchBeamID:
if (
int(searchBeam["search_beam_id"])
!= search_beam_ID
):
pass
elif (
fsp_pss_subarray_proxy.obsState
== ObsState.IDLE
):
pass
else:
msg = (
f"'searchBeamID' {search_beam_ID} is already "
f"being used in another subarray by FSP {fsp_id}"
)
return (False, msg)
# Validate dishes
# if not given, assign first DISH ID in subarray, as
# there is currently only support for 1 DISH per beam
if "receptor_ids" not in searchBeam:
searchBeam["receptor_ids"] = [
self._dish_ids.copy()[0]
]
# Sanity check:
for dish in searchBeam["receptor_ids"]:
if dish not in self._dish_ids:
msg = (
f"Receptor {dish} does not belong to "
f"subarray {self._subarray_id}."
)
self._logger.error(msg)
return (False, msg)
if (
searchBeam["enable_output"] is False
or searchBeam["enable_output"] is True
):
pass
else:
msg = "'outputEnabled' is not a valid boolean"
return (False, msg)
if isinstance(
searchBeam["averaging_interval"], int
):
pass
else:
msg = "'averagingInterval' is not a valid integer"
return (False, msg)
if self.validate_ip(
searchBeam["search_beam_destination_address"]
):
pass
else:
msg = "'searchBeamDestinationAddress' is not a valid IP address"
return (False, msg)
else:
msg = "More than 192 SearchBeams defined in PSS-BF config"
return (False, msg)
# PST-BF #
# TODO currently only CORR function mode is supported outside of Mid.CBF MCS
if fsp["function_mode"] == "PST-BF":
if len(fsp["timing_beam"]) <= 16:
for timingBeam in fsp["timing_beam"]:
if 1 > int(timingBeam["timing_beam_id"]) > 16:
# timingBeamID not in valid range
msg = (
"'timingBeamID' must be within range 1-16 "
f"(received {timingBeam['timing_beam_id']})."
)
return (False, msg)
for (
fsp_pst_subarray_proxy
) in self._proxies_fsp_pst_subarray_device:
timingBeamID = (
fsp_pst_subarray_proxy.timingBeamID
)
fsp_id = fsp_pst_subarray_proxy.get_property(
"FspID"
)["FspID"][0]
if timingBeamID is None:
pass
else:
for timing_beam_ID in timingBeamID:
if (
int(timingBeam["timing_beam_id"])
!= timing_beam_ID
):
pass
elif (
fsp_pst_subarray_proxy.obsState
== ObsState.IDLE
):
pass
else:
msg = (
f"'timingBeamID' {timing_beam_ID} is already "
f"being used in another subarray by FSP {fsp_id}"
)
return (False, msg)
# Validate dishes
# if not given, assign all DISH IDs belonging to subarray
if "receptor_ids" not in timingBeam:
timingBeam[
"receptor_ids"
] = self._dish_ids.copy()
for dish in timingBeam["receptor_ids"]:
if dish not in self._dish_ids:
msg = (
f"Receptor {dish} does not belong to "
f"subarray {self._subarray_id}."
)
self._logger.error(msg)
return (False, msg)
if (
timingBeam["enable_output"] is False
or timingBeam["enable_output"] is True
):
pass
else:
msg = "'outputEnabled' is not a valid boolean"
return (False, msg)
if self.validate_ip(
timingBeam["timing_beam_destination_address"]
):
pass
else:
msg = "'timingBeamDestinationAddress' is not a valid IP address"
return (False, msg)
else:
msg = (
"More than 16 TimingBeams defined in PST-BF config"
)
return (False, msg)
except tango.DevFailed: # exception in ConfigureScan
msg = (
"An exception occurred while configuring FSPs:"
f"\n{sys.exc_info()[1].args[0].desc}\n"
"Aborting configuration"
)
return (False, msg)
# At this point, everything has been validated.
return (True, "Scan configuration is valid.")
[docs] @check_communicating
def configure_scan(
self: CbfSubarrayComponentManager, argin: str
) -> Tuple[ResultCode, str]:
"""
:return: A tuple containing a return code and a string
message indicating status. The message is for
information purpose only.
:rtype: (ResultCode, str)
"""
# deconfigure to reset assigned FSPs and unsubscribe to events.
self._deconfigure()
full_configuration = json.loads(argin)
common_configuration = copy.deepcopy(full_configuration["common"])
configuration = copy.deepcopy(full_configuration["cbf"])
# Configure configID.
self._config_id = str(common_configuration["config_id"])
self._logger.debug(f"config_id: {self._config_id}")
# Configure frequencyBand.
frequency_bands = ["1", "2", "3", "4", "5a", "5b"]
self._frequency_band = frequency_bands.index(
common_configuration["frequency_band"]
)
self._logger.debug(f"frequency_band: {self._frequency_band}")
# Prepare args for ConfigureBand
for dish_id in self._dish_ids:
if dish_id in self._dish_utils.dish_id_to_vcc_id.keys():
# Fetch K-value based on dish_id
vccID = self._dish_utils.dish_id_to_vcc_id[dish_id]
vccProxy = self._proxies_vcc[vccID - 1]
freq_offset_k = self._dish_utils.dish_id_to_k[dish_id]
# Calculate dish sample rate
dish_sample_rate = self._calculate_dish_sample_rate(
freq_band_dict()[common_configuration["frequency_band"]],
freq_offset_k,
)
# Fetch samples per frame for this freq band
samples_per_frame = freq_band_dict()[
common_configuration["frequency_band"]
]["num_samples_per_frame"]
args = {
"frequency_band": common_configuration["frequency_band"],
"dish_sample_rate": int(dish_sample_rate),
"samples_per_frame": int(samples_per_frame),
}
data = tango.DeviceData()
data.insert(tango.DevString, json.dumps(args))
vccProxy.command_inout("ConfigureBand", data)
else:
return (
ResultCode.FAILED,
f"Invalid receptor {dish_id}. ConfigureScan command failed.",
)
# Configure band5Tuning, if frequencyBand is 5a or 5b.
if self._frequency_band in [4, 5]:
stream_tuning = [
*map(float, common_configuration["band_5_tuning"])
]
self._stream_tuning = stream_tuning
# Configure frequencyBandOffsetStream1.
if "frequency_band_offset_stream1" in configuration:
self._frequency_band_offset_stream1 = int(
configuration["frequency_band_offset_stream1"]
)
else:
self._frequency_band_offset_stream1 = 0
log_msg = (
"'frequencyBandOffsetStream1' not specified. Defaulting to 0."
)
self._logger.warning(log_msg)
# If not given, use a default value.
# If malformed, use a default value, but append an error.
if "frequency_band_offset_stream2" in configuration:
self._frequency_band_offset_stream2 = int(
configuration["frequency_band_offset_stream2"]
)
else:
self._frequency_band_offset_stream2 = 0
log_msg = (
"'frequencyBandOffsetStream2' not specified. Defaulting to 0."
)
self._logger.warn(log_msg)
config_dict = {
"config_id": self._config_id,
"frequency_band": common_configuration["frequency_band"],
"band_5_tuning": self._stream_tuning,
"frequency_band_offset_stream1": self._frequency_band_offset_stream1,
"frequency_band_offset_stream2": self._frequency_band_offset_stream2,
"rfi_flagging_mask": configuration["rfi_flagging_mask"],
}
# Add subset of FSP configuration to the VCC configure scan argument
# TODO determine necessary parameters to send to VCC for each function mode
# TODO VLBI
reduced_fsp = []
for fsp in configuration["fsp"]:
function_mode = fsp["function_mode"]
fsp_cfg = {"fsp_id": fsp["fsp_id"], "function_mode": function_mode}
if function_mode == "CORR":
fsp_cfg["frequency_slice_id"] = fsp["frequency_slice_id"]
elif function_mode == "PSS-BF":
fsp_cfg["search_window_id"] = fsp["search_window_id"]
reduced_fsp.append(fsp_cfg)
config_dict["fsp"] = reduced_fsp
json_str = json.dumps(config_dict)
data = tango.DeviceData()
data.insert(tango.DevString, json_str)
self._group_vcc.command_inout("ConfigureScan", data)
# Configure dopplerPhaseCorrSubscriptionPoint.
if "doppler_phase_corr_subscription_point" in configuration:
attribute_proxy = CbfAttributeProxy(
fqdn=configuration["doppler_phase_corr_subscription_point"],
logger=self._logger,
)
event_id = attribute_proxy.add_change_event_callback(
self._doppler_phase_correction_event_callback
)
self._logger.info(
f"Subscribing to doppler phase correction event of id: {event_id}"
)
self._events_telstate[event_id] = attribute_proxy
# Configure delayModelSubscriptionPoint.
if "delay_model_subscription_point" in configuration:
self._last_received_delay_model = "{}"
attribute_proxy = CbfAttributeProxy(
fqdn=configuration["delay_model_subscription_point"],
logger=self._logger,
)
event_id = attribute_proxy.add_change_event_callback(
self._delay_model_event_callback
)
self._logger.info(
f"Subscribing to delay model event of id: {event_id}"
)
self._events_telstate[event_id] = attribute_proxy
# Configure jonesMatrixSubscriptionPoint
if "jones_matrix_subscription_point" in configuration:
self._last_received_jones_matrix = "{}"
attribute_proxy = CbfAttributeProxy(
fqdn=configuration["jones_matrix_subscription_point"],
logger=self._logger,
)
event_id = attribute_proxy.add_change_event_callback(
self._jones_matrix_event_callback
)
self._logger.info(
f"Subscribing to jones matrix event of id: {event_id}"
)
self._events_telstate[event_id] = attribute_proxy
# Configure beamWeightsSubscriptionPoint
if "timing_beam_weights_subscription_point" in configuration:
self._last_received_timing_beam_weights = "{}"
attribute_proxy = CbfAttributeProxy(
fqdn=configuration["timing_beam_weights_subscription_point"],
logger=self._logger,
)
event_id = attribute_proxy.add_change_event_callback(
self._timing_beam_weights_event_callback
)
self._logger.info(
f"Subscribing to timing beam weights event of id: {event_id}"
)
self._events_telstate[event_id] = attribute_proxy
# Configure searchWindow.
if "search_window" in configuration:
for search_window in configuration["search_window"]:
search_window["frequency_band"] = common_configuration[
"frequency_band"
]
search_window[
"frequency_band_offset_stream1"
] = self._frequency_band_offset_stream1
search_window[
"frequency_band_offset_stream2"
] = self._frequency_band_offset_stream2
if search_window["frequency_band"] in ["5a", "5b"]:
search_window["band_5_tuning"] = common_configuration[
"band_5_tuning"
]
# pass DISH ID as VCC ID integer to VCCs
if search_window["tdc_enable"]:
for tdc_dest in search_window["tdc_destination_address"]:
tdc_dest[
"receptor_id"
] = self._dish_utils.dish_id_to_vcc_id[
tdc_dest["receptor_id"]
]
# pass on configuration to VCC
data = tango.DeviceData()
data.insert(tango.DevString, json.dumps(search_window))
self._logger.debug(
f"configuring search window: {json.dumps(search_window)}"
)
self._group_vcc.command_inout("ConfigureSearchWindow", data)
else:
log_msg = "'searchWindow' not given."
self._logger.warning(log_msg)
# TODO: the entire vcc configuration should move to Vcc
# for now, run ConfigScan only wih the following data, so that
# the obsState are properly (implicitly) updated by the command
# (And not manually by SetObservingState as before) - relevant???
# FSP ##################################################################
# Configure FSP.
# TODO add VLBI once implemented
for fsp in configuration["fsp"]:
# Configure fsp_id.
fsp_id = int(fsp["fsp_id"])
fsp_proxy = self._proxies_fsp[fsp_id - 1]
fsp_corr_proxy = self._proxies_fsp_corr_subarray_device[fsp_id - 1]
fsp_pss_proxy = self._proxies_fsp_pss_subarray_device[fsp_id - 1]
fsp_pst_proxy = self._proxies_fsp_pst_subarray_device[fsp_id - 1]
self._group_fsp.add(self._fqdn_fsp[fsp_id - 1])
# Set simulation mode of FSPs to subarray sim mode
self._logger.info(
f"Setting Simulation Mode of FSP {fsp_id} proxies to: {self._simulation_mode} and turning them on."
)
for proxy in [
fsp_proxy,
fsp_corr_proxy,
fsp_pss_proxy,
fsp_pst_proxy,
]:
proxy.write_attribute("adminMode", AdminMode.OFFLINE)
proxy.write_attribute("simulationMode", self._simulation_mode)
proxy.write_attribute("adminMode", AdminMode.ONLINE)
proxy.command_inout("On")
# Configure functionMode if IDLE
if fsp_proxy.functionMode == FspModes.IDLE.value:
fsp_proxy.SetFunctionMode(fsp["function_mode"])
# change FSP subarray membership
fsp_proxy.AddSubarrayMembership(self._subarray_id)
# Add configID, frequency_band, band_5_tuning, and sub_id to fsp. They are not included in the "FSP" portion in configScan JSON
fsp["config_id"] = common_configuration["config_id"]
fsp["sub_id"] = common_configuration["subarray_id"]
fsp["frequency_band"] = common_configuration["frequency_band"]
if fsp["frequency_band"] in ["5a", "5b"]:
fsp["band_5_tuning"] = common_configuration["band_5_tuning"]
if "frequency_band_offset_stream1" in configuration:
fsp[
"frequency_band_offset_stream1"
] = self._frequency_band_offset_stream1
if "frequency_band_offset_stream2" in configuration:
fsp[
"frequency_band_offset_stream2"
] = self._frequency_band_offset_stream2
# Add channel_offset if it was omitted from the configuration (it is optional).
if "channel_offset" not in fsp:
self._logger.warning(
"channel_offset not defined in configuration. Assigning default of 1."
)
fsp["channel_offset"] = 1
# Add all DISH IDs for subarray and for correlation to fsp
# Parameter named "subarray_vcc_ids" used by HPS contains all the
# VCCs assigned to the subarray
# Parameter named "corr_vcc_ids" used by HPS contains the
# subset of the subarray VCCs for which the correlation results
# are requested to be used in Mid.CBF output products (visibilities)
fsp["subarray_vcc_ids"] = []
for dish in self._dish_ids:
fsp["subarray_vcc_ids"].append(
self._dish_utils.dish_id_to_vcc_id[dish]
)
# Add the fs_sample_rate for all dishes
fsp["fs_sample_rates"] = self._calculate_fs_sample_rates(
common_configuration["frequency_band"]
)
match fsp["function_mode"]:
case "CORR":
# dishes may not be specified in the
# configuration at all, or the list may be empty
fsp["corr_vcc_ids"] = []
if "receptors" not in fsp or len(fsp["receptors"]) == 0:
# In this case by the ICD, all subarray allocated resources should be used.
fsp["corr_vcc_ids"] = fsp["subarray_vcc_ids"].copy()
else:
for dish in fsp["receptors"]:
fsp["corr_vcc_ids"].append(
self._dish_utils.dish_id_to_vcc_id[dish]
)
self._corr_config.append(fsp)
self._corr_fsp_list.append(fsp["fsp_id"])
self._group_fsp_corr_subarray.add(
self._fqdn_fsp_corr_subarray_device[fsp_id - 1]
)
# TODO: PSS, PST below may fall out of date; currently only CORR function mode is supported outside of Mid.CBF MCS
case "PSS-BF":
for searchBeam in fsp["search_beam"]:
search_beam_vcc_ids = []
for dish in searchBeam["receptor_ids"]:
search_beam_vcc_ids.append(
self._dish_utils.dish_id_to_vcc_id[dish]
)
searchBeam["receptor_ids"] = search_beam_vcc_ids
self._pss_config.append(fsp)
self._pss_fsp_list.append(fsp["fsp_id"])
self._group_fsp_pss_subarray.add(
self._fqdn_fsp_pss_subarray_device[fsp_id - 1]
)
case "PST-BF":
for timingBeam in fsp["timing_beam"]:
timing_beam_vcc_ids = []
for dish in timingBeam["receptor_ids"]:
timing_beam_vcc_ids.append(
self._dish_utils.dish_id_to_vcc_id[dish]
)
timingBeam["receptor_ids"] = timing_beam_vcc_ids
self._pst_config.append(fsp)
self._pst_fsp_list.append(fsp["fsp_id"])
self._group_fsp_pst_subarray.add(
self._fqdn_fsp_pst_subarray_device[fsp_id - 1]
)
# Call ConfigureScan for all FSP Subarray devices (CORR/PSS/PST)
# NOTE:_corr_config is a list of fsp config JSON objects, each
# augmented by a number of vcc-fsp common parameters
if len(self._corr_config) != 0:
for this_fsp in self._corr_config:
try:
this_proxy = self._proxies_fsp_corr_subarray_device[
int(this_fsp["fsp_id"]) - 1
]
this_proxy.set_timeout_millis(12000)
this_proxy.ConfigureScan(json.dumps(this_fsp))
self._logger.info(
f"cbf_subarray this_fsp: {json.dumps(this_fsp)}"
)
except tango.DevFailed:
msg = (
"An exception occurred while configuring "
"FspCorrSubarray; Aborting configuration"
)
self.raise_configure_scan_fatal_error(msg)
# NOTE: _pss_config is constructed similarly to _corr_config
if len(self._pss_config) != 0:
for this_fsp in self._pss_config:
try:
this_proxy = self._proxies_fsp_pss_subarray_device[
int(this_fsp["fsp_id"]) - 1
]
this_proxy.set_timeout_millis(12000)
this_proxy.ConfigureScan(json.dumps(this_fsp))
except tango.DevFailed:
msg = (
"An exception occurred while configuring "
"FspPssSubarray; Aborting configuration"
)
self.raise_configure_scan_fatal_error(msg)
# NOTE: _pst_config is constructed similarly to _corr_config
if len(self._pst_config) != 0:
for this_fsp in self._pst_config:
try:
this_proxy = self._proxies_fsp_pst_subarray_device[
int(this_fsp["fsp_id"]) - 1
]
this_proxy.set_timeout_millis(12000)
this_proxy.ConfigureScan(json.dumps(this_fsp))
except tango.DevFailed:
msg = (
"An exception occurred while configuring "
"FspPstSubarray; Aborting configuration"
)
self.raise_configure_scan_fatal_error(msg)
# save configuration into latestScanConfig
self._latest_scan_config = str(configuration)
self.update_component_configuration(True)
return (ResultCode.OK, "ConfigureScan command completed OK")
[docs] @check_communicating
def release_vcc(
self: CbfSubarrayComponentManager, argin: List[str]
) -> Tuple[ResultCode, str]:
"""
Remove receptor/dish from subarray.
:param argin: The list of receptor/DISH IDs to be released
:return: A tuple containing a return code and a string
message indicating status. The message is for
information purpose only.
:rtype: (ResultCode, str)
"""
self._logger.debug(f"Current receptors: {*self._dish_ids,}")
for dish_id in argin:
self._logger.debug(f"Attempting to remove {dish_id}")
if dish_id in self._dish_ids:
if dish_id in self._dish_utils.dish_id_to_vcc_id.keys():
vccID = self._dish_utils.dish_id_to_vcc_id[dish_id]
else:
return (
ResultCode.FAILED,
f"Invalid receptor {dish_id}. RemoveReceptors command failed.",
)
vccFQDN = self._fqdn_vcc[vccID - 1]
vccProxy = self._proxies_vcc[vccID - 1]
self._dish_ids.remove(dish_id)
self._vcc_ids.remove(vccID)
self._group_vcc.remove(vccFQDN)
del self._proxies_assigned_vcc[dish_id]
try:
# reset subarrayMembership Vcc attribute:
vccProxy.subarrayMembership = 0
self._logger.debug(
f"VCC {vccID} subarray_id: "
+ f"{vccProxy.subarrayMembership}"
)
except tango.DevFailed as df:
msg = str(df.args[0].desc)
self._component_obs_fault_callback(True)
return (ResultCode.FAILED, msg)
# clear the subarray ID off the talon board with the matching
# DISH ID
self._assign_talon_board_subarray_id(
dish_id=dish_id, assign=False
)
else:
msg = f"Receptor {dish_id} not found. Skipping."
self._logger.warning(msg)
if len(self._dish_ids) == 0:
self.update_component_resources(False)
self._logger.info("No receptors remaining.")
else:
self._logger.info(f"Receptors remaining: {*self._dish_ids,}")
return (ResultCode.OK, "RemoveReceptors completed OK")
[docs] @check_communicating
def release_all_vcc(
self: CbfSubarrayComponentManager,
) -> Tuple[ResultCode, str]:
"""
Remove all receptors/dishes from subarray.
:return: A tuple containing a return code and a string
message indicating status. The message is for
information purpose only.
:rtype: (ResultCode, str)
"""
if len(self._dish_ids) > 0:
(rc, msg) = self.release_vcc(self._dish_ids.copy())
return (rc, msg.replace("RemoveReceptors", "RemoveAllReceptors"))
return (
ResultCode.FAILED,
"RemoveAllReceptors failed; no receptors currently assigned",
)
[docs] @check_communicating
def assign_vcc(
self: CbfSubarrayComponentManager, argin: List[str]
) -> Tuple[ResultCode, str]:
"""
Add receptors/dishes to subarray.
:param argin: The list of receptor/DISH IDs to be assigned
:return: A tuple containing a return code and a string
message indicating status. The message is for
information purpose only.
:rtype: (ResultCode, str)
"""
self._logger.debug(f"Current receptors: {*self._dish_ids,}")
for dish_id in argin:
self._logger.debug(f"Attempting to add receptor {dish_id}")
self._logger.debug(
f"Receptor to vcc keys: {self._dish_utils.dish_id_to_vcc_id.keys()}"
)
if dish_id in self._dish_utils.dish_id_to_vcc_id.keys():
vccID = self._dish_utils.dish_id_to_vcc_id[dish_id]
else:
return (
ResultCode.FAILED,
f"Invalid receptor {dish_id}. AddReceptors command failed.",
)
if vccID > const.MAX_VCC:
return (
ResultCode.FAILED,
f"VCC ID {vccID} is not supported. AddReceptors command failed.",
)
vccProxy = self._proxies_vcc[vccID - 1]
self._logger.debug(
f"dish_id = {dish_id}, vccProxy.dishID = "
+ f"{vccProxy.dishID}"
)
vccSubarrayID = vccProxy.subarrayMembership
self._logger.debug(f"VCC {vccID} subarray_id: {vccSubarrayID}")
# only add dish if it does not already belong to a
# different subarray
if vccSubarrayID not in [0, self._subarray_id]:
msg = (
f"{dish_id} already in use by "
+ f"subarray {vccSubarrayID}. Skipping."
)
self._logger.warning(msg)
elif dish_id not in self._dish_ids:
try:
# Setting simulation mode of VCC proxies based on simulation mode of subarray
self._logger.info(
f"Writing VCC simulation mode to: {self._simulation_mode}"
)
vccProxy.write_attribute("adminMode", AdminMode.OFFLINE)
vccProxy.write_attribute(
"simulationMode", self._simulation_mode
)
vccProxy.write_attribute("adminMode", AdminMode.ONLINE)
vccProxy.command_inout("On")
# update resourced state once first dish is added
if len(self._dish_ids) == 0:
self.update_component_resources(True)
self._dish_ids.append(dish_id)
self._vcc_ids.append(vccID)
self._group_vcc.add(self._fqdn_vcc[vccID - 1])
self._proxies_assigned_vcc[dish_id] = vccProxy
# change subarray membership of vcc
vccProxy.subarrayMembership = self._subarray_id
self._logger.debug(
f"VCC {vccID} subarray_id: "
+ f"{vccProxy.subarrayMembership}"
)
# assign the subarray ID to the talon board with the matching
# DISH ID
self._assign_talon_board_subarray_id(
dish_id=dish_id, assign=True
)
except tango.DevFailed as df:
msg = str(df.args[0].desc)
self._component_obs_fault_callback(True)
return (ResultCode.FAILED, msg)
else:
msg = f"{dish_id} already assigned to " + "subarray. Skipping."
self._logger.warning(msg)
self._logger.info(f"Receptors after adding: {*self._dish_ids,}")
return (ResultCode.OK, "AddReceptors completed OK")
[docs] @check_communicating
def scan(
self: CbfSubarrayComponentManager, argin: Dict[Any]
) -> Tuple[ResultCode, str]:
"""
Start subarray Scan operation.
:param argin: The scan ID as JSON formatted string.
:type argin: str
:return: A tuple containing a return code and a string
message indicating status. The message is for
information purpose only.
:rtype: (ResultCode, str)
"""
# Validate scan_json against the telescope model
try:
telmodel_validate(
version=argin["interface"], config=argin, strictness=1
)
self._logger.info("Scan is valid!")
except ValueError as e:
msg = f"Scan validation against ska-telmodel schema failed with exception:\n {str(e)}"
return (False, msg)
scan_id = argin["scan_id"]
data = tango.DeviceData()
data.insert(tango.DevShort, scan_id)
for group in [
self._group_vcc,
self._group_fsp_corr_subarray,
self._group_fsp_pss_subarray,
self._group_fsp_pst_subarray,
]:
if group.get_size() > 0:
results = group.command_inout("Scan", data)
self._logger.info("Results from Scan:")
for res in results:
self._logger.info(res.get_data())
self._scan_id = scan_id
self._component_scanning_callback(True)
return (ResultCode.STARTED, "Scan command successful")
[docs] @check_communicating
def end_scan(self: CbfSubarrayComponentManager) -> Tuple[ResultCode, str]:
"""
End subarray Scan operation.
:return: A tuple containing a return code and a string
message indicating status. The message is for
information purpose only.
:rtype: (ResultCode, str)
"""
# EndScan for all subordinate devices:
for group in [
self._group_vcc,
self._group_fsp_corr_subarray,
self._group_fsp_pss_subarray,
self._group_fsp_pst_subarray,
]:
if group.get_size() > 0:
results = group.command_inout("EndScan")
self._logger.info("Results from EndScan:")
for res in results:
self._logger.info(res.get_data())
self._scan_id = 0
self._component_scanning_callback(False)
return (ResultCode.OK, "EndScan command completed OK")
[docs] @check_communicating
def abort(self: CbfSubarrayComponentManager) -> None:
"""
Abort subarray configuration or operation.
"""
# reset ready flag
self._ready = False
for group in [
self._group_vcc,
self._group_fsp_corr_subarray, # TODO CIP-1850 Abort/ObsReset per FSP subarray
self._group_fsp_pss_subarray,
self._group_fsp_pst_subarray,
]:
if group.get_size() > 0:
results = group.command_inout("Abort")
self._logger.info("Results from Abort:")
for res in results:
self._logger.info(res.get_data())
[docs] @check_communicating
def obsreset(self: CbfSubarrayComponentManager) -> None:
"""
Reset to IDLE from abort/fault.
"""
# if subarray is in FAULT, we must first abort VCC and FSP operation
# this will allow us to call ObsReset on them even if they are not in FAULT
if self.obs_faulty:
self.abort()
# use callback to reset FAULT state
self._component_obs_fault_callback(False)
try:
# send Vcc devices to IDLE
if self._group_vcc.get_size() > 0:
self._group_vcc.command_inout("ObsReset")
# send any previously assigned FSPs to IDLE
for group in [
self._group_fsp_corr_subarray,
self._group_fsp_pss_subarray,
self._group_fsp_pst_subarray,
]:
# TODO CIP-1850 Abort/ObsReset per FSP subarray
if group.get_size() > 0:
results = group.command_inout("ObsReset")
self._logger.info("Results from ObsReset:")
for res in results:
self._logger.info(res.get_data())
except tango.DevFailed:
self._component_obs_fault_callback(True)
# We might have interrupted a long-running command such as a Configure
# or a Scan, so we need to clean up from that.
self._deconfigure()
[docs] @check_communicating
def restart(self: CbfSubarrayComponentManager) -> None:
"""
Restart to EMPTY from abort/fault.
"""
# leverage obsreset to send assigned resources to IDLE and deconfigure
self.obsreset()
# remove all assigned VCCs to return to EMPTY
self.release_all_vcc()
[docs] def update_component_resources(
self: CbfSubarrayComponentManager, resourced: bool
) -> None:
"""
Update the component resource status, calling callbacks as required.
:param resourced: whether the component is resourced.
"""
self._logger.debug(f"update_component_resources({resourced})")
if resourced:
# perform "component_resourced" if not previously resourced
if not self._resourced:
self._component_resourced_callback(True)
elif self._resourced:
self._component_resourced_callback(False)
self._resourced = resourced
[docs] def update_component_configuration(
self: CbfSubarrayComponentManager, configured: bool
) -> None:
"""
Update the component configuration status, calling callbacks as required.
:param configured: whether the component is configured.
"""
self._logger.debug(
f"update_component_configuration({configured}); configured == {configured}, self._ready == {self._ready}"
)
# perform component_configured/unconfigured callback if in a VALID case
# Cases:
# configured == False and self._ready == False -> INVALID: cannot issue component_unconfigured from IDLE
# configured == True and self._ready == False -> VALID: can issue component_configured from IDLE
# configured == False and self._ready == True -> VALID: can issue component_unconfigured from READY
# configured == True and self._ready == True -> INVALID: cannot issue component_configured from READY
if configured and not self._ready:
self._component_configured_callback(True)
self._ready = True
elif not configured and self._ready:
self._component_configured_callback(False)
self._ready = False
def _calculate_fs_sample_rate(
self: CbfSubarrayComponentManager, freq_band: str, dish: str
) -> Dict:
log_msg = (
f"Calculate fs_sample_rate for freq_band:{freq_band} and {dish}"
)
self._logger.info(log_msg)
# convert the DISH ID to a VCC ID integer using DISHUtils
vcc_id = self._dish_utils.dish_id_to_vcc_id[dish]
# find the k value for this DISH
freq_offset_k = self._dish_utils.dish_id_to_k[dish]
freq_band_info = freq_band_dict()[freq_band]
total_num_fs = freq_band_info["total_num_FSs"]
dish_sample_rate = self._calculate_dish_sample_rate(
freq_band_info, freq_offset_k
)
log_msg = f"dish_sample_rate: {dish_sample_rate}"
self._logger.debug(log_msg)
fs_sample_rate = int(
dish_sample_rate * vcc_oversampling_factor / total_num_fs
)
fs_sample_rate_for_band = {
"vcc_id": vcc_id,
"fs_sample_rate": fs_sample_rate,
}
log_msg = f"fs_sample_rate_for_band: {fs_sample_rate_for_band}"
self._logger.info(log_msg)
return fs_sample_rate_for_band
def _calculate_fs_sample_rates(
self: CbfSubarrayComponentManager, freq_band: str
) -> List[Dict]:
output_sample_rates = []
for dish in self._dish_ids:
output_sample_rates.append(
self._calculate_fs_sample_rate(freq_band, dish)
)
return output_sample_rates
def _calculate_dish_sample_rate(
self: CbfSubarrayComponentManager, freq_band_info, freq_offset_k
):
base_dish_sample_rate_MH = freq_band_info["base_dish_sample_rate_MHz"]
sample_rate_const = freq_band_info["sample_rate_const"]
return (base_dish_sample_rate_MH * mhz_to_hz) + (
sample_rate_const * freq_offset_k * const.DELTA_F
)
def _assign_talon_board_subarray_id(
self: CbfSubarrayComponentManager, dish_id: str, assign: bool
) -> None:
"""
Assign subarray ID to the talon board with the dish_id
:param dish_id: the DISH ID
:param assign: true to assign the subarray id, false to remove
"""
for proxy in self._proxies_talon_board_device:
board_dish_id = proxy.read_attribute("dishID").value
if board_dish_id == dish_id:
if assign:
proxy.write_attribute("subarrayID", str(self._subarray_id))
else:
proxy.write_attribute("subarrayID", "")
return
return