Source code for ska_tango_base.callback_scheduler

"""
A module for managing Tango callbacks robustly.

The aim of the :class:`CallbackScheduler` class is to allow you to
decouple the performance of callbacks from each other, allowing you to locally
reason about the callback.

Currently :class:`CallbackScheduler` only supports callbacks for Tango
events but it may support SKA Long Running Command callbacks and
asynchronous Tango request callbacks in the future.
"""

from __future__ import annotations

import collections
import concurrent.futures as _futs
import heapq
import itertools
import logging
import threading
import time
import typing
import weakref
from dataclasses import dataclass, field

import tango
from packaging import version

from . import type_hints

# Public names exported by this module (``from ... import *``).
__all__ = ["CallbackScheduler", "CallbackID", "Queue", "BrokenCallbackSchedulerError"]


# Integer handle produced (via a Future) by
# CallbackScheduler.register_event_callback and consumed by
# CallbackScheduler.unregister_callback.
CallbackID = typing.NewType("CallbackID", int)
"""
Callback Identifier.

Used to unregister a callback.  Can be discarded if unregistering
is not needed.
"""


class Queue:
    """
    Opaque reference to an internal event queue.

    If there are no references to the :class:`Queue` then the corresponding
    internal event queue will be deleted.  The :class:`CallbackScheduler`
    will keep a reference to the :class:`Queue` while the internal event
    queue is in use.

    Instances are obtained from :meth:`CallbackScheduler.allocate_queue`.
    """

    # No per-instance state; the only slot makes instances weakly
    # referenceable so the internal queue can be tracked without keeping the
    # Queue itself alive.
    __slots__ = ("__weakref__",)
class BrokenCallbackSchedulerError(Exception):
    """Raised when a CallbackScheduler has already been shutdown."""

    def __init__(self) -> None:
        """Initialise exception message."""
        super().__init__("CallbackScheduler already shutdown")
