# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project.
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""
Module for providing the API to communicate with the DSP process.
The :py:class:`PstDspProcessApiSimulator` is used in testing or
simulation mode, while the :py:class:`PstDspProcessApiGrpc` is used
to connect to a remote application that exposes a gRPC API.
"""
from __future__ import annotations
from typing import cast
from ska_control_model import PstProcessingMode
from ska_pst.grpc.lmc.ska_pst_lmc_pb2 import (
BeamConfiguration,
DspBeamConfiguration,
DspDetectedFilterbankScanConfiguration,
DspDiskScanConfiguration,
DspFlowThroughScanConfiguration,
MonitorData,
ScanConfiguration,
SpectralKurtosisConfig,
)
from ska_pst.lmc.component import MonitorDataCallback
from ska_pst.lmc.component.grpc_lmc_client import protobuf_to_ascii_header
from .dsp_model import DspMonitorData
from .dsp_util import generate_dsp_scan_request
# Names exported by ``from <module> import *``.
__all__ = [
"PstDspGrpcApiStrategy",
]
def _map_sk_config(sk_config: dict) -> SpectralKurtosisConfig:
    """
    Map from spectral kurtosis (SK) dictionary to a protobuf message.

    A leading ``sk_`` is removed from each key and the resulting items are
    passed as keyword arguments to :py:class:`SpectralKurtosisConfig`. Keys
    that do not start with ``sk_`` are passed through unchanged.

    :param sk_config: the spectral kurtosis configuration
    :type sk_config: dict
    :return: the protobuf message built from the renamed keys.
    :rtype: SpectralKurtosisConfig
    """
    prefix = "sk_"
    # Only a leading prefix is stripped: str.replace(prefix, "", 1) would also
    # remove the first "sk_" found in the middle of a key (e.g. "task_id").
    renamed = {
        (key[len(prefix):] if cast(str, key).startswith(prefix) else key): value
        for key, value in sk_config.items()
    }
    return SpectralKurtosisConfig(**renamed)
class PstDspGrpcApiStrategy:
    """gRPC API strategy for the DSP subcomponent: builds request messages and handles monitoring."""

    def get_beam_configuration_msg(self: PstDspGrpcApiStrategy, *, configuration: dict) -> BeamConfiguration:
        """
        Build the gRPC BeamConfiguration Protobuf message for DSP.

        The entries of ``configuration`` are passed as keyword arguments to
        :py:class:`DspBeamConfiguration`.

        :param configuration: the PST beam configuration
        :type configuration: dict
        :return: a BeamConfiguration message with its ``dsp`` field populated.
        :rtype: BeamConfiguration
        """
        dsp_beam_config = DspBeamConfiguration(**configuration)
        return BeamConfiguration(dsp=dsp_beam_config)

    def get_scan_configuration_msg(self: PstDspGrpcApiStrategy, *, configuration: dict) -> ScanConfiguration:
        """
        Build the gRPC ScanConfiguration Protobuf message for DSP.

        The message type that is populated depends on the value stored under
        the ``pst_processing_mode`` key of ``configuration``.

        :param configuration: the PST scan configuration
        :type configuration: dict
        :return: a ScanConfiguration message for the requested processing mode.
        :rtype: ScanConfiguration
        :raises ValueError: if the processing mode is not one of
            VOLTAGE_RECORDER, FLOW_THROUGH or DETECTED_FILTERBANK.
        """
        processing_mode: PstProcessingMode = configuration["pst_processing_mode"]
        # the DSP request is generated up front, before the mode is inspected
        scan_request = generate_dsp_scan_request(**configuration)

        if processing_mode == PstProcessingMode.VOLTAGE_RECORDER:
            disk_config = DspDiskScanConfiguration(**scan_request)
            return ScanConfiguration(dsp_disk=disk_config)

        if processing_mode == PstProcessingMode.FLOW_THROUGH:
            flow_through_config = DspFlowThroughScanConfiguration(**scan_request)
            return ScanConfiguration(dsp_flow_through=flow_through_config)

        if processing_mode == PstProcessingMode.DETECTED_FILTERBANK:
            # sk_config is removed from the request so that it is not passed twice below
            sk_messages = list(map(_map_sk_config, scan_request.pop("sk_config")))
            filterbank_config = DspDetectedFilterbankScanConfiguration(sk_config=sk_messages, **scan_request)
            return ScanConfiguration(dsp_detected_filterbank=filterbank_config)

        raise ValueError(f"unsupported PST processing mode: {processing_mode.name}")

    def handle_monitor_response(
        self: PstDspGrpcApiStrategy, *, data: MonitorData, callback: MonitorDataCallback
    ) -> None:
        """
        Handle the gRPC monitoring data response.

        The DSP.DISK header and the pipeline header are merged into a single
        header; where both define the same key the pipeline value is kept. The
        merged header is converted to a :py:class:`DspMonitorData` and handed
        to ``callback`` with ``subband_id=1``.

        :param data: the gRPC/Protobuf monitoring data message from server
        :type data: MonitorData
        :param callback: the callback used to update the LMC subcomponent model
        :type callback: MonitorDataCallback
        """
        # start from the DSP.DISK stats, then overlay any pipeline stats
        merged_header = protobuf_to_ascii_header(data.dsp.dsp_disk_data.header)
        pipeline_header = protobuf_to_ascii_header(data.dsp.pipeline_data.header)
        for key, value in pipeline_header.items():
            merged_header[key] = value

        monitor_data = merged_header.get_dataclass_obj(DspMonitorData)
        callback(subband_id=1, subband_data=monitor_data)