Source code for ska_ser_skallop.connectors.remoting.tangobridge.tangobridge

"""Implements a bridge connection between a tango gql service and a client."""

import logging
import os
from typing import Any, Dict, List, Union

from ska_ser_skallop.subscribing import base
from ska_ser_skallop.utils.singleton import Singleton

from .configuration import Settings
from .factories import TBridgeFactory
from .restcontrol import RestController
from .subscribing import DeviceAttributeSubscriber, DeviceSubscriptionCallback
from .wscontrol import WSController

# Module-level logger, named after this module via ``__name__``.

logger = logging.getLogger(__name__)


class TangoBridge(metaclass=Singleton):
    """Class that realizes a connection to a tango gql interface.

    The class provides a client with a REST interface to call gql queries on
    as well as creating and maintaining websocket subscriptions to tango event
    producers.

    Note this class is a singleton and gets created and initialised only once.

    When the ``USE_ONLY_POLLING`` environment variable is set at construction
    time, the websocket controller and the subscriber are not created, so the
    websocket related methods and properties are unavailable on the instance.
    """

    settings = Settings("taranta", "graphiql")
    monitor_polling_period = 0.2
    subscriber_polling_rate: float = 15

    def __init__(
        self,
        factory: TBridgeFactory = TBridgeFactory(),
        subscriber_polling_rate: float = 15,
    ) -> None:
        """Initialize a TangoBridge class.

        :param factory: The factory that will be used to get the
            implementations of a tangogql rest and websocket interface,
            defaults to TBridgeFactory()
        :type factory: TBridgeFactory
        :param subscriber_polling_rate: the rate at which a websocket
            subscription will be checked for being healthy, defaults to 15
        :type subscriber_polling_rate: float
        """
        # start controller daemon
        self._controller = factory.get_controller()
        # start rest_controller thread
        self._rest_controller = RestController(factory)
        if not os.getenv("USE_ONLY_POLLING"):
            # start ws_controller thread
            self._ws_controller = WSController(factory)
            self.subscriber = DeviceAttributeSubscriber(
                self._ws_controller, subscriber_polling_rate
            )

    @property
    def url(self) -> str:
        """Return the url used for connecting to a tangogql service.

        :return: The url used for connecting to a tangogql service
        :rtype: str
        """
        return self._rest_controller.url

    @property
    def tango_gql_healthy(self) -> bool:
        """Indicate whether the tango gql rest service is still available.

        :return: Whether the tango gql rest service is still available
        :rtype: bool
        """
        return self._rest_controller.tangogql_healthy.is_set()

    @property
    def tango_subscriptions_healthy(self) -> bool:
        """Indicate whether the tango gql websocket service is still available.

        :return: Whether the tango gql websocket service is still available.
        :rtype: bool
        """
        return self._ws_controller.ws_healthy

    def wait_for_tango_subscriptions_healthy(self, timeout: float = 5):
        """Block until the tangogql websocket is healthy.

        :param timeout: How long to wait until a timeout is thrown,
            defaults to 5
        :type timeout: float, optional
        """
        self._ws_controller.wait_until_ws_healthy(timeout)

    def wait_for_tango_gql_healthy(self, timeout: float = 5):
        """Block until the tangogql rest service is healthy.

        :param timeout: How long to wait for the service to become healthy,
            defaults to 5
        :type timeout: float
        """
        # NOTE(review): the result of ``wait`` is discarded. If
        # ``tangogql_healthy`` is a ``threading.Event`` (it is also used with
        # ``is_set()``), a timeout makes this return silently rather than
        # raise - confirm against RestController.
        self._rest_controller.tangogql_healthy.wait(timeout)

    def re_authenticate(self):
        """Perform the authentication of a user again."""
        self._rest_controller.update_authentication()

    def remove_subscription(
        self, device_name: str, attribute: str, subscription_id: int
    ):
        """Remove a given subscription from the tangogql websocket service.

        :param device_name: The tango device subscribed to
        :type device_name: str
        :param attribute: The tango attribute subscribed to
        :type attribute: str
        :param subscription_id: The id to identify the subscription service
            with
        :type subscription_id: int
        """
        self.subscriber.remove_subscription(device_name, attribute, subscription_id)

    def add_subscription(
        self, device_name: str, attribute: str, callback: DeviceSubscriptionCallback
    ) -> int:
        """Create a new subscription on the tangogql websocket service.

        :param device_name: The tango device to subscribe to
        :type device_name: str
        :param attribute: The tango attribute to subscribe to
        :type attribute: str
        :param callback: A function to be called when a subscription event is
            received
        :type callback: DeviceSubscriptionCallback
        :return: The id to identify the subscription service with
        :rtype: int
        """
        return self.subscriber.add_subscription(device_name, attribute, callback)

    def get_events(
        self, device_name: str, attribute: str, subscription_id: int
    ) -> List[base.EventDataInt]:
        """Get a list of current events generated on a particular subscription.

        :param device_name: The tango device subscribed to
        :type device_name: str
        :param attribute: The tango attribute subscribed to
        :type attribute: str
        :param subscription_id: The id to identify the subscription service
            with
        :type subscription_id: int
        :return: A list of events generated from the subscription
        :rtype: List[base.EventDataInt]
        """
        return self.subscriber.get_events(device_name, attribute, subscription_id)

    def reload_rest_connection(self):
        """Re connects to the rest service."""
        self._rest_controller.reload()

    def reload_ws_connection(self):
        """Re connects to the websocket service."""
        self._ws_controller.reload()

    def tear_down_rest_connection(self):
        """Tear down all the running threads related to the rest connection."""
        self._rest_controller.tear_down()

    def tear_down_ws_connection(self):
        """Tear down all the running threads related to the ws connection."""
        self._ws_controller.tear_down()

    def tear_down_connections(self):
        """Tear down all the running threads on rest and ws connections.

        The subscriber and websocket controller are only torn down when they
        exist: ``__init__`` skips creating them when ``USE_ONLY_POLLING`` is
        set. The rest controller and the controller are always torn down.
        """
        # Look the websocket parts up defensively so that a bridge built in
        # polling-only mode still stops its rest controller and controller
        # instead of failing with AttributeError on the first statement.
        subscriber = getattr(self, "subscriber", None)
        if subscriber is not None:
            subscriber.close()
        ws_controller = getattr(self, "_ws_controller", None)
        if ws_controller is not None:
            ws_controller.tear_down()
        self._rest_controller.tear_down()
        self._controller.stop()

    def call_graphql(
        self,
        query: str,
        variables: Union[Dict[Any, Any], None] = None,
        operation_name: Union[str, None] = None,
        **kwargs
    ) -> Any:
        """Call a gql structured rest query on a tango gql service.

        :param query: The tangogql formatted query
        :type query: str
        :param variables: Any variables used within the query, defaults to
            None
        :type variables: Union[Dict[Any, Any], None], optional
        :param operation_name: The tango gql operation to be performed,
            defaults to None
        :type operation_name: Union[str, None], optional
        :param kwargs: additional keyword arguments, passed through unchanged
            to the rest controller
        :return: The result of the query
        :rtype: Any
        """  # noqa: DAR101
        return self._rest_controller.call_graphql(
            query, variables, operation_name, **kwargs
        )
