Source code for ska_ser_skallop.subscribing.subscription

import logging
from queue import Queue
from typing import List, Optional, Tuple, Union

from ska_ser_skallop.subscribing import base, helpers
from ska_ser_skallop.subscribing.event_item import EventItem

logger = logging.getLogger(__name__)


class EventsPusher(base.EventsPusherBase):
    """Object that pushes events onto a given buffer when called by a push event.

    Events pushed before :py:meth:`set_subscription` has been called cannot be
    wrapped in an :py:class:`~ska_ser_skallop.subscribing.event_item.EventItem`
    yet (the subscription is part of the item), so they are stashed and only
    placed on the queue once the subscription becomes known.
    """

    def __init__(
        self,
        queue: Queue,
        handler: Union[base.MessageHandlerBase, None] = None,
    ) -> None:
        """
        :param queue: the queue that must be populated with
            :py:class:`~ska_ser_skallop.subscribing.event_item.EventItem`
            when events are being pushed from a producer onto it
        :param handler: a handler that will be used as input when an
            :py:class:`~ska_ser_skallop.subscribing.event_item.EventItem`
            gets generated from a new event, defaults to None
        """
        self.queue = queue
        self.handler = handler
        # first event that arrived before the subscription was set; kept for
        # diagnostic purposes only
        self.first_event: Optional[base.EventDataInt] = None
        # stays None until set_subscription() is called
        self.subscription: Optional[base.SubscriptionBase] = None
        # events received while the subscription was still unknown
        self.stash: List[base.EventDataInt] = []
        self.tracer = helpers.Tracer()

    def _event_pushed_unidempotently(self) -> bool:
        # True means an event has been pushed while this object has not been
        # fully defined yet (no subscription has been set on it)
        return self.subscription is None

    def _events_stashed_from_idempotent_calls(self) -> bool:
        # True when at least one event is waiting in the stash
        return bool(self.stash)

    def _stash_event(self, event: base.EventDataInt) -> None:
        self.stash.append(event)

    def _clean_stashed_events(self, subscription: base.SubscriptionBase) -> None:
        # place every stashed event on the queue (in arrival order), now that
        # the subscription needed to build the EventItem is available
        for event in self.stash:
            item = EventItem(event, subscription, self.handler)
            self.queue.put(item)
            self.tracer.message("event from stashed events placed on buffer")
        self.stash = []

    def push_event(self, event: base.EventDataInt) -> None:
        """Called by a callback from a producer with a new incoming event and
        results in a new
        :py:class:`~ska_ser_skallop.subscribing.event_item.EventItem` being
        created and placed on the provided queue.

        If no subscription has been set yet the event is stashed instead and
        gets placed on the queue by :py:meth:`set_subscription`.

        :param event: the incoming event
        """
        if event.attr_value is None:
            self.tracer.message(f"event pushed without a Device attribute {event}")
        else:
            self.tracer.message(
                f"new event received: {event.attr_value.name} "
                f"is {helpers.get_attr_value_as_str(event.attr_value)} on device "
                f"{event.device.name()}"
            )
        if self._event_pushed_unidempotently():
            self.tracer.message(
                "event pushed idempotently will be stashed until subscription known"
            )
            # always keep record of the first event for diagnostic purposes;
            # later stashed events must not overwrite it
            if self.first_event is None:
                self.first_event = event
            self._stash_event(event)
        else:
            assert self.subscription is not None
            item = EventItem(event, self.subscription, self.handler)
            self.tracer.message("new event placed on buffer")
            self.queue.put(item)

    def set_subscription(self, subscription: base.SubscriptionBase) -> None:
        """This method is used by the calling :py:class:`Subscription` object
        during the initiation of a subscription on a
        :py:class:`~ska_ser_skallop.subscribing.base.Producer`. Because this
        object is needed as part of the subscription call, the subscription can
        only be created after the :py:class:`EventsPusher` and thus requiring
        this method.

        :param subscription: the subscription that events pushed onto this
            object belong to
        """
        # in case events were pushed before the subscription was known, place
        # them on the queue first so that they precede any later events
        # NOTE(review): the stash is flushed without a lock - confirm that the
        # producer cannot call push_event concurrently with this method
        if self._events_stashed_from_idempotent_calls():
            self._clean_stashed_events(subscription)
        self.subscription = subscription
        device_name, attr, id_name = subscription.describe()
        self.tracer.message(
            f"subscription {device_name}.{attr}:{id_name} set on pusher"
        )