class CallbackScheduler:
    """
    A class for managing Tango callbacks robustly.

    Tango events are quickly moved from the Tango thread to internal queues
    where background threads call the supplied callbacks.  This decouples the
    speed of event processing of the supplied callbacks from the rest of the
    Tango process.  Queues are bounded and will discard old data if the
    supplied callbacks cannot keep up.

    The queue can be configured by manually allocating it with the
    :meth:`allocate_queue` method and then supplying it, via the
    ``queue_factory`` argument, to :meth:`register_event_callback` or
    :meth:`connect_event_stream`.  Multiple event streams can be allocated to
    the same queue to ensure events from each stream are processed in order
    relative to each other.

    Queues are processed by worker threads with each queue being assigned a
    priority that is lower the more events that have been processed recently.
    The worker threads process one event on the queue at a time, selecting a
    new queue to process in between each event.
    """

    _name_counter = itertools.count().__next__

    def __init__(
        self,
        *,
        thread_count: int = 1,
        name: str | None = None,
        logger: logging.Logger | None = None,
    ) -> None:
        """
        Initialise the object.

        .. warning:: If ``thread_count > 1`` then registered callbacks for
            different event streams can be executed concurrently and it is
            the user's responsibility to ensure this is safe.  This is
            different from Tango event subscriptions where callbacks are all
            called from the same thread.

        :param thread_count: Number of worker threads to spawn.  Must be at
            least 1.
        :param name: Name of the CallbackScheduler, if ``None`` a default name
            will be provided.
        :param logger: Logger object to use, if ``None`` the module logger
            will be used.

        :raises ValueError: If ``thread_count`` is less than 1.
        """
        # Without any worker thread the scheduler would look permanently shut
        # down and every call would raise BrokenCallbackSchedulerError.
        if thread_count < 1:
            raise ValueError(f"thread_count must be at least 1, got {thread_count}")
        if name is None:
            name = f"CallbackScheduler-{CallbackScheduler._name_counter()}"
        self.name = name
        self._logger = logger if logger is not None else logging.getLogger(__name__)
        # If empty the event handler has been shutdown and
        # we raise BrokenCallbackSchedulerError as a courtesy
        self._threads: set[threading.Thread] = set()
        self._callback_id_counter = itertools.count().__next__
        # Lock invariant: Only 1 thread is calling _callback_id_counter at a
        # time.
        self._callback_id_lock = threading.Lock()
        # We do not need a lock to guard _callbacks as CallbackID is an int so
        # __eq__ does not run arbitrary code. See
        # https://docs.python.org/3/library/threadsafety.html#thread-safety-dict.
        self._callbacks: dict[
            CallbackID, tuple[_EventStream, type_hints.EventCallbackType]
        ] = {}
        self._queues = _QueueSet()
        self._proxies = weakref.WeakValueDictionary[
            str, tango.DeviceProxy | _PendingProxy
        ]()
        self._proxy_creator = _ProxyCreator(self._logger)
        # Lock invariants:
        # - At most one DeviceProxy or _PendingProxy exists for each trl
        # - If a _PendingProxy is in _proxies then _proxy_creator is not
        #   None and it has the _PendingProxy
        self._proxies_lock = threading.Lock()
        self._event_streams: dict[_EventStream, _EventStreamMeta] = {}
        # Lock invariant: At most one _EventStreamMeta exists for each
        # _EventStream, including any _EventStreamMeta that is "on the stack".
        #
        # If both this lock and an _EventStreamMeta.lock need to be held at the
        # same time, then this must be acquired first
        self._event_streams_lock = threading.Lock()
        self._active_heap = _ActiveHeap()
        for i in range(thread_count):
            thread_name = f"{name}-{i}"
            t = threading.Thread(
                name=thread_name,
                target=_worker,
                args=(
                    self._queues,
                    self._active_heap,
                    self.name,
                    self._logger,
                ),
                daemon=True,
            )
            t.start()
            self._threads.add(t)

    def shutdown(self) -> None:
        """Shutdown background threads and unsubscribe from all events."""
        threads = self._begin_shutdown()
        self._wait_shutdown(threads)

    def __del__(self) -> None:
        """Shutdown CallbackScheduler."""
        # If __init__ raised before the worker machinery was created there is
        # nothing to shut down, and shutdown() would fail on missing
        # attributes.
        if not hasattr(self, "_active_heap"):
            return
        self.shutdown()

    @typing.overload
    def register_event_callback(
        self,
        device_trl: str,
        event_type: tango.EventType,
        callback: type_hints.EventCallbackType,
        /,
        initial_event: bool = True,
        queue_factory: typing.Callable[[], Queue] | None = None,
    ) -> _futs.Future[CallbackID]: ...

    @typing.overload
    def register_event_callback(
        self,
        device_trl: str,
        attr: str,
        event_type: tango.EventType,
        callback: type_hints.EventCallbackType,
        /,
        initial_event: bool = True,
        queue_factory: typing.Callable[[], Queue] | None = None,
    ) -> _futs.Future[CallbackID]: ...

    @typing.overload
    def register_event_callback(
        self,
        device: tango.DeviceProxy,
        event_type: tango.EventType,
        callback: type_hints.EventCallbackType,
        /,
        initial_event: bool = True,
        queue_factory: typing.Callable[[], Queue] | None = None,
    ) -> _futs.Future[CallbackID]: ...

    @typing.overload
    def register_event_callback(
        self,
        device: tango.DeviceProxy,
        attr: str,
        event_type: tango.EventType,
        callback: type_hints.EventCallbackType,
        /,
        initial_event: bool = True,
        queue_factory: typing.Callable[[], Queue] | None = None,
    ) -> _futs.Future[CallbackID]: ...

    def register_event_callback(
        self,
        *args: typing.Any,
        initial_event: bool = True,
        queue_factory: typing.Callable[[], Queue] | None = None,
    ) -> _futs.Future[CallbackID]:
        """
        Register a callback to an event stream.

        If it is not already connected, this will create a temporary
        connection.  A temporary connection will automatically be
        disconnected when there are no registered callbacks.  See
        :meth:`connect_event_stream` for details.

        If unregistering independently from other callbacks is not required,
        the returned :class:`CallbackID` and/or
        :class:`~concurrent.futures.Future` can be safely discarded.

        Example::

            >>> eh = CallbackScheduler()
            >>> fut = eh.register_event_callback(
                "foo/bar/1", "myAttr", tango.EventType.CHANGE_EVENT, my_callback)
            >>> cid = fut.result(timeout=10) # Wait for connection to be established
            >>> ...
            >>> eh.unregister_callback(cid)

        Connections to the event stream are done with a stateless Tango
        subscription, meaning if the subscription fails ``callback`` will be
        called with an error :class:`~tango.EventData` object describing the
        reason for the failure.  Tango will retry the subscription
        asynchronously every 10 seconds.

        If constructing a :class:`~tango.DeviceProxy` from the given
        ``device_trl`` fails, for example, because the device is not defined
        in the database, the CallbackScheduler will retry creating the
        DeviceProxy asynchronously with an exponential backoff.  The provided
        callback will be called whenever :class:`~tango.DeviceProxy` creation
        fails with a synthetic error :class:`~tango.EventData` object
        describing the reason for the failure, where only the ``err``,
        ``errors`` and ``reason`` (for PyTango >=10.1) are set.

        :param device: The Tango device producing the event stream.  This
            proxy will not be used to connect to the event stream, instead a
            device proxy managed by the callback scheduler will be used.
        :param device_trl: The Tango resource locator of the device producing
            the event stream.
        :param attr: The name of the attribute to connect to.  Absent for
            ``INTERFACE_CHANGE_EVENT``.
        :param event_type: The type of the event stream.
        :param callback: Callback to call with event data.
        :param initial_event: If ``True`` and the event stream has already
            received an event, ``callback`` is also called with the most
            recent event once it has been registered.
        :param queue_factory: Returns a queue to use for this event stream if
            not yet connected.

        :return: Future returning callback ID to unregister with.  If the
            connection is aborted before it completes, the future holds the
            exception instead and the callback is not registered.

        :raises tango.DevFailed: If canonicalising the ``device_trl`` fails
        :raises BrokenCallbackSchedulerError: If the scheduler has been shut
            down.
        """
        result = _futs.Future[CallbackID]()
        callback: typing.Callable[..., None]
        stream, callback = _resolve_stream_args(*args)
        # Check before reserving a callback ID; otherwise the _callbacks entry
        # added below would be orphaned when _ensure_connected raises.
        if not self._threads:
            raise BrokenCallbackSchedulerError()
        with self._callback_id_lock:
            cid = CallbackID(self._callback_id_counter())
        self._callbacks[cid] = (stream, callback)

        def on_connection(fut: _ConnectionFuture) -> None:
            try:
                if not result.set_running_or_notify_cancel():
                    # The caller cancelled the registration before the
                    # connection resolved: forget the callback ID.
                    self._callbacks.pop(cid, None)
                    return
                try:
                    meta, finalisers = fut.result()
                except Exception:
                    # The connection was aborted (e.g. the stream was
                    # disconnected while pending).  The callback is never
                    # attached and the caller never receives ``cid``, so the
                    # entry would otherwise leak.
                    self._callbacks.pop(cid, None)
                    raise
                # CoW so that events received before this point do
                # not get sent to the new callback.
                new = meta.callbacks.copy()
                new.append(callback)
                meta.callbacks = new
                if initial_event and meta.last_event is not None:
                    # FIXME(tri): It would be nice if we batched up these initial
                    # calls when multiple callbacks are waiting for the
                    # connection...  I'm not quite sure how to do it with
                    # the current architecture.
                    queue = self._queues.mapping[meta.queue]
                    queue.put(_Invocation(meta.last_event, [callback]))
                finalisers.append(lambda: result.set_result(cid))
            except Exception as ex:
                result.set_exception(ex)

        con_fut = _ConnectionFuture()
        con_fut.add_done_callback(on_connection)
        self._ensure_connected(stream, queue_factory, con_fut)
        return result

    def unregister_callback(
        self,
        callback: CallbackID,
    ) -> None:
        """
        Unregister a callback.

        If there are no more callbacks associated with the event stream, then
        the event stream will be disconnected unless it was marked as
        permanent via :meth:`connect_event_stream`.

        If the event stream has already been torn down (for example because
        starting the Tango subscription failed), the callback ID is simply
        forgotten.

        :param callback: The callback to unregister.

        :raises ValueError: If the callback ID is not known.
        """
        try:
            stream, cb = self._callbacks.pop(callback)
        except KeyError:
            raise ValueError(f"Unknown callback ID {callback}.") from None
        with self._event_streams_lock:
            meta = self._event_streams.get(stream)
            if meta is None:
                # Nothing left to detach the callback from.
                return
            with meta.lock:
                # CoW: the Tango callback may be iterating the old list.
                new = meta.callbacks.copy()
                new.remove(cb)
                meta.callbacks = new
                if meta.should_delete():
                    del self._event_streams[stream]
        # Unsubscribe outside the locks to avoid calling into Tango/user code
        # while holding them.
        if meta.should_delete():
            meta.unsub_and_cancel(stream)

    @typing.overload
    def connect_event_stream(
        self,
        device_trl: str,
        event_type: tango.EventType,
        /,
        queue_factory: typing.Callable[[], Queue] | None = None,
    ) -> _futs.Future[None]: ...

    @typing.overload
    def connect_event_stream(
        self,
        device_trl: str,
        attr: str,
        event_type: tango.EventType,
        /,
        queue_factory: typing.Callable[[], Queue] | None = None,
    ) -> _futs.Future[None]: ...

    @typing.overload
    def connect_event_stream(
        self,
        device: tango.DeviceProxy,
        event_type: tango.EventType,
        /,
        queue_factory: typing.Callable[[], Queue] | None = None,
    ) -> _futs.Future[None]: ...

    @typing.overload
    def connect_event_stream(
        self,
        device: tango.DeviceProxy,
        attr: str,
        event_type: tango.EventType,
        /,
        queue_factory: typing.Callable[[], Queue] | None = None,
    ) -> _futs.Future[None]: ...

    def connect_event_stream(
        self,
        *args: typing.Any,
        queue_factory: typing.Callable[[], Queue] | None = None,
    ) -> _futs.Future[None]:
        """
        Connect to an event stream and maintain connection.

        The connection is established via a Tango event subscription with a
        callback that will push events to the queue associated with this event
        stream.

        Calling this function marks an event stream as "permanent", meaning it
        will not get disconnected even if there are no callbacks subscribed.
        This can be useful to avoid multiple Tango subscription requests to
        the Tango device when callbacks only want to be registered
        temporarily.

        If the connection has not already been made and
        ``queue_factory is None`` then a new queue will be allocated with the
        default arguments.  Otherwise the ``queue_factory`` will be called to
        construct a new queue.

        Events are discarded rather than added to the queue if there are no
        callbacks subscribed.

        Connecting to the event stream is handled asynchronously and the
        returned future can be used to track the progress of the connection.
        If the connection is already active when this function is called, then
        the returned future will be fulfilled.
        Call :meth:`~concurrent.futures.Future.result` to wait until the
        connection has completed.  The future may be
        :meth:`~concurrent.futures.Future.cancel`'d to abort the connection if
        it has not already been completed.

        .. warning:: The connection is considered completed as soon as Tango
            calls the internal subscription callback.  This may be with an
            error event informing us that we are unable to connect to the
            Tango device for some reason.

        Example::

            >>> eh = CallbackScheduler()
            >>> fut = eh.connect_event_stream(
                "foo/bar/1", "myAttr", tango.EventType.CHANGE_EVENT)
            >>> fut.result(timeout=10) # Wait for connection to be established

        :param device: The Tango device producing the event stream.
        :param device_trl: The Tango resource locator of the device producing
            the event stream.
        :param attr: The name of the attribute to connect to.  Absent for
            ``INTERFACE_CHANGE_EVENT``.
        :param event_type: The type of the event stream.
        :param queue_factory: Returns a queue to use for this event stream if
            not yet connected.

        :return: Future to track the progress of the connection.

        :raises BrokenCallbackSchedulerError: If the scheduler has been shut
            down.
        """
        result = _futs.Future[None]()
        stream, _ = _resolve_stream_args(*args)

        def on_connection(fut: _ConnectionFuture) -> None:
            try:
                if result.set_running_or_notify_cancel():
                    meta, finalisers = fut.result()
                    meta.temporary = False
                    finalisers.append(lambda: result.set_result(None))
            except Exception as ex:
                result.set_exception(ex)

        con_fut = _ConnectionFuture()
        con_fut.add_done_callback(on_connection)
        self._ensure_connected(stream, queue_factory, con_fut)
        return result

    @typing.overload
    def disconnect_event_stream(
        self,
        device_trl: str,
        event_type: tango.EventType,
        /,
    ) -> None: ...

    @typing.overload
    def disconnect_event_stream(
        self,
        device_trl: str,
        attr: str,
        event_type: tango.EventType,
        /,
    ) -> None: ...

    @typing.overload
    def disconnect_event_stream(
        self,
        device: tango.DeviceProxy,
        event_type: tango.EventType,
        /,
    ) -> None: ...

    @typing.overload
    def disconnect_event_stream(
        self,
        device: tango.DeviceProxy,
        attr: str,
        event_type: tango.EventType,
        /,
    ) -> None: ...

    def disconnect_event_stream(
        self,
        *args: typing.Any,
    ) -> None:
        """
        Disconnect from an event stream.

        If there are callbacks associated with the event stream, then this
        function will just mark the event stream as "temporary" and the event
        stream will be disconnected when the last callback is unregistered.

        :param device: The Tango device producing the event stream.
        :param device_trl: The Tango resource locator of the device producing
            the event stream.
        :param attr: The name of the attribute to connect to.  Absent for
            ``INTERFACE_CHANGE_EVENT``.
        :param event_type: The type of the event stream.

        :raises KeyError: If the event stream is not connected.
        """
        stream, _ = _resolve_stream_args(*args)
        with self._event_streams_lock:
            try:
                meta = self._event_streams[stream]
            except KeyError:
                raise KeyError(f"Unknown event stream {stream!r}") from None
            with meta.lock:
                meta.temporary = True
                if meta.should_delete():
                    del self._event_streams[stream]
        if meta.should_delete():
            meta.unsub_and_cancel(stream)

    def disconnect_all(self) -> None:
        """
        Unregister all callbacks and disconnect all event streams.

        Unlike :meth:`shutdown`, the worker threads are still running and the
        :class:`CallbackScheduler` can still be used.
        """
        # Swap the table out under the lock, then tear the streams down
        # without holding it.
        with self._event_streams_lock:
            streams = self._event_streams
            self._callbacks.clear()
            self._event_streams = {}
        for stream, meta in streams.items():
            meta.unsub_and_cancel(stream)

    def allocate_queue(self, *, queue_size: int = 8) -> Queue:
        """
        Allocate an event queue.

        :param queue_size: Size of the queue, at least 1.  When full old
            entries will be discarded to make space for the new.

        :return: The allocated queue.

        :raises BrokenCallbackSchedulerError: If the scheduler has been shut
            down.
        :raises ValueError: If ``queue_size`` is less than 1.
        """
        if not self._threads:
            raise BrokenCallbackSchedulerError()
        # A zero-sized queue could never hold the item being put: making room
        # would pop from an empty deque inside the Tango event callback.
        if queue_size < 1:
            raise ValueError(f"queue_size must be at least 1, got {queue_size}")
        return self._queues.allocate(self._active_heap, queue_size)

    @typing.overload
    def get_queue(
        self,
        device_trl: str,
        event_type: tango.EventType,
        /,
    ) -> Queue: ...

    @typing.overload
    def get_queue(
        self,
        device_trl: str,
        attr: str,
        event_type: tango.EventType,
        /,
    ) -> Queue: ...

    @typing.overload
    def get_queue(
        self,
        device: tango.DeviceProxy,
        event_type: tango.EventType,
        /,
    ) -> Queue: ...

    @typing.overload
    def get_queue(
        self,
        device: tango.DeviceProxy,
        attr: str,
        event_type: tango.EventType,
        /,
    ) -> Queue: ...

    def get_queue(self, *args: typing.Any) -> Queue:
        """
        Return the queue used by a particular event stream.

        :param device: The Tango device producing the event stream.
        :param device_trl: The Tango resource locator of the device producing
            the event stream.
        :param attr: The name of the attribute.  Absent for
            ``INTERFACE_CHANGE_EVENT``.
        :param event_type: The type of the event stream.

        :return: The queue used by the event stream.

        :raises BrokenCallbackSchedulerError: If the scheduler has been shut
            down.
        :raises KeyError: If the event stream is not connected.
        """
        if not self._threads:
            raise BrokenCallbackSchedulerError()
        stream, _ = _resolve_stream_args(*args)
        try:
            meta = self._event_streams[stream]
        except KeyError:
            raise KeyError(f"Unknown event stream {stream!r}") from None
        return meta.queue

    def _get_proxy(self, trl: str) -> tango.DeviceProxy | _PendingProxy:
        """
        Return the shared proxy for ``trl``, creating it if needed.

        If constructing the :class:`~tango.DeviceProxy` fails, a
        :class:`_PendingProxy` is registered and handed to the proxy creator
        for asynchronous retries.

        If a _PendingProxy is returned it will be locked; the caller is
        responsible for releasing ``pending.lock``.
        """
        with self._proxies_lock:
            # Use a single .get() rather than ``in`` followed by ``[]``: the
            # values are weakly referenced and may be collected between the
            # two lookups.
            existing = self._proxies.get(trl)
            if existing is None:
                try:
                    proxy = tango.DeviceProxy(trl)
                except tango.DevFailed as ex:
                    event = _synthesise_error_event(ex)
                    pending = _PendingProxy(trl, event)
                    pending.lock.acquire()

                    def on_proxy(fut: _ProxyFuture) -> None:
                        with self._proxies_lock:
                            self._proxies[trl] = fut.result()

                    pending.future.add_done_callback(on_proxy)
                    self._proxies[trl] = pending
                    self._proxy_creator.add_pending(pending)
                    # Do not release pending.lock here, that is up to the
                    # caller.
                    return pending
                self._proxies[trl] = proxy
                return proxy
            if isinstance(existing, _PendingProxy):
                existing.lock.acquire()
            return existing

    def _ensure_connected(
        self,
        stream: _EventStream,
        queue_factory: typing.Callable[[], Queue] | None,
        fut: _ConnectionFuture,
    ) -> None:
        """
        Ensure an event stream is connected.

        To maximise concurrency and avoid deadlocks this method ensures the
        following:

        1. We release the `_event_streams_lock` ASAP to allow other threads to
           access other event streams.
        2. We grab the `_EventStreamMeta.lock` before we release the
           `_event_streams_lock` so that we can mark the data stream as in use
           before it can be deleted.
        3. We do not call "user code" while holding a lock as this may lead to
           a deadlock.

        `fut` will be set with the `_EventStreamMeta` object and a list of
        finalisers while holding the `_EventStreamMeta.lock` or an exception
        if the event stream is disconnected while the connection is still
        pending.  After this the `_EventStreamMeta` will be checked to see if
        it should be deleted, so the `fut` should have a done callback to mark
        the data as in use when required.

        After this check, the `finalisers` will be called while not holding
        the `_EventStreamMeta.lock`.  These should be used to set the result
        of user `Future`s.

        :raises BrokenCallbackSchedulerError: If the scheduler has been shut
            down.
        """
        if not self._threads:
            raise BrokenCallbackSchedulerError()
        fut.set_running_or_notify_cancel()  # Mark as uncancelable
        # If true, we need to do a Tango event subscription to connect
        # this event stream.
        sub_required = False
        with self._event_streams_lock:
            if stream in self._event_streams:
                meta = self._event_streams[stream]
            else:
                if queue_factory is None:
                    queue = self.allocate_queue()
                else:
                    queue = queue_factory()
                meta = _EventStreamMeta(queue)
                self._event_streams[stream] = meta
                sub_required = True
            meta.lock.acquire()
        finalisers: list[typing.Callable[[], None]] = []
        try:
            if meta.pending_futures is not None:
                # Still waiting for the first event; resolved in on_event_data.
                meta.pending_futures.append(fut)
            else:
                fut.set_result((meta, finalisers))
        finally:
            meta.lock.release()
        for cb in finalisers:
            cb()
        if not sub_required:
            return
        connected = False

        def on_event_data(
            event: tango.EventData,
            metaref: weakref.ref[_EventStreamMeta] = weakref.ref(meta),
        ) -> None:
            # Use a weakref to avoid this callback keeping the Meta alive.
            meta = metaref()
            if meta is None:
                return
            with meta.lock:
                meta.last_event = event
                if meta.callbacks:
                    queue = self._queues.mapping[meta.queue]
                    queue.put(_Invocation(event, meta.callbacks))
            # On initial connection (successful or otherwise) inform all waiting
            # _ConnectionFutures so the done callback can update the meta with
            # either a EventCallbackType or marking it as permanent.
            nonlocal connected
            if not connected:
                # Callbacks to update user futures _after_ we have resolved
                # the connection.
                finalisers: list[typing.Callable[[], None]] = []
                with meta.lock:
                    futs = meta.pending_futures
                    meta.pending_futures = None
                    assert futs is not None
                    for f in futs:
                        # This calls a done callback as we always add the
                        # callback to the _ConnectionFuture before we pass it
                        # here.
                        f.set_result((meta, finalisers))
                # If all the user futures were cancelled, this event stream might
                # need to be disconnected as no callbacks were added and it wasn't
                # marked permanent.
                with self._event_streams_lock:
                    with meta.lock:
                        if meta.should_delete():
                            del self._event_streams[stream]
                if meta.should_delete():
                    meta.unsub_and_cancel(stream)
                connected = True
                for cb in finalisers:
                    cb()

        def start_subscription(
            self: CallbackScheduler,
            stream: _EventStream,
            meta: _EventStreamMeta,
        ) -> None:
            try:
                args = stream.sub_args
                assert isinstance(meta.proxy, tango.DeviceProxy)
                # COMPAT(pytango < 10.1): sub_mode was added in PyTango 10.1
                if version.parse(tango.__version__) < version.parse("10.1.0"):
                    meta.sub_id = meta.proxy.subscribe_event(
                        *args, on_event_data, stateless=True
                    )
                else:
                    meta.sub_id = meta.proxy.subscribe_event(
                        *args,
                        on_event_data,
                        sub_mode=tango.EventSubMode.AsyncRead,
                    )
            except Exception:
                with self._event_streams_lock:
                    with meta.lock:
                        # Only remove our own entry: when the subscription is
                        # deferred until a proxy can be created, the stream
                        # may have been disconnected and reconnected with a
                        # different meta in the meantime.
                        if self._event_streams.get(stream) is meta:
                            del self._event_streams[stream]
                meta.unsub_and_cancel(stream)
                self._logger.exception(
                    f"Failed to start event subscription for {stream}"
                )

        meta.proxy = self._get_proxy(stream.device_trl)
        if isinstance(meta.proxy, tango.DeviceProxy):
            start_subscription(self, stream, meta)
        else:

            def on_proxy(
                fut: _ProxyFuture,
                selfref: weakref.ref[CallbackScheduler] = weakref.ref(self),
                stream: _EventStream = stream,
                metaref: weakref.ref[_EventStreamMeta] = weakref.ref(meta),
            ) -> None:
                self = selfref()
                if self is None:
                    return
                meta = metaref()
                if meta is None:
                    return
                meta.proxy = fut.result()
                start_subscription(self, stream, meta)

            meta.proxy.future.add_done_callback(on_proxy)
            # Proxy creation failures are reported as synthetic error events;
            # deliver the current one immediately so pending connections
            # resolve.
            meta.proxy.error_callbacks.append(on_event_data)
            on_event_data(meta.proxy.last_error)
            # _get_proxy returned the _PendingProxy locked.
            meta.proxy.lock.release()

    def _begin_shutdown(self) -> list[threading.Thread]:
        """Disconnect everything and signal background threads to stop."""
        threads = list(self._threads)
        self._threads.clear()
        self.disconnect_all()
        # One sentinel per worker so each wakes up and exits.
        for _ in threads:
            self._active_heap.submit(_NULL_QUEUE)
        proxy_creator_thread = self._proxy_creator.begin_shutdown()
        if proxy_creator_thread is not None:
            threads.append(proxy_creator_thread)
        return threads

    def _wait_shutdown(self, threads: list[threading.Thread]) -> None:
        """Join ``threads``, skipping the calling thread."""
        current = threading.current_thread()
        for t in threads:
            # shutdown() may be reached from one of our own threads (e.g. a
            # callback, or __del__ triggered inside one); a thread cannot
            # join itself, and it will exit once control returns to it.
            if t is not current:
                t.join()
