Source code for ska_ser_skallop.subscribing.base

from abc import abstractmethod
from datetime import datetime
from queue import Queue
from typing import Any, Callable, Iterator, List, Set, Union

from ska_ser_skallop.utils.env import build_in_testing

# Identifier used for "change" events. It starts out as a plain string so
# that a value is always defined, whatever build_in_testing() returns.
CHANGE_EVENT = "CHANGE_EVENT"
if build_in_testing():
    # tango is imported only on this branch, so it is not required when
    # build_in_testing() is false.
    # NOTE(review): presumably this keeps tango an optional dependency -
    # confirm against ska_ser_skallop.utils.env.
    from tango import EventType

    # Replace the string placeholder with tango's own event type member.
    CHANGE_EVENT = EventType.CHANGE_EVENT


class EventTimeInt:
    """
    An abstraction of a Device event's time of occurrence so as to be
    compatible with the Tango Device attribute's time object.

    :param timestamp: POSIX timestamp (seconds since the epoch) of the event;
        when ``None`` (the default) the current local time is used.
    """

    def __init__(self, timestamp: Union[float, None] = None) -> None:
        # Compare against None explicitly: a timestamp of 0 (the epoch) is a
        # legitimate value and must not be silently replaced by "now".
        if timestamp is None:
            self._val = datetime.now()
        else:
            self._val = datetime.fromtimestamp(timestamp)

    def todatetime(self) -> datetime:
        """Render the object as a :py:class:`datetime.datetime`.

        :return: the stored time of occurrence
        """
        return self._val

    def isoformat(self) -> str:
        """Render the stored time as an ISO formatted string.

        :return: the ISO formatted string
        """
        return self._val.isoformat()
class AttributeInt:
    """
    An abstraction of a Device Attribute so that it provides the minimum set
    of fields and methods necessary to work with the subscription module.

    :param name: the name of the attribute
    :param value: the value carried by the attribute
    :param timestamp: forwarded unchanged to :py:class:`EventTimeInt` to build
        the ``time`` field, defaults to None
    """

    def __init__(
        self, name: str, value: Any, timestamp: Union[float, None] = None
    ) -> None:
        self.time = EventTimeInt(timestamp)
        self.name = name
        self.value = value
class BaseProducer:
    """The smallest form of a producer: an object identified by a name."""

    def __init__(self, name: str) -> None:
        """Initialise a producer by giving it a human readable name.

        :param name: the human readable identifier of the producer
        """
        self._name = name

    def name(self) -> str:
        """
        Deliver a human readable identifier for the producer; this is the
        same value that was used to instantiate it.

        :return: the name given at construction
        """
        return self._name
class EventDataInt:
    """
    An abstraction of event data that provides the minimum set of attributes
    and methods necessary to work with the subscription module.

    It is derived from the specific tango EventData so that the EventData
    from tango devices can also work in its place.

    :param producer_name: name used to build the ``device`` field (a
        :py:class:`BaseProducer`)
    :param attr_name: name of the attribute the event relates to
    :param val: either a ready made :py:class:`AttributeInt`, which is stored
        as is, or a raw value that gets wrapped in a new
        :py:class:`AttributeInt`, defaults to None
    :param timestamp: only used when ``val`` has to be wrapped; it is passed
        on to the new :py:class:`AttributeInt`, defaults to None
    """

    def __init__(
        self,
        producer_name: str,
        attr_name: str,
        val: Any = None,
        timestamp: Union[float, None] = None,
    ) -> None:
        # Wrap raw values so that attr_value always has the same shape; an
        # AttributeInt handed in by the caller is stored untouched.
        if not isinstance(val, AttributeInt):
            val = AttributeInt(attr_name, val, timestamp)
        self.attr_value: Union[AttributeInt, None] = val
        self.err: bool = False
        self.errors: List = []
        self.device: BaseProducer = BaseProducer(producer_name)
        self.attr_name: str = attr_name
class Subscriber:
    """Interface of an object that can receive events pushed by a producer."""

    def push_event(self, event: EventDataInt) -> None:
        """
        The method that is called by the producer when an event occurs on a
        specific subscription.

        The base implementation does nothing.

        :param event: the event data that is being delivered
        """
