.tangobridge package

Module contents

Implements a tango bridge to a tango gql service for rest and websocket connections.

The the tango bridge is essentially a facade in front of a rest and and websocket controller, each one responsible for monitoring the connection health and ensuring successful calls to the services. The websocket service indirectly implements tango based subscriptions to devices that gets used by a subscribing module before being access by the tango bridge.

Submodules

.authentication module

Handles user authentication to obtain access to tango gql services.

exception AuthException[source]

Bases: Exception

Raised when authentication unsuccessful.

class AuthenticatedUser(cookies: Any, username: str, password: str)[source]

Bases: NamedTuple

Bundle authenticated user values as a singel object.

property auth: bool

Whether the object represents a successful authentication.

Returns

True if authentication successful

cookies: Any

Alias for field number 0

password: str

Alias for field number 2

username: str

Alias for field number 1

class Authenticator(env: ska_ser_skallop.connectors.remoting.tangobridge.configuration.Environment)[source]

Bases: object

Generates an authenticated user.

get_authenticated_user() ska_ser_skallop.connectors.remoting.tangobridge.authentication.AuthenticatedUser[source]

Return authenticated user values for gaining access to tango gql services.

Raises

AuthException – when authentication unsuccessful

Returns

authenticated user values as a AuthenticatedUser object.

.configuration module

Manage configuration settings and credentials for connecting to tango gql services.

class CredentailsDict[source]

Bases: TypedDict

Bundle user username and password as single dictionary object.

password: str
username: str
class Credentials(username: str, password: str)[source]

Bases: NamedTuple

Bundle user username and password as single object.

asdict() ska_ser_skallop.connectors.remoting.tangobridge.configuration.CredentailsDict[source]

Return credentials as a dictionary.

Returns

the credentials as a dictionary

password: str

Alias for field number 1

username: str

Alias for field number 0

class Environment(username: str, password: str, domain: str, kube_branch: str, telescope: str, kubehost: str, tango_bridge_ip: Optional[str] = None, bypass_auth: bool = False, kube_namespace: Optional[str] = None)[source]

Bases: NamedTuple

Stores environment settings as a collection into a single object.

bypass_auth: bool

Alias for field number 7

domain: str

Alias for field number 2

get_credentials() Optional[ska_ser_skallop.connectors.remoting.tangobridge.configuration.Credentials][source]

Generate a Credentials object if exists.

Returns

the credentials if they exist otherwise return None.

kube_branch: str

Alias for field number 3

kube_namespace: Optional[str]

Alias for field number 8

kubehost: str

Alias for field number 5

password: str

Alias for field number 1

tango_bridge_ip: Optional[str]

Alias for field number 6

telescope: str

Alias for field number 4

username: str

Alias for field number 0

class Settings(service_name: str, tangogql: str)[source]

Bases: NamedTuple

Bundle service name and tango gql end point.

service_name: str

Alias for field number 0

tangogql: str

Alias for field number 1

get_env() ska_ser_skallop.connectors.remoting.tangobridge.configuration.Environment[source]

Collect the relevant environment settings from host env.

The following environmental settings is relevant for remoting:

  1. KUBE_HOST: the domain name hosting the k8 cluster (default k8s.skao.stfc)

  2. BYPASS_AUTH: whether authentication must be skipped (default is not to skip)

  3. TARANTA_USER: the taranta username needed for authentication on tangogql service

  4. TARANTA_PASSWORD: the taranta password needed for authentication on tangogql service

  5. DOMAIN: The type of environment in which SUT is deployed nl: branch/integration/staging

  6. KUBE_BRANCH: The branch name (in case of branch domain) used for the k8 deployment

  7. TANGO_BRIDGE_IP: an alternative raw ip address to use for connecting to k8 cluster

Note KUBE_BRANCH is needed only when the DOMAIN variable is set to branch. The branch value is the name of the git branch used to deploy an instance of the SUT. If TANGO_BRIDGE_IP is given then the DOMAIN and KUBE_BRANCH variables become irrelevant.

Returns

The environment settings as a single Environment object

get_tango_gql_rest_url(settings: ska_ser_skallop.connectors.remoting.tangobridge.configuration.Settings, env: ska_ser_skallop.connectors.remoting.tangobridge.configuration.Environment) str[source]

Generate a tango gql rest service url from a given environment and tangogql settings.

Parameters
  • settings (Settings) – The settings used to name the tangogql service

  • env (Environment) – The environment values obtained from the host.

Returns

A tangogql rest http endpoint

get_tango_gql_ws_url(settings: ska_ser_skallop.connectors.remoting.tangobridge.configuration.Settings, env: ska_ser_skallop.connectors.remoting.tangobridge.configuration.Environment) str[source]

Generate a tango gql websocket service url from a given environment and tangogql settings.

Parameters
  • settings (Settings) – The settings used to name the tangogql service

  • env (Environment) – The environment values obtained from the host.

Returns

A tangogql websocket endpoint

.control module

Enable conccurent monitoring and control as both asyncio routines or concurrent threads.

class BaseControllerTasks[source]

Bases: object

Base object containing the common methods to handle a list of Tasks/Futures.

cancelled_tasks: List[Union[_asyncio.Task, ska_ser_skallop.connectors.remoting.tangobridge.control.NamedFuture]] = []
done_tasks: List[Union[_asyncio.Task, ska_ser_skallop.connectors.remoting.tangobridge.control.NamedFuture]] = []
failed_tasks: List[BaseException] = []
move_to_done()[source]

Move any tasks that are completed to list of tasks in the done state.

raise_any_exceptions()[source]

Raise any exceptions generated from failed tasks.

Raises

Exception – The exception with the list of task exceptions

submitted_tasks: List[Union[_asyncio.Task, ska_ser_skallop.connectors.remoting.tangobridge.control.NamedFuture]] = []
update()[source]

Poll the state of concurrent tasks and set to done those that are finished.

class Controller[source]

Bases: object

Class to manage running asyncio tasks on a separate thread.

