# Source code for ska_pst.lmc.receive.receive_component_manager

# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project.
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""This module provides an implementation of the RECV PST component manager."""

from __future__ import annotations

import logging
from functools import cache
from typing import Any, Callable, List, Optional

from overrides import override
from ska_pst.lmc.component import PstProcessApiSubcomponentManager
from ska_pst.lmc.receive.receive_model import ReceiveData, ReceiveDataStore
from ska_pst.lmc.receive.receive_process_api import (
    PstReceiveProcessApi,
    PstReceiveProcessApiGrpc,
    PstReceiveProcessApiSimulator,
)
from ska_pst.lmc.receive.receive_util import calculate_receive_subband_resources


class PstReceiveComponentManager(
    PstProcessApiSubcomponentManager[ReceiveData, ReceiveData, PstReceiveProcessApi, ReceiveDataStore]
):
    """Component manager for the RECV component for the PST.LMC subsystem."""

    def __init__(
        self: PstReceiveComponentManager,
        *,
        device_name: str,
        process_api_endpoint: str,
        subband_resources_callback: Callable[[dict], None],
        api: PstReceiveProcessApi | None = None,
        logger: logging.Logger | None = None,
        **kwargs: Any,
    ) -> None:
        """
        Initialise instance of the component manager.

        :param device_name: the name of the device, passed on to the base component manager.
        :type device_name: str
        :param process_api_endpoint: the endpoint of the RECV process API, passed on to the
            base component manager.
        :type process_api_endpoint: str
        :param subband_resources_callback: callback that is invoked with the new value every
            time :py:attr:`subband_beam_configuration` is set.
        :type subband_resources_callback: Callable[[dict], None]
        :param api: an API object used to delegate functionality to. If not provided an
            instance of `PstReceiveProcessApiSimulator` is used.
        :type api: `PstReceiveProcessApi`
        :param logger: a logger for this object to use. If not provided the logger of this
            module is used.
        :type logger: `logging.Logger`
        :param kwargs: additional keyword arguments passed on to the base component manager.
        """
        logger = logger or logging.getLogger(__name__)
        # NOTE: the trailing space keeps the two halves of the message separated.
        logger.debug(
            f"Setting up RECV component manager with device_name='{device_name}' "
            + f"and api_endpoint='{process_api_endpoint}'"
        )
        api = api or PstReceiveProcessApiSimulator(
            logger=logger,
        )

        # These attributes are assigned before calling the base class initialiser so that
        # they already exist if the properties of this class are read during it.
        self._subband_udp_ports: List[int] = []
        self._subband_resources_callback = subband_resources_callback
        self._data_host: Optional[str] = None
        self._data_mac: Optional[str] = None
        # Per-instance memo of the environment returned by the API, see _get_env.
        self._cached_env: Optional[dict] = None

        super().__init__(
            device_name=device_name,
            subcomponent_name="recv",
            process_api_endpoint=process_api_endpoint,
            api=api,
            logger=logger,
            data_store=ReceiveDataStore(),
            **kwargs,
        )
        self._subband_beam_configuration: dict = {}

    @override
    def _simulator_api(self: PstReceiveComponentManager) -> PstReceiveProcessApi:
        """Get instance of the simulator API."""
        self.logger.debug("RECV component manager setting up simulated API")
        return PstReceiveProcessApiSimulator(
            logger=self.logger,
        )

    @override
    def _grpc_api(self: PstReceiveComponentManager) -> PstReceiveProcessApi:
        """Get instance of a gRPC API."""
        self.logger.debug("RECV component manager setting up gRPC API")
        return PstReceiveProcessApiGrpc(
            client_id=self.subcomponent_id,
            grpc_endpoint=self.process_api_endpoint,
            logger=self.logger,
        )

    @property
    def data_receive_rate(self: PstReceiveComponentManager) -> float:
        """
        Get the current data receive rate from the CBF interface.

        :returns: current data receive rate from the CBF interface in Gb/s.
        :rtype: float
        """
        return self.monitor_data.data_receive_rate

    @property
    def data_received(self: PstReceiveComponentManager) -> int:
        """
        Get the total amount of data received from CBF interface for current scan.

        :returns: total amount of data received from CBF interface for current scan in Bytes
        :rtype: int
        """
        return self.monitor_data.data_received

    @property
    def data_drop_rate(self: PstReceiveComponentManager) -> float:
        """
        Get the current rate of CBF ingest data being dropped or lost by the receiving process.

        :returns: current rate of CBF ingest data being dropped or lost in MB/s.
        :rtype: float
        """
        return self.monitor_data.data_drop_rate

    @property
    def data_dropped(self: PstReceiveComponentManager) -> int:
        """
        Get the total number of bytes dropped in the current scan.

        :returns: total number of bytes dropped in the current scan in Bytes.
        :rtype: int
        """
        return self.monitor_data.data_dropped

    @property
    def misordered_packets(self: PstReceiveComponentManager) -> int:
        """
        Get the total number of packets received out of order in the current scan.

        :returns: total number of packets received out of order in the current scan.
        :rtype: int
        """
        return self.monitor_data.misordered_packets

    @property
    def misordered_packet_rate(self: PstReceiveComponentManager) -> float:
        """
        Get the rate of packets that are received out of order in packets/sec.

        :returns: the rate of packets that are received out of order in packets/sec.
        :rtype: float
        """
        return self.monitor_data.misordered_packet_rate

    @property
    def malformed_packets(self: PstReceiveComponentManager) -> int:
        """
        Get the total number of packets marked as malformed for the current scan.

        Malformed packets are valid UDP packets, but where contents of the UDP payload
        does not conform to the specification in the CBF/PST ICD. Examples of malformation
        include: bad magic-word field, invalid meta-data, incorrect packet size.

        :return: total number of packets marked as malformed for the current scan.
        :rtype: int
        """
        return self.monitor_data.malformed_packets

    @property
    def malformed_packet_rate(self: PstReceiveComponentManager) -> float:
        """
        Get the current rate of malformed packets in packets/sec.

        :return: the current rate of malformed packets in packets/seconds.
        :rtype: float
        """
        return self.monitor_data.malformed_packet_rate

    @property
    def misdirected_packets(self: PstReceiveComponentManager) -> int:
        """
        Get the total of misdirected packets received during current scan.

        Total number of (valid) UDP packets that were unexpectedly received.
        Misdirection could be due to wrong ScanID, Beam ID, Network Interface
        or UDP port. Receiving misdirected packets is a sign that there is
        something wrong with the upstream configuration for the scan.

        :return: the total of misdirected packets received during current scan.
        :rtype: int
        """
        return self.monitor_data.misdirected_packets

    @property
    def misdirected_packet_rate(self: PstReceiveComponentManager) -> float:
        """
        Get the current rate of misdirected packets in packets/sec.

        :return: the current rate of misdirected packets in packets/seconds.
        :rtype: float
        """
        return self.monitor_data.misdirected_packet_rate

    @property
    def checksum_failure_packets(self: PstReceiveComponentManager) -> int:
        """
        Get the total number of packets with a checksum failure in current scan.

        Total number of packets with a UDP, IP header or CRC checksum failure.

        :return: the total number of packets with a checksum failure in current scan.
        :rtype: int
        """
        return self.monitor_data.checksum_failure_packets

    @property
    def checksum_failure_packet_rate(self: PstReceiveComponentManager) -> float:
        """
        Get the current rate of packets with a checksum failure in packets/sec.

        :return: the current rate of packets with a checksum failure in packets/seconds.
        :rtype: float
        """
        return self.monitor_data.checksum_failure_packet_rate

    @property
    def timestamp_sync_error_packets(self: PstReceiveComponentManager) -> int:
        """
        Get the total number of packets with a timestamp sync error for current scan.

        The number of packets received where the timestamp has become desynchronised
        with the packet sequence number * sampling interval.

        :return: the total number of packets with a timestamp sync error for current scan.
        :rtype: int
        """
        return self.monitor_data.timestamp_sync_error_packets

    @property
    def timestamp_sync_error_packet_rate(self: PstReceiveComponentManager) -> float:
        """
        Get the current rate of packets marked as having a timestamp sync error in packets/sec.

        :return: the current rate of packets marked as having a timestamp sync error
            in packets/seconds.
        :rtype: float
        """
        return self.monitor_data.timestamp_sync_error_packet_rate

    @property
    def seq_number_sync_error_packets(self: PstReceiveComponentManager) -> int:
        """
        Get total number of packets with a sequence number sync error for current scan.

        The number of packets received where the packet sequence number has
        become desynchronised with the data rate and elapsed time.

        :return: total number of packets with a sequence number sync error for current scan.
        :rtype: int
        """
        return self.monitor_data.seq_number_sync_error_packets

    @property
    def seq_number_sync_error_packet_rate(self: PstReceiveComponentManager) -> float:
        """
        Get current rate of packets marked as having a sequence number sync error in packets/sec.

        :return: current rate of packets marked as having a sequence number sync error
            in packets/seconds.
        :rtype: float
        """
        return self.monitor_data.seq_number_sync_error_packet_rate

    @property
    def no_valid_polarisation_correction_packets(self: PstReceiveComponentManager) -> int:
        """
        Get the number of packets received where no valid Jones polarisation corrections have been applied.

        :return: the number of packets received where no valid Jones polarisation corrections
            have been applied.
        :rtype: int
        """  # noqa: E501
        return self.monitor_data.no_valid_polarisation_correction_packets

    @property
    def no_valid_polarisation_correction_packet_rate(self: PstReceiveComponentManager) -> float:
        """
        Get rate of packets where no valid Jones polarisation corrections have been applied in packets/sec.

        :return: rate of packets where no valid Jones polarisation corrections have been
            applied in packets/sec.
        :rtype: float
        """  # noqa: E501
        return self.monitor_data.no_valid_polarisation_correction_packet_rate

    @property
    def no_valid_station_beam_packets(self: PstReceiveComponentManager) -> int:
        """
        Get the number of packets received where no valid station beam delay polynomials have been applied.

        :return: the number of packets received where no valid station beam delay polynomials
            have been applied.
        :rtype: int
        """  # noqa: E501
        return self.monitor_data.no_valid_station_beam_packets

    @property
    def no_valid_station_beam_packet_rate(self: PstReceiveComponentManager) -> float:
        """
        Get current rate of packets where no valid station beam delay polynomials have been applied in packets/sec.

        :return: current rate of packets where no valid station beam delay polynomials have
            been applied in packets/sec.
        :rtype: float
        """  # noqa: E501
        return self.monitor_data.no_valid_station_beam_packet_rate

    @property
    def no_valid_pst_beam_packets(self: PstReceiveComponentManager) -> int:
        """
        Get the number of packets received where no valid PST beam delay polynomials have been applied.

        :return: the number of packets received where no valid PST beam delay polynomials
            have been applied.
        :rtype: int
        """  # noqa: E501
        return self.monitor_data.no_valid_pst_beam_packets

    @property
    def no_valid_pst_beam_packet_rate(self: PstReceiveComponentManager) -> float:
        """
        Get current rate of packets where no valid PST beam delay polynomials have been applied in packets/sec.

        :return: current rate of packets where no valid PST beam delay polynomials have been
            applied in packets/sec.
        :rtype: float
        """  # noqa: E501
        return self.monitor_data.no_valid_pst_beam_packet_rate

    def _get_env(self: PstReceiveComponentManager) -> dict:
        """
        Get the environment reported by the process API.

        The result of the first call to the API's ``get_env`` is remembered on this
        instance and returned by subsequent calls. The value is stored on the instance,
        rather than in a ``functools.cache`` keyed on ``self``, so that the cache does not
        keep the component manager alive after it is no longer used.

        :return: the environment as returned by the API's ``get_env`` method.
        :rtype: dict
        """
        if self._cached_env is None:
            self._cached_env = self._api.get_env()
        return self._cached_env

    @property
    def data_host(self: PstReceiveComponentManager) -> str:
        """
        Get the IP address of the NIC used for receiving data during a scan.

        The value is read from the ``data_host`` entry of the API's environment the first
        time it is requested.

        :return: the IP address of the NIC used for receiving data during a scan.
        :rtype: str
        """
        if self._data_host is None:
            self._data_host = self._get_env()["data_host"]
        return self._data_host

    @property
    def data_mac(self: PstReceiveComponentManager) -> str:
        """
        Get the MAC address corresponding to the data_host.

        The value is read from the ``data_mac`` entry of the API's environment the first
        time it is requested.

        :return: the MAC address corresponding to the data_host.
        :rtype: str
        """
        if self._data_mac is None:
            self._data_mac = self._get_env()["data_mac"]
        return self._data_mac

    @property
    def subband_udp_ports(self: PstReceiveComponentManager) -> List[int]:
        """
        Get the port numbers used by all the subbands for receiving data during a scan.

        While no ports are known, this is a single element list holding the ``data_port``
        entry of the API's environment.

        :return: the port numbers used by all the subbands for receiving data during a scan.
        :rtype: List[int]
        """
        if not self._subband_udp_ports:
            self._subband_udp_ports = [self._get_env()["data_port"]]
        return self._subband_udp_ports

    @property
    def subband_beam_configuration(self: PstReceiveComponentManager) -> dict:
        """
        Get the current subband beam configuration.

        This is the current subband beam configuration that is calculated during
        the `configure_beam`. Setting this property also calls the
        ``subband_resources_callback`` given at construction with the new value.

        :return: the current subband beam configuration.
        :rtype: dict
        """
        return self._subband_beam_configuration

    @subband_beam_configuration.setter
    def subband_beam_configuration(self: PstReceiveComponentManager, config: dict) -> None:
        self._subband_beam_configuration = config
        self._subband_resources_callback(config)

    def _calculate_subband_resources(
        self: PstReceiveComponentManager, configuration: dict
    ) -> tuple[dict, dict]:
        """
        Calculate the receive resources for a configuration.

        :param configuration: the configuration whose items are passed as keyword
            arguments to `calculate_receive_subband_resources`.
        :type configuration: dict
        :return: a tuple of the complete calculated resources and the request for the
            API, which holds the ``common`` resources and the resources of subband 1.
        :rtype: tuple[dict, dict]
        """
        recv_resources = calculate_receive_subband_resources(
            beam_id=self.beam_id,
            data_host=self.data_host,
            data_mac=self.data_mac,
            subband_udp_ports=self.subband_udp_ports,
            **configuration,
        )
        # deal only with subband 1 for now. otherwise we have to deal with tracking
        # multiple long running tasks.
        subband_resources = {
            "common": recv_resources["common"],
            "subband": recv_resources["subbands"][1],
        }
        return recv_resources, subband_resources

    @override
    def validate_configure_scan(self: PstReceiveComponentManager, configuration: dict) -> None:
        """
        Validate a ConfigureScan request sent from CSP.LMC for the RECV sub-component.

        The beam resources calculated from the configuration are validated by the API
        first, followed by the scan configuration itself. If this method fails then a
        call to either ConfigureBeam or ConfigureScan would have failed, which would
        have left the BEAM in an invalid state.

        :param configuration: configuration that would be used when the configure_beam
            and configure_scan methods are called.
        :type configuration: dict
        """
        _, subband_resources = self._calculate_subband_resources(configuration)

        self._api.validate_configure_beam(configuration=subband_resources)
        self._api.validate_configure_scan(configuration=configuration)

    @override
    def _configure_beam(self: PstReceiveComponentManager, configuration: dict) -> None:
        """
        Configure beam resources in the component.

        The resources of subband 1 are sent to the API and, once that call has returned,
        the complete calculated resources are stored as the subband beam configuration.

        :param configuration: configuration for beam
        :type configuration: dict
        """
        recv_resources, subband_resources = self._calculate_subband_resources(configuration)
        self.logger.debug(f"Submitting API with recv_resources={recv_resources}")

        self._api.configure_beam(configuration=subband_resources)
        self.subband_beam_configuration = recv_resources

    @override
    def deconfigure_beam(self: PstReceiveComponentManager) -> None:
        """
        Deconfigure the RECV component's beam configuration.

        After the base class has deconfigured the beam, the subband beam configuration
        is reset to an empty dictionary.
        """
        super().deconfigure_beam()
        self.subband_beam_configuration = {}