# -*- coding: utf-8 -*
#
# This file is part of the SKA Low MCCS project
#
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""This module implements component management for stations."""
# pylint: disable=too-many-lines
from __future__ import annotations
import functools
import json
import logging
import threading
import time
from datetime import datetime, timedelta, timezone
from typing import Any, Callable, Optional, Sequence
import numpy as np
import tango
from astropy.time.core import Time
from ska_control_model import CommunicationStatus, PowerState, ResultCode, TaskStatus
from ska_low_mccs_common import EventSerialiser
from ska_low_mccs_common.communication_manager import CommunicationManager
from ska_low_mccs_common.component import DeviceComponentManager, MccsCommandProxy
from ska_low_mccs_common.utils import threadsafe
from ska_tango_base.base import check_communicating, check_on
from ska_tango_base.executor import TaskExecutorComponentManager
from ska_tango_base.type_hints import DevVarLongStringArrayType
from ska_low_mccs.station.point_station import Pointing, StationInformation
from ska_low_mccs.station.station_pointing_manager import (
PointingRequest,
StationPointingManager,
)
__all__ = ["StationComponentManager", "DatabaseSolutionStructureError"]
MAX_NUMBER_OF_CHANNELS = 384
CHANNELS_PER_BLOCK = 8
NUMBER_OF_ANTENNAS = 256
# The number of antenna is always 256 for correlation. Even if the station
# has fewer antenna the correlation matrix is always 256 !
NOF_CORRELATION_ANTENNA = 256
RFC_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
[docs]
class DatabaseSolutionStructureError(ValueError):
"""Raise when solution has unexpected structure."""
class _FieldStationProxy(DeviceComponentManager):
"""A proxy to a FieldStation device, for a station to use."""
# pylint: disable=too-many-arguments
def __init__(
self: _FieldStationProxy,
trl: str,
logger: logging.Logger,
communication_state_callback: Callable[[CommunicationStatus], None],
component_state_callback: Callable[..., None],
outside_temperature_changed_callback: Callable[[float], None],
event_serialiser: Optional[EventSerialiser] = None,
) -> None:
super().__init__(
trl,
logger,
communication_state_callback,
component_state_callback,
event_serialiser=event_serialiser,
)
self._outside_temperature_changed_callback = (
outside_temperature_changed_callback
)
def get_change_event_callbacks(self) -> dict[str, Callable]:
return {
**super().get_change_event_callbacks(),
"outsideTemperature": self._outside_temperature_changed,
}
@check_communicating
def configure(self: _FieldStationProxy, config: str) -> None:
"""
Configure the device proxy.
:param config: json string of configuration.
"""
assert self._proxy is not None # for the type checker
assert self._proxy._device is not None # for the type checker
self._proxy._device.Configure(config)
@property
def outside_temperature(self: _FieldStationProxy) -> int:
"""
Return outsideTemperature.
:return: outsideTemperature
"""
assert self._proxy is not None # for the type checker
return self._proxy.outsideTemperature
def _outside_temperature_changed(
self: _FieldStationProxy,
event_name: str,
event_value: float,
event_quality: tango.AttrQuality,
) -> None:
assert (
event_name.lower() == "outsidetemperature"
), f"outsideTemperature changed callback called but event_name is {event_name}."
self._outside_temperature_changed_callback(event_value)
class _AntennaProxy(DeviceComponentManager):
"""A proxy to a antenna device, for a station to use."""
def __init__(
self: _AntennaProxy,
trl: str,
logger: logging.Logger,
communication_state_callback: Callable[[CommunicationStatus], None],
component_state_callback: Callable[..., None],
event_serialiser: Optional[EventSerialiser] = None,
) -> None:
self._power_state_lock = threading.RLock()
super().__init__(
trl,
logger,
communication_state_callback,
component_state_callback,
event_serialiser=event_serialiser,
)
@check_communicating
def configure(self: _AntennaProxy, config: str) -> None:
"""
Configure the device proxy.
:param config: json string of configuration.
"""
assert self._proxy is not None # for the type checker
assert self._proxy._device is not None # for the type checker
self._proxy._device.Configure(config)
class _StationCalibratorProxy(DeviceComponentManager):
"""A proxy to a station calibrator, for a station to use."""
@check_communicating
def get_calibration(
self: _StationCalibratorProxy,
channel: int,
calibration_id: Optional[str] = None,
) -> np.ndarray:
"""
Get the calibration cooefficients.
:param channel: channel for calibration coefficents.
:param calibration_id: Unique identifier for calibration.
:return: result code of GetCalibration and calibration coefficents
"""
assert self._proxy is not None # for the type checker
assert self._proxy._device is not None # for the type checker
argin: dict = {"frequency_channel": channel}
if calibration_id:
argin["calibration_id"] = calibration_id
return self._proxy._device.GetCalibration(json.dumps(argin))
@check_communicating
def get_fitted_solutions(
self: _StationCalibratorProxy,
frequency_channels: list[int],
calibration_id: Optional[str] = None,
) -> np.ndarray:
"""
Get fitted calibration solutions for a set of frequency channels.
:param frequency_channels: channel indices to compute solutions for.
:param calibration_id: optional unique identifier for calibration.
:return: flat float array of shape ``(n_freqs * n_ant * 8,)``.
"""
assert self._proxy is not None
assert self._proxy._device is not None
argin: dict = {"frequency_channels": frequency_channels}
if calibration_id:
argin["calibration_id"] = calibration_id
return self._proxy._device.GetFittedSolutions(json.dumps(argin))
@check_communicating
def get_latest_preferred_job_id(self: _StationCalibratorProxy) -> str:
"""
Return the user_friendly_name of the most recent preferred calibration job.
:return: the user_friendly_name of the most recent preferred job,
or an empty string if none exists.
"""
assert self._proxy is not None
assert self._proxy._device is not None
return str(self._proxy._device.latestPreferredJobId)
# pylint: disable = too-many-public-methods
class _SpsStationProxy(DeviceComponentManager):
"""A proxy to an SpsStation, for a station to use."""
# pylint: disable=too-many-arguments
def __init__(
self: _SpsStationProxy,
trl: str,
logger: logging.Logger,
communication_state_callback: Callable[[CommunicationStatus], None],
component_state_callback: Callable[..., None],
data_received_result_changed_callback: Callable[..., None],
event_serialiser: Optional[EventSerialiser] = None,
) -> None:
super().__init__(
trl,
logger,
communication_state_callback,
component_state_callback,
event_serialiser=event_serialiser,
)
self._data_received_result_changed_callback = (
data_received_result_changed_callback
)
def get_change_event_callbacks(self) -> dict[str, Callable]:
return {
**super().get_change_event_callbacks(),
"dataReceivedResult": self._data_received_result_changed,
}
@check_communicating
def load_pointing_delays(self: _SpsStationProxy, delays: np.ndarray) -> ResultCode:
"""
Set the pointing delay parameters of this Station's Tiles.
Delay is the geometric delay to be corrected for each antenna, in seconds.
Delay is negative towards source.
:param delays: an array containing a beam index followed by antennar
delays and delay rate pairs for each antenna
:return: result code of LoadPointingDelays
"""
assert self._proxy is not None # for the type checker
assert self._proxy._device is not None # for the type checker
[result_code], _ = self._proxy._device.LoadPointingDelays(delays)
return result_code
@check_communicating
def apply_pointing_delays(self: _SpsStationProxy, load_time: str) -> ResultCode:
"""
Set the pointing delay parameters of this Station's Tiles.
:param load_time: switch time, in ISO formatted time. Default: now
:return: result code of ApplyPointingDelays
"""
assert self._proxy is not None # for the type checker
assert self._proxy._device is not None # for the type checker
[result_code], _ = self._proxy.ApplyPointingDelays(load_time)
return result_code
@check_communicating
def set_beamformer_table(
self: _SpsStationProxy, table: np.ndarray
) -> tuple[list[ResultCode], list[Optional[str]]]:
"""
Set the beamformer table which are going to be beamformed into each beam.
region_array is defined as a flattened 2D array, for a maximum of 48 entries.
Each entry corresponds to 8 consecutive frequency channels.
This is equivalent to SetBeamformerRegions, with a different way
to specify the bandwidth of each spectral region.
Input is consistent with the beamformerTable attribute
:param table: list of regions. Each region comprises:
* start_channel - (int) region starting channel, must be even in range 0 to 510
* beam_index - (int) beam used for this region with range 0 to 47
* subarray_id - (int) Subarray
* subarray_logical_channel - (int) logical channel # in the subarray
* subarray_beam_id - (int) ID of the subarray beam
* substation_id - (int) Substation
* aperture_id: ID of the aperture (APXX.YYY)
:return: result code of SetBeamformerTable
"""
assert self._proxy is not None # for the type checker
return self._proxy.SetBeamformerTable(table)
@check_communicating
def set_beamformer_regions(self: _SpsStationProxy, argin: np.ndarray) -> ResultCode:
"""
Set the frequency regions which are going to be beamformed into each beam..
:param argin: list of regions. Each region comprises:
* start_channel - (int) region starting channel, must be even in range 0 to 510
* num_channels - (int) size of the region, must be a multiple of 8
* beam_index - (int) beam used for this region with range 0 to 47
* subarray_id - (int) Subarray
* subarray_logical_channel - (int) logical channel # in the subarray
* subarray_beam_id - (int) ID of the subarray beam
* substation_id - (int) Substation
* aperture_id: ID of the aperture (APXX.YYY)
:return: result code of SetBeamformerRegions
"""
assert self._proxy is not None # for the type checker
assert self._proxy._device is not None # for the type checker
[result_code], _ = self._proxy._device.SetBeamformerRegions(argin)
return result_code
@check_communicating
def start_beamformer(
self: _SpsStationProxy,
scan_id: int,
start_time: Optional[str],
duration: float = -1.0,
channel_groups: Optional[list[int]] = None,
) -> tuple[ResultCode, str]:
"""
Start the beamformer for the selected channel blocks.
:param scan_id: unique scan ID
:param start_time: ISO-8691 formatted UTC scan start time
:param duration: Scan duration in seconds. If omitted or negative
scan lasts forever
:param channel_groups: Channel groups to which the command applies.
:return: result code of StartBeamformer
"""
assert self._proxy is not None # for the type checker
assert self._proxy._device is not None # for the type checker
# TODO Duration is buggy in the MccsTile, is expressed in frames
# of 256*1.08e-6 seconds instead of seconds
if duration < 0.0:
duration_i = -1
else:
duration_i = int(round(duration * 3616.90))
scan_arg: dict[str, int | list[int] | float | str] = {
"scan_id": scan_id,
"duration": duration_i,
}
if channel_groups is not None:
scan_arg["channel_groups"] = channel_groups
if start_time is not None:
scan_arg["start_time"] = start_time
start_beamformer_command = MccsCommandProxy(
self._name, "StartBeamformer", self.logger
)
result_code, message = start_beamformer_command( # type: ignore[return-value]
is_lrc=True, wait_for_result=True, arg=json.dumps(scan_arg)
)
return ResultCode(result_code), message
@check_communicating
def stop_beamformer_for_channels(
self: _SpsStationProxy,
channel_groups: Optional[list[int]] = None,
) -> tuple[ResultCode, str]:
"""
Stop the beamformer for the selected channel blocks.
:param channel_groups: Channel groups to which the command applies.
:return: result code of StopBeamformerForChannels command
"""
assert self._proxy is not None # for the type checker
assert self._proxy._device is not None # for the type checker
if channel_groups is not None:
scan_arg = {"channel_groups": channel_groups}
else:
scan_arg = {}
stop_beamformer_command = MccsCommandProxy(
self._name, "StopBeamformerForChannels", self.logger
)
result_code, message = stop_beamformer_command( # type: ignore[return-value]
is_lrc=True, wait_for_result=True, arg=json.dumps(scan_arg)
)
self.logger.debug(
f"StopBeamformerForChannels({json.dumps(scan_arg)}) returned {result_code}"
)
return ResultCode(result_code), message
@check_communicating
def beamformer_running_for_channels(
self: _SpsStationProxy,
channel_groups: Optional[list[int]] = None,
) -> bool:
"""
Check the beamformer irunning status for the selected channel blocks.
:param channel_groups: Channel groups to which the command applies.
:return: whether the beamformer is running
"""
assert self._proxy is not None # for the type checker
assert self._proxy._device is not None # for the type checker
if channel_groups is not None:
scan_arg = {"channel_groups": channel_groups}
else:
scan_arg = {}
return self._proxy._device.BeamformerRunningForChannels(json.dumps(scan_arg))
@check_communicating
def start_acquisition(self: _SpsStationProxy, argin: str) -> ResultCode:
"""
Start the acquisition synchronously for all tiles, checks for synchronisation.
:param argin: json dictionary with optional keywords
* start_time - (str) start time
* delay - (int) delay start
:return: result code of StartAcquisition
"""
assert self._proxy is not None # for the type checker
assert self._proxy._device is not None # for the type checker
[result_code], _ = self._proxy._device.StartAcquisition(argin)
return result_code
@check_communicating
def initialise(self: _SpsStationProxy) -> ResultCode:
"""
Initialise the station's tiles.
:return: result code of Initialise
"""
assert self._proxy is not None # for the type checker
assert self._proxy._device is not None # for the type checker
[result_code], _ = self._proxy._device.Initialise()
return result_code
@check_communicating
def set_lmc_download(self: _SpsStationProxy, argin: str) -> ResultCode:
"""
Specify whether control data will be transmitted over 1G or 40G networks.
:param argin: json dictionary with optional keywords:
* mode - (string) '1G' or '10G' (Mandatory) (use '10G' for 40G also)
* payload_length - (int) SPEAD payload length for channel data
* destination_ip - (string) Destination IP.
* source_port - (int) Source port for integrated data streams
* destination_port - (int) Destination port for integrated data streams
:return: result code of SetLmcDownload
"""
assert self._proxy is not None # for the type checker
assert self._proxy._device is not None # for the type checker
[result_code], _ = self._proxy._device.SetLmcDownload(argin)
return result_code
@check_communicating
def set_lmc_integrated_download(self: _SpsStationProxy, argin: str) -> ResultCode:
"""
Configure link and size for integrated data packets, for all tiles.
:param argin: json dictionary with optional keywords:
* mode - (string) '1G' '10G' '40G' - default 40G
* channel_payload_length - (int) SPEAD payload length for integrated
channel data
* beam_payload_length - (int) SPEAD payload length for integrated beam data
* destination_ip - (string) Destination IP
* source_port - (int) Source port for integrated data streams
* destination_port - (int) Destination port for integrated data streams
:return: result code of SetLmcIntegratedDownload
"""
assert self._proxy is not None # for the type checker
assert self._proxy._device is not None # for the type checker
[result_code], _ = self._proxy._device.SetLmcIntegratedDownload(argin)
return result_code
@check_communicating
def set_csp_ingest(self: _SpsStationProxy, argin: str) -> tuple[ResultCode, str]:
"""
Configure link for beam data packets to CSP.
:param argin: json dictionary with optional keywords:
* destination_ip - (string) Destination IP
* source_port - (int) Source port for integrated data streams
* destination_port - (int) Destination port for integrated data streams
:return: result code of SetCspIngest
"""
assert self._proxy is not None # for the type checker
assert self._proxy._device is not None # for the type checker
return self._proxy._device.SetCspIngest(argin)
@check_communicating
def reset_csp_ingest(
self: _SpsStationProxy,
) -> DevVarLongStringArrayType:
"""
Reset link for beam data packets to CSP to defaults.
:return: result code of ResetCspIngest
"""
assert self._proxy is not None # for the type checker
assert self._proxy._device is not None # for the type checker
return self._proxy._device.ResetCspIngest()
@check_communicating
def csp_ingest_config(self: _SpsStationProxy) -> str:
"""
Return the current CspIngest configuration.
:return: current CspIngest configuration
"""
assert self._proxy is not None # for the type checker
assert self._proxy._device is not None # for the type checker
return self._proxy._device.CspIngestConfig
@check_communicating
def static_delays(self: _SpsStationProxy) -> list[float]:
"""
Return the current static delays.
:return: current static delays
"""
assert self._proxy is not None
return self._proxy.staticTimeDelays
@check_communicating
def daq_path(self: _SpsStationProxy) -> str:
"""
Report the DAQ path in use for this station.
:return: DAQ path.
"""
assert self._proxy is not None # for the type checker
assert self._proxy._device is not None # for the type checker
return self._proxy._device.DaqPath
@check_communicating
def configure_integrated_channel_data(
self: _SpsStationProxy, argin: str
) -> ResultCode:
"""
Configure and start the transmission of integrated channel data.
Using the provided integration time, first channel and last channel.
Data are sent continuously until the StopIntegratedData command is run.
:param argin: json dictionary with optional keywords:
* integration_time - (float) in seconds (default = 0.5)
* first_channel - (int) default 0
* last_channel - (int) default 511
:return: result code of ConfigureIntegratedChannelData
"""
assert self._proxy is not None # for the type checker
assert self._proxy._device is not None # for the type checker
[result_code], _ = self._proxy._device.ConfigureIntegratedChannelData(argin)
return result_code
@check_communicating
def stop_integrated_data(self: _SpsStationProxy) -> ResultCode:
"""
Stop the integrated data.
:return: result code of StopIntegratedData
"""
assert self._proxy is not None # for the type checker
assert self._proxy._device is not None # for the type checker
[result_code], _ = self._proxy._device.StopIntegratedData()
return result_code
@check_communicating
def send_data_samples(self: _SpsStationProxy, argin: str) -> ResultCode:
"""
Transmit a snapshot containing raw antenna data.
:param argin: json dictionary with optional keywords:
* data_type - type of snapshot data (mandatory): "raw", "channel",
"channel_continuous", "narrowband", "beam"
* start_time - Time (UTC string) to start sending data. Default immediately
* seconds - (float) Delay if timestamp is not specified. Default 0.2 seconds
Depending on the data type:
raw:
* sync: bool: send synchronised samples for all antennas, vs. round robin
larger snapshot from each antenna
channel:
* n_samples: Number of samples per channel, default 1024
* first_channel - (int) first channel to send, default 0
* last_channel - (int) last channel to send, default 511
channel_continuous
* channel_id - (int) channel_id (Mandatory)
* n_samples - (int) number of samples to send per packet, default 128
narrowband:
* frequency - (int) Sky frequency for band centre, in Hz (Mandatory)
* round_bits - (int) Specify whow many bits to round
* n_samples - (int) number of spectra to send
:return: result code of SendDataSamples
"""
assert self._proxy is not None # for the type checker
assert self._proxy._device is not None # for the type checker
[result_code], _ = self._proxy._device.SendDataSamples(argin)
return result_code
@check_communicating
def stop_data_transmission(
self: _SpsStationProxy,
) -> ResultCode:
"""
Stop data transmission from board.
:return: result code of StopDataTransmission
"""
assert self._proxy is not None # for the type checker
assert self._proxy._device is not None # for the type checker
[result_code], _ = self._proxy._device.StopDataTransmission()
return result_code
@check_communicating
def configure_test_generator(
self: _SpsStationProxy,
argin: str,
) -> ResultCode:
"""
Set the test signal generator.
:param argin: json dictionary with keywords:
* tone_frequency: first tone frequency, in Hz. The frequency
is rounded to the resolution of the generator. If this
is not specified, the tone generator is disabled.
* tone_amplitude: peak tone amplitude, normalized to 31.875 ADC
units. The amplitude is rounded to 1/8 ADC unit. Default
is 1.0. A value of -1.0 keeps the previously set value.
* tone_2_frequency: frequency for the second tone. Same
as ToneFrequency.
* tone_2_amplitude: peak tone amplitude for the second tone.
Same as ToneAmplitude.
* noise_amplitude: RMS amplitude of the pseudorandom Gaussian
white noise, normalized to 26.03 ADC units.
* pulse_frequency: frequency of the periodic pulse. A code
in the range 0 to 7, corresponding to (16, 12, 8, 6, 4, 3, 2)
times the ADC frame frequency.
* pulse_amplitude: peak amplitude of the periodic pulse, normalized
to 127 ADC units. Default is 1.0. A value of -1.0 keeps the
previously set value.
* set_time: time at which the generator is set, for synchronization
among different TPMs. In UTC ISO format (string)
* adc_channels: list of adc channels which will be substituted with
the generated signal. It is a 32 integer, with each bit representing
an input channel. Default: all if at least q source is specified,
none otherwises.
:return: result code of ConfigureTestGenerator
"""
assert self._proxy is not None # for the type checker
assert self._proxy._device is not None # for the type checker
[result_code], _ = self._proxy._device.ConfigureTestGenerator(argin)
return result_code
@property
def nof_tiles(self: _SpsStationProxy) -> int:
"""
Return the number of TPMs configured on this station.
:returns: the number of TPM part of this spsstation
"""
assert self._proxy is not None # for the type checker
assert self._proxy._device is not None # for the type checker
return len(self._proxy._device.get_property("TileFQDNs")["TileFQDNs"])
@property
def antennas_mapping(self: _SpsStationProxy) -> str:
"""
Return the number of TPMs configured on this station.
:returns: the number of TPM part of this spsstation
"""
assert self._proxy is not None # for the type checker
assert self._proxy._device is not None # for the type checker
return self._proxy._device.antennasMapping
def _data_received_result_changed(
self: _SpsStationProxy,
event_name: str,
event_value: list[str],
event_quality: tango.AttrQuality,
) -> None:
if event_name.lower() != "datareceivedresult":
self.logger.error(
"dataReceivedResult callback called, "
f"but event_name is {event_name}."
)
return
self._data_received_result_changed_callback(event_value)
# pylint: disable=too-many-instance-attributes,too-many-public-methods
[docs]
class StationComponentManager(TaskExecutorComponentManager):
"""A component manager for a station."""
# pylint: disable=too-many-arguments, too-many-statements, too-many-locals
[docs]
def __init__(
self: StationComponentManager,
station_id: int,
ref_latitude: float,
ref_longitude: float,
ref_height: float,
field_station_trl: str,
antenna_trls: Sequence[str],
antenna_station_locations: np.ndarray,
antenna_element_ids: list[int],
station_calibrator_trl: str,
sps_station_trl: str,
calibration_load_delay: float,
antenna_masks: list[bool],
use_beam_weights: bool,
logger: logging.Logger,
communication_state_callback: Callable[[CommunicationStatus], None],
component_state_callback: Callable[..., None],
event_serialiser: Optional[EventSerialiser] = None,
) -> None:
"""
Initialise a new instance.
:param station_id: the id of this station
:param ref_latitude: reference latitude of the station.
:param ref_longitude: reference longitude of the station.
:param ref_height: reference ellipsoidal height of the station.
:param field_station_trl: TRL of the Tango device that manages this
station's FieldStation
:param antenna_trls: TRLs of the Tango devices and manage this
station's antennas
:param antenna_station_locations: array of the x, y, z positions of the antennas
:param antenna_element_ids: list of the element IDs of the antennas
:param station_calibrator_trl: TRL of the Tango devices and manage this
station's station calibrator
:param sps_station_trl: TRL of the Tango devices and manage this
station's Spshw station
:param calibration_load_delay: The amount of seconds in the future for the
calibration solutions to be applied.
:param antenna_masks: the antenna masks to be masked from all beams in the
beamformerTable.
:param use_beam_weights: True if we are prototyping
the beam weighting feature.
:param logger: the logger to be used by this object.
:param communication_state_callback: callback to be
called when the status of the communications channel
between the component manager and its component changes
:param component_state_callback: callback to be
called when the component state changes
:param event_serialiser: the event serialiser to be used by this object.
"""
self._additional_debug: bool = False
self._event_serialiser = event_serialiser
self._power_state_lock = threading.RLock()
self._station_id = station_id
self._field_station_proxy: Optional[_FieldStationProxy] = None
self._sps_station_proxy: Optional[_SpsStationProxy] = None
self._station_calibrator_proxy: Optional[_StationCalibratorProxy] = None
self._field_station_power_state: Optional[PowerState] = None
self._sps_station_power_state: Optional[PowerState] = None
self._station_calibrator_power_state: Optional[PowerState] = None
self._field_station_trl = field_station_trl
self._sps_station_trl = sps_station_trl
self._is_configured = False
self._field_station_on_called = False
self.outside_temperature: Optional[float] = None
self.data_received_result: Optional[tuple[str, str]] = ("", "")
self._is_communicating: bool = False
self._ref_longitude: float = ref_longitude
self._ref_latitude: float = ref_latitude
self._ref_height: float = ref_height
self._pointing_delays: Optional[list[list]] = [[]]
self._scan_id = [0] * 17
self._channel_groups: list[list[int] | None] = [None] * 17
self._nof_subdevices = 0
self._use_beam_weights = use_beam_weights
self.inactive_cal_id: str = ""
self.active_cal_id: str = ""
self._powering_resources: set[str] = set() # this set of stations should go
self._desired_power_state: Optional[PowerState] = None # to this state
self.stop_ids: dict[int, bool] = {}
self.tracking_threads: dict[int, threading.Thread] = {}
self._antenna_xyz = np.array(antenna_station_locations)
self._antenna_element_ids = np.array(antenna_element_ids)
self._nof_antennas = self._antenna_xyz.shape[0]
self._component_state_callback = component_state_callback
self._communication_state_callback = communication_state_callback
# beam_idx + 2*(number of antennas)
self.last_pointing_delays = [0.0] * (1 + 2 * NUMBER_OF_ANTENNAS)
self._communication_states = {}
if antenna_trls != [""]:
self._communication_states = {
trl: CommunicationStatus.DISABLED for trl in list(antenna_trls)
}
self._antenna_power_states = {trl: PowerState.UNKNOWN for trl in antenna_trls}
self._antenna_proxies = {
antenna_trl: _AntennaProxy(
antenna_trl,
logger,
functools.partial(
self._device_communication_state_changed, antenna_trl
),
functools.partial(component_state_callback, trl=antenna_trl),
event_serialiser=self._event_serialiser,
)
for antenna_trl in antenna_trls
}
self._nof_subdevices += self._nof_antennas
self.calibration_load_delay = calibration_load_delay
# self._station_calibrator_power_state = PowerState.UNKNOWN
if field_station_trl != "":
self._communication_states[field_station_trl] = CommunicationStatus.DISABLED
self._field_station_power_state = PowerState.UNKNOWN
self._field_station_proxy = _FieldStationProxy(
self._field_station_trl,
logger,
functools.partial(
self._device_communication_state_changed, self._field_station_trl
),
functools.partial(
component_state_callback, trl=self._field_station_trl
),
self._field_station_outside_temperature_changed,
event_serialiser=self._event_serialiser,
)
else:
self._field_station_proxy = None
self._field_station_power_state = None
self._station_calibrator_trl = station_calibrator_trl
if station_calibrator_trl != "":
self._station_calibrator_proxy = _StationCalibratorProxy(
station_calibrator_trl,
logger,
functools.partial(
self._device_communication_state_changed,
self._station_calibrator_trl,
),
self._station_calibrator_power_state_changed,
event_serialiser=self._event_serialiser,
)
if sps_station_trl != "":
self._communication_states[sps_station_trl] = CommunicationStatus.DISABLED
self._sps_station_power_state = PowerState.UNKNOWN
self._sps_station_trl = sps_station_trl
self._sps_station_proxy = _SpsStationProxy(
sps_station_trl,
logger,
functools.partial(
self._device_communication_state_changed, sps_station_trl
),
functools.partial(component_state_callback, trl=sps_station_trl),
self._sps_station_data_received_result_changed,
event_serialiser=self._event_serialiser,
)
self._nof_subdevices += 1
else:
self._sps_station_power_state = None
self._sps_station_proxy = None
# Aggregate configuration tables
# Channel table: one entry per channel block (48 total)
# Bidimensional array of one row for each 8 channels, with elements:
# 0. start physical channel
# 1. beam number
# 2. subarray ID
# 3. subarray_logical_channel
# 4. subarray_beam_id
# 5. substation_id
# 6. aperture_id
self._beamformer_table: list[list[int]] = [[0] * 7 for _ in range(48)]
self._number_of_channels = 0
# This matrix of shape (256, 384)
# represents the weights for each antenna per logical channel.
self.configured_weighting_block = np.ones((256, 384), dtype=complex)
self._loaded_weighting_block = np.full((256, 384), -1.0, dtype=complex)
self.applied_weighting_block = np.full((256, 384), -1.0, dtype=complex)
# Antenna weights on a per-beam basis (48 beams × 256 antennas)
self._beam_antenna_weights = np.ones((48, 256), dtype=complex)
# Antennas to mask from all beams.
self._antenna_masks = np.array(antenna_masks)
self._pointing_helper: Pointing = self.setup_pointing_helper()
self._pointing_manager = StationPointingManager(
logger=logger,
pointing_helper=self._pointing_helper,
failed_pointing_updates_callback=self._update_failed_pointing_updates,
pointing_thread_cadence=0.1, # seconds
sps_station_proxy=self._sps_station_proxy,
nof_antennas=self._nof_antennas,
)
super().__init__(
logger,
communication_state_callback,
component_state_callback,
power=None,
fault=None,
configuration_changed=None,
is_configured=None,
failed_pointing_updates=None,
)
self._communication_manager = CommunicationManager(
self._update_communication_state,
self._update_component_state,
self.logger,
(
{self._sps_station_trl: self._sps_station_proxy}
if self._sps_station_proxy is not None
else {}
),
(
{self._field_station_trl: self._field_station_proxy}
if self._field_station_proxy is not None
else {}
),
(
{self._station_calibrator_trl: self._station_calibrator_proxy}
if self._station_calibrator_proxy is not None
else {}
),
self._antenna_proxies,
)
def _update_failed_pointing_updates(
self: StationComponentManager, failed_pointing_updates: dict[int, int]
) -> None:
"""
Update the number of failed pointing updates on a per beam basis.
:param failed_pointing_updates: the number of failed pointing updates.
"""
assert self._component_state_callback is not None
self._component_state_callback(failed_pointing_updates=failed_pointing_updates)
def _update_weight_matrix(
self: StationComponentManager, msg: str | None = None
) -> None:
"""
Recalculate the weighting matrix.
Using beamformer configuration and antenna weights, populate
a matrix. Antenna masks are applied after weights are set.
:param msg: a message to log on success.
"""
# Iterate through each beamformer table block (48 in total)
for block in self._beamformer_table:
# index 3 is the starting logical channel (0–376, step 8)
subarray_logical_channel: int = block[3]
# Index 4 gives the subarray beam ID used to index the weights
subarray_beam_id: int = block[4]
# check start physical channel and subarray_id. 0 signifies unused.
# start physical channel [index: 0], subarray_id [index: 2]
if block[0] == 0 and block[2]:
# channel set to 0 marks unused (unconfigured) table entries.
# These must have a unit weight for all unmasked antenna.
block_weights = np.ones(256, dtype=complex)
else:
# Get the per-antenna weights for the beam
block_weights = np.array(self._beam_antenna_weights[subarray_beam_id])
# Reshape weights to (256, 1) so they can be broadcast across 8 channels
formatted_block_weights = block_weights.reshape(256, 1)
# Apply the weights to the 8 consecutive channels in the matrix
self.configured_weighting_block[
:, subarray_logical_channel : subarray_logical_channel + 8
] = formatted_block_weights
self.configured_weighting_block[self.antenna_masks, :] = 0
self.logger.info(msg or "Weighting matrix updated!")
[docs]
def load_beam_weights(
self: StationComponentManager,
subarray_beam_id: int,
antenna_weights: list[complex],
) -> None:
"""
Load beam weights to the weighting matrix.
:TODO Consider a station with fewer than 256 antenna. Or not
Do we just mask the rest?
:param subarray_beam_id: The beam we are loading the weights for (1-48)
:param antenna_weights: A list of the 256 antenna weights to apply.
"""
# Load the antenna weight for a beam.
self.logger.info(f"load_beam_weights for {subarray_beam_id}")
self._beam_antenna_weights[subarray_beam_id] = antenna_weights
# Using the beam antenna weight populate a matrix of (256 * 384)
# continaing the weights for each channel per antenna, masking out
# any antennas masked.
self._update_weight_matrix(f"Weighting matric updated for {subarray_beam_id=}")
[docs]
def setup_pointing_helper(self: StationComponentManager) -> Pointing:
"""
Set up the pointing helper.
:returns: A Pointing helper object.
"""
assert (
self.ref_longitude is not None
and self.ref_latitude is not None
and self.ref_height is not None
)
station_helper = StationInformation()
station_helper.set_location(
self.ref_latitude, self.ref_longitude, self.ref_height
)
station_helper.load_displacements_arrays(
self._antenna_xyz,
self._antenna_element_ids,
)
return Pointing(station_helper)
[docs]
def start_communicating(self: StationComponentManager) -> None:
"""Establish communication with the station components."""
self._communication_manager.start_communicating()
[docs]
def stop_communicating(self: StationComponentManager) -> None:
"""Break off communication with the station components."""
self._communication_manager.stop_communicating()
def _device_communication_state_changed(
self: StationComponentManager,
trl: str,
communication_state: CommunicationStatus,
) -> None:
self._communication_manager.update_communication_status(
trl, communication_state
)
def _field_station_outside_temperature_changed(
self: StationComponentManager,
outside_temperature: float,
) -> None:
self.outside_temperature = outside_temperature
if self._component_state_callback is not None:
self._component_state_callback(outside_temperature=self.outside_temperature)
def _sps_station_data_received_result_changed(
self: StationComponentManager,
data_received_result: tuple[str, str],
) -> None:
self.data_received_result = data_received_result
if self._component_state_callback is not None:
self._component_state_callback(
data_received_result=self.data_received_result
)
@threadsafe
def _station_calibrator_power_state_changed(
self: StationComponentManager,
power: PowerState | None = None,
fault: bool | None = None,
**kwargs: Any,
) -> None:
# Station calibrator is a software only device and should not roll up
# Into the PowerState of the MccsStation. We will log a message and do nothing.
self._station_calibrator_power_state = power
if power is not None:
self.logger.info(
f"Station calibrator reports its power as {PowerState(power).name}."
)
@threadsafe
def _antenna_power_state_changed(
self: StationComponentManager,
trl: str,
power_state: PowerState,
) -> None:
with self.power_state_lock:
self._antenna_power_states[trl] = power_state
@threadsafe
def _field_station_power_state_changed(
self: StationComponentManager,
power_state: PowerState,
) -> None:
with self._power_state_lock:
self._field_station_power_state = power_state
if (
self._field_station_trl in self._powering_resources
and power_state == self._desired_power_state
):
self._powering_resources.remove(self._field_station_trl)
self._evaluate_power_state()
# Antennas should just be passively reflecting the
# power state of their smartbox port?
# if power_state is PowerState.ON and self._field_station_on_called:
# self._field_station_on_called = False
# _ = self._turn_on_antennas()
@threadsafe
def _sps_station_power_state_changed(
self: StationComponentManager,
power_state: PowerState,
) -> None:
with self._power_state_lock:
self._sps_station_power_state = power_state
if power_state == PowerState.STANDBY:
self.logger.warning(
"There is no way for MCCS to turn SpsStation 'OFF'. "
"Due to there being no control over the subrack PDU. "
"At MccsStation we interpret STANDBY as OFF "
)
if (
self._sps_station_trl in self._powering_resources
and self._desired_power_state == PowerState.OFF
):
self._powering_resources.remove(self._sps_station_trl)
else:
if (
self._sps_station_trl in self._powering_resources
and power_state == self._desired_power_state
):
self._powering_resources.remove(self._sps_station_trl)
self._evaluate_power_state()
def _evaluate_power_state(
self: StationComponentManager,
) -> None:
with self.power_state_lock:
power_states = []
if (
self._field_station_proxy is not None
and self._field_station_power_state is not None
):
power_states.append(self._field_station_power_state)
if (
self._sps_station_proxy is not None
and self._sps_station_power_state is not None
):
if self._sps_station_power_state == PowerState.STANDBY:
# Handle special case due to there being no such thing
# as SpsStation OFF. The reason for this is that there
# is no control over the subrack PDU
# TODO: SP-4050
self.logger.warning(
"When evaluating MccsStation Power, "
"we classify SpsStation `STANDBY` as `OFF`. "
"This is required since MCCS has no control over "
"the subracks' PDU."
)
power_states.append(PowerState.OFF)
else:
power_states.append(self._sps_station_power_state)
if all(power_state == PowerState.ON for power_state in power_states):
evaluated_power_state = PowerState.ON
elif all(power_state == PowerState.OFF for power_state in power_states):
evaluated_power_state = PowerState.OFF
elif all(power_state == PowerState.STANDBY for power_state in power_states):
evaluated_power_state = PowerState.STANDBY
elif len(power_states) == 0:
evaluated_power_state = PowerState.ON
else:
evaluated_power_state = PowerState.UNKNOWN
self.logger.debug(
"In StationComponentManager._evaluatePowerState with:\n"
f"\tspsStation: {str(self._sps_station_power_state)}\n"
f"\tfieldStation: {str(self._field_station_power_state)}\n "
f"\tresult: {str(evaluated_power_state)}"
)
self._update_component_state(power=evaluated_power_state)
@property
def antenna_masks(self: StationComponentManager) -> np.ndarray:
"""
Return the antennas masked from all beams.
:return: the masked antenna.
"""
return self._antenna_masks
@antenna_masks.setter
def antenna_masks(self: StationComponentManager, masks: np.ndarray) -> None:
"""
Set the antennas masked from all beams.
:param masks: the masks to apply
"""
self._antenna_masks = masks
self._update_weight_matrix()
@property
def additional_debug(self: StationComponentManager) -> bool:
"""
Return whether additional debug information is enabled.
:return: True if additional debug information is enabled, False otherwise.
"""
return self._additional_debug
@additional_debug.setter
def additional_debug(self: StationComponentManager, argin: bool) -> None:
"""
Set whether additional debug information is enabled.
:param argin: True to enable additional debug information, False to disable.
"""
self._additional_debug = argin
self._pointing_manager._additional_debug = argin
@property
def power_state_lock(self: StationComponentManager) -> threading.RLock:
"""
Return the power state lock of this component manager.
:return: the power state lock of this component manager.
"""
return self._power_state_lock
@property
def power_state(self: StationComponentManager) -> Optional[PowerState]:
"""
Return my power state.
:return: my power state
"""
return self._component_state["power"]
[docs]
@check_communicating
def do_off(
self: StationComponentManager,
task_callback: Optional[Callable] = None,
task_abort_event: Optional[threading.Event] = None,
) -> None:
"""
Turn off this station.
The order to turn a station on is: FieldStation, then tiles and
antennas.
:param task_callback: Update task state, defaults to None
:param task_abort_event: Abort the task
"""
task_status = TaskStatus.COMPLETED
error_log = ""
if task_callback:
task_callback(status=TaskStatus.IN_PROGRESS)
self._desired_power_state = PowerState.OFF
if (
self._field_station_proxy
and self._field_station_power_state != PowerState.OFF
):
task_status, msg = self._field_station_proxy.off() # type: ignore
self.logger.info(f"FieldStation off command TaskStatus: {task_status.name}")
self._powering_resources.add(self._field_station_trl)
if task_status not in [TaskStatus.COMPLETED, TaskStatus.QUEUED]:
error_log += (
f"FieldStation failed to complete off command with message: {msg}\n"
)
if self._sps_station_proxy and self._sps_station_power_state != PowerState.OFF:
self.logger.warning(
"There is no way for MCCS to turn `OFF` SpsStation, "
"due to there being no control over the subracks' PDU. "
"At MccsStation we interpret SpsStation STANDBY as OFF. "
"We are sending the Standby command to SpsStation."
)
task_status, msg = self._sps_station_proxy.standby()
self.logger.info(
f"SpshwStation standby command TaskStatus: {task_status.name}"
)
self._powering_resources.add(self._sps_station_trl)
if task_status not in [TaskStatus.COMPLETED, TaskStatus.QUEUED]:
error_log += (
"SpsStation failed to complete standby command with "
f"message: {msg}\n"
)
# Check for no errors before waiting
if error_log == "":
task_status = self._wait_for_power_state(600)
if task_status not in [TaskStatus.COMPLETED, TaskStatus.QUEUED]:
error_log += "Failed to reach correct power state after 600 seconds\n"
if error_log == "":
result_code = ResultCode.OK
result_msg = "Off command completed successfully"
else:
result_code = ResultCode.FAILED
task_status = TaskStatus.FAILED
result_msg = f"Off command failed: {error_log}"
if task_callback:
task_callback(status=task_status, result=(result_code, result_msg))
[docs]
@check_communicating
def do_on(
self: StationComponentManager,
task_callback: Optional[Callable] = None,
task_abort_event: Optional[threading.Event] = None,
) -> None:
"""
Turn on this station.
The order to turn a station on is: FieldStation, then tiles and
antennas.
:param task_callback: Update task state, defaults to None
:param task_abort_event: Abort the task
"""
task_status = TaskStatus.COMPLETED
error_log = ""
if task_callback:
task_callback(status=TaskStatus.IN_PROGRESS)
self._desired_power_state = PowerState.ON
if (
self._field_station_proxy
and self._field_station_power_state != PowerState.ON
):
task_status, msg = self._field_station_proxy.on() # type: ignore
self.logger.info(f"FieldStation on command TaskStatus: {task_status.name}")
self._powering_resources.add(self._field_station_trl)
if task_status not in [TaskStatus.COMPLETED, TaskStatus.QUEUED]:
error_log += (
f"FieldStation failed to complete on command with message: {msg}\n"
)
if self._sps_station_proxy and self._sps_station_power_state != PowerState.ON:
task_status, msg = self._sps_station_proxy.on()
self.logger.info(f"SpshwStation on command TaskStatus: {task_status.name}")
self._powering_resources.add(self._sps_station_trl)
if task_status not in [TaskStatus.COMPLETED, TaskStatus.QUEUED]:
error_log += (
f"SpsStation failed to complete on command with message: {msg}\n"
)
# Check for no errors before waiting
if error_log == "":
task_status = self._wait_for_power_state(600)
if task_status not in [TaskStatus.COMPLETED, TaskStatus.QUEUED]:
error_log += "Failed to reach correct power state after 600 seconds\n"
if error_log == "":
result_code = ResultCode.OK
result_msg = "On command completed successfully"
else:
result_code = ResultCode.FAILED
task_status = TaskStatus.FAILED
result_msg = f"On command failed: {error_log}"
if task_callback:
task_callback(status=task_status, result=(result_code, result_msg))
@check_communicating
def _turn_on_antennas(
self: StationComponentManager,
) -> ResultCode:
"""
Turn on antennas if not already on.
:return: a result code
"""
with self.power_state_lock:
if not all(
power_state == PowerState.ON
for power_state in self._antenna_power_states.values()
):
results = [proxy.on() for proxy in self._antenna_proxies.values()]
if ResultCode.FAILED in results:
return ResultCode.FAILED
return ResultCode.QUEUED
[docs]
@check_communicating
@check_on
def apply_pointing_delays(
self: StationComponentManager,
load_time: str,
task_callback: Optional[Callable] = None,
task_abort_event: Optional[threading.Event] = None,
) -> None:
"""
Load the pointing delay at a specified time.
:param load_time: time at which to load the pointing delay
:param task_callback: Update the task state, defaults to None
:param task_abort_event: Abort the task
"""
assert self._sps_station_proxy
if task_callback:
task_callback(
status=TaskStatus.IN_PROGRESS,
)
result = self._sps_station_proxy.apply_pointing_delays(load_time)
if result in [ResultCode.OK, ResultCode.STARTED, ResultCode.QUEUED]:
task_status = TaskStatus.COMPLETED
msg = "ApplyPointingDelays command completed"
else:
task_status = TaskStatus.FAILED
msg = "ApplyPointingDelays command failed"
if task_callback:
task_callback(status=task_status, result=(ResultCode.OK, msg))
@property # type: ignore[misc]
@check_communicating
def is_configured(self: StationComponentManager) -> bool:
"""
Return whether this station component manager is configured.
:return: whether this station component manager is configured.
"""
# TODO: the station can be configured for some subarrays and not for others.
# should be changed to a vector attribute or removed
return self._is_configured
@property # type: ignore[misc]
def ref_longitude(self: StationComponentManager) -> float:
"""
Return whether this stations longitude.
:return: this stations longitude.
"""
return self._ref_longitude
@property # type: ignore[misc]
def ref_latitude(self: StationComponentManager) -> float:
"""
Return whether this stations latitude.
:return: this stations latitude.
"""
return self._ref_latitude
@property # type: ignore[misc]
def ref_height(self: StationComponentManager) -> float:
"""
Return whether this stations height.
:return: this stations height.
"""
return self._ref_height
@property
def pointing_update_timing_history(self: StationComponentManager) -> str:
"""
Return the pointing update timing history as JSON.
:return: JSON string containing timing history for recent pointing updates.
"""
return json.dumps(self._pointing_manager.update_timing_history)
[docs]
def set_pointing_update_timing_history_size(
self: StationComponentManager, history_size: int
) -> None:
"""
Set the number of pointing update timing records to keep.
:param history_size: Number of timing records to keep (must be > 0)
"""
self._pointing_manager.set_update_timing_history_size(history_size)
def _update_station_configs(
self: StationComponentManager,
configuration: Optional[dict],
) -> None:
"""
Update the config for the station device.
:param configuration: dict containing the config of the device
"""
if configuration is None:
return
if self._component_state_callback is not None:
self._is_configured = True
self._ref_longitude = (
configuration.get("refLongitude") or self._ref_longitude
)
self._ref_latitude = configuration.get("refLatitude") or self._ref_latitude
self._ref_height = configuration.get("refHeight") or self._ref_height
def _update_children_configs(
self: StationComponentManager,
field_station_config: Optional[dict],
antenna_config: Optional[dict],
) -> None:
"""
Update the config for the station device.
:param field_station_config: Configuration specification
for the field station device.
:param antenna_config: Configuration specification for the antenna deviced.
"""
for trl in self._antenna_proxies.keys():
self._antenna_proxies[trl].on()
if field_station_config is not None:
if self._field_station_proxy:
assert self._field_station_proxy._proxy is not None
self._field_station_proxy._proxy.Configure(
json.dumps(field_station_config)
)
else:
self.logger.error(
"Attempted to update config of non-existent FieldStation"
)
if antenna_config:
for trl in self._antenna_proxies.keys():
config = antenna_config.get(trl, None)
if config is not None:
self._antenna_proxies[trl].configure(
json.dumps({"antenna_config": config, "tile_config": {}})
)
# TODO: This needs to be implemented in SpsStation
# tiles_config = configuration.get("tiles")
# if tiles_config:
# for trl in self._tile_proxies.keys():
# config = tiles_config[trl]
# self._tile_proxies[trl].configure(json.dumps(config))
[docs]
def get_pointing_delays(
self: StationComponentManager,
task_callback: Optional[Callable] = None,
*,
interface: Optional[str] = None,
pointing_type: str,
values: dict | str,
time_step: float = 10.0,
reference_time: Optional[str] = None,
) -> np.ndarray:
"""
Get the pointing delays for this station.
:param interface: the schema version this is running against.
:param pointing_type: the type of pointing requested
:param values: the pointing values, either in alt_az or ra_dec
:param reference_time: time in which coordinates are equal,
in ISO8601 formatted astropy.Time time
:param time_step: How long between each time step in seconds
:param task_callback: callback to signal end of command
:return: list of pointing delays
"""
if reference_time:
start_time = Time(reference_time, format="isot", scale="utc")
else:
start_time = Time.now()
pointing_delays, delay_rates = self._pointing_manager._get_pointing_delays(
pointing_type, values, start_time, time_step
)
# Don't want to return the fake beam number.
return self._pointing_manager._construct_delays(
pointing_delays, delay_rates, 0
)[1:]
[docs]
def verify_values(
self: StationComponentManager, pointing_type: str, values: dict | str
) -> bool:
"""
Verify if the pointing values are valid.
:param pointing_type: the type of pointing requested
:param values: Coordinates for object to be tracked
:return: If the pointing values are valid
"""
reject_command = False
if pointing_type == "alt_az":
if not isinstance(values, dict):
reject_command = True
if "altitude" not in values or "azimuth" not in values:
reject_command = True
elif pointing_type == "ra_dec":
if not isinstance(values, dict):
reject_command = True
if "right_ascension" not in values or "declination" not in values:
reject_command = True
elif pointing_type == "tle":
if not isinstance(values, dict):
reject_command = True
if "line1" not in values or "line2" not in values:
reject_command = True
elif pointing_type == "special":
if not isinstance(values, str):
reject_command = True
if values not in self._pointing_manager.SPECIAL_TARGETS:
reject_command = True
return reject_command
[docs]
def verify_antennas_mapping(
self: StationComponentManager, task_callback: Optional[Callable] = None
) -> tuple[ResultCode | TaskStatus, str]:
"""
Verify that the antenna mapping available is correct.
:param task_callback: Update task state, defaults to None
:return: True if the mapping is correct, False otherwise
"""
if not self._sps_station_proxy:
return (
TaskStatus.REJECTED,
(
f"Failed to configure station {self._station_id}. "
f"Missing SPS Station Proxy"
),
)
if self._sps_station_proxy.antennas_mapping is None:
return (
TaskStatus.REJECTED,
(
f"Failed to configure station {self._station_id}. "
f"Please check the AntennaConfigURI property: "
"missing antenna mapping object"
),
)
antenna_map = json.loads(self._sps_station_proxy.antennas_mapping)
if antenna_map == {}:
return (
TaskStatus.REJECTED,
(
f"Failed to configure station {self._station_id}. "
f"Please check the AntennaConfigURI property: "
"missing antenna mapping values"
),
)
if len(antenna_map) != self._nof_antennas:
return (
TaskStatus.REJECTED,
(
f"Failed to configure station {self._station_id}. "
f"Please check the AntennaConfigURI property: "
"antenna mappings has an incorrect number of antennas:"
f"number of antenna mappings: {len(antenna_map)} "
f"number of antennas: {self._nof_antennas} "
),
)
for antenna_id, antenna_values in antenna_map.items():
antenna_attributes = ["tpm", "tpm_x_channel", "tpm_y_channel", "delay"]
for item in antenna_attributes:
if item not in antenna_values.keys():
return (
TaskStatus.REJECTED,
(
f"Failed to configure station {self._station_id}. "
f"Please check the AntennaConfigURI property: "
f"missing {item} values from antenna {antenna_id}"
),
)
return (
ResultCode.OK,
f"Antenna map check passed for station {self._station_id}.",
)
[docs]
def track_object(
self: StationComponentManager,
pointing_type: str,
values: dict | str,
reference_time: str,
station_beam_number: int = 1,
scan_time: float = 86400.0,
time_step: float = 10.0,
) -> ResultCode:
"""
Track the object in the sky.
:param pointing_type: the type of pointing requested
:param values: Coordinates for object to be tracked
:param scan_time: Time to scan object in seconds
:param reference_time: time in which coordinates are equal,
in ISO8601 formatted astropy.Time time
:param station_beam_number: The station beam number to be used
:param time_step: How long between each time step in seconds
:return: ResultCode.OK on success, ResultCode.REJECTED on failure
"""
if time_step <= 0:
self.logger.warning(
f"Invalid time_step {time_step} provided to track_object. "
"Using default value of 10 seconds."
)
time_step = 10.0
pointing_request = PointingRequest(
pointing_type=pointing_type,
values=values,
reference_time=reference_time,
time_step=time_step,
scan_time=scan_time,
station_beam_id=station_beam_number,
)
self._pointing_manager.add_pointing_request(pointing_request)
return ResultCode.OK
[docs]
def stop_tracking(
self: StationComponentManager,
station_beam_id: int,
task_callback: Optional[Callable] = None,
task_abort_event: Optional[threading.Event] = None,
) -> None:
"""
Stop a tracking thread.
:param station_beam_id: the beam id whose tracking you wish to stop.
:param task_callback: Update task state, defaults to None
:param task_abort_event: Abort the task
"""
if task_callback:
task_callback(
status=TaskStatus.IN_PROGRESS,
result=f"Attempting to stop tracking thread for beam {station_beam_id}",
)
result, msg = self._pointing_manager.stop_tracking(station_beam_id)
if result:
if task_callback:
task_callback(
status=TaskStatus.COMPLETED,
result=f"Tracking stopped for beam {station_beam_id}",
)
else:
if task_callback:
task_callback(
status=TaskStatus.FAILED,
result=f"Failed to stop tracking for beam {station_beam_id}: {msg}",
)
[docs]
def stop_tracking_all(
self: StationComponentManager,
task_callback: Optional[Callable] = None,
task_abort_event: Optional[threading.Event] = None,
) -> None:
"""
Stop a tracking thread.
:param task_callback: Update task state, defaults to None
:param task_abort_event: Abort the task
"""
if task_callback:
task_callback(
status=TaskStatus.IN_PROGRESS,
result="Attempting to stop tracking thread for all beams.",
)
self._pointing_manager.stop_tracking_all()
# if succesfully_closed:
if task_callback:
task_callback(
status=TaskStatus.COMPLETED,
result="Tracking stopped for all beams.",
)
[docs]
@check_communicating
def load_pointing_delays(
self: StationComponentManager,
delays: np.ndarray,
task_callback: Optional[Callable] = None,
task_abort_event: Optional[threading.Event] = None,
) -> None:
"""
Load the pointing delays for this station.
:param delays: list of delays
:param task_callback: Update task state, defaults to None
:param task_abort_event: Abort the task
"""
if task_callback:
task_callback(
status=TaskStatus.IN_PROGRESS,
)
if delays is not None:
self.last_pointing_delays = list(delays)
if self._sps_station_proxy:
self._sps_station_proxy.load_pointing_delays(delays)
if task_callback:
task_callback(
status=TaskStatus.COMPLETED,
result="LoadPointingDelays command has completed",
)
else:
if task_callback:
task_callback(
status=TaskStatus.FAILED,
result="LoadPointingDelays called without SpsStation",
)
else:
if task_callback:
task_callback(
status=TaskStatus.FAILED,
result="Unable to set delays for station",
)
def _notify_abort(
self: StationComponentManager,
msg: str,
task_callback: Optional[Callable],
) -> None:
self.logger.info(msg)
if task_callback:
task_callback(
status=TaskStatus.ABORTED,
result=(ResultCode.ABORTED, msg),
)
# pylint: disable=too-many-return-statements, too-many-branches
[docs]
@check_communicating
def apply_configuration(
self: StationComponentManager,
transaction_id: Optional[str] = None,
calibration_id: Optional[str] = None,
subarray_id: Optional[int] = None,
solution_type: str = "fitted",
task_callback: Optional[Callable] = None,
task_abort_event: Optional[threading.Event] = None,
) -> None:
"""
Apply the configuration to the SpsStation.
:param transaction_id: the transaction id for the configuration
:param calibration_id: Unique calibration id.
:param subarray_id: ID of the subarray to which the configuration applies,
default applies for all subarrays
:param solution_type: ``"fitted"`` (default) reconstructs gains from stored
phase fit params; ``"raw"`` loads the stored solution directly.
:param task_callback: Update task state, defaults to None
:param task_abort_event: Abort the task
"""
if not self._station_calibrator_proxy:
rejected_msg = (
"ApplyConfiguration command was rejected: "
"No station calibrator proxy found. "
f"Station calibrator TRL: {self._station_calibrator_trl}"
)
if task_callback:
task_callback(
status=TaskStatus.REJECTED,
result=(ResultCode.REJECTED, rejected_msg),
)
return
if task_callback:
task_callback(status=TaskStatus.IN_PROGRESS)
self.logger.debug("Loading beamformer table")
assert self._sps_station_proxy
[result_code], [message] = self._sps_station_proxy.set_beamformer_table(
np.array(self._beamformer_table).flatten()
)
if result_code != ResultCode.OK:
message = f"ApplyConfiguration command has failed: {message}"
self.logger.error(message)
if task_callback:
task_callback(
status=TaskStatus.FAILED,
result=(
ResultCode.FAILED,
message,
),
)
return
if task_abort_event and task_abort_event.is_set():
self._notify_abort(
"ApplyConfiguration aborted after set_beamformer_table",
task_callback,
)
return
# Load a solution. Currently this is a manual activity for the operator.
# There are discussions underway THORN-29 regarding how we can extent the
# solution SelectionPolicies into TMC
load_result_code, message = self._do_load_calibration_coefficients(
calibration_id=calibration_id,
subarray_id=subarray_id,
solution_type=solution_type,
task_abort_event=task_abort_event,
)
if load_result_code == ResultCode.ABORTED:
self._notify_abort(
"Configure aborted during loading of calibration coefficients.",
task_callback,
)
return
if load_result_code != ResultCode.OK:
message = "Failed to load calibration solution."
self.logger.error(message)
if task_callback:
task_callback(
status=TaskStatus.FAILED,
result=(ResultCode.FAILED, message),
)
return
if task_abort_event and task_abort_event.is_set():
self._notify_abort(
"ApplyConfiguration aborted before apply_calibration",
task_callback,
)
return
if self.apply_calibration()[0] != ResultCode.OK:
message = "Failed to apply calibration solution."
self.logger.error(message)
if task_callback:
task_callback(
status=TaskStatus.FAILED,
result=(ResultCode.FAILED, message),
)
return
self.active_cal_id, self.inactive_cal_id = (
self.inactive_cal_id,
self.active_cal_id,
)
# TODO ADR-111: call to start_beamformer()
message = "ApplyConfiguration command has completed"
if task_callback:
task_callback(
status=TaskStatus.COMPLETED,
result=(
ResultCode.OK,
message,
),
)
self.logger.info(message)
@property
def latest_preferred_job_id(self: StationComponentManager) -> str:
"""
Return the user_friendly_name of the most recent preferred calibration job.
:return: the user_friendly_name of the most recent preferred job,
or an empty string if none exists.
"""
if self._station_calibrator_proxy is None:
return ""
return self._station_calibrator_proxy.get_latest_preferred_job_id()
def _get_fitted_solutions(
self: StationComponentManager,
channels_needed: list[int],
calibration_id: Optional[str],
abort_event: Optional[threading.Event] = None,
) -> dict[int, np.ndarray]:
"""
Fetch fitted solutions for all requested channels in a single round-trip.
:param channels_needed: list of frequency channel indices to fetch.
:param calibration_id: optional unique identifier for the calibration.
:param abort_event: An event to check for aborted signal.
:return: mapping from channel index to solution array.
"""
assert self._station_calibrator_proxy is not None
if abort_event and abort_event.is_set():
return {}
try:
raw = self._station_calibrator_proxy.get_fitted_solutions(
channels_needed, calibration_id
)
if len(raw) == 0:
self.logger.warning(
"No fitted solutions returned for requested channels."
)
return {}
stride = NOF_CORRELATION_ANTENNA * 8
fitted_solution = {
ch: np.array(raw[i * stride : (i + 1) * stride])
for i, ch in enumerate(channels_needed)
}
self.logger.info(
"Fitted solutions retrieved for channels: "
f"{list(fitted_solution.keys())}"
)
self.logger.debug(f"{fitted_solution=}")
return fitted_solution
except tango.DevFailed as df:
self.logger.warning(
f"GetFittedSolutions failed, using unit calibration: {repr(df)}"
)
return {}
def _get_raw_solutions(
self: StationComponentManager,
channels_needed: list[int],
calibration_id: Optional[str],
abort_event: Optional[threading.Event] = None,
) -> dict[int, np.ndarray]:
"""
Fetch raw solutions for all requested channels, one call per channel.
:param channels_needed: list of frequency channel indices to fetch.
:param calibration_id: optional unique identifier for the calibration.
:param abort_event: An event to check for aborted signal.
:return: mapping from channel index to solution array.
"""
assert self._station_calibrator_proxy is not None
solutions: dict[int, np.ndarray] = {}
for ch in channels_needed:
if abort_event and abort_event.is_set():
return {}
try:
sol = self._station_calibrator_proxy.get_calibration(ch, calibration_id)
if sol is not None and len(sol) > 0:
solutions[ch] = np.array(sol)
except tango.DevFailed as df:
self.logger.warning(
f"GetCalibration failed for channel {ch}: {repr(df)}"
)
return solutions
# pylint: disable=too-many-branches
def _do_load_calibration_coefficients( # noqa: C901
self: StationComponentManager,
calibration_id: Optional[str],
subarray_id: Optional[int],
solution_type: str,
task_abort_event: Optional[threading.Event],
) -> tuple[ResultCode, str]:
"""
Core calibration loading implementation, without task-lifecycle management.
:param calibration_id: Unique identifier for calibration.
:param subarray_id: ID of the subarray to which the calibration applies,
default applies for all subarrays
:param solution_type: ``"fitted"`` (default) reconstructs gains from stored
phase fit params; ``"raw"`` loads the stored solution directly.
:param task_abort_event: Abort the task
:return: a result code and string.
:raises DatabaseSolutionStructureError: When the solution retrieved from
the database has an unexpected format.
"""
result_message = ""
self.logger.debug("Computing and downloading calibration coefficients")
if calibration_id:
self.logger.info(f"Using calibration ID: {calibration_id}")
calibration_table = np.zeros(
[CHANNELS_PER_BLOCK, NOF_CORRELATION_ANTENNA, 4],
np.csingle,
)
# Copy array to ensure no changes during loading
configured_weighting_block = np.copy(self.configured_weighting_block)
unity_calibration = np.array([1.0, 0.0, 0.0, 1.0], np.csingle)
# Collect all frequency channels needed across every active block so
# we can fetch all fitted solutions in a single round-trip.
channels_needed: list[int] = []
for table_entry in self._beamformer_table:
freq = table_entry[0]
if subarray_id is not None and subarray_id != table_entry[2]:
continue
if freq == 0:
continue
channels_needed.extend(int(freq) + ch for ch in range(CHANNELS_PER_BLOCK))
solutions_by_channel: dict[int, np.ndarray] = {}
if channels_needed:
if solution_type == "fitted":
solutions_by_channel = self._get_fitted_solutions(
channels_needed, calibration_id, abort_event=task_abort_event
)
else:
solutions_by_channel = self._get_raw_solutions(
channels_needed=channels_needed,
calibration_id=calibration_id,
abort_event=task_abort_event,
)
if task_abort_event and task_abort_event.is_set():
return (
ResultCode.ABORTED,
"Aborted in load_calibration_coefficients "
"during retrieval of solutions from database.",
)
# compute and apply calibration
for block, table_entry in enumerate(self._beamformer_table):
frequency_channel = table_entry[0]
beamformer_channel = block * 8
if subarray_id is not None and subarray_id != table_entry[2]:
# If subarray ID is specified, only blocks assigned to
# that subarray are processed.
continue
if frequency_channel == 0:
# frequency channel set to 0 marks unused (unconfigured) table entries.
# These do not require calibration
continue
for channel in range(CHANNELS_PER_BLOCK):
for antenna in range(NOF_CORRELATION_ANTENNA):
calibration_table[channel, antenna] = unity_calibration
for ch in range(CHANNELS_PER_BLOCK): # 8 channels per block
solution_numpy = solutions_by_channel.get(int(frequency_channel))
if solution_numpy is None or len(solution_numpy) == 0:
error_message = (
f"No solution found for channel {int(frequency_channel)}. "
"Using a unit calibration for this frequency channel\n"
)
result_message += error_message
self.logger.warning(error_message)
else:
try:
reshaped_array = self._format_solution_to_complex_jones(
solution_numpy
)
except ValueError as ve:
raise DatabaseSolutionStructureError(
"Bad database entry"
) from ve
self.logger.debug(f"Solution found for {frequency_channel=}")
calibration_table[ch] = reshaped_array
frequency_channel += 1
if self._use_beam_weights:
# Retrieve the weight for a specific antenna and beamformer channel
# multiple by the retreived calibration.
# TODO MCCS-1023
# Actual antenna number should be computed from config database:
# it should be (tile_number)*16 + (tile _port_number), both 0 based
for ch in range(CHANNELS_PER_BLOCK):
weighting_channel = beamformer_channel + ch
for antenna in range(NOF_CORRELATION_ANTENNA):
weight = configured_weighting_block[antenna, weighting_channel]
calibration_table[ch, antenna] *= weight
# write calibration coefficients to station
calibrations = [float(beamformer_channel)]
for element in calibration_table.reshape(-1):
calibrations.append(element.real)
calibrations.append(element.imag)
assert self._sps_station_proxy is not None
command_proxy = MccsCommandProxy(
self._sps_station_proxy._name,
"LoadCalibrationCoefficientsForChannels",
self.logger,
)
result_code, message = command_proxy(
is_lrc=True,
arg=calibrations,
wait_for_result=True,
task_abort_event=task_abort_event,
)
if result_code == ResultCode.ABORTED:
return (
ResultCode.ABORTED,
"LoadCalibrationCoefficientsForChannels command has ABORTED",
)
if result_code != ResultCode.OK:
self.logger.error(
f"Failure in loading calibration coefficient \n\t{message}"
)
return (
ResultCode.FAILED,
"LoadCalibrationCoefficientsForChannels command has failed",
)
self.logger.debug(
f"load_calibration_coefficients for {beamformer_channel=}"
)
for ch in range(
beamformer_channel, beamformer_channel + CHANNELS_PER_BLOCK
):
self._loaded_weighting_block[:, ch] = configured_weighting_block[:, ch]
# End of loop on beamformer blocks
self.logger.info(
"Calibration solutions have been loaded, You may now apply them!"
)
self.inactive_cal_id = calibration_id or ""
return (
ResultCode.OK,
f"LoadCalibrationCoefficients command has completed{result_message}",
)
[docs]
def load_calibration_coefficients(
self: StationComponentManager,
calibration_id: Optional[str] = None,
subarray_id: Optional[int] = None,
solution_type: str = "fitted",
task_callback: Optional[Callable] = None,
task_abort_event: Optional[threading.Event] = None,
) -> tuple[ResultCode, str]:
"""
Load a calibration solution from the store.
NOTE: Is calibration key specified on a per channel basis?
:param calibration_id: Unique identifier for calibration.
:param subarray_id: ID of the subarray to which the calibration applies,
default applies for all subarrays
:param solution_type: ``"fitted"`` (default) reconstructs gains from stored
phase fit params; ``"raw"`` loads the stored solution directly.
:param task_callback: Update task state, defaults to None
:param task_abort_event: Abort the task
:return: a result code and string.
"""
if not self._station_calibrator_proxy:
rejected_msg = (
"LoadCalibrationCoefficients command was rejected: "
"No station calibrator proxy found. "
f"Station calibrator TRL: {self._station_calibrator_trl}"
)
if task_callback:
task_callback(
status=TaskStatus.REJECTED,
result=(ResultCode.REJECTED, rejected_msg),
)
return ResultCode.REJECTED, rejected_msg
if task_callback:
task_callback(status=TaskStatus.IN_PROGRESS)
result_code, message = self._do_load_calibration_coefficients(
calibration_id=calibration_id,
subarray_id=subarray_id,
solution_type=solution_type,
task_abort_event=task_abort_event,
)
if task_callback:
status = {
ResultCode.OK: TaskStatus.COMPLETED,
ResultCode.ABORTED: TaskStatus.ABORTED,
}.get(result_code, TaskStatus.FAILED)
task_callback(status=status, result=(result_code, message))
return result_code, message
[docs]
def apply_calibration(
self: StationComponentManager,
load_time: str = "",
task_callback: Optional[Callable] = None,
task_abort_event: Optional[threading.Event] = None,
) -> tuple[ResultCode, str]:
"""
Apply a solution to the station.
:param task_callback: Update task state, defaults to None
:param load_time: An optional future time to
swap the calibration banks.
:param task_abort_event: Abort the task
:return: ResultCode and response message
"""
assert self._sps_station_proxy is not None
if task_callback is not None:
task_callback(status=TaskStatus.IN_PROGRESS)
command_proxy = MccsCommandProxy(
self._sps_station_proxy._name, "ApplyCalibration", self.logger
)
utc = timezone.utc
if load_time:
# TODO: from Python 3.11, this can be replaced with datetime.fromisoformat()
load_dt = datetime.strptime(load_time, RFC_FORMAT).replace(tzinfo=utc)
else:
load_dt = datetime.now(utc) + timedelta(seconds=self.calibration_load_delay)
load_time = load_dt.strftime(RFC_FORMAT)
return_code, message = command_proxy(is_lrc=False, arg=load_time)
if return_code == ResultCode.OK:
time.sleep(max(0, (load_dt - datetime.now(utc)).total_seconds()))
# TODO: Update the phase centeres on a beam basis.
# This will take substation_weights and antenna_positions
# We will want to propagate this to a TMC interface allowing
# visibility into as applied configuration.
# e.g. Pseudo code
# self.post_change_event(phase_centre = {"subarray_beam": {1: [1, 2, 3]}})
self.applied_weighting_block = np.copy(self._loaded_weighting_block)
if task_callback is not None:
task_callback(status=TaskStatus.COMPLETED, result=(return_code, message))
return (return_code, message)
def _format_solution_to_complex_jones(
self: StationComponentManager,
solution_array: np.ndarray,
) -> np.ndarray:
"""
Format solution to complex jones.
:param solution_array: The solution array of type `np.ndarray`.
the shape of this is (NOF_CORRELATION_ANTENNA * 8).
:return: A complex numpy.ndarray of shape
(NOF_CORRELATION_ANTENNA, 4).
:raises ValueError: when solution_array is not
len (NOF_CORRELATION_ANTENNA * 8).
"""
if len(solution_array) != NOF_CORRELATION_ANTENNA * 8:
raise ValueError("Solution has incorrect shape")
# Reshape the flattened array back into the transposed shape
transposed_shape = (len(solution_array) // 2, 2)
reshaped = solution_array.reshape(transposed_shape)
# Transpose it back to its original 2-row shape
original_2_row_shape = reshaped.T
# Combine the real and imaginary parts back into complex numbers
real_parts = original_2_row_shape[0]
imaginary_parts = original_2_row_shape[1]
original_complex_array = real_parts + 1j * imaginary_parts
reshaped_array = original_complex_array.reshape(NOF_CORRELATION_ANTENNA, 4)
return reshaped_array
[docs]
def scan(
self: StationComponentManager,
subarray_id: int,
scan_id: int,
start_time: Optional[str],
duration: float,
task_callback: Optional[Callable] = None,
task_abort_event: Optional[threading.Event] = None,
) -> None:
"""
Execute the Scan slow task.
:param subarray_id: The subarray for which the command applies
:param scan_id: The ID for this scan
:param start_time: UTC time for begin of scan, None for immediate start
:param duration: Scan duration in seconds. 0.0 or omitted means forever
:param task_callback: Update task state, defaults to None
:param task_abort_event: Check for abort, defaults to None
"""
# TODO ADR-111: create a start_beamformer (private) method, to be used
# where needed. Here, should start the beamformer if not already started
#
if task_callback is not None:
task_callback(status=TaskStatus.IN_PROGRESS)
if duration == 0:
duration = -1 # this is the "infinite" time in SpsStation
# compute and save channel block mask: which blocks belong to the subarray
channel_groups = []
for block, table_entry in enumerate(self._beamformer_table):
if subarray_id == table_entry[2] and table_entry[0] != 0:
channel_groups.append(block) # mask += 1 << block
# check for double start scan
if not self._channel_groups[subarray_id]:
self.logger.debug(
f"Starting beamformer for scan {scan_id} channels {channel_groups}"
)
assert self._sps_station_proxy
result_code, message = self._sps_station_proxy.start_beamformer(
scan_id,
start_time,
duration,
channel_groups,
)
if result_code == ResultCode.OK:
# Only set if all beamformers started ok.
self._channel_groups[subarray_id] = channel_groups
self._scan_id[subarray_id] = scan_id
task_status = TaskStatus.COMPLETED
else:
task_status = TaskStatus.FAILED
if task_callback is not None:
task_callback(
status=task_status,
result=(result_code, message),
)
# duplicate start scan, nothing to do
# TODO SKB-1097: if beamformer has stopped e.g. because of finite duration
# this cannot detect that. Should either check that beamformer is
# really active (beamformer_running_for_subarray) or restart it anyway
elif self._channel_groups[subarray_id] == channel_groups:
self._scan_id[subarray_id] = scan_id
# TODO for ADR-111: must update scan ID on station
if task_callback is not None:
task_callback(
TaskStatus.COMPLETED,
result=(ResultCode.OK, "Scan command completed."),
)
else:
self.logger.error(
f"Starting scan for already scanning subarray {subarray_id}"
)
if task_callback is not None:
task_callback(
TaskStatus.FAILED,
result=(ResultCode.FAILED, "Scan command for running subarray."),
)
[docs]
def end_scan(
self: StationComponentManager,
subarray_id: int,
task_callback: Optional[Callable] = None,
task_abort_event: Optional[threading.Event] = None,
) -> None:
"""
Execute the EndScan slow task.
:param subarray_id: The subarray for which the command applies
:param task_callback: Update task state, defaults to None
:param task_abort_event: Check for abort, defaults to None
"""
if task_callback is not None:
task_callback(status=TaskStatus.IN_PROGRESS)
# ADR-111: just set the scan ID to 0 for channel groups in
# self._channel_groups[subarray_id]
# Anything else moved at end of deallocate_subarray()
self.logger.info(f"Stopping beamformer for subarray {subarray_id}")
result_code = ResultCode.OK
if self._channel_groups[subarray_id] is not None:
assert self._sps_station_proxy
(
result_code,
message,
) = self._sps_station_proxy.stop_beamformer_for_channels(
self._channel_groups[subarray_id]
)
self._scan_id[subarray_id] = 0
self._channel_groups[subarray_id] = None
# up to here moved to deallocate_subarray()
message = "EndScan has completed." if result_code == ResultCode.OK else message
if task_callback is not None:
task_callback(
TaskStatus.COMPLETED,
result=(result_code, message),
)
[docs]
def acquire_data_for_calibration(
self: StationComponentManager,
first_channel: Optional[int] = 64,
last_channel: Optional[int] = 72,
daq_mode: str = "TCC",
task_callback: Optional[Callable] = None,
task_abort_event: Optional[threading.Event] = None,
) -> None:
"""
Submit the AcquireDataForCalibration slow task.
This method returns immediately after it is submitted for
execution.
:param first_channel: The first channel to acquire data for
:param last_channel: The last channel to acquire data for
:param task_callback: Update task state, defaults to None
:param daq_mode: Which correlator to start, default TCC.
:param task_abort_event: task abort event.
"""
command_proxy = MccsCommandProxy(
self._sps_station_trl, "AcquireDataForCalibration", self.logger
)
command_proxy(
is_lrc=True,
wait_for_result=True,
arg=json.dumps(
{
"first_channel": first_channel,
"last_channel": last_channel,
"daq_mode": daq_mode,
}
),
task_callback=task_callback,
)
[docs]
def trigger_adc_equalisation(
self: StationComponentManager,
target_adc: float,
bias: Optional[float] = 0.0,
task_callback: Optional[Callable] = None,
task_abort_event: Optional[threading.Event] = None,
) -> None:
"""
Trigger Adc Equalisation on the sps station.
This method returns immediately after it is submitted for
execution.
:param task_callback: Update task state, defaults to None
:param target_adc: adc value in ADU units. Defaults to 17.
:param bias: user specifed bias in dB added to the antenna preadu levels.
Defaults to 0.
:param task_abort_event: the task abort event.
"""
command_proxy = MccsCommandProxy(
self._sps_station_trl, "TriggerAdcEqualisation", self.logger
)
command_proxy(
is_lrc=True,
wait_for_result=True,
arg=json.dumps({"target_adc": target_adc, "bias": bias}),
task_callback=task_callback,
)
[docs]
def cleanup(self: StationComponentManager) -> None:
"""
Cleanup resources held by the component manager.
This includes cleaning up resources held by all sub-component managers.
"""
self._pointing_manager.cleanup()
self._communication_manager.shutdown()
if self._field_station_proxy:
self._field_station_proxy.cleanup()
for antenna_proxy in self._antenna_proxies.values():
antenna_proxy.cleanup()
if self._station_calibrator_proxy:
self._station_calibrator_proxy.cleanup()
if self._sps_station_proxy:
self._sps_station_proxy.cleanup()
super().cleanup()
def _count_blocks(
self: StationComponentManager,
) -> int:
"""
Return the number of blocks required to specify the whole configuration.
It also updates the number of used channels.
:return: the number of blocks up to the last non-empty one.
"""
max_block = 0
n_chans = 0
for n, block in enumerate(self._beamformer_table):
if block[2] != 0:
n_chans += 8
max_block = n + 1
self._number_of_channels = n_chans
return max_block
[docs]
def deallocate_subarray(
self: StationComponentManager,
subarray_id: int,
) -> ResultCode:
"""
Clear channels for a station beam in the channel table.
:param subarray_id: subarray_id to clear
:return: a result code and response string
"""
for i, block in enumerate(self._beamformer_table):
if block[2] == subarray_id:
self._beamformer_table[i] = [0] * 7
# Update the weight matrix
self._update_weight_matrix(
f"Weights updated after deallocation of {subarray_id=}"
)
self._count_blocks() # this updates self:_number_of_channels
# TODO ADR-111: Deallocate channel groups and scan_id for subarray
# Move here section from self.end_scan()
return ResultCode.OK
[docs]
@check_communicating
def start_acquisition(
self: StationComponentManager,
start_time: Optional[str] = None,
delay: Optional[int] = 2,
task_callback: Optional[Callable] = None,
task_abort_event: Optional[threading.Event] = None,
) -> None:
"""
Start acquisition using slow command.
:param start_time: the time at which to start data acquisition, defaults to None
:param delay: delay start, defaults to 2
:param task_callback: Update task state, defaults to None
:param task_abort_event: Check for abort, defaults to None
"""
if task_callback:
task_callback(status=TaskStatus.IN_PROGRESS)
success = True
parameter_list = {"start_time": start_time, "delay": delay}
json_argument = json.dumps(parameter_list)
assert self._sps_station_proxy is not None
result = self._sps_station_proxy.start_acquisition(json_argument)
if result not in (ResultCode.QUEUED, ResultCode.OK, ResultCode.STARTED):
success = False
if task_callback:
if success:
task_callback(
status=TaskStatus.COMPLETED,
result="Start acquisition has completed",
)
else:
task_callback(
status=TaskStatus.FAILED, result="Start acquisition task failed"
)
return
[docs]
@check_communicating
def initialise(
self: StationComponentManager,
task_callback: Optional[Callable] = None,
task_abort_event: Optional[threading.Event] = None,
) -> None:
"""
Initialise using slow command.
:param task_callback: Update task state, defaults to None
:param task_abort_event: Check for abort, defaults to None
"""
if task_callback:
task_callback(status=TaskStatus.IN_PROGRESS)
success = True
assert self._sps_station_proxy is not None
result = self._sps_station_proxy.initialise()
if result not in (ResultCode.QUEUED, ResultCode.OK, ResultCode.STARTED):
success = False
if task_callback:
if success:
task_callback(
status=TaskStatus.COMPLETED,
result="Initialisation has completed",
)
else:
task_callback(
status=TaskStatus.FAILED, result="Initialisation task failed"
)
return
def _wait_for_power_state(
self: StationComponentManager,
timeout: float,
task_abort_event: Optional[threading.Event] = None,
) -> TaskStatus:
"""
Wait for sub-station PowerState to reach desired state.
:param timeout: Time to wait, in seconds.
:param task_abort_event: Check for abort, defaults to None
:return: completed if status reached, FAILED if timed out, ABORTED if aborted
"""
assert self._desired_power_state is not None
resolution = 1 # seconds
ticks = int(timeout / resolution)
while self._powering_resources:
if task_abort_event and task_abort_event.is_set():
return TaskStatus.ABORTED
time.sleep(resolution)
ticks -= 1
if ticks == 0:
self.logger.warning(
f"Timed out waiting for PowerState in {timeout} seconds"
)
return TaskStatus.FAILED
self.logger.debug(
f"Waiting for {self._powering_resources} PowerState"
f" {self._desired_power_state.name},"
f" waiting for {ticks*resolution} more seconds"
)
self.logger.debug(f"Waited PowerState for {timeout-ticks*resolution} seconds")
return TaskStatus.COMPLETED
[docs]
def reset_csp_ingest(
self: StationComponentManager,
) -> DevVarLongStringArrayType:
"""
Reset link for beam data packets to CSP to defaults.
:return: result code of ResetCspIngest
"""
if self._sps_station_proxy is None:
self.logger.error("There is no SpsStation proxy to reset CSP ingest.")
return (
[ResultCode.FAILED],
["There is no SpsStation proxy to reset CSP ingest."],
)
return self._sps_station_proxy.reset_csp_ingest()
[docs]
def set_csp_ingest(
self: StationComponentManager,
argin: str,
) -> tuple[ResultCode, str]:
"""
Configure link for beam data packets to CSP.
:param argin: json dictionary with optional keywords:
* destination_ip - (string) Destination IP
* source_port - (int) Source port for integrated data streams
* destination_port - (int) Destination port for integrated data streams
:return: result code of SetCspIngest
"""
if self._sps_station_proxy is None:
self.logger.error("There is no SpsStation proxy to set CSP ingest.")
return (
ResultCode.FAILED,
"There is no SpsStation proxy to set CSP ingest.",
)
return self._sps_station_proxy.set_csp_ingest(argin)
@property
def number_of_channels(self: StationComponentManager) -> int:
"""
Return the total number of channels in the beamformer.
:return: the total numebr of channels
"""
return self._number_of_channels
@property
def beamformer_table(self: StationComponentManager) -> list[list[int]]:
"""
Return the channel table reformatted as would be needed by ConfigureChannels.
:return: reformatted channel table
"""
table: list[list[int]] = []
n_blocks = self._count_blocks()
for n, block in enumerate(self._beamformer_table[:n_blocks]):
table.append([n, *block])
return table
@property
def scan_ids(self: StationComponentManager) -> list[int]:
"""
Return the current scan IDs for each subarray.
:return: list of scan IDs starting from subarray 1, 0 = subarray not scanning
"""
return self._scan_id[1:]
@property
def tileprogrammingstate(self: StationComponentManager) -> tuple[str]:
"""
Return the tileprogrammingstate of the SpsStation.
:return: the tileprogrammingstate of the SpsStation.
"""
assert self._sps_station_proxy is not None
assert self._sps_station_proxy._proxy is not None
return self._sps_station_proxy._proxy._device.tileprogrammingstate