# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project.
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""
Module for providing the API to communicate with the STAT process.
The :py:class:`PstStatProcessApiSimulator` is used in testing or
simulation.
"""
from __future__ import annotations
import logging
from typing import Callable, Optional
import numpy as np
from overrides import override
from ska_pst.grpc.lmc.ska_pst_lmc_pb2 import (
BeamConfiguration,
MonitorData,
ScanConfiguration,
StatBeamConfiguration,
)
from ska_pst.grpc.lmc.ska_pst_lmc_pb2 import StatMonitorData as StatMonitorDataProto
from ska_pst.grpc.lmc.ska_pst_lmc_pb2 import (
StatScanConfiguration,
)
from ska_pst.lmc.component import SUBBAND_1, PstProcessApi, PstProcessApiGrpc, PstProcessApiSimulator
from ska_pst.lmc.stat.stat_model import StatMonitorData
from ska_pst.lmc.stat.stat_simulator import PstStatSimulator
from ska_pst.lmc.stat.stat_util import generate_stat_scan_request
# Names exported by this module: the abstract STAT API plus its simulator
# and gRPC implementations.
__all__ = [
"PstStatProcessApi",
"PstStatProcessApiSimulator",
"PstStatProcessApiGrpc",
]
# Indices into the first axis of the (2, 2) statistics arrays built in
# PstStatProcessApiGrpc._handle_monitor_response: selects polarisation A or B.
POLA_IDX = 0
POLB_IDX = 1
# Indices into the second axis of those same arrays: selects the real (I)
# or imaginary (Q) component of the chosen polarisation.
REAL_IDX = 0
IMAG_IDX = 1
class PstStatProcessApi(PstProcessApi):
    """
    Abstract API for interacting with the STAT process.

    This is a specialisation of :py:class:`PstProcessApi` and is the
    common parent of the simulator and gRPC implementations of the STAT
    API defined in this module. It is the STAT-specific type used for
    getting the monitoring data of the process.
    """
class PstStatProcessApiSimulator(
    PstProcessApiSimulator[StatMonitorData, PstStatSimulator], PstStatProcessApi
):
    """A simulator implementation version of the API of `PstStatProcessApi`.

    The API is backed by a :py:class:`PstStatSimulator` instance which is
    handed to the :py:class:`PstProcessApiSimulator` base class.
    """

    def __init__(
        self: PstStatProcessApiSimulator,
        simulator: Optional[PstStatSimulator] = None,
        logger: logging.Logger | None = None,
    ) -> None:
        """
        Initialise the API.

        :param simulator: the simulator instance to use in the API. When
            ``None``, a default-constructed :py:class:`PstStatSimulator`
            is created and used instead.
        :param logger: the logger to use for the API. It is passed through
            unchanged to the base class.
        """
        # Compare against None explicitly so that a caller-supplied simulator
        # is always honoured, even if the instance happens to evaluate as falsy.
        if simulator is None:
            simulator = PstStatSimulator()
        super().__init__(simulator=simulator, logger=logger)
class PstStatProcessApiGrpc(PstProcessApiGrpc, PstStatProcessApi):
    """This is a gRPC implementation of the `PstStatProcessApi` API.

    This uses an instance of a `PstGrpcLmcClient` to send requests through
    to the STAT.CORE application. Instances of this class should be per
    subband, rather than one for all of STAT as a whole.
    """

    @override
    def _get_configure_beam_request(self: PstStatProcessApiGrpc, configuration: dict) -> BeamConfiguration:
        """
        Build the configure beam request for STAT.

        :param configuration: keyword arguments used to populate the
            ``StatBeamConfiguration`` message.
        :return: a ``BeamConfiguration`` whose ``stat`` field is that message.
        """
        return BeamConfiguration(stat=StatBeamConfiguration(**configuration))

    @override
    def _handle_monitor_response(
        self: PstStatProcessApiGrpc, data: MonitorData, *, monitor_data_callback: Callable[..., None]
    ) -> None:
        """
        Convert a STAT monitoring message and hand it to the callback.

        Every statistic read from ``data.stat`` is reshaped into a ``(2, 2)``
        array that is indexed first by polarisation (``POLA_IDX`` /
        ``POLB_IDX``) and then by component (``REAL_IDX`` / ``IMAG_IDX``).
        The individual values are then used to populate a
        :py:class:`StatMonitorData` instance.

        If any of the statistics cannot be reshaped to ``(2, 2)`` the update
        is ignored and the callback is not invoked.

        :param data: the monitoring message; only its ``stat`` field is read.
        :param monitor_data_callback: callable invoked with the keyword
            arguments ``subband_id`` (always ``SUBBAND_1``) and
            ``subband_data`` (the populated :py:class:`StatMonitorData`).
        """
        stat_data_proto: StatMonitorDataProto = data.stat

        def _pol_dim_array(values: object, dtype: type) -> np.ndarray:
            """Return ``values`` as a (polarisation, component) array of ``dtype``."""
            return np.array(values).reshape((2, 2)).astype(dtype=dtype)

        try:
            mean_frequency_avg = _pol_dim_array(stat_data_proto.mean_frequency_avg, np.float32)
            mean_frequency_avg_rfi_excised = _pol_dim_array(
                stat_data_proto.mean_frequency_avg_masked, np.float32
            )
            variance_frequency_avg = _pol_dim_array(stat_data_proto.variance_frequency_avg, np.float32)
            variance_frequency_avg_rfi_excised = _pol_dim_array(
                stat_data_proto.variance_frequency_avg_masked, np.float32
            )
            num_clipped_samples = _pol_dim_array(stat_data_proto.num_clipped_samples, np.int32)
            num_clipped_samples_rfi_excised = _pol_dim_array(
                stat_data_proto.num_clipped_samples_masked, np.int32
            )
        except ValueError:
            # Ignoring monitoring update due to un-populated statistics: the
            # reshape fails when a field does not hold exactly 4 values.
            return

        subband_data = StatMonitorData(
            # Pol A + I
            real_pol_a_mean_freq_avg=mean_frequency_avg[POLA_IDX][REAL_IDX],
            real_pol_a_variance_freq_avg=variance_frequency_avg[POLA_IDX][REAL_IDX],
            real_pol_a_num_clipped_samples=num_clipped_samples[POLA_IDX][REAL_IDX],
            # Pol A + I (RFI excised)
            real_pol_a_mean_freq_avg_rfi_excised=mean_frequency_avg_rfi_excised[POLA_IDX][REAL_IDX],
            real_pol_a_variance_freq_avg_rfi_excised=variance_frequency_avg_rfi_excised[POLA_IDX][REAL_IDX],
            real_pol_a_num_clipped_samples_rfi_excised=num_clipped_samples_rfi_excised[POLA_IDX][REAL_IDX],
            # Pol A + Q
            imag_pol_a_mean_freq_avg=mean_frequency_avg[POLA_IDX][IMAG_IDX],
            imag_pol_a_variance_freq_avg=variance_frequency_avg[POLA_IDX][IMAG_IDX],
            imag_pol_a_num_clipped_samples=num_clipped_samples[POLA_IDX][IMAG_IDX],
            # Pol A + Q (RFI excised)
            imag_pol_a_mean_freq_avg_rfi_excised=mean_frequency_avg_rfi_excised[POLA_IDX][IMAG_IDX],
            imag_pol_a_variance_freq_avg_rfi_excised=variance_frequency_avg_rfi_excised[POLA_IDX][IMAG_IDX],
            imag_pol_a_num_clipped_samples_rfi_excised=num_clipped_samples_rfi_excised[POLA_IDX][IMAG_IDX],
            # Pol B + I
            real_pol_b_mean_freq_avg=mean_frequency_avg[POLB_IDX][REAL_IDX],
            real_pol_b_variance_freq_avg=variance_frequency_avg[POLB_IDX][REAL_IDX],
            real_pol_b_num_clipped_samples=num_clipped_samples[POLB_IDX][REAL_IDX],
            # Pol B + I (RFI excised)
            real_pol_b_mean_freq_avg_rfi_excised=mean_frequency_avg_rfi_excised[POLB_IDX][REAL_IDX],
            real_pol_b_variance_freq_avg_rfi_excised=variance_frequency_avg_rfi_excised[POLB_IDX][REAL_IDX],
            real_pol_b_num_clipped_samples_rfi_excised=num_clipped_samples_rfi_excised[POLB_IDX][REAL_IDX],
            # Pol B + Q
            imag_pol_b_mean_freq_avg=mean_frequency_avg[POLB_IDX][IMAG_IDX],
            imag_pol_b_variance_freq_avg=variance_frequency_avg[POLB_IDX][IMAG_IDX],
            imag_pol_b_num_clipped_samples=num_clipped_samples[POLB_IDX][IMAG_IDX],
            # Pol B + Q (RFI excised)
            imag_pol_b_mean_freq_avg_rfi_excised=mean_frequency_avg_rfi_excised[POLB_IDX][IMAG_IDX],
            imag_pol_b_variance_freq_avg_rfi_excised=variance_frequency_avg_rfi_excised[POLB_IDX][IMAG_IDX],
            imag_pol_b_num_clipped_samples_rfi_excised=num_clipped_samples_rfi_excised[POLB_IDX][IMAG_IDX],
        )

        monitor_data_callback(
            subband_id=SUBBAND_1,
            subband_data=subband_data,
        )

    @override
    def _get_configure_scan_request(
        self: PstStatProcessApiGrpc, configure_parameters: dict
    ) -> ScanConfiguration:
        """
        Build the configure scan request for STAT.

        :param configure_parameters: the scan parameters, passed as
            ``request_params`` to ``generate_stat_scan_request`` to produce
            the keyword arguments of the ``StatScanConfiguration`` message.
        :return: a ``ScanConfiguration`` whose ``stat`` field is that message.
        """
        return ScanConfiguration(
            stat=StatScanConfiguration(**generate_stat_scan_request(request_params=configure_parameters))
        )