#
# This file is part of the SKA Tango Base project
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE.txt for more info.
"""Internal software bus implementation."""
from __future__ import annotations
import functools
import inspect
import logging
import threading
import time
import typing
from collections import defaultdict
from collections.abc import Callable, Iterable
from contextlib import contextmanager
from contextvars import ContextVar
from queue import Full, Queue
from threading import RLock, Thread
from types import FrameType
from typing import (
Any,
ClassVar,
Generic,
Iterator,
Protocol,
TypeAlias,
TypeVar,
cast,
overload,
)
from weakref import WeakSet
import tango
from packaging import version
from tango import (
AttrQuality,
AutoTangoAllowThreads,
AutoTangoMonitor,
EnsureOmniThread,
Except,
)
from tango.device_server import Attribute
from tango.server import attribute
from ._autodoc_hacks import _TANGO_IS_MOCKED_BY_AUTODOC
from ._notango import TimedAttrData as _TimedAttrData
from .ska_device import SKADevice
from .type_hints import (
BusProtocol,
ObserverProtocol,
SharingObserverProtocol,
)
module_logger = logging.getLogger(__name__)
current_signal_bus_logger: ContextVar[logging.Logger] = ContextVar(
"current_signal_bus_logger", default=module_logger
)
__all__ = [
"AttrSignal",
"CachingAttrSignal",
"ListenerMethod",
"NoValue",
"Observer",
"SharingObserver",
"Signal",
"SignalBusMixin",
"attribute_from_signal",
"canonicalise_relative_to",
"listen_to_signal",
]
class _WaitForSignalObserver:
"""An observer for SignalBus.wait_for_signal_value."""
def __init__(
self,
signal: str,
conditional: Callable[[Any], bool],
event: threading.Event,
):
self._signal = signal
self._conditional = conditional
self._event = event
def notify_emission(self, signal: str, value: Any) -> None:
"""
Unblock the `wait_for_signal_value` caller when `self._conditional` passes.
First checks the signal is the one waited on, then unblocks the event if the
value is desired as indicated by self._conditional.
"""
if signal == self._signal:
if self._conditional(value):
self._event.set()
NoValue = object()
"""Used to communicate that no value has been emitted for a signal.
Required to distinguish from ``None``, which is a valid value to emit.
"""
class TimedOutError(Exception):
"""Timed out waiting for the signal bus background thread."""
def __init__(self) -> None:
"""Initalise the exception with the default error message."""
super().__init__(self.__class__.__doc__)
class _SignalBus:
"""
A software bus that notifies observers listening to signals.
The bus can :py:func:`emit()` values for a given signal. In order to
get notified when a value is emitted for any signals, an observer
can be registered with :py:func:`register_observer()`. Observers are stored
in a class :py:class:`weakref.WeakSet` and so must be kept alive by the
caller.
Each signal is identified by a user-provided string. When a value is emitted
for a given signal an "emission" is added to an
internal queue. This queue is serviced by a separate background thread,
which must be started with :py:func:`start_thread()`.
When background thread receives an emission from the internal queue it
will notify all the registered observer objects that still exist by
calling :py:func:`ObserverProtocol.notify_emission`. This notification occurs in an
unspecified order. Emissions are processed in the order they are received.
The internal queue has a maximum size and attempting to :py:func:`emit()` while the
queue is full will log a warning message before discarding the emission.
The expectation is that observers should return quickly compared to the rate
that signals are emitted, so the internal queue filling up should be considered
a misuse of ``SignalBus``. The limit is in place so that the failure mode is more
explicit than some queue growing and eating up all the memory on the system.
:param max_queue_size: Maximum size of the internal queue
:param name: name to use for the background thread
"""
def __init__(
self,
logger: logging.Logger | None = None,
max_queue_size: int = 8196,
name: str = "",
) -> None:
"""
Initialise the object.
:param logger: to use to for logging
:param max_queue_size: maximum number of emissions that can be stored
in the internal queue
"""
self._logger = module_logger if logger is None else logger
# We don't want to create circular references with this bus, so we only
# ever hold on to observers with weak references. If the observer goes
# away, we don't care and will just stop notifying it.
self._observers = WeakSet[ObserverProtocol]()
self._emission_lock = RLock()
self._last_emission_queue_full_log = 0.0
self._discarded_emissions_count = 0
self._emission_queue = Queue[tuple[str, Any]](maxsize=max_queue_size)
if name:
name += " "
self._thread = Thread(
target=self._run_bus_thread,
name=f"{name}SignalBus Thread",
)
def register_observer(self, observer: ObserverProtocol) -> None:
"""
Register an observer to be notified when a signal is emitted.
`observer.notify_emission` will be called for every signal emitted
on the `SignalBus`. It is the responsibility of the observer to
determine whether the signal is interesting.
If `register_observer` is called while the SignalBus background thread is
running, the observer will be registered after any queued emissions
(calls to `SignalBus.emit`). `wait_for_thread` is supplied for synchronising
with the background thread.
If the observer is destroyed it will automatically be unregistered.
:param observer: The observer to register.
"""
if self._thread.is_alive():
self._emission_queue.put(("!register", observer))
return
self._observers.add(observer)
_LOG_QUEUE_FULL_PERIOD = 10.0
def emit(self, signal: str, value: Any) -> None:
"""
Emit a new value for the signal.
When this function returns the last value will be updated with ``value``. That
is, provided no other thread is emitting values for this signal, the following
assert will never fire for any ``value`` and ``signal``:
.. code:: python
bus.emit(signal, value)
assert bus.get_last_emitted_value(signal) == value
Any observes registered for this signal are notified asynchronously by the
background thread which is started by :py:func:`start_thread()`.
:param signal: name of the signal
:param value: new value to emit
"""
if signal.startswith("!"):
raise ValueError(
f'Invalid signal "{signal}". Signals may not start with a "!"'
)
with self._emission_lock:
try:
self._emission_queue.put((signal, value), block=False)
except Full:
now = time.time()
if (
now - self._last_emission_queue_full_log
> self._LOG_QUEUE_FULL_PERIOD
):
discard_prefix = ""
discard_suffix = ""
if self._discarded_emissions_count != 0:
discard_prefix = (
f"and {self._discarded_emissions_count} other emissions "
)
if self._last_emission_queue_full_log != 0.0:
discard_suffix = (
f" in the last {self._LOG_QUEUE_FULL_PERIOD} seconds"
)
self._logger.exception(
"Observers cannot keep up with rate signals are being emitted. "
"Emission %r %shave been discarded%s.",
(signal, value),
discard_prefix,
discard_suffix,
)
self._last_emission_queue_full_log = now
self._discarded_emissions_count = 0
else:
self._discarded_emissions_count += 1
def start_thread(self) -> None:
"""Start the background thread to notify observers about emissions."""
self._thread.start()
def shutdown_thread(self) -> None:
"""
Shutdown the background thread.
This waits for the thread to finish processing remaining emissions.
"""
# TODO: Should we wait for processing to finish, or flush the queue
# first?
self._emission_queue.put(("!shutdown", None))
self._thread.join()
def wait_for_thread(self, timeout: float | None = 5.0) -> None:
"""
Synchronise the calling thread with the background thread.
When this function returns, all emissions sequenced before this call
will have been processed by the background thread.
:param timeout: Time to wait for the background thread. A timeout of
``None`` will wait forever.
:raises TimedOutError: If the timeout is exceeded while waiting for
the background thread.
"""
event = threading.Event()
self._emission_queue.put(("!fence", (event, None)))
if not event.wait(timeout=timeout):
raise TimedOutError()
def wait_for_signal_value(
self,
signal: str,
conditional: Callable[[Any], bool] | None = None,
timeout: float | None = None,
) -> bool:
"""
Wait for a value to be emitted for `signal` where `conditional` returns True.
Whenever a value is emitted for the signal, `conditional(old_value, new_value)`
is called. The thread calling this method will block until `conditional` returns
True or the timeout is reached. If `conditional` is None, this method will
unblock on any value emitted for the signal.
:param signal: The signal to wait for a value emission.
:param conditional: Callable used to check for the desired signal value.
:param timeout: Time to wait for signal value. A timeout of None (the default)
will wait forever.
:returns: True if a desired value was emitted for the signal within the timeout,
False otherwise.
"""
if conditional is None:
def default(*_: Any) -> bool:
return True
conditional = default
# Event gets set() by _run_bus_thread
event = threading.Event()
# The observer will exist at least until it is added to self._observers by the
# background thread. If the timeout is longer than it takes to be registered,
# the only strong reference to the observer will be in this method, and when
# execution leaves this scope, the garbage collector will delete the observer.
observer = _WaitForSignalObserver(signal, conditional, event)
self.register_observer(observer)
return event.wait(timeout)
def __del__(self) -> None:
"""Delete the object."""
if self._thread.is_alive():
self.shutdown_thread()
def _process_emission(self, signal: str, value: Any) -> None:
for obs in self._observers:
try:
obs.notify_emission(signal, value)
except Exception:
self._logger.exception(
(
"Observer %r threw an unexpected exception while "
+ "processing emission %r. Continuing with remaining observers."
),
obs,
(signal, value),
)
def _run_bus_thread(self) -> None:
current_signal_bus_logger.set(self._logger)
with EnsureOmniThread():
while True:
signal, value = self._emission_queue.get()
if signal.startswith("!"):
match signal:
case "!shutdown":
break
case "!fence":
received, pause_request = value
received.set()
if pause_request is not None:
self._pause(*pause_request)
case "!register":
self._observers.add(value)
case _:
self._logger.error(
f'Invalid control signal "{signal}". Skipping.'
)
else:
self._process_emission(signal, value)
# Workarounds for pytango#688, see comment in SignalBusMixin.__init__()
def _pause(
self, should_unpause_now: Callable[[], bool], max_duration: float
) -> None:
"""Pause thread until should_unpause_now returns True."""
end_timestamp = time.monotonic() + max_duration
while time.monotonic() < end_timestamp:
if should_unpause_now():
return
time.sleep(0.1)
self._logger.error(
"SignalBus background thread pause exceed "
f"maximum duration of {max_duration} ms."
)
def _pause_thread_until(
self,
callback: Callable[[], bool],
max_pause_duration: int = 10000,
ack_timeout: int = 1000,
) -> None:
"""
Pause the background thread until the callback returns True.
This function will first clear the queue, before pausing the
background thread. Signals emitted after this function is called,
will not be processed until the background thread is unpaused.
The thread will emit and error and unpause if it exceeds the
maximum duration.
This function exists to workaround pytango#688 and should not
be used outside of ska-tango-base.
:param callback: Returns True when we should stop pausing
:param max_pause_duration: Maximum duration to pause for
:param ack_timeout: Timeout for pause acknowledgement
:returns: Once the queue has been cleared and the background
thread has acknowledged the pause request.
"""
event = threading.Event()
self._emission_queue.put(("!fence", (event, (callback, max_pause_duration))))
if not event.wait(timeout=ack_timeout):
raise TimedOutError()
ObserverT = TypeVar("ObserverT", bound="Observer")
SharingObserverT = TypeVar("SharingObserverT", bound="SharingObserver")
[docs]
class ListenerMethod(Protocol):
"""Method on an :py:class:`Observer` which can receive a signal."""
@property
def __listen_to_signal__(self) -> Any:
"""Signal to listen to."""
@__listen_to_signal__.setter
def __listen_to_signal__(self, value: Any) -> None:
"""Signal to listen to."""
def __call__(self, observer: ObserverT, value: Any) -> None:
"""Respond to emission."""
# We are using a object here, rather than a function, to make it easier to
# avoid circular references. If we just used a single function as an observer,
# then users would have to be careful to make sure closures only hold weak
# references to objects holding a reference to the bus. By making the observer
# a class, we can manage this problem in the _SignalBus ourselves.
[docs]
class Observer:
"""
An observer that handles different signals with different methods.
Sub-classes can mark a method as a :py:const:`ListenerMethod` using the
:py:func:`listen_to_signal()` decorator. :py:const:`ListenerMethod` will
only be called for signals they are registered to.
"""
_static_listener_methods: ClassVar[dict[str | Signal[Any], list[ListenerMethod]]]
@property
def _listener_methods(self) -> dict[str, list[ListenerMethod]]:
return cast(dict[str, list[ListenerMethod]], self._static_listener_methods)
@classmethod
def __init_subclass__(cls, **kwargs: Any) -> None:
"""Gather all the listener methods on subclasses."""
super().__init_subclass__(**kwargs)
listeners = defaultdict[Any, list[ListenerMethod]](lambda: [])
for key in dir(cls):
obj = getattr(cls, key)
if inspect.isroutine(obj) and hasattr(obj, "__listen_to_signal__"):
listeners[obj.__listen_to_signal__].append(cast(ListenerMethod, obj))
cls._static_listener_methods = dict(listeners)
[docs]
def notify_emission(self, signal: str, value: Any) -> None:
"""Call all the listener methods for the given signal."""
for unbound_method in self._listener_methods.get(signal, []):
try:
unbound_method(self, value)
except Exception: # pylint: disable=broad-exception-caught
logger = current_signal_bus_logger.get()
logger.exception(
(
"Listener method %r threw an unexpected exception for %r. "
+ "Continuing with remaining listeners."
),
unbound_method,
value,
)
ValueT = TypeVar("ValueT")
_ListenerDecorator: TypeAlias = Callable[
[Callable[[ObserverT, Any], None]], ListenerMethod
]
@overload
def listen_to_signal(signal: str) -> _ListenerDecorator[ObserverT]: ...
@overload
def listen_to_signal(
signal: Signal[ValueT],
) -> _ListenerDecorator[SharingObserverT]: ...
@overload
def listen_to_signal(
signal: str, listener: Callable[[ObserverT, Any], None]
) -> ListenerMethod: ...
@overload
def listen_to_signal(
signal: Signal[ValueT], listener: Callable[[SharingObserverT, Any], None]
) -> ListenerMethod: ...
[docs]
def listen_to_signal(
signal: str | Signal[ValueT],
listener: Callable[[ObserverT, Any], None] | None = None,
) -> (
_ListenerDecorator[ObserverT]
| _ListenerDecorator[SharingObserverT]
| ListenerMethod
):
"""
Mark a method as listening to a signal.
This function will be called asynchronously whenever a value is emitted
for the signal.
For :py:class:`Observer` objects, only a `signal` specified as a `str` is supported,
however, for :py:class:`SharingObserver` objects both `str` and :py:class:`Signal`
objects are accepted.
For a :py:class:`SharingObserver`, signal names are relative to the parent object.
"""
# This functools.partial trick allows us to use listen_to_signal as a
# decorator:
#
# ```
# @listen_to_signal("signal")
# def on_signal(self, old_value, new_value):
# ...
# ```
# and as a function:
# ```
# listener = listen_to_signal("signal", method)
# ```
if listener is None:
return cast(
_ListenerDecorator[Any], functools.partial(listen_to_signal, signal)
)
# Used by Observer.__init_subclass__ to know that this method wants to
# listen to this signal.
listener = cast(ListenerMethod, listener)
listener.__listen_to_signal__ = signal
return listener
[docs]
class SharingObserver(Observer):
"""
An observer that shares its bus with sub-objects.
When a :py:class:`~ska_tango_base.type_hints.BusProtocol` is assigned to
:py:attr:`shared_bus` this will recursively set on all sub-objects which
also inherit from :py:class:`SharingObserver`.
Subclasses can be notified when a new :py:attr:`shared_bus` is available by
overriding :py:meth:`on_new_shared_bus`.
Each :py:class:`SharingObserver` object has a :py:attr:`observer_prefix` that
prefixes any signals listened to by that object. The root object, where the bus was
originally assigned, has an :py:attr:`observer_prefix` of "." and each sub-object
has an :py:attr:`observer_prefix` based on the path to access that sub-object from
the root object. For example:
.. code :: python
class SubObj(SharingObserver):
bar = Signal[str]()
@listen_to_signal(bar)
def on_bar(self, value: str):
print(value)
class MySharer(SharingObserver):
def __init__(self):
self.foo = SubObj()
self.shared_bus = SignalBus()
self.shared_bus.start_thread()
sharer = MySharer()
assert sharer.observer_prefix == "."
assert sharer.foo.observer_prefix == ".foo"
Here, the ``sharer.foo.on_bar`` listener method will receive values emitted
on the bus whenever ``sharer.foo.bar`` is set.
"""
_canonicalised_listener_methods: dict[str, list[ListenerMethod]]
_shared_bus: BusProtocol | None
ROOT_OBSERVER_PREFIX: ClassVar[str] = "."
[docs]
def __init__(self, *args: Any, **kwargs: Any) -> None:
"""Initialise the device."""
self._shared_bus = None
self._path_from_root = self.ROOT_OBSERVER_PREFIX
super().__init__(*args, **kwargs)
@property
def observer_prefix(self) -> str:
"""Return the path from the root ``SharingObserver`` holding the bus."""
return self._path_from_root
@property
def shared_bus(self) -> BusProtocol:
"""The shared bus used by this ``SharingObserver``."""
if self._shared_bus is None:
raise RuntimeError("The bus has not been shared!")
return self._shared_bus
@shared_bus.setter
def shared_bus(self, bus: BusProtocol) -> None:
self._shared_bus = bus
self._shared_bus.register_observer(self)
self.on_new_shared_bus()
for key, obj in vars(self).items():
if isinstance(obj, SharingObserver):
# pylint: disable=protected-access
obj._path_from_root = f"{self._path_from_root}{key}."
obj.shared_bus = self._shared_bus
for cls in reversed(self.__class__.mro()):
for key, obj in vars(cls).items():
if isinstance(obj, Signal) and obj._initial_value is not NoValue:
obj.__set__(self, obj._initial_value)
@property
def _listener_methods(self) -> dict[str, list[ListenerMethod]]:
return self._canonicalised_listener_methods
[docs]
def on_new_shared_bus(self) -> None:
"""
Notify that a new bus is available.
When overriding this method you must call ``super().on_new_shared_bus``.
"""
listeners = defaultdict(lambda: [])
for signal, methods in self._static_listener_methods.items():
if isinstance(signal, str):
canonical = canonicalise_relative_to(self, signal)
elif isinstance(signal, Signal):
canonical = signal._absolute_name_for(self)
else:
raise TypeError(f"Unknown signal type '{type(signal)}'")
listeners[canonical].extend(methods)
self._canonicalised_listener_methods = dict(listeners)
[docs]
def canonicalise_relative_to(obj: SharingObserverProtocol, signal: str) -> str:
"""
Return the absolute signal name, relative to obj.
If signal is already absolute, it is returned as is.
"""
if signal.startswith(obj.ROOT_OBSERVER_PREFIX):
return signal
return f"{obj.observer_prefix}{signal}"
# FIXME: Provide a better name for this
def _possible_signal_matches(
obj: SharingObserverProtocol, signal: str
) -> Iterable[str]:
"""
Yield the possible matches of a signal for an object.
That is, the signal name as is and the signal name relative to the object.
"""
yield signal
if signal.startswith(obj.observer_prefix):
yield signal.removeprefix(obj.observer_prefix)
T = TypeVar("T")
[docs]
class Signal(Generic[T]):
"""
Descriptor to provide access to signal on a shared bus.
Can only be used on classes which model
:py:class:`~ska_tango_base.type_hints.SharingObserverProtocol`.
Whenever a ``Signal`` is set, a signal is emitted with the value provided.
The relative name of the signal is taken from the name of the ``Signal`` on
the owner class, and can be overridden by providing ``name`` kwarg to the
initialiser.
The relative name is prefixed by the
:py:attr:`~ska_tango_base.type_hints.SharingObserverProtocol.observer_prefix`
of the owner object before being emitted on the bus.
:param name: name of the signal, if None this objects name is used
:param stored: if True, store the latest emitted value on the parent object
:param initial_value: if not NoValue, emit this value when connected to a
new bus.
"""
[docs]
def __init__(
self: Signal[T],
/,
name: str | None = None,
stored: bool = False,
initial_value: Any = NoValue,
) -> None:
"""Initialise the object."""
self._relative_name = name
self._stored = stored
self._initial_value = initial_value
def __set_name__(self: Signal[T], owner: Any, name: str) -> None:
"""Set the name of the descriptor."""
if self._relative_name is None:
self._relative_name = name
@property
def relative_name(self: Signal[T]) -> str:
"""Name of signal, relative to parent object."""
return cast(str, self._relative_name)
def _absolute_name_for(self: Signal[T], obj: SharingObserverProtocol) -> str:
"""Return the absolute name of the signal when relative to obj."""
return f"{obj.observer_prefix}{self.relative_name}"
@overload
def __get__(self: Signal[T], obj: None, objtype: None = None) -> Signal[T]: ...
@overload
def __get__(
self: Signal[T], obj: SharingObserverProtocol, objtype: type | None = None
) -> T: ...
def __get__(
self: Signal[T],
obj: SharingObserverProtocol | None,
objtype: type | None = None,
) -> T | Signal[T]:
"""
Return the last emitted value or self.
:raises TypeError: If this Signal object does not store values.
:raises KeyError: If the signal has never been emitted.
"""
if obj is None:
return self
name = self._absolute_name_for(obj)
if not self._stored:
raise AttributeError(f'Signal "{name}" does not store value.')
value = getattr(obj, self.__store_name)
return cast(T, value)
@property
def __store_name(self) -> str:
return f"_Signal__{self._relative_name}_storage"
def __set__(self: Signal[T], obj: SharingObserverProtocol, value: T) -> None:
"""Emit the signal."""
name = self._absolute_name_for(obj)
obj.shared_bus.emit(name, value)
if self._stored:
setattr(obj, self.__store_name, value)
_ConvFunc: TypeAlias = Callable[..., Any]
def _get_type_hint_from_signal(signal: str | Signal[Any]) -> Any | None:
"""
Return dtype hint from a signal.
In general, for `Signal[T]` we return `T`, otherwise we return `None`.
"""
if not isinstance(signal, Signal):
return None
if hasattr(signal, "__orig_class__"):
args = typing.get_args(signal.__orig_class__)
else:
return None
if len(args) != 1:
return None
return args[0]
# We are following the same style as pytango here, so use
# snake case for this class.
[docs]
class attribute_from_signal(attribute): # noqa: N801
"""
A Tango attribute linked to an signal.
``attribute_from_signal`` takes all the same keyword arguments as
:py:class:`tango.server.attribute`, and by default will provide an ``fget``
function which returns the last emitted value for the signal.
Any value emitted for the signal will result in a change and archive events
being pushed for the attribute.
If ``write_to_signal`` is True or ``from_tango`` is not None,
then an ``fset`` function will also be provided, so that any writes to the
attribute will result in an value being emitted for the signal.
If the signal is a :py:class:`Signal` object, then the ``__get__`` and ``__set__``
magic methods will be used to interact with the signal, otherwise the signal
will be used as the name of the signal, which will be
:py:func:`canonicalise_relative_to()` the Tango Device.
``attribute_from_signal`` optionally supports conversion functions ``to_tango``
and ``from_tango``. These will convert to and from the attribute value whenever
the attribute is read/written or whenever change/archive events are pushed in
response to a value being emitted for the signal.
If the signal is a :py:class:`Signal` object, then any type arguments from
the signal will be used as the ``dtype`` for the attribute. This can be
overridden by providing a ``dtype`` keyword argument.
If the conversion functions are provided, any annotations on the functions will be
used to infer the ``dtype``, instead of any signal object provided. Again, this
can be overridden with the ``dtype`` keyword argument.
:param signal: Signal object or name of signal
:param to_tango: Function to convert from signal value to Tango value
:param from_tango: Function to convert from Tango value to signal value
:param write_to_signal: If True, include an attribute write method that will write
to the signal
Examples:
.. code:: python
class MyDevice(SignalBusMixin, Device):
my_basic_signal = Signal[int]()
myAttr = attribute_from_signal(
my_basic_signal, access=AttrWriteType.READ_WRITE
)
@listen_to_signal("my_basic_signal")
def on_my_basic_signal(self, old_value, new_value):
# Called whenever a Tango client writes to the
# "myAttr" attribute
if new_value == 2:
self.set_state(DevState.FAULT)
# The attribute will have "DevString" dtype.
@attribute_from_signal(my_basic_signal)
def myAttrAsStr(self, signal_value: int) -> str:
return str(signal_value)
# The attribute will have AttrWriteType.READ_WRITE
@myAttr_readwrite_str.from_tango
def myAttrAsStr(self, tango_value: str) -> int:
return int(tango_value)
my_attr_signal = AttrSignal[dict[str, str]]()
# json.dumps has no type annotations, so we must provide the dtype
readOnlyAttr = attribute_from_signal(
my_attr_signal, dtype=str, to_tango=json.dumps
)
@command
def MyCmd(self):
# myAttr and myAttrAsAString will push change/archive events
self.my_basic_signal = 1
# readOnlyAttr will push change/archive events
self.my_attr_signal = {"foo": "bar"}
# AttrSignal allows this to type check, but otherwise does
# nothing different to Signal
self.my_attr_signal = ({}, time.time(), AttrQuality.ATTR_INVALID)
"""
[docs]
def __init__(
self,
signal: str | Signal[Any],
/,
to_tango: _ConvFunc | None = None,
from_tango: _ConvFunc | None = None,
write_to_signal: bool = False,
initial_value: Any = NoValue,
**kwargs: Any,
) -> None:
"""Initialise the object."""
self._signal = signal
self._to_tango = to_tango
self._from_tango = from_tango
if "fget" not in kwargs:
kwargs["fget"] = self._make_fget()
if write_to_signal or self._from_tango is not None:
kwargs["fset"] = self._make_fset()
super().__init__(**kwargs)
[docs]
def do_read(
self, device: SignalBusMixin, attr: Attribute | None = None
) -> tuple[Any, float, AttrQuality]:
"""Read the signal value for the Tango attribute."""
# The signal bus thread (which is allowed to run in
# always_executed_hook) may have called `push_change_event`, which
# will set the value of the attribute. This means that pytango will
# not set the value we return from here. So, we need to make sure
# that the value flag is cleared when we leave this function. This is a
# work around for cppTango#1407.
if attr is None:
attr = self.__get__(device, type(device))
if version.parse(tango.__version__) < version.parse("10.1.0"):
attr.set_value_flag(False)
value = device._SignalBusMixin__attr_values.get(self.attr_name, NoValue)
if value is not NoValue:
assert isinstance(value, _TimedAttrData)
return _data_to_triple(value)
return 0, time.time(), AttrQuality.ATTR_INVALID
def _make_fget(self) -> Callable[[SignalBusMixin, Any], Any]:
def fget(device: SignalBusMixin, attr: Attribute | None = None): # type: ignore
"""Expose a signal as a Tango attribute."""
return self.do_read(device, attr)
annotation = None
if self._to_tango is not None:
if "return" in getattr(self._to_tango, "__annotations__", {}):
sig = inspect.signature(self._to_tango, eval_str=False)
if sig.return_annotation is not inspect.Signature.empty:
annotation = sig.return_annotation
else:
annotation = _get_type_hint_from_signal(self._signal)
if annotation is not None:
fget.__annotations__["return"] = annotation
return fget
[docs]
def do_write(self, device: SignalBusMixin, attr_or_val: Any) -> None:
"""Write a Tango attribute value to a signal."""
if isinstance(attr_or_val, Attribute):
value = attr_or_val.get_write_value()
else:
value = attr_or_val
value = self._convert_from_tango(device, value)
if isinstance(self._signal, Signal):
# pylint: disable=unnecessary-dunder-call
self._signal.__set__(device, value)
else:
device.shared_bus.emit(
canonicalise_relative_to(device, self._signal), value
)
def _make_fset(self) -> Callable[[SignalBusMixin, Any], None]:
def fset(device: SignalBusMixin, attr_or_val: Any) -> None:
"""Expose a signal as a Tango attribute."""
self.do_write(device, attr_or_val)
annotation = None
if self._from_tango is not None:
sig = inspect.signature(self._from_tango, eval_str=False)
*_, param_anno = (x.annotation for x in sig.parameters.values())
if param_anno is not inspect.Parameter.empty:
annotation = param_anno
else:
annotation = _get_type_hint_from_signal(self._signal)
if annotation is not inspect.Parameter.empty:
fset.__annotations__["attr_or_val"] = annotation
return fset
@property
def signal_name(self) -> str | None:
"""Return the name of the signal associated with this attribute."""
if isinstance(self._signal, Signal):
return self._signal.relative_name
return self._signal
def _convert_to_tango(self, device: SignalBusMixin, signal_value: Any) -> Any:
"""Convert the signal value to be used with Tango."""
if self._to_tango is not None:
if getattr(self._to_tango, "__to_tango_with_device__", False):
return self._to_tango(device, signal_value)
return self._to_tango(signal_value)
return signal_value
def _convert_from_tango(self, device: SignalBusMixin, tango_value: Any) -> Any:
"""Convert the tango value to a value to be emitted."""
if self._from_tango is not None:
if getattr(self._from_tango, "__from_tango_with_device__", False):
return self._from_tango(device, tango_value)
return self._from_tango(tango_value)
return tango_value
[docs]
def to_tango(self, to_tango: _ConvFunc) -> attribute_from_signal:
"""Decorate attribute with to tango conversion function."""
self._to_tango = to_tango
self._to_tango.__to_tango_with_device__ = True # type: ignore[attr-defined]
return self.read(self._make_fget())
[docs]
def from_tango(self, from_tango: _ConvFunc) -> attribute_from_signal:
"""Decorate attribute with from tango conversion function."""
self._from_tango = from_tango
self._from_tango.__from_tango_with_device__ = True # type: ignore[attr-defined]
return self.write(self._make_fset())
[docs]
def on_init_device(self, device: SignalBusMixin) -> None:
"""
Register for events.
Called by owner device during ``init_device()`` and whenever the attribute
is added dynamically.
"""
device.set_change_event(self.attr_name, True, True)
device.set_archive_event(self.attr_name, True, True)
[docs]
def on_emission(self, device: SignalBusMixin, new_value: Any) -> None:
"""
Push events for emission.
Called by owner device whenever a signal value is emitted for this
attribute.
"""
data = self._make_data(device, new_value)
args = _data_to_triple(data)
device.push_change_event(self.attr_name, *args)
device.push_archive_event(self.attr_name, *args)
if version.parse(tango.__version__) >= version.parse("10.1.0"):
if device.is_polled() and device.is_attribute_polled(self.attr_name):
tg = tango.Util.instance(False)
tg.fill_attr_polling_buffer(device, self.attr_name, data)
device._SignalBusMixin__attr_values[self.attr_name] = data
if _TANGO_IS_MOCKED_BY_AUTODOC:
[docs]
def read(self, fn: Any) -> attribute_from_signal:
"""Spoof read when in autodoc."""
return self
[docs]
def write(self, fn: Any) -> attribute_from_signal:
"""Spoof write when in autodoc."""
return self
[docs]
def is_allowed(self, f: Any) -> attribute_from_signal:
"""Spoof write when in autodoc."""
return self
def __call__(self, to_tango: _ConvFunc) -> attribute_from_signal:
"""Provide a conversion to tango function."""
if _TANGO_IS_MOCKED_BY_AUTODOC:
return self
return self.to_tango(to_tango)
def _make_data(self, device: SignalBusMixin, value: Any) -> _TimedAttrData:
if _is_value_triple(value):
data = _TimedAttrData(
value=self._convert_to_tango(device, value[0]),
time_stamp=value[1],
quality=value[2],
)
else:
data = _TimedAttrData(value=self._convert_to_tango(device, value))
return data
def _is_value_triple(value: Any) -> bool:
return (
isinstance(value, tuple)
and len(value) == 3
and isinstance(value[2], AttrQuality)
)
def _data_to_triple(data: _TimedAttrData) -> tuple[Any, float, tango.AttrQuality]:
# TODO: Handle errors
time_stamp = data.time_stamp
if time_stamp is None:
time_stamp = time.time()
return data.value, time_stamp, data.quality
[docs]
class AttrSignal(Signal[T | tuple[T, float, AttrQuality]], Generic[T]):
"""
Convenience signal for working with Tango attributes.
This class is intended to be used with :py:class:`attribute_from_signal`.
It is provided as a convenient way to create a :py:class:`Signal` which
supports setting a value directly and also setting a (data, date, quality)
triple that can be used with :py:meth:`tango.Attribute.set_value_date_quality`.
In both cases the value that is set is directly emitted.
This class provides no runtime behaviour. It only exists to provide the correct
type hints and allow :py:class:`attribute_from_signal` to infer its ``dtype`` from
the signal object.
Example:
.. code:: python
class MyDevice(SignalBusMixin, Device):
my_signal = AttrSignal[int]()
# dtype inferred from my_signal
myAttr = attribute_from_signal(my_signal)
@command
def MyCmd(self):
self.my_signal = 0
self.my_signal = (1, time.time(), AttrQuality.WARNING)
"""
[docs]
class CachingAttrSignal(Signal[tuple[T, float, AttrQuality]], Generic[T]):
"""
Signal that includes a time stamp of when the value was emitted.
This signal is intended to be used :py:class:`attribute_from_signal` to as a
Tango attribute where the value is provided by some other thread, for example,
a polling loop.
When ``CachingAttrSignal`` is set with a value ``data``, a ``(data, date, quality)``
triple is emitted on the shared bus. The ``date`` is recorded as the time that
the ``CachingAttrSignal`` was set and the ``quality`` is ``ATTR_VALID``.
It is possible to override the values of ``date`` by setting the
``(data, date, quality)`` directly.
"""
def __set__(
self: CachingAttrSignal[T],
obj: SharingObserverProtocol,
value: T | tuple[T, float, AttrQuality],
) -> None:
"""Emit the signal."""
if not _is_value_triple(value):
value = (cast(T, value), time.time(), AttrQuality.ATTR_VALID)
else:
value = cast(tuple[T, float, AttrQuality], value)
super().__set__(obj, value)
[docs]
class SignalBusMixin(SharingObserver, SKADevice):
"""
A base class for mix-ins to use the shared bus.
This class is not intended to be inherited from directly by users. Instead, it
is intended to be used by mix-ins to utilities the bus.
"""
shared_bus: _SignalBus
__default_signal_attribute_map: ClassVar[dict[str, attribute_from_signal]] = {}
__signal_attribute_map: dict[str, attribute_from_signal]
__attr_values: dict[str, Any]
[docs]
def __init__(self, *args: Any, **kwargs: Any) -> None:
"""Initialise object."""
# If True, we release the Tango monitor and wait for the condition variable
# before trying to grab the monitor again.
self.__stall_new_client_requests = False
self.__new_client_request_wake_up = threading.Condition()
try:
super().__init__(*args, **kwargs)
except: # noqa: E722
try:
self.shared_bus.shutdown_thread()
except: # noqa: E722
pass
raise
# Workaround for pytango#688.
#
# Unfortunately, pytango <=10.0 does not release the GIL while
# exporting the device and iff we are connecting to a real database,
# DeviceClass::export_device will try to grab Tango monitor.
# This can lead to a deadlock with the SignalBus thread, as it grabs the
# Tango monitor and then the GIL when we call push_change_event.
#
# The workaround is to pause the background thread until we have
# been exported.
#
# We only want to do this if we are using a database and it is _not_
# a file database, as this is the only case where we get exported.
tg = tango.Util.instance()
if tg._UseDb and not tg._FileDb:
self.shared_bus._pause_thread_until(self.get_exported_flag)
@classmethod
def __init_subclass__(cls: type[SignalBusMixin], **kwargs: Any) -> None:
"""Initialise the default signal map."""
super().__init_subclass__(**kwargs)
attr_sig_map: dict[str, attribute_from_signal] = {}
for key, obj in vars(cls).items():
if isinstance(obj, attribute_from_signal):
signal_name = obj.signal_name
if signal_name is None:
raise TypeError(
f"No signal name provided for attribute_from_signal {key}"
)
attr_sig_map[signal_name] = obj
cls.__default_signal_attribute_map = attr_sig_map
[docs]
def init_device(self: SignalBusMixin) -> None:
"""Initialise the shared bus."""
super().init_device()
self.__signal_attribute_map = dict(self.__default_signal_attribute_map)
self.__attr_values = {}
self.shared_bus = _SignalBus(logger=self.logger, name=self.get_name())
for attr in self.__signal_attribute_map.values():
attr.on_init_device(self)
self.shared_bus.start_thread()
[docs]
def add_attribute(
self: SignalBusMixin,
attr: attribute,
) -> None:
"""Add an attribute dynamically."""
with AutoTangoMonitor(self):
super().add_attribute(attr)
if isinstance(attr, attribute_from_signal):
signal_name = attr.signal_name
if signal_name is None:
raise RuntimeError(
"No signal name provided for attribute_from_signal"
)
attr.on_init_device(self)
self.__signal_attribute_map[signal_name] = attr
[docs]
def notify_emission(self: SignalBusMixin, signal: str, value: Any) -> None:
"""Push change events for AttributeSignal objects."""
super().notify_emission(signal, value)
for s in _possible_signal_matches(self, signal):
attr = self.__signal_attribute_map.get(s, None)
if attr is not None:
attr.on_emission(self, value)
[docs]
def delete_device(self: SignalBusMixin) -> None:
"""Shutdown the bus background thread."""
with self.allow_internal_threads():
self.shared_bus.shutdown_thread()
super().delete_device()
[docs]
@contextmanager
def allow_internal_threads(self: SignalBusMixin) -> Iterator[None]:
"""
Allow internal threads to run without allowing new Tango requests.
This is intended to be used from a Tango client request thread to
allow internal threads to acquire the Tango monitor, without allowing
another Tango request to start.
This is used in the following two places in ska-tango-base:
1. During :py:meth:`!init_device()` to allow background threads to
cleanup gracefully and be joined.
If we allow other threads to run during :py:meth:`!init_device()` or
:py:meth:`!delete_device()`, a client could initiate a new request on
our device as the Tango client thread is able to grab the device lock.
This should be avoided because the device is only partially initialised.
This method avoids this issue by arranging for Tango client threads
to be disallowed from grabbing the Tango monitor, while still allowing
other internal threads to acquire the monitor.
2. At the start of every request to allow the signal bus thread to
catch up.
This ensures that Tango clients get a sequentially consistent view of the
Tango device. That is, if they call a command which modifies the value of
an attribute then read that attribute, they will see the updated value, even
if the attribute is using the signal bus (e.g. via attribute_from_signal).
Subclasses of SignalBusMixin should
"""
# Allowing internal threads to grab the Tango monitor is a little tricky.
# We want to release the Tango monitor, but not allow the Tango
# client threads from starting a new request. The way we do this is
# with the always_executing_hook, which is always called by Tango from the
# Tango client thread, before they call into us.
#
# We communicate that we don't want to allow new client requests
# by setting __stall_new_client_requests. When the
# always_executing_hook sees this is set, it releases the Tango
# monitor again and waits until it is signaled to try again
# via the __new_client_request_wake_up.
#
# We must be holding the Tango monitor to read/write the
# __stall_new_client_requests variable. We also cannot
# acquire the Tango monitor while holding the
# __new_client_requests_wake_up lock.
self.__stall_new_client_requests = True
try:
with AutoTangoAllowThreads(self):
yield
finally:
self.__stall_new_client_requests = False
with self.__new_client_request_wake_up:
self.__new_client_request_wake_up.notify_all()
# The default cppTango timeout for acquiring a monitor is 3.2 seconds,
# and we would like to maintain that, however, we don't know how
# long we waited for to initially acquire the lock, so we just
# assume it was very quick and wait for the full timeout seconds
# again to ensure that we wait for _at least_ that long.
_CLIENT_NEW_REQUEST_TIMEOUT = 3.2
[docs]
def always_executed_hook(self) -> None:
"""Raise an exception if we have released threads during an Init command."""
super().always_executed_hook()
# The order here is important, we have to first grab the
# __new_client_request_wake_up CV lock, then release the Tango
# monitor and then wait on the CV (and release the CV lock).
# Otherwise, we might miss the notify_all() from the thread calling
# allow_internal_threads().
#
# We also have to make sure that we release the CV lock _before_ we try
# to re-acquire the Tango monitor, otherwise we might deadlock with the
# thread.
if self.__stall_new_client_requests:
end_timestamp = time.monotonic() + self._CLIENT_NEW_REQUEST_TIMEOUT
while self.__stall_new_client_requests:
if time.monotonic() > end_timestamp:
info = inspect.getframeinfo(cast(FrameType, inspect.currentframe()))
Except.throw_exception(
"API_CommandTimedOut",
"Unable to acquire the device monitor "
"while device re-initialising",
f"{info.filename}:{info.lineno}",
)
self.__new_client_request_wake_up.acquire()
with AutoTangoAllowThreads(self):
self.__new_client_request_wake_up.wait()
self.__new_client_request_wake_up.release()
with self.allow_internal_threads():
try:
self.shared_bus.wait_for_thread(self._CLIENT_NEW_REQUEST_TIMEOUT)
except TimedOutError:
info = inspect.getframeinfo(cast(FrameType, inspect.currentframe()))
Except.throw_exception(
"API_CommandTimedOut",
"Failed to wait for signal bus before servicing Tango request.",
f"{info.filename}:{info.lineno}",
)