class _ActiveHeap:
    """
    Min-heap of :class:`_InvocationQueue` objects that have work pending.

    Ordering uses :meth:`_InvocationQueue.__lt__`. Worker threads call
    :meth:`next_queue` to choose the queue to service next. :meth:`submit`
    adds a queue and wakes one waiting worker. During shutdown the
    ``_NULL_QUEUE`` sentinel is submitted instead of a real queue.
    """

    def __init__(self) -> None:
        # Guards ``heap`` and ``ticket_counter``. Notified on every push so
        # that blocked workers wake up.
        self.work_available = threading.Condition()
        self.heap: list[_InvocationQueue] = []
        self.ticket_counter = itertools.count(1).__next__

    def submit(self, queue: _InvocationQueue) -> None:
        """
        Push ``queue`` onto the heap and wake one waiting worker.

        The caller must hold ``queue.mutex`` for real queues.

        :param queue: queue with pending items, or ``_NULL_QUEUE`` to make a
            worker exit.
        """
        with self.work_available:
            if queue is not _NULL_QUEUE:
                queue.active = True
                # A fresh ticket places this queue behind equal-priority
                # queues that were submitted earlier (round-robin).
                queue.ticket = self.ticket_counter()
            heapq.heappush(self.heap, queue)
            self.work_available.notify()

    def next_queue(self, current: _InvocationQueue) -> _InvocationQueue:
        """
        Hand back ``current`` and return the next queue to service.

        If ``current`` still has items, it is re-queued with a fresh ticket.
        The best queue, possibly ``current`` again, is then returned without
        blocking.

        Otherwise ``current`` is marked inactive, so the next
        :meth:`_InvocationQueue.put` resubmits it. This call then blocks until
        some queue is available.

        :param current: the queue the caller just serviced, or
            ``_NULL_QUEUE`` if it has not serviced one yet.
        :return: the queue to service next. ``_NULL_QUEUE`` means "exit".
        """
        if current is not _NULL_QUEUE:
            with current.mutex:
                if current.items:
                    with self.work_available:
                        current.ticket = self.ticket_counter()
                        return heapq.heappushpop(self.heap, current)
                current.active = False
        # Drop our reference before (possibly) blocking below.
        del current

        with self.work_available:
            while not self.heap:
                # Reset the ticket counter while nothing is going on to avoid
                # the ticket numbers getting too large
                self.ticket_counter = itertools.count(1).__next__
                self.work_available.wait()
            return heapq.heappop(self.heap)


