#
# This file is part of the SKA Tango Base project
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE.txt for more info.
#
from __future__ import annotations
import inspect
import time
import typing
import warnings
import tango
from packaging import version
from .. import type_hints
from .._autodoc_hacks import _TANGO_IS_MOCKED_BY_AUTODOC
from .._notango import TimedAttrData as _TimedAttrData
from ..utils import _hold_tango_monitor
from ._observer import canonicalise_relative_to
from ._signal import NoValue, NoValueType, Signal
# Generic type variable for the value type carried by the signal classes
# defined in this module (AttrSignal, CachingAttrSignal, LastEmittedValue).
T = typing.TypeVar("T")
# Signature-agnostic alias used to annotate the ``to_tango``/``from_tango``
# conversion callables accepted by ``attribute_from_signal``.
_ConvFunc: typing.TypeAlias = typing.Callable[..., typing.Any]
def _get_type_hint_from_signal(signal: str | Signal[typing.Any]) -> typing.Any | None:
    """
    Extract the dtype hint carried by a parametrised signal object.

    For an instance created as ``Signal[T]()`` the single type argument ``T``
    is returned. ``None`` is returned when ``signal`` is not a
    :py:class:`Signal` instance (e.g. a signal name), when the instance has no
    ``__orig_class__`` attribute, or when it does not have exactly one type
    argument.

    :param signal: Signal object or name of signal
    :return: the single type argument of the signal, or ``None``
    """
    if not isinstance(signal, Signal):
        return None

    # ``__orig_class__`` is only present on instances created through a
    # subscripted generic alias.
    try:
        parametrised = signal.__orig_class__
    except AttributeError:
        return None

    type_args = typing.get_args(parametrised)
    return type_args[0] if len(type_args) == 1 else None
def _default_value_for(attr: tango.Attribute) -> typing.Any:
    """
    Return a placeholder value matching the attribute's data format and type.

    Non-scalar attributes get an empty list. Scalar ``DevState`` attributes
    get ``DevState.UNKNOWN``, scalar ``DevString`` attributes get ``""`` and
    every other scalar type gets ``0``.

    :param attr: the Tango attribute to produce a placeholder for
    :return: the placeholder value
    """
    if attr.get_data_format() != tango.AttrDataFormat.SCALAR:
        return []

    scalar_type = attr.get_data_type()
    if scalar_type == tango.CmdArgType.DevState:
        return tango.DevState.UNKNOWN
    if scalar_type == tango.CmdArgType.DevString:
        return ""
    return 0
# To avoid cyclical imports we provide an internal protocol for the
# SignalBusMixin here.
class _SignalBusMixinProto(type_hints.SharingObserverProtocol, typing.Protocol):
    """Structural type describing the device members used by this module."""

    @property
    def _SignalBusMixin__attr_values(self) -> dict[str, typing.Any]:
        """Return the mapping this module reads and writes by attribute name."""

    def set_change_event(
        self, attr: str, implement: bool, detect: bool = True
    ) -> None:
        """Declared for ``attribute_from_signal.on_init_device``."""

    def set_archive_event(
        self, attr: str, implement: bool, detect: bool = True
    ) -> None:
        """Declared for ``attribute_from_signal.on_init_device``."""

    def is_polled(self) -> bool:
        """Declared for the polling check in ``attribute_from_signal.on_emission``."""

    def is_attribute_polled(self, attr: str) -> bool:
        """Declared for the polling check in ``attribute_from_signal.on_emission``."""

    # push_change_event: value only, or value with date and quality.
    @typing.overload
    def push_change_event(self, attr: str, value: typing.Any) -> None: ...

    @typing.overload
    def push_change_event(
        self, attr: str, value: typing.Any, date: float, quality: tango.AttrQuality
    ) -> None: ...

    # push_archive_event: value only, or value with date and quality.
    @typing.overload
    def push_archive_event(self, attr: str, value: typing.Any) -> None: ...

    @typing.overload
    def push_archive_event(
        self, attr: str, value: typing.Any, date: float, quality: tango.AttrQuality
    ) -> None: ...
