from abc import abstractmethod
from datetime import datetime
from queue import Queue
from typing import Any, Callable, Iterator, List, Set, Union
from ska_ser_skallop.utils.env import build_in_testing
# Default identifier for change events; it stays a plain string unless the branch
# below replaces it.
CHANGE_EVENT = "CHANGE_EVENT"
if build_in_testing():
    # tango is imported only when build_in_testing() returns a truthy value; the
    # string placeholder is then replaced by tango's own EventType.CHANGE_EVENT.
    # NOTE(review): presumably this keeps tango an optional dependency outside of
    # such builds -- confirm against build_in_testing.
    from tango import EventType
    CHANGE_EVENT = EventType.CHANGE_EVENT
class EventTimeInt:
    """
    An abstraction of a Device event's time of occurrence so as to be compatible
    with the Tango Device attribute's time object
    """

    def __init__(self, timestamp: Union[float, None] = None) -> None:
        """
        Record the time of occurrence of an event.

        :param timestamp: a POSIX timestamp (as accepted by
            :py:meth:`datetime.datetime.fromtimestamp`); when ``None`` the moment
            of construction is used instead.
        """
        # Compare against None explicitly: a timestamp of 0 (the epoch itself) is
        # a legitimate value and must not be silently replaced by the current time.
        if timestamp is None:
            self._val = datetime.now()
        else:
            self._val = datetime.fromtimestamp(timestamp)

    def todatetime(self) -> datetime:
        """renders the object as a datetime type"""
        return self._val
class AttributeInt:
    """
    Minimal stand-in for a Device Attribute: it carries only the fields that the
    subscription module needs (a name, a value and a time of occurrence).
    """

    def __init__(self, name: str, value: Any, timestamp=None) -> None:
        """
        :param name: the name of the attribute
        :param value: the value held by the attribute
        :param timestamp: passed on unchanged to :py:class:`EventTimeInt`,
            defaults to None
        """
        self.name = name
        self.value = value
        # the time is wrapped so that it offers the same accessor as the rest of
        # the module expects (see EventTimeInt.todatetime)
        self.time = EventTimeInt(timestamp)
class BaseProducer:
    """The most basic producer: an object identified by a human readable name."""

    def __init__(self, name: str) -> None:
        """Create a producer and remember the human readable name given to it."""
        self._name = name

    def name(self) -> str:
        """
        Return the human readable identifier of the producer, i.e. the same value
        that was supplied when it was instantiated.
        """
        identifier = self._name
        return identifier
class EventDataInt:
    """
    An abstraction of event data that provides the minimum set of attributes and
    methods necessary to work with the subscription module. It is derived from the
    specific tango EventData so that the EventData from tango devices can also be
    used in its place.
    """

    def __init__(
        self,
        producer_name: str,
        attr_name: str,
        val: Any = None,
        timestamp=None,
    ) -> None:
        """
        :param producer_name: name given to the :py:class:`BaseProducer` that is
            created to represent the originating device
        :param attr_name: name of the attribute the event relates to
        :param val: either a ready made :py:class:`AttributeInt`, which is stored
            as is, or a raw value that gets wrapped in a new
            :py:class:`AttributeInt`, defaults to None
        :param timestamp: only used when ``val`` needs to be wrapped, in which
            case it is handed to the new :py:class:`AttributeInt`, defaults to None
        """
        # a value that already is an AttributeInt is kept untouched (its own
        # name and time are preserved); anything else is wrapped
        self.attr_value: Union[AttributeInt, None] = (
            val
            if isinstance(val, AttributeInt)
            else AttributeInt(attr_name, val, timestamp)
        )
        # a freshly created event carries no error information
        self.err: bool = False
        self.errors: List = []
        self.device: BaseProducer = BaseProducer(producer_name)
        self.attr_name: str = attr_name
class Subscriber:
    """Something that can be notified by a producer about events."""

    def push_event(self, event: EventDataInt) -> None:
        """
        Callback invoked by the producer whenever an event occurs on a specific
        subscription. The implementation given here does nothing.

        :param event: the event that occurred
        """
        return None
