Source code for ska_ser_skallop.subscribing.producers

from collections import defaultdict, namedtuple
from typing import Any, Dict, List, Set, Union

import ska_ser_skallop.subscribing.base

# Lightweight record with two fields, ``attr`` and ``subscriber``.
# NOTE(review): not referenced anywhere else in the visible part of this
# module - presumably kept for external callers; confirm before removing.
Subscript = namedtuple("Subscript", ["attr", "subscriber"])


class BufferedSubscriber(ska_ser_skallop.subscribing.base.Subscriber):
    """Subscriber that records every event it receives in a list.

    The collected events are exposed through the public ``events``
    attribute, in the order in which they were pushed.
    """

    def __init__(self) -> None:
        # Starts out empty; grows by one entry per call to push_event.
        self.events: List[ska_ser_skallop.subscribing.base.EventDataInt] = []

    def push_event(
        self, event: ska_ser_skallop.subscribing.base.EventDataInt
    ) -> None:
        """Store the given event at the end of the buffer.

        :param event: the event to record
        """
        buffer = self.events
        buffer.append(event)
class IndexedDictionary(Dict):
    """Dictionary that hands out its own sequential integer keys.

    Each call to :py:meth:`append` stores a value under the next integer
    key. The first key is 1 and keys are never reused, even after an item
    has been removed.
    """

    def __init__(self):
        super().__init__()
        # Last key that was handed out; 0 means nothing was appended yet.
        self._index = 0

    def append(self, value) -> int:
        """Store a value under a freshly allocated key.

        :param value: the object to store
        :return: the integer key under which the value was stored
        """
        key = self._index + 1
        self._index = key
        self[key] = value
        return key
class Producer(ska_ser_skallop.subscribing.base.Producer):
    """An emulation of a producer with the ability to be subscribed to and
    to respond when being called to push events by in turn calling all its
    subscribers.

    This class could be useful to test subscriptions without needing
    externally running applications.
    """

    def __init__(self, name: str) -> None:
        self._name = name
        # subscription id -> (attr, subscriber)
        self.subscriptions: IndexedDictionary = IndexedDictionary()
        # attr -> subscribers that must be notified for that attr
        self.observers: Dict[
            str, Set[ska_ser_skallop.subscribing.base.Subscriber]
        ] = defaultdict(set)

    def name(self) -> str:
        """Returns the name of the producer

        :return: producer name
        """
        return self._name

    def subscribe_event(
        self,
        attr: str,
        event_type: Any,
        subscriber: Union[ska_ser_skallop.subscribing.base.Subscriber, int],
    ) -> int:
        """Registers a subscription on a producer based on a given attr.

        :param attr: the attribute for which events must be generated when
            an event has been pushed
        :param event_type: accepted so that the signature matches the tango
            Device interface; it is not used by this emulation.
        :param subscriber: The object that will be called by its
            :py:meth:`push_event` method when an event has occurred. If
            instead of an object an integer is given then the subscription
            will result in an internal buffer being populated when a new
            event is pushed (polling); see :py:meth:`get_events`.
        :return: The subscription ID as a sequential index nr starting
            from 1
        """
        if isinstance(subscriber, int):
            # use internal buffer for events
            subscriber = BufferedSubscriber()
        self.observers[attr].add(subscriber)
        return self.subscriptions.append((attr, subscriber))

    def unsubscribe_event(self, subscription_id: int) -> None:
        """Removes a subscription so that its subscriber is no longer
        notified on behalf of that subscription.

        If the same subscriber still holds another subscription on the same
        attribute it keeps receiving events for that attribute.

        :param subscription_id: the id returned by :py:meth:`subscribe_event`
        :raises KeyError: if the subscription id is not known
        """
        attr, subscriber = self.subscriptions.pop(subscription_id)
        # The observer set holds a subscriber only once per attr, however
        # many subscriptions refer to it, so only drop it when this was the
        # last subscription for that (attr, subscriber) pair.
        still_subscribed = any(
            other_attr == attr and other_subscriber == subscriber
            for other_attr, other_subscriber in self.subscriptions.values()
        )
        if not still_subscribed:
            observers_on_attr = self.observers.get(attr)
            if observers_on_attr is not None:
                observers_on_attr.discard(subscriber)

    def _push_event(
        self, attr: str, event: ska_ser_skallop.subscribing.base.EventDataInt
    ) -> None:
        # Iterate over a snapshot so that a subscriber may subscribe or
        # unsubscribe from within its callback without invalidating the
        # iteration.
        for subscriber in tuple(self.observers.get(attr, ())):
            subscriber.push_event(event)

    def push_event(
        self, attr: str, event: ska_ser_skallop.subscribing.base.EventDataInt
    ) -> None:
        """Delivers an event to every subscriber registered on an attribute.

        Nothing happens when there are no subscribers for the attribute.

        :param attr: the attribute the event belongs to
        :param event: the event handed to each subscriber's ``push_event``
        """
        self._push_event(attr, event)

    def describe_subscription(self, subscription_id: int) -> Dict:
        """Gives a description of a current subscription

        :param subscription_id: the identification of the subscription
        :return: a dictionary with the keys ``attr`` and ``subscriber``
        :raises KeyError: if the subscription id is not known
        """
        attr, subscriber = self.subscriptions[subscription_id]
        return {"attr": attr, "subscriber": subscriber}

    def get_events(
        self, subscription_id: int
    ) -> List[ska_ser_skallop.subscribing.base.EventDataInt]:
        """Used for when polling based subscription is used

        The internal buffer of the subscription is emptied by this call, so
        an event is returned by exactly one retrieval.

        :param subscription_id: the subscription id
        :return: a list of events generated since last retrieval
        :raises KeyError: if the subscription id is not known
        """
        _, subs = self.subscriptions[subscription_id]
        assert isinstance(
            subs, BufferedSubscriber
        ), "get_events is only available for polling (buffered) subscriptions"
        subscriber: BufferedSubscriber = subs
        # Hand the filled buffer to the caller and start a fresh one, so the
        # returned list is not mutated by events pushed afterwards.
        events, subscriber.events = subscriber.events, []
        return events