class Producer(BaseProducer):
    """
    Something that can be subscribed to or unsubscribed from.

    The interface is a generalization of the DeviceProxy so that the
    DeviceProxy can be a specific instance of a producer without having to
    change its interface. A specific instance of a Producer must have at
    least the methods defined here; the base implementations are placeholders.
    """

    def poll_attribute(self, attr: str, period: int):
        """
        Set a poll period on a producer to periodically check whether an
        attribute value has changed and only then publish it.

        The base implementation does nothing.

        :param attr: the name of the attribute to poll
        :param period: the poll period
        """

    def is_attribute_polled(self, attr: str) -> bool:
        """Check if an attribute is polled.

        :param attr: the name of the attribute
        :return: always True in this base implementation
        """
        return True

    # subscription part

    def get_events(self, subscription_id: int) -> List[EventDataInt]:
        """
        Return the events generated on a specific subscription that were held
        in an internal buffer. This is for when the subscription is poll
        based.

        The base implementation has no body and therefore returns None.

        :param subscription_id: the id identifying the subscription
        """

    def unsubscribe_event(self, subscription_id: int) -> None:
        """Remove a subscription identified by an id.

        The base implementation does nothing.

        :param subscription_id: the id identifying the subscription
        """

    def subscribe_event(
        self,
        attr: str,
        event_type: Any,
        subscriber: Union[Subscriber, int, Callable[[EventDataInt], None]],
    ) -> int:
        """
        Set up a subscription by registering a subscriber to be notified (via
        the push_event method) when a particular event type occurs on the
        specified attribute.

        If an integer is given instead of a Subscriber, the subscription is
        polled and the producer stores any events internally for collection
        by the subscriber at a later stage.

        The base implementation has no body and therefore returns None.

        :param attr: the name of the attribute to subscribe to
        :param event_type: the type of event to subscribe to
        :param subscriber: a Subscriber, a callable taking the event data, or
            an integer for a polled subscription
        """
class SubscriptionBase:
    """
    Base description of a subscription: the producer and attribute it is
    placed on plus the handler that deals with its events.

    All methods are neutral placeholders (no-ops, ``False``, ``None`` or an
    empty string).
    """

    # Class level defaults, shared by every instance until overwritten.
    producer: Producer = Producer("")
    handler: object = None
    attr: str = ""
    polled: bool = False
    buffer_size = 100

    @property
    def expendable(self) -> bool:
        """The handler's ``expendable`` attribute, or False when it has none."""
        return getattr(self.handler, "expendable", False)

    def describe(self) -> Any:
        """Describe the subscription; the base implementation returns None."""
        return None

    def suppress_timeout(self) -> bool:
        """Whether timeouts are suppressed; False in the base implementation."""
        return False

    def subscribe_by_callback(self, board: Queue) -> None:
        """Subscribe using a callback; does nothing in the base class.

        :param board: the queue handed to the subscription
        """

    # NOTE: the default below is evaluated once, when the class body runs, so
    # it stays 100 even if buffer_size is overridden later on.
    def subscribe_buffer(self, buffersize=buffer_size) -> None:
        """Subscribe using a buffer; does nothing in the base class.

        :param buffersize: the size of the buffer, defaults to 100
        """

    def unsubscribe(self) -> None:
        """Remove the subscription; does nothing in the base class."""

    def handle_timedout(self, *args, **kwargs) -> str:
        """Handle a timeout; the base implementation returns an empty string."""
        return ""

    def get_handler_logs(self) -> str:
        """Return the handler logs; empty string in the base class."""
        return ""

    def get_internal_logs(self) -> str:
        """Return the internal logs; empty string in the base class."""
        return ""

    def get_event_pushing_logs(self) -> str:
        """Return the event pushing logs; empty string in the base class."""
        return ""
class MessageHandlerBase:
    """
    Base interface of a handler for events arriving on a subscription.

    Every method is a neutral placeholder (no-op, ``False`` or an empty
    string).
    """

    def handle_event(
        self, event: EventDataInt, subscription: SubscriptionBase, *args
    ) -> None:
        """Handle an event; does nothing in the base class.

        :param event: the event data to handle
        :param subscription: the subscription the event arrived on
        :param args: additional positional arguments
        """

    def print_event(self, event: EventDataInt, ignore_first=False) -> str:
        """Render an event as text; empty string in the base class.

        :param event: the event data to render
        :param ignore_first: unused by the base implementation, defaults to
            False
        :return: an empty string
        """
        return ""

    def handle_timedout(self, producer: Producer, attr: str, *args, **kwargs) -> str:
        """Handle a timeout; empty string in the base class.

        :param producer: the producer involved
        :param attr: the name of the attribute involved
        :return: an empty string
        """
        return ""

    def replay(self) -> str:
        """Replay what was handled; empty string in the base class."""
        return ""

    def suppress_timeout(self) -> bool:
        """Whether timeouts are suppressed; False in the base class."""
        return False
class EventItemBase:
    """
    Base of an item that bundles an event with the subscription it came from
    and, optionally, the handler for it.

    NOTE: the methods are marked with ``@abstractmethod`` but the class does
    not use an ABC metaclass, so Python does not enforce that subclasses
    override them.
    """

    event: EventDataInt
    subscription: SubscriptionBase
    handler: Union[MessageHandlerBase, None]

    @abstractmethod
    def handle_event(self):
        """Handle the event held by this item; no-op in the base class."""

    @abstractmethod
    def event_value(self) -> Any:
        """Return the value of the event; the base body returns None."""
