# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project.
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""
Module for providing the API to be communicate with the SMRB process.
The :py:class:`PstSmrbProcessApiSimulator` is used in testing or
simulation.)
"""
from __future__ import annotations
import logging
from typing import Callable
from overrides import override
from ska_pst.grpc.lmc.ska_pst_lmc_pb2 import (
BeamConfiguration,
MonitorData,
ScanConfiguration,
SmrbBeamConfiguration,
SmrbScanConfiguration,
)
from ska_pst.lmc.component import PstProcessApi, PstProcessApiGrpc, PstProcessApiSimulator
from ska_pst.lmc.smrb.smrb_model import SmrbSubbandMonitorData
from ska_pst.lmc.smrb.smrb_simulator import PstSmrbSimulator
__all__ = [
"PstSmrbProcessApi",
"PstSmrbProcessApiSimulator",
"PstSmrbProcessApiGrpc",
]
[docs]class PstSmrbProcessApi(PstProcessApi):
"""
Abstract class for the API of the SMRB process.
This extends from :py:class:`PstProcessApi` but
provides the specific method of getting the monitoring
data.
"""
[docs]class PstSmrbProcessApiSimulator(
PstProcessApiSimulator[SmrbSubbandMonitorData, PstSmrbSimulator], PstSmrbProcessApi
):
"""A simulator implementation version of the API of `PstSmrbProcessApi`."""
def __init__(
self: PstSmrbProcessApiSimulator,
simulator: PstSmrbSimulator | None = None,
logger: logging.Logger | None = None,
) -> None:
"""
Initialise the API.
:param logger: the logger to use for the API.
:param component_state_callback: this allows the API to call back to the component manager / TANGO
device to deal with state model changes.
:param simulator: the simulator instance to use in the API.
"""
simulator = simulator or PstSmrbSimulator()
super().__init__(simulator=simulator, logger=logger)
[docs]class PstSmrbProcessApiGrpc(PstProcessApiGrpc, PstSmrbProcessApi):
"""This is an gRPC implementation of the `PstSmrbProcessApi` API.
This uses an instance of a `PstGrpcLmcClient` to send requests through
to the SMRB.RB application. Instances of this class should be per
subband, rather than one for all of SMRB as a whole.
"""
@override
def _get_configure_beam_request(self: PstSmrbProcessApiGrpc, configuration: dict) -> BeamConfiguration:
return BeamConfiguration(smrb=SmrbBeamConfiguration(**configuration))
@override
def _handle_monitor_response(
self: PstSmrbProcessApiGrpc, data: MonitorData, *, monitor_data_callback: Callable[..., None]
) -> None:
smrb_data_stats = data.smrb.data
smrb_weights_stats = data.smrb.weights
# assume the number of written and read are the same for data and weights
# so total written/read size in bytes is the total buffer size * written/read
buffer_size = smrb_data_stats.bufsz + smrb_weights_stats.bufsz
total_written = buffer_size * smrb_data_stats.written
total_read = buffer_size * smrb_data_stats.read
monitor_data_callback(
subband_id=1,
subband_data=SmrbSubbandMonitorData(
buffer_size=buffer_size,
total_written=total_written,
total_read=total_read,
full=smrb_data_stats.full,
num_of_buffers=smrb_data_stats.nbufs,
),
)
@override
def _get_configure_scan_request(self: PstProcessApiGrpc, configure_parameters: dict) -> ScanConfiguration:
return ScanConfiguration(smrb=SmrbScanConfiguration())