Source code for ska_pst.lmc.device_proxy

# -*- coding: utf-8 -*-
#
# This file is part of the SKA PST project.
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE for more info.
"""This module provides a factory for :py:class:`tango.DeviceProxy` instances.

This code is based off the SKA TANGO Examples class.
"""

from __future__ import annotations

import dataclasses
import logging
import threading
from typing import Any, Callable, Dict, List, Optional, Type, TypeAlias

import backoff
import tango
from readerwriterlock import rwlock
from tango import DevFailed, DeviceProxy, DevState, GreenMode
from typing_extensions import TypedDict

BackoffDetailsType = TypedDict("BackoffDetailsType", {"args": list, "elapsed": float})


[docs]@dataclasses.dataclass(kw_only=True, frozen=True) class ChangeEvent: """Data class representing a change event for an attribute.""" value: Any """The current value of the attribute.""" quality: tango.AttrQuality = tango.AttrQuality.ATTR_VALID """ The TANGO attribute quality. This allows for trying to check if the attribute is in a WARNING or ALARM state. """
ChangeEventCallback: TypeAlias = Callable[[ChangeEvent], None]
[docs]class ChangeEventSubscription: """ Class to act as a handle for a change event subscription. Instances of this class can be used to programmatically unsubscribe from a change event, without having to have access to the device or the subscription id. """ def __init__( self: ChangeEventSubscription, subscription_id: int, device: PstDeviceProxy, callbacks: List[ChangeEventCallback], attribute_name: str, logger: logging.Logger, ) -> None: """ Initialise object. :param subscription_id: the id of the subscription. :param device: the `PstDeviceProxy` for which the subscription belongs to. """ self._subscription_id = subscription_id self._device = device self._subscribed = True self._callbacks = callbacks self._attribute_name = attribute_name self._logger = logger @property def callbacks(self: ChangeEventSubscription) -> List[Callable]: """Get callbacks for current subscription.""" return self._callbacks def __del__(self: ChangeEventSubscription) -> None: """Cleanup the subscription when object is getting deleted.""" self.unsubscribe()
[docs] def unsubscribe(self: ChangeEventSubscription) -> None: """ Unsubscribe to the change event. Use this to method to unsubscribe to listening to a change event of as device. As this is potentially called from a Python thread this will try to run this within a TANGO OmniThread using a background thread. """ if self._subscribed: self._logger.debug( ( f"Unsubscribing {self._device.fqdn}.{self._attribute_name} " f"with subscription_id = {self._subscription_id}" ) ) self._device.unsubscribe_change_event(subscription=self) self._subscribed = False self.callbacks.clear()
@property def subscribed(self: ChangeEventSubscription) -> bool: """Check if subscription is still subscribed.""" return self._subscribed
[docs]class PstDeviceProxy: """A :py:class:`DeviceProxy` wrapper class. This class is used to wrap device proxies and abstract away from the TANGO API. This class is designed to provide passthrough methods/attributes that would already be available. At the moment this is a very simple API wrapper but could be built up, like what is done in MCCS that allows the device's to try to connect and wait for being initialised. """ _device: DeviceProxy _fqdn: str _logger: logging.Logger _subscriptions: Dict[str, ChangeEventSubscription] _lock: rwlock.RWLockWrite def __init__( self: PstDeviceProxy, fqdn: str, logger: logging.Logger, device: DeviceProxy, ) -> None: """ Initialise device proxy. :param fqdn: the fully qualified device-name of the TANGO device that the proxy is for. :param logger: the logger to use for logging within this proxy. :param device: the TANGO device proxy instance. """ assert DeviceProxyFactory._raw_proxies.get(fqdn, None) == device, "Use DeviceProxyFactory.get_device" self.__dict__["_fqdn"] = fqdn self.__dict__["_logger"] = logger self.__dict__["_device"] = device self.__dict__["_subscriptions"] = {} self.__dict__["_lock"] = rwlock.RWLockWrite() def _event_callback(self: PstDeviceProxy, attribute_name: str, event: tango.EventData) -> None: try: self._logger.debug(f"Received event for {attribute_name}, event = {event}") if event.err: self._logger.warning(f"Received failed change event: error stack is {event.errors}.") return elif event.attr_value is None: warning_message = ( "Received change event with empty value. Falling back to manual " f"attribute read. Event.err is {event.err}. Event.errors is\n" f"{event.errors}." ) self._logger.warning(warning_message) attr_value = self._read(attribute_name) else: attr_value = event.attr_value quality: tango.AttrQuality = tango.AttrQuality.ATTR_VALID if isinstance(attr_value, tango.DeviceAttribute): value = attr_value.value quality = attr_value.quality else: value = attr_value change_evt = ChangeEvent( value=value, quality=quality, ) self._logger.debug(f"Received event callback for {self.fqdn}.{attribute_name} - {change_evt=}") # read lock with self._lock.gen_rlock(): if attribute_name in self._subscriptions: for callback in self._subscriptions[attribute_name].callbacks: try: callback(change_evt) except Exception: self._logger.warning( f"Error in calling callback for {attribute_name}", exc_info=True ) else: self._logger.warning( f"No subscription for {attribute_name}, potential race condition.", exc_info=False ) except Exception: self._logger.warning( f"Error occurred in _event_callback for event {attribute_name=}, {event=}", exc_info=True ) def _read(self: PstDeviceProxy, attribute_name: str) -> Any: """ Read an attribute manually. Used when we receive an event with empty attribute data. :param attribute_name: the name of the attribute to be read :return: the attribute value """ return self._device.read_attribute(attribute_name)
[docs] def subscribe_change_event( self: PstDeviceProxy, attribute_name: str, callback: ChangeEventCallback, stateless: bool = False ) -> ChangeEventSubscription: """ Subscribe to change events. This method is used to subscribe to an attribute changed event on the given proxy object. This is similar to: .. code-block:: python device.subscribe_event( attribute_name, tango.EventType.CHANGE_EVENT, callback, stateless=stateless, ) This method also returns a `ChangeEventSubscription` which can be used to later unsubscribe from change events on the device proxy. :param attribute_name: the name of the attribute on the device proxy to subscribe to. :type attribute_name: str :param callback: the callback for TANGO to call when an event has happened. :type callback: ChangeEventCallback :param stateless: whether to use the TANGO stateless event model or not, default is False. :type stateless: bool, optional :returns: a ChangeEventSubscription that can be used to later to unsubscribe from. :rtype: ChangeEventSubscription """ self._logger.debug(f"DeviceProxy subscribing to events on {self.fqdn}.{attribute_name}") def _handle_event(event: tango.EventData) -> None: # need to do this on a different thread t = threading.Thread( target=self._event_callback, kwargs={"attribute_name": attribute_name, "event": event}, daemon=True, ) t.start() if attribute_name in self._subscriptions: self._logger.info(f"{attribute_name} already in subscriptions. Adding to callings") self._subscriptions[attribute_name].callbacks.append(callback) value = self._read(attribute_name) callback(value.value) else: # write lock with self._lock.gen_wlock(): self._logger.info(f"Subscribing to events on {self.fqdn}.{attribute_name} on TANGO Device") subscription_id = self._device.subscribe_event( attribute_name, tango.EventType.CHANGE_EVENT, _handle_event, stateless=stateless, ) self._logger.debug(f"Subscription ID is {subscription_id}") subscription = ChangeEventSubscription( subscription_id=subscription_id, device=self, callbacks=[callback], attribute_name=attribute_name, logger=self._logger, ) self._subscriptions[attribute_name] = subscription return self._subscriptions[attribute_name]
[docs] def unsubscribe_change_event(self: PstDeviceProxy, subscription: ChangeEventSubscription) -> None: """ Unsubscribe to change events for a given subscription. This method is used to unsubscribe to an attribute changed event on the given proxy object. This is similar to: .. code-block:: python device.unsubscribe_event(subscription_id) :param subscription: the subscription to unsubscribe to. """ attribute_name = subscription._attribute_name subscription_id = subscription._subscription_id self._logger.debug( f"{self} handling unsubscribe for attribute '{attribute_name}', with subid = {subscription_id}" ) def _task() -> None: try: with tango.EnsureOmniThread(): self._device.unsubscribe_event(subscription_id) except Exception: self._logger.warning( ( f"Error in unsubscribing from {self._device.fqdn}.{attribute_name}" f"with subscription id = {subscription_id}" ), exc_info=True, ) with self._lock.gen_wlock(): thread = threading.Thread(target=_task) thread.start() thread.join() if attribute_name in self._subscriptions: del self._subscriptions[attribute_name]
def __setattr__(self: PstDeviceProxy, name: str, value: Any) -> None: """ Set attribute. :param name: name of attribute to set. :param value: the value of the attribute. """ if name in ["fqdn", "logger"]: self.__dict__[f"_{name}"] = value else: setattr(self._device, name, value) def __getattr__(self: PstDeviceProxy, name: str) -> Any: """ Get attribute value. :param name: the name of attribute to get. :returns: the value of the attribute. :raises: AttributeError if the attribute does not exist. """ if name in ["fqdn", "logger"]: return self.__dict__[f"_{name}"] else: return getattr(self._device, name) @property def device(self: PstDeviceProxy) -> DeviceProxy: """Get TANGO Device Proxy object.""" return self._device def __repr__(self: PstDeviceProxy) -> str: """ Create a string representation of PstDeviceProxy. :return: a string representation of a PstDeviceProxy :rtype: str """ return f"PstDeviceProxy(fqdn='{self._fqdn}')"
[docs] def is_subscribed_to_events(self: PstDeviceProxy, attribute_name: str) -> bool: """ Check if there is an active event subscription for attribute. Checks if there is a `ChangeEventSubscription` for the attribute and if it is actively subscribed. :param attribute_name: the name of the attribute to check if there is an active event subscription. """ return attribute_name in self._subscriptions and self._subscriptions[attribute_name].subscribed
[docs] def wait_for_initialised(self: PstDeviceProxy) -> None: """Wait for the device proxy is up and initialised. Only after the device is initialised, `device.state() != DevState.INIT` is the device ready to be used. """ def _on_giveup_check_initialised(details: BackoffDetailsType) -> None: """ Give up waiting for the device to complete initialisation. :param details: a dictionary providing call context, such as the call args and the elapsed time """ elapsed = details["elapsed"] self._logger.warning( f"Gave up waiting for the device ({self}) to complete " f"initialisation after {elapsed} seconds." ) # ensure that the device is no longer in the DevState.INIT @backoff.on_predicate( backoff.expo, on_giveup=_on_giveup_check_initialised, # type: ignore factor=1, max_time=120.0, ) def _check_initialised() -> bool: """ Check that the device has completed initialisation. That is, check that the device is no longer in :py:const:`tango.DevState.INIT`. Checking that a device has initialised means calling its `state()` method, and even after the device returns a response from a ping, it might still raise an exception in response to reading device state (``"BAD_INV_ORDER_ORBHasShutdown``). So here we catch that exception. This method only performs a single check, and returns immediately. To check for initialisation in an exponential backoff-retry loop, use :py:meth:`._backoff_check_initialised`. :param device: the device to be checked :return: whether the device has completed initialisation """ try: return self.state() != DevState.INIT except DevFailed: self._logger.debug( "Caught a DevFailed exception while checking that the device has " "initialised. This is most likely a 'BAD_INV_ORDER_ORBHasShutdown " "exception triggered by the call to state()." ) return False _check_initialised()
[docs]class DeviceProxyFactory: """ Simple factory to create :py:class:`tango.DeviceProxy` instances. This class is an easy attempt to develop the concept developed by MCCS team in the following confluence page: https://confluence.skatelescope.org/display/SE/Running+BDD+tests+in+multiple+harnesses It is a factory class which provide the ability to create an object of type DeviceProxy. If a proxy had already been created it will reuse that instance. When testing the static variable _test_context is an instance of the TANGO class MultiDeviceTestContext. More information on tango testing can be found at the following link: https://pytango.readthedocs.io/en/stable/testing.html """ _proxy_supplier: Callable[..., DeviceProxy] = tango.DeviceProxy _raw_proxies: Dict[str, DeviceProxy] = {} __proxies: Dict[str, PstDeviceProxy] = {}
[docs] @classmethod def get_device( cls: Type[DeviceProxyFactory], fqdn: str, green_mode: GreenMode = GreenMode.Synchronous, logger: Optional[logging.Logger] = None, ) -> PstDeviceProxy: """Return a :py:class::`PstDeviceProxy`. This will return an existing proxy if already created, else it will create a `tango.DeviceProxy` and then wrap it as a :py:class::`PstDeviceProxy`. :param fqdn: the FQDN of the TANGO device that the proxy is for. :param green_mode: the TANGO green mode, the default is GreenMode.Synchronous. :param logger: the Python logger to use for the proxy. """ if logger is None: logger = logging.getLogger(__name__) # type: ignore def _on_giveup_connect(details: BackoffDetailsType) -> None: fqdn = details["args"][1] elapsed = details["elapsed"] logger.warning( # type: ignore f"Gave up trying to connect to device {fqdn} after " f"{elapsed} seconds." ) @backoff.on_exception( backoff.expo, tango.DevFailed, on_giveup=_on_giveup_connect, # type: ignore factor=0.1, max_time=120.0, ) def _get_proxy() -> tango.DeviceProxy: return cls._proxy_supplier(fqdn, green_mode=green_mode) if fqdn not in cls._raw_proxies: logger.debug(f"Creating new PstDeviceProxy for {fqdn}") try: cls._raw_proxies[fqdn] = _get_proxy() except Exception: cls._raw_proxies[fqdn] = cls._proxy_supplier(fqdn) cls.__proxies[fqdn] = PstDeviceProxy(fqdn=fqdn, logger=logger, device=cls._raw_proxies[fqdn]) proxy = cls.__proxies[fqdn] return proxy