class EventsPusherBase(Subscriber):
    """
    Base of a subscriber that holds a queue of event items and an optional
    handler.
    """

    queue: Queue[EventItemBase]
    handler: Union[MessageHandlerBase, None]

    def push_event(self, event: EventDataInt) -> None:
        """Receive an event from a producer; does nothing in the base class.

        :param event: the event data that is being delivered
        """
class MessageBoardBase:
    """
    Base of a message board: a queue (``board``) together with the sets of
    current and archived subscriptions.

    NOTE: several methods are marked with ``@abstractmethod`` but the class
    does not use an ABC metaclass, so Python does not enforce that subclasses
    override them.
    """

    board: Queue
    subscriptions: Set[SubscriptionBase]
    archived_subscriptions: Set[SubscriptionBase]

    def __init__(self) -> None:
        self.board = Queue()
        self.subscriptions = set()
        self.archived_subscriptions = set()

    def add_subscription(
        self,
        producer: BaseProducer,
        attr: str,
        handler: MessageHandlerBase,
        polling: bool = False,
    ) -> SubscriptionBase:
        """Add a new subscription.

        The base implementation has no body and therefore returns None.

        :param producer: the producer to subscribe to
        :param attr: the name of the attribute to subscribe to
        :param handler: the handler for events on the subscription
        :param polling: defaults to False
        """

    def describe(self) -> str:
        """Describe the current subscriptions, one per line.

        :return: producer name and attribute of each subscription, each left
            aligned in a 20 character wide column, joined by newlines
        """
        return "\n".join(
            f"{sub.producer.name(): <20}{sub.attr: <20}" for sub in self.subscriptions
        )

    @property
    def non_expendable_subscriptions(self) -> Set[SubscriptionBase]:
        """The current subscriptions that are not expendable."""
        return {
            subscription
            for subscription in self.subscriptions
            if not subscription.expendable
        }

    @property
    def all_non_expendable_subscriptions_removed(self) -> bool:
        """True when no non-expendable subscription is left."""
        return not self.non_expendable_subscriptions

    @abstractmethod
    def remove_all_subscriptions(self):
        """Remove all subscriptions."""

    @abstractmethod
    def remove_subscription(self, subscription: SubscriptionBase):
        """Remove a given subscription.

        :param subscription: the subscription to remove
        """

    @abstractmethod
    def log(self, message: str, time: Union[datetime, None] = None, label=None):
        """Put a log message on the message board.

        :param message: the message to log
        :param time: defaults to None
        :param label: defaults to None
        """

    @abstractmethod
    def task_done(self):
        """
        Indicate that an item popped from the message board is done (in case
        a wait until all done is called).
        """
        self.board.task_done()

    @abstractmethod
    def get_current_items(self) -> List[EventItemBase]:
        """Get the current events contained in the buffer of a message board
        object.

        Note this should be a method of the MessageBoard Class in skallop

        :return: A list of current event items
        """

    @abstractmethod
    def get_items(self, timeout: float = 0) -> Iterator[EventItemBase]:
        """Iterate over the
        :py:class:`~ska_ser_skallop.subscribing.event_item.EventItem`
        instances placed in its internal queue as a consequence of
        subscriptions to
        :py:class:`~ska_ser_skallop.subscribing.base.Producer` instances.

        If the queue is currently empty it will wait for a given timeout
        until either returning with a :py:class:`StopIteration` or raising an
        :py:exc:`ska_ser_skallop.subscribing.exceptions.EventTimedOut`
        exception (depending on whether a subscription is configured to
        suppress or not suppress timeouts).

        :param timeout: the maximum amount of time to wait for any events to
            be placed on the queue, defaults to 0 in the case of call back
            subscriptions, otherwise in case of a polling subscription, it
            will wait at least for the
            :py:attr:`ska_ser_skallop.subscribing.message_board.MessageBoard.gathered_sleep_time`
            amount of time.
        :raises EventTimedOut: if timeout is not suppressed by subscription
            (note a timeout condition will result in all remaining
            subscriptions to be canceled)
        :return: a :py:class:`StopIteration` during a suppressed timeout or
            when all subscriptions have been removed.
        :yield: yields an item in the queue if not empty
        """

    @abstractmethod
    def play_log_book(
        self, filter_log: bool = True, log_filter_pattern: str = ""
    ) -> str:
        """Return the contents of the logbook as a string of messages
        separated by newline characters.

        :param filter_log: whether the log book should filter out messages
            labeled as "log_filter", defaults to True
        :param log_filter_pattern: defaults to an empty string
            (NOTE(review): presumably a pattern used when filtering - confirm
            against the implementing class)
        :return: the logbook contents
        """