#
# This file is part of the SKA Tango Base project
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE.txt for more info.
#
from __future__ import annotations
import contextlib
import inspect
import threading
import time
import types
import typing
import tango
from .. import type_hints
from ..ska_device import SKADevice
from ._attribute import attribute_from_signal
from ._bus import TimedOutError, _SignalBus
from ._observer import SharingObserver
# FIXME: Provide a better name for this
def _possible_signal_matches(
obj: type_hints.SharingObserverProtocol, signal: str
) -> typing.Iterable[str]:
"""
Yield the possible matches of a signal for an object.
That is, the signal name as is and the signal name relative to the object.
"""
yield signal
if signal.startswith(obj.observer_prefix):
yield signal.removeprefix(obj.observer_prefix)
[docs]
class SignalBusMixin(SharingObserver, SKADevice):
"""
A base class for mix-ins to use the shared bus.
This class is not intended to be inherited from directly by users. Instead, it
is intended to be used by mix-ins to utilities the bus.
"""
shared_bus: _SignalBus
__default_signal_attribute_map: typing.ClassVar[
dict[str, attribute_from_signal]
] = {}
__signal_attribute_map: dict[str, attribute_from_signal]
__attr_values: dict[str, typing.Any]
[docs]
def __init__(self, *args: typing.Any, **kwargs: typing.Any) -> None:
"""Initialise object."""
# If True, we release the Tango monitor and wait for the condition variable
# before trying to grab the monitor again.
self.__stall_new_client_requests = False
self.__new_client_request_wake_up = threading.Condition()
try:
super().__init__(*args, **kwargs)
except: # noqa: E722
try:
self.shared_bus.shutdown_thread()
except: # noqa: E722
pass
raise
# Workaround for pytango#688.
#
# Unfortunately, pytango <=10.0 does not release the GIL while
# exporting the device and iff we are connecting to a real database,
# DeviceClass::export_device will try to grab Tango monitor.
# This can lead to a deadlock with the SignalBus thread, as it grabs the
# Tango monitor and then the GIL when we call push_change_event.
#
# The workaround is to pause the background thread until we have
# been exported.
#
# We only want to do this if we are using a database and it is _not_
# a file database, as this is the only case where we get exported.
tg = tango.Util.instance()
if tg._UseDb and not tg._FileDb:
self.shared_bus._pause_thread_until(self.get_exported_flag)
@classmethod
def __init_subclass__(cls: type[SignalBusMixin], **kwargs: typing.Any) -> None:
"""Initialise the default signal map."""
super().__init_subclass__(**kwargs)
attr_sig_map: dict[str, attribute_from_signal] = {}
for key, obj in vars(cls).items():
if isinstance(obj, attribute_from_signal):
signal_name = obj.signal_name
if signal_name is None:
raise TypeError(
f"No signal name provided for attribute_from_signal {key}"
)
attr_sig_map[signal_name] = obj
cls.__default_signal_attribute_map = attr_sig_map
[docs]
def init_device(self: SignalBusMixin) -> None:
"""Initialise the shared bus."""
super().init_device()
self.__signal_attribute_map = dict(self.__default_signal_attribute_map)
self.__attr_values = {}
self.shared_bus = _SignalBus(logger=self.logger, name=self.get_name())
for attr in self.__signal_attribute_map.values():
attr.on_init_device(self)
self.shared_bus.start_thread()
[docs]
def add_attribute(
self: SignalBusMixin,
attr: tango.server.attribute,
) -> None:
"""Add an attribute dynamically."""
with tango.AutoTangoMonitor(self):
super().add_attribute(attr)
if isinstance(attr, attribute_from_signal):
signal_name = attr.signal_name
if signal_name is None:
raise RuntimeError(
"No signal name provided for attribute_from_signal"
)
attr.on_init_device(self)
self.__signal_attribute_map[signal_name] = attr
[docs]
def notify_emission(self: SignalBusMixin, signal: str, value: typing.Any) -> None:
"""Push change events for AttributeSignal objects."""
super().notify_emission(signal, value)
for s in _possible_signal_matches(self, signal):
attr = self.__signal_attribute_map.get(s, None)
if attr is not None:
attr.on_emission(self, value)
[docs]
def delete_device(self: SignalBusMixin) -> None:
"""Shutdown the bus background thread."""
with self.allow_internal_threads():
self.shared_bus.shutdown_thread()
super().delete_device()
[docs]
@contextlib.contextmanager
def allow_internal_threads(self: SignalBusMixin) -> typing.Iterator[None]:
"""
Allow internal threads to run without allowing new Tango requests.
This is intended to be used from a Tango client request thread to
allow internal threads to acquire the Tango monitor, without allowing
another Tango request to start.
This is used in the following two places in ska-tango-base:
1. During :py:meth:`!init_device()` to allow background threads to
cleanup gracefully and be joined.
If we allow other threads to run during :py:meth:`!init_device()` or
:py:meth:`!delete_device()`, a client could initiate a new request on
our device as the Tango client thread is able to grab the device lock.
This should be avoided because the device is only partially initialised.
This method avoids this issue by arranging for Tango client threads
to be disallowed from grabbing the Tango monitor, while still allowing
other internal threads to acquire the monitor.
2. At the start of every request to allow the signal bus thread to
catch up.
This ensures that Tango clients get a sequentially consistent view of the
Tango device. That is, if they call a command which modifies the value of
an attribute then read that attribute, they will see the updated value, even
if the attribute is using the signal bus (e.g. via attribute_from_signal).
Subclasses of SignalBusMixin should
"""
# Allowing internal threads to grab the Tango monitor is a little tricky.
# We want to release the Tango monitor, but not allow the Tango
# client threads from starting a new request. The way we do this is
# with the always_executing_hook, which is always called by Tango from the
# Tango client thread, before they call into us.
#
# We communicate that we don't want to allow new client requests
# by setting __stall_new_client_requests. When the
# always_executing_hook sees this is set, it releases the Tango
# monitor again and waits until it is signaled to try again
# via the __new_client_request_wake_up.
#
# We must be holding the Tango monitor to read/write the
# __stall_new_client_requests variable. We also cannot
# acquire the Tango monitor while holding the
# __new_client_requests_wake_up lock.
self.__stall_new_client_requests = True
try:
with tango.AutoTangoAllowThreads(self):
yield
finally:
self.__stall_new_client_requests = False
with self.__new_client_request_wake_up:
self.__new_client_request_wake_up.notify_all()
# The default cppTango timeout for acquiring a monitor is 3.2 seconds,
# and we would like to maintain that, however, we don't know how
# long we waited for to initially acquire the lock, so we just
# assume it was very quick and wait for the full timeout seconds
# again to ensure that we wait for _at least_ that long.
_CLIENT_NEW_REQUEST_TIMEOUT = 3.2
[docs]
def always_executed_hook(self) -> None:
"""Raise an exception if we have released threads during an Init command."""
super().always_executed_hook()
# The order here is important, we have to first grab the
# __new_client_request_wake_up CV lock, then release the Tango
# monitor and then wait on the CV (and release the CV lock).
# Otherwise, we might miss the notify_all() from the thread calling
# allow_internal_threads().
#
# We also have to make sure that we release the CV lock _before_ we try
# to re-acquire the Tango monitor, otherwise we might deadlock with the
# thread.
if self.__stall_new_client_requests:
end_timestamp = time.monotonic() + self._CLIENT_NEW_REQUEST_TIMEOUT
while self.__stall_new_client_requests:
if time.monotonic() > end_timestamp:
info = inspect.getframeinfo(
typing.cast(types.FrameType, inspect.currentframe())
)
tango.Except.throw_exception(
"API_CommandTimedOut",
"Unable to acquire the device monitor "
"while device re-initialising",
f"{info.filename}:{info.lineno}",
)
self.__new_client_request_wake_up.acquire()
with tango.AutoTangoAllowThreads(self):
self.__new_client_request_wake_up.wait()
self.__new_client_request_wake_up.release()
with self.allow_internal_threads():
try:
self.shared_bus.wait_for_thread(self._CLIENT_NEW_REQUEST_TIMEOUT)
except TimedOutError:
info = inspect.getframeinfo(
typing.cast(types.FrameType, inspect.currentframe())
)
tango.Except.throw_exception(
"API_CommandTimedOut",
"Failed to wait for signal bus before servicing Tango request.",
f"{info.filename}:{info.lineno}",
)