# Source code for ska_tango_base.software_bus._mixin

#
# This file is part of the SKA Tango Base project
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE.txt for more info.
#
from __future__ import annotations

import contextlib
import inspect
import threading
import time
import types
import typing

import tango

from .. import type_hints
from ..ska_device import SKADevice
from ._attribute import attribute_from_signal
from ._bus import TimedOutError, _SignalBus
from ._observer import SharingObserver


# FIXME: Provide a better name for this
def _possible_signal_matches(
    obj: type_hints.SharingObserverProtocol, signal: str
) -> typing.Iterable[str]:
    """
    Yield the possible matches of a signal for an object.

    That is, the signal name as is and, when the signal starts with the
    object's ``observer_prefix``, the signal name relative to the object
    (i.e. with that prefix removed).

    Each candidate is yielded at most once: if the prefix is empty the
    relative name would be identical to the full name, so it is skipped
    rather than yielded a second time.

    :param obj: object providing the ``observer_prefix`` string.
    :param signal: the full signal name that was emitted.
    :return: an iterable of candidate signal names, full name first.
    """
    yield signal

    prefix = obj.observer_prefix
    # An empty prefix trivially "matches" every signal and stripping it is a
    # no-op; yielding the same name twice would make callers handle one
    # emission twice.
    if prefix and signal.startswith(prefix):
        yield signal.removeprefix(prefix)