@dataclass(slots=True)
class _Invocation:
    """An event together with the callbacks it must be delivered to."""

    event: type_hints.EventDataType
    # Shared with the owning _EventStreamMeta, which treats it as copy-on-write.
    callbacks: list[type_hints.EventCallbackType]


class _InvocationQueue:
    """
    A thread-safe, bounded queue that never blocks.

    Unlike queue.SimpleQueue, this queue is bounded. Unlike queue.Queue, this
    queue will discard the oldest data to make room for new data.

    The queue will add itself to its ``_ActiveHeap`` when it has data.
    """

    def __init__(self, active_heap: _ActiveHeap, max_size: int) -> None:
        # A deque with ``maxlen`` drops its oldest item when a new one is
        # appended while full. That is exactly the overflow policy we want.
        # Clamping to 0 gives a queue that holds nothing for a non-positive
        # size. The alternative was put() raising IndexError from the Tango
        # event thread.
        self.items = collections.deque[_Invocation](maxlen=max(max_size, 0))
        self.active_heap = active_heap
        self.max_size = max_size
        # True from submission to the active heap until a worker finds the
        # queue empty. put() only submits the queue while this is False.
        self.active = False
        # Lower priority is higher precedence, 0.0 is the highest
        # allowed precedence
        self.priority = 0.0
        # Unique for all active queues. Used as a tie breaker to ensure
        # round-robin.
        self.ticket = 0
        # Must be held for all operations.
        self.mutex = threading.Lock()

    def put(self, item: _Invocation) -> None:
        """
        Put an item into the queue.

        If the queue is full, the first (oldest) item will be removed to make
        room.

        :param item: Item to insert
        """
        with self.mutex:
            self.items.append(item)
            # A zero-capacity queue discards the item immediately, leaving
            # nothing for a worker to do.
            if self.items and not self.active:
                self.active_heap.submit(self)

    def get(self) -> _Invocation | None:
        """
        Get an item from the front of the queue.

        :return: the oldest item, or ``None`` if the queue is empty.
        """
        with self.mutex:
            return self.items.popleft() if self.items else None

    def __lt__(self, other: _InvocationQueue) -> bool:
        # Lower priority first; equal priorities go to the lower ticket.
        return (self.priority, self.ticket) < (other.priority, other.ticket)


