Source code for ska_low_mccs.station.station_component_manager

#  -*- coding: utf-8 -*
#
# This file is part of the SKA Low MCCS project
#
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""This module implements component management for stations."""

# pylint: disable=too-many-lines
from __future__ import annotations

import functools
import json
import logging
import threading
import time
from typing import Any, Callable, Optional, Sequence

import numpy as np
import tango
from astropy.coordinates import Angle
from astropy.time.core import Time, TimeDelta
from ska_control_model import CommunicationStatus, PowerState, ResultCode, TaskStatus
from ska_low_mccs_common.component import DeviceComponentManager
from ska_low_mccs_common.utils import threadsafe
from ska_tango_base.base import check_communicating, check_on
from ska_tango_base.executor import TaskExecutorComponentManager

from ska_low_mccs.station.point_station import Pointing, StationInformation

__all__ = ["StationComponentManager"]

MAX_NUMBER_OF_CHANNELS = 384
NUMBER_OF_ANTENNAS = 256


class _FieldStationProxy(DeviceComponentManager):
    """A proxy to a FieldStation device, for a station to use."""

    def __init__(
        self: _FieldStationProxy,
        trl: str,
        logger: logging.Logger,
        communication_state_callback: Callable[[CommunicationStatus], None],
        component_state_callback: Callable[..., None],
        outside_temperature_changed_callback: Callable[[float], None],
    ) -> None:
        super().__init__(
            trl,
            logger,
            communication_state_callback,
            component_state_callback,
        )
        self._outside_temperature_changed_callback = (
            outside_temperature_changed_callback
        )

    @check_communicating
    def configure(self: _FieldStationProxy, config: str) -> None:
        """
        Configure the device proxy.

        :param config: json string of configuration.
        """
        assert self._proxy is not None  # for the type checker
        assert self._proxy._device is not None  # for the type checker
        self._proxy._device.Configure(config)

    @property
    def outside_temperature(self: _FieldStationProxy) -> int:
        """
        Return outsideTemperature.

        :return: outsideTemperature
        """
        assert self._proxy is not None  # for the type checker
        return self._proxy.outsideTemperature

    def subscribe_to_attributes(self: _FieldStationProxy) -> None:
        """Subscribe to change events in field station attributes of interest."""
        assert self._proxy is not None
        if (
            "outsideTemperature"
            not in self._proxy._change_event_subscription_ids.keys()
        ):
            self._proxy.add_change_event_callback(
                "outsideTemperature", self._outside_temperature_changed
            )

    def _outside_temperature_changed(
        self: _FieldStationProxy,
        event_name: str,
        event_value: float,
        event_quality: tango.AttrQuality,
    ) -> None:
        assert (
            event_name.lower() == "outsidetemperature"
        ), f"outsideTemperature changed callback called but event_name is {event_name}."
        self._outside_temperature_changed_callback(event_value)


class _AntennaProxy(DeviceComponentManager):
    """A proxy to a antenna device, for a station to use."""

    def __init__(
        self: _AntennaProxy,
        trl: str,
        logger: logging.Logger,
        communication_state_callback: Callable[[CommunicationStatus], None],
        component_state_callback: Callable[..., None],
    ) -> None:
        self._power_state_lock = threading.RLock()
        super().__init__(
            trl,
            logger,
            communication_state_callback,
            component_state_callback,
        )

    @check_communicating
    def configure(self: _AntennaProxy, config: str) -> None:
        """
        Configure the device proxy.

        :param config: json string of configuration.
        """
        assert self._proxy is not None  # for the type checker
        assert self._proxy._device is not None  # for the type checker
        self._proxy._device.Configure(config)


class _StationCalibratorProxy(DeviceComponentManager):
    """A proxy to a station calibrator, for a station to use."""

    @check_communicating
    def get_calibration(self: _StationCalibratorProxy, channel: int) -> np.ndarray:
        """
        Get the calibration cooefficients.

        :param channel: channel for calibration coefficents.

        :return: result code of GetCalibration and calibration coefficents
        """
        assert self._proxy is not None  # for the type checker
        assert self._proxy._device is not None  # for the type checker
        argin = json.dumps({"frequency_channel": channel})

        return self._proxy._device.GetCalibration(argin)


class _SpsStationProxy(DeviceComponentManager):
    """A proxy to an SpsStation, for a station to use."""

    def __init__(
        self: _SpsStationProxy,
        trl: str,
        logger: logging.Logger,
        communication_state_callback: Callable[[CommunicationStatus], None],
        component_state_callback: Callable[..., None],
        data_received_result_changed_callback: Callable[..., None],
    ) -> None:
        super().__init__(
            trl,
            logger,
            communication_state_callback,
            component_state_callback,
        )
        self._data_received_result_changed_callback = (
            data_received_result_changed_callback
        )

    def subscribe_to_attributes(self: _SpsStationProxy) -> None:
        """Subscribe to change events in SPS station attributes of interest."""
        assert self._proxy is not None
        if (
            "dataReceivedResult"
            not in self._proxy._change_event_subscription_ids.keys()
        ):
            self._proxy.add_change_event_callback(
                "dataReceivedResult", self._data_received_result_changed
            )

    @check_communicating
    def load_calibration_coefficients(
        self: _SpsStationProxy, calibration: list[float]
    ) -> ResultCode:
        """
        Get the calibration cooefficients.

        :param calibration: list comprises:

        * antenna - (int) is the antenna to which the coefficients will be applied.
        * calibration_coefficients - [array] a bidimensional complex array comprising
            calibration_coefficients[channel, polarization], with each element
            representing a normalized coefficient, with (1.0, 0.0) being the
            normal, expected response for an ideal antenna.

            * channel - (int) channel is the index specifying the channels at the
                              beamformer output, i.e. considering only those channels
                              actually processed and beam assignments.
            * polarization index ranges from 0 to 3.

                * 0: X polarization direct element
                * 1: X->Y polarization cross element
                * 2: Y->X polarization cross element
                * 3: Y polarization direct element

        :return: result code of LoadCalibrationCoefficients
        """
        assert self._proxy is not None  # for the type checker
        assert self._proxy._device is not None  # for the type checker

        ([result_code], _) = self._proxy._device.LoadCalibrationCoefficients(
            calibration
        )
        return result_code

    @check_communicating
    def apply_calibration(self: _SpsStationProxy, load_time: str) -> ResultCode:
        """
        Apply the calibration cooefficients.

        :param load_time: switch time, in ISO formatted time. Default: now

        :return: result code of ApplyCalibration
        """
        assert self._proxy is not None  # for the type checker
        assert self._proxy._device is not None  # for the type checker

        ([result_code], _) = self._proxy._device.ApplyCalibration(load_time)
        return result_code

    @check_communicating
    def load_pointing_delays(self: _SpsStationProxy, delays: np.ndarray) -> ResultCode:
        """
        Set the pointing delay parameters of this Station's Tiles.

        Delay is the geometric delay to be corrected for each antenna, in seconds.
        Delay is negative towards source.

        :param delays: an array containing a beam index followed by antennar
            delays and delay rate pairs for each antenna

        :return: result code of LoadPointingDelays
        """
        assert self._proxy is not None  # for the type checker
        assert self._proxy._device is not None  # for the type checker

        ([result_code], _) = self._proxy._device.LoadPointingDelays(delays)
        return result_code

    @check_communicating
    def apply_pointing_delays(self: _SpsStationProxy, load_time: str) -> ResultCode:
        """
        Set the pointing delay parameters of this Station's Tiles.

        :param load_time: switch time, in ISO formatted time. Default: now

        :return: result code of ApplyPointingDelays
        """
        assert self._proxy is not None  # for the type checker
        assert self._proxy._device is not None  # for the type checker

        ([result_code], _) = self._proxy.ApplyPointingDelays(load_time)
        return result_code

    @check_communicating
    def set_beamformer_table(self: _SpsStationProxy, table: np.ndarray) -> ResultCode:
        """
        Set the beamformer table which are going to be beamformed into each beam.

        region_array is defined as a flattened 2D array, for a maximum of 48 entries.
        Each entry corresponds to 8 consecutive frequency channels.
        This is equivalent to SetBeamformerRegions, with a different way
        to specify the bandwidth of each spectral region.
        Input is consistent with the beamformerTable attribute

        :param table: list of regions. Each region comprises:

        * start_channel - (int) region starting channel, must be even in range 0 to 510
        * beam_index - (int) beam used for this region with range 0 to 47
        * subarray_id - (int) Subarray
        * subarray_logical_channel - (int) logical channel # in the subarray
        * subarray_beam_id - (int) ID of the subarray beam
        * substation_id - (int) Substation
        * aperture_id:  ID of the aperture (APXX.YYY)

        :return: result code of SetBeamformerTable
        """
        assert self._proxy is not None  # for the type checker
        assert self._proxy._device is not None  # for the type checker

        ([result_code], _) = self._proxy._device.SetBeamformerTable(table)
        return result_code

    @check_communicating
    def set_beamformer_regions(self: _SpsStationProxy, argin: np.ndarray) -> ResultCode:
        """
        Set the frequency regions which are going to be beamformed into each beam..

        :param argin: list of regions. Each region comprises:

        * start_channel - (int) region starting channel, must be even in range 0 to 510
        * num_channels - (int) size of the region, must be a multiple of 8
        * beam_index - (int) beam used for this region with range 0 to 47
        * subarray_id - (int) Subarray
        * subarray_logical_channel - (int) logical channel # in the subarray
        * subarray_beam_id - (int) ID of the subarray beam
        * substation_id - (int) Substation
        * aperture_id:  ID of the aperture (APXX.YYY)

        :return: result code of SetBeamformerRegions
        """
        assert self._proxy is not None  # for the type checker
        assert self._proxy._device is not None  # for the type checker

        ([result_code], _) = self._proxy._device.SetBeamformerRegions(argin)
        return result_code

    @check_communicating
    def start_beamformer(
        self: _SpsStationProxy,
        scan_id: int,
        start_time: Optional[str],
        duration: float = -1.0,
        channel_mask: Optional[int] = -1,
    ) -> ResultCode:
        """
        Start the beamformer for the selected channel blocks.

        :param scan_id: unique scan ID
        :param start_time: ISO-8691 formatted UTC scan start time
        :param duration: Scan duration in seconds. If omitted or negative
            scan lasts forever
        :param channel_mask: Channel groups to which the command applies.
        :return: result code of StartBeamformer
        """
        assert self._proxy is not None  # for the type checker
        assert self._proxy._device is not None  # for the type checker

        # TODO Duration is buggy in the MccsTile, is expressed in frames
        # of 256*1.08e-6 seconds instead of seconds
        if duration < 0.0:
            duration_i = -1
        else:
            duration_i = int(round(duration * 3616.90))

        scan_arg: dict[str, int | float | str] = {
            "scan_id": scan_id,
            "duration": duration_i,
        }
        if start_time is not None:
            scan_arg["start_time"] = start_time

        ([result_code], _) = self._proxy._device.StartBeamformer(json.dumps(scan_arg))
        return result_code

    @check_communicating
    def stop_beamformer(
        self: _SpsStationProxy,
        channel_mask: Optional[int] = -1,
    ) -> ResultCode:
        """
        Stop the beamformer for the selected channel blocks.

        :param channel_mask: Channel groups to which the command applies.
        :return: result code of StopBeamformer
        """
        assert self._proxy is not None  # for the type checker
        assert self._proxy._device is not None  # for the type checker

        # TODO Reinsert the channel mask when it is supported in hardware
        ([result_code], _) = self._proxy._device.StopBeamformer()
        return result_code

    @check_communicating
    def start_acquisition(self: _SpsStationProxy, argin: str) -> ResultCode:
        """
        Start the acquisition synchronously for all tiles, checks for synchronisation.

        :param argin: json dictionary with optional keywords

            * start_time - (str) start time
            * delay - (int) delay start

        :return: result code of StartAcquisition
        """
        assert self._proxy is not None  # for the type checker
        assert self._proxy._device is not None  # for the type checker

        ([result_code], _) = self._proxy._device.StartAcquisition(argin)
        return result_code

    @check_communicating
    def initialise(self: _SpsStationProxy) -> ResultCode:
        """
        Initialise the station's tiles.

        :return: result code of Initialise
        """
        assert self._proxy is not None  # for the type checker
        assert self._proxy._device is not None  # for the type checker

        ([result_code], _) = self._proxy._device.Initialise()
        return result_code

    @check_communicating
    def set_lmc_download(self: _SpsStationProxy, argin: str) -> ResultCode:
        """
        Specify whether control data will be transmitted over 1G or 40G networks.

        :param argin: json dictionary with optional keywords:

            * mode - (string) '1G' or '10G' (Mandatory) (use '10G' for 40G also)
            * payload_length - (int) SPEAD payload length for channel data
            * destination_ip - (string) Destination IP.
            * source_port - (int) Source port for integrated data streams
            * destination_port - (int) Destination port for integrated data streams

        :return: result code of SetLmcDownload
        """
        assert self._proxy is not None  # for the type checker
        assert self._proxy._device is not None  # for the type checker

        ([result_code], _) = self._proxy._device.SetLmcDownload(argin)
        return result_code

    @check_communicating
    def set_lmc_integrated_download(self: _SpsStationProxy, argin: str) -> ResultCode:
        """
        Configure link and size for integrated data packets, for all tiles.

        :param argin: json dictionary with optional keywords:

            * mode - (string) '1G' '10G' '40G' - default 40G
            * channel_payload_length - (int) SPEAD payload length for integrated
                 channel data
            * beam_payload_length - (int) SPEAD payload length for integrated beam data
            * destination_ip - (string) Destination IP
            * source_port - (int) Source port for integrated data streams
            * destination_port - (int) Destination port for integrated data streams

        :return: result code of SetLmcIntegratedDownload
        """
        assert self._proxy is not None  # for the type checker
        assert self._proxy._device is not None  # for the type checker

        ([result_code], _) = self._proxy._device.SetLmcIntegratedDownload(argin)
        return result_code

    @check_communicating
    def set_csp_ingest(self: _SpsStationProxy, argin: str) -> ResultCode:
        """
        Configure link for beam data packets to CSP.

        :param argin: json dictionary with optional keywords:

            * destination_ip - (string) Destination IP
            * source_port - (int) Source port for integrated data streams
            * destination_port - (int) Destination port for integrated data streams

        :return: result code of SetCspIngest
        """
        assert self._proxy is not None  # for the type checker
        assert self._proxy._device is not None  # for the type checker

        ([result_code], _) = self._proxy._device.SetCspIngest(argin)
        return result_code

    @check_communicating
    def configure_integrated_channel_data(
        self: _SpsStationProxy, argin: str
    ) -> ResultCode:
        """
        Configure and start the transmission of integrated channel data.

        Using the provided integration time, first channel and last channel.
        Data are sent continuously until the StopIntegratedData command is run.

        :param argin: json dictionary with optional keywords:

        * integration_time - (float) in seconds (default = 0.5)
        * first_channel - (int) default 0
        * last_channel - (int) default 511

        :return: result code of ConfigureIntegratedChannelData
        """
        assert self._proxy is not None  # for the type checker
        assert self._proxy._device is not None  # for the type checker

        ([result_code], _) = self._proxy._device.ConfigureIntegratedChannelData(argin)
        return result_code

    @check_communicating
    def stop_integrated_data(self: _SpsStationProxy) -> ResultCode:
        """
        Stop the integrated  data.

        :return: result code of StopIntegratedData
        """
        assert self._proxy is not None  # for the type checker
        assert self._proxy._device is not None  # for the type checker

        ([result_code], _) = self._proxy._device.StopIntegratedData()
        return result_code

    @check_communicating
    def send_data_samples(self: _SpsStationProxy, argin: str) -> ResultCode:
        """
        Transmit a snapshot containing raw antenna data.

        :param argin: json dictionary with optional keywords:

        * data_type - type of snapshot data (mandatory): "raw", "channel",
                    "channel_continuous", "narrowband", "beam"
        * start_time - Time (UTC string) to start sending data. Default immediately
        * seconds - (float) Delay if timestamp is not specified. Default 0.2 seconds

        Depending on the data type:
        raw:

        * sync: bool: send synchronised samples for all antennas, vs. round robin
                larger snapshot from each antenna

        channel:

        * n_samples: Number of samples per channel, default 1024
        * first_channel - (int) first channel to send, default 0
        * last_channel - (int) last channel to send, default 511

        channel_continuous

        * channel_id - (int) channel_id (Mandatory)
        * n_samples -  (int) number of samples to send per packet, default 128

        narrowband:

        * frequency - (int) Sky frequency for band centre, in Hz (Mandatory)
        * round_bits - (int)  Specify whow many bits to round
        * n_samples -  (int) number of spectra to send

        :return: result code of SendDataSamples
        """
        assert self._proxy is not None  # for the type checker
        assert self._proxy._device is not None  # for the type checker

        ([result_code], _) = self._proxy._device.SendDataSamples(argin)
        return result_code

    @check_communicating
    def stop_data_transmission(
        self: _SpsStationProxy,
    ) -> ResultCode:
        """
        Stop data transmission from board.

        :return: result code of StopDataTransmission
        """
        assert self._proxy is not None  # for the type checker
        assert self._proxy._device is not None  # for the type checker

        ([result_code], _) = self._proxy._device.StopDataTransmission()
        return result_code

    @check_communicating
    def configure_test_generator(
        self: _SpsStationProxy,
        argin: str,
    ) -> ResultCode:
        """
        Set the test signal generator.

        :param argin: json dictionary with keywords:

        * tone_frequency: first tone frequency, in Hz. The frequency
            is rounded to the resolution of the generator. If this
            is not specified, the tone generator is disabled.
        * tone_amplitude: peak tone amplitude, normalized to 31.875 ADC
            units. The amplitude is rounded to 1/8 ADC unit. Default
            is 1.0. A value of -1.0 keeps the previously set value.
        * tone_2_frequency: frequency for the second tone. Same
            as ToneFrequency.
        * tone_2_amplitude: peak tone amplitude for the second tone.
            Same as ToneAmplitude.
        * noise_amplitude: RMS amplitude of the pseudorandom Gaussian
            white noise, normalized to 26.03 ADC units.
        * pulse_frequency: frequency of the periodic pulse. A code
            in the range 0 to 7, corresponding to (16, 12, 8, 6, 4, 3, 2)
            times the ADC frame frequency.
        * pulse_amplitude: peak amplitude of the periodic pulse, normalized
            to 127 ADC units. Default is 1.0. A value of -1.0 keeps the
            previously set value.
        * set_time: time at which the generator is set, for synchronization
            among different TPMs. In UTC ISO format (string)
        * adc_channels: list of adc channels which will be substituted with
            the generated signal. It is a 32 integer, with each bit representing
            an input channel. Default: all if at least q source is specified,
            none otherwises.

        :return: result code of ConfigureTestGenerator
        """
        assert self._proxy is not None  # for the type checker
        assert self._proxy._device is not None  # for the type checker

        ([result_code], _) = self._proxy._device.ConfigureTestGenerator(argin)
        return result_code

    @check_communicating
    def acquire_data_for_calibration(
        self: _SpsStationProxy, channel: int
    ) -> ResultCode:
        """
        Start collecting data for calibration.

        :param channel: the channel to get calibration data for.

        :return: result code of AcquireDataForCalibration
        """
        assert self._proxy is not None  # for the type checker
        assert self._proxy._device is not None  # for the type checker

        ([result_code], _) = self._proxy._device.AcquireDataForCalibration(channel)
        return result_code

    def _data_received_result_changed(
        self: _SpsStationProxy,
        event_name: str,
        event_value: list[str],
        event_quality: tango.AttrQuality,
    ) -> None:
        if event_name.lower() != "datareceivedresult":
            self.logger.error(
                "dataReceivedResult callback called, "
                f"but event_name is {event_name}."
            )
            return
        self._data_received_result_changed_callback(event_value)


# pylint: disable=too-many-instance-attributes,too-many-public-methods
[docs]class StationComponentManager(TaskExecutorComponentManager): """A component manager for a station.""" # pylint: disable=too-many-arguments, too-many-statements, too-many-locals
[docs] def __init__( self: StationComponentManager, station_id: int, ref_latitude: float, ref_longitude: float, ref_height: float, field_station_trl: str, antenna_trls: Sequence[str], antenna_station_locations: np.ndarray, antenna_element_ids: list[int], station_calibrator_trl: str, sps_station_trl: str, logger: logging.Logger, communication_state_callback: Callable[[CommunicationStatus], None], component_state_callback: Callable[..., None], ) -> None: """ Initialise a new instance. :param station_id: the id of this station :param ref_latitude: reference latitude of the station. :param ref_longitude: reference longitude of the station. :param ref_height: reference ellipsoidal height of the station. :param field_station_trl: TRL of the Tango device that manages this station's FieldStation :param antenna_trls: TRLs of the Tango devices and manage this station's antennas :param antenna_station_locations: array of the x, y, z positions of the antennas :param antenna_element_ids: list of the element IDs of the antennas :param station_calibrator_trl: TRL of the Tango devices and manage this station's station calibrator :param sps_station_trl: TRL of the Tango devices and manage this station's Spshw station :param logger: the logger to be used by this object. :param communication_state_callback: callback to be called when the status of the communications channel between the component manager and its component changes :param component_state_callback: callback to be called when the component state changes """ self._power_state_lock = threading.RLock() self._station_id = station_id self._field_station_proxy: Optional[_FieldStationProxy] = None self._sps_station_proxy: Optional[_SpsStationProxy] = None self._field_station_power_state: Optional[PowerState] = None self._sps_station_power_state: Optional[PowerState] = None self._station_calibrator_power_state: Optional[PowerState] = None self._field_station_trl = field_station_trl self._is_configured = False self._field_station_on_called = False self.outside_temperature: Optional[float] = None self.data_received_result: Optional[tuple[str, str]] = ("", "") self._is_communicating: bool = False self._ref_longitude: float = ref_longitude self._ref_latitude: float = ref_latitude self._ref_height: float = ref_height self._pointing_delays: Optional[list[list]] = [[]] self._scan_id = [0] * 17 self._scan_mask = [0] * 17 self._nof_subdevices = 0 self._powering_resources: set[str] = set() # this set of stations should go self._desired_power_state: Optional[PowerState] = None # to this state self.stop_ids: dict[int, bool] = {} self.tracking_threads: dict[int, threading.Thread] = {} self.tracking_id = 0 self._antenna_xyz = np.array(antenna_station_locations) self._antenna_element_ids = np.array(antenna_element_ids) self._nof_antennas = self._antenna_xyz.shape[0] self._component_state_callback = component_state_callback self._communication_state_callback = communication_state_callback # beam_idx + 2*(number of antennas) self.last_pointing_delays = [0.0] * (1 + 2 * NUMBER_OF_ANTENNAS) self._communication_states = {} if antenna_trls != [""]: self._communication_states = { trl: CommunicationStatus.DISABLED for trl in list(antenna_trls) } self._antenna_power_states = {trl: PowerState.UNKNOWN for trl in antenna_trls} self._antenna_proxies = { antenna_trl: _AntennaProxy( antenna_trl, logger, functools.partial( self._device_communication_state_changed, antenna_trl ), functools.partial(component_state_callback, trl=antenna_trl), ) for antenna_trl in antenna_trls } self._nof_subdevices += self._nof_antennas # self._station_calibrator_power_state = PowerState.UNKNOWN if field_station_trl != "": self._communication_states[field_station_trl] = CommunicationStatus.DISABLED self._field_station_power_state = PowerState.UNKNOWN self._field_station_proxy = _FieldStationProxy( self._field_station_trl, logger, functools.partial( self._device_communication_state_changed, self._field_station_trl ), functools.partial( component_state_callback, trl=self._field_station_trl ), self._field_station_outside_temperature_changed, ) else: self._field_station_proxy = None self._field_station_power_state = None self._station_calibrator_proxy = _StationCalibratorProxy( station_calibrator_trl, logger, lambda *args: None, # Avaliability is checked when service required. self._station_calibrator_power_state_changed, ) if sps_station_trl != "": self._communication_states[sps_station_trl] = CommunicationStatus.DISABLED self._sps_station_power_state = PowerState.UNKNOWN self._sps_station_trl = sps_station_trl self._sps_station_proxy = _SpsStationProxy( sps_station_trl, logger, functools.partial( self._device_communication_state_changed, sps_station_trl ), functools.partial(component_state_callback, trl=sps_station_trl), self._sps_station_data_received_result_changed, ) self._nof_subdevices += 1 else: self._sps_station_power_state = None self._sps_station_proxy = None # Aggregate configuration tables # Channel table: one entry per channel block (48 total) # Bidimensional array of one row for each 8 channels, with elements: # 0. start physical channel # 1. beam number # 2. subarray ID # 3. subarray_logical_channel # 4. subarray_beam_id # 5. substation_id # 6. aperture_id self._beamformer_table = [[0] * 7 for _ in range(48)] self._number_of_channels = 0 self._pointing_helper: Optional[Pointing] = None super().__init__( logger, communication_state_callback, component_state_callback, power=None, fault=None, configuration_changed=None, is_configured=None, )
[docs] def setup_pointing_helper(self: StationComponentManager) -> None: """Set up the pointing helper.""" assert ( self.ref_longitude is not None and self.ref_latitude is not None and self.ref_height is not None ) station_helper = StationInformation() station_helper.set_location( self.ref_latitude, self.ref_longitude, self.ref_height ) station_helper.load_displacements_arrays( self._antenna_xyz, self._antenna_element_ids, ) self._pointing_helper = Pointing(station_helper)
[docs] def start_communicating(self: StationComponentManager) -> None: """Establish communication with the station components.""" self._is_communicating = True if self._communication_state == CommunicationStatus.ESTABLISHED: return if self._communication_state == CommunicationStatus.DISABLED: self.update_communication_state(CommunicationStatus.NOT_ESTABLISHED) if self._field_station_proxy: self._field_station_proxy.start_communicating() if self._station_calibrator_proxy: self._station_calibrator_proxy.start_communicating() if self._sps_station_proxy: self._sps_station_proxy.start_communicating() for antenna_proxy in self._antenna_proxies.values(): antenna_proxy.start_communicating() # If we have instantiated our MccsStation with no subdevices, it will always # report CommunicationStatus.ESTABLISHED if self._nof_subdevices == 0: self.update_communication_state(CommunicationStatus.ESTABLISHED) self._evaluate_power_state()
[docs] def stop_communicating(self: StationComponentManager) -> None: """Break off communication with the station components.""" self._is_communicating = False for antenna_proxy in self._antenna_proxies.values(): antenna_proxy.stop_communicating() if self._sps_station_proxy: self._sps_station_proxy.stop_communicating() if self._station_calibrator_proxy: self._station_calibrator_proxy.stop_communicating() if self._field_station_proxy: self._field_station_proxy.stop_communicating() if self.communication_state == CommunicationStatus.DISABLED: return self.update_communication_state(CommunicationStatus.DISABLED) self._update_component_state(power=None, fault=None)
def _device_communication_state_changed( self: StationComponentManager, trl: str, communication_state: CommunicationStatus, ) -> None: # Many callback threads could be hitting this method at the same time, so it's # possible (likely) that the GIL will suspend a thread between checking if it # need to update, and actually updating. This leads to callbacks appearing out # of order, which breaks tests. Therefore we need to serialise access. self._communication_states[trl] = communication_state self.logger.debug( f"device {trl} changed communcation state to {communication_state.name}" ) if CommunicationStatus.DISABLED in self._communication_states.values(): self.update_communication_state(CommunicationStatus.NOT_ESTABLISHED) elif CommunicationStatus.NOT_ESTABLISHED in self._communication_states.values(): self.update_communication_state(CommunicationStatus.NOT_ESTABLISHED) else: self.update_communication_state(CommunicationStatus.ESTABLISHED)
[docs] def update_communication_state( self: StationComponentManager, communication_state: CommunicationStatus, ) -> None: """ Update the status of communication with the component. Overridden here to fire the "is configured" callback whenever communication is freshly established :param communication_state: the status of communication with the component """ if self._communication_state == communication_state: return super()._update_communication_state(communication_state) if communication_state == CommunicationStatus.ESTABLISHED: if self._field_station_proxy is not None: self._field_station_proxy.subscribe_to_attributes() if self._sps_station_proxy is not None: self._sps_station_proxy.subscribe_to_attributes() if self._pointing_helper is None: self.setup_pointing_helper() if self._component_state_callback is not None: self._component_state_callback(is_configured=self.is_configured)
def _field_station_outside_temperature_changed( self: StationComponentManager, outside_temperature: float, ) -> None: self.outside_temperature = outside_temperature if self._component_state_callback is not None: self._component_state_callback(outside_temperature=self.outside_temperature) def _sps_station_data_received_result_changed( self: StationComponentManager, data_received_result: tuple[str, str], ) -> None: self.data_received_result = data_received_result if self._component_state_callback is not None: self._component_state_callback( data_received_result=self.data_received_result ) @threadsafe def _station_calibrator_power_state_changed( self: StationComponentManager, power: PowerState | None = None, fault: bool | None = None, **kwargs: Any, ) -> None: # Station calibrator is a software only device and should not roll up # Into the PowerState of the MccsStation. We will log a message and do nothing. self._station_calibrator_power_state = power if power is not None: self.logger.info( f"Station calibrator reports its power as {PowerState(power).name}." ) @threadsafe def _antenna_power_state_changed( self: StationComponentManager, trl: str, power_state: PowerState, ) -> None: with self.power_state_lock: self._antenna_power_states[trl] = power_state @threadsafe def _field_station_power_state_changed( self: StationComponentManager, power_state: PowerState, ) -> None: with self._power_state_lock: self._field_station_power_state = power_state if ( self._field_station_trl in self._powering_resources and power_state == self._desired_power_state ): self._powering_resources.remove(self._field_station_trl) self._evaluate_power_state() # Antennas should just be passively reflecting the # power state of their smartbox port? # if power_state is PowerState.ON and self._field_station_on_called: # self._field_station_on_called = False # _ = self._turn_on_antennas() @threadsafe def _sps_station_power_state_changed( self: StationComponentManager, power_state: PowerState, ) -> None: with self._power_state_lock: self._sps_station_power_state = power_state if power_state == PowerState.STANDBY: self.logger.warning( "There is no way for MCCS to turn SpsStation 'OFF'. " "Due to there being no control over the subrack PDU. " "At MccsStation we interpret STANDBY as OFF " ) if ( self._sps_station_trl in self._powering_resources and self._desired_power_state == PowerState.OFF ): self._powering_resources.remove(self._sps_station_trl) else: if ( self._sps_station_trl in self._powering_resources and power_state == self._desired_power_state ): self._powering_resources.remove(self._sps_station_trl) self._evaluate_power_state() def _evaluate_power_state( self: StationComponentManager, ) -> None: with self.power_state_lock: power_states = [] if ( self._field_station_proxy is not None and self._field_station_power_state is not None ): power_states.append(self._field_station_power_state) if ( self._sps_station_proxy is not None and self._sps_station_power_state is not None ): if self._sps_station_power_state == PowerState.STANDBY: # Handle special case due to there being no such thing # as SpsStation OFF. The reason for this is that there # is no control over the subrack PDU # TODO: SP-4050 self.logger.warning( "When evaluating MccsStation Power, " "we classify SpsStation `STANDBY` as `OFF`. " "This is required since MCCS has no control over " "the subracks' PDU." ) power_states.append(PowerState.OFF) else: power_states.append(self._sps_station_power_state) if all(power_state == PowerState.ON for power_state in power_states): evaluated_power_state = PowerState.ON elif all(power_state == PowerState.OFF for power_state in power_states): evaluated_power_state = PowerState.OFF elif all(power_state == PowerState.STANDBY for power_state in power_states): evaluated_power_state = PowerState.STANDBY elif len(power_states) == 0: evaluated_power_state = PowerState.ON else: evaluated_power_state = PowerState.UNKNOWN self.logger.debug( "In StationComponentManager._evaluatePowerState with:\n" f"\tspsStation: {str(self._sps_station_power_state)}\n" f"\tfieldStation: {str(self._field_station_power_state)}\n " f"\tresult: {str(evaluated_power_state)}" ) self._update_component_state(power=evaluated_power_state) @property def power_state_lock(self: StationComponentManager) -> threading.RLock: """ Return the power state lock of this component manager. :return: the power state lock of this component manager. """ return self._power_state_lock @property def power_state(self: StationComponentManager) -> Optional[PowerState]: """ Return my power state. :return: my power state """ return self._component_state["power"]
[docs] def off( self: StationComponentManager, task_callback: Optional[Callable] = None, ) -> tuple[TaskStatus, str]: """ Submit the _off method. This method returns immediately after it submitted `self._off` for execution. :param task_callback: Update task state, defaults to None :return: a result code and response message """ return self.submit_task(self._off, task_callback=task_callback)
@check_communicating def _off( self: StationComponentManager, task_callback: Optional[Callable] = None, task_abort_event: Optional[threading.Event] = None, ) -> None: """ Turn off this station. The order to turn a station on is: FieldStation, then tiles and antennas. :param task_callback: Update task state, defaults to None :param task_abort_event: Abort the task """ task_status = TaskStatus.COMPLETED if task_callback: task_callback(status=TaskStatus.IN_PROGRESS) self._desired_power_state = PowerState.OFF if ( self._field_station_proxy and self._field_station_power_state != PowerState.OFF ): task_status, _ = self._field_station_proxy.off() # type: ignore self.logger.info(f"FieldStation off command TaskStatus: {task_status.name}") self._powering_resources.add(self._field_station_trl) if self._sps_station_proxy and self._sps_station_power_state != PowerState.OFF: self.logger.warning( "There is no way for MCCS to turn `OFF` SpsStation, " "due to there being no control over the subracks' PDU. " "At MccsStation we interpret SpsStation STANDBY as OFF. " "We are sending the Standby command to SpsStation." ) task_status, _ = self._sps_station_proxy.standby() self.logger.info( f"SpshwStation standby command TaskStatus: {task_status.name}" ) self._powering_resources.add(self._sps_station_trl) task_status = self._wait_for_power_state(600) if task_callback: task_callback(status=task_status)
[docs] def on( self: StationComponentManager, task_callback: Optional[Callable] = None, ) -> tuple[TaskStatus, str]: """ Submit the _on method. This method returns immediately after it submitted `self._on` for execution. :param task_callback: Update task state, defaults to None :return: a task staus and response message """ return self.submit_task(self._on, task_callback=task_callback)
@check_communicating def _on( self: StationComponentManager, task_callback: Optional[Callable] = None, task_abort_event: Optional[threading.Event] = None, ) -> None: """ Turn on this station. The order to turn a station on is: FieldStation, then tiles and antennas. :param task_callback: Update task state, defaults to None :param task_abort_event: Abort the task """ task_status = TaskStatus.COMPLETED if task_callback: task_callback(status=TaskStatus.IN_PROGRESS) self._desired_power_state = PowerState.ON if ( self._field_station_proxy and self._field_station_power_state != PowerState.ON ): task_status, _ = self._field_station_proxy.on() # type: ignore self.logger.info(f"FieldStation on command TaskStatus: {task_status.name}") self._powering_resources.add(self._field_station_trl) if self._sps_station_proxy and self._sps_station_power_state != PowerState.ON: task_status, _ = self._sps_station_proxy.on() self.logger.info(f"SpshwStation on command TaskStatus: {task_status.name}") self._powering_resources.add(self._sps_station_trl) task_status = self._wait_for_power_state(600) if task_callback: task_callback(status=task_status) @check_communicating def _turn_on_antennas( self: StationComponentManager, ) -> ResultCode: """ Turn on antennas if not already on. :return: a result code """ with self.power_state_lock: if not all( power_state == PowerState.ON for power_state in self._antenna_power_states.values() ): results = [proxy.on() for proxy in self._antenna_proxies.values()] if ResultCode.FAILED in results: return ResultCode.FAILED return ResultCode.QUEUED
[docs] @check_communicating @check_on def apply_pointing_delays( self: StationComponentManager, load_time: str, task_callback: Optional[Callable] = None, ) -> tuple[TaskStatus, str]: """ Submit the apply_pointing method. This method returns immediately after it submitted `self._apply_pointing` for execution. :param load_time: time at which to load the pointing delay :param task_callback: Update task state, defaults to None :return: a task status and response message """ return self.submit_task( self._apply_pointing_delays, [load_time], task_callback=task_callback )
@check_communicating @check_on def _apply_pointing_delays( self: StationComponentManager, load_time: str, task_callback: Optional[Callable] = None, task_abort_event: Optional[threading.Event] = None, ) -> None: """ Load the pointing delay at a specified time. :param load_time: time at which to load the pointing delay :param task_callback: Update the task state, defaults to None :param task_abort_event: Abort the task """ assert self._sps_station_proxy if task_callback: task_callback( status=TaskStatus.IN_PROGRESS, result="ApplyPointingDelays command in progress", ) result = self._sps_station_proxy.apply_pointing_delays(load_time) if result in [ResultCode.OK, ResultCode.STARTED, ResultCode.QUEUED]: task_status = TaskStatus.COMPLETED msg = "ApplyPointingDelays command completed" else: task_status = TaskStatus.FAILED msg = "ApplyPointingDelays command failed" if task_callback: task_callback(status=task_status, result=msg) @property # type:ignore[misc] @check_communicating def is_configured(self: StationComponentManager) -> bool: """ Return whether this station component manager is configured. :return: whether this station component manager is configured. """ # TODO: the station can be configured for some subarrays and not for others. # should be changed to a vector attribute or removed return self._is_configured @property # type:ignore[misc] @check_communicating def ref_longitude(self: StationComponentManager) -> float: """ Return whether this stations longitude. :return: this stations longitude. """ return self._ref_longitude @property # type:ignore[misc] @check_communicating def ref_latitude(self: StationComponentManager) -> float: """ Return whether this stations latitude. :return: this stations latitude. """ return self._ref_latitude @property # type:ignore[misc] @check_communicating def ref_height(self: StationComponentManager) -> float: """ Return whether this stations height. :return: this stations height. """ return self._ref_height def _update_station_configs( self: StationComponentManager, configuration: dict, ) -> None: """ Update the config for the station device. :param configuration: dict containing the config of the device """ if self._component_state_callback is not None: self._is_configured = True self._ref_longitude = ( configuration.get("refLongitude") or self._ref_longitude ) self._ref_latitude = configuration.get("refLatitude") or self._ref_latitude self._ref_height = configuration.get("refHeight") or self._ref_height def _update_children_configs( self: StationComponentManager, field_station_config: Optional[dict], antenna_config: Optional[dict], ) -> None: """ Update the config for the station device. :param field_station_config: Configuration specification for the field station device. :param antenna_config: Configuration specification for the antenna deviced. """ for trl in self._antenna_proxies.keys(): self._antenna_proxies[trl].on() if field_station_config is not None: if self._field_station_proxy: assert self._field_station_proxy._proxy is not None self._field_station_proxy._proxy.Configure( json.dumps(field_station_config) ) else: self.logger.error( "Attempted to update config of non-existent FieldStation" ) if antenna_config: for trl in self._antenna_proxies.keys(): config = antenna_config.get(trl, None) if config is not None: self._antenna_proxies[trl].configure( json.dumps({"antenna_config": config, "tile_config": {}}) ) # TODO: This needs to be implemented in SpsStation # tiles_config = configuration.get("tiles") # if tiles_config: # for trl in self._tile_proxies.keys(): # config = tiles_config[trl] # self._tile_proxies[trl].configure(json.dumps(config))
[docs] def configure_semi_static( self: StationComponentManager, task_callback: Optional[Callable] = None, *, interface: Optional[str] = None, station_config: dict, field_station_config: Optional[dict], antenna_config: Optional[dict], ) -> tuple[TaskStatus, str]: """ Submit the configure method. TODO Check if this is required anymore This method returns immediately after it submitted `self._configure_semi_static` for execution. :param interface: the schema version this is running against. :param station_config: Configuration specification for the station device. :param field_station_config: Configuration specification for the field station device. :param antenna_config: Configuration specification for the antenna deviced. :param task_callback: Update task state, defaults to None :return: a result code and response string """ return self.submit_task( self._configure_semi_static, args=[ station_config, field_station_config, antenna_config, ], task_callback=task_callback, )
@check_communicating def _configure_semi_static( self: StationComponentManager, station_config: dict, field_station_config: Optional[dict], antenna_config: Optional[dict], task_callback: Optional[Callable] = None, task_abort_event: Optional[threading.Event] = None, ) -> None: """ Configure the stations children. This sends off configuration commands to all of the devices that this station manages. :param station_config: Configuration specification for the station device. :param field_station_config: Configuration specification for the field station device. :param antenna_config: Configuration specification for the antenna deviced. :param task_callback: Update task state, defaults to None :param task_abort_event: Abort the task :raises ValueError: Station value not correct """ if task_callback: task_callback(status=TaskStatus.IN_PROGRESS) if station_config.get("StationId") != self._station_id: raise ValueError("Wrong station id") self._update_station_configs(station_config) self._update_children_configs(field_station_config, antenna_config) self.setup_pointing_helper() if task_callback: task_callback( status=TaskStatus.COMPLETED, result="Configure command has completed", ) def _pointing_delays_alt_az( self: StationComponentManager, values: dict[str, float], ) -> np.ndarray: """ Get pointing delays for ra dec pointing. :param values: dict containing ra and dec values :raises ValueError: pointing delays not found :return: list of pointing delays """ altitude = Angle(values.get("altitude"), unit="deg") azimuth = Angle(values.get("azimuth"), unit="deg") assert self._pointing_helper is not None self._pointing_helper.point_array_static(altitude, azimuth) delays = self._pointing_helper.delays() if delays is None: raise ValueError("Could not get delays") return delays def _pointing_delays_ra_dec( self: StationComponentManager, values: dict[str, float], reference_time: Time, time_step: float, ) -> tuple[np.ndarray, np.ndarray]: """ Get pointing delays for ra dec pointing. :param values: dict containing ra and dec values :param reference_time: time in which coordinates are equal, in ISO8601 formatted astropy.Time time :param time_step: How long between each time step in seconds :raises ValueError: pointing delays not found :return: list of pointing delays for each antenna, negative towards source """ right_ascension = Angle(values.get("right_ascension"), unit="deg") declination = Angle(values.get("declination"), unit="deg") right_ascension_rate = Angle( values.get("right_ascension_rate") or 0.0, unit="deg" ) declination_rate = Angle(values.get("declination_rate") or 0.0, unit="deg") assert self._pointing_helper is not None self._pointing_helper.point_array_equatorial( right_ascension, declination, right_ascension_rate, declination_rate, reference_time, time_step, ) delays = self._pointing_helper.delays() delay_rates = self._pointing_helper.delay_rates() if delays is None: raise ValueError("Could not get delays") return (delays, delay_rates)
[docs] def get_pointing_delays( self: StationComponentManager, task_callback: Optional[Callable] = None, *, interface: Optional[str] = None, pointing_type: str, values: dict, time_step: float = 10.0, reference_time: Optional[str] = None, ) -> np.ndarray: """ Get the pointing delays for this station. :param interface: the schema version this is running against. :param pointing_type: the type of pointing requested :param values: the pointing values, either in alt_az or ra_dec :param reference_time: time in which coordinates are equal, in ISO8601 formatted astropy.Time time :param time_step: How long between each time step in seconds :param task_callback: callback to signal end of command :return: list of pointing delays """ start_time = Time(reference_time, format="isot", scale="utc") pointing_delays, delay_rates = self._get_pointing_delays( pointing_type, values, start_time, time_step ) # Don't want to return the fake beam number. return self._construct_delays(pointing_delays, delay_rates, 0)[1:]
@check_communicating def _get_pointing_delays( self: StationComponentManager, pointing_type: str, values: dict, reference_time: Time, time_step: float, ) -> tuple[np.ndarray, np.ndarray]: """ Get the pointing delays for this station. :param pointing_type: the type of pointing requested :param values: the pointing values, either in alt_az or ra_dec :param reference_time: time in which coordinates are equal, in ISO8601 formatted astropy.Time time :param time_step: How long between each time step in seconds :raises KeyError: pointing type not found. :return: list of pointing delays. Delays are negative towards source """ if pointing_type == "alt_az": delays = self._pointing_delays_alt_az(values) delay_rates = np.zeros(256) return delays, delay_rates if pointing_type == "ra_dec": return self._pointing_delays_ra_dec(values, reference_time, time_step) raise KeyError("Couldn't find valid pointing type")
[docs] def track_object( self: StationComponentManager, task_callback: Optional[Callable] = None, task_abort_event: Optional[threading.Event] = None, *, interface: Optional[str] = None, pointing_type: str, values: dict, scan_time: float, reference_time: Optional[str] = None, station_beam_number: Optional[int] = 0, time_step: Optional[float] = 1.0, ) -> tuple[TaskStatus, str]: """ Submit the `track_object` slow task. This method returns immediately after it is submitted for execution. :param task_callback: Update task state, defaults to None :param task_abort_event: Check for abort, defaults to None :param interface: the schema version this is running against. :param pointing_type: the type of pointing requested :param values: Coordinates for object to be tracked :param scan_time: Time to scan object in seconds :param reference_time: time in which coordinates are equal, in ISO8601 formatted astropy.Time time :param station_beam_number: The station beam number to be used :param time_step: How long between each time step in seconds :return: A return code and a unique command ID. """ return self.submit_task( self._track_object, args=[ pointing_type, values, reference_time, station_beam_number, scan_time, time_step, ], task_callback=task_callback, )
@check_communicating def _track_object( self: StationComponentManager, pointing_type: str, values: dict, reference_time: str, station_beam_number: int = 1, scan_time: float = 86400.0, time_step: Optional[float] = 10.0, task_callback: Optional[Callable] = None, task_abort_event: Optional[threading.Event] = None, ) -> None: """ Track the object in the sky. :param pointing_type: the type of pointing requested :param values: Coordinates for object to be tracked :param scan_time: Time to scan object in seconds :param reference_time: time in which coordinates are equal, in ISO8601 formatted astropy.Time time :param station_beam_number: The station beam number to be used :param time_step: How long between each time step in seconds :param task_callback: Update task state, defaults to None :param task_abort_event: Check for abort, defaults to None """ if task_callback is not None: task_callback(status=TaskStatus.IN_PROGRESS) self.stop_ids[self.tracking_id] = False tmp_id = self.tracking_id self.tracking_threads[self.tracking_id] = threading.Thread( target=self._track_object_thread, args=( lambda: self.stop_ids[tmp_id], pointing_type, values, station_beam_number, scan_time, time_step, reference_time, tmp_id, task_callback, # pass task_callback down ), ) self.tracking_threads[tmp_id].start() self.tracking_id += 1 if task_callback is not None: task_callback( status=TaskStatus.COMPLETED, result="Track Object has completed, currently tracking.", ) @check_communicating def _track_object_thread( self: StationComponentManager, stop: Callable, pointing_type: str, values: dict, station_beam_number: int, scan_time: float, time_step: float, reference_time: str, tracking_id: int, task_callback: Optional[Callable] = None, ) -> None: """ Track the object in the sky in a seperate thread. Used to stop the station hanging from other requests. :param stop: Flag passed in to force stop the track if required. :param pointing_type: The type of pointing requested :param values: Coordinates for object to be tracked :param station_beam_number: The station beam number to be used :param scan_time: Time to scan object in seconds :param reference_time: time in which coordinates are equal, in ISO8601 formatted astropy.Time time :param time_step: How long between each time step in seconds :param tracking_id: The id of the tracking thread. :param task_callback: Update task state, defaults to None """ end_time = Time.now() + TimeDelta(scan_time, format="sec") ref_time = Time(reference_time) time_next = Time.now() # next time pointing occurs while Time.now() < end_time: if stop(): self.logger.debug("Stopping tracking object command called") break if time_next <= Time.now(): self.logger.debug(f"Updating pointing for beam {station_beam_number}") pointing_delays, delay_rates = self._get_pointing_delays( pointing_type, values, ref_time, time_step ) delays = self._construct_delays( pointing_delays, delay_rates, station_beam_number ) if self._sps_station_proxy: self.last_pointing_delays = list(delays) try: self._sps_station_proxy.load_pointing_delays(delays) except tango.DevFailed as devfailed: dev_error = devfailed.args[0] # Get the Tango error object # Check if the description contains the specific "KeyError: 1" if "KeyError: 1" in dev_error.desc: self.logger.error("No antenna mapping detected") if task_callback is not None: task_callback( status=TaskStatus.FAILED, result=( ResultCode.FAILED, "No antenna mapping detected", ), ) else: self.logger.error( f"Tango DevFailed encountered: {dev_error.desc}" ) if task_callback is not None: task_callback( status=TaskStatus.FAILED, result=( ResultCode.FAILED, f"Tango DevFailed: {dev_error.desc}", ), ) self.stop_ids.pop(tracking_id, None) self.tracking_threads.pop(tracking_id, None) return self._sps_station_proxy.apply_pointing_delays("") time_next = time_next + TimeDelta(time_step, format="sec") time.sleep(0.1) self.stop_ids.pop(tracking_id) self.tracking_threads.pop(tracking_id) self.logger.debug("Tracking complete") def _construct_delays( self: StationComponentManager, pointing_delays: np.ndarray, delay_rate: np.ndarray, beam_number: int, ) -> np.ndarray: delays = np.zeros(513) delays[0] = beam_number for i in range(self._nof_antennas): delays[2 * i + 1] = pointing_delays[i] delays[2 * i + 2] = delay_rate[i] return delays
[docs] def stop_tracking( self: StationComponentManager, track_id: int, task_callback: Optional[Callable] = None, ) -> tuple[TaskStatus, str]: """ Stop a tracking thread. :param track_id: the id of the tracking you wish to stop. :param task_callback: Update task state, defaults to None :return: a result code and list of pointing delays """ return self.submit_task( self._stop_tracking, args=[track_id], task_callback=task_callback, )
def _stop_tracking( self: StationComponentManager, track_id: int, task_callback: Optional[Callable] = None, task_abort_event: Optional[threading.Event] = None, ) -> None: """ Stop a tracking thread. :param track_id: the id of the tracking you wish to stop. :param task_callback: Update task state, defaults to None :param task_abort_event: Abort the task """ if task_callback: task_callback( status=TaskStatus.IN_PROGRESS, result="Attempting to stop tracking thread", ) if track_id not in self.tracking_threads: if task_callback: task_callback( status=TaskStatus.REJECTED, result="Tracking Id does not match any running thread", ) return if self.tracking_threads[track_id].is_alive(): self.stop_ids[track_id] = True self.tracking_threads[track_id].join() for i in range(5): if ( track_id not in self.tracking_threads or not self.tracking_threads[track_id].is_alive() ): if task_callback: task_callback( status=TaskStatus.COMPLETED, result="Tracking stopped", ) return time.sleep(1) if task_callback: task_callback( status=TaskStatus.FAILED, result="Failed to stop tracking thread", ) else: if task_callback: task_callback( status=TaskStatus.COMPLETED, result="No tracking needed to stop", )
[docs] def stop_tracking_all( self: StationComponentManager, task_callback: Optional[Callable] = None, ) -> tuple[TaskStatus, str]: """ Stop all tracking threads. :param task_callback: Update task state, defaults to None :return: a result code and list of pointing delays """ return self.submit_task( self._stop_tracking_all, args=[], task_callback=task_callback, )
def _stop_tracking_all( self: StationComponentManager, task_callback: Optional[Callable] = None, task_abort_event: Optional[threading.Event] = None, ) -> None: """ Stop a tracking thread. :param task_callback: Update task state, defaults to None :param task_abort_event: Abort the task """ if task_callback: task_callback( status=TaskStatus.IN_PROGRESS, result="Attempting to stop tracking thread", ) for key, _ in self.stop_ids.items(): self.stop_ids[key] = True succesfully_closed = False for i in range(5): if len(self.tracking_threads) == 0: succesfully_closed = True break time.sleep(1) if succesfully_closed: if task_callback: task_callback( status=TaskStatus.COMPLETED, result="Trackings stopped", ) else: if task_callback: task_callback( status=TaskStatus.FAILED, result="Failed to stop trackings", )
[docs] def load_pointing_delays( self: StationComponentManager, delays: np.ndarray, task_callback: Optional[Callable] = None, ) -> tuple[TaskStatus, str]: """ Load the pointing delays for this station. :param delays: list of delays :param task_callback: Update task state, defaults to None :return: a result code and list of pointing delays """ return self.submit_task( self._load_pointing_delays, args=[delays], task_callback=task_callback, )
@check_communicating def _load_pointing_delays( self: StationComponentManager, delays: np.ndarray, task_callback: Optional[Callable] = None, task_abort_event: Optional[threading.Event] = None, ) -> None: """ Load the pointing delays for this station. :param delays: list of delays :param task_callback: Update task state, defaults to None :param task_abort_event: Abort the task """ if task_callback: task_callback( status=TaskStatus.IN_PROGRESS, result="LoadPointingDelays command in progress", ) if delays is not None: self.last_pointing_delays = list(delays) if self._sps_station_proxy: self._sps_station_proxy.load_pointing_delays(delays) if task_callback: task_callback( status=TaskStatus.COMPLETED, result="LoadPointingDelays command has completed", ) else: if task_callback: task_callback( status=TaskStatus.FAILED, result="LoadPointingDelays called without SpsStation", ) else: if task_callback: task_callback( status=TaskStatus.FAILED, result="Unable to set delays for station", )
[docs] def apply_configuration( self: StationComponentManager, transaction_id: str, task_callback: Optional[Callable] = None, ) -> tuple[TaskStatus, str]: """ Apply the beamformer and calibration configuration to SpsStation. :param transaction_id: the transaction id for the configuration :param task_callback: Update task state, defaults to None :return: a result code and response string """ if self._station_calibrator_power_state != PowerState.ON: station_calibrator_not_ready_message = ( "Service StationCalibrator is not avaliable or ON" "Unable to apply configuration." ) self.logger.error(station_calibrator_not_ready_message) return (TaskStatus.REJECTED, station_calibrator_not_ready_message) return self.submit_task( self._apply_configuration, args=[transaction_id], task_callback=task_callback, )
# pylint: disable=too-many-branches @check_communicating def _apply_configuration( self: StationComponentManager, transaction_id: str, task_callback: Optional[Callable] = None, task_abort_event: Optional[threading.Event] = None, ) -> None: """ Apply the configuration to the SpsStation. :param transaction_id: the transaction id for the configuration :param task_callback: Update task state, defaults to None :param task_abort_event: Abort the task """ result_message = "" if task_callback: task_callback(status=TaskStatus.IN_PROGRESS) self.logger.debug("Loading beamformer table") assert self._sps_station_proxy self._sps_station_proxy.set_beamformer_table( np.array(self._beamformer_table).flatten() ) # Compute and upload calibration # Initialize calibration table to unity gain self.logger.debug("Computing and downloading calibration coefficients") calibration_table = np.zeros( [MAX_NUMBER_OF_CHANNELS, self._nof_antennas, 4], np.csingle ) unity_calibration = np.array([1.0, 0.0, 0.0, 1.0], np.csingle) # compute and apply calibration for channel in range(MAX_NUMBER_OF_CHANNELS): for antenna in range(self._nof_antennas): calibration_table[channel, antenna] = unity_calibration try: for block, table_entry in enumerate(self._beamformer_table): channel = table_entry[0] beamformer_channel = block * 8 if channel == 0: # channel set to 0 marks unused (unconfigured) table entries. # These do not require calibration continue for i in range(8): # 8 channels per block # TODO Interface with calibration must be refined # get_calibration should return an array of 4 complex per antenna # ordered in station antenna number order try: solution_numpy = self._station_calibrator_proxy.get_calibration( int(channel) ) reshaped_array = self._format_solution_to_complex_jones( solution_numpy ) except ValueError as ve: error_message = ( f"\nFailed to fill calibration_table for {channel=}:\n" f"{repr(ve)}\n" "We will use a unit calibration for this channel.\n" ) self.logger.warning(error_message) result_message += error_message continue except tango.DevFailed as df: error_message = ( f"\nException raised when asking " f"for a solution for {channel=}:\n" f"{repr(df)}\n" "We will use a unit calibration for this channel.\n" ) result_message += error_message self.logger.warning(error_message) continue self.logger.debug(f"Solution found for {channel=}\n") calibration_table[beamformer_channel] = reshaped_array beamformer_channel += 1 channel += 1 # write calibration coefficients to station # TODO MCCS-1023 # Actual antenna number should be computed from configuration database: # it should be (tile_number)*16 + (tile _port_number), both 0 based # for antenna in range(self._nof_antennas): calibrations = [float(antenna)] for channel in range(MAX_NUMBER_OF_CHANNELS): for jones in calibration_table[channel, antenna]: calibrations.append(jones.real) calibrations.append(jones.imag) self._sps_station_proxy.load_calibration_coefficients(calibrations) self.logger.debug( f"load_calibration_coefficients for antenna {antenna}" ) self.logger.debug("Apply calibration") # TODO Add load time self._sps_station_proxy.apply_calibration("") except ValueError as value_error: self.logger.error(f"ApplyConfiguration command has failed: {value_error}") if task_callback: task_callback( status=TaskStatus.FAILED, result=( ResultCode.FAILED, f"ApplyConfiguration command has failed: {value_error}", ), ) return if task_callback: task_callback( status=TaskStatus.COMPLETED, result=( ResultCode.OK, f"ApplyConfiguration command has completed{result_message}", ), ) return def _format_solution_to_complex_jones( self: StationComponentManager, solution_array: np.ndarray, ) -> np.ndarray: """ Format solution to complex jones. :param solution_array: The solution array of type `np.ndarray`. the shape of this is (self._number_of_antenna * 8). :return: A complex numpy.ndarray of shape (self._number_of_antenna, 4). :raises ValueError: the solution is not in the correct format specified by solution_array. """ if len(solution_array) != self._nof_antennas * 8: raise ValueError( f"\tThis station is configured with {self._nof_antennas} antenna. " "We are expecting 4 complex numbers per antenna yielding a solution " f"of length {self._nof_antennas * 8}.\n" f"\tHowever, solution gathered is length {len(solution_array)}." ) # Reshape the flattened array back into the transposed shape transposed_shape = (len(solution_array) // 2, 2) reshaped = solution_array.reshape(transposed_shape) # Transpose it back to its original 2-row shape original_2_row_shape = reshaped.T # Combine the real and imaginary parts back into complex numbers real_parts = original_2_row_shape[0] imaginary_parts = original_2_row_shape[1] original_complex_array = real_parts + 1j * imaginary_parts reshaped_array = original_complex_array.reshape(self._nof_antennas, 4) return reshaped_array
[docs] def scan( self: StationComponentManager, task_callback: Optional[Callable] = None, *, interface: Optional[str] = None, subarray_id: int, scan_id: int, start_time: Optional[str] = None, duration: Optional[float] = 0.0, ) -> tuple[TaskStatus, str]: """ Submit the Scan slow task. This method returns immediately after it is submitted for execution. :param interface: the schema version this is running against. :param subarray_id: The subarray for whic the command applies :param scan_id: The ID for this scan :param start_time: UTC time for begin of scan, None for immediate start :param duration: Scan duration in seconds. 0.0 or omitted means forever :param task_callback: Update task state, defaults to None :return: Task status and response message """ return self.submit_task( self._scan, args=[subarray_id, scan_id, start_time, duration], task_callback=task_callback, )
def _scan( self: StationComponentManager, subarray_id: int, scan_id: int, start_time: Optional[str], duration: float, task_callback: Optional[Callable] = None, task_abort_event: Optional[threading.Event] = None, ) -> None: """ Execute the Scan slow task. :param subarray_id: The subarray for whic the command applies :param scan_id: The ID for this scan :param start_time: UTC time for begin of scan, None for immediate start :param duration: Scan duration in seconds. 0.0 or omitted means forever :param task_callback: Update task state, defaults to None :param task_abort_event: Check for abort, defaults to None """ if task_callback is not None: task_callback(status=TaskStatus.IN_PROGRESS) # compute and save channel block mask: which blocks belong to the subarray mask = 0 for block, table_entry in enumerate(self._beamformer_table): if subarray_id == table_entry[2] and table_entry[0] != 0: mask += 1 << block self._scan_mask[subarray_id] = mask self._scan_id[subarray_id] = scan_id if duration == 0: duration = -1 # this is the "infinite" time in SpsStation self.logger.debug(f"Starting beamformer for scan {scan_id} mask {hex(mask)}") assert self._sps_station_proxy self._sps_station_proxy.start_beamformer(scan_id, start_time, duration, mask) if task_callback is not None: task_callback( TaskStatus.COMPLETED, result="Scan has completed.", )
[docs] def end_scan( self: StationComponentManager, subarray_id: int, task_callback: Optional[Callable] = None, task_abort_event: Optional[threading.Event] = None, ) -> tuple[TaskStatus, str]: """ Submit the EndScan slow task. This method returns immediately after it is submitted for execution. :param subarray_id: The subarray for which the command applies :param task_callback: Update task state, defaults to None :param task_abort_event: Check for abort, defaults to None :return: Task status and response message """ if subarray_id < 1 or subarray_id > 16: self.logger.error(f"Invalid subarray ID {subarray_id}") return (TaskStatus.REJECTED, "Invalid subscan ID") return self.submit_task( self._end_scan, args=[subarray_id], task_callback=task_callback, )
def _end_scan( self: StationComponentManager, subarray_id: int, task_callback: Optional[Callable] = None, task_abort_event: Optional[threading.Event] = None, ) -> None: """ Execute the EndScan slow task. :param subarray_id: The subarray for which the command applies :param task_callback: Update task state, defaults to None :param task_abort_event: Check for abort, defaults to None """ if task_callback is not None: task_callback(status=TaskStatus.IN_PROGRESS) assert self._sps_station_proxy # TODO Patch for the demo self._sps_station_proxy.stop_beamformer(self._scan_mask[subarray_id]) self._scan_id[subarray_id] = 0 self._scan_mask[subarray_id] = 0 if task_callback is not None: task_callback( TaskStatus.COMPLETED, result="EndScan has completed.", )
[docs] def acquire_data_for_calibration( self: StationComponentManager, channel: int, task_callback: Optional[Callable] = None, task_abort_event: Optional[threading.Event] = None, ) -> tuple[TaskStatus, str]: """ Submit the AcquireDataForCalibration slow task. This method returns immediately after it is submitted for execution. :param channel: The channel to acquire data for :param task_callback: Update task state, defaults to None :param task_abort_event: Check for abort, defaults to None :return: Task status and response message """ if channel < 0 or channel > 510: self.logger.error(f"Invalid channel{channel}") return (TaskStatus.REJECTED, "Invalid channel") if self._sps_station_proxy is None: self.logger.error("There is no SpsStation proxy to collect data.") return (TaskStatus.REJECTED, "No SpsStation proxy to collect data.") return self.submit_task( self._acquire_data_for_calibration, args=[channel], task_callback=task_callback, )
def _acquire_data_for_calibration( self: StationComponentManager, channel: int, task_callback: Optional[Callable] = None, task_abort_event: Optional[threading.Event] = None, ) -> None: """ Execute the AcquireDataForCalibration slow task. :param channel: The channel to acquire data for :param task_callback: Update task state, defaults to None :param task_abort_event: Check for abort, defaults to None """ if task_callback is not None: task_callback(status=TaskStatus.IN_PROGRESS) assert self._sps_station_proxy self._sps_station_proxy.acquire_data_for_calibration(channel) if task_callback is not None: task_callback( status=TaskStatus.COMPLETED, result="AcquireDataForCalibration has completed.", )
[docs] def configure_channels( self: StationComponentManager, channel_blocks: list[int], ) -> ResultCode: """ Configure channels for a station beam in the channel table. :param channel_blocks: List of channel table entries :return: a result code and response string """ nitems = (len(channel_blocks) // 8) * 8 for index in range(0, nitems, 8): block_id = channel_blocks[index] block = channel_blocks[(index + 1) : (index + 8)] if block_id < 0 or block_id > 47: # block_id in range(48) return ResultCode.REJECTED if self._beamformer_table[block_id][2] not in (0, block[2]): return ResultCode.REJECTED self._beamformer_table[block_id] = block self._count_blocks() return ResultCode.OK
def _count_blocks( self: StationComponentManager, ) -> int: """ Return the number of blocks required to specify the whole configuration. It also updates the number of used channels. :return: the number of blocks up to the last non-empty one. """ max_block = 0 n_chans = 0 for n, block in enumerate(self._beamformer_table): if block[2] != 0: n_chans += 8 max_block = n + 1 self._number_of_channels = n_chans return max_block
[docs] def deallocate_subarray( self: StationComponentManager, subarray_id: int, ) -> ResultCode: """ Clear channels for a station beam in the channel table. :param subarray_id: subarray_id to clear :return: a result code and response string """ for i, block in enumerate(self._beamformer_table): if block[2] == subarray_id: self._beamformer_table[i] = [0] * 7 self._count_blocks() # this updates self:_number_of_channels return ResultCode.OK
[docs] def start_acquisition( self: StationComponentManager, task_callback: Optional[Callable] = None, *, start_time: Optional[str] = None, delay: Optional[int] = 2, ) -> tuple[TaskStatus, str]: """ Submit the start acquisition method. This method returns immediately after it submitted `self._start_acquisition` for execution. :param start_time: the time at which to start data acquisition, defaults to None :param delay: delay start, defaults to 2 :param task_callback: Update task state, defaults to None :return: a task staus and response message """ return self.submit_task( self._start_acquisition, args=[start_time, delay], task_callback=task_callback, )
@check_communicating def _start_acquisition( self: StationComponentManager, start_time: Optional[str] = None, delay: Optional[int] = 2, task_callback: Optional[Callable] = None, task_abort_event: Optional[threading.Event] = None, ) -> None: """ Start acquisition using slow command. :param start_time: the time at which to start data acquisition, defaults to None :param delay: delay start, defaults to 2 :param task_callback: Update task state, defaults to None :param task_abort_event: Check for abort, defaults to None """ if task_callback: task_callback(status=TaskStatus.IN_PROGRESS) success = True parameter_list = {"start_time": start_time, "delay": delay} json_argument = json.dumps(parameter_list) assert self._sps_station_proxy is not None result = self._sps_station_proxy.start_acquisition(json_argument) if result not in (ResultCode.QUEUED, ResultCode.OK, ResultCode.STARTED): success = False if task_callback: if success: task_callback( status=TaskStatus.COMPLETED, result="Start acquisition has completed", ) else: task_callback( status=TaskStatus.FAILED, result="Start acquisition task failed" ) return
[docs] def initialise( self: StationComponentManager, task_callback: Optional[Callable] = None, ) -> tuple[TaskStatus, str]: """ Submit the initialise method. This method returns immediately after it submitted `self._initialise` for execution. :param task_callback: Update task state, defaults to None :return: a task staus and response message """ return self.submit_task( self._initialise, args=[], task_callback=task_callback, )
@check_communicating def _initialise( self: StationComponentManager, task_callback: Optional[Callable] = None, task_abort_event: Optional[threading.Event] = None, ) -> None: """ Initialise using slow command. :param task_callback: Update task state, defaults to None :param task_abort_event: Check for abort, defaults to None """ if task_callback: task_callback(status=TaskStatus.IN_PROGRESS) success = True assert self._sps_station_proxy is not None result = self._sps_station_proxy.initialise() if result not in (ResultCode.QUEUED, ResultCode.OK, ResultCode.STARTED): success = False if task_callback: if success: task_callback( status=TaskStatus.COMPLETED, result="Initialisation has completed", ) else: task_callback( status=TaskStatus.FAILED, result="Initialisation task failed" ) return def _wait_for_power_state( self: StationComponentManager, timeout: float, task_abort_event: Optional[threading.Event] = None, ) -> TaskStatus: """ Wait for sub-station PowerState to reach desired state. :param timeout: Time to wait, in seconds. :param task_abort_event: Check for abort, defaults to None :return: completed if status reached, FAILED if timed out, ABORTED if aborted """ assert self._desired_power_state is not None resolution = 1 # seconds ticks = int(timeout / resolution) while self._powering_resources: if task_abort_event and task_abort_event.is_set(): return TaskStatus.ABORTED time.sleep(resolution) ticks -= 1 if ticks == 0: self.logger.warning( f"Timed out waiting for PowerState in {timeout} seconds" ) return TaskStatus.FAILED self.logger.debug( f"Waiting for {self._powering_resources} PowerState" f" {self._desired_power_state.name}," f" waiting for {ticks*resolution} more seconds" ) self.logger.debug(f"Waited PowerState for {timeout-ticks*resolution} seconds") return TaskStatus.COMPLETED @property def number_of_channels(self: StationComponentManager) -> int: """ Return the total number of channels in the beamformer. :return: the total numebr of channels """ return self._number_of_channels @property def beamformer_table(self: StationComponentManager) -> list[list[int]]: """ Return the channel table reformatted as would be needed by ConfigureChannels. :return: reformatted channel table """ table: list[list[int]] = [] n_blocks = self._count_blocks() for n, block in enumerate(self._beamformer_table[:n_blocks]): table.append([n, *block]) return table @property def scan_ids(self: StationComponentManager) -> list[int]: """ Return the current scan IDs for each subarray. :return: list of scan IDs starting from subarray 1, 0 = subarray not scanning """ return self._scan_id[1:] @property def tileprogrammingstate(self: StationComponentManager) -> tuple[str]: """ Return the tileprogrammingstate of the SpsStation. :return: the tileprogrammingstate of the SpsStation. """ assert self._sps_station_proxy is not None assert self._sps_station_proxy._proxy is not None return self._sps_station_proxy._proxy._device.tileprogrammingstate