How to use the CallbackScheduler

Event-driven Tango clients must process events promptly so that each event gets serviced in a timely manner. Unfortunately, the built-in Tango event consumer is shared by the entire process, so reasoning about Tango event processing requires global knowledge of everything running in that process. The CallbackScheduler implements the recommendations in TBP 001 to help developers create robust event-driven Tango clients.

Registering a callback

The CallbackScheduler.register_event_callback() method can be used as a replacement for tango.DeviceProxy.subscribe_event(). With PyTango 10.1 or later, the Tango subscription happens asynchronously and the returned future can be waited on, blocking until the subscription is connected. With earlier versions of PyTango, the subscription is synchronous and the future already holds its result when it is returned. Related subscriptions should use the same CallbackScheduler object so that the scheduler can prioritise between the event streams. For example:

import tango
import ska_tango_base as stb

dp = tango.DeviceProxy("foo/bar/1")

sched = stb.CallbackScheduler()

futures = []
futures.append(sched.register_event_callback(dp, "myAttr1", tango.EventType.CHANGE_EVENT, tango.utils.EventCallback()))
futures.append(sched.register_event_callback(dp, "myAttr2", tango.EventType.ARCHIVE_EVENT, tango.utils.EventCallback()))

callback_ids = []
for f in futures:
    callback_ids.append(f.result())

# Callbacks are registered now and will receive events
# ...

for cid in callback_ids:
    sched.unregister_callback(cid)

Tip

If you don’t need to unregister the callbacks individually, you don’t need to keep the returned CallbackID around.
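
Because registration returns futures, a client may want to bound how long it waits for subscriptions to connect. The following is a minimal sketch, assuming the returned objects behave like concurrent.futures.Future (this guide only shows .result() being called, so that is an assumption). The helper name wait_for_callback_ids and the stand-in futures are hypothetical and are not part of ska-tango-base.

```python
from concurrent.futures import Future, wait


def wait_for_callback_ids(futures, timeout):
    """Wait at most `timeout` seconds; return (results, still_pending)."""
    done, pending = wait(futures, timeout=timeout)
    # Keep registration order for the futures that completed in time.
    results = [f.result() for f in futures if f in done]
    return results, list(pending)


# Stand-in futures so the sketch runs without a Tango device.
ready, slow = Future(), Future()
ready.set_result("callback-id-1")

ids, still_pending = wait_for_callback_ids([ready, slow], timeout=0.1)
```

Here one stand-in future is already complete and the other never completes, so the helper returns the single available identifier and reports the other future as still pending.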

Controlling the event queues

When using the CallbackScheduler, Tango events are added to internal, bounded queues which are prioritised by worker threads to ensure that each event stream gets serviced in a timely manner. This has two consequences:

  1. Events from different event streams can be processed in any order, regardless of the order they are received.

  2. If callbacks for a given event stream cannot keep up, then old events are discarded from the queue to make room for new events. The default size of the queue is reasonably small because, for most attributes, it is more important to process fresh data than to keep up with every transition.
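
The drop-oldest behaviour of a bounded queue can be illustrated with the standard library. This is only a sketch of the general technique, using collections.deque as a stand-in; it is not how the CallbackScheduler queues are implemented, and the event names are made up.

```python
from collections import deque

# A bounded queue that holds at most three events.
queue = deque(maxlen=3)

for event in ["e1", "e2", "e3", "e4", "e5"]:
    # Once the queue is full, appending discards the oldest entry.
    queue.append(event)

remaining = list(queue)
```

After five events arrive without any being consumed, only the three most recent ones remain to be processed.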

Both of these consequences can be mitigated using the queue_factory parameter of CallbackScheduler.register_event_callback(). If the order of events between event streams is important, a queue_factory that calls CallbackScheduler.get_queue() can be used to share one queue between both event streams. Similarly, a queue_factory that calls CallbackScheduler.allocate_queue() can be used to increase the size of a queue. For example:

import tango
import ska_tango_base as stb

dp = tango.DeviceProxy("foo/bar/1")

sched = stb.CallbackScheduler()

sched.register_event_callback(dp, "myAttr1", tango.EventType.CHANGE_EVENT, tango.utils.EventCallback(), queue_factory=lambda: sched.allocate_queue(queue_size=8192))
sched.register_event_callback(dp, "myAttr2", tango.EventType.ARCHIVE_EVENT, tango.utils.EventCallback(), queue_factory=lambda: sched.get_queue(dp, "myAttr1", tango.EventType.CHANGE_EVENT))

Warning

The queue_factory is only used if the event stream is not already connected, i.e. when this is the first callback to be registered with this event stream.
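
The "first registration wins" rule from the warning above can be modelled in a few lines. This is a hypothetical stand-in for the scheduler's bookkeeping, written only to show the consequence of the rule; StreamRegistry and its register() method are not part of ska-tango-base.

```python
from collections import deque


class StreamRegistry:
    """Hypothetical model: one queue per event stream."""

    def __init__(self):
        self._queues = {}

    def register(self, stream_key, queue_factory):
        # The factory is only called when the stream has no queue yet.
        if stream_key not in self._queues:
            self._queues[stream_key] = queue_factory()
        return self._queues[stream_key]


registry = StreamRegistry()
key = ("foo/bar/1", "myAttr1")
first = registry.register(key, lambda: deque(maxlen=8192))
# The second factory is ignored because the stream already has a queue.
second = registry.register(key, lambda: deque(maxlen=16))
```

In this model the second registration silently reuses the larger queue, which is why the queue configuration should be decided where the first callback for a stream is registered.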

Invoking a long running command

A CallbackScheduler can be passed to invoke_lrc() to use for event callback scheduling. This ensures that slow processing of the long running command result does not interfere with other event processing in the Tango client. If no CallbackScheduler is provided, a module-level callback scheduler is used instead.

For example:

import tango
import threading
import ska_tango_base as stb

done = threading.Event()


def lrc_callback(**kwargs):
    print(kwargs)
    # Signal completion once the command result has been reported
    if 'result' in kwargs:
        done.set()

dp = tango.DeviceProxy("foo/bar/1")

sched = stb.CallbackScheduler()

subs = stb.long_running_commands.invoke_lrc(lrc_callback, dp, "MyLrc", callback_scheduler=sched)

done.wait()

subs.unsubscribe_lrc_events()

Note

LRCSubscriptionsProtocol.unsubscribe_lrc_events() no longer unsubscribes from the Tango event callback, but instead unregisters callbacks with the CallbackScheduler. This name is from earlier versions of ska-tango-base where invoke_lrc() made Tango event subscriptions directly.
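
Waiting on a bare threading.Event blocks forever if the result never arrives. A minimal sketch of a callback object that records the result and supports a timeout is given below. It relies only on the callback receiving keyword arguments with a 'result' entry, as in the example above; the class name LrcResultWaiter and the some_update keyword are hypothetical.

```python
import threading


class LrcResultWaiter:
    """Callable that stores the LRC result and lets callers wait for it."""

    def __init__(self):
        self._done = threading.Event()
        self.result = None

    def __call__(self, **kwargs):
        # Ignore updates that do not carry the final result.
        if "result" in kwargs:
            self.result = kwargs["result"]
            self._done.set()

    def wait(self, timeout=None):
        """Return True if the result arrived within `timeout` seconds."""
        return self._done.wait(timeout)


# Simulated invocations; no Tango device is involved here.
waiter = LrcResultWaiter()
waiter(some_update="placeholder")
finished_early = waiter.wait(timeout=0)
waiter(result="done")
finished = waiter.wait(timeout=1.0)
```

An instance of such a class could be passed wherever lrc_callback is used above, with waiter.wait(timeout=...) replacing done.wait().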

Connecting an event stream

Whenever a callback is unregistered and there are no more callbacks associated with an event stream, the event stream is disconnected. For short-lived callbacks (such as those typically used by long running commands), repeatedly disconnecting and reconnecting an event stream can interrupt all Tango event callbacks, as Tango will not process events while a call to tango.DeviceProxy.subscribe_event() or tango.DeviceProxy.unsubscribe_event() is in flight.

To avoid subscription thrashing like this, the CallbackScheduler.connect_event_stream() method can be used to persist the Tango event subscription even when no callbacks are registered. The subscription will remain until CallbackScheduler.disconnect_event_stream() is called. For example:

import tango
import ska_tango_base as stb

dp = tango.DeviceProxy("foo/bar/1")

sched = stb.CallbackScheduler()

fut = sched.connect_event_stream(dp, "myAttr1", tango.EventType.CHANGE_EVENT)

# Wait for connect to complete
fut.result()

# The event stream is already connected so this just modifies client-side data structures and
# does not block
cid = sched.register_event_callback(dp, "myAttr1", tango.EventType.CHANGE_EVENT, tango.utils.EventCallback()).result()

# ...

sched.unregister_callback(cid)

# We still don't block here as the event stream is still connected
cid = sched.register_event_callback(dp, "myAttr1", tango.EventType.CHANGE_EVENT, tango.utils.EventCallback()).result()

# ...

sched.unregister_callback(cid)

Warning

While the event stream is connected but there are no callbacks, data is sent between the server and client only to be discarded by the client. This is wasteful so care must be taken when using this feature to ensure that event streams are not connected for longer than necessary.
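
One way to make sure a persistent connection is always released is to wrap the connect and disconnect calls in a context manager. The sketch below is deliberately generic: it takes two callables rather than a scheduler, because the arguments of disconnect_event_stream() are not shown in this guide. The name connected and the fake connect and disconnect functions are hypothetical; the only assumption about the connect call is that it returns an object with a .result() method, as connect_event_stream() does above.

```python
from concurrent.futures import Future
from contextlib import contextmanager


@contextmanager
def connected(connect, disconnect):
    """Connect on entry and always disconnect on exit."""
    future = connect()
    try:
        future.result()  # wait for the connection to complete
        yield
    finally:
        disconnect()


# Stand-ins that record the order of operations.
log = []


def fake_connect():
    log.append("connect")
    future = Future()
    future.set_result(None)
    return future


def fake_disconnect():
    log.append("disconnect")


with connected(fake_connect, fake_disconnect):
    log.append("work")
```

In real code the two callables would wrap the scheduler's connect and disconnect methods, so the stream stays connected for exactly the duration of the with block, even if the body raises.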

The event streams used by the long running command protocol can be connected in a similar manner using the connect_lrc_interface() and disconnect_lrc_interface() functions. For example:

import tango
import threading
import ska_tango_base as stb

done = threading.Event()


def lrc_callback(**kwargs):
    print(kwargs)
    # Signal completion once the command result has been reported
    if 'result' in kwargs:
        done.set()

dp = tango.DeviceProxy("foo/bar/1")

sched = stb.CallbackScheduler()

stb.long_running_commands.connect_lrc_interface(dp, sched)

subs = stb.long_running_commands.invoke_lrc(lrc_callback, dp, "MyLrc", callback_scheduler=sched)

done.wait()

subs.unsubscribe_lrc_events()

done.clear()

# ...

# Does not need to re-connect to the LRC event streams and so does not
# interrupt Tango event processing
subs = stb.long_running_commands.invoke_lrc(lrc_callback, dp, "MyLrc", callback_scheduler=sched)

done.wait()

subs.unsubscribe_lrc_events()

stb.long_running_commands.disconnect_lrc_interface(dp, sched)

Slow callbacks

The Tango event system requires that all Tango event callbacks are “quick” so that they do not interfere with other callbacks. The CallbackScheduler relaxes this requirement: callbacks may take as long as they like, provided that on average they complete faster than events arrive. When using such a slow callback, it is recommended to create a scheduler with multiple threads so that the scheduler can process other events while the slow callback is in progress. You will typically want one more thread than the number of slow callbacks that can be executing at the same time. For example:

import tango
import time
import ska_tango_base as stb

dp = tango.DeviceProxy("foo/bar/1")

def my_slow_callback(event: stb.type_hints.EventDataType) -> None:
    _ = event
    time.sleep(1)

sched = stb.CallbackScheduler(thread_count=2)

sched.register_event_callback(dp, "myAttr1", tango.EventType.CHANGE_EVENT, my_slow_callback, queue_factory=lambda: sched.allocate_queue(queue_size=512))

# Still serviced while events from "myAttr1" are processed
sched.register_event_callback(dp, "myAttr2", tango.EventType.CHANGE_EVENT, tango.utils.EventCallback())

Tip

When using a slow callback you may want to use the queue_factory to increase the size of the queue to be able to store bursts of events for later processing.
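
The "one more thread than the number of slow callbacks" rule can be demonstrated with a plain thread pool. This sketch uses concurrent.futures.ThreadPoolExecutor as a stand-in for the scheduler's worker threads; it does not reflect how the CallbackScheduler is implemented, and the callback names are made up.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

release_slow = threading.Event()
fast_done = threading.Event()


def slow_callback():
    # Stay busy until released (with a safety timeout).
    release_slow.wait(timeout=5)


def fast_callback():
    fast_done.set()


# One slow callback at a time, so two workers: one spare for everything else.
with ThreadPoolExecutor(max_workers=2) as pool:
    pool.submit(slow_callback)
    pool.submit(fast_callback)
    # The fast callback completes while the slow one is still blocked.
    fast_ran_while_slow_busy = fast_done.wait(timeout=2)
    release_slow.set()
```

With max_workers=1 the fast callback would have to wait behind the slow one, which is the situation the extra thread is meant to avoid.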

Connecting to undefined devices

The DeviceProxy constructor requires that the Tango device being connected to is defined in the Tango database, otherwise it will raise a “DB_DeviceNotDefined” exception. At SKAO, this can make it difficult to start stateless Tango subscriptions at startup time, as the Tango device being connected to might not yet have been defined in the database by the Kubernetes pod providing the device. To help with this issue, the CallbackScheduler supports subscribing to a Tango device using its TRL and will retry creating the DeviceProxy if this fails.

For example, the following will subscribe to change events from the foo/bar/1/myAttr1 attribute as soon as it is available, even if the foo/bar/1 device is not yet defined in the database.

import tango
import ska_tango_base as stb

sched = stb.CallbackScheduler()

sched.register_event_callback("foo/bar/1", "myAttr1", tango.EventType.CHANGE_EVENT, tango.utils.EventCallback())

While the device is not defined in the database, the provided callback receives synthetic error events carrying the exception raised when creating the DeviceProxy. Just like a regular subscription failure, the error event will have its event_reason set to SubFail.

Note

The event_reason is only available for PyTango >= 10.1.0.
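
A callback that may receive these error events should check for them before reading attribute data. The sketch below assumes the synthetic error events expose the same err flag and errors sequence as a regular tango.EventData, which this guide does not state explicitly. The function name describe_event is hypothetical, and SimpleNamespace objects stand in for real events so the example runs without Tango.

```python
from types import SimpleNamespace


def describe_event(event):
    """Return a short label, separating error events from data events."""
    if event.err:
        # Each entry in `errors` is expected to carry a `reason` string.
        reasons = ", ".join(error.reason for error in event.errors)
        return f"error: {reasons}"
    return "data"


# Stand-in events shaped like the assumed error and data cases.
error_event = SimpleNamespace(
    err=True, errors=[SimpleNamespace(reason="DB_DeviceNotDefined")]
)
data_event = SimpleNamespace(err=False, errors=[])

labels = [describe_event(error_event), describe_event(data_event)]
```

Branching on err first keeps the data-handling path free of checks that only matter while the device is still undefined.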

As the “DB_DeviceNotDefined” error is expected to happen during startup, we assume it will be resolved quickly in most cases. As such, the CallbackScheduler will initially retry quickly before exponentially growing the delay between subsequent retries. This is in contrast to how Tango handles subscription failure, which is retried at a constant 10 second cadence.
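
The general shape of such a retry schedule can be sketched as a generator of delays. The initial delay, growth factor and upper bound used here are purely illustrative assumptions, as is the existence of an upper bound at all; the actual values used by the CallbackScheduler are not given in this guide.

```python
import itertools


def retry_delays(initial, factor, maximum):
    """Yield retry delays that grow geometrically up to `maximum` seconds."""
    delay = initial
    while True:
        yield min(delay, maximum)
        delay *= factor


# Hypothetical parameters: start at 0.5 s, double each time, cap at 10 s.
schedule = list(itertools.islice(retry_delays(0.5, 2.0, 10.0), 6))
```

With these made-up parameters the first retries happen within a few seconds, and later retries settle at the cap instead of growing without bound.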

Cleaning up

When a scheduler is no longer needed, the CallbackScheduler.shutdown() method can be used to stop all worker threads and disconnect from all event streams. This is typically useful in delete_device methods. For example:

import ska_tango_base as stb

class MyDevice(stb.SKADevice):
    def init_device(self) -> None:
        super().init_device()
        self.scheduler = stb.CallbackScheduler()

    def delete_device(self) -> None:
        self.scheduler.shutdown()
        super().delete_device()

Warning

CallbackScheduler.shutdown() joins the worker threads calling your callbacks. If any of your callbacks acquire the device monitor lock, the above example may deadlock because the device monitor lock is held by delete_device. You can use the allow_internal_threads() context manager to handle this case if required.