class Producer(BaseProducer):
    """
    Something that can be subscribed to or unsubscribed from.

    The interface is a generalization of the DeviceProxy, so that the DeviceProxy
    can be a specific instance of a producer without having to change its
    interface. A specific instance of a Producer must have at least the following
    methods:
    """

    def poll_attribute(self, attr: str, period: int):
        """
        Set a poll period on the producer so that it periodically checks whether
        an attribute value has changed and publishes it only then. The
        implementation given here does nothing.

        :param attr: the name of the attribute to poll
        :param period: the poll period
        """
        return None

    def is_attribute_polled(self, attr: str) -> bool:
        """
        Check whether an attribute is polled.

        :param attr: the name of the attribute
        :return: always ``True`` in the implementation given here
        """
        return True

    # subscription part

    def get_events(self, subscription_id: int) -> List[EventDataInt]:
        """
        Return the events generated on a specific subscription that were held in
        an internal buffer. This is for when the subscription is poll based. The
        implementation given here has no body and therefore yields nothing.

        :param subscription_id: the id identifying the subscription
        """

    def unsubscribe_event(self, subscription_id: int) -> None:
        """
        Remove the subscription identified by an id. The implementation given
        here does nothing.

        :param subscription_id: the id identifying the subscription
        """
        return None

    def subscribe_event(
        self,
        attr: str,
        event_type: Any,
        subscriber: Union[Subscriber, int, Callable[[EventDataInt], None]],
    ) -> int:
        """
        Set up a subscription by registering a subscriber to be notified (via its
        push_event method) when a particular event type occurs on the specified
        attribute. If an integer is given instead of a Subscriber the subscription
        is polled, and the producer stores any events internally for collection by
        the subscriber at a later stage. The implementation given here has no body
        and therefore yields nothing.

        :param attr: the name of the attribute to subscribe to
        :param event_type: the type of event to be notified about
        :param subscriber: the subscriber (or callable) to notify, or an integer
            to request a polled subscription
        """
class SubscriptionBase:
    """
    Base class for a subscription. It declares the attributes a subscription
    carries and gives every method a neutral default (no action, ``None``,
    ``False`` or an empty string).
    """

    # class level defaults
    producer: Producer = Producer("")
    handler: object = None
    attr: str = ""
    polled: bool = False
    buffer_size = 100

    @property
    def expendable(self) -> bool:
        """
        Whether the subscription is expendable: the value of the handler's own
        ``expendable`` attribute if it has one, otherwise ``False``.
        """
        if not hasattr(self.handler, "expendable"):
            return False
        return self.handler.expendable  # type: ignore

    def describe(self) -> Any:
        """Describe the subscription; the default gives ``None``."""
        return None

    def suppress_timeout(self) -> bool:
        """Whether timeouts should be suppressed; the default is ``False``."""
        return False

    def subscribe_by_callback(self, board: Queue) -> None:
        """
        Subscribe using a callback; the default does nothing.

        :param board: a queue
            (NOTE(review): presumably where incoming events get placed -- confirm)
        """

    # the default value below is bound to the class level buffer_size (100) at the
    # time the class body is executed
    def subscribe_buffer(self, buffersize=buffer_size) -> None:
        """
        Subscribe using a buffer; the default does nothing.

        :param buffersize: the size of the buffer, defaults to the class level
            ``buffer_size``
        """

    def unsubscribe(self) -> None:
        """Undo the subscription; the default does nothing."""

    def handle_timedout(self, *args, **kwargs) -> str:
        """Handle a timeout; the default returns an empty string."""
        return ""

    def get_handler_logs(self) -> str:
        """Return the handler logs; the default is an empty string."""
        return ""

    def get_internal_logs(self) -> str:
        """Return the internal logs; the default is an empty string."""
        return ""

    def get_event_pushing_logs(self) -> str:
        """Return the event pushing logs; the default is an empty string."""
        return ""
class MessageHandlerBase:
    """
    Base class for handlers of events. Every method has a neutral default: it
    either does nothing or returns an empty string / ``False``.
    """

    def handle_event(
        self, event: EventDataInt, subscription: SubscriptionBase, *args
    ) -> None:
        """
        Handle an event; the default does nothing.

        :param event: the event to handle
        :param subscription: the subscription the event relates to
        """
        return None

    def print_event(self, event: EventDataInt, ignore_first=False) -> str:
        """
        Render an event as text; the default returns an empty string.

        :param event: the event to render
        :param ignore_first: not used by the default implementation,
            defaults to False
        """
        return ""

    def handle_timedout(self, producer: Producer, attr: str, *args, **kwargs) -> str:
        """
        Handle a timeout; the default returns an empty string.

        :param producer: a producer
        :param attr: the name of an attribute
        """
        return ""

    def replay(self) -> str:
        """Replay what was handled; the default returns an empty string."""
        return ""

    def suppress_timeout(self) -> bool:
        """Whether timeouts should be suppressed; the default is ``False``."""
        return False
class EventItemBase:
    """
    Base class for an item that groups an event with its subscription and the
    handler (if any).
    """

    # NOTE(review): the class does not use an ABC metaclass in this file, so the
    # @abstractmethod markers below are not enforced at instantiation -- confirm
    # whether that is intended.
    event: EventDataInt
    subscription: SubscriptionBase
    handler: Union[MessageHandlerBase, None]

    @abstractmethod
    def handle_event(self):
        """Handle the event held by this item; no behaviour is given here."""

    @abstractmethod
    def event_value(self) -> Any:
        """Give the value of the event held by this item; no behaviour is given here."""