# Sentinel queue. It never holds items. Submitting it to the active heap
# makes a worker thread exit, and it is the "no queue yet" value passed to
# _ActiveHeap.next_queue.
_NULL_QUEUE = _InvocationQueue(typing.cast(_ActiveHeap, None), 0)


@dataclass(init=True, unsafe_hash=True, eq=True, slots=True, frozen=True)
class _EventStream:
    """
    Hashable key identifying one event stream.

    A stream is a device TRL, an optional attribute name (``None`` for a
    device-level event) and an event type.
    """

    device_trl: str
    attr: str | None
    event_type: tango.EventType

    @property
    def sub_args(self) -> tuple[typing.Any, ...]:
        """Leading positional arguments for ``DeviceProxy.subscribe_event``."""
        if self.attr is None:
            return (self.event_type,)
        return (self.attr, self.event_type)


_T = typing.TypeVar("_T")


def _resolve_stream_args(
    device_proxy_or_trl: tango.DeviceProxy | str,
    attr_or_type: str | tango.EventType,
    event_type: tango.EventType | _T,
    arg: _T | None = None,
) -> tuple[_EventStream, _T]:
    """
    Normalise the ``(device, [attr,] event_type, arg)`` call forms.

    Callers may omit the attribute name. In that case ``attr_or_type`` is the
    event type, and ``event_type`` holds the value that would otherwise have
    been passed as ``arg``.

    The device is normalised to a ``tango://`` TRL:

    - A string that does not already start with ``tango://`` is prefixed with
      the database host and port reported by ``tango.Database()``.
    - For a ``DeviceProxy``, the TRL is built from the proxy's database host
      and port. If the proxy reports its database port as ``"Unused"``, the
      device host and port are used instead, with ``#dbase=no`` appended.

    :param device_proxy_or_trl: device proxy, device name or TRL.
    :param attr_or_type: attribute name, or the event type if no attribute is
        given.
    :param event_type: event type, or the ``arg`` value if no attribute is
        given.
    :param arg: extra value returned unchanged to the caller.
    :return: the resolved ``_EventStream`` and ``arg``.
    """
    if isinstance(attr_or_type, str):
        attr = attr_or_type
    else:
        # No attribute was given, so each argument after the device has
        # shifted one place left. Capture ``arg`` *before* ``event_type`` is
        # overwritten with the real event type.
        attr = None
        arg = typing.cast(_T, event_type)
        event_type = attr_or_type

    if isinstance(device_proxy_or_trl, str):
        if not device_proxy_or_trl.startswith("tango://"):
            db = tango.Database()
            db_host = db.get_db_host()
            db_port = db.get_db_port()
            device_trl = f"tango://{db_host}:{db_port}/{device_proxy_or_trl}"
        else:
            device_trl = device_proxy_or_trl
    else:
        db_port = device_proxy_or_trl.get_db_port()
        dev_name = device_proxy_or_trl.dev_name()
        if db_port == "Unused":
            dev_host = device_proxy_or_trl.get_dev_host()
            dev_port = device_proxy_or_trl.get_dev_port()
            device_trl = f"tango://{dev_host}:{dev_port}/{dev_name}#dbase=no"
        else:
            db_host = device_proxy_or_trl.get_db_host()
            device_trl = f"tango://{db_host}:{db_port}/{dev_name}"

    return _EventStream(device_trl, attr, event_type), typing.cast(_T, arg)


