#
# This file is part of the SKA Tango Base project
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE.txt for more info.
#
import functools
import inspect
import typing
from collections import defaultdict
from .. import type_hints
from ._bus import current_signal_bus_logger
from ._signal import NoValue, Signal
ObserverT = typing.TypeVar("ObserverT", bound="Observer")
SharingObserverT = typing.TypeVar("SharingObserverT", bound="SharingObserver")
ValueT = typing.TypeVar("ValueT")
[docs]
class ListenerMethod(typing.Protocol):
"""Method on an :py:class:`Observer` which can receive a signal."""
@property
def __listen_to_signal__(self) -> typing.Any:
"""Signal to listen to."""
@__listen_to_signal__.setter
def __listen_to_signal__(self, value: typing.Any) -> None:
"""Signal to listen to."""
def __call__(self, observer: ObserverT, value: typing.Any) -> None:
"""Respond to emission."""
_ListenerDecorator: typing.TypeAlias = typing.Callable[
[typing.Callable[[ObserverT, typing.Any], None]], ListenerMethod
]
# We are using a object here, rather than a function, to make it easier to
# avoid circular references. If we just used a single function as an observer,
# then users would have to be careful to make sure closures only hold weak
# references to objects holding a reference to the bus. By making the observer
# a class, we can manage this problem in the _SignalBus ourselves.
[docs]
class Observer:
"""
An observer that handles different signals with different methods.
Sub-classes can mark a method as a :py:const:`ListenerMethod` using the
:py:func:`listen_to_signal()` decorator. :py:const:`ListenerMethod` will
only be called for signals they are registered to.
"""
_static_listener_methods: typing.ClassVar[
dict[str | Signal[typing.Any], list[ListenerMethod]]
]
@property
def _listener_methods(self) -> dict[str, list[ListenerMethod]]:
return typing.cast(
dict[str, list[ListenerMethod]], self._static_listener_methods
)
@classmethod
def __init_subclass__(cls, **kwargs: typing.Any) -> None:
"""Gather all the listener methods on subclasses."""
super().__init_subclass__(**kwargs)
listeners = defaultdict[typing.Any, list[ListenerMethod]](lambda: [])
for key in dir(cls):
obj = getattr(cls, key)
if inspect.isroutine(obj) and hasattr(obj, "__listen_to_signal__"):
listeners[obj.__listen_to_signal__].append(
typing.cast(ListenerMethod, obj)
)
cls._static_listener_methods = dict(listeners)
[docs]
def notify_emission(self, signal: str, value: typing.Any) -> None:
"""Call all the listener methods for the given signal."""
for unbound_method in self._listener_methods.get(signal, []):
try:
unbound_method(self, value)
except Exception:
logger = current_signal_bus_logger.get()
logger.exception(
(
"Listener method %r threw an unexpected exception for %r. "
+ "Continuing with remaining listeners."
),
unbound_method,
value,
)
@typing.overload
def listen_to_signal(signal: str) -> _ListenerDecorator[ObserverT]: ...
@typing.overload
def listen_to_signal(
signal: Signal[ValueT],
) -> _ListenerDecorator[SharingObserverT]: ...
@typing.overload
def listen_to_signal(
signal: str, listener: typing.Callable[[ObserverT, typing.Any], None]
) -> ListenerMethod: ...
@typing.overload
def listen_to_signal(
signal: Signal[ValueT],
listener: typing.Callable[[SharingObserverT, typing.Any], None],
) -> ListenerMethod: ...
[docs]
def listen_to_signal(
signal: str | Signal[ValueT],
listener: typing.Callable[[ObserverT, typing.Any], None] | None = None,
) -> (
_ListenerDecorator[ObserverT]
| _ListenerDecorator[SharingObserverT]
| ListenerMethod
):
"""
Mark a method as listening to a signal.
This function will be called asynchronously whenever a value is emitted
for the signal.
For :py:class:`Observer` objects, only a `signal` specified as a `str` is supported,
however, for :py:class:`SharingObserver` objects both `str` and :py:class:`Signal`
objects are accepted.
For a :py:class:`SharingObserver`, signal names are relative to the parent object.
"""
# This functools.partial trick allows us to use listen_to_signal as a
# decorator:
#
# ```
# @listen_to_signal("signal")
# def on_signal(self, old_value, new_value):
# ...
# ```
# and as a function:
# ```
# listener = listen_to_signal("signal", method)
# ```
if listener is None:
return typing.cast(
_ListenerDecorator[typing.Any], functools.partial(listen_to_signal, signal)
)
# Used by Observer.__init_subclass__ to know that this method wants to
# listen to this signal.
listener = typing.cast(ListenerMethod, listener)
listener.__listen_to_signal__ = signal
return listener
[docs]
class SharingObserver(Observer):
"""
An observer that shares its bus with sub-objects.
When a :py:class:`~ska_tango_base.type_hints.BusProtocol` is assigned to
:py:attr:`shared_bus` this will recursively set on all sub-objects which
also inherit from :py:class:`SharingObserver`.
Subclasses can be notified when a new :py:attr:`shared_bus` is available by
overriding :py:meth:`on_new_shared_bus`.
Each :py:class:`SharingObserver` object has a :py:attr:`observer_prefix` that
prefixes any signals listened to by that object. The root object, where the bus was
originally assigned, has an :py:attr:`observer_prefix` of "." and each sub-object
has an :py:attr:`observer_prefix` based on the path to access that sub-object from
the root object. For example:
.. code :: python
class SubObj(SharingObserver):
bar = Signal[str]()
@listen_to_signal(bar)
def on_bar(self, value: str):
print(value)
class MySharer(SharingObserver):
def __init__(self):
self.foo = SubObj()
self.shared_bus = SignalBus()
self.shared_bus.start_thread()
sharer = MySharer()
assert sharer.observer_prefix == "."
assert sharer.foo.observer_prefix == ".foo"
Here, the ``sharer.foo.on_bar`` listener method will receive values emitted
on the bus whenever ``sharer.foo.bar`` is set.
"""
_canonicalised_listener_methods: dict[str, list[ListenerMethod]]
_shared_bus: type_hints.BusProtocol | None
ROOT_OBSERVER_PREFIX: typing.ClassVar[str] = "."
[docs]
def __init__(self, *args: typing.Any, **kwargs: typing.Any) -> None:
"""Initialise the device."""
self._shared_bus = None
self._path_from_root = self.ROOT_OBSERVER_PREFIX
super().__init__(*args, **kwargs)
@property
def observer_prefix(self) -> str:
"""Return the path from the root ``SharingObserver`` holding the bus."""
return self._path_from_root
@property
def shared_bus(self) -> type_hints.BusProtocol:
"""The shared bus used by this ``SharingObserver``."""
if self._shared_bus is None:
raise RuntimeError("The bus has not been shared!")
return self._shared_bus
@shared_bus.setter
def shared_bus(self, bus: type_hints.BusProtocol) -> None:
self._shared_bus = bus
self._shared_bus.register_observer(self)
self.on_new_shared_bus()
for key, obj in vars(self).items():
if isinstance(obj, SharingObserver):
obj._path_from_root = f"{self._path_from_root}{key}."
obj.shared_bus = self._shared_bus
all_vars: dict[str, typing.Any] = {}
for cls in reversed(self.__class__.mro()):
all_vars.update(vars(cls))
for key, obj in all_vars.items():
if isinstance(obj, Signal) and obj._initial_value is not NoValue:
obj.__set__(self, obj._initial_value)
@property
def _listener_methods(self) -> dict[str, list[ListenerMethod]]:
return self._canonicalised_listener_methods
[docs]
def on_new_shared_bus(self) -> None:
"""
Notify that a new bus is available.
When overriding this method you must call ``super().on_new_shared_bus``.
"""
listeners = defaultdict(lambda: [])
for signal, methods in self._static_listener_methods.items():
if isinstance(signal, str):
canonical = canonicalise_relative_to(self, signal)
elif isinstance(signal, Signal):
canonical = signal._absolute_name_for(self)
else:
raise TypeError(f"Unknown signal type '{type(signal)}'")
listeners[canonical].extend(methods)
self._canonicalised_listener_methods = dict(listeners)
[docs]
def canonicalise_relative_to(
obj: type_hints.SharingObserverProtocol, signal: str
) -> str:
"""
Return the absolute signal name, relative to obj.
If signal is already absolute, it is returned as is.
"""
if signal.startswith(obj.ROOT_OBSERVER_PREFIX):
return signal
return f"{obj.observer_prefix}{signal}"