Source code for ska_pst.lmc.dsp.dsp_disk_component_manager

# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project.
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""This module provides an implementation of the DSP.DISK PST component manager."""

from __future__ import annotations

import logging
from typing import Any, List

from overrides import override
from ska_pst.lmc.component import PstProcessApiSubcomponentManager
from ska_pst.lmc.dsp.disk_monitor_task import DiskMonitorTask
from ska_pst.lmc.dsp.dsp_disk_model import (
    DspDiskMonitorData,
    DspDiskMonitorDataStore,
    DspDiskSubbandMonitorData,
)
from ska_pst.lmc.dsp.dsp_disk_process_api import (
    PstDspDiskProcessApi,
    PstDspDiskProcessApiGrpc,
    PstDspDiskProcessApiSimulator,
)
from ska_pst.lmc.dsp.dsp_util import calculate_dsp_subband_resources

__all__ = ["PstDspDiskComponentManager"]


[docs]class PstDspDiskComponentManager( PstProcessApiSubcomponentManager[ DspDiskSubbandMonitorData, DspDiskMonitorData, PstDspDiskProcessApi, DspDiskMonitorDataStore ] ): """Component manager for the DSP.DISK component for the PST.LMC subsystem.""" _api: PstDspDiskProcessApi def __init__( self: PstDspDiskComponentManager, *, device_name: str, process_api_endpoint: str, api: PstDspDiskProcessApi | None = None, logger: logging.Logger | None = None, **kwargs: Any, ): """ Initialise instance of the component manager. :param device_interface: an abstract interface of the TANGO device. :type device_interface: PstApiDeviceInterface[DspDiskMonitorData] :param api: an API object used to delegate functionality to. :type api: `PstProcessApi` :param logger: a logger for this object to use :type logger: `logging.Logger` """ logger = logger or logging.getLogger(__name__) logger.debug( f"Setting up DSP.DISK component manager with device_name='{device_name}'" + f" and api_endpoint='{process_api_endpoint}'" ) api = api or PstDspDiskProcessApiSimulator( logger=logger, ) self._disk_monitor_task: DiskMonitorTask | None = None super().__init__( device_name=device_name, subcomponent_name="dsp_disk", process_api_endpoint=process_api_endpoint, api=api, logger=logger, data_store=DspDiskMonitorDataStore(), **kwargs, )
[docs] def stop_disk_stats_monitoring(self: PstDspDiskComponentManager) -> None: """Stop monitoring of disk usage.""" if self._disk_monitor_task is not None: self._disk_monitor_task.shutdown() self._disk_monitor_task = None
@override def _simulator_api(self: PstDspDiskComponentManager) -> PstDspDiskProcessApi: """Get instance of the simulator API.""" self.logger.debug("DSP.DISK component manager setting up simulated API") return PstDspDiskProcessApiSimulator( logger=self.logger, ) @override def _grpc_api(self: PstDspDiskComponentManager) -> PstDspDiskProcessApi: """Get instance of a gRPC API.""" self.logger.debug("DSP.DISK component manager setting up gRPC API") return PstDspDiskProcessApiGrpc( client_id=self.subcomponent_id, grpc_endpoint=self.process_api_endpoint, logger=self.logger, ) def _create_disk_monitor_task(self: PstDspDiskComponentManager) -> None: """Create a instance of a DiskMonitorTask.""" self._disk_monitor_task = DiskMonitorTask( stats_action=self._update_disk_stats_from_api, logger=self.logger, monitoring_polling_rate_ms=self.monitoring_polling_rate_ms, ) @override def connect(self: PstDspDiskComponentManager) -> None: """Establish connection to API component.""" super().connect() if self._disk_monitor_task is None: self._create_disk_monitor_task() self._disk_monitor_task.start_monitoring() # type: ignore @override def disconnect(self: PstDspDiskComponentManager) -> None: """Disconnect from API component.""" if self._disk_monitor_task is not None: self._disk_monitor_task.stop_monitoring(timeout=1.0) self._disk_monitor_task = None super().disconnect() @property def disk_capacity(self: PstDspDiskComponentManager) -> int: """Get size, in bytes, for the disk for DSP processing for this beam.""" return self.monitor_data.disk_capacity @property def available_disk_space(self: PstDspDiskComponentManager) -> int: """Get currently available bytes of the disk.""" return self.monitor_data.available_disk_space @property def disk_used_bytes(self: PstDspDiskComponentManager) -> int: """Get amount of bytes used on the disk that DSP is writing to.""" return self.monitor_data.disk_used_bytes @property def disk_used_percentage(self: PstDspDiskComponentManager) -> float: """Get the percentage of used disk space that DSP is writing to.""" return self.monitor_data.disk_used_percentage @property def data_recorded(self: PstDspDiskComponentManager) -> int: """Get total amount of bytes written in current scan across all subbands.""" return self.monitor_data.data_recorded @property def data_record_rate(self: PstDspDiskComponentManager) -> float: """Get total rate of writing to disk across all subbands, in bytes/second.""" return self.monitor_data.data_record_rate @property def available_recording_time(self: PstDspDiskComponentManager) -> float: """Get estimated available recording time left for current scan.""" return self.monitor_data.available_recording_time @property def subband_data_recorded(self: PstDspDiskComponentManager) -> List[int]: """Get a list of bytes written, one record per subband.""" return self.monitor_data.subband_data_recorded @property def subband_data_record_rate(self: PstDspDiskComponentManager) -> List[float]: """Get a list of current rate of writing per subband, in bytes/seconds.""" return self.monitor_data.subband_data_record_rate def _update_disk_stats_from_api(self: PstDspDiskComponentManager, *args: Any, **kwargs: Any) -> None: """ Update the disk usage details calling API. This gets the `disk_capacity` and `available_disk_space` from the API via calling the :py:meth:`ProcessApi.get_env` method. This is used to get the value of the disk capacity and the available disk space before the DSP.DISK starts monitoring. This is needed as BEAM.MGMT needs to know the value before we monitor. """ environment_values = self._api.get_env() try: self._monitor_data_store.update_disk_stats(**environment_values) self._monitor_data_handler.update_monitor_data(notify=False) self.monitor_data_updated_callback(environment_values) except Exception: self.logger.warning( f"Failure to get disk stats from API. environment_values={environment_values}", exc_info=True ) raise @override def validate_configure_scan(self: PstDspDiskComponentManager, configuration: dict) -> None: """ Validate a ConfigureScan request sent from CSP.LMC to the DSP.DISK sub-component. This asserts the request can be converted to DSP.DISK resources and then calls the process API to perform the validation. :param configuration: configuration that would be used when the configure_beam and configure_scan methods are called. :type configuration: dict """ dsp_resources = calculate_dsp_subband_resources(beam_id=self.beam_id, **configuration) self._api.validate_configure_beam(configuration=dsp_resources[1]) self._api.validate_configure_scan(configuration=configuration) @override def _configure_beam(self: PstDspDiskComponentManager, configuration: dict) -> None: """ Configure the beam of the the component with the resources. :param configuration: configuration for beam :type configuration: dict """ dsp_resources = calculate_dsp_subband_resources(beam_id=self.beam_id, **configuration) # deal only with subband 1 for now. self.logger.debug(f"Submitting API with dsp_resources={dsp_resources[1]}") self._api.configure_beam(configuration=dsp_resources[1]) self._update_disk_stats_from_api()