# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project.
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""Module for providing the Beam capability for the Pulsar Timing Sub-element."""
from __future__ import annotations
from typing import Any, List, Optional, Tuple, cast
import ska_pst.lmc.release as release
import tango
from ska_control_model import HealthState, ObsMode, ObsState, SimulationMode
from ska_pst.lmc.beam.beam_component_manager import PstBeamComponentManager
from ska_pst.lmc.beam.beam_device_interface import PstBeamDeviceInterface
from ska_pst.lmc.component import as_device_attribute_name
from ska_pst.lmc.component.pst_device import DEFAULT_HEALTH_CHECK_INTERVAL_MS, PstBaseDevice
from ska_pst.lmc.validation import PstConfigValidator
from ska_tango_base.commands import ResultCode
from tango import DebugIt
from tango.server import attribute, command, device_property, run
__all__ = ["PstBeam", "main"]
# Names of the device attributes for which change and archive events are
# enabled and pushed by the device. Entries are grouped by sub-component.
EXPOSED_ATTRIBUTES = [
    # General
    "channelBlockConfiguration",
    "pstProcessingMode",
    # RECV - data volume and rates
    "dataReceiveRate",
    "dataReceived",
    "dataDropRate",
    "dataDropped",
    # RECV - packet error counters, each followed by its rate
    "misorderedPackets",
    "misorderedPacketRate",
    "malformedPackets",
    "malformedPacketRate",
    "misdirectedPackets",
    "misdirectedPacketRate",
    "checksumFailurePackets",
    "checksumFailurePacketRate",
    "timestampSyncErrorPackets",
    "timestampSyncErrorPacketRate",
    "seqNumberSyncErrorPackets",
    "seqNumberSyncErrorPacketRate",
    "invalidPolarisationCorrectionPackets",
    "invalidPolarisationCorrectionPacketRate",
    "invalidStationBeamPackets",
    "invalidStationBeamPacketRate",
    "invalidPstBeamPackets",
    "invalidPstBeamPacketRate",
    # RECV - validity percentages
    "percentageDataReceived",
    "percentageValidStationBeam",
    "percentageValidPstBeam",
    "percentageValidPolarisationCorrection",
    # DSP Disk
    "dataRecordRate",
    "dataRecorded",
    "diskCapacity",
    "diskUsedBytes",
    "diskUsedPercentage",
    "availableDiskSpace",
    "expectedDataRecordRate",
    "availableRecordingTime",
    # DSPSR based pipelines
    "dspProcessingTime",
    "dspDataTime",
    "dspBytesProcessed",
    "dspProcessingTimePercent",
    "dspOverallEfficiency",
    "dspEfficiency",
    "dspBytesProcessingRate",
    # SMRB
    "ringBufferUtilisation",
    # STAT - names follow <real|imag>Pol<A|B><statistic>[RfiExcised]; the loop
    # nesting reproduces the order: polarisation, RFI suffix, component, statistic.
    *(
        f"{component}Pol{polarisation}{statistic}{suffix}"
        for polarisation in ("A", "B")
        for suffix in ("", "RfiExcised")
        for component in ("real", "imag")
        for statistic in ("MeanFreqAvg", "VarianceFreqAvg", "NumClippedSamples")
    ),
    # Subcomponent health state and obs state
    *(
        f"{subcomponent}{state}"
        for subcomponent in ("recv", "smrb", "stat", "dsp")
        for state in ("HealthState", "ObsState")
    ),
]
[docs]class PstBeam(PstBaseDevice[PstBeamComponentManager], PstBeamDeviceInterface):
"""
A logical TANGO device representing a Beam Capability for PST.LMC.
**Properties:**
- Device Property
RecvProcessApiEndpoint
- Type:'DevString'
SmrbProcessApiEndpoint
- Type:'DevString'
DspProcessApiEndpoint
- Type:'DevString'
StatProcessApiEndpoint
- Type:'DevString'
ScanOutputDirPattern
- Type:'DevString'
"""
# -----------------
# Device Properties
# -----------------
# Endpoints of the process APIs for the RECV, SMRB, DSP and STAT applications.
RecvProcessApiEndpoint = device_property(dtype=str)
SmrbProcessApiEndpoint = device_property(dtype=str)
DspProcessApiEndpoint = device_property(dtype=str)
StatProcessApiEndpoint = device_property(dtype=str)
# Directory pattern used for the files written during a scan.
ScanOutputDirPattern = device_property(
    dtype=str,
    doc="The pattern for directory used for scan output files.",
)
# ---------------
# General methods
# ---------------
def init_device(self: PstBeam) -> None:
    """
    Set up the PstBeam device state and its event configuration.

    Switches the TANGO serialisation model to ``NO_SYNC``, runs the base
    class initialisation, records the build/version information and enables
    change and archive events for every name in ``EXPOSED_ATTRIBUTES``.
    """
    tango.Util.instance().set_serial_model(tango.SerialModel.NO_SYNC)
    super().init_device()

    self._build_state = f"{release.NAME}, {release.VERSION}, {release.DESCRIPTION}"
    self._version_id = release.VERSION

    # Arguments are (implemented=True, detect=False): events are pushed by
    # the device code rather than produced by attribute polling.
    for attribute_name in EXPOSED_ATTRIBUTES:
        self.set_change_event(attribute_name, True, False)
        self.set_archive_event(attribute_name, True, False)
def create_component_manager(self: PstBeam) -> PstBeamComponentManager:
    """
    Build the component manager used by this device.

    The monitoring polling rate and the health check interval are reset to
    their configured defaults before the manager is constructed.

    :return: a component manager for this device.
    """
    default_polling_rate = self.DefaultMonitoringPollingRate
    self._monitoring_polling_rate_ms = default_polling_rate

    # Fall back to the module default when the device property is unset/falsy.
    configured_interval = self.DefaultHealthCheckInterval
    self._health_check_interval = (
        configured_interval if configured_interval else DEFAULT_HEALTH_CHECK_INTERVAL_MS
    )

    manager = PstBeamComponentManager(
        device_interface=self,
        simulation_mode=SimulationMode.TRUE,
        logger=self.logger,
    )
    return manager
def always_executed_hook(self: PstBeam) -> None:
    """Run before any TANGO command is executed; intentionally does nothing."""
    return None
def delete_device(self: PstBeam) -> None:
    """
    Release resources that were allocated in ``init_device``.

    The component manager's task executor is stopped first, then the base
    class clean-up is run.
    """
    task_executor = self.component_manager._pst_task_executor
    task_executor.stop()
    super().delete_device()
def handle_attribute_value_update(self: PstBeam, attribute_name: str, value: Any) -> None:
    """
    Publish a new value for a device attribute as change and archive events.

    The name is first converted with ``as_device_attribute_name``. Names that
    are not listed in ``EXPOSED_ATTRIBUTES`` are ignored. Any exception is
    logged as a warning and not propagated.

    :param attribute_name: the name of the attribute to update.
    :type attribute_name: str
    :param value: the new value of the attribute.
    :type value: Any
    """
    try:
        device_attribute = as_device_attribute_name(attribute_name)
        # only push events for attributes that are currently exposed.
        if device_attribute in EXPOSED_ATTRIBUTES:
            self.push_change_event(device_attribute, value)
            self.push_archive_event(device_attribute, value)
    except Exception:
        self.logger.warning(
            f"Error in attempting to set device attribute {attribute_name}.", exc_info=True
        )
def update_obs_mode(self: PstBeam, obs_mode: ObsMode) -> None:
    """
    Store the new observation mode and publish it.

    The value is saved on the device and then pushed as both a change event
    and an archive event for the ``obsMode`` attribute.

    :param obs_mode: the observation mode enum value.
    :type obs_mode: ObsMode
    """
    self._obs_mode = obs_mode
    for push_event in (self.push_change_event, self.push_archive_event):
        push_event("obsMode", obs_mode)
@property
def smrb_process_api_endpoint(self: PstBeam) -> str:
    """Return the gRPC process API endpoint configured for SMRB.CORE."""
    endpoint = self.SmrbProcessApiEndpoint
    return endpoint
@property
def recv_process_api_endpoint(self: PstBeam) -> str:
    """Return the gRPC process API endpoint configured for RECV.CORE."""
    endpoint = self.RecvProcessApiEndpoint
    return endpoint
@property
def dsp_process_api_endpoint(self: PstBeam) -> str:
    """Return the gRPC process API endpoint configured for DSP."""
    endpoint = self.DspProcessApiEndpoint
    return endpoint
@property
def stat_process_api_endpoint(self: PstBeam) -> str:
    """Return the gRPC process API endpoint configured for STAT.CORE."""
    endpoint = self.StatProcessApiEndpoint
    return endpoint
@property
def scan_output_dir_pattern(self: PstBeam) -> str:
    """Return the directory pattern used for scan output files."""
    pattern = self.ScanOutputDirPattern
    return pattern
# ----------
# Attributes
# ----------
@property
def monitoring_polling_rate_ms(self: PstBeam) -> int:
    """Return the monitoring polling rate currently stored on the device."""
    polling_rate = self._monitoring_polling_rate_ms
    return polling_rate
@attribute(
    dtype=int,
    label="Monitoring polling rate",
    doc=("Rate at which data from CORE apps is monitored during a scan in milliseconds."),
)
def monitoringPollingRate(self: PstBeam) -> int:
    """
    Read the monitoring polling rate.

    :returns: the monitoring polling rate stored on the device, in milliseconds.
    :rtype: int
    """
    polling_rate = self._monitoring_polling_rate_ms
    return polling_rate
@monitoringPollingRate.write  # type: ignore[no-redef]
def monitoringPollingRate(self: PstBeam, monitoring_polling_rate_ms: int) -> None:
    """
    Write the monitoring polling rate.

    The value is stored on the device and then set on the component manager.

    :param monitoring_polling_rate_ms: the new monitoring polling rate.
    :type monitoring_polling_rate_ms: int
    """
    # Targets are assigned left to right: device first, then component manager.
    self._monitoring_polling_rate_ms = (
        self.component_manager.monitoring_polling_rate_ms
    ) = monitoring_polling_rate_ms
@property
def health_check_interval(self: PstBeam) -> int:
    """
    Return the health check interval, in milliseconds.

    This is a plain Python property so that the BEAM component manager can
    read the value from the TANGO device through the
    :py:class:`PstBeamDeviceInterface` interface class.
    """
    configured_interval = self._health_check_interval
    if configured_interval:
        return configured_interval
    # Under the TANGO test infrastructure the value is not set from the
    # Device Property, so fall back to the module default.
    return DEFAULT_HEALTH_CHECK_INTERVAL_MS
@attribute(
    dtype=int,
    label="Health check interval",
    unit="milliseconds",
    display_unit="ms",
    doc=("The interval with which the CORE apps report their health state, in milliseconds."),
)
def healthCheckInterval(self: PstBeam) -> int:
    """
    Read the health check polling interval.

    :return: the value of the ``health_check_interval`` property, in milliseconds.
    :rtype: int
    """
    interval = self.health_check_interval
    return interval
@healthCheckInterval.write  # type: ignore[no-redef]
def healthCheckInterval(self: PstBeam, health_check_interval: int) -> None:
    """
    Write the health check polling interval.

    The value is stored on the device and then set on the component manager.
    NOTE(review): restarting the background health check tasks when the
    interval changes is presumably done by the component manager - confirm.

    :param health_check_interval: the new health check interval to use for background
        health check of core applications, in milliseconds.
    :type health_check_interval: int
    """
    # Targets are assigned left to right: device first, then component manager.
    self._health_check_interval = (
        self.component_manager.health_check_interval
    ) = health_check_interval
@attribute(
    dtype=int,
    unit="Bytes",
    standard_unit="Bytes",
    display_unit="B",
    doc="Total capacity of the disk that DSP is writing to.",
)
def diskCapacity(self: PstBeam) -> int:
    """
    Read the total capacity of the disk that DSP is writing to.

    :returns: the disk capacity reported by the component manager, in bytes.
    :rtype: int
    """
    manager = self.component_manager
    return manager.disk_capacity
@attribute(
    dtype=int,
    unit="Bytes",
    standard_unit="Bytes",
    display_unit="B",
    doc="Used space on the disk that DSP is writing to.",
)
def diskUsedBytes(self: PstBeam) -> int:
    """
    Read the used space on the disk that DSP is writing to.

    This corresponds to ``diskCapacity - availableDiskSpace``.

    :returns: the used disk space reported by the component manager, in bytes.
    :rtype: int
    """
    manager = self.component_manager
    return manager.disk_used_bytes
@attribute(
    dtype=float,
    unit="Percentage",
    display_unit="%",
    max_value=100,
    min_value=0,
    doc="Used space on the disk that DSP is writing to.",
)
def diskUsedPercentage(self: PstBeam) -> float:
    """
    Read the used space on the disk that DSP is writing to, as a percentage.

    This corresponds to ``100.0 * (diskCapacity - availableDiskSpace)/diskCapacity``.

    :returns: the used disk space reported by the component manager, as a percentage.
    :rtype: float
    """
    manager = self.component_manager
    return manager.disk_used_percentage
@attribute(
    dtype=int,
    unit="Bytes",
    standard_unit="Bytes",
    display_unit="B",
    doc="Available space on the disk that DSP is writing to.",
)
def availableDiskSpace(self: PstBeam) -> int:
    """
    Read the space still available on the disk being written to.

    :returns: the available disk space reported by the component manager, in bytes.
    :rtype: int
    """
    manager = self.component_manager
    return manager.available_disk_space
@attribute(
    dtype=float,
    unit="Seconds",
    display_unit="s",
    doc="Available time, in seconds, for writing available.",
    min_warning=60.0,
    min_alarm=10.0,
)
def availableRecordingTime(self: PstBeam) -> float:
    """
    Read the remaining time available for recording.

    :returns: the available recording time reported by the component manager, in seconds.
    :rtype: float
    """
    manager = self.component_manager
    return manager.available_recording_time
# Scan monitoring values
@attribute(
    dtype=float,
    unit="Gigabits per second",
    standard_unit="Gigabits per second",
    display_unit="Gb/s",
    max_value=200,
    min_value=0,
    doc="Current data receive rate from the CBF interface",
)
def dataReceiveRate(self: PstBeam) -> float:
    """
    Read the current data receive rate from the CBF interface.

    :returns: the receive rate reported by the component manager, in Gb/s.
    :rtype: float
    """
    manager = self.component_manager
    return manager.data_receive_rate
@attribute(
    dtype=int,
    unit="Bytes",
    standard_unit="Bytes",
    display_unit="B",
    doc="Total number of bytes received from the CBF in the current scan",
)
def dataReceived(self: PstBeam) -> int:
    """
    Read the amount of data received from the CBF interface in the current scan.

    :returns: the received data reported by the component manager, in bytes.
    :rtype: int
    """
    manager = self.component_manager
    return manager.data_received
@attribute(
    dtype=float,
    label="Drop Rate",
    unit="Bytes per second",
    standard_unit="Bytes per second",
    display_unit="B/s",
    max_value=200,
    min_value=-1,
    max_warning=0.001,
    doc="Current rate of CBF ingest data being dropped or lost by the receiving process",
)
def dataDropRate(self: PstBeam) -> float:
    """
    Read the rate at which CBF ingest data is being dropped or lost.

    :returns: the drop rate reported by the component manager, in Bytes/s.
    :rtype: float
    """
    manager = self.component_manager
    return manager.data_drop_rate
@attribute(
    dtype=int,
    label="Dropped",
    unit="Bytes",
    standard_unit="Bytes",
    display_unit="B",
    doc="Total number of bytes dropped in the current scan",
)
def dataDropped(self: PstBeam) -> int:
    """
    Read the number of bytes dropped in the current scan.

    :returns: the dropped byte count reported by the component manager.
    :rtype: int
    """
    manager = self.component_manager
    return manager.data_dropped
@attribute(
    dtype=int,
    label="Misordered packets",
    # Each fragment ends with a space so the implicitly concatenated
    # literals do not run words together in the attribute description.
    doc=(
        "Number of out of order UDP packets received in the current scan. "
        "The UDP packets for all frequency channels of a given set of "
        "time samples that start at time t0 shall arrive before the "
        "first packet containing data sampled at time t0+2 delta_t, "
        "where delta_t is the time spanned by the set of time samples "
        "in a single packet."
    ),
)
def misorderedPackets(self: PstBeam) -> int:
    """
    Get the total number of packets received out of order in the current scan.

    :returns: total number of packets received out of order in the current scan.
    :rtype: int
    """
    return self.component_manager.misordered_packets
@attribute(
    dtype=float,
    label="Misordered packet rate",
    unit="packets/sec",
    doc="The current rate of misordered packets.",
    max_warning=0.001,
)
def misorderedPacketRate(self: PstBeam) -> float:
    """
    Read the current rate of misordered packets.

    :returns: the rate reported by the component manager, in packets/second.
    :rtype: float
    """
    manager = self.component_manager
    return manager.misordered_packet_rate
@attribute(
    dtype=int,
    label="Malformed packets",
    # Each fragment ends with a space so the implicitly concatenated
    # literals do not run words together in the attribute description.
    doc=(
        "Malformed packets are valid UDP packets, but where contents of "
        "the UDP payload does not conform to the specification in the "
        "CBF/PST ICD. Examples of malformation include: bad magic-word "
        "field, invalid meta-data, incorrect packet size."
    ),
)
def malformedPackets(self: PstBeam) -> int:
    """
    Get the total number of packets marked as malformed for current scan.

    :returns: the total number of packets marked as malformed for current scan.
    :rtype: int
    """
    return self.component_manager.malformed_packets
@attribute(
    dtype=float,
    label="Malformed packet rate",
    unit="packets/sec",
    doc="The current rate of malformed packets.",
    max_warning=0.001,
)
def malformedPacketRate(self: PstBeam) -> float:
    """
    Read the current rate of malformed packets.

    :return: the rate reported by the component manager, in packets/second.
    :rtype: float
    """
    manager = self.component_manager
    return manager.malformed_packet_rate
@attribute(
    dtype=int,
    label="Misdirected packets",
    # Each fragment ends with a space so the implicitly concatenated
    # literals do not run words together in the attribute description.
    doc=(
        "Total number of (valid) UDP packets that were unexpectedly received. "
        "Misdirection could be due to wrong ScanID, Beam ID, Network Interface "
        "or UDP port. Receiving misdirected packets is a sign that there is "
        "something wrong with the upstream configuration for the scan."
    ),
)
def misdirectedPackets(self: PstBeam) -> int:
    """
    Get the total number of packets marked as misdirected for current scan.

    :returns: the total number of packets marked as misdirected for current scan.
    :rtype: int
    """
    return self.component_manager.misdirected_packets
@attribute(
    dtype=float,
    label="Misdirected packet rate",
    unit="packets/sec",
    doc="The current rate of misdirected packets.",
    max_warning=0.001,
)
def misdirectedPacketRate(self: PstBeam) -> float:
    """
    Read the current rate of misdirected packets.

    :return: the rate reported by the component manager, in packets/second.
    :rtype: float
    """
    manager = self.component_manager
    return manager.misdirected_packet_rate
@attribute(
    dtype=int,
    label="Checksum failure packets",
    doc="Total number of packets with a UDP, IP header or CRC checksum failure.",
)
def checksumFailurePackets(self: PstBeam) -> int:
    """
    Read the number of packets with checksum failures in the current scan.

    :return: the checksum failure packet count reported by the component manager.
    :rtype: int
    """
    manager = self.component_manager
    return manager.checksum_failure_packets
@attribute(
    dtype=float,
    label="Checksum failure packet rate",
    unit="packets/sec",
    doc="The current rate of packets with checksum failures.",
    max_warning=0.001,
)
def checksumFailurePacketRate(self: PstBeam) -> float:
    """
    Read the current rate of packets with checksum failures.

    :return: the rate reported by the component manager, in packets/second.
    :rtype: float
    """
    manager = self.component_manager
    return manager.checksum_failure_packet_rate
@attribute(
    dtype=int,
    label="Timestamp sync error packets",
    # The first fragment ends with a space so the implicitly concatenated
    # literals do not run words together in the attribute description.
    doc=(
        "The number of packets received where the timestamp has become "
        "de-synchronised with the packet sequence number * sampling interval"
    ),
)
def timestampSyncErrorPackets(self: PstBeam) -> int:
    """
    Get the total number of packets with a timestamp sync error for current scan.

    :return: the total number of packets with a timestamp sync error for current scan.
    :rtype: int
    """
    return self.component_manager.timestamp_sync_error_packets
@attribute(
    dtype=float,
    label="Timestamp sync error packet rate",
    unit="packets/sec",
    doc="The current rate of packets with a timestamp sync error.",
    max_warning=0.001,
)
def timestampSyncErrorPacketRate(self: PstBeam) -> float:
    """
    Read the current rate of packets with a timestamp sync error.

    :return: the rate reported by the component manager, in packets/second.
    :rtype: float
    """
    manager = self.component_manager
    return manager.timestamp_sync_error_packet_rate
@attribute(
    dtype=int,
    label="Seq. number sync error packets",
    # The first fragment ends with a space so the implicitly concatenated
    # literals do not run words together in the attribute description.
    doc=(
        "The number of packets received where the packet sequence number has "
        "become de-synchronised with the data rate and elapsed time."
    ),
)
def seqNumberSyncErrorPackets(self: PstBeam) -> int:
    """
    Get the total number of packets with a seq num sync error in current scan.

    :return: the total number of packets with a seq num sync error in current scan.
    :rtype: int
    """
    return self.component_manager.seq_number_sync_error_packets
@attribute(
    dtype=float,
    label="Seq. number sync error packet rate",
    unit="packets/sec",
    doc="The current rate of packets with a sequence number sync error.",
    max_warning=0.001,
)
def seqNumberSyncErrorPacketRate(self: PstBeam) -> float:
    """
    Read the current rate of packets with a sequence number sync error.

    :return: the rate reported by the component manager, in packets/second.
    :rtype: float
    """
    manager = self.component_manager
    return manager.seq_number_sync_error_packet_rate
@attribute(
    dtype=int,
    label="No valid Jones polarisation correction packets",
    doc="The number of packets received where no valid Jones polarisation corrections have been applied",
)
def invalidPolarisationCorrectionPackets(self: PstBeam) -> int:
    """
    Read the count of packets without valid Jones polarisation corrections.

    :return: the number of packets received where no valid Jones polarisation
        corrections have been applied, as reported by the component manager.
    :rtype: int
    """
    manager = self.component_manager
    return manager.invalid_polarisation_correction_packets
@attribute(
    dtype=float,
    label="No valid Jones polarisation correction packet rate",
    unit="packets/sec",
    doc=(
        "The current rate of packets where no valid Jones polarisation "
        "corrections have been applied in packets/sec"
    ),
    max_warning=0.001,
)
def invalidPolarisationCorrectionPacketRate(self: PstBeam) -> float:
    """
    Read the rate of packets without valid Jones polarisation corrections.

    :return: the rate of packets where no valid Jones polarisation corrections
        have been applied, as reported by the component manager, in packets/sec.
    :rtype: float
    """
    manager = self.component_manager
    return manager.invalid_polarisation_correction_packet_rate
@attribute(
    dtype=int,
    label="No valid station beam polynomial packets",
    doc="The number of packets received where no valid station beam delay polynomials have been applied",
)
def invalidStationBeamPackets(self: PstBeam) -> int:
    """
    Read the count of packets without valid station beam delay polynomials.

    :return: the number of packets received where no valid station beam delay
        polynomials have been applied, as reported by the component manager.
    :rtype: int
    """
    manager = self.component_manager
    return manager.invalid_station_beam_packets
@attribute(
    dtype=float,
    label="No valid station beam polynomial packet rate",
    unit="packets/sec",
    doc=(
        "The current rate of packets where no valid station beam delay "
        "polynomials have been applied in packets/sec"
    ),
    max_warning=0.001,
)
def invalidStationBeamPacketRate(self: PstBeam) -> float:
    """
    Read the rate of packets without valid station beam delay polynomials.

    :return: the rate of packets where no valid station beam delay polynomials
        have been applied, as reported by the component manager, in packets/sec.
    :rtype: float
    """
    manager = self.component_manager
    return manager.invalid_station_beam_packet_rate
@attribute(
    dtype=int,
    label="No valid PST beam polynomial packets",
    doc="The number of packets received where no valid PST beam delay polynomials have been applied",
)
def invalidPstBeamPackets(self: PstBeam) -> int:
    """
    Get the number of packets received where no valid PST beam delay polynomials have been applied.

    The return annotation is ``int`` to agree with the attribute's ``dtype=int``.

    :return: the number of packets received where no valid PST beam delay
        polynomials have been applied.
    :rtype: int
    """  # noqa: E501
    return self.component_manager.invalid_pst_beam_packets
@attribute(
    dtype=float,
    unit="packets/sec",
    label="No valid PST beam polynomial packet rate",
    doc=(
        "The current rate of packets where no valid PST beam delay "
        "polynomials have been applied in packets/sec"
    ),
    max_warning=0.001,
)
def invalidPstBeamPacketRate(self: PstBeam) -> float:
    """
    Read the rate of packets without valid PST beam delay polynomials.

    :return: the rate of packets where no valid PST beam delay polynomials
        have been applied, as reported by the component manager, in packets/sec.
    :rtype: float
    """
    manager = self.component_manager
    return manager.invalid_pst_beam_packet_rate
@attribute(
    dtype=float,
    unit="Bytes per second",
    display_unit="B/s",
    doc="Current rate of writing to the disk.",
)
def dataRecordRate(self: PstBeam) -> float:
    """
    Get current rate of writing to the disk.

    :returns: the current rate of writing to the disk, in bytes per second.
    :rtype: float
    """
    return self.component_manager.data_record_rate
@attribute(
    dtype=int,
    unit="Bytes",
    display_unit="B",
    doc="Number of bytes written during scan.",
)
def dataRecorded(self: PstBeam) -> int:
    """
    Read the number of bytes written during the scan.

    :returns: the recorded byte count reported by the component manager.
    :rtype: int
    """
    manager = self.component_manager
    return manager.data_recorded
@attribute(
    dtype=float,
    unit="Percentage",
    display_unit="%",
    doc="Current percentage of data received from CBF.",
)
def percentageDataReceived(self: PstBeam) -> float:
    """
    Get current amount of CBF data received as a percentage of the total CBF data expected.

    The return annotation is ``float`` to agree with the attribute's ``dtype=float``.

    :returns: percentage of data received.
    :rtype: float
    """
    return self.component_manager.percentage_data_received
@attribute(
    dtype=float,
    unit="Percentage",
    display_unit="%",
    doc="Current percentage of data from CBF with valid station beam flags.",
)
def percentageValidStationBeam(self: PstBeam) -> float:
    """
    Get current percentage of CBF data received with valid station beam flags.

    The return annotation is ``float`` to agree with the attribute's ``dtype=float``.

    :returns: percentage of data with valid station beam flags.
    :rtype: float
    """
    return self.component_manager.percentage_valid_station_beam
@attribute(
    dtype=float,
    unit="Percentage",
    display_unit="%",
    doc="Current percentage of data from CBF with valid PST beam flags.",
)
def percentageValidPstBeam(self: PstBeam) -> float:
    """
    Get current percentage of CBF data received with valid PST beam flags.

    The return annotation is ``float`` to agree with the attribute's ``dtype=float``.

    :returns: percentage of data with valid PST beam flags.
    :rtype: float
    """
    return self.component_manager.percentage_valid_pst_beam
@attribute(
    dtype=float,
    unit="Percentage",
    display_unit="%",
    doc="Current percentage of data from CBF with valid polarisation correction flags.",
)
def percentageValidPolarisationCorrection(self: PstBeam) -> float:
    """
    Get current percentage of CBF data received with valid polarisation correction flags.

    The return annotation is ``float`` to agree with the attribute's ``dtype=float``.

    :returns: percentage of data with valid polarisation correction flags.
    :rtype: float
    """
    return self.component_manager.percentage_valid_polarisation_correction
@attribute(
    dtype=str,
    doc="The channel block configuration based on scan configuration.",
)
def channelBlockConfiguration(self: PstBeam) -> str:
    """
    Get the channel block configuration.

    This is a JSON serialised string of the channel block configuration
    that is calculated during the `ConfigureScan` command. This
    configuration has the number of channel blocks and a list of channel blocks,
    within the channel block it has the following fields:

    * ``destination_host`` - the IPv4 destination address for the packets in the channel block
    * ``destination_port`` - destination port number for the packets in the channel block
    * ``destination_mac`` - destination MAC address for the packets in the channel block
    * ``start_pst_channel`` - the starting PST channel number
    * ``num_channels`` - the number of PST channels in the channel block
    * ``start_pst_frequency`` - the centre frequency, in Hz, of the first PST channel

    A channel block covers the channel numbers from ``start_pst_channel``
    through to ``start_pst_channel + num_channels - 1``. This can be thought
    of as a range that is inclusive of the lower number and exclusive of
    the higher number, e.g. ``[1, 21)`` is a range of 20 channels starting
    from channel 1 and ending at channel 20 (inclusive).

    Both the IPv4 and MAC address are provided so that the CBF doesn't have
    to perform an Address Resolution Protocol (ARP) request.

    .. code-block:: json

        {
            "num_channel_blocks": 2,
            "channel_blocks": [
                {
                    "destination_host": "10.10.0.1",
                    "destination_port": 20000,
                    "destination_mac": "01:23:45:ab:cd:ef",
                    "start_pst_channel": 0,
                    "num_channels": 12,
                    "start_pst_frequency": 49609375.0
                },
                {
                    "destination_host": "10.10.0.1",
                    "destination_port": 20001,
                    "destination_mac": "01:23:45:ab:cd:ef",
                    "start_pst_channel": 12,
                    "num_channels": 10,
                    "start_pst_frequency": 49652778.0
                }
            ]
        }

    :returns: the channel block configuration as a JSON string.
    :rtype: str
    """
    import json

    return json.dumps(self.component_manager.channel_block_configuration)
@attribute(
    dtype=float,
    label="Utilisation",
    unit="Percentage",
    display_unit="%",
    max_value=100,
    min_value=0,
    max_warning=50.0,
    max_alarm=90.0,
    doc="Percentage of the ring buffer elements that are full of data",
)
def ringBufferUtilisation(self: PstBeam) -> float:
    """
    Read the percentage of ring buffer elements that are full of data.

    :returns: the ring buffer utilisation reported by the component manager.
    :rtype: float
    """
    manager = self.component_manager
    return manager.ring_buffer_utilisation
@attribute(
    dtype=float,
    unit="Gigabits per second",
    display_unit="Gb/s",
    doc="Expected rate of data to be received by PST Beam component.",
)
def expectedDataRecordRate(self: PstBeam) -> float:
    """
    Read the expected data rate for the PST Beam component.

    :returns: the expected data record rate reported by the component manager.
    :rtype: float
    """
    manager = self.component_manager
    return manager.expected_data_record_rate
@attribute(dtype=float, label="Real Pol. A Mean")
def realPolAMeanFreqAvg(self: PstBeam) -> float:
    """Get the mean of the real data for pol A, averaged over all channels."""
    # NOTE(review): docstring kept verbatim - with no ``doc=`` given, PyTango
    # may use it as the attribute description; confirm before rewording.
    manager = self.component_manager
    return manager.real_pol_a_mean_freq_avg
@attribute(dtype=float, label="Real Pol. A Variance")
def realPolAVarianceFreqAvg(self: PstBeam) -> float:
    """Get the variance of the real data for pol A, averaged over all channels."""
    # NOTE(review): docstring kept verbatim - with no ``doc=`` given, PyTango
    # may use it as the attribute description; confirm before rewording.
    manager = self.component_manager
    return manager.real_pol_a_variance_freq_avg
@attribute(dtype=int, label="Real Pol. A Num. Clipped", max_warning=1)
def realPolANumClippedSamples(self: PstBeam) -> int:
    """Get the num of clipped samples of the real data for pol A."""
    # NOTE(review): docstring kept verbatim - with no ``doc=`` given, PyTango
    # may use it as the attribute description; confirm before rewording.
    manager = self.component_manager
    return manager.real_pol_a_num_clipped_samples
@attribute(dtype=float, label="Imaginary Pol. A Mean")
def imagPolAMeanFreqAvg(self: PstBeam) -> float:
    """Get the mean of the imaginary data for pol A, averaged over all channels."""
    # NOTE(review): docstring kept verbatim - with no ``doc=`` given, PyTango
    # may use it as the attribute description; confirm before rewording.
    manager = self.component_manager
    return manager.imag_pol_a_mean_freq_avg
@attribute(dtype=float, label="Imaginary Pol. A Variance")
def imagPolAVarianceFreqAvg(self: PstBeam) -> float:
    """Get the variance of the imaginary data for pol A, averaged over all channels."""
    # NOTE(review): docstring kept verbatim - with no ``doc=`` given, PyTango
    # may use it as the attribute description; confirm before rewording.
    manager = self.component_manager
    return manager.imag_pol_a_variance_freq_avg
@attribute(dtype=int, label="Imaginary Pol. A Num. Clipped", max_warning=1)
def imagPolANumClippedSamples(self: PstBeam) -> int:
    """Get the num of clipped samples of the imaginary data for pol A."""
    # NOTE(review): docstring kept verbatim - with no ``doc=`` given, PyTango
    # may use it as the attribute description; confirm before rewording.
    manager = self.component_manager
    return manager.imag_pol_a_num_clipped_samples
@attribute(dtype=float, label="Real Pol. A Mean (RFI excised)")
def realPolAMeanFreqAvgRfiExcised(self: PstBeam) -> float:
    """Get the mean of the real data for pol A, averaged over channels not flagged for RFI."""
    # NOTE(review): docstring kept verbatim - with no ``doc=`` given, PyTango
    # may use it as the attribute description; confirm before rewording.
    manager = self.component_manager
    return manager.real_pol_a_mean_freq_avg_rfi_excised
@attribute(dtype=float, label="Real Pol. A Variance (RFI excised)")
def realPolAVarianceFreqAvgRfiExcised(self: PstBeam) -> float:
    """Get the variance of the real data for pol A, averaged over channels not flagged for RFI."""
    # NOTE(review): docstring kept verbatim - with no ``doc=`` given, PyTango
    # may use it as the attribute description; confirm before rewording.
    manager = self.component_manager
    return manager.real_pol_a_variance_freq_avg_rfi_excised
@attribute(dtype=int, label="Real Pol. A Num. Clipped (RFI excised)", max_warning=1)
def realPolANumClippedSamplesRfiExcised(self: PstBeam) -> int:
    """Get the num of clipped samples of the real data for pol A in channels not flagged for RFI."""
    # NOTE(review): docstring kept verbatim - with no ``doc=`` given, PyTango
    # may use it as the attribute description; confirm before rewording.
    manager = self.component_manager
    return manager.real_pol_a_num_clipped_samples_rfi_excised
@attribute(dtype=float, label="Imaginary Pol. A Mean (RFI excised)")
def imagPolAMeanFreqAvgRfiExcised(self: PstBeam) -> float:
    """Get the mean of the imaginary data for pol A, averaged over channels not flagged for RFI."""
    # NOTE(review): docstring kept verbatim - with no ``doc=`` given, PyTango
    # may use it as the attribute description; confirm before rewording.
    manager = self.component_manager
    return manager.imag_pol_a_mean_freq_avg_rfi_excised
@attribute(dtype=float, label="Imaginary Pol. A Variance (RFI excised)")
def imagPolAVarianceFreqAvgRfiExcised(self: PstBeam) -> float:
    """Get the variance of the imaginary data for pol A, averaged over channels not flagged for RFI."""
    # NOTE(review): docstring kept verbatim - with no ``doc=`` given, PyTango
    # may use it as the attribute description; confirm before rewording.
    manager = self.component_manager
    return manager.imag_pol_a_variance_freq_avg_rfi_excised
@attribute(dtype=int, label="Imaginary Pol. A Num. Clipped (RFI excised)", max_warning=1)
def imagPolANumClippedSamplesRfiExcised(self: PstBeam) -> int:
    """Get the num of clipped samples of the imaginary data for pol A in channels not flagged for RFI."""
    # NOTE(review): docstring kept verbatim - with no ``doc=`` given, PyTango
    # may use it as the attribute description; confirm before rewording.
    manager = self.component_manager
    return manager.imag_pol_a_num_clipped_samples_rfi_excised
@attribute(dtype=float, label="Real Pol. B Mean")
def realPolBMeanFreqAvg(self: PstBeam) -> float:
    """Get the mean of the real data for pol B, averaged over all channels."""
    # NOTE(review): docstring kept verbatim - with no ``doc=`` given, PyTango
    # may use it as the attribute description; confirm before rewording.
    manager = self.component_manager
    return manager.real_pol_b_mean_freq_avg
@attribute(dtype=float, label="Real Pol. B Variance")
def realPolBVarianceFreqAvg(self: PstBeam) -> float:
    """Get the variance of the real data for pol B, averaged over all channels."""
    # NOTE(review): docstring kept verbatim - with no ``doc=`` given, PyTango
    # may use it as the attribute description; confirm before rewording.
    manager = self.component_manager
    return manager.real_pol_b_variance_freq_avg
@attribute(dtype=int, label="Real Pol. B Num. Clipped", max_warning=1)
def realPolBNumClippedSamples(self: PstBeam) -> int:
    """Get the num of clipped samples of the real data for pol B."""
    # NOTE(review): docstring kept verbatim - with no ``doc=`` given, PyTango
    # may use it as the attribute description; confirm before rewording.
    manager = self.component_manager
    return manager.real_pol_b_num_clipped_samples
@attribute(
dtype=float,
label="Imaginary Pol. B Mean",
)
def imagPolBMeanFreqAvg(self: PstBeam) -> float:
    """
    Report the mean of the imaginary pol B data, averaged over all channels.

    :return: the component manager's ``imag_pol_b_mean_freq_avg`` value.
    """
    manager = self.component_manager
    return manager.imag_pol_b_mean_freq_avg
@attribute(
dtype=float,
label="Imaginary Pol. B Variance",
)
def imagPolBVarianceFreqAvg(self: PstBeam) -> float:
    """
    Report the variance of the imaginary pol B data, averaged over all channels.

    :return: the component manager's ``imag_pol_b_variance_freq_avg`` value.
    """
    variance = self.component_manager.imag_pol_b_variance_freq_avg
    return variance
@attribute(
dtype=int,
label="Imaginary Pol. B Num. Clipped",
max_warning=1,
)
def imagPolBNumClippedSamples(self: PstBeam) -> int:
    """
    Report the number of clipped samples in the imaginary pol B data.

    :return: the component manager's ``imag_pol_b_num_clipped_samples`` value.
    """
    manager = self.component_manager
    return manager.imag_pol_b_num_clipped_samples
@attribute(
dtype=float,
label="Real Pol. B Mean (RFI excised)",
)
def realPolBMeanFreqAvgRfiExcised(self: PstBeam) -> float:
    """
    Report the mean of the real pol B data after RFI excision.

    The mean is averaged over the channels that are not flagged for RFI.

    :return: the component manager's ``real_pol_b_mean_freq_avg_rfi_excised`` value.
    """
    mean_value = self.component_manager.real_pol_b_mean_freq_avg_rfi_excised
    return mean_value
@attribute(
dtype=float,
label="Real Pol. B Variance (RFI excised)",
)
def realPolBVarianceFreqAvgRfiExcised(self: PstBeam) -> float:
    """
    Report the variance of the real pol B data after RFI excision.

    The variance is averaged over the channels that are not flagged for RFI.

    :return: the component manager's ``real_pol_b_variance_freq_avg_rfi_excised`` value.
    """
    manager = self.component_manager
    return manager.real_pol_b_variance_freq_avg_rfi_excised
@attribute(
dtype=int,
label="Real Pol. B Num. Clipped (RFI excised)",
max_warning=1,
)
def realPolBNumClippedSamplesRfiExcised(self: PstBeam) -> int:
    """
    Report the clipped-sample count of the real pol B data after RFI excision.

    Only channels that are not flagged for RFI contribute to the count.

    :return: the component manager's ``real_pol_b_num_clipped_samples_rfi_excised`` value.
    """
    clipped_samples = self.component_manager.real_pol_b_num_clipped_samples_rfi_excised
    return clipped_samples
@attribute(
dtype=float,
label="Imaginary Pol. B Mean (RFI excised)",
)
def imagPolBMeanFreqAvgRfiExcised(self: PstBeam) -> float:
    """
    Report the mean of the imaginary pol B data after RFI excision.

    The mean is averaged over the channels that are not flagged for RFI.

    :return: the component manager's ``imag_pol_b_mean_freq_avg_rfi_excised`` value.
    """
    manager = self.component_manager
    return manager.imag_pol_b_mean_freq_avg_rfi_excised
@attribute(
dtype=float,
label="Imaginary Pol. B Variance (RFI excised)",
)
def imagPolBVarianceFreqAvgRfiExcised(self: PstBeam) -> float:
    """
    Report the variance of the imaginary pol B data after RFI excision.

    The variance is averaged over the channels that are not flagged for RFI.

    :return: the component manager's ``imag_pol_b_variance_freq_avg_rfi_excised`` value.
    """
    variance = self.component_manager.imag_pol_b_variance_freq_avg_rfi_excised
    return variance
@attribute(
dtype=int,
label="Imaginary Pol. B Num. Clipped (RFI excised)",
max_warning=1,
)
def imagPolBNumClippedSamplesRfiExcised(self: PstBeam) -> int:
    """
    Report the clipped-sample count of the imaginary pol B data after RFI excision.

    Only channels that are not flagged for RFI contribute to the count.

    :return: the component manager's ``imag_pol_b_num_clipped_samples_rfi_excised`` value.
    """
    manager = self.component_manager
    return manager.imag_pol_b_num_clipped_samples_rfi_excised
@attribute(
dtype=str, label="Current processing mode for PST, this is deprecated, use pstProcessingMode instead"
)
def processingMode(self: PstBeam) -> str:
    """
    Report the name of the processing mode that PST is currently configured for.

    Deprecated: clients should read ``pstProcessingMode`` instead.

    If the BEAM is not in a scan configured state, this will return "IDLE".

    :return: the ``name`` of the component manager's ``processing_mode``.
    """
    return self.component_manager.processing_mode.name
@attribute(dtype=str, label="Current processing mode for PST")
def pstProcessingMode(self: PstBeam) -> str:
    """
    Report the name of the processing mode that PST is currently configured for.

    If the BEAM is not in a scan configured state, this will return "IDLE".

    :return: the ``name`` of the component manager's ``processing_mode``.
    """
    return self.component_manager.processing_mode.name
@attribute(
dtype=float,
label="Pipeline Processing Time",
unit="seconds",
display_unit="s",
doc=("The number of seconds the pipeline used to perform signal processing."),
)
def dspProcessingTime(self: PstBeam) -> float:
    """
    Report the time the pipeline has spent performing signal processing.

    :return: the component manager's ``processing_time`` value; ``cast`` only
        informs the type checker and leaves the value untouched.
    """
    processing_time = self.component_manager.processing_time
    return cast(float, processing_time)
@attribute(
dtype=float,
label="Pipeline Processing Time (%)",
unit="Percentage",
display_unit="%",
doc=(
"The percentage of time used to perform processing of the data "
"during the last reporting interval."
),
)
def dspProcessingTimePercent(self: PstBeam) -> float:
    """
    Report the share of time the pipeline spent processing data.

    :return: the component manager's ``processing_time_percent`` value; ``cast``
        only informs the type checker and leaves the value untouched.
    """
    percent = self.component_manager.processing_time_percent
    return cast(float, percent)
@attribute(
dtype=float,
label="Pipeline Data Time",
unit="seconds",
display_unit="s",
doc=("The number of seconds spanned by the data that the pipeline has processed."),
)
def dspDataTime(self: PstBeam) -> float:
    """
    Report the time spanned by the data that the pipeline has processed.

    :return: the component manager's ``data_time`` value; ``cast`` only informs
        the type checker and leaves the value untouched.
    """
    data_time = self.component_manager.data_time
    return cast(float, data_time)
@attribute(
dtype=float,
label="Pipeline Data Processed",
unit="bytes",
display_unit="B",
doc=("The number of input bytes processed by the pipeline."),
)
def dspBytesProcessed(self: PstBeam) -> int:
    """
    Report the number of input bytes processed by the pipeline.

    :return: the component manager's ``bytes_processed`` value; ``cast`` only
        informs the type checker and leaves the value untouched.
    """
    # NOTE(review): the attribute decorator declares dtype=float while this method
    # is annotated and cast as int - confirm which type is intended.
    bytes_processed = self.component_manager.bytes_processed
    return cast(int, bytes_processed)
@attribute(
dtype=float,
label="Pipeline Bytes Processing Rate",
unit="bytes/second",
display_unit="B/s",
doc=("The data processing rate in bytes per second."),
)
def dspBytesProcessingRate(self: PstBeam) -> float:
    """
    Report the rate at which the pipeline is processing data.

    :return: the component manager's ``bytes_processing_rate`` value; ``cast``
        only informs the type checker and leaves the value untouched.
    """
    rate = self.component_manager.bytes_processing_rate
    return cast(float, rate)
@attribute(
dtype=float,
label="Pipeline Efficiency (Overall)",
doc=(
"The overall efficiency of the pipeline, where efficiency is defined "
"as (Pipeline Data Time)/(Pipeline Processing Time). A value greater than 1.0 "
"means that the pipeline processes data faster than it is received."
),
)
def dspOverallEfficiency(self: PstBeam) -> float:
    """
    Report the overall efficiency of the pipeline.

    :return: the component manager's ``overall_efficiency`` value; ``cast`` only
        informs the type checker and leaves the value untouched.
    """
    overall_efficiency = self.component_manager.overall_efficiency
    return cast(float, overall_efficiency)
@attribute(
dtype=float,
label="Pipeline Efficiency (Current)",
doc=("The efficiency of the pipeline during the last reporting interval."),
)
def dspEfficiency(self: PstBeam) -> float:
    """
    Report the efficiency of the pipeline during the last reporting interval.

    :return: the component manager's ``efficiency`` value; ``cast`` only informs
        the type checker and leaves the value untouched.
    """
    current_efficiency = self.component_manager.efficiency
    return cast(float, current_efficiency)
# --------
# Commands
# --------
@command(
dtype_in="DevString",
doc_in="JSON formatted string with the scan configuration.",
dtype_out="DevVarLongStringArray",
doc_out=(
"A tuple containing a return code and a string message "
"indicating status. The message is for information purpose "
"only."
),
)
@DebugIt()
def ConfigureScan(self: PstBeam, argin: str) -> Tuple[List[ResultCode], List[str]]:
    """
    Submit a scan configuration to the device's ``ConfigureScan`` command handler.

    The raw request string is remembered in ``_last_scan_configuration`` when the
    handler reports the command as queued, started or completed OK.

    :param argin: JSON formatted string with the scan configuration.
    :return: A tuple containing a return code and a string message
        indicating status. The message is for information purpose
        only.
    """
    configure_scan = self.get_command_object("ConfigureScan")
    result_code, message = configure_scan(argin)

    accepted_codes = (ResultCode.QUEUED, ResultCode.STARTED, ResultCode.OK)
    if result_code in accepted_codes:
        # Only keep the request when the command was accepted.
        self._last_scan_configuration = argin

    return ([result_code], [message])
@command(
dtype_out=("str",),
doc_out="Version strings",
)
@DebugIt()
def GetVersionInfo(self: PstBeam) -> List[str]:
    """
    Return the version information of the device.

    :return: a single-element list holding the device class name and the
        device's ``_build_state``, separated by a comma and a space.
    """
    device_class_name = self.__class__.__name__
    version_info = f"{device_class_name}, {self._build_state}"
    return [version_info]
@attribute(dtype=HealthState) # type: ignore[misc] # "Untyped decorator makes function untyped"
def recvHealthState(self: PstBeam) -> HealthState:
    """
    Report the health state of the RECV subcomponent.

    :return: the component manager's ``recv_health_state`` value.
    :rtype: HealthState
    """
    health_state = self.component_manager.recv_health_state
    return health_state
@attribute(dtype=ObsState) # type: ignore[misc] # "Untyped decorator makes function untyped"
def recvObsState(self: PstBeam) -> ObsState:
    """
    Report the observing state of the RECV subcomponent.

    :return: the component manager's ``recv_obs_state`` value.
    :rtype: ObsState
    """
    obs_state = self.component_manager.recv_obs_state
    return obs_state
@attribute(dtype=HealthState) # type: ignore[misc] # "Untyped decorator makes function untyped"
def smrbHealthState(self: PstBeam) -> HealthState:
    """
    Report the health state of the SMRB subcomponent.

    :return: the component manager's ``smrb_health_state`` value.
    :rtype: HealthState
    """
    health_state = self.component_manager.smrb_health_state
    return health_state
@attribute(dtype=ObsState) # type: ignore[misc] # "Untyped decorator makes function untyped"
def smrbObsState(self: PstBeam) -> ObsState:
    """
    Report the observing state of the SMRB subcomponent.

    :return: the component manager's ``smrb_obs_state`` value.
    :rtype: ObsState
    """
    obs_state = self.component_manager.smrb_obs_state
    return obs_state
@attribute(dtype=HealthState) # type: ignore[misc] # "Untyped decorator makes function untyped"
def statHealthState(self: PstBeam) -> HealthState:
    """
    Report the health state of the STAT subcomponent.

    :return: the component manager's ``stat_health_state`` value.
    :rtype: HealthState
    """
    health_state = self.component_manager.stat_health_state
    return health_state
@attribute(dtype=ObsState) # type: ignore[misc] # "Untyped decorator makes function untyped"
def statObsState(self: PstBeam) -> ObsState:
    """
    Report the observing state of the STAT subcomponent.

    :return: the component manager's ``stat_obs_state`` value.
    :rtype: ObsState
    """
    obs_state = self.component_manager.stat_obs_state
    return obs_state
@attribute(dtype=HealthState) # type: ignore[misc] # "Untyped decorator makes function untyped"
def dspHealthState(self: PstBeam) -> HealthState:
    """
    Report the health state of the DSP subcomponent.

    :return: the component manager's ``dsp_health_state`` value.
    :rtype: HealthState
    """
    health_state = self.component_manager.dsp_health_state
    return health_state
@attribute(dtype=ObsState) # type: ignore[misc] # "Untyped decorator makes function untyped"
def dspObsState(self: PstBeam) -> ObsState:
    """
    Report the observing state of the DSP subcomponent.

    :return: the component manager's ``dsp_obs_state`` value.
    :rtype: ObsState
    """
    obs_state = self.component_manager.dsp_obs_state
    return obs_state
# ----------
# Run server
# ----------
def main(args: Optional[list] = None, **kwargs: Any) -> int:
    """
    Launch the device server for the ``PstBeam`` device class.

    :param args: positional arguments, forwarded to ``run`` as its ``args`` keyword
    :param kwargs: named arguments, forwarded unchanged to ``run``
    :return: the value returned by ``run`` (annotated here as an exit code)
    :rtype: int
    """
    device_classes = (PstBeam,)
    return run(device_classes, args=args, **kwargs)
# Run the device server only when this module is executed as a script,
# not when it is imported. The value returned by main() is not used here.
if __name__ == "__main__":
    main()