@dataclass
class _EventStreamMeta:
    """Bookkeeping for one event stream (see the lock invariants below)."""

    # Queue that invocations for this event stream are put on.
    queue: Queue
    # Lock invariants:
    # - (callbacks or not temporary) or the meta is not in the dictionary
    # - pending_futures is None and last_event is not None or the
    #   event stream is still connecting
    # - if sub_id is not None, then there is an active Tango subscription
    lock: threading.Lock = field(default_factory=threading.Lock)
    # Proxy used to connect with.
    # Invariant: If proxy is None then so is sub_id.
    proxy: tango.DeviceProxy | _PendingProxy | None = None
    # The Tango subscription ID for this event stream.
    # Invariant: If sub_id is not None then proxy is a tango.DeviceProxy.
    sub_id: int | None = None
    # The most recent event received for the event stream. This is used to
    # provide the initial call when we register a callback.
    last_event: typing.Any = None
    # A possibly empty list of callbacks to call whenever an event is received.
    # This list must be treated as CoW as the existing list might be referenced
    # by an _Invocation in some _InvocationQueue.
    callbacks: list[type_hints.EventCallbackType] = field(default_factory=list)
    # Futures to be notified when the connection is established. Each future
    # must have a done callback to mark the meta as "in use" when called. If
    # None then the meta has already established the connection.
    pending_futures: list[_ConnectionFuture] | None = field(default_factory=list)
    # If True then the meta will be deleted when there are no remaining
    # subscriptions.
    # Invariant: if temporary is True then callbacks is not empty
    temporary: bool = True

    def should_delete(self) -> bool:
        """Return True if the meta is temporary and has no callbacks left."""
        return self.temporary and not self.callbacks

    def unsub_and_cancel(self, stream: _EventStream) -> None:
        """
        Drop the Tango subscription and fail any pending connection futures.

        :param stream: the stream this meta belongs to. It is used only in
            the error message.
        """
        if self.sub_id is not None and isinstance(self.proxy, tango.DeviceProxy):
            self.proxy.unsubscribe_event(self.sub_id)
        self.proxy = None
        self.sub_id = None
        if self.pending_futures is not None:
            for fut in self.pending_futures:
                fut.set_exception(RuntimeError(f"{stream!r} has been disconnected"))


