Software bus classes

Internal software bus implementation.

class ska_tango_base.software_bus.AttrSignal[source]

Convenience signal for working with Tango attributes.

This class is intended to be used with attribute_from_signal.

It is provided as a convenient way to create a Signal which supports setting a value directly and also setting a (data, date, quality) triple that can be used with tango.Attribute.set_value_date_quality(). In both cases the value that is set is directly emitted.

This class provides no runtime behaviour. It only exists to provide the correct type hints and allow attribute_from_signal to infer its dtype from the signal object.

Example:
class MyDevice(SignalBusMixin, Device):
    my_signal = AttrSignal[int]()
    # dtype inferred from my_signal
    myAttr = attribute_from_signal(my_signal)

    @command
    def MyCmd(self):
        self.my_signal = 0
        self.my_signal = (1, time.time(), AttrQuality.WARNING)
        self.my_signal = None
class ska_tango_base.software_bus.CachingAttrSignal[source]

Signal that includes a time stamp of when the value was emitted.

This signal is intended to be used attribute_from_signal to as a Tango attribute where the value is provided by some other thread, for example, a polling loop.

When CachingAttrSignal is set with a value data, a (data, date, quality) triple is emitted on the shared bus. The date is recorded as the time that the CachingAttrSignal was set and the quality is ATTR_VALID.

It is possible to override the values of date by setting the (data, date, quality) directly.

class ska_tango_base.software_bus.LastEmittedValue[source]

Get the last emitted value stored for a signal on the shared bus.

__init__(name: str | None = None) None[source]

Get the last emitted value stored for a signal on the shared bus.

Parameters:

name – Name of the signal, if None, this object’s name is used.

class ska_tango_base.software_bus.ListenerMethod[source]

Method on an Observer which can receive a signal.

__init__(*args, **kwargs)
class ska_tango_base.software_bus.NoValueType[source]

Type of the NoValue singleton.

class ska_tango_base.software_bus.Observer[source]

An observer that handles different signals with different methods.

Sub-classes can mark a method as a ListenerMethod using the listen_to_signal() decorator. ListenerMethod will only be called for signals they are registered to.

notify_emission(signal: str, value: Any) None[source]

Call all the listener methods for the given signal.

class ska_tango_base.software_bus.SharingObserver[source]

An observer that shares its bus with sub-objects.

When a BusProtocol is assigned to shared_bus this will recursively set on all sub-objects which also inherit from SharingObserver.

Subclasses can be notified when a new shared_bus is available by overriding on_new_shared_bus().

Each SharingObserver object has a observer_prefix that prefixes any signals listened to by that object. The root object, where the bus was originally assigned, has an observer_prefix of “.” and each sub-object has an observer_prefix based on the path to access that sub-object from the root object. For example:

class SubObj(SharingObserver):
    bar = Signal[str]()

    @listen_to_signal(bar)
    def on_bar(self, value: str):
        print(value)

class MySharer(SharingObserver):
    def __init__(self):
        self.foo = SubObj()
        self.shared_bus = SignalBus()
        self.shared_bus.start_thread()

sharer = MySharer()

assert sharer.observer_prefix == "."
assert sharer.foo.observer_prefix == ".foo"

Here, the sharer.foo.on_bar listener method will receive values emitted on the bus whenever sharer.foo.bar is set.

__init__(*args: Any, **kwargs: Any) None[source]

Initialise the device.

property observer_prefix: str

Return the path from the root SharingObserver holding the bus.

property shared_bus: BusProtocol

The shared bus used by this SharingObserver.

on_new_shared_bus() None[source]

Notify that a new bus is available.

When overriding this method you must call super().on_new_shared_bus.

class ska_tango_base.software_bus.Signal[source]

Descriptor to provide access to a signal on a shared bus.

Can only be used on classes which model SharingObserverProtocol.

Whenever the Signal is set, a signal is emitted with the value provided.

By default, the Signal descriptor is not gettable, however, if stored is set then the last value emitted will be returned on __get__.

DEPRECATED: The Signal can be assigned a special NoValue to clear the cached value. The NoValue is emitted on the bus. Instead use the del keyword on a Signal to clear the cache without emitting anything. Assigning NoValue to a Signal will raise an exception in the future. NoValue will only be passed as the default initial_value.

The relative name of the signal is taken from the name of the Signal on the owner class, and can be overridden by providing name kwarg to the initialiser.

The relative name is prefixed by the observer_prefix of the owner object before being emitted on the bus.

