from __future__ import annotations
import logging
import queue
import re
import time
from collections import Counter
from functools import partial
from inspect import getfullargspec
from typing import Any, Callable, Union
import numpy as np
from tango import DeviceProxy, DevSource, EventType
from tango.device_proxy import __update_enum_values
from ska_oso_scripting.core.tango import TangoDeviceProxyFactory
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
# Single TangoDeviceProxyFactory instance shared by the helpers in this module
# whenever they need a proxy; it is constructed with DevSource.DEV as its
# default source.
DEVICE_PROXY_FACTORY = TangoDeviceProxyFactory(default_source=DevSource.DEV)
[docs]
def single_prop(dev: DeviceProxy, prop_name: str) -> str:
    """
    Fetch a device property and return only its first value.

    Properties always come back as lists, even though many of them are only
    ever expected to hold a single value. This helper unwraps that list so
    callers do not have to index it themselves.

    :param dev: the device whose property is read
    :param prop_name: the name of the property to read
    :return: the first value stored under ``prop_name``
    """
    properties = dev.get_property(prop_name)
    values = properties[prop_name]
    return values[0]
[docs]
def get_device(trl: str, tango_host=None, timeout_ms=10000) -> DeviceProxy:
    """
    Obtain the Tango device proxy for a Tango Resource Locator (TRL).

    :param trl: the TRL of the device
    :param tango_host: optional prefix that is joined to ``trl`` with a ``/``;
        left out when it is ``None`` or empty
    :param timeout_ms: client timeout, in milliseconds, to set on the proxy;
        pass ``None`` to leave the proxy's timeout untouched
    :return: the device proxy, with its source set to ``DevSource.DEV``
    """
    # Only non-empty components take part in the address.
    address_parts = [part for part in (tango_host, trl) if part]
    proxy = DEVICE_PROXY_FACTORY("/".join(address_parts))
    proxy.set_source(DevSource.DEV)
    if timeout_ms is None:
        return proxy
    proxy.set_timeout_millis(timeout_ms)
    return proxy
[docs]
def restart_devices(devs: list[DeviceProxy], force_restartserver=False):
    """
    Restart the given devices without disturbing unrelated devices.

    For every device server hosting at least one of ``devs``:

    * if every device reported by the server's ``QueryDevice()`` is in
      ``devs`` (or ``force_restartserver`` is set), the whole server is
      restarted with ``RestartServer()``;
    * otherwise only the requested devices on that server are restarted,
      one by one, via ``Init()``.

    :param devs: the devices to restart
    :param force_restartserver: restart each involved device server even if
        it also hosts devices that were not requested
    """
    proxies_by_trl = {}
    for dev in devs:
        proxies_by_trl[dev.dev_name()] = dev
    requested = set(proxies_by_trl)

    admin_trls = {dev.adm_name() for dev in devs}
    # Every admin-device proxy is created up front, before anything restarts.
    dservers = [DEVICE_PROXY_FACTORY(admin_trl) for admin_trl in admin_trls]

    for dserver in dservers:
        # QueryDevice() entries are split on "::" as "<class>::<trl>".
        hosted = set()
        for entry in dserver.QueryDevice():
            _cls, hosted_trl = entry.split("::")
            hosted.add(hosted_trl)

        if force_restartserver or hosted.issubset(requested):
            LOG.warning(
                f"Restarting devices {hosted} by restarting {dserver.dev_name()}"
            )
            dserver.RestartServer()
            continue

        for trl in hosted & requested:
            LOG.warning(f"Restarting device {trl} with Init()")
            proxies_by_trl[trl].Init()
[docs]
def member(dev: DeviceProxy) -> str:
    """
    Extract the member component of a device's TRL.

    The member is whatever follows the last ``/`` in the device name; e.g.
    for "low-mccs/tile/s8-1-tpm01" it is "s8-1-tpm01".

    :param dev: the device whose name is inspected
    :return: the member part of the device TRL.
    """
    _head, _sep, tail = dev.dev_name().rpartition("/")
    return tail
