# Source code for ska_pst.lmc.dsp.dsp_model

# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project.
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""Module for providing the model data classes for DSPSR pipeline based pipelines."""

from __future__ import annotations

__all__ = ["DspMonitorData", "DspMonitorDataStore"]

import dataclasses
import sys
from dataclasses import dataclass, field, fields
from typing import Any

from ska_pst.lmc.component import SUBBAND_1, MonitorDataStore

DEFAULT_RECORDING_TIME: float = 365 * 24 * 60 * 60.0
"""
The default available recording time, in seconds.

A default recording time is needed that will not set off the DSP device
alarm, so roughly one year (expressed as a float) is used.
"""

DISK_MONITORING_ATTRS: list[str] = [
    "disk_capacity",
    "available_disk_space",
    "data_recorded",
    "data_record_rate",
    "available_recording_time",
    "disk_used_bytes",
    "disk_used_percentage",
]
"""Names of the DSP monitoring attributes that relate to disk usage."""

DSPSR_PIPELINE_MONITORING_ATTRS: list[str] = [
    "processing_time",
    "processing_time_percent",
    "data_time",
    "bytes_processed",
    "bytes_processing_rate",
    "overall_efficiency",
    "efficiency",
]
"""Names of the DSP monitoring attributes that relate to DSPSR pipelines."""


[docs]@dataclass(kw_only=True) class DspMonitorData: """ A data class to represent the DSPSR pipeline monitoring across all subbands. This class is used to model the combined subband data for DSPSR pipelines. """ disk_capacity: int = field(default=sys.maxsize) """Total amount of bytes for the disk used for DSP processing for the beam.""" available_disk_space: int = field(default=sys.maxsize) """Total currently available bytes of the disk used.""" data_recorded: int = 0 """Amount of bytes written by current scan.""" data_record_rate: float = 0.0 """Current rate of writing of data to disk.""" available_recording_time: float = field(init=False) """ Estimated available recording time left for current scan. This is a calculated as ``available_disk_space / data_record_rate``. """ disk_used_bytes: int = field(init=False) """ Total amount of disk spaced used, in bytes. This is calculated as ``disk_capacity - available_disk_space`` """ disk_used_percentage: float = field(init=False) """ The percentage of disk spaced used. This is calculated as ``100.0 * (disk_capacity - available_disk_space)/disk_capacity`` """ processing_time: float = 0.0 """The number of seconds the pipeline used to perform signal processing.""" processing_time_percent: float = 0.0 """The percentage of time used to perform processing of the data during the last reporting interval.""" data_time: float = 0.0 """The number of seconds spanned by the data that the pipeline processed.""" bytes_processed: int = 0 """The number of input bytes processed by the pipeline.""" bytes_processing_rate: float = 0.0 """The data processing rate in bytes per second during the last reporting interval.""" overall_efficiency: float = 0.0 """ The overall efficiency of the pipeline. Efficiency is defined as data_time / processing_time. A value greater than 1.0 means that the pipeline processes data faster than it is received. 
""" efficiency: float = 0.0 """The efficiency of the pipeline during the last reporting interval.""" def __post_init__(self: DspMonitorData) -> None: """Perform post initialisation.""" epsilon: float = 1e-8 self.disk_used_bytes = disk_used_bytes = self.disk_capacity - self.available_disk_space self.disk_used_percentage = 100.0 * disk_used_bytes / (self.disk_capacity + epsilon) available_recording_time = self.available_disk_space / (self.data_record_rate + epsilon) self.available_recording_time = min(available_recording_time, DEFAULT_RECORDING_TIME)
[docs] @staticmethod def default() -> DspMonitorData: """Get a default pipeline monitoring data value.""" return DspMonitorData()
[docs] @staticmethod def init_fields() -> set[str]: """Get the set of data class fields are in the __init__ method.""" return {f.name for f in fields(DspMonitorData) if f.init}
class DspMonitorDataStore(MonitorDataStore[DspMonitorData, DspMonitorData]):
    """
    Data store that aggregates subband monitoring data for DSP pipelines.

    Besides the per-subband data handled by :py:class:`MonitorDataStore`, this
    store keeps the most recent disk statistics passed to
    :py:meth:`update_disk_stats` and merges them into :py:attr:`monitor_data`.

    This class is currently a minimal implementation; it is expected to be
    extended once the full set of monitoring attributes is determined in a
    future PI.
    """

    def __init__(self: DspMonitorDataStore) -> None:
        """Initialise the monitor data store."""
        # Until real disk statistics are reported, both values default to
        # Python's max sized int.
        self._disk_capacity: int = sys.maxsize
        self._available_disk_space: int = sys.maxsize
        super().__init__()

    def update_disk_stats(
        self: DspMonitorDataStore,
        disk_capacity: int,
        available_disk_space: int,
        **kwargs: Any,
    ) -> None:
        """
        Record the latest disk statistics.

        :param disk_capacity: the total disk capacity.
        :type disk_capacity: int
        :param available_disk_space: the available amount of disk space left.
        :type available_disk_space: int
        :param kwargs: any additional keyword arguments; these are accepted
            but ignored.
        """
        self._disk_capacity, self._available_disk_space = disk_capacity, available_disk_space

    @property
    def monitor_data(self: DspMonitorDataStore) -> DspMonitorData:
        """
        Get the current monitoring data for DSP.

        If no subband data is available, this returns a :py:class:`DspMonitorData`
        whose ``disk_capacity`` and ``available_disk_space`` are the values last
        passed to :py:meth:`update_disk_stats` (or ``sys.maxsize`` if that has
        not been called), with all other fields at their defaults.

        Otherwise this returns a copy of the ``SUBBAND_1`` data in which
        ``disk_capacity`` and ``available_disk_space`` are each the minimum of
        the subband value and the value from :py:meth:`update_disk_stats`. The
        derived disk fields are recalculated because ``dataclasses.replace``
        constructs a new instance.
        """
        # NOTE(review): ``_subband_data`` is presumably maintained by
        # MonitorDataStore; this class only reads it.
        if not self._subband_data:
            return DspMonitorData(
                disk_capacity=self._disk_capacity,
                available_disk_space=self._available_disk_space,
            )

        # For now only one subband is handled; when multiple subbands are
        # supported their values will need to be merged consistently.
        # NOTE(review): presumably raises if data exists only for subbands other
        # than SUBBAND_1 - confirm that cannot happen.
        subband = self._subband_data[SUBBAND_1]

        # Disk usage is also monitored in the background, separately from the
        # scan monitoring that produces the subband data, so report the smaller
        # of the two values for each disk statistic.
        return dataclasses.replace(
            subband,
            disk_capacity=min(self._disk_capacity, subband.disk_capacity),
            available_disk_space=min(self._available_disk_space, subband.available_disk_space),
        )