Source code for ska_tango_base.utils

#
# This file is part of the SKA Tango Base project
#
# Distributed under the terms of the BSD 3-clause new license.
# See LICENSE.txt for more info.
"""General utilities that may be useful to SKA devices and clients."""

from __future__ import annotations

import ast
import functools
import inspect
import json
import logging
import pydoc
import sys
import time
import traceback
import uuid
import warnings
from collections.abc import Callable, Generator
from contextlib import contextmanager
from datetime import datetime
from typing import Any, ParamSpec, TypeVar, cast

import tango
from tango import (
    AttrQuality,
    AttrWriteType,
    DbDatum,
    DbDevInfo,
    DeviceProxy,
    DevState,
    ErrSeverity,
    Except,
)
from tango.server import attribute, command

from .faults import GroupDefinitionsError, SKABaseError

int_types = {
    tango.CmdArgType.DevUShort,
    tango.CmdArgType.DevLong,
    tango.CmdArgType.DevULong,
    tango.CmdArgType.DevULong64,
    tango.CmdArgType.DevLong64,
    tango.CmdArgType.DevShort,
}

float_types = {
    tango.CmdArgType.DevDouble,
    tango.CmdArgType.DevFloat,
}

# TBD - investigate just using (command argin data_type)
tango_type_conversion = {
    tango.CmdArgType.DevUShort.real: "int",
    tango.CmdArgType.DevLong.real: "int",
    tango.CmdArgType.DevULong.real: "int",
    tango.CmdArgType.DevULong64.real: "int",
    tango.CmdArgType.DevLong64.real: "int",
    tango.CmdArgType.DevShort.real: "int",
    tango.CmdArgType.DevDouble.real: "float",
    tango.CmdArgType.DevFloat.real: "float",
    tango.CmdArgType.DevString.real: "str",
    tango.CmdArgType.DevBoolean.real: "bool",
    tango.CmdArgType.DevEncoded.real: "encoded",
    tango.CmdArgType.DevState.real: "state",
    tango.CmdArgType.DevVoid.real: "void",
    tango.CmdArgType.DevEnum.real: "enum",
}
# TBD - not all tango types are used
# tango.CmdArgType.ConstDevString           tango.CmdArgType.DevState
# tango.CmdArgType.DevVarLong64Array        tango.CmdArgType.conjugate
# tango.CmdArgType.DevBoolean               tango.CmdArgType.DevString
# tango.CmdArgType.DevVarLongArray          tango.CmdArgType.denominator
# tango.CmdArgType.DevDouble                tango.CmdArgType.DevUChar
# tango.CmdArgType.DevVarLongStringArray    tango.CmdArgType.imag
# tango.CmdArgType.DevEncoded               tango.CmdArgType.DevULong
# tango.CmdArgType.DevVarShortArray         tango.CmdArgType.mro
# tango.CmdArgType.DevEnum                  tango.CmdArgType.DevULong64
# tango.CmdArgType.DevVarStateArray         tango.CmdArgType.name
# tango.CmdArgType.DevFloat                 tango.CmdArgType.DevUShort
# tango.CmdArgType.DevVarStringArray        tango.CmdArgType.names
# tango.CmdArgType.DevVarULong64Array       tango.CmdArgType.numerator
# tango.CmdArgType.DevLong                  tango.CmdArgType.DevVarCharArray
# tango.CmdArgType.DevVarULongArray         tango.CmdArgType.real
# tango.CmdArgType.DevLong64                tango.CmdArgType.DevVarDoubleArray
# tango.CmdArgType.DevVarUShortArray        tango.CmdArgType.values
# tango.CmdArgType.DevPipeBlob              tango.CmdArgType.DevVarDoubleStringArray
# tango.CmdArgType.DevVoid
# tango.CmdArgType.DevShort                 tango.CmdArgType.DevVarFloatArray


