Source code for ska_tango_base.software_bus

#
# This file is part of the SKA Tango Base project
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE.txt for more info.
"""Internal software bus implementation."""

from __future__ import annotations

import functools
import inspect
import logging
import threading
import time
import typing
from collections import defaultdict
from collections.abc import Callable, Iterable
from contextlib import contextmanager
from contextvars import ContextVar
from queue import Full, Queue
from threading import RLock, Thread
from types import FrameType
from typing import (
    Any,
    ClassVar,
    Generic,
    Iterator,
    Protocol,
    TypeAlias,
    TypeVar,
    cast,
    overload,
)
from weakref import WeakSet

import tango
from packaging import version
from tango import (
    AttrQuality,
    AutoTangoAllowThreads,
    AutoTangoMonitor,
    EnsureOmniThread,
    Except,
)
from tango.device_server import Attribute
from tango.server import attribute

from ._autodoc_hacks import _TANGO_IS_MOCKED_BY_AUTODOC
from ._notango import TimedAttrData as _TimedAttrData
from .ska_device import SKADevice
from .type_hints import (
    BusProtocol,
    ObserverProtocol,
    SharingObserverProtocol,
)

module_logger = logging.getLogger(__name__)
current_signal_bus_logger: ContextVar[logging.Logger] = ContextVar(
    "current_signal_bus_logger", default=module_logger
)

__all__ = [
    "AttrSignal",
    "CachingAttrSignal",
    "ListenerMethod",
    "NoValue",
    "Observer",
    "SharingObserver",
    "Signal",
    "SignalBusMixin",
    "attribute_from_signal",
    "canonicalise_relative_to",
    "listen_to_signal",
]


class _WaitForSignalObserver:
    """An observer for SignalBus.wait_for_signal_value."""

    def __init__(
        self,
        signal: str,
        conditional: Callable[[Any], bool],
        event: threading.Event,
    ):
        self._signal = signal
        self._conditional = conditional
        self._event = event

    def notify_emission(self, signal: str, value: Any) -> None:
        """
        Unblock the `wait_for_signal_value` caller when `self._conditional` passes.

        First checks the signal is the one waited on, then unblocks the event if the
        value is desired as indicated by self._conditional.
        """
        if signal == self._signal:
            if self._conditional(value):
                self._event.set()


NoValue = object()
"""Used to communicate that no value has been emitted for a signal.

Required to distinguish from ``None``, which is a valid value to emit.
"""


class TimedOutError(Exception):
    """Timed out waiting for the signal bus background thread."""

    def __init__(self) -> None:
        """Initalise the exception with the default error message."""
        super().__init__(self.__class__.__doc__)


