"""
A module for managing Tango callbacks robustly.
The aim of the :class:`CallbackScheduler` class is to allow you to
decouple the performance of callbacks from each other, allowing you to locally
reason about the callback.
Currently :class:`CallbackScheduler` only supports callbacks for Tango
events but it may support SKA Long Running Command callbacks and
asynchronous Tango request callbacks in the future.
"""
from __future__ import annotations
import collections
import concurrent.futures as _futs
import heapq
import itertools
import logging
import threading
import time
import typing
import weakref
from dataclasses import dataclass, field
import tango
from packaging import version
from . import type_hints
__all__ = ["CallbackScheduler", "CallbackID", "Queue", "BrokenCallbackSchedulerError"]
CallbackID = typing.NewType("CallbackID", int)
"""
Callback Identifier.
Used to unregister a callback. Can be discarded if unregistering
is not needed.
"""
[docs]
class Queue:
    """
    Opaque handle for an internal event queue.

    The internal event queue is deleted once nothing references its
    :class:`Queue` handle any more. The :class:`CallbackScheduler` keeps its own
    reference to the :class:`Queue` for as long as the internal event queue is
    in use.
    """

    # No instance attributes: the handle only needs to be weakly referenceable
    # so that it can be used as a key in a weak-key mapping.
    __slots__ = ("__weakref__",)
[docs]
class BrokenCallbackSchedulerError(Exception):
    """Raised when a CallbackScheduler has already been shutdown."""

    def __init__(self) -> None:
        """Initialise the exception with its fixed message."""
        # The message previously misspelled the class name ("CallbackSheduler").
        super().__init__("CallbackScheduler already shutdown")
[docs]
class CallbackScheduler:
"""
A class for managing Tango callbacks robustly.
Tango events are quickly moved from the Tango thread to internal queues where
background threads call the supplied callbacks. This decouples the speed of
event processing of the supplied callbacks from the rest of the Tango process.
Queues are bounded and will discard old data if the supplied callbacks
cannot keep up. The queue can be configured by manually allocating it with
the :meth:`allocate_queue` method and then passing the returned queue to
:meth:`register_event_callback` or :meth:`connect_event_stream`. Multiple event
streams can be allocated to the same queue to ensure events from each stream
are processed in order relative to each other.
Queues are processed by worker threads with each queue being assigned a
priority that is lower the more events that have been processed recently.
The worker threads process one event on the queue at a time, selecting a new
queue to process in between each event.
"""
_name_counter = itertools.count().__next__
[docs]
def __init__(
    self,
    *,
    thread_count: int = 1,
    name: str | None = None,
    logger: logging.Logger | None = None,
) -> None:
    """
    Initialise the object.

    .. warning::
        If ``thread_count > 1`` then registered callbacks for different
        event streams can be executed concurrently and it is the user's
        responsibility to ensure this is safe. This is different from Tango
        event subscriptions where callbacks are all called from the same
        thread.

    :param thread_count: Number of worker threads to spawn. Must be at least 1.
    :param name: Name of the CallbackScheduler, if ``None`` a default name will be
        provided.
    :param logger: Logger object to use, if ``None`` the module logger will be used.
    :raises ValueError: If ``thread_count`` is less than 1.
    """
    if name is None:
        name = f"CallbackScheduler-{CallbackScheduler._name_counter()}"
    self.name = name
    self._logger = logger if logger is not None else logging.getLogger(__name__)

    # If empty the event handler has been shutdown and
    # we raise BrokenCallbackSchedulerError as a courtesy
    self._threads: set[threading.Thread] = set()

    self._callback_id_counter = itertools.count().__next__
    # Lock invariant: Only 1 thread is calling _callback_id_counter at a
    # time.
    self._callback_id_lock = threading.Lock()

    # We do not need a lock to guard _callbacks as CallbackID is an int so
    # __eq__ does not run arbitrary code. See
    # https://docs.python.org/3/library/threadsafety.html#thread-safety-dict.
    self._callbacks: dict[
        CallbackID, tuple[_EventStream, type_hints.EventCallbackType]
    ] = {}

    self._queues = _QueueSet()

    self._proxies = weakref.WeakValueDictionary[
        str, tango.DeviceProxy | _PendingProxy
    ]()
    self._proxy_creator = _ProxyCreator(self._logger)
    # Lock invariants:
    #  - At most one DeviceProxy or _PendingProxy exists for each trl
    #  - If a _PendingProxy is in _proxies then _proxy_creator is not
    #    None and it has the _PendingProxy
    self._proxies_lock = threading.Lock()

    self._event_streams: dict[_EventStream, _EventStreamMeta] = {}
    # Lock invariant: At most one _EventStreamMeta exists for each
    # _EventStream, including any _EventStreamMeta that is "on the stack".
    #
    # If both this lock and an _EventStreamMeta.lock need to be held at the
    # same time, then this must be acquired first
    self._event_streams_lock = threading.Lock()

    self._active_heap = _ActiveHeap()

    # An empty ``_threads`` set is how the scheduler recognises that it has
    # been shut down, so spawning no workers would create an object that
    # rejects every request with BrokenCallbackSchedulerError. The check is
    # made only once all attributes exist so that ``__del__`` -> ``shutdown()``
    # still runs cleanly on the rejected instance.
    if thread_count < 1:
        raise ValueError(f"thread_count must be at least 1, got {thread_count}")

    for i in range(thread_count):
        thread_name = f"{name}-{i}"
        t = threading.Thread(
            name=thread_name,
            target=_worker,
            args=(
                self._queues,
                self._active_heap,
                self.name,
                self._logger,
            ),
            daemon=True,
        )
        t.start()
        self._threads.add(t)
[docs]
def shutdown(self) -> None:
    """Stop the background threads and unsubscribe from all events."""
    # Two phases: signal everything to stop, then wait for the threads that
    # were running when the shutdown began.
    self._wait_shutdown(self._begin_shutdown())
def __del__(self) -> None:
    """Shut the scheduler down when the object is finalised."""
    # Delegates to the public shutdown() so that explicit and implicit
    # shutdown follow exactly the same path.
    self.shutdown()
@typing.overload
def register_event_callback(
    self,
    device_trl: str,
    event_type: tango.EventType,
    callback: type_hints.EventCallbackType,
    /,
    *,
    initial_event: bool = True,
    queue_factory: typing.Callable[[], Queue] | None = None,
) -> _futs.Future[CallbackID]: ...

@typing.overload
def register_event_callback(
    self,
    device_trl: str,
    attr: str,
    event_type: tango.EventType,
    callback: type_hints.EventCallbackType,
    /,
    *,
    initial_event: bool = True,
    queue_factory: typing.Callable[[], Queue] | None = None,
) -> _futs.Future[CallbackID]: ...

