Source code for ska_oso_scripting.engineering.low.utils.tango_utils

from __future__ import annotations

import logging
import queue
import re
import time
from collections import Counter
from functools import partial
from inspect import getfullargspec
from typing import Any, Callable, Union

import numpy as np
from tango import DeviceProxy, DevSource, EventType
from tango.device_proxy import __update_enum_values

from ska_oso_scripting.core.tango import TangoDeviceProxyFactory

LOG = logging.getLogger(__name__)


# Create a singleton instance of TangoDeviceProxyFactory
DEVICE_PROXY_FACTORY = TangoDeviceProxyFactory(default_source=DevSource.DEV)


[docs] def single_prop(dev: DeviceProxy, prop_name: str) -> str: """ Return the first value of a device property. Properties are always lists but many properties only expect a single value. This convenience method makes accessing those less verbose. """ return dev.get_property(prop_name)[prop_name][0]
[docs] def get_device(trl: str, tango_host=None, timeout_ms=10000) -> DeviceProxy: """ Grab the Tango device from the Tango Resource Locator (TRL) :params trl: the fully qualified domain name :return device: The Tango device """ device = DEVICE_PROXY_FACTORY("/".join(filter(None, [tango_host, trl]))) device.set_source(DevSource.DEV) if timeout_ms is not None: device.set_timeout_millis(timeout_ms) return device
[docs] def restart_devices(devs: list[DeviceProxy], force_restartserver=False): """ Restarts devs without affecting other devices. If dev is the only device in its device server, restarts the device server using DServer.RestartServer(). Otherwise, restarts just this device via Init(). """ dev_trl_map = {dev.dev_name(): dev for dev in devs} restart_trls = set(dev_trl_map) for dserver in [ DEVICE_PROXY_FACTORY(trl) for trl in {dev.adm_name() for dev in devs} ]: ds_trls = {trl for cls, trl in [x.split("::") for x in dserver.QueryDevice()]} if ds_trls.issubset(restart_trls) or force_restartserver: LOG.warning( f"Restarting devices {ds_trls} by restarting {dserver.dev_name()}" ) dserver.RestartServer() else: for trl in ds_trls & restart_trls: LOG.warning(f"Restarting device {trl} with Init()") dev_trl_map[trl].Init()
[docs] def member(dev: DeviceProxy) -> str: """ Return the member part of the device TRL. For example, if the TRL is "low-mccs/tile/s8-1-tpm01", then the member part is "s8-1-tpm01". :return: the member part of the device TRL. """ return dev.dev_name().split("/")[-1]
[docs] def wait_for( devs: DeviceProxy | list[DeviceProxy], attr: str, desired_value, failed_value=None, timeout: float = 60.0, quiet: bool = False, ) -> None: """ Block until attr of each of devs has a certain value or matches a predicate. :param devs: a DeviceProxy or a list of DeviceProxy :param attr: the name of the attribute to wait for :param desired_value: an attribute value to wait for, or a custom predicate which may accept either one argument (the value) or two (the value, and the device) :param failed_value: the value of the attribute which if reached to add the device in the failed list and not wait for any more updates on it. When every other device has reached either the desired value or this value, raise an appropriate exception. Also accepts a custom predicate. :param timeout: maximum time to wait for each device to meet the condition :param quiet: don't log anything :return: None """ def default_predicate(expected, value, _): match expected, value: case list() | np.ndarray(), np.ndarray(): return np.array_equal(value, expected) case set(), _: return value in expected case _: return value == expected # allow predicate to accept only value, or value and device desired_predicate_vararg: Union[Any, Callable[[Any], bool]] = ( desired_value if callable(desired_value) else partial(default_predicate, desired_value) ) failed_predicate_vararg: Union[Any, Callable[[Any], bool]] = ( failed_value if callable(failed_value) else partial(default_predicate, failed_value) ) if len(getfullargspec(desired_predicate_vararg).args) == 1: def desired_predicate(vararg, _): return desired_predicate_vararg(vararg) else: desired_predicate = desired_predicate_vararg if len(getfullargspec(failed_predicate_vararg).args) == 1: def failed_predicate(vararg, _): return failed_predicate_vararg(vararg) else: failed_predicate = failed_predicate_vararg if isinstance(devs, DeviceProxy): devs = [devs] if len(devs) == 0: return if not quiet: LOG.info( f"Waiting {timeout}s for attribute {attr} of {format_trls(devs)} " f"to reach {desired_value!r}" ) devs_left = set(devs) devs_failed = set() devs_done = {} try: for dev, value in _yield_attr_values(devs, attr, timeout=timeout): if dev not in devs_left: continue if not quiet: LOG.debug(f"{dev.dev_name()}/{attr} = {value!r}") if failed_value is not None and failed_predicate(value, dev): devs_failed.add(dev) devs_left.remove(dev) if desired_predicate(value, dev): devs_left.remove(dev) devs_done[dev] = value if not devs_left: if devs_failed: raise queue.Empty() break except queue.Empty: for dev in devs_left: assert dev.get_source() == DevSource.DEV value = _update_enum(dev, attr, dev[attr].value) if desired_predicate(value, dev): LOG.warning( f"{dev} did not send an event, " f"but attribute {attr} reached the desired value {value!r}" ) devs_done[dev] = value devs_left -= devs_done.keys() if devs_left | devs_failed: errs = [] if devs_left: errs.append( f"didn't reach desired value {desired_value!r} " f"for {format_trls(devs_left)}" ) if devs_failed: errs.append( f"reached fail value {failed_value!r} " f"for {format_trls(devs_failed)}" ) exc = ValueError( f"After {timeout}s, attribute {attr} " + ", and ".join(errs) ) exc.failed_devices = devs_left.union(devs_failed) raise exc from None # the queue.Empty is not useful information, drop it
[docs] def format_trls(devs): """ Returns a compact representation of device names by factoring out a common prefix. >>> format_trls(["low-mccs/tile/s8-1-tpm15", "low-mccs/tile/s8-1-tpm16"]) 'low-mccs/tile/s8-1-(tpm15|tpm16)' >>> format_trls(["low-mccs/subarray/01", "low-sdp/subarray/01"]) 'low-(mccs|sdp)/subarray/01' >>> format_trls(["low-mccs/station/s8-1", "low-mccs/spsstation/s8-1"]) 'low-mccs/(spsstation|station)/s8-1' >>> format_trls(["domain/family-a/1", "domain/family-a/2", "domain/family-b/1"]) 'domain/family-a/1, domain/family-a/2, domain/family-b/1' >>> format_trls([]) '' """ trls = sorted(dev if isinstance(dev, str) else dev.dev_name() for dev in devs) if len(trls) == 0: return "" if len(trls) == 1: return trls[0] # Split on regex word boundaries tokenised = [re.split(r"\b", trl) for trl in trls] # If all TRLs have the same number of tokens if len(set(len(tokens) for tokens in tokenised)) == 1: part_sets = [sorted(set(tokens)) for tokens in zip(*tokenised)] # If only one token differs between TRLs length_counts = Counter(len(part) for part in part_sets) if length_counts[1] == len(part_sets) - 1: # Construct and return the condensed format return "".join( part[0] if len(part) == 1 else f"({'|'.join(part)})" for part in part_sets ) # Otherwise just join full TRLs with comma return ", ".join(trls)
[docs] def fq_dev_name(dev: DeviceProxy) -> str: """Given a DeviceProxy, return a fully-qualified device name.""" return f"{dev.get_db_host()}:{dev.get_db_port()}/{dev.dev_name()}"
def _update_enum(dev: DeviceProxy, attr: str, value: Any) -> Any: """ Returns an attribute value, cast to the appropriate Enum type if applicable. This uses the same code Tango uses under the hood to create the values returned when accessing DevEnum attributes as Python attributes. If the attribute is not a DevEnum, it will be returned unchanged. """ attr_info = dev.__get_attr_cache().get(attr.lower()) if not attr_info: # this populates DeviceProxy's internal caches - see DeviceProxy.__getattr__ dev.__refresh_cmd_cache() dev.__refresh_attr_cache() attr_info = dev.__get_attr_cache()[attr.lower()] return __update_enum_values(attr_info, value) def _yield_attr_values(devices: list[DeviceProxy], attr: str, timeout: float = None): """ Subscribes to attr on each of devices, and yields (device, attribute value) tuples. """ def _evt_val(evt): if evt.err: return None if evt.attr_value is None: return evt.device, None return evt.device, _update_enum(evt.device, attr, evt.attr_value.value) s_q = queue.SimpleQueue() subscriptions = [ dev.subscribe_event(attr, EventType.CHANGE_EVENT, s_q.put, stateless=True) for dev in devices ] if timeout is not None: deadline = time.time() + timeout def get_fn(): return s_q.get(timeout=max(0.0, deadline - time.time())) else: get_fn = s_q.get try: yield from filter(None, map(_evt_val, iter(get_fn, object()))) finally: for dev, sub in zip(devices, subscriptions): dev.unsubscribe_event(sub)
[docs] def convert_devicetree_str_to_deviceproxy(device_tree: dict) -> dict: """ Process a device_tree dictionary, replacing any TRL strings with device proxy instances. This function recursively traverses a nested dictionary structure, replacing any string TRLs with DeviceProxy instances. :param device_tree: A nested dictionary where string keys/values are TRLs :return: A new nested dictionary with DeviceProxy instances replacing TRL strings """ if not isinstance(device_tree, dict): return device_tree result = {} for key, value in device_tree.items(): if isinstance(value, dict): # Recursively process nested dictionaries result[DEVICE_PROXY_FACTORY(key)] = convert_devicetree_str_to_deviceproxy( value ) elif isinstance(value, str): # If it's a string, assume it's a TRL and replace with a device proxy result[DEVICE_PROXY_FACTORY(key)] = DEVICE_PROXY_FACTORY(value) else: # Keep other types as-is result[DEVICE_PROXY_FACTORY(key)] = value return result