"""Contains the MessageBoard class and related classes to implement a Message Board"""
import threading
from concurrent import futures
from contextlib import contextmanager
from datetime import datetime
from functools import reduce
from queue import Empty
from time import sleep
from typing import Iterator, List, Optional, Set
from ska_ser_skallop.subscribing import base, helpers
from ska_ser_skallop.subscribing.event_item import EventItem
from ska_ser_skallop.subscribing.exceptions import EventTimedOut
from ska_ser_skallop.subscribing.message_handler import MessageHandler
from ska_ser_skallop.subscribing.subscription import Subscription
[docs]class SubscriptionAlreadyRemoved(KeyError):
"""
Indicate a subscription that has been requested to be removed has already been
removed from the set of subscriptions
"""
def __init__(self, message: str) -> None:
self.message = message
[docs]class MessageBoard(base.MessageBoardBase):
"""
Encapsulates a queue containing events that gets places by events pushers
onto it. Also keeps track of subscriptions and allow adding and removing of a
subscription. Lastly, it keeps log of various internal messages and transitions to
assist in describing the occurrence of events.
"""
gathered_sleep_time = 0.05
log_filter = helpers.LogBook.log_filer
def __init__(self) -> None:
super().__init__()
self.archived_subscriptions: Set[base.SubscriptionBase] = set()
self.polling = threading.Event()
self.tracer = helpers.Tracer()
self.logbook = []
self.logBook = helpers.LogBook()
[docs] def log(self, message: str, time: Optional[datetime] = None, label=None) -> None:
"""Logs a message with optional time of occurrence
:param message: the messages to log
:param time: the time at which this message was deemed to take place (if None
then it will use its own time), defaults to None
:param label: any labels to annotate the message with, defaults to None
"""
self.logBook.log(message, time, label)
[docs] def play_log_book(
self, filter_log: bool = True, log_filter_pattern: str = ""
) -> str:
"""Returns the contents of the logbook as a string of messages seperated by newline
characters
:param filter_log: whether the log book should filter out messages labeled as
:py:attr:`log_filter`, defaults to True
:return: the logbook contents
"""
return self.logBook.read(filter_log, log_filter_pattern)
[docs] def add_subscription(
self,
producer: base.Producer,
attr: str,
handler: MessageHandler,
polling: bool = False,
) -> Subscription:
"""Registers and initiate a new subscription on a given producer with a specific
attribute. A subscription can be seen as a request to be notified when an event
occurs on a particular attribute of an entity. In this implementation an event
is seen as when the value of an attribute of that entity has changed. The role
of the producer is then to notify the subscriber with two possible ways:
1. By means of a given call to be made by a producer (call back) on a given
object
2. By storing the event temporarily in a buffer for later retrieval
(polling = True)
However, both of these options achieve the same external result on the
:py:class:`MessageBoard` by resulting in events being placed on its internal queue.
These events can then be retrieved later from the :py:class:`MessageBoard`,
together with information about the subscription and a given handler to consume
the results of the event.
:param producer: the object responsible for generating the events and upon which
the subscription must be placed.
:param attr: the attribute (item of of interest) for which a change in value
must result in a notification event
:param handler: the object responsible for consuming the contents of the event.
Note this is different from the subscriber handler that will be given to the
producer for handling the incoming event, in this case the handler is just
an optional attrubute of the
:py:class:`~ska_ser_skallop.subscribing.event_item.EventItem` that will be
placed on the queue, providing a means for consuming events as they get
pulled from the buffer.
:param polling: whether the internal event generation must be done via polling
or call back. In general the call back mechanism is simpler but in cases
where a producer is not able to call a subscriber directly then the polling
method is also available, defaults to False
:return: returns the created subscription in cases the client code wants to use
the subscription later
"""
new_subscription = Subscription(producer, attr, handler)
if polling:
new_subscription.subscribe_buffer()
device_name, attr, id_name = new_subscription.describe()
self.tracer.message(
f"new subscription {device_name}.{attr}:{id_name} added as polled"
)
else:
new_subscription.subscribe_by_callback(self.board)
device_name, attr, id_name = new_subscription.describe()
self.tracer.message(
f"new subscription {device_name}.{attr}:{id_name} added as callback"
)
assert new_subscription not in self.subscriptions
self.subscriptions.add(new_subscription)
return new_subscription
[docs] def remove_all_subscriptions(self) -> None:
"""Remove all subscriptions by unsubscribing on the producers. Note all removed
subscriptions are placed in the archived list for diagnostic purposes.
"""
if self.subscriptions:
while self.subscriptions:
subscription = self.subscriptions.pop()
subscription.unsubscribe()
self.archived_subscriptions.add(subscription)
self.tracer.message(
f"all subscriptions (len = {len(self.archived_subscriptions)}) removed "
"and archived"
)
[docs] def remove_subscription(self, subscription: base.SubscriptionBase) -> None:
"""Remove a given subscription by unsubscribing on the producer. Note the
removed subscriptions is placed in the archived list for diagnostic purposes.
:param subscription: the subscription to remove
"""
producer_name, attr, id_name = subscription.describe()
if subscription not in self.subscriptions:
self.log(
f"Warning: requested to remove subscription {producer_name}.{attr}:"
f"{id_name}, but it has already been removed."
)
else:
subscription.unsubscribe()
self.archived_subscriptions.add(subscription)
self.subscriptions.remove(subscription)
self.tracer.message(
f"subscription {producer_name}.{attr}:{id_name} removed and archived"
)
def _get_printeable_timedout(self, timeout: float, print_tracer=False) -> str:
waits = [
subscription.handle_timedout(print_tracer=print_tracer)
for subscription in self.subscriptions
]
if waits:
aggregate_waits = reduce(lambda x, y: f"{x}\n{y}", waits)
message = (
f"event timed out after {timeout} seconds\n"
f"remaining subscriptions:"
f"{aggregate_waits}"
)
else:
message = "error: no subscriptions found"
return message
def _gather_from_subscribed_buffers(
self, polled_subscriptions: List[Subscription]
) -> None:
while self.polling.is_set():
gathered_binned = [s.poll() for s in polled_subscriptions]
gathered = reduce(lambda x, y: x + y, gathered_binned)
gathered.sort(key=lambda item: item.get_date_lodged())
if gathered:
for item in gathered:
self.board.put(item)
return
sleep(self.gathered_sleep_time)
[docs] def replay_subscription(self, subscription: base.SubscriptionBase) -> str:
"""Replay the tracer logs generated on a particular subscription
:param subscription: the subscription to replay
:return: the tracer logs
"""
producer = subscription.producer
attr = subscription.attr
handler_logs = subscription.get_handler_logs()
internal_logs = subscription.get_internal_logs()
internal_events_pusher_logs = ""
if not subscription.polled:
internal_events_pusher_logs = (
f"\nEvents Pusher Logs:{subscription.get_event_pushing_logs()}"
)
return (
f"subscription[{producer}:{attr}]{handler_logs}\n"
f"internal subscription logs:"
f"{internal_logs}"
f"{internal_events_pusher_logs}"
)
[docs] def replay_subscriptions(self) -> str:
"""Replay the tracer logs from current and archived subscriptions
:return: the tracer logs
"""
reduced_archived_logs = ""
reduced_active_subscription_logs = ""
if self.archived_subscriptions:
archived_logs = [
self.replay_subscription(s) for s in self.archived_subscriptions
]
if archived_logs:
reduced_archived_logs = reduce(lambda x, y: f"{x}\n{y}", archived_logs)
else:
reduced_archived_logs = "no archived messages lodged"
reduced_archived_logs = (
f"archived subscriptions ({len(self.subscriptions)}):"
f"{reduced_archived_logs}"
)
if self.subscriptions:
active_subscription_logs = [
self.replay_subscription(s) for s in self.subscriptions
]
if active_subscription_logs:
reduced_active_subscription_logs = reduce(
lambda x, y: f"{x}\n{y}", active_subscription_logs
)
else:
reduced_active_subscription_logs = "no active messages lodged"
reduced_active_subscription_logs = (
f"active subscriptions ({len(self.subscriptions)}):"
f"{reduced_active_subscription_logs}"
)
return f"\n\n{reduced_active_subscription_logs}{reduced_archived_logs}"
[docs] def replay_self(self) -> str:
"""Replay internal logs from a tracer object
:return: the tracer logs
"""
header = "logs from Messageboard"
return f"{header}{self.tracer.print_messages()}"
@contextmanager
def _concurrent_if_polling(self):
polled_subscriptions = [s for s in self.subscriptions if s.polled]
if polled_subscriptions:
executor = futures.ThreadPoolExecutor(
max_workers=1, thread_name_prefix="device_poller"
)
self.polling.set()
self.tracer.message(
"gathering task submitted concurrently to poll "
f"{len(polled_subscriptions)} subscriptions"
)
gathering_task = executor.submit(
self._gather_from_subscribed_buffers, polled_subscriptions
)
try:
yield True
finally:
self.polling.clear()
gathering_task.result()
self.tracer.message("gathering task finished")
else:
yield False
[docs] def print_remaining_subscriptions(self) -> str:
"""Returns a description of all current subscriptions
:return: the description of the subscriptions
"""
def print(
s_base: base.SubscriptionBase,
) -> str:
producer_name, attr, _ = s_base.describe()
return f"{producer_name}::{attr}"
prints = [print(sub) for sub in self.subscriptions]
if prints:
return reduce(lambda x, y: f"{x},{y}", prints)
return ""
[docs] def get_current_items(self) -> List[EventItem]:
"""Get the current events contained in the buffer of a message board object.
Note this should be a method of the MessageBoard Class in skallop
:return: A list of current event items
"""
items: List[EventItem] = []
while not self.board.empty():
items.append(self.board.get_nowait())
return items
[docs] def get_items(self, timeout: float = 0) -> Iterator[EventItem]:
"""Iterates over the :py:class:`~ska_ser_skallop.subscribing.event_item.EventItem`
instances placed in its internal queue as a consequence of subscriptions to
:py:class:`~ska_ser_skallop.subscribing.base.Producer` instances. If the queue is
currently empty it will wait for a given timeout until either returning with a
:py:class:`StopIteration` or raising an
:py:exc:`ska_ser_skallop.subscribing.exceptions.EventTimedOut` exception
(depending on whether a subscription is configured to suppress or not suppress
timeouts)
:param timeout: the maximum amount of time to wait for any events to be placed
on the queue, defaults to 0 in the case of call back subscriptions,
otherwise in case of a polling subscription, it will wait at least for the
:py:attr:`gathered_sleep_time` amount of time.
:raises EventTimedOut: if timeout is not suppressed by
subscription (note a timeout condition will result in all remaining
subscriptions to be canceled)
:return: returns a :py:class:`StopIteration` during a suppressed timeout or when
all subscriptions have been removed.
:yield: yields an item in the queue if not empty
"""
while self.subscriptions:
try:
with self._concurrent_if_polling() as polling:
# timeout out can not be zero in case of polling
timeout = timeout if not polling else self.gathered_sleep_time
item = self.board.get(timeout=timeout)
except Empty:
if all(s.suppress_timeout() for s in self.subscriptions):
self.tracer.message(
f"iteration stopped by surpressed timeout; "
f"{len(self.subscriptions)} subscriptions with be cancelled"
)
self.remove_all_subscriptions()
return StopIteration()
message = self._get_printeable_timedout(timeout)
# message = self.play_log_book()
remaining_subscriptions = self.print_remaining_subscriptions()
self.remove_all_subscriptions()
raise EventTimedOut(message, remaining_events=remaining_subscriptions)
device_name, attr, value, date = item.describe()
self.tracer.message(
f"yielding new event item {device_name}.{attr}::{value}@{date}"
)
yield item
[docs] def task_done(self):
"""Gets called to indicate the item moved from the queue has finished being "
worked on. This is usefull for synchronization purposes when for example an
event handler needs to wait for another event handler in a thread to finish
before processing the next event.
"""
self.board.task_done()