Parameters:
  • name – Name of the signal, if None this objects name is used.

  • stored – If True, store the latest emitted value on the parent object.

  • initial_value – If not NoValue, emit this value when connected to a new bus.

__init__(name: str | None = None, stored: bool = False, initial_value: ~typing.Any = <ska_tango_base.software_bus._signal.NoValueType object>) None[source]

Initialise the object.

property relative_name: str

Name of signal, relative to parent object.

class ska_tango_base.software_bus.SignalBusMixin[source]

A base class for mix-ins to use the shared bus.

This class is not intended to be inherited from directly by users. Instead, it is intended to be used by mix-ins to utilities the bus.

__init__(*args: Any, **kwargs: Any) None[source]

Initialise object.

init_device() None[source]

Initialise the shared bus.

add_attribute(attr: attribute) None[source]

Add an attribute dynamically.

notify_emission(signal: str, value: Any) None[source]

Push change events for AttributeSignal objects.

delete_device() None[source]

Shutdown the bus background thread.

allow_internal_threads() Iterator[None][source]

Allow internal threads to run without allowing new Tango requests.

This is intended to be used from a Tango client request thread to allow internal threads to acquire the Tango monitor, without allowing another Tango request to start.

This is used in the following two places in ska-tango-base:

  1. During init_device() to allow background threads to

    cleanup gracefully and be joined.

    If we allow other threads to run during init_device() or delete_device(), a client could initiate a new request on our device as the Tango client thread is able to grab the device lock. This should be avoided because the device is only partially initialised. This method avoids this issue by arranging for Tango client threads to be disallowed from grabbing the Tango monitor, while still allowing other internal threads to acquire the monitor.

  2. At the start of every request to allow the signal bus thread to

    catch up.

    This ensures that Tango clients get a sequentially consistent view of the Tango device. That is, if they call a command which modifies the value of an attribute then read that attribute, they will see the updated value, even if the attribute is using the signal bus (e.g. via attribute_from_signal).

Subclasses of SignalBusMixin should

always_executed_hook() None[source]

Raise an exception if we have released threads during an Init command.

exception ska_tango_base.software_bus.TimedOutError[source]

Timed out waiting for the signal bus background thread.

__init__() None[source]

Initalise the exception with the default error message.

class ska_tango_base.software_bus.attribute_from_signal[source]

A Tango attribute linked to an signal.

attribute_from_signal takes all the same keyword arguments as tango.server.attribute, and by default will provide an fget function which returns the last emitted value for the signal.

Any value emitted for the signal will result in a change and archive events being pushed for the attribute. Clients reading the attribute will receive the value of the most recently push events.

The value emitted can either be the attribute value on its own, None/NoValue or a “value triple”; that is a (<value>, <timestamp>, <quality>) tuple. When None/NoValue is emitted, clients will receive ATTR_INVALID data. In order for an emission to be recognised as a “value triple” the <quality> must be an instance of tango.AttrQuality and <timestamp> must be either None or a float (typically the return value of time.time()). When the <timestamp> is None, the client will receive the current time whenever the attribute value is sent over the network. Use AttrSignal for type annotations that support emitting both the attribute value and the “value triple”.

If write_to_signal is True or from_tango is not None, then an fset function will also be provided, so that any writes to the attribute will result in an value being emitted for the signal.

If the signal is a Signal object, then the __get__ and __set__ magic methods will be used to interact with the signal, otherwise the signal will be used as the name of the signal, which will be canonicalise_relative_to() the Tango Device.

attribute_from_signal optionally supports conversion functions to_tango and from_tango. These will convert to and from the attribute value whenever the attribute is read/written or whenever change/archive events are pushed in response to a value being emitted for the signal.

If the signal is a Signal object, then any type arguments from the signal will be used as the dtype for the attribute. This can be overridden by providing a dtype keyword argument.

If the conversion functions are provided, any annotations on the functions will be used to infer the dtype, instead of any signal object provided. Again, this can be overridden with the dtype keyword argument.

Parameters:
  • signal – Signal object or name of signal

  • to_tango – Function to convert from signal value to Tango value

  • from_tango – Function to convert from Tango value to signal value

  • write_to_signal – If True, include an attribute write method that will write to the signal

