Source code for ska_tango_base.software_bus._attribute

#
# This file is part of the SKA Tango Base project
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE.txt for more info.
#
from __future__ import annotations

import inspect
import time
import typing
import warnings

import tango
from packaging import version

from .. import type_hints
from .._autodoc_hacks import _TANGO_IS_MOCKED_BY_AUTODOC
from .._notango import TimedAttrData as _TimedAttrData
from ..utils import _hold_tango_monitor
from ._observer import canonicalise_relative_to
from ._signal import NoValue, NoValueType, Signal

T = typing.TypeVar("T")


_ConvFunc: typing.TypeAlias = typing.Callable[..., typing.Any]


def _get_type_hint_from_signal(signal: str | Signal[typing.Any]) -> typing.Any | None:
    """
    Return dtype hint from a signal.

    In general, for `Signal[T]` we return `T`, otherwise we return `None`.
    """
    if not isinstance(signal, Signal):
        return None

    if hasattr(signal, "__orig_class__"):
        args = typing.get_args(signal.__orig_class__)
    else:
        return None

    if len(args) != 1:
        return None

    return args[0]


def _default_value_for(attr: tango.Attribute) -> typing.Any:
    if attr.get_data_format() == tango.AttrDataFormat.SCALAR:
        match attr.get_data_type():
            case tango.CmdArgType.DevState:
                return tango.DevState.UNKNOWN
            case tango.CmdArgType.DevString:
                return ""
            case _:
                return 0

    return []


# To avoid cyclical imports we provide an internal protocol for the
# SignalBusMixin here.
class _SignalBusMixinProto(type_hints.SharingObserverProtocol, typing.Protocol):
    @property
    def _SignalBusMixin__attr_values(self) -> dict[str, typing.Any]: ...

    def set_change_event(
        self, attr: str, implement: bool, detect: bool = True
    ) -> None: ...
    def set_archive_event(
        self, attr: str, implement: bool, detect: bool = True
    ) -> None: ...

    def is_polled(self) -> bool: ...
    def is_attribute_polled(self, attr: str) -> bool: ...

    @typing.overload
    def push_change_event(self, attr: str, value: typing.Any) -> None: ...

    @typing.overload
    def push_change_event(
        self, attr: str, value: typing.Any, date: float, quality: tango.AttrQuality
    ) -> None: ...

    @typing.overload
    def push_archive_event(self, attr: str, value: typing.Any) -> None: ...

    @typing.overload
    def push_archive_event(
        self, attr: str, value: typing.Any, date: float, quality: tango.AttrQuality
    ) -> None: ...


