*********************
Handling Tango Events
*********************

This page describes the architecture of Tango event handling in
ska-oso-scripting, focusing on four core components:
:class:`~ska_oso_scripting.core.tango.client.TangoClient`,
:class:`~ska_oso_scripting.core.tango.client.StreamHandle`,
:class:`~ska_oso_scripting.core.tango.client.TangoSubscriptionConsolidator`, and
:class:`~ska_oso_scripting.core.tango.client.Callback`. These components work
together to provide a mechanism for subscribing to, consuming, and processing
Tango device events in observing scripts.

Primary Presentation
====================

.. image:: ../../diagrams/tango-events-class-diagram.png
   :align: center
   :alt: Component Class Diagram

Element Catalogue
=================

Elements and their properties
-----------------------------

.. list-table::
   :widths: 15 85
   :header-rows: 1

   * - Component
     - Description
   * - :class:`~ska_oso_scripting.core.tango.client.TangoClient`
     - Acts as the primary proxy between clients and Tango devices. It provides
       the main API for event streaming via
       :meth:`~ska_oso_scripting.core.tango.client.TangoClient.events_stream`.
       It coordinates with TangoSubscriptionConsolidator to minimise the number
       of Tango subscriptions that are opened.
   * - :class:`~ska_oso_scripting.core.tango.client.TangoSubscriptionConsolidator`
     - Prevents duplicate subscriptions to the same Tango attribute by creating
       one long-lived subscription per attribute and setting an associated
       Callback instance as its sole subscriber. Clients are therefore
       decoupled from direct Tango subscriptions and instead subscribe
       (indirectly) to Callbacks.
   * - :class:`~ska_oso_scripting.core.tango.client.Callback`
     - Observable that distributes Tango events to all registered observers.
       Uses a queue-based architecture to quickly move event reception off the
       Tango thread. Subsequent event processing and relaying occur on a
       separate thread, with value change detection so that observers are
       notified only when values actually change.
   * - :class:`~ska_oso_scripting.core.tango.client.EventObserver`
     - The protocol that Callback observers implement to be notified of a Tango
       change event.
   * - :class:`~ska_oso_scripting.core.tango.client.StreamHandle`
     - Represents an active subscription to attribute ChangeEvents. Implements
       the EventObserver protocol to receive events from Callback, and then
       provides iteration over those events via the standard Python Iterator
       protocol.

Element Interfaces
------------------

This section illustrates the key interface for Tango event handling from the
perspective of a scripting function developer.

API
^^^

.. list-table::
   :widths: 20 80
   :header-rows: 1

   * - Method
     - Description
   * - :meth:`~ska_oso_scripting.core.tango.client.TangoClient.events_stream()`
     - | Returns an :class:`~contextlib.AbstractContextManager` that, when entered, provides an iterator yielding change events from one or more Tango device attributes in the form of (:class:`~tango.EventData`, :class:`~ska_oso_scripting.core.tango.client.Attribute`) tuples.
       |
       | Change events are filtered to remove error events and duplicates (i.e., consecutive change events that do not indicate a real value change). The iterator stream will continue indefinitely; clients are responsible for breaking iteration when they no longer want to process events.

Examples
^^^^^^^^

The most common pattern is to subscribe to multiple attributes and iterate over
events as they arrive:

.. code-block:: python

   from ska_oso_scripting import TangoClient, Attribute

   client = TangoClient()
   attr1 = Attribute("device/1", "state")
   attr2 = Attribute("device/2", "healthstate")

   with client.events_stream([attr1, attr2]) as stream:
       for event, attr in stream:
           print(f"Event from {attr.device}/{attr.name}")
           # Process event data here, and break out of the loop once
           # no further events are needed

Using the Context Manager Pattern
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

The context manager pattern ensures proper cleanup of resources:

.. code-block:: python

   with client.events_stream([attr1, attr2]) as stream:
       # Stream is active here
       # Automatically closed on exit (even on exception)
       pass
   # Stream has stopped observing events and its resources are cleaned up

Add a client-side timeout
^^^^^^^^^^^^^^^^^^^^^^^^^

The iterator returned by
:meth:`~ska_oso_scripting.core.tango.client.TangoClient.events_stream` can be
combined with iterators from the ``ska_oso_scripting.core.iterators`` package.
For example, wrapping the Tango iterator in a
:class:`~ska_oso_scripting.core.iterators.TimeoutIterator` will cause an
:class:`~ska_oso_scripting.core.iterators.EventTimeoutError` to be raised when
the configured timeout elapses.

.. code-block:: python

   from ska_oso_scripting.core import iterators

   obsstate = Attribute.from_trl('mid-tmc/subarray/01/obsState')
   try:
       with client.events_stream([obsstate]) as raw_stream:
           # Raises EventTimeoutError if raw_stream goes longer than
           # 60 seconds between change events
           with iterators.TimeoutIterator(raw_stream, timeout=60.0, mode='idle') as timeout_stream:
               ...
   except iterators.EventTimeoutError:
       print('obsState has not changed in the last 60 seconds. Investigate!')

Element Behaviour
-----------------

Event Subscription Flow
^^^^^^^^^^^^^^^^^^^^^^^

.. image:: ../../diagrams/tango-events-subscription-sequence.png
   :align: center
   :alt: Event Subscription Flow

During event subscription, the flow is:

- Calling code invokes ``TangoClient.events_stream([attr1, attr2, ...])``.
- TangoClient creates a StreamHandle for the requested attributes.
- TangoClient registers the StreamHandle with TangoSubscriptionConsolidator.
- TangoSubscriptionConsolidator creates or retrieves a Callback for each
  attribute.
- New Callbacks subscribe to the corresponding Tango device attribute.
- The StreamHandle is registered with the Callback as an observer to be
  notified when events are received.

Event Reception Flow
^^^^^^^^^^^^^^^^^^^^

.. image:: ../../diagrams/tango-events-reception-sequence.png
   :align: center
   :alt: Event Reception and Distribution Flow

As a Tango device emits change events, the flow is:

- Tango device emits a change event.
- Tango event notification thread calls the appropriate Callback's
  ``__call__()`` method.
- Callback immediately queues the event (non-blocking) and returns.
- Callback's background processing thread dequeues the event.
- Callback checks whether the value has changed by comparing it to the last
  known value.
- If the value has changed, Callback calls ``notify()`` on all registered
  observers.
- StreamHandle's ``notify()`` method enqueues the (event, attribute) tuple,
  ready for retrieval via the iterator.

Variability Guide
=================

N/A

Rationale
=========

This section explains the key design decisions and trade-offs in the Tango
event handling architecture.

Subscription Consolidation
--------------------------

The TangoSubscriptionConsolidator implements a consolidation pattern to
minimise subscribe/unsubscribe calls. Previously, each event listening
operation would subscribe to an event, wait for reception, then unsubscribe.
This pattern caused problems with Tango device stability and performance. The
consolidator maintains one long-lived subscription per attribute, with multiple
observers registered to that single subscription.

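
The consolidation idea can be sketched independently of Tango. The following is
a minimal illustration only, not the library's implementation:
``ConsolidatorSketch`` and ``FakeDevice`` are hypothetical names introduced for
this example, and the real TangoSubscriptionConsolidator and Callback classes
may differ in detail. It shows two observers sharing a single underlying
subscription, which stays open when an observer is removed.

```python
from typing import Callable, Dict, List


class FakeDevice:
    """Hypothetical stand-in for a Tango device; counts subscribe calls."""

    def __init__(self) -> None:
        self.handlers: Dict[str, Callable[[object], None]] = {}
        self.subscribe_calls = 0

    def subscribe(self, attribute: str, handler: Callable[[object], None]) -> None:
        self.subscribe_calls += 1
        self.handlers[attribute] = handler

    def emit(self, attribute: str, event: object) -> None:
        self.handlers[attribute](event)


class ConsolidatorSketch:
    """Hypothetical consolidator: one subscription per attribute, many observers."""

    def __init__(self, subscribe: Callable[[str, Callable[[object], None]], None]) -> None:
        self._subscribe = subscribe
        self._observers: Dict[str, List[Callable[[object], None]]] = {}

    def add_observer(self, attribute: str, observer: Callable[[object], None]) -> None:
        if attribute not in self._observers:
            # First interest in this attribute: open the single subscription.
            self._observers[attribute] = []
            self._subscribe(attribute, lambda event, a=attribute: self._fan_out(a, event))
        self._observers[attribute].append(observer)

    def remove_observer(self, attribute: str, observer: Callable[[object], None]) -> None:
        # Only the observer is dropped; the subscription itself stays open.
        self._observers[attribute].remove(observer)

    def _fan_out(self, attribute: str, event: object) -> None:
        # Iterate over a copy so observers may be removed during delivery.
        for observer in list(self._observers[attribute]):
            observer(event)


device = FakeDevice()
consolidator = ConsolidatorSketch(device.subscribe)
seen_a, seen_b = [], []
observer_a, observer_b = seen_a.append, seen_b.append

consolidator.add_observer("obsState", observer_a)
consolidator.add_observer("obsState", observer_b)
device.emit("obsState", "READY")

consolidator.remove_observer("obsState", observer_b)
device.emit("obsState", "SCANNING")
```

In this sketch the device sees one subscribe call regardless of how many
observers come and go, which is the property the consolidator is designed to
provide.
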
Callback Processing Thread
--------------------------

The Callback class uses a dedicated background thread for event processing to
ensure the PyTango thread (which receives events from Tango) is blocked for as
short a time as possible. This design prevents slow observer notification from
blocking the Tango event system.

Value Change Detection
----------------------

Tango change events can be emitted for reasons other than actual value changes
(e.g., subscriptions starting, resuming after faults). Rather than having each
client filter for these spurious events, the Callback class implements value
change detection by maintaining the last known value and comparing each new
value against it before notifying observers. The first event after subscription
establishes a baseline without triggering notifications.

Stream Handle as Context Manager
--------------------------------

StreamHandle implements the context manager protocol to ensure the instance is
unregistered as a Callback observer as soon as the ``with`` block exits. While
the Callback class maintains observers in a ``weakref.WeakSet`` to avoid
keeping StreamHandle instances alive indefinitely, using the context manager
allows the StreamHandle to be unregistered immediately rather than waiting for
garbage collection.

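
The interaction between weak observer references and explicit unregistration
can be illustrated with a small standalone sketch. ``ObservableSketch`` and
``HandleSketch`` are hypothetical names used only here, and their methods are
assumptions for illustration rather than the actual Callback and StreamHandle
API. The sketch shows that leaving the ``with`` block removes the observer
immediately, even though the handle object is still referenced and so has not
been garbage collected.

```python
import queue
import weakref


class ObservableSketch:
    """Hypothetical stand-in for Callback: holds its observers weakly."""

    def __init__(self):
        self._observers = weakref.WeakSet()

    def register(self, observer):
        self._observers.add(observer)

    def unregister(self, observer):
        self._observers.discard(observer)

    def publish(self, event):
        # Copy first so that observers may unregister during notification.
        for observer in list(self._observers):
            observer.notify(event)

    @property
    def observer_count(self):
        return len(self._observers)


class HandleSketch:
    """Hypothetical stand-in for StreamHandle: observer, iterator, context manager."""

    def __init__(self, observable):
        self._observable = observable
        self._events = queue.Queue()

    def notify(self, event):
        self._events.put(event)

    def pending(self):
        return self._events.qsize()

    def __enter__(self):
        self._observable.register(self)
        return self

    def __exit__(self, exc_type, exc, tb):
        # Unregister right away instead of waiting for garbage collection.
        self._observable.unregister(self)
        return False

    def __iter__(self):
        return self

    def __next__(self):
        return self._events.get()


source = ObservableSketch()
with HandleSketch(source) as handle:
    source.publish("READY")
    first = next(handle)
    count_inside = source.observer_count

# 'handle' is still referenced here, yet it is no longer an observer.
count_after = source.observer_count
source.publish("SCANNING")
pending_after = handle.pending()
```

Without the explicit unregistration in ``__exit__``, the handle in this sketch
would keep receiving events for as long as any reference to it remained.
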