[docs] @contextmanager def exception_manager( cls: type[Exception], callback: Callable[[], None] | None = None ) -> Generator[None, None, None]: """ Return a context manager that manages exceptions. :param cls: class type :param callback: a callback :yields: return a context manager """ try: yield except tango.DevFailed: # Find caller from the relative point of this executing handler curframe = inspect.currentframe() calframe = inspect.getouterframes(curframe, 2) # Form exception message message = f"{type(tango.DevFailed).__name__}: {tango.DevFailed.message}" # Retrieve class class_name = str(cls.__class__.__name__) # Add info to message additional_info = traceback.format_exc() message = message + " [--" + additional_info + "--] " # cls.exception(command_name=class_name + "::" + calframe[2][3], # command_inputs=str(arguments), # message=message) if callback: callback() tango.Except.re_throw_exception( tango.DevFailed, "SKA_CommandFailed", message, class_name + "::" + calframe[2][3], ) except Exception as anything: # Find caller from the relative point of this executing handler curframe = inspect.currentframe() calframe = inspect.getouterframes(curframe, 2) # Form exception message message = f"{type(anything).__name__}: {tango.DevFailed.message}" # Retrieve class class_name = str(cls.__class__.__name__) # Add info to message additional_info = traceback.format_exc() message = message + " [--" + additional_info + "--] " # cls.exception(command_name=class_name+"::"+calframe[2][3], # command_inputs=str(arguments), # message=message) if callback: callback() tango.Except.throw_exception( "SKA_CommandFailed", message, class_name + "::" + calframe[2][3] )
[docs] def get_dev_info( domain_name: str, device_server_name: str, device_ref: str ) -> DbDevInfo: """ Get device info. :param domain_name: tango domain name :param device_server_name: tango device server name :param device_ref: tango device reference :return: database device info instance """ dev_info = DbDevInfo() dev_info._class = device_server_name dev_info.server = f"{device_server_name}/{domain_name}" # add the device dev_info.name = f"{domain_name}/{device_ref}" return dev_info
[docs] def dp_set_property(device_name: str, property_name: str, property_value: Any) -> None: """ Use a DeviceProxy to set a device property. :param device_name: tango device name :param property_name: tango property name :param property_value: tango property value """ device_proxy = DeviceProxy(device_name) db_datum = DbDatum() db_datum.name = property_name if isinstance(property_value, list): for value in property_value: db_datum.value_string.append(value) else: db_datum.value_string.append(property_value) device_proxy.put_property(db_datum)
[docs] def get_device_group_and_id(device_name: str) -> list[str]: """ Return the group and id part of a device name. :param device_name: tango device name :return: group & id part of tango device name """ return device_name.split("/")[1:]
[docs] def convert_api_value(param_dict: dict[str, str]) -> tuple[str, Any]: """ Validate tango command parameters which are passed via json. :param param_dict: parameters :raises TypeError: invalid type :raises ValueError: value not of specified type :return: tuple(name, value) """ valid_types = ["int", "bool", "str", "float"] type_str = param_dict.get("type", "str").lower() if type_str not in valid_types: raise TypeError(f"Valid types must be from {', '.join(valid_types)}") value_type: Any # for type checker value_type = pydoc.locate(type_str) value_str = str(param_dict.get("value")) if value_type is bool: if value_str.lower() not in ["true", "false"]: raise ValueError( f"Parameter value {param_dict.get('value')} is not of type {value_type}" ) value = value_str.lower() == "true" else: value = value_type(value_str) return str(param_dict.get("name")), value
[docs] def coerce_value(value: Any) -> Any: """ Coerce tango.DevState values to string, leaving other values alone. :param value: a tango DevState :return: DevState as a string """ # Enum is not serialised correctly as json # _DevState is tango.DevState # because DevState is tango.DevState != tango.DevState if type(value) in [DevState, tango.DevState]: return str(value) return value
[docs] def get_dp_attribute( device_proxy: DeviceProxy, dp_attribute: attribute, with_value: bool = False, with_context: bool = False, ) -> dict[str, Any]: """ Get an attribute from a DeviceProxy. :param device_proxy:a tango device proxy :param dp_attribute: Attribute :param with_value: default False :param with_context: default False :return: dictionary of attribute info """ attr_dict = { "name": dp_attribute.name, "polling_frequency": dp_attribute.events.per_event.period, "min_value": ( dp_attribute.min_value if dp_attribute.min_value != "Not specified" else None ), "max_value": ( dp_attribute.max_value if dp_attribute.max_value != "Not specified" else None ), "readonly": dp_attribute.writable not in [ AttrWriteType.READ_WRITE, AttrWriteType.WRITE, AttrWriteType.READ_WITH_WRITE, ], } # TBD - use tango_type_conversion dict, or just str(attribute.data_format) if dp_attribute.data_format == tango.AttrDataFormat.SCALAR: if dp_attribute.data_type in int_types: attr_dict["data_type"] = "int" elif dp_attribute.data_type in float_types: attr_dict["data_type"] = "float" elif dp_attribute.data_type == tango.CmdArgType.DevString: attr_dict["data_type"] = "str" elif dp_attribute.data_type == tango.CmdArgType.DevBoolean: attr_dict["data_type"] = "bool" elif dp_attribute.data_type == tango.CmdArgType.DevEncoded: attr_dict["data_type"] = "encoded" elif dp_attribute.data_type == tango.CmdArgType.DevVoid: attr_dict["data_type"] = "void" else: # Data types we aren't really going to represent attr_dict["data_type"] = "other" if with_context: device_type, device_id = get_tango_device_type_id(device_proxy.dev_name()) attr_dict["component_type"] = device_type attr_dict["component_id"] = device_id if with_value: try: attr_value = device_proxy.read_attribute(dp_attribute.name) attr_dict["value"] = coerce_value(attr_value.value) attr_dict["is_alarm"] = attr_value.quality == AttrQuality.ATTR_ALARM timestamp = datetime.fromtimestamp(attr_value.time.tv_sec) timestamp.replace(microsecond=attr_value.time.tv_usec) attr_dict["timestamp"] = timestamp.isoformat() except Exception: # TBD - decide what to do - add log? pass return attr_dict
[docs] def get_dp_command( device_name: str, dp_command: command, with_context: bool = False ) -> dict[str, Any]: """ Get a command from a DeviceProxy. :param device_name: tango device name :param dp_command: tango command :param with_context: default False :return: dictionary of command info """ def command_parameters(command_desc: str) -> dict[str, Any]: try: non_json = ["", "none", "Uninitialised"] if command_desc in non_json: return {} # ugghhh POGO replaces quotes with backticks :( return cast( dict[str, Any], ast.literal_eval(command_desc.replace("`", "'")) ) except Exception: # TBD - decide what to do - add log? pass return {} command_dict = { "name": dp_command.cmd_name, "parameters": command_parameters(dp_command.in_type_desc), } if with_context: device_type, device_id = get_tango_device_type_id(device_name) command_dict["component_type"] = device_type command_dict["component_id"] = device_id return command_dict
[docs] def get_tango_device_type_id(tango_address: str) -> list[str]: """ Return the type id of a TANGO device. :param tango_address: tango device address :return: the type id of the device """ return tango_address.split("/")[1:3]
[docs] def get_groups_from_json(json_definitions: list[str]) -> dict[str, Any]: """ Return a dict of tango.Group objects matching the JSON definitions. Extracts the definitions of groups of devices and builds up matching tango.Group objects. Some minimal validation is done - if the definition contains nothing then None is returned, otherwise an exception will be raised on error. This function will *NOT* attempt to verify that the devices exist in the Tango database, nor that they are running. The definitions would typically be provided by the Tango device property "GroupDefinitions", available in the SKABaseDevice. The property is an array of strings. Thus a sequence is expected for this function. Each string in the list is a JSON serialised dict defining the "group_name", "devices" and "subgroups" in the group. The tango.Group() created enables easy access to the managed devices in bulk, or individually. Empty and whitespace-only strings will be ignored. The general format of the list is as follows, with optional "devices" and "subgroups" keys: .. code-block:: py [ {"group_name": "<name>", "devices": ["<dev name>", ...]}, { "group_name": "<name>", "devices": ["<dev name>", "<dev name>", ...], "subgroups" : [{<nested group>}, {<nested group>}, ...] }, ... ] For example, a hierarchy of racks, servers and switches: .. code-block:: py [ { "group_name": "servers", "devices": [ "elt/server/1", "elt/server/2", "elt/server/3", "elt/server/4", ], }, { "group_name": "switches", "devices": ["elt/switch/A", "elt/switch/B"], }, { "group_name": "pdus", "devices": ["elt/pdu/rackA", "elt/pdu/rackB"], }, { "group_name": "racks", "subgroups": [ { "group_name": "rackA", "devices": [ "elt/server/1", "elt/server/2", "elt/switch/A", "elt/pdu/rackA", ], }, { "group_name": "rackB", "devices": [ "elt/server/3", "elt/server/4", "elt/switch/B", "elt/pdu/rackB", ], "subgroups": [], }, ], }, ] :param json_definitions: Sequence of strings, each one a JSON dict with keys "group_name", and one or both of: "devices" and "subgroups", recursively defining the hierarchy. :return: A dictionary, the keys of which are the names of the groups, in the following form: {"<group name 1>": <tango.Group>, "<group name 2>": <tango.Group>, ...}. Will be an empty dict if no groups were specified. :raises GroupDefinitionsError: arising from GroupDefinitionsError - If error parsing JSON string. - If missing keys in the JSON definition. - If invalid device name. - If invalid groups included. - If a group has multiple parent groups. - If a device is included multiple time in a hierarchy. E.g. g1:[a,b] g2:[a,c] g3:[g1,g2] """ try: # Parse and validate user's definitions groups = {} for json_definition in (j.strip() for j in json_definitions): if json_definition: definition = json.loads(json_definition) _validate_group(definition) group_name = definition["group_name"] groups[group_name] = _build_group(definition) return groups except Exception as exc: # the exc_info is included for detailed traceback ska_error = SKABaseError(exc) raise GroupDefinitionsError(ska_error).with_traceback(sys.exc_info()[2])
def _validate_group(definition: dict[str, Any]) -> None: """ Validate and clean up groups definition, raise AssertError if invalid. Used internally by `get_groups_from_json`. :param definition: the group definition :raise AssertError: if group is invalid """ error_message = f"Missing 'group_name' key - {definition}" assert "group_name" in definition, error_message error_message = f"Missing 'devices' or 'subgroups' key - {definition}" assert "devices" in definition or "subgroups" in definition, error_message definition["group_name"] = definition["group_name"].strip() old_devices = definition.get("devices", []) new_devices = [] for old_device in old_devices: # sanity check on device name, expect 'domain/family/member' # TODO (AJ): Check with regex. Allow fully qualified names? device = old_device.strip() error_message = f"Invalid device name format - {device}" assert device.count("/") == 2, error_message new_devices.append(device) definition["devices"] = new_devices subgroups = definition.get("subgroups", []) for subgroup_definition in subgroups: _validate_group(subgroup_definition) # recurse def _build_group(definition: dict[str, Any]) -> tango.Group: """ Return tango.Group object according to defined hierarchy. Used internally by `get_groups_from_json`. :param definition: definition of the group :return: a tango Group """ group_name = definition["group_name"] devices = definition.get("devices", []) subgroups = definition.get("subgroups", []) group = tango.Group(group_name) for device_name in devices: group.add(device_name) for subgroup_definition in subgroups: subgroup = _build_group(subgroup_definition) # recurse group.add(subgroup) return group
[docs] def validate_capability_types( command_name: str, requested_capabilities: list[str], valid_capabilities: list[str] ) -> None: """ Check the validity of the capability types passed to the specified command. :param command_name: The name of the command to be executed. :param requested_capabilities: A list of strings representing capability types. :param valid_capabilities: A list of strings representing capability types. """ invalid_capabilities = list(set(requested_capabilities) - set(valid_capabilities)) if invalid_capabilities: Except.throw_exception( "Command failed!", f"Invalid capability types requested {invalid_capabilities}", command_name, ErrSeverity.ERR, )
[docs] def validate_input_sizes(command_name: str, argin: tuple[list[int], list[str]]) -> None: """ Check the validity of the input parameters passed to the specified command. :param command_name: The name of the command which is to be executed. :param argin: A tuple of two lists """ capabilities_instances, capability_types = argin if len(capabilities_instances) != len(capability_types): Except.throw_exception( "Command failed!", "Argin value lists size mismatch.", command_name, ErrSeverity.ERR, )
[docs] def convert_dict_to_list(dictionary: dict[Any, Any]) -> list[str]: """ Convert a dictionary to a list of "key:value" strings. :param dictionary: a dictionary to be converted :return: a list of key/value strings """ the_list = [] for key, value in list(dictionary.items()): the_list.append(f"{key}:{value}") return sorted(the_list)
[docs] def for_testing_only( func: Callable[..., Any], _testing_check: Callable[[], bool] = lambda: "pytest" in sys.modules, ) -> Callable[..., Any]: """ Return a function that warns if called outside of testing, then calls a function. This is intended to be used as a decorator that marks a function as available for testing purposes only. If the decorated function is called outside of testing, a warning is raised. .. code-block:: python @for_testing_only def _straight_to_state(self, state): ... :param func: function to be wrapped :param _testing_check: True if testing :return: the wrapper function """ @functools.wraps(func) def _wrapper(*args: Any, **kwargs: Any) -> Any: """ Raise a warning if not testing, then call the function. This is a wrapper function that implements the functionality of the decorator. :param args: function arguments :param kwargs: function keyword arguments :return: the wrapped function """ if not _testing_check(): warnings.warn(f"{func.__name__} should only be used for testing purposes.") return func(*args, **kwargs) return _wrapper
[docs] def generate_command_id(command_name: str) -> str: """ Generate a unique command ID for a given command name. :param command_name: name of the command for which an ID is to be generated. :return: a unique command ID string """ return f"{time.time()}_{uuid.uuid4().fields[-1]}_{command_name}"
P = ParamSpec("P") """Parameter specification variable.""" T = TypeVar("T") """Type variable."""
[docs] def deprecate_kwarg( name: str, detail: str | None = None ) -> Callable[[Callable[P, T]], Callable[P, T]]: """ Deprecate a keyword argument. If the decorated function is passed the keyword argument a DeprecationWarning is emitted. :param name: name of keyword argument :param detail: sentence to add to the warning :return: decorator """ def decorator(func: Callable[P, T]) -> Callable[P, T]: @functools.wraps(func) def wrapped(*args: P.args, **kwargs: P.kwargs) -> Any: if name in kwargs: message = f'Deprecated kwarg "{name}" passed to {func.__name__}.' if detail is not None: message += f" {detail}" warnings.warn(message, DeprecationWarning, stacklevel=2) return func(*args, **kwargs) return wrapped return decorator
@contextmanager def _hold_tango_monitor( mon: tango.DeviceImpl | tango.DeviceClass, *, timeout: float | None = None, logger: logging.Logger | None = None, ) -> Generator[None, None, None]: """ Acquire a tango monitor with a specific timeout. Unlike AutoTangoMonitor, this context manager will allow you to set a longer timeout than the in-built timeout of the Tango Monitor (which can be set with set_timeout), which avoids disturbing other threads that are waiting on the lock. The timeout provided will be rounded to the next highest multiple of the Tango Monitors built in timeout. Warning logs will be emitted every time acquiring the lock times out so that there is visibility that there is contention on the lock. Ideally, this should not happen. :param mon: Tango monitor to hold :param timeout: Time to wait for lock in seconds, if :code:`None` this will wait indefinitely :param logger: Logger to use, if :code:`None` use :code:`mon.logger` or the "ska_tango_base.hold_tango_monitor" logger :raises tango.DevFailed: if acquiring the lock times out """ if logger is None: logger = getattr( mon, "logger", logging.getLogger("ska_tango_base.hold_tango_monitor") ) deadline = None if timeout is not None: deadline = time.monotonic() + timeout lock = tango.AutoTangoMonitor(mon) while True: try: lock.__enter__() break except tango.DevFailed as ex: if ex.args[0].reason != "API_CommandTimedOut": raise if deadline is not None and time.monotonic() > deadline: raise logger.warning( "Failed to acquire tango monitor. Trying again...", exc_info=True ) try: yield finally: lock.__exit__()