Source code for ska_pst_lmc.receive.receive_device

# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST LMC project
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""Module for providing the RECV.MGMT capability for the Pulsar Timing Sub-element."""

from __future__ import annotations

import dataclasses
from typing import Any, List, Optional

import tango
from ska_tango_base.control_model import SimulationMode
from tango import DebugIt
from tango.server import attribute, command, device_property, run

import ska_pst_lmc.release as release
from ska_pst_lmc.component import as_device_attribute_name
from ska_pst_lmc.component.pst_device import PstBaseProcessDevice
from ska_pst_lmc.receive.receive_component_manager import PstReceiveComponentManager
from ska_pst_lmc.receive.receive_model import ReceiveData

__all__ = ["PstReceive", "main"]


[docs]class PstReceive(PstBaseProcessDevice[PstReceiveComponentManager, ReceiveData]): """A software TANGO device for managing the RECV component of the PST.LMC system.""" # ----------------- # Device Properties # ----------------- process_api_endpoint = device_property(dtype=str, doc="Endpoint for the RECV.CORE service.") subband_udp_ports = device_property( dtype=(int,), default_value=[20000], doc="The UDP ports for RECV subbands to listen on." ) # --------------- # General methods # ---------------
[docs] def init_device(self: PstReceive) -> None: """ Initialise the attributes and properties of the PstReceive. This overrides the :py:class:`SKABaseDevice`. """ util = tango.Util.instance() util.set_serial_model(tango.SerialModel.NO_SYNC) super().init_device() self._build_state = "{}, {}, {}".format(release.NAME, release.VERSION, release.DESCRIPTION) self._version_id = release.VERSION for f in dataclasses.fields(ReceiveData): attr_name = as_device_attribute_name(f.name) self.set_change_event(attr_name, True, False) self.set_archive_event(attr_name, True, False) self.set_change_event("subbandBeamConfiguration", True, False) self.set_archive_event("subbandBeamConfiguration", True, False)
[docs] def create_component_manager( self: PstReceive, ) -> PstReceiveComponentManager: """ Create and return a component manager for this device. :return: a component manager for this device. """ return PstReceiveComponentManager( device_interface=self, simulation_mode=SimulationMode.TRUE, logger=self.logger, subband_udp_ports=self.subband_udp_ports, )
[docs] def always_executed_hook(self: PstReceive) -> None: """Execute call before any TANGO command is executed."""
[docs] def delete_device(self: PstReceive) -> None: """ Delete resources allocated in init_device. This method allows for any memory or other resources allocated in the init_device method to be released. This method is called by the device destructor and by the device Init command. """
[docs] def handle_monitor_data_update(self: PstReceive, monitor_data: ReceiveData) -> None: """ Handle monitoring data. :param monitor_data: the latest monitoring data that has been reported. :type monitor_data: ReceiveData """ for (key, value) in dataclasses.asdict(monitor_data).items(): self.handle_attribute_value_update(key, value)
# ---------- # Attributes # ---------- @attribute( dtype=float, unit="Gigabits per second", standard_unit="Gigabits per second", display_unit="Gb/s", max_value=200, min_value=0, doc="Current data receive rate from the CBF interface", ) def dataReceiveRate(self: PstReceive) -> float: """ Get the current data receive rate from the CBF interface. :returns: current data receive rate from the CBF interface in Gb/s. :rtype: float """ return self.component_manager.data_receive_rate @attribute( dtype=int, unit="Bytes", standard_unit="Bytes", display_unit="B", doc="Total number of bytes received from the CBF in the current scan", ) def dataReceived(self: PstReceive) -> int: """ Get the total amount of data received from CBF interface for current scan. :returns: total amount of data received from CBF interface for current scan in Bytes :rtype: int """ return self.component_manager.data_received @attribute( dtype=float, label="Drop Rate", unit="Bytes per second", standard_unit="Bytes per second", display_unit="B/s", max_value=200, min_value=-1, max_alarm=10, min_alarm=-1, max_warning=1, min_warning=-1, doc="Current rate of CBF ingest data being dropped or lost by the receiving process", ) def dataDropRate(self: PstReceive) -> float: """ Get the current rate of CBF ingest data being dropped or lost by the receiving proces. :returns: current rate of CBF ingest data being dropped or lost in B/s. :rtype: float """ return self.component_manager.data_drop_rate @attribute( dtype=int, label="Dropped", unit="Bytes", standard_unit="Bytes", display_unit="B", doc="Total number of bytes dropped in the current scan", ) def dataDropped(self: PstReceive) -> int: """ Get the total number of bytes dropped in the current scan. :returns: total number of bytes dropped in the current scan. :rtype: int """ return self.component_manager.data_dropped @attribute( dtype=float, label="Misordered packets", doc=( "Number of out of order UDP packets received in the current scan." "The UDP packets for all frequency channels of a given set of" "time samples that start at time t0 shall arrive before the" "first packet containing data sampled at time t0+2 delta_t," "where delta_t is the time spanned by the set of time samples" "in a single packet." ), ) def misorderedPackets(self: PstReceive) -> int: """ Get the total number of packets received out of order in the current scan. :returns: total number of packets received out of order in the current scan. :rtype: int """ return self.component_manager.misordered_packets @attribute( dtype=float, label="Misordered packet rate", unit="packets/sec", doc="The current rate of misordered packets.", ) def misorderedPacketRate(self: PstReceive) -> float: """ Get the current rate of misordered packets. :returns: the current rate of misordered packets in packets/seconds. :rtype: float """ return self.component_manager.misordered_packet_rate @attribute( dtype=int, label="Malformed packets", doc=( "Malformed packets are valid UDP packets, but where contents of" "the UDP payload does not conform to the specification in the" "CBF/PST ICD. Examples of malformation include: bad magic-word" "field, invalid meta-data, incorrect packet size." ), ) def malformedPackets(self: PstReceive) -> int: """ Get the total number of packets marked as malformed for current scan. :returns: the total number of packets marked as malformed for current scan. :rtype: int """ return self.component_manager.malformed_packets @attribute( dtype=float, label="Malformed packet rate", unit="packets/sec", doc="The current rate of malformed packets.", ) def malformedPacketRate(self: PstReceive) -> float: """ Get current rate of malformed packets. :return: current rate of malformed packets in packets/seconds. :rtype: float """ return self.component_manager.malformed_packet_rate @attribute( dtype=int, label="Misdirected packets", doc=( "Total number of (valid) UDP packets that were unexpectedly received." "Misdirection could be due to wrong ScanID, Beam ID, Network Interface" "or UDP port. Receiving misdirected packets is a sign that there is" "something wrong with the upstream configuration for the scan." ), ) def misdirectedPackets(self: PstReceive) -> int: """ Get the total number of packets as marked as misdirected for current scan. :returns: the total number of packets as marked as misdirected for current scan. :rtype: int """ return self.component_manager.misdirected_packets @attribute( dtype=float, label="Misdirected packet rate", unit="packets/sec", doc="The current rate of misdirected packets.", ) def misdirectedPacketRate(self: PstReceive) -> float: """ Get the current rate of misdirected packets. :return: the current rate of misdirected packets in packets/seconds. :rtype: float """ return self.component_manager.misdirected_packet_rate @attribute( dtype=int, label="Checksum failure packets", doc="Total number of packets with a UDP, IP header or CRC checksum failure.", ) def checksumFailurePackets(self: PstReceive) -> int: """ Get the total number of packets with checksum failures for current scan. :return: the total number of packets with checksum failures for current scan. :rtype: int """ return self.component_manager.checksum_failure_packets @attribute( dtype=float, label="Checksum failure packet rate", unit="packets/sec", doc="The current rate of packets with checkesum failures.", ) def checksumFailurePacketRate(self: PstReceive) -> float: """ Get the current rate of packets with checkesum failures. :return: the current rate of packets with checkesum failures in packets/seconds. :rtype: float """ return self.component_manager.checksum_failure_packet_rate @attribute( dtype=int, label="Timestamp sync error packets", doc=( "The number of packets received where the timestamp has become" "desynchronised with the packet sequence number * sampling interval" ), ) def timestampSyncErrorPackets(self: PstReceive) -> int: """ Get the total number of packets with a timestamp sync error for current scan. :return: the total number of packets with a timestamp sync error for current scan. :rtype: int """ return self.component_manager.timestamp_sync_error_packets @attribute( dtype=float, label="Timestamp sync error packet rate", unit="packets/sec", doc="The current rate of packets with a timestamp sync error.", ) def timestampSyncErrorPacketRate(self: PstReceive) -> float: """ Get the current rate of packets with a timestamp sync error. :return: the current rate of packets with a timestamp sync error in packets/seconds. :rtype: float """ return self.component_manager.timestamp_sync_error_packet_rate @attribute( dtype=int, label="Seq. number sync error packets", doc=( "The number of packets received where the packet sequence number has" "become desynchronised with the data rate and elapsed time." ), ) def seqNumberSyncErrorPackets(self: PstReceive) -> int: """ Get the total number of packets with a seq num sync error in current scan. :return: the total number of packets with a seq num sync error in current scan. :rtype: int """ return self.component_manager.seq_number_sync_error_packets @attribute( dtype=float, label="Seq. number sync error packet rate", unit="packets/sec", doc="The current rate of packets with a sequence number sync error.", ) def seqNumberSyncErrorPacketRate(self: PstReceive) -> float: """ Get the current rate of packets with a sequence number sync error. :return: the current rate of packets with a sequence number sync error in packets/seconds. :rtype: float """ return self.component_manager.seq_number_sync_error_packet_rate @attribute( dtype=str, label="Data receive IP address.", doc="The IP address that PST RECV is listening for data.", ) def dataReceiveIpAddress(self: PstReceive) -> str: """ Get the data receive IP address. It is only valid to call this method when the TANGO device is turned on and communicating. """ return self.component_manager.data_host @attribute( dtype=str, label="The current subband beam configuration.", doc="Current calculated subband beam configuration.", ) def subbandBeamConfiguration(self: PstReceive) -> str: """ Get current subband beam configuration. Retrieves the current subband configuration that is calculated during the `ConfigureBeam` request. When RECV is deconfigured for beam then the response is an empty JSON object `{}`. :return: current subband beam configuration. :rtype: str """ import json return json.dumps(self.component_manager.subband_beam_configuration) # -------- # Commands # -------- @command( dtype_out=("str",), doc_out="Version strings", ) @DebugIt() def GetVersionInfo(self: PstReceive) -> List[str]: """ Return the version information of the device. :return: The result code and the command unique ID """ return [f"{self.__class__.__name__}, {self._build_state}"]
# ---------- # Run server # ----------
[docs]def main(args: Optional[list] = None, **kwargs: Any) -> int: """ Entry point for module. :param args: positional arguments :param kwargs: named arguments :return: exit code :rtype: int """ return run((PstReceive,), args=args, **kwargs)
if __name__ == "__main__": main()