"""Manages rest type calls to a tango gql interface."""
import asyncio
import atexit
import logging
from threading import Event, Lock
from typing import Any, Dict, NamedTuple, Union
from aiohttp import ClientConnectorError, ClientResponseError, ServerDisconnectedError
from . import queries
from .factories import TBridgeFactory
logger = logging.getLogger(__name__)
[docs]class RestCallError(Exception):
"""Indicates an error during a rest based call to tango gql."""
pass
[docs]class RestHealthError(TimeoutError):
"""Indicates a timeout waiting for tango gql rest interface to be healthy."""
pass
[docs]class RestStatus(NamedTuple):
"""Bundle rest status as a monitoring and tangogql Event Tuple."""
monitoring = Event()
tangogql_healthy = Event()
[docs]class RestController:
"""Monitors and controls the tango gql connection for rest base calls."""
monitor_polling_period = 5
def __init__(self, factory: TBridgeFactory = TBridgeFactory()) -> None:
"""Initialise the object.
:param factory: The factory to use for getting a controller thread, defaults to
TBridgeFactory()
:type factory: TBridgeFactory
:raises RestHealthError: When a connection can not be established to the rest interface
"""
self.monitoring = Event()
self._lock = Lock()
self.tangogql_healthy = Event()
self._controller = factory.get_controller()
authenticated_user = factory.get_new_authenticated_user()
if authenticated_user.auth:
self.client_container = factory.get_graphql_client(
cookies=authenticated_user.cookies
)
else:
self.client_container = factory.get_graphql_client()
self._factory = factory
self.monitoring.set()
self.rest_deamon = self._load_rest_controller()
result = self.tangogql_healthy.wait(15)
if not result:
raise RestHealthError(
"Unable to create an initialization connection to "
f"{self.client_container.url} after waiting 10 seconds"
)
logger.info(
f"rest interface for tangogql initialized on {self.client_container.url}"
)
atexit.register(self.tear_down)
def _load_rest_controller(self):
return self._controller.dispatch_concurrent_routine(
self._rest_deamon_routine(), "monitor rest availability"
)
[docs] def stop(self):
"""Stop minitoring and control threads."""
self.monitoring.clear()
self.rest_deamon.cancel()
[docs] def tear_down(self):
"""Stop minitoring and control threads."""
self.monitoring.clear()
self.rest_deamon.cancel()
[docs] def reload(self):
"""Reload and initialise the tango gql."""
# TODO implement reload
pass
async def _rest_deamon_routine(self):
client, url = self.client_container
query = queries.rest_info()
disconnected_flag = False
while self.monitoring.is_set():
try:
await client.execute_async(query)
self.tangogql_healthy.set()
logger.debug("rest interface is healthy")
if disconnected_flag:
disconnected_flag = False
logger.info("Rest interface connection restored")
except ClientResponseError:
logger.warning(
f"Unable to reach rest service {url}, got response error."
)
self.tangogql_healthy.clear()
disconnected_flag = True
except (ConnectionError, ClientConnectorError, ServerDisconnectedError):
logger.warning(
f"Unable to reach rest service {url}, general connection failure."
)
self.tangogql_healthy.clear()
disconnected_flag = True
await asyncio.sleep(self.monitor_polling_period)
@property
def url(self) -> str:
"""Return the tango gql rest url for logging purposes.
:return: The http rest url
"""
return self.client_container.url
[docs] def update_authentication(self):
"""Re initialise connection based on un updated authentication."""
authenticated_user = self._factory.get_new_authenticated_user()
# disable monitoring
with self._lock:
self.monitoring.clear()
# wait for task to finish gracefully
self.rest_deamon.result()
# redefine client
self.client_container = self._factory.get_graphql_client(
cookies=authenticated_user.cookies
)
# restart monitoring
self.monitoring.set()
self.rest_deamon = self._controller.dispatch_concurrent_routine(
self._rest_deamon_routine()
)
[docs] def call_graphql(
self,
query: str,
variables: Union[Dict[Any, Any], None] = None,
operation_name: Union[str, None] = None,
**kwargs,
) -> Any:
"""Call a graph gql based query to the tango gql service.
If the tango gql is not available at the time of call, the program
will block for twice the allocated montoring poll period before raising a RestCallError.
In other words it will allow for the montiring loop to check two more times if the service,
becomes available before raising an exception.
:param query: The tango gql query as an encoded string
:type query: str
:param variables: Any graph gql variables to associate with the query, defaults to None
:type variables: Union[Dict[Any, Any], None], optional
:param operation_name: The gql operation to use, defaults to None
:type operation_name: Union[str, None], optional
:param kwargs: Any additional keyword arguments to pass on to the tangogql connector
:raises RestCallError: If call did not succeed within allocated waiting period
:raises ClientResponseError: If service responded with an unhandled exception
:return: The result of the call
"""
client, url = self.client_container
if not self.tangogql_healthy.is_set():
try:
# wait for 2 x monitoring poll time in case it comes online
self.tangogql_healthy.wait(self.monitor_polling_period * 2)
except TimeoutError as exception:
raise RestCallError(
f"Unable to call tangogql as {url} is not online"
) from exception
try:
return client.execute(query, variables, operation_name, **kwargs)
# return self._controller.run_async_task(
# client.execute_async(query, variables, operation_name, **kwargs)
# )
except ClientResponseError as exception:
logger.warning(f"sending rest query failed on {url}: {query}")
if exception.status != 200:
raise RestCallError(
f"call to {url} failed, status message: {exception}"
) from exception
raise exception