@typing.overload
def register_event_callback(
    self,
    device: tango.DeviceProxy,
    event_type: tango.EventType,
    callback: type_hints.EventCallbackType,
    /,
    *,
    initial_event: bool = True,
    queue_factory: typing.Callable[[], Queue] | None = None,
) -> _futs.Future[CallbackID]: ...

@typing.overload
def register_event_callback(
    self,
    device: tango.DeviceProxy,
    attr: str,
    event_type: tango.EventType,
    callback: type_hints.EventCallbackType,
    /,
    *,
    initial_event: bool = True,
    queue_factory: typing.Callable[[], Queue] | None = None,
) -> _futs.Future[CallbackID]: ...

def register_event_callback(
    self,
    *args: typing.Any,
    initial_event: bool = True,
    queue_factory: typing.Callable[[], Queue] | None = None,
) -> _futs.Future[CallbackID]:
    """
    Register a callback to an event stream.

    If it is not already connected, this will create a temporary connection.
    A temporary connection will automatically be disconnected when there are
    no registered callbacks. See :meth:`connect_event_stream` for details.

    If unregistering independently from other callbacks is not required, the
    returned :class:`CallbackID` and/or :class:`~concurrent.futures.Future`
    can be safely discarded.

    >>> eh = CallbackScheduler()
    >>> fut = eh.register_event_callback(
            "foo/bar/1", "myAttr", tango.EventType.CHANGE_EVENT, my_callback)
    >>> cid = fut.result(timeout=10)  # Wait for connection to be established
    >>> ...
    >>> eh.unregister_callback(cid)

    Connections to the event stream are done with a stateless Tango
    subscription, meaning if the subscription fails ``callback`` will be
    called with an error :class:`~tango.EventData` object describing the reason for
    the failure. Tango will retry the subscription asynchronously every 10
    seconds.

    If constructing a :class:`~tango.DeviceProxy` from the given
    ``device_trl`` fails, for example, because the device is not defined in
    the database, the CallbackScheduler will retry creating the DeviceProxy
    asynchronously with an exponential backoff. The provided callback will
    be called whenever :class:`~tango.DeviceProxy` creation fails with a
    synthetic error :class:`~tango.EventData` object describing the reason
    for the failure, where only the ``err``, ``errors`` and ``reason`` (for
    PyTango >=10.1) are set.

    :param device: The Tango device producing the event stream. This proxy will not
        be used to connect to the event stream, instead a device proxy managed by
        the callback scheduler will be used.
    :param device_trl: The Tango resource locator of the device producing the event
        stream.
    :param attr: The name of the attribute to connect to. Absent for
        ``INTERFACE_CHANGE_EVENT``.
    :param event_type: The type of the event stream.
    :param callback: Callback to call with event data.
    :param initial_event: If ``True`` and the event stream has already received an
        event, queue a call to ``callback`` with the most recent event as soon as
        the callback is registered.
    :param queue_factory: Returns a queue to use for this event stream if not yet
        connected.
    :return: Future returning callback ID to unregister with.
    :raises tango.DevFailed: If canonicalising the ``device_trl`` fails
    :raises BrokenCallbackSchedulerError: If the scheduler has been shut down.
    """
    result = _futs.Future[CallbackID]()

    callback: typing.Callable[..., None]
    stream, callback = _resolve_stream_args(*args)
    with self._callback_id_lock:
        cid = CallbackID(self._callback_id_counter())
    self._callbacks[cid] = (stream, callback)

    def on_connection(fut: _ConnectionFuture) -> None:
        try:
            if result.set_running_or_notify_cancel():
                meta, finalisers = fut.result()
                # CoW so that events received before this point do
                # not get sent to the new callback.
                new = meta.callbacks.copy()
                new.append(callback)
                meta.callbacks = new
                if initial_event and meta.last_event is not None:
                    # FIXME(tri): It would be nice if we batched up these initial
                    # calls when multiple callbacks are waiting for the
                    # connection... I'm not quite sure how to do it with
                    # the current architecture.
                    queue = self._queues.mapping[meta.queue]
                    queue.put(_Invocation(meta.last_event, [callback]))
                finalisers.append(lambda: result.set_result(cid))
            else:
                # The caller cancelled the future before the connection
                # completed, so the ID was never handed out.
                self._callbacks.pop(cid, None)
        except Exception as ex:
            # The caller only ever sees the exception, never the ID, so the
            # bookkeeping entry must not be left behind.
            self._callbacks.pop(cid, None)
            result.set_exception(ex)

    con_fut = _ConnectionFuture()
    con_fut.add_done_callback(on_connection)
    try:
        self._ensure_connected(stream, queue_factory, con_fut)
    except BaseException:
        # A synchronous failure (e.g. the scheduler is already shut down)
        # means no future is returned; drop the ID registered above.
        self._callbacks.pop(cid, None)
        raise
    return result
[docs]
def unregister_callback(
    self,
    callback: CallbackID,
) -> None:
    """
    Unregister a callback.

    If there are no more callbacks associated with the event stream, then the
    event stream will be disconnected unless it was marked as permanent via
    :meth:`connect_event_stream`.

    :param callback: The callback to unregister.
    :raises ValueError: If the callback ID is not known.
    """
    try:
        stream, cb = self._callbacks.pop(callback)
    except KeyError:
        # The KeyError is an implementation detail; do not chain it.
        raise ValueError(f"Unknown callback ID {callback}.") from None

    with self._event_streams_lock:
        meta = self._event_streams[stream]
        with meta.lock:
            # CoW: an _Invocation that is already queued may still reference
            # the old list.
            new = meta.callbacks.copy()
            new.remove(cb)
            meta.callbacks = new
            # Decide once, while both locks are held. Re-checking after the
            # locks are released could disagree with what was done to
            # ``_event_streams`` and either leak the subscription or
            # unsubscribe twice.
            delete = meta.should_delete()
            if delete:
                del self._event_streams[stream]

    # Unsubscribing is done outside the locks.
    if delete:
        meta.unsub_and_cancel(stream)
@typing.overload
def connect_event_stream(
    self,
    device_trl: str,
    event_type: tango.EventType,
    /,
    *,
    queue_factory: typing.Callable[[], Queue] | None = None,
) -> _futs.Future[None]: ...

@typing.overload
def connect_event_stream(
    self,
    device_trl: str,
    attr: str,
    event_type: tango.EventType,
    /,
    *,
    queue_factory: typing.Callable[[], Queue] | None = None,
) -> _futs.Future[None]: ...

