# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project.
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""Module for providing the base gRPC implementation for the ``PstProcessApi``."""
from __future__ import annotations
__all__ = ["PstProcessApiGrpc"]
import functools
import logging
import threading
from typing import Any, Protocol
import backoff
from overrides import override
from ska_control_model import LoggingLevel, ObsState, PstProcessingMode
from ska_pst.common.constants import DEFAULT_HEALTH_CHECK_INTERVAL_MS, DEFAULT_MONITORING_INTERVAL_MS
from ska_pst.grpc.lmc.ska_pst_lmc_pb2 import (
AsciiHeaderProto,
BeamConfiguration,
ConfigureBeamRequest,
ConfigureScanRequest,
LogLevel,
MonitorData,
MonitorResponse,
PstProcessingModeProto,
ScanConfiguration,
SetLogLevelRequest,
StartScanRequest,
)
from ska_pst.lmc.component.grpc_lmc_client import (
AlreadyScanningException,
BaseGrpcException,
FaultOccurredWhileAborting,
InvalidRequestException,
NotConfiguredForScanException,
NotScanningException,
PstGrpcLmcClient,
ResourcesAlreadyAssignedException,
ResourcesNotAssignedException,
ScanConfiguredAlreadyException,
TimeoutException,
)
from ska_pst.lmc.health_check import HealthCheckHandler, HealthCheckState
from ska_pst.lmc.util import BackgroundTaskProcessor, StreamingTask, background_task
from ska_pst.lmc.validation import ValidationError
from ska_pydada import AsciiHeader
from .monitor_data_handler import MonitorDataCallback
from .process_api import PstProcessApi
# Mapping from the TANGO ``LoggingLevel`` enum to the ``LogLevel`` enum of the
# gRPC/Protobuf API. It is used by ``PstProcessApiGrpc.set_log_level`` to build
# the ``SetLogLevelRequest``.
#
# NOTE(review): only the members listed below have a mapping. Looking up any
# other ``LoggingLevel`` member (e.g. an ERROR level, if the enum defines one)
# raises ``KeyError`` - confirm whether that is intended.
log_level_map = {
    LoggingLevel.INFO: LogLevel.INFO,
    LoggingLevel.DEBUG: LogLevel.DEBUG,
    # FATAL is sent to the remote service as CRITICAL
    LoggingLevel.FATAL: LogLevel.CRITICAL,
    LoggingLevel.WARNING: LogLevel.WARNING,
    # OFF is sent to the remote service as INFO
    LoggingLevel.OFF: LogLevel.INFO,
}
class GrpcApiStrategy(Protocol):
    """A Python protocol used by ``PstProcessApiGrpc`` to delegate subcomponent specific functionality."""

    def get_beam_configuration_msg(self: GrpcApiStrategy, *, configuration: dict) -> BeamConfiguration:
        """
        Get the gRPC BeamConfiguration Protobuf message for the subcomponent.

        Implementations of this should map the PST beam configuration into a Protobuf
        ``BeamConfiguration`` message. The message itself uses a "one-of" pattern and as
        such the implementations of this protocol method should create the specific protobuf
        message and then return the ``BeamConfiguration`` message.

        .. code-block:: python

            return BeamConfiguration(smrb=SmrbBeamConfiguration(**configuration))

        :param configuration: the PST beam configuration
        :type configuration: dict
        :return: the gRPC BeamConfiguration Protobuf message for the subcomponent.
        :rtype: BeamConfiguration
        """

    def get_scan_configuration_msg(self: GrpcApiStrategy, *, configuration: dict) -> ScanConfiguration:
        """
        Get the gRPC ScanConfiguration Protobuf message for the subcomponent.

        Implementations of this should map the PST scan configuration into a Protobuf
        ``ScanConfiguration`` message. The message itself uses a "one-of" pattern and as
        such the implementations of this protocol method should create the specific protobuf
        message and then return the ``ScanConfiguration`` message.

        .. code-block:: python

            return ScanConfiguration(
                dsp_flow_through=DspFlowThroughScanConfiguration(
                    **generate_dsp_scan_request(**configuration)
                )
            )

        :param configuration: the PST scan configuration
        :type configuration: dict
        :return: the gRPC ScanConfiguration Protobuf message for the subcomponent.
        :rtype: ScanConfiguration
        """

    def handle_monitor_response(
        self: GrpcApiStrategy, *, data: MonitorData, callback: MonitorDataCallback
    ) -> None:
        """
        Handle the gRPC monitoring data response.

        This should map the monitoring ``data`` to the appropriate internal subcomponent
        data model and then call the appropriate callback.

        :param data: the gRPC/Protobuf monitoring data message from server
        :type data: MonitorData
        :param callback: the callback used to update the LMC subcomponent model
        :type callback: MonitorDataCallback
        """
class PstProcessApiGrpc(PstProcessApi):
    """gRPC implementation of ``PstProcessApi``.

    Requests are sent to the remote service through a ``PstGrpcLmcClient``. Creating
    the subcomponent specific Protobuf messages for the beam and scan configuration,
    and handling of monitoring data, is delegated to the ``GrpcApiStrategy`` instance
    that is passed to the constructor.

    Unless documented otherwise on a method, a ``TimeoutException`` raised by the gRPC
    client is logged as a warning and is not re-raised, while any other
    ``BaseGrpcException`` is logged as an error and re-raised.
    """

    def __init__(
        self: PstProcessApiGrpc,
        client_id: str,
        grpc_endpoint: str,
        strategy: GrpcApiStrategy,
        default_timeout: float = 60.0,
        connection_timeout: float = 360.0,
        logger: logging.Logger | None = None,
        background_task_processor: BackgroundTaskProcessor | None = None,
        **kwargs: Any,
    ) -> None:
        """
        Initialise the API.

        :param client_id: the identification of the client, this should be based
            off the FQDN of the MGMT device.
        :type client_id: str
        :param grpc_endpoint: the service endpoint to connect to.
        :type grpc_endpoint: str
        :param strategy: an instance of a GrpcApiStrategy which is used
            to delegate getting gRPC messages to and handling monitoring data.
        :type strategy: GrpcApiStrategy
        :param default_timeout: the default timeout, in seconds, to use for all requests,
            defaults to 60.0 (i.e. 1 minute)
        :type default_timeout: float, optional
        :param connection_timeout: the timeout to use when connecting to gRPC server.
            Default value is 360 seconds, which allows for the K8s crash loop backoff
            which maxes out at 5mins but a padding is given to allow for the K8s service
            to record service is available.
        :type connection_timeout: float, optional
        :param logger: the logger to use for the API. If not provided the logger
            of this module is used.
        :type logger: logging.Logger | None, optional
        :param background_task_processor: an optional background processor that
            will run background tasks like `monitor`. If not provided a new
            ``BackgroundTaskProcessor`` is created.
        :type background_task_processor: BackgroundTaskProcessor | None, optional
        :param kwargs: additional keyword arguments, these are accepted but not used
            by this class.
        :type kwargs: dict
        """
        logger = logger or logging.getLogger(__name__)
        logger.info(f"Creating instance of gRPC Process API for '{client_id}'")

        self._strategy = strategy
        self._client_id = client_id
        self._grpc_client = PstGrpcLmcClient(
            client_id=client_id, endpoint=grpc_endpoint, logger=logger, default_timeout=default_timeout
        )
        self._background_task_processor = background_task_processor or BackgroundTaskProcessor(
            default_logger=logger
        )

        # handles to the streaming background tasks, None while a task is not running
        self._monitor_task: StreamingTask[MonitorData] | None = None
        self._health_check_task: StreamingTask[HealthCheckState] | None = None

        self._connection_timeout = connection_timeout
        self._default_timeout = default_timeout

        super().__init__(logger=logger)

    @override
    def connect(self: PstProcessApiGrpc) -> None:
        """
        Connect to the remote gRPC service.

        The connection attempt is retried, using the ``backoff`` library with an
        exponential wait that is capped at 5 seconds, whenever the gRPC client
        raises any ``Exception``. Retrying is bounded by the ``connection_timeout``
        that was passed to the constructor.
        """
        self._logger.debug(f"About to call gRPC client connect for '{self._client_id}'")

        @backoff.on_exception(
            backoff.expo,
            Exception,
            factor=1,
            max_value=5.0,  # each attempt to retry at max every 5 seconds
            max_time=self._connection_timeout,  # the default of this is 360 seconds
            logger=self._logger,
        )
        def _connect() -> None:
            self._grpc_client.connect()

        _connect()

    @override
    def disconnect(self: PstProcessApiGrpc) -> None:
        """
        Disconnect from the external process.

        This signals any monitoring background task to stop, without waiting for
        the task to have completely stopped.
        """
        self.stop_monitoring(wait_for_task=False)

    def _get_start_scan_request(
        self: PstProcessApiGrpc, scan_id: int, start_time: str | None = None, **kwargs: Any
    ) -> StartScanRequest:
        """
        Convert the scan parameters to an instance of `StartScanRequest`.

        The request carries an ASCII header with the ``SCAN_ID`` and, when a
        ``start_time`` is given, the ``START_TIME``.

        :param scan_id: the ID for the scan.
        :type scan_id: int
        :param start_time: the optional start time of the scan, it is only added to
            the header when it is not empty.
        :type start_time: str | None, optional
        :param kwargs: additional scan parameters, these are currently ignored.
        :type kwargs: dict
        :return: the Protobuf request message to start a scan.
        :rtype: StartScanRequest
        """
        header_data = AsciiHeader()
        header_data["SCAN_ID"] = scan_id
        if start_time:
            header_data["START_TIME"] = start_time

        header = AsciiHeaderProto(header=str(header_data))
        return StartScanRequest(header=header)

    @override
    def validate_configure_beam(
        self: PstProcessApiGrpc,
        configuration: dict,
        pst_processing_mode: PstProcessingMode,
        timeout: float | None = None,
    ) -> None:
        """
        Validate configuration for a `configure_beam` request.

        The request is sent to the remote service as a dry run. If the request
        times out this is logged as a warning and the method returns without
        raising an exception.

        :param configuration: Dictionary of resources to allocate.
        :type configuration: dict
        :param pst_processing_mode: the PST processing mode that the configuration is for.
        :type pst_processing_mode: PstProcessingMode
        :param timeout: the timeout, in seconds, for the request. The value, including
            the default of ``None``, is passed through to the gRPC API client.
        :type timeout: float | None, optional
        :raises ValidationError: if there is an issue validating the request.
            The error message contains the details.
        """
        self._logger.debug(f"Validating configure_beam request for '{self._client_id}': {configuration}")

        beam_configuration = self._strategy.get_beam_configuration_msg(configuration=configuration)
        beam_configuration.pst_processing_mode = PstProcessingModeProto.Name(pst_processing_mode.value)
        request = ConfigureBeamRequest(
            beam_configuration=beam_configuration,
            dry_run=True,
        )
        try:
            self._grpc_client.configure_beam(request=request, timeout=timeout)
        except TimeoutException:
            self._logger.warning(
                f"validate_configure_beam call timed out for {self._client_id}", exc_info=True
            )
        except (InvalidRequestException, ResourcesAlreadyAssignedException) as e:
            self._logger.error(f"gRPC request to {self._client_id} failed validation: {e.message}")
            raise ValidationError(e.message) from e

    @override
    def configure_beam(
        self: PstProcessApiGrpc,
        configuration: dict,
        pst_processing_mode: PstProcessingMode,
        timeout: float | None = None,
    ) -> None:
        """
        Configure the beam with the resources defined in configuration.

        :param configuration: Dictionary of resources to allocate.
        :type configuration: dict
        :param pst_processing_mode: the PST processing mode that the configuration is for.
        :type pst_processing_mode: PstProcessingMode
        :param timeout: the timeout, in seconds, for the request. The value, including
            the default of ``None``, is passed through to the gRPC API client.
        :type timeout: float | None, optional
        :raises ResourcesAlreadyAssignedException: if the remote service already has
            resources assigned.
        :raises BaseGrpcException: if there was a un-recoverable server error.
        """
        self._logger.debug(f"Configuring beam for '{self._client_id}': {configuration}")

        beam_configuration = self._strategy.get_beam_configuration_msg(configuration=configuration)
        beam_configuration.pst_processing_mode = PstProcessingModeProto.Name(pst_processing_mode.value)
        request = ConfigureBeamRequest(
            beam_configuration=beam_configuration,
            dry_run=False,
        )
        try:
            self._grpc_client.configure_beam(request=request, timeout=timeout)
        except ResourcesAlreadyAssignedException as e:
            self._logger.error(e.message)
            raise
        except TimeoutException:
            self._logger.warning(f"configure_beam call timed out for {self._client_id}", exc_info=True)
        except BaseGrpcException:
            self._logger.error(
                f"Problem processing configure_beam request for '{self._client_id}'", exc_info=True
            )
            raise

    @override
    def deconfigure_beam(self: PstProcessApiGrpc) -> None:
        """
        Deconfigure the beam, releasing all resources.

        If the remote service reports that no resources are assigned this is
        only logged as a warning.

        :raises BaseGrpcException: if there was a un-recoverable server error.
        """
        try:
            self._grpc_client.deconfigure_beam()
        except ResourcesNotAssignedException as e:
            self._logger.warning(e.message)
        except TimeoutException:
            self._logger.warning(f"deconfigure_beam call timed out for {self._client_id}", exc_info=True)
        except BaseGrpcException:
            self._logger.error(
                f"Problem processing deconfigure_beam request for '{self._client_id}'", exc_info=True
            )
            raise

    @override
    def validate_configure_scan(self: PstProcessApiGrpc, configuration: dict) -> None:
        """
        Validate a configure_scan request.

        The request is sent to the remote service as a dry run. If the request
        times out this is logged as a warning and the method returns without
        raising an exception.

        :param configuration: the configuration for the scan.
        :type configuration: dict
        :raises ValidationError: if there is an issue validating the request. The error message contains the
            details.
        """
        self._logger.debug(f"Validating configure_scan for '{self._client_id}': {configuration}")

        scan_configuration = self._strategy.get_scan_configuration_msg(configuration=configuration)
        request = ConfigureScanRequest(
            scan_configuration=scan_configuration,
            dry_run=True,
        )
        try:
            self._grpc_client.configure_scan(request)
        except TimeoutException:
            self._logger.warning(
                f"validate_configure_scan call timed out for {self._client_id}", exc_info=True
            )
        except (InvalidRequestException, ScanConfiguredAlreadyException) as e:
            self._logger.error(f"gRPC request to {self._client_id} failed validation: {e.message}")
            raise ValidationError(e.message) from e

    @override
    def configure_scan(self: PstProcessApiGrpc, configuration: dict) -> None:
        """
        Configure a scan.

        :param configuration: the configuration for the scan.
        :type configuration: dict
        :raises ScanConfiguredAlreadyException: if the remote service is already
            configured for a scan.
        :raises BaseGrpcException: if there was a un-recoverable server error.
        """
        self._logger.debug(f"Configuring scan for '{self._client_id}': {configuration}")

        scan_configuration = self._strategy.get_scan_configuration_msg(configuration=configuration)
        request = ConfigureScanRequest(scan_configuration=scan_configuration, dry_run=False)
        try:
            self._grpc_client.configure_scan(request)
        except ScanConfiguredAlreadyException as e:
            self._logger.error(e.message)
            raise
        except TimeoutException:
            self._logger.warning(f"configure_scan call timed out for {self._client_id}", exc_info=True)
        except BaseGrpcException:
            self._logger.error(
                f"Problem processing 'configure' request for '{self._client_id}'", exc_info=True
            )
            raise

    @override
    def deconfigure_scan(self: PstProcessApiGrpc) -> None:
        """
        Deconfigure a scan.

        Monitoring is stopped before the request is sent. If the remote service
        reports that it is not configured for a scan this is only logged as a
        warning.

        :raises BaseGrpcException: if there was a un-recoverable server error.
        """
        try:
            self.stop_monitoring()
            self._grpc_client.deconfigure_scan()
        except NotConfiguredForScanException as e:
            self._logger.warning(e.message)
        except TimeoutException:
            self._logger.warning(f"deconfigure_scan call timed out for {self._client_id}", exc_info=True)
        except BaseGrpcException:
            self._logger.error(
                f"Problem processing 'deconfigure' request for '{self._client_id}'", exc_info=True
            )
            raise

    @override
    def start_scan(
        self: PstProcessApiGrpc,
        scan_id: int,
        **kwargs: Any,
    ) -> None:
        """
        Start scanning.

        If the remote service reports that it is already scanning this is only
        logged as a warning.

        :param scan_id: the ID for the scan.
        :type scan_id: int
        :param kwargs: additional arguments, needed to allow for future proofing
            of scan request coming from TM / CSP.
        :type kwargs: dict
        :raises BaseGrpcException: if there was a un-recoverable server error.
        """
        request = self._get_start_scan_request(scan_id=scan_id, **kwargs)
        try:
            self._grpc_client.start_scan(request)
        except AlreadyScanningException as e:
            self._logger.warning(e.message)
        except TimeoutException:
            self._logger.warning(f"start_scan call timed out for {self._client_id}", exc_info=True)
        except BaseGrpcException:
            self._logger.error(f"Problem processing scan request for '{self._client_id}'", exc_info=True)
            raise

    @override
    def stop_scan(self: PstProcessApiGrpc, timeout: float = 60.0) -> None:
        """
        End a scan.

        This will call out to the remote service to end a scan. It will also stop monitoring as monitoring is
        only valid if the service is in a scan. If the remote service reports that it is
        not scanning this is only logged as a warning.

        :param timeout: the timeout, in seconds, for the request, defaults to 60.0.
        :type timeout: float, optional
        :raises BaseGrpcException: if there was a un-recoverable server error.
        """
        try:
            # monitoring is only valid during a scan. This is a no-op when there
            # is no monitoring task and matches the other state changing requests.
            self.stop_monitoring()
            self._grpc_client.stop_scan(timeout=timeout)
        except NotScanningException as e:
            self._logger.warning(e.message)
        except TimeoutException:
            self._logger.warning(f"stop_scan call timed out for {self._client_id}", exc_info=True)
        except BaseGrpcException:
            self._logger.error(f"Problem processing stop_scan request for '{self._client_id}'", exc_info=True)
            raise

    @override
    def abort(self: PstProcessApiGrpc) -> None:
        """
        Abort a scan.

        :raises FaultOccurredWhileAborting: if the remote service reported a fault
            while aborting.
        :raises BaseGrpcException: if there was a un-recoverable server error.
        """
        try:
            # stop monitoring if monitoring is happening. This would be the
            # case if our state was SCANNING.
            self.stop_monitoring()
            self._grpc_client.abort()
        except TimeoutException:
            self._logger.warning(f"abort call timed out for {self._client_id}", exc_info=True)
        except FaultOccurredWhileAborting:
            self._logger.error(f"A fault occurred while aborting - '{self._client_id}'", exc_info=True)
            raise
        except BaseGrpcException:
            self._logger.error(f"Problem in aborting request for '{self._client_id}'", exc_info=True)
            raise

    @override
    def reset(self: PstProcessApiGrpc) -> None:
        """
        Reset service.

        Monitoring is stopped before the request is sent.

        :raises BaseGrpcException: if there was a un-recoverable server error.
        """
        try:
            self.stop_monitoring()
            self._logger.debug(f"About to call reset on {self._client_id} gRPC.")
            self._grpc_client.reset()
        except TimeoutException:
            self._logger.warning(f"reset call timed out for {self._client_id}", exc_info=True)
        except BaseGrpcException:
            self._logger.error(f"Error raised while resetting '{self._client_id}'", exc_info=True)
            raise

    @override
    def restart(self: PstProcessApiGrpc) -> None:
        """
        Restart service.

        Monitoring is stopped before the request is sent. Unlike the other request
        methods no exception handling is performed, any exception raised by the
        gRPC API client (including a timeout) propagates to the caller.
        """
        self.stop_monitoring()
        self._logger.debug(f"About to call restart on {self._client_id} gRPC.")
        self._grpc_client.restart()

    @override
    def go_to_fault(self: PstProcessApiGrpc) -> None:
        """
        Put remote service into FAULT state.

        This is used to put the remote service into a FAULT state to match the status of the LMC component.
        Monitoring is stopped before the request is sent. Both a timeout and any other
        ``BaseGrpcException`` are only logged as a warning and are not re-raised.
        """
        try:
            self.stop_monitoring()
            self._grpc_client.go_to_fault()
        except TimeoutException:
            self._logger.warning(f"go_to_fault call timed out for {self._client_id}", exc_info=True)
        except BaseGrpcException:
            self._logger.warning(
                f"Error in trying to put remote service '{self._client_id}' in FAULT state.", exc_info=True
            )

    @override
    def get_env(self: PstProcessApiGrpc) -> dict:
        """
        Get the environment properties from the remote gRPC service.

        No exception handling is performed, any exception raised by the gRPC API
        client propagates to the caller.

        :return: the environment properties as returned by the gRPC API client.
        :rtype: dict
        """
        return self._grpc_client.get_env()

    @override
    def set_log_level(self: PstProcessApiGrpc, log_level: LoggingLevel) -> None:
        """
        Set the LogLevel of the remote gRPC service.

        The TANGO level is converted using ``log_level_map``. Both a timeout and
        any other ``BaseGrpcException`` are only logged as a warning and are not
        re-raised.

        :param log_level: The required TANGO LoggingLevel.
        :type log_level: LoggingLevel
        :raises KeyError: if ``log_level`` has no entry in ``log_level_map``.
        :returns: None.
        """
        try:
            self._grpc_client.set_log_level(request=SetLogLevelRequest(log_level=log_level_map[log_level]))
        except TimeoutException:
            self._logger.warning(f"set_log_level call timed out for {self._client_id}", exc_info=True)
        except BaseGrpcException:
            self._logger.warning(
                f"Error in trying to update remote service '{self._client_id}' LogLevel to {log_level}.",
                exc_info=True,
            )

    def get_log_level(self: PstProcessApiGrpc) -> LoggingLevel:
        """
        Get the LogLevel of the remote gRPC service.

        No exception handling is performed, any exception raised by the gRPC API
        client propagates to the caller.

        :return: the log level as returned by the gRPC API client.
        :rtype: LoggingLevel
        """
        return self._grpc_client.get_log_level()

    @override
    def stop_monitoring(self: PstProcessApiGrpc, *, wait_for_task: bool = True, **kwargs: Any) -> None:
        """
        Stop background monitoring.

        By default this will stop the monitoring background task synchronously and
        wait for it to complete. However, this functionality can be overridden by
        setting ``wait_for_task=False`` which will instruct the background task to
        stop but won't wait for the task to have completely stopped. This is
        helpful in the case of when the LMC device has been instructed to go into
        an OFFLINE mode.

        This is a no-op if there is no monitoring task.

        :param wait_for_task: whether to wait for the background monitoring task
            to completely stop, defaults to True
        :type wait_for_task: bool, optional
        :param kwargs: additional keyword arguments, these are ignored.
        :type kwargs: dict
        """
        if self._monitor_task is not None:
            self._logger.debug(f"{self._client_id} stopping monitoring")
            self._monitor_task.stop(wait_for_generator=wait_for_task)
            self._monitor_task = None

    @background_task
    @override
    def monitor(
        self: PstProcessApiGrpc,
        monitor_data_callback: MonitorDataCallback,
        polling_rate: int = DEFAULT_MONITORING_INTERVAL_MS,
        monitor_abort_event: threading.Event | None = None,
    ) -> None:
        """
        Monitor data of remote service.

        Any monitoring task that is already running is stopped first. A new
        ``StreamingTask`` is then started that passes the monitoring data of each
        response from the gRPC API client to the ``GrpcApiStrategy``, along with
        the ``monitor_data_callback``.

        Any exception raised while setting up or starting the task is logged as an
        error and is not re-raised.

        :param monitor_data_callback: callback to use when there is an
            update of the sub-band monitor data.
        :type monitor_data_callback: MonitorDataCallback
        :param polling_rate: the rate, in milliseconds, at which the monitoring
            should poll, defaults to ``DEFAULT_MONITORING_INTERVAL_MS``.
        :type polling_rate: int, optional
        :param monitor_abort_event: a :py:class:`threading.Event` that can be
            used to signal to stop monitoring. If not set then a new event
            is created.
        :type monitor_abort_event: threading.Event | None, optional
        """
        try:
            if self._monitor_task is not None:
                self.stop_monitoring()

            self._logger.debug(f"{self._client_id} starting monitoring.")
            monitor_abort_event = monitor_abort_event or threading.Event()

            item_generator = functools.partial(self._grpc_client.monitor, polling_rate=polling_rate)

            def _item_handler(response: MonitorResponse) -> None:
                self._strategy.handle_monitor_response(
                    data=response.monitor_data, callback=monitor_data_callback
                )

            self._monitor_task = StreamingTask(
                task_name=f"{self._client_id} monitor",
                item_handler=_item_handler,
                item_generator=item_generator,
                logger=self._logger,
            )
            self._monitor_task.start(abort_event=monitor_abort_event)
        except Exception:
            self._logger.error(f"{self._client_id} monitor() threw an exception.", exc_info=True)

    @background_task
    @override
    def perform_health_check(
        self: PstProcessApiGrpc,
        health_check_handler: HealthCheckHandler,
        health_check_interval: int = DEFAULT_HEALTH_CHECK_INTERVAL_MS,
        health_check_abort_event: threading.Event | None = None,
    ) -> None:
        """
        Perform health check of a process in the background.

        This method will call the :py:meth:`PstGrpcLmcClient.perform_health_check`
        which will stream back health check state items from the remote gRPC
        service. This method will then delegate the requests to the provided
        ``health_check_handler`` instance.

        Exceptions that are passed to the exception handler of the streaming task
        are converted to a :py:class:`HealthCheckState` instance, with the exception
        property being set to that exception, and given to the
        ``health_check_handler``.

        Any health check task that is already running is stopped first. Any
        exception raised while setting up or starting the task is logged as an
        error and is not re-raised.

        :param health_check_handler: an object that implements the
            ``HealthCheckHandler`` protocol. Any health check state
            object that is returned from the service is delegated to
            this handler to be handled.
        :type health_check_handler: HealthCheckHandler
        :param health_check_interval: the interval, in milliseconds, at which
            health check should be performed, defaults to
            ``DEFAULT_HEALTH_CHECK_INTERVAL_MS``.
        :type health_check_interval: int, optional
        :param health_check_abort_event: a threading primitive to be used
            to stop the health check by an external mechanism, defaults to None
            in which case a new event is created.
        :type health_check_abort_event: threading.Event | None, optional
        """
        try:
            if self._health_check_task is not None:
                self.stop_health_check()

            self._logger.debug(f"Starting background task for health check of {self._client_id}")
            health_check_abort_event = health_check_abort_event or threading.Event()

            def _exception_handler(exception: Exception) -> None:
                self._logger.debug(f"Received exception during health check for {self._client_id}")
                state = HealthCheckState(
                    service_name=self._grpc_client._service_name,  # type: ignore
                    service_uuid=self._grpc_client._service_uuid,  # type: ignore
                    exception=exception,
                )
                health_check_handler.handle_health_check_state(state)

            item_generator = functools.partial(
                self._grpc_client.perform_health_check, health_check_interval=health_check_interval
            )

            self._health_check_task = StreamingTask(
                task_name=f"{self._client_id} health check",
                item_handler=health_check_handler.handle_health_check_state,
                item_generator=item_generator,
                exception_handler=_exception_handler,
                # use the logger of this API, the same as the monitoring task does
                logger=self._logger,
            )
            self._health_check_task.start(abort_event=health_check_abort_event)
        except Exception:
            self._logger.error(
                f"{self._client_id} perform_health_check() threw an exception. Exiting background thread",
                exc_info=True,
            )

    @override
    def stop_health_check(self: PstProcessApiGrpc, *, wait_for_task: bool = True, **kwargs: Any) -> None:
        """
        Stop performing health check of service.

        By default this will stop the health check background task synchronously and
        wait for it to complete. However, this functionality can be overridden by
        setting ``wait_for_task=False`` which will instruct the background task to
        stop but won't wait for the task to have completely stopped. This is
        helpful in the case of when the LMC device has been instructed to go into
        an OFFLINE mode.

        This is a no-op if there is no health check task.

        :param wait_for_task: whether to wait for the background health check task
            to completely stop, defaults to True
        :type wait_for_task: bool, optional
        :param kwargs: additional keyword arguments, these are ignored.
        :type kwargs: dict
        """
        if self._health_check_task is not None:
            self._logger.debug(f"{self._client_id} stopping health check")
            self._health_check_task.stop(wait_for_generator=wait_for_task)
            self._health_check_task = None

    @override
    def get_state(self: PstProcessApiGrpc) -> ObsState:
        """
        Get the ObsState of the remote system.

        No exception handling is performed, any exception raised by the gRPC API
        client propagates to the caller.

        :return: the state as returned by the gRPC API client.
        :rtype: ObsState
        """
        return self._grpc_client.get_state()