# Source code for ska_tango_base._better_device

"""Module providing a better tango.server.Device."""

from __future__ import annotations

import contextlib
import inspect
import threading
import time
import types
import typing

import tango.server

from . import utils


class BetterDevice(tango.server.Device):
    """
    A base class to deal with the shortcomings of :class:`tango.server.Device`.

    The :meth:`allow_internal_threads` context manager releases the Tango
    device monitor lock without allowing external client threads to acquire
    the monitor.  This can be used to allow internal threads that can
    potentially acquire the monitor to run e.g. during shutdown of the thread.
    This is required to clean up a thread that pushes events from the
    :meth:`!delete_device` method.

    Subclasses of :class:`BetterDevice` should always ensure that the
    :code:`super().always_executed_hook()` is called when overriding
    :meth:`always_executed_hook`.
    """

    def __init__(self, *args: typing.Any, **kwargs: typing.Any) -> None:
        """
        Initialise object.

        :param args: positional arguments forwarded to the base class.
        :param kwargs: keyword arguments forwarded to the base class.
        """
        # If True, we release the Tango monitor and wait for the condition
        # variable before trying to grab the monitor again.
        self.__stall_new_client_requests = False
        self.__new_client_request_wake_up = threading.Condition()

        # Both are None unless init_device() started the background thread
        # that invokes device_exported_hook().
        self.__exported_thread_event: threading.Event | None = None
        self.__wait_for_exported_thread: threading.Thread | None = None

        try:
            super().__init__(*args, **kwargs)
        except BaseException:
            # Best-effort clean-up of the background thread; whatever happens
            # here must not mask the original failure, which is re-raised.
            try:
                self.__shutdown_exported_thread()
            except BaseException:
                pass
            raise

    def init_device(self) -> None:
        """
        Initialise the device.

        A background thread that will call :meth:`device_exported_hook` is
        only started when a subclass actually overrides that hook.
        """
        super().init_device()

        hook_overridden = (
            type(self).device_exported_hook
            is not BetterDevice.device_exported_hook
        )
        if hook_overridden:
            self.__exported_thread_event = threading.Event()
            self.__wait_for_exported_thread = threading.Thread(
                target=self.__invoke_device_exported_hook
            )
            self.__wait_for_exported_thread.start()

    def delete_device(self) -> None:
        """
        Delete the device.

        The background thread is stopped first; a failure to do so is logged
        and does not prevent the base class clean-up from running.
        """
        try:
            # The background thread may be waiting for the Tango monitor, so
            # release it (for internal threads only) while joining.
            with self.allow_internal_threads():
                self.__shutdown_exported_thread()
        except Exception:
            self.logger.exception(
                "Failed to shutdown device_exported_hook thread"
            )
        super().delete_device()

    def device_exported_hook(self) -> None:
        """
        Set up device once device and server have been exported.

        This hook can be used to subscribe to events, even to devices in the
        same device server, such as a self subscription.
        """

    def __invoke_device_exported_hook(self) -> None:
        """
        Poll until the device is exported, then run the exported hook.

        Runs in the background thread started by :meth:`init_device`.  The
        loop ends once the hook has completed without raising, or once the
        shutdown event has been set.
        """

        def dserver_available() -> bool:
            try:
                tango.Util.instance().get_dserver_device()
            except tango.DevFailed:
                return False
            return True

        stop_event = self.__exported_thread_event
        assert stop_event is not None

        while not stop_event.is_set():
            if dserver_available() and self.get_exported_flag():
                try:
                    with utils._hold_tango_monitor(self):
                        self.device_exported_hook()
                except Exception as ex:
                    # The hook is retried on the next iteration.
                    print(f"device_exported_hook() failed: {ex}")
                else:
                    break
            # Doubles as the polling delay and as the shutdown wake-up.
            stop_event.wait(0.05)

    def __shutdown_exported_thread(self) -> None:
        """Signal the background thread to stop and wait for it to finish."""
        if self.__wait_for_exported_thread is None:
            return

        assert self.__exported_thread_event is not None
        self.__exported_thread_event.set()
        self.__wait_for_exported_thread.join()
        self.__exported_thread_event = None
        self.__wait_for_exported_thread = None

    @contextlib.contextmanager
    def allow_internal_threads(self: BetterDevice) -> typing.Iterator[None]:
        """
        Allow internal threads to run without allowing new Tango requests.

        This is intended to be used from a Tango client request thread to
        allow internal threads to acquire the Tango monitor, without allowing
        another Tango request to start.

        This is used in the following two places in ska-tango-base:

        1. During :py:meth:`!delete_device()` to allow background threads to
           cleanup gracefully and be joined.

           If we allow other threads to run during :py:meth:`!init_device()`
           or :py:meth:`!delete_device()`, a client could initiate a new
           request on our device as the Tango client thread is able to grab
           the device lock.  This should be avoided because the device is only
           partially initialised.

           This method avoids this issue by arranging for Tango client threads
           to be disallowed from grabbing the Tango monitor, while still
           allowing other internal threads to acquire the monitor.

        2. At the start of every request to allow the signal bus thread to
           catch up.

           This ensures that Tango clients get a sequentially consistent view
           of the Tango device.  That is, if they call a command which
           modifies the value of an attribute then read that attribute, they
           will see the updated value, even if the attribute is using the
           signal bus (e.g. via attribute_from_signal).
        """
        # Allowing internal threads to grab the Tango monitor is a little
        # tricky.  We want to release the Tango monitor, but prevent the Tango
        # client threads from starting a new request.  The way we do this is
        # with the always_executed_hook, which is always called by Tango from
        # the Tango client thread, before they call into us.
        #
        # We communicate that we don't want to allow new client requests
        # by setting __stall_new_client_requests.  When the
        # always_executed_hook sees this is set, it releases the Tango
        # monitor again and waits until it is signaled to try again
        # via the __new_client_request_wake_up.
        #
        # We must be holding the Tango monitor to read/write the
        # __stall_new_client_requests variable.  We also cannot
        # acquire the Tango monitor while holding the
        # __new_client_request_wake_up lock.
        self.__stall_new_client_requests = True
        try:
            with tango.AutoTangoAllowThreads(self):
                yield
        finally:
            self.__stall_new_client_requests = False
            with self.__new_client_request_wake_up:
                self.__new_client_request_wake_up.notify_all()

    # The default cppTango timeout for acquiring a monitor is 3.2 seconds,
    # and we would like to maintain that, however, we don't know how
    # long we waited for to initially acquire the lock, so we just
    # assume it was very quick and wait for the full timeout seconds
    # again to ensure that we wait for _at least_ that long.
    _CLIENT_NEW_REQUEST_TIMEOUT = 3.2

    def always_executed_hook(self) -> None:
        """
        Stall incoming request if we are only allowing internal threads.

        While :meth:`allow_internal_threads` is active the calling client
        thread gives up the Tango monitor and waits to be woken up.  An
        ``API_CommandTimedOut`` Tango exception is thrown if the stall lasts
        longer than ``_CLIENT_NEW_REQUEST_TIMEOUT`` seconds.
        """
        super().always_executed_hook()

        # The order here is important, we have to first grab the
        # __new_client_request_wake_up CV lock, then release the Tango
        # monitor and then wait on the CV (and release the CV lock).
        # Otherwise, we might miss the notify_all() from the thread calling
        # allow_internal_threads().
        #
        # We also have to make sure that we release the CV lock _before_ we
        # try to re-acquire the Tango monitor, otherwise we might deadlock
        # with the thread.
        if not self.__stall_new_client_requests:
            return

        end_timestamp = time.monotonic() + self._CLIENT_NEW_REQUEST_TIMEOUT
        while self.__stall_new_client_requests:
            remaining = end_timestamp - time.monotonic()
            if remaining <= 0:
                info = inspect.getframeinfo(
                    typing.cast(types.FrameType, inspect.currentframe())
                )
                tango.Except.throw_exception(
                    "API_CommandTimedOut",
                    "Unable to acquire the device monitor "
                    "while device re-initialising",
                    f"{info.filename}:{info.lineno}",
                )

            self.__new_client_request_wake_up.acquire()
            with tango.AutoTangoAllowThreads(self):
                try:
                    # Bound the wait by the time left, so that the deadline
                    # above is enforced even if nobody ever notifies us.
                    self.__new_client_request_wake_up.wait(timeout=remaining)
                finally:
                    # Released inside the `with` so that the CV lock is never
                    # held while the Tango monitor is being re-acquired.
                    self.__new_client_request_wake_up.release()