import logging
from queue import Queue
from typing import List, Optional, Tuple, Union
from ska_ser_skallop.subscribing import base, helpers
from ska_ser_skallop.subscribing.event_item import EventItem
logger = logging.getLogger(__name__)
class EventsPusher(base.EventsPusherBase):
    """Object that pushes events onto a given buffer when called by a push event.

    Events that arrive before :py:meth:`set_subscription` has been called cannot
    be wrapped in an
    :py:class:`~ska_ser_skallop.subscribing.event_item.EventItem` yet (the item
    needs the subscription), so they are stashed and flushed onto the queue once
    the subscription becomes known.
    """

    def __init__(
        self,
        queue: Queue,
        handler: Union[base.MessageHandlerBase, None] = None,
    ) -> None:
        """
        :param queue: the queue that must be populated with
            :py:class:`~ska_ser_skallop.subscribing.event_item.EventItem` when
            events are being pushed from a producer onto it
        :param handler: a handler that will be used as input when an
            :py:class:`~ska_ser_skallop.subscribing.event_item.EventItem`
            gets generated from a new event, defaults to None
        """
        self.queue = queue
        self.handler = handler
        # first event received while no subscription was set (diagnostics only)
        self.first_event: Optional[base.EventDataInt] = None
        # stays None until set_subscription() is called
        self.subscription: Optional[base.SubscriptionBase] = None
        # events received before the subscription was known, in arrival order
        self.stash: List[base.EventDataInt] = []
        self.tracer = helpers.Tracer()

    def _event_pushed_unidempotently(self) -> bool:
        # True when an event is pushed while the object has not been fully
        # defined yet, i.e. no subscription has been set on it
        return self.subscription is None

    def _events_stashed_from_idempotent_calls(self) -> bool:
        # True when at least one event is waiting in the stash
        return bool(self.stash)

    def _stash_event(self, event: base.EventDataInt) -> None:
        self.stash.append(event)

    def _clean_stashed_events(self, subscription: base.SubscriptionBase) -> None:
        # wrap every stashed event with the (now known) subscription, place it
        # on the queue in arrival order, then empty the stash
        for event in self.stash:
            item = EventItem(event, subscription, self.handler)
            self.queue.put(item)
            self.tracer.message("event from stashed events placed on buffer")
        self.stash = []

    def push_event(self, event: base.EventDataInt) -> None:
        """Called by a callback from a producer with a new incoming event and results
        in a new :py:class:`~ska_ser_skallop.subscribing.event_item.EventItem`
        being created and placed on the provided queue.

        If no subscription has been set yet, the event is stashed instead and
        only placed on the queue when :py:meth:`set_subscription` is called.

        :param event: the incoming event
        """
        if event.attr_value is None:
            self.tracer.message(f"event pushed without a Device attribute {event}")
        else:
            self.tracer.message(
                f"new event received: {event.attr_value.name} "
                f"is {helpers.get_attr_value_as_str(event.attr_value)} on device "
                f"{event.device.name()}"
            )
        if self._event_pushed_unidempotently():
            self.tracer.message(
                "event pushed idempotently will be stashed until subscription known"
            )
            # always keep record of the first event for diagnostic purposes;
            # only record it once so later stashed events do not overwrite it
            if self.first_event is None:
                self.first_event = event
            self._stash_event(event)
        else:
            assert self.subscription is not None
            item = EventItem(event, self.subscription, self.handler)
            self.tracer.message("new event placed on buffer")
            self.queue.put(item)

    def set_subscription(self, subscription: base.SubscriptionBase) -> None:
        """This method is used by the calling :py:class:`Subscription` object during
        the initiation of a subscription on a
        :py:class:`~ska_ser_skallop.subscribing.base.Producer`. Because this object
        is needed as part of the subscription call, the subscription can only be
        created after the :py:class:`EventsPusher` and thus requiring this method.

        Any events stashed before this call are placed on the queue first.

        :param subscription: the subscription this pusher belongs to
        """
        # events may already have been pushed before the subscription was known;
        # flush them onto the queue now that items can be created for them
        # NOTE(review): flush and assignment are not synchronised; if push_event
        # can run on another thread in between, that event would stay stashed -
        # confirm the producer's threading model
        if self._events_stashed_from_idempotent_calls():
            self._clean_stashed_events(subscription)
        self.subscription = subscription
        device_name, attr, id_name = subscription.describe()
        self.tracer.message(
            f"subscription {device_name}.{attr}:{id_name} set on pusher"
        )