[docs]
def wait_for(
devs: DeviceProxy | list[DeviceProxy],
attr: str,
desired_value,
failed_value=None,
timeout: float = 60.0,
quiet: bool = False,
) -> None:
"""
Block until attr of each of devs has a certain value or matches a predicate.
:param devs: a DeviceProxy or a list of DeviceProxy
:param attr: the name of the attribute to wait for
:param desired_value: an attribute value to wait for, or a custom predicate which
may accept either one argument (the value) or two (the value, and the device)
:param failed_value: the value of the attribute which if reached to add the device
in the failed list and not wait for any more updates on it. When every other
device has reached either the desired value or this value, raise an appropriate
exception. Also accepts a custom predicate.
:param timeout: maximum time to wait for each device to meet the condition
:param quiet: don't log anything
:return: None
"""
def default_predicate(expected, value, _):
match expected, value:
case list() | np.ndarray(), np.ndarray():
return np.array_equal(value, expected)
case set(), _:
return value in expected
case _:
return value == expected
# allow predicate to accept only value, or value and device
desired_predicate_vararg: Union[Any, Callable[[Any], bool]] = (
desired_value
if callable(desired_value)
else partial(default_predicate, desired_value)
)
failed_predicate_vararg: Union[Any, Callable[[Any], bool]] = (
failed_value
if callable(failed_value)
else partial(default_predicate, failed_value)
)
if len(getfullargspec(desired_predicate_vararg).args) == 1:
def desired_predicate(vararg, _):
return desired_predicate_vararg(vararg)
else:
desired_predicate = desired_predicate_vararg
if len(getfullargspec(failed_predicate_vararg).args) == 1:
def failed_predicate(vararg, _):
return failed_predicate_vararg(vararg)
else:
failed_predicate = failed_predicate_vararg
if isinstance(devs, DeviceProxy):
devs = [devs]
if len(devs) == 0:
return
if not quiet:
LOG.info(
f"Waiting {timeout}s for attribute {attr} of {format_trls(devs)} "
f"to reach {desired_value!r}"
)
devs_left = set(devs)
devs_failed = set()
devs_done = {}
try:
for dev, value in _yield_attr_values(devs, attr, timeout=timeout):
if dev not in devs_left:
continue
if not quiet:
LOG.debug(f"{dev.dev_name()}/{attr} = {value!r}")
if failed_value is not None and failed_predicate(value, dev):
devs_failed.add(dev)
devs_left.remove(dev)
if desired_predicate(value, dev):
devs_left.remove(dev)
devs_done[dev] = value
if not devs_left:
if devs_failed:
raise queue.Empty()
break
except queue.Empty:
for dev in devs_left:
assert dev.get_source() == DevSource.DEV
value = _update_enum(dev, attr, dev[attr].value)
if desired_predicate(value, dev):
LOG.warning(
f"{dev} did not send an event, "
f"but attribute {attr} reached the desired value {value!r}"
)
devs_done[dev] = value
devs_left -= devs_done.keys()
if devs_left | devs_failed:
errs = []
if devs_left:
errs.append(
f"didn't reach desired value {desired_value!r} "
f"for {format_trls(devs_left)}"
)
if devs_failed:
errs.append(
f"reached fail value {failed_value!r} "
f"for {format_trls(devs_failed)}"
)
exc = ValueError(
f"After {timeout}s, attribute {attr} " + ", and ".join(errs)
)
exc.failed_devices = devs_left.union(devs_failed)
raise exc from None # the queue.Empty is not useful information, drop it
[docs]
def fq_dev_name(dev: DeviceProxy) -> str:
    """
    Build the fully-qualified name of a DeviceProxy.

    :param dev: the device to describe
    :return: ``<db host>:<db port>/<device name>``
    """
    db_host = dev.get_db_host()
    db_port = dev.get_db_port()
    device_name = dev.dev_name()
    return "{}:{}/{}".format(db_host, db_port, device_name)
