Source code for ska_ser_skallop.connectors.remoting.testing

"""Objects that can be used to substitute external dependencies for test purposes."""
import logging
from queue import Empty, Queue
from typing import Any, Generator

from ska_ser_skallop.subscribing import base, helpers

# Module-level logger named after this module; TestSubscriber.log_events
# emits one INFO record per event through it.
logger = logging.getLogger(__name__)


class TestSubscriber(base.Subscriber):
    """Subscriber that buffers pushed events so they can be logged or iterated.

    Every event handed to :meth:`push_event` is stored in an internal FIFO
    queue. The events can then be drained with :meth:`log_events` (one log
    line per event) or :meth:`events` (a generator over the buffered and
    incoming events).

    This can be used during exploration testing of tango bridge and remote
    device proxy.
    """

    # The class name starts with "Test" and defines __init__, so pytest would
    # try (and fail, with a warning) to collect it when it is imported into a
    # test module. Mark it explicitly as a helper rather than a test case.
    __test__ = False

    def __init__(self) -> None:
        """Initialise object with an empty, unbounded event queue."""
        self.queue = Queue()

    def push_event(self, event: base.EventDataInt) -> None:
        """Receive subscribed event and buffer it without blocking.

        :param event: The event from producer.
        :type event: base.EventDataInt
        """
        self.queue.put_nowait(event)

    def log_events(self, nr_of_events: int, timeout: float = 2):
        """Log and print a line for each of the next ``nr_of_events`` events.

        Events are taken from the buffer in arrival order. If the buffer is
        empty the call blocks for up to ``timeout`` seconds waiting for the
        next event. A count of zero or less does nothing.

        The first four fields returned by ``helpers.describe_event`` are
        written as left-aligned, 20-character wide columns, both to the module
        logger (INFO level) and to standard output.

        :param nr_of_events: The nr of events to retrieve from the buffer.
        :type nr_of_events: int
        :param timeout: The maximum time (in seconds) to wait for any incoming
            message, defaults to 2
        :type timeout: float, optional
        :raises queue.Empty: if no event arrives within ``timeout`` while
            events are still outstanding.
        """
        remaining = nr_of_events
        # "> 0" rather than plain truthiness: a negative count must not turn
        # into a loop that only stops once the queue times out.
        while remaining > 0:
            # A fresh generator per event keeps the contract that a timeout
            # on any awaited event surfaces as queue.Empty.
            event = next(self.events(timeout))
            results = helpers.describe_event(event)
            line = f"{results[0]:<20}{results[1]:<20}{results[2]:<20}{results[3]:<20}"
            logger.info(line)
            print(line)
            remaining -= 1

    def events(self, timeout: float = 2) -> Generator[Any, None, None]:
        """Generate events as and when they arrive.

        E.g.

        .. code-block:: python

            for event in subscriber.events():
                print(event)

        The generator keeps yielding buffered and newly arriving events until
        no event shows up within ``timeout`` seconds, at which point it simply
        stops. The one exception is the very first event: if nothing arrives
        in time for it, ``queue.Empty`` is raised so that
        ``next(subscriber.events())`` on an empty buffer fails loudly instead
        of raising ``StopIteration``.

        :param timeout: The maximum time (in seconds) to wait for any incoming
            message, defaults to 2
        :type timeout: float, optional
        :raises queue.Empty: if no event at all arrives within ``timeout``.
        :yield: The next event in the order it was pushed.
        """
        # First event: let queue.Empty propagate to the caller unchanged.
        yield self.queue.get(timeout=timeout)
        # Subsequent events: a quiet period ends the stream cleanly.
        while True:
            try:
                event = self.queue.get(timeout=timeout)
            except Empty:
                return
            yield event