Software bus classes
Internal software bus implementation.
- class ska_tango_base.software_bus.AttrSignal[source]
Convenience signal for working with Tango attributes.
This class is intended to be used with
attribute_from_signal.It is provided as a convenient way to create a
Signalwhich supports setting a value directly and also setting a (data, date, quality) triple that can be used withtango.Attribute.set_value_date_quality(). In both cases the value that is set is directly emitted.This class provides no runtime behaviour. It only exists to provide the correct type hints and allow
attribute_from_signalto infer itsdtypefrom the signal object.- Example:
class MyDevice(SignalBusMixin, Device): my_signal = AttrSignal[int]() # dtype inferred from my_signal myAttr = attribute_from_signal(my_signal) @command def MyCmd(self): self.my_signal = 0 self.my_signal = (1, time.time(), AttrQuality.WARNING) self.my_signal = None
- class ska_tango_base.software_bus.CachingAttrSignal[source]
Signal that includes a time stamp of when the value was emitted.
This signal is intended to be used
attribute_from_signalto as a Tango attribute where the value is provided by some other thread, for example, a polling loop.When
CachingAttrSignalis set with a valuedata, a(data, date, quality)triple is emitted on the shared bus. Thedateis recorded as the time that theCachingAttrSignalwas set and thequalityisATTR_VALID.It is possible to override the values of
dateby setting the(data, date, quality)directly.
- class ska_tango_base.software_bus.LastEmittedValue[source]
Get the last emitted value stored for a signal on the shared bus.
- class ska_tango_base.software_bus.ListenerMethod[source]
Method on an
Observerwhich can receive a signal.- __init__(*args, **kwargs)
- class ska_tango_base.software_bus.Observer[source]
An observer that handles different signals with different methods.
Sub-classes can mark a method as a
ListenerMethodusing thelisten_to_signal()decorator.ListenerMethodwill only be called for signals they are registered to.
- class ska_tango_base.software_bus.SharingObserver[source]
An observer that shares its bus with sub-objects.
When a
BusProtocolis assigned toshared_busthis will recursively set on all sub-objects which also inherit fromSharingObserver.Subclasses can be notified when a new
shared_busis available by overridingon_new_shared_bus().Each
SharingObserverobject has aobserver_prefixthat prefixes any signals listened to by that object. The root object, where the bus was originally assigned, has anobserver_prefixof “.” and each sub-object has anobserver_prefixbased on the path to access that sub-object from the root object. For example:class SubObj(SharingObserver): bar = Signal[str]() @listen_to_signal(bar) def on_bar(self, value: str): print(value) class MySharer(SharingObserver): def __init__(self): self.foo = SubObj() self.shared_bus = SignalBus() self.shared_bus.start_thread() sharer = MySharer() assert sharer.observer_prefix == "." assert sharer.foo.observer_prefix == ".foo"
Here, the
sharer.foo.on_barlistener method will receive values emitted on the bus wheneversharer.foo.baris set.The shared bus used by this
SharingObserver.
Notify that a new bus is available.
When overriding this method you must call
super().on_new_shared_bus.
- class ska_tango_base.software_bus.Signal[source]
Descriptor to provide access to a signal on a shared bus.
Can only be used on classes which model
SharingObserverProtocol.Whenever the
Signalis set, a signal is emitted with the value provided.By default, the
Signaldescriptor is not gettable, however, ifstoredis set then the last value emitted will be returned on__get__.DEPRECATED: The
Signalcan be assigned a specialNoValueto clear the cached value. TheNoValueis emitted on the bus. Instead use thedelkeyword on aSignalto clear the cache without emitting anything. AssigningNoValueto aSignalwill raise an exception in the future.NoValuewill only be passed as the defaultinitial_value.The relative name of the signal is taken from the name of the
Signalon the owner class, and can be overridden by providingnamekwarg to the initialiser.The relative name is prefixed by the
observer_prefixof the owner object before being emitted on the bus.- Parameters:
name – Name of the signal, if
Nonethis objects name is used.stored – If
True, store the latest emitted value on the parent object.initial_value – If not
NoValue, emit this value when connected to a new bus.
- class ska_tango_base.software_bus.SignalBusMixin[source]
A base class for mix-ins to use the shared bus.
This class is not intended to be inherited from directly by users. Instead, it is intended to be used by mix-ins to utilities the bus.
- notify_emission(signal: str, value: Any) None[source]
Push change events for AttributeSignal objects.
- allow_internal_threads() Iterator[None][source]
Allow internal threads to run without allowing new Tango requests.
This is intended to be used from a Tango client request thread to allow internal threads to acquire the Tango monitor, without allowing another Tango request to start.
This is used in the following two places in ska-tango-base:
- During
init_device()to allow background threads to cleanup gracefully and be joined.
If we allow other threads to run during
init_device()ordelete_device(), a client could initiate a new request on our device as the Tango client thread is able to grab the device lock. This should be avoided because the device is only partially initialised. This method avoids this issue by arranging for Tango client threads to be disallowed from grabbing the Tango monitor, while still allowing other internal threads to acquire the monitor.
- During
- At the start of every request to allow the signal bus thread to
catch up.
This ensures that Tango clients get a sequentially consistent view of the Tango device. That is, if they call a command which modifies the value of an attribute then read that attribute, they will see the updated value, even if the attribute is using the signal bus (e.g. via attribute_from_signal).
Subclasses of SignalBusMixin should
- exception ska_tango_base.software_bus.TimedOutError[source]
Timed out waiting for the signal bus background thread.
- class ska_tango_base.software_bus.attribute_from_signal[source]
A Tango attribute linked to an signal.
attribute_from_signaltakes all the same keyword arguments astango.server.attribute, and by default will provide anfgetfunction which returns the last emitted value for the signal.Any value emitted for the signal will result in a change and archive events being pushed for the attribute. Clients reading the attribute will receive the value of the most recently push events.
The value emitted can either be the attribute value on its own,
None/NoValueor a “value triple”; that is a(<value>, <timestamp>, <quality>)tuple. WhenNone/NoValueis emitted, clients will receiveATTR_INVALIDdata. In order for an emission to be recognised as a “value triple” the<quality>must be an instance oftango.AttrQualityand<timestamp>must be eitherNoneor afloat(typically the return value oftime.time()). When the<timestamp>isNone, the client will receive the current time whenever the attribute value is sent over the network. UseAttrSignalfor type annotations that support emitting both the attribute value and the “value triple”.If
write_to_signalisTrueorfrom_tangois not None, then anfsetfunction will also be provided, so that any writes to the attribute will result in an value being emitted for the signal.If the signal is a
Signalobject, then the__get__and__set__magic methods will be used to interact with the signal, otherwise the signal will be used as the name of the signal, which will becanonicalise_relative_to()the Tango Device.attribute_from_signaloptionally supports conversion functionsto_tangoandfrom_tango. These will convert to and from the attribute value whenever the attribute is read/written or whenever change/archive events are pushed in response to a value being emitted for the signal.If the signal is a
Signalobject, then any type arguments from the signal will be used as thedtypefor the attribute. This can be overridden by providing adtypekeyword argument.If the conversion functions are provided, any annotations on the functions will be used to infer the
dtype, instead of any signal object provided. Again, this can be overridden with thedtypekeyword argument.- Parameters:
signal – Signal object or name of signal
to_tango – Function to convert from signal value to Tango value
from_tango – Function to convert from Tango value to signal value
write_to_signal – If True, include an attribute write method that will write to the signal
- Examples:
class MyDevice(SignalBusMixin, Device): my_basic_signal = Signal[int]() myAttr = attribute_from_signal( my_basic_signal, write_to_signal=True ) @listen_to_signal("my_basic_signal") def on_my_basic_signal(self, old_value, new_value): # Called whenever a Tango client writes to the # "myAttr" attribute if new_value == 2: self.set_state(DevState.FAULT) # The attribute will have "DevString" dtype. @attribute_from_signal(my_basic_signal) def myAttrAsStr(self, signal_value: int) -> str: return str(signal_value) # The attribute will have AttrWriteType.READ_WRITE @myAttr_readwrite_str.from_tango def myAttrAsStr(self, tango_value: str) -> int: return int(tango_value) my_attr_signal = AttrSignal[dict[str, str]]() # json.dumps has no type annotations, so we must provide the dtype readOnlyAttr = attribute_from_signal( my_attr_signal, dtype=str, to_tango=json.dumps ) @command def MyCmd(self): # myAttr and myAttrAsAString will push change/archive events self.my_basic_signal = 1 # readOnlyAttr will push change/archive events self.my_attr_signal = {"foo": "bar"} # AttrSignal allows this to type check, but otherwise does # nothing different to Signal self.my_attr_signal = ({}, time.time(), AttrQuality.ATTR_INVALID)
- __init__(signal: str | Signal[Any], /, to_tango: Callable[[...], Any] | None = None, from_tango: Callable[[...], Any] | None = None, write_to_signal: bool = False, **kwargs: Any) None[source]
Initialise the object.
- do_read(device: SignalBusMixin, attr: Attribute | None = None) tuple[Any, float, AttrQuality][source]
Read the signal value for the Tango attribute.
- do_write(device: SignalBusMixin, attr_or_val: Any) None[source]
Write a Tango attribute value to a signal.
- to_tango(to_tango: Callable[[...], Any]) attribute_from_signal[source]
Decorate attribute with to tango conversion function.
- from_tango(from_tango: Callable[[...], Any]) attribute_from_signal[source]
Decorate attribute with from tango conversion function.
- on_init_device(device: SignalBusMixin) None[source]
Register for events.
Called by owner device during
init_device()and whenever the attribute is added dynamically.
- on_emission(device: SignalBusMixin, new_value: Any) None[source]
Push events for emission.
Called by owner device whenever a signal value is emitted for this attribute.
- read(fn: Any) attribute_from_signal[source]
Spoof read when in autodoc.
- write(fn: Any) attribute_from_signal[source]
Spoof write when in autodoc.
- is_allowed(f: Any) attribute_from_signal[source]
Spoof write when in autodoc.
- ska_tango_base.software_bus.canonicalise_relative_to(obj: SharingObserverProtocol, signal: str) str[source]
Return the absolute signal name, relative to obj.
If signal is already absolute, it is returned as is.
- ska_tango_base.software_bus.listen_to_signal(signal: str | Signal[ValueT], listener: Callable[[ObserverT, Any], None] | None = None) Callable[[Callable[[ObserverT, Any], None]], ListenerMethod] | Callable[[Callable[[SharingObserverT, Any], None]], ListenerMethod] | ListenerMethod[source]
Mark a method as listening to a signal.
This function will be called asynchronously whenever a value is emitted for the signal.
For
Observerobjects, only a signal specified as a str is supported, however, forSharingObserverobjects both str andSignalobjects are accepted.For a
SharingObserver, signal names are relative to the parent object.
- ska_tango_base.software_bus.NoValue
Used as the default initial value for a stored
Signal.Required to distinguish from
None, which is a valid value to emit.
Testing helpers for software buses.
- ska_tango_base.software_bus.testing.bus_test_context(*observers: ObserverProtocol) Generator[BusProtocol, None, None][source]
Context manager that provides a temporary signal bus for testing.
Automatically registers the provided observers before starting emission processing. Any instances of
SharingObserverwill have theirshared_busproperty assigned instead of being register directly.- Parameters:
observers – The observers to be registered with the bus.
- Returns:
The signal bus instance.