class _SignalBus:
    """
    A software bus that notifies observers listening to signals.

    The bus can :py:func:`emit()` values for a given signal.  In order to
    get notified when a value is emitted for any signals, an observer
    can be registered with :py:func:`register_observer()`.  Observers are stored
    in a class :py:class:`weakref.WeakSet` and so must be kept alive by the
    caller.

    Each signal is identified by a user-provided string. When a value is emitted
    for a given signal an "emission" is added to an
    internal queue.  This queue is serviced by a separate background thread,
    which must be started with :py:func:`start_thread()`.

    When background thread receives an emission from the internal queue it
    will notify all the registered observer objects that still exist by
    calling :py:func:`ObserverProtocol.notify_emission`. This notification occurs in an
    unspecified order. Emissions are processed in the order they are received.

    The internal queue has a maximum size and attempting to :py:func:`emit()` while the
    queue is full will log a warning message before discarding the emission.
    The expectation is that observers should return quickly compared to the rate
    that signals are emitted, so the internal queue filling up should be considered
    a misuse of ``SignalBus``. The limit is in place so that the failure mode is more
    explicit than some queue growing and eating up all the memory on the system.

    :param max_queue_size: Maximum size of the internal queue
    :param name: name to use for the background thread
    """

    def __init__(
        self,
        logger: logging.Logger | None = None,
        max_queue_size: int = 8196,
        name: str = "",
    ) -> None:
        """
        Initialise the object.

        :param logger: to use to for logging
        :param max_queue_size: maximum number of emissions that can be stored
                               in the internal queue
        """
        self._logger = module_logger if logger is None else logger

        # We don't want to create circular references with this bus, so we only
        # ever hold on to observers with weak references.  If the observer goes
        # away, we don't care and will just stop notifying it.
        self._observers = WeakSet[ObserverProtocol]()

        self._emission_lock = RLock()
        self._last_emission_queue_full_log = 0.0
        self._discarded_emissions_count = 0
        self._emission_queue = Queue[tuple[str, Any]](maxsize=max_queue_size)
        if name:
            name += " "
        self._thread = Thread(
            target=self._run_bus_thread,
            name=f"{name}SignalBus Thread",
        )

    def register_observer(self, observer: ObserverProtocol) -> None:
        """
        Register an observer to be notified when a signal is emitted.

        `observer.notify_emission` will be called for every signal emitted
        on the `SignalBus`. It is the responsibility of the observer to
        determine whether the signal is interesting.

        If `register_observer` is called while the SignalBus background thread is
        running, the observer will be registered after any queued emissions
        (calls to `SignalBus.emit`). `wait_for_thread` is supplied for synchronising
        with the background thread.

        If the observer is destroyed it will automatically be unregistered.

        :param observer: The observer to register.
        """
        if self._thread.is_alive():
            self._emission_queue.put(("!register", observer))
            return

        self._observers.add(observer)

    _LOG_QUEUE_FULL_PERIOD = 10.0

    def emit(self, signal: str, value: Any) -> None:
        """
        Emit a new value for the signal.

        When this function returns the last value will be updated with ``value``.  That
        is, provided no other thread is emitting values for this signal, the following
        assert will never fire for any ``value`` and ``signal``:

        .. code:: python

            bus.emit(signal, value)
            assert bus.get_last_emitted_value(signal) == value

        Any observes registered for this signal are notified asynchronously by the
        background thread which is started by :py:func:`start_thread()`.

        :param signal: name of the signal
        :param value: new value to emit
        """
        if signal.startswith("!"):
            raise ValueError(
                f'Invalid signal "{signal}". Signals may not start with a "!"'
            )

        with self._emission_lock:
            try:
                self._emission_queue.put((signal, value), block=False)
            except Full:
                now = time.time()
                if (
                    now - self._last_emission_queue_full_log
                    > self._LOG_QUEUE_FULL_PERIOD
                ):
                    discard_prefix = ""
                    discard_suffix = ""
                    if self._discarded_emissions_count != 0:
                        discard_prefix = (
                            f"and {self._discarded_emissions_count} other emissions "
                        )
                        if self._last_emission_queue_full_log != 0.0:
                            discard_suffix = (
                                f" in the last {self._LOG_QUEUE_FULL_PERIOD} seconds"
                            )
                    self._logger.exception(
                        "Observers cannot keep up with rate signals are being emitted. "
                        "Emission %r %shave been discarded%s.",
                        (signal, value),
                        discard_prefix,
                        discard_suffix,
                    )
                    self._last_emission_queue_full_log = now
                    self._discarded_emissions_count = 0
                else:
                    self._discarded_emissions_count += 1

    def start_thread(self) -> None:
        """Start the background thread to notify observers about emissions."""
        self._thread.start()

    def shutdown_thread(self) -> None:
        """
        Shutdown the background thread.

        This waits for the thread to finish processing remaining emissions.
        """
        # TODO: Should we wait for processing to finish, or flush the queue
        # first?
        self._emission_queue.put(("!shutdown", None))
        self._thread.join()

    def wait_for_thread(self, timeout: float | None = 5.0) -> None:
        """
        Synchronise the calling thread with the background thread.

        When this function returns, all emissions sequenced before this call
        will have been processed by the background thread.

        :param timeout: Time to wait for the background thread.  A timeout of
                        ``None`` will wait forever.
        :raises TimedOutError: If the timeout is exceeded while waiting for
                                the background thread.
        """
        event = threading.Event()
        self._emission_queue.put(("!fence", (event, None)))
        if not event.wait(timeout=timeout):
            raise TimedOutError()

    def wait_for_signal_value(
        self,
        signal: str,
        conditional: Callable[[Any], bool] | None = None,
        timeout: float | None = None,
    ) -> bool:
        """
        Wait for a value to be emitted for `signal` where `conditional` returns True.

        Whenever a value is emitted for the signal, `conditional(old_value, new_value)`
        is called. The thread calling this method will block until `conditional` returns
        True or the timeout is reached. If `conditional` is None, this method will
        unblock on any value emitted for the signal.

        :param signal: The signal to wait for a value emission.
        :param conditional: Callable used to check for the desired signal value.
        :param timeout: Time to wait for signal value. A timeout of None (the default)
            will wait forever.
        :returns: True if a desired value was emitted for the signal within the timeout,
            False otherwise.
        """
        if conditional is None:

            def default(*_: Any) -> bool:
                return True

            conditional = default

        # Event gets set() by _run_bus_thread
        event = threading.Event()
        # The observer will exist at least until it is added to self._observers by the
        # background thread. If the timeout is longer than it takes to be registered,
        # the only strong reference to the observer will be in this method, and when
        # execution leaves this scope, the garbage collector will delete the observer.
        observer = _WaitForSignalObserver(signal, conditional, event)
        self.register_observer(observer)
        return event.wait(timeout)

    def __del__(self) -> None:
        """Delete the object."""
        if self._thread.is_alive():
            self.shutdown_thread()

    def _process_emission(self, signal: str, value: Any) -> None:
        for obs in self._observers:
            try:
                obs.notify_emission(signal, value)
            except Exception:
                self._logger.exception(
                    (
                        "Observer %r threw an unexpected exception while "
                        + "processing emission %r. Continuing with remaining observers."
                    ),
                    obs,
                    (signal, value),
                )

    def _run_bus_thread(self) -> None:
        current_signal_bus_logger.set(self._logger)
        with EnsureOmniThread():
            while True:
                signal, value = self._emission_queue.get()
                if signal.startswith("!"):
                    match signal:
                        case "!shutdown":
                            break
                        case "!fence":
                            received, pause_request = value
                            received.set()
                            if pause_request is not None:
                                self._pause(*pause_request)
                        case "!register":
                            self._observers.add(value)
                        case _:
                            self._logger.error(
                                f'Invalid control signal "{signal}". Skipping.'
                            )
                else:
                    self._process_emission(signal, value)

    # Workarounds for pytango#688, see comment in SignalBusMixin.__init__()
    def _pause(
        self, should_unpause_now: Callable[[], bool], max_duration: float
    ) -> None:
        """Pause thread until should_unpause_now returns True."""
        end_timestamp = time.monotonic() + max_duration
        while time.monotonic() < end_timestamp:
            if should_unpause_now():
                return
            time.sleep(0.1)

        self._logger.error(
            "SignalBus background thread pause exceed "
            f"maximum duration of {max_duration} ms."
        )

    def _pause_thread_until(
        self,
        callback: Callable[[], bool],
        max_pause_duration: int = 10000,
        ack_timeout: int = 1000,
    ) -> None:
        """
        Pause the background thread until the callback returns True.

        This function will first clear the queue, before pausing the
        background thread.  Signals emitted after this function is called,
        will not be processed until the background thread is unpaused.

        The thread will emit and error and unpause if it exceeds the
        maximum duration.

        This function exists to workaround pytango#688 and should not
        be used outside of ska-tango-base.

        :param callback: Returns True when we should stop pausing
        :param max_pause_duration: Maximum duration to pause for
        :param ack_timeout: Timeout for pause acknowledgement

        :returns: Once the queue has been cleared and the background
          thread has acknowledged the pause request.
        """
        event = threading.Event()
        self._emission_queue.put(("!fence", (event, (callback, max_pause_duration))))
        if not event.wait(timeout=ack_timeout):
            raise TimedOutError()


