Source code for ska_ser_skallop.subscribing.message_board

"""Contains the MessageBoard class and related classes to implement a Message Board"""
import threading
from concurrent import futures
from contextlib import contextmanager
from datetime import datetime
from functools import reduce
from queue import Empty
from time import sleep
from typing import Iterator, List, Optional, Set

from ska_ser_skallop.subscribing import base, helpers
from ska_ser_skallop.subscribing.event_item import EventItem
from ska_ser_skallop.subscribing.exceptions import EventTimedOut
from ska_ser_skallop.subscribing.message_handler import MessageHandler
from ska_ser_skallop.subscribing.subscription import Subscription


class SubscriptionAlreadyRemoved(KeyError):
    """Signal that a subscription requested for removal is no longer registered.

    Raised (by client code) when the subscription that should be removed has
    already been taken out of the set of subscriptions.

    :param message: human readable description, also kept on ``self.message``
    """

    def __init__(self, message: str) -> None:
        # Forward the text to KeyError explicitly so ``args`` is populated by
        # the initialiser as well, not only by ``BaseException.__new__``.
        super().__init__(message)
        self.message = message
class MessageBoard(base.MessageBoardBase):
    """Queue of events plus the bookkeeping of the subscriptions feeding it.

    Encapsulates a queue containing events that get placed onto it by event
    pushers. Also keeps track of subscriptions and allows adding and removing
    of a subscription. Lastly, it keeps a log of various internal messages and
    transitions to assist in describing the occurrence of events.

    The queue (``self.board``) and the set of active subscriptions
    (``self.subscriptions``) are not created in this class; presumably they are
    set up by :py:class:`base.MessageBoardBase` -- TODO confirm.
    """

    # Seconds the polling task sleeps between two polling rounds; also the
    # minimum time :py:meth:`get_items` waits on the queue while polling.
    gathered_sleep_time = 0.05
    # NOTE(review): ``log_filer`` may be a misspelling of ``log_filter``, but
    # it is the attribute name as exposed by ``helpers.LogBook`` -- verify
    # there before renaming.
    log_filter = helpers.LogBook.log_filer

    def __init__(self) -> None:
        """Create a board with no archived subscriptions and fresh log helpers."""
        super().__init__()
        self.archived_subscriptions: Set[base.SubscriptionBase] = set()
        # Set while a background task is polling buffered subscriptions.
        self.polling = threading.Event()
        self.tracer = helpers.Tracer()
        # Not used by any method of this class; kept because code outside this
        # class may still read it. ``logBook`` is the logbook actually used.
        self.logbook = []
        self.logBook = helpers.LogBook()

    def log(self, message: str, time: Optional[datetime] = None, label=None) -> None:
        """Log a message with an optional time of occurrence.

        :param message: the message to log
        :param time: the time at which this message was deemed to take place
            (if None the logbook will use its own time), defaults to None
        :param label: any labels to annotate the message with, defaults to None
        """
        self.logBook.log(message, time, label)

    def play_log_book(
        self, filter_log: bool = True, log_filter_pattern: str = ""
    ) -> str:
        """Return the contents of the logbook as newline separated messages.

        :param filter_log: whether the log book should filter out messages
            labeled as :py:attr:`log_filter`, defaults to True
        :param log_filter_pattern: passed unchanged to the logbook's ``read``;
            presumably a pattern selecting the messages to filter -- confirm
            against ``helpers.LogBook``. Defaults to ""
        :return: the logbook contents
        """
        return self.logBook.read(filter_log, log_filter_pattern)

    def add_subscription(
        self,
        producer: base.Producer,
        attr: str,
        handler: MessageHandler,
        polling: bool = False,
    ) -> Subscription:
        """Register and initiate a new subscription on a producer's attribute.

        A subscription can be seen as a request to be notified when an event
        occurs on a particular attribute of an entity. In this implementation
        an event is seen as a change in the value of an attribute of that
        entity. The producer then notifies the subscriber in one of two ways:

        1. By means of a call made by the producer (call back) on a given
           object.
        2. By storing the event temporarily in a buffer for later retrieval
           (``polling=True``).

        Both options achieve the same external result on the
        :py:class:`MessageBoard`: events end up on its internal queue, from
        where they can be retrieved later together with information about the
        subscription and a given handler to consume the results of the event.

        :param producer: the object responsible for generating the events and
            upon which the subscription must be placed.
        :param attr: the attribute (item of interest) for which a change in
            value must result in a notification event
        :param handler: the object responsible for consuming the contents of
            the event. Note this is different from the subscriber handler that
            is given to the producer for handling the incoming event; here the
            handler is just an optional attribute of the
            :py:class:`~ska_ser_skallop.subscribing.event_item.EventItem` that
            is placed on the queue, providing a means for consuming events as
            they get pulled from the buffer.
        :param polling: whether the internal event generation must be done via
            polling or call back. In general the call back mechanism is simpler
            but when a producer is not able to call a subscriber directly the
            polling method is also available, defaults to False
        :return: the created subscription, in case the client code wants to
            use the subscription later
        """
        new_subscription = Subscription(producer, attr, handler)
        if polling:
            new_subscription.subscribe_buffer()
            mode = "polled"
        else:
            new_subscription.subscribe_by_callback(self.board)
            mode = "callback"
        # Use a distinct local name so the ``attr`` argument is not clobbered.
        device_name, attr_name, id_name = new_subscription.describe()
        self.tracer.message(
            f"new subscription {device_name}.{attr_name}:{id_name} added as {mode}"
        )
        assert new_subscription not in self.subscriptions
        self.subscriptions.add(new_subscription)
        return new_subscription

    def remove_all_subscriptions(self) -> None:
        """Remove all subscriptions by unsubscribing on the producers.

        Note all removed subscriptions are placed in the archived set for
        diagnostic purposes. Nothing is logged when there is nothing to remove.
        """
        if not self.subscriptions:
            return
        removed = 0
        while self.subscriptions:
            subscription = self.subscriptions.pop()
            subscription.unsubscribe()
            self.archived_subscriptions.add(subscription)
            removed += 1
        # Report what this call removed, not the size of the whole archive
        # (which may already hold subscriptions archived earlier).
        self.tracer.message(
            f"all subscriptions (len = {removed}) removed and archived"
        )

    def remove_subscription(self, subscription: base.SubscriptionBase) -> None:
        """Remove a given subscription by unsubscribing on the producer.

        Note the removed subscription is placed in the archived set for
        diagnostic purposes. Asking to remove a subscription that is not
        (or no longer) registered only logs a warning.

        :param subscription: the subscription to remove
        """
        producer_name, attr, id_name = subscription.describe()
        if subscription not in self.subscriptions:
            self.log(
                f"Warning: requested to remove subscription {producer_name}.{attr}:"
                f"{id_name}, but it has already been removed."
            )
            return
        subscription.unsubscribe()
        self.archived_subscriptions.add(subscription)
        self.subscriptions.remove(subscription)
        self.tracer.message(
            f"subscription {producer_name}.{attr}:{id_name} removed and archived"
        )

    def _get_printeable_timedout(self, timeout: float, print_tracer=False) -> str:
        """Build the time-out message from what each subscription reports.

        :param timeout: the time waited, only used in the message text
        :param print_tracer: forwarded to each subscription's
            ``handle_timedout``
        :return: the message describing the timeout
        """
        waits = [
            subscription.handle_timedout(print_tracer=print_tracer)
            for subscription in self.subscriptions
        ]
        if not waits:
            return "error: no subscriptions found"
        aggregate_waits = "\n".join(f"{wait}" for wait in waits)
        return (
            f"event timed out after {timeout} seconds\n"
            f"remaining subscriptions:"
            f"{aggregate_waits}"
        )

    def _gather_from_subscribed_buffers(
        self, polled_subscriptions: List[Subscription]
    ) -> None:
        """Poll the given subscriptions until events are found or polling stops.

        Runs for as long as :py:attr:`polling` is set. The first polling round
        that yields events puts them on the board, ordered by the date they
        were lodged, and ends the task; an empty round is followed by a sleep
        of :py:attr:`gathered_sleep_time`.

        :param polled_subscriptions: the subscriptions to poll
        """
        while self.polling.is_set():
            # Flatten the per-subscription results in one pass.
            gathered = [
                item
                for subscription in polled_subscriptions
                for item in subscription.poll()
            ]
            if gathered:
                gathered.sort(key=lambda item: item.get_date_lodged())
                for item in gathered:
                    self.board.put(item)
                return
            sleep(self.gathered_sleep_time)

    def replay_subscription(self, subscription: base.SubscriptionBase) -> str:
        """Replay the tracer logs generated on a particular subscription.

        :param subscription: the subscription to replay
        :return: the tracer logs
        """
        producer = subscription.producer
        attr = subscription.attr
        handler_logs = subscription.get_handler_logs()
        internal_logs = subscription.get_internal_logs()
        internal_events_pusher_logs = ""
        # Event pusher logs are only requested for call back subscriptions.
        if not subscription.polled:
            internal_events_pusher_logs = (
                f"\nEvents Pusher Logs:{subscription.get_event_pushing_logs()}"
            )
        return (
            f"subscription[{producer}:{attr}]{handler_logs}\n"
            f"internal subscription logs:"
            f"{internal_logs}"
            f"{internal_events_pusher_logs}"
        )

    def replay_subscriptions(self) -> str:
        """Replay the tracer logs from current and archived subscriptions.

        :return: the tracer logs, active subscriptions first
        """
        archived_section = ""
        active_section = ""
        if self.archived_subscriptions:
            archived_logs = "\n".join(
                self.replay_subscription(s) for s in self.archived_subscriptions
            )
            # The header counts the archived set, not the active one.
            archived_section = (
                f"archived subscriptions ({len(self.archived_subscriptions)}):"
                f"{archived_logs}"
            )
        if self.subscriptions:
            active_logs = "\n".join(
                self.replay_subscription(s) for s in self.subscriptions
            )
            active_section = (
                f"active subscriptions ({len(self.subscriptions)}):"
                f"{active_logs}"
            )
        return f"\n\n{active_section}{archived_section}"

    def replay_self(self) -> str:
        """Replay internal logs from the board's own tracer object.

        :return: the tracer logs
        """
        header = "logs from Messageboard"
        return f"{header}{self.tracer.print_messages()}"

    @contextmanager
    def _concurrent_if_polling(self):
        """Run a background polling task for the duration of the ``with`` body.

        Yields True when at least one current subscription is polled, in which
        case a single worker thread gathers events from the polled
        subscriptions while the body runs. Yields False (and starts nothing)
        otherwise.
        """
        polled_subscriptions = [s for s in self.subscriptions if s.polled]
        if not polled_subscriptions:
            yield False
            return
        executor = futures.ThreadPoolExecutor(
            max_workers=1, thread_name_prefix="device_poller"
        )
        try:
            self.polling.set()
            self.tracer.message(
                "gathering task submitted concurrently to poll "
                f"{len(polled_subscriptions)} subscriptions"
            )
            gathering_task = executor.submit(
                self._gather_from_subscribed_buffers, polled_subscriptions
            )
            try:
                yield True
            finally:
                # Clearing the flag lets the gathering loop end; result()
                # then waits for it and re-raises anything it raised.
                self.polling.clear()
                gathering_task.result()
                self.tracer.message("gathering task finished")
        finally:
            # This runs once per retrieved item, so release the worker thread
            # explicitly instead of leaving it to garbage collection.
            executor.shutdown(wait=True)

    def print_remaining_subscriptions(self) -> str:
        """Return a description of all current subscriptions.

        :return: comma separated ``producer::attribute`` entries, or an empty
            string when there are no subscriptions
        """
        descriptions = []
        for subscription in self.subscriptions:
            producer_name, attr, _ = subscription.describe()
            descriptions.append(f"{producer_name}::{attr}")
        return ",".join(descriptions)

    def get_current_items(self) -> List[EventItem]:
        """Get the events currently contained in the buffer of the board.

        The queue is drained without waiting for new events to arrive.

        :return: A list of current event items
        """
        items: List[EventItem] = []
        # Drain until the queue reports Empty rather than testing empty()
        # first: another consumer could take the item between test and get.
        while True:
            try:
                items.append(self.board.get_nowait())
            except Empty:
                return items

    def get_items(self, timeout: float = 0) -> Iterator[EventItem]:
        """Iterate over the event items placed on the internal queue.

        Yields the
        :py:class:`~ska_ser_skallop.subscribing.event_item.EventItem`
        instances placed on the queue as a consequence of subscriptions to
        :py:class:`~ska_ser_skallop.subscribing.base.Producer` instances, for
        as long as there are subscriptions. If the queue is empty it waits for
        the given timeout and then either stops iterating or raises
        :py:exc:`ska_ser_skallop.subscribing.exceptions.EventTimedOut`
        (depending on whether the subscriptions suppress timeouts).

        :param timeout: the maximum amount of time to wait for any event to be
            placed on the queue, defaults to 0. When a polled subscription is
            present the wait is at least :py:attr:`gathered_sleep_time`.
        :raises EventTimedOut: if the timeout is not suppressed by every
            subscription (note a timeout condition results in all remaining
            subscriptions being cancelled)
        :return: the iteration simply ends on a suppressed timeout or when all
            subscriptions have been removed.
        :yield: the next item in the queue
        """
        wait_time = timeout
        while self.subscriptions:
            try:
                with self._concurrent_if_polling() as polling:
                    # The wait can not be zero in case of polling, but a
                    # longer wait requested by the caller must be honoured.
                    wait_time = (
                        max(timeout, self.gathered_sleep_time) if polling else timeout
                    )
                    item = self.board.get(timeout=wait_time)
            except Empty:
                if all(s.suppress_timeout() for s in self.subscriptions):
                    self.tracer.message(
                        f"iteration stopped by surpressed timeout; "
                        f"{len(self.subscriptions)} subscriptions with be cancelled"
                    )
                    self.remove_all_subscriptions()
                    return
                message = self._get_printeable_timedout(wait_time)
                # Capture the description before the subscriptions are removed.
                remaining_subscriptions = self.print_remaining_subscriptions()
                self.remove_all_subscriptions()
                raise EventTimedOut(message, remaining_events=remaining_subscriptions)
            device_name, attr, value, date = item.describe()
            self.tracer.message(
                f"yielding new event item {device_name}.{attr}::{value}@{date}"
            )
            yield item

    def task_done(self):
        """Indicate that the item taken from the queue has been worked on.

        This is useful for synchronization purposes when, for example, an
        event handler needs to wait for another event handler in a thread to
        finish before processing the next event.
        """
        self.board.task_done()