Source code for ska_sdp_config.config

"""High-level API for SKA SDP configuration."""

from __future__ import annotations

import logging
import os
import sys
import threading
import warnings
from datetime import date
from socket import gethostname
from typing import Callable, Iterable, Optional, Union

from . import backend as backend_mod
from . import entity
from .backend import Backend, DbTransaction, Lease, TxnWrapper, Watcher
from .base_transaction import BaseTransaction
from .operations import (
    AllocationOperations,
    ArbitraryOperations,
    ComponentOperations,
    DependencyOperations,
    DeploymentOperations,
    EntityOperations,
    ExecutionBlockOperations,
    FlowOperations,
    ProcessingBlockOperations,
    RequestOperations,
    ResourceOperations,
    ScriptOperations,
    SystemOperations,
)

LOG = logging.getLogger(__name__)

MAX_LEASE_REFRESH_RETRIES = 5


class Config:
    """Connection to SKA SDP configuration.

    Holds the database backend, the owner identity used when claiming
    ownership, an optional custom transaction wrapper and a single client
    lease that is kept alive by a background thread until the client is
    closed (or the lease is lost).
    """

    # pylint: disable=too-many-instance-attributes, too-many-arguments
    # pylint: disable=too-many-positional-arguments
    def __init__(
        self,
        backend=None,
        global_prefix: str = "",
        owner: dict | entity.Owner | None = None,
        component_name: str | None = None,
        wrapper: Optional[TxnWrapper] = None,
        owned_entity: tuple[str, str] | None = None,
        **cargs,
    ):
        """
        Connect to configuration using the given backend.

        :param backend: Backend to use. Defaults to environment or
            etcd3 if not set.
        :param global_prefix: Prefix to use within the database. Must be
            empty or start with ``/``.
        :param owner: Object used for identifying the process when claiming
            ownership. Either an ``entity.Owner`` or a dictionary of its
            constructor arguments; defaults to pid, hostname and command
            line of the current process.
        :param component_name: name of component; used to generate alive
            key in db (e.g. lmc-controller). *DEPRECATED*: use
            ``owned_entity=("component", component_name)`` instead.
        :param wrapper: optional callable applied to every
            :class:`Transaction` handed out by :py:meth:`txn()` and
            :py:meth:`watcher()`.
        :param owned_entity: two-tuple with the type (Transaction attribute
            name) and full key of the entity for which convenience
            ownership management is provided.
        :param cargs: Backend client arguments
        """
        self._backend = self._determine_backend(backend, **cargs)

        # Owner: accept a ready-made Owner, otherwise build one from a dict
        # (falling back to a description of the current process).
        if isinstance(owner, entity.Owner):
            self.owner = owner
        else:
            if owner is None:
                owner = {
                    "pid": os.getpid(),
                    "hostname": gethostname(),
                    "command": sys.argv,
                }
            self.owner = entity.Owner(**owner)

        # Global prefix
        # NOTE(review): assert is stripped under ``python -O``; kept as-is so
        # the exception type seen by existing callers does not change.
        assert global_prefix == "" or global_prefix[0] == "/"
        self._global_prefix = global_prefix

        # Lease associated with this client, kept alive on a separate thread
        # until the client is closed
        self._client_lease: Lease | None = None
        self._keepalive_thread: threading.Thread | None = None
        self._close_evt = threading.Event()

        # Identity of the entity receiving convenient ownership management
        if component_name and not owned_entity:
            # stacklevel=2 attributes the warning to the code constructing
            # the Config; warnings attributed to library modules are hidden
            # by Python's default DeprecationWarning filter.
            warnings.warn(
                "component_name is deprecated, use owned_entity",
                DeprecationWarning,
                stacklevel=2,
            )
            owned_entity = ("component", component_name)
        self._owned_entity = owned_entity

        # Transaction wrapper (None means "hand out plain Transactions").
        self.wrapper: TxnWrapper | None = wrapper

        # Set once by the keep-alive thread when the lease cannot be kept.
        self._lease_lost = False

    @property
    def backend(self) -> Backend:
        """Get the backend database object."""
        return self._backend

    @staticmethod
    def _determine_backend(backend: str | None, **cargs) -> Backend:
        """
        Instantiate the backend selected by name.

        :param backend: ``"etcd3"`` or ``"memory"``; when empty or ``None``
            the ``SDP_CONFIG_BACKEND`` environment variable is used, with
            ``"etcd3"`` as the fallback.
        :param cargs: Backend client arguments. For etcd3, missing ``host``,
            ``port``, ``user`` and ``password`` entries are filled in from
            the ``SDP_CONFIG_*`` environment variables.
        :returns: backend instance
        :raises ValueError: if the backend name is not recognised
        """
        # Determine backend
        if not backend:
            backend = os.getenv("SDP_CONFIG_BACKEND", "etcd3")

        # Instantiate backend, reading configuration from environment/dotenv
        if backend == "etcd3":
            # Environment is only consulted for arguments the caller did not
            # pass explicitly (so a malformed SDP_CONFIG_PORT cannot break a
            # call that supplies its own port).
            if "host" not in cargs:
                cargs["host"] = os.getenv("SDP_CONFIG_HOST", "127.0.0.1")
            if "port" not in cargs:
                cargs["port"] = int(os.getenv("SDP_CONFIG_PORT", "2379"))
            if "user" not in cargs:
                cargs["user"] = os.getenv("SDP_CONFIG_USERNAME", None)
            if "password" not in cargs:
                cargs["password"] = os.getenv("SDP_CONFIG_PASSWORD", None)
            return backend_mod.Etcd3Backend(**cargs)

        if backend == "memory":
            return backend_mod.MemoryBackend()

        raise ValueError(f"Unknown configuration backend {backend}!")

    def lease(self, ttl=10) -> Lease:
        """
        Generate a new lease.

        Once entered can be associated with keys, which will be kept alive
        until the end of the lease. At that point a daemon thread will be
        started automatically to refresh the lease periodically (default
        seems to be TTL/4).

        :param ttl: Time to live for lease
        :returns: lease object
        """
        return self._backend.lease(ttl)

    @property
    def lease_is_lost(self) -> bool:
        """Return whether this Config object's lease has been lost."""
        return self._lease_lost

    @property
    def client_lease(self) -> Lease:
        """Return the lease associated with the client.

        It will be kept alive until the client gets closed.

        :raises LeaseLostError: if the lease has been marked as lost
        """
        return self._ensure_client_lease()

    def _ensure_client_lease(self) -> Lease:
        """
        Return the client lease, creating it (and its keep-alive thread) on
        first use.

        :raises LeaseLostError: if the lease has been marked as lost
        """
        # This always returns the same lease, even if it's dead.
        # Which means the dead lease is a fatal error.
        # Lease death can be checked by calling lease.alive().
        if self._lease_lost:
            raise LeaseLostError(
                f"Lease lost after {MAX_LEASE_REFRESH_RETRIES} retries."
            )

        if self._client_lease is None:
            self._client_lease = self.lease()
            self._keepalive_thread = threading.Thread(
                target=self._keep_lease_alive,
                args=(self._client_lease,),
                name="lease_keepalive",
            )
            self._keepalive_thread.start()
            LOG.debug("keepalive started!")

        return self._client_lease

    def _keep_lease_alive(self, lease: Lease) -> None:
        """
        Keep-alive thread body: refresh ``lease`` until closed or lost.

        Each iteration queries the remaining TTL, waits half of it on the
        close event and then refreshes the lease. Consecutive failures (of
        either the TTL query or the refresh) are counted; reaching
        ``MAX_LEASE_REFRESH_RETRIES`` marks the lease as lost. Only a
        successful refresh resets the counter. When the close event is set
        the lease is revoked and the thread ends.

        :param lease: the lease to keep alive
        """
        max_retries = MAX_LEASE_REFRESH_RETRIES
        retries = 0

        while True:
            # check lease ttl, if finished, revoke lease
            try:
                ttl = lease.remaining_ttl
            # pylint: disable=broad-exception-caught
            except Exception as exc:
                retries += 1
                LOG.warning(
                    "Failed to query ttl, attempt %d/%d, error %s",
                    retries,
                    max_retries,
                    exc,
                )
                if retries >= max_retries:
                    self._mark_lease_as_lost()
                    return
                # wait on the event to avoid retrying too quickly
                if self._close_evt.wait(1.0):
                    self._remove_lease(lease)
                    return
                continue

            # A negative TTL is treated as "lease already gone".
            if ttl < 0:
                self._mark_lease_as_lost()
                return

            # Sleep for half the remaining TTL, waking early on close.
            if self._close_evt.wait(ttl / 2):
                self._remove_lease(lease)
                return

            # if refreshing lease fails too often, mark as lost
            try:
                lease.refresh()
                retries = 0
            # pylint: disable=broad-exception-caught
            except Exception as exc:
                retries += 1
                LOG.warning(
                    "Failed to refresh lease, attempt %d/%d, error %s",
                    retries,
                    max_retries,
                    exc,
                )
                if retries >= max_retries:
                    self._mark_lease_as_lost()
                    return
                continue

    def _mark_lease_as_lost(self):
        """Mark lease as lost"""
        LOG.error("Marking lease as lost.")
        self._lease_lost = True
        self._client_lease = None

    def _remove_lease(self, lease: Lease):
        """Revoke lease

        The reference to the client lease is dropped even when revoking
        fails, so a stale lease without a keep-alive thread is never handed
        out again; the error itself still propagates.
        """
        LOG.info("Revoking lease.")
        try:
            lease.revoke()
        finally:
            self._client_lease = None

    def revoke_lease(self) -> None:
        """
        Revokes the lease internally held by this client, if any.

        Shouldn't normally be called by users, but is useful for tests.
        """
        # Signal the keep-alive thread, wait for it to finish, then re-arm
        # the event so a later lease can start a fresh thread.
        self._close_evt.set()
        if self._keepalive_thread:
            self._keepalive_thread.join()
            self._keepalive_thread = None
        self._close_evt.clear()

    def _wrap_txn(
        self, txn: DbTransaction, wrap: bool = True
    ) -> Transaction | TxnWrapper:
        """
        Utility function to wrap a low-level transaction.

        Wraps the low-level transaction in the Transaction class.
        If a custom wrapper is provided and further wrapping has been
        requested, it is used to wrap the transaction afterwards.

        :param txn: low-level transaction
        :param wrap: whether to use the custom wrapper, if present.
        :returns: wrapped transaction
        """
        transaction = Transaction(
            self.owner,
            self._owned_entity,
            self._ensure_client_lease,
            txn,
            self._global_prefix,
        )
        if self.wrapper is not None and wrap:
            transaction = self.wrapper(transaction)
        return transaction

    def _txn(
        self, max_retries: int = 64, wrap: bool = True
    ) -> Iterable[Transaction | TxnWrapper]:
        """
        Utility function to allow creating Transaction objects that are not
        wrapped with the custom wrapper provided by the user, and thus can
        be used internally by this class.

        :param max_retries: passed through to the backend's ``txn()``
        :param wrap: whether to apply the custom wrapper, if present
        :returns: generator of (optionally wrapped) transactions
        """
        for txn in self._backend.txn(max_retries=max_retries):
            yield self._wrap_txn(txn, wrap)

    def txn(
        self, max_retries: int = 64
    ) -> Iterable[Union[Transaction, TxnWrapper]]:
        """Create a :class:`Transaction` for atomic configuration query/change.

        As we do not use locks, transactions might have to be repeated in
        order to guarantee atomicity. Suggested usage is as follows:

        .. code-block:: python

            for watcher in config.watcher(timeout=0.1):
                for txn in watcher.txn():
                    # Use txn to read+write configuration

        As the `for` loop suggests, the code might get run multiple
        times. Any writes using the transaction will be discarded if the
        transaction fails, but the application must make sure that the
        loop body has no other observable side effects.

        See also :ref:`Usage Guide <usage-guide>` for best practices
        for using transactions.

        :param max_retries: Number of transaction retries before a
            :class:`RuntimeError` gets raised.
        """
        yield from self._txn(max_retries, True)

    def watcher(self, timeout: Optional[float] = None) -> Iterable[Watcher]:
        """
        Create a new watcher.

        Useful for waiting for changes in the configuration. Calling
        :py:meth:`Etcd3Watcher.txn()` on the returned watchers will
        create :py:class:`Transaction` objects just like :py:meth:`txn()`.

        See also :ref:`Usage Guide <usage-guide>` for best practices
        for using watchers.

        :param timeout: Timeout for waiting. Watcher will loop after this time.
        """
        yield from self._backend.watcher(timeout, self._wrap_txn)

    def set_alive(self) -> None:
        """
        Set the keep-alive key.

        :raises TypeError: if no owned entity was given at creation time
        """
        if self._owned_entity is None:
            raise TypeError(
                "No entity provided at creation time, cannot set it alive"
            )
        # Internal transaction: bypass the user's custom wrapper.
        for txn in self._txn(wrap=False):
            txn.self.take_ownership_if_not_alive()

    def is_alive(self) -> bool:
        """
        Is the connection alive in the sense that the keep-alive key exists?

        :returns: whether it is
        :raises TypeError: if no owned entity was given at creation time
        """
        if self._owned_entity is None:
            raise TypeError(
                "No entity provided at creation time, cannot check if alive"
            )
        alive = False
        # Internal transaction: bypass the user's custom wrapper. The value
        # from the last (i.e. committed) iteration is the one returned.
        for txn in self._txn(wrap=False):
            alive = txn.self.is_alive()
        return alive

    def close(self) -> None:
        """Close the client connection."""
        LOG.info("Closing connection to Config DB client.")
        self.revoke_lease()
        self._backend.close()

    # Can declare Self as return type from Python 3.11.
    def __enter__(self):
        """Scope the client connection."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
        """Scope the client connection."""
        self.close()
        # Never suppress an exception raised inside the ``with`` body.
        return False
# pylint: disable-next=too-many-instance-attributes,too-many-public-methods
class Transaction:
    """High-level configuration queries and updates to execute atomically.

    Bundles one operations object per entity type, all sharing the same
    underlying low-level transaction.
    """

    # pylint: disable=too-many-arguments
    # pylint: disable=too-many-positional-arguments
    def __init__(
        self,
        owner: entity.Owner,
        owned_entity: tuple[str, str] | None,
        lease_getter: Callable[[], Lease],
        txn: DbTransaction,
        global_prefix: str,
    ):
        """Instantiate transaction.

        :param owner: identity used when claiming ownership
        :param owned_entity: optional two-tuple of (attribute name of this
            transaction, key) selecting the entity exposed through
            :py:attr:`self`
        :param lease_getter: callable returning the client lease
        :param txn: low-level transaction to operate on
        :param global_prefix: prefix to use within the database
        """
        self._raw_txn = txn

        # Shared by every operations object below.
        base_txn = BaseTransaction(owner, lease_getter, txn, global_prefix)

        self.component: ComponentOperations = ComponentOperations(base_txn)
        """Operations over SDP components"""

        self.deployment: DeploymentOperations = DeploymentOperations(base_txn)
        """Operations over Deployments"""

        self.execution_block: ExecutionBlockOperations = (
            ExecutionBlockOperations(base_txn)
        )
        """Operations over Execution Blocks"""

        self.processing_block: ProcessingBlockOperations = (
            ProcessingBlockOperations(base_txn)
        )
        """Operations over Processing Blocks"""

        self.script: ScriptOperations = ScriptOperations(base_txn)
        """Operations over Scripts"""

        self.flow: FlowOperations = FlowOperations(base_txn)
        """Operations over Flows"""

        self.system: SystemOperations = SystemOperations(base_txn)
        """Operations over system config"""

        self.resource: ResourceOperations = ResourceOperations(base_txn)
        """Operations over resources"""

        self.request: RequestOperations = RequestOperations(base_txn)
        """Operations over requests"""

        self.allocation: AllocationOperations = AllocationOperations(base_txn)
        """Operations over allocations"""

        self.dependency: DependencyOperations = DependencyOperations(base_txn)
        """Operations over Dependencies"""

        # Roots of the entity types above, handed to ArbitraryOperations.
        known_roots = {
            ComponentOperations.PREFIX,
            DeploymentOperations.PREFIX,
            ExecutionBlockOperations.PREFIX,
            ProcessingBlockOperations.PREFIX,
            ScriptOperations.PREFIX,
            DependencyOperations.PREFIX,
            FlowOperations.PREFIX,
            SystemOperations.PATH,
            ResourceOperations.PREFIX,
            RequestOperations.PREFIX,
            AllocationOperations.PREFIX,
        }
        self.arbitrary: ArbitraryOperations = ArbitraryOperations(
            base_txn, known_roots
        )
        """Operations over arbitrary paths"""

        # Special case for entity of interest
        self._self: EntityOperations | None = None
        if owned_entity:
            attr_name, key = owned_entity
            self._self = getattr(self, attr_name).index_by_key_parts(key)

    @property
    def raw(self) -> DbTransaction:
        """Return transaction object for accessing database directly."""
        return self._raw_txn

    @property
    def self(self) -> EntityOperations | None:
        """Fast access to entity identified when creating the parent Config"""
        return self._self

    @staticmethod
    def _new_block_id(
        generator: str,
        prefix: str,
        description: str,
        list_blocks: Callable[[str], list[str]],
        list_arg: str,
        max_blocks: int = 100000,
    ) -> str:
        """
        Return the first unused ``<prefix>-<generator>-<YYYYMMDD>-<NNNNN>`` ID.

        :param generator: Name of the generator
        :param prefix: leading part of the ID (e.g. ``"pb"``)
        :param description: block kind, used in the error message only
        :param list_blocks: callable returning the existing IDs; called with
            the ID prefix as the keyword argument named by ``list_arg``
        :param list_arg: keyword argument name to pass the ID prefix as
        :param max_blocks: number of candidate indices tried (from 0) before
            giving up
        :returns: an ID not contained in the result of ``list_blocks``
        :raises RuntimeError: if all ``max_blocks`` candidate IDs are taken
        """
        # Find existing blocks with same prefix; a set keeps each membership
        # test O(1) while scanning the candidates.
        id_prefix = f"{prefix}-{generator}-{date.today():%Y%m%d}"
        existing_ids = set(list_blocks(**{list_arg: id_prefix}))

        # Choose the lowest-numbered ID that doesn't exist
        for index in range(max_blocks):
            block_id = f"{id_prefix}-{index:05}"
            if block_id not in existing_ids:
                return block_id

        # Only reached when every candidate was taken. (Comparing the final
        # loop index against max_blocks can never detect this, because the
        # index stops at max_blocks - 1.)
        raise RuntimeError(f"Exceeded daily number of {description} blocks!")

    def new_processing_block_id(self, generator: str) -> str:
        """Generate a new processing block ID that is not yet in use.

        :param generator: Name of the generator
        :returns: Processing block ID
        """
        return self._new_block_id(
            generator,
            "pb",
            "processing",
            self.processing_block.list_keys,
            "key_prefix",
        )

    def new_execution_block_id(self, generator: str) -> str:
        """Generate a new execution block ID that is not yet in use.

        :param generator: Name of the generator
        :returns: execution block ID
        """
        return self._new_block_id(
            generator,
            "eb",
            "execution",
            self.execution_block.list_keys,
            "key_prefix",
        )
class LeaseLostError(RuntimeError):
    """Raised when a Config object's lease has been lost."""