# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project.
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""Module providing the model data classes for DSPSR-based pipelines."""
from __future__ import annotations
# Public API of this module, i.e. the names exported by ``from ... import *``.
__all__ = ["DspMonitorData", "DspMonitorDataStore"]
import dataclasses
import sys
from dataclasses import dataclass, field, fields
from typing import Any
from ska_pst.lmc.component import SUBBAND_1, MonitorDataStore
DEFAULT_RECORDING_TIME: float = float(365 * 24 * 3600)
"""
The default available recording time, in seconds.

This is roughly one year, stored as a float. It is deliberately large so that
the default value does not set off the DSP device alarm. It is also the upper
bound applied to the estimated available recording time.
"""

DISK_MONITORING_ATTRS: list[str] = [
    "disk_capacity",
    "available_disk_space",
    "data_recorded",
    "data_record_rate",
    "available_recording_time",
    "disk_used_bytes",
    "disk_used_percentage",
]
"""Names of the DSP monitoring attributes that relate to disk usage."""

DSPSR_PIPELINE_MONITORING_ATTRS: list[str] = [
    "processing_time",
    "processing_time_percent",
    "data_time",
    "bytes_processed",
    "bytes_processing_rate",
    "overall_efficiency",
    "efficiency",
]
"""Names of the DSP monitoring attributes that relate to DSPSR pipelines."""
[docs]@dataclass(kw_only=True)
class DspMonitorData:
"""
A data class to represent the DSPSR pipeline monitoring across all subbands.
This class is used to model the combined subband data for DSPSR pipelines.
"""
disk_capacity: int = field(default=sys.maxsize)
"""Total amount of bytes for the disk used for DSP processing for the beam."""
available_disk_space: int = field(default=sys.maxsize)
"""Total currently available bytes of the disk used."""
data_recorded: int = 0
"""Amount of bytes written by current scan."""
data_record_rate: float = 0.0
"""Current rate of writing of data to disk."""
available_recording_time: float = field(init=False)
"""
Estimated available recording time left for current scan.
This is a calculated as ``available_disk_space / data_record_rate``.
"""
disk_used_bytes: int = field(init=False)
"""
Total amount of disk spaced used, in bytes.
This is calculated as ``disk_capacity - available_disk_space``
"""
disk_used_percentage: float = field(init=False)
"""
The percentage of disk spaced used.
This is calculated as ``100.0 * (disk_capacity - available_disk_space)/disk_capacity``
"""
processing_time: float = 0.0
"""The number of seconds the pipeline used to perform signal processing."""
processing_time_percent: float = 0.0
"""The percentage of time used to perform processing of the data during the last reporting interval."""
data_time: float = 0.0
"""The number of seconds spanned by the data that the pipeline processed."""
bytes_processed: int = 0
"""The number of input bytes processed by the pipeline."""
bytes_processing_rate: float = 0.0
"""The data processing rate in bytes per second during the last reporting interval."""
overall_efficiency: float = 0.0
"""
The overall efficiency of the pipeline.
Efficiency is defined as data_time / processing_time. A value greater than 1.0 means that
the pipeline processes data faster than it is received.
"""
efficiency: float = 0.0
"""The efficiency of the pipeline during the last reporting interval."""
def __post_init__(self: DspMonitorData) -> None:
"""Perform post initialisation."""
epsilon: float = 1e-8
self.disk_used_bytes = disk_used_bytes = self.disk_capacity - self.available_disk_space
self.disk_used_percentage = 100.0 * disk_used_bytes / (self.disk_capacity + epsilon)
available_recording_time = self.available_disk_space / (self.data_record_rate + epsilon)
self.available_recording_time = min(available_recording_time, DEFAULT_RECORDING_TIME)
[docs] @staticmethod
def default() -> DspMonitorData:
"""Get a default pipeline monitoring data value."""
return DspMonitorData()
[docs] @staticmethod
def init_fields() -> set[str]:
"""Get the set of data class fields are in the __init__ method."""
return {f.name for f in fields(DspMonitorData) if f.init}
class DspMonitorDataStore(MonitorDataStore[DspMonitorData, DspMonitorData]):
    """
    Data store used to aggregate the subband :py:class:`DspMonitorData` for DSP.

    This class is currently a stub. Proper aggregation across subbands will be
    added once the monitoring attributes are determined in a future PI.

    Alongside the per-subband data, the store keeps the latest disk statistics
    supplied through :py:meth:`update_disk_stats`. These come from background
    disk monitoring that runs separately from scan monitoring, and they are
    merged into :py:attr:`monitor_data`.
    """

    def __init__(self: DspMonitorDataStore) -> None:
        """Initialise data monitor store."""
        # Default both disk values to Python's max sized int until real
        # statistics are reported.
        self._available_disk_space: int = sys.maxsize
        self._disk_capacity: int = sys.maxsize
        super().__init__()

    def update_disk_stats(
        self: DspMonitorDataStore,
        disk_capacity: int,
        available_disk_space: int,
        **kwargs: Any,
    ) -> None:
        """
        Update disk statistics.

        :param disk_capacity: the total disk capacity.
        :type disk_capacity: int
        :param available_disk_space: the available amount of disk space left.
        :type available_disk_space: int
        :param kwargs: any additional keyword arguments; these are accepted
            and ignored.
        """
        self._disk_capacity = disk_capacity
        self._available_disk_space = available_disk_space

    @property
    def monitor_data(self: DspMonitorDataStore) -> DspMonitorData:
        """
        Get current monitoring data for DSP.

        This returns the latest monitoring data calculated from the current
        subband data. If no subband data is available, the response is a
        default :py:class:`DspMonitorData` object carrying the latest disk
        statistics.
        """
        # NOTE(review): ``_subband_data`` is not set in this class; presumably
        # it is provided by ``MonitorDataStore`` - confirm against the base class.
        number_subbands: int = len(self._subband_data)
        if number_subbands == 0:
            return DspMonitorData(
                available_disk_space=self._available_disk_space,
                disk_capacity=self._disk_capacity,
            )

        # For now we only have 1 subband. When we handle multiple subbands we
        # will need to merge the values in a consistent way.
        curr_data = self._subband_data[SUBBAND_1]

        # Disk usage is monitored in the background, separately from normal
        # scan monitoring, so use the minimum of each disk value from the two
        # sources.
        disk_capacity = min(self._disk_capacity, curr_data.disk_capacity)
        available_disk_space = min(self._available_disk_space, curr_data.available_disk_space)

        # ``replace`` re-runs ``__post_init__``, so the derived disk fields are
        # recalculated from the merged values.
        return dataclasses.replace(
            curr_data, disk_capacity=disk_capacity, available_disk_space=available_disk_space
        )