The controller creates a separate deamon thread named “asyncio”. The deamon thread contains an asyncio loop that is used to manage asyncio tasks.

Thus the controller allows for a separate thread to run asyncio operations within a pytest environment. These asyncio operations can then be used by the Tango bridge for calls to external services.

The controller allows for two separate ways of achieving concurrency:

  1. From within a deamon like asynchronous routine that is running on the controller thread

  2. From within the main thread as a dispatched Future resulting from running the async routine on that thread.

The most typical use case is thus to first create a main asyncio routine that works like a deamon loop waiting for all asyncio tasks to complete in a gather command.

controller = Controller()

async def main():
    do_a = controller.create_async_task(doA())
    do_b = controller.create_async_task(doB())
    await asyncio.gather(do_a, do_b)

async def doA():
    await asyncio.sleep(1)

async def doB():
    await asyncio.sleep(2)

The main routine is then executed as a concurrent thread loaded onto the controller:

main_thread = controller.dispatch_concurrent_routine(main())
main_thread.result() # wait until doA and doB have completed
# gracefully tear down all tasks and futures that may still be running
controller.stop()

Note the controller has the ability to cancel all pending tasks (and Futures) when it gets terminated via the stop command (which is registered on the atexit method). This will result in a cancelled exception on an asyncio routine.

create_async_task(routine: Coroutine[Any, Any, ska_ser_skallop.connectors.remoting.tangobridge.control.T], name=None) _asyncio.Task[source]

Run an asynchronous routine concurrently within a given loop.

Note this must be called from within a currently executing routine within that loop.

Parameters
  • routine (Coroutine[Any, Any, T]) – the routine to run

  • name (str) – The name to be given to the task, defaults to None which will result in the name being the same as the coroutine function name.

Returns

the coroutine wrapped as a future allowing asynchronous awaiting

dispatch_concurrent_routine(routine: Coroutine[Any, Any, ska_ser_skallop.connectors.remoting.tangobridge.control.T], name='') concurrent.futures._base.Future[ska_ser_skallop.connectors.remoting.tangobridge.control.T][source]

Dispatch a separate thread to run an asynchronous task on its event loop.

Parameters
  • routine (Coroutine[Any, Any, T]) – an asynchronous routine that will be run on an event loop in a separate thread. Note any running or pending tasks in the thread will be cancelled during tear down of the thread.

  • name (str) – The name to be given to the task, defaults to “” which will result in the name being the same as the coroutine function name.

Returns

a Future representing the concurrent execution of the task, that can be waited upon at some later time to get the result.

get_loop() asyncio.events.AbstractEventLoop[source]

Return the event loop being used to generate async tasks on the thread.

Returns

the event loop.

run_async_task(routine: Coroutine[Any, Any, ska_ser_skallop.connectors.remoting.tangobridge.control.T], name='', timeout=100) ska_ser_skallop.connectors.remoting.tangobridge.control.T[source]

Run an async task on a separate controller thread.

Block until the task has finished or raised an exception.

Parameters
  • routine (Coroutine[Any, Any, T]) – an asynchronous routine that will be run on an event loop in a separate thread. Note any running or pending tasks in the thread will be cancelled during tear down of the thread.

  • timeout – A maximum amount of time to wait for the result of the task, default is 100s

  • name (str) – The name to be given to the task, defaults to “” which will result in the name being the same as the coroutine function name.

Raises

TimeoutError – if the task did not return within given timeout period

Returns

the result of the asynchronous task

stop()[source]

Perform a gracefull teardown of pending tasks.

task_polling_period = 0.2
class ControllerState[source]

Bases: NamedTuple

Represent a Controller state as a tuple of running and not running (mutually exclusively).

not_running = <threading.Event object>
running = <threading.Event object>
class Futures[source]

Bases: ska_ser_skallop.connectors.remoting.tangobridge.control.BaseControllerTasks

A subsclass of BaseControllerTasks as a set of Future tasks.

add(future: concurrent.futures._base.Future, name='')[source]

Add a new Future to be monitored.

Parameters
  • future (Future) – The conccurent future to monitor

  • name (str) – A name to give to the future for logging purposes, deafult to “”

cancel_pending_and_join()[source]

Cancel any pending Futures and join the remaining ones until they have finished.

cancelled_tasks: List[ska_ser_skallop.connectors.remoting.tangobridge.control.NamedFuture] = []
check_for_exceptions()[source]

Move any futures that has ended with an exception to that group.

done_tasks: List[ska_ser_skallop.connectors.remoting.tangobridge.control.NamedFuture] = []
failed_tasks: List[BaseException] = []
submitted_tasks: List[ska_ser_skallop.connectors.remoting.tangobridge.control.NamedFuture] = []
update()[source]

Move any done futures to the completed ones and excepted futures to exceptions.

class MonitoredTask(future: concurrent.futures._base.Future, result: Any = 'pending', exception: Union[None, Exception] = None)[source]

Bases: NamedTuple

Represents a conccurrent task being monitored.

exception: Union[None, Exception]

Alias for field number 2

future: concurrent.futures._base.Future

Alias for field number 0

result: Any

Alias for field number 1

class NamedFuture(future: concurrent.futures._base.Future[Any], name: str)[source]

Bases: NamedTuple

Wraps a Conccurent Future as a Future class with a name.

block_until_complete(timeout: float) ska_ser_skallop.connectors.remoting.tangobridge.control.Outcome[source]

Block any further execution until future has completed.

Parameters

timeout (float) – [description]

Returns

The outcome as a combination of the result and any exceptions raised within Future

cancel() bool[source]

Cancel the future if possible.

Returns True if the future was cancelled, False otherwise. A future cannot be cancelled if it is running or has already completed.

Returns

True if the Future was successfully cancelled

cancelled()[source]

Return True if the future was cancelled.

Returns

True if the future was cancelled

done() bool[source]

Return True if the future was cancelled or finished executing.

Returns

Return True if the future was cancelled or finished executing

