Handling Tango Events
This page describes the architectural of Tango events handling in ska-oso-scripting, focusing on four core components:
TangoClient,
StreamHandle,
TangoSubscriptionConsolidator, and
Callback. These components work together to provide a mechanism for
subscribing to, consuming, and processing Tango device events in observing scripts.
Primary Presentation
Element Catalogue
Elements and their properties
Component |
Description |
|---|---|
Acts as the primary proxy between clients and Tango devices. It provides the main API for event streaming via
|
|
|
Prevents duplicate subscriptions to the same Tango attribute by creating one long-lived subscription per attribute and setting an associated Callback instance as the one and only subscriber. Hence, clients are decoupled from direct Tango subscriptions; instead clients (indirectly) subscribe to Callbacks. |
|
Observable that distributes Tango events to all registered observers. Uses a queue-based architecture to quickly move event reception off the Tango thread. Subsequent event processing and relaying occurs on a separate thread, with value change detection to only notify observers when values actually change. |
|
The protocol that Callback observers implement to be notified of a Tango change event. |
|
Represents an active subscription to attribute ChangeEvents. Implements the EventObserver protocol to receive events from Callback, and then provides iteration over those events via the standard Python Iterator protocol. |
Element Interfaces
This section illustrates the key interface for Tango events handling from the perspective of an scripting function developer.
API
method |
Description |
|---|---|
|
Returns an
AbstractContextManager that yields change events from one or more Tango device
attributes in the form of (EventData, Attribute)
tuples.Change events are filtered to remove error events and duplicate (i.e., consecutive change events that do not
indicate a real value change). The iterator stream will continue indefinitely; clients are responsible for
breaking iteration when they no longer want to process events.
|
Examples
The most common pattern is to subscribe to multiple attributes and iterate over events as they arrive:
from ska_oso_scripting import TangoClient, Attribute
client = TangoClient()
attr1 = Attribute("device/1", "state")
attr2 = Attribute("device/2", "healthstate")
with client.events_stream([attr1, attr2]) as stream:
for event, attr in stream:
print(f"Event from {attr.device}/{attr.name}")
# Process event data here
Using the Context Manager Pattern
The context manager pattern ensures proper cleanup of resources:
with client.events_stream([attr1, attr2]) as stream:
# Stream is active here
# Automatically closed on exit (even on exception)
pass
# Stream stops observing events and resources cleaned up
Add a client-side timeout
The iterator returned by events_stream() can be combined with
iterators from the ska_oso_scripting.core.iterators package. For example, wrapping the Tango iterator in a
TimeoutIterator will cause an
EventTimeoutError to be raised after a
period of time has elapsed.
obsState = Attribute.from_trl('mid-tmc/subarray/01/obsState')
try:
with client.events_stream([obsstate]) as raw_stream:
# will raise EventTimeoutError if raw_stream goes longer than 60 seconds between change events
with iterators.TimeoutIterator(raw_stream, timeout=60.0, mode='idle') as timeout_stream:
...
except iterators.EventTimeoutError:
print('Obsstate not changed in last 60 seconds. Investigate!')
Element Behaviour
Event Subscription Flow
During event subscription, the flow is:
Calling code invokes
TangoClient.events_stream([attr1, attr2, ...])TangoClient creates a StreamHandle for the requested attributes
TangoClient registers the StreamHandle with TangoSubscriptionConsolidator
TangoSubscriptionConsolidator creates or retrieves a Callback for each attribute
New Callbacks subscribe to the corresponding Tango device attribute
The StreamHandle is registered with the Callback as an observer to be notified when events are received.
Event Reception Flow
As a Tango device emits change events, the flow is:
Tango device emits a change event
Tango event notification thread calls the appropriate Callback’s
__call__()methodCallback immediately queues the event (non-blocking) and returns
Callback’s background processing thread dequeues the event
Callback checks if value has changed (compares to last known value)
If changed, Callback calls
notify()on all registered observersStreamHandle’s
notify()method enqueues the (event, attribute) tuple, ready for retrieval via the iterator
Variability Guide
N/A
Rationale
This section explains the key design decisions and trade-offs in the Tango events handling architecture.
Subscription Consolidation
The TangoSubscriptionConsolidator implements a consolidation pattern to minimize subscribe/unsubscribe calls. Previously, each event listening operation would subscribe to an event, wait for reception, then unsubscribe. This pattern caused problems with Tango device stability and performance. The consolidator maintains long-lived subscriptions per attribute, with multiple observers registered to a single subscription.
Callback processing thread
The Callback class uses a dedicated background thread for event processing to ensure the PyTango thread (which receives events from Tango) is blocked for as short a time as possible. This design prevents slow observer notification from blocking the Tango event system.
Value Change Detection
Tango change events can be emitted for reasons other than actual value changes (e.g., subscriptions starting, resuming after faults). Rather than having each client filter for these spurious events, the Callback class implements value change detection by maintaining the last known value and comparing it before notifying observers. The first event after subscription establishes a baseline without triggering notifications.
Stream Handle as Context Manager
StreamHandle implements a context manager protocol to ensure the instance is unregistered as a Callback observer as soon
as it exits scope. While the Callback class maintains observers in a weakref.WeakSet to avoid keeping StreamHandle
instances alive indefinitely, use of the Context Manager allows the StreamHandle to be unregistered immediately rather
than waiting for garbage collection.