# We are following the same style as pytango here, so use
# snake case for this class.
[docs]
class attribute_from_signal(tango.server.attribute):  # noqa: N801
    """
    A Tango attribute linked to a signal.

    ``attribute_from_signal`` takes all the same keyword arguments as
    :py:class:`tango.server.attribute`, and by default will provide an ``fget``
    function which returns the last emitted value for the signal.

    Any value emitted for the signal will result in change and archive events
    being pushed for the attribute. Clients reading the attribute will receive
    the value of the most recently pushed events.

    The value emitted can either be the attribute value on its own,
    ``None``/:obj:`NoValue` or a "value triple"; that is a ``(<value>, <timestamp>,
    <quality>)`` tuple. When ``None``/:obj:`NoValue` is emitted, clients will receive
    ``ATTR_INVALID`` data. In order for an emission to be recognised as a
    "value triple" the ``<quality>`` must be an instance of
    :class:`tango.AttrQuality` and ``<timestamp>`` must be either ``None`` or a
    ``float`` (typically the return value of :func:`time.time`). When the
    ``<timestamp>`` is ``None``, the client will receive the current time
    whenever the attribute value is sent over the network. Use
    :class:`AttrSignal` for type annotations that support emitting both the
    attribute value and the "value triple".

    If ``write_to_signal`` is ``True`` or ``from_tango`` is not None,
    then an ``fset`` function will also be provided, so that any writes to the
    attribute will result in a value being emitted for the signal.

    If the signal is a :py:class:`Signal` object, then the ``__get__`` and ``__set__``
    magic methods will be used to interact with the signal, otherwise the signal
    will be used as the name of the signal, which will be
    :py:func:`canonicalise_relative_to()` the Tango Device.

    ``attribute_from_signal`` optionally supports conversion functions ``to_tango``
    and ``from_tango``. These will convert to and from the attribute value whenever
    the attribute is read/written or whenever change/archive events are pushed in
    response to a value being emitted for the signal.

    If the signal is a :py:class:`Signal` object, then any type arguments from
    the signal will be used as the ``dtype`` for the attribute. This can be
    overridden by providing a ``dtype`` keyword argument.

    If the conversion functions are provided, any annotations on the functions will be
    used to infer the ``dtype``, instead of any signal object provided. Again, this
    can be overridden with the ``dtype`` keyword argument.

    :param signal: Signal object or name of signal
    :param to_tango: Function to convert from signal value to Tango value
    :param from_tango: Function to convert from Tango value to signal value
    :param write_to_signal: If True, include an attribute write method that will write
        to the signal

    Examples:

    .. code:: python

        class MyDevice(SignalBusMixin, Device):
            my_basic_signal = Signal[int]()

            myAttr = attribute_from_signal(
                my_basic_signal, write_to_signal=True
            )

            @listen_to_signal("my_basic_signal")
            def on_my_basic_signal(self, old_value, new_value):
                # Called whenever a Tango client writes to the
                # "myAttr" attribute
                if new_value == 2:
                    self.set_state(DevState.FAULT)

            # The attribute will have "DevString" dtype.
            @attribute_from_signal(my_basic_signal)
            def myAttrAsStr(self, signal_value: int) -> str:
                return str(signal_value)

            # The attribute will have AttrWriteType.READ_WRITE
            @myAttrAsStr.from_tango
            def myAttrAsStr(self, tango_value: str) -> int:
                return int(tango_value)

            my_attr_signal = AttrSignal[dict[str, str]]()

            # json.dumps has no type annotations, so we must provide the dtype
            readOnlyAttr = attribute_from_signal(
                my_attr_signal, dtype=str, to_tango=json.dumps
            )

            @command
            def MyCmd(self):
                # myAttr and myAttrAsStr will push change/archive events
                self.my_basic_signal = 1

                # readOnlyAttr will push change/archive events
                self.my_attr_signal = {"foo": "bar"}

                # AttrSignal allows this to type check, but otherwise does
                # nothing different to Signal
                self.my_attr_signal = ({}, time.time(), AttrQuality.ATTR_INVALID)
    """

    def __init__(
        self,
        signal: str | Signal[typing.Any],
        /,
        to_tango: _ConvFunc | None = None,
        from_tango: _ConvFunc | None = None,
        write_to_signal: bool = False,
        **kwargs: typing.Any,
    ) -> None:
        """
        Initialise the object.

        :param signal: Signal object or name of signal
        :param to_tango: Function to convert from signal value to Tango value
        :param from_tango: Function to convert from Tango value to signal value
        :param write_to_signal: If True, include an attribute write method that
            will write to the signal
        :param kwargs: forwarded to the base class. ``initial_value`` is
            discarded with a ``FutureWarning``; a caller supplied ``fget`` is
            kept as is.
        """
        self._signal = signal
        self._to_tango = to_tango
        self._from_tango = from_tango

        if "initial_value" in kwargs:
            # stacklevel=2 attributes the warning to the code constructing the
            # attribute rather than to this module.
            warnings.warn(
                '"initial_value" is not used by attribute_from_signal. '
                "Passing this will be an error in ska-tango-base 2.0.0.",
                FutureWarning,
                stacklevel=2,
            )
            kwargs.pop("initial_value")

        if "fget" not in kwargs:
            kwargs["fget"] = self._make_fget()

        if write_to_signal or self._from_tango is not None:
            kwargs["fset"] = self._make_fset()

        super().__init__(**kwargs)

    def do_read(
        self,
        device: _SignalBusMixinProto,
        attr: tango.device_server.Attribute | None = None,
    ) -> tuple[typing.Any, float, tango.AttrQuality]:
        """
        Read the signal value for the Tango attribute.

        :param device: the device owning this attribute
        :param attr: the Tango attribute object; looked up through ``__get__``
            when not provided
        :return: a ``(value, timestamp, quality)`` triple built from the data
            stored for this attribute, or from :obj:`NoValue` when nothing is
            stored yet
        """
        # The signal bus thread (which is allowed to run in
        # always_executed_hook) may have called `push_change_event`, which
        # will set the value of the attribute. This means that pytango will
        # not set the value we return from here. So, we need to make sure
        # that the value flag is cleared when we leave this function. This is a
        # work around for cppTango#1407.
        if attr is None:
            attr = self.__get__(device, type(device))
        if version.parse(tango.__version__) < version.parse("10.1.0"):
            attr.set_value_flag(False)

        value = device._SignalBusMixin__attr_values.get(self.attr_name)
        if value is None:
            # Nothing stored for this attribute: fall back to the data that a
            # NoValue emission would have produced.
            value = self._make_data(device, NoValue)
        assert isinstance(value, _TimedAttrData)
        return _data_to_triple(value)

    def _make_fget(
        self,
    ) -> typing.Callable[[_SignalBusMixinProto, typing.Any], typing.Any]:
        """
        Build the read function handed to the base class.

        The function's ``return`` annotation is taken from the return
        annotation of ``to_tango`` when a conversion function is set, otherwise
        from the type argument of the signal object. No annotation is added
        when neither provides one.
        """

        def fget(  # type: ignore
            device: _SignalBusMixinProto,
            attr: tango.device_server.Attribute | None = None,
        ):
            """Expose a signal as a Tango attribute."""
            return self.do_read(device, attr)

        annotation = None
        if self._to_tango is not None:
            # Only inspect the signature when the callable declares a return
            # annotation at all (e.g. builtins may not expose annotations).
            if "return" in getattr(self._to_tango, "__annotations__", {}):
                sig = inspect.signature(self._to_tango, eval_str=False)
                if sig.return_annotation is not inspect.Signature.empty:
                    annotation = sig.return_annotation
        else:
            annotation = _get_type_hint_from_signal(self._signal)

        if annotation is not None:
            fget.__annotations__["return"] = annotation

        return fget

    def do_write(self, device: _SignalBusMixinProto, attr_or_val: typing.Any) -> None:
        """
        Write a Tango attribute value to a signal.

        :param device: the device owning this attribute
        :param attr_or_val: either the Tango attribute object, whose write
            value is used, or the value itself
        """
        if isinstance(attr_or_val, tango.device_server.Attribute):
            value = attr_or_val.get_write_value()
        else:
            value = attr_or_val

        value = self._convert_from_tango(device, value)

        if isinstance(self._signal, Signal):
            self._signal.__set__(device, value)
        else:
            # The signal was given by name: emit on the shared bus directly.
            device.shared_bus.emit(
                canonicalise_relative_to(device, self._signal), value
            )

    def _make_fset(self) -> typing.Callable[[_SignalBusMixinProto, typing.Any], None]:
        """
        Build the write function handed to the base class.

        The annotation of the ``attr_or_val`` parameter is taken from the last
        parameter of ``from_tango`` when a conversion function is set,
        otherwise from the type argument of the signal object.
        """

        def fset(device: _SignalBusMixinProto, attr_or_val: typing.Any) -> None:
            """Expose a signal as a Tango attribute."""
            self.do_write(device, attr_or_val)

        annotation = None
        if self._from_tango is not None:
            sig = inspect.signature(self._from_tango, eval_str=False)
            # The Tango value is the last parameter of the conversion function.
            *_, param_anno = (x.annotation for x in sig.parameters.values())
            if param_anno is not inspect.Parameter.empty:
                annotation = param_anno
        else:
            annotation = _get_type_hint_from_signal(self._signal)

        # NOTE(review): ``annotation`` is never ``inspect.Parameter.empty`` at
        # this point, so the ``attr_or_val`` annotation is always overwritten,
        # with ``None`` when no hint was found. ``_make_fget`` tests against
        # ``None`` instead - confirm which behaviour is intended.
        if annotation is not inspect.Parameter.empty:
            fset.__annotations__["attr_or_val"] = annotation

        return fset

    @property
    def signal_name(self) -> str | None:
        """Return the name of the signal associated with this attribute."""
        if isinstance(self._signal, Signal):
            return self._signal.relative_name
        return self._signal

    def _convert_to_tango(
        self, device: _SignalBusMixinProto, signal_value: typing.Any
    ) -> typing.Any:
        """
        Convert the signal value to be used with Tango.

        Conversion functions registered through :py:meth:`to_tango` are called
        with the device as first argument; functions passed to the constructor
        receive the value only. Without a conversion function the value is
        returned unchanged.
        """
        if self._to_tango is not None:
            if getattr(self._to_tango, "__to_tango_with_device__", False):
                return self._to_tango(device, signal_value)
            return self._to_tango(signal_value)
        return signal_value

    def _convert_from_tango(
        self, device: _SignalBusMixinProto, tango_value: typing.Any
    ) -> typing.Any:
        """
        Convert the tango value to a value to be emitted.

        Conversion functions registered through :py:meth:`from_tango` are
        called with the device as first argument; functions passed to the
        constructor receive the value only. Without a conversion function the
        value is returned unchanged.
        """
        if self._from_tango is not None:
            if getattr(self._from_tango, "__from_tango_with_device__", False):
                return self._from_tango(device, tango_value)
            return self._from_tango(tango_value)
        return tango_value

    def to_tango(self, to_tango: _ConvFunc) -> attribute_from_signal:
        """Decorate attribute with to tango conversion function."""
        self._to_tango = to_tango
        # Mark the function so that _convert_to_tango passes the device to it.
        self._to_tango.__to_tango_with_device__ = True  # type: ignore[attr-defined]
        return self.read(self._make_fget())

    def from_tango(self, from_tango: _ConvFunc) -> attribute_from_signal:
        """Decorate attribute with from tango conversion function."""
        self._from_tango = from_tango
        # Mark the function so that _convert_from_tango passes the device to it.
        self._from_tango.__from_tango_with_device__ = True  # type: ignore[attr-defined]
        return self.write(self._make_fset())

    def on_init_device(self, device: _SignalBusMixinProto) -> None:
        """
        Register for events.

        Called by owner device during ``init_device()`` and whenever the attribute
        is added dynamically.
        """
        device.set_change_event(self.attr_name, True, True)
        device.set_archive_event(self.attr_name, True, True)

    def on_emission(self, device: _SignalBusMixinProto, new_value: typing.Any) -> None:
        """
        Push events for emission.

        Called by owner device whenever a signal value is emitted for this
        attribute. The data built from the emission is also stored on the
        device so that :py:meth:`do_read` can return it.
        """
        data = self._make_data(device, new_value)
        args = _data_to_triple(data)
        # NOTE(review): the polling-buffer fill and the update of the stored
        # value are kept under the monitor together with the event pushes -
        # confirm this nesting against upstream.
        with _hold_tango_monitor(device):
            device.push_change_event(self.attr_name, *args)
            device.push_archive_event(self.attr_name, *args)
            if version.parse(tango.__version__) >= version.parse("10.1.0"):
                if device.is_polled() and device.is_attribute_polled(self.attr_name):
                    tg = tango.Util.instance(False)
                    tg.fill_attr_polling_buffer(device, self.attr_name, data)
            device._SignalBusMixin__attr_values[self.attr_name] = data

    if _TANGO_IS_MOCKED_BY_AUTODOC:

        def read(self, fn: typing.Any) -> attribute_from_signal:
            """Spoof read when in autodoc."""
            return self

        def write(self, fn: typing.Any) -> attribute_from_signal:
            """Spoof write when in autodoc."""
            return self

        def is_allowed(self, f: typing.Any) -> attribute_from_signal:
            """Spoof is_allowed when in autodoc."""
            return self

    def __call__(self, to_tango: _ConvFunc) -> attribute_from_signal:
        """Provide a conversion to tango function."""
        if _TANGO_IS_MOCKED_BY_AUTODOC:
            return self
        return self.to_tango(to_tango)

    def _make_data(
        self, device: _SignalBusMixinProto, value: typing.Any
    ) -> _TimedAttrData:
        """
        Build the attribute data for an emitted signal value.

        :param device: the device owning this attribute
        :param value: :obj:`NoValue`, a plain signal value or a "value triple"
        :return: the data to push/store. ``ATTR_INVALID`` data with a
            placeholder value is produced for :obj:`NoValue` and whenever the
            converted value is ``None``.
        :raises ValueError: if a "value triple" converts to ``None`` while its
            quality is not ``ATTR_INVALID``
        """
        attr = self.__get__(device, type(device))
        if value is NoValue:
            data = _TimedAttrData(
                _default_value_for(attr), tango.AttrQuality.ATTR_INVALID
            )
        elif _is_value_triple(value):
            # Only the value part of the triple goes through the conversion.
            converted_value = self._convert_to_tango(device, value[0])
            time_stamp = value[1]
            quality = value[2]
            if converted_value is None:
                if quality == tango.AttrQuality.ATTR_INVALID:
                    data = _TimedAttrData(
                        _default_value_for(attr), quality, time_stamp=time_stamp
                    )
                else:
                    raise ValueError(f"'{quality}' quality not allowed for 'None'")
            else:
                data = _TimedAttrData(converted_value, quality, time_stamp=time_stamp)
        else:
            converted_value = self._convert_to_tango(device, value)
            if converted_value is None:
                data = _TimedAttrData(
                    _default_value_for(attr), tango.AttrQuality.ATTR_INVALID
                )
            else:
                data = _TimedAttrData(converted_value)
        return data
