Source code for ska_pst.lmc.component.grpc_process_api

# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project.
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""Module for providing the base gRPC implementation for the ``PstProcessApi``."""

from __future__ import annotations

__all__ = ["PstProcessApiGrpc"]

import functools
import logging
import threading
from typing import Any, Protocol

import backoff
from overrides import override
from ska_control_model import LoggingLevel, ObsState, PstProcessingMode
from ska_pst.common.constants import DEFAULT_HEALTH_CHECK_INTERVAL_MS, DEFAULT_MONITORING_INTERVAL_MS
from ska_pst.grpc.lmc.ska_pst_lmc_pb2 import (
    AsciiHeaderProto,
    BeamConfiguration,
    ConfigureBeamRequest,
    ConfigureScanRequest,
    LogLevel,
    MonitorData,
    MonitorResponse,
    PstProcessingModeProto,
    ScanConfiguration,
    SetLogLevelRequest,
    StartScanRequest,
)
from ska_pst.lmc.component.grpc_lmc_client import (
    AlreadyScanningException,
    BaseGrpcException,
    FaultOccurredWhileAborting,
    InvalidRequestException,
    NotConfiguredForScanException,
    NotScanningException,
    PstGrpcLmcClient,
    ResourcesAlreadyAssignedException,
    ResourcesNotAssignedException,
    ScanConfiguredAlreadyException,
    TimeoutException,
)
from ska_pst.lmc.health_check import HealthCheckHandler, HealthCheckState
from ska_pst.lmc.util import BackgroundTaskProcessor, StreamingTask, background_task
from ska_pst.lmc.validation import ValidationError
from ska_pydada import AsciiHeader

from .monitor_data_handler import MonitorDataCallback
from .process_api import PstProcessApi

# Mapping from the SKA/TANGO ``LoggingLevel`` to the Protobuf ``LogLevel`` understood
# by the remote PST gRPC services. This is used by ``PstProcessApiGrpc.set_log_level``.
#
# Notes:
# * ``LoggingLevel.OFF`` is sent as ``LogLevel.INFO``. NOTE(review): presumably because
#   the remote services have no "off" level — confirm.
# * ``LoggingLevel.FATAL`` is sent as ``LogLevel.CRITICAL``.
# * NOTE(review): there is no entry for ``LoggingLevel.ERROR`` (presumably a member of the
#   ska_control_model enum), so ``log_level_map[LoggingLevel.ERROR]`` raises ``KeyError``.
#   Confirm whether the Protobuf ``LogLevel`` enum has an ERROR value that should be mapped.
log_level_map = {
    LoggingLevel.INFO: LogLevel.INFO,
    LoggingLevel.DEBUG: LogLevel.DEBUG,
    LoggingLevel.FATAL: LogLevel.CRITICAL,
    LoggingLevel.WARNING: LogLevel.WARNING,
    LoggingLevel.OFF: LogLevel.INFO,
}


class GrpcApiStrategy(Protocol):
    """
    Structural interface for the subcomponent-specific parts of ``PstProcessApiGrpc``.

    ``PstProcessApiGrpc`` owns all of the gRPC plumbing that is common to the PST
    subcomponents. Anything that depends on the concrete subcomponent is delegated to
    an object satisfying this protocol:

    * building the beam configuration Protobuf message,
    * building the scan configuration Protobuf message, and
    * translating monitoring data into the subcomponent's data model.
    """

    def get_beam_configuration_msg(self: GrpcApiStrategy, *, configuration: dict) -> BeamConfiguration:
        """
        Build the ``BeamConfiguration`` Protobuf message for this subcomponent.

        ``BeamConfiguration`` is a "one-of" message. An implementation should build
        the subcomponent-specific message from the PST beam configuration and wrap
        it in a ``BeamConfiguration``, for example:

        .. code-block:: python

            return BeamConfiguration(smrb=SmrbBeamConfiguration(**configuration))

        :param configuration: the PST beam configuration.
        :type configuration: dict
        :return: the wrapped gRPC ``BeamConfiguration`` message.
        :rtype: BeamConfiguration
        """
        ...

    def get_scan_configuration_msg(self: GrpcApiStrategy, *, configuration: dict) -> ScanConfiguration:
        """
        Build the ``ScanConfiguration`` Protobuf message for this subcomponent.

        ``ScanConfiguration`` is a "one-of" message. An implementation should build
        the subcomponent-specific message from the PST scan configuration and wrap
        it in a ``ScanConfiguration``, for example:

        .. code-block:: python

            return ScanConfiguration(
                dsp_flow_through=DspFlowThroughScanConfiguration(
                    **generate_dsp_scan_request(**configuration)
                )
            )

        :param configuration: the PST scan configuration.
        :type configuration: dict
        :return: the wrapped gRPC ``ScanConfiguration`` message.
        :rtype: ScanConfiguration
        """
        ...

    def handle_monitor_response(
        self: GrpcApiStrategy, *, data: MonitorData, callback: MonitorDataCallback
    ) -> None:
        """
        Process one monitoring data message received from the gRPC server.

        An implementation should convert ``data`` into the subcomponent's internal
        monitoring data model and pass the result on to ``callback``.

        :param data: the gRPC/Protobuf monitoring data message from the server.
        :type data: MonitorData
        :param callback: the callback used to update the LMC subcomponent model.
        :type callback: MonitorDataCallback
        """
        ...
class PstProcessApiGrpc(PstProcessApi):
    """
    Helper class to be used by subclasses of ``PstProcessApi`` that use gRPC.

    This class should be added as a parent class of gRPC client APIs. It implements
    the request/response handling that is common to all PST gRPC services. This
    includes translating the exceptions raised by the gRPC client into either a
    logged warning or error, a re-raise, or a ``ValidationError``.

    Subcomponent specific behaviour is delegated to the ``GrpcApiStrategy`` instance
    given to the constructor. That covers building the ``BeamConfiguration`` and
    ``ScanConfiguration`` Protobuf messages and handling monitoring data.
    """

    def __init__(
        self: PstProcessApiGrpc,
        client_id: str,
        grpc_endpoint: str,
        strategy: GrpcApiStrategy,
        default_timeout: float = 60.0,
        connection_timeout: float = 360.0,
        logger: logging.Logger | None = None,
        background_task_processor: BackgroundTaskProcessor | None = None,
        **kwargs: Any,
    ) -> None:
        """
        Initialise the API.

        :param client_id: the identification of the client, this should be based
            off the FQDN of the MGMT device.
        :type client_id: str
        :param grpc_endpoint: the service endpoint to connect to.
        :type grpc_endpoint: str
        :param strategy: an instance of a GrpcApiStrategy which is used to delegate
            getting gRPC messages to and handling monitoring data.
        :type strategy: GrpcApiStrategy
        :param default_timeout: the default timeout, in seconds, to use for all
            requests, defaults to 60.0 (i.e. 1 minute)
        :type default_timeout: float, optional
        :param connection_timeout: the timeout, in seconds, to use when connecting
            to the gRPC server. The default value of 360 seconds allows for the K8s
            crash loop backoff, which maxes out at 5 minutes, plus some padding for
            the K8s service to record that the service is available.
        :type connection_timeout: float, optional
        :param logger: the logger to use for the API.
        :type logger: logging.Logger | None, optional
        :param background_task_processor: an optional background processor that
            will run background tasks like ``monitor``.
        :type background_task_processor: BackgroundTaskProcessor | None, optional
        :param kwargs: extra keyword arguments; accepted for compatibility but not
            used by this class.
        """
        logger = logger or logging.getLogger(__name__)
        logger.info(f"Creating instance of gRPC Process API for '{client_id}'")
        self._strategy = strategy
        self._client_id = client_id
        self._grpc_client = PstGrpcLmcClient(
            client_id=client_id, endpoint=grpc_endpoint, logger=logger, default_timeout=default_timeout
        )
        self._background_task_processor = background_task_processor or BackgroundTaskProcessor(
            default_logger=logger
        )

        # Handles to the currently running streaming background tasks, if any.
        # The monitor stream yields ``MonitorResponse`` items (see ``monitor``).
        self._monitor_task: StreamingTask[MonitorResponse] | None = None
        self._health_check_task: StreamingTask[HealthCheckState] | None = None
        self._connection_timeout = connection_timeout
        self._default_timeout = default_timeout

        super().__init__(logger=logger)

    @override
    def connect(self: PstProcessApiGrpc) -> None:
        """
        Connect to the remote gRPC service.

        Connection attempts are retried with exponential backoff on any exception.
        Each retry waits at most 5 seconds, and retries stop once the
        ``connection_timeout`` given to the constructor has elapsed.
        """
        self._logger.debug(f"About to call gRPC client connect for '{self._client_id}'")

        @backoff.on_exception(
            backoff.expo,
            Exception,
            factor=1,
            max_value=5.0,  # each attempt to retry at max every 5 seconds
            max_time=self._connection_timeout,  # the default of this is 360 seconds
            logger=self._logger,
        )
        def _connect() -> None:
            self._grpc_client.connect()

        _connect()

    @override
    def disconnect(self: PstProcessApiGrpc) -> None:
        """
        Disconnect from the external process.

        This signals any monitoring background task to stop, but does not wait for
        the task to have completely stopped (``wait_for_task=False``).
        """
        self.stop_monitoring(wait_for_task=False)

    def _get_start_scan_request(
        self: PstProcessApiGrpc, scan_id: int, start_time: str | None = None, **kwargs: Any
    ) -> StartScanRequest:
        """
        Convert scan parameters into an instance of ``StartScanRequest``.

        The request carries an ASCII header that always contains ``SCAN_ID``. It
        contains ``START_TIME`` only when a non-empty ``start_time`` is given. Any
        other keyword arguments are currently ignored.

        :param scan_id: the ID for the scan.
        :type scan_id: int
        :param start_time: an optional start time for the scan.
        :type start_time: str | None, optional
        :return: the gRPC request message.
        :rtype: StartScanRequest
        """
        header_data = AsciiHeader()
        header_data["SCAN_ID"] = scan_id
        if start_time:
            header_data["START_TIME"] = start_time

        header = AsciiHeaderProto(header=str(header_data))
        return StartScanRequest(header=header)

    @override
    def validate_configure_beam(
        self: PstProcessApiGrpc,
        configuration: dict,
        pst_processing_mode: PstProcessingMode,
        timeout: float | None = None,
    ) -> None:
        """
        Validate configuration for a ``configure_beam`` request.

        The validation is performed by sending a dry-run ``configure_beam`` request
        to the remote service.

        :param configuration: Dictionary of resources to allocate.
        :type configuration: dict
        :param pst_processing_mode: the PST processing mode that the configuration is for.
        :type pst_processing_mode: PstProcessingMode
        :param timeout: the timeout, in seconds, for the request, defaults to
            ``default_timeout`` attribute of the gRPC API client.
        :type timeout: float | None, optional
        :raises ValidationError: if there is an issue validating the request. The
            error message contains the details. A request that times out is
            logged as a warning and is not treated as a validation failure.
        """
        self._logger.debug(f"Validating configure_beam request for '{self._client_id}': {configuration}")
        beam_configuration = self._strategy.get_beam_configuration_msg(configuration=configuration)
        beam_configuration.pst_processing_mode = PstProcessingModeProto.Name(pst_processing_mode.value)
        request = ConfigureBeamRequest(
            beam_configuration=beam_configuration,
            dry_run=True,
        )
        try:
            self._grpc_client.configure_beam(request=request, timeout=timeout)
        except TimeoutException:
            self._logger.warning(
                f"validate_configure_beam call timed out for {self._client_id}", exc_info=True
            )
        except (InvalidRequestException, ResourcesAlreadyAssignedException) as e:
            self._logger.error(f"gRPC request to {self._client_id} failed validation: {e.message}")
            raise ValidationError(e.message) from e

    @override
    def configure_beam(
        self: PstProcessApiGrpc,
        configuration: dict,
        pst_processing_mode: PstProcessingMode,
        timeout: float | None = None,
    ) -> None:
        """
        Configure the beam with the resources defined in configuration.

        :param configuration: Dictionary of resources to allocate.
        :type configuration: dict
        :param pst_processing_mode: the PST processing mode that the configuration is for.
        :type pst_processing_mode: PstProcessingMode
        :param timeout: the timeout, in seconds, for the request, defaults to
            ``default_timeout`` attribute of the gRPC API client.
        :type timeout: float | None, optional
        :raises ResourcesAlreadyAssignedException: if the beam is already configured.
        :raises BaseGrpcException: if there was an unrecoverable server error.
        """
        self._logger.debug(f"Configuring beam for '{self._client_id}': {configuration}")
        beam_configuration = self._strategy.get_beam_configuration_msg(configuration=configuration)
        beam_configuration.pst_processing_mode = PstProcessingModeProto.Name(pst_processing_mode.value)
        request = ConfigureBeamRequest(
            beam_configuration=beam_configuration,
            dry_run=False,
        )
        try:
            self._grpc_client.configure_beam(request=request, timeout=timeout)
        except ResourcesAlreadyAssignedException as e:
            self._logger.error(e.message)
            raise
        except TimeoutException:
            self._logger.warning(f"configure_beam call timed out for {self._client_id}", exc_info=True)
        except BaseGrpcException:
            self._logger.error(
                f"Problem processing configure_beam request for '{self._client_id}'", exc_info=True
            )
            raise

    @override
    def deconfigure_beam(self: PstProcessApiGrpc) -> None:
        """
        Deconfigure the beam, releasing all resources.

        A beam that has no resources assigned, or a timed out request, is only
        logged as a warning.

        :raises BaseGrpcException: if there was any other server error.
        """
        try:
            self._grpc_client.deconfigure_beam()
        except ResourcesNotAssignedException as e:
            self._logger.warning(e.message)
        except TimeoutException:
            self._logger.warning(f"deconfigure_beam call timed out for {self._client_id}", exc_info=True)
        except BaseGrpcException:
            self._logger.error(
                f"Problem processing deconfigure_beam request for '{self._client_id}'", exc_info=True
            )
            raise

    @override
    def validate_configure_scan(self: PstProcessApiGrpc, configuration: dict) -> None:
        """
        Validate a configure_scan request.

        The validation is performed by sending a dry-run ``configure_scan`` request
        to the remote service.

        :param configuration: the configuration for the scan.
        :type configuration: dict
        :raises ValidationError: if there is an issue validating the request. The
            error message contains the details. A request that times out is
            logged as a warning and is not treated as a validation failure.
        """
        self._logger.debug(f"Validating configure_scan for '{self._client_id}': {configuration}")
        scan_configuration = self._strategy.get_scan_configuration_msg(configuration=configuration)
        request = ConfigureScanRequest(
            scan_configuration=scan_configuration,
            dry_run=True,
        )
        try:
            self._grpc_client.configure_scan(request)
        except TimeoutException:
            self._logger.warning(
                f"validate_configure_scan call timed out for {self._client_id}", exc_info=True
            )
        except (InvalidRequestException, ScanConfiguredAlreadyException) as e:
            self._logger.error(f"gRPC request to {self._client_id} failed validation: {e.message}")
            raise ValidationError(e.message) from e

    @override
    def configure_scan(self: PstProcessApiGrpc, configuration: dict) -> None:
        """
        Configure a scan.

        :param configuration: the configuration for the scan.
        :type configuration: dict
        :raises ScanConfiguredAlreadyException: if a scan is already configured.
        :raises BaseGrpcException: if there was any other server error.
        """
        self._logger.debug(f"Configuring scan for '{self._client_id}': {configuration}")
        scan_configuration = self._strategy.get_scan_configuration_msg(configuration=configuration)
        request = ConfigureScanRequest(scan_configuration=scan_configuration, dry_run=False)
        try:
            self._grpc_client.configure_scan(request)
        except ScanConfiguredAlreadyException as e:
            self._logger.error(e.message)
            raise
        except TimeoutException:
            self._logger.warning(f"configure_scan call timed out for {self._client_id}", exc_info=True)
        except BaseGrpcException:
            self._logger.error(
                f"Problem processing 'configure' request for '{self._client_id}'", exc_info=True
            )
            raise

    @override
    def deconfigure_scan(self: PstProcessApiGrpc) -> None:
        """
        Deconfigure a scan.

        Background monitoring is stopped before the remote service is called.

        :raises BaseGrpcException: if there was a server error other than the
            service not being configured for a scan or the request timing out.
        """
        try:
            self.stop_monitoring()
            self._grpc_client.deconfigure_scan()
        except NotConfiguredForScanException as e:
            self._logger.warning(e.message)
        except TimeoutException:
            self._logger.warning(f"deconfigure_scan call timed out for {self._client_id}", exc_info=True)
        except BaseGrpcException:
            self._logger.error(
                f"Problem processing 'deconfigure' request for '{self._client_id}'", exc_info=True
            )
            raise

    @override
    def start_scan(
        self: PstProcessApiGrpc,
        scan_id: int,
        **kwargs: Any,
    ) -> None:
        """
        Start scanning.

        :param scan_id: the ID for the scan.
        :type scan_id: int
        :param kwargs: additional arguments, needed to allow for future proofing of
            scan request coming from TM / CSP. A ``start_time`` keyword is added to
            the request header.
        :type kwargs: dict
        :raises BaseGrpcException: if there was a server error other than the
            service already scanning or the request timing out.
        """
        request = self._get_start_scan_request(scan_id=scan_id, **kwargs)
        try:
            self._grpc_client.start_scan(request)
        except AlreadyScanningException as e:
            self._logger.warning(e.message)
        except TimeoutException:
            self._logger.warning(f"start_scan call timed out for {self._client_id}", exc_info=True)
        except BaseGrpcException:
            self._logger.error(f"Problem processing scan request for '{self._client_id}'", exc_info=True)
            raise

    @override
    def stop_scan(self: PstProcessApiGrpc, timeout: float = 60.0) -> None:
        """
        End a scan.

        This calls out to the remote service to end a scan. It does not stop
        background monitoring. Monitoring is stopped by ``deconfigure_scan``,
        ``abort``, ``reset``, ``restart``, ``go_to_fault`` or ``stop_monitoring``.

        :param timeout: the timeout, in seconds, for the request, defaults to 60.0.
        :type timeout: float, optional
        :raises BaseGrpcException: if there was a server error other than the
            service not scanning or the request timing out.
        """
        try:
            self._grpc_client.stop_scan(timeout=timeout)
        except NotScanningException as e:
            self._logger.warning(e.message)
        except TimeoutException:
            self._logger.warning(f"stop_scan call timed out for {self._client_id}", exc_info=True)
        except BaseGrpcException:
            self._logger.error(f"Problem processing stop_scan request for '{self._client_id}'", exc_info=True)
            raise

    @override
    def abort(self: PstProcessApiGrpc) -> None:
        """
        Abort a scan.

        :raises FaultOccurredWhileAborting: if the remote service faulted while aborting.
        :raises BaseGrpcException: if there was any other server error, other
            than the request timing out.
        """
        try:
            # stop monitoring if monitoring is happening. This would be the
            # case if our state was SCANNING.
            self.stop_monitoring()
            self._grpc_client.abort()
        except TimeoutException:
            self._logger.warning(f"abort call timed out for {self._client_id}", exc_info=True)
        except FaultOccurredWhileAborting:
            self._logger.error(f"A fault occurred while aborting - '{self._client_id}'", exc_info=True)
            raise
        except BaseGrpcException:
            self._logger.error(f"Problem in aborting request for '{self._client_id}'", exc_info=True)
            raise

    @override
    def reset(self: PstProcessApiGrpc) -> None:
        """
        Reset service.

        Background monitoring is stopped before the remote service is called.

        :raises BaseGrpcException: if there was a server error other than the
            request timing out.
        """
        try:
            self.stop_monitoring()
            self._logger.debug(f"About to call reset on {self._client_id} gRPC.")
            self._grpc_client.reset()
        except TimeoutException:
            self._logger.warning(f"reset call timed out for {self._client_id}", exc_info=True)
        except BaseGrpcException:
            self._logger.error(f"Error raised while resetting '{self._client_id}'", exc_info=True)
            raise

    @override
    def restart(self: PstProcessApiGrpc) -> None:
        """
        Restart service.

        Background monitoring is stopped first. Any exception from the gRPC client
        is propagated to the caller.
        """
        self.stop_monitoring()
        self._logger.debug(f"About to call restart on {self._client_id} gRPC.")
        self._grpc_client.restart()

    @override
    def go_to_fault(self: PstProcessApiGrpc) -> None:
        """
        Put remote service into FAULT state.

        This is used to put the remote service into a FAULT state to match the
        status of the LMC component. This is best-effort: errors from the remote
        service are logged as warnings and not raised.
        """
        try:
            self.stop_monitoring()
            self._grpc_client.go_to_fault()
        except TimeoutException:
            self._logger.warning(f"go_to_fault call timed out for {self._client_id}", exc_info=True)
        except BaseGrpcException:
            self._logger.warning(
                f"Error in trying to put remote service '{self._client_id}' in FAULT state.", exc_info=True
            )

    @override
    def get_env(self: PstProcessApiGrpc) -> dict:
        """Get the environment properties from the remote gRPC service."""
        return self._grpc_client.get_env()

    @override
    def set_log_level(self: PstProcessApiGrpc, log_level: LoggingLevel) -> None:
        """
        Set the LogLevel of the remote gRPC service.

        This is best-effort. A ``log_level`` that has no entry in
        ``log_level_map``, a timed out request, or a server error is logged as a
        warning and not raised.

        :param log_level: The required TANGO LoggingLevel.
        :returns: None.
        """
        try:
            grpc_log_level = log_level_map[log_level]
        except KeyError:
            # Previously this escaped as an unhandled KeyError (e.g. for
            # LoggingLevel.ERROR); treat it like the other best-effort failures.
            self._logger.warning(
                f"Unable to update remote service '{self._client_id}' LogLevel to {log_level}: "
                "no equivalent gRPC LogLevel."
            )
            return

        try:
            self._grpc_client.set_log_level(request=SetLogLevelRequest(log_level=grpc_log_level))
        except TimeoutException:
            self._logger.warning(f"set_log_level call timed out for {self._client_id}", exc_info=True)
        except BaseGrpcException:
            self._logger.warning(
                f"Error in trying to update remote service '{self._client_id}' LogLevel to {log_level}.",
                exc_info=True,
            )

    def get_log_level(self: PstProcessApiGrpc) -> LoggingLevel:
        """Get the LogLevel of the remote gRPC service."""
        return self._grpc_client.get_log_level()

    @override
    def stop_monitoring(self: PstProcessApiGrpc, *, wait_for_task: bool = True, **kwargs: Any) -> None:
        """
        Stop background monitoring.

        By default this stops the monitoring background task synchronously and
        waits for it to complete. Setting ``wait_for_task=False`` instead instructs
        the background task to stop without waiting for it to have completely
        stopped. This is helpful when the LMC device has been instructed to go into
        an OFFLINE mode.

        :param wait_for_task: whether to wait for the background monitoring task to
            completely stop, defaults to True
        :type wait_for_task: bool, optional
        """
        if self._monitor_task is not None:
            self._logger.debug(f"{self._client_id} stopping monitoring")
            self._monitor_task.stop(wait_for_generator=wait_for_task)
            self._monitor_task = None

    @background_task
    @override
    def monitor(
        self: PstProcessApiGrpc,
        monitor_data_callback: MonitorDataCallback,
        polling_rate: int = DEFAULT_MONITORING_INTERVAL_MS,
        monitor_abort_event: threading.Event | None = None,
    ) -> None:
        """
        Monitor data of remote service.

        Any existing monitoring task is stopped first. Each streamed response's
        ``monitor_data`` is passed to the strategy's ``handle_monitor_response``.
        Exceptions raised while starting are logged rather than propagated.

        :param monitor_data_callback: callback to use when there is an update of
            the sub-band monitor data.
        :param polling_rate: the rate, in milliseconds, at which the monitoring
            should poll, defaults to ``DEFAULT_MONITORING_INTERVAL_MS``.
        :param monitor_abort_event: a :py:class:`threading.Event` that can be used
            to signal to stop monitoring. If not set then one is created.
        """
        try:
            if self._monitor_task is not None:
                self.stop_monitoring()

            self._logger.debug(f"{self._client_id} starting monitoring.")
            monitor_abort_event = monitor_abort_event or threading.Event()

            item_generator = functools.partial(self._grpc_client.monitor, polling_rate=polling_rate)

            def _item_handler(response: MonitorResponse) -> None:
                self._strategy.handle_monitor_response(
                    data=response.monitor_data, callback=monitor_data_callback
                )

            self._monitor_task = StreamingTask(
                task_name=f"{self._client_id} monitor",
                item_handler=_item_handler,
                item_generator=item_generator,
                logger=self._logger,
            )
            self._monitor_task.start(abort_event=monitor_abort_event)
        except Exception:
            self._logger.error(f"{self._client_id} monitor() threw an exception.", exc_info=True)

    @background_task
    @override
    def perform_health_check(
        self: PstProcessApiGrpc,
        health_check_handler: HealthCheckHandler,
        health_check_interval: int = DEFAULT_HEALTH_CHECK_INTERVAL_MS,
        health_check_abort_event: threading.Event | None = None,
    ) -> None:
        """
        Perform health check of a process in the background.

        This method calls :py:meth:`PstGrpcLmcClient.perform_health_check`, which
        streams back health check state items from the remote gRPC service. Each
        item is delegated to the provided ``health_check_handler``.

        Exceptions from the gRPC API client are converted into a
        :py:class:`HealthCheckState` whose ``exception`` property is the exception
        that was thrown. That state is also passed to ``health_check_handler``.

        :param health_check_handler: an object that implements the
            ``HealthCheckHandler`` protocol. Any health check state object that is
            returned from the service is delegated to this handler to be handled.
        :type health_check_handler: HealthCheckHandler
        :param health_check_interval: the interval, in milliseconds, at which the
            health check should be performed, defaults to
            ``DEFAULT_HEALTH_CHECK_INTERVAL_MS``.
        :type health_check_interval: int, optional
        :param health_check_abort_event: a threading primitive to be used to stop
            the health check by an external mechanism, defaults to None
        :type health_check_abort_event: threading.Event | None, optional
        """
        try:
            if self._health_check_task is not None:
                self.stop_health_check()

            self._logger.debug(f"Starting background task for health check of {self._client_id}")
            health_check_abort_event = health_check_abort_event or threading.Event()

            def _exception_handler(exception: Exception) -> None:
                self._logger.debug(f"Received exception during health check for {self._client_id}")
                state = HealthCheckState(
                    service_name=self._grpc_client._service_name,  # type: ignore
                    service_uuid=self._grpc_client._service_uuid,  # type: ignore
                    exception=exception,
                )
                health_check_handler.handle_health_check_state(state)

            item_generator = functools.partial(
                self._grpc_client.perform_health_check, health_check_interval=health_check_interval
            )

            # pass the API's logger, consistent with the monitoring StreamingTask
            self._health_check_task = StreamingTask(
                task_name=f"{self._client_id} health check",
                item_handler=health_check_handler.handle_health_check_state,
                item_generator=item_generator,
                exception_handler=_exception_handler,
                logger=self._logger,
            )
            self._health_check_task.start(abort_event=health_check_abort_event)
        except Exception:
            self._logger.error(
                f"{self._client_id} perform_health_check() threw an exception. Exiting background thread",
                exc_info=True,
            )

    @override
    def stop_health_check(self: PstProcessApiGrpc, *, wait_for_task: bool = True, **kwargs: Any) -> None:
        """
        Stop performing health check of service.

        By default this stops the health check background task synchronously and
        waits for it to complete. Setting ``wait_for_task=False`` instead instructs
        the background task to stop without waiting for it to have completely
        stopped. This is helpful when the LMC device has been instructed to go into
        an OFFLINE mode.

        :param wait_for_task: whether to wait for the background health check task
            to completely stop, defaults to True
        :type wait_for_task: bool, optional
        """
        if self._health_check_task is not None:
            self._logger.debug(f"{self._client_id} stopping health check")
            self._health_check_task.stop(wait_for_generator=wait_for_task)
            self._health_check_task = None

    @override
    def get_state(self: PstProcessApiGrpc) -> ObsState:
        """Get the ObsState of the remote system."""
        return self._grpc_client.get_state()