class EventsPusherBase(Subscriber):
    """
    Base class for a subscriber that has a queue of event items and an optional
    message handler.
    """

    queue: Queue[EventItemBase]
    handler: Union[MessageHandlerBase, None]

    def push_event(self, event: EventDataInt) -> None:
        """
        Receive an event from a producer; the default does nothing.

        :param event: the event that occurred
        """
        return None
class MessageBoardBase:
    """
    Base class for a message board: it holds a queue (the board) together with a
    set of current subscriptions and a set of archived subscriptions.
    """

    board: Queue
    subscriptions: Set[SubscriptionBase]
    archived_subscriptions: Set[SubscriptionBase]

    def __init__(self) -> None:
        """Start with an empty board and no (archived) subscriptions."""
        self.board = Queue()
        self.subscriptions = set()
        self.archived_subscriptions = set()

    def add_subscription(
        self,
        producer: BaseProducer,
        attr: str,
        handler: MessageHandlerBase,
        polling: bool = False,
    ) -> SubscriptionBase:
        """
        Add a new subscription. No behaviour is given here.

        :param producer: the producer to subscribe to
        :param attr: the name of the attribute to subscribe to
        :param handler: the handler for events on the subscription
        :param polling: defaults to False
        """

    def describe(self) -> str:
        """
        Render the current subscriptions as text, one line per subscription, with
        the producer name and the attribute name each left aligned in a 20
        character wide column.
        """
        lines = []
        for sub in self.subscriptions:
            lines.append(f"{sub.producer.name(): <20}{sub.attr: <20}")
        return "\n".join(lines)

    @property
    def non_expendable_subscriptions(self) -> Set[SubscriptionBase]:
        """The current subscriptions that are not flagged as expendable."""
        remaining: Set[SubscriptionBase] = set()
        for subscription in self.subscriptions:
            if subscription.expendable:
                continue
            remaining.add(subscription)
        return remaining

    @property
    def all_non_expendable_subscriptions_removed(self) -> bool:
        """``True`` when no non expendable subscription is left."""
        return len(self.non_expendable_subscriptions) == 0

    @abstractmethod
    def remove_all_subscriptions(self):
        """Remove all subscriptions."""

    @abstractmethod
    def remove_subscription(self, subscription: SubscriptionBase):
        """
        Remove a given subscription.

        :param subscription: the subscription to remove
        """

    @abstractmethod
    def log(self, message: str, time: datetime = None, label=None):
        """
        Put a log message on the message board.

        :param message: the message to log
        :param time: defaults to None
        :param label: defaults to None
        """

    @abstractmethod
    def task_done(self):
        """
        Indicate that an item popped from the message board is done (in case a
        wait until all done is called).
        """
        # delegates to the task_done of the underlying queue
        self.board.task_done()

    @abstractmethod
    def get_current_items(self) -> List[EventItemBase]:
        """Get the current events contained in the buffer of a message board object.

        Note this should be a method of the MessageBoard Class in skallop

        :return: A list of current event items
        """

    @abstractmethod
    def get_items(self, timeout: float = 0) -> Iterator[EventItemBase]:
        """Iterate over the
        :py:class:`~ska_ser_skallop.subscribing.event_item.EventItem` instances
        placed in the internal queue as a consequence of subscriptions to
        :py:class:`~ska_ser_skallop.subscribing.base.Producer` instances.

        If the queue is currently empty it will wait for a given timeout until
        either returning with a :py:class:`StopIteration` or raising an
        :py:exc:`ska_ser_skallop.subscribing.exceptions.EventTimedOut` exception
        (depending on whether a subscription is configured to suppress or not
        suppress timeouts).

        :param timeout: the maximum amount of time to wait for any events to be
            placed on the queue, defaults to 0 in the case of call back
            subscriptions, otherwise in case of a polling subscription, it will
            wait at least for the
            :py:attr:`ska_ser_skallop.subscribing.message_board.MessageBoard.gathered_sleep_time`
            amount of time.
        :raises EventTimedOut: if timeout is not suppressed by subscription (note
            a timeout condition will result in all remaining subscriptions to be
            canceled)
        :return: a :py:class:`StopIteration` during a suppressed timeout or when
            all subscriptions have been removed.
        :yield: yields an item in the queue if not empty
        """

    @abstractmethod
    def play_log_book(
        self, filter_log: bool = True, log_filter_pattern: str = ""
    ) -> str:
        """Return the contents of the logbook as a string of messages separated
        by newline characters.

        :param filter_log: whether the log book should filter out messages labeled
            as "log_filter", defaults to True
        :param log_filter_pattern: defaults to an empty string
        :return: the logbook contents
        """