Callback Scheduler classes

A module for managing Tango callbacks robustly.

The aim of the CallbackScheduler class is to allow you to decouple the performance of callbacks from each other, allowing you to locally reason about the callback.

Currently CallbackScheduler only supports callbacks for Tango events but it may support SKA Long Running Command callbacks and asynchronous Tango request callbacks in the future.

class ska_tango_base.callback_scheduler.CallbackScheduler[source]

A class for managing Tango callbacks robustly.

Tango events are quickly moved from the Tango thread to internal queues where background threads call the supplied callbacks. This decouples the speed of event processing of the supplied callbacks from the rest of the Tango process.

Queues are bounded and will discard old data if the supplied callbacks cannot keep up. The queue can be configured by manually allocating it with the allocate_queue() method and then passing the returned queue to register_event_callback() or connect_event_stream(). Multiple event streams can be allocated to the same queue to ensure events from each stream are processed in order relative to each other.

Queues are processed by worker threads with each queue being assigned a priority that is lower the more events that have been processed recently. The worker threads process one event on the queue at a time, selecting a new queue to process in between each event.

__init__(*, thread_count: int = 1, name: str | None = None, logger: Logger | None = None) None[source]

Initialise the object.

Warning

If thread_count > 1 then registered callbacks for different event streams can be executed concurrently and it is the users responsibility to ensure this is safe. This is different from Tango event subscriptions where callbacks are all called from the same thread.

Parameters:
  • thread_count – Number of worker threads to spawn.

  • name – Name of the CallbackScheduler, if None a default name will be provided.

  • logger – Logger object to use, if None the module logger will be used.

shutdown() None[source]

Shutdown background threads and subscribe from all events.

register_event_callback(device_trl: str, event_type: EventType, callback: Callable[[EventData | AttrConfEventData | DataReadyEventData | DevIntrChangeEventData], None] | EventCallbackPushEventProtocol, /, initial_event: bool = True, queue_factory: Callable[[], Queue] | None = None) Future[CallbackID][source]
register_event_callback(device_trl: str, attr: str, event_type: EventType, callback: Callable[[EventData | AttrConfEventData | DataReadyEventData | DevIntrChangeEventData], None] | EventCallbackPushEventProtocol, /, initial_event: bool = True, queue_factory: Callable[[], Queue] | None = None) Future[CallbackID]
register_event_callback(device: DeviceProxy, event_type: EventType, callback: Callable[[EventData | AttrConfEventData | DataReadyEventData | DevIntrChangeEventData], None] | EventCallbackPushEventProtocol, /, initial_event: bool = True, queue_factory: Callable[[], Queue] | None = None) Future[CallbackID]
register_event_callback(device: DeviceProxy, attr: str, event_type: EventType, callback: Callable[[EventData | AttrConfEventData | DataReadyEventData | DevIntrChangeEventData], None] | EventCallbackPushEventProtocol, /, initial_event: bool = True, queue_factory: Callable[[], Queue] | None = None) Future[CallbackID]

Register a callback to an event stream.

If it is not already connected, this will create a temporary connection. A temporary connection will automatically be disconnected when there are no registered callbacks. See connect_event_stream() for details.

If unregistering independently from other callbacks is not required, the returned CallbackID and/or Future can be safely discarded.

>>> eh = CallbackScheduler()
>>> fut = eh.register_callback(
        "foo/bar/1", "myAttr", tango.EventType.CHANGE_EVENT, my_callback)
>>> cid = fut.result(timeout=10)  # Wait for connection to be established
>>> ...
>>> eh.unregister_callback(cid)

Connections to the event stream are done with a stateless Tango subscription, meaning if the subscription fails callback will be called with an error EventData object describing the reason for the failure. Tango will retry the subscription asynchronously every 10 seconds.

If constructing a DeviceProxy from the given device_trl fails, for example, because the device is not defined in the database, the CallbackScheduler will retry creating the DeviceProxy asynchronously with an exponential backoff. The provided callback will be called whenever DeviceProxy creation fails with a synthetic error EventData object describing the reason for the failure, where only the err, errors and reason (for PyTango >=10.1) are set.

Parameters:
  • device – The Tango device producing the event stream. This proxy will not be used to connect to the event stream, instead a device proxy managed by the callback scheduler will be used.

  • device_trl – The Tango resource locator of the device producing the event stream.

  • attr – The name of the attribute to connect to. Absent for INTERFACE_CHANGE_EVENT.

  • event_type – The type of the event stream.

  • callback – Callback to call with event data.

  • queue_factory – Returns a queue to use for this event stream if not yet connected.

Returns:

Future returning callback ID to unregister with.

Raises:

tango.DevFailed – If canonicalising the device_trl fails