def _is_value_triple(value: typing.Any) -> bool:
    """
    Tell whether ``value`` has the shape of a "value triple".

    A value triple is a 3-tuple whose second item is ``None`` or a ``float``
    and whose third item is a :class:`tango.AttrQuality`.
    """
    if not isinstance(value, tuple):
        return False
    if len(value) != 3:
        return False
    time_stamp = value[1]
    if time_stamp is not None and not isinstance(time_stamp, float):
        return False
    return isinstance(value[2], tango.AttrQuality)
def _data_to_triple(
    data: _TimedAttrData,
) -> tuple[typing.Any, float, tango.AttrQuality]:
    """
    Unpack attribute data into a ``(value, timestamp, quality)`` tuple.

    A missing (``None``) time stamp is replaced by the current time.
    """
    # TODO: Handle errors
    if (stamp := data.time_stamp) is None:
        stamp = time.time()
    return data.value, stamp, data.quality
[docs]
class AttrSignal(
    Signal[T | tuple[T, float | None, tango.AttrQuality] | None], typing.Generic[T]
):
    """
    Convenience signal for working with Tango attributes.

    This class is intended to be used with :py:class:`attribute_from_signal`.
    It is provided as a convenient way to create a :py:class:`Signal` which
    supports setting a value directly and also setting a (data, date, quality)
    triple that can be used with :py:meth:`tango.Attribute.set_value_date_quality`.
    In both cases the value that is set is directly emitted.

    This class provides no runtime behaviour. It only exists to provide the correct
    type hints and allow :py:class:`attribute_from_signal` to infer its ``dtype`` from
    the signal object.

    Example:

    .. code:: python

        class MyDevice(SignalBusMixin, Device):
            my_signal = AttrSignal[int]()

            # dtype inferred from my_signal
            myAttr = attribute_from_signal(my_signal)

            @command
            def MyCmd(self):
                self.my_signal = 0
                self.my_signal = (1, time.time(), AttrQuality.WARNING)
                self.my_signal = None
    """
