Source code for ska_pst.lmc.receive.receive_process_api

# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project.
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""
Module for providing the API to be communicate with the RECV process.

The :py:class:`PstReceiveProcessApiSimulator` is used in testing or
simulation mode, while the :py:class:`PstReceiveProcessApiGrpc` is used
to connect to a remote application that exposes a gRPC API.
"""

from __future__ import annotations

import copy
import logging
from typing import Callable

from overrides import override
from ska_pst.common.constants import GIGABITS_PER_BYTE
from ska_pst.grpc.lmc.ska_pst_lmc_pb2 import (
    BeamConfiguration,
    MonitorData,
    ReceiveBeamConfiguration,
    ReceiveMonitorData,
    ReceiveScanConfiguration,
    ReceiveSubbandResources,
    ScanConfiguration,
)
from ska_pst.lmc.component import PstProcessApi, PstProcessApiGrpc, PstProcessApiSimulator
from ska_pst.lmc.receive.receive_model import ReceiveData
from ska_pst.lmc.receive.receive_simulator import PstReceiveSimulator
from ska_pst.lmc.receive.receive_util import generate_recv_configure_scan_request

__all__ = [
    "PstReceiveProcessApi",
    "PstReceiveProcessApiSimulator",
    "PstReceiveProcessApiGrpc",
]


[docs]class PstReceiveProcessApi(PstProcessApi): """ Abstract class for the API of the RECV process. This extends from :py:class:`PstProcessApi` but provides the specific method of getting the monitoring data. """
[docs]class PstReceiveProcessApiSimulator( PstProcessApiSimulator[ReceiveData, PstReceiveSimulator], PstReceiveProcessApi ): """A simulator implementation version of the API of `PstReceiveProcessApi`.""" def __init__( self: PstReceiveProcessApiSimulator, simulator: PstReceiveSimulator | None = None, logger: logging.Logger | None = None, ) -> None: """ Initialise the API. :param logger: the logger to use for the API. :param component_state_callback: this allows the API to call back to the component manager / TANGO device to deal with state model changes. :param simulator: the simulator instance to use in the API. """ simulator = simulator or PstReceiveSimulator() super().__init__(simulator=simulator, logger=logger) @override def get_env(self: PstReceiveProcessApiSimulator) -> dict: """ Get simulated environment values for RECV.CORE. This returns the following: * data_host = '127.0.0.1' * data_mac = '01:23:45:ab:cd:ef', * data_port = 32080 """ return { "data_host": "127.0.0.1", "data_mac": "01:23:45:ab:cd:ef", "data_port": 32080, }
[docs]class PstReceiveProcessApiGrpc(PstProcessApiGrpc, PstReceiveProcessApi): """This is an gRPC implementation of the `PstReceiveProcessApi`. This uses an instance of a `PstGrpcLmcClient` to send requests through to the RECV.CORE application. Instances of this class should be per subband, rather than one for all of RECV as a whole. """ @override def _get_configure_beam_request(self: PstReceiveProcessApiGrpc, configuration: dict) -> BeamConfiguration: configuration = copy.deepcopy(configuration) del configuration["subband"]["start_centre_freq_mhz"] subband_resources = ReceiveSubbandResources(**configuration["subband"]) return BeamConfiguration( receive=ReceiveBeamConfiguration(subband_resources=subband_resources, **configuration["common"]) ) @override def _get_configure_scan_request( self: PstReceiveProcessApiGrpc, configure_parameters: dict ) -> ScanConfiguration: return ScanConfiguration( receive=ReceiveScanConfiguration(**generate_recv_configure_scan_request(**configure_parameters)) ) @override def _handle_monitor_response( self: PstProcessApiGrpc, data: MonitorData, *, monitor_data_callback: Callable[..., None] ) -> None: receive_monitor_data: ReceiveMonitorData = data.receive monitor_data_callback( subband_id=1, subband_data=ReceiveData( data_received=receive_monitor_data.data_received, data_receive_rate=receive_monitor_data.receive_rate * GIGABITS_PER_BYTE, data_dropped=receive_monitor_data.data_dropped, data_drop_rate=receive_monitor_data.data_drop_rate, misordered_packets=receive_monitor_data.misordered_packets, misordered_packet_rate=receive_monitor_data.misordered_packet_rate, malformed_packets=receive_monitor_data.malformed_packets, malformed_packet_rate=receive_monitor_data.malformed_packet_rate, misdirected_packets=receive_monitor_data.misdirected_packets, misdirected_packet_rate=receive_monitor_data.misdirected_packet_rate, checksum_failure_packets=receive_monitor_data.checksum_failure_packets, checksum_failure_packet_rate=receive_monitor_data.checksum_failure_packet_rate, timestamp_sync_error_packets=receive_monitor_data.timestamp_sync_error_packets, timestamp_sync_error_packet_rate=receive_monitor_data.timestamp_sync_error_packet_rate, seq_number_sync_error_packets=receive_monitor_data.seq_number_sync_error_packets, seq_number_sync_error_packet_rate=receive_monitor_data.seq_number_sync_error_packet_rate, no_valid_polarisation_correction_packets=( receive_monitor_data.no_valid_polarisation_correction_packets ), no_valid_polarisation_correction_packet_rate=( receive_monitor_data.no_valid_polarisation_correction_packet_rate ), no_valid_station_beam_packets=receive_monitor_data.no_valid_station_beam_packets, no_valid_station_beam_packet_rate=receive_monitor_data.no_valid_station_beam_packet_rate, no_valid_pst_beam_packets=receive_monitor_data.no_valid_pst_beam_packets, no_valid_pst_beam_packet_rate=receive_monitor_data.no_valid_pst_beam_packet_rate, ), )