Source code for ska_low_mccs.utils

"""Utility functions relevant to the entire ska-low-mccs package."""

from __future__ import annotations

import json
import os
import random
import time
from functools import wraps
from typing import Any, Callable, Optional

import numpy as np

__all__ = [
    "LatencyInjector",
    "summarize_device_config",
    "subarraybeam_trl_from_ids",
]

# Reserved local ID range per subarray
MAX_SUBARRAY_BEAMS_PER_SUBARRAY = 48


[docs] class LatencyInjector: """ Injects configurable latency into method calls on any object. Useful for testing race conditions and timing-sensitive code by simulating variable network or hardware delays. Usage examples:: # Example 1: Wrap specific methods injector = LatencyInjector( min_latency=0.0, max_latency=1.8, probability_of_slow=0.15 ) injector.wrap_method(proxy, 'load_pointing_delays') injector.wrap_method(proxy, 'apply_pointing_delays') # Example 2: Wrap all public methods on a proxy injector = LatencyInjector( min_latency=0.0, max_latency=2.0, probability_of_slow=0.2 ) injector.wrap_proxy(proxy) # Example 3: Wrap only specified methods injector = LatencyInjector( min_latency=0.0, max_latency=1.5, probability_of_slow=0.1 ) injector.wrap_proxy( proxy, methods=['load_pointing_delays', 'apply_pointing_delays'] ) # Example 4: Wrap all methods except specific ones injector = LatencyInjector( min_latency=0.0, max_latency=1.0, probability_of_slow=0.05 ) injector.wrap_proxy( proxy, exclude=['get_change_event_callbacks'] ) """
[docs] def __init__( self: LatencyInjector, min_latency: float = 0.0, max_latency: float = 2.0, probability_of_slow: float = 0.1, ) -> None: """ Initialize the latency injector. :param min_latency: Minimum latency in seconds for fast calls :param max_latency: Maximum latency in seconds for slow calls :param probability_of_slow: Probability (0.0-1.0) of triggering slow latency """ self.min_latency: float = min_latency self.max_latency: float = max_latency self.probability_of_slow: float = probability_of_slow
def _inject_latency(self: "LatencyInjector") -> None: """Inject random latency based on configuration.""" if random.random() < self.probability_of_slow: latency = random.uniform(self.max_latency * 0.5, self.max_latency) else: latency = random.uniform(self.min_latency, self.min_latency + 0.1) time.sleep(latency)
[docs] def wrap_method(self: "LatencyInjector", obj: object, method_name: str) -> None: """ Wrap a method with latency injection. :param obj: The object containing the method to wrap :param method_name: Name of the method to wrap """ original_method: Callable = getattr(obj, method_name) @wraps(original_method) def wrapper(*args: Any, **kwargs: Any) -> Any: self._inject_latency() return original_method(*args, **kwargs) setattr(obj, method_name, wrapper)
[docs] def wrap_proxy( self: "LatencyInjector", proxy: object, methods: Optional[list[str]] = None, exclude: Optional[list[str]] = None, ) -> None: """ Wrap methods on a proxy with latency injection. :param proxy: The proxy object to wrap :param methods: List of method names to wrap. If None, wraps all public callable methods :param exclude: List of method names to exclude from wrapping """ exclude = exclude or [] if methods is None: methods = [] for name in dir(proxy): # Skip Python builtins (__method__) but include private # methods (_method) if (name.startswith("__") and name.endswith("__")) or name in exclude: continue try: attr = getattr(proxy, name, None) if callable(attr): methods.append(name) except Exception: # pylint: disable=broad-exception-caught # Skip attributes that raise exceptions when accessed # (e.g., properties with decorators that enforce state) pass for method_name in methods: if method_name not in exclude: self.wrap_method(proxy, method_name)
[docs] def subarraybeam_trl_from_ids(subarray_id: int, local_beam_id: int) -> str: """Return TRL in new XX-YY format from subarray + local ids. :param subarray_id: 1-based subarray identifier :param local_beam_id: 1-based local beam identifier :returns: TRL string 'low-mccs/subarraybeam/XX-YY' :raises ValueError: for out of range inputs """ if subarray_id < 1: raise ValueError("subarray_id must be >= 1") if not 1 <= local_beam_id <= MAX_SUBARRAY_BEAMS_PER_SUBARRAY: raise ValueError( f"local_beam_id must be in 1..{MAX_SUBARRAY_BEAMS_PER_SUBARRAY}" ) return f"low-mccs/subarraybeam/{subarray_id:02d}-{local_beam_id:02d}"
[docs] def summarize_device_config( self: Any, name: str, version: str, properties: list[str], envvars: list[str] | None = None, redacted_envvars: list[str] | None = None, ) -> str: """ Construct a string summary of device configuration. :param self: the tango device invoking this function :param name: device TRL :param version: software version of the device :param properties: list of device properties whose values should be included. It is assumed that these device properties have already been loaded into the device as object attributes. :param envvars: list of environment variables whose values should be included :param redacted_envvars: list of environment variables whose presence should be reported, but for which the value should not be included. Use this for environment variables that contain secret information such as passwords and other credentials. :return: a string summary of the device configuration. """ properties_str = "" for device_property in properties: properties_str += f"\t{device_property}: {getattr(self, device_property)}\n" envvars_str = "" for envvar in envvars or []: envvar_value = os.environ.get(envvar) if envvar_value is not None: envvars_str += f"\t{envvar}: {envvar_value}\n" for redacted_envvar in redacted_envvars or []: if redacted_envvar in os.environ: envvars_str += f"\t{redacted_envvar}: [REDACTED]\n" return ( f"{name} version {version}\n" f"Initialised {name} device with properties:\n" f"{properties_str}" "and environment variables:\n" f"{envvars_str}" )
[docs] class NumpyEncoder(json.JSONEncoder): """Converts numpy types to JSON.""" # pylint: disable=arguments-renamed
[docs] def default(self: NumpyEncoder, obj: Any) -> Any: """ Dereturn the encoded value. :param obj: the object. :returns: the encoded value. """ if isinstance(obj, np.integer): return int(obj) if isinstance(obj, np.floating): return float(obj) if isinstance(obj, np.ndarray): return obj.tolist() return json.JSONEncoder.default(self, obj)
def complex_list_to_interleaved_float_list(data: list[complex]) -> list[float]: """ Convert a list of complex values into an interleaved list of floats. :param data: The list of complex values :returns: The list of floats. """ return [v for vc in data for v in [vc.real, vc.imag]] def interleaved_float_list_to_complex_list(data: list[float]) -> list[complex]: """ Convert a list of interleaved float values into a list of floats. :param data: The list of float values :returns: The list of complex values. """ return [complex(r, i) for r, i in zip(data[::2], data[1::2])]