Source code for ska_ser_skallop.connectors.remoting.tangobridge.restcontrol

"""Manages rest type calls to a tango gql interface."""
import asyncio
import atexit
import logging
from threading import Event, Lock
from typing import Any, Dict, NamedTuple, Union

from aiohttp import ClientConnectorError, ClientResponseError, ServerDisconnectedError

from . import queries
from .factories import TBridgeFactory

logger = logging.getLogger(__name__)


[docs]class RestCallError(Exception): """Indicates an error during a rest based call to tango gql.""" pass
[docs]class RestHealthError(TimeoutError): """Indicates a timeout waiting for tango gql rest interface to be healthy.""" pass
[docs]class RestStatus(NamedTuple): """Bundle rest status as a monitoring and tangogql Event Tuple.""" monitoring = Event() tangogql_healthy = Event()
[docs]class RestController: """Monitors and controls the tango gql connection for rest base calls.""" monitor_polling_period = 5 def __init__(self, factory: TBridgeFactory = TBridgeFactory()) -> None: """Initialise the object. :param factory: The factory to use for getting a controller thread, defaults to TBridgeFactory() :type factory: TBridgeFactory :raises RestHealthError: When a connection can not be established to the rest interface """ self.monitoring = Event() self._lock = Lock() self.tangogql_healthy = Event() self._controller = factory.get_controller() authenticated_user = factory.get_new_authenticated_user() if authenticated_user.auth: self.client_container = factory.get_graphql_client( cookies=authenticated_user.cookies ) else: self.client_container = factory.get_graphql_client() self._factory = factory self.monitoring.set() self.rest_deamon = self._load_rest_controller() result = self.tangogql_healthy.wait(15) if not result: raise RestHealthError( "Unable to create an initialization connection to " f"{self.client_container.url} after waiting 10 seconds" ) logger.info( f"rest interface for tangogql initialized on {self.client_container.url}" ) atexit.register(self.tear_down) def _load_rest_controller(self): return self._controller.dispatch_concurrent_routine( self._rest_deamon_routine(), "monitor rest availability" )
[docs] def stop(self): """Stop minitoring and control threads.""" self.monitoring.clear() self.rest_deamon.cancel()
[docs] def tear_down(self): """Stop minitoring and control threads.""" self.monitoring.clear() self.rest_deamon.cancel()
[docs] def reload(self): """Reload and initialise the tango gql.""" # TODO implement reload pass
async def _rest_deamon_routine(self): client, url = self.client_container query = queries.rest_info() disconnected_flag = False while self.monitoring.is_set(): try: await client.execute_async(query) self.tangogql_healthy.set() logger.debug("rest interface is healthy") if disconnected_flag: disconnected_flag = False logger.info("Rest interface connection restored") except ClientResponseError: logger.warning( f"Unable to reach rest service {url}, got response error." ) self.tangogql_healthy.clear() disconnected_flag = True except (ConnectionError, ClientConnectorError, ServerDisconnectedError): logger.warning( f"Unable to reach rest service {url}, general connection failure." ) self.tangogql_healthy.clear() disconnected_flag = True await asyncio.sleep(self.monitor_polling_period) @property def url(self) -> str: """Return the tango gql rest url for logging purposes. :return: The http rest url """ return self.client_container.url
[docs] def update_authentication(self): """Re initialise connection based on un updated authentication.""" authenticated_user = self._factory.get_new_authenticated_user() # disable monitoring with self._lock: self.monitoring.clear() # wait for task to finish gracefully self.rest_deamon.result() # redefine client self.client_container = self._factory.get_graphql_client( cookies=authenticated_user.cookies ) # restart monitoring self.monitoring.set() self.rest_deamon = self._controller.dispatch_concurrent_routine( self._rest_deamon_routine() )
[docs] def call_graphql( self, query: str, variables: Union[Dict[Any, Any], None] = None, operation_name: Union[str, None] = None, **kwargs, ) -> Any: """Call a graph gql based query to the tango gql service. If the tango gql is not available at the time of call, the program will block for twice the allocated montoring poll period before raising a RestCallError. In other words it will allow for the montiring loop to check two more times if the service, becomes available before raising an exception. :param query: The tango gql query as an encoded string :type query: str :param variables: Any graph gql variables to associate with the query, defaults to None :type variables: Union[Dict[Any, Any], None], optional :param operation_name: The gql operation to use, defaults to None :type operation_name: Union[str, None], optional :param kwargs: Any additional keyword arguments to pass on to the tangogql connector :raises RestCallError: If call did not succeed within allocated waiting period :raises ClientResponseError: If service responded with an unhandled exception :return: The result of the call """ client, url = self.client_container if not self.tangogql_healthy.is_set(): try: # wait for 2 x monitoring poll time in case it comes online self.tangogql_healthy.wait(self.monitor_polling_period * 2) except TimeoutError as exception: raise RestCallError( f"Unable to call tangogql as {url} is not online" ) from exception try: return client.execute(query, variables, operation_name, **kwargs) # return self._controller.run_async_task( # client.execute_async(query, variables, operation_name, **kwargs) # ) except ClientResponseError as exception: logger.warning(f"sending rest query failed on {url}: {query}") if exception.status != 200: raise RestCallError( f"call to {url} failed, status message: {exception}" ) from exception raise exception