Source code for ska_sdp_config.backend.memory

"""
Memory backend for SKA SDP configuration DB.

The main purpose of this is for use in testing.
In principle, it should behave in the same way as the etcd backend.
No attempt has been made to make it thread-safe, so it probably isn't.
"""
from typing import Callable, Iterable, Optional

from .backend import (
    Backend,
    DbRevision,
    DbTransaction,
    Lease,
    TxnWrapper,
    Watcher,
)
from .common import (
    ConfigCollision,
    ConfigVanished,
    _check_path,
    _tag_depth,
    _untag_depth,
    depth_of_path,
)


def _op(
    path: str,
    value: str,
    to_check: Callable[[str], None],
    to_do: Callable[[str, str], None],
):
    """
    Run a guarded write: validate the path, check a precondition, then act.

    The path is first passed to ``_check_path``, then converted to its
    depth-tagged form with ``_tag_depth``. Both callbacks receive the
    tagged form, not the raw path.

    :param path: untagged path of the key to operate on
    :param value: value handed to ``to_do``
    :param to_check: precondition callback, called with the tagged path;
        it is expected to raise if the operation must not proceed
    :param to_do: action callback, called with the tagged path and value
    """
    _check_path(path)
    tagged_path = _tag_depth(path)
    # The precondition runs first so that a failure leaves data untouched.
    to_check(tagged_path)
    to_do(tagged_path, value)


class MemoryBackend(Backend):
    """
    In-memory backend implementation, principally for testing.

    Entries are kept in a flat dictionary keyed by depth-tagged path (the
    result of ``_tag_depth``). The dictionary is a class attribute, so all
    instances read and write the same data.
    """

    # Class variable to store data (shared by every instance)
    _data: dict[str, str] = {}

    def lease(self, ttl: float = 10) -> Lease:
        """
        Generate a dummy lease object.

        :param ttl: time to live (not used by this backend)
        :returns: dummy lease object
        """
        return Lease()

    def txn(self, max_retries: int = 64) -> Iterable["MemoryTransaction"]:
        """
        Create an in-memory "transaction".

        :param max_retries: Maximum number of transaction loops (not used;
            exactly one transaction is yielded)
        :returns: transaction object
        """
        yield MemoryTransaction(self)

    def watcher(
        self,
        timeout: Optional[float] = None,
        txn_wrapper: Optional[TxnWrapper] = None,
    ) -> Watcher:
        """
        Create an in-memory "watcher".

        :param timeout: timeout in seconds
        :param txn_wrapper: wrapper (factory) to return transaction
        :returns: MemoryWatcher object (mock of Etcd3Watcher)
        """
        return MemoryWatcher(self, timeout, txn_wrapper)

    def get(
        self, path: str, revision: Optional[DbRevision] = None
    ) -> tuple[str, DbRevision]:
        """
        Look up the value stored at a path.

        :param path: path of the key to read
        :param revision: revision to hand back; it does not influence
            which value is returned
        :returns: tuple of the stored value (``None`` if the key is not
            present) and ``revision``, or ``DbRevision(0)`` if ``revision``
            is falsy
        """
        return (
            self._data.get(_tag_depth(path), None),
            revision or DbRevision(0),
        )

    def _put(self, path: str, value: str) -> None:
        """Store ``value`` under the already-tagged ``path``."""
        self._data[path] = value

    def _check_exists(self, path: str) -> None:
        """Raise ``ConfigVanished`` unless the tagged ``path`` is stored."""
        if path not in self._data:
            raise ConfigVanished(path, f"{path} not in dictionary")

    def _check_not_exists(self, path: str) -> None:
        """Raise ``ConfigCollision`` if the tagged ``path`` is stored."""
        if path in self._data:
            raise ConfigCollision(path, f"path {path} already in dictionary")

    def create(
        self, path: str, value: str, lease: Optional[Lease] = None
    ) -> None:
        """
        Create a new key.

        :param path: path of the key to create
        :param value: value to store
        :param lease: accepted but not used by this backend
        :raises ConfigCollision: if the key already exists
        """
        _op(path, value, self._check_not_exists, self._put)

    def update(self, path: str, value: str) -> None:
        """
        Overwrite the value of an existing key.

        :param path: path of the key to update
        :param value: new value to store
        :raises ConfigVanished: if the key does not exist
        """
        _op(path, value, self._check_exists, self._put)

    # pylint: disable=too-many-arguments
    def delete(
        self,
        path: str,
        must_exist: bool = True,
        recursive: bool = False,
        prefix: bool = False,
        max_depth: int = 16,
    ) -> None:
        """
        Delete the key at the given path, optionally with keys below it.

        :param path: path of the key (or, if recursive, key prefix) to
            remove
        :param must_exist: raise if the key at ``path`` is not present
        :param recursive: also remove every key whose tagged form starts
            with the tagged ``path`` at any of the ``max_depth`` levels
            starting from the depth of ``path``
        :param prefix: accepted but not used by this backend
        :param max_depth: number of depth levels covered by a recursive
            delete
        :raises ConfigVanished: if ``must_exist`` is set and the key is
            missing
        """
        _check_path(path)
        tag = _tag_depth(path)
        if must_exist:
            self._check_exists(tag)
        if recursive:
            depth = depth_of_path(path)
            # Work out the tagged prefix for every level up front so the
            # keys only need to be swept once.
            tags = tuple(
                _tag_depth(path, depth=lvl)
                for lvl in range(depth, depth + max_depth)
            )
            # Iterate over a snapshot of the keys, as entries are removed
            # while looping.
            for key in list(self._data):
                if key.startswith(tags):
                    del self._data[key]
        elif tag in self._data:
            del self._data[tag]

    def list_keys(self, path: str, recurse: int = 0) -> list[str]:
        """
        Get a list of the keys at the given path.

        In common with the etcd backend, the structure is
        "flat" rather than a real hierarchy, even though it looks like one.

        :param path: prefix of keys to query
        :param recurse: maximum recursion level to query
        :returns: sorted list of (untagged) keys
        """
        depth = depth_of_path(path)
        keys = []
        for lvl in range(depth, depth + recurse + 1):
            tag = _tag_depth(path, depth=lvl)
            keys += [_untag_depth(k) for k in self._data if k.startswith(tag)]
        return sorted(keys)

    def close(self) -> None:
        """
        Close the resource. This does nothing.
        """

    def __repr__(self) -> str:
        """Return the string form of the stored dictionary."""
        return str(self._data)
