Source code for ska_pst.lmc.stat.stat_grpc_api_strategy

# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project.
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""
Module for providing the API to communicate with the STAT process.

The :py:class:`PstStatProcessApiSimulator` is used in testing or
simulation.
"""

from __future__ import annotations

import numpy as np
from ska_pst.grpc.lmc.ska_pst_lmc_pb2 import (
    BeamConfiguration,
    MonitorData,
    ScanConfiguration,
    StatBeamConfiguration,
)
from ska_pst.grpc.lmc.ska_pst_lmc_pb2 import StatMonitorData as StatMonitorDataProto
from ska_pst.grpc.lmc.ska_pst_lmc_pb2 import (
    StatScanConfiguration,
)
from ska_pst.lmc.component import SUBBAND_1, MonitorDataCallback
from ska_pst.lmc.stat.stat_model import StatMonitorData
from ska_pst.lmc.stat.stat_util import generate_stat_scan_request

# Public API of this module: only the strategy class is exported by
# ``from ... import *``.
__all__ = [
    "PstStatGrpcApiStrategy",
]

from ska_pst.common.constants import IMAG_IDX, POLA_IDX, POLB_IDX, REAL_IDX


class PstStatGrpcApiStrategy:
    """
    gRPC API strategy for the STAT subcomponent.

    Builds the STAT-specific beam and scan configuration Protobuf messages and
    turns STAT monitoring responses into ``StatMonitorData`` updates that are
    handed to a monitoring callback.
    """

    def get_beam_configuration_msg(
        self: PstStatGrpcApiStrategy, *, configuration: dict
    ) -> BeamConfiguration:
        """
        Build the gRPC BeamConfiguration Protobuf message for STAT.

        :param configuration: the PST beam configuration; its items are passed
            as keyword arguments to ``StatBeamConfiguration``
        :type configuration: dict
        :return: a BeamConfiguration message whose ``stat`` field is populated.
        :rtype: BeamConfiguration
        """
        stat_beam_configuration = StatBeamConfiguration(**configuration)
        return BeamConfiguration(stat=stat_beam_configuration)

    def get_scan_configuration_msg(
        self: PstStatGrpcApiStrategy, *, configuration: dict
    ) -> ScanConfiguration:
        """
        Build the gRPC ScanConfiguration Protobuf message for STAT.

        The configuration is first converted by ``generate_stat_scan_request``
        and the result is passed as keyword arguments to
        ``StatScanConfiguration``.

        :param configuration: the PST scan configuration
        :type configuration: dict
        :return: a ScanConfiguration message whose ``stat`` field is populated.
        :rtype: ScanConfiguration
        """
        scan_request = generate_stat_scan_request(request_params=configuration)
        stat_scan_configuration = StatScanConfiguration(**scan_request)
        return ScanConfiguration(stat=stat_scan_configuration)

    def handle_monitor_response(
        self: PstStatGrpcApiStrategy, *, data: MonitorData, callback: MonitorDataCallback
    ) -> None:
        """
        Convert a gRPC monitoring response into a model update.

        Each statistic in ``data.stat`` is reshaped into a 2x2 array indexed by
        polarisation and then by real/imaginary component.  If any statistic
        cannot be reshaped (for example because it has not been populated yet)
        the update is dropped and the callback is not invoked.

        :param data: the gRPC/Protobuf monitoring data message from server
        :type data: MonitorData
        :param callback: the callback used to update the LMC subcomponent model
        :type callback: MonitorDataCallback
        """
        proto: StatMonitorDataProto = data.stat

        def as_pol_matrix(values, dtype):  # type: ignore[no-untyped-def]
            # Reshape first so a wrongly sized field raises ValueError, then cast.
            return np.array(values).reshape((2, 2)).astype(dtype=dtype)

        try:
            mean = as_pol_matrix(proto.mean_frequency_avg, np.float32)
            mean_excised = as_pol_matrix(proto.mean_frequency_avg_masked, np.float32)
            variance = as_pol_matrix(proto.variance_frequency_avg, np.float32)
            variance_excised = as_pol_matrix(proto.variance_frequency_avg_masked, np.float32)
            clipped = as_pol_matrix(proto.num_clipped_samples, np.int32)
            clipped_excised = as_pol_matrix(proto.num_clipped_samples_masked, np.int32)
        except ValueError:
            # Statistics are not populated yet, so skip this monitoring update.
            return

        # (polarisation, component) positions within each 2x2 statistic.
        pol_a_real = (POLA_IDX, REAL_IDX)
        pol_a_imag = (POLA_IDX, IMAG_IDX)
        pol_b_real = (POLB_IDX, REAL_IDX)
        pol_b_imag = (POLB_IDX, IMAG_IDX)

        monitor_data = StatMonitorData(
            # Polarisation A, real (I) component
            real_pol_a_mean_freq_avg=mean[pol_a_real],
            real_pol_a_variance_freq_avg=variance[pol_a_real],
            real_pol_a_num_clipped_samples=clipped[pol_a_real],
            real_pol_a_mean_freq_avg_rfi_excised=mean_excised[pol_a_real],
            real_pol_a_variance_freq_avg_rfi_excised=variance_excised[pol_a_real],
            real_pol_a_num_clipped_samples_rfi_excised=clipped_excised[pol_a_real],
            # Polarisation A, imaginary (Q) component
            imag_pol_a_mean_freq_avg=mean[pol_a_imag],
            imag_pol_a_variance_freq_avg=variance[pol_a_imag],
            imag_pol_a_num_clipped_samples=clipped[pol_a_imag],
            imag_pol_a_mean_freq_avg_rfi_excised=mean_excised[pol_a_imag],
            imag_pol_a_variance_freq_avg_rfi_excised=variance_excised[pol_a_imag],
            imag_pol_a_num_clipped_samples_rfi_excised=clipped_excised[pol_a_imag],
            # Polarisation B, real (I) component
            real_pol_b_mean_freq_avg=mean[pol_b_real],
            real_pol_b_variance_freq_avg=variance[pol_b_real],
            real_pol_b_num_clipped_samples=clipped[pol_b_real],
            real_pol_b_mean_freq_avg_rfi_excised=mean_excised[pol_b_real],
            real_pol_b_variance_freq_avg_rfi_excised=variance_excised[pol_b_real],
            real_pol_b_num_clipped_samples_rfi_excised=clipped_excised[pol_b_real],
            # Polarisation B, imaginary (Q) component
            imag_pol_b_mean_freq_avg=mean[pol_b_imag],
            imag_pol_b_variance_freq_avg=variance[pol_b_imag],
            imag_pol_b_num_clipped_samples=clipped[pol_b_imag],
            imag_pol_b_mean_freq_avg_rfi_excised=mean_excised[pol_b_imag],
            imag_pol_b_variance_freq_avg_rfi_excised=variance_excised[pol_b_imag],
            imag_pol_b_num_clipped_samples_rfi_excised=clipped_excised[pol_b_imag],
        )

        callback(subband_id=SUBBAND_1, subband_data=monitor_data)