Handling Tango Events

This page describes the architectural of Tango events handling in ska-oso-scripting, focusing on four core components: TangoClient, StreamHandle, TangoSubscriptionConsolidator, and Callback. These components work together to provide a mechanism for subscribing to, consuming, and processing Tango device events in observing scripts.

Primary Presentation

Component Class Diagram

Element Catalogue

Elements and their properties

Component

Description

TangoClient

Acts as the primary proxy between clients and Tango devices. It provides the main API for event streaming via events_stream(). It coordinates with TangoSubscriptionConsolidator to minimise the number of Tango subscriptions that are opened.

TangoSubscriptionConsolidator

Prevents duplicate subscriptions to the same Tango attribute by creating one long-lived subscription per attribute and setting an associated Callback instance as the one and only subscriber. Hence, clients are decoupled from direct Tango subscriptions; instead clients (indirectly) subscribe to Callbacks.

Callback

Observable that distributes Tango events to all registered observers. Uses a queue-based architecture to quickly move event reception off the Tango thread. Subsequent event processing and relaying occurs on a separate thread, with value change detection to only notify observers when values actually change.

EventObserver

The protocol that Callback observers implement to be notified of a Tango change event.

StreamHandle

Represents an active subscription to attribute ChangeEvents. Implements the EventObserver protocol to receive events from Callback, and then provides iteration over those events via the standard Python Iterator protocol.

Element Interfaces

This section illustrates the key interface for Tango events handling from the perspective of an scripting function developer.

API

method

Description

events_stream()

Returns an AbstractContextManager that yields change events from one or more Tango device attributes in the form of (EventData, Attribute) tuples.

Change events are filtered to remove error events and duplicate (i.e., consecutive change events that do not indicate a real value change). The iterator stream will continue indefinitely; clients are responsible for breaking iteration when they no longer want to process events.

Examples

The most common pattern is to subscribe to multiple attributes and iterate over events as they arrive:

from ska_oso_scripting import TangoClient, Attribute

client = TangoClient()
attr1 = Attribute("device/1", "state")
attr2 = Attribute("device/2", "healthstate")

with client.events_stream([attr1, attr2]) as stream:
    for event, attr in stream:
        print(f"Event from {attr.device}/{attr.name}")
        # Process event data here

Using the Context Manager Pattern

The context manager pattern ensures proper cleanup of resources:

with client.events_stream([attr1, attr2]) as stream:
    # Stream is active here
    # Automatically closed on exit (even on exception)
    pass
# Stream stops observing events and resources cleaned up

Add a client-side timeout

The iterator returned by events_stream() can be combined with iterators from the ska_oso_scripting.core.iterators package. For example, wrapping the Tango iterator in a TimeoutIterator will cause an EventTimeoutError to be raised after a period of time has elapsed.

obsState = Attribute.from_trl('mid-tmc/subarray/01/obsState')
try:
   with client.events_stream([obsstate]) as raw_stream:
       # will raise EventTimeoutError if raw_stream goes longer than 60 seconds between change events
       with iterators.TimeoutIterator(raw_stream, timeout=60.0, mode='idle') as timeout_stream:
            ...
except iterators.EventTimeoutError:
    print('Obsstate not changed in last 60 seconds. Investigate!')

Element Behaviour

Event Subscription Flow

Event Subscription Flow

During event subscription, the flow is:

  • Calling code invokes TangoClient.events_stream([attr1, attr2, ...])

  • TangoClient creates a StreamHandle for the requested attributes

  • TangoClient registers the StreamHandle with TangoSubscriptionConsolidator

  • TangoSubscriptionConsolidator creates or retrieves a Callback for each attribute

  • New Callbacks subscribe to the corresponding Tango device attribute

  • The StreamHandle is registered with the Callback as an observer to be notified when events are received.

Event Reception Flow

Event Reception and Distribution Flow

As a Tango device emits change events, the flow is:

  • Tango device emits a change event

  • Tango event notification thread calls the appropriate Callback’s __call__() method

  • Callback immediately queues the event (non-blocking) and returns

  • Callback’s background processing thread dequeues the event

  • Callback checks if value has changed (compares to last known value)

  • If changed, Callback calls notify() on all registered observers

  • StreamHandle’s notify() method enqueues the (event, attribute) tuple, ready for retrieval via the iterator

Variability Guide

N/A

Rationale

This section explains the key design decisions and trade-offs in the Tango events handling architecture.

Subscription Consolidation

The TangoSubscriptionConsolidator implements a consolidation pattern to minimize subscribe/unsubscribe calls. Previously, each event listening operation would subscribe to an event, wait for reception, then unsubscribe. This pattern caused problems with Tango device stability and performance. The consolidator maintains long-lived subscriptions per attribute, with multiple observers registered to a single subscription.

Callback processing thread

The Callback class uses a dedicated background thread for event processing to ensure the PyTango thread (which receives events from Tango) is blocked for as short a time as possible. This design prevents slow observer notification from blocking the Tango event system.

Value Change Detection

Tango change events can be emitted for reasons other than actual value changes (e.g., subscriptions starting, resuming after faults). Rather than having each client filter for these spurious events, the Callback class implements value change detection by maintaining the last known value and comparing it before notifying observers. The first event after subscription establishes a baseline without triggering notifications.

Stream Handle as Context Manager

StreamHandle implements a context manager protocol to ensure the instance is unregistered as a Callback observer as soon as it exits scope. While the Callback class maintains observers in a weakref.WeakSet to avoid keeping StreamHandle instances alive indefinitely, use of the Context Manager allows the StreamHandle to be unregistered immediately rather than waiting for garbage collection.