How to implement event-driven monitoring
The typical SKA Tango device is responsible for monitoring some component. That is, the device is responsible for continuously measuring various properties of the component and reflecting these measurements as Tango attributes. In order to be as responsive as possible, a well-designed Tango device will use an event system to be notified when its component changes. The ska-tango-base package provides the signal bus mechanism and the attribute_from_signal attribute decorator to help developers robustly publish events received from the monitored component to the Tango event system.
The attribute_from_signal attribute decorator is designed to reduce the boilerplate required when employing this pattern, as well as providing a common, well-tested implementation that avoids common pitfalls, ensuring that:
Event generation is decoupled from event pushing, avoiding the need to pass callbacks between the Tango device and sub objects.
Archive and change events are pushed together.
Direct attribute reads are synchronised with the event stream.
The Tango device robustly handles device monitor contention without dropping events.
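To illustrate the first point, here is a minimal, self-contained sketch of the publish/subscribe idea behind a signal bus. It is not the ska-tango-base implementation: the ToyBus and ToyMonitor classes and their methods are hypothetical names used only for illustration.

```python
from collections import defaultdict
from typing import Any, Callable


class ToyBus:
    """Hypothetical stand-in for a signal bus (not the ska-tango-base API)."""

    def __init__(self) -> None:
        # Maps a signal name to the callables observing it.
        self._observers: dict[str, list[Callable[[Any], None]]] = defaultdict(list)

    def subscribe(self, name: str, observer: Callable[[Any], None]) -> None:
        self._observers[name].append(observer)

    def emit(self, name: str, value: Any) -> None:
        for observer in self._observers[name]:
            observer(value)


class ToyMonitor:
    """Emits values without knowing who, if anyone, consumes them."""

    def __init__(self, bus: ToyBus) -> None:
        self._bus = bus

    def on_component_change(self, value: int) -> None:
        self._bus.emit(".monitor.sig1", value)


bus = ToyBus()
pushed: list[int] = []  # stands in for events pushed to Tango clients
bus.subscribe(".monitor.sig1", pushed.append)

monitor = ToyMonitor(bus)
monitor.on_component_change(3)
monitor.on_component_change(5)
```

The monitor only holds a reference to the bus, so no callback needs to be passed from the device down to the object that generates the events.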
This guide describes how to get started using the attribute_from_signal attribute decorator and gives a brief overview of the flexibility offered by the utility.
Warning
For components which do not generate events, polling is the only option. Unfortunately, as described in Polling loop considered harmful, the built-in Tango device server polling loop does not work well in a resource-constrained environment such as a Kubernetes cluster. As such, when your component can only be polled, it is recommended to use the Poller class together with attribute_from_signal as described here.
Step 1: Create AttrSignal data descriptors
The first step is to define some signals to be emitted on the bus. This can be done with the AttrSignal data descriptor.
These data descriptors can be defined either on the Tango device itself or on a sub-object of the device. The Tango device itself must inherit from SignalBusMixin and sub-objects must inherit from SharingObserver. In order to share the signal bus, the sub-objects must be created in an overridden on_new_shared_bus() method.
Tip
The interface classes, such as BaseInterface, and component manager implementation classes, such as SKABaseDevice, already inherit from SignalBusMixin.
Warning
Just like init_device(), whenever you override on_new_shared_bus() you must always call super().on_new_shared_bus().
For example, consider the following:
class Monitor(stb.software_bus.SharingObserver):
    sig1 = stb.software_bus.AttrSignal[int](stored=True)
    sig2 = stb.software_bus.AttrSignal[dict[str, int]]()


class MyDevice(stb.software_bus.SignalBusMixin, stb.SKADevice):
    sig3 = stb.software_bus.AttrSignal[str]()

    def on_new_shared_bus(self) -> None:
        # Always call the parent implementation first.
        super().on_new_shared_bus()
        # Sub-objects created here share the device's signal bus.
        self.monitor = Monitor()
This specifies the following three signals, based on the hierarchy of python objects from the root Tango device:
".monitor.sig1" with type int
".monitor.sig2" with type dict[str, int]
".sig3" with type str
The sig1 data descriptor is marked with stored=True here, allowing the most recently emitted value to be read back from the data descriptor. The other data descriptors do not allow reading back the most recently emitted value and will raise an AttributeError instead.
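For readers unfamiliar with data descriptors, the following toy model shows how assignment can be turned into an emission and how a stored flag can control read-back. It is only a sketch: ToySignal and ToyMonitor are hypothetical, the emissions are recorded in a plain list rather than sent on a bus, and raising AttributeError before the first emission is an assumption of the sketch rather than documented AttrSignal behaviour.

```python
from typing import Any, Optional

_MISSING = object()


class ToySignal:
    """Hypothetical data descriptor; not the real AttrSignal."""

    def __init__(self, stored: bool = False) -> None:
        self._stored = stored

    def __set_name__(self, owner: type, name: str) -> None:
        self._name = name
        self._slot = f"_toy_signal_{name}"

    def __set__(self, obj: Any, value: Any) -> None:
        # Assignment is treated as an emission.
        obj.emitted.append((self._name, value))
        if self._stored:
            setattr(obj, self._slot, value)

    def __get__(self, obj: Any, objtype: Optional[type] = None) -> Any:
        if obj is None:
            return self
        value = getattr(obj, self._slot, _MISSING) if self._stored else _MISSING
        if value is _MISSING:
            raise AttributeError(f"{self._name} has no stored value")
        return value


class ToyMonitor:
    sig1 = ToySignal(stored=True)
    sig2 = ToySignal()

    def __init__(self) -> None:
        self.emitted: list[tuple[str, Any]] = []


monitor = ToyMonitor()
monitor.sig1 = 1
monitor.sig1 += 2  # reads back 1, then emits 3
monitor.sig2 = {"a": 1}

try:
    monitor.sig2
    sig2_readable = True
except AttributeError:
    sig2_readable = False
```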
Note
The use of AttrSignal is not required; a Signal would also work. The AttrSignal is just a Signal object with type hints to allow providing a (value, timestamp, quality) triple, as described below.
Step 2: Create attribute_from_signal objects
Next, we wire up the signals to attribute_from_signal objects. Wiring up an attribute_from_signal is quite flexible, and the following code snippet demonstrates various options, creating an attribute for each of the signals we defined previously:
 1 class MyDevice(stb.software_bus.SignalBusMixin, stb.SKADevice):
 2     ...
 3     myAttr1 = stb.software_bus.attribute_from_signal(
 4         ".monitor.sig1", dtype=int, abs_change=1, archive_abs_change=10
 5     )
 6
 7     @stb.software_bus.attribute_from_signal("monitor.sig2")
 8     def myAttr2(self, value: dict[str, int]) -> str:
 9         return json.dumps(value)
10
11     myAttr3 = stb.software_bus.attribute_from_signal(sig3, write_to_signal=True)
Lines 3-5 define the myAttr1 attribute wired up to the ".monitor.sig1" signal. Here the dtype must be supplied as the signal is being referenced by a string. attribute_from_signal supports all the same parameters as tango.server.attribute. Notice here we also supply the abs_change and archive_abs_change parameters in order to support change and archive events for the int attribute. attribute_from_signal change and archive events are configured with detect=True.
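As a rough illustration of what abs_change and archive_abs_change control, the sketch below models absolute-change detection in plain Python. It is a simplified model and not Tango's implementation: it assumes an event is sent when the value differs from the last value sent by at least the threshold, and it ignores periodic archive events, quality changes and relative thresholds. The ChangeDetector class is hypothetical.

```python
from typing import Optional


class ChangeDetector:
    """Hypothetical, simplified model of absolute-change detection."""

    def __init__(self, abs_change: float) -> None:
        self._abs_change = abs_change
        self._last_sent: Optional[float] = None

    def should_send(self, value: float) -> bool:
        # Compare against the last value that was actually sent,
        # not against the previous emission.
        if self._last_sent is None or abs(value - self._last_sent) >= self._abs_change:
            self._last_sent = value
            return True
        return False


change = ChangeDetector(abs_change=1)    # mirrors abs_change=1
archive = ChangeDetector(abs_change=10)  # mirrors archive_abs_change=10

values = [0, 0, 1, 4, 12, 12, 3]
change_events = [v for v in values if change.should_send(v)]
archive_events = [v for v in values if archive.should_send(v)]
```

Under these assumptions, change_events is [0, 1, 4, 12, 3] while archive_events is only [0, 12], which shows why a coarser archive threshold produces fewer archive events from the same stream of emissions.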
Lines 7-9 define the myAttr2 attribute wired up to the ".monitor.sig2" signal. Here we specify the signal name relative to the Tango device (which is the root, "."), rather than using the absolute signal name as we did for myAttr1. We use attribute_from_signal as a decorator to supply a to_tango function that converts the dict[str, int] objects emitted on the bus to str objects that can be sent via Tango. From the type annotations of the to_tango function, Tango will determine that myAttr2 has type DevString.
Line 11 defines the myAttr3 attribute wired up to the ".sig3" signal. Here we specify the signal via the AttrSignal data descriptor on the Tango device. The type of this attribute is inferred as DevString from the type hint on the AttrSignal data descriptor. We also specify write_to_signal=True, which makes the attribute read/write, where writing to the attribute results in an emission on the signal bus. AttrSignal supports a from_tango method that can be used to convert the value received from Tango before emitting it on the bus; however, this is not used here.
Step 3: Emit data on the signal bus
Before the emission of any data, a client reading the attribute will receive ATTR_INVALID data. To avoid this, you can provide initial data via the initial_value parameter of the Signal.__init__ method. This initial value is emitted on the bus during init_device() when the signal bus is created. For example,
class Monitor(stb.software_bus.SharingObserver):
    sig1 = stb.software_bus.AttrSignal[int](initial_value=0)
Subsequent values can be emitted by assigning to AttrSignal data descriptors from the Tango device or its sub-objects, or by calling emit() directly. This can occur from any thread. For example, consider the following run() method, intended to be executed from a background thread:
class Monitor(stb.software_bus.SharingObserver):
    ...

    def run(self, stop_event: threading.Event) -> None:
        while not stop_event.is_set():
            delta = self.get_next_change()
            # Read back the stored value and emit the updated one.
            self.sig1 += delta
            self.sig2 = {"doubled": 2 * self.sig1, "squared": self.sig1 * self.sig1}
            # Emit directly on the bus using the absolute signal name.
            self.shared_bus.emit(".sig3", f"{delta=}")
            stop_event.wait(timeout=1.0)
Warning
Emitting from a stored=True Signal is not thread-safe. The user is responsible for synchronising emissions from multiple threads.
Notice here that we are reading back the self.sig1 value that has been emitted, which is only possible because we passed stored=True to the data descriptor. Also, as we are emitting the value for ".sig3" directly using emit(), the sig3 data descriptor on the Tango device is not used and does not store a value, even if stored=True was set. In fact, if this is the only place we are emitting the ".sig3" signal, then the sig3 data descriptor is not needed.
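A read-modify-write such as self.sig1 += delta is where unsynchronised emissions from several threads can go wrong. One possible way to guard it is with a lock, as in the following standard-library sketch. ToyCounter is a hypothetical stand-in for an object that owns a stored signal; a plain attribute takes the place of the signal, and the fixed number of rounds exists only to make the example deterministic.

```python
import threading


class ToyCounter:
    """Hypothetical stand-in for an object owning a stored signal."""

    def __init__(self) -> None:
        self.value = 0  # stands in for a stored=True signal
        self._lock = threading.Lock()

    def add(self, delta: int) -> None:
        # The read and the write are separate steps, so hold the lock
        # across both when more than one thread may emit.
        with self._lock:
            self.value = self.value + delta

    def run(self, stop_event: threading.Event, rounds: int) -> None:
        for _ in range(rounds):
            if stop_event.is_set():
                break
            self.add(1)


counter = ToyCounter()
stop_event = threading.Event()
threads = [
    threading.Thread(target=counter.run, args=(stop_event, 1000))
    for _ in range(4)
]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
stop_event.set()
```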
Once emitted, each value is processed by a background thread. The background thread will first convert the value using the to_tango function (if one has been provided) and then push change and archive events for the attribute with this converted value. In environments using PyTango 10.1 or later, the value is also added to the polling ring buffer if the attribute is being polled.
To service client read requests, the attribute_from_signal replies with the most recently emitted value. The always_executed_hook() is used to ensure that all pending emissions are processed before servicing a client request. This ensures that clients see a consistent view of the data provided by the Tango device.
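The processing model described above can be sketched with the standard library alone. The ToyAttribute class below is hypothetical and is not ska-tango-base code: a list stands in for pushed Tango events, and read() drains any pending emissions itself, playing the role that always_executed_hook() plays in the real device.

```python
import json
import queue
import threading
from typing import Any, Callable, Optional


class ToyAttribute:
    """Hypothetical model of an attribute fed by a signal."""

    def __init__(self, to_tango: Optional[Callable[[Any], Any]] = None) -> None:
        self._to_tango = to_tango
        self._pending: queue.Queue = queue.Queue()
        self._lock = threading.Lock()
        self.latest: Any = None
        self.pushed: list[Any] = []  # stands in for pushed events

    def emit(self, value: Any) -> None:
        self._pending.put(value)

    def process_pending(self) -> None:
        # Dequeue and publish under one lock so the order is preserved.
        with self._lock:
            while True:
                try:
                    value = self._pending.get_nowait()
                except queue.Empty:
                    return
                converted = self._to_tango(value) if self._to_tango else value
                self.latest = converted
                self.pushed.append(converted)

    def worker(self, stop_event: threading.Event) -> None:
        while not stop_event.is_set():
            self.process_pending()
            stop_event.wait(timeout=0.01)

    def read(self) -> Any:
        # Flush pending emissions so the reply matches the event stream.
        self.process_pending()
        return self.latest


attr = ToyAttribute(to_tango=json.dumps)
stop_event = threading.Event()
thread = threading.Thread(target=attr.worker, args=(stop_event,))
thread.start()

attr.emit({"a": 1})
attr.emit({"a": 2})
latest = attr.read()

stop_event.set()
thread.join()
```

Whether the worker or the reader happens to process an emission, the reader never sees a value older than the last emission made before the read.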
Either None or NoValue can be emitted to force the attribute back to ATTR_INVALID, as if no value has been emitted on the bus at all.
Warning
Assigning NoValue to a signal is deprecated since ska-tango-base 1.5.0.
For more granular control, a "value triple" can be provided as a 3-element tuple, where the second element must be either None or a timestamp (as returned by time.time()) and the third element must be a tango.AttrQuality object. For example, we could report the attribute with ATTR_ALARM when certain values exceed a threshold:
class Monitor(stb.software_bus.SharingObserver):
    ...

    def run(self, stop_event: threading.Event) -> None:
        while not stop_event.is_set():
            delta = self.get_next_change()
            self.sig1 += delta
            d = {"doubled": 2 * self.sig1, "squared": self.sig1 * self.sig1}
            # Report ATTR_ALARM quality when the squared value is too large.
            if d["squared"] > 100:
                self.sig2 = (d, None, tango.AttrQuality.ATTR_ALARM)
            else:
                self.sig2 = d
            if delta > 1:
                self.shared_bus.emit(".sig3", (f"{delta=}", None, tango.AttrQuality.ATTR_ALARM))
            else:
                self.shared_bus.emit(".sig3", f"{delta=}")
            stop_event.wait(timeout=1.0)
Note
When emitting a value triple, only the first element (the value itself) is passed to the to_tango function.
When the time component (the second element) is None, the time of data transmission to the client is used. Emitting a lone value v is equivalent to emitting the value triple (v, None, tango.AttrQuality.ATTR_VALID).
The CachingAttrSignal can be used as an alternative to AttrSignal for automatic emission timestamp recording. For this data descriptor, emitting the lone value of v (via the data descriptor) is equivalent to emitting the value triple (v, time.time(), tango.AttrQuality.ATTR_VALID). Similarly, emitting a value triple with a None timestamp will replace it with time.time().
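The equivalences described in this section can be summarised with a small normalisation function. This is only a sketch of the rules as stated here, not library code: the Quality enum is a stand-in for tango.AttrQuality so that the example runs without PyTango, normalise() and its fill_timestamp parameter are hypothetical, and the way a triple is recognised (a 3-tuple whose last element is a Quality) is an assumption of the sketch.

```python
import enum
import time
from typing import Any, Optional


class Quality(enum.Enum):
    """Stand-in for tango.AttrQuality."""

    ATTR_VALID = "ATTR_VALID"
    ATTR_INVALID = "ATTR_INVALID"
    ATTR_ALARM = "ATTR_ALARM"


def normalise(
    emitted: Any, fill_timestamp: bool = False
) -> tuple[Any, Optional[float], Quality]:
    """Return a (value, timestamp, quality) triple for an emission."""
    if emitted is None:
        # Emitting None forces the attribute back to ATTR_INVALID.
        return (None, None, Quality.ATTR_INVALID)
    if (
        isinstance(emitted, tuple)
        and len(emitted) == 3
        and isinstance(emitted[2], Quality)
    ):
        value, timestamp, quality = emitted
    else:
        # A lone value is treated as valid with no explicit timestamp.
        value, timestamp, quality = emitted, None, Quality.ATTR_VALID
    if timestamp is None and fill_timestamp:
        # Models the CachingAttrSignal behaviour described above.
        timestamp = time.time()
    return (value, timestamp, quality)


lone = normalise(5)
alarm = normalise((5, None, Quality.ATTR_ALARM))
cached = normalise(5, fill_timestamp=True)
invalid = normalise(None)
```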