Source code for ska_pst.lmc.dsp.dsp_simulator

# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project.
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""Module for providing the Simulated DSP capability for PST."""

from __future__ import annotations

import shutil
from typing import Any

import numpy as np
from overrides import override
from ska_pst.lmc.component import SUBBAND_1, PstSimulator

from .dsp_model import DspMonitorData, DspMonitorDataStore

__all__ = ["PstDspSimulator"]


class PstDspSimulator(PstSimulator[DspMonitorData, DspMonitorDataStore]):
    """
    Simulator for the DSP process of the PST.LMC sub-system.

    This is currently a stub to allow the DSP functionality of the LMC
    to work. When the monitoring attributes are known and how to
    simulate them then this class will be updated.
    """

    def __init__(
        self: PstDspSimulator,
        disk_capacity: int | None = None,
        available_disk_space: int | None = None,
        data_record_rate: float | None = None,
        **kwargs: Any,
    ) -> None:
        """
        Initialise the DSP simulator.

        Any value that is left as ``None`` is omitted from the initial
        configuration, so :py:meth:`configure_scan` falls back to its own
        default for it.

        :param disk_capacity: the max size of the disk to simulate, default is
            determined from shutil
        :type disk_capacity: int | None, optional
        :param available_disk_space: initial available space on disk to simulate,
            default is determined from shutil
        :type available_disk_space: int | None, optional
        :param data_record_rate: the data record rate. Default is a random number.
        :type data_record_rate: float | None, optional
        :param kwargs: additional keyword arguments passed through to the
            parent simulator's initialiser.
        """
        super().__init__(data_store=DspMonitorDataStore(), **kwargs)

        # Only explicitly supplied values go into the configuration; missing
        # keys are what trigger the defaults inside configure_scan.
        configuration: dict = {}
        if disk_capacity is not None:
            configuration["disk_capacity"] = disk_capacity

        if available_disk_space is not None:
            configuration["available_disk_space"] = available_disk_space

        if data_record_rate is not None:
            configuration["data_record_rate"] = data_record_rate

        self._data_recorded = 0
        self.configure_scan(configuration=configuration)
        self._scan = False

    @property
    def disk_capacity(self: PstDspSimulator) -> int:
        """Get simulated disk capacity."""
        return self._disk_capacity

    @disk_capacity.setter
    def disk_capacity(self: PstDspSimulator, disk_capacity: int) -> None:
        """
        Set simulated disk capacity.

        :param disk_capacity: the new disk capacity, in bytes.
        """
        self._disk_capacity = disk_capacity

    @property
    def available_disk_space(self: PstDspSimulator) -> int:
        """Get simulated available bytes left of disk."""
        return self._available_disk_space

    @available_disk_space.setter
    def available_disk_space(self: PstDspSimulator, available_disk_space: int) -> None:
        """
        Set simulated available bytes left of disk.

        :param available_disk_space: the new amount of bytes available on the disk.
        """
        self._available_disk_space = available_disk_space

    @override
    def configure_scan(self: PstDspSimulator, configuration: dict) -> None:
        """
        Simulate configuring a scan.

        Recognised keys are ``disk_capacity``, ``available_disk_space`` and
        ``data_record_rate``. Missing disk values default to the usage of
        ``/`` as reported by :py:func:`shutil.disk_usage`; a missing record
        rate defaults to a random float in the range ``[0.5e8, 1.5e8)``.

        The data store and the internal recorded-bytes counter are both
        reset, so the simulator starts the new configuration from zero.

        :param configuration: the configuration to be configured
        :type configuration: dict
        """
        self.num_subbands = 1

        default_disk_capacity, _, default_available_disk_space = shutil.disk_usage("/")

        self.disk_capacity = disk_capacity = configuration.get("disk_capacity", default_disk_capacity)
        self.available_disk_space = available_disk_space = configuration.get(
            "available_disk_space", default_available_disk_space
        )

        # Draw a scalar rather than a 1-element array: the rate is documented
        # as a float, and _update converts it with int(), which NumPy
        # deprecates for arrays with ndim > 0.
        default_data_record_rate = float((np.random.rand() + 0.5) * 1e8)
        self._data_record_rate = configuration.get("data_record_rate", default_data_record_rate)

        # Keep the running total in step with the store, which is published
        # below with data_recorded=0. Without this the first update after a
        # reconfigure would report bytes from the previous configuration.
        self._data_recorded = 0

        self._data_store.reset()
        self._data_store.update_disk_stats(
            disk_capacity=disk_capacity, available_disk_space=available_disk_space
        )
        self._data_store.update_subband(
            subband_id=SUBBAND_1,
            subband_data=DspMonitorData(
                disk_capacity=self.disk_capacity,
                available_disk_space=self.available_disk_space,
                data_recorded=0,
                data_record_rate=self._data_record_rate,
            ),
        )

    @override
    def _update(self: PstDspSimulator) -> None:
        """
        Simulate the update of DSP data.

        Each call records ``int(data_record_rate)`` bytes, capped at the
        remaining available disk space, then publishes the new totals to
        the data store for subband 1.
        """
        # determine actual bytes written, can't go more than disk available
        data_recorded = min(int(self._data_record_rate), self.available_disk_space)

        # update disk available
        self.available_disk_space -= data_recorded

        # update subband values
        self._data_recorded += data_recorded

        self._data_store.update_subband(
            subband_id=SUBBAND_1,
            subband_data=DspMonitorData(
                disk_capacity=self.disk_capacity,
                available_disk_space=self.available_disk_space,
                data_recorded=self._data_recorded,
                data_record_rate=self._data_record_rate,
            ),
        )

    def get_data(self: PstDspSimulator) -> DspMonitorData:
        """
        Get current DSP data.

        Updates the current simulated data, if a scan is in progress, and
        returns the latest data.

        :returns: current simulated DSP data.
        :rtype: :py:class:`DspMonitorData`
        """
        if self._scan:
            self._update()

        return self._data_store.monitor_data

    @override
    def get_env(self: PstDspSimulator) -> dict:
        """
        Simulate the disk environment.

        This gets the disk capacity and available disk space of ``/`` from
        :py:func:`shutil.disk_usage`.

        :returns: a dictionary with ``disk_capacity`` and
            ``available_disk_space`` entries, both in bytes.
        :rtype: dict
        """
        disk_capacity, _, available_disk_space = shutil.disk_usage("/")

        return {
            "disk_capacity": disk_capacity,
            "available_disk_space": available_disk_space,
        }