future: concurrent.futures._base.Future[Any]

Alias for field number 0

name: str

Alias for field number 1

poll_finished_with_exception(ignore_cancelled=True) Union[None, BaseException][source]

Check if the future has finished but ended with an exception.

Parameters

ignore_cancelled (bool) – Whether cancelled errors should be treated as an exception, default to True - meaning cancelled errors will be ignored

Returns

None if task has not finished yet or finished but no exception was raised otherwise returns the actual exception object.

class Outcome(result: Any, cancelled: Union[None, concurrent.futures._base.CancelledError], exception: Union[None, Exception])[source]

Bases: NamedTuple

Bundles a future result with any exceptions raised from it.

cancelled: Union[None, concurrent.futures._base.CancelledError]

Alias for field number 1

exception: Union[None, Exception]

Alias for field number 2

result: Any

Alias for field number 0

T
sphinx-autodoc can’t document typevars properly right now,

see https://github.com/agronholm/sphinx-autodoc-typehints/issues/39

A generic type for elements of an iterable.

Type

alias of TypeVar(‘T’)

class Tasks[source]

Bases: ska_ser_skallop.connectors.remoting.tangobridge.control.BaseControllerTasks

A subsclass of BaseControllerTasks as a set of asyncio tasks.

add(task: _asyncio.Task)[source]

Add a new task to be monitored.

Parameters

task (asyncio.Task) – the asyncio Task to be monitored

async cancel_pending()[source]

Cancel any pending asyncio tasks.

This will cause the task to raise a cancelled Error and finish.

cancelled_tasks: List[_asyncio.Task] = []
done_tasks: List[_asyncio.Task] = []
failed_tasks: List[BaseException] = []
submitted_tasks: List[_asyncio.Task] = []
update()[source]

Move any done tasks to the completed ones and excepted tasks to exceptions.

cancel_future(future: concurrent.futures._base.Future, timeout=1)[source]

Attempt to cancel a future task in case it is still pending.

Will remain block if the future is running until it is completed

Parameters
  • future (Future) – The future that needs to be canceled

  • timeout (int, optional) – The maximum amount of time to wait for future to finish, defaults to 1

.factories module

Provide Factories for creating implementation of tangobridge components.

class AbstractFactory[source]

Bases: object

Abstract Factory for generating remote tangobridge components.

abstract generate_async_queue()[source]

Construct an async queue.

abstract get_controller()[source]

Construct a controller object.

abstract get_graphql_client(*args, **kwargs)[source]

Construct a graphql_client.

Parameters
  • args – additional positional args to pass to graphql

  • kwargs – additional keyword args to pass to graphql

abstract get_requests()[source]

Construct a requests module.

abstract get_websockets()[source]

Construct a websockets module.

abstract teardown()[source]

Call teardown on threadable objects.

class TBridgeFactory[source]

Bases: ska_ser_skallop.connectors.remoting.tangobridge.factories.AbstractFactory

Implementation of Abstract factory for for prodiving tangbridge components.

property authenticated_user: ska_ser_skallop.connectors.remoting.tangobridge.authentication.AuthenticatedUser

Return the authenticated user.

If this is the first time a new authentication process will run, otherwise the existing authenticated user will eb returned.

Returns

the authenticated user

property env

Get the host environment.

Returns

the host environment

generate_async_queue() asyncio.queues.Queue[source]

Generate an asyncio queue that is used by the controller asyncio thread.

An asyncio queue can be used as a message bus between tasks running in the asyncio thread and the main thread.

Note this requires an existing controller instance and will geerate an AssertionError if the the controller was not generated.

Returns

asyncio queue that is used by the controller asyncio thread.

Raises

AssertionError – when method called before a controller was initiated.

get_controller() ska_ser_skallop.connectors.remoting.tangobridge.control.Controller[source]

Return the Controller object to use for managing asyncio Tasks.

If the Controller instance does not exist then a instance of the asyncio thread will be created together with the initialisation process.

Returns

[description]

get_graphql_client(*args, **kwargs) ska_ser_skallop.connectors.remoting.tangobridge.factories.TangoGQLClient[source]

Construct a graphql client object.

Parameters
  • args – additional positional arguments for the graphql client

  • kwargs – additional keyword arguments for the graphql client

Returns

the graphql client object

get_new_authenticated_user() ska_ser_skallop.connectors.remoting.tangobridge.authentication.AuthenticatedUser[source]

Generate an authentication process using the given env variables.

Returns

The authentication result as a authenticated user data object.

get_requests()[source]

Construct a request module.

Returns

[description]

get_tango_gql_rest_url() str[source]

Return the url for calling the tango gql.

Returns

The url for calling the tango gql.

get_tango_gql_service_url()[source]

Return the url for calling the tango gql service.

Returns

the url for calling the tango gql service.

get_tango_gql_ws_url() str[source]

Return the url for calling the tango gql websocket.

Returns

the url for calling the tango gql websocket.

get_websockets()[source]

Return the websocket module to use for websocket connections.

Returns

the websocket module to use for websocket connections.

settings = Settings(service_name='taranta', tangogql='graphiql')
teardown()[source]

Perform a teardown of controller objects created.

This is usefull for testing purposes in which the item under test does not have full life cycle controll.

class TangoGQLClient(client: python_graphql_client.graphql_client.GraphqlClient, url: str)[source]

Bases: NamedTuple

Bundles a tango gql client and its url as a single object.

client: python_graphql_client.graphql_client.GraphqlClient

Alias for field number 0

url: str

Alias for field number 1

.parsing module

Parse gql rest based results.

exception ParseError[source]

Bases: Exception

Signal an error occurred in parsing the data returned from gql query.

class Parser(data: Dict)[source]

Bases: object

Object use for parsing input data in a piece wise and recursive fashion.

The parse() method results in the object traversing in a chain like fashion within the dictionary structure until the data being pointed to is of type string (meaning it has reached the end of the path).

For example:

data = {
    'this': {
        'is': {
            'a': {
                'very': {
                    'long': {
                        'chain': 'This is the end of the chain'
                    }
                }
            }
        }
    }
}

parser = Parser(data)
assert_that(
    parser.parse('this').parse('is').parse('a').parse('very').parse('long').parse('chain').value
)is_equal_to('This is the end of the chain')

Each parse method has the option of a given default which means if there does not exist a given key, the default object will be used. Otherwise a ParseError will be raised.

parse(key: str, default=None) ska_ser_skallop.connectors.remoting.tangobridge.parsing.Parser[source]

Parse the data by traversing to the next item returned from the key.

If the result is of type string then the value is set to that result, otherwise the data is updated to point to the returned result for parsing further.

Parameters
  • key (str) – The key to lookup the value in the dictionary

  • default (Any) – The default value to used if key does not exists if none is given then a missing key will result in a parse error, defaults to None

Raises

ParseError – When the given key does not exist and no default value was given.

Returns

Itself so as to allow for chaining a parse execution

property value_as_singleton: Any

Return the first item in the parsed result (assumes result is a list).

This is usefull in cases whereby it is known beforehand that the result will always be a list with only one element in it.

Returns

The inner value from the list

parse(data: Dict, key: str, default=None) ska_ser_skallop.connectors.remoting.tangobridge.parsing.Parser[source]

Parse a given input dictionary.

E.g. :

data = {
    'this': {
        'is': {
            'a': {
                'very': {
                    'long': {
                        'chain': 'This is the end of the chain'
                    }
                }
            }
        }
    }
}
assert_that(
    parse('this').parse('is').parse('a').parse('very').parse('long').parse('chain').value
)is_equal_to('This is the end of the chain')
Parameters
  • data (Dict) – The input data

  • key (str) – The key pointing to the first item for traversing the parsing.

  • default (Any, optional) – Whether to return a default value if the key does not exist, defaults to None

Returns

A Parser object that can be traversed further of read to get the actual value from

.queries module

Defines tango gql queries to use on rest based calls to the tango gql service.

class GQLQuery(query: str, variables: Optional[Dict[Any, Any]])[source]

Bases: NamedTuple

Bundles a gql query into a query (str) and its variables as a NamedTuple.

query: str

Alias for field number 0

variables: Optional[Dict[Any, Any]]

Alias for field number 1

command(device: str, command_name: str, argin: Optional[Any] = None) ska_ser_skallop.connectors.remoting.tangobridge.queries.GQLQuery[source]

Return a gql command for commanding a device with given input arguments.

Parameters
  • device (str) – The tango device name

  • command_name (str) – The command name

  • argin (Any) – The argument to use for the command, defaults to None

Returns

The gql command for commanding a device with given input arguments

fetch_attributes(device: str) ska_ser_skallop.connectors.remoting.tangobridge.queries.GQLQuery[source]

Return a gql query for fetching all the attributes for a given device.

Parameters

device (str) – The tango device name

Returns

The gql query for fetching all the attributes for a given device.

fetch_commands(device: str) ska_ser_skallop.connectors.remoting.tangobridge.queries.GQLQuery[source]

Return a gql query for fetching all the commands from a given device.

Parameters

device (str) – The tango device name

Returns

The gql query for fetching all the commands from a given device.

get_device_state(device_name: str)[source]

Return a message to get state for a particular device.

Parameters

device_name (str) – The tango device name

Returns

The message to get state for a particular device.

load_all_attributes(device_name: str)[source]

Return a message to load all attribute for a given device.

Parameters

device_name (str) – The tango device name

Returns

The message to load all attribute for a given device.

read_attribute(device: str, attr_name: str) ska_ser_skallop.connectors.remoting.tangobridge.queries.GQLQuery[source]

Return a gql query for reading the value from a given attribute from a given device.

Parameters
  • device (str) – The tango device name

  • attr_name (str) – The device attribute

Returns

The gql query for reading the value from a given attribute from a given device.

read_attributes_from_multiple_devices(device_list: List[str], attr: Union[str, List[str]]) ska_ser_skallop.connectors.remoting.tangobridge.queries.GQLQuery[source]

Return a gql query for reading attribute/s from a given list of devices.

If the given attribute is in the form of a list then the attributes and devices, are paired (zipped) as respective reads for each device.

Parameters
  • device_list (List[str]) – The list of devices to be queried.

  • attr (Union[str, List[str]]) – The attribute/s to be read for each device (a list of attributes is assumed to be interpreted pair wise)

Returns

The gql query for reading attribute/s from a given list of devices.

rest_info()[source]

Return a message to query health status of tango gql.

Returns

The message to query health status of tango gql.

subscribe_device(device: str, attribute: str) str[source]

Return a subscription message for subscribing to a tango device attribute.

Parameters
  • device (str) – The device to subscribe to

  • attribute (str) – The attribute to which must be subscribed to

Returns

The subscription message for subscribing to a tango device attribute.

write_attribute(device: str, attribute: str, value: Any)[source]

Return a gql query for writing a given attribute value to a device.

Parameters
  • device – The tango device name

  • attribute – The device attribute

  • value – The value to be written to the device attribute

Returns

The gql query for setting an attribute

.restcontrol module

Manages rest type calls to a tango gql interface.

exception RestCallError[source]

Bases: Exception

Indicates an error during a rest based call to tango gql.

class RestController(factory: ska_ser_skallop.connectors.remoting.tangobridge.factories.TBridgeFactory = <ska_ser_skallop.connectors.remoting.tangobridge.factories.TBridgeFactory object>)[source]

Bases: object

Monitors and controls the tango gql connection for rest base calls.

call_graphql(query: str, variables: Optional[Dict[Any, Any]] = None, operation_name: Optional[str] = None, **kwargs) Any[source]

Call a graph gql based query to the tango gql service.

If the tango gql is not available at the time of call, the program will block for twice the allocated montoring poll period before raising a RestCallError. In other words it will allow for the montiring loop to check two more times if the service, becomes available before raising an exception.