# We are following the same style as pytango here, so use
# snake case for this class.
[docs] class attribute_from_signal(tango.server.attribute): # noqa: N801 """ A Tango attribute linked to an signal. ``attribute_from_signal`` takes all the same keyword arguments as :py:class:`tango.server.attribute`, and by default will provide an ``fget`` function which returns the last emitted value for the signal. Any value emitted for the signal will result in a change and archive events being pushed for the attribute. Clients reading the attribute will receive the value of the most recently push events. The value emitted can either be the attribute value on its own, ``None``/:obj:`NoValue` or a "value triple"; that is a ``(<value>, <timestamp>, <quality>)`` tuple. When ``None``/:obj:`NoValue` is emitted, clients will receive ``ATTR_INVALID`` data. In order for an emission to be recognised as a "value triple" the ``<quality>`` must be an instance of :class:`tango.AttrQuality` and ``<timestamp>`` must be either ``None`` or a ``float`` (typically the return value of :func:`time.time`). When the ``<timestamp>`` is ``None``, the client will receive the current time whenever the attribute value is sent over the network. Use :class:`AttrSignal` for type annotations that support emitting both the attribute value and the "value triple". If ``write_to_signal`` is ``True`` or ``from_tango`` is not None, then an ``fset`` function will also be provided, so that any writes to the attribute will result in an value being emitted for the signal. If the signal is a :py:class:`Signal` object, then the ``__get__`` and ``__set__`` magic methods will be used to interact with the signal, otherwise the signal will be used as the name of the signal, which will be :py:func:`canonicalise_relative_to()` the Tango Device. ``attribute_from_signal`` optionally supports conversion functions ``to_tango`` and ``from_tango``. These will convert to and from the attribute value whenever the attribute is read/written or whenever change/archive events are pushed in response to a value being emitted for the signal. If the signal is a :py:class:`Signal` object, then any type arguments from the signal will be used as the ``dtype`` for the attribute. This can be overridden by providing a ``dtype`` keyword argument. If the conversion functions are provided, any annotations on the functions will be used to infer the ``dtype``, instead of any signal object provided. Again, this can be overridden with the ``dtype`` keyword argument. :param signal: Signal object or name of signal :param to_tango: Function to convert from signal value to Tango value :param from_tango: Function to convert from Tango value to signal value :param write_to_signal: If True, include an attribute write method that will write to the signal Examples: .. code:: python class MyDevice(SignalBusMixin, Device): my_basic_signal = Signal[int]() myAttr = attribute_from_signal( my_basic_signal, write_to_signal=True ) @listen_to_signal("my_basic_signal") def on_my_basic_signal(self, old_value, new_value): # Called whenever a Tango client writes to the # "myAttr" attribute if new_value == 2: self.set_state(DevState.FAULT) # The attribute will have "DevString" dtype. @attribute_from_signal(my_basic_signal) def myAttrAsStr(self, signal_value: int) -> str: return str(signal_value) # The attribute will have AttrWriteType.READ_WRITE @myAttr_readwrite_str.from_tango def myAttrAsStr(self, tango_value: str) -> int: return int(tango_value) my_attr_signal = AttrSignal[dict[str, str]]() # json.dumps has no type annotations, so we must provide the dtype readOnlyAttr = attribute_from_signal( my_attr_signal, dtype=str, to_tango=json.dumps ) @command def MyCmd(self): # myAttr and myAttrAsAString will push change/archive events self.my_basic_signal = 1 # readOnlyAttr will push change/archive events self.my_attr_signal = {"foo": "bar"} # AttrSignal allows this to type check, but otherwise does # nothing different to Signal self.my_attr_signal = ({}, time.time(), AttrQuality.ATTR_INVALID) """
[docs] def __init__( self, signal: str | Signal[typing.Any], /, to_tango: _ConvFunc | None = None, from_tango: _ConvFunc | None = None, write_to_signal: bool = False, **kwargs: typing.Any, ) -> None: """Initialise the object.""" self._signal = signal self._to_tango = to_tango self._from_tango = from_tango if "initial_value" in kwargs: warnings.warn( '"initial_value" is not used by attribute_from_signal. ' "Passing this will be an error in ska-tango-base 2.0.0.", FutureWarning, ) kwargs.pop("initial_value") if "fget" not in kwargs: kwargs["fget"] = self._make_fget() if write_to_signal or self._from_tango is not None: kwargs["fset"] = self._make_fset() super().__init__(**kwargs)
[docs] def do_read( self, device: _SignalBusMixinProto, attr: tango.device_server.Attribute | None = None, ) -> tuple[typing.Any, float, tango.AttrQuality]: """Read the signal value for the Tango attribute.""" # The signal bus thread (which is allowed to run in # always_executed_hook) may have called `push_change_event`, which # will set the value of the attribute. This means that pytango will # not set the value we return from here. So, we need to make sure # that the value flag is cleared when we leave this function. This is a # work around for cppTango#1407. if attr is None: attr = self.__get__(device, type(device)) if version.parse(tango.__version__) < version.parse("10.1.0"): attr.set_value_flag(False) value = device._SignalBusMixin__attr_values.get(self.attr_name) if value is None: value = self._make_data(device, NoValue) assert isinstance(value, _TimedAttrData) return _data_to_triple(value)
def _make_fget( self, ) -> typing.Callable[[_SignalBusMixinProto, typing.Any], typing.Any]: def fget( # type: ignore device: _SignalBusMixinProto, attr: tango.device_sever.Attribute | None = None, ): """Expose a signal as a Tango attribute.""" return self.do_read(device, attr) annotation = None if self._to_tango is not None: if "return" in getattr(self._to_tango, "__annotations__", {}): sig = inspect.signature(self._to_tango, eval_str=False) if sig.return_annotation is not inspect.Signature.empty: annotation = sig.return_annotation else: annotation = _get_type_hint_from_signal(self._signal) if annotation is not None: fget.__annotations__["return"] = annotation return fget
[docs] def do_write(self, device: _SignalBusMixinProto, attr_or_val: typing.Any) -> None: """Write a Tango attribute value to a signal.""" if isinstance(attr_or_val, tango.device_server.Attribute): value = attr_or_val.get_write_value() else: value = attr_or_val value = self._convert_from_tango(device, value) if isinstance(self._signal, Signal): self._signal.__set__(device, value) else: device.shared_bus.emit( canonicalise_relative_to(device, self._signal), value )
def _make_fset(self) -> typing.Callable[[_SignalBusMixinProto, typing.Any], None]: def fset(device: _SignalBusMixinProto, attr_or_val: typing.Any) -> None: """Expose a signal as a Tango attribute.""" self.do_write(device, attr_or_val) annotation = None if self._from_tango is not None: sig = inspect.signature(self._from_tango, eval_str=False) *_, param_anno = (x.annotation for x in sig.parameters.values()) if param_anno is not inspect.Parameter.empty: annotation = param_anno else: annotation = _get_type_hint_from_signal(self._signal) if annotation is not inspect.Parameter.empty: fset.__annotations__["attr_or_val"] = annotation return fset @property def signal_name(self) -> str | None: """Return the name of the signal associated with this attribute.""" if isinstance(self._signal, Signal): return self._signal.relative_name return self._signal def _convert_to_tango( self, device: _SignalBusMixinProto, signal_value: typing.Any ) -> typing.Any: """Convert the signal value to be used with Tango.""" if self._to_tango is not None: if getattr(self._to_tango, "__to_tango_with_device__", False): return self._to_tango(device, signal_value) return self._to_tango(signal_value) return signal_value def _convert_from_tango( self, device: _SignalBusMixinProto, tango_value: typing.Any ) -> typing.Any: """Convert the tango value to a value to be emitted.""" if self._from_tango is not None: if getattr(self._from_tango, "__from_tango_with_device__", False): return self._from_tango(device, tango_value) return self._from_tango(tango_value) return tango_value
[docs] def to_tango(self, to_tango: _ConvFunc) -> attribute_from_signal: """Decorate attribute with to tango conversion function.""" self._to_tango = to_tango self._to_tango.__to_tango_with_device__ = True # type: ignore[attr-defined] return self.read(self._make_fget())
[docs] def from_tango(self, from_tango: _ConvFunc) -> attribute_from_signal: """Decorate attribute with from tango conversion function.""" self._from_tango = from_tango self._from_tango.__from_tango_with_device__ = True # type: ignore[attr-defined] return self.write(self._make_fset())
[docs] def on_init_device(self, device: _SignalBusMixinProto) -> None: """ Register for events. Called by owner device during ``init_device()`` and whenever the attribute is added dynamically. """ device.set_change_event(self.attr_name, True, True) device.set_archive_event(self.attr_name, True, True)
[docs] def on_emission(self, device: _SignalBusMixinProto, new_value: typing.Any) -> None: """ Push events for emission. Called by owner device whenever a signal value is emitted for this attribute. """ data = self._make_data(device, new_value) args = _data_to_triple(data) with _hold_tango_monitor(device): device.push_change_event(self.attr_name, *args) device.push_archive_event(self.attr_name, *args) if version.parse(tango.__version__) >= version.parse("10.1.0"): if device.is_polled() and device.is_attribute_polled(self.attr_name): tg = tango.Util.instance(False) tg.fill_attr_polling_buffer(device, self.attr_name, data) device._SignalBusMixin__attr_values[self.attr_name] = data
if _TANGO_IS_MOCKED_BY_AUTODOC:
[docs] def read(self, fn: typing.Any) -> attribute_from_signal: """Spoof read when in autodoc.""" return self
[docs] def write(self, fn: typing.Any) -> attribute_from_signal: """Spoof write when in autodoc.""" return self
[docs] def is_allowed(self, f: typing.Any) -> attribute_from_signal: """Spoof write when in autodoc.""" return self
def __call__(self, to_tango: _ConvFunc) -> attribute_from_signal: """Provide a conversion to tango function.""" if _TANGO_IS_MOCKED_BY_AUTODOC: return self return self.to_tango(to_tango) def _make_data( self, device: _SignalBusMixinProto, value: typing.Any ) -> _TimedAttrData: attr = self.__get__(device, type(device)) if value is NoValue: data = _TimedAttrData( _default_value_for(attr), tango.AttrQuality.ATTR_INVALID ) elif _is_value_triple(value): converted_value = self._convert_to_tango(device, value[0]) time_stamp = value[1] quality = value[2] if converted_value is None: if quality == tango.AttrQuality.ATTR_INVALID: data = _TimedAttrData( _default_value_for(attr), quality, time_stamp=time_stamp ) else: raise ValueError(f"'{quality}' quality not allowed for 'None'") else: data = _TimedAttrData(converted_value, quality, time_stamp=time_stamp) else: converted_value = self._convert_to_tango(device, value) if converted_value is None: data = _TimedAttrData( _default_value_for(attr), tango.AttrQuality.ATTR_INVALID ) else: data = _TimedAttrData(converted_value) return data
def _is_value_triple(value: typing.Any) -> bool: return ( isinstance(value, tuple) and len(value) == 3 and (value[1] is None or isinstance(value[1], float)) and isinstance(value[2], tango.AttrQuality) ) def _data_to_triple( data: _TimedAttrData, ) -> tuple[typing.Any, float, tango.AttrQuality]: # TODO: Handle errors time_stamp = data.time_stamp if time_stamp is None: time_stamp = time.time() return data.value, time_stamp, data.quality
[docs] class AttrSignal( Signal[T | tuple[T, float | None, tango.AttrQuality] | None], typing.Generic[T] ): """ Convenience signal for working with Tango attributes. This class is intended to be used with :py:class:`attribute_from_signal`. It is provided as a convenient way to create a :py:class:`Signal` which supports setting a value directly and also setting a (data, date, quality) triple that can be used with :py:meth:`tango.Attribute.set_value_date_quality`. In both cases the value that is set is directly emitted. This class provides no runtime behaviour. It only exists to provide the correct type hints and allow :py:class:`attribute_from_signal` to infer its ``dtype`` from the signal object. Example: .. code:: python class MyDevice(SignalBusMixin, Device): my_signal = AttrSignal[int]() # dtype inferred from my_signal myAttr = attribute_from_signal(my_signal) @command def MyCmd(self): self.my_signal = 0 self.my_signal = (1, time.time(), AttrQuality.WARNING) self.my_signal = None """
[docs] class CachingAttrSignal(Signal[tuple[T, float, tango.AttrQuality]], typing.Generic[T]): """ Signal that includes a time stamp of when the value was emitted. This signal is intended to be used :py:class:`attribute_from_signal` to as a Tango attribute where the value is provided by some other thread, for example, a polling loop. When ``CachingAttrSignal`` is set with a value ``data``, a ``(data, date, quality)`` triple is emitted on the shared bus. The ``date`` is recorded as the time that the ``CachingAttrSignal`` was set and the ``quality`` is ``ATTR_VALID``. It is possible to override the values of ``date`` by setting the ``(data, date, quality)`` directly. """ def __set__( self: CachingAttrSignal[T], obj: type_hints.SharingObserverProtocol, value: T | tuple[T, float, tango.AttrQuality] | None | NoValueType, ) -> None: """Emit the signal.""" if value is NoValue: value = typing.cast(NoValueType, value) # type: ignore[redundant-cast] elif _is_value_triple(value): value = typing.cast(tuple[T, float, tango.AttrQuality], value) elif value is None: value = (typing.cast(T, value), time.time(), tango.AttrQuality.ATTR_INVALID) else: value = (typing.cast(T, value), time.time(), tango.AttrQuality.ATTR_VALID) super().__set__(obj, value)
[docs] class LastEmittedValue(Signal[T], typing.Generic[T]): """Get the last emitted value stored for a signal on the shared bus."""
[docs] def __init__( self, /, name: str | None = None, ) -> None: """ Get the last emitted value stored for a signal on the shared bus. :param name: Name of the signal, if ``None``, this object's name is used. """ super().__init__(name=name, stored=True)
def __set__( self: LastEmittedValue[T], obj: type_hints.SharingObserverProtocol, value: T | tuple[T, float | None, tango.AttrQuality] | None | NoValueType, ) -> None: name = self._absolute_name_for(obj) raise AttributeError( f"'{obj.__class__.__name__}.{self.relative_name}' only allows reading the " f"last emitted value of signal '{name}' on the shared bus." ) def __delete__( self: LastEmittedValue[T], obj: type_hints.SharingObserverProtocol ) -> None: name = self._absolute_name_for(obj) raise AttributeError( f"'{obj.__class__.__name__}.{self.relative_name}' only allows reading the " f"last emitted value of signal '{name}' on the shared bus." )
class _DeprecatedAttrSignal(AttrSignal[T], typing.Generic[T]): """ AttrSignal that emits a deprecation warning when set. Should be used to move a stored signal to be read-only. """ def __init__( self: _DeprecatedAttrSignal[T], alternative: str, *args: typing.Any, **kwargs: typing.Any, ): super().__init__(*args, **kwargs) self._alternative = alternative def __set__( self: _DeprecatedAttrSignal[T], obj: type_hints.SharingObserverProtocol, value: T | tuple[T, float | None, tango.AttrQuality] | None | NoValueType, ) -> None: warnings.warn( f"Writing to signal '{self._relative_name}' directly is deprecated and " "will be disallowed in the next release of ska-tango-base. " f"Use '{self._alternative}' instead.", DeprecationWarning, ) super().__set__(obj, value)