Source code for ska_oso_scripting.functions.devicecontrol.tango_executor

"""
The command module contains code that encapsulates Tango device interactions
(commands, attribute read/writes, etc.) and provides the means to execute
them.
The OET decouples functions from Tango devices so that the commands can be
managed and executed by a proxy. This allows the proxy to execute commands
asynchronously while listening for interrupt signals, while to the caller
the execution appears synchronous.
"""

from __future__ import annotations

import atexit
import logging
import queue
import threading
import weakref
from typing import Dict, Tuple

import tango

from .exception import AlreadySubscribedError, EventTimeoutError

LOGGER = logging.getLogger(__name__)


class Attribute:
    """
    An abstraction of a Tango attribute.
    """

    def __init__(self, device: str, name: str):
        """
        Create an Attribute instance.

        :param device: the FQDN of the target Tango device
        :param name: the name of the attribute to read
        """
        self.device = device
        self.name = name

    def __repr__(self):
        return f'<Attribute("{self.device}", "{self.name}")>'

    def __eq__(self, other):
        if not isinstance(other, Attribute):
            return False
        return self.device == other.device and self.name == other.name


class Command:
    """
    An abstraction of a Tango command.
    """

    def __init__(self, device: str, command_name: str, *args):
        """
        Create a Tango command.
        :param device: the FQDN of the target Tango device
        :param command_name: the name of the command to execute
        :param args: unnamed arguments to be passed to the command
        """
        self.device = device
        self.command_name = command_name
        self.args = args

    def __repr__(self):
        arg_str = ", ".join([repr(o) for o in self.args])
        return f"<Command({repr(self.device)}, {repr(self.command_name)}, {arg_str})>"

    def __eq__(self, other):
        if not isinstance(other, Command):
            return False
        return (
            self.device == other.device
            and self.command_name == other.command_name
            and self.args == other.args
        )