Parameters
  • query (str) – The tango gql query as an encoded string

  • variables (Union[Dict[Any, Any], None], optional) – Any graph gql variables to associate with the query, defaults to None

  • operation_name (Union[str, None], optional) – The gql operation to use, defaults to None

  • kwargs – Any additional keyword arguments to pass on to the tangogql connector

Raises
  • RestCallError – If call did not succeed within allocated waiting period

  • ClientResponseError – If service responded with an unhandled exception

Returns

The result of the call

monitor_polling_period = 5
reload()[source]

Reload and initialise the tango gql.

stop()[source]

Stop minitoring and control threads.

tear_down()[source]

Stop minitoring and control threads.

update_authentication()[source]

Re initialise connection based on un updated authentication.

property url: str

Return the tango gql rest url for logging purposes.

Returns

The http rest url

exception RestHealthError[source]

Bases: TimeoutError

Indicates a timeout waiting for tango gql rest interface to be healthy.

class RestStatus[source]

Bases: NamedTuple

Bundle rest status as a monitoring and tangogql Event Tuple.

monitoring = <threading.Event object>
tangogql_healthy = <threading.Event object>

.subscribing module

Module for handing and initiating subscriptions to device attributes on application layer.

class AbstractCallBackWrapper(callback: Union[ska_ser_skallop.subscribing.base.Subscriber, int, Callable[[ska_ser_skallop.subscribing.base.EventDataInt], None]])[source]

Bases: object

Abstraction of a callback wrapper object.

This allows for different kinds of callback implementations initiated by a handler refering to them only in a generic way.

abstract run_callback(event: ska_ser_skallop.subscribing.base.EventDataInt)[source]

Run a predetermined callback on the given event.

Parameters

event (base.EventDataInt) – [description]

class BufferedCallBackWrapper(buffer_size: int)[source]

Bases: ska_ser_skallop.connectors.remoting.tangobridge.subscribing.AbstractCallBackWrapper

Wraps callbacks as a buffer that will be populated whenever an event has occurred.

get_events() List[ska_ser_skallop.subscribing.base.EventDataInt][source]

Retrieve any events currently generated and placed on the queue.

Returns

a list of events placed on the queue.

run_callback(event: ska_ser_skallop.subscribing.base.EventDataInt)[source]

Run a predetermined callback on the given event.

Parameters

event (base.EventDataInt) – [description]

class CallBackWrapper(callback: Callable[[ska_ser_skallop.subscribing.base.EventDataInt], None])[source]

Bases: ska_ser_skallop.connectors.remoting.tangobridge.subscribing.AbstractCallBackWrapper

Wraps callbacks as functions to be called on a given event.

run_callback(event: ska_ser_skallop.subscribing.base.EventDataInt)[source]

Run a predetermined callback on the given event.

Parameters

event (base.EventDataInt) – [description]

class DeviceAttribute(device: str, attr: str)[source]

Bases: NamedTuple

Bundles device name and attribute.

attr: str

Alias for field number 1

device: str

Alias for field number 0

class DeviceAttributeEventsProducer[source]

Bases: object

Produce events to subcribers for a particular subscription.

add_callback(callback: Union[ska_ser_skallop.subscribing.base.Subscriber, int, Callable[[ska_ser_skallop.subscribing.base.EventDataInt], None]]) int[source]

Add a new callback/subscriber for a given subscription.

Parameters

callback (Union[base.Subscriber, int, Callable[[base.EventDataInt], None]]) – the callback to use.

Returns

the subscription id to identify the callback subscription with

get_events(subscription_id: int) List[ska_ser_skallop.subscribing.base.EventDataInt][source]

Get events for a particular subscription that have been buffered.

Note this method assumes the subscription id identifies a subscription for which a buffer size have been given and thus generated a BufferedCallBackWrapper. (see BufferedCallBackWrapper)

Parameters

subscription_id (int) – [description]

Raises
Returns

[description]

remove_callback(subscription_id: int) None[source]

Remove a callback subscription.

Parameters

subscription_id (int) – [description]

run_callbacks(event: ska_ser_skallop.subscribing.base.EventDataInt)[source]

Call all subscribers (callbacks) that have been subsribing for events.

Parameters

event (base.EventDataInt) – The event to be handled by the call back methods

class DeviceAttributeSubscriber(ws_controller: ska_ser_skallop.connectors.remoting.tangobridge.wscontrol.WSController, polling_rate: float = 15)[source]

Bases: ska_ser_skallop.connectors.remoting.tangobridge.base.Subscriber

Manage subscriptions to tango gql for device attribute events.

add_subscription(device_name: str, attribute: str, callback: ska_ser_skallop.connectors.remoting.tangobridge.subscribing.DeviceSubscriptionCallback) int[source]

Create a new subscription for which a callback must be called when an event occurs.

Parameters
  • device_name (str) – the tango device name

  • attribute (str) – the device attribute

  • callback (DeviceSubscriptionCallback) – the callback to run when the event occurs.

Returns

the subscription id to use for removing it in future

close()[source]

Close the subscription health monitoring threads.

get_events(device_name: str, attribute: str, subscription_id: int) List[ska_ser_skallop.subscribing.base.EventDataInt][source]

Receive events for a particular “buffered” subscription that stored them in a buffer.

Parameters
  • device_name (str) – the tango device name

  • attribute (str) – the device attribute

  • subscription_id (int) – the subscription id to identify the subscription with

Raises

WrongSubscription – When the particular subscription is not a buffered type

Returns

The list of events generated up till now.

push_event(event: Dict)[source]

Receive events from the selector coming from subscriptions to the websocket.

Parameters

event (Dict) – [description]

remove_subscription(device_name: str, attribute: str, subscription_id: int)[source]

Remove a subscription as identified by it’s id.

Note, even though the id is enough to locate and remove the subscription, the device attribute and name data is needed so as to allow for “piggy backing” same type of subscriptions as a single subscription to the tango gql service. Removing a subscription may thus not necessarily lead to a subscripion to the tangogql service being cancelled, but it will result in the particular callback not being called anymore.

