Source code for ska_mid_dish_dcp_lib.utils.helper

"""Helper methods shared across package."""

import asyncio
from functools import wraps
from typing import Any, Tuple, Type


def retry_on_exception(
    retries: int = 3,
    delay: float = 1.0,
    exceptions: Tuple[Type[Exception], ...] = (Exception,),
) -> Any:
    """Retry an async function if specific exceptions are raised.

    The returned decorator wraps a coroutine function. The wrapped function is
    awaited up to ``retries`` times in total; after each failed attempt except
    the last one the wrapper sleeps for ``delay`` seconds. The exception raised
    by the final attempt is propagated to the caller unchanged. Exceptions that
    are not instances of ``exceptions`` are never retried.

    :param retries: Total number of attempts before giving up, defaults to 3.
        Values below 1 are treated as a single attempt so that the wrapped
        function is always called at least once.
    :param delay: Delay in seconds between attempts, defaults to 1.0
    :param exceptions: Exceptions that trigger a retry, defaults to (Exception,)
    :return: A decorator to apply to a coroutine function.
    """

    def decorator(func: Any) -> Any:
        @wraps(func)
        async def wrapper(*args: Any, **kwargs: Any) -> Any:
            # Every attempt but the last is guarded: a matching exception is
            # swallowed and followed by a pause before trying again.
            for _ in range(retries - 1):
                try:
                    return await func(*args, **kwargs)
                except exceptions:
                    await asyncio.sleep(delay)
            # Final attempt runs unguarded so its exception reaches the caller.
            # Because it sits outside the loop it also runs when retries <= 0,
            # instead of silently returning None without calling func.
            return await func(*args, **kwargs)

        return wrapper

    return decorator