# Every _DECAY_PERIOD seconds, each queue's priority is multiplied by
# _DECAY_FACTOR. Workers add 1 per event processed, so a queue's priority
# behaves like an exponentially decaying count of recent events.
_DECAY_PERIOD = 10
_DECAY_FACTOR = 5 / 8


@dataclass
class _QueueSet:
    """
    Registry of internal queues keyed by their opaque :class:`Queue` handle.

    Keys are held weakly, so an internal queue is dropped once nothing else
    references its :class:`Queue` handle.
    """

    mapping: weakref.WeakKeyDictionary[Queue, _InvocationQueue] = field(
        default_factory=weakref.WeakKeyDictionary
    )
    # Lock invariant: Only one thread is either iterating or adding to
    # mapping at a time.
    lock: threading.Lock = field(default_factory=threading.Lock)
    next_decay: float = field(default_factory=lambda: time.monotonic() + _DECAY_PERIOD)

    def allocate(self, active_heap: _ActiveHeap, size: int) -> Queue:
        """Create a new internal queue of ``size`` and return its handle."""
        with self.lock:
            qid = Queue()
            self.mapping[qid] = _InvocationQueue(active_heap, size)
            return qid

    def decay_priorities(self) -> None:
        """Apply one decay step for every whole period that has elapsed."""
        with self.lock:
            now = time.monotonic()
            while now > self.next_decay:
                self.next_decay += _DECAY_PERIOD
                for q in self.mapping.values():
                    q.priority *= _DECAY_FACTOR


_ProxyFuture: typing.TypeAlias = _futs.Future[tango.DeviceProxy]
_ConnectionFuture: typing.TypeAlias = _futs.Future[
    tuple[_EventStreamMeta, list[typing.Callable[[], None]]]
]


def _worker(
    queues: _QueueSet,
    active_heap: _ActiveHeap,
    name: str,
    logger: logging.Logger,
) -> None:
    """
    Run a callback worker thread until it is handed ``_NULL_QUEUE``.

    Each iteration does the following:

    1. Decay queue priorities if a decay is due.
    2. Take the next queue from ``active_heap``.
    3. Deliver one invocation to each of its callbacks. A callable is called
       with the event; otherwise the callback's ``push_event`` method is
       called.
    4. Increase that queue's priority, so busy queues yield to quieter ones.

    Exceptions raised by callbacks are logged and otherwise ignored.
    """
    with tango.EnsureOmniThread():
        queue = _NULL_QUEUE
        while True:
            queues.decay_priorities()
            queue = active_heap.next_queue(queue)
            if queue is _NULL_QUEUE:
                return

            item = queue.get()
            if item is not None:
                for cb in item.callbacks:
                    try:
                        if callable(cb):
                            cb(item.event)
                        else:
                            cb.push_event(item.event)
                    except Exception:
                        logger.exception(
                            f"{name}: Callback {cb} raised an exception "
                            f"for {item.event}. Continuing."
                        )
            queue.priority += 1


@dataclass
class _PendingProxy:
    """A device proxy that could not be created yet and is being retried."""

    # TRL to create a proxy for
    trl: str
    # The error we received the last time we attempted to create the proxy
    last_error: tango.EventData
    # How many buckets to delay this proxy for next time it fails
    delay: int = 1
    # Lock invariants:
    # - All error_callbacks have been called with last_error
    # - All error_callbacks have a corresponding done callback on the future
    lock: threading.Lock = field(default_factory=threading.Lock)
    # Future to be set when the proxy is available
    future: _ProxyFuture = field(default_factory=_ProxyFuture)
    # Callbacks to call every time we fail to create the proxy
    error_callbacks: list[typing.Callable[[tango.EventData], None]] = field(
        default_factory=list
    )


def _synthesise_error_event(ex: tango.DevFailed) -> tango.EventData:
    """
    Build an error ``EventData`` describing a failed proxy creation.

    :param ex: the exception raised while creating the ``DeviceProxy``.
    :return: an event with ``err`` set, ``errors`` set to ``ex``, the
        reception date set to now, and ``event_reason`` set to ``SubFail``
        where PyTango supports it.
    """
    event = tango.EventData()
    event.err = True
    # NOTE(review): this stores the DevFailed itself rather than its DevError
    # sequence (``ex.args``). Confirm that consumers of ``errors`` expect this.
    event.errors = ex
    # COMPAT(pytango < 10.1): event_reason was added in PyTango 10.1
    if hasattr(event, "event_reason"):
        event.event_reason = tango.EventReason.SubFail
    event.reception_date = tango.TimeVal.now()
    return event


