# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project.
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""Module for providing the Simulated RECV capability for the Pulsar Timing Sub-element."""
from __future__ import annotations
import copy
from random import randint, random
from typing import Any
from overrides import override
from ska_pst.lmc.component import PstSimulator
from ska_pst.lmc.receive.receive_model import ReceiveData, ReceiveDataStore
[docs]def generate_random_update() -> ReceiveData:
"""Generate a random update of ReceivedData."""
data_receive_rate: float = 1.0 * randint(0, 90)
data_received: int = int(data_receive_rate * 1e9 / 8)
data_drop_rate: float = data_receive_rate / 1000.0 * random()
data_dropped: int = int(data_drop_rate * 1e9 / 8)
misordered_packets: int = randint(0, 3)
misordered_packet_rate: float = 1.0 * misordered_packets
malformed_packets: int = randint(0, 3)
malformed_packet_rate: float = 1.0 * malformed_packets
misdirected_packets: int = randint(0, 3)
misdirected_packet_rate: float = 1.0 * misordered_packets
checksum_failure_packets: int = randint(0, 3)
checksum_failure_packet_rate: float = 1.0 * checksum_failure_packets
timestamp_sync_error_packets: int = randint(0, 3)
timestamp_sync_error_packet_rate: float = 1.0 * timestamp_sync_error_packets
seq_number_sync_error_packets: int = randint(0, 3)
seq_number_sync_error_packet_rate: float = 1.0 * seq_number_sync_error_packets
invalid_polarisation_correction_packets: int = randint(0, 3)
invalid_polarisation_correction_packet_rate: float = 1.0 * invalid_polarisation_correction_packets
invalid_station_beam_packets: int = randint(0, 3)
invalid_station_beam_packet_rate: float = 1.0 * invalid_station_beam_packets
invalid_pst_beam_packets: int = randint(0, 3)
invalid_pst_beam_packet_rate: float = 1.0 * invalid_pst_beam_packets
percentage_data_received: float = 100.0 * randint(0, 1)
percentage_valid_station_beam: float = 100.0 * randint(0, 1)
percentage_valid_pst_beam: float = 100.0 * randint(0, 1)
percentage_valid_polarisation_correction: float = 100.0 * randint(0, 1)
return ReceiveData(
data_received=data_received,
data_receive_rate=data_receive_rate,
data_dropped=data_dropped,
data_drop_rate=data_drop_rate,
misordered_packets=misordered_packets,
misordered_packet_rate=misordered_packet_rate,
malformed_packets=malformed_packets,
malformed_packet_rate=malformed_packet_rate,
misdirected_packets=misdirected_packets,
misdirected_packet_rate=misdirected_packet_rate,
checksum_failure_packets=checksum_failure_packets,
checksum_failure_packet_rate=checksum_failure_packet_rate,
timestamp_sync_error_packets=timestamp_sync_error_packets,
timestamp_sync_error_packet_rate=timestamp_sync_error_packet_rate,
seq_number_sync_error_packets=seq_number_sync_error_packets,
seq_number_sync_error_packet_rate=seq_number_sync_error_packet_rate,
invalid_polarisation_correction_packets=invalid_polarisation_correction_packets,
invalid_polarisation_correction_packet_rate=invalid_polarisation_correction_packet_rate,
invalid_station_beam_packets=invalid_station_beam_packets,
invalid_station_beam_packet_rate=invalid_station_beam_packet_rate,
invalid_pst_beam_packets=invalid_pst_beam_packets,
invalid_pst_beam_packet_rate=invalid_pst_beam_packet_rate,
percentage_data_received=percentage_data_received,
percentage_valid_station_beam=percentage_valid_station_beam,
percentage_valid_pst_beam=percentage_valid_pst_beam,
percentage_valid_polarisation_correction=percentage_valid_polarisation_correction,
)
[docs]class PstReceiveSimulator(PstSimulator[ReceiveData, ReceiveDataStore]):
"""
Simulator for the RECV process of the PST.LMC sub-system.
This is used to generate random data and simulate what happens during the RECV process. Current
implementation has this internally with the TANGO device but future improvements will have this as a
separate process and the TANGO will connect via an API.
"""
def __init__(
self: PstReceiveSimulator,
**kwargs: Any,
) -> None:
"""Initialise the simulator."""
super().__init__(data_store=ReceiveDataStore(), **kwargs)
configuration: dict = {}
self.configure_scan(configuration=configuration)
self._scan = False
@override
def configure_scan(self: PstReceiveSimulator, configuration: dict) -> None:
"""
Simulate configuring a scan.
:param configuration: the configuration to be configured
:type configuration: dict
"""
if "num_subbands" in configuration:
self.num_subbands = configuration["num_subbands"]
self._data_store.reset()
for subband_id in range(1, self.num_subbands + 1):
self._data_store.update_subband(subband_id=subband_id, subband_data=ReceiveData())
@override
def _update(self: PstReceiveSimulator) -> None:
"""Simulate the update of RECV data."""
for subband_id, subband_data in self._data_store._subband_data.items():
subband_data = copy.copy(subband_data)
update: ReceiveData = generate_random_update()
# update totals
subband_data.data_received += update.data_received
subband_data.data_dropped += update.data_dropped
subband_data.misordered_packets += update.misordered_packets
subband_data.malformed_packets += update.malformed_packets
subband_data.misdirected_packets += update.misdirected_packets
subband_data.checksum_failure_packets += update.checksum_failure_packets
subband_data.timestamp_sync_error_packets += update.timestamp_sync_error_packets
subband_data.seq_number_sync_error_packets += update.seq_number_sync_error_packets
subband_data.invalid_polarisation_correction_packets = (
update.invalid_polarisation_correction_packets
)
subband_data.invalid_station_beam_packets = update.invalid_station_beam_packets
subband_data.invalid_pst_beam_packets = update.invalid_pst_beam_packets
# update the rates
subband_data.data_receive_rate = update.data_receive_rate
subband_data.data_drop_rate = update.data_drop_rate
subband_data.misordered_packet_rate = update.misordered_packet_rate
subband_data.malformed_packet_rate = update.malformed_packet_rate
subband_data.misdirected_packet_rate = update.misdirected_packet_rate
subband_data.checksum_failure_packet_rate = update.checksum_failure_packet_rate
subband_data.timestamp_sync_error_packet_rate = update.timestamp_sync_error_packet_rate
subband_data.seq_number_sync_error_packet_rate = update.seq_number_sync_error_packet_rate
subband_data.invalid_polarisation_correction_packet_rate = (
update.invalid_polarisation_correction_packet_rate
)
subband_data.invalid_station_beam_packet_rate = update.invalid_station_beam_packet_rate
subband_data.invalid_pst_beam_packet_rate = update.invalid_pst_beam_packet_rate
# update the qa percentages
subband_data.percentage_data_received += update.percentage_data_received
subband_data.percentage_valid_station_beam += update.percentage_valid_station_beam
subband_data.percentage_valid_pst_beam += update.percentage_valid_pst_beam
subband_data.percentage_valid_polarisation_correction += (
update.percentage_valid_polarisation_correction
)
self._data_store.update_subband(subband_id=subband_id, subband_data=subband_data)
[docs] def get_data(self: PstReceiveSimulator) -> ReceiveData:
"""
Get current RECV data.
Updates the current simulated data and returns the latest data.
:returns: current simulated RECV data.
:rtype: :py:class:`ReceiveData`
"""
if self._scan:
self._update()
return self._data_store.monitor_data
@override
def get_env(self: PstReceiveSimulator) -> dict:
"""
Get simulated environment values for RECV.CORE.
This returns the following:
* data_host = '127.0.0.1'
* data_mac = '01:23:45:ab:cd:ef',
* data_port = 32080
"""
return {
"data_host": "127.0.0.1",
"data_mac": "01:23:45:ab:cd:ef",
"data_port": 32080,
}