def _update_enum(dev: DeviceProxy, attr: str, value: Any) -> Any:
    """
    Convert an attribute value to its Enum type when the attribute is a DevEnum.

    The conversion is delegated to the same routine Tango itself uses when a
    DevEnum attribute is accessed as a Python attribute of the proxy. Values of
    attributes that are not DevEnum are returned unchanged.

    :param dev: the device owning the attribute
    :param attr: the attribute name (looked up lower-cased in the proxy's cache)
    :param value: the raw attribute value
    :return: the value, cast to the Enum type if applicable
    """
    cache_key = attr.lower()
    cached_info = dev.__get_attr_cache().get(cache_key)
    if cached_info:
        return __update_enum_values(cached_info, value)

    # Nothing cached yet: populate DeviceProxy's internal caches first
    # - see DeviceProxy.__getattr__
    dev.__refresh_cmd_cache()
    dev.__refresh_attr_cache()
    refreshed_info = dev.__get_attr_cache()[cache_key]
    return __update_enum_values(refreshed_info, value)
def _yield_attr_values(
    devices: list[DeviceProxy], attr: str, timeout: float | None = None
):
    """
    Subscribes to attr on each of devices, and yields (device, attribute value) tuples.

    Change events from all devices are funnelled through one queue. Error
    events are dropped; events without an attribute value are yielded as
    ``(device, None)``. All subscriptions made by this generator are removed
    again when it finishes, is closed, or fails - including the case where
    subscribing to one of the later devices raises.

    :param devices: the devices to subscribe to
    :param attr: the name of the attribute to subscribe to
    :param timeout: overall time budget in seconds, counted from the moment
        all subscriptions are in place; ``None`` waits indefinitely
    :raises queue.Empty: when ``timeout`` elapses without a further event
    """

    def _evt_val(evt):
        if evt.err:
            # filtered out below by `filter(None, ...)`
            return None
        if evt.attr_value is None:
            return evt.device, None
        return evt.device, _update_enum(evt.device, attr, evt.attr_value.value)

    s_q = queue.SimpleQueue()
    subscriptions = []
    try:
        # Subscribe inside the `try` so that a failure part-way through still
        # unsubscribes the devices that were already subscribed.
        for dev in devices:
            subscriptions.append(
                dev.subscribe_event(
                    attr, EventType.CHANGE_EVENT, s_q.put, stateless=True
                )
            )

        if timeout is not None:
            # A monotonic clock keeps the deadline immune to wall-clock changes.
            deadline = time.monotonic() + timeout

            def get_fn():
                # clamp at zero: a negative timeout is rejected by the queue
                return s_q.get(timeout=max(0.0, deadline - time.monotonic()))

        else:
            get_fn = s_q.get

        # The fresh object() sentinel never matches, so iteration only ends
        # when get_fn raises (timeout) or the generator is closed.
        yield from filter(None, map(_evt_val, iter(get_fn, object())))
    finally:
        # zip() stops at the shorter list, i.e. at the subscriptions made.
        for dev, sub in zip(devices, subscriptions):
            dev.unsubscribe_event(sub)
[docs]
def convert_devicetree_str_to_deviceproxy(device_tree: dict) -> dict:
    """
    Build a copy of a device tree in which TRL strings become device proxies.

    The nested dictionary is walked recursively. Every key is turned into a
    device proxy. Values are handled by type: nested dictionaries are
    converted recursively, strings are treated as TRLs and replaced by a
    device proxy, and anything else is carried over unchanged.

    :param device_tree: A nested dictionary where string keys/values are TRLs.
        A non-dict argument is returned as-is.
    :return: A new nested dictionary with DeviceProxy instances replacing TRL
        strings
    """
    if not isinstance(device_tree, dict):
        return device_tree

    converted_tree = {}
    for trl, subtree in device_tree.items():
        # The value is converted before the key's proxy is created.
        if isinstance(subtree, dict):
            converted = convert_devicetree_str_to_deviceproxy(subtree)
        elif isinstance(subtree, str):
            converted = DEVICE_PROXY_FACTORY(subtree)
        else:
            converted = subtree
        converted_tree[DEVICE_PROXY_FACTORY(trl)] = converted
    return converted_tree