# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project.
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""Module for providing the gRPC LMC client to external processes."""
from __future__ import annotations
import functools
import logging
from threading import Event
from typing import Any, Callable, Dict, Generator, NoReturn, Type, TypeVar, cast
import grpc
from grpc import Channel, StatusCode
from ska_control_model import ObsState
from ska_pst.common.constants import DEFAULT_HEALTH_CHECK_INTERVAL_MS, DEFAULT_MONITORING_INTERVAL_MS
from ska_pst.grpc.lmc.ska_pst_lmc_pb2 import (
AbortRequest,
AsciiHeaderProto,
ConfigureBeamRequest,
ConfigureScanRequest,
ConnectionRequest,
ConnectionResponse,
DeconfigureBeamRequest,
DeconfigureScanRequest,
EnvValue,
ErrorCode,
GetBeamConfigurationRequest,
GetBeamConfigurationResponse,
GetEnvironmentRequest,
GetLogLevelRequest,
GetScanConfigurationRequest,
GetScanConfigurationResponse,
GetStateRequest,
GetStateResponse,
GoToFaultRequest,
HealthCheckRequest,
HealthCheckResponse,
LogLevel,
MonitorRequest,
MonitorResponse,
ResetRequest,
RestartRequest,
SetLogLevelRequest,
StartScanRequest,
Status,
StopScanRequest,
)
from ska_pst.grpc.lmc.ska_pst_lmc_pb2_grpc import PstLmcServiceStub
from ska_pst.lmc.health_check import HealthCheckState
from ska_pst.lmc.util.timeout_iterator import TimeoutIterator
from ska_pydada import AsciiHeader
# Trailing-metadata key whose value ``_handle_pst_error`` parses as a serialised ``Status`` message.
GRPC_STATUS_DETAILS_METADATA_KEY = "grpc-status-details-bin"
class BaseGrpcException(Exception):
    """
    Base exception to capture gRPC related exceptions.

    The message is stored on the ``message`` attribute and is also forwarded
    to :py:class:`Exception` so that ``args`` is populated.
    """

    def __init__(self: BaseGrpcException, message: str) -> None:
        """
        Initialise exception.

        :param message: the human readable description of the error.
        :type message: str
        """
        self.message = message
        # Forward the message: with an empty ``args`` tuple the exception cannot be
        # rebuilt by ``copy``/``pickle`` (they call ``cls(*args)``) and generic
        # handlers that inspect ``args`` see nothing.
        super().__init__(message)

    def __str__(self: BaseGrpcException) -> str:
        """
        Get the exception message.

        This is used in Python f-strings to get the message of the
        base exception. The SKA base classes expect a string
        representation of the exception.

        :return: the exception message
        :rtype: str
        """
        return self.message

    def __repr__(self: BaseGrpcException) -> str:
        """
        Get a string representation of the exception.

        This method is different to the ``__str__`` method
        as it also includes the class name of the exception.

        :return: a string representation of the exception.
        :rtype: str
        """
        cls = self.__class__.__name__
        return f"{cls}('{self.message}')"
class AlreadyScanningException(BaseGrpcException):
    """
    Error signalling that the remote process is already performing a scan.

    The server reports this when it is already in the SCANNING ObsState. It most
    likely points to the LMC state model having drifted from the server's, for
    example after a command line interface interacted with the server directly.

    Because it should only be raised in response to the scan command, the LMC
    can recover: it should log the occurrence and may safely move to SCANNING.
    """
class NotScanningException(BaseGrpcException):
    """
    Error signalling that an end scan was requested while no scan is running.

    The server reports this when it receives an end scan command but is not in a
    scanning state. As with :py:class:`AlreadyScanningException`, the LMC can
    recover because this is only raised during stop_scan: it should log the
    occurrence and may safely move to a READY state.
    """
class ResourcesAlreadyAssignedException(BaseGrpcException):
    """
    Error signalling that resources had already been assigned.

    The server reports this when it is already in an assigned resources state,
    meaning the request should not have been made.
    """
class ResourcesNotAssignedException(BaseGrpcException):
    """
    Error signalling that no resources have been assigned.

    The server reports this when it has no resources assigned, meaning the
    request should not have been made.
    """
class ScanConfiguredAlreadyException(BaseGrpcException):
    """
    Error signalling that a scan configuration is already in place.

    The server reports this when it is in a READY state and already configured
    for a scan, meaning the request should not have been made.
    """
class NotConfiguredForScanException(BaseGrpcException):
    """
    Error signalling that the server holds no scan configuration.

    The server reports this when a request to deconfigure, scan, or get the scan
    configuration was made but no scan configuration existed.
    """
class FaultOccurredWhileAborting(BaseGrpcException):
    """
    Error signalling that the server entered FAULT while being commanded to ABORT.

    ABORT and FAULT are both resettable states, so the LMC command is not meant
    to fail because of this; the exception exists to provide a warning that it
    happened.
    """
class InvalidRequestException(BaseGrpcException):
    """
    Error signalling that the request parameters themselves were rejected.

    The server reports this when it validates a request and finds it incorrect,
    for example an assign resources message whose protobuf Oneof field for
    resources had the incorrect variant applied.
    """
class ServerError(BaseGrpcException):
    """
    Error signalling that the server failed while processing a request.

    An exception was raised on the server side, so the server logs should be
    consulted. Clients are not expected to handle this exception.
    """

    def __init__(self: ServerError, error_code: int, message: str, **kwargs: Any) -> None:
        """
        Initialise exception.

        :param error_code: the error code reported for the failure.
        :type error_code: int
        :param message: the human readable description of the error.
        :type message: str
        :param kwargs: additional keyword arguments; accepted and ignored.
        """
        super().__init__(message)
        self.error_code = error_code
class ServiceUnavailable(ServerError):
    """Error signalling that the gRPC service cannot be reached."""
class TimeoutException(ServerError):
    """Error signalling that the gRPC service took longer than expected to respond."""

    def __init__(self: TimeoutException, *, timeout: float, **kwargs: Any) -> None:
        """
        Initialise exception.

        :param timeout: the timeout that was in effect for the request.
        :type timeout: float
        :param kwargs: keyword arguments forwarded to :py:class:`ServerError`
            (``error_code`` and ``message``).
        """
        super().__init__(**kwargs)
        self.timeout = timeout
class UnknownGrpcException(BaseGrpcException):
    """
    Error for a gRPC failure that could not be classified.

    The failure stems from gRPC itself. Clients are not expected to handle this
    exception.
    """

    def __init__(self: UnknownGrpcException, error_code: int, message: str) -> None:
        """
        Initialise exception.

        :param error_code: the error code associated with the failure.
        :type error_code: int
        :param message: the human readable description of the error.
        :type message: str
        """
        super().__init__(message)
        self.error_code = error_code
# Maps the ``ErrorCode`` found in a parsed ``Status`` message to the exception class
# that ``_handle_pst_error`` raises for it. Codes missing from this map are not raised there.
ERROR_CODE_EXCEPTION_MAP: Dict[ErrorCode, Type[BaseGrpcException]] = {
    ErrorCode.ALREADY_SCANNING: AlreadyScanningException,
    ErrorCode.NOT_SCANNING: NotScanningException,
    ErrorCode.INVALID_REQUEST: InvalidRequestException,
    ErrorCode.CONFIGURED_FOR_BEAM_ALREADY: ResourcesAlreadyAssignedException,
    ErrorCode.NOT_CONFIGURED_FOR_BEAM: ResourcesNotAssignedException,
    ErrorCode.CONFIGURED_FOR_SCAN_ALREADY: ScanConfiguredAlreadyException,
    ErrorCode.NOT_CONFIGURED_FOR_SCAN: NotConfiguredForScanException,
    ErrorCode.FAULT_OCCURRED_WHILE_ABORTING: FaultOccurredWhileAborting,
}
# Type variable that lets ``grpc_request`` return the same callable type it was given.
Wrapped = TypeVar("Wrapped", bound=Callable[..., Any])


def grpc_request(func: Wrapped) -> Wrapped:
    """
    Return a decorated function that handles gRPC requests.

    Methods decorated with this decorator will perform
    a gRPC call and this decorator will handle mapping
    gRPC errors into the PST specific errors.

    The returned wrapper takes ``timeout`` as a keyword-only argument. When it
    is not supplied (or is falsy) the client's ``default_timeout`` is used, and
    the resolved value is always passed on to ``func`` as ``timeout=...``.

    :param func: the wrapped function
    :type func: Wrapped
    :return: the wrapped function
    """

    @functools.wraps(func)
    def _wrapper(
        client: PstGrpcLmcClient,
        *args: Any,
        timeout: float | None = None,
        **kwargs: Any,
    ) -> Any:
        # The wrapper hands back whatever ``func`` returns, hence ``Any`` rather than ``None``.
        timeout = timeout or client.default_timeout
        try:
            return func(client, *args, timeout=timeout, **kwargs)
        except Exception as e:
            # Every exception, not only ``grpc.RpcError``, is routed to the handler,
            # which is annotated ``NoReturn`` and so is expected to always raise.
            _handle_grpc_error(e, timeout=timeout)

    return cast(Wrapped, _wrapper)
def _handle_server_error(error: grpc.RpcError, timeout: float) -> None:
    """
    Raise the server-level exception that matches the gRPC status of ``error``.

    Returns without raising when ``error`` exposes no ``code`` attribute, or when
    its status is FAILED_PRECONDITION, INTERNAL or INVALID_ARGUMENT.

    :param error: the error raised while performing the gRPC call.
    :type error: grpc.RpcError
    :param timeout: the timeout, in seconds, that applied to the request; recorded
        on the :py:class:`TimeoutException` if one is raised.
    :type timeout: float
    :raises ServiceUnavailable: if the status is UNAVAILABLE.
    :raises TimeoutException: if the status is DEADLINE_EXCEEDED.
    :raises ServerError: for every other status that is not skipped above.
    """
    if not hasattr(error, "code"):
        return

    status = error.code()
    skipped_statuses = (
        StatusCode.FAILED_PRECONDITION,
        StatusCode.INTERNAL,
        StatusCode.INVALID_ARGUMENT,
    )
    if status in skipped_statuses:
        return

    detail = error.details() if hasattr(error, "details") else "Unknown"

    if status == StatusCode.UNAVAILABLE:
        raise ServiceUnavailable(error_code=status, message=detail) from error
    if status == StatusCode.DEADLINE_EXCEEDED:
        raise TimeoutException(timeout=timeout, error_code=status, message=detail) from error

    raise ServerError(error_code=status, message=detail) from error
def _handle_pst_error(error: grpc.RpcError) -> None:
    """
    Raise the PST specific exception encoded in the trailing metadata of ``error``.

    Only errors whose status is FAILED_PRECONDITION, INTERNAL or INVALID_ARGUMENT
    are inspected; a missing ``code`` attribute is treated as UNKNOWN. Returns
    without raising when no status-details entry maps to a known exception.

    :param error: the error raised while performing the gRPC call.
    :type error: grpc.RpcError
    :raises BaseGrpcException: the class registered in ``ERROR_CODE_EXCEPTION_MAP``
        for the error code of the parsed ``Status`` message.
    """
    status = error.code() if hasattr(error, "code") else StatusCode.UNKNOWN
    inspected_statuses = (
        StatusCode.FAILED_PRECONDITION,
        StatusCode.INTERNAL,
        StatusCode.INVALID_ARGUMENT,
    )
    if status not in inspected_statuses:
        return

    if not hasattr(error, "trailing_metadata"):
        return

    for key, value in error.trailing_metadata():
        if key != GRPC_STATUS_DETAILS_METADATA_KEY:
            continue

        # the metadata value is a serialised ``Status`` protobuf message
        status_msg = Status()
        status_msg.ParseFromString(value)
        pst_code = status_msg.code
        if pst_code in ERROR_CODE_EXCEPTION_MAP:
            exception_type = ERROR_CODE_EXCEPTION_MAP[pst_code]
            raise exception_type(status_msg.message) from error
def _handle_grpc_error(error: grpc.RpcError, timeout: float) -> NoReturn:
    """
    Convert an error raised during a gRPC call into a PST exception.

    The error is first offered to ``_handle_server_error`` and then to
    ``_handle_pst_error``. If neither raises, the error is logged and wrapped
    in an :py:class:`UnknownGrpcException`.

    :param error: the error raised while performing the gRPC call. The
        ``grpc_request`` decorator forwards every ``Exception`` here, so this
        may be something other than a :py:class:`grpc.RpcError`.
    :type error: grpc.RpcError
    :param timeout: the timeout, in seconds, that applied to the request.
    :type timeout: float
    :raises UnknownGrpcException: if the error could not be mapped to a more
        specific exception.
    """
    _handle_server_error(error, timeout)
    _handle_pst_error(error)

    # if here, we have an unknown error
    error_code = error.code() if hasattr(error, "code") else -1

    # A non-gRPC exception has no ``details()``; calling it unguarded would replace
    # the intended UnknownGrpcException with an AttributeError. Fall back to the
    # string form of the exception so the message is still reported.
    details = error.details() if hasattr(error, "details") else str(error)

    logging.warning(f"Unknown exception found. {error_code=}, error.details()={details!r}", exc_info=True)
    raise UnknownGrpcException(error_code, details) from error
class PstGrpcLmcClient:
    """
    The client API that connects to a remote gRPC service.

    This client is a wrapper around the :py:class:`PstLmcServiceStub`
    that is generated from the gRPC/Protobuf bindings.

    Once fully implemented this class will be able to be used by
    any of the LMC components :py:class:`PstProcessApi` implementations.

    Methods decorated with ``grpc_request`` use ``default_timeout`` when no
    ``timeout`` is supplied and have any exception they raise passed to
    ``_handle_grpc_error`` for mapping into the PST specific exceptions.
    """

    _client_id: str
    _channel: Channel
    _endpoint: str
    _service: PstLmcServiceStub
    _logger: logging.Logger
    _service_name: str | None
    _service_uuid: str | None
    # set by ``monitor`` each time it is called
    _monitor_stream: TimeoutIterator
    # timeout, in seconds, used when a request does not specify one
    default_timeout: float

    def __init__(
        self: PstGrpcLmcClient,
        client_id: str,
        endpoint: str,
        default_timeout: float = 60.0,
        logger: logging.Logger | None = None,
        **kwargs: Any,
    ) -> None:
        """
        Initialise gRPC client.

        :param client_id: the client id to allow for tracing in logs.
        :type client_id: str
        :param endpoint: the gRPC endpoint to send requests to.
        :type endpoint: str
        :param default_timeout: the default timeout, in seconds, to use for all requests,
            defaults to 60.0 (e.g. 1 minute)
        :type default_timeout: float, optional
        :param logger: the logger to use for logging information from client, defaults to None
        :type logger: logging.Logger | None, optional
        :param kwargs: additional keyword arguments; accepted and ignored.
        """
        self._logger = logger or logging.getLogger(__name__)
        self._client_id = client_id
        self._endpoint = endpoint
        self._logger.info(f"Connecting '{client_id}' to remote endpoint '{endpoint}'")
        # NOTE(review): "wait_for_ready" looks like a per-call argument rather than a
        # channel argument in gRPC Python - confirm this option has the intended effect.
        self._channel = grpc.insecure_channel(endpoint, options=[("wait_for_ready", True)])
        self._service = PstLmcServiceStub(channel=self._channel)
        self._service_name = None
        self._service_uuid = None
        self.default_timeout = default_timeout

    @grpc_request
    def connect(self: PstGrpcLmcClient, timeout: float | None = None, **kwargs: Any) -> None:
        """
        Connect client to the remote gRPC service.

        This is used to let the server know that a client has connected. The
        service name and UUID from the response are stored on the client.

        :param timeout: the timeout, in seconds, for the request, defaults to ``self.default_timeout``
        :type timeout: float | None, optional
        """
        timeout = timeout or self.default_timeout
        self._logger.debug(f"Connect called for client {self._client_id}")
        request = ConnectionRequest(client_id=self._client_id)
        response: ConnectionResponse = self._service.connect(request, timeout=timeout)
        self._logger.info(f"Connected to {response.service_name} identified with UUID={response.uuid}")
        self._service_name = response.service_name
        self._service_uuid = response.uuid

    @grpc_request
    def get_beam_configuration(
        self: PstGrpcLmcClient, timeout: float | None = None, **kwargs: Any
    ) -> GetBeamConfigurationResponse:
        """
        Call get_beam_configuration on remote gRPC service.

        :param timeout: the timeout, in seconds, for the request, defaults to ``self.default_timeout``
        :type timeout: float | None, optional
        :param kwargs: additional keyword arguments; accepted and ignored, as on the other requests.
        :return: the current beam configuration on the remote gRPC service.
        :rtype: GetBeamConfigurationResponse
        """
        timeout = timeout or self.default_timeout
        self._logger.debug(f"Getting beam configuration from '{self._client_id}'.")
        return self._service.get_beam_configuration(GetBeamConfigurationRequest(), timeout=timeout)

    @grpc_request
    def get_scan_configuration(
        self: PstGrpcLmcClient, timeout: float | None = None, **kwargs: Any
    ) -> GetScanConfigurationResponse:
        """
        Call get_scan_configuration on remote gRPC service.

        :param timeout: the timeout, in seconds, for the request, defaults to ``self.default_timeout``
        :type timeout: float | None, optional
        :param kwargs: additional keyword arguments; accepted and ignored, as on the other requests.
        :return: the current scan configuration on the remote gRPC service
        :rtype: GetScanConfigurationResponse
        """
        timeout = timeout or self.default_timeout
        self._logger.debug(f"Getting scan configuration from '{self._client_id}'.")
        return self._service.get_scan_configuration(GetScanConfigurationRequest(), timeout=timeout)

    @grpc_request
    def start_scan(
        self: PstGrpcLmcClient, request: StartScanRequest, timeout: float | None = None, **kwargs: Any
    ) -> None:
        """
        Call start_scan on remote gRPC service.

        :param request: the start scan request to send to the remote gRPC service
        :type request: StartScanRequest
        :param timeout: the timeout, in seconds, for the request, defaults to ``self.default_timeout``
        :type timeout: float | None, optional
        """
        timeout = timeout or self.default_timeout
        self._service.start_scan(request, timeout=timeout)

    @grpc_request
    def stop_scan(self: PstGrpcLmcClient, timeout: float | None = None, **kwargs: Any) -> None:
        """
        Call stop_scan on remote gRPC service.

        :param timeout: the timeout, in seconds, for the request, defaults to ``self.default_timeout``
        :type timeout: float | None, optional
        """
        timeout = timeout or self.default_timeout
        self._logger.debug(f"Stopping scan on '{self._client_id}'.")
        self._service.stop_scan(StopScanRequest(), timeout=timeout)

    @grpc_request
    def go_to_fault(self: PstGrpcLmcClient, timeout: float | None = None, **kwargs: Any) -> None:
        """
        Put the gRPC service in to a FAULT state.

        :param timeout: the timeout, in seconds, for the request, defaults to ``self.default_timeout``
        :type timeout: float | None, optional
        """
        timeout = timeout or self.default_timeout
        self._logger.debug(f"Forcing '{self._client_id}' into FAULT state.")
        self._service.go_to_fault(GoToFaultRequest(), timeout=timeout)

    @grpc_request
    def get_state(self: PstGrpcLmcClient, timeout: float | None = None, **kwargs: Any) -> ObsState:
        """
        Call get_state on remote gRPC service.

        :param timeout: the timeout, in seconds, for the request, defaults to ``self.default_timeout``
        :type timeout: float | None, optional
        :return: the current observation state of the remote gRPC service
        :rtype: ObsState
        """
        timeout = timeout or self.default_timeout
        self._logger.debug(f"Calling get_state for '{self._client_id}'.")
        result: GetStateResponse = self._service.get_state(GetStateRequest(), timeout=timeout)
        return ObsState(result.state)

    @grpc_request
    def get_env(self: PstGrpcLmcClient, timeout: float | None = None, **kwargs: Any) -> dict:
        """
        Get the environment values from the remote gRPC service.

        This will map the Protobuf `EnvVal` objects to the appropriate
        Python types.

        :param timeout: the timeout, in seconds, for the request, defaults to ``self.default_timeout``
        :type timeout: float | None, optional
        :return: the environment values from the remote gRPC service.
        :rtype: dict
        """
        timeout = timeout or self.default_timeout

        def _map_value(value: EnvValue) -> Any:
            # return whichever field is set; unsigned int is the fallback
            if value.HasField("string_value"):
                return value.string_value
            elif value.HasField("float_value"):
                return value.float_value
            elif value.HasField("signed_int_value"):
                return value.signed_int_value
            else:
                return value.unsigned_int_value

        response = self._service.get_env(GetEnvironmentRequest(), timeout=timeout)
        values = response.values
        return {k: _map_value(values[k]) for k in values}

    @grpc_request
    def abort(self: PstGrpcLmcClient, timeout: float | None = None, **kwargs: Any) -> None:
        """
        Abort scanning.

        This method is to be used by the LMC device that needs to abort a long running action, in particular
        scan. The ObsState model allows for this to be called if in IDLE (resources assigned), CONFIGURING
        (configuring a scan), READY (configured for a scan but not scanning), SCANNING (a scan is running), or
        RESETTING (is trying to reset from ABORTED/FAULT state).

        After this call the state of the service should be ABORTED.

        :param timeout: the timeout, in seconds, for the request, defaults to ``self.default_timeout``
        :type timeout: float | None, optional
        """
        timeout = timeout or self.default_timeout
        self._logger.debug(f"Calling Abort on '{self._client_id}'.")
        self._service.abort(AbortRequest(), timeout=timeout)

    @grpc_request
    def reset(self: PstGrpcLmcClient, timeout: float | None = None, **kwargs: Any) -> None:
        """
        Reset service.

        This method is to be used by the LMC device that is currently in an ABORTED or FAULT state to reset
        the service. After this call the state of the service should be in IDLE (resources assigned and not
        configured for a scan).

        :param timeout: the timeout, in seconds, for the request, defaults to ``self.default_timeout``
        :type timeout: float | None, optional
        """
        timeout = timeout or self.default_timeout
        self._logger.debug(f"Calling Reset on '{self._client_id}'.")
        self._service.reset(ResetRequest(), timeout=timeout)

    @grpc_request
    def restart(self: PstGrpcLmcClient, timeout: float | None = None, **kwargs: Any) -> None:
        """
        Restart service.

        This method is to be used by the LMC device to restart a service regardless of its ObsState.
        After this call the state of the service should be in IDLE (resources assigned and not
        configured for a scan).

        Any :py:class:`grpc.RpcError` raised by the call is logged as a warning and not propagated.

        :param timeout: the timeout, in seconds, for the request, defaults to ``self.default_timeout``
        :type timeout: float | None, optional
        :param kwargs: additional keyword arguments; accepted and ignored, as on the other requests.
        """
        timeout = timeout or self.default_timeout
        try:
            self._logger.warning(f"Restart called for {self._client_id}.")
            self._service.restart(RestartRequest(), timeout=timeout)
        except grpc.RpcError:
            self._logger.warning(f"Service {self._client_id} is unavailable")

    def monitor(
        self: PstGrpcLmcClient,
        abort_event: Event,
        *,
        polling_rate: int = DEFAULT_MONITORING_INTERVAL_MS,
    ) -> Generator[MonitorResponse, None, None]:
        """
        Call monitor on remote gRPC service.

        This method is not decorated with the ``grpc_request`` as
        it handles timeout differently but this does handle the gRPC
        errors just like the ``grpc_request`` decorator does.

        :param abort_event: a :py:class:`threading.Event` that can be
            used to signal to stop monitoring.
        :type abort_event: Event
        :param polling_rate: the rate, in milliseconds, at which the monitoring
            should poll, defaults to ``DEFAULT_MONITORING_INTERVAL_MS``.
        :type polling_rate: int, optional
        :yield: the current monitoring state from the remote gRPC service
        :rtype: Generator[MonitorResponse, None, None]
        """
        self._logger.debug(f"Starting background monitoring on '{self._client_id}'.")

        # have timeout set to max of 5s or 2 times the polling interval
        timeout = max(5.0, 2.0 * polling_rate / 1000.0)

        try:
            monitor_stream = self._service.monitor(MonitorRequest(polling_rate=polling_rate))
            self._monitor_stream = TimeoutIterator(
                iterator=monitor_stream,
                timeout=timeout,
                abort_event=abort_event,
                expected_period=polling_rate / 1000.0,
            )

            # NOTE(review): if the stream ends without the abort event being set, the
            # ``for`` loop below is re-entered on every pass of this ``while`` loop -
            # confirm ``TimeoutIterator`` blocks or raises rather than spinning.
            while not abort_event.is_set():
                try:
                    for t in self._monitor_stream:
                        yield t
                except TimeoutError:
                    if abort_event.is_set():
                        # received timeout during abort event being set.
                        continue

                    self._logger.warning(
                        f"received timeout during monitoring for '{self._client_id}' before abort event set."
                    )
        except grpc.RpcError as e:
            _handle_grpc_error(e, timeout=timeout)

    @grpc_request
    def set_log_level(
        self: PstGrpcLmcClient, request: SetLogLevelRequest, timeout: float | None = None, **kwargs: Any
    ) -> None:
        """
        Set the LogLevel of the remote gRPC service.

        :param request: The request containing LogLevel to be set on the remote gRPC service.
        :type request: SetLogLevelRequest
        :param timeout: the timeout, in seconds, for the request, defaults to ``self.default_timeout``
        :type timeout: float | None, optional
        """
        timeout = timeout or self.default_timeout
        self._logger.debug(f"Setting log level on '{self._client_id}'. log_level={request.log_level}")
        self._service.set_log_level(request=request, timeout=timeout)

    @grpc_request
    def get_log_level(self: PstGrpcLmcClient, timeout: float | None = None, **kwargs: Any) -> LogLevel:
        """
        Get the LogLevel of the remote gRPC service.

        :param timeout: the timeout, in seconds, for the request, defaults to ``self.default_timeout``
        :type timeout: float | None, optional
        :returns: The current LogLevel of the remote gRPC service.
        :rtype: LogLevel
        """
        timeout = timeout or self.default_timeout
        self._logger.debug(f"Calling get_log_level for '{self._client_id}'.")
        return self._service.get_log_level(request=GetLogLevelRequest(), timeout=timeout)

    def __repr__(self: PstGrpcLmcClient) -> str:
        """
        Get a string representation of instance.

        :return: a string representation of instance
        :rtype: str
        """
        return f"PstGrpcLmcClient(client_id='{self._client_id}', endpoint='{self._endpoint}')"