# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project.
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""Module for providing the Simulated DSP capability for PST."""
from __future__ import annotations
import shutil
from typing import Any
import numpy as np
from overrides import override
from ska_pst.lmc.component import SUBBAND_1, PstSimulator
from .dsp_model import DspMonitorData, DspMonitorDataStore
__all__ = ["PstDspSimulator"]
class PstDspSimulator(PstSimulator[DspMonitorData, DspMonitorDataStore]):
    """
    Simulator for the DSP process of the PST.LMC sub-system.

    This is currently a stub to allow the DSP functionality of the LMC
    to work. When the monitoring attributes are known and how to simulate
    them then this class will be updated.
    """

    def __init__(
        self: PstDspSimulator,
        disk_capacity: int | None = None,
        available_disk_space: int | None = None,
        data_record_rate: float | None = None,
        **kwargs: Any,
    ) -> None:
        """
        Initialise the DSP simulator.

        Only the values that are not ``None`` are placed in the initial
        configuration; everything else falls back to the defaults chosen
        by :py:meth:`configure_scan`.

        :param disk_capacity: the max size of the disk to simulate, default is determined from shutil
        :type disk_capacity: int | None, optional
        :param available_disk_space: initial available space on disk to simulate, default is determined from
            shutil
        :type available_disk_space: int | None, optional
        :param data_record_rate: the data record rate. Default is a random number.
        :type data_record_rate: float | None, optional
        :param kwargs: additional keyword arguments forwarded to the parent constructor.
        """
        super().__init__(data_store=DspMonitorDataStore(), **kwargs)

        configuration: dict = {}
        if disk_capacity is not None:
            configuration["disk_capacity"] = disk_capacity
        if available_disk_space is not None:
            configuration["available_disk_space"] = available_disk_space
        if data_record_rate is not None:
            configuration["data_record_rate"] = data_record_rate

        # Declared here so the attribute always exists; configure_scan resets it too.
        self._data_recorded = 0
        self.configure_scan(configuration=configuration)
        # A freshly constructed simulator is not scanning, so get_data() does not advance it.
        self._scan = False

    @property
    def disk_capacity(self: PstDspSimulator) -> int:
        """Get simulated disk capacity."""
        return self._disk_capacity

    @disk_capacity.setter
    def disk_capacity(self: PstDspSimulator, disk_capacity: int) -> None:
        """
        Set simulated disk capacity.

        :param disk_capacity: the new disk capacity, in bytes.
        """
        self._disk_capacity = disk_capacity

    @property
    def available_disk_space(self: PstDspSimulator) -> int:
        """Get simulated available bytes left on disk."""
        return self._available_disk_space

    @available_disk_space.setter
    def available_disk_space(self: PstDspSimulator, available_disk_space: int) -> None:
        """
        Set simulated available bytes left on disk.

        :param available_disk_space: the new amount of bytes available on the disk.
        """
        self._available_disk_space = available_disk_space

    @override
    def configure_scan(self: PstDspSimulator, configuration: dict) -> None:
        """
        Simulate configuring a scan.

        The keys ``disk_capacity``, ``available_disk_space`` and
        ``data_record_rate`` are read from the configuration. Missing disk
        values default to what ``shutil.disk_usage("/")`` reports and a
        missing record rate defaults to a random value in [0.5e8, 1.5e8).
        The data store and the running total of recorded data are reset.

        :param configuration: the configuration to be configured
        :type configuration: dict
        """
        self.num_subbands = 1

        default_disk_capacity, _, default_available_disk_space = shutil.disk_usage("/")
        self.disk_capacity = configuration.get("disk_capacity", default_disk_capacity)
        self.available_disk_space = configuration.get("available_disk_space", default_available_disk_space)

        # Draw a scalar rather than a 1-element array: the rate is later passed to int(), and
        # converting an array with ndim > 0 to a scalar is deprecated by NumPy. This also keeps
        # the default the same type (float) as a configured rate.
        default_data_record_rate = float((np.random.rand() + 0.5) * 1e8)
        self._data_record_rate = configuration.get("data_record_rate", default_data_record_rate)

        # Keep the running total consistent with the freshly reset data store, otherwise the
        # first update after a reconfigure would report the total of the previous configuration.
        self._data_recorded = 0

        self._data_store.reset()
        self._data_store.update_disk_stats(
            disk_capacity=self.disk_capacity, available_disk_space=self.available_disk_space
        )
        self._data_store.update_subband(
            subband_id=SUBBAND_1,
            subband_data=DspMonitorData(
                disk_capacity=self.disk_capacity,
                available_disk_space=self.available_disk_space,
                data_recorded=self._data_recorded,
                data_record_rate=self._data_record_rate,
            ),
        )

    @override
    def _update(self: PstDspSimulator) -> None:
        """
        Simulate the update of DSP data.

        One update records ``int(data_record_rate)`` bytes, capped at the
        remaining available disk space, then publishes the new totals for
        the single simulated subband.
        """
        # determine actual bytes written, can't go more than disk available
        data_recorded = min(int(self._data_record_rate), self.available_disk_space)

        # update disk available and the running total
        self.available_disk_space -= data_recorded
        self._data_recorded += data_recorded

        # update subband values
        self._data_store.update_subband(
            subband_id=SUBBAND_1,
            subband_data=DspMonitorData(
                disk_capacity=self.disk_capacity,
                available_disk_space=self.available_disk_space,
                data_recorded=self._data_recorded,
                data_record_rate=self._data_record_rate,
            ),
        )

    def get_data(self: PstDspSimulator) -> DspMonitorData:
        """
        Get current DSP data.

        Updates the current simulated data, if a scan is in progress, and
        returns the latest data.

        :returns: current simulated DSP data.
        :rtype: :py:class:`DspMonitorData`
        """
        if self._scan:
            self._update()

        return self._data_store.monitor_data

    @override
    def get_env(self: PstDspSimulator) -> dict:
        """
        Simulate the disk environment.

        This gets the disk capacity and available disk space as reported
        by ``shutil.disk_usage("/")``, not the simulated values.

        :returns: a dictionary with ``disk_capacity`` and ``available_disk_space`` keys.
        :rtype: dict
        """
        disk_capacity, _, available_disk_space = shutil.disk_usage("/")
        return {
            "disk_capacity": disk_capacity,
            "available_disk_space": available_disk_space,
        }