Source code for ska_tango_base.software_bus._observer

#
# This file is part of the SKA Tango Base project
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE.txt for more info.
#
import functools
import inspect
import typing
from collections import defaultdict

from .. import type_hints
from ._bus import current_signal_bus_logger
from ._signal import NoValue, Signal

# Type variable for any ``Observer`` subclass; it types the ``observer``
# argument that listener methods receive.
ObserverT = typing.TypeVar("ObserverT", bound="Observer")
# Type variable for any ``SharingObserver`` subclass; used by the
# ``listen_to_signal`` overloads that accept a ``Signal`` object.
SharingObserverT = typing.TypeVar("SharingObserverT", bound="SharingObserver")
# Type of the value carried by a ``Signal[ValueT]``.
ValueT = typing.TypeVar("ValueT")


class ListenerMethod(typing.Protocol):
    """
    Structural type of an :py:class:`Observer` method that receives a signal.

    A listener method is a callable taking the observer and the emitted
    value, and carrying a ``__listen_to_signal__`` attribute naming the
    signal it wants to receive.
    """

    @property
    def __listen_to_signal__(self) -> typing.Any:
        """Return the signal this method listens to."""
        ...

    @__listen_to_signal__.setter
    def __listen_to_signal__(self, value: typing.Any) -> None:
        """Set the signal this method listens to."""
        ...

    def __call__(self, observer: ObserverT, value: typing.Any) -> None:
        """Respond to a value emitted for the signal."""
        ...
# A decorator that turns a plain ``(observer, value)`` callable into a
# ``ListenerMethod``.
_ListenerDecorator: typing.TypeAlias = typing.Callable[
    [typing.Callable[[ObserverT, typing.Any], None]], ListenerMethod
]


# We are using an object here, rather than a function, to make it easier to
# avoid circular references.  If we just used a single function as an
# observer, then users would have to be careful to make sure closures only
# hold weak references to objects holding a reference to the bus.  By making
# the observer a class, we can manage this problem in the _SignalBus
# ourselves.
class Observer:
    """
    An observer that handles different signals with different methods.

    Sub-classes can mark a method as a :py:const:`ListenerMethod` using the
    :py:func:`listen_to_signal()` decorator.  A :py:const:`ListenerMethod`
    will only be called for signals it is registered to.
    """

    # Maps each signal (exactly as stored in ``__listen_to_signal__``) to the
    # listener methods registered for it.  Every subclass gets its own mapping
    # from ``__init_subclass__``.  The empty default matters because
    # ``__init_subclass__`` never runs for ``Observer`` itself, so without it
    # a bare ``Observer()`` would raise ``AttributeError`` on notification.
    _static_listener_methods: typing.ClassVar[
        dict[str | Signal[typing.Any], list[ListenerMethod]]
    ] = {}

    @property
    def _listener_methods(self) -> dict[str, list[ListenerMethod]]:
        """Return the listener methods, keyed by the signal to match against."""
        # The cast target is quoted: ``typing.cast`` ignores it at runtime, so
        # there is no need to build the generic alias on every access.
        return typing.cast(
            "dict[str, list[ListenerMethod]]", self._static_listener_methods
        )

    @classmethod
    def __init_subclass__(cls, **kwargs: typing.Any) -> None:
        """
        Gather all the listener methods on subclasses.

        Every routine reachable through ``dir(cls)`` (so inherited ones too)
        that carries a ``__listen_to_signal__`` attribute is grouped under
        that attribute's value.
        """
        super().__init_subclass__(**kwargs)

        listeners: defaultdict[typing.Any, list[ListenerMethod]] = defaultdict(
            list
        )
        for key in dir(cls):
            obj = getattr(cls, key)
            if inspect.isroutine(obj) and hasattr(obj, "__listen_to_signal__"):
                listeners[obj.__listen_to_signal__].append(
                    typing.cast("ListenerMethod", obj)
                )

        # Store a plain dict so later lookups of unknown signals cannot
        # silently create empty entries.
        cls._static_listener_methods = dict(listeners)

    def notify_emission(self, signal: str, value: typing.Any) -> None:
        """
        Call all the listener methods for the given signal.

        An exception raised by one listener is logged and does not prevent the
        remaining listeners from being called.

        :param signal: name of the signal that was emitted.
        :param value: the emitted value, passed on to each listener.
        """
        for unbound_method in self._listener_methods.get(signal, ()):
            try:
                # Listeners are collected from the class, so they are plain
                # functions and the instance is passed explicitly.
                unbound_method(self, value)
            except Exception:
                logger = current_signal_bus_logger.get()
                logger.exception(
                    "Listener method %r threw an unexpected exception for %r. "
                    "Continuing with remaining listeners.",
                    unbound_method,
                    value,
                )
