"""Implements a bridge connection between a tango gql service and a client."""
import logging
import os
from typing import Any, Dict, List, Union
from ska_ser_skallop.subscribing import base
from ska_ser_skallop.utils.singleton import Singleton
from .configuration import Settings
from .factories import TBridgeFactory
from .restcontrol import RestController
from .subscribing import DeviceAttributeSubscriber, DeviceSubscriptionCallback
from .wscontrol import WSController
# create logger
logger = logging.getLogger(__name__)
[docs]class TangoBridge(metaclass=Singleton):
"""Class that realizes a connection to a tango gql interface.
The class provides a client with a REST interface to call gql
queries on as well as creating and maintaining websocket subscriptions
to tango event producers.
Note this class is a singleton and gets created and initialised only once.
"""
settings = Settings("taranta", "graphiql")
monitor_polling_period = 0.2
subscriber_polling_rate: float = 15
def __init__(
self,
factory: TBridgeFactory = TBridgeFactory(),
subscriber_polling_rate: float = 15,
) -> None:
"""Initialize a TangoBridge class.
:param factory: The factory that will be used to get the
implementations of a tangogql rest and websocket interface,
defaults to TBridgeFactory()
:type factory: TBridgeFactory
:param subscriber_polling_rate: the rate at which a websocket
subscription will be checked for being healthy, defaults to 15
:type subscriber_polling_rate: float
"""
# start controller deamon
self._controller = factory.get_controller()
# start rest_controller thread
self._rest_controller = RestController(factory)
if not os.getenv("USE_ONLY_POLLING"):
# start ws_controller thread
self._ws_controller = WSController(factory)
self.subscriber = DeviceAttributeSubscriber(
self._ws_controller, subscriber_polling_rate
)
@property
def url(self) -> str:
"""Return the url used for connecting to a tangogql service.
:return: The url used for connecting to a tangogql service
:rtype: str
"""
return self._rest_controller.url
@property
def tango_gql_healthy(self) -> bool:
"""Indicate whether the tango gql rest service is still available.
:return: Whether the tango gql rest service is still available
:rtype: bool
"""
return self._rest_controller.tangogql_healthy.is_set()
@property
def tango_subscriptions_healthy(self) -> bool:
"""Indicate whether the tango gql websocket service is still available.
:return: Whether the tango gql websocket service is still available.
:rtype: bool
"""
return self._ws_controller.ws_healthy
[docs] def wait_for_tango_subscriptions_healthy(self, timeout=5):
"""Block until the tangogql websocket is healthy.
:param timeout: How long to wait intill a timeout is thrown, defaults to 5
:type timeout: int, optional
"""
self._ws_controller.wait_until_ws_healthy(timeout)
[docs] def wait_for_tango_gql_healthy(self, timeout: float = 5):
"""Block until the tangogql rest service is healthy.
:param timeout: How long to wait intill a timeout is thrown, defaults to 5, defaults to 5
:type timeout: float
"""
self._rest_controller.tangogql_healthy.wait(timeout)
[docs] def re_authenticate(self):
"""Perform the authentication of a user again."""
self._rest_controller.update_authentication()
[docs] def remove_subscription(
self, device_name: str, attribute: str, subscription_id: int
):
"""Remove a given subscription from the tangogql websocket service.
:param device_name: The tango device subscribed to
:type device_name: str
:param attribute: The tango attribute subscribed to
:type attribute: str
:param subscription_id: The id to identify the subscription service with
:type subscription_id: int
"""
self.subscriber.remove_subscription(device_name, attribute, subscription_id)
[docs] def add_subscription(
self, device_name: str, attribute: str, callback: DeviceSubscriptionCallback
) -> int:
"""Create a new subscription on the tangogql websocket service.
:param device_name: The tango device to subscribe to
:type device_name: str
:param attribute: The tango attribute to subscribed to
:type attribute: str
:param callback: A function to be called when a subscription event is received
:type callback: DeviceSubscriptionCallback
:return: The id to identify the subscription service with
:rtype: int
"""
return self.subscriber.add_subscription(device_name, attribute, callback)
[docs] def get_events(
self, device_name: str, attribute: str, subscription_id: int
) -> List[base.EventDataInt]:
"""Get a list of current events generated on a particular subscription.
:param device_name: The tango device subscribed to
:type device_name: str
:param attribute: The tango attribute subscribed to
:type attribute: str
:param subscription_id: The id to identify the subscription service with
:type subscription_id: int
:return: A list of events generated from the subscription
:rtype: List[base.EventDataInt]
"""
return self.subscriber.get_events(device_name, attribute, subscription_id)
[docs] def reload_rest_connection(self):
"""Re connects to the rest service."""
self._rest_controller.reload()
[docs] def reload_ws_connection(self):
"""Re connects to the websocket service."""
self._ws_controller.reload()
[docs] def tear_down_rest_connection(self):
"""Tear down all the running threads related to the rest connection."""
self._rest_controller.tear_down()
[docs] def tear_down_ws_connection(self):
"""Tear down all the running threads related to the ws connection."""
self._ws_controller.tear_down()
[docs] def tear_down_connections(self):
"""Tear down all the running threads on rest and ws connections."""
self.subscriber.close()
self._ws_controller.tear_down()
self._rest_controller.tear_down()
self._controller.stop()
[docs] def call_graphql(
self,
query: str,
variables: Union[Dict[Any, Any], None] = None,
operation_name: Union[str, None] = None,
**kwargs
) -> Any:
"""Call a gql structured rest query on a tango gql service.
:param query: The tangogql formatted query
:type query: str
:param variables: Any variables used within the query, defaults to None
:type variables: Union[Dict[Any, Any], None], optional
:param operation_name: The tango gql operation to be performed, defaults to None
:type operation_name: Union[str, None], optional
:param kwargs: additional keyword arguments
:return: The result of the query
:rtype: Any
""" # noqa: DAR101
return self._rest_controller.call_graphql(
query, variables, operation_name, **kwargs
)
[docs]class PollingBasedTangoBridge(TangoBridge):
"""Type of Tangobridge class that does not make use of websocket."""
def __init__(
self,
factory: TBridgeFactory = TBridgeFactory(),
subscriber_polling_rate: float = 15,
) -> None:
"""Initialize a TangoBridge class.
:param factory: The factory that will be used to get the
implementations of a tangogql rest and websocket interface,
defaults to TBridgeFactory()
:type factory: TBridgeFactory
:param subscriber_polling_rate: the rate at which a websocket
subscription will be checked for being healthy, defaults to 15
:type subscriber_polling_rate: float
"""
# start controller deamon
self._controller = factory.get_controller()
# start rest_controller thread
self._rest_controller = RestController(factory)
[docs] def tear_down_connections(self):
"""Tear down all the running threads on rest and ws connections."""
self._rest_controller.tear_down()
self._controller.stop()
[docs] def tear_down_ws_connection(self) -> None:
"""Tear down all the running threads related to the ws connection.
:raises NotImplementedError: _description_
"""
raise NotImplementedError("This class does not use a websocket")
[docs] def reload_ws_connection(self) -> None:
"""Re connects to the websocket service.
:raises NotImplementedError: _description_
"""
raise NotImplementedError("This class does not use a websocket")
[docs] def get_events(
self, device_name: str, attribute: str, subscription_id: int
) -> List[base.EventDataInt]:
"""Get a list of current events generated on a particular subscription.
:param device_name: The tango device subscribed to
:type device_name: str
:param attribute: The tango attribute subscribed to
:type attribute: str
:param subscription_id: The id to identify the subscription service with
:type subscription_id: int
:rtype: List[base.EventDataInt]
:raises NotImplementedError: _description_
"""
raise NotImplementedError("This class does not use a websocket")
@property
def tango_subscriptions_healthy(self) -> bool:
"""Indicate whether the tango gql websocket service is still available.
:return: Whether the tango gql websocket service is still available.
:rtype: bool
"""
return True
[docs] def wait_for_tango_subscriptions_healthy(self, timeout=5) -> None: # type: ignore
"""Block until the tangogql websocket is healthy.
:param timeout: How long to wait until a timeout is thrown, defaults to 5
:raises NotImplementedError: _description_
"""
raise NotImplementedError("This class does not use a websocket")
[docs] def remove_subscription(
self, device_name: str, attribute: str, subscription_id: int
) -> None:
"""Remove a given subscription from the tangogql websocket service.
:param device_name: The tango device subscribed to
:type device_name: str
:param attribute: The tango attribute subscribed to
:type attribute: str
:param subscription_id: The id to identify the subscription service with
:type subscription_id: int
:raises NotImplementedError: _description_
"""
raise NotImplementedError("This class does not use a websocket")
[docs] def add_subscription(
self, device_name: str, attribute: str, callback: DeviceSubscriptionCallback
) -> int:
"""Create a new subscription on the tangogql websocket service.
:param device_name: The tango device to subscribe to
:type device_name: str
:param attribute: The tango attribute to subscribed to
:type attribute: str
:param callback: A function to be called when a subscription event is received
:type callback: DeviceSubscriptionCallback
:raises NotImplementedError: _description_
"""
raise NotImplementedError("This class does not use a websocket")
[docs]def get_tango_bridge(factory: TBridgeFactory = TBridgeFactory()):
"""Generate a tango bridge object.
Note if env USE_ONLY_POLLING is set, a tangobridge that does not use ws will be created.
:param factory: __
:returns: a Tangobridge object
"""
if os.getenv("USE_ONLY_POLLING"):
return PollingBasedTangoBridge() # type:ignore
return TangoBridge() # type: ignore