unregister_callback(callback: CallbackID) None[source]

Unregister a callback.

If there are no more callbacks associated with the event stream, then the event stream will be disconnected unless it was marked as permanent via connect_event_stream().

Parameters:

callback – The callback to unregister.

Raises:

ValueError – If the callback ID is not known.

connect_event_stream(device_trl: str, event_type: EventType, /, queue_factory: Callable[[], Queue] | None = None) Future[None][source]
connect_event_stream(device_trl: str, attr: str, event_type: EventType, /, queue_factory: Callable[[], Queue] | None = None) Future[None]
connect_event_stream(device: DeviceProxy, event_type: EventType, /, queue_factory: Callable[[], Queue] | None = None) Future[None]
connect_event_stream(device: DeviceProxy, attr: str, event_type: EventType, /, queue_factory: Callable[[], Queue] | None = None) Future[None]

Connect to an event stream and maintain connection.

The connection is established via a Tango event subscription with a callback that will push events to the queue associated with this event stream.

Calling this function marks an event stream as “permanent”, meaning it will not get disconnected even if there are no callbacks subscribed. This can be useful to avoid multiple Tango subscription requests to the Tango device when callbacks only want to be registered temporarily.

If the connection has not already been made and queue_factory is None then a new queue will be allocated with the default arguments. Otherwise the queue_factory will be called to construct a new queue.

Events are discarded rather than added to the queue if there are no callbacks subscribed.

Connecting to the event stream is handled asynchronously and the returned future can be used to track the progress of the connection. If the connection is already active when this function is called, then the returned future will be fulfilled. Call result() to wait until the connection has completed. The future may be cancel()’d to abort the connection if it has not already been completed.

Warning

The connection is considered completed as soon as Tango calls the internal subscription callback. This may be with an error event informing us that we are unable to connect to the Tango device for some reason.

Example:

>>> eh = CallbackScheduler()
>>> fut = eh.connect_event_stream(
        "foo/bar/1", "myAttr", tango.EventType.CHANGE_EVENT)
>>> fut.result(timeout=10) # Wait for connection to be established
Parameters:
  • device – The Tango device producing the event stream.

  • device_trl – The Tango resource locator of the device producing the event stream.

  • attr – The name of the attribute to connect to. Absent for INTERFACE_CHANGE_EVENT.

  • event_type – The type of the event stream.

  • queue_factory – Returns a queue to use for this event stream if not yet connected.

Returns:

Future to track the progress of the connection.

disconnect_event_stream(device_trl: str, event_type: EventType, /) None[source]
disconnect_event_stream(device_trl: str, attr: str, event_type: EventType, /) None
disconnect_event_stream(device: DeviceProxy, event_type: EventType, /) None
disconnect_event_stream(device: DeviceProxy, attr: str, event_type: EventType, /) None

Disconnect from an event stream.

If there are callbacks associated with the event stream, then this function will just mark the event stream as “temporary” and the event stream will be disconnected when the last callback is unregistered.

Parameters:
  • device – The Tango device producing the event stream.

  • device_trl – The Tango resource locator of the device producing the event stream.

  • attr – The name of the attribute to connect to. Absent for INTERFACE_CHANGE_EVENT.

  • event_type – The type of the event stream.

disconnect_all() None[source]

Unregister all callbacks and disconnect all event streams.

Unlike shutdown(), the worker threads are still running and the CallbackScheduler can still be used.

allocate_queue(*, queue_size: int = 8) Queue[source]

Allocate an event queue.

Parameters:

queue_size – Size of the queue. When full old entries will be discarded to make space for the new.

Returns:

The allocated queue.

get_queue(device_trl: str, event_type: EventType, /) Queue[source]
get_queue(device_trl: str, attr: str, event_type: EventType, /) Queue
get_queue(device: DeviceProxy, event_type: EventType, /) Queue
get_queue(device: DeviceProxy, attr: str, event_type: EventType, /) Queue

Return the queue used by a particular event stream.

Parameters:
  • trl – The Tango resource locator of the origin of the event stream.

  • event_type – The type of the event stream.

Returns:

ID of the queue.

class ska_tango_base.callback_scheduler.CallbackID

Callback Identifier.

Used to unregister a callback. Can be discarded if unregistering is not needed.

alias of int

class ska_tango_base.callback_scheduler.Queue[source]

Opaque reference to an internal event queue.

If there are no references to the Queue then the corresponding internal event queue will be deleted. The class:CallbackScheduler will keep a reference to the Queue while the internal event queue is in use.

exception ska_tango_base.callback_scheduler.BrokenCallbackSchedulerError[source]

Raised when a CallbackScheduler has already been shutdown.

__init__() None[source]

Initialise exception message.