Source code for ska_ser_skallop.connectors.remoting.tangobridge.factories

"""Provide Factories for creating implementation of tangobridge components."""
import asyncio
import atexit
from abc import abstractmethod
from typing import NamedTuple

import requests
import websockets
from python_graphql_client.graphql_client import GraphqlClient

from .authentication import AuthenticatedUser, Authenticator
from .configuration import (
    Settings,
    get_env,
    get_tango_gql_rest_url,
    get_tango_gql_ws_url,
)
from .control import Controller


class AbstractFactory:
    """Interface for factories that build remote tangobridge components.

    Every hook below is a placeholder that returns ``None``; a concrete
    factory is expected to override all of them.

    NOTE: this class does not use ``abc.ABCMeta``, so the ``@abstractmethod``
    markers are informational only and the class itself can be instantiated.
    """

    def __init__(self) -> None:
        """Initialise object."""

    @abstractmethod
    def get_graphql_client(self, *args, **kwargs):
        """Construct a graphql_client.

        :param args: additional positional args to pass to graphql
        :param kwargs: additional keyword args to pass to graphql
        """

    @abstractmethod
    def get_requests(self):
        """Construct a requests module."""

    @abstractmethod
    def get_websockets(self):
        """Construct a websockets module."""

    @abstractmethod
    def get_controller(self):
        """Construct a controller object."""

    @abstractmethod
    def generate_async_queue(self):
        """Construct an async queue."""

    @abstractmethod
    def teardown(self):
        """Call teardown on threadable objects."""
class TangoGQLClient(NamedTuple):
    """Immutable pairing of a tango gql client with its url."""

    # The graphql client instance.
    client: GraphqlClient
    # The url that belongs to the client.
    url: str
class TBridgeFactory(AbstractFactory):
    """Implementation of the abstract factory providing tangobridge components.

    The factory lazily resolves and caches the host environment, the
    authenticated user and the controller, and registers :meth:`teardown`
    with ``atexit`` so a created controller is stopped at interpreter exit.
    """

    # Service settings shared by all instances; used to derive the gql urls.
    settings = Settings(service_name="taranta", tangogql="graphiql")

    def __init__(self) -> None:
        """Initialise object and register teardown to run at interpreter exit."""
        super().__init__()
        self.controller = None
        self.mocks = {}
        self.teardown_called = False
        atexit.register(self.teardown)
        self._env = None
        self._authenticated_user = None

    @property
    def env(self):
        """Get the host environment.

        The value is fetched with ``get_env()`` on first access and cached
        for as long as the cached value is truthy.

        :return: the host environment
        """
        if self._env:
            return self._env
        self._env = get_env()
        return self._env

    def get_tango_gql_rest_url(self) -> str:
        """Return the url for calling the tango gql.

        :return: the url for calling the tango gql.
        """
        return get_tango_gql_rest_url(self.settings, self.env)

    def get_tango_gql_ws_url(self) -> str:
        """Return the url for calling the tango gql websocket.

        :return: the url for calling the tango gql websocket.
        """
        return get_tango_gql_ws_url(self.settings, self.env)

    def get_tango_gql_service_url(self):
        """Return the url for calling the tango gql service.

        This resolves to the same value as the tango gql rest url.

        :return: the url for calling the tango gql service.
        """
        return get_tango_gql_rest_url(self.settings, self.env)

    def get_graphql_client(self, *args, **kwargs) -> TangoGQLClient:
        """Construct a graphql client object.

        :param args: additional positional arguments for the graphql client
        :param kwargs: additional keyword arguments for the graphql client
        :return: the graphql client bundled with the url it was created with
        """  # noqa: DAR101
        url = self.get_tango_gql_service_url()
        client = GraphqlClient(url, *args, **kwargs)
        return TangoGQLClient(client, url)

    def get_requests(self):
        """Return the module to use for http requests.

        :return: the ``requests`` module
        """
        return requests

    def get_websockets(self):
        """Return the websocket module to use for websocket connections.

        :return: the websocket module to use for websocket connections.
        """
        return websockets

    def get_controller(self) -> "Controller":
        """Return the Controller object to use for managing asyncio Tasks.

        The Controller is constructed on the first call and the same instance
        is returned afterwards.  NOTE(review): the original documentation
        states that constructing the Controller also starts its asyncio
        thread -- confirm against ``Controller``.

        :return: the (cached) controller instance
        """
        if not self.controller:
            self.controller = Controller()
        return self.controller

    def get_new_authenticated_user(self) -> AuthenticatedUser:
        """Run a new authentication process for the host environment.

        The result replaces any previously cached authenticated user.

        :return: the authentication result as an authenticated user data object.
        """
        authenticator = Authenticator(self.env)
        self._authenticated_user = authenticator.get_authenticated_user()
        return self._authenticated_user

    @property
    def authenticated_user(self) -> AuthenticatedUser:
        """Return the authenticated user.

        If this is the first time, a new authentication process will run;
        otherwise the existing authenticated user will be returned.

        :return: the authenticated user
        """
        if not self._authenticated_user:
            self._authenticated_user = self.get_new_authenticated_user()
        return self._authenticated_user

    def generate_async_queue(self) -> asyncio.Queue:
        """Generate an asyncio queue intended for the controller asyncio thread.

        An asyncio queue can be used as a message bus between tasks running in
        the asyncio thread and the main thread. Note this requires an existing
        controller instance and will generate an AssertionError if the
        controller was not generated.

        :return: a new asyncio queue.
        :raises AssertionError: when method called before a controller was
            initiated.
        """
        if not self.controller:
            # Raised explicitly instead of via ``assert`` so that the check is
            # not stripped when Python runs with optimisations (``-O``).
            raise AssertionError(
                "method must be called after a controller was generated"
            )
        return asyncio.Queue()

    def teardown(self):
        """Perform a teardown of controller objects created.

        This is useful for testing purposes in which the item under test does
        not have full life cycle control.  Only the first call has an effect:
        it stops the controller (if one was created) and marks the factory as
        torn down; later calls return immediately.
        """
        if self.teardown_called:
            return
        if self.controller:
            self.controller.stop()
        self.teardown_called = True