[docs]
class CachingAttrSignal(Signal[tuple[T, float, tango.AttrQuality]], typing.Generic[T]):
    """
    Signal that includes a time stamp of when the value was emitted.

    This signal is intended to be used with :py:class:`attribute_from_signal`
    as a Tango attribute where the value is provided by some other thread, for
    example, a polling loop.

    When ``CachingAttrSignal`` is set with a value ``data``, a ``(data, date, quality)``
    triple is emitted on the shared bus. The ``date`` is recorded as the time that
    the ``CachingAttrSignal`` was set and the ``quality`` is ``ATTR_VALID``
    (``ATTR_INVALID`` when ``data`` is ``None``).

    It is possible to override the values of ``date`` by setting the
    ``(data, date, quality)`` directly.
    """

    def __set__(
        self: CachingAttrSignal[T],
        obj: type_hints.SharingObserverProtocol,
        value: T | tuple[T, float, tango.AttrQuality] | None | NoValueType,
    ) -> None:
        """Emit the signal."""
        emitted: typing.Any
        if value is NoValue or _is_value_triple(value):
            # NoValue and ready-made triples are forwarded untouched.
            emitted = value
        else:
            # Stamp a bare value with the current time; ``None`` is flagged
            # as invalid data.
            if value is None:
                quality = tango.AttrQuality.ATTR_INVALID
            else:
                quality = tango.AttrQuality.ATTR_VALID
            emitted = (typing.cast(T, value), time.time(), quality)
        super().__set__(obj, emitted)