@typing.overload
def connect_event_stream(
    self,
    device: tango.DeviceProxy,
    event_type: tango.EventType,
    /,
    *,
    queue_factory: typing.Callable[[], Queue] | None = None,
) -> _futs.Future[None]: ...

@typing.overload
def connect_event_stream(
    self,
    device: tango.DeviceProxy,
    attr: str,
    event_type: tango.EventType,
    /,
    *,
    queue_factory: typing.Callable[[], Queue] | None = None,
) -> _futs.Future[None]: ...

def connect_event_stream(
    self,
    *args: typing.Any,
    queue_factory: typing.Callable[[], Queue] | None = None,
) -> _futs.Future[None]:
    """
    Connect to an event stream and maintain connection.

    The connection is established via a Tango event subscription with a callback
    that will push events to the queue associated with this event stream.

    Calling this function marks an event stream as "permanent", meaning it will
    not get disconnected even if there are no callbacks subscribed. This can be
    useful to avoid multiple Tango subscription requests to the Tango device when
    callbacks only want to be registered temporarily.

    If the connection has not already been made and ``queue_factory is
    None`` then a new queue will be allocated with the default arguments.
    Otherwise the ``queue_factory`` will be called to construct a new queue.

    Events are discarded rather than added to the queue if there are no
    callbacks subscribed.

    Connecting to the event stream is handled asynchronously and the
    returned future can be used to track the progress of the connection. If
    the connection is already active when this function is called, then the
    returned future will be fulfilled. Call
    :meth:`~concurrent.futures.Future.result` to wait until the connection has
    completed. The future may be :meth:`~concurrent.futures.Future.cancel`'d to
    abort the connection if it has not already been completed.

    .. warning::
        The connection is considered completed as soon as Tango calls the internal
        subscription callback. This may be with an error event informing us that
        we are unable to connect to the Tango device for some reason.

    Example::

        >>> eh = CallbackScheduler()
        >>> fut = eh.connect_event_stream(
                "foo/bar/1", "myAttr", tango.EventType.CHANGE_EVENT)
        >>> fut.result(timeout=10)  # Wait for connection to be established

    :param device: The Tango device producing the event stream.
    :param device_trl: The Tango resource locator of the device producing the event
        stream.
    :param attr: The name of the attribute to connect to. Absent for
        ``INTERFACE_CHANGE_EVENT``.
    :param event_type: The type of the event stream.
    :param queue_factory: Returns a queue to use for this event stream if not yet
        connected. Keyword-only.
    :return: Future to track the progress of the connection.
    :raises BrokenCallbackSchedulerError: If the scheduler has been shut down.
    """
    result = _futs.Future[None]()
    stream, _ = _resolve_stream_args(*args)

    def on_connection(fut: _ConnectionFuture) -> None:
        try:
            # If the caller already cancelled ``result`` there is nothing to
            # mark: the stream stays temporary.
            if result.set_running_or_notify_cancel():
                meta, finalisers = fut.result()
                meta.temporary = False
                # The user future is resolved later, via the finaliser list
                # handed over by _ensure_connected.
                finalisers.append(lambda: result.set_result(None))
        except Exception as ex:
            result.set_exception(ex)

    con_fut = _ConnectionFuture()
    con_fut.add_done_callback(on_connection)
    self._ensure_connected(stream, queue_factory, con_fut)
    return result
@typing.overload
def disconnect_event_stream(
    self,
    device_trl: str,
    event_type: tango.EventType,
    /,
) -> None: ...

@typing.overload
def disconnect_event_stream(
    self,
    device_trl: str,
    attr: str,
    event_type: tango.EventType,
    /,
) -> None: ...

@typing.overload
def disconnect_event_stream(
    self,
    device: tango.DeviceProxy,
    event_type: tango.EventType,
    /,
) -> None: ...

@typing.overload
def disconnect_event_stream(
    self,
    device: tango.DeviceProxy,
    attr: str,
    event_type: tango.EventType,
    /,
) -> None: ...

def disconnect_event_stream(
    self,
    *args: typing.Any,
) -> None:
    """
    Disconnect from an event stream.

    If there are callbacks associated with the event stream, then this
    function will just mark the event stream as "temporary" and the event
    stream will be disconnected when the last callback is unregistered.

    :param device: The Tango device producing the event stream.
    :param device_trl: The Tango resource locator of the device producing the event
        stream.
    :param attr: The name of the attribute to connect to. Absent for
        ``INTERFACE_CHANGE_EVENT``.
    :param event_type: The type of the event stream.
    :raises KeyError: If the event stream is not currently known to the scheduler.
    """
    stream, _ = _resolve_stream_args(*args)

    with self._event_streams_lock:
        meta = self._event_streams[stream]
        with meta.lock:
            meta.temporary = True
            # Decide once, while both locks are held. Re-checking after the
            # locks are released could disagree with what was done to
            # ``_event_streams`` and either leak the subscription or
            # unsubscribe twice.
            delete = meta.should_delete()
            if delete:
                del self._event_streams[stream]

    # Unsubscribing is done outside the locks.
    if delete:
        meta.unsub_and_cancel(stream)
[docs]
def disconnect_all(self) -> None:
    """
    Drop every registered callback and disconnect every event stream.

    In contrast to :meth:`shutdown`, the worker threads keep running, so the
    :class:`CallbackScheduler` remains usable afterwards.
    """
    with self._event_streams_lock:
        self._callbacks.clear()
        # Swap in a fresh mapping and keep the old one for the teardown below.
        detached, self._event_streams = self._event_streams, {}

    # The actual unsubscription happens after the lock has been released.
    for stream in detached:
        detached[stream].unsub_and_cancel(stream)
[docs]
def allocate_queue(self, *, queue_size: int = 8) -> Queue:
    """
    Allocate an event queue.

    :param queue_size: Size of the queue. When full old entries will be discarded
        to make space for the new. Must be at least 1.
    :return: The allocated queue.
    :raises BrokenCallbackSchedulerError: If the scheduler has been shut down.
    :raises ValueError: If ``queue_size`` is less than 1.
    """
    if not self._threads:
        raise BrokenCallbackSchedulerError()
    # A queue that can hold nothing would fail later, when the first event is
    # put on it (the "make room" loop would pop from an empty queue), so
    # reject the size up front where the caller can see the error.
    if queue_size < 1:
        raise ValueError(f"queue_size must be at least 1, got {queue_size}")
    return self._queues.allocate(self._active_heap, queue_size)
@typing.overload
def get_queue(
    self,
    device_trl: str,
    event_type: tango.EventType,
    /,
) -> Queue: ...

@typing.overload
def get_queue(
    self,
    device_trl: str,
    attr: str,
    event_type: tango.EventType,
    /,
) -> Queue: ...

