Source code for ska_pst.lmc.dsp.dsp_grpc_api_strategy

# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project.
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""
Module for providing the API to communicate with the DSP process.

The :py:class:`PstDspProcessApiSimulator` is used in testing or
simulation mode, while the :py:class:`PstDspProcessApiGrpc` is used
to connect to a remote application that exposes a gRPC API.
"""

from __future__ import annotations

from typing import cast

from ska_control_model import PstProcessingMode
from ska_pst.grpc.lmc.ska_pst_lmc_pb2 import (
    BeamConfiguration,
    DspBeamConfiguration,
    DspDetectedFilterbankScanConfiguration,
    DspDiskScanConfiguration,
    DspFlowThroughScanConfiguration,
    MonitorData,
    ScanConfiguration,
    SpectralKurtosisConfig,
)
from ska_pst.lmc.component import MonitorDataCallback
from ska_pst.lmc.component.grpc_lmc_client import protobuf_to_ascii_header

from .dsp_model import DspMonitorData
from .dsp_util import generate_dsp_scan_request

__all__ = [
    "PstDspGrpcApiStrategy",
]


def _map_sk_config(sk_config: dict) -> SpectralKurtosisConfig:
    """
    Map from spectral kurtosis (SK) dictionary to a protobuf message.

    Each key has a leading ``sk_`` prefix removed before the dictionary is
    expanded as keyword arguments of :py:class:`SpectralKurtosisConfig`.
    Keys that do not start with ``sk_`` are passed through unchanged.

    :param sk_config: a dictionary of SK configuration values
    :type sk_config: dict
    :return: the protobuf message built from the renamed keys
    :rtype: SpectralKurtosisConfig
    """
    prefix = "sk_"

    def _strip_prefix(key: str) -> str:
        # Only strip at the start of the key; an unanchored str.replace would
        # also mangle keys that merely contain "sk_" (e.g. "mask_level").
        return key[len(prefix) :] if key.startswith(prefix) else key

    renamed = {_strip_prefix(cast(str, key)): value for key, value in sk_config.items()}
    return SpectralKurtosisConfig(**renamed)


class PstDspGrpcApiStrategy:
    """Implementation of the GrpcApiStrategy for the DSP subcomponent."""

    def get_beam_configuration_msg(
        self: PstDspGrpcApiStrategy, *, configuration: dict
    ) -> BeamConfiguration:
        """
        Get the gRPC BeamConfiguration Protobuf message for DSP.

        :param configuration: the PST beam configuration
        :type configuration: dict
        :return: the gRPC BeamConfiguration Protobuf message for DSP.
        :rtype: BeamConfiguration
        """
        # every entry of the configuration becomes a field of the DSP message
        dsp_beam_config = DspBeamConfiguration(**configuration)
        return BeamConfiguration(dsp=dsp_beam_config)

    def get_scan_configuration_msg(
        self: PstDspGrpcApiStrategy, *, configuration: dict
    ) -> ScanConfiguration:
        """
        Get the gRPC ScanConfiguration Protobuf message for DSP.

        The ``pst_processing_mode`` entry of the configuration selects which
        DSP scan configuration message is populated.

        :param configuration: the PST scan configuration
        :type configuration: dict
        :return: the gRPC ScanConfiguration Protobuf message for DSP.
        :rtype: ScanConfiguration
        :raises KeyError: if ``pst_processing_mode`` is not in the configuration
        :raises ValueError: if the processing mode is not one of voltage
            recorder, flow through or detected filterbank
        """
        pst_processing_mode: PstProcessingMode = configuration["pst_processing_mode"]
        dsp_scan_config = generate_dsp_scan_request(**configuration)

        if pst_processing_mode == PstProcessingMode.VOLTAGE_RECORDER:
            return ScanConfiguration(dsp_disk=DspDiskScanConfiguration(**dsp_scan_config))

        if pst_processing_mode == PstProcessingMode.FLOW_THROUGH:
            return ScanConfiguration(dsp_flow_through=DspFlowThroughScanConfiguration(**dsp_scan_config))

        if pst_processing_mode == PstProcessingMode.DETECTED_FILTERBANK:
            # the SK entries are removed from the request and converted to
            # protobuf messages before the remaining values are expanded
            sk_config = [_map_sk_config(sk) for sk in dsp_scan_config.pop("sk_config")]
            return ScanConfiguration(
                dsp_detected_filterbank=DspDetectedFilterbankScanConfiguration(
                    sk_config=sk_config, **dsp_scan_config
                )
            )

        raise ValueError(f"unsupported PST processing mode: {pst_processing_mode.name}")

    def handle_monitor_response(
        self: PstDspGrpcApiStrategy, *, data: MonitorData, callback: MonitorDataCallback
    ) -> None:
        """
        Handle the gRPC monitoring data response.

        The DSP.DISK header and the pipeline header are merged into a single
        header, which is converted to a :py:class:`DspMonitorData` and passed
        to the callback with a ``subband_id`` of 1.

        :param data: the gRPC/Protobuf monitoring data message from server
        :type data: MonitorData
        :param callback: the callback used to update the LMC subcomponent model
        :type callback: MonitorDataCallback
        """
        # merge the DSP.DISK and any pipeline stats; a pipeline value replaces
        # a DSP.DISK value that has the same key
        ascii_header = protobuf_to_ascii_header(data.dsp.dsp_disk_data.header)
        pipeline_header = protobuf_to_ascii_header(data.dsp.pipeline_data.header)
        for key, value in pipeline_header.items():
            ascii_header[key] = value

        callback(subband_id=1, subband_data=ascii_header.get_dataclass_obj(DspMonitorData))