[docs]
class LastEmittedValue(Signal[T], typing.Generic[T]):
    """Get the last emitted value stored for a signal on the shared bus."""

    def __init__(
        self,
        /,
        name: str | None = None,
    ) -> None:
        """
        Get the last emitted value stored for a signal on the shared bus.

        :param name: Name of the signal, if ``None``, this object's name is used.
        """
        super().__init__(name=name, stored=True)

    def _read_only_error(
        self, obj: type_hints.SharingObserverProtocol
    ) -> AttributeError:
        """Build the error raised for any attempt to modify the signal."""
        absolute = self._absolute_name_for(obj)
        owner = obj.__class__.__name__
        return AttributeError(
            f"'{owner}.{self.relative_name}' only allows reading the "
            f"last emitted value of signal '{absolute}' on the shared bus."
        )

    def __set__(
        self: LastEmittedValue[T],
        obj: type_hints.SharingObserverProtocol,
        value: T | tuple[T, float | None, tango.AttrQuality] | None | NoValueType,
    ) -> None:
        # Writing is never allowed: this descriptor is read-only.
        raise self._read_only_error(obj)

    def __delete__(
        self: LastEmittedValue[T], obj: type_hints.SharingObserverProtocol
    ) -> None:
        # Deleting is never allowed: this descriptor is read-only.
        raise self._read_only_error(obj)