@typing.overload
def get_queue(
    self,
    device: tango.DeviceProxy,
    event_type: tango.EventType,
    /,
) -> Queue: ...

@typing.overload
def get_queue(
    self,
    device: tango.DeviceProxy,
    attr: str,
    event_type: tango.EventType,
    /,
) -> Queue: ...

def get_queue(self, *args: typing.Any) -> Queue:
    """
    Return the queue used by a particular event stream.

    :param device: The Tango device producing the event stream.
    :param device_trl: The Tango resource locator of the device producing the event
        stream.
    :param attr: The name of the attribute of the event stream. Absent for
        ``INTERFACE_CHANGE_EVENT``.
    :param event_type: The type of the event stream.
    :return: The queue handle of the event stream.
    :raises BrokenCallbackSchedulerError: If the scheduler has been shut down.
    :raises KeyError: If the event stream is not currently known to the scheduler.
    """
    if not self._threads:
        raise BrokenCallbackSchedulerError()

    stream, _ = _resolve_stream_args(*args)
    try:
        return self._event_streams[stream].queue
    except KeyError:
        # Re-raise with a readable message; the bare lookup error adds nothing.
        raise KeyError(f"Unknown event stream {stream!r}") from None
def _get_proxy(self, trl: str) -> tango.DeviceProxy | _PendingProxy:
    """
    Return the shared proxy for ``trl``, creating it if necessary.

    If creating the :class:`~tango.DeviceProxy` fails with
    :class:`~tango.DevFailed`, a :class:`_PendingProxy` is registered and
    handed to the proxy creator to be retried in the background.

    If _PendingProxy is returned it will be locked; releasing
    ``_PendingProxy.lock`` is the caller's responsibility.

    :param trl: Tango resource locator of the device.
    :return: The device proxy, or a locked pending proxy.
    """
    with self._proxies_lock:
        # ``_proxies`` only holds weak references, so an entry can vanish
        # between a membership test and a subsequent lookup. A single get()
        # both tests for the entry and keeps it alive.
        result = self._proxies.get(trl)
        if result is None:
            try:
                result = tango.DeviceProxy(trl)
                self._proxies[trl] = result
                return result
            except tango.DevFailed as ex:
                event = _synthesise_error_event(ex)
                pending = _PendingProxy(trl, event)
                pending.lock.acquire()

                def on_proxy(fut: _ProxyFuture) -> None:
                    # Replace the pending entry with the real proxy.
                    with self._proxies_lock:
                        self._proxies[trl] = fut.result()

                pending.future.add_done_callback(on_proxy)
                self._proxies[trl] = pending
                self._proxy_creator.add_pending(pending)
                # Do not release pending.lock here, that is up to the
                # caller.
                return pending

    if isinstance(result, _PendingProxy):
        result.lock.acquire()
    return result
def _ensure_connected(
    self,
    stream: _EventStream,
    queue_factory: typing.Callable[[], Queue] | None,
    fut: _ConnectionFuture,
) -> None:
    """
    Ensure an event stream is connected.

    To maximise concurrency and avoid deadlocks this method ensures
    the following:

    1. We release the `_event_streams_lock` ASAP to allow other threads
       to access other event streams.
    2. We grab the `_EventStreamMeta.lock` before we release the
       `_event_streams_lock` so that we can mark the data stream as in use before
       it can be deleted.
    3. We do not call "user code" while holding a lock as this may lead to a
       deadlock.

    `fut` will be set with the `_EventStreamMeta` object and a list of
    finalisers while holding the `_EventStreamMeta.lock` or an exception if
    the event stream is disconnected while the connection is still pending.
    After this the `_EventStreamMeta` will be checked to see if it should be
    deleted, so the `fut` should have a done callback to mark the data as in
    use when required. After this check, the `finalisers` will be called while
    not holding the `_EventStreamMeta.lock`. These should be used to set the result
    of user `Future`s.

    :param stream: The event stream to connect.
    :param queue_factory: Called to obtain the queue when the stream is not yet
        known; if ``None`` a queue is allocated with the default arguments.
    :param fut: Future to resolve with ``(meta, finalisers)``.
    :raises BrokenCallbackSchedulerError: If the scheduler has been shut down.
    """
    if not self._threads:
        raise BrokenCallbackSchedulerError()

    fut.set_running_or_notify_cancel()  # Mark as uncancelable

    # If true, we need to do a Tango event subscription to connect
    # this event stream.
    sub_required = False
    with self._event_streams_lock:
        if stream in self._event_streams:
            meta = self._event_streams[stream]
        else:
            # NOTE(review): queue_factory is caller-supplied code and appears to
            # run while _event_streams_lock is held - confirm this is intended
            # given rule 3 in the docstring.
            if queue_factory is None:
                queue = self.allocate_queue()
            else:
                queue = queue_factory()
            meta = _EventStreamMeta(queue)
            self._event_streams[stream] = meta
            sub_required = True
        # Taken before _event_streams_lock is released (rule 2 above).
        meta.lock.acquire()

    finalisers: list[typing.Callable[[], None]] = []
    try:
        if meta.pending_futures is not None:
            # Still connecting: on_event_data resolves the future later.
            meta.pending_futures.append(fut)
        else:
            # Already connected: resolve immediately (runs the done callback).
            fut.set_result((meta, finalisers))
    finally:
        meta.lock.release()

    # User futures are only touched once the meta lock has been released.
    for cb in finalisers:
        cb()

    if not sub_required:
        return

    connected = False

    def on_event_data(
        event: tango.EventData,
        metaref: weakref.ref[_EventStreamMeta] = weakref.ref(meta),
    ) -> None:
        # Use a weakref to avoid this callback keeping the Meta alive.
        meta = metaref()
        if meta is None:
            return

        with meta.lock:
            meta.last_event = event
            if meta.callbacks:
                queue = self._queues.mapping[meta.queue]
                queue.put(_Invocation(event, meta.callbacks))

        # On initial connection (successful or otherwise) inform all waiting
        # _ConnectionFutures so the done callback can update the meta with
        # either a EventCallbackType or marking it as permanent.
        nonlocal connected
        if not connected:
            # Callbacks to update user futures _after_ we have resolved
            # the connection.
            finalisers: list[typing.Callable[[], None]] = []
            with meta.lock:
                futs = meta.pending_futures
                meta.pending_futures = None
                assert futs is not None
                for f in futs:
                    # This calls a done callback as we always add the
                    # callback to the _ConnectionFuture before we pass it
                    # here.
                    f.set_result((meta, finalisers))

            # If all the user futures were cancelled, this event stream might
            # need to be disconnected as no callbacks were added and it wasn't
            # marked permanent.
            with self._event_streams_lock:
                with meta.lock:
                    if meta.should_delete():
                        del self._event_streams[stream]

            # NOTE(review): should_delete() is evaluated a second time here
            # without the locks; its answer could differ from the check
            # above - confirm whether the result should be reused instead.
            if meta.should_delete():
                meta.unsub_and_cancel(stream)

            connected = True

            for cb in finalisers:
                cb()

    def start_subscription(
        self: CallbackScheduler,
        stream: _EventStream,
        meta: _EventStreamMeta,
    ) -> None:
        # Subscribe to the Tango event using the (now real) device proxy.
        try:
            args = stream.sub_args
            assert isinstance(meta.proxy, tango.DeviceProxy)
            # COMPAT(pytango < 10.1): sub_mode was added in PyTango 10.1
            if version.parse(tango.__version__) < version.parse("10.1.0"):
                meta.sub_id = meta.proxy.subscribe_event(
                    *args, on_event_data, stateless=True
                )
            else:
                meta.sub_id = meta.proxy.subscribe_event(
                    *args,
                    on_event_data,
                    sub_mode=tango.EventSubMode.AsyncRead,
                )
        except Exception:
            # The subscription could not be started: forget the stream and
            # fail any futures still waiting on it.
            with self._event_streams_lock:
                with meta.lock:
                    try:
                        del self._event_streams[stream]
                    except KeyError:
                        pass
            meta.unsub_and_cancel(stream)
            self._logger.exception(
                f"Failed to start event subscription for {stream}"
            )

    meta.proxy = self._get_proxy(stream.device_trl)
    if isinstance(meta.proxy, tango.DeviceProxy):
        start_subscription(self, stream, meta)
    else:
        # The proxy could not be created yet; _get_proxy returned a locked
        # _PendingProxy. Subscribe once its future provides the real proxy.

        def on_proxy(
            fut: _ProxyFuture,
            selfref: weakref.ref[CallbackScheduler] = weakref.ref(self),
            stream: _EventStream = stream,
            metaref: weakref.ref[_EventStreamMeta] = weakref.ref(meta),
        ) -> None:
            # Weak references so that the pending proxy keeps neither the
            # scheduler nor the meta alive.
            self = selfref()
            if self is None:
                return
            meta = metaref()
            if meta is None:
                return
            meta.proxy = fut.result()
            start_subscription(self, stream, meta)

        meta.proxy.future.add_done_callback(on_proxy)
        meta.proxy.error_callbacks.append(on_event_data)
        # Deliver the most recent creation error as the first "event".
        on_event_data(meta.proxy.last_error)
        # Release the lock that _get_proxy acquired on our behalf.
        meta.proxy.lock.release()
