Source code for ska_ser_skallop.subscribing.message_handler

from contextlib import contextmanager
from datetime import datetime
from typing import Iterator, List, Tuple

from ska_ser_skallop.subscribing import helpers
from ska_ser_skallop.subscribing.base import (
    AttributeInt,
    BaseProducer,
    EventDataInt,
    MessageBoardBase,
    MessageHandlerBase,
    SubscriptionBase,
)


[docs]class MessageHandler(MessageHandlerBase): """ A basic implementation of the :py:class:`~ska_ser_skallop.subscribing.base.MessageHandlerBase` containing typical generic event handling behaviour for convenience. Developers can inherit from this base class in order to implement event handling. The following basic features are provided: #. Recording and updating of current state from event data (see :py:func:`load_event`, :py:func:`replay` ) #. Subscription behavior: remove the subscription generating this event or remove all from messageboard #. A pre and post context for handling an event (see :py:func:`handle_context` and :py:func:`handle_event` ) #. Logging/Printing of the data (see :py:func:`print_event`, :py:func:`replay` ) """ tracer: helpers.Tracer def __init__( self, board: MessageBoardBase, handler_annotation: str = "", enable_pre_handling_annotations: bool = True, ) -> None: """ :param board: A reference to the messagboard (as an base class) in order to log messages to and update subscriptions :param handler_annotation: a string to be added to all print messages when the :py:func:`print_event` is called, defaults to '' :param enable_pre_handling_annotations: if True then the printing of events shall include annotations about their order (start event, 2nd event, update event), defaults to True """ # init states/flags self.cancel_at_next_event: bool = False self.cancelled_by_base_class = False self.current_subscription = None self.second_event_received = False self.first_event_received = False self.event_annotation = "" # init parameters if enable_pre_handling_annotations: self.start_event_annotation = "start event" self.second_event_annotation = "2nd event" self.updated_event_annotation = "update event" else: self.start_event_annotation = "" self.second_event_annotation = "" self.updated_event_annotation = "" self.board = board self.handler_annotation = handler_annotation self.tracer = helpers.Tracer(f"Handler created: {self.describe_self()}") self.expendable = False def _annotate_with(self, message) -> None: self.event_annotation += f" {message}" def _annotate(self) -> str: return f"{self.event_annotation} {self.handler_annotation}"
[docs] def describe_self(self) -> str: """Currently returns empty string. When inheriting you should overide this method and return a string representation of what the current event handler does :return: a description of what the handler does """ return ""
def _get_attr_value_as_str(self, attr: AttributeInt) -> str: return helpers.get_attr_value_as_str(attr) def _get_attr_value_as_int(self, attr: AttributeInt) -> int: # TODO return 0 def _get_attr_value_as_list(self, attr: AttributeInt) -> List: # TODO return [] def _describe_event(self, event: EventDataInt) -> Tuple[str, str, str, str]: return helpers.describe_event(event) def _print_event(self, event: EventDataInt) -> None: device_name, attr_name, attr_value, time = self._describe_event(event) self.tracer.message( f"Event received: {device_name}.{attr_name} is recorded to be {attr_value} " f"at {time}{self._annotate()}" ) def _pre_handling(self, event: EventDataInt, subscription: SubscriptionBase): self.event_annotation = "" self.tracer.message("event handling started") self.load_event(event, subscription) self._update_event_handler_state() self._print_event(event) if self.cancel_at_next_event: self.unsubscribe(subscription) self.cancelled_by_base_class = True def _update_event_handler_state(self): if not self.first_event_received: self._annotate_with(self.start_event_annotation) self.first_event_received = True else: if not self.second_event_received: self._annotate_with(self.second_event_annotation) self.second_event_received = True else: self._annotate_with(self.updated_event_annotation) def _post_handling(self): self.tracer.message(f"event handled: {self.event_annotation}") self.board.task_done()
[docs] @contextmanager def handle_context( self, event: EventDataInt, subscription: SubscriptionBase ) -> Iterator[None]: """Context manager for use with an handler event method. When you place your event handling code within this context then the basic pre and post functionality will be included in the handling: #. Pre-handling: determine the basic ordering state (wether it is the first, second or subsequent events). In addition it may also unsubscribe automatically before processing the event if the attribute ``cancel_at_next_event`` has been set #. Post-handling: redords on the message board when the event item retrieved is finished handling it. When inheriting you can update the pre and post methods (``_pre_handling``, ``_post_handling``) with more sophisticated behaviour: :param event: The eventdata as extracted from the messageboard :py:class:`~ska_ser_skallop.subscribing.event_item.EventItem` object :param subscription: The subscription used to generate the event data (also extracted from the messageboard :py:class:`~ska_ser_skallop.subscribing.event_item.EventItem` object) """ self._pre_handling(event, subscription) yield self._post_handling()
[docs] def handle_event( self, event: EventDataInt, subscription: SubscriptionBase, *args ) -> None: """This is the basic method that normally gets called by the client to handle an event on the messageboard. It should be overridded when you want to create specific behaviour. :param event: The eventdata as extracted from the messageboard :py:class:`~ska_ser_skallop.subscribing.event_item.EventItem` object :param subscription: The subscription used to generate the event data (also extracted from the messageboard :py:class:`~ska_ser_skallop.subscribing.event_item.EventItem` object) """ with self.handle_context(event, subscription): pass
[docs] def load_event(self, event: EventDataInt, subscription: SubscriptionBase) -> None: """Updates the state of the handler by setting the contents of attr:`current_event` and attr:`current_subscription` to input parameters :param event: The eventdata as extracted from the messageboard :py:class:`~ska_ser_skallop.subscribing.event_item.EventItem` object :param subscription: The subscription used to generate the event data (also extracted from the messageboard :py:class:`~ska_ser_skallop.subscribing.event_item.EventItem` object) """ self.current_event = event self.current_subscription = subscription
[docs] def unsubscribe_all(self): """ Unsubscribe all currently running subscriptions on a messageboard. Use this if a current event indicates that the behaviour is finished. """ self.board.remove_all_subscriptions() self.tracer.message( "All subscriptions removed from message board by handler, no more messages " "expected" )
[docs] def unsubscribe(self, subscription: SubscriptionBase): """Unsubscribes the given subscription fom the messageboard. Use this typically if the incoming event indicates no further events is expected (and thus no further waiting is necessary) :param subscription: The subscription used to generate the event data (also extracted from the messageboard :py:class:`~ska_ser_skallop.subscribing.event_item.EventItem` object) """ if not self.cancelled_by_base_class: self.board.remove_subscription(subscription) device_name, attr, id_name = subscription.describe() self.tracer.message( f"Subscription {device_name}.{attr}:{id_name} removed from message " "board, no more messages expected" ) if self.board.all_non_expendable_subscriptions_removed: self.board.remove_all_subscriptions()
[docs] def handle_timedout( self, producer: BaseProducer, attr: str, print_header: bool = True, print_tracer: bool = False, ) -> str: """called by the messageboard when it has waited too long for events on a subscription. The result of this is a string containing diagnostic data about the event handler to assist a tester in determining the cause of the time out. :param producer: The producer of the event data :param attr: the attribute upon which the subscription is based :param print_header: whether the diagnostic data should be preceded by a print header, defaults to True :return: the actual diagnostic data used to understand the cause of the time out """ header = "" if print_header: header = f"\nDevice: {producer.name()} Attribute: {attr}" tracer = self.tracer.print_messages() self_desc = f" handler used: {self.describe_self()}" descr = tracer if print_tracer else self_desc return f"{header}{descr}"
[docs] def suppress_timeout(self) -> bool: """This method is used by the messageboard to determine whether or not it should raise and exception when the timeout occurred or wether it should simply halt the iteration. *Note*, this is set to always return *False*. Override this method, if you want your handler to behave differently. :return: indicates whether it should """ return False
[docs] def replay(self) -> str: """returns a string that lists all the messages logged by the handler during its lifetime in order to assist with diagnosis" :return: the diagnostic string """ return self.tracer.print_messages()
def _unpack_event(self, event: EventDataInt) -> Tuple[str, str, str, datetime]: return helpers.unpack_event(event)
[docs] def print_event(self, event: EventDataInt, ignore_first: bool = False) -> str: """This method is typically called by the client to assist the testing script. It returns a string describing the received event with annotations about its order and context. It also writes the same string to the messageboard so that all messages coming from event handlers can be centralized and replayed later. :param event: The eventdata as extracted from the messageboard :py:class:`~ska_ser_skallop.subscribing.event_item.EventItem` object :param ignore_first: whether a the first event resulting from a subscription should be ignored. This is typically when a subscription on a tango device results in an immediate event being generated before a test execution has ran. In such cases it may be useful to treat those events as not part of the actual test scenario, defaults to *False* :return: the event described in a useful context in order to assist the tester in understanding the dynamic situation. """ if ignore_first: if not self.second_event_received: return "" device_name, attr_name, attr_value, time = self._unpack_event(event) message = f"{device_name:<40}{attr_name:<20}{attr_value:<10}{self._annotate()}" self.board.log(message, time=time) return f"{time.isoformat():<30}{message}"