#
# This file is part of the SKA Tango Base project
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE.txt for more info.
import contextvars
import logging
import queue
import threading
import time
import typing
import weakref
import tango
from .. import type_hints
module_logger = logging.getLogger("software_bus")
current_signal_bus_logger = contextvars.ContextVar[logging.Logger](
"current_signal_bus_logger", default=module_logger
)
[docs]
class TimedOutError(Exception):
"""Timed out waiting for the signal bus background thread."""
[docs]
def __init__(self) -> None:
"""Initalise the exception with the default error message."""
super().__init__(self.__class__.__doc__)
class _WaitForSignalObserver:
"""An observer for SignalBus.wait_for_signal_value."""
def __init__(
self,
signal: str,
conditional: typing.Callable[[typing.Any], bool],
event: threading.Event,
):
self._signal = signal
self._conditional = conditional
self._event = event
def notify_emission(self, signal: str, value: typing.Any) -> None:
"""
Unblock the `wait_for_signal_value` caller when `self._conditional` passes.
First checks the signal is the one waited on, then unblocks the event if the
value is desired as indicated by self._conditional.
"""
if signal == self._signal:
if self._conditional(value):
self._event.set()
class _SignalBus:
"""
A software bus that notifies observers listening to signals.
The bus can :py:func:`emit()` values for a given signal. In order to
get notified when a value is emitted for any signals, an observer
can be registered with :py:func:`register_observer()`. Observers are stored
in a class :py:class:`weakref.WeakSet` and so must be kept alive by the
caller.
Each signal is identified by a user-provided string. When a value is emitted
for a given signal an "emission" is added to an
internal queue. This queue is serviced by a separate background thread,
which must be started with :py:func:`start_thread()`.
When background thread receives an emission from the internal queue it
will notify all the registered observer objects that still exist by
calling :py:func:`ObserverProtocol.notify_emission`. This notification occurs in an
unspecified order. Emissions are processed in the order they are received.
The internal queue has a maximum size and attempting to :py:func:`emit()` while the
queue is full will log a warning message before discarding the emission.
The expectation is that observers should return quickly compared to the rate
that signals are emitted, so the internal queue filling up should be considered
a misuse of ``SignalBus``. The limit is in place so that the failure mode is more
explicit than some queue growing and eating up all the memory on the system.
:param max_queue_size: Maximum size of the internal queue
:param name: name to use for the background thread
:param emit_timeout: how long to wait in seconds if the queue is full
"""
def __init__(
self,
logger: logging.Logger | None = None,
max_queue_size: int = 8196,
name: str = "",
emit_timeout: float = 10.0,
) -> None:
"""
Initialise the object.
:param logger: to use to for logging
:param max_queue_size: maximum number of emissions that can be stored
in the internal queue
"""
self._logger = module_logger if logger is None else logger
# We don't want to create circular references with this bus, so we only
# ever hold on to observers with weak references. If the observer goes
# away, we don't care and will just stop notifying it.
self._observers = weakref.WeakSet[type_hints.ObserverProtocol]()
self._emit_timeout = emit_timeout
self._blocking_emissions_lock = threading.Lock()
self._last_emission_queue_full_log = 0.0
self._blocking_emissions = 0
self._emission_queue = queue.Queue[tuple[str, typing.Any]](
maxsize=max_queue_size
)
self._emission_store: dict[str, typing.Any] = {}
self._emission_store_lock = threading.Lock()
if name:
name += " "
self._thread = threading.Thread(
target=self._run_bus_thread,
name=f"{name}SignalBus Thread",
)
def register_observer(self, observer: type_hints.ObserverProtocol) -> None:
"""
Register an observer to be notified when a signal is emitted.
`observer.notify_emission` will be called for every signal emitted
on the `SignalBus`. It is the responsibility of the observer to
determine whether the signal is interesting.
If `register_observer` is called while the SignalBus background thread is
running, the observer will be registered after any queued emissions
(calls to `SignalBus.emit`). `wait_for_thread` is supplied for synchronising
with the background thread.
If the observer is destroyed it will automatically be unregistered.
:param observer: The observer to register.
"""
if self._thread.is_alive():
self._emission_queue.put(("!register", observer))
return
self._observers.add(observer)
_LOG_QUEUE_FULL_PERIOD = 10.0
def emit(self, signal: str, value: typing.Any, *, store: bool = False) -> None:
"""
Emit a new value for the signal.
Any observes registered for this signal are notified asynchronously by the
background thread which is started by :py:func:`start_thread()`.
:param signal: Absolute name of the signal
:param value: New value to emit
:param store: Boolean whether to store the emitted value
"""
if signal.startswith("!"):
raise ValueError(
f'Invalid signal "{signal}". Signals may not start with a "!"'
)
if store:
self._emission_store_lock.acquire()
try:
self._emission_queue.put((signal, value), block=False)
except queue.Full:
with self._blocking_emissions_lock:
self._blocking_emissions += 1
try:
self._emission_queue.put((signal, value), timeout=self._emit_timeout)
except queue.Full:
self._logger.exception(
"Observers cannot keep up with rate signals are being emitted. "
"Emission %r has been discarded after waiting %s seconds.",
(signal, value),
self._emit_timeout,
)
else:
if store:
self._emission_store[signal] = value
else:
if store:
self._emission_store[signal] = value
finally:
if store:
self._emission_store_lock.release()
with self._blocking_emissions_lock:
now = time.monotonic()
if (
now - self._last_emission_queue_full_log > self._LOG_QUEUE_FULL_PERIOD
and self._blocking_emissions != 0
):
prefix = "has"
suffix = ""
if self._blocking_emissions != 1:
prefix = f"and {self._blocking_emissions - 1} other emissions have"
if self._last_emission_queue_full_log != 0.0:
since_last_log = now - self._last_emission_queue_full_log
suffix = f" in the last {since_last_log} seconds"
self._logger.warning(
"Observers cannot keep up with rate signals "
"are being emitted. "
"Emission %r %s had to block waiting for "
"space in the queue%s.",
(signal, value),
prefix,
suffix,
)
self._last_emission_queue_full_log = now
self._blocking_emissions = 0
def get_last_emitted_value(self, signal: str) -> typing.Any:
"""
Get the last emission that was stored for the given signal.
:param signal: Absolute name of the signal.
:raises KeyError: If no value has been stored for the signal.
"""
return self._emission_store[signal]
def delete_last_emitted_value(self, signal: str) -> None:
"""
Delete the last emission that was stored for the given signal.
:param signal: Absolute name of the signal.
:raises KeyError: If no value has been stored for the signal.
"""
del self._emission_store[signal]
def start_thread(self) -> None:
"""Start the background thread to notify observers about emissions."""
self._thread.start()
def shutdown_thread(self) -> None:
"""
Shutdown the background thread.
This waits for the thread to finish processing remaining emissions.
If the background thread is not running, this does nothing.
"""
# Wait for all emissions to be processed so we don't miss any events
if self._thread.is_alive():
self._emission_queue.put(("!shutdown", None))
self._thread.join()
def wait_for_thread(self, timeout: float | None = 5.0) -> None:
"""
Synchronise the calling thread with the background thread.
When this function returns, all emissions sequenced before this call
will have been processed by the background thread.
:param timeout: Time to wait for the background thread. A timeout of
``None`` will wait forever.
:raises TimedOutError: If the timeout is exceeded while waiting for
the background thread.
"""
event = threading.Event()
self._emission_queue.put(("!fence", (event, None)))
if not event.wait(timeout=timeout):
raise TimedOutError()
def wait_for_signal_value(
self,
signal: str,
conditional: typing.Callable[[typing.Any], bool] | None = None,
timeout: float | None = None,
) -> bool:
"""
Wait for a value to be emitted for `signal` where `conditional` returns True.
Whenever a value is emitted for the signal, `conditional(old_value, new_value)`
is called. The thread calling this method will block until `conditional` returns
True or the timeout is reached. If `conditional` is None, this method will
unblock on any value emitted for the signal.
:param signal: The signal to wait for a value emission.
:param conditional: Callable used to check for the desired signal value.
:param timeout: Time to wait for signal value. A timeout of None (the default)
will wait forever.
:returns: True if a desired value was emitted for the signal within the timeout,
False otherwise.
"""
if conditional is None:
def default(*_: typing.Any) -> bool:
return True
conditional = default
# Event gets set() by _run_bus_thread
event = threading.Event()
# The observer will exist at least until it is added to self._observers by the
# background thread. If the timeout is longer than it takes to be registered,
# the only strong reference to the observer will be in this method, and when
# execution leaves this scope, the garbage collector will delete the observer.
observer = _WaitForSignalObserver(signal, conditional, event)
self.register_observer(observer)
return event.wait(timeout)
def __del__(self) -> None:
"""Delete the object."""
if self._thread.is_alive():
self.shutdown_thread()
def _process_emission(self, signal: str, value: typing.Any) -> None:
for obs in self._observers:
try:
obs.notify_emission(signal, value)
except Exception:
self._logger.exception(
(
"Observer %r threw an unexpected exception while "
+ "processing emission %r. Continuing with remaining observers."
),
obs,
(signal, value),
)
def _run_bus_thread(self) -> None:
current_signal_bus_logger.set(self._logger)
with tango.EnsureOmniThread():
while True:
signal, value = self._emission_queue.get()
if signal.startswith("!"):
match signal:
case "!shutdown":
break
case "!fence":
received, pause_request = value
received.set()
if pause_request is not None:
self._pause(*pause_request)
case "!register":
self._observers.add(value)
case _:
self._logger.error(
f'Invalid control signal "{signal}". Skipping.'
)
else:
self._process_emission(signal, value)
# Workarounds for pytango#688, see comment in SignalBusMixin.__init__()
def _pause(
self, should_unpause_now: typing.Callable[[], bool], max_duration: float
) -> None:
"""Pause thread until should_unpause_now returns True."""
end_timestamp = time.monotonic() + max_duration
while time.monotonic() < end_timestamp:
if should_unpause_now():
return
time.sleep(0.1)
self._logger.error(
"SignalBus background thread pause exceed "
f"maximum duration of {max_duration} ms."
)
def _pause_thread_until(
self,
callback: typing.Callable[[], bool],
max_pause_duration: int = 10000,
ack_timeout: int = 1000,
) -> None:
"""
Pause the background thread until the callback returns True.
This function will first clear the queue, before pausing the
background thread. Signals emitted after this function is called,
will not be processed until the background thread is unpaused.
The thread will emit and error and unpause if it exceeds the
maximum duration.
This function exists to workaround pytango#688 and should not
be used outside of ska-tango-base.
:param callback: Returns True when we should stop pausing
:param max_pause_duration: Maximum duration to pause for
:param ack_timeout: Timeout for pause acknowledgement
:returns: Once the queue has been cleared and the background
thread has acknowledged the pause request.
"""
event = threading.Event()
self._emission_queue.put(("!fence", (event, (callback, max_pause_duration))))
if not event.wait(timeout=ack_timeout):
raise TimedOutError()