Parameters
  • device_name (str) – the tango device name

  • attribute (str) – the device attribute

  • subscription_id (int) – the subscription id to identify the subscription with.

DeviceSubscriptionCallback

An subscribe argument and object indicating action to do after subscribe event occurs.

If a Subscriber is given, the action would be that of calling a ‘push_event” on the object. If an integer is given, the action would be that of populating a buffer up to the int value size. If a callable (function) is given, the function with be called with the EventDataInt as argument.

alias of Union[ska_ser_skallop.subscribing.base.Subscriber, int, Callable[[ska_ser_skallop.subscribing.base.EventDataInt], None]]

class ParseResult(key: DeviceAttribute, data: base.EventDataInt)[source]

Bases: NamedTuple

Bundling of parsed event data as key and data.

key = DeviceAttribute and data = base.EventDataInt.

data: ska_ser_skallop.subscribing.base.EventDataInt

Alias for field number 1

key: ska_ser_skallop.connectors.remoting.tangobridge.subscribing.DeviceAttribute

Alias for field number 0

class SubscriberCallBackWrapper(subscriber: ska_ser_skallop.subscribing.base.Subscriber)[source]

Bases: ska_ser_skallop.connectors.remoting.tangobridge.subscribing.AbstractCallBackWrapper

Wraps callbacks as a subscriber object called when event occurred.

run_callback(event: ska_ser_skallop.subscribing.base.EventDataInt)[source]

Run a predetermined callback on the given event.

Parameters

event (base.EventDataInt) – [description]

class SubscriptionHealth(device_name: str, attribute: str)[source]

Bases: object

Object holding subscription health in a separate bundle.

ack()[source]

Acknowledge that subscription is still active and alive.

update_health(allowed_elapsed_time: int = 5)[source]

Check if subscription received any acknowledgements/keep alive messages within time.

If no messages occurred within the allowed_elapsed_time the health state is set to stale.

Parameters

allowed_elapsed_time – The time period for which an acknowledgement from subscription should occur, defaults to 5

class SubscriptionManager(ws_controller: ska_ser_skallop.connectors.remoting.tangobridge.wscontrol.WSController, polling_rate: float = 15)[source]

Bases: ska_ser_skallop.connectors.remoting.tangobridge.base.Subscriber

Manages the health of subscriptions by listening in on events being produced from them.

add_subscription(device_name: str, attribute: str)[source]

Add a new subscription to be monitored.

Parameters
  • device_name (str) – The device for which a subscription must be made

  • attribute (str) – The attribute upon which the subscription was made

close()[source]

Close the subscription monitoring threads gracefully.

push_event(event: Dict) None[source]

Receive new events produced by the selector from subscriptions.

Parameters

event (Dict) – The event data

remove_subscription(device_name: str, attribute: str)[source]

Remove a subscription to be monitored for a device and attribute.

Parameters
  • device_name (str) – The device for which a subscription must be made

  • attribute (str) – The attribute upon which the subscription was made

exception WrongSubscription[source]

Bases: Exception

Exception when a wrong subscription have been returned to a waiting client.

create_call_back_wrapper(callback: Union[ska_ser_skallop.subscribing.base.Subscriber, int, Callable[[ska_ser_skallop.subscribing.base.EventDataInt], None]]) ska_ser_skallop.connectors.remoting.tangobridge.subscribing.AbstractCallBackWrapper[source]

Create a callback wrapper based on the type of callback to be used.

Parameters

callback (Union[base.Subscriber, int, Callable[[base.EventDataInt], None]]) – the callback to use

Returns

The callback wrapper

DeviceSubscriptionCallback

An subscribe argument and object indicating action to do after subscribe event occurs.

If a Subscriber is given, the action would be that of calling a ‘push_event” on the object. If an integer is given, the action would be that of populating a buffer up to the int value size. If a callable (function) is given, the function with be called with the EventDataInt as argument.

alias of Union[ska_ser_skallop.subscribing.base.Subscriber, int, Callable[[ska_ser_skallop.subscribing.base.EventDataInt], None]]

.tangobridge module

Implements a bridge connection between a tango gql service and a client.

class PollingBasedTangoBridge(*args: Any, **kwargs: Any)[source]

Bases: ska_ser_skallop.connectors.remoting.tangobridge.tangobridge.TangoBridge

Type of Tangobridge class that does not make use of websocket.

add_subscription(device_name: str, attribute: str, callback: Union[ska_ser_skallop.subscribing.base.Subscriber, int, Callable[[ska_ser_skallop.subscribing.base.EventDataInt], None]]) int[source]

Create a new subscription on the tangogql websocket service.

Parameters
  • device_name (str) – The tango device to subscribe to

  • attribute (str) – The tango attribute to subscribed to

  • callback (DeviceSubscriptionCallback) – A function to be called when a subscription event is received

Raises

NotImplementedError – _description_

get_events(device_name: str, attribute: str, subscription_id: int) List[ska_ser_skallop.subscribing.base.EventDataInt][source]

Get a list of current events generated on a particular subscription.

Parameters
  • device_name (str) – The tango device subscribed to

  • attribute (str) – The tango attribute subscribed to

  • subscription_id (int) – The id to identify the subscription service with

Return type

List[base.EventDataInt]

Raises

NotImplementedError – _description_

reload_ws_connection() None[source]

Re connects to the websocket service.

Raises

NotImplementedError – _description_

remove_subscription(device_name: str, attribute: str, subscription_id: int) None[source]

Remove a given subscription from the tangogql websocket service.

Parameters
  • device_name (str) – The tango device subscribed to

  • attribute (str) – The tango attribute subscribed to

  • subscription_id (int) – The id to identify the subscription service with

Raises

NotImplementedError – _description_

property tango_subscriptions_healthy: bool

Indicate whether the tango gql websocket service is still available.

Returns

Whether the tango gql websocket service is still available.

Return type

bool

tear_down_connections()[source]