@typing.overload
def listen_to_signal(signal: str) -> _ListenerDecorator[ObserverT]: ...


@typing.overload
def listen_to_signal(
    signal: Signal[ValueT],
) -> _ListenerDecorator[SharingObserverT]: ...


@typing.overload
def listen_to_signal(
    signal: str, listener: typing.Callable[[ObserverT, typing.Any], None]
) -> ListenerMethod: ...


@typing.overload
def listen_to_signal(
    signal: Signal[ValueT],
    listener: typing.Callable[[SharingObserverT, typing.Any], None],
) -> ListenerMethod: ...


def listen_to_signal(
    signal: str | Signal[ValueT],
    listener: typing.Callable[[ObserverT, typing.Any], None] | None = None,
) -> (
    _ListenerDecorator[ObserverT]
    | _ListenerDecorator[SharingObserverT]
    | ListenerMethod
):
    """
    Mark a method as listening to a signal.

    The marked method will be called asynchronously whenever a value is
    emitted for the signal.

    For :py:class:`Observer` objects, only a `signal` specified as a `str` is
    supported, however, for :py:class:`SharingObserver` objects both `str` and
    :py:class:`Signal` objects are accepted.  For a
    :py:class:`SharingObserver`, signal names are relative to the parent
    object.

    :param signal: the signal to listen to.
    :param listener: the method to mark; when omitted, a decorator that marks
        the method it is applied to is returned instead.
    :return: the marked ``listener``, or a decorator when ``listener`` is
        ``None``.
    """
    if listener is not None:
        # Function form: ``listener = listen_to_signal("signal", method)``.
        # The attribute set here is what Observer.__init_subclass__ looks for
        # to know that this method wants to listen to this signal.
        marked = typing.cast(ListenerMethod, listener)
        marked.__listen_to_signal__ = signal
        return marked

    # Decorator form:
    #
    #     @listen_to_signal("signal")
    #     def on_signal(self, value):
    #         ...
    #
    # Binding ``signal`` with functools.partial yields a callable that only
    # needs the method, which then takes the branch above.
    decorator = functools.partial(listen_to_signal, signal)
    return typing.cast(_ListenerDecorator[typing.Any], decorator)
