Source code for ska_pst.lmc.dsp.dsp_pipeline_model

# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project.
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""Module for providing the model data classes for DSPSR pipeline based pipelines."""

from __future__ import annotations

# Public API of this module; each exported name is listed exactly once.
__all__ = [
    "DspPipelineMonitorData",
    "DspPipelineMonitorDataStore",
]

from dataclasses import dataclass

from ska_pst.lmc.component import SUBBAND_1, MonitorDataStore


@dataclass
class DspPipelineMonitorData:
    """
    A data class to represent the DSP pipeline monitoring data across all subbands.

    This class is used to model the combined subband data for a DSP pipeline.
    Every attribute defaults to zero, so a default constructed instance
    represents the state where no monitoring data has been received.
    """

    processing_time: float = 0.0
    """The number of seconds the pipeline used to perform signal processing."""

    processing_time_percent: float = 0.0
    """The percentage of time used to perform processing of the data during the last reporting interval."""

    data_time: float = 0.0
    """The number of seconds spanned by the data that the pipeline processed."""

    bytes_processed: int = 0
    """The number of input bytes processed by the pipeline."""

    bytes_processing_rate: float = 0.0
    """The data processing rate in bytes per second during the last reporting interval."""

    overall_efficiency: float = 0.0
    """
    The overall efficiency of the pipeline.

    Efficiency is defined as data_time / processing_time. A value greater than 1.0 means
    that the pipeline processes data faster than it is received.
    """

    efficiency: float = 0.0
    """The efficiency of the pipeline during the last reporting interval."""


class DspPipelineMonitorDataStore(MonitorDataStore[DspPipelineMonitorData, DspPipelineMonitorData]):
    """
    Data store used to aggregate the subband data for a DSP pipeline.

    Only a single subband is handled at present, so no aggregation across
    subbands is performed: the monitoring data stored for subband 1 is
    exposed unchanged.
    """

    @property
    def monitor_data(self: DspPipelineMonitorDataStore) -> DspPipelineMonitorData:
        """
        Get current monitoring data for DSP.

        This returns the latest monitoring data taken from the current subband data.
        If no subband data is available then the response is a default
        :py:class:`DspPipelineMonitorData` object.

        :return: the monitoring data stored for subband 1, or a default
            :py:class:`DspPipelineMonitorData` when the store holds no subband data.
        """
        if not self._subband_data:
            return DspPipelineMonitorData()

        # for now we only have 1 subband
        return self._subband_data[SUBBAND_1]