================================
How to use the CallbackScheduler
================================

Event-driven Tango clients need to ensure that the application processes events
promptly, so that each event is serviced in a timely manner. Unfortunately, the
built-in Tango event consumer is a resource shared by the entire process, which
means that reasoning about Tango event processing requires global information
about the entire process.

The :class:`~.CallbackScheduler` is an implementation of the recommendations in
`TBP 001 `_ to allow developers to create robust event-driven Tango clients.

Registering a callback
======================

The :meth:`.CallbackScheduler.register_event_callback` method can be used as a
replacement for :meth:`tango.DeviceProxy.subscribe_event`. For PyTango 10.1 or
later, the Tango subscription happens asynchronously and the returned future can
be waited on, blocking until the subscription has been connected. For earlier
versions of PyTango, the subscription is synchronous and the returned future
already holds its result.

Related subscriptions should use the same :class:`.CallbackScheduler` object so
that the scheduler can prioritise between the event streams. For example:

.. code:: python

    import tango
    import ska_tango_base as stb

    dp = tango.DeviceProxy("foo/bar/1")
    sched = stb.CallbackScheduler()

    futures = []
    futures.append(sched.register_event_callback(dp, "myAttr1", tango.EventType.CHANGE_EVENT,
                                                 tango.utils.EventCallback()))
    futures.append(sched.register_event_callback(dp, "myAttr2", tango.EventType.ARCHIVE_EVENT,
                                                 tango.utils.EventCallback()))

    # Wait for each subscription to connect and collect the callback IDs
    callback_ids = []
    for f in futures:
        callback_ids.append(f.result())

    # Callbacks are registered now and will receive events
    # ...

    for cid in callback_ids:
        sched.unregister_callback(cid)

.. tip::

    If you don't want to unregister the callbacks individually, then you don't
    need to keep the returned :class:`.CallbackID` objects around.
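
Calling ``result()`` on every returned future, as above, works unchanged whether
the subscription is asynchronous (PyTango 10.1 or later) or synchronous (earlier
versions). The following standard-library sketch illustrates why, assuming the
returned futures behave like ``concurrent.futures.Future``; ``fake_subscribe``
and its integer callback IDs are hypothetical stand-ins and not part of
ska-tango-base.

.. code:: python

    import concurrent.futures
    import threading
    import time


    def fake_subscribe(asynchronous: bool, callback_id: int) -> concurrent.futures.Future:
        """Hypothetical stand-in for register_event_callback (not the real API)."""
        future: concurrent.futures.Future = concurrent.futures.Future()
        if asynchronous:
            # Like PyTango >= 10.1: the subscription completes later on another thread
            def connect() -> None:
                time.sleep(0.1)  # pretend the subscription takes a while to connect
                future.set_result(callback_id)

            threading.Thread(target=connect, daemon=True).start()
        else:
            # Like earlier PyTango: the subscription has already happened, so the
            # result is filled in before the future is returned
            future.set_result(callback_id)
        return future


    futures = [fake_subscribe(asynchronous=True, callback_id=1),
               fake_subscribe(asynchronous=False, callback_id=2)]

    # result() blocks until done in the asynchronous case and returns
    # immediately in the synchronous case, so the calling code is identical
    callback_ids = [f.result(timeout=5) for f in futures]
    print(callback_ids)  # [1, 2]

Because both paths hand back a future, client code written this way does not
need to branch on the PyTango version; it simply waits on each future.
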
Controlling the event queues
----------------------------

When using the :class:`.CallbackScheduler`, Tango events are added to internal,
bounded queues, and the worker threads prioritise between these queues so that
each event stream is serviced in a timely manner. This has two consequences:

1. Events from different event streams can be processed in any order, regardless
   of the order in which they are received.
2. If the callbacks for a given event stream cannot keep up, old events are
   discarded from the queue to make room for new events.

The default queue size is relatively small, as for most attributes it is more
important to process fresh data than to keep up with every transition.

Both of these consequences can be mitigated using the ``queue_factory``
parameter of :meth:`.CallbackScheduler.register_event_callback`. If the order of
events across event streams is important, a ``queue_factory`` that calls
:meth:`.CallbackScheduler.get_queue` can be used to share a single queue between
the event streams. Similarly, a ``queue_factory`` that calls
:meth:`.CallbackScheduler.allocate_queue` can be used to increase the size of a
queue. For example:

.. code:: python

    import tango
    import ska_tango_base as stb

    dp = tango.DeviceProxy("foo/bar/1")
    sched = stb.CallbackScheduler()

    # Allocate a larger, dedicated queue for the "myAttr1" change events
    sched.register_event_callback(dp, "myAttr1", tango.EventType.CHANGE_EVENT,
                                  tango.utils.EventCallback(),
                                  queue_factory=lambda: sched.allocate_queue(queue_size=8192))

    # Reuse the "myAttr1" queue so that events from both streams are processed
    # in the order they are received
    sched.register_event_callback(dp, "myAttr2", tango.EventType.ARCHIVE_EVENT,
                                  tango.utils.EventCallback(),
                                  queue_factory=lambda: sched.get_queue(dp, "myAttr1",
                                                                        tango.EventType.CHANGE_EVENT))

.. warning::

    The ``queue_factory`` is only used if the event stream is not already
    connected, i.e. when this is the first callback to be registered for the
    event stream.

Invoking a long running command
===============================

A :class:`.CallbackScheduler` can be passed to :func:`.invoke_lrc` to be used for
event callback scheduling.
This ensures that slow processing of the long running command result does not
interfere with other event processing in the Tango client. When a
:class:`.CallbackScheduler` is not provided, a module-level callback scheduler
is used instead. For example:

.. code:: python

    import tango
    import threading
    import ska_tango_base as stb

    done = threading.Event()

    def lrc_callback(**kwargs):
        print(kwargs)
        # The final update carries the command result
        if 'result' in kwargs:
            done.set()

    dp = tango.DeviceProxy("foo/bar/1")
    sched = stb.CallbackScheduler()
    subs = stb.long_running_commands.invoke_lrc(lrc_callback, dp, "MyLrc",
                                                callback_scheduler=sched)
    done.wait()
    subs.unsubscribe_lrc_events()

.. note::

    Despite its name, :meth:`.LRCSubscriptionsProtocol.unsubscribe_lrc_events()`
    no longer unsubscribes from the Tango events; instead it unregisters the
    callbacks with the :class:`.CallbackScheduler`. The name dates from earlier
    versions of ska-tango-base, where :func:`.invoke_lrc` made Tango event
    subscriptions directly.

Connecting an event stream
==========================

Whenever a callback is unregistered and there are no more callbacks associated
with an event stream, the event stream is disconnected. For short-lived
callbacks (such as those typically used by long running commands), repeatedly
disconnecting and reconnecting an event stream can interrupt all Tango event
callbacks, as Tango will not process events while a call to
:meth:`tango.DeviceProxy.subscribe_event` or
:meth:`tango.DeviceProxy.unsubscribe_event` is in flight.

To avoid subscription thrashing like this, the
:meth:`.CallbackScheduler.connect_event_stream` method can be used to persist
the Tango event subscription even when no callbacks are registered. The
subscription remains until :meth:`.CallbackScheduler.disconnect_event_stream` is
called. For example:

.. code:: python

    import tango
    import ska_tango_base as stb

    dp = tango.DeviceProxy("foo/bar/1")
    sched = stb.CallbackScheduler()

    fut = sched.connect_event_stream(dp, "myAttr1", tango.EventType.CHANGE_EVENT)
    # Wait for connect to complete
    fut.result()

    # The event stream is already connected, so this only modifies client-side
    # data structures and does not block
    cid = sched.register_event_callback(dp, "myAttr1", tango.EventType.CHANGE_EVENT,
                                        tango.utils.EventCallback()).result()
    # ...
    sched.unregister_callback(cid)

    # We still don't block here, as the event stream is still connected
    cid = sched.register_event_callback(dp, "myAttr1", tango.EventType.CHANGE_EVENT,
                                        tango.utils.EventCallback()).result()
    # ...
    sched.unregister_callback(cid)

.. warning::

    While the event stream is connected but there are no callbacks, data is sent
    from the server to the client only to be discarded by the client. This is
    wasteful, so take care when using this feature to ensure that event streams
    are not connected for longer than necessary.

The event streams used by the long running command protocol can be connected in
a similar manner using the :func:`.connect_lrc_interface` and
:func:`.disconnect_lrc_interface` functions. For example:

.. code:: python

    import tango
    import threading
    import ska_tango_base as stb

    done = threading.Event()

    def lrc_callback(**kwargs):
        print(kwargs)
        if 'result' in kwargs:
            done.set()

    dp = tango.DeviceProxy("foo/bar/1")
    sched = stb.CallbackScheduler()
    stb.long_running_commands.connect_lrc_interface(dp, sched)

    subs = stb.long_running_commands.invoke_lrc(lrc_callback, dp, "MyLrc",
                                                callback_scheduler=sched)
    done.wait()
    subs.unsubscribe_lrc_events()
    done.clear()

    # ...

    # Does not need to re-connect to the LRC event streams and so does not
    # interrupt Tango event processing
    subs = stb.long_running_commands.invoke_lrc(lrc_callback, dp, "MyLrc",
                                                callback_scheduler=sched)
    done.wait()
    subs.unsubscribe_lrc_events()

    stb.long_running_commands.disconnect_lrc_interface(dp, sched)

Slow callbacks
==============

The Tango event system requires that all Tango event callbacks are "quick" so
that they do not interfere with other callbacks. The
:class:`.CallbackScheduler` relaxes this requirement, allowing callbacks to take
as long as they like, provided that, on average, they process events faster
than the events are received.

When using a slow callback such as this, it is recommended to create a scheduler
with multiple threads. This allows other events to be processed by the
scheduler while the slow callback is in progress. You will typically want one
more thread than the number of slow callbacks that can be executing at the same
time. For example:

.. code:: python

    import tango
    import time
    import ska_tango_base as stb

    dp = tango.DeviceProxy("foo/bar/1")

    def my_slow_callback(event: stb.type_hints.EventDataType) -> None:
        _ = event
        time.sleep(1)  # simulate slow processing

    # One slow callback, so use one extra thread to service everything else
    sched = stb.CallbackScheduler(thread_count=2)
    sched.register_event_callback(dp, "myAttr1", tango.EventType.CHANGE_EVENT,
                                  my_slow_callback,
                                  queue_factory=lambda: sched.allocate_queue(queue_size=512))

    # Still serviced while events from "myAttr1" are processed
    sched.register_event_callback(dp, "myAttr2", tango.EventType.CHANGE_EVENT,
                                  tango.utils.EventCallback())

.. tip::

    When using a slow callback, you may want to use the ``queue_factory`` to
    increase the size of the queue so that it can store bursts of events for
    later processing.

Connecting to undefined devices
===============================

The :class:`~tango.DeviceProxy` constructor requires that the Tango device being
connected to is defined in the Tango database; otherwise it raises a
"DB_DeviceNotDefined" exception.
At SKAO, this can make it difficult to start stateless Tango subscriptions at
startup time, as the Tango device being connected to might not yet have been
defined in the database by the Kubernetes pod providing the device.

To help with this issue, the :class:`.CallbackScheduler` supports subscribing to
a Tango device using its TRL and will handle retrying the creation of the
:class:`~tango.DeviceProxy` if this fails. For example, the following will
subscribe to change events from the ``foo/bar/1/myAttr1`` attribute as soon as
it is available, even if the ``foo/bar/1`` device is not yet defined in the
database.

.. code:: python

    import tango
    import ska_tango_base as stb

    sched = stb.CallbackScheduler()
    # Pass the device TRL rather than a DeviceProxy
    sched.register_event_callback("foo/bar/1", "myAttr1", tango.EventType.CHANGE_EVENT,
                                  tango.utils.EventCallback())

While the device is not defined in the database, the provided callback will
receive synthetic error events carrying the exception raised when creating the
:class:`~tango.DeviceProxy`. Just like a regular subscription failure, the error
event will have its ``event_reason`` set to :class:`SubFail `.

.. note::

    The ``event_reason`` is only available for PyTango >= 10.1.0.

As the "DB_DeviceNotDefined" error is expected to happen during startup, we
assume it will be resolved quickly in most cases. As such, the
:class:`.CallbackScheduler` initially retries quickly before exponentially
increasing the delay between subsequent retries. This is in contrast to how
Tango handles subscription failure, which is retried at a constant 10 second
cadence.

Cleaning up
===========

When a scheduler is no longer needed, the :meth:`.CallbackScheduler.shutdown`
method can be used to stop all worker threads and disconnect from all event
streams. This is typically useful in ``delete_device`` methods. For example:

.. code:: python

    import ska_tango_base as stb

    class MyDevice(stb.SKADevice):
        def init_device(self) -> None:
            super().init_device()
            self.scheduler = stb.CallbackScheduler()

        def delete_device(self) -> None:
            # Stop the worker threads and disconnect from all event streams
            self.scheduler.shutdown()
            super().delete_device()

.. warning::

    :meth:`.CallbackScheduler.shutdown` joins the worker threads that call your
    callbacks. If any of your callbacks acquire the device monitor lock, then the
    above example may deadlock, as the device monitor lock is held by
    ``delete_device``. You can use the :meth:`.allow_internal_threads` context
    manager to handle this case if required.
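
The following standard-library sketch reproduces the shape of the deadlock
described in the warning above. It does not use ska-tango-base: a plain
``threading.Lock`` stands in for the device monitor lock, a single thread stands
in for a scheduler worker, and ``run_shutdown`` is a hypothetical helper that
uses a join timeout so the sketch reports the hang instead of hanging forever.

.. code:: python

    import threading


    def run_shutdown(hold_lock: bool, timeout: float = 0.5) -> bool:
        """Join a worker whose callback needs the lock.

        Returns True if the join completed within ``timeout`` seconds.
        """
        lock = threading.Lock()  # stand-in for the device monitor lock
        started = threading.Event()

        def callback() -> None:
            # Stand-in for an event callback that grabs the monitor lock
            started.set()
            with lock:
                pass

        worker = threading.Thread(target=callback, daemon=True)
        if hold_lock:
            # Like delete_device: the lock is held while the worker is joined
            with lock:
                worker.start()
                started.wait()
                worker.join(timeout)
                joined = not worker.is_alive()
            worker.join()  # lock released, so the callback can now finish
        else:
            worker.start()
            started.wait()
            worker.join(timeout)
            joined = not worker.is_alive()
        return joined


    print(run_shutdown(hold_lock=True))   # False: the join timed out
    print(run_shutdown(hold_lock=False))  # True

The sketch only shows why joining a worker while holding a lock that its
callback needs cannot complete; it does not show how
:meth:`.allow_internal_threads` resolves this.
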