# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST LMC project
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""Module for providing the Beam capability for the Pulsar Timing Sub-element."""
from __future__ import annotations
from typing import Any, Dict, List, Optional, Tuple, cast
import tango
from ska_tango_base.commands import ResultCode
from ska_tango_base.control_model import AdminMode, HealthState, SimulationMode
from ska_tango_base.executor import TaskStatus
from tango import DebugIt
from tango.server import attribute, command, device_property, run
import ska_pst_lmc.release as release
from ska_pst_lmc.beam.beam_component_manager import PstBeamComponentManager
from ska_pst_lmc.beam.beam_device_interface import PstBeamDeviceInterface
from ska_pst_lmc.component import as_device_attribute_name
from ska_pst_lmc.component.pst_device import PstBaseDevice
from ska_pst_lmc.util import Configuration
__all__ = ["PstBeam", "main"]
[docs]class PstBeam(PstBaseDevice[PstBeamComponentManager], PstBeamDeviceInterface):
"""
A logical TANGO device representing a Beam Capability for PST.LMC.
**Properties:**
- Device Property
RecvFQDN
- Type:'DevString'
SmrbFQDN
- Type:'DevString'
DspFQDN
- Type:'DevString'
StatFQDN
- Type:'DevString'
SendFQDN
- Type:'DevString'
"""
# -----------------
# Device Properties
# -----------------
RecvFQDN = device_property(
dtype=str,
)
SmrbFQDN = device_property(
dtype=str,
)
DspFQDN = device_property(
dtype=str,
)
StatFQDN = device_property(
dtype=str,
)
SendFQDN = device_property(
dtype=str,
)
ScanOutputDirPattern = device_property(
dtype=str, doc=("The pattern for directory used for scan output files.")
)
# ---------------
# General methods
# ---------------
[docs] def init_device(self: PstBeam) -> None:
"""
Initialise the attributes and properties of the PstBeam.
This overrides the :py:class:`SKABaseDevice`.
"""
util = tango.Util.instance()
util.set_serial_model(tango.SerialModel.NO_SYNC)
super().init_device()
self._build_state = "{}, {}, {}".format(release.NAME, release.VERSION, release.DESCRIPTION)
self._version_id = release.VERSION
for prop in [
"dataReceiveRate",
"dataReceived",
"dataDropRate",
"dataDropped",
"misorderedPackets",
"misorderedPacketRate",
"malformedPackets",
"malformedPacketRate",
"misdirectedPackets",
"misdirectedPacketRate",
"checksumFailurePackets",
"checksumFailurePacketRate",
"timestampSyncErrorPackets",
"timestampSyncErrorPacketRate",
"seqNumberSyncErrorPackets",
"seqNumberSyncErrorPacketRate",
"dataRecordRate",
"dataRecorded",
"diskCapacity",
"diskUsedBytes",
"diskUsedPercentage",
"availableDiskSpace",
"expectedDataRecordRate",
"availableRecordingTime",
"ringBufferUtilisation",
"channelBlockConfiguration",
]:
self.set_change_event(prop, True, False)
self.set_archive_event(prop, True, False)
[docs] def create_component_manager(
self: PstBeam,
) -> PstBeamComponentManager:
"""
Create and return a component manager for this device.
:return: a component manager for this device.
"""
return PstBeamComponentManager(
device_interface=self,
simulation_mode=SimulationMode.TRUE,
logger=self.logger,
)
[docs] def always_executed_hook(self: PstBeam) -> None:
"""Execute call before any TANGO command is executed."""
[docs] def delete_device(self: PstBeam) -> None:
"""
Delete resources allocated in init_device.
This method allows for any memory or other resources allocated in the init_device method to be
released. This method is called by the device destructor and by the device Init command.
"""
# stop the task executor
self.component_manager._pst_task_executor.stop()
super().delete_device()
[docs] def handle_attribute_value_update(self: PstBeam, attribute_name: str, value: Any) -> None:
"""
Handle update of a device attribute value.
:param attribute_name: the name of the attribute to update.
:type attribute_name: str
:param value: the new value of the attribute to update to.
:type value: Any
"""
try:
attr_key = as_device_attribute_name(attribute_name)
self.push_change_event(attr_key, value)
self.push_archive_event(attr_key, value)
except Exception:
self.logger.warning(
f"Error in attempting to set device attribute {attribute_name}.", exc_info=True
)
@property
def smrb_fqdn(self: PstBeam) -> str:
"""Get the fully qualified device name (FQDN) for the SMRB.MGMT device of this beam."""
return self.SmrbFQDN
@property
def recv_fqdn(self: PstBeam) -> str:
"""Get the fully qualified device name (FQDN) for the RECV.MGMT device of this beam."""
return self.RecvFQDN
@property
def dsp_fqdn(self: PstBeam) -> str:
"""Get the fully qualified device name (FQDN) for the DSP.MGMT device of this beam."""
return self.DspFQDN
@property
def stat_fqdn(self: PstBeam) -> str:
"""Get the fully qualified device name (FQDN) for the STAT.MGMT device of this beam."""
return self.StatFQDN
@property
def scan_output_dir_pattern(self: PstBeam) -> str:
"""Get the pattern for directory used for scan output files."""
return self.ScanOutputDirPattern
[docs] def handle_subdevice_fault(self: PstBeam, device_fqdn: str, fault_msg: str) -> None:
"""
Handle a fault raised from a subordinate device.
:param device_fqdn: the fully-qualified domain name of the subordinate device.
:type device_fqdn: str
:param fault_msg: the fault message from the subordinate device.
:type fault_msg: str
"""
self._health_failure_msg = fault_msg
self._component_state_changed(obsfault=True)
self.update_health_state(health_state=HealthState.FAILED)
# ----------
# Commands
# ----------
# ----------
# Attributes
# ----------
@property
def monitoring_polling_rate(self: PstBeam) -> int:
"""Get the monitoring polling rate."""
return self._monitoring_polling_rate
@attribute(
dtype=int,
label="Monitoring polling rate",
doc=("Rate at which data from CORE apps is monitored during a scan in milliseconds."),
)
def monitoringPollingRate(self: PstBeam) -> int:
"""Get the current monitoring polling rate, in milliseconds."""
return self._monitoring_polling_rate
@monitoringPollingRate.write # type: ignore[no-redef]
def monitoringPollingRate(self: PstBeam, monitoring_polling_rate: int) -> None:
"""Update the monitoring polling rate."""
self._monitoring_polling_rate = monitoring_polling_rate
self.component_manager.set_monitoring_polling_rate(monitoring_polling_rate)
@attribute(
dtype=int,
unit="Bytes",
standard_unit="Bytes",
display_unit="B",
doc="Total capacity of the disk that DSP is writing to.",
)
def diskCapacity(self: PstBeam) -> int:
"""
Total capacity of the disk that DSP is writing to.
:returns: total capacity of the disk that DSP is writing to, in bytes.
:rtype: int
"""
return self.component_manager.disk_capacity
@attribute(
dtype=int,
unit="Bytes",
standard_unit="Bytes",
display_unit="B",
doc="Used space on the disk that DSP is writing to.",
)
def diskUsedBytes(self: PstBeam) -> int:
"""
Get used space on the disk that DSP is writing to.
This is `diskCapacity - availableDiskSpace`.
:returns: used space on the disk that DSP is writing to, in bytes.
:rtype: int
"""
return self.component_manager.disk_used_bytes
@attribute(
dtype=float,
unit="Percentage",
display_unit="%",
max_value=100,
min_value=0,
max_alarm=99,
max_warning=95,
doc="Used space on the disk that DSP is writing to.",
)
def diskUsedPercentage(self: PstBeam) -> float:
"""
Get used space on the disk that DSP is writing to.
This is `100.0 * (diskCapacity - availableDiskSpace)/availableDiskSpace`.
:returns: used space on the disk that DSP is writing to as a percentage.
:rtype: float
"""
return self.component_manager.disk_used_percentage
@attribute(
dtype=int,
unit="Bytes",
standard_unit="Bytes",
display_unit="B",
doc="Available space on the disk that DSP is writing to.",
)
def availableDiskSpace(self: PstBeam) -> int:
"""
Available space on the disk that the PST.BEAM is writing to.
:returns: available space on the disk that PST.BEAM is writing to, in bytes.
:rtype: int
"""
return self.component_manager.available_disk_space
@attribute(
dtype=float,
unit="Seconds",
display_unit="s",
min_alarm=10.0,
min_warning=60.0,
doc="Available time, in seconds, for writing available.",
)
def availableRecordingTime(self: PstBeam) -> float:
"""
Get available time, in seconds, for writing available.
:returns: available time, in seconds, for writing available.
:rtype: float
"""
return self.component_manager.available_recording_time
# Scan monitoring values
@attribute(
dtype=float,
unit="Gigabits per second",
standard_unit="Gigabits per second",
display_unit="Gb/s",
max_value=200,
min_value=0,
doc="Current data receive rate from the CBF interface",
)
def dataReceiveRate(self: PstBeam) -> float:
"""
Get the current data receive rate from the CBF interface.
:returns: current data receive rate from the CBF interface in Gb/s.
:rtype: float
"""
return self.component_manager.data_receive_rate
@attribute(
dtype=int,
unit="Bytes",
standard_unit="Bytes",
display_unit="B",
doc="Total number of bytes received from the CBF in the current scan",
)
def dataReceived(self: PstBeam) -> int:
"""
Get the total amount of data received from CBF interface for current scan.
:returns: total amount of data received from CBF interface for current scan in Bytes
:rtype: int
"""
return self.component_manager.data_received
@attribute(
dtype=float,
label="Drop Rate",
unit="Bytes per second",
standard_unit="Bytes per second",
display_unit="B/s",
max_value=200,
min_value=-1,
max_alarm=10,
min_alarm=-1,
max_warning=1,
min_warning=-1,
doc="Current rate of CBF ingest data being dropped or lost by the receiving process",
)
def dataDropRate(self: PstBeam) -> float:
"""
Get the current rate of CBF ingest data being dropped or lost by the receiving proces.
:returns: current rate of CBF ingest data being dropped or lost in Bytes/s.
:rtype: float
"""
return self.component_manager.data_drop_rate
@attribute(
dtype=int,
label="Dropped",
unit="Bytes",
standard_unit="Bytes",
display_unit="B",
doc="Total number of bytes dropped in the current scan",
)
def dataDropped(self: PstBeam) -> int:
"""
Get the total number of bytes dropped in the current scan.
:returns: total number of bytes dropped in the current scan.
:rtype: int
"""
return self.component_manager.data_dropped
@attribute(
dtype=float,
label="Misordered packets",
doc=(
"Number of out of order UDP packets received in the current scan."
"The UDP packets for all frequency channels of a given set of"
"time samples that start at time t0 shall arrive before the"
"first packet containing data sampled at time t0+2 delta_t,"
"where delta_t is the time spanned by the set of time samples"
"in a single packet."
),
)
def misorderedPackets(self: PstBeam) -> int:
"""
Get the total number of packets received out of order in the current scan.
:returns: total number of packets received out of order in the current scan.
:rtype: int
"""
return self.component_manager.misordered_packets
@attribute(
dtype=float,
label="Misordered packet rate",
unit="packets/sec",
doc="The current rate of misordered packets.",
)
def misorderedPacketRate(self: PstBeam) -> float:
"""
Get the current rate of misordered packets.
:returns: the current rate of misordered packets in packets/seconds.
:rtype: float
"""
return self.component_manager.misordered_packet_rate
@attribute(
dtype=int,
label="Malformed packets",
doc=(
"Malformed packets are valid UDP packets, but where contents of"
"the UDP payload does not conform to the specification in the"
"CBF/PST ICD. Examples of malformation include: bad magic-word"
"field, invalid meta-data, incorrect packet size."
),
)
def malformedPackets(self: PstBeam) -> int:
"""
Get the total number of packets marked as malformed for current scan.
:returns: the total number of packets marked as malformed for current scan.
:rtype: int
"""
return self.component_manager.malformed_packets
@attribute(
dtype=float,
label="Malformed packet rate",
unit="packets/sec",
doc="The current rate of malformed packets.",
)
def malformedPacketRate(self: PstBeam) -> float:
"""
Get current rate of malformed packets.
:return: current rate of malformed packets in packets/seconds.
:rtype: float
"""
return self.component_manager.malformed_packet_rate
@attribute(
dtype=int,
label="Misdirected packets",
doc=(
"Total number of (valid) UDP packets that were unexpectedly received."
"Misdirection could be due to wrong ScanID, Beam ID, Network Interface"
"or UDP port. Receiving misdirected packets is a sign that there is"
"something wrong with the upstream configuration for the scan."
),
)
def misdirectedPackets(self: PstBeam) -> int:
"""
Get the total number of packets as marked as misdirected for current scan.
:returns: the total number of packets as marked as misdirected for current scan.
:rtype: int
"""
return self.component_manager.misdirected_packets
@attribute(
dtype=float,
label="Misdirected packet rate",
unit="packets/sec",
doc="The current rate of misdirected packets.",
)
def misdirectedPacketRate(self: PstBeam) -> float:
"""
Get the current rate of misdirected packets.
:return: the current rate of misdirected packets in packets/seconds.
:rtype: float
"""
return self.component_manager.misdirected_packet_rate
@attribute(
dtype=int,
label="Checksum failure packets",
doc="Total number of packets with a UDP, IP header or CRC checksum failure.",
)
def checksumFailurePackets(self: PstBeam) -> int:
"""
Get the total number of packets with checksum failures for current scan.
:return: the total number of packets with checksum failures for current scan.
:rtype: int
"""
return self.component_manager.checksum_failure_packets
@attribute(
dtype=float,
label="Checksum failure packet rate",
unit="packets/sec",
doc="The current rate of packets with checkesum failures.",
)
def checksumFailurePacketRate(self: PstBeam) -> float:
"""
Get the current rate of packets with checkesum failures.
:return: the current rate of packets with checkesum failures in packets/seconds.
:rtype: float
"""
return self.component_manager.checksum_failure_packet_rate
@attribute(
dtype=int,
label="Timestamp sync error packets",
doc=(
"The number of packets received where the timestamp has become"
"desynchronised with the packet sequence number * sampling interval"
),
)
def timestampSyncErrorPackets(self: PstBeam) -> int:
"""
Get the total number of packets with a timestamp sync error for current scan.
:return: the total number of packets with a timestamp sync error for current scan.
:rtype: int
"""
return self.component_manager.timestamp_sync_error_packets
@attribute(
dtype=float,
label="Timestamp sync error packet rate",
unit="packets/sec",
doc="The current rate of packets with a timestamp sync error.",
)
def timestampSyncErrorPacketRate(self: PstBeam) -> float:
"""
Get the current rate of packets with a timestamp sync error.
:return: the current rate of packets with a timestamp sync error in packets/seconds.
:rtype: float
"""
return self.component_manager.timestamp_sync_error_packet_rate
@attribute(
dtype=int,
label="Seq. number sync error packets",
doc=(
"The number of packets received where the packet sequence number has"
"become desynchronised with the data rate and elapsed time."
),
)
def seqNumberSyncErrorPackets(self: PstBeam) -> int:
"""
Get the total number of packets with a seq num sync error in current scan.
:return: the total number of packets with a seq num sync error in current scan.
:rtype: int
"""
return self.component_manager.seq_number_sync_error_packets
@attribute(
dtype=float,
label="Seq. number sync error packet rate",
unit="packets/sec",
doc="The current rate of packets with a sequence number sync error.",
)
def seqNumberSyncErrorPacketRate(self: PstBeam) -> float:
"""
Get the current rate of packets with a sequence number sync error.
:return: the current rate of packets with a sequence number sync error in packets/seconds.
:rtype: float
"""
return self.component_manager.seq_number_sync_error_packet_rate
@attribute(
dtype=float,
unit="Bytes per second",
display_unit="B/s",
doc="Current rate of writing to the disk.",
)
def dataRecordRate(self: PstBeam) -> float:
"""
Get current rate of writing to the disk.
:returns: use space on the disk that PST.BEAM is writing to, in bytes.
:rtype: float
"""
return self.component_manager.data_record_rate
@attribute(
dtype=int,
unit="Bytes",
display_unit="B",
doc="Number of bytes written during scan.",
)
def dataRecorded(self: PstBeam) -> int:
"""
Get number of bytes written during scan.
:returns: number of bytes written during scan.
:rtype: int
"""
return self.component_manager.data_recorded
@attribute(
dtype=str,
doc="The channel block configuration based on scan configuration.",
)
def channelBlockConfiguration(self: PstBeam) -> str:
"""
Get the channel block configuration.
This is a JSON serialised string of the channel block configuration
that is calculated during the `ConfigureScan` command. This
configuration includes the following properties:
* Number of channel blocks, between 1 and 4
* For each channel block, the block of channel numbers
using a range in the form of inclusive of the lower
number and exclusive of the higher number (e.g [1, 21)
would be a range of 20 channels starting from 1 and ending
at channel block 20 (inclusive).
* Channel block IPv4 address to send data to.
* Channel block UDP port
.. code-block:: python
{
"num_channel_blocks": 2,
"channel_blocks": [
{
"data_host": "10.10.0.1",
"data_port": 20000,
"start_channel": 0,
"num_channels": 12,
},
{
"data_host": "10.10.0.1",
"data_port": 20001,
"start_channel": 12,
"num_channels": 10,
},
]
}
:returns: the channel block configuration as a JSON string.
:rtype: str
"""
import json
return json.dumps(self.component_manager.channel_block_configuration)
@attribute(
dtype=float,
label="Utilisation",
unit="Percentage",
display_unit="%",
max_value=100,
min_value=0,
max_alarm=90,
max_warning=80,
doc="Percentage of the ring buffer elements that are full of data",
)
def ringBufferUtilisation(self: PstBeam) -> float:
"""
Get the percentage of the ring buffer elements that are full of data.
:returns: the percentage of the ring buffer elements that are full of data.
:rtype: float
"""
return self.component_manager.ring_buffer_utilisation
@attribute(
dtype=float,
unit="Gigabits per second",
display_unit="Gb/s",
doc="Expected rate of data to be received by PST Beam component.",
)
def expectedDataRecordRate(self: PstBeam) -> float:
"""
Get the expected rate of data to be received by PST Beam component.
:returns: the expected rate of data to be received by PST Beam component.
:rtype: float
"""
return self.component_manager.expected_data_record_rate
# --------
# Commands
# --------
@command(
dtype_out=("str",),
doc_out="Version strings",
)
@DebugIt()
def GetVersionInfo(self: PstBeam) -> List[str]:
"""
Return the version information of the device.
:return: The result code and the command unique ID
"""
return [f"{self.__class__.__name__}, {self._build_state}"]
def _update_admin_mode(self: PstBeam, admin_mode: AdminMode) -> None:
super()._update_admin_mode(admin_mode)
if hasattr(self, "component_manager"):
self.component_manager.update_admin_mode(admin_mode)
# ----------
# Run server
# ----------
[docs]def main(args: Optional[list] = None, **kwargs: Any) -> int:
"""
Entry point for module.
:param args: positional arguments
:param kwargs: named arguments
:return: exit code
:rtype: int
"""
return run((PstBeam,), args=args, **kwargs)
if __name__ == "__main__":
main()