class SharingObserver(Observer):
    """
    An observer that shares its bus with sub-objects.

    When a :py:class:`~ska_tango_base.type_hints.BusProtocol` is assigned to
    :py:attr:`shared_bus` this will recursively set on all sub-objects which
    also inherit from :py:class:`SharingObserver`.  Sub-objects are found
    among the instance attributes of the object.  Subclasses can be notified
    when a new :py:attr:`shared_bus` is available by overriding
    :py:meth:`on_new_shared_bus`.

    Each :py:class:`SharingObserver` object has a :py:attr:`observer_prefix`
    that prefixes any signals listened to by that object.  The root object,
    where the bus was originally assigned, has an :py:attr:`observer_prefix`
    of "." and each sub-object has an :py:attr:`observer_prefix` made of the
    attribute path to that sub-object from the root object, terminated by a
    ".".

    For example:

    .. code:: python

        class SubObj(SharingObserver):
            bar = Signal[str]()

            @listen_to_signal(bar)
            def on_bar(self, value: str):
                print(value)

        class MySharer(SharingObserver):
            def __init__(self):
                super().__init__()
                self.foo = SubObj()
                self.shared_bus = SignalBus()
                self.shared_bus.start_thread()

        sharer = MySharer()
        assert sharer.observer_prefix == "."
        assert sharer.foo.observer_prefix == ".foo."

    Here, the ``sharer.foo.on_bar`` listener method will receive values
    emitted on the bus whenever ``sharer.foo.bar`` is set.
    """

    # Listener methods keyed by absolute signal name; rebuilt by
    # ``on_new_shared_bus`` each time a bus is assigned.
    _canonicalised_listener_methods: dict[str, list[ListenerMethod]]
    # The bus currently shared with this object, or None before sharing.
    _shared_bus: type_hints.BusProtocol | None

    ROOT_OBSERVER_PREFIX: typing.ClassVar[str] = "."

    def __init__(self, *args: typing.Any, **kwargs: typing.Any) -> None:
        """
        Initialise the observer with no bus and the root prefix.

        All arguments are forwarded to the next ``__init__`` in the MRO.
        """
        self._shared_bus = None
        self._path_from_root = self.ROOT_OBSERVER_PREFIX
        # Start with no canonical listeners so that notify_emission() is a
        # no-op, rather than an AttributeError, before a bus has been shared.
        self._canonicalised_listener_methods = {}
        super().__init__(*args, **kwargs)

    @property
    def observer_prefix(self) -> str:
        """Return the path from the root ``SharingObserver`` holding the bus."""
        return self._path_from_root

    @property
    def shared_bus(self) -> type_hints.BusProtocol:
        """
        The shared bus used by this ``SharingObserver``.

        :raises RuntimeError: if no bus has been assigned yet.
        """
        if self._shared_bus is None:
            raise RuntimeError("The bus has not been shared!")
        return self._shared_bus

    @shared_bus.setter
    def shared_bus(self, bus: type_hints.BusProtocol) -> None:
        self._shared_bus = bus
        self._shared_bus.register_observer(self)
        self.on_new_shared_bus()

        # Hand the bus down to every SharingObserver held as an instance
        # attribute.  The child's prefix must be set before its bus, because
        # assigning the bus makes the child canonicalise its signal names.
        for key, obj in vars(self).items():
            if isinstance(obj, SharingObserver):
                obj._path_from_root = f"{self._path_from_root}{key}."
                obj.shared_bus = self._shared_bus

        # Collect class attributes from the whole MRO, base classes first, so
        # that attributes overridden in subclasses win.
        all_vars: dict[str, typing.Any] = {}
        for cls in reversed(self.__class__.mro()):
            all_vars.update(vars(cls))

        # Apply the initial value of every Signal that declares one, now that
        # a bus is available.
        for obj in all_vars.values():
            if isinstance(obj, Signal) and obj._initial_value is not NoValue:
                obj.__set__(self, obj._initial_value)

    @property
    def _listener_methods(self) -> dict[str, list[ListenerMethod]]:
        """Return the listener methods keyed by absolute signal name."""
        return self._canonicalised_listener_methods

    def on_new_shared_bus(self) -> None:
        """
        Notify that a new bus is available.

        Rebuilds the mapping from absolute signal name to listener methods
        using the current :py:attr:`observer_prefix`.

        When overriding this method you must call
        ``super().on_new_shared_bus()``.

        :raises TypeError: if a listener was registered with something other
            than a ``str`` or a ``Signal``.
        """
        listeners: defaultdict[str, list[ListenerMethod]] = defaultdict(list)
        for signal, methods in self._static_listener_methods.items():
            if isinstance(signal, str):
                canonical = canonicalise_relative_to(self, signal)
            elif isinstance(signal, Signal):
                canonical = signal._absolute_name_for(self)
            else:
                raise TypeError(f"Unknown signal type '{type(signal)}'")
            # Different registrations may resolve to the same absolute name,
            # so merge rather than overwrite.
            listeners[canonical].extend(methods)

        self._canonicalised_listener_methods = dict(listeners)
def canonicalise_relative_to(
    obj: type_hints.SharingObserverProtocol, signal: str
) -> str:
    """
    Return the absolute name of ``signal`` as seen from ``obj``.

    A name that already starts with the root observer prefix is treated as
    absolute and returned unchanged; any other name is prefixed with the
    observer prefix of ``obj``.

    :param obj: the object the signal name is relative to.
    :param signal: an absolute or relative signal name.
    :return: the absolute signal name.
    """
    already_absolute = signal.startswith(obj.ROOT_OBSERVER_PREFIX)
    return signal if already_absolute else f"{obj.observer_prefix}{signal}"