def _begin_shutdown(self) -> list[threading.Thread]:
    """
    Signal every background thread to stop, without waiting for them.

    :return: The threads that the caller should join.
    """
    # Emptying ``_threads`` is what marks the scheduler as shut down.
    to_join = list(self._threads)
    self._threads.clear()

    self.disconnect_all()

    # One stop sentinel per worker thread.
    for _ in range(len(to_join)):
        self._active_heap.submit(_NULL_QUEUE)

    creator_thread = self._proxy_creator.begin_shutdown()
    if creator_thread is None:
        return to_join
    to_join.append(creator_thread)
    return to_join
def _wait_shutdown(self, threads: list[threading.Thread]) -> None:
    """
    Block until every given thread has finished.

    :param threads: Threads to join, in order.
    """
    for thread in threads:
        thread.join()
class _ActiveHeap:
    """
    Heap of queues that currently have work, shared by the worker threads.

    Queues are ordered by ``_InvocationQueue.__lt__``. All access to ``heap``
    and ``ticket_counter`` happens while holding ``work_available``.
    """

    def __init__(self) -> None:
        # Guards ``heap``/``ticket_counter`` and wakes workers waiting for work.
        self.work_available = threading.Condition()
        self.heap: list[_InvocationQueue] = []
        # Hands out increasing ticket numbers, starting at 1.
        self.ticket_counter = itertools.count(1).__next__

    def submit(self, queue: _InvocationQueue) -> None:
        """
        Push ``queue`` onto the heap and wake one waiting worker.

        ``_NULL_QUEUE`` is pushed as-is, without being marked active or
        given a ticket.
        """
        # queue lock must be held
        with self.work_available:
            if queue is not _NULL_QUEUE:
                queue.active = True
                queue.ticket = self.ticket_counter()
            heapq.heappush(self.heap, queue)
            self.work_available.notify()

    def next_queue(self, current: _InvocationQueue) -> _InvocationQueue:
        """
        Hand back ``current`` and return the next queue to process.

        If ``current`` still has items it is given a new ticket and pushed back
        before the smallest queue is popped (which may be ``current`` itself).
        Otherwise ``current`` is marked inactive and the call blocks until the
        heap is non-empty.

        :param current: The queue the worker just processed, or ``_NULL_QUEUE``.
        :return: The queue to process next.
        """
        if current is not _NULL_QUEUE:
            # Lock order: the queue's mutex first, then ``work_available``.
            with current.mutex:
                if current.items:
                    with self.work_available:
                        current.ticket = self.ticket_counter()
                        return heapq.heappushpop(self.heap, current)
                current.active = False
        # Drop our reference before potentially waiting for a long time.
        del current

        with self.work_available:
            while not self.heap:
                # Reset the ticket counter while nothing is going on to avoid
                # the ticket numbers getting too large
                self.ticket_counter = itertools.count(1).__next__
                self.work_available.wait()
            return heapq.heappop(self.heap)
@dataclass(slots=True)
class _Invocation:
    """A single event together with the callbacks it is to be delivered to."""

    # The event to pass to each callback.
    event: type_hints.EventDataType
    # Callbacks to invoke with ``event``.
    callbacks: list[type_hints.EventCallbackType]
class _InvocationQueue:
    """
    A bounded, thread-safe queue whose operations never block.

    It differs from queue.SimpleQueue by being bounded, and from queue.Queue by
    dropping the oldest entry to make room instead of blocking the producer.

    When data arrives and the queue is not already active, the queue submits
    itself to its ``_ActiveHeap``.
    """

    def __init__(self, active_heap: _ActiveHeap, max_size: int) -> None:
        # Must be held for all operations.
        self.mutex = threading.Lock()
        self.items: collections.deque[_Invocation] = collections.deque()
        self.max_size = max_size
        self.active_heap = active_heap
        self.active = False
        # A lower priority value means higher precedence; 0.0 is the highest
        # precedence allowed.
        self.priority = 0.0
        # Unique among all active queues; breaks priority ties so that
        # scheduling is round-robin.
        self.ticket = 0

    def put(self, item: _Invocation) -> None:
        """
        Append an item to the queue.

        When the queue is full, the first (oldest) item is dropped to make room.

        :param item: Item to insert
        """
        with self.mutex:
            # One pass is normally enough; the loop is purely defensive so
            # that enough room is always made.
            while self.max_size <= len(self.items):
                self.items.popleft()
            self.items.append(item)
            if self.active:
                return
            self.active_heap.submit(self)

    def get(self) -> _Invocation | None:
        """Pop the item at the front of the queue, or return None if empty."""
        with self.mutex:
            if not self.items:
                return None
            return self.items.popleft()

    def __lt__(self, other: _InvocationQueue) -> bool:
        mine = (self.priority, self.ticket)
        theirs = (other.priority, other.ticket)
        return mine < theirs
