"""
The command module contains code that encapsulates Tango device interactions
(commands, attribute read/writes, etc.) and provides the means to execute
them.
The OET decouples functions from Tango devices so that the commands can be
managed and executed by a proxy. This allows the proxy to execute commands
asynchronously while listening for interrupt signals, while to the caller
the execution appears synchronous.
"""
from __future__ import annotations
import atexit
import logging
import queue
import threading
import weakref
from typing import Dict, Tuple
import tango
from .exception import AlreadySubscribedError, EventTimeoutError
LOGGER = logging.getLogger(__name__)
class Attribute:
"""
An abstraction of a Tango attribute.
"""
def __init__(self, device: str, name: str):
"""
Create an Attribute instance.
:param device: the FQDN of the target Tango device
:param name: the name of the attribute to read
"""
self.device = device
self.name = name
def __repr__(self):
return f'<Attribute("{self.device}", "{self.name}")>'
def __eq__(self, other):
if not isinstance(other, Attribute):
return False
return self.device == other.device and self.name == other.name
class Command:
"""
An abstraction of a Tango command.
"""
def __init__(self, device: str, command_name: str, *args):
"""
Create a Tango command.
:param device: the FQDN of the target Tango device
:param command_name: the name of the command to execute
:param args: unnamed arguments to be passed to the command
"""
self.device = device
self.command_name = command_name
self.args = args
def __repr__(self):
arg_str = ", ".join([repr(o) for o in self.args])
return f"<Command({repr(self.device)}, {repr(self.command_name)}, {arg_str})>"
def __eq__(self, other):
if not isinstance(other, Command):
return False
return (
self.device == other.device
and self.command_name == other.command_name
and self.args == other.args
)
[docs]
class TangoDeviceProxyFactory: # pylint: disable=too-few-public-methods
"""
A call to create Tango DeviceProxy clients.
This class exists to allow unit tests to override the factory with an
implementation that returns mock DeviceProxy instances.
"""
def __call__(self, device_name: str) -> tango.DeviceProxy:
proxy = tango.DeviceProxy(device_name)
proxy.set_timeout_millis(10000)
return proxy
class TangoExecutor: # pylint: disable=too-few-public-methods
"""
TangoExecutor is the proxy between calling code and Tango devices. It
accepts encapsulated Tango interactions and performs them on behalf of the
calling code.
"""
class SingleQueueEventStrategy:
"""
SingleQueueEventStrategy encapsulates the event handling behaviour of
the TangoExecutor from ~October 2021, when all events were added to a
single queue and subscriptions were created and released after each
attribute read operation.
We hope to replace this with a more advanced implementation that
allows subscriptions to multiple events.
:param mgr: SubscriptionManager instance used to observe events
"""
def __init__(self, mgr: SubscriptionManager):
self._subscription_manager = mgr
self._subscribed = False
self._queue = queue.Queue()
def subscribe_event(self, attr: Attribute) -> int:
"""
Subscribe to change events published by a Tango attribute.
This strategy only supports one active subscription at any time.
An exception will be raised if a second subscription is attempted.
This method returns a subscription identifier which should be
supplied to a subsequent unsubscribe_event method.
:param attr: attribute to subscribe to
:return: subscription identifier
"""
if self._subscribed:
raise AlreadySubscribedError(
f"Multiple subscriptions not allowed: {attr}"
)
LOGGER.debug("Observing %s/%s", attr.device, attr.name)
self._subscription_manager.register_observer(attr, self)
return -1
def unsubscribe_event(
self,
attr: Attribute,
subscription_id: int, # pylint: disable=unused-argument
) -> None:
"""
Unsubscribe to change events published by a Tango attribute.
This strategy only supports one active subscription at any time.
An exception will be raised if a second subscription is attempted.
:param attr: attribute to unsubscribe from
:param subscription_id: subscription identifier
"""
LOGGER.debug("Unobserving %s/%s", attr.device, attr.name)
self._subscription_manager.unregister_observer(attr, self)
self._drain()
self._subscribed = False
def _drain(self):
"""
Drains all events from the queue, blocking until the queue is empty.
"""
drained = False
while not drained:
try:
self._queue.get(block=False)
except queue.Empty:
drained = True
def read_event(
self,
attr: Attribute, # pylint: disable=unused-argument
timeout: float,
) -> tango.EventData:
"""
Read an event from the queue. This function blocks until an event
is received.
With a single subscription active at any one time, the attribute
is ignored by this implementation but is expected to be required
by strategy that support multiple attribute subscriptions.
:param timeout: timeout value to interrupt execution
"""
LOGGER.info("Reading event from the queue with timeout %s", timeout)
try:
evt = self._queue.get(timeout=timeout)
return evt
except queue.Empty:
raise EventTimeoutError(
"Timeout after waiting read an event from the queue"
)
def notify(self, evt: tango.EventData):
"""
This implements the SubscriptionManager EventObserver interface. Tango
ChangeEvents republished by the SubscriptionManager are received via
this method.
Queue is thread-safe so we do not need to synchronise this method with
read_event.
"""
LOGGER.debug("Putting received event in the queue: %s", evt)
self._queue.put(evt)
def __init__(self, proxy_factory=TangoDeviceProxyFactory()):
"""
Create a new TangoExecutor.
:param proxy_factory: a function or object which, when called, returns
an object that conforms to the PyTango DeviceProxy interface.
"""
self._proxy_factory = proxy_factory
# maps
subscription_manager = SubscriptionManager(proxy_factory)
self._evt_strategy = TangoExecutor.SingleQueueEventStrategy(
subscription_manager
)
# maps device names to device proxies. These proxies are used for
# command execution and polling reads. There is scope for these to be
# unified with the proxies used for event management.
# TODO modify TangoDeviceProxyFactory to cache proxies?
self._device_proxies: Dict[str, tango.DeviceProxy] = {}
def execute(self, command: Command, **kwargs):
"""
Execute a Command on a Tango device.
Additional kwargs to the DeviceProxy can be specified if required.
:param command: the command to execute
:return: the response, if any, returned by the Tango device
"""
proxy = self._get_proxy(command.device)
param = None
if len(command.args) == 1:
param = command.args[0]
if len(command.args) > 1:
param = command.args
LOGGER.info("Executing command: %r", command)
return proxy.command_inout(command.command_name, cmd_param=param, **kwargs)
def read(self, attribute: Attribute):
"""
Read an attribute on a Tango device.
:param attribute: the attribute to read
:return: the attribute value
"""
proxy = self._get_proxy(attribute.device)
LOGGER.debug("Reading attribute: %s/%s", attribute.device, attribute.name)
response = getattr(proxy, attribute.name)
return response
def subscribe_event(self, attribute: Attribute):
"""
Subscribe event on a Tango device.
:param attribute: the attribute to subscribe to
:return: subscription ID
"""
return self._evt_strategy.subscribe_event(attribute)
def read_event(self, attr: Attribute, timeout: float = None) -> tango.EventData:
"""
Get an event for the specified attribute.
:param timeout: custom timeout provided while execution of command's
if systems do not respond within reasonable timescales then method raised EventTimeoutError.
"""
return self._evt_strategy.read_event(attr, timeout=timeout)
def unsubscribe_event(self, attribute: Attribute, event_id: int):
"""
unsubscribe event on a Tango device.
:param attribute: the attribute to unsubscribe
:param event_id: event subscribe id
:return:
"""
self._evt_strategy.unsubscribe_event(attribute, event_id)
def _get_proxy(self, device_name: str) -> tango.DeviceProxy:
# It takes time to construct and connect a device proxy to the remote
# device, so instances are cached
if device_name not in self._device_proxies:
proxy = self._proxy_factory(device_name)
self._device_proxies[device_name] = proxy
return self._device_proxies[device_name]
# class EventObserver(Protocol):
# def notify(self, evt: tango.EventData) -> None:
# ...
[docs]
class Callback:
"""
Callback is an observable that distributes Tango events received by the
callback instance to all observers registered at the moment of event
reception.
"""
def __init__(self):
# observers should not be kept alive due to registration
# self.observers: weakref.WeakSet[EventObserver] = weakref.WeakSet()
self._observers = weakref.WeakSet()
# Observer notification is likely to run on a different thread from
# observer registration, hence the observers set is locked before any
# operation.
self._observers_lock = threading.Lock()
# Tango (or SKA implementation?) emits an event containing the current
# device value when subscribing to attribute change events. This is
# confusing as it's not a change in a value, just a statement of the
# initial value. This flag is set if the 'discard first event' feature
# flag is set and the first event is discarded.
self._first_event_discarded = False
[docs]
def register_observer(self, observer):
"""
Register an EventObserver.
Once registered, the observer will be notified of all Tango events
received by this instance.
:param observer: observer to register
"""
with self._observers_lock:
self._observers.add(observer)
[docs]
def unregister_observer(self, observer):
"""
Unregister an EventObserver.
Unsubscribed observers will not receive Tango events subsequently
received by this instance.
:param observer: observer to register
"""
with self._observers_lock:
self._observers.discard(observer)
[docs]
def notify_observers(self, evt: tango.EventData):
"""
Distribute an event to all registered observers.
:param evt: event to distribute
"""
# take a snapshot of observers to give stable state to iterate over.
# We iterate over a copy rather than notifying while holding the lock
# as we do not know how observer event processing will take.
if not self._first_event_discarded:
self._first_event_discarded = True
LOGGER.debug("Discarding first event: %s", evt)
return
with self._observers_lock:
observers_copy = set(self._observers)
for observer in observers_copy:
observer.notify(evt)
def __call__(self, evt: tango.EventData):
"""
Called by Tango DeviceProxy on event reception. Tango expects a
function, hence we implement __call__ to provide a function-like
interface.
"""
self.notify_observers(evt)
[docs]
class SubscriptionManager:
"""
SubscriptionManager is a proxy for Tango event subscriptions that prevents
duplicate subscriptions and minimises subscribe/unsubscribe calls.
Previously, each time a script listened to an event, it would subscribe to
an event, wait for reception of the appropriate event, then unsubscribe.
These multiple subscribe/unsubscribe calls were found to create problems.
SubscriptionManager was introduced to manage subscriptions, with the aim of
having fewer, longer-lived subscriptions. Clients subscribe to the
SubscriptionManager, and the SubscriptionManager handles any required
subscriptions to Tango devices.
"""
def __init__(self, proxy_factory=TangoDeviceProxyFactory()):
self._proxy_factory = proxy_factory
self._proxies: Dict[str, tango.DeviceProxy] = {}
self._callbacks: Dict[Tuple[str, str], Callback] = {}
# maps (device name, device attribute) to event subscription ID
self._subscription_ids: Dict[Tuple[str, str], int] = {}
atexit.register(self._unsubscribe_all)
# py3.8
# def register_observer(self, attr: Attribute, observer: EventObserver):
def register_observer(self, attr: Attribute, observer):
"""
Register an EventObserver as an observer of a Tango attribute.
Once registered, the EventObserver will be notified of each Tango
event published by the attribute.
:param attr: Tango attribute to observe
:param observer: the EventObserver to notify
"""
# the observer must be registered before the subscription is
# established to prevent a window where an event could be received but
# not distributed
callback = self._get_callback(attr)
callback.register_observer(observer)
self._subscribe(callback, attr)
def _subscribe(self, callback, attr):
k = (attr.device, attr.name)
if k not in self._subscription_ids:
proxy = self._get_proxy(attr.device)
LOGGER.debug("Subscribing to %s/%s", attr.device, attr.name)
sub_id = proxy.subscribe_event(
attr.name, tango.EventType.CHANGE_EVENT, callback
)
self._subscription_ids[k] = sub_id
# py3.8
# def register_observer(self, attr: Attribute, observer: EventObserver):
def unregister_observer(self, attr: Attribute, observer):
"""
Deregister an EventObserver as an observer of a Tango attribute.
:param attr: the observed Tango attribute
:param observer: the EventObserver to unsubscribe
"""
callback = self._get_callback(attr)
callback.unregister_observer(observer)
def _get_proxy(self, device_name: str) -> tango.DeviceProxy:
if device_name not in self._proxies:
proxy = self._proxy_factory(device_name)
self._proxies[device_name] = proxy
return self._proxies[device_name]
def _get_callback(self, attr: Attribute) -> Callback:
k = (attr.device, attr.name)
if k in self._callbacks:
return self._callbacks[k]
callback = Callback()
self._callbacks[k] = callback
return callback
def _unsubscribe_all(self):
for (device, attr), pid in self._subscription_ids.items():
proxy = self._get_proxy(device)
LOGGER.debug("Unsubscribing ID %s (%s/%s)", pid, device, attr)
proxy.unsubscribe_event(pid)
self._subscription_ids.clear()