"""Objects that can be used to substitute external dependencies for test purposes."""
import logging
from queue import Queue
from typing import Any, Generator
from ska_ser_skallop.subscribing import base, helpers
logger = logging.getLogger(__name__)
[docs]class TestSubscriber(base.Subscriber):
"""Implements a subscriber that generates a log message for every event pushed.
This can be used during exploration testing of tango bridge and remote device proxy.
"""
def __init__(self) -> None:
"""Initialise object."""
self.queue = Queue()
[docs] def push_event(self, event: base.EventDataInt) -> None:
"""Receive subscribed event.
:param event: The event from producer.
:type event: base.EventDataInt
"""
self.queue.put_nowait(event)
[docs] def log_events(self, nr_of_events: int, timeout=2):
"""Generate a log message for each log event received up till now and block if events empty.
:param nr_of_events: The nr of events to retrieve from the buffer.
:type nr_of_events: int
:param timeout: The maximum time to wait for any incoming message, defaults to 2
:type timeout: int, optional
"""
while nr_of_events:
event = next(self.events(timeout))
results = helpers.describe_event(event)
logger.info(
f"{results[0]:<20}{results[1]:<20}{results[2]:<20}{results[3]:<20}"
)
print(f"{results[0]:<20}{results[1]:<20}{results[2]:<20}{results[3]:<20}")
nr_of_events -= 1
[docs] def events(self, timeout=2) -> Generator[Any, None, None]:
"""Generate events as and when they arrive.
E.g.
.. code-block:: python
for event in subscriber.events():
print(event)
:param timeout: The maximum time to wait for any incoming message, defaults to 2
:type timeout: int, optional
:yield: [description]
"""
yield self.queue.get(timeout=timeout)