# Sentinel queue. A worker that receives it from the active heap stops, and it
# is also the "no current queue" value a worker starts with. It has no active
# heap (None) and a size of 0, so it is only ever compared by identity.
_NULL_QUEUE = _InvocationQueue(typing.cast(_ActiveHeap, None), 0)
@dataclass(init=True, unsafe_hash=True, eq=True, slots=True, frozen=True)
class _EventStream:
device_trl: str
attr: str | None
event_type: tango.EventType
@property
def sub_args(self) -> tuple[typing.Any, ...]:
if self.attr is None:
return (self.event_type,)
return (self.attr, self.event_type)
_T = typing.TypeVar("_T")
def _resolve_stream_args(
device_proxy_or_trl: tango.DeviceProxy | str,
attr_or_type: str | tango.EventType,
event_type: tango.EventType | _T,
arg: _T | None = None,
) -> tuple[_EventStream, _T]:
if isinstance(attr_or_type, str):
attr = attr_or_type
else:
attr = None
event_type = attr_or_type
arg = typing.cast(_T, event_type)
if isinstance(device_proxy_or_trl, str):
if not device_proxy_or_trl.startswith("tango://"):
db = tango.Database()
db_host = db.get_db_host()
db_port = db.get_db_port()
device_trl = f"tango://{db_host}:{db_port}/{device_proxy_or_trl}"
else:
device_trl = device_proxy_or_trl
else:
db_port = device_proxy_or_trl.get_db_port()
dev_name = device_proxy_or_trl.dev_name()
if db_port == "Unused":
dev_host = device_proxy_or_trl.get_dev_host()
dev_port = device_proxy_or_trl.get_dev_port()
device_trl = f"tango://{dev_host}:{dev_port}/{dev_name}#dbase=no"
else:
db_host = device_proxy_or_trl.get_db_host()
device_trl = f"tango://{db_host}:{db_port}/{dev_name}"
return _EventStream(device_trl, attr, event_type), typing.cast(_T, arg)
@dataclass
class _EventStreamMeta:
    """Mutable bookkeeping for one connected (or connecting) event stream."""

    # Handle of the queue that events of this stream are put on.
    queue: Queue

    # Lock invariants:
    #  - (callbacks or not temporary) or the meta is not in the dictionary
    #  - pending_futures is None and last_event is not None or the
    #    event stream is still connecting
    #  - if sub_id is not None, then there is an active Tango subscription
    lock: threading.Lock = field(default_factory=threading.Lock)

    # Proxy the stream connects through; may still be pending creation.
    proxy: tango.DeviceProxy | _PendingProxy | None = None

    # Tango subscription ID of this stream, None while not subscribed.
    sub_id: int | None = None

    # Most recent event seen on the stream. Used for the initial call made
    # when a callback is registered.
    last_event: typing.Any = None

    # Callbacks (possibly none) to call for every event. Treat the list as
    # copy-on-write: an _Invocation sitting in some _InvocationQueue may still
    # reference the current list object.
    callbacks: list[type_hints.EventCallbackType] = field(default_factory=list)

    # Futures to notify once the connection is established. Each must carry a
    # done callback that marks the meta as "in use". None means the connection
    # has already been established.
    pending_futures: list[_ConnectionFuture] | None = field(default_factory=list)

    # When True the meta is deleted as soon as no callbacks remain.
    temporary: bool = True

    def should_delete(self) -> bool:
        """Return True if the stream is temporary and has no callbacks left."""
        if not self.temporary:
            return False
        return not self.callbacks

    def unsub_and_cancel(self, stream: _EventStream) -> None:
        """
        Drop the Tango subscription and fail any futures still waiting.

        :param stream: The stream being disconnected; used in the error message.
        """
        subscription, proxy = self.sub_id, self.proxy
        if subscription is not None and isinstance(proxy, tango.DeviceProxy):
            proxy.unsubscribe_event(subscription)
        self.proxy = self.sub_id = None

        waiting = self.pending_futures
        if waiting is None:
            return
        for fut in waiting:
            fut.set_exception(RuntimeError(f"{stream!r} has been disconnected"))
# Seconds between two decays of the queue priorities.
_DECAY_PERIOD = 10
# Factor every queue priority is multiplied by at each decay.
_DECAY_FACTOR = 5 / 8


@dataclass
class _QueueSet:
    """All allocated queues, keyed weakly by their public :class:`Queue` handle."""

    mapping: weakref.WeakKeyDictionary[Queue, _InvocationQueue] = field(
        default_factory=weakref.WeakKeyDictionary
    )
    # Lock invariant: Only one thread is either iterating or adding to
    # mapping at a time.
    lock: threading.Lock = field(default_factory=threading.Lock)
    # Monotonic timestamp at which the next decay is due.
    next_decay: float = field(default_factory=lambda: time.monotonic() + _DECAY_PERIOD)

    def allocate(self, active_heap: _ActiveHeap, size: int) -> Queue:
        """
        Create a new internal queue and return its handle.

        :param active_heap: Heap the new queue submits itself to.
        :param size: Maximum number of entries of the new queue.
        :return: Handle of the new queue.
        """
        with self.lock:
            handle = Queue()
            self.mapping[handle] = _InvocationQueue(active_heap, size)
        return handle

    def decay_priorities(self) -> None:
        """Apply one priority decay for every decay period that has elapsed."""
        with self.lock:
            now = time.monotonic()
            while self.next_decay < now:
                self.next_decay += _DECAY_PERIOD
                for inv_queue in self.mapping.values():
                    inv_queue.priority *= _DECAY_FACTOR
