# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project.
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""This module provides an implementation of the RECV PST component manager."""
from __future__ import annotations
import logging
from functools import cached_property
from typing import Any, Callable, List
from overrides import override
from ska_pst.lmc.component import PstProcessApiSubcomponentManager
from .receive_grpc_api_strategy import PstReceiveGrpcApiStrategy
from .receive_model import ReceiveData, ReceiveDataStore
from .receive_simulator import PstReceiveSimulator
from .receive_util import calculate_receive_subband_resources
[docs]class PstReceiveComponentManager(
PstProcessApiSubcomponentManager[ReceiveData, ReceiveData, ReceiveDataStore]
):
"""Component manager for the RECV component for the PST.LMC subsystem."""
def __init__(
self: PstReceiveComponentManager,
*,
device_name: str,
process_api_endpoint: str,
subband_resources_callback: Callable[[dict], None],
logger: logging.Logger | None = None,
**kwargs: Any,
):
"""
Initialise instance of the component manager.
:param device_interface: an abstract interface of the TANGO device.
:type device_interface: PstApiDeviceInterface[ReceiveData]
:param api: an API object used to delegate functionality to.
:type api: `PstProcessApi`
:param logger: a logger for this object to use
:type logger: `logging.Logger`
"""
logger = logger or logging.getLogger(__name__)
logger.debug(
f"Setting up RECV component manager with device_name='{device_name}'"
+ f"and api_endpoint='{process_api_endpoint}'"
)
self._subband_resources_callback = subband_resources_callback
super().__init__(
device_name=device_name,
subcomponent_name="recv",
process_api_endpoint=process_api_endpoint,
simulator=PstReceiveSimulator(),
grpc_strategy=PstReceiveGrpcApiStrategy(),
logger=logger,
data_store=ReceiveDataStore(),
**kwargs,
)
self._subband_beam_configuration: dict = {}
@cached_property
def _api_env(self: PstReceiveComponentManager) -> dict:
"""Get the environment from the core app via its API."""
return self._api.get_env()
@property
def data_host(self: PstReceiveComponentManager) -> str:
"""
Get the IP address of the NIC used for receiving data during a scan.
:return: the IP address of the NIC used for receiving data during a scan.
:rtype: str
"""
return self._api_env["data_host"]
@property
def data_mac(self: PstReceiveComponentManager) -> str:
"""
Get the MAC address corresponding to the data_host.
:return: the MAC address corresponding to the data_host.
:rtype: str
"""
return self._api_env["data_mac"]
@property
def subband_udp_ports(self: PstReceiveComponentManager) -> List[int]:
"""
Get the port numbers used by all the subbands for receiving data during a scan.
:return: the port numbers used by all the subbands for receiving data during a scan.
:rtype: List[int]
"""
return [self._api_env["data_port"]]
@property
def subband_beam_configuration(self: PstReceiveComponentManager) -> dict:
"""
Get the current subband beam configuration.
This is the current subband beam configuration that is calculated during the
`configure_beam`.
:return: the current subband beam configuration.
:rtype: dict
"""
return self._subband_beam_configuration
@subband_beam_configuration.setter
def subband_beam_configuration(self: PstReceiveComponentManager, config: dict) -> None:
self._subband_beam_configuration = config
self._subband_resources_callback(config)
@override
def validate_configure_scan(self: PstReceiveComponentManager, configuration: dict) -> None:
"""
Validate a ConfigureScan request sent from CSP.LMC to for the RECV sub-component.
If this command fails it will ensure then a call to either ConfigureBeam or ConfigureScan would have
failed leaving the BEAM in an invalid state.
:param configuration: configuration that would be used when the configure_beam and configure_scan
methods are called.
:type configuration: dict
"""
pst_processing_mode = configuration["pst_processing_mode"]
recv_resources = calculate_receive_subband_resources(
beam_id=self.beam_id,
data_host=self.data_host,
data_mac=self.data_mac,
subband_udp_ports=self.subband_udp_ports,
**configuration,
)
subband_resources = {
"common": recv_resources["common"],
"subband": recv_resources["subbands"][1],
}
self._api.validate_configure_beam(
configuration=subband_resources, pst_processing_mode=pst_processing_mode
)
self._api.validate_configure_scan(configuration=configuration)
@override
def _configure_beam(self: PstReceiveComponentManager, configuration: dict) -> None:
"""
Configure beam resources in the component.
:param configuration: configuration for beam
:type configuration: dict
"""
# enforce clearing the cache of API env. Send _api_env is a @cached_property
# all we need to do is delete the property if it exists.
try:
del self._api_env
except AttributeError:
# ignore the deletion of api env if it was not set.
pass
pst_processing_mode = configuration["pst_processing_mode"]
# deal only with subband 1 for now. otherwise we have to deal with tracking
# multiple long running tasks.
recv_resources = calculate_receive_subband_resources(
beam_id=self.beam_id,
data_host=self.data_host,
data_mac=self.data_mac,
subband_udp_ports=self.subband_udp_ports,
**configuration,
)
self.logger.debug(f"Submitting API with recv_resources={recv_resources}")
subband_resources = {
"common": recv_resources["common"],
"subband": recv_resources["subbands"][1],
}
self._api.configure_beam(configuration=subband_resources, pst_processing_mode=pst_processing_mode)
self.subband_beam_configuration = recv_resources
@override
def deconfigure_beam(self: PstReceiveComponentManager) -> None:
"""
Deconfigure the RECV component's beam configuration.
:param task_callback: callback for background processing to update device status.
:type task_callback: Callback
"""
super().deconfigure_beam()
self.subband_beam_configuration = {}