#
# This file is part of the SKA Tango Base project
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE.txt for more info.
#
from __future__ import annotations
import inspect
import types
import typing
import tango
from .. import type_hints
from ..ska_device import SKADevice
from ._attribute import attribute_from_signal
from ._bus import TimedOutError, _SignalBus
from ._observer import SharingObserver
# FIXME: Provide a better name for this
def _possible_signal_matches(
obj: type_hints.SharingObserverProtocol, signal: str
) -> typing.Iterable[str]:
"""
Yield the possible matches of a signal for an object.
That is, the signal name as is and the signal name relative to the object.
"""
yield signal
if signal.startswith(obj.observer_prefix):
yield signal.removeprefix(obj.observer_prefix)
[docs]
class SignalBusMixin(SharingObserver, SKADevice):
"""
A base class for mix-ins to use the shared bus.
This class is not intended to be inherited from directly by users. Instead, it
is intended to be used by mix-ins to utilities the bus.
"""
shared_bus: _SignalBus
__default_signal_attribute_map: typing.ClassVar[
dict[str, attribute_from_signal]
] = {}
__signal_attribute_map: dict[str, attribute_from_signal]
__attr_values: dict[str, typing.Any]
[docs]
def __init__(self, *args: typing.Any, **kwargs: typing.Any) -> None:
"""Initialise object."""
try:
super().__init__(*args, **kwargs)
except: # noqa: E722
try:
self.shared_bus.shutdown_thread()
except: # noqa: E722
pass
raise
# Workaround for pytango#688.
#
# Unfortunately, pytango <=10.0 does not release the GIL while
# exporting the device and iff we are connecting to a real database,
# DeviceClass::export_device will try to grab Tango monitor.
# This can lead to a deadlock with the SignalBus thread, as it grabs the
# Tango monitor and then the GIL when we call push_change_event.
#
# The workaround is to pause the background thread until we have
# been exported.
#
# We only want to do this if we are using a database and it is _not_
# a file database, as this is the only case where we get exported.
tg = tango.Util.instance()
if tg._UseDb and not tg._FileDb:
self.shared_bus._pause_thread_until(self.get_exported_flag)
@classmethod
def __init_subclass__(cls: type[SignalBusMixin], **kwargs: typing.Any) -> None:
"""Initialise the default signal map."""
super().__init_subclass__(**kwargs)
attr_sig_map: dict[str, attribute_from_signal] = {}
for key, obj in vars(cls).items():
if isinstance(obj, attribute_from_signal):
signal_name = obj.signal_name
if signal_name is None:
raise TypeError(
f"No signal name provided for attribute_from_signal {key}"
)
attr_sig_map[signal_name] = obj
cls.__default_signal_attribute_map = attr_sig_map
[docs]
def init_device(self: SignalBusMixin) -> None:
"""Initialise the shared bus."""
super().init_device()
self.__signal_attribute_map = dict(self.__default_signal_attribute_map)
self.__attr_values = {}
self.shared_bus = _SignalBus(logger=self.logger, name=self.get_name())
for attr in self.__signal_attribute_map.values():
attr.on_init_device(self)
self.shared_bus.start_thread()
[docs]
def add_attribute(
self: SignalBusMixin,
attr: tango.server.attribute,
) -> None:
"""Add an attribute dynamically."""
with tango.AutoTangoMonitor(self):
super().add_attribute(attr)
if isinstance(attr, attribute_from_signal):
signal_name = attr.signal_name
if signal_name is None:
raise RuntimeError(
"No signal name provided for attribute_from_signal"
)
attr.on_init_device(self)
self.__signal_attribute_map[signal_name] = attr
[docs]
def notify_emission(self: SignalBusMixin, signal: str, value: typing.Any) -> None:
"""Push change events for AttributeSignal objects."""
super().notify_emission(signal, value)
for s in _possible_signal_matches(self, signal):
attr = self.__signal_attribute_map.get(s, None)
if attr is not None:
attr.on_emission(self, value)
[docs]
def delete_device(self: SignalBusMixin) -> None:
"""Shutdown the bus background thread."""
with self.allow_internal_threads():
self.shared_bus.shutdown_thread()
super().delete_device()
[docs]
def always_executed_hook(self) -> None:
"""Raise an exception if we have released threads during an Init command."""
super().always_executed_hook()
with self.allow_internal_threads():
try:
self.shared_bus.wait_for_thread(self._CLIENT_NEW_REQUEST_TIMEOUT)
except TimedOutError:
info = inspect.getframeinfo(
typing.cast(types.FrameType, inspect.currentframe())
)
tango.Except.throw_exception(
"API_CommandTimedOut",
"Failed to wait for signal bus before servicing Tango request.",
f"{info.filename}:{info.lineno}",
)