Source code for ska_pst.lmc.dsp.dsp_ft_process_api

# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project.
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""
Module for providing the API to communicate with the DSP process.

The :py:class:`PstDspFlowThroughProcessApiSimulator` is used in testing or
simulation mode, while the :py:class:`PstDspFlowThroughProcessApiGrpc` is used
to connect to a remote application that exposes a gRPC API.
"""

from __future__ import annotations

import logging
from typing import Callable

from overrides import override
from ska_pst.grpc.lmc.ska_pst_lmc_pb2 import (
    BeamConfiguration,
    DspFlowThroughBeamConfiguration,
)
from ska_pst.grpc.lmc.ska_pst_lmc_pb2 import DspFlowThroughMonitorData as DspFlowThroughMonitorDataProtobuf
from ska_pst.grpc.lmc.ska_pst_lmc_pb2 import (
    DspFlowThroughScanConfiguration,
)
from ska_pst.grpc.lmc.ska_pst_lmc_pb2 import DspPipelineMonitorData as DspPipelineMonitorDataProtobuf
from ska_pst.grpc.lmc.ska_pst_lmc_pb2 import (
    MonitorData,
    ScanConfiguration,
)
from ska_pst.lmc.component import PstProcessApi, PstProcessApiGrpc, PstProcessApiSimulator
from ska_pst.lmc.dsp.dsp_ft_simulator import PstDspFlowThroughSimulator
from ska_pst.lmc.dsp.dsp_pipeline_model import DspPipelineMonitorData
from ska_pst.lmc.dsp.dsp_util import generate_dsp_scan_request

# Public names of this module: the abstract API type plus both of its
# implementations (simulator and gRPC) that are defined in this file.
__all__ = [
    "PstDspFlowThroughProcessApi",
    "PstDspFlowThroughProcessApiSimulator",
    "PstDspFlowThroughProcessApiGrpc",
]


class PstDspFlowThroughProcessApi(PstProcessApi):
    """
    Abstract base type for the API of the DSP flow-through process.

    This class derives from :py:class:`PstProcessApi` and declares no members
    of its own. Within this module it is the common parent of
    :py:class:`PstDspFlowThroughProcessApiSimulator` and
    :py:class:`PstDspFlowThroughProcessApiGrpc`.
    """
class PstDspFlowThroughProcessApiSimulator(
    PstProcessApiSimulator[DspPipelineMonitorData, PstDspFlowThroughSimulator],
    PstDspFlowThroughProcessApi,
):
    """A simulator implementation version of the API of `PstDspFlowThroughProcessApi`."""

    def __init__(
        self: PstDspFlowThroughProcessApiSimulator,
        simulator: PstDspFlowThroughSimulator | None = None,
        logger: logging.Logger | None = None,
    ) -> None:
        """
        Initialise the API.

        :param simulator: the simulator instance to use in the API. When ``None``
            a new :py:class:`PstDspFlowThroughSimulator` is created.
        :param logger: the logger to use for the API. It is passed unchanged to
            the parent class initialiser.
        """
        # Test against None explicitly so that a caller-supplied simulator is
        # never replaced just because it happens to evaluate as falsy.
        if simulator is None:
            simulator = PstDspFlowThroughSimulator()
        super().__init__(logger=logger, simulator=simulator)
class PstDspFlowThroughProcessApiGrpc(PstProcessApiGrpc, PstDspFlowThroughProcessApi):
    """
    A gRPC implementation of the `PstDspFlowThroughProcessApi` API.

    This uses an instance of a `PstGrpcLmcClient` to send requests through
    to the DSP.FT application. Instances of this class should be per
    subband, rather than one for all of DSP.FT as a whole.
    """

    @override
    def _get_configure_beam_request(
        self: PstDspFlowThroughProcessApiGrpc, configuration: dict
    ) -> BeamConfiguration:
        """
        Build the beam configuration request message.

        :param configuration: keyword values expanded into a
            ``DspFlowThroughBeamConfiguration`` message.
        :return: a ``BeamConfiguration`` with its ``dsp_flow_through`` field set.
        """
        beam_config = DspFlowThroughBeamConfiguration(**configuration)
        return BeamConfiguration(dsp_flow_through=beam_config)

    @override
    def _handle_monitor_response(
        self: PstDspFlowThroughProcessApiGrpc,
        data: MonitorData,
        *,
        monitor_data_callback: Callable[..., None],
    ) -> None:
        """
        Convert a monitoring message and hand it to the callback.

        :param data: the monitoring message; only its
            ``dsp_flow_through.pipeline_data`` part is read.
        :param monitor_data_callback: called once with keyword arguments
            ``subband_id`` and ``subband_data``.
        """
        flow_through_msg: DspFlowThroughMonitorDataProtobuf = data.dsp_flow_through

        # for DSP.FT we only use the common pipeline monitoring data
        pipeline_msg: DspPipelineMonitorDataProtobuf = flow_through_msg.pipeline_data

        subband_data = DspPipelineMonitorData(
            processing_time=pipeline_msg.processing_time,
            data_time=pipeline_msg.data_time,
            bytes_processed=pipeline_msg.bytes_processed,
            processing_time_percent=pipeline_msg.processing_time_percent,
            overall_efficiency=pipeline_msg.overall_efficiency,
            efficiency=pipeline_msg.efficiency,
            bytes_processing_rate=pipeline_msg.bytes_processing_rate,
        )

        # The subband id reported to the callback is always the constant 1.
        monitor_data_callback(subband_id=1, subband_data=subband_data)

    @override
    def _get_configure_scan_request(
        self: PstDspFlowThroughProcessApiGrpc, configure_parameters: dict
    ) -> ScanConfiguration:
        """
        Build the scan configuration request message.

        :param configure_parameters: keyword values passed to
            ``generate_dsp_scan_request``; its result is expanded into a
            ``DspFlowThroughScanConfiguration`` message.
        :return: a ``ScanConfiguration`` with its ``dsp_flow_through`` field set.
        """
        scan_request = generate_dsp_scan_request(**configure_parameters)
        scan_config = DspFlowThroughScanConfiguration(**scan_request)
        return ScanConfiguration(dsp_flow_through=scan_config)