class PollingBasedTangoBridge(TangoBridge):
    """Variant of TangoBridge that works without a websocket connection.

    Only the controller and the rest controller are created; every websocket
    related operation raises ``NotImplementedError``.
    """

    def __init__(
        self,
        factory: TBridgeFactory = TBridgeFactory(),
        subscriber_polling_rate: float = 15,
    ) -> None:
        """Initialize a PollingBasedTangoBridge.

        The parent initialiser is intentionally not invoked, so no websocket
        controller or subscriber is created.

        :param factory: The factory used to obtain the controller and handed
            to the rest controller, defaults to TBridgeFactory()
        :type factory: TBridgeFactory
        :param subscriber_polling_rate: accepted for signature compatibility
            with TangoBridge; not used by this class, defaults to 15
        :type subscriber_polling_rate: float
        """
        # start controller daemon
        self._controller = factory.get_controller()
        # start rest_controller thread
        self._rest_controller = RestController(factory)

    def tear_down_connections(self):
        """Tear down the rest controller and stop the controller."""
        self._rest_controller.tear_down()
        self._controller.stop()

    def tear_down_ws_connection(self) -> None:
        """Unsupported: there is no websocket connection to tear down.

        :raises NotImplementedError: always, this class has no websocket
        """
        message = "This class does not use a websocket"
        raise NotImplementedError(message)

    def reload_ws_connection(self) -> None:
        """Unsupported: there is no websocket connection to reload.

        :raises NotImplementedError: always, this class has no websocket
        """
        message = "This class does not use a websocket"
        raise NotImplementedError(message)

    def get_events(
        self, device_name: str, attribute: str, subscription_id: int
    ) -> List[base.EventDataInt]:
        """Unsupported: events come from websocket subscriptions.

        :param device_name: The tango device subscribed to
        :type device_name: str
        :param attribute: The tango attribute subscribed to
        :type attribute: str
        :param subscription_id: The id to identify the subscription service
            with
        :type subscription_id: int
        :rtype: List[base.EventDataInt]
        :raises NotImplementedError: always, this class has no websocket
        """
        message = "This class does not use a websocket"
        raise NotImplementedError(message)

    @property
    def tango_subscriptions_healthy(self) -> bool:
        """Report the websocket service as healthy.

        :return: always True, as no websocket is used by this class
        :rtype: bool
        """
        return True

    def wait_for_tango_subscriptions_healthy(self, timeout=5) -> None:  # type: ignore
        """Unsupported: there is no websocket to wait for.

        :param timeout: unused by this class, defaults to 5
        :raises NotImplementedError: always, this class has no websocket
        """
        message = "This class does not use a websocket"
        raise NotImplementedError(message)

    def remove_subscription(
        self, device_name: str, attribute: str, subscription_id: int
    ) -> None:
        """Unsupported: subscriptions require a websocket.

        :param device_name: The tango device subscribed to
        :type device_name: str
        :param attribute: The tango attribute subscribed to
        :type attribute: str
        :param subscription_id: The id to identify the subscription service
            with
        :type subscription_id: int
        :raises NotImplementedError: always, this class has no websocket
        """
        message = "This class does not use a websocket"
        raise NotImplementedError(message)

    def add_subscription(
        self, device_name: str, attribute: str, callback: DeviceSubscriptionCallback
    ) -> int:
        """Unsupported: subscriptions require a websocket.

        :param device_name: The tango device to subscribe to
        :type device_name: str
        :param attribute: The tango attribute to subscribe to
        :type attribute: str
        :param callback: A function to be called when a subscription event is
            received
        :type callback: DeviceSubscriptionCallback
        :raises NotImplementedError: always, this class has no websocket
        """
        message = "This class does not use a websocket"
        raise NotImplementedError(message)
def get_tango_bridge(factory: TBridgeFactory = TBridgeFactory()):
    """Generate a tango bridge object.

    Note if env USE_ONLY_POLLING is set, a tangobridge that does not use ws
    will be created.

    :param factory: The factory handed to the bridge constructor, defaults to
        TBridgeFactory(). The bridge classes are documented as singletons, so
        the factory presumably only takes effect the first time a given
        bridge class is instantiated - verify against ``Singleton``.
    :type factory: TBridgeFactory
    :returns: a Tangobridge object
    """
    # Forward the caller's factory; previously it was accepted but ignored and
    # the bridges were always built with their own default factory.
    if os.getenv("USE_ONLY_POLLING"):
        return PollingBasedTangoBridge(factory)  # type: ignore
    return TangoBridge(factory)  # type: ignore