# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project.
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""
Module for providing the API to be communicate with the DSP process.
The :py:class:`PstDspDiskProcessApiSimulator` is used in testing or
simulation mode, while the :py:class:`PstDspDiskProcessApiGrpc` is used
to connect to a remote application that exposes a gRPC API.
"""
from __future__ import annotations
import logging
from typing import Callable
from overrides import override
from ska_pst.grpc.lmc.ska_pst_lmc_pb2 import BeamConfiguration, DspDiskBeamConfiguration
from ska_pst.grpc.lmc.ska_pst_lmc_pb2 import DspDiskMonitorData as DspDiskMonitorDataProtobuf
from ska_pst.grpc.lmc.ska_pst_lmc_pb2 import DspDiskScanConfiguration, MonitorData, ScanConfiguration
from ska_pst.lmc.component import PstProcessApi, PstProcessApiGrpc, PstProcessApiSimulator
from ska_pst.lmc.dsp.dsp_disk_model import DspDiskSubbandMonitorData
from ska_pst.lmc.dsp.dsp_disk_simulator import PstDspDiskSimulator
from ska_pst.lmc.dsp.dsp_util import generate_dsp_scan_request
__all__ = [
"PstDspDiskProcessApi",
"PstDspDiskProcessApiSimulator",
]
[docs]class PstDspDiskProcessApi(PstProcessApi):
"""
Abstract class for the API of the DSP process.
This extends from :py:class:`PstProcessApi` but
provides the specific method of getting the monitoring
data.
"""
[docs]class PstDspDiskProcessApiSimulator(
PstProcessApiSimulator[DspDiskSubbandMonitorData, PstDspDiskSimulator], PstDspDiskProcessApi
):
"""A simulator implementation version of the API of `PstDspDiskProcessApi`."""
def __init__(
self: PstDspDiskProcessApiSimulator,
simulator: PstDspDiskSimulator | None = None,
logger: logging.Logger | None = None,
) -> None:
"""
Initialise the API.
:param logger: the logger to use for the API.
:param component_state_callback: this allows the API to call back to the component manager / TANGO
device to deal with state model changes.
:param simulator: the simulator instance to use in the API.
"""
simulator = simulator or PstDspDiskSimulator()
super().__init__(logger=logger, simulator=simulator)
@override
def get_env(self: PstDspDiskProcessApiSimulator) -> dict:
"""Get simulated environment values for DSP.DISK."""
import shutil
(disk_capacity, _, available_disk_space) = shutil.disk_usage("/")
return {
"disk_capacity": disk_capacity,
"available_disk_space": available_disk_space,
}
[docs]class PstDspDiskProcessApiGrpc(PstProcessApiGrpc, PstDspDiskProcessApi):
"""This is an gRPC implementation of the `PstDspDiskProcessApi` API.
This uses an instance of a `PstGrpcLmcClient` to send requests through
to the DSP.DISK application. Instances of this class should be per
subband, rather than one for all of DSP.DISK as a whole.
"""
@override
def _get_configure_beam_request(self: PstDspDiskProcessApiGrpc, configuration: dict) -> BeamConfiguration:
return BeamConfiguration(dsp_disk=DspDiskBeamConfiguration(**configuration))
@override
def _handle_monitor_response(
self: PstDspDiskProcessApiGrpc, data: MonitorData, *, monitor_data_callback: Callable[..., None]
) -> None:
dsp_disk_data: DspDiskMonitorDataProtobuf = data.dsp_disk
monitor_data_callback(
subband_id=1,
subband_data=DspDiskSubbandMonitorData(
disk_capacity=dsp_disk_data.disk_capacity,
available_disk_space=dsp_disk_data.disk_available_bytes,
data_recorded=dsp_disk_data.bytes_written,
data_record_rate=dsp_disk_data.write_rate,
),
)
@override
def _get_configure_scan_request(
self: PstDspDiskProcessApiGrpc, configure_parameters: dict
) -> ScanConfiguration:
return ScanConfiguration(
dsp_disk=DspDiskScanConfiguration(**generate_dsp_scan_request(**configure_parameters))
)