[docs] class TangoDeviceProxyFactory: # pylint: disable=too-few-public-methods """ A call to create Tango DeviceProxy clients. This class exists to allow unit tests to override the factory with an implementation that returns mock DeviceProxy instances. """ def __call__(self, device_name: str) -> tango.DeviceProxy: proxy = tango.DeviceProxy(device_name) proxy.set_timeout_millis(10000) return proxy
class TangoExecutor: # pylint: disable=too-few-public-methods """ TangoExecutor is the proxy between calling code and Tango devices. It accepts encapsulated Tango interactions and performs them on behalf of the calling code. """ class SingleQueueEventStrategy: """ SingleQueueEventStrategy encapsulates the event handling behaviour of the TangoExecutor from ~October 2021, when all events were added to a single queue and subscriptions were created and released after each attribute read operation. We hope to replace this with a more advanced implementation that allows subscriptions to multiple events. :param mgr: SubscriptionManager instance used to observe events """ def __init__(self, mgr: SubscriptionManager): self._subscription_manager = mgr self._subscribed = False self._queue = queue.Queue() def subscribe_event(self, attr: Attribute) -> int: """ Subscribe to change events published by a Tango attribute. This strategy only supports one active subscription at any time. An exception will be raised if a second subscription is attempted. This method returns a subscription identifier which should be supplied to a subsequent unsubscribe_event method. :param attr: attribute to subscribe to :return: subscription identifier """ if self._subscribed: raise AlreadySubscribedError( f"Multiple subscriptions not allowed: {attr}" ) LOGGER.debug("Observing %s/%s", attr.device, attr.name) self._subscription_manager.register_observer(attr, self) return -1 def unsubscribe_event( self, attr: Attribute, subscription_id: int, # pylint: disable=unused-argument ) -> None: """ Unsubscribe to change events published by a Tango attribute. This strategy only supports one active subscription at any time. An exception will be raised if a second subscription is attempted. :param attr: attribute to unsubscribe from :param subscription_id: subscription identifier """ LOGGER.debug("Unobserving %s/%s", attr.device, attr.name) self._subscription_manager.unregister_observer(attr, self) self._drain() self._subscribed = False def _drain(self): """ Drains all events from the queue, blocking until the queue is empty. """ drained = False while not drained: try: self._queue.get(block=False) except queue.Empty: drained = True def read_event( self, attr: Attribute, # pylint: disable=unused-argument timeout: float, ) -> tango.EventData: """ Read an event from the queue. This function blocks until an event is received. With a single subscription active at any one time, the attribute is ignored by this implementation but is expected to be required by strategy that support multiple attribute subscriptions. :param timeout: timeout value to interrupt execution """ LOGGER.info("Reading event from the queue with timeout %s", timeout) try: evt = self._queue.get(timeout=timeout) return evt except queue.Empty: raise EventTimeoutError( "Timeout after waiting read an event from the queue" ) def notify(self, evt: tango.EventData): """ This implements the SubscriptionManager EventObserver interface. Tango ChangeEvents republished by the SubscriptionManager are received via this method. Queue is thread-safe so we do not need to synchronise this method with read_event. """ LOGGER.debug("Putting received event in the queue: %s", evt) self._queue.put(evt) def __init__(self, proxy_factory=TangoDeviceProxyFactory()): """ Create a new TangoExecutor. :param proxy_factory: a function or object which, when called, returns an object that conforms to the PyTango DeviceProxy interface. """ self._proxy_factory = proxy_factory # maps subscription_manager = SubscriptionManager(proxy_factory) self._evt_strategy = TangoExecutor.SingleQueueEventStrategy( subscription_manager ) # maps device names to device proxies. These proxies are used for # command execution and polling reads. There is scope for these to be # unified with the proxies used for event management. # TODO modify TangoDeviceProxyFactory to cache proxies? self._device_proxies: Dict[str, tango.DeviceProxy] = {} def execute(self, command: Command, **kwargs): """ Execute a Command on a Tango device. Additional kwargs to the DeviceProxy can be specified if required. :param command: the command to execute :return: the response, if any, returned by the Tango device """ proxy = self._get_proxy(command.device) param = None if len(command.args) == 1: param = command.args[0] if len(command.args) > 1: param = command.args LOGGER.info("Executing command: %r", command) return proxy.command_inout(command.command_name, cmd_param=param, **kwargs) def read(self, attribute: Attribute): """ Read an attribute on a Tango device. :param attribute: the attribute to read :return: the attribute value """ proxy = self._get_proxy(attribute.device) LOGGER.debug("Reading attribute: %s/%s", attribute.device, attribute.name) response = getattr(proxy, attribute.name) return response def subscribe_event(self, attribute: Attribute): """ Subscribe event on a Tango device. :param attribute: the attribute to subscribe to :return: subscription ID """ return self._evt_strategy.subscribe_event(attribute) def read_event(self, attr: Attribute, timeout: float = None) -> tango.EventData: """ Get an event for the specified attribute. :param timeout: custom timeout provided while execution of command's if systems do not respond within reasonable timescales then method raised EventTimeoutError. """ return self._evt_strategy.read_event(attr, timeout=timeout) def unsubscribe_event(self, attribute: Attribute, event_id: int): """ unsubscribe event on a Tango device. :param attribute: the attribute to unsubscribe :param event_id: event subscribe id :return: """ self._evt_strategy.unsubscribe_event(attribute, event_id) def _get_proxy(self, device_name: str) -> tango.DeviceProxy: # It takes time to construct and connect a device proxy to the remote # device, so instances are cached if device_name not in self._device_proxies: proxy = self._proxy_factory(device_name) self._device_proxies[device_name] = proxy return self._device_proxies[device_name] # class EventObserver(Protocol): # def notify(self, evt: tango.EventData) -> None: # ...
[docs] class Callback: """ Callback is an observable that distributes Tango events received by the callback instance to all observers registered at the moment of event reception. """ def __init__(self): # observers should not be kept alive due to registration # self.observers: weakref.WeakSet[EventObserver] = weakref.WeakSet() self._observers = weakref.WeakSet() # Observer notification is likely to run on a different thread from # observer registration, hence the observers set is locked before any # operation. self._observers_lock = threading.Lock() # Tango (or SKA implementation?) emits an event containing the current # device value when subscribing to attribute change events. This is # confusing as it's not a change in a value, just a statement of the # initial value. This flag is set if the 'discard first event' feature # flag is set and the first event is discarded. self._first_event_discarded = False
[docs] def register_observer(self, observer): """ Register an EventObserver. Once registered, the observer will be notified of all Tango events received by this instance. :param observer: observer to register """ with self._observers_lock: self._observers.add(observer)
[docs] def unregister_observer(self, observer): """ Unregister an EventObserver. Unsubscribed observers will not receive Tango events subsequently received by this instance. :param observer: observer to register """ with self._observers_lock: self._observers.discard(observer)
[docs] def notify_observers(self, evt: tango.EventData): """ Distribute an event to all registered observers. :param evt: event to distribute """ # take a snapshot of observers to give stable state to iterate over. # We iterate over a copy rather than notifying while holding the lock # as we do not know how observer event processing will take. if not self._first_event_discarded: self._first_event_discarded = True LOGGER.debug("Discarding first event: %s", evt) return with self._observers_lock: observers_copy = set(self._observers) for observer in observers_copy: observer.notify(evt)
def __call__(self, evt: tango.EventData): """ Called by Tango DeviceProxy on event reception. Tango expects a function, hence we implement __call__ to provide a function-like interface. """ self.notify_observers(evt)
[docs] class SubscriptionManager: """ SubscriptionManager is a proxy for Tango event subscriptions that prevents duplicate subscriptions and minimises subscribe/unsubscribe calls. Previously, each time a script listened to an event, it would subscribe to an event, wait for reception of the appropriate event, then unsubscribe. These multiple subscribe/unsubscribe calls were found to create problems. SubscriptionManager was introduced to manage subscriptions, with the aim of having fewer, longer-lived subscriptions. Clients subscribe to the SubscriptionManager, and the SubscriptionManager handles any required subscriptions to Tango devices. """ def __init__(self, proxy_factory=TangoDeviceProxyFactory()): self._proxy_factory = proxy_factory self._proxies: Dict[str, tango.DeviceProxy] = {} self._callbacks: Dict[Tuple[str, str], Callback] = {} # maps (device name, device attribute) to event subscription ID self._subscription_ids: Dict[Tuple[str, str], int] = {} atexit.register(self._unsubscribe_all) # py3.8 # def register_observer(self, attr: Attribute, observer: EventObserver): def register_observer(self, attr: Attribute, observer): """ Register an EventObserver as an observer of a Tango attribute. Once registered, the EventObserver will be notified of each Tango event published by the attribute. :param attr: Tango attribute to observe :param observer: the EventObserver to notify """ # the observer must be registered before the subscription is # established to prevent a window where an event could be received but # not distributed callback = self._get_callback(attr) callback.register_observer(observer) self._subscribe(callback, attr) def _subscribe(self, callback, attr): k = (attr.device, attr.name) if k not in self._subscription_ids: proxy = self._get_proxy(attr.device) LOGGER.debug("Subscribing to %s/%s", attr.device, attr.name) sub_id = proxy.subscribe_event( attr.name, tango.EventType.CHANGE_EVENT, callback ) self._subscription_ids[k] = sub_id # py3.8 # def register_observer(self, attr: Attribute, observer: EventObserver): def unregister_observer(self, attr: Attribute, observer): """ Deregister an EventObserver as an observer of a Tango attribute. :param attr: the observed Tango attribute :param observer: the EventObserver to unsubscribe """ callback = self._get_callback(attr) callback.unregister_observer(observer) def _get_proxy(self, device_name: str) -> tango.DeviceProxy: if device_name not in self._proxies: proxy = self._proxy_factory(device_name) self._proxies[device_name] = proxy return self._proxies[device_name] def _get_callback(self, attr: Attribute) -> Callback: k = (attr.device, attr.name) if k in self._callbacks: return self._callbacks[k] callback = Callback() self._callbacks[k] = callback return callback def _unsubscribe_all(self): for (device, attr), pid in self._subscription_ids.items(): proxy = self._get_proxy(device) LOGGER.debug("Unsubscribing ID %s (%s/%s)", pid, device, attr) proxy.unsubscribe_event(pid) self._subscription_ids.clear()