# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project.
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""
Module for providing the API to be communicate with the DSP process.
The :py:class:`PstDspFlowThroughProcessApiSimulator` is used in testing or
simulation mode, while the :py:class:`PstDspFlowThroughProcessApiGrpc` is used
to connect to a remote application that exposes a gRPC API.
"""
from __future__ import annotations
import logging
from typing import Callable
from overrides import override
from ska_pst.grpc.lmc.ska_pst_lmc_pb2 import (
BeamConfiguration,
DspFlowThroughBeamConfiguration,
)
from ska_pst.grpc.lmc.ska_pst_lmc_pb2 import DspFlowThroughMonitorData as DspFlowThroughMonitorDataProtobuf
from ska_pst.grpc.lmc.ska_pst_lmc_pb2 import (
DspFlowThroughScanConfiguration,
)
from ska_pst.grpc.lmc.ska_pst_lmc_pb2 import DspPipelineMonitorData as DspPipelineMonitorDataProtobuf
from ska_pst.grpc.lmc.ska_pst_lmc_pb2 import (
MonitorData,
ScanConfiguration,
)
from ska_pst.lmc.component import PstProcessApi, PstProcessApiGrpc, PstProcessApiSimulator
from ska_pst.lmc.dsp.dsp_ft_simulator import PstDspFlowThroughSimulator
from ska_pst.lmc.dsp.dsp_pipeline_model import DspPipelineMonitorData
from ska_pst.lmc.dsp.dsp_util import generate_dsp_scan_request
__all__ = [
"PstDspFlowThroughProcessApi",
"PstDspFlowThroughProcessApiSimulator",
]
[docs]class PstDspFlowThroughProcessApi(PstProcessApi):
"""
Abstract class for the API of the DSP process.
This extends from :py:class:`PstProcessApi` but
provides the specific method of getting the monitoring
data.
"""
[docs]class PstDspFlowThroughProcessApiSimulator(
PstProcessApiSimulator[DspPipelineMonitorData, PstDspFlowThroughSimulator],
PstDspFlowThroughProcessApi,
):
"""A simulator implementation version of the API of `PstDspFlowThroughProcessApi`."""
def __init__(
self: PstDspFlowThroughProcessApiSimulator,
simulator: PstDspFlowThroughSimulator | None = None,
logger: logging.Logger | None = None,
) -> None:
"""
Initialise the API.
:param logger: the logger to use for the API.
:param component_state_callback: this allows the API to call back to the component manager / TANGO
device to deal with state model changes.
:param simulator: the simulator instance to use in the API.
"""
simulator = simulator or PstDspFlowThroughSimulator()
super().__init__(logger=logger, simulator=simulator)
[docs]class PstDspFlowThroughProcessApiGrpc(PstProcessApiGrpc, PstDspFlowThroughProcessApi):
"""This is an gRPC implementation of the `PstDspFlowThroughProcessApi` API.
This uses an instance of a `PstGrpcLmcClient` to send requests through
to the DSP.FT application. Instances of this class should be per
subband, rather than one for all of DSP.FT as a whole.
"""
@override
def _get_configure_beam_request(
self: PstDspFlowThroughProcessApiGrpc, configuration: dict
) -> BeamConfiguration:
return BeamConfiguration(dsp_flow_through=DspFlowThroughBeamConfiguration(**configuration))
@override
def _handle_monitor_response(
self: PstDspFlowThroughProcessApiGrpc,
data: MonitorData,
*,
monitor_data_callback: Callable[..., None],
) -> None:
dsp_ft_data: DspFlowThroughMonitorDataProtobuf = data.dsp_flow_through
# for DSP.FT we only use the common pipeline monitoring data
pipeline_data_msg: DspPipelineMonitorDataProtobuf = dsp_ft_data.pipeline_data
monitor_data_callback(
subband_id=1,
subband_data=DspPipelineMonitorData(
processing_time=pipeline_data_msg.processing_time,
data_time=pipeline_data_msg.data_time,
bytes_processed=pipeline_data_msg.bytes_processed,
processing_time_percent=pipeline_data_msg.processing_time_percent,
overall_efficiency=pipeline_data_msg.overall_efficiency,
efficiency=pipeline_data_msg.efficiency,
bytes_processing_rate=pipeline_data_msg.bytes_processing_rate,
),
)
@override
def _get_configure_scan_request(
self: PstDspFlowThroughProcessApiGrpc, configure_parameters: dict
) -> ScanConfiguration:
return ScanConfiguration(
dsp_flow_through=DspFlowThroughScanConfiguration(
**generate_dsp_scan_request(**configure_parameters)
)
)