class _ProxyCreator:
    """
    A class to manage retrying creating device proxies.

    Clients can hand off _PendingProxy objects to :meth:`add_pending` to have
    them created asynchronously. A background thread is spawned as required.
    The thread retires automatically after RETIRE_TIMEOUT seconds of
    inactivity.

    The creator assumes that creating the proxy has recently failed, so it
    will not try again for BUCKET_DELAY seconds. The delays between retries
    grow exponentially, up to a maximum of
    (BUCKET_COUNT - 1) * BUCKET_DELAY = 10 seconds.
    """

    # How long to wait between processing consecutive buckets
    BUCKET_DELAY = 0.1
    # How many buckets we have
    BUCKET_COUNT = 101
    # How long we wait with nothing to do before we retire the thread
    RETIRE_TIMEOUT = 60

    def __init__(self, logger: logging.Logger) -> None:
        self.logger = logger

        # List to collect new pending proxies from other threads. If None,
        # then we have signaled to the thread to stop or the thread is retired.
        self.incoming: list[_PendingProxy] | None = None
        # Condition variable to signal updates to incoming.
        # Forbidden locks: thread_lock (thread_lock must be acquired first)
        self.incoming_cond = threading.Condition()

        # A sparse ring buffer of pending proxies to process at specified times.
        # There can be at most BUCKET_COUNT lists here and each key satisfies
        # 0 <= key < BUCKET_COUNT.
        # The key corresponds to when the list needs to be processed relative
        # to the current slot being processed.
        self.buckets: dict[int, list[_PendingProxy]] = {}

        self.thread: threading.Thread | None = None
        # Lock invariants:
        # - incoming is not None and thread.is_alive()
        # - incoming is None and thread is not None and thread.join()
        #   will return
        self.thread_lock = threading.Lock()

    def add_pending(self, proxy: _PendingProxy) -> None:
        """
        Hand ``proxy`` to the background thread, starting it if required.

        The first attempt is scheduled ``proxy.delay`` buckets from the
        thread's current bucket.
        """
        self.logger.debug(
            'Adding incoming proxy "%s"',
            proxy.trl,
        )
        with self.thread_lock, self.incoming_cond:
            if self.thread is None or self.incoming is None:
                if self.thread is not None:
                    # This is safe because:
                    # - Another thread cannot be in the middle of shutdown()
                    #   because we hold the thread_lock => the worker thread set
                    #   incoming = None.
                    # - The work thread must have returned because we are holding
                    #   incoming_cond, i.e. the worker thread isn't going to
                    #   acquire incoming_cond again and we can join() it.
                    self.thread.join()

                self.thread = threading.Thread(
                    target=self._connect_proxies,
                )
                self.incoming = []
                self.thread.start()

            self.incoming.append(proxy)
            self.incoming_cond.notify()

    def begin_shutdown(self) -> threading.Thread | None:
        """
        Ask the background thread to stop.

        :return: the thread for the caller to join, or ``None`` if no thread
            was running.
        """
        result = None
        with self.thread_lock:
            if self.thread is not None:
                with self.incoming_cond:
                    self.incoming = None
                    self.incoming_cond.notify()
                # We must release the incoming_cond here so that the
                # worker thread can grab it to check incoming.
                result = self.thread
                self.thread = None
        return result

    def _connect_proxies(self) -> None:
        """
        Run the background thread.

        Each pass does the following:

        1. Wait for work, or retire after RETIRE_TIMEOUT idle seconds.
        2. Schedule newly added pending proxies.
        3. Attempt every proxy in the current bucket.
        4. Reschedule failures with a doubled delay.
        5. Sleep until the next non-empty bucket.
        """
        with tango.EnsureOmniThread():
            current_slot = 0

            def has_work() -> bool:
                # we are busy, or
                # we have new stuff to do, or
                # we have been asked to stop
                return bool(self.buckets or self.incoming or self.incoming is None)

            while True:
                # Wait for pending proxies or retire
                with self.incoming_cond:
                    if not self.incoming_cond.wait_for(has_work, self.RETIRE_TIMEOUT):
                        self.incoming = None
                        return

                    if self.incoming is None:
                        return

                    incoming = self.incoming
                    self.incoming = []

                # Schedule new pending proxies for the next bucket
                for pending in incoming:
                    self._add_to_bucket(current_slot, pending)

                # Process the current bucket
                retry = [
                    pending
                    for pending in self.buckets.pop(current_slot, [])
                    if self._attempt_connection(pending, current_slot)
                ]

                # Re-schedule failures
                for pending in retry:
                    # We cap this to self.BUCKET_COUNT - 1 so that we don't put
                    # this back in the same bucket. This way we know when we
                    # sleep later that the current bucket is always empty,
                    # meaning we are free to be interrupted in less than
                    # BUCKET_DELAY seconds and not have pending proxies skip
                    # the queue.
                    pending.delay = min(2 * pending.delay, self.BUCKET_COUNT - 1)
                    self._add_to_bucket(current_slot, pending)

                # Wait until the next bucket, if any.
                next_slot = min(
                    (k for k in self.buckets if k > current_slot), default=None
                )
                if next_slot is None:
                    next_slot = min(self.buckets, default=None)

                if next_slot is None:
                    self.logger.debug("All buckets empty")
                    continue

                target_slot_delta = (next_slot - current_slot) % self.BUCKET_COUNT
                sleep_time = target_slot_delta * self.BUCKET_DELAY
                self.logger.debug(
                    "Sleeping for %.2fs to wait for bucket %d",
                    sleep_time,
                    next_slot,
                )
                start = time.monotonic()
                with self.incoming_cond:
                    # add_pending() and begin_shutdown() both notify, so we
                    # may wake before the target bucket is due.
                    self.incoming_cond.wait(timeout=sleep_time)
                    if self.incoming is None:
                        return

                # Work out how many buckets we actually slept for.
                slept_for = time.monotonic() - start
                slot_delta = min(int(slept_for / self.BUCKET_DELAY), target_slot_delta)
                current_slot = (current_slot + slot_delta) % self.BUCKET_COUNT
                self.logger.debug(
                    "Slept for %.2fs, now at bucket %d",
                    slept_for,
                    current_slot,
                )

    def _attempt_connection(self, pending: _PendingProxy, current_slot: int) -> bool:
        """
        Make one attempt to create the ``DeviceProxy`` for ``pending``.

        - On success, the proxy is set as the future's result.
        - On ``DevFailed``, a synthesised error event is recorded as
          ``last_error`` and passed to every error callback.
        - On any other exception, that exception is set on the future and the
          pending proxy is discarded.

        :param pending: the proxy to attempt.
        :param current_slot: the bucket being processed. It is used only for
            logging.
        :return: True if the attempt failed with ``DevFailed`` and should be
            retried; False otherwise.
        """
        self.logger.debug(
            'Processing pending proxy "%s" for current bucket %d',
            pending.trl,
            current_slot,
        )
        with pending.lock:
            try:
                proxy = tango.DeviceProxy(pending.trl)
            except tango.DevFailed as ex:
                self.logger.debug(
                    'Connection to pending proxy "%s" failed',
                    pending.trl,
                )
                event = _synthesise_error_event(ex)
                # Keep last_error current so that error callbacks registered
                # later are primed with the latest failure. This is the lock
                # invariant on _PendingProxy.
                pending.last_error = event
                for cb in pending.error_callbacks:
                    try:
                        cb(event)
                    except Exception:
                        self.logger.exception(
                            "Unexpected exception while calling error "
                            f'callbacks for "{pending.trl}"'
                        )
                return True
            except Exception as ex:
                self.logger.debug(
                    "Non-Tango exception while creating DeviceProxy "
                    f'to "{pending.trl}". Discarding pending proxy.',
                    exc_info=True,
                )
                pending.future.set_exception(ex)
                # Must not fall through to the success path: there is no
                # proxy to hand out.
                return False

            self.logger.debug(
                'Connection to pending proxy "%s" succeeded',
                pending.trl,
            )
            try:
                pending.future.set_result(proxy)
            except Exception:
                self.logger.exception(
                    "Unexpected exception while setting proxy future "
                    f'for "{pending.trl}". Discarding pending proxy.'
                )
            return False

    def _add_to_bucket(self, current_slot: int, pending: _PendingProxy) -> None:
        """Schedule ``pending`` ``pending.delay`` buckets after ``current_slot``."""
        slot = (current_slot + pending.delay) % self.BUCKET_COUNT
        self.logger.debug(
            'Adding pending proxy "%s" to bucket %d (delay=%d)',
            pending.trl,
            slot,
            pending.delay,
        )
        bucket = self.buckets.setdefault(slot, [])
        bucket.append(pending)