Source code for ska_ser_skallop.connectors.remoting.tangobridge.subscribing

"""Module for handing and initiating subscriptions to device attributes on application layer."""
from __future__ import annotations

import atexit
import logging
from abc import abstractmethod
from collections import defaultdict
from datetime import datetime
from queue import Queue
from threading import Event, Lock, Thread
from time import sleep
from typing import Callable, Dict, List, Literal, NamedTuple, Union

from ska_ser_skallop.subscribing import base

from .parsing import parse
from .wscontrol import Selector, Subscriber, WSController

logger = logging.getLogger(__name__)


[docs]class DeviceAttribute(NamedTuple): """Bundles device name and attribute.""" device: str attr: str
[docs]class AbstractCallBackWrapper: """Abstraction of a callback wrapper object. This allows for different kinds of callback implementations initiated by a handler refering to them only in a generic way. """ def __init__( self, callback: Union[base.Subscriber, int, Callable[[base.EventDataInt], None]], ) -> None: """Initialise object. :param callback: The callback item to be used when a callback is ran. :type callback: Union[base.Subscriber, int, Callable[[base.EventDataInt], None]] """ self.callback = callback
[docs] @abstractmethod def run_callback(self, event: base.EventDataInt): """Run a predetermined callback on the given event. :param event: [description] :type event: base.EventDataInt """ pass
[docs]class CallBackWrapper(AbstractCallBackWrapper): """Wraps callbacks as functions to be called on a given event.""" def __init__(self, callback: Callable[[base.EventDataInt], None]) -> None: """Initialise object. :param callback: The callback (as a function with event as input) to use :type callback: Callable[[base.EventDataInt], None] """ super().__init__(callback) self.callback = callback
[docs] def run_callback(self, event: base.EventDataInt): """Run a predetermined callback on the given event. :param event: [description] :type event: base.EventDataInt """ self.callback(event)
[docs]class SubscriberCallBackWrapper(AbstractCallBackWrapper): """Wraps callbacks as a subscriber object called when event occurred.""" def __init__(self, subscriber: base.Subscriber) -> None: """Initialise object. :param subscriber: The subscriber to call when a event has occurred. :type subscriber: base.Subscriber """ super().__init__(subscriber) self.subscriber = subscriber
[docs] def run_callback(self, event: base.EventDataInt): """Run a predetermined callback on the given event. :param event: [description] :type event: base.EventDataInt """ self.subscriber.push_event(event)
[docs]class BufferedCallBackWrapper(AbstractCallBackWrapper): """Wraps callbacks as a buffer that will be populated whenever an event has occurred.""" def __init__(self, buffer_size: int) -> None: """Initialise object. :param buffer_size: the maximum size for holding events in a python Queue. :type buffer_size: int """ super().__init__(buffer_size) self.buffer = Queue(buffer_size)
[docs] def run_callback(self, event: base.EventDataInt): """Run a predetermined callback on the given event. :param event: [description] :type event: base.EventDataInt """ if self.buffer.full(): self.buffer.get_nowait() logger.debug( "buffer for subscribe events is full will drop the oldest event" ) self.buffer.put_nowait(event)
[docs] def get_events(self) -> List[base.EventDataInt]: """Retrieve any events currently generated and placed on the queue. :return: a list of events placed on the queue. """ events = [] while not self.buffer.empty(): events.append(self.buffer.get()) return events
[docs]def create_call_back_wrapper( callback: Union[base.Subscriber, int, Callable[[base.EventDataInt], None]] ) -> AbstractCallBackWrapper: """Create a callback wrapper based on the type of callback to be used. :param callback: the callback to use :type callback: Union[base.Subscriber, int, Callable[[base.EventDataInt], None]] :return: The callback wrapper """ if isinstance(callback, base.Subscriber): return SubscriberCallBackWrapper(callback) if isinstance(callback, int): return BufferedCallBackWrapper(callback) assert isinstance(callback, Callable) return CallBackWrapper(callback)
[docs]class WrongSubscription(Exception): """Exception when a wrong subscription have been returned to a waiting client.""" pass
[docs]class DeviceAttributeEventsProducer: """Produce events to subcribers for a particular subscription.""" def __init__(self) -> None: """Initialise object.""" self._index = 0 self._callbacks: Dict[int, AbstractCallBackWrapper] = {}
[docs] def add_callback( self, callback: Union[base.Subscriber, int, Callable[[base.EventDataInt], None]], ) -> int: """Add a new callback/subscriber for a given subscription. :param callback: the callback to use. :type callback: Union[base.Subscriber, int, Callable[[base.EventDataInt], None]] :return: the subscription id to identify the callback subscription with """ self._index += 1 wrapper = create_call_back_wrapper(callback) self._callbacks[self._index] = wrapper return self._index
[docs] def remove_callback(self, subscription_id: int) -> None: """Remove a callback subscription. :param subscription_id: [description] :type subscription_id: int """ if self._callbacks: if self._callbacks.get(subscription_id): self._callbacks.pop(subscription_id)
[docs] def get_events(self, subscription_id: int) -> List[base.EventDataInt]: """Get events for a particular subscription that have been buffered. Note this method assumes the subscription id identifies a subscription for which a buffer size have been given and thus generated a BufferedCallBackWrapper. (see :py:class:`BufferedCallBackWrapper`) :param subscription_id: [description] :type subscription_id: int :raises WrongSubscription: [description] :raises WrongSubscription: [description] :return: [description] """ if self._callbacks: if call_back := self._callbacks.get(subscription_id): try: assert isinstance(call_back, BufferedCallBackWrapper) return call_back.get_events() except AssertionError as exception: raise WrongSubscription( "No buffered subscriptions for subscription id " f"{subscription_id}" ) from exception raise WrongSubscription( f"No buffered subscriptions for subscription id {subscription_id}" )
[docs] def run_callbacks(self, event: base.EventDataInt): """Call all subscribers (callbacks) that have been subsribing for events. :param event: The event to be handled by the call back methods :type event: base.EventDataInt """ for wrapper in self._callbacks.values(): wrapper.run_callback(event)
[docs]class ParseResult(NamedTuple): """Bundling of parsed event data as key and data. key = :py:class:`DeviceAttribute` and data = :py:class:`base.EventDataInt`. """ key: DeviceAttribute data: base.EventDataInt
[docs]class SubscriptionHealth: """Object holding subscription health in a separate bundle.""" def __init__(self, device_name: str, attribute: str) -> None: """Initialise the object. :param device_name: The device name for the subscription. :type device_name: str :param attribute: The device attribute for the subscription :type attribute: str """ self.state: Literal["ok", "stale"] = "ok" self.last_update: float = datetime.now().timestamp() self.device_name = device_name self.attribute = attribute
[docs] def update_health(self, allowed_elapsed_time: int = 5): """Check if subscription received any acknowledgements/keep alive messages within time. If no messages occurred within the allowed_elapsed_time the health state is set to stale. :param allowed_elapsed_time: The time period for which an acknowledgement from subscription should occur, defaults to 5 """ now = datetime.now().timestamp() elapsed_time = now - self.last_update if elapsed_time > allowed_elapsed_time: self.state = "stale" else: self.state = "ok"
[docs] def ack(self): """Acknowledge that subscription is still active and alive.""" self.last_update = datetime.now().timestamp()
[docs]class SubscriptionManager(Subscriber): """Manages the health of subscriptions by listening in on events being produced from them.""" def __init__(self, ws_controller: WSController, polling_rate: float = 15) -> None: """Initialise object. :param ws_controller: The ws controller to use for connecting to subscriptions :type ws_controller: WSController :param polling_rate: the time interval at which a subscription will be checked, defaults to 15 :type polling_rate: float """ # TODO currently this object determines health by checking if a subscription event # have been returned within a given period. This results in false positives when # a producer is for some valid reason taking very long to generate an event. # A different method is needed whereby a special keep alive messages is send by tangogql # for a specific subscription to indicate that from it's side everything is still ok and # that it too is still waiting for events from the tango device. self.ws_controller = ws_controller self.managed_subscriptions: Dict[int, SubscriptionHealth] = {} self.lock = Lock() self._running = Event() self._running.set() self._polling_rate = polling_rate self._mon_deamon = Thread( target=self._mon_subscriptions, daemon=True, name="subscription_manager" ) self._disconnect_flag = False self._mon_deamon.start() atexit.register(self.close)
[docs] def close(self): """Close the subscription monitoring threads gracefully.""" self._running.clear()
def _mon_subscriptions(self): while self._running.is_set(): with self.lock: for sub_id, sub_health in self.managed_subscriptions.items(): sub_health.update_health(self._polling_rate) if sub_health.state == "stale": logger.warning( f"subscription id: {sub_id} for device " f"{sub_health.device_name} on attribute " f"{sub_health.attribute} has not received any events in " f"{self._polling_rate} seconds" ) sleep(self._polling_rate)
[docs] def add_subscription(self, device_name: str, attribute: str): """Add a new subscription to be monitored. :param device_name: The device for which a subscription must be made :type device_name: str :param attribute: The attribute upon which the subscription was made :type attribute: str """ sub_id = self.ws_controller.add_subscription(device_name, attribute) data_selector = Selector( lambda event: f"{sub_id}" == event.get("id"), f"select for subscription with id {sub_id}", ) # ensures the selector pushes events back to self when they occur. data_selector.subscribe(self) self.ws_controller.add_selector(data_selector) with self.lock: self.managed_subscriptions[sub_id] = SubscriptionHealth( device_name, attribute )
[docs] def remove_subscription(self, device_name: str, attribute: str): """Remove a subscription to be monitored for a device and attribute. :param device_name: The device for which a subscription must be made :type device_name: str :param attribute: The attribute upon which the subscription was made :type attribute: str """ sub_id = self.ws_controller.remove_subscription(device_name, attribute) if sub_id: with self.lock: self.managed_subscriptions.pop(sub_id)
[docs] def push_event(self, event: Dict) -> None: """Receive new events produced by the selector from subscriptions. :param event: The event data :type event: Dict """ sub_id = int(event["id"]) with self.lock: subscription = self.managed_subscriptions.get(sub_id) if subscription: subscription.ack() return
DeviceSubscriptionCallback = Union[ base.Subscriber, int, Callable[[base.EventDataInt], None] ] """An subscribe argument and object indicating action to do after subscribe event occurs. If a Subscriber is given, the action would be that of calling a 'push_event" on the object. If an integer is given, the action would be that of populating a buffer up to the int value size. If a callable (function) is given, the function with be called with the EventDataInt as argument. """
[docs]class DeviceAttributeSubscriber(Subscriber): """Manage subscriptions to tango gql for device attribute events.""" def __init__(self, ws_controller: WSController, polling_rate: float = 15) -> None: """Initialise object. :param ws_controller: the ws controller to use for creating subscriptions :type ws_controller: WSController :param polling_rate: the rate at which a subscription will be checked if it is healthy , defaults to 15 :type polling_rate: float """ self.subs: Dict[DeviceAttribute, DeviceAttributeEventsProducer] = defaultdict( DeviceAttributeEventsProducer ) self.ws_controller = ws_controller self.lock = Lock() # create a data selector that wil listen to events of type data self.data_selector = Selector( lambda event: event.get("type") == "data", "select for subscribe data", ) # add self as a listener for events on the data selector self.data_selector.subscribe(self) self.ws_controller.add_selector(self.data_selector) self._sub_manager = SubscriptionManager(ws_controller, polling_rate)
[docs] def add_subscription( self, device_name: str, attribute: str, callback: DeviceSubscriptionCallback ) -> int: """Create a new subscription for which a callback must be called when an event occurs. :param device_name: the tango device name :type device_name: str :param attribute: the device attribute :type attribute: str :param callback: the callback to run when the event occurs. :type callback: DeviceSubscriptionCallback :return: the subscription id to use for removing it in future """ subscription_name = DeviceAttribute(device_name, attribute) self._sub_manager.add_subscription(device_name, attribute) with self.lock: return self.subs[subscription_name].add_callback(callback)
@staticmethod def _parse(event: Dict) -> Union[ParseResult, None]: attributes = parse(event, "payload").parse("data").parse("attributes") device_name = attributes.parse("device", "unknown").value attribute = attributes.parse("attribute", "unknown").value value = attributes.parse("value", "unknown").value timestamp = attributes.parse("timestamp", "unknown").value return ParseResult( DeviceAttribute(device_name, attribute), base.EventDataInt(device_name, attribute, value, timestamp), )
[docs] def remove_subscription( self, device_name: str, attribute: str, subscription_id: int ): """Remove a subscription as identified by it's id. Note, even though the id is enough to locate and remove the subscription, the device attribute and name data is needed so as to allow for "piggy backing" same type of subscriptions as a single subscription to the tango gql service. Removing a subscription may thus not necessarily lead to a subscripion to the tangogql service being cancelled, but it will result in the particular callback not being called anymore. :param device_name: the tango device name :type device_name: str :param attribute: the device attribute :type attribute: str :param subscription_id: the subscription id to identify the subscription with. :type subscription_id: int """ device_attr = DeviceAttribute(device_name, attribute) producer = self.subs.get(device_attr) if producer: producer.remove_callback(subscription_id) self._sub_manager.remove_subscription(device_name, attribute)
[docs] def get_events( self, device_name: str, attribute: str, subscription_id: int ) -> List[base.EventDataInt]: """Receive events for a particular "buffered" subscription that stored them in a buffer. :param device_name: the tango device name :type device_name: str :param attribute: the device attribute :type attribute: str :param subscription_id: the subscription id to identify the subscription with :type subscription_id: int :raises WrongSubscription: When the particular subscription is not a buffered type :return: The list of events generated up till now. """ device_attr = DeviceAttribute(device_name, attribute) producer = self.subs.get(device_attr) if producer: return producer.get_events(subscription_id) raise WrongSubscription( f"No buffered subscriptions for subscription id {subscription_id}" )
[docs] def push_event(self, event: Dict): """Receive events from the selector coming from subscriptions to the websocket. :param event: [description] :type event: Dict """ result = self._parse(event) if result: with self.lock: producer = self.subs.get(result.key) if producer: producer.run_callbacks(result.data)
[docs] def close(self): """Close the subscription health monitoring threads.""" self._sub_manager.close()