# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project.
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""Module for implementing a test implementations of the gRPC LMC service."""
from __future__ import annotations
import logging
import threading
from concurrent import futures
from dataclasses import dataclass
from typing import Any, Callable, Generator, Optional
import grpc
from grpc import ServicerContext
from ska_pst.grpc.lmc.ska_pst_lmc_pb2 import (
AbortRequest,
AbortResponse,
ConfigureBeamRequest,
ConfigureBeamResponse,
ConfigureScanRequest,
ConfigureScanResponse,
ConnectionRequest,
ConnectionResponse,
DeconfigureBeamRequest,
DeconfigureBeamResponse,
DeconfigureScanRequest,
DeconfigureScanResponse,
ErrorCode,
GetBeamConfigurationRequest,
GetBeamConfigurationResponse,
GetEnvironmentRequest,
GetEnvironmentResponse,
GetLogLevelRequest,
GetLogLevelResponse,
GetScanConfigurationRequest,
GetScanConfigurationResponse,
GetStateRequest,
GetStateResponse,
GoToFaultRequest,
GoToFaultResponse,
HealthCheckRequest,
HealthCheckResponse,
MonitorRequest,
MonitorResponse,
ResetRequest,
ResetResponse,
SetLogLevelRequest,
SetLogLevelResponse,
StartScanRequest,
StartScanResponse,
Status,
StopScanRequest,
StopScanResponse,
)
from ska_pst.grpc.lmc.ska_pst_lmc_pb2_grpc import PstLmcServiceServicer
from ska_pst.lmc.component.grpc_lmc_client import GRPC_STATUS_DETAILS_METADATA_KEY
from ska_pst.lmc.util.callback import callback_safely
__all__ = [
"TestMockServicer",
"TestPstLmcService",
]
@dataclass
class TestGrpcStatus(grpc.Status):
code: grpc.StatusCode
details: str
trailing_metadata: Optional[Any] = None
class TestMockException(Exception):
# Disable PyTest thinking class is test suite class
__test__: bool = False
def __init__(
self: TestMockException,
grpc_status_code: grpc.StatusCode,
message: str,
error_code: Optional[ErrorCode] = None,
) -> None:
self.grpc_status_code = grpc_status_code
self.error_code = error_code
self.message = message
def as_grpc_status(self: TestMockException) -> TestGrpcStatus:
if self.error_code:
status = Status(code=self.error_code, message=self.message)
return TestGrpcStatus(
code=self.grpc_status_code,
details=self.message,
trailing_metadata=((GRPC_STATUS_DETAILS_METADATA_KEY, status.SerializeToString()),),
)
else:
return TestGrpcStatus(
code=self.grpc_status_code,
details=self.message,
)
[docs]class TestMockServicer(PstLmcServiceServicer):
"""
Test Servicer that acts on the requests sent to the test server.
This is meant to be used within testing frameworks to allow asserting that the API is called.
"""
# Disable PyTest thinking class is test suite class
__test__: bool = False
def __init__(
self: TestMockServicer,
context: Any,
logger: logging.Logger,
) -> None:
"""
Initialise the test mock servicer.
All requests will be delegated to the context parameter which should be a mock that can then be used
to assert calls and can return results when called.
:param context: this should be any object that can be used to assert or create errors (i.e. a mock).
"""
self._context = context
self._logger = logger
[docs] def connect(
self: TestMockServicer, request: ConnectionRequest, context: ServicerContext
) -> ConnectionResponse:
"""Handle connection request from client."""
self._logger.debug("connect request")
return self._context.connect(request)
[docs] def get_beam_configuration(
self: TestMockServicer, request: GetBeamConfigurationRequest, context: ServicerContext
) -> GetBeamConfigurationResponse:
"""Handle getting the assigned resources."""
self._logger.debug("get_beam_configuration called.")
try:
return self._context.get_beam_configuration(request)
except TestMockException as e:
context.abort_with_status(e.as_grpc_status())
assert False, "Unreachable"
[docs] def get_scan_configuration(
self: TestMockServicer, request: GetScanConfigurationRequest, context: ServicerContext
) -> GetScanConfigurationResponse:
"""Handle getting the scan configuration."""
self._logger.debug("get_scan_configuration called.")
try:
return self._context.get_scan_configuration(request)
except TestMockException as e:
context.abort_with_status(e.as_grpc_status())
assert False, "Unreachable"
[docs] def start_scan(
self: TestMockServicer, request: StartScanRequest, context: ServicerContext
) -> StartScanResponse:
"""Handle scan."""
self._logger.debug("scan request")
try:
return self._context.start_scan(request)
except TestMockException as e:
context.abort_with_status(e.as_grpc_status())
assert False, "Unreachable"
[docs] def stop_scan(
self: TestMockServicer, request: StopScanRequest, context: ServicerContext
) -> StopScanResponse:
"""Handle end scan."""
self._logger.debug("stop_scan request")
try:
return self._context.stop_scan(request)
except TestMockException as e:
context.abort_with_status(e.as_grpc_status())
assert False, "Unreachable"
[docs] def abort(self: TestMockServicer, request: AbortRequest, context: ServicerContext) -> AbortResponse:
"""Handle end scan."""
self._logger.debug("abort requested")
try:
return self._context.abort(request)
except TestMockException as e:
context.abort_with_status(e.as_grpc_status())
assert False, "Unreachable"
[docs] def reset(self: TestMockServicer, request: ResetRequest, context: ServicerContext) -> ResetResponse:
"""Handle reset."""
self._logger.debug("reset requested")
try:
return self._context.reset(request)
except TestMockException as e:
context.abort_with_status(e.as_grpc_status())
assert False, "Unreachable"
[docs] def get_state(
self: TestMockServicer, request: GetStateRequest, context: ServicerContext
) -> GetStateResponse:
"""Handle getting the state of the service."""
self._logger.debug("get_state called.")
try:
return self._context.get_state(request)
except TestMockException as e:
context.abort_with_status(e.as_grpc_status())
assert False, "Unreachable"
[docs] def go_to_fault(
self: TestMockServicer, request: GoToFaultRequest, context: ServicerContext
) -> GoToFaultResponse:
"""Handle putting service into FAULT state."""
self._logger.debug("go_to_fault called.")
try:
return self._context.go_to_fault(request)
except TestMockException as e:
context.abort_with_status(e.as_grpc_status())
assert False, "Unreachable"
[docs] def monitor(
self: TestMockServicer, request: MonitorRequest, context: ServicerContext
) -> Generator[MonitorResponse, None, None]:
"""Handle monitor."""
self._logger.debug("monitor request")
try:
return self._context.monitor(request)
except TestMockException as e:
context.abort_with_status(e.as_grpc_status())
assert False, "Unreachable"
[docs] def get_env(
self: TestMockServicer, request: GetEnvironmentRequest, context: ServicerContext
) -> GetEnvironmentResponse:
"""Get environment."""
self._logger.debug("get_env")
try:
return self._context.get_env(request)
except TestMockException as e:
context.abort_with_status(e.as_grpc_status())
assert False, "Unreachable"
[docs] def set_log_level(
self: TestMockServicer, request: SetLogLevelRequest, context: ServicerContext
) -> SetLogLevelResponse:
"""Set Log Level."""
self._logger.debug("set_log_level")
try:
return self._context.set_log_level(request)
except TestMockException as e:
context.abort_with_status(e.as_grpc_status())
assert False, "Unreachable"
[docs] def get_log_level(
self: TestMockServicer, request: GetLogLevelRequest, context: ServicerContext
) -> GetLogLevelResponse:
"""Get Log Level."""
self._logger.debug("get_log_level")
try:
return self._context.get_log_level(request)
except TestMockException as e:
context.abort_with_status(e.as_grpc_status())
assert False, "Unreachable"
[docs] def health_check(
self: TestMockServicer, request: HealthCheckRequest, context: ServicerContext
) -> Generator[HealthCheckResponse, None, None]:
"""Handle monitor."""
self._logger.debug("perform health request")
try:
return self._context.perform_health_check(request)
except TestMockException as e:
context.abort_with_status(e.as_grpc_status())
assert False, "Unreachable"
[docs]class TestPstLmcService:
"""
The service class for testing the gRPC LMC service.
This class is designed to be used in unit testing but having instances
of the :py:class:`TestMockServicer` passed in as a parameter or can be
used in an integration test where this could be run dependently with
specific instances of a :py:class:`PstLmcServiceServicer`. For now
there is only the TestMockServicer that has been implemented.
"""
# Disable PyTest thinking class is test suite class
__test__: bool = False
def __init__(
self: TestPstLmcService,
grpc_server: grpc.Server,
logger: Optional[logging.Logger] = None,
**kwargs: Any,
) -> None:
"""
Initialise the service.
This uses a `futures.ThreadPoolExecutor` to create a gRPC server that
exposes an API by the servicer instance that is provided. The service
is exposed on a given port and can listen on a given interface or all.
:param servicer: the :py:class:`PstLmcServiceServicer` that provides the
implementation of the service.
:param port: the port that the service listens on.
:param interface: the network interface (NIC) that the service should listen
on. If not supplied, "0.0.0.0" will be used.
:param max_workers: the maximum number of workers the thread pool will use.
:param logger: the logger to use with the class.
"""
self._logger = logger or logging.getLogger(__name__)
self._server = grpc_server
self._running = False
self._thread: Optional[threading.Thread] = None
def __del__(self: TestPstLmcService) -> None:
"""Drop of the instance has been requested."""
self.stop()
def _serve(self: TestPstLmcService, started_callback: Optional[Callable] = None) -> None:
"""Start the gRPC to serve."""
self._logger.info("Starting server")
try:
self._server.start()
self._running = True
callback_safely(started_callback)
self._server.wait_for_termination()
self._running = False
self._logger.debug("Server has terminated")
except futures.CancelledError:
self._logger.debug("Serve task has been cancelled.")
except KeyboardInterrupt:
self._logger.debug("Interrupt has been requested. Exiting serve")
except Exception:
self._logger.error("Unknown exception has happened while serving.", exc_info=True)
[docs] def serve(self: TestPstLmcService, started_callback: Optional[Callable] = None) -> None:
"""
Start the gRPC server to serve requests.
This method should be called synchronously as this method will set up the gRPC in a background thread
itself. If the client wants to be notified when the background thread is serving then a
started_callback should be passed.
:param started_callback: a callback that will be called when the background thread is running,
defaults to None.
:type started_callback: Optional[Callable], optional
"""
t = threading.Thread(target=self._serve, args=(started_callback,))
t.start()
self._thread = t
[docs] def stop(self: TestPstLmcService) -> None:
"""Stop the background serving of requests."""
if self._running:
self._logger.debug("Stopping test gRPC service.")
self._server.stop(grace=None)
self._logger.debug("Service stopped.")
if self._thread is not None:
self._thread.join()
self._thread = None