# Future resolved with the device proxy once it has been created.
_ProxyFuture: typing.TypeAlias = _futs.Future[tango.DeviceProxy]
# Future resolved with the stream's meta and a list of "finaliser" callables
# when the event stream connection is resolved.
_ConnectionFuture: typing.TypeAlias = _futs.Future[
    tuple[_EventStreamMeta, list[typing.Callable[[], None]]]
]
def _worker(
    queues: _QueueSet,
    active_heap: _ActiveHeap,
    name: str,
    logger: logging.Logger,
) -> None:
    """
    Worker thread main loop: process one queued invocation at a time.

    Each iteration decays the queue priorities if due, hands the current queue
    back to the heap in exchange for the next one, and delivers a single
    invocation from it. The loop ends when ``_NULL_QUEUE`` is received.

    :param queues: The set of all queues, used for priority decay.
    :param active_heap: Heap to take the next queue from.
    :param name: Name used as a prefix in log messages.
    :param logger: Logger for exceptions raised by callbacks.
    """
    with tango.EnsureOmniThread():
        queue = _NULL_QUEUE
        while True:
            queues.decay_priorities()
            queue = active_heap.next_queue(queue)
            if queue is _NULL_QUEUE:
                return

            item = queue.get()
            if item is not None:
                for cb in item.callbacks:
                    try:
                        # A callback is either a plain callable or an object
                        # providing ``push_event``.
                        if callable(cb):
                            cb(item.event)
                        else:
                            cb.push_event(item.event)
                    except Exception:
                        # A failing callback must not stop the others.
                        logger.exception(
                            f"{name}: Callback {cb} raised an exception "
                            f"for {item.event}. Continuing."
                        )
                # Raising the value lowers the queue's precedence.
                # NOTE(review): assumed to apply once per delivered item -
                # confirm against the original indentation.
                queue.priority += 1
@dataclass
class _PendingProxy:
    """A device proxy that could not be created yet and is being retried."""

    # TRL to create a proxy for
    trl: str
    # The error we received the last time we attempted to create the proxy
    last_error: tango.EventData
    # How many buckets to delay this proxy for next time it fails
    delay: int = 1
    # Lock invariants:
    #  - All error_callbacks have been called with last_error
    #  - All error_callbacks have a corresponding done callback on the future
    lock: threading.Lock = field(default_factory=threading.Lock)
    # Future to be set when the proxy is available
    future: _ProxyFuture = field(default_factory=_ProxyFuture)
    # Callbacks to call every time we fail to create the proxy
    error_callbacks: list[typing.Callable[[tango.EventData], None]] = field(
        default_factory=list
    )
def _synthesise_error_event(ex: tango.DevFailed) -> tango.EventData:
    """
    Build an error :class:`~tango.EventData` describing ``ex``.

    Only ``err``, ``errors``, ``reception_date`` and, where the attribute
    exists, ``event_reason`` are set on the returned event.

    :param ex: The failure to wrap.
    :return: The synthetic error event.
    """
    event = tango.EventData()
    event.err = True
    event.errors = ex
    # COMPAT(pytango < 10.1): event_reason was added in PyTango 10.1
    if hasattr(event, "event_reason"):
        event.event_reason = tango.EventReason.SubFail
    # NOTE(review): assumed to be set unconditionally - confirm against the
    # original indentation.
    event.reception_date = tango.TimeVal.now()
    return event
