Callback Scheduler classes
A module for managing Tango callbacks robustly.
The aim of the CallbackScheduler class is to allow you to
decouple the performance of callbacks from each other, allowing you to locally
reason about the callback.
Currently CallbackScheduler only supports callbacks for Tango
events but it may support SKA Long Running Command callbacks and
asynchronous Tango request callbacks in the future.
- class ska_tango_base.callback_scheduler.CallbackScheduler
A class for managing Tango callbacks robustly.
Tango events are quickly moved from the Tango thread to internal queues where background threads call the supplied callbacks. This decouples the speed of event processing of the supplied callbacks from the rest of the Tango process.
Queues are bounded and will discard old data if the supplied callbacks cannot keep up. The queue can be configured by manually allocating it with the allocate_queue() method and then passing the returned queue to register_event_callback() or connect_event_stream(). Multiple event streams can be allocated to the same queue to ensure events from each stream are processed in order relative to each other.
Queues are processed by worker threads, with each queue being assigned a priority that decreases as more of its events have been processed recently. The worker threads process one event on the queue at a time, selecting a new queue to process in between each event.
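The queueing behaviour described above can be illustrated with a small, self-contained model. This is only a sketch under stated assumptions, not the library's implementation: BoundedEventQueue, pick_next and drain are hypothetical names, and a plain count of processed events stands in for whatever "recently processed" measure the scheduler really uses.

```python
from collections import deque


class BoundedEventQueue:
    """Toy drop-oldest queue; not the library's internal queue."""

    def __init__(self, maxlen=8):
        # deque(maxlen=N) silently evicts the oldest item when full.
        self._items = deque(maxlen=maxlen)
        # Events processed so far; a stand-in for "recent" activity.
        self.processed = 0

    def push(self, event):
        self._items.append(event)

    def pop(self):
        self.processed += 1
        return self._items.popleft()

    def __len__(self):
        return len(self._items)


def pick_next(queues):
    """Return the non-empty queue with the fewest processed events, or None."""
    ready = [q for q in queues if len(q) > 0]
    return min(ready, key=lambda q: q.processed) if ready else None


def drain(queues):
    """Process one event at a time, re-selecting a queue between events."""
    order = []
    while (queue := pick_next(queues)) is not None:
        order.append(queue.pop())
    return order


fast = BoundedEventQueue(maxlen=3)
slow = BoundedEventQueue(maxlen=3)
for i in range(5):
    fast.push(f"fast-{i}")  # fast-0 and fast-1 are evicted by later pushes
slow.push("slow-0")
processed_order = drain([fast, slow])
```

In this model the busy queue loses its two oldest events, and the quiet queue gets a turn as soon as the busy one has been served once.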
- __init__(*, thread_count: int = 1, name: str | None = None, logger: Logger | None = None) None
Initialise the object.
Warning
If thread_count > 1 then registered callbacks for different event streams can be executed concurrently and it is the user's responsibility to ensure this is safe. This is different from Tango event subscriptions where callbacks are all called from the same thread.
- Parameters:
thread_count – Number of worker threads to spawn.
name – Name of the CallbackScheduler; if None, a default name will be provided.
logger – Logger object to use; if None, the module logger will be used.
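One way to keep callbacks safe when thread_count > 1 is to guard shared state with a lock. The sketch below is illustrative only: EventCounter is a hypothetical class, and a standard-library thread pool stands in for the scheduler's worker threads so that the example runs without Tango.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class EventCounter:
    """Hypothetical shared state updated by callbacks for several streams."""

    def __init__(self):
        self._lock = threading.Lock()
        self.count = 0

    def on_event(self, event):
        # The lock makes the read-modify-write safe across worker threads.
        with self._lock:
            self.count += 1


counter = EventCounter()
# The pool stands in for a scheduler created with thread_count=4.
with ThreadPoolExecutor(max_workers=4) as pool:
    for i in range(1000):
        pool.submit(counter.on_event, i)
# Leaving the "with" block waits for all submitted calls to finish.
total = counter.count
```

A bound method such as counter.on_event could then be passed as the callback to register_event_callback().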
- register_event_callback(device_trl: str, event_type: EventType, callback: Callable[[EventData | AttrConfEventData | DataReadyEventData | DevIntrChangeEventData], None] | EventCallbackPushEventProtocol, /, initial_event: bool = True, queue_factory: Callable[[], Queue] | None = None) Future[CallbackID]
- register_event_callback(device_trl: str, attr: str, event_type: EventType, callback: Callable[[EventData | AttrConfEventData | DataReadyEventData | DevIntrChangeEventData], None] | EventCallbackPushEventProtocol, /, initial_event: bool = True, queue_factory: Callable[[], Queue] | None = None) Future[CallbackID]
- register_event_callback(device: DeviceProxy, event_type: EventType, callback: Callable[[EventData | AttrConfEventData | DataReadyEventData | DevIntrChangeEventData], None] | EventCallbackPushEventProtocol, /, initial_event: bool = True, queue_factory: Callable[[], Queue] | None = None) Future[CallbackID]
- register_event_callback(device: DeviceProxy, attr: str, event_type: EventType, callback: Callable[[EventData | AttrConfEventData | DataReadyEventData | DevIntrChangeEventData], None] | EventCallbackPushEventProtocol, /, initial_event: bool = True, queue_factory: Callable[[], Queue] | None = None) Future[CallbackID]
Register a callback to an event stream.
If it is not already connected, this will create a temporary connection. A temporary connection will automatically be disconnected when there are no registered callbacks. See connect_event_stream() for details.
If unregistering independently from other callbacks is not required, the returned CallbackID and/or Future can be safely discarded.
>>> eh = CallbackScheduler()
>>> fut = eh.register_event_callback(
...     "foo/bar/1", "myAttr", tango.EventType.CHANGE_EVENT, my_callback)
>>> cid = fut.result(timeout=10)  # Wait for connection to be established
>>> ...
>>> eh.unregister_callback(cid)
Connections to the event stream are made with a stateless Tango subscription, meaning that if the subscription fails, callback will be called with an error EventData object describing the reason for the failure. Tango will retry the subscription asynchronously every 10 seconds.
If constructing a DeviceProxy from the given device_trl fails, for example because the device is not defined in the database, the CallbackScheduler will retry creating the DeviceProxy asynchronously with an exponential backoff. The provided callback will be called whenever DeviceProxy creation fails with a synthetic error EventData object describing the reason for the failure, where only the err, errors and reason (for PyTango >=10.1) fields are set.
- Parameters:
device – The Tango device producing the event stream. This proxy will not be used to connect to the event stream, instead a device proxy managed by the callback scheduler will be used.
device_trl – The Tango resource locator of the device producing the event stream.
attr – The name of the attribute to connect to. Absent for INTERFACE_CHANGE_EVENT.
event_type – The type of the event stream.
callback – Callback to call with event data.
queue_factory – Returns a queue to use for this event stream if not yet connected.
- Returns:
Future returning callback ID to unregister with.
- Raises:
tango.DevFailed – If canonicalising the device_trl fails.
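Because the callback receives error events as well as data events, it usually needs to check err before touching the payload. The following is a minimal sketch, assuming a change-event stream whose EventData exposes err, errors and attr_value.value. make_callback and the SimpleNamespace stand-ins are hypothetical and exist only so the example runs without a Tango installation.

```python
from types import SimpleNamespace


def make_callback(on_value, on_error):
    """Build a callback that routes error events away from the data path."""

    def callback(event):
        if event.err:
            # Subscription or DeviceProxy creation failed; errors has details.
            on_error(event.errors)
            return
        on_value(event.attr_value.value)

    return callback


values, failures = [], []
callback = make_callback(values.append, failures.append)

# Stand-ins for tango.EventData, carrying only the fields used above.
callback(SimpleNamespace(err=True, errors=["device not defined"], attr_value=None))
callback(SimpleNamespace(err=False, errors=[], attr_value=SimpleNamespace(value=42)))
```

Synthetic error events set only err, errors and (for PyTango >=10.1) reason, so the error branch should not rely on any other field.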
- unregister_callback(callback: CallbackID) None
Unregister a callback.
If there are no more callbacks associated with the event stream, then the event stream will be disconnected unless it was marked as permanent via connect_event_stream().
- Parameters:
callback – The callback to unregister.
- Raises:
ValueError – If the callback ID is not known.
- connect_event_stream(device_trl: str, event_type: EventType, /, queue_factory: Callable[[], Queue] | None = None) Future[None]
- connect_event_stream(device_trl: str, attr: str, event_type: EventType, /, queue_factory: Callable[[], Queue] | None = None) Future[None]
- connect_event_stream(device: DeviceProxy, event_type: EventType, /, queue_factory: Callable[[], Queue] | None = None) Future[None]
- connect_event_stream(device: DeviceProxy, attr: str, event_type: EventType, /, queue_factory: Callable[[], Queue] | None = None) Future[None]
Connect to an event stream and maintain connection.
The connection is established via a Tango event subscription with a callback that will push events to the queue associated with this event stream.
Calling this function marks an event stream as “permanent”, meaning it will not get disconnected even if there are no callbacks subscribed. This can be useful to avoid multiple Tango subscription requests to the Tango device when callbacks only want to be registered temporarily.
If the connection has not already been made and queue_factory is None then a new queue will be allocated with the default arguments. Otherwise the queue_factory will be called to construct a new queue.
Events are discarded rather than added to the queue if there are no callbacks subscribed.
Connecting to the event stream is handled asynchronously and the returned future can be used to track the progress of the connection. If the connection is already active when this function is called, then the returned future will already be fulfilled. Call result() to wait until the connection has completed. The future may be cancelled with cancel() to abort the connection if it has not already completed.
Warning
The connection is considered completed as soon as Tango calls the internal subscription callback. This may be with an error event informing us that we are unable to connect to the Tango device for some reason.
Example:
>>> eh = CallbackScheduler()
>>> fut = eh.connect_event_stream(
...     "foo/bar/1", "myAttr", tango.EventType.CHANGE_EVENT)
>>> fut.result(timeout=10)  # Wait for connection to be established
- Parameters:
device – The Tango device producing the event stream.
device_trl – The Tango resource locator of the device producing the event stream.
attr – The name of the attribute to connect to. Absent for INTERFACE_CHANGE_EVENT.
event_type – The type of the event stream.
queue_factory – Returns a queue to use for this event stream if not yet connected.
- Returns:
Future to track the progress of the connection.
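Waiting on the returned future with a timeout, and cancelling it if the wait expires, might look like the sketch below. It assumes the returned Future behaves like concurrent.futures.Future, which the result() and cancel() methods suggest but this page does not state. wait_for_connection is a hypothetical helper, and plain Future objects stand in for the scheduler's return value.

```python
from concurrent.futures import Future, TimeoutError as FutureTimeoutError


def wait_for_connection(future, timeout):
    """Return "completed", or "cancelled"/"pending" if the wait times out."""
    try:
        future.result(timeout=timeout)
        return "completed"
    except FutureTimeoutError:
        # cancel() returns False if the future is running or already done.
        return "cancelled" if future.cancel() else "pending"


done = Future()
done.set_result(None)  # models a connection that is already active
stuck = Future()       # models a connection that never completes
outcomes = (wait_for_connection(done, 0.1), wait_for_connection(stuck, 0.1))
```

Note that "completed" only means the internal subscription callback has been called; as the warning above says, that call may carry an error event.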
- disconnect_event_stream(device_trl: str, event_type: EventType, /) None
- disconnect_event_stream(device_trl: str, attr: str, event_type: EventType, /) None
- disconnect_event_stream(device: DeviceProxy, event_type: EventType, /) None
- disconnect_event_stream(device: DeviceProxy, attr: str, event_type: EventType, /) None
Disconnect from an event stream.
If there are callbacks associated with the event stream, then this function will just mark the event stream as “temporary” and the event stream will be disconnected when the last callback is unregistered.
- Parameters:
device – The Tango device producing the event stream.
device_trl – The Tango resource locator of the device producing the event stream.
attr – The name of the attribute to connect to. Absent for INTERFACE_CHANGE_EVENT.
event_type – The type of the event stream.
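The interplay between "permanent" and "temporary" event streams can be summarised with a toy state model. This is a sketch of the rules as described on this page, not the library's code. StreamState and its method names are hypothetical.

```python
class StreamState:
    """Toy model of one event stream's lifetime; not the library's code."""

    def __init__(self):
        self.connected = False
        self.permanent = False
        self.callbacks = set()

    def connect(self):
        # Models connect_event_stream(): connect and mark as permanent.
        self.connected = True
        self.permanent = True

    def register(self, callback_id):
        # Models register_event_callback(): connects temporarily if needed.
        self.connected = True
        self.callbacks.add(callback_id)

    def unregister(self, callback_id):
        # Models unregister_callback(): unknown IDs raise ValueError.
        if callback_id not in self.callbacks:
            raise ValueError(f"unknown callback ID: {callback_id}")
        self.callbacks.remove(callback_id)
        self._maybe_disconnect()

    def disconnect(self):
        # Models disconnect_event_stream(): demote to temporary first.
        self.permanent = False
        self._maybe_disconnect()

    def _maybe_disconnect(self):
        if not self.permanent and not self.callbacks:
            self.connected = False


stream = StreamState()
stream.connect()
stream.register(1)
stream.unregister(1)
connected_when_permanent = stream.connected  # permanent, so still connected
stream.register(2)
stream.disconnect()
connected_with_callback = stream.connected   # a callback remains registered
stream.unregister(2)
connected_at_end = stream.connected          # last callback gone
```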
- disconnect_all() None
Unregister all callbacks and disconnect all event streams.
Unlike shutdown(), the worker threads are still running and the CallbackScheduler can still be used.
- allocate_queue(*, queue_size: int = 8) Queue
Allocate an event queue.
- Parameters:
queue_size – Size of the queue. When the queue is full, old entries are discarded to make space for new ones.
- Returns:
The allocated queue.
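A manually allocated queue can be shared between event streams so that their events are processed in order relative to each other. The helper below is a sketch, assuming that a queue_factory which always returns the same Queue is an acceptable way to share it. register_on_shared_queue is a hypothetical name, and the scheduler is passed in so that only the methods documented here are used.

```python
def register_on_shared_queue(scheduler, streams, callback, queue_size=32):
    """Register `callback` on several streams that share one queue.

    `streams` is an iterable of (device_trl, attr, event_type) tuples.
    Returns the futures produced by register_event_callback().
    """
    queue = scheduler.allocate_queue(queue_size=queue_size)
    return [
        scheduler.register_event_callback(
            device_trl,
            attr,
            event_type,
            callback,
            # Every factory call hands back the same queue object.
            queue_factory=lambda: queue,
        )
        for device_trl, attr, event_type in streams
    ]
```

The factory is only consulted for streams that are not yet connected, so a stream that is already connected keeps its existing queue.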
- get_queue(device_trl: str, event_type: EventType, /) Queue
- get_queue(device_trl: str, attr: str, event_type: EventType, /) Queue
- get_queue(device: DeviceProxy, event_type: EventType, /) Queue
- get_queue(device: DeviceProxy, attr: str, event_type: EventType, /) Queue
Return the queue used by a particular event stream.
- Parameters:
trl – The Tango resource locator of the origin of the event stream.
event_type – The type of the event stream.
- Returns:
The queue used by the event stream.
- class ska_tango_base.callback_scheduler.CallbackID
Callback Identifier.
Used to unregister a callback. Can be discarded if unregistering is not needed.
alias of int
- class ska_tango_base.callback_scheduler.Queue
Opaque reference to an internal event queue.
If there are no references to the Queue then the corresponding internal event queue will be deleted. The CallbackScheduler will keep a reference to the Queue while the internal event queue is in use.