========================================
How to implement event-driven monitoring
========================================

The typical SKA Tango device is responsible for monitoring some component. That is, the device is responsible for continuously measuring various properties of the component and reflecting these measurements as Tango attributes. In order to be as responsive as possible, a well-designed Tango device will use an event system to be notified when its component changes.

The ``ska-tango-base`` package provides the signal bus mechanism and :class:`~ska_tango_base.software_bus.attribute_from_signal` attribute decorator to help developers robustly publish events received from the monitored component to the Tango event system.

The :class:`~ska_tango_base.software_bus.attribute_from_signal` attribute decorator is designed to reduce the boilerplate required when employing this pattern, as well as to provide a common, well-tested implementation that avoids common pitfalls, ensuring that:

- Event generation is decoupled from event pushing, avoiding the need to pass callbacks between the Tango device and sub-objects.
- Archive and change events are pushed together.
- Direct attribute reads are synchronised with the event stream.
- The Tango device robustly handles device monitor contention without dropping events.

This guide describes how to get started using the :class:`~ska_tango_base.software_bus.attribute_from_signal` attribute decorator and gives a brief overview of the flexibility offered by the utility.

.. warning ::

   For components which do not generate events, polling is the only option. Unfortunately, as described in *Polling loop considered harmful*, the built-in Tango device server polling loop does not work well in a resource-constrained environment such as a Kubernetes cluster.
   As such, when your component can only be polled, it is recommended to use the :class:`~ska_tango_base.poller.poller.Poller` class together with :class:`~ska_tango_base.software_bus.attribute_from_signal` as described here.

.. contents:: Steps
   :depth: 1
   :local:
   :backlinks: none

Step 1: Create AttrSignal data descriptors
==========================================

The first step is to define some signals to be emitted on the bus. This can be done with the :class:`~ska_tango_base.software_bus.AttrSignal` data descriptor. These data descriptors can be defined either on the Tango device itself or on a sub-object of the device.

The Tango device itself must inherit from :class:`~ska_tango_base.software_bus.SignalBusMixin` and sub-objects must inherit from :class:`~ska_tango_base.software_bus.SharingObserver`. In order to share the signal bus, the sub-objects must be created in an overridden :meth:`~ska_tango_base.software_bus.SharingObserver.on_new_shared_bus()` method.

.. tip ::

   The interface classes, such as :class:`~ska_tango_base.base.base_interface.BaseInterface`, and component manager implementation classes, such as :class:`~ska_tango_base.base.base_device.SKABaseDevice`, already inherit from :class:`~ska_tango_base.software_bus.SignalBusMixin`.

.. warning ::

   Just like :meth:`!init_device()`, whenever you override :meth:`~ska_tango_base.software_bus.SharingObserver.on_new_shared_bus` you must always call :code:`super().on_new_shared_bus()`.

For example, consider the following:

.. code:: python

   class Monitor(stb.software_bus.SharingObserver):
       # stored=True keeps the most recently emitted value readable.
       sig1 = stb.software_bus.AttrSignal[int](stored=True)
       sig2 = stb.software_bus.AttrSignal[dict[str, int]]()


   class MyDevice(stb.software_bus.SignalBusMixin, stb.SKADevice):
       sig3 = stb.software_bus.AttrSignal[str]()

       def on_new_shared_bus(self) -> None:
           super().on_new_shared_bus()
           # Sub-objects are created here so that they share the signal bus.
           self.monitor = Monitor()

This specifies the following three signals, named according to the hierarchy of Python objects from the root Tango device:

- :code:`".monitor.sig1"` with type :code:`int`
- :code:`".monitor.sig2"` with type :code:`dict[str, int]`
- :code:`".sig3"` with type :code:`str`

The :attr:`!sig1` data descriptor is marked with ``stored=True`` here, allowing the most recently emitted value to be read back from the data descriptor. The other data descriptors do not allow reading back the most recently emitted value and will raise an :class:`AttributeError` instead.

.. note ::

   The use of :class:`~ska_tango_base.software_bus.AttrSignal` is not required; a :class:`~ska_tango_base.software_bus.Signal` would also work. The :class:`~ska_tango_base.software_bus.AttrSignal` is just a :class:`~ska_tango_base.software_bus.Signal` object with type hints to allow providing a :code:`(value, timestamp, quality)` triple, as described below.

Step 2: Create attribute_from_signal objects
============================================

Next we wire up the signals to :class:`~ska_tango_base.software_bus.attribute_from_signal` objects. Wiring up an :class:`~ska_tango_base.software_bus.attribute_from_signal` is quite flexible, and the following code snippet demonstrates various options, creating an attribute for each of the signals we defined previously:

.. code-block:: python
   :linenos:

   class MyDevice(stb.software_bus.SignalBusMixin, stb.SKADevice):
       ...
       myAttr1 = stb.software_bus.attribute_from_signal(
           ".monitor.sig1", dtype=int, abs_change=1, archive_abs_change=10
       )

       @stb.software_bus.attribute_from_signal("monitor.sig2")
       def myAttr2(self, value: dict[str, int]) -> str:
           return json.dumps(value)

       myAttr3 = stb.software_bus.attribute_from_signal(sig3, write_to_signal=True)

Lines 3-5 define the :attr:`!myAttr1` attribute wired up to the ``".monitor.sig1"`` signal. Here the :attr:`!dtype` must be supplied as the signal is being referenced by a string. :class:`~ska_tango_base.software_bus.attribute_from_signal` supports all the same parameters as :class:`tango.server.attribute`. Notice here we also supply the :attr:`!abs_change` and :attr:`!archive_abs_change` parameters in order to support change and archive events for the ``int`` attribute. :class:`~ska_tango_base.software_bus.attribute_from_signal` change and archive events are configured with ``detect=True``.

Lines 7-9 define the :attr:`!myAttr2` attribute wired up to the ``".monitor.sig2"`` signal. Here we are specifying the signal name relative to the Tango device, :code:`"."`, rather than using the absolute signal name as we did for :attr:`!myAttr1`. Here we are using :class:`~ska_tango_base.software_bus.attribute_from_signal` as a decorator to supply a ``to_tango`` function that converts the ``dict[str, int]`` objects emitted on the bus to ``str`` objects that can be sent via Tango. From the type annotations of the ``to_tango`` function, Tango will determine that :attr:`!myAttr2` has type ``DevString``.

Line 11 defines the :attr:`!myAttr3` attribute wired up to the ``".sig3"`` signal. Here we are specifying the signal via the :class:`~ska_tango_base.software_bus.AttrSignal` data descriptor on the Tango device. The type of this attribute is inferred as ``DevString`` from the type hint on the :class:`~ska_tango_base.software_bus.AttrSignal` data descriptor.
We are also specifying ``write_to_signal=True``, which makes the attribute read/write: writing to the attribute results in an emission on the signal bus. :class:`~ska_tango_base.software_bus.AttrSignal` supports a ``from_tango`` method that can be used to convert the value received from Tango before emitting it on the bus; however, this is not used here.

Step 3: Emit data on the signal bus
===================================

Before the emission of any data, a client reading the attribute will receive ``ATTR_INVALID`` data. To avoid this, you can provide initial data via the ``initial_value`` parameter of the :meth:`!Signal.__init__` method. This initial value is emitted on the bus during :meth:`!init_device` when the signal bus is created. For example:

.. code:: python

   class Monitor(stb.software_bus.SharingObserver):
       sig1 = stb.software_bus.AttrSignal[int](initial_value=0)

Subsequent values can be emitted by assigning to :class:`~ska_tango_base.software_bus.AttrSignal` data descriptors from the Tango device or its sub-objects, or by calling :meth:`~ska_tango_base.type_hints.BusProtocol.emit` directly. This can occur from any thread. For example, consider the following :meth:`!run` method, intended to be executed from a background thread:

.. code:: python

   class Monitor(stb.software_bus.SharingObserver):
       ...

       def run(self, stop_event: threading.Event) -> None:
           while not stop_event.is_set():
               delta = self.get_next_change()
               # Read-modify-write on a stored signal: emits the new total.
               self.sig1 += delta
               self.sig2 = {"doubled": 2 * self.sig1, "squared": self.sig1 * self.sig1}
               # Emit directly on the bus using the absolute signal name.
               self.shared_bus.emit(".sig3", f"{delta=}")
               stop_event.wait(timeout=1.0)

.. warning ::

   Emitting from a ``stored=True`` :class:`~ska_tango_base.software_bus.Signal` is not thread safe. The user is responsible for synchronising emissions from multiple threads.

Notice here, we are reading back the ``self.sig1`` value that has been emitted, which is only possible because we passed ``stored=True`` to the data descriptor.
Also, as we are emitting the value for :code:`".sig3"` directly using :meth:`~ska_tango_base.type_hints.BusProtocol.emit`, the :attr:`!sig3` data descriptor on the Tango device is not used and does not store a value, even if ``stored=True`` was set. In fact, if this is the only place we are emitting the :code:`".sig3"` signal then the :attr:`!sig3` data descriptor is not needed.

Once emitted, each value is processed by a background thread. The background thread will first convert the value using the ``to_tango`` function (if one has been provided) and then push change and archive events for the attribute with this converted value. In environments using PyTango 10.1 or later, the value is also added to the polling ring buffer if the attribute is being polled.

To service client read requests, the :class:`~ska_tango_base.software_bus.attribute_from_signal` replies with the most recently emitted value. The :meth:`~tango.LatestDeviceImpl.always_executed_hook` is used to ensure that all pending emissions are processed before servicing a client request. This ensures that clients see a consistent view of the data provided by the Tango device.

Either ``None`` or :class:`~ska_tango_base.software_bus.NoValue` can be emitted to force the attribute back to ``ATTR_INVALID``, as if no value had been emitted on the bus at all.

.. warning::

   Assigning :class:`~ska_tango_base.software_bus.NoValue` to a signal is deprecated since ska-tango-base 1.5.0.

For more granular control, a "value triple" can be provided as a 3-element tuple, where the second element must be either ``None`` or a timestamp (as returned from :func:`time.time`) and the third element must be a :class:`tango.AttrQuality` object. For example, we could report the attribute with ``ATTR_ALARM`` when certain values exceed a threshold:

.. code:: python

   class Monitor(stb.software_bus.SharingObserver):
       ...

       def run(self, stop_event: threading.Event) -> None:
           while not stop_event.is_set():
               delta = self.get_next_change()
               self.sig1 += delta
               d = {"doubled": 2 * self.sig1, "squared": self.sig1 * self.sig1}
               # Flag sig2 as ATTR_ALARM once the squared value passes the threshold.
               if d["squared"] > 100:
                   self.sig2 = (d, None, tango.AttrQuality.ATTR_ALARM)
               else:
                   self.sig2 = d
               # Value triples can also be passed to emit() directly.
               if delta > 1:
                   self.shared_bus.emit(".sig3", (f"{delta=}", None, tango.AttrQuality.ATTR_ALARM))
               else:
                   self.shared_bus.emit(".sig3", f"{delta=}")
               stop_event.wait(timeout=1.0)

.. note ::

   When emitting a value triple, only the first element (the value itself) is passed to the ``to_tango`` function. When the time component (the second element) is ``None``, the time of data transmission to the client is used. Emitting a lone value ``v`` is equivalent to emitting the value triple ``(v, None, tango.AttrQuality.ATTR_VALID)``.

The :class:`~ska_tango_base.software_bus.CachingAttrSignal` can be used as an alternative to :class:`~ska_tango_base.software_bus.AttrSignal` for automatic emission timestamp recording. For this data descriptor, emitting the lone value ``v`` (via the data descriptor) is equivalent to emitting the value triple ``(v, time.time(), tango.AttrQuality.ATTR_VALID)``. Similarly, emitting a value triple with a ``None`` timestamp will replace it with ``time.time()``.
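
The equivalence rules for lone values and value triples can be illustrated with a small stand-alone sketch. This is not the ``ska-tango-base`` implementation: ``Quality`` is a hypothetical stand-in for :class:`tango.AttrQuality` so that the snippet runs without PyTango, and ``normalise``, ``stamp_now`` and ``clock`` are names invented for illustration. How the library itself tells a value triple apart from an ordinary tuple value is not described in this guide; the sketch simply assumes that a 3-tuple whose last element is a quality is a triple.

```python
import enum
import time
from typing import Any, Callable, Optional, Tuple


class Quality(enum.Enum):
    """Hypothetical stand-in for tango.AttrQuality (member values are arbitrary)."""

    ATTR_VALID = "valid"
    ATTR_ALARM = "alarm"


Triple = Tuple[Any, Optional[float], Quality]


def normalise(
    emitted: Any,
    stamp_now: bool = False,
    clock: Callable[[], float] = time.time,
) -> Triple:
    """Expand an emitted object into a (value, timestamp, quality) triple.

    stamp_now=False mimics the behaviour described for AttrSignal;
    stamp_now=True mimics the behaviour described for CachingAttrSignal.
    """
    # Assumption: a 3-tuple ending in a Quality is treated as a value triple.
    is_triple = (
        isinstance(emitted, tuple)
        and len(emitted) == 3
        and isinstance(emitted[2], Quality)
    )
    if is_triple:
        value, timestamp, quality = emitted
    else:
        # A lone value v behaves like (v, None, ATTR_VALID).
        value, timestamp, quality = emitted, None, Quality.ATTR_VALID

    # With automatic timestamping, a missing timestamp becomes "now".
    if timestamp is None and stamp_now:
        timestamp = clock()
    return value, timestamp, quality
```

Passing a fixed ``clock`` makes the timestamping behaviour easy to check in a unit test, while an explicit timestamp supplied in a triple is always left untouched.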