class _ProxyCreator:
    """
    A class to manage retrying creating device proxies.

    Clients can hand off _PendingProxy objects to :meth:`add_pending` to have them
    created asynchronously.  A background thread is spawned as required and the
    thread will automatically retire after RETIRE_TIMEOUT seconds of inactivity.

    The creator assumes that creating the proxy has recently failed and will not
    try again for BUCKET_DELAY seconds.  The delays between retries grow
    exponentially with a maximum delay of (BUCKET_COUNT - 1) * BUCKET_DELAY = 10
    seconds.
    """

    # How long to wait between processing consecutive buckets
    BUCKET_DELAY = 0.1
    # How many buckets we have
    BUCKET_COUNT = 101
    # How long we wait with nothing to do before we retire the thread
    RETIRE_TIMEOUT = 60

    def __init__(self, logger: logging.Logger) -> None:
        """
        Initialise the creator; no thread is started until work arrives.

        :param logger: logger used for all diagnostics of this creator
        """
        self.logger = logger
        # List to collect new pending proxies from other threads.  If None
        # then we have signaled to the thread to stop or the thread is retired.
        self.incoming: list[_PendingProxy] | None = None
        # Condition variable to signal updates to incoming.
        # Forbidden locks: thread_lock (thread_lock must be acquired first)
        self.incoming_cond = threading.Condition()
        # A sparse ring buffer of pending proxies to process at specified times.
        # There can be at most BUCKET_COUNT lists here and each key satisfies
        # 0 <= key < BUCKET_COUNT.
        # The key corresponds to when the list needs to be processed relative
        # to the current slot being processed.
        # Only touched by the worker thread (and by nobody before it starts).
        self.buckets: dict[int, list[_PendingProxy]] = {}
        self.thread: threading.Thread | None = None
        # Lock invariants (when thread is not None, one of):
        #  - incoming is not None and thread.is_alive()
        #  - incoming is None and thread.join() will return
        self.thread_lock = threading.Lock()

    def add_pending(self, proxy: _PendingProxy) -> None:
        """
        Queue ``proxy`` for asynchronous creation, starting the worker if needed.

        :param proxy: the pending proxy to retry creating
        """
        self.logger.debug(
            'Adding incoming proxy "%s"',
            proxy.trl,
        )
        with self.thread_lock, self.incoming_cond:
            if self.thread is None or self.incoming is None:
                if self.thread is not None:
                    # This is safe because:
                    #  - Another thread cannot be in the middle of
                    #    begin_shutdown() because we hold the thread_lock
                    #    => the worker thread set incoming = None.
                    #  - The worker thread must have returned because we are
                    #    holding incoming_cond, i.e. the worker thread isn't
                    #    going to acquire incoming_cond again and we can
                    #    join() it.
                    self.thread.join()
                self.thread = threading.Thread(
                    target=self._connect_proxies,
                )
                self.incoming = []
                self.thread.start()
            self.incoming.append(proxy)
            self.incoming_cond.notify()

    def begin_shutdown(self) -> threading.Thread | None:
        """
        Ask the worker thread (if any) to stop and forget about it.

        :returns: the thread that was signalled, presumably so that the caller
            can ``join()`` it, or ``None`` if there was no thread.
        """
        result = None
        with self.thread_lock:
            if self.thread is not None:
                with self.incoming_cond:
                    self.incoming = None
                    self.incoming_cond.notify()
                # We must release the incoming_cond here so that the
                # worker thread can grab it to check incoming.
                result = self.thread
                self.thread = None
        return result

    def _has_work(self) -> bool:
        """
        Return True if the worker should stop idling.

        Must be called with ``incoming_cond`` held.  True when we are busy
        (buckets non-empty), have new stuff to do (incoming non-empty), or
        have been asked to stop (incoming is None).
        """
        return bool(self.buckets or self.incoming or self.incoming is None)

    def _connect_proxies(self) -> None:
        """
        Worker thread body: retry pending proxies until stopped or idle.

        Each loop iteration collects newly added proxies, processes the bucket
        for the current slot, re-schedules the failures with a doubled delay
        and then sleeps until the next non-empty bucket is due.
        """
        with tango.EnsureOmniThread():
            current_slot = 0
            while True:
                # Wait for pending proxies or retire
                with self.incoming_cond:
                    if not self.incoming_cond.wait_for(
                        self._has_work, self.RETIRE_TIMEOUT
                    ):
                        self.incoming = None
                        return
                    if self.incoming is None:
                        return
                    incoming, self.incoming = self.incoming, []

                # Schedule new pending proxies for the next bucket
                for pending in incoming:
                    self._add_to_bucket(current_slot, pending)

                # Process the current bucket
                retry = []
                for pending in self.buckets.pop(current_slot, []):
                    self.logger.debug(
                        'Processing pending proxy "%s" for current bucket %d',
                        pending.trl,
                        current_slot,
                    )
                    if self._attempt_connection(pending):
                        retry.append(pending)

                # Re-schedule failures
                for pending in retry:
                    # We cap this to self.BUCKET_COUNT - 1 so that we don't put
                    # this back in the same bucket.  This way we know when we
                    # sleep later that the current bucket is always empty,
                    # meaning we are free to be interrupted in less than
                    # BUCKET_DELAY seconds and not have pending proxies skip
                    # the queue.
                    pending.delay = min(2 * pending.delay, self.BUCKET_COUNT - 1)
                    self._add_to_bucket(current_slot, pending)

                # Wait until the next bucket, if any.
                next_slot = self._next_slot(current_slot)
                if next_slot is None:
                    self.logger.debug("All buckets empty")
                    continue
                new_slot = self._sleep_until(current_slot, next_slot)
                if new_slot is None:
                    # We have been asked to stop while sleeping.
                    return
                current_slot = new_slot

    def _attempt_connection(self, pending: _PendingProxy) -> bool:
        """
        Try once to create the device proxy for ``pending``.

        On ``tango.DevFailed`` the error callbacks are invoked with a
        synthesised error event and the proxy should be retried.  On any other
        exception the future is failed and the pending proxy is discarded.  On
        success the future is resolved with the new proxy.

        :param pending: the pending proxy to attempt
        :returns: True if the attempt failed and must be retried, else False
        """
        with pending.lock:
            try:
                proxy = tango.DeviceProxy(pending.trl)
            except tango.DevFailed as ex:
                self.logger.debug(
                    'Connection to pending proxy "%s" failed',
                    pending.trl,
                )
                event = _synthesise_error_event(ex)
                # Keep the documented meaning of last_error (the error from
                # the most recent attempt) while we hold the lock.
                pending.last_error = event
                for cb in pending.error_callbacks:
                    try:
                        cb(event)
                    except Exception:
                        self.logger.exception(
                            "Unexpected exception while calling error "
                            'callbacks for "%s"',
                            pending.trl,
                        )
                return True
            except Exception as ex:
                self.logger.debug(
                    "Non-Tango exception while creating DeviceProxy "
                    'to "%s". Discarding pending proxy.',
                    pending.trl,
                    exc_info=True,
                )
                try:
                    pending.future.set_exception(ex)
                except Exception:
                    # e.g. the future has already been finished; don't let
                    # that take down the worker thread.
                    self.logger.exception(
                        "Unexpected exception while setting proxy future "
                        'exception for "%s". Discarding pending proxy.',
                        pending.trl,
                    )
                # There is no proxy to publish: do not fall through to the
                # success path below.
                return False

            self.logger.debug(
                'Connection to pending proxy "%s" succeeded',
                pending.trl,
            )
            try:
                pending.future.set_result(proxy)
            except Exception:
                self.logger.exception(
                    "Unexpected exception while setting proxy future "
                    'for "%s". Discarding pending proxy.',
                    pending.trl,
                )
            return False

    def _next_slot(self, current_slot: int) -> int | None:
        """
        Find the next non-empty bucket after ``current_slot`` in ring order.

        :param current_slot: the slot that has just been processed
        :returns: the smallest key greater than ``current_slot`` if there is
            one, otherwise the smallest key overall (wrap-around), or ``None``
            when there are no buckets at all.
        """
        later = min((k for k in self.buckets if k > current_slot), default=None)
        if later is not None:
            return later
        return min(self.buckets, default=None)

    def _sleep_until(self, current_slot: int, next_slot: int) -> int | None:
        """
        Sleep until ``next_slot`` is due, new work arrives or we must stop.

        :param current_slot: the slot we are currently at
        :param next_slot: the slot of the next non-empty bucket
        :returns: the slot we have reached after sleeping, or ``None`` if we
            have been asked to stop.
        """
        target_slot_delta = (next_slot - current_slot) % self.BUCKET_COUNT
        sleep_time = target_slot_delta * self.BUCKET_DELAY
        self.logger.debug(
            "Sleeping for %.2fs to wait for bucket %d",
            sleep_time,
            next_slot,
        )
        start = time.monotonic()
        with self.incoming_cond:
            # Use a predicate so that work added (or a stop request made)
            # while we were busy connecting is noticed immediately instead
            # of after the full sleep: a notify() sent when nobody is
            # waiting is otherwise lost.
            self.incoming_cond.wait_for(
                lambda: self.incoming is None or bool(self.incoming),
                timeout=sleep_time,
            )
            if self.incoming is None:
                return None
        # Work out how many buckets we actually slept for.
        slept_for = time.monotonic() - start
        slot_delta = min(int(slept_for / self.BUCKET_DELAY), target_slot_delta)
        new_slot = (current_slot + slot_delta) % self.BUCKET_COUNT
        self.logger.debug(
            "Slept for %.2fs, now at bucket %d",
            slept_for,
            new_slot,
        )
        return new_slot

    def _add_to_bucket(self, current_slot: int, pending: _PendingProxy) -> None:
        """
        Schedule ``pending`` to be processed ``pending.delay`` slots from now.

        :param current_slot: the slot currently being processed
        :param pending: the pending proxy to schedule
        """
        slot = (current_slot + pending.delay) % self.BUCKET_COUNT
        self.logger.debug(
            'Adding pending proxy "%s" to bucket %d (delay=%d)',
            pending.trl,
            slot,
            pending.delay,
        )
        self.buckets.setdefault(slot, []).append(pending)