Examples:
class MyDevice(SignalBusMixin, Device):
    my_basic_signal = Signal[int]()
    myAttr = attribute_from_signal(
        my_basic_signal, write_to_signal=True
    )

    @listen_to_signal("my_basic_signal")
    def on_my_basic_signal(self, old_value, new_value):
        # Called whenever a Tango client writes to the
        # "myAttr" attribute
        if new_value == 2:
            self.set_state(DevState.FAULT)

    # The attribute will have "DevString" dtype.
    @attribute_from_signal(my_basic_signal)
    def myAttrAsStr(self, signal_value: int) -> str:
        return str(signal_value)

    # The attribute will have AttrWriteType.READ_WRITE
    @myAttr_readwrite_str.from_tango
    def myAttrAsStr(self, tango_value: str) -> int:
        return int(tango_value)

    my_attr_signal = AttrSignal[dict[str, str]]()
    # json.dumps has no type annotations, so we must provide the dtype
    readOnlyAttr = attribute_from_signal(
        my_attr_signal, dtype=str, to_tango=json.dumps
    )

    @command
    def MyCmd(self):
        # myAttr and myAttrAsAString will push change/archive events
        self.my_basic_signal = 1

        # readOnlyAttr will push change/archive events
        self.my_attr_signal = {"foo": "bar"}

        # AttrSignal allows this to type check, but otherwise does
        # nothing different to Signal
        self.my_attr_signal = ({}, time.time(), AttrQuality.ATTR_INVALID)
__init__(signal: str | Signal[Any], /, to_tango: Callable[[...], Any] | None = None, from_tango: Callable[[...], Any] | None = None, write_to_signal: bool = False, **kwargs: Any) None[source]

Initialise the object.

do_read(device: SignalBusMixin, attr: Attribute | None = None) tuple[Any, float, AttrQuality][source]

Read the signal value for the Tango attribute.

do_write(device: SignalBusMixin, attr_or_val: Any) None[source]

Write a Tango attribute value to a signal.

property signal_name: str | None

Return the name of the signal associated with this attribute.

to_tango(to_tango: Callable[[...], Any]) attribute_from_signal[source]

Decorate attribute with to tango conversion function.

from_tango(from_tango: Callable[[...], Any]) attribute_from_signal[source]

Decorate attribute with from tango conversion function.

on_init_device(device: SignalBusMixin) None[source]

Register for events.

Called by owner device during init_device() and whenever the attribute is added dynamically.

on_emission(device: SignalBusMixin, new_value: Any) None[source]

Push events for emission.

Called by owner device whenever a signal value is emitted for this attribute.

read(fn: Any) attribute_from_signal[source]

Spoof read when in autodoc.

write(fn: Any) attribute_from_signal[source]

Spoof write when in autodoc.

is_allowed(f: Any) attribute_from_signal[source]

Spoof write when in autodoc.

ska_tango_base.software_bus.canonicalise_relative_to(obj: SharingObserverProtocol, signal: str) str[source]

Return the absolute signal name, relative to obj.

If signal is already absolute, it is returned as is.

ska_tango_base.software_bus.listen_to_signal(signal: str | Signal[ValueT], listener: Callable[[ObserverT, Any], None] | None = None) Callable[[Callable[[ObserverT, Any], None]], ListenerMethod] | Callable[[Callable[[SharingObserverT, Any], None]], ListenerMethod] | ListenerMethod[source]

Mark a method as listening to a signal.

This function will be called asynchronously whenever a value is emitted for the signal.

For Observer objects, only a signal specified as a str is supported, however, for SharingObserver objects both str and Signal objects are accepted.

For a SharingObserver, signal names are relative to the parent object.

ska_tango_base.software_bus.NoValue

Used as the default initial value for a stored Signal.

Required to distinguish from None, which is a valid value to emit.

Testing helpers for software buses.

ska_tango_base.software_bus.testing.bus_test_context(*observers: ObserverProtocol) Generator[BusProtocol, None, None][source]

Context manager that provides a temporary signal bus for testing.

Automatically registers the provided observers before starting emission processing. Any instances of SharingObserver will have their shared_bus property assigned instead of being register directly.

Parameters:

observers – The observers to be registered with the bus.

Returns:

The signal bus instance.

class ska_tango_base.software_bus.testing.MockSignalObserver[source]

Mock observer that allows asserting that specific signals were emitted.

This is only available if ska_tango_testing can be imported.

notify_emission(signal: str, value: Any) None[source]

Notify the observer of an emission asynchronously.

Parameters:
  • signal – The name of the signal emitted.

  • value – The value emitted on the signal.