Source code for ska_sdp_config.backend.etcd3_revolution1

"""
Etcd3 backend for SKA SDP configuration DB, using client from
https://github.com/Revolution1/etcd3-py
"""
import logging
import os
import queue as queue_m
import socket
import threading
import time
from typing import Any, Callable, Iterable, Optional, cast

import etcd3_revolution1
import requests
from deprecated import deprecated

from .backend import (
    Backend,
    DbRevision,
    DbTransaction,
    Lease,
    TxnWrapper,
    Watcher,
)
from .common import (
    ConfigCollision,
    ConfigVanished,
    _check_path,
    _tag_depth,
    _untag_depth,
)

LOGGER = logging.getLogger(__name__)

# Change the log level for the imported package 'etcd3'
logging.getLogger("etcd3").setLevel(
    os.getenv("SDP_CONFIG_ETCD3_LOG_LEVEL", "INFO")
)


class Etcd3BackendRevolution1(Backend):
    """
    Highly consistent database backend store.

    See https://github.com/etcd-io/etcd

    All parameters will be passed on to :py:meth:`etcd3.Client`.
    """

    def __init__(
        self, *args, max_retries: int = 15, retry_time: float = 0.1, **kw_args
    ):
        """Instantiate the database client.

        :param max_retries: Number of times we retry any database interaction
        :param retry_time: Initial back-off time after a failed database
            interaction, in seconds. Will be increased by 50% for every
            failed attempt.
        """
        self._max_retries = max_retries
        self._retry_time = retry_time
        self._client = self._retry_loop(
            lambda: etcd3_revolution1.Client(*args, **kw_args)
        )

    def _retry_loop(self, code_to_try: Callable) -> Any:
        """
        Helper that retries code if an exception gets thrown that
        typically indicates a loss of connection.

        Note that this *can* rarely mean that the effect of the code in
        question was executed multiple times.
        """
        # Retry loop
        retry_time = self._retry_time
        for i in range(self._max_retries):

            # Common retry code
            def log_exception(ex, i):
                LOGGER.warning(
                    "Caught %s, retry %d after %gs",
                    repr(ex),
                    i,
                    retry_time,
                )

            # Run the code, catching typical exceptions
            try:
                return code_to_try()
            except requests.exceptions.ConnectionError as ex:
                log_exception(ex, i)
            except (
                etcd3_revolution1.errors.go_etcd_rpctypes_error.ErrUnknownError
            ) as ex:
                log_exception(ex, i)
            except AttributeError as ex:
                # This gets raised when we fail to connect to etcd
                # when calling the etcd3.Client constructor, for some
                # reason.
                if ex.name == "server_version_sem":
                    log_exception(ex, i)
                else:
                    raise

            # Delay before next iteration
            time.sleep(retry_time)
            retry_time *= 1.5  # back off

        # Attempt one final time - without safety net
        return code_to_try()

    def lease(self, ttl: int = 10) -> Lease:
        """Generate a new lease.

        Once entered can be associated with keys, which will be kept
        alive until the end of the lease. Note that this involves
        starting a daemon thread that will refresh the lease
        periodically (default seems to be TTL/4).

        :param ttl: Time to live for lease
        :return: lease object
        """
        return cast(Lease, self._client.Lease(ttl=ttl))

    def txn(self, max_retries: int = 64) -> Iterable["Etcd3Transaction"]:
        """Create a new transaction.

        Note that this uses an optimistic STM-style implementation,
        which cannot guarantee that a transaction runs through
        successfully. Therefore, this function returns an iterator,
        which loops until the transaction succeeds:

        .. code-block:: python

            for txn in etcd3.txn():
                # ... transaction steps ...

        Note that this will in most cases only execute one iteration.
        If you actually want to loop - for instance because you intend
        to wait for something to happen in the configuration - use
        :py:meth:`watcher()` instead.

        :param max_retries: Maximum number of transaction loops
        :returns: Transaction iterator
        """
        for txn in Etcd3Transaction(self, self._client, max_retries):
            yield txn

    def watcher(
        self,
        timeout: float = None,
        txn_wrapper: TxnWrapper = None,
    ) -> Iterable["Etcd3Watcher"]:
        """Create a new watcher.

        Useful for waiting for changes in the configuration. See
        :py:class:`Etcd3Watcher`.

        :param timeout: Timeout for waiting. Watcher will loop after
            this time.
        :param txn_wrapper: Function to wrap transactions returned by
            the wrapper.
        :returns: Watcher iterator
        """
        return Etcd3Watcher(self, self._client, timeout, txn_wrapper)

    def get(
        self, path: str, revision: Optional[DbRevision] = None
    ) -> tuple[str, DbRevision]:
        """
        Get value of a key.

        :param path: Path of key to query
        :param revision: Database revision for which to read key
        :returns: (value, revision). value is None if it doesn't exist
        """
        # Check/prepare parameters
        _check_path(path)
        tagged_path = _tag_depth(path)
        rev = None if revision is None else revision.revision

        # Query range
        response = self._retry_loop(
            lambda: self._client.range(tagged_path, revision=rev)
        )

        # Get value returned
        result = response.kvs
        if result is not None:
            assert (
                len(response.kvs) == 1
            ), f"Requesting '{path}' yielded more than one match!"
            result = result[0].value.decode("utf-8")

        # Return value together with revision
        return result, DbRevision(response.header.revision)

    def watch(
        self,
        path: str,
        prefix: bool = False,
        revision: Optional[DbRevision] = None,
        depth: Optional[int] = None,
    ):
        """Watch key or key range.

        Use a path ending with `'/'` in combination with `prefix` to
        watch all child keys.

        :param path: Path of key to query, or prefix of keys.
        :param prefix: Watch for keys with given prefix if set
        :param revision: Database revision from which to watch
        :param depth: tag depth
        :returns: `Etcd3Watch` object for watch request
        """
        # Check/prepare parameters
        if not prefix and path and path[-1] == "/":
            raise ValueError("Path should not have a trailing '/'!")
        tagged_path = _tag_depth(path, depth)
        rev = None if revision is None else revision.revision

        # Set up watcher
        return Etcd3Watch(
            backend=self,
            tagged_path=tagged_path,
            start_revision=rev,
            prefix=prefix,
            max_retries=self._max_retries,
            retry_time=self._retry_time,
        )

    def list_keys(
        self,
        path: str,
        recurse: int = 0,
        revision: Optional[DbRevision] = None,
    ) -> tuple[list[str], DbRevision]:
        """
        List keys under given path.

        :param path: Prefix of keys to query. Append '/' to list
            child paths.
        :param recurse: Maximum recursion level to query. If iterable,
            cover exactly the recursion levels specified.
        :param revision: Database revision for which to list
        :returns: (sorted key list, revision)
        """
        # Prepare parameters
        path_depth = path.count("/")
        rev = None
        if revision is not None:
            rev = revision.revision

        # Make transaction to collect keys from all levels
        txn = self._client.Txn()
        try:
            depth_iter = iter(recurse)
        except TypeError:
            depth_iter = range(recurse + 1)
        for depth in depth_iter:
            tagged_path = _tag_depth(path, depth + path_depth)
            txn.success(
                txn.range(
                    tagged_path, prefix=True, keys_only=True, revision=rev
                )
            )
        response = self._retry_loop(txn.commit)

        # We do not return a mod revision here - this would not be
        # very useful anyway as we are not returning values
        revision = DbRevision(response.header.revision)
        if response.responses is None:
            return [], revision

        # Collect and sort keys
        sorted_keys = sorted(
            [
                _untag_depth(kv.key)
                for res in response.responses
                if res.response_range.kvs is not None
                for kv in res.response_range.kvs
            ]
        )
        return sorted_keys, revision

    def create(
        self, path: str, value: str, lease: etcd3_revolution1.Lease = None
    ) -> None:
        """Create a key and initialise it with the value.

        Fails if the key already exists. If a lease is given, the key
        will automatically get deleted once it expires.

        :param path: Path to create
        :param value: Value to set
        :param lease: Lease to associate
        :raises: ConfigCollision
        """
        # Prepare parameters
        _check_path(path)
        tagged_path = _tag_depth(path)
        lease_id = 0 if lease is None else lease.ID
        value = str(value).encode("utf-8")

        # Put value if version is zero (i.e. does not exist)
        txn = self._client.Txn()
        txn.compare(txn.key(tagged_path).version == 0)
        txn.success(txn.put(tagged_path, value, lease_id))
        response = self._retry_loop(txn.commit)
        if not response.succeeded:
            raise ConfigCollision(
                path, f"Cannot create {path}, as it already exists!"
            )

    # pylint: disable=arguments-renamed
    def update(
        self, path: str, value: str, must_be_rev: Optional[DbRevision] = None
    ) -> None:
        """
        Update an existing key. Fails if the key does not exist.

        :param path: Path to update
        :param value: Value to set
        :param must_be_rev: Fail if found value does not match given
            revision (atomic update)
        :raises: ConfigVanished
        """
        # Validate parameters
        _check_path(path)
        tagged_path = _tag_depth(path)
        value = str(value).encode("utf-8")

        # Put value if version is *not* zero (i.e. it exists)
        txn = self._client.Txn()
        txn.compare(txn.key(tagged_path).version != 0)
        if must_be_rev is not None:
            if must_be_rev.revision is None:
                raise ValueError("Did not pass a valid revision!")
            txn.compare(txn.key(tagged_path).mod < must_be_rev.revision + 1)
        txn.success(txn.put(tagged_path, value))
        response = self._retry_loop(txn.commit)
        if not response.succeeded:
            raise ConfigVanished(
                path, f"Cannot update {path}, as it does not exist!"
            )

    def delete(
        self,
        path: str,
        must_exist: bool = True,
        recursive: bool = False,
        prefix: bool = False,
        max_depth: int = 16,
    ) -> None:
        # pylint: disable=too-many-arguments
        """
        Delete the given key or key range.

        :param path: Path (prefix) of keys to remove
        :param must_exist: Fail if path does not exist?
        :param recursive: Delete children keys at lower levels recursively
        :param prefix: Delete all keys at given level with prefix
        :param max_depth: Recursion limit
        :raises: ConfigVanished if `must_exist` is set and the path
            does not exist
        """
        # Prepare parameters
        tagged_path = _tag_depth(path)

        # Determine start recursion level
        txn = self._client.Txn()
        if must_exist:
            txn.compare(txn.key(tagged_path).version != 0)
        txn.success(txn.delete(tagged_path, prefix=prefix))

        # If recursive, we also delete all paths at lower recursion
        # levels that have the path as a prefix
        if recursive:
            depth = path.count("/")
            for lvl in range(depth + 1, depth + max_depth):
                dpath = _tag_depth(path if prefix else path + "/", lvl)
                txn.success(txn.delete(dpath, prefix=True))

        # Execute
        response = self._retry_loop(txn.commit)
        if not response.succeeded:
            raise ConfigVanished(
                path, f"Cannot delete {path}, as it does not exist!"
            )

    def close(self) -> None:
        """Close the client connection."""
        self._client.close()
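

# The sketch below is an illustrative usage example added for documentation
# purposes; it is not part of the original module. The connection keyword
# arguments and the key path are assumptions - anything accepted by
# etcd3_revolution1.Client can be passed to the backend constructor.
def _example_backend_usage():
    """Hypothetical walk-through of the basic backend operations."""
    backend = Etcd3BackendRevolution1(host="localhost", port=2379)
    try:
        # create() raises ConfigCollision if the key already exists
        backend.create("/x/example", "value")
        # get() returns the value together with the database revision
        value, revision = backend.get("/x/example")
        LOGGER.info("Read %r at revision %s", value, revision.revision)
        # update() raises ConfigVanished if the key does not exist
        backend.update("/x/example", "new value")
        backend.delete("/x/example")
    finally:
        backend.close()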


class Etcd3Watch:  # pylint: disable=too-many-instance-attributes
    """Wrapper for etcd3 watch requests.

    Entering the watcher using a `with` block yields a queue of
    `(key, val, rev)` triples.
    """

    def __init__(
        self,
        backend: Etcd3BackendRevolution1,
        tagged_path: str,
        start_revision: int,
        prefix: bool,
        max_retries: int = 20,
        retry_time: float = 0.1,
    ):  # pylint: disable=too-many-arguments
        """Initialise watcher.

        :param backend: Backend instance
        :param tagged_path: Tagged path to watch
        :param start_revision: Yield events starting from this revision
            (replaying history if needed)
        :param prefix: Use prefix match instead of exact match for the path
        :param max_retries: Number of times we retry any database interaction
        :param retry_time: Initial back-off time after a failed database
            interaction, in seconds. Will be increased by 50% for every
            failed attempt.
        """
        self._tagged_path = tagged_path
        self._start_revision = start_revision
        self._prefix = prefix
        self._max_retries = max_retries
        self._retry_time = retry_time
        self.queue = None
        self._stop_thread = True
        self._thread = None
        self._watcher = backend._client.Watcher(
            self._tagged_path,
            start_revision=self._start_revision,
            prefix=self._prefix,
        )

    def start(self, queue: queue_m.Queue = None) -> None:
        """Activates the watcher, yielding a queue for updates."""
        # Create queue if appropriate
        if queue is None:
            queue = queue_m.Queue()
        self.queue = queue

        # Start thread
        self._stop_thread = False
        self._thread = threading.Thread(target=self._run_thread)
        self._thread.daemon = True
        self._thread.start()

    def _run_thread(self) -> None:
        """Thread watching for changes."""
        # Yet another workaround: Replicate Watcher.run, but catch
        # various exceptions and either
        #
        # 1) Retry (with some delay) a dropped connection,
        # 2) catch if the server has compacted the revision we are
        #    trying to watch from, or
        # 3) push the exception to the queue to make the caller
        #    handle it
        # pylint: disable=protected-access,broad-except,too-many-lines

        # Configure back-off retries
        retry_time = self._retry_time
        retries = 0
        while not self._stop_thread:
            try:
                # Set up watcher
                LOGGER.info(
                    "Starting watch at revision %s...", self._start_revision
                )
                self._watcher.start_revision = self._start_revision
                self._watcher.revision = None

                # Enter watcher - this is what actually requests the
                # watch from the etcd3 server
                with self._watcher:
                    # Connection established: Reset retries
                    retries = 0
                    retry_time = self._retry_time

                    # Main watch loop: Get events from watcher
                    self._run_watcher_loop(self._watcher)

            except etcd3_revolution1.errors.Etcd3WatchCanceled as ex:
                # The etcd server tells us that the watcher was
                # cancelled. This typically happens because the
                # connection was reset (i.e. we need to re-establish
                # the watch) and our start_revision has been compacted
                # in the meantime. After all, in theory we could be
                # losing data here: If the connection reset lasted
                # long enough for some change to a watched key to
                # happen *and* be compacted (!), we would have no way
                # to learn about it any more.
                #
                # This (very low) risk of data loss is basically
                # inherent to compaction.

                # Stopping?
                if self._stop_thread:
                    break

                # Re-raise any error that isn't about compaction
                LOGGER.warning("Watcher thread caught %s", repr(ex))
                if (
                    "compact_revision" not in ex.resp
                    or not ex.resp.compact_revision
                ):
                    self.queue.put((None, ex, None))
                    raise

                # If we compacted past the point of the last
                # update, move the start revision up to that point
                # to prevent the exact same exception from getting
                # thrown again
                if (
                    self._start_revision is None
                    or ex.resp.compact_revision > self._start_revision
                ):
                    LOGGER.warning(
                        "Jumping to compaction revision %d",
                        ex.resp.compact_revision,
                    )
                    self._start_revision = ex.resp.compact_revision

                # Retry
                continue

            except etcd3_revolution1.errors.Etcd3StreamError as ex:
                # We get this typically when we are trying to close
                # the watcher (see below). Otherwise this is a genuine
                # error.

                # Stopping?
                if self._stop_thread:
                    break
                self.queue.put((None, ex, None))
                raise

            except KeyError as ex:
                # This seems to happen quite regularly instead of a
                # connection error - etcd3-py tries to look up "None"
                # in some dictionary.
                LOGGER.warning(
                    "Watcher thread caught %s - typically harmless, ignored",
                    repr(ex),
                )
                continue

            except requests.exceptions.ConnectionError as ex:
                # The connection dropped for some reason. We generally
                # want to retry a couple times in this situation
                # before we give up on the database.

                # Out of retries? Push exception on queue
                if retries >= self._max_retries:
                    self.queue.put((None, ex, None))
                    raise

                # Otherwise retry after some time
                LOGGER.warning(
                    "Watcher thread caught %s - retry %d (%g s)",
                    repr(ex),
                    retries,
                    retry_time,
                )
                time.sleep(retry_time)
                retries += 1
                retry_time *= 1.5
                continue

            except Exception as ex:
                # Push exception on queue
                self.queue.put((None, ex, None))
                raise

    def _run_watcher_loop(self, watcher) -> None:
        # Get events from watcher
        for event in watcher:
            # Get key path + value
            key = _untag_depth(event.key)
            if event.type == etcd3_revolution1.EventType.PUT:
                val = event.value.decode("utf-8")
            else:
                val = None

            # Record that we have seen this revision, so
            # when we restart we will not ask for it again
            self._start_revision = event.mod_revision + 1
            rev = DbRevision(event.mod_revision)
            self.queue.put((key, val, rev))

    # pylint: disable=too-many-branches
    def stop(self):
        """Deactivates the watcher."""
        if self._thread is None:
            return

        # Temporary workaround for testing: Manually stop the
        # watcher. This is exactly what the original call would do,
        # just with way more safety and logging
        # pylint: disable=protected-access,broad-except,too-many-lines
        # self._watcher.stop()
        LOGGER.debug("Stopping watcher %s", self._thread.name)

        # Prevent re-tries by overwriting the watcher's client. The
        # problem here is that the way Watcher.__iter__ operates, if
        # self.watching is False after self.request_create() finishes,
        # it will just silently re-set self.watching and re-try the
        # connection. This is the only reliable way to break it out of
        # that loop.
        # pylint: disable=too-few-public-methods
        # pylint: disable=missing-class-docstring
        # pylint: disable=missing-function-docstring
        class DummyClient:
            def __init__(self, watcher):
                self._watcher = watcher

            def watch_create(self, *_args, **_kwargs):
                self._watcher.watching = False
                raise etcd3_revolution1.errors.Etcd3StreamError(
                    None, None, None
                )

        self._watcher.client = DummyClient(self._watcher)

        # Try to repeat this a couple of times
        for _ in range(20):
            # Kill the response stream
            resp = self._watcher._resp
            self._watcher.watching = False
            self._stop_thread = True
            if resp is not None and not resp.raw.closed:
                # First attempt to shut down socket
                try:
                    sock = socket.fromfd(
                        resp.raw._fp.fileno(),
                        socket.AF_INET,
                        socket.SOCK_STREAM,
                    )
                    sock.shutdown(socket.SHUT_RDWR)
                    sock.close()
                except Exception as exc:
                    print(exc)
                    LOGGER.debug(
                        "Exception in socket shutdown", exc_info=True
                    )

            # Finally join the thread - but with a timeout
            if self._thread and self._thread.is_alive():
                self._thread.join(0.1)

            # Stop if there's no thread (any more)
            if not self._thread or not self._thread.is_alive():
                break

            LOGGER.debug("re-trying closing watcher stream...")
            time.sleep(0.1)

        # Then attempt to close the raw request, request, and
        # connection, if not reset
        resp = self._watcher._resp
        if resp is not None and not resp.raw.closed:
            try:
                resp.raw.close()
            except Exception as exc:
                print(exc)
                LOGGER.debug(
                    "Exception in closing raw request", exc_info=True
                )
            try:
                resp.close()
            except Exception as exc:
                print(exc)
                LOGGER.debug("Exception in closing request", exc_info=True)
            try:
                if hasattr(resp, "connection"):
                    resp.connection.close()
            except Exception as exc:
                print(exc)
                LOGGER.debug("Exception in closing connection", exc_info=True)

        # Final attempt to join the thread
        if self._thread and self._thread.is_alive():
            self._thread.join(0.1)
        if self._thread and self._thread.is_alive():
            LOGGER.warning("Watcher thread did not exit!")
        else:
            LOGGER.debug("Watcher thread stopped")
        self.queue = None

    def __enter__(self):
        """Use for scoping watcher to a block."""
        self.start()
        return self.queue

    def __exit__(self, *args):
        """Use for scoping watcher to a block."""
        self.stop()
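

# Illustrative sketch, not part of the original module: consuming a low-level
# watch via the `with` block described in the Etcd3Watch docstring. The
# backend argument and the watched path are assumptions.
def _example_watch_usage(backend: Etcd3BackendRevolution1):
    """Hypothetical example of waiting for a single change on one key."""
    watch = backend.watch("/x/example")
    with watch as events:
        # The queue yields (key, value, revision) triples; value is None
        # when the key was deleted.
        key, value, revision = events.get()
        LOGGER.info(
            "%s changed to %r at revision %s", key, value, revision.revision
        )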


class Etcd3Transaction(DbTransaction):
    """A series of queries and updates to be executed atomically.

    Use :py:meth:`Etcd3Backend.txn()` or :py:meth:`Etcd3Watcher.txn()`
    to construct transactions.
    """

    # pylint: disable=too-many-instance-attributes

    # Ideas:
    #
    # Ranged deletes - in contrast to the main backend we cannot
    # release a range of keys (especially recursively) in a
    # transaction yet. The tricky bit is to make this consistent with
    # the rest of the transaction machinery and properly
    # atomic. Easiest solution might just be to simply use a bunch of
    # list_key and single delete calls to get the same effect. Would
    # be slightly inefficient, but would get the job done.
    #
    # Caching - especially when looping a transaction most of the
    # queried data from the database might still be valid. So after a
    # loop we could just migrate the known information to a _cache
    # (plus any further information we got from watches). Then once
    # the "new" database revision has been determined we might just
    # send one cheap query to the database to figure out which bits of
    # it are still current.
    #
    # Cheaper update/create checks - right now we query the old value
    # of the key on every update/create call, even though we are only
    # interested in whether or not the key exists. We could instead
    # query this along the same lines as list_keys. Not entirely sure
    # this is worthwhile though, given that it is quite typical to
    # "get" a key before "updating" it anyway, and collisions on
    # "create" should be quite rare.

    def __init__(
        self,
        backend: Etcd3BackendRevolution1,
        client: etcd3_revolution1.Client,
        max_retries: int = 64,
    ):
        """Initialise transaction."""
        super().__init__(backend)
        self._client = client
        self._max_retries = max_retries

        self._revision = None  # Revision baked in after first read
        self._get_queries = {}  # Query log
        self._list_queries = {}  # Query log
        self._updates = {}  # Delayed updates

        self._committed = False
        self._loop = False
        self._watch = False
        self._watch_timeout = None
        self._got_timeout = False  # For test cases

        self._retries = 0
        self._watchers = {}
        self._watch_queue = queue_m.Queue()
        self._commit_callbacks = []

    def _ensure_uncommitted(self) -> None:
        if self._committed:
            raise RuntimeError("Attempted to modify committed transaction!")

    @property
    def revision(self) -> int:
        """The last-committed database revision.

        Only valid to call after the transaction has been committed.
        """
        if not self._committed:
            raise RuntimeError(
                "Revision is undefined on an uncommitted transaction!"
            )
        return self._revision.revision

    def get(self, path: str) -> str:
        """
        Get value of a key.

        :param path: Path of key to query
        :returns: Key value. None if it doesn't exist.
        """
        self._ensure_uncommitted()

        # Check whether it was written as part of this transaction
        if path in self._updates:
            return self._updates[path][0]

        # Check whether we already have the request response
        if path in self._get_queries:
            return self._get_queries[path][0]

        # Perform get request
        val, rev = self._get_queries[path] = self.backend.get(
            path, revision=self._revision
        )

        # Set revision, if not already done so
        if self._revision is None:
            self._revision = rev
        return val

    def list_keys(self, path: str, recurse: int = 0) -> list[str]:
        """
        List keys under given path.

        :param path: Prefix of keys to query. Append '/' to list
            child paths.
        :param recurse: Children depths to include in search
        :returns: sorted key list
        """
        self._ensure_uncommitted()
        path_depth = path.count("/")

        # Walk through depths, collecting known keys
        try:
            depth_iter = iter(recurse)
        except TypeError:
            depth_iter = range(recurse + 1)
        keys = []
        for depth in depth_iter:
            # We might have created or deleted an uncommitted key that
            # falls into the range - add to list
            tagged_path = _tag_depth(path, path_depth + depth)
            matching_vals = [
                kv
                for kv in self._updates.items()
                if _tag_depth(kv[0]).startswith(tagged_path)
            ]
            added_keys = {
                key for key, val in matching_vals if val[0] is not None
            }
            removed_keys = {
                key for key, val in matching_vals if val[0] is None
            }

            # Check whether we need to perform the request
            query = (path, depth + path_depth)
            if query not in self._list_queries:
                self._list_queries[query] = self.backend.list_keys(
                    path, recurse=(depth,), revision=self._revision
                )

            # Add to key set
            result, rev = self._list_queries[query]
            keys.extend(set(result) - removed_keys | added_keys)

            # Bake in revision if not already done so
            if self._revision is None:
                self._revision = rev

        # Sort
        return sorted(keys)

    def create(
        self, path: str, value: str, lease: etcd3_revolution1.Lease = None
    ) -> None:
        """Create a key and initialise it with the value.

        Fails if the key already exists. If a lease is given, the key
        will automatically get deleted once it expires.

        :param path: Path to create
        :param value: Value to set
        :param lease: Lease to associate
        :raises: ConfigCollision
        """
        self._ensure_uncommitted()

        # Attempt to get the value - mainly to check whether it exists
        # and put it into the query log
        result = self.get(path)
        if result is not None:
            raise ConfigCollision(
                path, f"Cannot create {path}, as it already exists!"
            )

        # Add update request
        self._updates[path] = (value, lease)

    def update(self, path: str, value: str) -> None:
        """
        Update an existing key. Fails if the key does not exist.

        :param path: Path to update
        :param value: Value to set
        :raises: ConfigVanished
        """
        self._ensure_uncommitted()

        # As with "create"
        result = self.get(path)
        if result is None:
            raise ConfigVanished(
                path, f"Cannot update {path}, as it does not exist!"
            )

        # Add update request
        self._updates[path] = (value, None)

    def delete(
        self, path: str, must_exist: bool = True, recursive: bool = False
    ) -> None:
        """
        Delete the given key.

        :param path: Path of key to remove
        :param must_exist: Fail if path does not exist?
        :param recursive: Delete children keys at lower levels
            recursively (not used)
        """
        if must_exist:
            # As with "update"
            result = self.get(path)
            if result is None:
                raise ConfigVanished(
                    path, f"Cannot delete {path}, it does not exist!"
                )

        # Add delete request
        self._updates[path] = (None, None)

    def commit(self) -> bool:
        """
        Commit the transaction to the database.

        This can fail, in which case the transaction must get `reset`
        and built again.

        :returns: Whether the commit succeeded
        """
        self._ensure_uncommitted()

        # If we have made no updates, we don't need to verify the log
        if not self._updates:
            self._committed = True
            return True

        # Create transaction
        txn = self._client.Txn()

        # Verify get() calls from the query log
        for path, (_, rev) in self._get_queries.items():
            tagged_path = _tag_depth(path)
            if rev.revision is None:
                # Did not exist? Verify continued non-existence. Note
                # that it is possible for the key to have been
                # created, then deleted again in the meantime.
                txn.compare(txn.key(tagged_path).version == 0)
            else:
                # Otherwise add an assertion that the revision has not
                # been changed. This actually guarantees that the key
                # has not been touched since we read it.
                txn.compare(txn.key(tagged_path).mod < rev.revision + 1)

        # Verify list_keys() calls from the query log
        for (path, depth), (result, rev) in self._list_queries.items():
            tagged_path = _tag_depth(path, depth)

            # Make sure that all returned keys still exist
            for res_path in result:
                tagged_res_path = _tag_depth(res_path)
                txn.compare(txn.key(tagged_res_path).version > 0)

            # Also check that no new keys have entered the range
            # (by checking whether the request would contain any
            # keys with a newer create revision than our request)
            txn.compare(
                txn.key(tagged_path, prefix=True).create
                < self._revision.revision + 1
            )

        # Commit changes. Note that the dictionary guarantees that we
        # only update any key at most once.
        for path, (value, lease) in self._updates.items():
            tagged_path = _tag_depth(path)
            lease_id = None if lease is None else lease.ID
            if value is None:
                txn.success(txn.delete(tagged_path, value, lease_id))
            else:
                txn.success(txn.put(tagged_path, value, lease_id))

        # Done
        self._committed = True
        # pylint: disable=protected-access
        response = self.backend._retry_loop(txn.commit)
        if response.succeeded:
            for callback in self._commit_callbacks:
                callback()
        self._commit_callbacks = []
        return response.succeeded

    def on_commit(self, callback: Callable[[], None]) -> None:
        """Register a callback to call when the transaction succeeds.

        A bit of a hack, but occasionally useful to add additional
        side-effects to a transaction that are guaranteed to not get
        duplicated.

        :param callback: Callback to call
        """
        self._commit_callbacks.append(callback)

    def reset(self, revision: Optional[DbRevision] = None) -> None:
        """Reset the transaction, so it can be restarted after commit()."""
        if not self._committed:
            raise RuntimeError("Called reset on an uncommitted transaction!")

        # Reset
        self._revision = revision
        self._get_queries = {}
        self._list_queries = {}
        self._updates = {}
        self._committed = False
        self._loop = False
        self._watch = False
        self._watch_timeout = None

    @deprecated
    def loop(
        self, watch: bool = False, watch_timeout: Optional[float] = None
    ):
        """Repeat transaction execution, even if it succeeds.

        *Deprecated*: Use :py:class:`Etcd3Watcher` instead, or loop
        manually.

        :param watch: Once the transaction succeeds, block until one of
            the values read changes, then loop the transaction
        :param watch_timeout: timeout value
        """
        if self._loop:
            # If called multiple times, looping immediately takes precedence
            self._watch = self._watch and watch
        else:
            self._loop = True
            self._watch = watch
        if watch:
            self._watch_timeout = watch_timeout

    def __iter__(self) -> "Etcd3Transaction":
        """Iterate transaction as requested by loop(), or until it
        succeeds."""
        try:
            while self._retries <= self._max_retries:
                # Should build up a transaction
                yield self

                # Try to commit, count how many times we have tried
                if not self.commit():
                    self._retries += 1
                else:
                    self._retries = 0

                    # No further loop?
                    if not self._loop:
                        return

                    # Use watches? Then wait for something to happen
                    # before looping.
                    if self._watch:
                        self._do_watch()

                # Repeat after reset otherwise
                self.reset()
        finally:
            # Warn if we are dropping uncommitted changes to the
            # database - this is an easy mistake to make:
            #
            # for txn in cfg.txn():
            #     txn.put(...)
            #     # ...
            #     # We are done!
            #     return
            if self._updates and not self._committed:
                LOGGER.warning(
                    "Transaction loop aborted - dropping updates to %s!",
                    list(self._updates.keys()),
                )
            self._clear_watch()

        # Ran out of repeats? Fail
        raise RuntimeError(
            f"Transaction did not succeed after {self._max_retries} retries!"
        )

    @deprecated
    def clear_watch(self) -> None:
        """Stop all currently active watchers.

        *Deprecated*: Use :py:class:`Etcd3Watcher` instead.
        """
        self._clear_watch()

    def _clear_watch(self) -> None:
        """Stop all currently active watchers."""
        # Remove watchers
        for watcher in self._watchers.values():
            watcher.stop()
        self._watchers = {}

    def _update_watchers(self) -> None:
        # Watch any ranges we listed. Note that this will trigger also
        # on key updates, we will filter that below.
        prefixes = []
        active_watchers = set()
        for path, depth in self._list_queries:
            query = ("list", path, depth)

            # Add tagged prefixes so we can check for key overlap later
            prefixes.append(_tag_depth(path, depth))
            active_watchers.add(query)

            # Start a watcher, if required
            if self._watchers.get(query) is None:
                self._watchers[query] = self.backend.watch(
                    path, revision=self._revision, prefix=True, depth=depth
                )
                self._watchers[query].start(self._watch_queue)

        # Watch any individual key we read
        for path in self._get_queries:
            query = ("get", path)

            # Check that we are not already watching this key as
            # part of a range. This is basically using the
            # above-mentioned property of range watches to our
            # advantage. This is actually a fairly important
            # optimisation, as it means that listing keys followed
            # by iterating over the values won't create extra
            # watches here!
            tagged_path = _tag_depth(path)
            if not any(tagged_path.startswith(pre) for pre in prefixes):
                active_watchers.add(query)

                # Start individual watcher, if required
                if self._watchers.get(query) is None:
                    self._watchers[query] = self.backend.watch(
                        path, revision=self._revision
                    )
                    self._watchers[query].start(self._watch_queue)

        # Remove any watchers that we are not currently using. Note
        # that we only do this on the next watch() call, so watchers
        # will be kept alive through transaction failures *and*
        # non-waiting loops. So as long as the set of keys waited on
        # is relatively constant (and ideally forms ranges), we will
        # not generate much churn here.
        for query, watcher in list(self._watchers.items()):
            if query not in active_watchers:
                watcher.stop()
                del self._watchers[query]

    def watch(self) -> DbRevision:
        """Wait for a change on one of the values read.

        *Deprecated*: Use :py:class:`Etcd3Watcher` instead.

        :returns: The revision at which a change was detected.
        """
        return self._do_watch()

    def _do_watch(self) -> DbRevision:
        """Wait for a change on one of the values read.

        :returns: The revision at which a change was detected.
        """
        # Make sure the watchers we have in place match what we read
        self._update_watchers()

        # Wait for updates from the watcher queue
        block = True
        revision = self._revision
        start_time = time.time()
        while True:
            # Determine timeout
            timeout = None
            if self._watch_timeout is not None:
                timeout = max(
                    0, start_time + self._watch_timeout - time.time()
                )

            # Wait for something to get pushed on the queue
            try:
                path, value, rev = self._watch_queue.get(block, timeout)
            except queue_m.Empty:
                self._got_timeout = block
                return revision

            # Exception? Re-raise
            if path is None and isinstance(value, Exception):
                raise value

            # Manual trigger?
            if path is None and value is None and rev is None:
                self._got_timeout = False
                break

            # Check that revision is newer (prevent duplicated updates)
            if rev.revision <= revision.revision:
                continue

            # Are we waiting on a value change of this one?
            if path not in self._get_queries:
                # Are we getting this because of one of the list queries?
                tagged_path = _tag_depth(path)
                found_match = False
                for (lpath, depth), (result, _) in self._list_queries.items():
                    if tagged_path.startswith(_tag_depth(lpath, depth)):
                        # We should not notify for a value change,
                        # only if a key was added / removed. Good
                        # thing we can check that using the log.
                        if value is None or path not in result:
                            found_match = True
                            break

                # Otherwise this is either a misfire from an old
                # watcher, or a value update from a list watcher (see
                # above). Ignore.
                if not found_match:
                    continue

            # Alright, we can stop waiting. However, we will attempt
            # to clear the queue before we do so, as we might get a
            # lot of updates in batch
            revision = rev
            block = False

        return revision

    def trigger_loop(self) -> None:
        """Manually triggers a loop.

        Effectively makes loop(True) behave like loop(False), looping
        immediately. This is useful for interrupting a blocking watch()
        from a different thread.
        """
        # Push a magic "cancel" entry
        self._watch_queue.put((None, None, None))
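

# Illustrative sketch, not part of the original module: an atomic
# read-modify-write using the optimistic transaction loop. The counter path
# is an assumption.
def _example_txn_usage(backend: Etcd3BackendRevolution1):
    """Hypothetical example of incrementing a counter transactionally."""
    for txn in backend.txn():
        current = txn.get("/x/counter")
        if current is None:
            txn.create("/x/counter", "1")
        else:
            txn.update("/x/counter", str(int(current) + 1))
    # The loop body may run more than once: if another writer touches
    # "/x/counter" between our read and the commit, the commit fails and
    # the transaction is reset and retried.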


class Etcd3Watcher(Watcher):
    """Watch for database changes by using nested transactions.

    Use as follows:

    .. code-block:: python

        for watcher in config.watcher():
            for txn in watcher.txn():
                # ... do something
            for txn in watcher.txn():
                # ... do something else

    At the end of a for loop iteration, the watcher will start watching
    all values read by transactions started through :py:meth:`txn`, and
    only repeat the execution of the loop body once one of these values
    has changed.
    """

    def __init__(
        self,
        backend: Etcd3BackendRevolution1,
        client: etcd3_revolution1.Client,
        timeout: float = None,
        txn_wrapper: TxnWrapper = None,
    ):
        """Initialise watcher.

        :param timeout: Maximum time to wait per loop in seconds. If
            ``None``, will wait indefinitely.
        """
        super().__init__(backend, timeout, txn_wrapper)
        self._wait_txn = Etcd3Transaction(backend, client)
        self._client = client

    def txn(
        self, max_retries: int = 64
    ) -> Iterable[Etcd3Transaction | TxnWrapper]:
        """Create nested transaction.

        The watcher loop will iterate when any value read by
        transactions created by this method has changed in the
        database.

        Note that these transactions otherwise behave exactly as normal
        transactions: As long as they are internally consistent, they
        will be committed. This means there are no consistency
        guarantees between transactions created from the same watcher,
        i.e. one transaction might read one value from the database
        while a later one reads another.

        :param max_retries: Maximum number of times the transaction
            will be tried before giving up.
        """
        # Make a new transaction.
        # pylint: disable=fixme
        # TODO: Would be more efficient if the different transactions
        # could share some sort of cache so they don't need to
        # re-query keys...
        for txn in Etcd3Transaction(self.backend, self._client, max_retries):
            yield self.get_txn(txn)

        # Extract read values from transaction
        # pylint: disable=protected-access,undefined-loop-variable
        if txn._committed:
            # Take over earliest revision used in a transaction, as we
            # want to know about any changes from that particular
            # point forward.
            if (
                self._wait_txn._revision is None
                or self._wait_txn._revision.revision > txn._revision.revision
            ):
                self._wait_txn._revision = txn._revision
            self._wait_txn._get_queries.update(txn._get_queries)
            self._wait_txn._list_queries.update(txn._list_queries)

    def __iter__(self) -> "Etcd3Watcher":
        """
        Iterate forever, waiting after every interaction for something
        to change.
        """
        try:
            while True:
                yield self

                # reset the alarm, since we don't want to use it
                # in the next iteration
                timeout = self.get_timeout()
                self._wake_up_at = None

                # pylint: disable=fixme, protected-access
                # TODO: Move those to this class!
                self._wait_txn.loop(True, timeout)
                self._wait_txn._do_watch()

                # Clear current queries
                self._wait_txn._get_queries = {}
                self._wait_txn._list_queries = {}
        finally:
            # pylint: disable=protected-access
            self._wait_txn._clear_watch()

    def trigger(self) -> None:
        """Manually triggers a loop.

        Can be called from a different thread to force a loop, even if
        the watcher is currently waiting.
        """
        # pylint: disable=protected-access
        self._wait_txn.trigger_loop()
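

# Illustrative sketch, not part of the original module: blocking until a
# value changes using the high-level watcher. The path, target value and
# timeout are assumptions.
def _example_watcher_usage(backend: Etcd3BackendRevolution1):
    """Hypothetical example of waiting for a key to reach a target value."""
    for watcher in backend.watcher(timeout=30):
        for txn in watcher.txn():
            state = txn.get("/x/state")
        if state == "FINISHED":
            break
        # Otherwise the watcher blocks until "/x/state" (or any other value
        # read inside watcher.txn()) changes, then loops the body again.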