class _DeprecatedAttrSignal(AttrSignal[T], typing.Generic[T]):
    """
    AttrSignal that emits a deprecation warning when set.

    Should be used to move a stored signal to be read-only.
    """

    def __init__(
        self: _DeprecatedAttrSignal[T],
        alternative: str,
        *args: typing.Any,
        **kwargs: typing.Any,
    ):
        """
        Initialise the signal.

        :param alternative: name quoted in the deprecation message as the
            replacement to use
        :param args: positional arguments forwarded to the base class
        :param kwargs: keyword arguments forwarded to the base class
        """
        super().__init__(*args, **kwargs)
        self._alternative = alternative

    def __set__(
        self: _DeprecatedAttrSignal[T],
        obj: type_hints.SharingObserverProtocol,
        value: T | tuple[T, float | None, tango.AttrQuality] | None | NoValueType,
    ) -> None:
        """Warn about the deprecated write, then emit the value as usual."""
        # stacklevel=2 attributes the warning to the code assigning the signal.
        # Without it the warning is attributed to this module, where Python's
        # default filters hide DeprecationWarning.
        warnings.warn(
            f"Writing to signal '{self._relative_name}' directly is deprecated and "
            "will be disallowed in the next release of ska-tango-base. "
            f"Use '{self._alternative}' instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        super().__set__(obj, value)