ObserverT = TypeVar("ObserverT", bound="Observer")
SharingObserverT = TypeVar("SharingObserverT", bound="SharingObserver")


[docs] class ListenerMethod(Protocol): """Method on an :py:class:`Observer` which can receive a signal.""" @property def __listen_to_signal__(self) -> Any: """Signal to listen to.""" @__listen_to_signal__.setter def __listen_to_signal__(self, value: Any) -> None: """Signal to listen to.""" def __call__(self, observer: ObserverT, value: Any) -> None: """Respond to emission."""
# We are using a object here, rather than a function, to make it easier to # avoid circular references. If we just used a single function as an observer, # then users would have to be careful to make sure closures only hold weak # references to objects holding a reference to the bus. By making the observer # a class, we can manage this problem in the _SignalBus ourselves.
[docs] class Observer: """ An observer that handles different signals with different methods. Sub-classes can mark a method as a :py:const:`ListenerMethod` using the :py:func:`listen_to_signal()` decorator. :py:const:`ListenerMethod` will only be called for signals they are registered to. """ _static_listener_methods: ClassVar[dict[str | Signal[Any], list[ListenerMethod]]] @property def _listener_methods(self) -> dict[str, list[ListenerMethod]]: return cast(dict[str, list[ListenerMethod]], self._static_listener_methods) @classmethod def __init_subclass__(cls, **kwargs: Any) -> None: """Gather all the listener methods on subclasses.""" super().__init_subclass__(**kwargs) listeners = defaultdict[Any, list[ListenerMethod]](lambda: []) for key in dir(cls): obj = getattr(cls, key) if inspect.isroutine(obj) and hasattr(obj, "__listen_to_signal__"): listeners[obj.__listen_to_signal__].append(cast(ListenerMethod, obj)) cls._static_listener_methods = dict(listeners)
[docs] def notify_emission(self, signal: str, value: Any) -> None: """Call all the listener methods for the given signal.""" for unbound_method in self._listener_methods.get(signal, []): try: unbound_method(self, value) except Exception: # pylint: disable=broad-exception-caught logger = current_signal_bus_logger.get() logger.exception( ( "Listener method %r threw an unexpected exception for %r. " + "Continuing with remaining listeners." ), unbound_method, value, )
ValueT = TypeVar("ValueT") _ListenerDecorator: TypeAlias = Callable[ [Callable[[ObserverT, Any], None]], ListenerMethod ] @overload def listen_to_signal(signal: str) -> _ListenerDecorator[ObserverT]: ... @overload def listen_to_signal( signal: Signal[ValueT], ) -> _ListenerDecorator[SharingObserverT]: ... @overload def listen_to_signal( signal: str, listener: Callable[[ObserverT, Any], None] ) -> ListenerMethod: ... @overload def listen_to_signal( signal: Signal[ValueT], listener: Callable[[SharingObserverT, Any], None] ) -> ListenerMethod: ...
[docs] def listen_to_signal( signal: str | Signal[ValueT], listener: Callable[[ObserverT, Any], None] | None = None, ) -> ( _ListenerDecorator[ObserverT] | _ListenerDecorator[SharingObserverT] | ListenerMethod ): """ Mark a method as listening to a signal. This function will be called asynchronously whenever a value is emitted for the signal. For :py:class:`Observer` objects, only a `signal` specified as a `str` is supported, however, for :py:class:`SharingObserver` objects both `str` and :py:class:`Signal` objects are accepted. For a :py:class:`SharingObserver`, signal names are relative to the parent object. """ # This functools.partial trick allows us to use listen_to_signal as a # decorator: # # ``` # @listen_to_signal("signal") # def on_signal(self, old_value, new_value): # ... # ``` # and as a function: # ``` # listener = listen_to_signal("signal", method) # ``` if listener is None: return cast( _ListenerDecorator[Any], functools.partial(listen_to_signal, signal) ) # Used by Observer.__init_subclass__ to know that this method wants to # listen to this signal. listener = cast(ListenerMethod, listener) listener.__listen_to_signal__ = signal return listener
[docs] class SharingObserver(Observer): """ An observer that shares its bus with sub-objects. When a :py:class:`~ska_tango_base.type_hints.BusProtocol` is assigned to :py:attr:`shared_bus` this will recursively set on all sub-objects which also inherit from :py:class:`SharingObserver`. Subclasses can be notified when a new :py:attr:`shared_bus` is available by overriding :py:meth:`on_new_shared_bus`. Each :py:class:`SharingObserver` object has a :py:attr:`observer_prefix` that prefixes any signals listened to by that object. The root object, where the bus was originally assigned, has an :py:attr:`observer_prefix` of "." and each sub-object has an :py:attr:`observer_prefix` based on the path to access that sub-object from the root object. For example: .. code :: python class SubObj(SharingObserver): bar = Signal[str]() @listen_to_signal(bar) def on_bar(self, value: str): print(value) class MySharer(SharingObserver): def __init__(self): self.foo = SubObj() self.shared_bus = SignalBus() self.shared_bus.start_thread() sharer = MySharer() assert sharer.observer_prefix == "." assert sharer.foo.observer_prefix == ".foo" Here, the ``sharer.foo.on_bar`` listener method will receive values emitted on the bus whenever ``sharer.foo.bar`` is set. """ _canonicalised_listener_methods: dict[str, list[ListenerMethod]] _shared_bus: BusProtocol | None ROOT_OBSERVER_PREFIX: ClassVar[str] = "."
[docs] def __init__(self, *args: Any, **kwargs: Any) -> None: """Initialise the device.""" self._shared_bus = None self._path_from_root = self.ROOT_OBSERVER_PREFIX super().__init__(*args, **kwargs)
@property def observer_prefix(self) -> str: """Return the path from the root ``SharingObserver`` holding the bus.""" return self._path_from_root @property def shared_bus(self) -> BusProtocol: """The shared bus used by this ``SharingObserver``.""" if self._shared_bus is None: raise RuntimeError("The bus has not been shared!") return self._shared_bus @shared_bus.setter def shared_bus(self, bus: BusProtocol) -> None: self._shared_bus = bus self._shared_bus.register_observer(self) self.on_new_shared_bus() for key, obj in vars(self).items(): if isinstance(obj, SharingObserver): # pylint: disable=protected-access obj._path_from_root = f"{self._path_from_root}{key}." obj.shared_bus = self._shared_bus for cls in reversed(self.__class__.mro()): for key, obj in vars(cls).items(): if isinstance(obj, Signal) and obj._initial_value is not NoValue: obj.__set__(self, obj._initial_value) @property def _listener_methods(self) -> dict[str, list[ListenerMethod]]: return self._canonicalised_listener_methods
[docs] def on_new_shared_bus(self) -> None: """ Notify that a new bus is available. When overriding this method you must call ``super().on_new_shared_bus``. """ listeners = defaultdict(lambda: []) for signal, methods in self._static_listener_methods.items(): if isinstance(signal, str): canonical = canonicalise_relative_to(self, signal) elif isinstance(signal, Signal): canonical = signal._absolute_name_for(self) else: raise TypeError(f"Unknown signal type '{type(signal)}'") listeners[canonical].extend(methods) self._canonicalised_listener_methods = dict(listeners)
[docs] def canonicalise_relative_to(obj: SharingObserverProtocol, signal: str) -> str: """ Return the absolute signal name, relative to obj. If signal is already absolute, it is returned as is. """ if signal.startswith(obj.ROOT_OBSERVER_PREFIX): return signal return f"{obj.observer_prefix}{signal}"
# FIXME: Provide a better name for this def _possible_signal_matches( obj: SharingObserverProtocol, signal: str ) -> Iterable[str]: """ Yield the possible matches of a signal for an object. That is, the signal name as is and the signal name relative to the object. """ yield signal if signal.startswith(obj.observer_prefix): yield signal.removeprefix(obj.observer_prefix) T = TypeVar("T")
[docs] class Signal(Generic[T]): """ Descriptor to provide access to signal on a shared bus. Can only be used on classes which model :py:class:`~ska_tango_base.type_hints.SharingObserverProtocol`. Whenever a ``Signal`` is set, a signal is emitted with the value provided. The relative name of the signal is taken from the name of the ``Signal`` on the owner class, and can be overridden by providing ``name`` kwarg to the initialiser. The relative name is prefixed by the :py:attr:`~ska_tango_base.type_hints.SharingObserverProtocol.observer_prefix` of the owner object before being emitted on the bus. :param name: name of the signal, if None this objects name is used :param stored: if True, store the latest emitted value on the parent object :param initial_value: if not NoValue, emit this value when connected to a new bus. """
[docs] def __init__( self: Signal[T], /, name: str | None = None, stored: bool = False, initial_value: Any = NoValue, ) -> None: """Initialise the object.""" self._relative_name = name self._stored = stored self._initial_value = initial_value
def __set_name__(self: Signal[T], owner: Any, name: str) -> None: """Set the name of the descriptor.""" if self._relative_name is None: self._relative_name = name @property def relative_name(self: Signal[T]) -> str: """Name of signal, relative to parent object.""" return cast(str, self._relative_name) def _absolute_name_for(self: Signal[T], obj: SharingObserverProtocol) -> str: """Return the absolute name of the signal when relative to obj.""" return f"{obj.observer_prefix}{self.relative_name}" @overload def __get__(self: Signal[T], obj: None, objtype: None = None) -> Signal[T]: ... @overload def __get__( self: Signal[T], obj: SharingObserverProtocol, objtype: type | None = None ) -> T: ... def __get__( self: Signal[T], obj: SharingObserverProtocol | None, objtype: type | None = None, ) -> T | Signal[T]: """ Return the last emitted value or self. :raises TypeError: If this Signal object does not store values. :raises KeyError: If the signal has never been emitted. """ if obj is None: return self name = self._absolute_name_for(obj) if not self._stored: raise AttributeError(f'Signal "{name}" does not store value.') value = getattr(obj, self.__store_name) return cast(T, value) @property def __store_name(self) -> str: return f"_Signal__{self._relative_name}_storage" def __set__(self: Signal[T], obj: SharingObserverProtocol, value: T) -> None: """Emit the signal.""" name = self._absolute_name_for(obj) obj.shared_bus.emit(name, value) if self._stored: setattr(obj, self.__store_name, value)
_ConvFunc: TypeAlias = Callable[..., Any] def _get_type_hint_from_signal(signal: str | Signal[Any]) -> Any | None: """ Return dtype hint from a signal. In general, for `Signal[T]` we return `T`, otherwise we return `None`. """ if not isinstance(signal, Signal): return None if hasattr(signal, "__orig_class__"): args = typing.get_args(signal.__orig_class__) else: return None if len(args) != 1: return None return args[0] # We are following the same style as pytango here, so use # snake case for this class.
[docs] class attribute_from_signal(attribute): # noqa: N801 """ A Tango attribute linked to an signal. ``attribute_from_signal`` takes all the same keyword arguments as :py:class:`tango.server.attribute`, and by default will provide an ``fget`` function which returns the last emitted value for the signal. Any value emitted for the signal will result in a change and archive events being pushed for the attribute. If ``write_to_signal`` is True or ``from_tango`` is not None, then an ``fset`` function will also be provided, so that any writes to the attribute will result in an value being emitted for the signal. If the signal is a :py:class:`Signal` object, then the ``__get__`` and ``__set__`` magic methods will be used to interact with the signal, otherwise the signal will be used as the name of the signal, which will be :py:func:`canonicalise_relative_to()` the Tango Device. ``attribute_from_signal`` optionally supports conversion functions ``to_tango`` and ``from_tango``. These will convert to and from the attribute value whenever the attribute is read/written or whenever change/archive events are pushed in response to a value being emitted for the signal. If the signal is a :py:class:`Signal` object, then any type arguments from the signal will be used as the ``dtype`` for the attribute. This can be overridden by providing a ``dtype`` keyword argument. If the conversion functions are provided, any annotations on the functions will be used to infer the ``dtype``, instead of any signal object provided. Again, this can be overridden with the ``dtype`` keyword argument. :param signal: Signal object or name of signal :param to_tango: Function to convert from signal value to Tango value :param from_tango: Function to convert from Tango value to signal value :param write_to_signal: If True, include an attribute write method that will write to the signal Examples: .. code:: python class MyDevice(SignalBusMixin, Device): my_basic_signal = Signal[int]() myAttr = attribute_from_signal( my_basic_signal, access=AttrWriteType.READ_WRITE ) @listen_to_signal("my_basic_signal") def on_my_basic_signal(self, old_value, new_value): # Called whenever a Tango client writes to the # "myAttr" attribute if new_value == 2: self.set_state(DevState.FAULT) # The attribute will have "DevString" dtype. @attribute_from_signal(my_basic_signal) def myAttrAsStr(self, signal_value: int) -> str: return str(signal_value) # The attribute will have AttrWriteType.READ_WRITE @myAttr_readwrite_str.from_tango def myAttrAsStr(self, tango_value: str) -> int: return int(tango_value) my_attr_signal = AttrSignal[dict[str, str]]() # json.dumps has no type annotations, so we must provide the dtype readOnlyAttr = attribute_from_signal( my_attr_signal, dtype=str, to_tango=json.dumps ) @command def MyCmd(self): # myAttr and myAttrAsAString will push change/archive events self.my_basic_signal = 1 # readOnlyAttr will push change/archive events self.my_attr_signal = {"foo": "bar"} # AttrSignal allows this to type check, but otherwise does # nothing different to Signal self.my_attr_signal = ({}, time.time(), AttrQuality.ATTR_INVALID) """
[docs] def __init__( self, signal: str | Signal[Any], /, to_tango: _ConvFunc | None = None, from_tango: _ConvFunc | None = None, write_to_signal: bool = False, initial_value: Any = NoValue, **kwargs: Any, ) -> None: """Initialise the object.""" self._signal = signal self._to_tango = to_tango self._from_tango = from_tango if "fget" not in kwargs: kwargs["fget"] = self._make_fget() if write_to_signal or self._from_tango is not None: kwargs["fset"] = self._make_fset() super().__init__(**kwargs)
[docs] def do_read( self, device: SignalBusMixin, attr: Attribute | None = None ) -> tuple[Any, float, AttrQuality]: """Read the signal value for the Tango attribute.""" # The signal bus thread (which is allowed to run in # always_executed_hook) may have called `push_change_event`, which # will set the value of the attribute. This means that pytango will # not set the value we return from here. So, we need to make sure # that the value flag is cleared when we leave this function. This is a # work around for cppTango#1407. if attr is None: attr = self.__get__(device, type(device)) if version.parse(tango.__version__) < version.parse("10.1.0"): attr.set_value_flag(False) value = device._SignalBusMixin__attr_values.get(self.attr_name, NoValue) if value is not NoValue: assert isinstance(value, _TimedAttrData) return _data_to_triple(value) return 0, time.time(), AttrQuality.ATTR_INVALID
def _make_fget(self) -> Callable[[SignalBusMixin, Any], Any]: def fget(device: SignalBusMixin, attr: Attribute | None = None): # type: ignore """Expose a signal as a Tango attribute.""" return self.do_read(device, attr) annotation = None if self._to_tango is not None: if "return" in getattr(self._to_tango, "__annotations__", {}): sig = inspect.signature(self._to_tango, eval_str=False) if sig.return_annotation is not inspect.Signature.empty: annotation = sig.return_annotation else: annotation = _get_type_hint_from_signal(self._signal) if annotation is not None: fget.__annotations__["return"] = annotation return fget
[docs] def do_write(self, device: SignalBusMixin, attr_or_val: Any) -> None: """Write a Tango attribute value to a signal.""" if isinstance(attr_or_val, Attribute): value = attr_or_val.get_write_value() else: value = attr_or_val value = self._convert_from_tango(device, value) if isinstance(self._signal, Signal): # pylint: disable=unnecessary-dunder-call self._signal.__set__(device, value) else: device.shared_bus.emit( canonicalise_relative_to(device, self._signal), value )
def _make_fset(self) -> Callable[[SignalBusMixin, Any], None]: def fset(device: SignalBusMixin, attr_or_val: Any) -> None: """Expose a signal as a Tango attribute.""" self.do_write(device, attr_or_val) annotation = None if self._from_tango is not None: sig = inspect.signature(self._from_tango, eval_str=False) *_, param_anno = (x.annotation for x in sig.parameters.values()) if param_anno is not inspect.Parameter.empty: annotation = param_anno else: annotation = _get_type_hint_from_signal(self._signal) if annotation is not inspect.Parameter.empty: fset.__annotations__["attr_or_val"] = annotation return fset @property def signal_name(self) -> str | None: """Return the name of the signal associated with this attribute.""" if isinstance(self._signal, Signal): return self._signal.relative_name return self._signal def _convert_to_tango(self, device: SignalBusMixin, signal_value: Any) -> Any: """Convert the signal value to be used with Tango.""" if self._to_tango is not None: if getattr(self._to_tango, "__to_tango_with_device__", False): return self._to_tango(device, signal_value) return self._to_tango(signal_value) return signal_value def _convert_from_tango(self, device: SignalBusMixin, tango_value: Any) -> Any: """Convert the tango value to a value to be emitted.""" if self._from_tango is not None: if getattr(self._from_tango, "__from_tango_with_device__", False): return self._from_tango(device, tango_value) return self._from_tango(tango_value) return tango_value
[docs] def to_tango(self, to_tango: _ConvFunc) -> attribute_from_signal: """Decorate attribute with to tango conversion function.""" self._to_tango = to_tango self._to_tango.__to_tango_with_device__ = True # type: ignore[attr-defined] return self.read(self._make_fget())
[docs] def from_tango(self, from_tango: _ConvFunc) -> attribute_from_signal: """Decorate attribute with from tango conversion function.""" self._from_tango = from_tango self._from_tango.__from_tango_with_device__ = True # type: ignore[attr-defined] return self.write(self._make_fset())
[docs] def on_init_device(self, device: SignalBusMixin) -> None: """ Register for events. Called by owner device during ``init_device()`` and whenever the attribute is added dynamically. """ device.set_change_event(self.attr_name, True, True) device.set_archive_event(self.attr_name, True, True)
[docs] def on_emission(self, device: SignalBusMixin, new_value: Any) -> None: """ Push events for emission. Called by owner device whenever a signal value is emitted for this attribute. """ data = self._make_data(device, new_value) args = _data_to_triple(data) device.push_change_event(self.attr_name, *args) device.push_archive_event(self.attr_name, *args) if version.parse(tango.__version__) >= version.parse("10.1.0"): if device.is_polled() and device.is_attribute_polled(self.attr_name): tg = tango.Util.instance(False) tg.fill_attr_polling_buffer(device, self.attr_name, data) device._SignalBusMixin__attr_values[self.attr_name] = data
if _TANGO_IS_MOCKED_BY_AUTODOC:
[docs] def read(self, fn: Any) -> attribute_from_signal: """Spoof read when in autodoc.""" return self
[docs] def write(self, fn: Any) -> attribute_from_signal: """Spoof write when in autodoc.""" return self
[docs] def is_allowed(self, f: Any) -> attribute_from_signal: """Spoof write when in autodoc.""" return self
def __call__(self, to_tango: _ConvFunc) -> attribute_from_signal: """Provide a conversion to tango function.""" if _TANGO_IS_MOCKED_BY_AUTODOC: return self return self.to_tango(to_tango) def _make_data(self, device: SignalBusMixin, value: Any) -> _TimedAttrData: if _is_value_triple(value): data = _TimedAttrData( value=self._convert_to_tango(device, value[0]), time_stamp=value[1], quality=value[2], ) else: data = _TimedAttrData(value=self._convert_to_tango(device, value)) return data
def _is_value_triple(value: Any) -> bool: return ( isinstance(value, tuple) and len(value) == 3 and isinstance(value[2], AttrQuality) ) def _data_to_triple(data: _TimedAttrData) -> tuple[Any, float, tango.AttrQuality]: # TODO: Handle errors time_stamp = data.time_stamp if time_stamp is None: time_stamp = time.time() return data.value, time_stamp, data.quality
[docs] class AttrSignal(Signal[T | tuple[T, float, AttrQuality]], Generic[T]): """ Convenience signal for working with Tango attributes. This class is intended to be used with :py:class:`attribute_from_signal`. It is provided as a convenient way to create a :py:class:`Signal` which supports setting a value directly and also setting a (data, date, quality) triple that can be used with :py:meth:`tango.Attribute.set_value_date_quality`. In both cases the value that is set is directly emitted. This class provides no runtime behaviour. It only exists to provide the correct type hints and allow :py:class:`attribute_from_signal` to infer its ``dtype`` from the signal object. Example: .. code:: python class MyDevice(SignalBusMixin, Device): my_signal = AttrSignal[int]() # dtype inferred from my_signal myAttr = attribute_from_signal(my_signal) @command def MyCmd(self): self.my_signal = 0 self.my_signal = (1, time.time(), AttrQuality.WARNING) """
[docs] class CachingAttrSignal(Signal[tuple[T, float, AttrQuality]], Generic[T]): """ Signal that includes a time stamp of when the value was emitted. This signal is intended to be used :py:class:`attribute_from_signal` to as a Tango attribute where the value is provided by some other thread, for example, a polling loop. When ``CachingAttrSignal`` is set with a value ``data``, a ``(data, date, quality)`` triple is emitted on the shared bus. The ``date`` is recorded as the time that the ``CachingAttrSignal`` was set and the ``quality`` is ``ATTR_VALID``. It is possible to override the values of ``date`` by setting the ``(data, date, quality)`` directly. """ def __set__( self: CachingAttrSignal[T], obj: SharingObserverProtocol, value: T | tuple[T, float, AttrQuality], ) -> None: """Emit the signal.""" if not _is_value_triple(value): value = (cast(T, value), time.time(), AttrQuality.ATTR_VALID) else: value = cast(tuple[T, float, AttrQuality], value) super().__set__(obj, value)
[docs] class SignalBusMixin(SharingObserver, SKADevice): """ A base class for mix-ins to use the shared bus. This class is not intended to be inherited from directly by users. Instead, it is intended to be used by mix-ins to utilities the bus. """ shared_bus: _SignalBus __default_signal_attribute_map: ClassVar[dict[str, attribute_from_signal]] = {} __signal_attribute_map: dict[str, attribute_from_signal] __attr_values: dict[str, Any]
[docs] def __init__(self, *args: Any, **kwargs: Any) -> None: """Initialise object.""" # If True, we release the Tango monitor and wait for the condition variable # before trying to grab the monitor again. self.__stall_new_client_requests = False self.__new_client_request_wake_up = threading.Condition() try: super().__init__(*args, **kwargs) except: # noqa: E722 try: self.shared_bus.shutdown_thread() except: # noqa: E722 pass raise # Workaround for pytango#688. # # Unfortunately, pytango <=10.0 does not release the GIL while # exporting the device and iff we are connecting to a real database, # DeviceClass::export_device will try to grab Tango monitor. # This can lead to a deadlock with the SignalBus thread, as it grabs the # Tango monitor and then the GIL when we call push_change_event. # # The workaround is to pause the background thread until we have # been exported. # # We only want to do this if we are using a database and it is _not_ # a file database, as this is the only case where we get exported. tg = tango.Util.instance() if tg._UseDb and not tg._FileDb: self.shared_bus._pause_thread_until(self.get_exported_flag)
@classmethod def __init_subclass__(cls: type[SignalBusMixin], **kwargs: Any) -> None: """Initialise the default signal map.""" super().__init_subclass__(**kwargs) attr_sig_map: dict[str, attribute_from_signal] = {} for key, obj in vars(cls).items(): if isinstance(obj, attribute_from_signal): signal_name = obj.signal_name if signal_name is None: raise TypeError( f"No signal name provided for attribute_from_signal {key}" ) attr_sig_map[signal_name] = obj cls.__default_signal_attribute_map = attr_sig_map
[docs] def init_device(self: SignalBusMixin) -> None: """Initialise the shared bus.""" super().init_device() self.__signal_attribute_map = dict(self.__default_signal_attribute_map) self.__attr_values = {} self.shared_bus = _SignalBus(logger=self.logger, name=self.get_name()) for attr in self.__signal_attribute_map.values(): attr.on_init_device(self) self.shared_bus.start_thread()
[docs] def add_attribute( self: SignalBusMixin, attr: attribute, ) -> None: """Add an attribute dynamically.""" with AutoTangoMonitor(self): super().add_attribute(attr) if isinstance(attr, attribute_from_signal): signal_name = attr.signal_name if signal_name is None: raise RuntimeError( "No signal name provided for attribute_from_signal" ) attr.on_init_device(self) self.__signal_attribute_map[signal_name] = attr
[docs] def notify_emission(self: SignalBusMixin, signal: str, value: Any) -> None: """Push change events for AttributeSignal objects.""" super().notify_emission(signal, value) for s in _possible_signal_matches(self, signal): attr = self.__signal_attribute_map.get(s, None) if attr is not None: attr.on_emission(self, value)
[docs] def delete_device(self: SignalBusMixin) -> None: """Shutdown the bus background thread.""" with self.allow_internal_threads(): self.shared_bus.shutdown_thread() super().delete_device()
[docs] @contextmanager def allow_internal_threads(self: SignalBusMixin) -> Iterator[None]: """ Allow internal threads to run without allowing new Tango requests. This is intended to be used from a Tango client request thread to allow internal threads to acquire the Tango monitor, without allowing another Tango request to start. This is used in the following two places in ska-tango-base: 1. During :py:meth:`!init_device()` to allow background threads to cleanup gracefully and be joined. If we allow other threads to run during :py:meth:`!init_device()` or :py:meth:`!delete_device()`, a client could initiate a new request on our device as the Tango client thread is able to grab the device lock. This should be avoided because the device is only partially initialised. This method avoids this issue by arranging for Tango client threads to be disallowed from grabbing the Tango monitor, while still allowing other internal threads to acquire the monitor. 2. At the start of every request to allow the signal bus thread to catch up. This ensures that Tango clients get a sequentially consistent view of the Tango device. That is, if they call a command which modifies the value of an attribute then read that attribute, they will see the updated value, even if the attribute is using the signal bus (e.g. via attribute_from_signal). Subclasses of SignalBusMixin should """ # Allowing internal threads to grab the Tango monitor is a little tricky. # We want to release the Tango monitor, but not allow the Tango # client threads from starting a new request. The way we do this is # with the always_executing_hook, which is always called by Tango from the # Tango client thread, before they call into us. # # We communicate that we don't want to allow new client requests # by setting __stall_new_client_requests. When the # always_executing_hook sees this is set, it releases the Tango # monitor again and waits until it is signaled to try again # via the __new_client_request_wake_up. # # We must be holding the Tango monitor to read/write the # __stall_new_client_requests variable. We also cannot # acquire the Tango monitor while holding the # __new_client_requests_wake_up lock. self.__stall_new_client_requests = True try: with AutoTangoAllowThreads(self): yield finally: self.__stall_new_client_requests = False with self.__new_client_request_wake_up: self.__new_client_request_wake_up.notify_all()
# The default cppTango timeout for acquiring a monitor is 3.2 seconds, # and we would like to maintain that, however, we don't know how # long we waited for to initially acquire the lock, so we just # assume it was very quick and wait for the full timeout seconds # again to ensure that we wait for _at least_ that long. _CLIENT_NEW_REQUEST_TIMEOUT = 3.2
[docs] def always_executed_hook(self) -> None: """Raise an exception if we have released threads during an Init command.""" super().always_executed_hook() # The order here is important, we have to first grab the # __new_client_request_wake_up CV lock, then release the Tango # monitor and then wait on the CV (and release the CV lock). # Otherwise, we might miss the notify_all() from the thread calling # allow_internal_threads(). # # We also have to make sure that we release the CV lock _before_ we try # to re-acquire the Tango monitor, otherwise we might deadlock with the # thread. if self.__stall_new_client_requests: end_timestamp = time.monotonic() + self._CLIENT_NEW_REQUEST_TIMEOUT while self.__stall_new_client_requests: if time.monotonic() > end_timestamp: info = inspect.getframeinfo(cast(FrameType, inspect.currentframe())) Except.throw_exception( "API_CommandTimedOut", "Unable to acquire the device monitor " "while device re-initialising", f"{info.filename}:{info.lineno}", ) self.__new_client_request_wake_up.acquire() with AutoTangoAllowThreads(self): self.__new_client_request_wake_up.wait() self.__new_client_request_wake_up.release() with self.allow_internal_threads(): try: self.shared_bus.wait_for_thread(self._CLIENT_NEW_REQUEST_TIMEOUT) except TimedOutError: info = inspect.getframeinfo(cast(FrameType, inspect.currentframe())) Except.throw_exception( "API_CommandTimedOut", "Failed to wait for signal bus before servicing Tango request.", f"{info.filename}:{info.lineno}", )