class MemoryTransaction(DbTransaction):
    """
    Transaction wrapper around the backend implementation.

    Transactions always succeed if they are valid, so there is no need
    to loop; however the iterator is supported for compatibility with
    the etcd backend.

    Every operation is forwarded straight to ``self.backend``; nothing is
    buffered until commit.
    """

    def __iter__(self):
        """
        Iterate over just this object.

        :returns: this object
        """
        yield self

    def commit(self) -> bool:
        """
        Commit the transaction. This does nothing.

        :returns: always ``True``
        """
        return True

    def reset(self, revision: Optional[DbRevision] = None) -> None:
        """
        Reset the transaction. This does nothing.

        :param revision: not used
        """

    def get(self, path: str) -> str:
        """
        Read the value stored at a path.

        :param path: path of the key to read
        :returns: the value reported by the backend (the revision it
            returns alongside is discarded)
        """
        value, _ = self.backend.get(path)
        return value

    def create(
        self, path: str, value: str, lease: Optional[Lease] = None
    ) -> None:
        """
        Create a new key via the backend.

        :param path: path of the key to create
        :param value: value to store
        :param lease: lease to associate with the key; forwarded to the
            backend rather than being dropped here
        """
        self.backend.create(path, value, lease=lease)

    def update(self, path: str, value: str) -> None:
        """
        Update an existing key via the backend.

        :param path: path of the key to update
        :param value: new value to store
        """
        self.backend.update(path, value)

    def delete(
        self,
        path: str,
        must_exist: bool = True,
        recursive: bool = False,
    ) -> None:
        """
        Delete a key via the backend.

        :param path: path of the key to remove
        :param must_exist: passed through to the backend
        :param recursive: passed through to the backend
        """
        self.backend.delete(path, must_exist=must_exist, recursive=recursive)

    def list_keys(self, path: str, recurse: int = 0) -> list[str]:
        """
        List keys via the backend.

        :param path: prefix of keys to query
        :param recurse: maximum recursion level to query
        :returns: list of keys as returned by the backend
        """
        return self.backend.list_keys(path, recurse=recurse)

    def loop(self, *_args, **_kwargs) -> None:
        """
        Loop the transaction. This does nothing.
        """
class MemoryWatcher(Watcher):
    """
    In-memory stand-in for Etcd3Watcher, wrapping the memory backend.
    """

    def __iter__(self) -> Iterable["MemoryWatcher"]:
        """
        Produce this watcher exactly once.

        :returns: this object
        """
        yield self

    def txn(self) -> Iterable[MemoryTransaction]:
        """
        Yield the wrapped MemoryTransaction object.

        The commit check performed by Etcd3Watcher.txn() is not
        implemented here, so this behaves like MemoryBackend.txn().
        """
        # NOTE(review): self.backend and self.get_txn are presumably
        # provided by the Watcher base class - confirm there.
        transaction = MemoryTransaction(self.backend)
        for inner in transaction:
            yield self.get_txn(inner)