Source code for ska_pst.lmc.receive.receive_component_manager

# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project.
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""This module provides an implementation of the RECV PST component manager."""

from __future__ import annotations

import logging
from functools import cached_property
from typing import Any, Callable, List

from overrides import override
from ska_pst.lmc.component import PstProcessApiSubcomponentManager

from .receive_grpc_api_strategy import PstReceiveGrpcApiStrategy
from .receive_model import ReceiveData, ReceiveDataStore
from .receive_simulator import PstReceiveSimulator
from .receive_util import calculate_receive_subband_resources


class PstReceiveComponentManager(
    PstProcessApiSubcomponentManager[ReceiveData, ReceiveData, ReceiveDataStore]
):
    """Component manager for the RECV component for the PST.LMC subsystem."""

    def __init__(
        self: PstReceiveComponentManager,
        *,
        device_name: str,
        process_api_endpoint: str,
        subband_resources_callback: Callable[[dict], None],
        logger: logging.Logger | None = None,
        **kwargs: Any,
    ) -> None:
        """
        Initialise instance of the component manager.

        :param device_name: the name of the device, forwarded to the base
            component manager.
        :type device_name: str
        :param process_api_endpoint: the endpoint of the process API,
            forwarded to the base component manager.
        :type process_api_endpoint: str
        :param subband_resources_callback: callback that is invoked with the
            new value every time ``subband_beam_configuration`` is set.
        :type subband_resources_callback: Callable[[dict], None]
        :param logger: a logger for this object to use. When not supplied
            the logger of this module is used.
        :type logger: `logging.Logger`
        :param kwargs: any additional keyword arguments, forwarded unchanged
            to the base component manager.
        """
        logger = logger or logging.getLogger(__name__)
        logger.debug(
            f"Setting up RECV component manager with device_name='{device_name}' "
            f"and api_endpoint='{process_api_endpoint}'"
        )
        # The callback must be in place before anything can assign to
        # ``subband_beam_configuration``, as its setter calls the callback.
        self._subband_resources_callback = subband_resources_callback
        super().__init__(
            device_name=device_name,
            subcomponent_name="recv",
            process_api_endpoint=process_api_endpoint,
            simulator=PstReceiveSimulator(),
            grpc_strategy=PstReceiveGrpcApiStrategy(),
            logger=logger,
            data_store=ReceiveDataStore(),
            **kwargs,
        )
        # Assigned directly to the private attribute so that the callback
        # is not fired during construction.
        self._subband_beam_configuration: dict = {}

    @cached_property
    def _api_env(self: PstReceiveComponentManager) -> dict:
        """
        Get the environment from the core app via its API.

        The value is cached on first access; `_configure_beam` deletes the
        cached value so that it is fetched again.
        """
        return self._api.get_env()

    @property
    def data_host(self: PstReceiveComponentManager) -> str:
        """
        Get the IP address of the NIC used for receiving data during a scan.

        :return: the IP address of the NIC used for receiving data during a scan.
        :rtype: str
        """
        return self._api_env["data_host"]

    @property
    def data_mac(self: PstReceiveComponentManager) -> str:
        """
        Get the MAC address corresponding to the data_host.

        :return: the MAC address corresponding to the data_host.
        :rtype: str
        """
        return self._api_env["data_mac"]

    @property
    def subband_udp_ports(self: PstReceiveComponentManager) -> List[int]:
        """
        Get the port numbers used by all the subbands for receiving data during a scan.

        Currently this is a single element list holding the ``data_port``
        value of the API environment.

        :return: the port numbers used by all the subbands for receiving data during a scan.
        :rtype: List[int]
        """
        return [self._api_env["data_port"]]

    @property
    def subband_beam_configuration(self: PstReceiveComponentManager) -> dict:
        """
        Get the current subband beam configuration.

        This is the current subband beam configuration that is calculated during
        the `configure_beam`.

        :return: the current subband beam configuration.
        :rtype: dict
        """
        return self._subband_beam_configuration

    @subband_beam_configuration.setter
    def subband_beam_configuration(self: PstReceiveComponentManager, config: dict) -> None:
        """
        Set the subband beam configuration and notify the registered callback.

        :param config: the new subband beam configuration.
        :type config: dict
        """
        self._subband_beam_configuration = config
        self._subband_resources_callback(config)

    def _calculate_recv_resources(self: PstReceiveComponentManager, configuration: dict) -> dict:
        """
        Calculate the RECV subband resources for a given configuration.

        :param configuration: the request configuration, which is expanded as
            keyword arguments to `calculate_receive_subband_resources`.
        :type configuration: dict
        :return: the calculated resources for RECV.
        :rtype: dict
        """
        return calculate_receive_subband_resources(
            beam_id=self.beam_id,
            data_host=self.data_host,
            data_mac=self.data_mac,
            subband_udp_ports=self.subband_udp_ports,
            **configuration,
        )

    @staticmethod
    def _first_subband_resources(recv_resources: dict) -> dict:
        """
        Extract the common and subband 1 resources from the calculated resources.

        :param recv_resources: the output of `_calculate_recv_resources`.
        :type recv_resources: dict
        :return: a dictionary with the ``common`` and ``subband`` entries
            that are sent to the API.
        :rtype: dict
        """
        # deal only with subband 1 for now. otherwise we have to deal with
        # tracking multiple long running tasks.
        return {
            "common": recv_resources["common"],
            "subband": recv_resources["subbands"][1],
        }

    @override
    def validate_configure_scan(self: PstReceiveComponentManager, configuration: dict) -> None:
        """
        Validate a ConfigureScan request sent from CSP.LMC for the RECV sub-component.

        The configuration is checked against both the beam and the scan
        validation of the API, so a failure here indicates that a call to
        either ConfigureBeam or ConfigureScan would have failed, leaving the
        BEAM in an invalid state.

        :param configuration: configuration that would be used when the configure_beam
            and configure_scan methods are called.
        :type configuration: dict
        """
        pst_processing_mode = configuration["pst_processing_mode"]

        recv_resources = self._calculate_recv_resources(configuration)
        subband_resources = self._first_subband_resources(recv_resources)

        self._api.validate_configure_beam(
            configuration=subband_resources, pst_processing_mode=pst_processing_mode
        )
        self._api.validate_configure_scan(configuration=configuration)

    @override
    def _configure_beam(self: PstReceiveComponentManager, configuration: dict) -> None:
        """
        Configure beam resources in the component.

        On completion the full set of calculated resources is stored as the
        `subband_beam_configuration`, which also notifies the callback.

        :param configuration: configuration for beam
        :type configuration: dict
        """
        # enforce clearing the cache of API env. Since _api_env is a @cached_property
        # all we need to do is delete the property if it exists.
        try:
            del self._api_env
        except AttributeError:
            # ignore the deletion of api env if it was not set.
            pass

        pst_processing_mode = configuration["pst_processing_mode"]

        recv_resources = self._calculate_recv_resources(configuration)
        self.logger.debug(f"Submitting API with recv_resources={recv_resources}")

        subband_resources = self._first_subband_resources(recv_resources)
        self._api.configure_beam(configuration=subband_resources, pst_processing_mode=pst_processing_mode)

        # only publish the new configuration once the API call has returned.
        self.subband_beam_configuration = recv_resources

    @override
    def deconfigure_beam(self: PstReceiveComponentManager) -> None:
        """
        Deconfigure the RECV component's beam configuration.

        After the base class has deconfigured the beam, the subband beam
        configuration is reset to an empty dictionary, which also notifies
        the callback.
        """
        super().deconfigure_beam()
        self.subband_beam_configuration = {}