class SignalBusMixin(SharingObserver, SKADevice):
    """
    A base class for mix-ins to use the shared bus.

    This class is not intended to be inherited from directly by users.
    Instead, it is intended to be used by mix-ins to utilise the bus.
    """

    shared_bus: _SignalBus

    # Map of signal name -> attribute, collected per class by
    # __init_subclass__ from the attribute_from_signal objects declared
    # directly in that class body.
    __default_signal_attribute_map: typing.ClassVar[
        dict[str, attribute_from_signal]
    ] = {}
    # Per-device copy of the default map, extended by add_attribute().
    __signal_attribute_map: dict[str, attribute_from_signal]
    __attr_values: dict[str, typing.Any]

    def __init__(self, *args: typing.Any, **kwargs: typing.Any) -> None:
        """
        Initialise object.

        If the base class initialisation fails, a best-effort attempt is made
        to shut down the signal bus thread before the error is re-raised.
        """
        # If True, we release the Tango monitor and wait for the condition variable
        # before trying to grab the monitor again.
        self.__stall_new_client_requests = False
        self.__new_client_request_wake_up = threading.Condition()

        try:
            super().__init__(*args, **kwargs)
        except:  # noqa: E722
            # Best effort only: the bus may not exist yet if initialisation
            # failed early, so any failure here is deliberately ignored and
            # the original exception is propagated.
            try:
                self.shared_bus.shutdown_thread()
            except:  # noqa: E722
                pass
            raise

        # Workaround for pytango#688.
        #
        # Unfortunately, pytango <=10.0 does not release the GIL while
        # exporting the device and iff we are connecting to a real database,
        # DeviceClass::export_device will try to grab Tango monitor.
        # This can lead to a deadlock with the SignalBus thread, as it grabs the
        # Tango monitor and then the GIL when we call push_change_event.
        #
        # The workaround is to pause the background thread until we have
        # been exported.
        #
        # We only want to do this if we are using a database and it is _not_
        # a file database, as this is the only case where we get exported.
        tg = tango.Util.instance()
        if tg._UseDb and not tg._FileDb:
            self.shared_bus._pause_thread_until(self.get_exported_flag)

    @classmethod
    def __init_subclass__(cls: type[SignalBusMixin], **kwargs: typing.Any) -> None:
        """
        Initialise the default signal map.

        Collects every ``attribute_from_signal`` declared directly in the
        body of ``cls`` and stores them, keyed by signal name, as the class's
        default signal-to-attribute map.

        :raises TypeError: if an ``attribute_from_signal`` has no signal name.
        """
        super().__init_subclass__(**kwargs)

        attr_sig_map: dict[str, attribute_from_signal] = {}
        for key, obj in vars(cls).items():
            if isinstance(obj, attribute_from_signal):
                signal_name = obj.signal_name
                if signal_name is None:
                    raise TypeError(
                        f"No signal name provided for attribute_from_signal {key}"
                    )
                attr_sig_map[signal_name] = obj

        cls.__default_signal_attribute_map = attr_sig_map

    def init_device(self: SignalBusMixin) -> None:
        """
        Initialise the shared bus.

        Creates a fresh signal bus, initialises every signal-backed attribute
        against this device and then starts the bus background thread.
        """
        super().init_device()

        # Copy so that dynamically added attributes do not leak into the
        # class-level default map.
        self.__signal_attribute_map = dict(self.__default_signal_attribute_map)
        self.__attr_values = {}
        self.shared_bus = _SignalBus(logger=self.logger, name=self.get_name())

        for attr in self.__signal_attribute_map.values():
            attr.on_init_device(self)

        self.shared_bus.start_thread()

    def add_attribute(
        self: SignalBusMixin,
        attr: tango.server.attribute,
    ) -> None:
        """
        Add an attribute dynamically.

        If the attribute is an ``attribute_from_signal`` it is also
        initialised against this device and registered in the device's
        signal-to-attribute map.

        :param attr: the attribute to add.
        :raises RuntimeError: if an ``attribute_from_signal`` has no signal
            name.
        """
        with tango.AutoTangoMonitor(self):
            super().add_attribute(attr)

            if isinstance(attr, attribute_from_signal):
                signal_name = attr.signal_name
                if signal_name is None:
                    raise RuntimeError(
                        "No signal name provided for attribute_from_signal"
                    )
                attr.on_init_device(self)
                self.__signal_attribute_map[signal_name] = attr

    def notify_emission(self: SignalBusMixin, signal: str, value: typing.Any) -> None:
        """
        Push change events for AttributeSignal objects.

        :param signal: name of the emitted signal.
        :param value: the emitted value, forwarded to the matching attribute.
        """
        super().notify_emission(signal, value)

        # The attribute may be registered under either the full signal name
        # or the name relative to this observer's prefix.
        for s in _possible_signal_matches(self, signal):
            attr = self.__signal_attribute_map.get(s, None)
            if attr is not None:
                attr.on_emission(self, value)

    def delete_device(self: SignalBusMixin) -> None:
        """Shutdown the bus background thread."""
        # The bus thread may need the Tango monitor to finish its work, so
        # release it to internal threads while the thread is shut down.
        with self.allow_internal_threads():
            self.shared_bus.shutdown_thread()

        super().delete_device()

    @contextlib.contextmanager
    def allow_internal_threads(self: SignalBusMixin) -> typing.Iterator[None]:
        """
        Allow internal threads to run without allowing new Tango requests.

        This is intended to be used from a Tango client request thread
        to allow internal threads to acquire the Tango monitor, without
        allowing another Tango request to start.

        This is used in the following two places in ska-tango-base:

        1. During :py:meth:`!delete_device()` to allow background threads to
           clean up gracefully and be joined.

           If we allow other threads to run during :py:meth:`!init_device()`
           or :py:meth:`!delete_device()`, a client could initiate a new
           request on our device as the Tango client thread is able to grab
           the device lock. This should be avoided because the device is only
           partially initialised.

           This method avoids this issue by arranging for Tango client
           threads to be disallowed from grabbing the Tango monitor, while
           still allowing other internal threads to acquire the monitor.

        2. At the start of every request to allow the signal bus thread to
           catch up.

           This ensures that Tango clients get a sequentially consistent view
           of the Tango device. That is, if they call a command which
           modifies the value of an attribute then read that attribute, they
           will see the updated value, even if the attribute is using the
           signal bus (e.g. via attribute_from_signal).
        """
        # Allowing internal threads to grab the Tango monitor is a little tricky.
        # We want to release the Tango monitor, but not allow the Tango
        # client threads from starting a new request.  The way we do this is
        # with the always_executed_hook, which is always called by Tango from the
        # Tango client thread, before they call into us.
        #
        # We communicate that we don't want to allow new client requests
        # by setting __stall_new_client_requests.  When the
        # always_executed_hook sees this is set, it releases the Tango
        # monitor again and waits until it is signaled to try again
        # via the __new_client_request_wake_up.
        #
        # We must be holding the Tango monitor to read/write the
        # __stall_new_client_requests variable.  We also cannot
        # acquire the Tango monitor while holding the
        # __new_client_request_wake_up lock.
        self.__stall_new_client_requests = True
        try:
            with tango.AutoTangoAllowThreads(self):
                yield
        finally:
            # The monitor has been re-acquired at this point, so it is safe
            # to clear the flag before waking the stalled client threads.
            self.__stall_new_client_requests = False
            with self.__new_client_request_wake_up:
                self.__new_client_request_wake_up.notify_all()

    # The default cppTango timeout for acquiring a monitor is 3.2 seconds,
    # and we would like to maintain that, however, we don't know how
    # long we waited for to initially acquire the lock, so we just
    # assume it was very quick and wait for the full timeout seconds
    # again to ensure that we wait for _at least_ that long.
    _CLIENT_NEW_REQUEST_TIMEOUT = 3.2

    def always_executed_hook(self) -> None:
        """
        Stall new client requests while internal threads hold the device.

        If another thread is inside :py:meth:`allow_internal_threads`, wait
        (for up to ``_CLIENT_NEW_REQUEST_TIMEOUT`` seconds) until it has
        finished. Then give the signal bus thread the chance to catch up
        before the request is serviced.

        An ``API_CommandTimedOut`` Tango exception is thrown if either wait
        exceeds ``_CLIENT_NEW_REQUEST_TIMEOUT`` seconds.
        """
        super().always_executed_hook()

        # The order here is important, we have to first grab the
        # __new_client_request_wake_up CV lock, then release the Tango
        # monitor and then wait on the CV (and release the CV lock).
        # Otherwise, we might miss the notify_all() from the thread calling
        # allow_internal_threads().
        #
        # We also have to make sure that we release the CV lock _before_ we try
        # to re-acquire the Tango monitor, otherwise we might deadlock with the
        # thread.
        if self.__stall_new_client_requests:
            end_timestamp = time.monotonic() + self._CLIENT_NEW_REQUEST_TIMEOUT
            while self.__stall_new_client_requests:
                remaining = end_timestamp - time.monotonic()
                if remaining <= 0:
                    info = inspect.getframeinfo(
                        typing.cast(types.FrameType, inspect.currentframe())
                    )
                    tango.Except.throw_exception(
                        "API_CommandTimedOut",
                        "Unable to acquire the device monitor "
                        "while device re-initialising",
                        f"{info.filename}:{info.lineno}",
                    )

                self.__new_client_request_wake_up.acquire()
                with tango.AutoTangoAllowThreads(self):
                    try:
                        # Bound the wait by the time left so the timeout
                        # above is enforced even if we are never notified.
                        self.__new_client_request_wake_up.wait(timeout=remaining)
                    finally:
                        # Release inside the AutoTangoAllowThreads block so
                        # the CV lock is dropped before the Tango monitor is
                        # re-acquired, and is never leaked on error.
                        self.__new_client_request_wake_up.release()

        with self.allow_internal_threads():
            try:
                self.shared_bus.wait_for_thread(self._CLIENT_NEW_REQUEST_TIMEOUT)
            except TimedOutError:
                info = inspect.getframeinfo(
                    typing.cast(types.FrameType, inspect.currentframe())
                )
                tango.Except.throw_exception(
                    "API_CommandTimedOut",
                    "Failed to wait for signal bus before servicing Tango request.",
                    f"{info.filename}:{info.lineno}",
                )