class Subscription(base.SubscriptionBase):
    """
    class that ties a subscription to a producer in order to keep record of
    subscriptions. It handles the basic subscribing and unsubscribing behaviour
    to the producer.
    """

    # default size of the producer side events buffer used by subscribe_buffer
    buffer_size = 100

    def __init__(
        self,
        producer: base.Producer,
        attr: str,
        handler: base.MessageHandlerBase,
        master: bool = False,
    ) -> None:
        """
        :param producer: The producer that will be subscribed to
        :param attr: The attribute (the item of interest) of that producer for
            which change events must be notified about
        :param handler: The object that will be used to consume the event data
            and react on the contents of the event
        :param master: whether the subscription life cycle should determine all
            other subscriptions (i.e. if the subscription becomes removed then
            all other remaining subscriptions must also be removed), defaults
            to False
        """
        self.producer = producer
        # -1 until one of the subscribe methods stores the id returned by the
        # producer
        self.id = -1
        # redefining handler as MessagehandlerBase
        self.handler: base.MessageHandlerBase = handler
        self.attr = attr
        # set to True by subscribe_buffer; required by poll
        self.polled = False
        self.tracer = helpers.Tracer()
        self.master = master
        # only set by subscribe_by_callback
        self.eventsPusher: Optional[EventsPusher] = None

    def handle_timedout(self, *args, **kwargs) -> str:
        """Delivers diagnostic information about a subscription in case of a
        timeout by delegating to the handler.

        :return: the diagnostic data
        """
        return self.handler.handle_timedout(self.producer, self.attr, *args, **kwargs)

    def describe(self) -> Tuple[str, str, Union[int, None]]:
        """Describe it self in terms of a tuple of items describing key
        attributes about the subscription in the following order:

        1. Producer Name
        2. Attribute Name
        3. Subscription id (``-1`` when no subscribe call has stored an id yet)

        :return: the tuple of items
        """
        return self.producer.name(), self.attr, self.id

    def poll(self) -> List[EventItem]:
        """Used when a polling based subscription is made on a producer and gets
        any internally buffered events that may have occurred since the last
        time it was queried.

        Asserts that the subscription was made with :py:meth:`subscribe_buffer`.

        :return: A list (empty if none was generated) of
            :py:class:`~ska_ser_skallop.subscribing.event_item.EventItem`
            instances
        """
        assert self.polled
        assert isinstance(self.id, int)
        events = self.producer.get_events(self.id)
        return [EventItem(event, self, self.handler) for event in events]

    def suppress_timeout(self) -> bool:
        """whether a subscription (implied by its handler) should suppress or
        not suppress a timeout

        :return: the result
        """
        return self.handler.suppress_timeout()

    def unsubscribe(self) -> None:
        """Removes a subscription on its producer.

        A :py:class:`KeyError` raised by the producer is taken to mean that the
        subscription was already removed and is only noted on the tracer.
        """
        assert self.id is not None
        try:
            self.producer.unsubscribe_event(self.id)
        except KeyError:
            self.tracer.message(
                f"NOTE: subscription with id {self.id} is already unsubscribed on "
                f"{self.producer} for attr {self.attr}"
            )
            return
        self.tracer.message(
            f"subscription with id {self.id} removed from {self.producer} for attr "
            f"{self.attr}"
        )

    def _enable_polling(self):
        # ask the producer to poll the attribute at a value of 1000 and warn
        # that this had to be done on its behalf
        logger.warning(
            f"subscription to {self.producer.name()}.{self.attr} not configured, "
            f"setting polling to 1000..."
        )
        self.producer.poll_attribute(self.attr, 1000)

    def subscribe_by_callback(self, board: Queue) -> None:
        """Registers a subscription on its Producer by means of a callback being
        called on a provided object that will result in placing the event as an
        :py:class:`~ska_ser_skallop.subscribing.event_item.EventItem` on the
        provided queue

        :param board: the queue that must be used by the callback object to
            place new
            :py:class:`~ska_ser_skallop.subscribing.event_item.EventItem`
            instances on
        """
        events_pusher = EventsPusher(board, self.handler)
        # self._enable_polling_if_not_set()
        try:
            self.id = self.producer.subscribe_event(
                self.attr, base.CHANGE_EVENT, events_pusher
            )
        except Exception as exception:
            # keep a record of the first failure instead of silently dropping it
            self.tracer.message(
                f"first attempt to subscribe on {self.producer} for attr "
                f"{self.attr} failed with {exception!r}, trying again"
            )
            # assume failure is due to lack of polling
            # self._enable_polling()
            # try again (a second failure is propagated to the caller)
            self.id = self.producer.subscribe_event(
                self.attr, base.CHANGE_EVENT, events_pusher
            )
        self.tracer.message(
            f"new subscription with id {self.id} created on {self.producer} for attr "
            f"{self.attr} with callback"
        )
        # the pusher may already hold events pushed during subscribe_event;
        # setting the subscription releases them onto the board
        events_pusher.set_subscription(self)
        self.eventsPusher = events_pusher

    def subscribe_buffer(self, buffersize: Optional[int] = buffer_size) -> None:
        """Registers a subscription on its Producer by means of an internal
        events buffer that holds newly generated events temporarily until
        collected with :py:meth:`poll`

        :param buffersize: the size of the internal buffer to be used by the
            producer, defaults to buffer_size
        """
        self.id = self.producer.subscribe_event(
            self.attr, base.CHANGE_EVENT, buffersize
        )
        self.tracer.message(
            f"new subscription with id {self.id} created on {self.producer} for attr "
            f"{self.attr} with buffer"
        )
        self.polled = True

    def get_handler_logs(self) -> str:
        """returns logs generated by the handler during the handling of events
        to do with this particular subscription

        :return: the logs messages
        """
        return self.handler.replay()

    def get_internal_logs(self) -> str:
        """returns any internal logs generated by its tracing object for
        diagnostic purposes

        :return: the internal logs
        """
        return self.tracer.print_messages()

    def get_event_pushing_logs(self) -> str:
        """returns any logs generated by the events pusher object used as a
        subscriber onto a producer during callbacks

        :return: the diagnostic logs, or an empty string when no events pusher
            exists (the subscription was not made with
            :py:meth:`subscribe_by_callback`)
        """
        if self.eventsPusher is None:
            return ""
        return self.eventsPusher.tracer.print_messages()