Source code for ska_pst_lmc.smrb.smrb_component_manager

# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST LMC project
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""This module provides an implementation of the SMRB PST component manager."""

from __future__ import annotations

import functools
import logging
from typing import Any, Callable, Dict, List, Optional

from ska_tango_base.control_model import PowerState, SimulationMode
from ska_tango_base.executor import TaskStatus

from ska_pst_lmc.component import (
    MonitorDataHandler,
    PstApiComponentManager,
    PstApiDeviceInterface,
    TaskResponse,
)
from ska_pst_lmc.smrb.smrb_model import SmrbMonitorData, SmrbMonitorDataStore
from ska_pst_lmc.smrb.smrb_process_api import (
    PstSmrbProcessApi,
    PstSmrbProcessApiGrpc,
    PstSmrbProcessApiSimulator,
)
from ska_pst_lmc.smrb.smrb_util import calculate_smrb_subband_resources
from ska_pst_lmc.util.callback import Callback, wrap_callback

__all__ = ["PstSmrbComponentManager"]


class PstSmrbComponentManager(PstApiComponentManager[SmrbMonitorData, PstSmrbProcessApi]):
    """Component manager for the SMRB component for the PST.LMC subsystem."""

    def __init__(
        self: PstSmrbComponentManager,
        *,
        device_interface: PstApiDeviceInterface[SmrbMonitorData],
        logger: logging.Logger,
        api: Optional[PstSmrbProcessApi] = None,
        **kwargs: Any,
    ):
        """
        Initialise instance of the component manager.

        :param device_interface: the interface to the owning device. Its ``device_name`` and
            ``process_api_endpoint`` are logged, its ``handle_component_state_change`` is used
            as the component state callback of the default API, and its
            ``handle_monitor_data_update`` is called when monitoring data has been updated.
        :param logger: a logger for this object is to use.
        :param api: the process API to use. When ``None``, a
            :py:class:`PstSmrbProcessApiSimulator` is created.
        :param kwargs: additional keyword arguments forwarded to the parent class initialiser.
        """
        logger.debug(
            f"Setting up SMRB component manager with device_name='{device_interface.device_name}' "
            f"and api_endpoint='{device_interface.process_api_endpoint}'"
        )
        # Only fall back to the simulator when no API was supplied at all; an explicit
        # identity check avoids replacing a supplied API object that happens to be falsy.
        if api is None:
            api = PstSmrbProcessApiSimulator(
                logger=logger,
                component_state_callback=device_interface.handle_component_state_change,
            )

        # Set up handling of monitor data.
        self._monitor_data_handler = MonitorDataHandler(
            data_store=SmrbMonitorDataStore(),
            monitor_data_callback=device_interface.handle_monitor_data_update,
        )

        super().__init__(
            device_interface=device_interface,
            api=api,
            logger=logger,
            power=PowerState.UNKNOWN,
            fault=None,
            **kwargs,
        )

    @property
    def ring_buffer_utilisation(self: PstSmrbComponentManager) -> float:
        """
        Get the percentage of the ring buffer elements that are full of data.

        :returns: the percentage of the ring buffer elements that are full of data.
        :rtype: float
        """
        return self._monitor_data.ring_buffer_utilisation

    @property
    def ring_buffer_size(self: PstSmrbComponentManager) -> int:
        """
        Get the capacity of the ring buffer, in bytes.

        :returns: the capacity of the ring buffer, in bytes.
        :rtype: int
        """
        return self._monitor_data.ring_buffer_size

    @property
    def number_subbands(self: PstSmrbComponentManager) -> int:
        """
        Get the number of sub-bands.

        :returns: the number of sub-bands.
        :rtype: int
        """
        return self._monitor_data.number_subbands

    @property
    def ring_buffer_read(self: PstSmrbComponentManager) -> int:
        """
        Get the amount of data, in bytes, that has been read.

        :returns: the amount of data that has been read.
        :rtype: int
        """
        return self._monitor_data.ring_buffer_read

    @property
    def ring_buffer_written(self: PstSmrbComponentManager) -> int:
        """
        Get the amount of data, in bytes, that has been written.

        :returns: the amount of data that has been written.
        :rtype: int
        """
        return self._monitor_data.ring_buffer_written

    @property
    def subband_ring_buffer_utilisations(self: PstSmrbComponentManager) -> List[float]:
        """
        Get the percentage of full ring buffer elements for each sub-band.

        :returns: the percentage of full ring buffer elements for each sub-band.
        :rtype: List[float]
        """
        return self._monitor_data.subband_ring_buffer_utilisations

    @property
    def subband_ring_buffer_sizes(self: PstSmrbComponentManager) -> List[int]:
        """
        Get the capacity of ring buffers for each sub-band.

        :returns: the capacity of ring buffers, in bytes, for each sub-band.
        :rtype: List[int]
        """
        return self._monitor_data.subband_ring_buffer_sizes

    @property
    def subband_ring_buffer_read(self: PstSmrbComponentManager) -> List[int]:
        """
        Get the amount of data that has been read for each sub-band.

        :returns: the amount of data that has been read for each sub-band.
        :rtype: List[int]
        """
        return self._monitor_data.subband_ring_buffer_read

    @property
    def subband_ring_buffer_written(self: PstSmrbComponentManager) -> List[int]:
        """
        Get the amount of data that has been written for each sub-band.

        :returns: the amount of data that has been written for each sub-band.
        :rtype: List[int]
        """
        return self._monitor_data.subband_ring_buffer_written

    @property
    def _monitor_data(self: PstSmrbComponentManager) -> SmrbMonitorData:
        """Get monitor data from data handler."""
        return self._monitor_data_handler.monitor_data

    def _update_api(self: PstSmrbComponentManager) -> None:
        """
        Update instance of API based on simulation mode.

        A simulator API is used when the simulation mode is ``SimulationMode.TRUE``,
        otherwise a gRPC API using this device's name and API endpoint is created.
        """
        # NOTE(review): the attribute is spelled ``_simuation_mode`` (sic); it is presumably
        # defined by the parent class - confirm there before renaming it here.
        if self._simuation_mode == SimulationMode.TRUE:
            self._api = PstSmrbProcessApiSimulator(
                logger=self.logger,
                component_state_callback=self._push_component_state_update,
            )
        else:
            self._api = PstSmrbProcessApiGrpc(
                client_id=self.device_name,
                grpc_endpoint=self.api_endpoint,
                logger=self.logger,
                component_state_callback=self._push_component_state_update,
            )

    def validate_configure_scan(
        self: PstSmrbComponentManager, configuration: Dict[str, Any], task_callback: Callback = None
    ) -> TaskResponse:
        """
        Validate a ConfigureScan request sent from CSP.LMC to the SMRB sub-component.

        This asserts the request can be converted to SMRB resources and then calls the
        process API to perform the validation.

        :param configuration: configuration that would be used when the configure_beam and
            configure_scan methods are called.
        :type configuration: Dict[str, Any]
        :param task_callback: callback for background processing to update device status.
        :type task_callback: Callback
        :returns: the response of submitting the validation as a background task.
        :rtype: TaskResponse
        """

        def _task(task_callback: Callable) -> None:
            task_callback(status=TaskStatus.IN_PROGRESS)
            try:
                smrb_resources = calculate_smrb_subband_resources(self.beam_id, request_params=configuration)

                # only the resources under key 1 (subband 1) are validated
                self._api.validate_configure_beam(configuration=smrb_resources[1])
                self._api.validate_configure_scan(configuration=configuration)
                task_callback(status=TaskStatus.COMPLETED, result="Completed")
            except Exception as e:
                # report any failure through the task callback rather than raising
                # from within the background task.
                self.logger.exception(f"Failed to validate scan configuration for {self.device_name}.")
                task_callback(status=TaskStatus.FAILED, exception=e)

        return self._submit_background_task(_task, task_callback=task_callback)

    def configure_beam(
        self: PstSmrbComponentManager, configuration: Dict[str, Any], task_callback: Callback = None
    ) -> TaskResponse:
        """
        Configure beam resources in the component.

        The SMRB resources are calculated before the background task is submitted, so an
        error raised by that calculation propagates directly to the caller.

        :param configuration: parameters to be configured and their requested values.
        :param task_callback: callback for background processing to update device status.
        :returns: the response of submitting the configuration as a background task.
        :rtype: TaskResponse
        """
        smrb_resources = calculate_smrb_subband_resources(self.beam_id, request_params=configuration)

        # deal only with subband 1 for now.
        self.logger.debug(f"Submitting API with smrb_resources={smrb_resources[1]}")

        return self._submit_background_task(
            functools.partial(self._api.configure_beam, configuration=smrb_resources[1]),
            task_callback=task_callback,
        )

    def start_scan(
        self: PstSmrbComponentManager, args: Dict[str, Any], task_callback: Callback = None
    ) -> TaskResponse:
        """
        Start scanning.

        The background task calls the API's ``start_scan`` and then its ``monitor`` method,
        passing the monitor data handler's sub-band callback and the monitoring polling rate.

        :param args: the arguments passed through to the API's ``start_scan``.
        :param task_callback: callback for background processing to update device status.
        :returns: the response of submitting the request as a background task.
        :rtype: TaskResponse
        """

        def _task(task_callback: Callback = None) -> None:
            self._api.start_scan(args=args, task_callback=wrap_callback(task_callback))
            self._api.monitor(
                # for now only handling 1 subband
                subband_monitor_data_callback=self._monitor_data_handler.handle_subband_data,
                polling_rate=self.monitoring_polling_rate,
            )

        return self._submit_background_task(_task, task_callback=task_callback)

    def stop_scan(self: PstSmrbComponentManager, task_callback: Callback = None) -> TaskResponse:
        """
        End scanning.

        The background task calls the API's ``stop_scan`` and then resets the monitoring data.

        :param task_callback: callback for background processing to update device status.
        :returns: the response of submitting the request as a background task.
        :rtype: TaskResponse
        """

        def _task(task_callback: Callback = None) -> None:
            self._api.stop_scan(task_callback=wrap_callback(task_callback))

            # reset the monitoring data
            self._monitor_data_handler.reset_monitor_data()

        return self._submit_background_task(_task, task_callback=task_callback)