class Subscription(base.SubscriptionBase):
    """
    class that ties a subscription to a producer in order to keep record of
    subscriptions. It handles the basic subscribing and unsubscribing behaviour to
    the producer.
    """

    # default size of the events buffer requested by subscribe_buffer()
    buffer_size = 100

    def __init__(
        self,
        producer: base.Producer,
        attr: str,
        handler: base.MessageHandlerBase,
        master: bool = False,
    ) -> None:
        """
        :param producer: The producer that will be subscribed to
        :param attr: The attribute (the item of interest) of that producer for which
            change events must be notified about
        :param handler: The object that will be used to consume the event data and
            react on the contents of the event
        :param master: whether the subscription life cycle should determine all other
            subscriptions (i.e. if the subscription becomes removed then all other
            remaining subscriptions must also be removed), defaults to False
        """
        self.producer = producer
        # -1 until one of the subscribe_* methods stores the id returned by the
        # producer
        self.id = -1
        # redefining handler as MessagehandlerBase
        self.handler: base.MessageHandlerBase = handler
        self.attr = attr
        # set to True by subscribe_buffer(); poll() requires it
        self.polled = False
        self.tracer = helpers.Tracer()
        self.master = master
        # only set by subscribe_by_callback()
        self.eventsPusher: Optional[EventsPusher] = None

    def handle_timedout(self, *args, **kwargs) -> str:
        """Delivers diagnostic information about a subscription in case of a timeout.

        Delegates to the handler, passing the producer and attribute of this
        subscription followed by any given arguments.

        :return: the diagnostic data
        """
        return self.handler.handle_timedout(self.producer, self.attr, *args, **kwargs)

    def describe(self) -> Tuple[str, str, Union[int, None]]:
        """Describe it self in terms of a tuple of items describing key attributes
        about the subscription in the following order:

        1. Producer Name
        2. Attribute Name
        3. Subscription id (-1 while no subscribe call has stored an id yet)

        :return: the tuple of items
        """
        return self.producer.name(), self.attr, self.id

    def poll(self) -> List[EventItem]:
        """Used when a polling based subscription is made on a producer and gets any
        internally buffered events that may have occurred since the last time it was
        queried.

        Must only be called after :py:meth:`subscribe_buffer`.

        :return: A list (empty if none was generated) of
            :py:class:`~ska_ser_skallop.subscribing.event_item.EventItem` instances
        """
        assert self.polled
        assert isinstance(self.id, int)
        events = self.producer.get_events(self.id)
        return [EventItem(event, self, self.handler) for event in events]

    def suppress_timeout(self) -> bool:
        """whether a subscription (implied by its handler) should suppress or not
        suppress a timeout

        :return: the result
        """
        return self.handler.suppress_timeout()

    def unsubscribe(self):
        """Removes a subscription on its producer.

        A ``KeyError`` raised by the producer is taken to mean the subscription
        was already removed; it is traced and otherwise ignored.
        """
        assert self.id is not None
        try:
            self.producer.unsubscribe_event(self.id)
        except KeyError:
            self.tracer.message(
                f"NOTE: subscription with id {self.id} is already unsubscribed on "
                f"{self.producer} for attr {self.attr}"
            )
            return
        self.tracer.message(
            f"subscription with id {self.id} removed from {self.producer} for attr "
            f"{self.attr}"
        )

    def _enable_polling(self):
        # configure polling of the attribute on the producer with a value of 1000
        logger.warning(
            "subscription to %s.%s not configured, setting polling to 1000...",
            self.producer.name(),
            self.attr,
        )
        self.producer.poll_attribute(self.attr, 1000)

    def subscribe_by_callback(self, board: Queue) -> None:
        """Registers a subscription on its Producer by means of a callback being
        called on a provided object that will result in placing the event as an
        :py:class:`~ska_ser_skallop.subscribing.event_item.EventItem` on the provided
        queue.

        If the first subscription attempt raises, it is retried once; a failure of
        the second attempt propagates to the caller.

        :param board: the queue that must be used by the callback object to place new
            :py:class:`~ska_ser_skallop.subscribing.event_item.EventItem` instances
            on
        """
        events_pusher = EventsPusher(board, self.handler)
        try:
            self.id = self.producer.subscribe_event(
                self.attr, base.CHANGE_EVENT, events_pusher
            )
        except Exception as exception:
            # NOTE(review): the retry was presumably meant to follow a call to
            # _enable_polling() (assuming the failure is due to lack of polling),
            # but that call is disabled; as written this is a plain single retry.
            # Record the first failure so it is not lost silently.
            self.tracer.message(
                f"first attempt to subscribe on {self.producer} for attr "
                f"{self.attr} failed with {exception!r}, trying again"
            )
            self.id = self.producer.subscribe_event(
                self.attr, base.CHANGE_EVENT, events_pusher
            )
        self.tracer.message(
            f"new subscription with id {self.id} created on {self.producer} for attr "
            f"{self.attr} with callback"
        )
        # the pusher can only be told about the subscription after the id is known
        events_pusher.set_subscription(self)
        self.eventsPusher = events_pusher

    def subscribe_buffer(self, buffersize: Optional[int] = buffer_size) -> None:
        """Registers a subscription on its Producer by means of an internal events
        buffer that holds newly generated events temporarily until collected with
        :py:meth:`poll`.

        :param buffersize: the size of the internal buffer to be used by the producer,
            defaults to buffer_size
        """
        self.id = self.producer.subscribe_event(
            self.attr, base.CHANGE_EVENT, buffersize
        )
        self.tracer.message(
            f"new subscription with id {self.id} created on {self.producer} for attr "
            f"{self.attr} with buffer"
        )
        self.polled = True

    def get_handler_logs(self) -> str:
        """returns logs generated by the handler during the handling of events to do
        with this particular subscription

        :return: the logs messages
        """
        return self.handler.replay()

    def get_internal_logs(self) -> str:
        """returns any internal logs generated by its tracing object for diagnostic
        purposes

        :return: the internal logs
        """
        return self.tracer.print_messages()

    def get_event_pushing_logs(self) -> str:
        """returns any logs generated by the events pusher object used as a subscriber
        onto a producer during callbacks

        :return: the diagnostic logs; an empty string if no events pusher exists
            (i.e. :py:meth:`subscribe_by_callback` has not been completed)
        """
        if self.eventsPusher is None:
            # buffer based or not yet subscribed: there is no pusher to report on
            return ""
        return self.eventsPusher.tracer.print_messages()