# -*- coding: utf-8 -*-
#
# This file is part of the SKA Low MCCS project
#
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""This module implements component management for stations."""
# pylint: disable=too-many-lines
from __future__ import annotations
import functools
import json
import logging
import threading
import time
from typing import Any, Callable, Optional, Sequence
import numpy as np
import tango
from astropy.coordinates import Angle
from astropy.time.core import Time, TimeDelta
from ska_control_model import CommunicationStatus, PowerState, ResultCode, TaskStatus
from ska_low_mccs_common.component import DeviceComponentManager
from ska_low_mccs_common.utils import threadsafe
from ska_tango_base.base import check_communicating, check_on
from ska_tango_base.executor import TaskExecutorComponentManager
from ska_low_mccs.station.point_station import Pointing, StationInformation
# Public API of this module: only the station component manager is exported.
__all__ = ["StationComponentManager"]

# NOTE(review): presumably the maximum number of beamformed channels handled
# by a station (48 blocks of 8 channels); it is not referenced in this part of
# the file -- confirm against its users.
MAX_NUMBER_OF_CHANNELS = 384

# Number of antennas assumed when sizing per-antenna buffers, such as the
# cached pointing delays kept by the station component manager.
NUMBER_OF_ANTENNAS = 256
class _FieldStationProxy(DeviceComponentManager):
    """
    A proxy to a FieldStation device, for a station to use.

    On top of the behaviour inherited from the device component manager,
    it relays change events on the device's ``outsideTemperature``
    attribute to a callback supplied at construction time.
    """

    def __init__(
        self: _FieldStationProxy,
        trl: str,
        logger: logging.Logger,
        communication_state_callback: Callable[[CommunicationStatus], None],
        component_state_callback: Callable[..., None],
        outside_temperature_changed_callback: Callable[[float], None],
    ) -> None:
        """
        Initialise a new instance.

        :param trl: TRL of the device to proxy (forwarded to the base class).
        :param logger: the logger to be used by this object.
        :param communication_state_callback: callback forwarded to the
            base class, called with the new communication status.
        :param component_state_callback: callback forwarded to the base
            class, called when the component state changes.
        :param outside_temperature_changed_callback: callback to be called
            with the new value whenever an ``outsideTemperature`` change
            event is received.
        """
        super().__init__(
            trl,
            logger,
            communication_state_callback,
            component_state_callback,
        )
        self._outside_temperature_changed_callback = (
            outside_temperature_changed_callback
        )

    @check_communicating
    def configure(self: _FieldStationProxy, config: str) -> None:
        """
        Configure the device proxy.

        :param config: json string of configuration.
        """
        assert self._proxy is not None  # for the type checker
        assert self._proxy._device is not None  # for the type checker
        self._proxy._device.Configure(config)

    @property
    def outside_temperature(self: _FieldStationProxy) -> float:
        """
        Return outsideTemperature.

        :return: the ``outsideTemperature`` attribute read through the proxy
        """
        assert self._proxy is not None  # for the type checker
        return self._proxy.outsideTemperature

    def subscribe_to_attributes(self: _FieldStationProxy) -> None:
        """
        Subscribe to change events in field station attributes of interest.

        The subscription is only made when the proxy holds no subscription
        id for ``outsideTemperature`` yet, so repeated calls do not
        subscribe twice.
        """
        assert self._proxy is not None
        if "outsideTemperature" not in self._proxy._change_event_subscription_ids:
            self._proxy.add_change_event_callback(
                "outsideTemperature", self._outside_temperature_changed
            )

    def _outside_temperature_changed(
        self: _FieldStationProxy,
        event_name: str,
        event_value: float,
        event_quality: tango.AttrQuality,
    ) -> None:
        """
        Handle a change event on ``outsideTemperature``.

        :param event_name: name of the attribute the event is for; compared
            case-insensitively against ``outsideTemperature``.
        :param event_value: the new attribute value.
        :param event_quality: quality of the attribute value (unused).
        """
        # Report an unexpected event name and drop the event rather than rely
        # on an ``assert``, which disappears when Python runs with -O.
        if event_name.lower() != "outsidetemperature":
            self.logger.error(
                "outsideTemperature changed callback called "
                f"but event_name is {event_name}."
            )
            return
        self._outside_temperature_changed_callback(event_value)
class _AntennaProxy(DeviceComponentManager):
    """A proxy to an antenna device, for a station to use."""

    def __init__(
        self: _AntennaProxy,
        trl: str,
        logger: logging.Logger,
        communication_state_callback: Callable[[CommunicationStatus], None],
        component_state_callback: Callable[..., None],
    ) -> None:
        """
        Initialise a new instance.

        :param trl: TRL of the antenna device (forwarded to the base class).
        :param logger: the logger to be used by this object.
        :param communication_state_callback: callback forwarded to the
            base class, called with the new communication status.
        :param component_state_callback: callback forwarded to the base
            class, called when the component state changes.
        """
        # The lock is created before the base class initialiser runs.
        self._power_state_lock = threading.RLock()
        super().__init__(
            trl, logger, communication_state_callback, component_state_callback
        )

    @check_communicating
    def configure(self: _AntennaProxy, config: str) -> None:
        """
        Send a configuration to the antenna device.

        :param config: json string of configuration.
        """
        proxy = self._proxy
        assert proxy is not None  # for the type checker
        device = proxy._device
        assert device is not None  # for the type checker
        device.Configure(config)
class _StationCalibratorProxy(DeviceComponentManager):
    """A proxy to a station calibrator, for a station to use."""

    @check_communicating
    def get_calibration(self: _StationCalibratorProxy, channel: int) -> np.ndarray:
        """
        Fetch the calibration coefficients for a frequency channel.

        The channel is sent to the device's ``GetCalibration`` command as a
        json object with the single key ``frequency_channel``.

        :param channel: channel for calibration coefficients.
        :return: the value returned by ``GetCalibration``.
        """
        proxy = self._proxy
        assert proxy is not None  # for the type checker
        device = proxy._device
        assert device is not None  # for the type checker
        request = {"frequency_channel": channel}
        return device.GetCalibration(json.dumps(request))
class _SpsStationProxy(DeviceComponentManager):
    """
    A proxy to an SpsStation, for a station to use.

    Most public methods are thin wrappers that invoke the command of the
    same (CamelCase) name on the SpsStation device and return the result
    code from its reply. Change events on ``dataReceivedResult`` are relayed
    to a callback supplied at construction time.
    """

    # Factor applied to a scan duration in seconds before it is sent to
    # StartBeamformer (see the TODO in start_beamformer).
    _BEAMFORMER_FRAMES_PER_SECOND = 3616.90

    def __init__(
        self: _SpsStationProxy,
        trl: str,
        logger: logging.Logger,
        communication_state_callback: Callable[[CommunicationStatus], None],
        component_state_callback: Callable[..., None],
        data_received_result_changed_callback: Callable[..., None],
    ) -> None:
        """
        Initialise a new instance.

        :param trl: TRL of the SpsStation device (forwarded to the base class).
        :param logger: the logger to be used by this object.
        :param communication_state_callback: callback forwarded to the
            base class, called with the new communication status.
        :param component_state_callback: callback forwarded to the base
            class, called when the component state changes.
        :param data_received_result_changed_callback: callback to be called
            with the new value whenever a ``dataReceivedResult`` change
            event is received.
        """
        super().__init__(
            trl,
            logger,
            communication_state_callback,
            component_state_callback,
        )
        self._data_received_result_changed_callback = (
            data_received_result_changed_callback
        )

    def _invoke_sps_command(
        self: _SpsStationProxy, command_name: str, *args: Any
    ) -> ResultCode:
        """
        Invoke a command on the underlying SpsStation device.

        :param command_name: name of the device command to invoke.
        :param args: positional arguments passed to the command.
        :return: the result code unpacked from the command's
            ``([result_code], message)`` reply.
        """
        assert self._proxy is not None  # for the type checker
        assert self._proxy._device is not None  # for the type checker
        command = getattr(self._proxy._device, command_name)
        ([result_code], _) = command(*args)
        return result_code

    def subscribe_to_attributes(self: _SpsStationProxy) -> None:
        """
        Subscribe to change events in SPS station attributes of interest.

        The subscription is only made when the proxy holds no subscription
        id for ``dataReceivedResult`` yet, so repeated calls do not
        subscribe twice.
        """
        assert self._proxy is not None
        if "dataReceivedResult" not in self._proxy._change_event_subscription_ids:
            self._proxy.add_change_event_callback(
                "dataReceivedResult", self._data_received_result_changed
            )

    @check_communicating
    def load_calibration_coefficients(
        self: _SpsStationProxy, calibration: list[float]
    ) -> ResultCode:
        """
        Load the calibration coefficients.

        :param calibration: list comprises:

            * antenna - (int) is the antenna to which the coefficients will
              be applied.
            * calibration_coefficients - [array] a bidimensional complex array
              comprising calibration_coefficients[channel, polarization],
              with each element representing a normalized coefficient, with
              (1.0, 0.0) being the normal, expected response for an ideal
              antenna.

              * channel - (int) channel is the index specifying the channels
                at the beamformer output, i.e. considering only those
                channels actually processed and beam assignments.
              * polarization index ranges from 0 to 3.

                * 0: X polarization direct element
                * 1: X->Y polarization cross element
                * 2: Y->X polarization cross element
                * 3: Y polarization direct element

        :return: result code of LoadCalibrationCoefficients
        """
        return self._invoke_sps_command("LoadCalibrationCoefficients", calibration)

    @check_communicating
    def apply_calibration(self: _SpsStationProxy, load_time: str) -> ResultCode:
        """
        Apply the calibration coefficients.

        :param load_time: switch time, in ISO formatted time. Default: now
        :return: result code of ApplyCalibration
        """
        return self._invoke_sps_command("ApplyCalibration", load_time)

    @check_communicating
    def load_pointing_delays(self: _SpsStationProxy, delays: np.ndarray) -> ResultCode:
        """
        Set the pointing delay parameters of this Station's Tiles.

        Delay is the geometric delay to be corrected for each antenna, in
        seconds. Delay is negative towards source.

        :param delays: an array containing a beam index followed by antenna
            delays and delay rate pairs for each antenna
        :return: result code of LoadPointingDelays
        """
        return self._invoke_sps_command("LoadPointingDelays", delays)

    @check_communicating
    def apply_pointing_delays(self: _SpsStationProxy, load_time: str) -> ResultCode:
        """
        Apply the pointing delay parameters of this Station's Tiles.

        :param load_time: switch time, in ISO formatted time. Default: now
        :return: result code of ApplyPointingDelays
        """
        # Goes through the device like every other command; the previous
        # code asserted on ``_device`` but then invoked the command on the
        # proxy wrapper itself.
        return self._invoke_sps_command("ApplyPointingDelays", load_time)

    @check_communicating
    def set_beamformer_table(self: _SpsStationProxy, table: np.ndarray) -> ResultCode:
        """
        Set the beamformer table which are going to be beamformed into each beam.

        region_array is defined as a flattened 2D array, for a maximum of 48
        entries. Each entry corresponds to 8 consecutive frequency channels.
        This is equivalent to SetBeamformerRegions, with a different way
        to specify the bandwidth of each spectral region.
        Input is consistent with the beamformerTable attribute

        :param table: list of regions. Each region comprises:

            * start_channel - (int) region starting channel, must be even in
              range 0 to 510
            * beam_index - (int) beam used for this region with range 0 to 47
            * subarray_id - (int) Subarray
            * subarray_logical_channel - (int) logical channel # in the subarray
            * subarray_beam_id - (int) ID of the subarray beam
            * substation_id - (int) Substation
            * aperture_id: ID of the aperture (APXX.YYY)

        :return: result code of SetBeamformerTable
        """
        return self._invoke_sps_command("SetBeamformerTable", table)

    @check_communicating
    def set_beamformer_regions(self: _SpsStationProxy, argin: np.ndarray) -> ResultCode:
        """
        Set the frequency regions which are going to be beamformed into each beam.

        :param argin: list of regions. Each region comprises:

            * start_channel - (int) region starting channel, must be even in
              range 0 to 510
            * num_channels - (int) size of the region, must be a multiple of 8
            * beam_index - (int) beam used for this region with range 0 to 47
            * subarray_id - (int) Subarray
            * subarray_logical_channel - (int) logical channel # in the subarray
            * subarray_beam_id - (int) ID of the subarray beam
            * substation_id - (int) Substation
            * aperture_id: ID of the aperture (APXX.YYY)

        :return: result code of SetBeamformerRegions
        """
        return self._invoke_sps_command("SetBeamformerRegions", argin)

    @check_communicating
    def start_beamformer(
        self: _SpsStationProxy,
        scan_id: int,
        start_time: Optional[str],
        duration: float = -1.0,
        channel_mask: Optional[int] = -1,
    ) -> ResultCode:
        """
        Start the beamformer for the selected channel blocks.

        :param scan_id: unique scan ID
        :param start_time: ISO-8601 formatted UTC scan start time; left out
            of the command argument when ``None``
        :param duration: Scan duration in seconds. If omitted or negative
            scan lasts forever
        :param channel_mask: Channel groups to which the command applies.
            Accepted for interface compatibility; it is currently not
            forwarded to the device.
        :return: result code of StartBeamformer
        """
        # TODO Duration is buggy in the MccsTile, is expressed in frames
        # of 256*1.08e-6 seconds instead of seconds
        if duration < 0.0:
            duration_frames = -1
        else:
            duration_frames = int(
                round(duration * self._BEAMFORMER_FRAMES_PER_SECOND)
            )
        scan_arg: dict[str, int | float | str] = {
            "scan_id": scan_id,
            "duration": duration_frames,
        }
        if start_time is not None:
            scan_arg["start_time"] = start_time
        return self._invoke_sps_command("StartBeamformer", json.dumps(scan_arg))

    @check_communicating
    def stop_beamformer(
        self: _SpsStationProxy,
        channel_mask: Optional[int] = -1,
    ) -> ResultCode:
        """
        Stop the beamformer for the selected channel blocks.

        :param channel_mask: Channel groups to which the command applies.
            Currently not forwarded to the device.
        :return: result code of StopBeamformer
        """
        # TODO Reinsert the channel mask when it is supported in hardware
        return self._invoke_sps_command("StopBeamformer")

    @check_communicating
    def start_acquisition(self: _SpsStationProxy, argin: str) -> ResultCode:
        """
        Start the acquisition synchronously for all tiles, checks for synchronisation.

        :param argin: json dictionary with optional keywords

            * start_time - (str) start time
            * delay - (int) delay start

        :return: result code of StartAcquisition
        """
        return self._invoke_sps_command("StartAcquisition", argin)

    @check_communicating
    def initialise(self: _SpsStationProxy) -> ResultCode:
        """
        Initialise the station's tiles.

        :return: result code of Initialise
        """
        return self._invoke_sps_command("Initialise")

    @check_communicating
    def set_lmc_download(self: _SpsStationProxy, argin: str) -> ResultCode:
        """
        Specify whether control data will be transmitted over 1G or 40G networks.

        :param argin: json dictionary with optional keywords:

            * mode - (string) '1G' or '10G' (Mandatory) (use '10G' for 40G also)
            * payload_length - (int) SPEAD payload length for channel data
            * destination_ip - (string) Destination IP.
            * source_port - (int) Source port for integrated data streams
            * destination_port - (int) Destination port for integrated data
              streams

        :return: result code of SetLmcDownload
        """
        return self._invoke_sps_command("SetLmcDownload", argin)

    @check_communicating
    def set_lmc_integrated_download(self: _SpsStationProxy, argin: str) -> ResultCode:
        """
        Configure link and size for integrated data packets, for all tiles.

        :param argin: json dictionary with optional keywords:

            * mode - (string) '1G' '10G' '40G' - default 40G
            * channel_payload_length - (int) SPEAD payload length for
              integrated channel data
            * beam_payload_length - (int) SPEAD payload length for integrated
              beam data
            * destination_ip - (string) Destination IP
            * source_port - (int) Source port for integrated data streams
            * destination_port - (int) Destination port for integrated data
              streams

        :return: result code of SetLmcIntegratedDownload
        """
        return self._invoke_sps_command("SetLmcIntegratedDownload", argin)

    @check_communicating
    def set_csp_ingest(self: _SpsStationProxy, argin: str) -> ResultCode:
        """
        Configure link for beam data packets to CSP.

        :param argin: json dictionary with optional keywords:

            * destination_ip - (string) Destination IP
            * source_port - (int) Source port for integrated data streams
            * destination_port - (int) Destination port for integrated data
              streams

        :return: result code of SetCspIngest
        """
        return self._invoke_sps_command("SetCspIngest", argin)

    @check_communicating
    def configure_integrated_channel_data(
        self: _SpsStationProxy, argin: str
    ) -> ResultCode:
        """
        Configure and start the transmission of integrated channel data.

        Using the provided integration time, first channel and last channel.
        Data are sent continuously until the StopIntegratedData command is run.

        :param argin: json dictionary with optional keywords:

            * integration_time - (float) in seconds (default = 0.5)
            * first_channel - (int) default 0
            * last_channel - (int) default 511

        :return: result code of ConfigureIntegratedChannelData
        """
        return self._invoke_sps_command("ConfigureIntegratedChannelData", argin)

    @check_communicating
    def stop_integrated_data(self: _SpsStationProxy) -> ResultCode:
        """
        Stop the integrated data.

        :return: result code of StopIntegratedData
        """
        return self._invoke_sps_command("StopIntegratedData")

    @check_communicating
    def send_data_samples(self: _SpsStationProxy, argin: str) -> ResultCode:
        """
        Transmit a snapshot containing raw antenna data.

        :param argin: json dictionary with optional keywords:

            * data_type - type of snapshot data (mandatory): "raw", "channel",
              "channel_continuous", "narrowband", "beam"
            * start_time - Time (UTC string) to start sending data. Default
              immediately
            * seconds - (float) Delay if timestamp is not specified. Default
              0.2 seconds

            Depending on the data type:

            raw:

            * sync: bool: send synchronised samples for all antennas, vs.
              round robin larger snapshot from each antenna

            channel:

            * n_samples: Number of samples per channel, default 1024
            * first_channel - (int) first channel to send, default 0
            * last_channel - (int) last channel to send, default 511

            channel_continuous

            * channel_id - (int) channel_id (Mandatory)
            * n_samples - (int) number of samples to send per packet,
              default 128

            narrowband:

            * frequency - (int) Sky frequency for band centre, in Hz (Mandatory)
            * round_bits - (int) Specify how many bits to round
            * n_samples - (int) number of spectra to send

        :return: result code of SendDataSamples
        """
        return self._invoke_sps_command("SendDataSamples", argin)

    @check_communicating
    def stop_data_transmission(
        self: _SpsStationProxy,
    ) -> ResultCode:
        """
        Stop data transmission from board.

        :return: result code of StopDataTransmission
        """
        return self._invoke_sps_command("StopDataTransmission")

    @check_communicating
    def configure_test_generator(
        self: _SpsStationProxy,
        argin: str,
    ) -> ResultCode:
        """
        Set the test signal generator.

        :param argin: json dictionary with keywords:

            * tone_frequency: first tone frequency, in Hz. The frequency
              is rounded to the resolution of the generator. If this
              is not specified, the tone generator is disabled.
            * tone_amplitude: peak tone amplitude, normalized to 31.875 ADC
              units. The amplitude is rounded to 1/8 ADC unit. Default
              is 1.0. A value of -1.0 keeps the previously set value.
            * tone_2_frequency: frequency for the second tone. Same
              as ToneFrequency.
            * tone_2_amplitude: peak tone amplitude for the second tone.
              Same as ToneAmplitude.
            * noise_amplitude: RMS amplitude of the pseudorandom Gaussian
              white noise, normalized to 26.03 ADC units.
            * pulse_frequency: frequency of the periodic pulse. A code
              in the range 0 to 7, corresponding to (16, 12, 8, 6, 4, 3, 2)
              times the ADC frame frequency.
            * pulse_amplitude: peak amplitude of the periodic pulse, normalized
              to 127 ADC units. Default is 1.0. A value of -1.0 keeps the
              previously set value.
            * set_time: time at which the generator is set, for synchronization
              among different TPMs. In UTC ISO format (string)
            * adc_channels: list of adc channels which will be substituted with
              the generated signal. It is a 32 integer, with each bit
              representing an input channel. Default: all if at least one
              source is specified, none otherwise.

        :return: result code of ConfigureTestGenerator
        """
        return self._invoke_sps_command("ConfigureTestGenerator", argin)

    @check_communicating
    def acquire_data_for_calibration(
        self: _SpsStationProxy, channel: int
    ) -> ResultCode:
        """
        Start collecting data for calibration.

        :param channel: the channel to get calibration data for.
        :return: result code of AcquireDataForCalibration
        """
        return self._invoke_sps_command("AcquireDataForCalibration", channel)

    def _data_received_result_changed(
        self: _SpsStationProxy,
        event_name: str,
        event_value: list[str],
        event_quality: tango.AttrQuality,
    ) -> None:
        """
        Handle a change event on ``dataReceivedResult``.

        An event carrying any other attribute name is logged as an error
        and dropped.

        :param event_name: name of the attribute the event is for; compared
            case-insensitively against ``dataReceivedResult``.
        :param event_value: the new attribute value.
        :param event_quality: quality of the attribute value (unused).
        """
        if event_name.lower() != "datareceivedresult":
            self.logger.error(
                "dataReceivedResult callback called, "
                f"but event_name is {event_name}."
            )
            return
        self._data_received_result_changed_callback(event_value)
# pylint: disable=too-many-instance-attributes,too-many-public-methods
[docs]class StationComponentManager(TaskExecutorComponentManager):
"""A component manager for a station."""
# pylint: disable=too-many-arguments, too-many-statements, too-many-locals
def __init__(
    self: StationComponentManager,
    station_id: int,
    ref_latitude: float,
    ref_longitude: float,
    ref_height: float,
    field_station_trl: str,
    antenna_trls: Sequence[str],
    antenna_station_locations: np.ndarray,
    antenna_element_ids: list[int],
    station_calibrator_trl: str,
    sps_station_trl: str,
    logger: logging.Logger,
    communication_state_callback: Callable[[CommunicationStatus], None],
    component_state_callback: Callable[..., None],
) -> None:
    """
    Initialise a new instance.

    Creates one proxy per subservient device (field station, antennas,
    station calibrator, SPS station) and initialises the bookkeeping used
    to aggregate their communication and power states. An empty string
    for ``field_station_trl`` or ``sps_station_trl`` means that device is
    absent and no proxy is created for it.

    :param station_id: the id of this station
    :param ref_latitude: reference latitude of the station.
    :param ref_longitude: reference longitude of the station.
    :param ref_height: reference ellipsoidal height of the station.
    :param field_station_trl: TRL of the Tango device that manages this
        station's FieldStation
    :param antenna_trls: TRLs of the Tango devices that manage this
        station's antennas
    :param antenna_station_locations: array of the x, y, z positions of the antennas
    :param antenna_element_ids: list of the element IDs of the antennas
    :param station_calibrator_trl: TRL of the Tango device that manages this
        station's station calibrator
    :param sps_station_trl: TRL of the Tango device that manages this
        station's Spshw station
    :param logger: the logger to be used by this object.
    :param communication_state_callback: callback to be
        called when the status of the communications channel
        between the component manager and its component changes
    :param component_state_callback: callback to be
        called when the component state changes
    """
    # Re-entrant lock taken by the power-state callbacks while they update
    # the cached sub-device power states.
    self._power_state_lock = threading.RLock()
    self._station_id = station_id
    # Proxies and cached power states start as None; they are filled in
    # below for each sub-device that is actually configured.
    self._field_station_proxy: Optional[_FieldStationProxy] = None
    self._sps_station_proxy: Optional[_SpsStationProxy] = None
    self._field_station_power_state: Optional[PowerState] = None
    self._sps_station_power_state: Optional[PowerState] = None
    self._station_calibrator_power_state: Optional[PowerState] = None
    self._field_station_trl = field_station_trl
    self._is_configured = False
    self._field_station_on_called = False
    # Latest values relayed by the field station / SPS station event callbacks.
    self.outside_temperature: Optional[float] = None
    self.data_received_result: Optional[tuple[str, str]] = ("", "")
    self._is_communicating: bool = False
    self._ref_longitude: float = ref_longitude
    self._ref_latitude: float = ref_latitude
    self._ref_height: float = ref_height
    self._pointing_delays: Optional[list[list]] = [[]]
    # NOTE(review): 17 entries -- the meaning of this size is not visible
    # here (presumably one slot per subarray plus an unused slot 0); confirm.
    self._scan_id = [0] * 17
    self._scan_mask = [0] * 17
    # Counts antennas and the SPS station (see below); the field station
    # and the station calibrator are not added to it.
    self._nof_subdevices = 0
    # TRLs of the sub-devices that still have to reach ...
    self._powering_resources: set[str] = set()  # this set of stations should go
    # ... this requested power state.
    self._desired_power_state: Optional[PowerState] = None  # to this state
    self.stop_ids: dict[int, bool] = {}
    self.tracking_threads: dict[int, threading.Thread] = {}
    self.tracking_id = 0
    self._antenna_xyz = np.array(antenna_station_locations)
    self._antenna_element_ids = np.array(antenna_element_ids)
    # The number of antennas is taken from the first axis of the location array.
    self._nof_antennas = self._antenna_xyz.shape[0]
    self._component_state_callback = component_state_callback
    self._communication_state_callback = communication_state_callback
    # beam_idx + 2*(number of antennas)
    self.last_pointing_delays = [0.0] * (1 + 2 * NUMBER_OF_ANTENNAS)
    # Communication status of each sub-device, keyed by TRL.
    self._communication_states = {}
    if antenna_trls != [""]:
        self._communication_states = {
            trl: CommunicationStatus.DISABLED for trl in list(antenna_trls)
        }
    # NOTE(review): indentation is not recoverable from this copy of the
    # file; the statements below are assumed to run unconditionally (later
    # methods iterate self._antenna_proxies without checking it exists).
    # Confirm whether a lone "" placeholder TRL should really get a proxy.
    self._antenna_power_states = {trl: PowerState.UNKNOWN for trl in antenna_trls}
    self._antenna_proxies = {
        antenna_trl: _AntennaProxy(
            antenna_trl,
            logger,
            # Bind the TRL so the shared handlers know which device reported.
            functools.partial(
                self._device_communication_state_changed, antenna_trl
            ),
            functools.partial(component_state_callback, trl=antenna_trl),
        )
        for antenna_trl in antenna_trls
    }
    self._nof_subdevices += self._nof_antennas
    # self._station_calibrator_power_state = PowerState.UNKNOWN
    if field_station_trl != "":
        self._communication_states[field_station_trl] = CommunicationStatus.DISABLED
        self._field_station_power_state = PowerState.UNKNOWN
        self._field_station_proxy = _FieldStationProxy(
            self._field_station_trl,
            logger,
            functools.partial(
                self._device_communication_state_changed, self._field_station_trl
            ),
            functools.partial(
                component_state_callback, trl=self._field_station_trl
            ),
            self._field_station_outside_temperature_changed,
        )
    else:
        self._field_station_proxy = None
        self._field_station_power_state = None
    # The calibrator proxy is always created; its communication state is
    # deliberately ignored and does not take part in the aggregation.
    self._station_calibrator_proxy = _StationCalibratorProxy(
        station_calibrator_trl,
        logger,
        lambda *args: None,  # Availability is checked when service required.
        self._station_calibrator_power_state_changed,
    )
    if sps_station_trl != "":
        self._communication_states[sps_station_trl] = CommunicationStatus.DISABLED
        self._sps_station_power_state = PowerState.UNKNOWN
        # Only set when an SPS station is configured.
        self._sps_station_trl = sps_station_trl
        self._sps_station_proxy = _SpsStationProxy(
            sps_station_trl,
            logger,
            functools.partial(
                self._device_communication_state_changed, sps_station_trl
            ),
            functools.partial(component_state_callback, trl=sps_station_trl),
            self._sps_station_data_received_result_changed,
        )
        self._nof_subdevices += 1
    else:
        self._sps_station_power_state = None
        self._sps_station_proxy = None
    # Aggregate configuration tables
    # Channel table: one entry per channel block (48 total)
    # Bidimensional array of one row for each 8 channels, with elements:
    # 0. start physical channel
    # 1. beam number
    # 2. subarray ID
    # 3. subarray_logical_channel
    # 4. subarray_beam_id
    # 5. substation_id
    # 6. aperture_id
    self._beamformer_table = [[0] * 7 for _ in range(48)]
    self._number_of_channels = 0
    # Created by setup_pointing_helper().
    self._pointing_helper: Optional[Pointing] = None
    # The base class is initialised last, after every attribute above has
    # been set; the keyword arguments are the initial component state.
    super().__init__(
        logger,
        communication_state_callback,
        component_state_callback,
        power=None,
        fault=None,
        configuration_changed=None,
        is_configured=None,
    )
def setup_pointing_helper(self: StationComponentManager) -> None:
    """
    Set up the pointing helper.

    Builds a ``StationInformation`` from the station reference position
    and the antenna displacement arrays, and stores a ``Pointing`` object
    wrapping it in ``self._pointing_helper``.
    """
    assert not (
        self.ref_longitude is None
        or self.ref_latitude is None
        or self.ref_height is None
    )
    station_info = StationInformation()
    # Note the argument order: latitude first, then longitude and height.
    station_info.set_location(
        self.ref_latitude,
        self.ref_longitude,
        self.ref_height,
    )
    station_info.load_displacements_arrays(
        self._antenna_xyz, self._antenna_element_ids
    )
    self._pointing_helper = Pointing(station_info)
def start_communicating(self: StationComponentManager) -> None:
    """
    Establish communication with the station components.

    Does nothing beyond recording the request when communication is
    already established. Otherwise every sub-device proxy is asked to
    start communicating.
    """
    self._is_communicating = True
    current_state = self._communication_state
    if current_state == CommunicationStatus.ESTABLISHED:
        return
    if current_state == CommunicationStatus.DISABLED:
        self.update_communication_state(CommunicationStatus.NOT_ESTABLISHED)

    # Optional single-device proxies first, in the original order ...
    for single_proxy in (
        self._field_station_proxy,
        self._station_calibrator_proxy,
        self._sps_station_proxy,
    ):
        if single_proxy:
            single_proxy.start_communicating()
    # ... then every antenna.
    for proxy in self._antenna_proxies.values():
        proxy.start_communicating()

    # If we have instantiated our MccsStation with no subdevices, it will always
    # report CommunicationStatus.ESTABLISHED
    # NOTE(review): the power-state evaluation is taken to belong to this
    # no-subdevice branch -- confirm against the repository copy.
    if self._nof_subdevices == 0:
        self.update_communication_state(CommunicationStatus.ESTABLISHED)
        self._evaluate_power_state()
def stop_communicating(self: StationComponentManager) -> None:
    """
    Break off communication with the station components.

    Every sub-device proxy is asked to stop communicating. Unless
    communication was already disabled, the communication state is then
    set to DISABLED and the power and fault state are cleared.
    """
    self._is_communicating = False
    for proxy in self._antenna_proxies.values():
        proxy.stop_communicating()
    # The single-device proxies are stopped in the reverse of start-up order.
    for single_proxy in (
        self._sps_station_proxy,
        self._station_calibrator_proxy,
        self._field_station_proxy,
    ):
        if single_proxy:
            single_proxy.stop_communicating()

    if self.communication_state != CommunicationStatus.DISABLED:
        self.update_communication_state(CommunicationStatus.DISABLED)
        self._update_component_state(power=None, fault=None)
@threadsafe
def _device_communication_state_changed(
    self: StationComponentManager,
    trl: str,
    communication_state: CommunicationStatus,
) -> None:
    """
    Handle a change in the communication status of one sub-device.

    The new status is recorded against the device's TRL and the station's
    own communication status is re-derived: ESTABLISHED only when every
    tracked device is ESTABLISHED, NOT_ESTABLISHED otherwise.

    :param trl: TRL of the device whose communication status changed.
    :param communication_state: the device's new communication status.
    """
    # Many callback threads could be hitting this method at the same time, so it's
    # possible (likely) that the GIL will suspend a thread between checking if it
    # need to update, and actually updating. This leads to callbacks appearing out
    # of order, which breaks tests. Therefore we need to serialise access; this
    # is done by the @threadsafe decorator, as for the power-state callbacks.
    self._communication_states[trl] = communication_state
    self.logger.debug(
        f"device {trl} changed communcation state to {communication_state.name}"
    )
    device_states = self._communication_states.values()
    if (
        CommunicationStatus.DISABLED in device_states
        or CommunicationStatus.NOT_ESTABLISHED in device_states
    ):
        # A device that is disabled or not yet connected keeps the station
        # as a whole from being established.
        self.update_communication_state(CommunicationStatus.NOT_ESTABLISHED)
    else:
        self.update_communication_state(CommunicationStatus.ESTABLISHED)
def update_communication_state(
    self: StationComponentManager,
    communication_state: CommunicationStatus,
) -> None:
    """
    Update the status of communication with the component.

    Overridden here so that, whenever communication is freshly
    established, the attribute subscriptions are made, the pointing helper
    is created if missing, and the "is configured" callback is fired.

    :param communication_state: the status of communication with
        the component
    """
    if self._communication_state == communication_state:
        # Nothing changed, so there is nothing to propagate.
        return
    super()._update_communication_state(communication_state)
    if communication_state != CommunicationStatus.ESTABLISHED:
        return

    for subscribing_proxy in (self._field_station_proxy, self._sps_station_proxy):
        if subscribing_proxy is not None:
            subscribing_proxy.subscribe_to_attributes()
    if self._pointing_helper is None:
        self.setup_pointing_helper()
    notify = self._component_state_callback
    if notify is not None:
        notify(is_configured=self.is_configured)
def _field_station_outside_temperature_changed(
self: StationComponentManager,
outside_temperature: float,
) -> None:
self.outside_temperature = outside_temperature
if self._component_state_callback is not None:
self._component_state_callback(outside_temperature=self.outside_temperature)
def _sps_station_data_received_result_changed(
self: StationComponentManager,
data_received_result: tuple[str, str],
) -> None:
self.data_received_result = data_received_result
if self._component_state_callback is not None:
self._component_state_callback(
data_received_result=self.data_received_result
)
@threadsafe
def _station_calibrator_power_state_changed(
    self: StationComponentManager,
    power: PowerState | None = None,
    fault: bool | None = None,
    **kwargs: Any,
) -> None:
    """
    Handle a component state change reported by the station calibrator.

    :param power: the calibrator's new power state, if reported.
    :param fault: the calibrator's fault state, if reported (unused).
    :param kwargs: any other state reported by the calibrator (unused).
    """
    # Station calibrator is a software only device and should not roll up
    # into the PowerState of the MccsStation. The value is cached and logged,
    # nothing else happens.
    self._station_calibrator_power_state = power
    if power is None:
        return
    self.logger.info(
        f"Station calibrator reports its power as {PowerState(power).name}."
    )
@threadsafe
def _antenna_power_state_changed(
    self: StationComponentManager,
    trl: str,
    power_state: PowerState,
) -> None:
    """
    Record the power state reported by one antenna.

    :param trl: TRL of the antenna whose power state changed.
    :param power_state: the antenna's new power state.
    """
    # Use the lock created in __init__ (``_power_state_lock``), as the
    # field station and SPS station callbacks do.
    with self._power_state_lock:
        self._antenna_power_states[trl] = power_state
@threadsafe
def _field_station_power_state_changed(
    self: StationComponentManager,
    power_state: PowerState,
) -> None:
    """
    Handle a change of the field station's power state.

    The new state is cached, the field station is dropped from the set of
    devices still being powered if it has reached the desired state, and
    the station's own power state is re-evaluated.

    :param power_state: the field station's new power state.
    """
    with self._power_state_lock:
        self._field_station_power_state = power_state
        if power_state == self._desired_power_state:
            # discard() is a no-op when the TRL is not being waited for.
            self._powering_resources.discard(self._field_station_trl)
        self._evaluate_power_state()
        # Antennas should just be passively reflecting the
        # power state of their smartbox port?
        # if power_state is PowerState.ON and self._field_station_on_called:
        #     self._field_station_on_called = False
        #     _ = self._turn_on_antennas()
@threadsafe
def _sps_station_power_state_changed(
    self: StationComponentManager,
    power_state: PowerState,
) -> None:
    """
    Handle a change of the SPS station's power state.

    The new state is cached, the SPS station is dropped from the set of
    devices still being powered once it has reached the desired state
    (with STANDBY counting as OFF), and the station's own power state is
    re-evaluated.

    :param power_state: the SPS station's new power state.
    """
    with self._power_state_lock:
        self._sps_station_power_state = power_state
        if power_state == PowerState.STANDBY:
            self.logger.warning(
                "There is no way for MCCS to turn SpsStation 'OFF'. "
                "Due to there being no control over the subrack PDU. "
                "At MccsStation we interpret STANDBY as OFF "
            )
            target_reached = self._desired_power_state == PowerState.OFF
        else:
            target_reached = power_state == self._desired_power_state
        sps_trl = self._sps_station_trl
        if target_reached:
            # discard() is a no-op when the TRL is not being waited for.
            self._powering_resources.discard(sps_trl)
        self._evaluate_power_state()
def _evaluate_power_state(
    self: StationComponentManager,
) -> None:
    """
    Evaluate the station power state from its sub-devices and publish it.

    The known power states of the FieldStation and SpsStation (each
    only when a proxy exists and a state has been reported) are
    combined as follows:

    * no known states: ON;
    * all ON, all OFF or all STANDBY: that state;
    * anything else: UNKNOWN.

    SpsStation STANDBY is counted as OFF. The result is passed to
    ``_update_component_state``.
    """
    with self.power_state_lock:
        power_states = []
        if (
            self._field_station_proxy is not None
            and self._field_station_power_state is not None
        ):
            power_states.append(self._field_station_power_state)
        if (
            self._sps_station_proxy is not None
            and self._sps_station_power_state is not None
        ):
            if self._sps_station_power_state == PowerState.STANDBY:
                # Handle special case due to there being no such thing
                # as SpsStation OFF. The reason for this is that there
                # is no control over the subrack PDU
                # TODO: SP-4050
                self.logger.warning(
                    "When evaluating MccsStation Power, "
                    "we classify SpsStation `STANDBY` as `OFF`. "
                    "This is required since MCCS has no control over "
                    "the subracks' PDU."
                )
                power_states.append(PowerState.OFF)
            else:
                power_states.append(self._sps_station_power_state)

        if not power_states:
            # Checked first on purpose: all() of an empty list is True,
            # so a later "empty" branch could never be reached.
            evaluated_power_state = PowerState.ON
        elif all(power_state == PowerState.ON for power_state in power_states):
            evaluated_power_state = PowerState.ON
        elif all(power_state == PowerState.OFF for power_state in power_states):
            evaluated_power_state = PowerState.OFF
        elif all(
            power_state == PowerState.STANDBY for power_state in power_states
        ):
            evaluated_power_state = PowerState.STANDBY
        else:
            evaluated_power_state = PowerState.UNKNOWN

        self.logger.debug(
            "In StationComponentManager._evaluatePowerState with:\n"
            f"\tspsStation: {str(self._sps_station_power_state)}\n"
            f"\tfieldStation: {str(self._field_station_power_state)}\n "
            f"\tresult: {str(evaluated_power_state)}"
        )
        self._update_component_state(power=evaluated_power_state)
@property
def power_state_lock(self: StationComponentManager) -> threading.RLock:
    """
    Expose the lock used around this component manager's power states.

    :return: the power state lock of this component manager.
    """
    lock = self._power_state_lock
    return lock
@property
def power_state(self: StationComponentManager) -> Optional[PowerState]:
    """
    Return the power state held in the component state.

    :return: the value stored under ``"power"`` in the component state.
    """
    component_state = self._component_state
    return component_state["power"]
def off(
    self: StationComponentManager,
    task_callback: Optional[Callable] = None,
) -> tuple[TaskStatus, str]:
    """
    Submit the `_off` method for execution.

    This method returns as soon as `self._off` has been handed to
    `submit_task`; it does not wait for the station to turn off.

    :param task_callback: Update task state, defaults to None

    :return: a task status and response message
    """
    submitted = self.submit_task(self._off, task_callback=task_callback)
    return submitted
@check_communicating
def _off(
    self: StationComponentManager,
    task_callback: Optional[Callable] = None,
    task_abort_event: Optional[threading.Event] = None,
) -> None:
    """
    Turn off this station.

    The desired power state is set to OFF. The FieldStation is sent
    ``off`` and the SpsStation is sent ``standby`` (each only when a
    proxy exists and it is not already OFF). SpsStation is sent
    ``standby`` because MCCS cannot switch the subrack PDU. The method
    then waits on `_wait_for_power_state` and reports its result.

    :param task_callback: Update task state, defaults to None
    :param task_abort_event: Abort the task; not used by this method
    """
    if task_callback:
        task_callback(status=TaskStatus.IN_PROGRESS)

    self._desired_power_state = PowerState.OFF

    field_station = self._field_station_proxy
    if field_station and self._field_station_power_state != PowerState.OFF:
        command_status, _ = field_station.off()  # type: ignore
        self.logger.info(
            f"FieldStation off command TaskStatus: {command_status.name}"
        )
        self._powering_resources.add(self._field_station_trl)

    sps_station = self._sps_station_proxy
    if sps_station and self._sps_station_power_state != PowerState.OFF:
        self.logger.warning(
            "There is no way for MCCS to turn `OFF` SpsStation, "
            "due to there being no control over the subracks' PDU. "
            "At MccsStation we interpret SpsStation STANDBY as OFF. "
            "We are sending the Standby command to SpsStation."
        )
        command_status, _ = sps_station.standby()
        self.logger.info(
            f"SpshwStation standby command TaskStatus: {command_status.name}"
        )
        self._powering_resources.add(self._sps_station_trl)

    final_status = self._wait_for_power_state(600)
    if task_callback:
        task_callback(status=final_status)
def on(
    self: StationComponentManager,
    task_callback: Optional[Callable] = None,
) -> tuple[TaskStatus, str]:
    """
    Submit the `_on` method for execution.

    This method returns as soon as `self._on` has been handed to
    `submit_task`; it does not wait for the station to turn on.

    :param task_callback: Update task state, defaults to None

    :return: a task status and response message
    """
    submitted = self.submit_task(self._on, task_callback=task_callback)
    return submitted
@check_communicating
def _on(
    self: StationComponentManager,
    task_callback: Optional[Callable] = None,
    task_abort_event: Optional[threading.Event] = None,
) -> None:
    """
    Turn on this station.

    The desired power state is set to ON. The FieldStation and then the
    SpsStation are sent ``on`` (each only when a proxy exists and it is
    not already ON). The method then waits on `_wait_for_power_state`
    and reports its result.

    :param task_callback: Update task state, defaults to None
    :param task_abort_event: Abort the task; not used by this method
    """
    if task_callback:
        task_callback(status=TaskStatus.IN_PROGRESS)

    self._desired_power_state = PowerState.ON

    field_station = self._field_station_proxy
    if field_station and self._field_station_power_state != PowerState.ON:
        command_status, _ = field_station.on()  # type: ignore
        self.logger.info(
            f"FieldStation on command TaskStatus: {command_status.name}"
        )
        self._powering_resources.add(self._field_station_trl)

    sps_station = self._sps_station_proxy
    if sps_station and self._sps_station_power_state != PowerState.ON:
        command_status, _ = sps_station.on()
        self.logger.info(
            f"SpshwStation on command TaskStatus: {command_status.name}"
        )
        self._powering_resources.add(self._sps_station_trl)

    final_status = self._wait_for_power_state(600)
    if task_callback:
        task_callback(status=final_status)
@check_communicating
def _turn_on_antennas(
    self: StationComponentManager,
) -> ResultCode:
    """
    Turn on the antennas unless every one is already recorded as ON.

    When at least one recorded antenna power state is not ON, ``on()``
    is called on every antenna proxy.

    :return: FAILED if any ``on()`` call returned FAILED, otherwise
        QUEUED (also when nothing needed to be turned on).
    """
    with self.power_state_lock:
        already_on = all(
            state == PowerState.ON
            for state in self._antenna_power_states.values()
        )
        if already_on:
            return ResultCode.QUEUED
        outcomes = [proxy.on() for proxy in self._antenna_proxies.values()]
        if ResultCode.FAILED in outcomes:
            return ResultCode.FAILED
        return ResultCode.QUEUED
@check_communicating
@check_on
def apply_pointing_delays(
    self: StationComponentManager,
    load_time: str,
    task_callback: Optional[Callable] = None,
) -> tuple[TaskStatus, str]:
    """
    Submit the `_apply_pointing_delays` method for execution.

    This method returns as soon as `self._apply_pointing_delays` has
    been handed to `submit_task`.

    :param load_time: time at which to load the pointing delay
    :param task_callback: Update task state, defaults to None

    :return: a task status and response message
    """
    task_arguments = [load_time]
    submitted = self.submit_task(
        self._apply_pointing_delays, task_arguments, task_callback=task_callback
    )
    return submitted
@check_communicating
@check_on
def _apply_pointing_delays(
    self: StationComponentManager,
    load_time: str,
    task_callback: Optional[Callable] = None,
    task_abort_event: Optional[threading.Event] = None,
) -> None:
    """
    Ask the SpsStation to apply the pointing delays at a given time.

    The task is reported as COMPLETED when the SpsStation proxy returns
    OK, STARTED or QUEUED, and as FAILED for any other result.

    :param load_time: time at which to load the pointing delay
    :param task_callback: Update the task state, defaults to None
    :param task_abort_event: Abort the task; not used by this method
    """
    assert self._sps_station_proxy
    if task_callback:
        task_callback(
            status=TaskStatus.IN_PROGRESS,
            result="ApplyPointingDelays command in progress",
        )

    outcome = self._sps_station_proxy.apply_pointing_delays(load_time)
    accepted = (ResultCode.OK, ResultCode.STARTED, ResultCode.QUEUED)
    if outcome in accepted:
        final_status = TaskStatus.COMPLETED
        message = "ApplyPointingDelays command completed"
    else:
        final_status = TaskStatus.FAILED
        message = "ApplyPointingDelays command failed"

    if task_callback:
        task_callback(status=final_status, result=message)
@property  # type:ignore[misc]
@check_communicating
def is_configured(self: StationComponentManager) -> bool:
    """
    Report whether this station component manager is configured.

    :return: whether this station component manager is configured.
    """
    # TODO: the station can be configured for some subarrays and not for others.
    # should be changed to a vector attribute or removed
    configured = self._is_configured
    return configured
@property  # type:ignore[misc]
@check_communicating
def ref_longitude(self: StationComponentManager) -> float:
    """
    Return this station's reference longitude.

    :return: this station's reference longitude.
    """
    longitude = self._ref_longitude
    return longitude
@property  # type:ignore[misc]
@check_communicating
def ref_latitude(self: StationComponentManager) -> float:
    """
    Return this station's reference latitude.

    :return: this station's reference latitude.
    """
    latitude = self._ref_latitude
    return latitude
@property  # type:ignore[misc]
@check_communicating
def ref_height(self: StationComponentManager) -> float:
    """
    Return this station's reference height.

    :return: this station's reference height.
    """
    height = self._ref_height
    return height
def _update_station_configs(
    self: StationComponentManager,
    configuration: dict,
) -> None:
    """
    Update the config for the station device.

    Nothing is changed when no component state callback is set.
    Otherwise the station is marked as configured and each of
    ``refLongitude``, ``refLatitude`` and ``refHeight`` that is present
    with a non-None value replaces the stored reference value.

    :param configuration: dict containing the config of the device
    """
    if self._component_state_callback is None:
        return

    self._is_configured = True
    reference_fields = (
        ("refLongitude", "_ref_longitude"),
        ("refLatitude", "_ref_latitude"),
        ("refHeight", "_ref_height"),
    )
    for config_key, attribute_name in reference_fields:
        value = configuration.get(config_key)
        # Compare against None rather than relying on truthiness: 0 and
        # 0.0 are valid values and must not be discarded.
        if value is not None:
            setattr(self, attribute_name, value)
def _update_children_configs(
    self: StationComponentManager,
    field_station_config: Optional[dict],
    antenna_config: Optional[dict],
) -> None:
    """
    Push configuration down to the devices this station manages.

    ``on()`` is first called on every antenna proxy. The FieldStation
    is then configured when a configuration is supplied, and finally
    each antenna that has an entry in ``antenna_config`` is configured.

    :param field_station_config: Configuration specification
        for the field station device.
    :param antenna_config: Configuration specification for the antenna
        devices, keyed by the same keys as the antenna proxies.
    """
    for antenna_proxy in self._antenna_proxies.values():
        antenna_proxy.on()

    if field_station_config is not None:
        if not self._field_station_proxy:
            self.logger.error(
                "Attempted to update config of non-existent FieldStation"
            )
        else:
            assert self._field_station_proxy._proxy is not None
            self._field_station_proxy._proxy.Configure(
                json.dumps(field_station_config)
            )

    if antenna_config:
        for trl, antenna_proxy in self._antenna_proxies.items():
            config = antenna_config.get(trl, None)
            if config is None:
                continue
            payload = {"antenna_config": config, "tile_config": {}}
            antenna_proxy.configure(json.dumps(payload))
    # TODO: This needs to be implemented in SpsStation
    # tiles_config = configuration.get("tiles")
    # if tiles_config:
    #     for trl in self._tile_proxies.keys():
    #         config = tiles_config[trl]
    #         self._tile_proxies[trl].configure(json.dumps(config))
@check_communicating
def _configure_semi_static(
    self: StationComponentManager,
    station_config: dict,
    field_station_config: Optional[dict],
    antenna_config: Optional[dict],
    task_callback: Optional[Callable] = None,
    task_abort_event: Optional[threading.Event] = None,
) -> None:
    """
    Configure this station and the devices it manages.

    The station's own configuration is updated first, then the
    configuration of its children, and finally the pointing helper is
    set up again.

    :param station_config: Configuration specification for the station device.
    :param field_station_config: Configuration specification for the field
        station device.
    :param antenna_config: Configuration specification for the antenna devices.
    :param task_callback: Update task state, defaults to None
    :param task_abort_event: Abort the task; not used by this method

    :raises ValueError: the ``StationId`` in ``station_config`` does not
        match this station's id
    """
    if task_callback:
        task_callback(status=TaskStatus.IN_PROGRESS)

    requested_station_id = station_config.get("StationId")
    if requested_station_id != self._station_id:
        raise ValueError("Wrong station id")

    self._update_station_configs(station_config)
    self._update_children_configs(field_station_config, antenna_config)
    self.setup_pointing_helper()

    if task_callback:
        task_callback(
            status=TaskStatus.COMPLETED,
            result="Configure command has completed",
        )
def _pointing_delays_alt_az(
    self: StationComponentManager,
    values: dict[str, float],
) -> np.ndarray:
    """
    Get pointing delays for an alt/az pointing.

    :param values: dict containing ``altitude`` and ``azimuth`` values,
        interpreted in degrees

    :raises ValueError: pointing delays not found

    :return: the pointing delays returned by the pointing helper
    """
    altitude, azimuth = (
        Angle(values.get(axis), unit="deg") for axis in ("altitude", "azimuth")
    )
    helper = self._pointing_helper
    assert helper is not None
    helper.point_array_static(altitude, azimuth)
    delays = helper.delays()
    if delays is not None:
        return delays
    raise ValueError("Could not get delays")
def _pointing_delays_ra_dec(
    self: StationComponentManager,
    values: dict[str, float],
    reference_time: Time,
    time_step: float,
) -> tuple[np.ndarray, np.ndarray]:
    """
    Get pointing delays and delay rates for an ra/dec pointing.

    :param values: dict containing ``right_ascension`` and
        ``declination`` and, optionally, ``right_ascension_rate`` and
        ``declination_rate`` (missing or falsy rates are taken as 0.0);
        all are interpreted in degrees
    :param reference_time: time in which coordinates are equal,
        in ISO8601 formatted astropy.Time time
    :param time_step: How long between each time step in seconds

    :raises ValueError: pointing delays not found

    :return: delays and delay rates for each antenna, negative towards source
    """
    ra_value = values.get("right_ascension")
    dec_value = values.get("declination")
    ra_rate_value = values.get("right_ascension_rate") or 0.0
    dec_rate_value = values.get("declination_rate") or 0.0

    right_ascension = Angle(ra_value, unit="deg")
    declination = Angle(dec_value, unit="deg")
    right_ascension_rate = Angle(ra_rate_value, unit="deg")
    declination_rate = Angle(dec_rate_value, unit="deg")

    helper = self._pointing_helper
    assert helper is not None
    helper.point_array_equatorial(
        right_ascension,
        declination,
        right_ascension_rate,
        declination_rate,
        reference_time,
        time_step,
    )
    delays = helper.delays()
    delay_rates = helper.delay_rates()
    if delays is None:
        raise ValueError("Could not get delays")
    return (delays, delay_rates)
def get_pointing_delays(
    self: StationComponentManager,
    task_callback: Optional[Callable] = None,
    *,
    interface: Optional[str] = None,
    pointing_type: str,
    values: dict,
    time_step: float = 10.0,
    reference_time: Optional[str] = None,
) -> np.ndarray:
    """
    Get the pointing delays for this station.

    :param interface: the schema version this is running against.
    :param pointing_type: the type of pointing requested
    :param values: the pointing values, either in alt_az or ra_dec
    :param reference_time: time in which coordinates are equal,
        in ISO8601 formatted astropy.Time time
    :param time_step: How long between each time step in seconds
    :param task_callback: callback to signal end of command; not used
        by this method

    :return: the constructed delay array without its leading beam
        number element
    """
    start_time = Time(reference_time, format="isot", scale="utc")
    pointing_delays, delay_rates = self._get_pointing_delays(
        pointing_type, values, start_time, time_step
    )
    with_beam_number = self._construct_delays(pointing_delays, delay_rates, 0)
    # Don't want to return the fake beam number.
    return with_beam_number[1:]
@check_communicating
def _get_pointing_delays(
    self: StationComponentManager,
    pointing_type: str,
    values: dict,
    reference_time: Time,
    time_step: float,
) -> tuple[np.ndarray, np.ndarray]:
    """
    Get the pointing delays and delay rates for this station.

    For ``"alt_az"`` the delay rates are an array of 256 zeros; for
    ``"ra_dec"`` both delays and rates come from the equatorial
    pointing calculation.

    :param pointing_type: the type of pointing requested
    :param values: the pointing values, either in alt_az or ra_dec
    :param reference_time: time in which coordinates are equal,
        in ISO8601 formatted astropy.Time time
    :param time_step: How long between each time step in seconds

    :raises KeyError: pointing type not found.

    :return: pointing delays and delay rates. Delays are negative
        towards source
    """
    if pointing_type == "ra_dec":
        return self._pointing_delays_ra_dec(values, reference_time, time_step)
    if pointing_type != "alt_az":
        raise KeyError("Couldn't find valid pointing type")
    static_delays = self._pointing_delays_alt_az(values)
    zero_rates = np.zeros(256)
    return static_delays, zero_rates
def track_object(
    self: StationComponentManager,
    task_callback: Optional[Callable] = None,
    task_abort_event: Optional[threading.Event] = None,
    *,
    interface: Optional[str] = None,
    pointing_type: str,
    values: dict,
    scan_time: float,
    reference_time: Optional[str] = None,
    station_beam_number: Optional[int] = 0,
    time_step: Optional[float] = 1.0,
) -> tuple[TaskStatus, str]:
    """
    Submit the `_track_object` slow task.

    This method returns as soon as the task has been handed to
    `submit_task`.

    :param task_callback: Update task state, defaults to None
    :param task_abort_event: Check for abort, defaults to None; not
        forwarded by this method
    :param interface: the schema version this is running against.
    :param pointing_type: the type of pointing requested
    :param values: Coordinates for object to be tracked
    :param scan_time: Time to scan object in seconds
    :param reference_time: time in which coordinates are equal,
        in ISO8601 formatted astropy.Time time
    :param station_beam_number: The station beam number to be used
    :param time_step: How long between each time step in seconds

    :return: whatever `submit_task` returns for the queued task
    """
    # Order matters: it is the positional order of `_track_object`.
    task_arguments = [
        pointing_type,
        values,
        reference_time,
        station_beam_number,
        scan_time,
        time_step,
    ]
    submitted = self.submit_task(
        self._track_object,
        args=task_arguments,
        task_callback=task_callback,
    )
    return submitted
@check_communicating
def _track_object(
    self: StationComponentManager,
    pointing_type: str,
    values: dict,
    reference_time: str,
    station_beam_number: int = 1,
    scan_time: float = 86400.0,
    time_step: Optional[float] = 10.0,
    task_callback: Optional[Callable] = None,
    task_abort_event: Optional[threading.Event] = None,
) -> None:
    """
    Start tracking an object in the sky on a dedicated thread.

    A stop flag and a thread are registered under the current tracking
    id, the thread is started and the tracking id is incremented. The
    task is reported as COMPLETED once the thread has been started; the
    same ``task_callback`` is also handed to the tracking thread.

    :param pointing_type: the type of pointing requested
    :param values: Coordinates for object to be tracked
    :param scan_time: Time to scan object in seconds
    :param reference_time: time in which coordinates are equal,
        in ISO8601 formatted astropy.Time time
    :param station_beam_number: The station beam number to be used
    :param time_step: How long between each time step in seconds
    :param task_callback: Update task state, defaults to None
    :param task_abort_event: Check for abort, defaults to None; not
        used by this method
    """
    if task_callback is not None:
        task_callback(status=TaskStatus.IN_PROGRESS)

    tmp_id = self.tracking_id
    self.stop_ids[tmp_id] = False
    thread_arguments = (
        lambda: self.stop_ids[tmp_id],
        pointing_type,
        values,
        station_beam_number,
        scan_time,
        time_step,
        reference_time,
        tmp_id,
        task_callback,  # pass task_callback down
    )
    worker = threading.Thread(
        target=self._track_object_thread,
        args=thread_arguments,
    )
    self.tracking_threads[tmp_id] = worker
    worker.start()
    self.tracking_id += 1

    if task_callback is not None:
        task_callback(
            status=TaskStatus.COMPLETED,
            result="Track Object has completed, currently tracking.",
        )
@check_communicating
def _track_object_thread(
    self: StationComponentManager,
    stop: Callable,
    pointing_type: str,
    values: dict,
    station_beam_number: int,
    scan_time: float,
    time_step: float,
    reference_time: str,
    tracking_id: int,
    task_callback: Optional[Callable] = None,
) -> None:
    """
    Track the object in the sky in a separate thread.

    Used to stop the station hanging from other requests. Until
    ``scan_time`` has elapsed or ``stop()`` returns true, the pointing
    delays are recomputed every ``time_step`` seconds and, when an
    SpsStation proxy exists, loaded and applied on it. A
    ``tango.DevFailed`` raised while loading delays is reported as
    FAILED through ``task_callback`` and ends the tracking.

    However the thread ends, its entries in ``stop_ids`` and
    ``tracking_threads`` are removed.

    :param stop: Flag passed in to force stop the track if required.
    :param pointing_type: The type of pointing requested
    :param values: Coordinates for object to be tracked
    :param station_beam_number: The station beam number to be used
    :param scan_time: Time to scan object in seconds
    :param reference_time: time in which coordinates are equal,
        in ISO8601 formatted astropy.Time time
    :param time_step: How long between each time step in seconds
    :param tracking_id: The id of the tracking thread.
    :param task_callback: Update task state, defaults to None
    """
    try:
        end_time = Time.now() + TimeDelta(scan_time, format="sec")
        ref_time = Time(reference_time)
        time_next = Time.now()  # next time pointing occurs
        while Time.now() < end_time:
            if stop():
                self.logger.debug("Stopping tracking object command called")
                break
            if time_next <= Time.now():
                self.logger.debug(
                    f"Updating pointing for beam {station_beam_number}"
                )
                pointing_delays, delay_rates = self._get_pointing_delays(
                    pointing_type, values, ref_time, time_step
                )
                delays = self._construct_delays(
                    pointing_delays, delay_rates, station_beam_number
                )
                if self._sps_station_proxy:
                    self.last_pointing_delays = list(delays)
                    try:
                        self._sps_station_proxy.load_pointing_delays(delays)
                    except tango.DevFailed as devfailed:
                        dev_error = devfailed.args[0]  # Get the Tango error object
                        # Check if the description contains the specific
                        # "KeyError: 1"
                        if "KeyError: 1" in dev_error.desc:
                            failure_message = "No antenna mapping detected"
                            self.logger.error(failure_message)
                        else:
                            self.logger.error(
                                f"Tango DevFailed encountered: {dev_error.desc}"
                            )
                            failure_message = f"Tango DevFailed: {dev_error.desc}"
                        if task_callback is not None:
                            task_callback(
                                status=TaskStatus.FAILED,
                                result=(ResultCode.FAILED, failure_message),
                            )
                        return
                    self._sps_station_proxy.apply_pointing_delays("")
                time_next = time_next + TimeDelta(time_step, format="sec")
            time.sleep(0.1)
    finally:
        # Always drop the bookkeeping for this thread, including when an
        # unexpected exception ends it; otherwise stale entries would
        # make "stop all" wait for a thread that no longer exists.
        self.stop_ids.pop(tracking_id, None)
        self.tracking_threads.pop(tracking_id, None)
    self.logger.debug("Tracking complete")
def _construct_delays(
    self: StationComponentManager,
    pointing_delays: np.ndarray,
    delay_rate: np.ndarray,
    beam_number: int,
) -> np.ndarray:
    """
    Build the flat delay array for one station beam.

    The result has 513 elements: the beam number, followed by a
    (delay, delay rate) pair for each of this station's antennas.
    Elements beyond the configured number of antennas stay zero.

    :param pointing_delays: delay per antenna
    :param delay_rate: delay rate per antenna
    :param beam_number: value stored in the first element

    :return: the 513-element delay array
    """
    delays = np.zeros(513)
    delays[0] = beam_number
    for antenna in range(self._nof_antennas):
        delay_index = 2 * antenna + 1
        delays[delay_index] = pointing_delays[antenna]
        delays[delay_index + 1] = delay_rate[antenna]
    return delays
def stop_tracking(
    self: StationComponentManager,
    track_id: int,
    task_callback: Optional[Callable] = None,
) -> tuple[TaskStatus, str]:
    """
    Submit the `_stop_tracking` task for one tracking thread.

    :param track_id: the id of the tracking you wish to stop.
    :param task_callback: Update task state, defaults to None

    :return: whatever `submit_task` returns for the queued task
    """
    task_arguments = [track_id]
    submitted = self.submit_task(
        self._stop_tracking,
        args=task_arguments,
        task_callback=task_callback,
    )
    return submitted
def _stop_tracking(
    self: StationComponentManager,
    track_id: int,
    task_callback: Optional[Callable] = None,
    task_abort_event: Optional[threading.Event] = None,
) -> None:
    """
    Stop a tracking thread.

    The task is REJECTED when ``track_id`` is not registered. When the
    thread is no longer alive there is nothing to do and the task is
    COMPLETED. Otherwise the thread's stop flag is set and the thread
    is joined; the task is COMPLETED once the thread is gone and FAILED
    if it is still alive after five one-second checks.

    :param track_id: the id of the tracking you wish to stop.
    :param task_callback: Update task state, defaults to None
    :param task_abort_event: Abort the task; not used by this method
    """
    if task_callback:
        task_callback(
            status=TaskStatus.IN_PROGRESS,
            result="Attempting to stop tracking thread",
        )

    # Look the thread up once and keep the reference: a tracking thread
    # removes its own entry from `tracking_threads` when it finishes, so
    # repeated dictionary lookups could raise KeyError mid-way.
    thread = self.tracking_threads.get(track_id)
    if thread is None:
        if task_callback:
            task_callback(
                status=TaskStatus.REJECTED,
                result="Tracking Id does not match any running thread",
            )
        return

    if not thread.is_alive():
        if task_callback:
            task_callback(
                status=TaskStatus.COMPLETED,
                result="No tracking needed to stop",
            )
        return

    self.stop_ids[track_id] = True
    thread.join()
    for _ in range(5):
        if track_id not in self.tracking_threads or not thread.is_alive():
            if task_callback:
                task_callback(
                    status=TaskStatus.COMPLETED,
                    result="Tracking stopped",
                )
            return
        time.sleep(1)
    if task_callback:
        task_callback(
            status=TaskStatus.FAILED,
            result="Failed to stop tracking thread",
        )
def stop_tracking_all(
    self: StationComponentManager,
    task_callback: Optional[Callable] = None,
) -> tuple[TaskStatus, str]:
    """
    Submit the `_stop_tracking_all` task.

    :param task_callback: Update task state, defaults to None

    :return: whatever `submit_task` returns for the queued task
    """
    submitted = self.submit_task(
        self._stop_tracking_all,
        args=[],
        task_callback=task_callback,
    )
    return submitted
def _stop_tracking_all(
    self: StationComponentManager,
    task_callback: Optional[Callable] = None,
    task_abort_event: Optional[threading.Event] = None,
) -> None:
    """
    Stop all tracking threads.

    Every registered stop flag is set, then ``tracking_threads`` is
    checked up to five times, one second apart, until it is empty. The
    task is COMPLETED when it empties and FAILED otherwise.

    :param task_callback: Update task state, defaults to None
    :param task_abort_event: Abort the task; not used by this method
    """
    if task_callback:
        task_callback(
            status=TaskStatus.IN_PROGRESS,
            result="Attempting to stop tracking thread",
        )

    # Iterate over a snapshot of the keys: tracking threads remove their
    # own entry from `stop_ids` as they finish, and iterating the live
    # dict while its size changes raises RuntimeError.
    for key in list(self.stop_ids):
        self.stop_ids[key] = True

    succesfully_closed = False
    for _ in range(5):
        if not self.tracking_threads:
            succesfully_closed = True
            break
        time.sleep(1)

    if not task_callback:
        return
    if succesfully_closed:
        task_callback(
            status=TaskStatus.COMPLETED,
            result="Trackings stopped",
        )
    else:
        task_callback(
            status=TaskStatus.FAILED,
            result="Failed to stop trackings",
        )
def load_pointing_delays(
    self: StationComponentManager,
    delays: np.ndarray,
    task_callback: Optional[Callable] = None,
) -> tuple[TaskStatus, str]:
    """
    Submit the `_load_pointing_delays` task.

    :param delays: list of delays
    :param task_callback: Update task state, defaults to None

    :return: whatever `submit_task` returns for the queued task
    """
    task_arguments = [delays]
    submitted = self.submit_task(
        self._load_pointing_delays,
        args=task_arguments,
        task_callback=task_callback,
    )
    return submitted
@check_communicating
def _load_pointing_delays(
    self: StationComponentManager,
    delays: np.ndarray,
    task_callback: Optional[Callable] = None,
    task_abort_event: Optional[threading.Event] = None,
) -> None:
    """
    Load the pointing delays for this station.

    When ``delays`` is given it is remembered as the last pointing
    delays and, if an SpsStation proxy exists, loaded on it. The task
    is FAILED when ``delays`` is None or there is no SpsStation proxy.

    :param delays: list of delays
    :param task_callback: Update task state, defaults to None
    :param task_abort_event: Abort the task; not used by this method
    """
    if task_callback:
        task_callback(
            status=TaskStatus.IN_PROGRESS,
            result="LoadPointingDelays command in progress",
        )

    if delays is None:
        final_status = TaskStatus.FAILED
        message = "Unable to set delays for station"
    else:
        self.last_pointing_delays = list(delays)
        if self._sps_station_proxy:
            self._sps_station_proxy.load_pointing_delays(delays)
            final_status = TaskStatus.COMPLETED
            message = "LoadPointingDelays command has completed"
        else:
            final_status = TaskStatus.FAILED
            message = "LoadPointingDelays called without SpsStation"

    if task_callback:
        task_callback(status=final_status, result=message)
def apply_configuration(
    self: StationComponentManager,
    transaction_id: str,
    task_callback: Optional[Callable] = None,
) -> tuple[TaskStatus, str]:
    """
    Apply the beamformer and calibration configuration to SpsStation.

    The request is rejected, without submitting a task, unless the
    station calibrator has reported its power state as ON.

    :param transaction_id: the transaction id for the configuration
    :param task_callback: Update task state, defaults to None

    :return: a task status and response string
    """
    if self._station_calibrator_power_state != PowerState.ON:
        # The two literals are concatenated, so the first must end with
        # its own sentence break.
        station_calibrator_not_ready_message = (
            "Service StationCalibrator is not available or ON. "
            "Unable to apply configuration."
        )
        self.logger.error(station_calibrator_not_ready_message)
        return (TaskStatus.REJECTED, station_calibrator_not_ready_message)
    return self.submit_task(
        self._apply_configuration,
        args=[transaction_id],
        task_callback=task_callback,
    )
# pylint: disable=too-many-branches
@check_communicating
def _apply_configuration(
    self: StationComponentManager,
    transaction_id: str,
    task_callback: Optional[Callable] = None,
    task_abort_event: Optional[threading.Event] = None,
) -> None:
    """
    Apply the configuration to the SpsStation.

    The beamformer table is loaded first. A calibration table is then
    initialised to unity gain and, for each of the 8 channels of every
    used beamformer table entry, overwritten with the solution obtained
    from the station calibrator. A channel whose solution cannot be
    obtained or has the wrong length keeps unity gain; this is noted in
    the result message and the remaining channels are still processed.
    Finally the coefficients are loaded per antenna and applied.

    :param transaction_id: the transaction id for the configuration;
        not used by this method
    :param task_callback: Update task state, defaults to None
    :param task_abort_event: Abort the task; not used by this method
    """
    result_message = ""
    if task_callback:
        task_callback(status=TaskStatus.IN_PROGRESS)
    self.logger.debug("Loading beamformer table")
    assert self._sps_station_proxy
    self._sps_station_proxy.set_beamformer_table(
        np.array(self._beamformer_table).flatten()
    )
    # Compute and upload calibration
    # Initialize calibration table to unity gain
    self.logger.debug("Computing and downloading calibration coefficients")
    calibration_table = np.zeros(
        [MAX_NUMBER_OF_CHANNELS, self._nof_antennas, 4], np.csingle
    )
    unity_calibration = np.array([1.0, 0.0, 0.0, 1.0], np.csingle)
    # Broadcast the unity Jones matrix over every channel and antenna.
    calibration_table[:, :] = unity_calibration
    # compute and apply calibration
    try:
        for block, table_entry in enumerate(self._beamformer_table):
            first_channel = table_entry[0]
            if first_channel == 0:
                # channel set to 0 marks unused (unconfigured) table entries.
                # These do not require calibration
                continue
            for offset in range(8):  # 8 channels per block
                # Derive both indices from the loop offset so that a
                # failure on one channel still advances to the next one
                # instead of retrying the same channel.
                channel = first_channel + offset
                beamformer_channel = block * 8 + offset
                # TODO Interface with calibration must be refined
                # get_calibration should return an array of 4 complex per antenna
                # ordered in station antenna number order
                try:
                    solution_numpy = self._station_calibrator_proxy.get_calibration(
                        int(channel)
                    )
                    reshaped_array = self._format_solution_to_complex_jones(
                        solution_numpy
                    )
                except ValueError as ve:
                    error_message = (
                        f"\nFailed to fill calibration_table for {channel=}:\n"
                        f"{repr(ve)}\n"
                        "We will use a unit calibration for this channel.\n"
                    )
                    self.logger.warning(error_message)
                    result_message += error_message
                    continue
                except tango.DevFailed as df:
                    error_message = (
                        f"\nException raised when asking "
                        f"for a solution for {channel=}:\n"
                        f"{repr(df)}\n"
                        "We will use a unit calibration for this channel.\n"
                    )
                    result_message += error_message
                    self.logger.warning(error_message)
                    continue
                self.logger.debug(f"Solution found for {channel=}\n")
                calibration_table[beamformer_channel] = reshaped_array
        # write calibration coefficients to station
        # TODO MCCS-1023
        # Actual antenna number should be computed from configuration database:
        # it should be (tile_number)*16 + (tile _port_number), both 0 based
        #
        for antenna in range(self._nof_antennas):
            calibrations = [float(antenna)]
            for channel in range(MAX_NUMBER_OF_CHANNELS):
                for jones in calibration_table[channel, antenna]:
                    calibrations.append(jones.real)
                    calibrations.append(jones.imag)
            self._sps_station_proxy.load_calibration_coefficients(calibrations)
            self.logger.debug(
                f"load_calibration_coefficients for antenna {antenna}"
            )
        self.logger.debug("Apply calibration")
        # TODO Add load time
        self._sps_station_proxy.apply_calibration("")
    except ValueError as value_error:
        self.logger.error(f"ApplyConfiguration command has failed: {value_error}")
        if task_callback:
            task_callback(
                status=TaskStatus.FAILED,
                result=(
                    ResultCode.FAILED,
                    f"ApplyConfiguration command has failed: {value_error}",
                ),
            )
        return
    if task_callback:
        task_callback(
            status=TaskStatus.COMPLETED,
            result=(
                ResultCode.OK,
                f"ApplyConfiguration command has completed{result_message}",
            ),
        )
    return
def _format_solution_to_complex_jones(
    self: StationComponentManager,
    solution_array: np.ndarray,
) -> np.ndarray:
    """
    Format a flat solution into complex Jones values per antenna.

    Consecutive pairs of elements are read as (real, imaginary) parts
    of one complex number, and the complex numbers are grouped four per
    antenna.

    :param solution_array: The solution array of type `np.ndarray`,
        of length (number of antennas * 8).

    :return: A complex numpy.ndarray of shape
        (number of antennas, 4).

    :raises ValueError: the solution does not have the expected length.
    """
    solution_length = len(solution_array)
    if solution_length != self._nof_antennas * 8:
        raise ValueError(
            f"\tThis station is configured with {self._nof_antennas} antenna. "
            "We are expecting 4 complex numbers per antenna yielding a solution "
            f"of length {self._nof_antennas * 8}.\n"
            f"\tHowever, solution gathered is length {len(solution_array)}."
        )
    # One row per complex number: column 0 is the real part and
    # column 1 the imaginary part.
    real_imag_pairs = solution_array.reshape((solution_length // 2, 2))
    complex_values = real_imag_pairs[:, 0] + 1j * real_imag_pairs[:, 1]
    return complex_values.reshape(self._nof_antennas, 4)
def scan(
    self: StationComponentManager,
    task_callback: Optional[Callable] = None,
    *,
    interface: Optional[str] = None,
    subarray_id: int,
    scan_id: int,
    start_time: Optional[str] = None,
    duration: Optional[float] = 0.0,
) -> tuple[TaskStatus, str]:
    """
    Submit the Scan slow task.

    This method returns as soon as the task has been handed to
    `submit_task`.

    :param interface: the schema version this is running against.
    :param subarray_id: The subarray for which the command applies
    :param scan_id: The ID for this scan
    :param start_time: UTC time for begin of scan, None for immediate start
    :param duration: Scan duration in seconds. 0.0 or omitted means forever
    :param task_callback: Update task state, defaults to None

    :return: Task status and response message
    """
    # Order matters: it is the positional order of `_scan`.
    task_arguments = [subarray_id, scan_id, start_time, duration]
    submitted = self.submit_task(
        self._scan,
        args=task_arguments,
        task_callback=task_callback,
    )
    return submitted
def _scan(
self: StationComponentManager,
subarray_id: int,
scan_id: int,
start_time: Optional[str],
duration: float,
task_callback: Optional[Callable] = None,
task_abort_event: Optional[threading.Event] = None,
) -> None:
"""
Execute the Scan slow task.
:param subarray_id: The subarray for whic the command applies
:param scan_id: The ID for this scan
:param start_time: UTC time for begin of scan, None for immediate start
:param duration: Scan duration in seconds. 0.0 or omitted means forever
:param task_callback: Update task state, defaults to None
:param task_abort_event: Check for abort, defaults to None
"""
if task_callback is not None:
task_callback(status=TaskStatus.IN_PROGRESS)
# compute and save channel block mask: which blocks belong to the subarray
mask = 0
for block, table_entry in enumerate(self._beamformer_table):
if subarray_id == table_entry[2] and table_entry[0] != 0:
mask += 1 << block
self._scan_mask[subarray_id] = mask
self._scan_id[subarray_id] = scan_id
if duration == 0:
duration = -1 # this is the "infinite" time in SpsStation
self.logger.debug(f"Starting beamformer for scan {scan_id} mask {hex(mask)}")
assert self._sps_station_proxy
self._sps_station_proxy.start_beamformer(scan_id, start_time, duration, mask)
if task_callback is not None:
task_callback(
TaskStatus.COMPLETED,
result="Scan has completed.",
)
def end_scan(
    self: StationComponentManager,
    subarray_id: int,
    task_callback: Optional[Callable] = None,
    task_abort_event: Optional[threading.Event] = None,
) -> tuple[TaskStatus, str]:
    """
    Submit the EndScan slow task.

    This method returns immediately after it is submitted for
    execution. Subarray IDs outside 1..16 are rejected without
    submitting anything.

    :param subarray_id: The subarray for which the command applies
    :param task_callback: Update task state, defaults to None
    :param task_abort_event: Check for abort, defaults to None
        (not used by this submit method)

    :return: Task status and response message
    """
    if subarray_id < 1 or subarray_id > 16:
        self.logger.error(f"Invalid subarray ID {subarray_id}")
        # The value being validated is the subarray ID, so say so in the
        # response as well as in the log.
        return (TaskStatus.REJECTED, "Invalid subarray ID")
    return self.submit_task(
        self._end_scan,
        args=[subarray_id],
        task_callback=task_callback,
    )
def _end_scan(
self: StationComponentManager,
subarray_id: int,
task_callback: Optional[Callable] = None,
task_abort_event: Optional[threading.Event] = None,
) -> None:
"""
Execute the EndScan slow task.
:param subarray_id: The subarray for which the command applies
:param task_callback: Update task state, defaults to None
:param task_abort_event: Check for abort, defaults to None
"""
if task_callback is not None:
task_callback(status=TaskStatus.IN_PROGRESS)
assert self._sps_station_proxy
# TODO Patch for the demo
self._sps_station_proxy.stop_beamformer(self._scan_mask[subarray_id])
self._scan_id[subarray_id] = 0
self._scan_mask[subarray_id] = 0
if task_callback is not None:
task_callback(
TaskStatus.COMPLETED,
result="EndScan has completed.",
)
def acquire_data_for_calibration(
    self: StationComponentManager,
    channel: int,
    task_callback: Optional[Callable] = None,
    task_abort_event: Optional[threading.Event] = None,
) -> tuple[TaskStatus, str]:
    """
    Submit the AcquireDataForCalibration slow task.

    This method returns immediately after it is submitted for
    execution. The request is rejected, without submitting anything,
    when the channel is outside 0..510 or when there is no SpsStation
    proxy.

    :param channel: The channel to acquire data for
    :param task_callback: Update task state, defaults to None
    :param task_abort_event: Check for abort, defaults to None
        (not used by this submit method)

    :return: Task status and response message
    """
    if channel < 0 or channel > 510:
        # Keep a space before the value so the log line is readable.
        self.logger.error(f"Invalid channel {channel}")
        return (TaskStatus.REJECTED, "Invalid channel")
    if self._sps_station_proxy is None:
        self.logger.error("There is no SpsStation proxy to collect data.")
        return (TaskStatus.REJECTED, "No SpsStation proxy to collect data.")
    return self.submit_task(
        self._acquire_data_for_calibration,
        args=[channel],
        task_callback=task_callback,
    )
def _acquire_data_for_calibration(
self: StationComponentManager,
channel: int,
task_callback: Optional[Callable] = None,
task_abort_event: Optional[threading.Event] = None,
) -> None:
"""
Execute the AcquireDataForCalibration slow task.
:param channel: The channel to acquire data for
:param task_callback: Update task state, defaults to None
:param task_abort_event: Check for abort, defaults to None
"""
if task_callback is not None:
task_callback(status=TaskStatus.IN_PROGRESS)
assert self._sps_station_proxy
self._sps_station_proxy.acquire_data_for_calibration(channel)
if task_callback is not None:
task_callback(
status=TaskStatus.COMPLETED,
result="AcquireDataForCalibration has completed.",
)
def _count_blocks(
self: StationComponentManager,
) -> int:
"""
Return the number of blocks required to specify the whole configuration.
It also updates the number of used channels.
:return: the number of blocks up to the last non-empty one.
"""
max_block = 0
n_chans = 0
for n, block in enumerate(self._beamformer_table):
if block[2] != 0:
n_chans += 8
max_block = n + 1
self._number_of_channels = n_chans
return max_block
def deallocate_subarray(
    self: StationComponentManager,
    subarray_id: int,
) -> ResultCode:
    """
    Clear a subarray's entries from the channel table.

    Every table entry whose third element equals ``subarray_id`` is
    replaced by a row of seven zeros, after which the channel count is
    recomputed.

    :param subarray_id: subarray_id to clear

    :return: a result code, always ``ResultCode.OK``
    """
    table = self._beamformer_table
    owned_rows = [
        index for index, entry in enumerate(table) if entry[2] == subarray_id
    ]
    for index in owned_rows:
        # A fresh list per row, so cleared rows never share storage.
        table[index] = [0] * 7
    self._count_blocks()  # this updates self._number_of_channels
    return ResultCode.OK
def start_acquisition(
    self: StationComponentManager,
    task_callback: Optional[Callable] = None,
    *,
    start_time: Optional[str] = None,
    delay: Optional[int] = 2,
) -> tuple[TaskStatus, str]:
    """
    Queue the start acquisition task.

    ``self._start_acquisition`` is handed to ``submit_task`` and this
    method returns straight away.

    :param task_callback: Update task state, defaults to None
    :param start_time: the time at which to start data acquisition, defaults to None
    :param delay: delay start, defaults to 2

    :return: a task status and response message
    """
    # Positional arguments for _start_acquisition, in declaration order.
    acquisition_arguments = [start_time, delay]
    return self.submit_task(
        self._start_acquisition,
        args=acquisition_arguments,
        task_callback=task_callback,
    )
@check_communicating
def _start_acquisition(
    self: StationComponentManager,
    start_time: Optional[str] = None,
    delay: Optional[int] = 2,
    task_callback: Optional[Callable] = None,
    task_abort_event: Optional[threading.Event] = None,
) -> None:
    """
    Start acquisition as a slow command.

    The start time and delay are JSON-encoded and passed to the
    SpsStation proxy. The task is reported as COMPLETED when the proxy
    answers QUEUED, OK or STARTED, and as FAILED otherwise.

    :param start_time: the time at which to start data acquisition, defaults to None
    :param delay: delay start, defaults to 2
    :param task_callback: Update task state, defaults to None
    :param task_abort_event: Check for abort, defaults to None
    """
    if task_callback:
        task_callback(status=TaskStatus.IN_PROGRESS)

    json_argument = json.dumps({"start_time": start_time, "delay": delay})
    assert self._sps_station_proxy is not None
    result = self._sps_station_proxy.start_acquisition(json_argument)
    accepted = result in (ResultCode.QUEUED, ResultCode.OK, ResultCode.STARTED)

    if not task_callback:
        return
    if accepted:
        task_callback(
            status=TaskStatus.COMPLETED,
            result="Start acquisition has completed",
        )
    else:
        task_callback(
            status=TaskStatus.FAILED, result="Start acquisition task failed"
        )
def initialise(
    self: StationComponentManager,
    task_callback: Optional[Callable] = None,
) -> tuple[TaskStatus, str]:
    """
    Queue the initialise task.

    ``self._initialise`` is handed to ``submit_task`` with no arguments
    and this method returns straight away.

    :param task_callback: Update task state, defaults to None

    :return: a task status and response message
    """
    no_arguments: list = []
    return self.submit_task(
        self._initialise,
        args=no_arguments,
        task_callback=task_callback,
    )
@check_communicating
def _initialise(
    self: StationComponentManager,
    task_callback: Optional[Callable] = None,
    task_abort_event: Optional[threading.Event] = None,
) -> None:
    """
    Initialise as a slow command.

    Calls ``initialise`` on the SpsStation proxy. The task is reported
    as COMPLETED when the proxy answers QUEUED, OK or STARTED, and as
    FAILED otherwise.

    :param task_callback: Update task state, defaults to None
    :param task_abort_event: Check for abort, defaults to None
    """
    if task_callback:
        task_callback(status=TaskStatus.IN_PROGRESS)

    assert self._sps_station_proxy is not None
    result = self._sps_station_proxy.initialise()
    accepted = result in (ResultCode.QUEUED, ResultCode.OK, ResultCode.STARTED)

    if not task_callback:
        return
    if accepted:
        task_callback(
            status=TaskStatus.COMPLETED,
            result="Initialisation has completed",
        )
    else:
        task_callback(
            status=TaskStatus.FAILED, result="Initialisation task failed"
        )
def _wait_for_power_state(
self: StationComponentManager,
timeout: float,
task_abort_event: Optional[threading.Event] = None,
) -> TaskStatus:
"""
Wait for sub-station PowerState to reach desired state.
:param timeout: Time to wait, in seconds.
:param task_abort_event: Check for abort, defaults to None
:return: completed if status reached, FAILED if timed out, ABORTED if aborted
"""
assert self._desired_power_state is not None
resolution = 1 # seconds
ticks = int(timeout / resolution)
while self._powering_resources:
if task_abort_event and task_abort_event.is_set():
return TaskStatus.ABORTED
time.sleep(resolution)
ticks -= 1
if ticks == 0:
self.logger.warning(
f"Timed out waiting for PowerState in {timeout} seconds"
)
return TaskStatus.FAILED
self.logger.debug(
f"Waiting for {self._powering_resources} PowerState"
f" {self._desired_power_state.name},"
f" waiting for {ticks*resolution} more seconds"
)
self.logger.debug(f"Waited PowerState for {timeout-ticks*resolution} seconds")
return TaskStatus.COMPLETED
@property
def number_of_channels(self: StationComponentManager) -> int:
    """
    Return the total number of channels in the beamformer.

    This is the stored count; it is not recomputed on access.

    :return: the total number of channels
    """
    channel_count = self._number_of_channels
    return channel_count
@property
def beamformer_table(self: StationComponentManager) -> list[list[int]]:
    """
    Return the channel table in the layout needed by ConfigureChannels.

    Only the blocks up to the last non-empty one are included, and each
    row is prefixed with its block index.

    :return: reformatted channel table
    """
    block_count = self._count_blocks()
    return [
        [index, *entry]
        for index, entry in enumerate(self._beamformer_table[:block_count])
    ]
@property
def scan_ids(self: StationComponentManager) -> list[int]:
    """
    Return the current scan IDs for each subarray.

    The first stored entry (index 0) is left out of the result.

    :return: list of scan IDs starting from subarray 1, 0 = subarray not scanning
    """
    first_subarray = 1
    return self._scan_id[first_subarray:]
@property
def tileprogrammingstate(self: StationComponentManager) -> tuple[str]:
    """
    Return the tileprogrammingstate of the SpsStation.

    The value is read from the device behind the SpsStation proxy on
    every access.

    :return: the tileprogrammingstate of the SpsStation.
    """
    station_proxy = self._sps_station_proxy
    assert station_proxy is not None
    device_proxy = station_proxy._proxy
    assert device_proxy is not None
    return device_proxy._device.tileprogrammingstate