Tear down all the running threads on rest and ws connections.

tear_down_ws_connection() None[source]

Tear down all the running threads related to the ws connection.

Raises

NotImplementedError – _description_

wait_for_tango_subscriptions_healthy(timeout=5) None[source]

Block until the tangogql websocket is healthy.

Parameters

timeout – How long to wait until a timeout is thrown, defaults to 5

Raises

NotImplementedError – _description_

class TangoBridge(*args: Any, **kwargs: Any)[source]

Bases: object

Class that realizes a connection to a tango gql interface.

The class provides a client with a REST interface to call gql queries on as well as creating and maintaining websocket subscriptions to tango event producers.

Note this class is a singleton and gets created and initialised only once.

add_subscription(device_name: str, attribute: str, callback: Union[ska_ser_skallop.subscribing.base.Subscriber, int, Callable[[ska_ser_skallop.subscribing.base.EventDataInt], None]]) int[source]

Create a new subscription on the tangogql websocket service.

Parameters
  • device_name (str) – The tango device to subscribe to

  • attribute (str) – The tango attribute to subscribed to

  • callback (DeviceSubscriptionCallback) – A function to be called when a subscription event is received

Returns

The id to identify the subscription service with

Return type

int

call_graphql(query: str, variables: Optional[Dict[Any, Any]] = None, operation_name: Optional[str] = None, **kwargs) Any[source]

Call a gql structured rest query on a tango gql service.

Parameters
  • query (str) – The tangogql formatted query

  • variables (Union[Dict[Any, Any], None], optional) – Any variables used within the query, defaults to None

  • operation_name (Union[str, None], optional) – The tango gql operation to be performed, defaults to None

  • kwargs – additional keyword arguments

Returns

The result of the query

Return type

Any

get_events(device_name: str, attribute: str, subscription_id: int) List[ska_ser_skallop.subscribing.base.EventDataInt][source]

Get a list of current events generated on a particular subscription.

Parameters
  • device_name (str) – The tango device subscribed to

  • attribute (str) – The tango attribute subscribed to

  • subscription_id (int) – The id to identify the subscription service with

Returns

A list of events generated from the subscription

Return type

List[base.EventDataInt]

monitor_polling_period = 0.2
re_authenticate()[source]

Perform the authentication of a user again.

reload_rest_connection()[source]

Re connects to the rest service.

reload_ws_connection()[source]

Re connects to the websocket service.

remove_subscription(device_name: str, attribute: str, subscription_id: int)[source]

Remove a given subscription from the tangogql websocket service.

Parameters
  • device_name (str) – The tango device subscribed to

  • attribute (str) – The tango attribute subscribed to

  • subscription_id (int) – The id to identify the subscription service with

settings = Settings(service_name='taranta', tangogql='graphiql')
subscriber_polling_rate: float = 15
property tango_gql_healthy: bool

Indicate whether the tango gql rest service is still available.

Returns

Whether the tango gql rest service is still available

Return type

bool

property tango_subscriptions_healthy: bool

Indicate whether the tango gql websocket service is still available.

Returns

Whether the tango gql websocket service is still available.

Return type

bool

tear_down_connections()[source]

Tear down all the running threads on rest and ws connections.

tear_down_rest_connection()[source]

Tear down all the running threads related to the rest connection.

tear_down_ws_connection()[source]

Tear down all the running threads related to the ws connection.

property url: str

Return the url used for connecting to a tangogql service.

Returns

The url used for connecting to a tangogql service

Return type

str

wait_for_tango_gql_healthy(timeout: float = 5)[source]

Block until the tangogql rest service is healthy.

Parameters

timeout (float) – How long to wait intill a timeout is thrown, defaults to 5, defaults to 5

wait_for_tango_subscriptions_healthy(timeout=5)[source]

Block until the tangogql websocket is healthy.

Parameters

timeout (int, optional) – How long to wait intill a timeout is thrown, defaults to 5

get_tango_bridge(factory: ska_ser_skallop.connectors.remoting.tangobridge.factories.TBridgeFactory = <ska_ser_skallop.connectors.remoting.tangobridge.factories.TBridgeFactory object>)[source]

Generate a tango bridge object.

Note if env USE_ONLY_POLLING is set, a tangobridge that does not use ws will be created. :param factory: __ :returns: a Tangobridge object

.wscontrol module

Facilitates a tangogql websocket connection to a client.

class BufferedSubscriber[source]

Bases: ska_ser_skallop.connectors.remoting.tangobridge.base.Subscriber

Subscriber that places received events in a buffer for later retrieval.

Getting results from the buffer happens asynchronously.

async get_event()[source]

Asynchronously wait for incoming events.

push_event(event: Any)[source]

Receive and handle the subscriber push event.

Parameters

event (Any) – A event that the producer is required to push to subscribers

class MessageContext(outbox: asyncio.queues.Queue, inbox: asyncio.queues.Queue, subscribers: List[ska_ser_skallop.connectors.remoting.tangobridge.base.Subscriber] = [])[source]

Bases: NamedTuple

Bundles a message inbox, outbox and subscribers list into a single object.

inbox: asyncio.queues.Queue

Alias for field number 1

outbox: asyncio.queues.Queue

Alias for field number 0

subscribers: List[ska_ser_skallop.connectors.remoting.tangobridge.base.Subscriber]

Alias for field number 2

class Selector(predicate: Callable[[Any], bool], name='')[source]

Bases: ska_ser_skallop.connectors.remoting.tangobridge.base.Subscriber

Subscriber that funnels/filters incoming events to downstream subscribers.

bind(loop)[source]

Bind this object to an asyncio loop for asynchronous waiting.

Parameters

loop ([type]) – the loop belonging to an asynchronous thread

async listen(stop: threading.Event)[source]

Listen asynchronously for events produced and push if selected.

Will stop listening when stop event is set.

Parameters

stop (Event) – The stop event which will signal to task to stop listening.

push_event(event: Any)[source]

Receive and handle the subscriber push event.

Parameters

