Source code for ska_tango_base.software_bus._mixin

#
# This file is part of the SKA Tango Base project
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE.txt for more info.
#
from __future__ import annotations

import inspect
import types
import typing

import tango

from .. import type_hints
from ..ska_device import SKADevice
from ._attribute import attribute_from_signal
from ._bus import TimedOutError, _SignalBus
from ._observer import SharingObserver


# FIXME: Provide a better name for this
def _possible_signal_matches(
    obj: type_hints.SharingObserverProtocol, signal: str
) -> typing.Iterable[str]:
    """
    Yield the possible matches of a signal for an object.

    That is, the signal name as is and the signal name relative to the object.
    """
    yield signal

    if signal.startswith(obj.observer_prefix):
        yield signal.removeprefix(obj.observer_prefix)


[docs] class SignalBusMixin(SharingObserver, SKADevice): """ A base class for mix-ins to use the shared bus. This class is not intended to be inherited from directly by users. Instead, it is intended to be used by mix-ins to utilities the bus. """ shared_bus: _SignalBus __default_signal_attribute_map: typing.ClassVar[ dict[str, attribute_from_signal] ] = {} __signal_attribute_map: dict[str, attribute_from_signal] __attr_values: dict[str, typing.Any]
[docs] def __init__(self, *args: typing.Any, **kwargs: typing.Any) -> None: """Initialise object.""" try: super().__init__(*args, **kwargs) except: # noqa: E722 try: self.shared_bus.shutdown_thread() except: # noqa: E722 pass raise # Workaround for pytango#688. # # Unfortunately, pytango <=10.0 does not release the GIL while # exporting the device and iff we are connecting to a real database, # DeviceClass::export_device will try to grab Tango monitor. # This can lead to a deadlock with the SignalBus thread, as it grabs the # Tango monitor and then the GIL when we call push_change_event. # # The workaround is to pause the background thread until we have # been exported. # # We only want to do this if we are using a database and it is _not_ # a file database, as this is the only case where we get exported. tg = tango.Util.instance() if tg._UseDb and not tg._FileDb: self.shared_bus._pause_thread_until(self.get_exported_flag)
@classmethod def __init_subclass__(cls: type[SignalBusMixin], **kwargs: typing.Any) -> None: """Initialise the default signal map.""" super().__init_subclass__(**kwargs) attr_sig_map: dict[str, attribute_from_signal] = {} for key, obj in vars(cls).items(): if isinstance(obj, attribute_from_signal): signal_name = obj.signal_name if signal_name is None: raise TypeError( f"No signal name provided for attribute_from_signal {key}" ) attr_sig_map[signal_name] = obj cls.__default_signal_attribute_map = attr_sig_map
[docs] def init_device(self: SignalBusMixin) -> None: """Initialise the shared bus.""" super().init_device() self.__signal_attribute_map = dict(self.__default_signal_attribute_map) self.__attr_values = {} self.shared_bus = _SignalBus(logger=self.logger, name=self.get_name()) for attr in self.__signal_attribute_map.values(): attr.on_init_device(self) self.shared_bus.start_thread()
[docs] def add_attribute( self: SignalBusMixin, attr: tango.server.attribute, ) -> None: """Add an attribute dynamically.""" with tango.AutoTangoMonitor(self): super().add_attribute(attr) if isinstance(attr, attribute_from_signal): signal_name = attr.signal_name if signal_name is None: raise RuntimeError( "No signal name provided for attribute_from_signal" ) attr.on_init_device(self) self.__signal_attribute_map[signal_name] = attr
[docs] def notify_emission(self: SignalBusMixin, signal: str, value: typing.Any) -> None: """Push change events for AttributeSignal objects.""" super().notify_emission(signal, value) for s in _possible_signal_matches(self, signal): attr = self.__signal_attribute_map.get(s, None) if attr is not None: attr.on_emission(self, value)
[docs] def delete_device(self: SignalBusMixin) -> None: """Shutdown the bus background thread.""" with self.allow_internal_threads(): self.shared_bus.shutdown_thread() super().delete_device()
[docs] def always_executed_hook(self) -> None: """Raise an exception if we have released threads during an Init command.""" super().always_executed_hook() with self.allow_internal_threads(): try: self.shared_bus.wait_for_thread(self._CLIENT_NEW_REQUEST_TIMEOUT) except TimedOutError: info = inspect.getframeinfo( typing.cast(types.FrameType, inspect.currentframe()) ) tango.Except.throw_exception( "API_CommandTimedOut", "Failed to wait for signal bus before servicing Tango request.", f"{info.filename}:{info.lineno}", )