# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project.
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""This module provides an implementation of the DSP.DISK PST component manager."""
from __future__ import annotations
import logging
from typing import Any, List
from overrides import override
from ska_pst.lmc.component import PstProcessApiSubcomponentManager
from ska_pst.lmc.dsp.disk_monitor_task import DiskMonitorTask
from ska_pst.lmc.dsp.dsp_disk_model import (
DspDiskMonitorData,
DspDiskMonitorDataStore,
DspDiskSubbandMonitorData,
)
from ska_pst.lmc.dsp.dsp_disk_process_api import (
PstDspDiskProcessApi,
PstDspDiskProcessApiGrpc,
PstDspDiskProcessApiSimulator,
)
from ska_pst.lmc.dsp.dsp_util import calculate_dsp_subband_resources
__all__ = ["PstDspDiskComponentManager"]
[docs]class PstDspDiskComponentManager(
PstProcessApiSubcomponentManager[
DspDiskSubbandMonitorData, DspDiskMonitorData, PstDspDiskProcessApi, DspDiskMonitorDataStore
]
):
"""Component manager for the DSP.DISK component for the PST.LMC subsystem."""
_api: PstDspDiskProcessApi
def __init__(
self: PstDspDiskComponentManager,
*,
device_name: str,
process_api_endpoint: str,
api: PstDspDiskProcessApi | None = None,
logger: logging.Logger | None = None,
**kwargs: Any,
):
"""
Initialise instance of the component manager.
:param device_interface: an abstract interface of the TANGO device.
:type device_interface: PstApiDeviceInterface[DspDiskMonitorData]
:param api: an API object used to delegate functionality to.
:type api: `PstProcessApi`
:param logger: a logger for this object to use
:type logger: `logging.Logger`
"""
logger = logger or logging.getLogger(__name__)
logger.debug(
f"Setting up DSP.DISK component manager with device_name='{device_name}'"
+ f" and api_endpoint='{process_api_endpoint}'"
)
api = api or PstDspDiskProcessApiSimulator(
logger=logger,
)
self._disk_monitor_task: DiskMonitorTask | None = None
super().__init__(
device_name=device_name,
subcomponent_name="dsp_disk",
process_api_endpoint=process_api_endpoint,
api=api,
logger=logger,
data_store=DspDiskMonitorDataStore(),
**kwargs,
)
[docs] def stop_disk_stats_monitoring(self: PstDspDiskComponentManager) -> None:
"""Stop monitoring of disk usage."""
if self._disk_monitor_task is not None:
self._disk_monitor_task.shutdown()
self._disk_monitor_task = None
@override
def _simulator_api(self: PstDspDiskComponentManager) -> PstDspDiskProcessApi:
"""Get instance of the simulator API."""
self.logger.debug("DSP.DISK component manager setting up simulated API")
return PstDspDiskProcessApiSimulator(
logger=self.logger,
)
@override
def _grpc_api(self: PstDspDiskComponentManager) -> PstDspDiskProcessApi:
"""Get instance of a gRPC API."""
self.logger.debug("DSP.DISK component manager setting up gRPC API")
return PstDspDiskProcessApiGrpc(
client_id=self.subcomponent_id,
grpc_endpoint=self.process_api_endpoint,
logger=self.logger,
)
def _create_disk_monitor_task(self: PstDspDiskComponentManager) -> None:
"""Create a instance of a DiskMonitorTask."""
self._disk_monitor_task = DiskMonitorTask(
stats_action=self._update_disk_stats_from_api,
logger=self.logger,
monitoring_polling_rate_ms=self.monitoring_polling_rate_ms,
)
@override
def connect(self: PstDspDiskComponentManager) -> None:
"""Establish connection to API component."""
super().connect()
if self._disk_monitor_task is None:
self._create_disk_monitor_task()
self._disk_monitor_task.start_monitoring() # type: ignore
@override
def disconnect(self: PstDspDiskComponentManager) -> None:
"""Disconnect from API component."""
if self._disk_monitor_task is not None:
self._disk_monitor_task.stop_monitoring(timeout=1.0)
self._disk_monitor_task = None
super().disconnect()
@property
def disk_capacity(self: PstDspDiskComponentManager) -> int:
"""Get size, in bytes, for the disk for DSP processing for this beam."""
return self.monitor_data.disk_capacity
@property
def available_disk_space(self: PstDspDiskComponentManager) -> int:
"""Get currently available bytes of the disk."""
return self.monitor_data.available_disk_space
@property
def disk_used_bytes(self: PstDspDiskComponentManager) -> int:
"""Get amount of bytes used on the disk that DSP is writing to."""
return self.monitor_data.disk_used_bytes
@property
def disk_used_percentage(self: PstDspDiskComponentManager) -> float:
"""Get the percentage of used disk space that DSP is writing to."""
return self.monitor_data.disk_used_percentage
@property
def data_recorded(self: PstDspDiskComponentManager) -> int:
"""Get total amount of bytes written in current scan across all subbands."""
return self.monitor_data.data_recorded
@property
def data_record_rate(self: PstDspDiskComponentManager) -> float:
"""Get total rate of writing to disk across all subbands, in bytes/second."""
return self.monitor_data.data_record_rate
@property
def available_recording_time(self: PstDspDiskComponentManager) -> float:
"""Get estimated available recording time left for current scan."""
return self.monitor_data.available_recording_time
@property
def subband_data_recorded(self: PstDspDiskComponentManager) -> List[int]:
"""Get a list of bytes written, one record per subband."""
return self.monitor_data.subband_data_recorded
@property
def subband_data_record_rate(self: PstDspDiskComponentManager) -> List[float]:
"""Get a list of current rate of writing per subband, in bytes/seconds."""
return self.monitor_data.subband_data_record_rate
def _update_disk_stats_from_api(self: PstDspDiskComponentManager, *args: Any, **kwargs: Any) -> None:
"""
Update the disk usage details calling API.
This gets the `disk_capacity` and `available_disk_space` from the API via
calling the :py:meth:`ProcessApi.get_env` method.
This is used to get the value of the disk capacity and the available disk
space before the DSP.DISK starts monitoring. This is needed as BEAM.MGMT
needs to know the value before we monitor.
"""
environment_values = self._api.get_env()
try:
self._monitor_data_store.update_disk_stats(**environment_values)
self._monitor_data_handler.update_monitor_data(notify=False)
self.monitor_data_updated_callback(environment_values)
except Exception:
self.logger.warning(
f"Failure to get disk stats from API. environment_values={environment_values}", exc_info=True
)
raise
@override
def validate_configure_scan(self: PstDspDiskComponentManager, configuration: dict) -> None:
"""
Validate a ConfigureScan request sent from CSP.LMC to the DSP.DISK sub-component.
This asserts the request can be converted to DSP.DISK resources and then calls the process API to
perform the validation.
:param configuration: configuration that would be used when the configure_beam and configure_scan
methods are called.
:type configuration: dict
"""
dsp_resources = calculate_dsp_subband_resources(beam_id=self.beam_id, **configuration)
self._api.validate_configure_beam(configuration=dsp_resources[1])
self._api.validate_configure_scan(configuration=configuration)
@override
def _configure_beam(self: PstDspDiskComponentManager, configuration: dict) -> None:
"""
Configure the beam of the the component with the resources.
:param configuration: configuration for beam
:type configuration: dict
"""
dsp_resources = calculate_dsp_subband_resources(beam_id=self.beam_id, **configuration)
# deal only with subband 1 for now.
self.logger.debug(f"Submitting API with dsp_resources={dsp_resources[1]}")
self._api.configure_beam(configuration=dsp_resources[1])
self._update_disk_stats_from_api()