event (Any) – A event that the producer is required to push to subscribers

subscribe(subscriber: ska_ser_skallop.connectors.remoting.tangobridge.base.Subscriber)[source]

Let subscribers subscribe to this object as a producer of selected events.

Parameters

subscriber (Subscriber) – the subscriber to be called when event is selected

class WSController(factory: ska_ser_skallop.connectors.remoting.tangobridge.factories.TBridgeFactory = <ska_ser_skallop.connectors.remoting.tangobridge.factories.TBridgeFactory object>)[source]

Bases: ska_ser_skallop.connectors.remoting.tangobridge.base.MessagePusher

Monitors and controls a websocket connection.

add_selector(selector: ska_ser_skallop.connectors.remoting.tangobridge.wscontrol.Selector)[source]

Add a selector (filter) to listen for incoming subscribed events.

The selector will push events to downstream subscribers when certain kind of events (as defined by the selector’s predicate function) have been received from the websocket.

Parameters

selector (Selector) – [description]

add_subscription(device: str, attribute: str) int[source]

Add a new subscription to the websocket based on events from a device attribute.

Note the websocket will only produce a new subscription if there does not already exist a subscription for the same device and attribute, otherwise it will just “piggyback” on an existing subscription.

Parameters
  • device (str) – The device (tango device producer) which must be subscribed to

  • attribute (str) – The attribute from the device which will generate events.

Returns

The subscription id to use for when a subscription needs to be removed (remove_subscription()).

finish_selector_listeners()[source]

Gracefully end selector listening threads.

listen_to_websocket_health(subscriber: ska_ser_skallop.connectors.remoting.tangobridge.base.WSHealthSubscriber)[source]

Add a ws health subscriber that will receive ws health change events.

Parameters

subscriber – The object to be called when an event occurs

monitor_polling_period = 5
push_message(item: Any)[source]

Push a new message to be send by the websocket being controlled.

Parameters

item (Any) – The message to be send

async push_message_routine(item: Any)[source]

Send an asynchronous message on the websocket (must be from asyncio thread).

Parameters

item – The item to send

Raises

TimeoutError – when a messages could not be send due to a faulty websocket remaining faulty for longer than 10 seconds

reload()[source]

Close a current websocket connection and create a new one.

remove_subscription(device: str, attribute: str) Union[None, int][source]

Remove a subscription as identified by the given id.

Note a subscription will only be removed virtually if other subscriptions still exist to the same device and attribute. If no subscriptions to the same device and attribute remains, then the actual subscription will be removed.

Parameters
  • device (str) – The device for which a subscription have been made.

  • attribute (str) – The attribute for which a subscription has been made.

Returns

Returns empty if subscriptions still remain to the given device and attribute, otherwise will return the “base” subscription id upon which the subscriptions have been “piggy backed” on.

tear_down()[source]

Gracefully tear down threads related to monitoring and close ws connection.

tear_down_ws()[source]

Tear down montoring threads related to the websocket and close the ws connection.

wait_until_ws_healthy(timeout=1)[source]

Block until asynchronous monitoring threads have set the websocket as healthy.

Parameters

timeout (int, optional) – The maximum time to wait for websocket to become healthy, defaults to 1

property ws_healthy: bool

Whether the websocket connection is healthy.

Returns

Returns True if healthy

class WSHealthSelector[source]

Bases: ska_ser_skallop.connectors.remoting.tangobridge.wscontrol.Selector

Specific Selector that looks at only WS health type of events.

subscribe_health_subscriber(subscriber: ska_ser_skallop.connectors.remoting.tangobridge.base.WSHealthSubscriber)[source]

Add a ws health subscriber object to subscribe to ws health events.

Parameters

subscriber – the subscriber for which we health events should be published for.

class Websocket(factory: ska_ser_skallop.connectors.remoting.tangobridge.factories.TBridgeFactory)[source]

Bases: object

Manages a websocket connection and wraps the websocket api provided by a factory.

async block_until_healthy(timeout=60)[source]

Asynchronously wait until a websocket is healthy.

Parameters

timeout – the time to block until the task can not continue

Raises

TimeoutError – When unhealthy longer than given timeout

async close()[source]

Call the websocket close command asynchronously.

async connect()[source]

Asynchronously connects to a remove websocket service provider.

If the connection is not available the task will retry to connect every second until either a connection is successfull or the websocket is closed (close()).

get_health_selector() ska_ser_skallop.connectors.remoting.tangobridge.wscontrol.Selector[source]

Create a health selector that will generate events related to websocket health.

Returns

the created selector object

get_messages() AsyncGenerator[Any, None][source]

Get incoming websocket messages asynchronously as json decoded objects.

For example:

async for message in ws.get_messages():
    handle_message(message)
Yield

The json decoded message from the websocket

healthy() bool[source]

Report the current observed health of websocket.

Returns

Returns True if health is ok

async monitor_ws(timeout=1)[source]

Asynchronously ping the web socket continuously until ws is closed (close()).

Note this method assumes a health selector has already been defined as per get_health_selector().

Parameters

timeout (int, optional) – How long to wait for a ping result to be received before deeming it as faulty, defaults to 1.

async ping(timeout=1)[source]

Asynchronously ping the web socket.

Parameters

timeout (int, optional) – How long to wait for a ping result to be received before deeming it as faulty, defaults to 1.

async send(message: Union[str, bytes, Iterable[Any], AsyncIterable[Any]])[source]

Send an asynchronous message over the websocket.

Parameters

message (Union[str, bytes, Iterable[Any], AsyncIterable[Any]]) – The websocket message or messages

set_health_not_ok()[source]

Log and set the observed health of the websocket as not healthy.

set_health_ok()[source]

Log and set the observed health of the websocket as healthy.

wait_until_healthy(timeout=1)[source]

Wait for a given period until a thread has observed the health of ws to be ok.

Parameters

timeout (int, optional) – The time to wait, defaults to 1

Raises

TimeoutError – when the wait for a websocket exceeds the given timeout