Backends
Common
Common functionality for implementing backends.
- exception ska_sdp_config.backend.common.ConfigCollision(path: str, message: str)[source]
Exception generated if key to create already exists.
Etcd3 backend
Etcd3 backend for SKA SDP configuration DB.
- class ska_sdp_config.backend.etcd3.Etcd3Backend(host='127.0.0.1', port='2379', max_retries: int = 15, retry_time: float = 0.1, **kw_args)[source]
Highly consistent database backend store.
See https://github.com/kragniz/python-etcd3
- create(path: str, value: str, lease: Lease | None = None) None [source]
Create a key and initialise it with the value.
- Parameters:
path – Path to create
value – Value to set
lease – Lease to associate
- Raises:
ConfigCollision if the key already exists
- delete(path: str, must_exist: bool = True, recursive: bool = False, prefix: bool = False, max_depth: int = 16)[source]
Delete the given key or key range.
- Parameters:
path – path (prefix) of keys to remove
must_exist – Fail if path does not exist?
recursive – Delete children keys at lower levels recursively
max_depth – Recursion limit
prefix – Delete all keys at given level with prefix
- get(path: str, revision: DbRevision | None = None) tuple[str, DbRevision] [source]
Get value of a key.
- Parameters:
path – Path of key to query
revision – Database revision to query (optional)
- Returns:
value and revision
- lease(ttl: float = 10) Lease [source]
Generate a new lease.
Once entered, it can be associated with keys which will be kept alive until the end of the lease.
Note that this involves starting a daemon thread that will refresh the lease periodically (default seems to be TTL/4).
- Parameters:
ttl – Time to live for lease
- Returns:
lease object
- list_keys(path: str, recurse: int | Iterable[int] = 0, revision: DbRevision | None = None, with_values: bool = False) tuple[list[str], DbRevision] [source]
List keys under given path.
- Parameters:
path – Prefix of keys to query. Append ‘/’ to list child paths.
recurse – Maximum recursion level to query. If iterable, cover exactly the recursion levels specified.
revision – Database revision for which to list
with_values – Also return key values and mod revisions (i.e. sorted list of key-value-rev tuples)
- Returns:
(sorted key list, DbRevision object)
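The effect of the trailing ‘/’ and of the recurse argument is easiest to see on a small flat key set. The following is a minimal sketch, not the backend's implementation: the stand-alone list_keys function and the sample keys are hypothetical, and it assumes that the depth of a key is its number of ‘/’ separators, with level 0 meaning "same depth as path".

```python
from typing import Iterable, List, Union


def list_keys(keys: Iterable[str], path: str,
              recurse: Union[int, Iterable[int]] = 0) -> List[str]:
    """Return sorted keys that start with ``path`` at the selected depths.

    Assumption: a key's depth is its number of "/" separators, and
    level 0 means "same depth as ``path``".
    """
    if isinstance(recurse, int):
        # An integer selects every level from 0 up to and including recurse
        levels = set(range(recurse + 1))
    else:
        # An iterable selects exactly the listed levels
        levels = set(recurse)
    base_depth = path.count("/")
    return sorted(
        key for key in keys
        if key.startswith(path) and key.count("/") - base_depth in levels
    )


# Hypothetical flat key space that merely looks hierarchical
keys = ["/pb", "/pb/a", "/pb/a/state", "/pb/b", "/pbx"]

same_level = list_keys(keys, "/pb")               # prefix match at the same depth
children = list_keys(keys, "/pb/")                # trailing "/" lists child paths
two_levels = list_keys(keys, "/pb/", recurse=1)   # children and grandchildren
only_deeper = list_keys(keys, "/pb/", recurse=[1])  # grandchildren only
```

Under these assumptions, querying "/pb" also matches the sibling "/pbx", which is why appending ‘/’ matters when only child paths are wanted.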
- property server_version: Version
Returns the version of the server
- txn(max_retries: int = 64) Iterable[Etcd3Transaction] [source]
Create a new transaction.
Note that this uses an optimistic STM-style implementation, which cannot guarantee that a transaction runs through successfully. Therefore, this function returns an iterator, which loops until the transaction succeeds:
    for txn in etcd3.txn():
        # ... transaction steps ...
Note that this will in most cases only execute one iteration. If you actually want to loop - for instance because you intend to wait for something to happen in the configuration - use watcher() instead.
- Parameters:
max_retries – Maximum number of transaction loops
- Returns:
Transaction iterator
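Why an optimistic transaction is exposed as an iterator can be illustrated with a toy model. This is a minimal sketch and not the Etcd3Backend implementation: ToyStore, ToyTransaction and the stand-alone txn generator are hypothetical stand-ins, and conflict detection is simplified to a single global revision counter (a real store would more likely check only the keys that were read).

```python
class ToyStore:
    """Hypothetical versioned key-value store (stand-in, not the etcd client)."""

    def __init__(self):
        self.data = {}
        self.revision = 0


class ToyTransaction:
    """Buffers writes and remembers the revision it started from."""

    def __init__(self, store):
        self.store = store
        self.start_revision = store.revision
        self.writes = {}

    def get(self, path):
        # Reads see the transaction's own pending writes first
        return self.writes.get(path, self.store.data.get(path))

    def update(self, path, value):
        self.writes[path] = value

    def commit(self):
        # Optimistic check: fail if anything was committed since we started
        if self.store.revision != self.start_revision:
            return False
        if self.writes:
            self.store.data.update(self.writes)
            self.store.revision += 1
        return True


def txn(store, max_retries=64):
    """Yield fresh transactions until one commits or retries run out."""
    for _ in range(max_retries):
        transaction = ToyTransaction(store)
        yield transaction  # the caller's loop body runs here
        if transaction.commit():
            return  # success ends the caller's for loop
    raise RuntimeError(f"Transaction failed after {max_retries} retries")


store = ToyStore()
store.data["/counter"] = "0"
attempts = 0
for t in txn(store):
    attempts += 1
    value = int(t.get("/counter"))
    if attempts == 1:
        # Simulate a concurrent writer sneaking in during the first attempt
        store.data["/counter"] = "10"
        store.revision += 1
    t.update("/counter", str(value + 1))
```

In this sketch the first attempt is discarded because of the simulated concurrent write, and the loop body runs a second time against the fresh state. Without interference the body runs exactly once.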
- update(path: str, value: str) None [source]
Update an existing key. Fails if the key does not exist.
- Parameters:
path – Path to update
value – New value of key
- Raises:
ConfigVanished if the key does not exist
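The complementary failure modes of create() and update() can be combined into a "create or update" helper. The sketch below is self-contained and does not import the package: ToyBackend and create_or_update are hypothetical, and the two exception classes are local stand-ins (the (path, message) signature of ConfigVanished is an assumption modelled on ConfigCollision).

```python
class ConfigCollision(RuntimeError):
    """Local stand-in: raised when the key to create already exists."""

    def __init__(self, path: str, message: str):
        super().__init__(message)
        self.path = path


class ConfigVanished(RuntimeError):
    """Local stand-in: raised when the key to update does not exist."""

    def __init__(self, path: str, message: str):
        super().__init__(message)
        self.path = path


class ToyBackend:
    """Hypothetical dict-backed store with create/update semantics."""

    def __init__(self):
        self._data = {}

    def create(self, path: str, value: str) -> None:
        if path in self._data:
            raise ConfigCollision(path, f"Cannot create {path}: already exists")
        self._data[path] = value

    def update(self, path: str, value: str) -> None:
        if path not in self._data:
            raise ConfigVanished(path, f"Cannot update {path}: does not exist")
        self._data[path] = value

    def get(self, path: str):
        return self._data.get(path)


def create_or_update(backend, path: str, value: str) -> None:
    """Try to create the key; fall back to update if it already exists."""
    try:
        backend.create(path, value)
    except ConfigCollision:
        backend.update(path, value)


backend = ToyBackend()
create_or_update(backend, "/lmc/controller", "first")   # key is created
create_or_update(backend, "/lmc/controller", "second")  # key is updated

try:
    backend.update("/missing", "value")
    vanished = False
except ConfigVanished:
    vanished = True
```

With several clients writing concurrently, a helper like this is only safe when the create and the fallback update run inside one transaction. The sketch ignores that concern.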
- watcher(timeout=None, txn_wrapper: Callable[['Etcd3Transaction'], object] = None, requery_progress: float = 0.2) Iterable[Etcd3Watcher] [source]
Create a new watcher.
Useful for waiting for changes in the configuration. See etcd3_watcher.Etcd3Watcher.
- Parameters:
timeout – Timeout for waiting. Watcher will loop after this time.
txn_wrapper – Function to wrap transactions returned by the watcher.
requery_progress – How often we “refresh” the current database state for watcher transactions even without watcher notification (upper bound on how “stale” non-watched values retrieved in transactions can be)
- Returns:
Watcher iterator
- class ska_sdp_config.backend.etcd3.Etcd3Transaction(backend: Etcd3Backend, client: client, max_retries: int = 64)[source]
A series of queries and updates to be executed atomically.
- commit() bool [source]
Commit the transaction to the database.
This can fail, in which case the transaction must get reset and built again.
- Returns:
Whether the commit succeeded
- create(path: str, value: str, lease: Lease | None = None) None [source]
Create a key and initialise it with the value.
Fails if the key already exists. If a lease is given, the key will automatically get deleted once it expires.
- Parameters:
path – Path to create
value – Value to set
lease – Lease to associate
- Raises:
ConfigCollision if the key already exists
- delete(path: str, must_exist: bool = True, recursive: bool = False, max_depth: int = 16, prefix: bool = False) None [source]
Delete the given key.
- Parameters:
path – Path of key to remove
must_exist – Fail if path does not exist?
recursive – Delete children keys at lower levels recursively (not used yet)
- get(path: str) str | None [source]
Get value of a key.
- Parameters:
path – Path of key to query
- Returns:
Key value. None if it doesn’t exist.
- list_keys(path: str, recurse: int | Iterable[int] = 0) list[str] [source]
List keys under given path.
- Parameters:
path – Prefix of keys to query. Append ‘/’ to list child paths.
recurse – Children depths to include in search
- Returns:
sorted key list
- on_commit(callback: Callable[[], None]) None [source]
Register a callback to call when the transaction succeeds.
Exists mostly to enable test cases.
- Parameters:
callback – Callback to call
- reset(revision: DbRevision | None = None) None [source]
Reset the transaction, so it can be restarted after commit().
- Parameters:
revision – Database revision to reset the transaction to (optional)
- Raises:
RuntimeError if the transaction is not committed.
- property revision: int
The last-committed database revision.
Only valid to call after the transaction has been committed.
- Returns:
revision from DbRevision
Etcd3 watcher
Etcd3 backend for SKA SDP configuration DB, implementation of caching watcher.
- class ska_sdp_config.backend.etcd3_watcher.Etcd3TransactionWatcher(watcher: Etcd3Watcher, backend: Etcd3Backend, client: Etcd3Client, revision: DbRevision, max_retries: int = 64)[source]
A series of queries and updates to be executed atomically.
This transaction offers the same interface as Etcd3Transaction, but utilises the watcher’s cache to return results more efficiently.
- commit() bool [source]
Commit the transaction to the database.
This can fail, in which case the transaction must get reset and built again.
- Returns:
Whether the commit succeeded
- get(path: str) str [source]
Get value of a key.
- Parameters:
path – Path of key to query
- Returns:
Key value. None if it doesn’t exist.
- class ska_sdp_config.backend.etcd3_watcher.Etcd3Watcher(backend: Etcd3Backend, client: Etcd3Client, timeout: float | None = None, txn_wrapper: Callable[[Etcd3Transaction], object] | None = None, requery_progress: float = 0.2, max_retries: int = 15, retry_time: float = 0.1)[source]
Watch for database changes by using nested transactions.
Use as follows:
    for watcher in config.watcher():
        for txn in watcher.txn():
            # ... do something
        for txn in watcher.txn():
            # ... do something else
At the end of a for loop iteration, the watcher will start watching all values read by transactions started through txn(), and only repeat the execution of the loop body once one of these values has changed.
- trigger()[source]
Manually triggers a loop.
Can be called from a different thread to force a loop, even if the watcher is currently waiting.
- txn(max_retries: int = 64) Etcd3Transaction [source]
Create nested transaction.
The watcher loop will iterate when any value read by transactions created by this method has changed in the database.
Note that these transactions otherwise behave exactly as normal transactions: as long as they are internally consistent, they will be committed. This means there are no consistency guarantees between transactions created from the same watcher, i.e. one transaction might read one value from the database while a later one reads another.
- Parameters:
max_retries – Maximum number of times the transaction will be tried before giving up.
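The rule "repeat the loop body only once a value read by a nested transaction has changed" can be modelled without a database. This is a single-threaded sketch under stated assumptions, not how Etcd3Watcher is implemented: ToyWatcher, RecordingTxn and the watch generator are hypothetical, and blocking on etcd notifications is replaced by replaying a list of pending changes.

```python
class RecordingTxn:
    """Hypothetical transaction that records every key it reads."""

    def __init__(self, watcher):
        self._watcher = watcher

    def get(self, path):
        self._watcher.watched.add(path)
        return self._watcher.store.get(path)


class ToyWatcher:
    """Hypothetical watcher that collects the read set of its transactions."""

    def __init__(self, store):
        self.store = store
        self.watched = set()

    def txn(self):
        # Toy transactions always "commit", so a single pass is enough
        yield RecordingTxn(self)


def watch(store, pending_changes):
    """Yield a watcher, then wait until a key it read has changed."""
    while True:
        watcher = ToyWatcher(store)
        yield watcher  # the caller's loop body runs here
        changed = False
        while not changed:
            if not pending_changes:
                return  # nothing more can happen in this toy model
            # Stand-in for blocking on a watch notification
            path, value = pending_changes.pop(0)
            changed = path in watcher.watched and store.get(path) != value
            store[path] = value


store = {"/state": "idle", "/other": "x"}
changes = [("/other", "y"), ("/state", "ready"), ("/other", "z")]
seen = []
for watcher in watch(store, changes):
    for txn in watcher.txn():
        seen.append(txn.get("/state"))
```

Here the changes to "/other" never wake the loop because no transaction read that key. Only the change to "/state" triggers a second iteration.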
Memory backend
Memory backend for SKA SDP configuration DB.
The main purpose of this is for use in testing. In principle, it should behave in the same way as the etcd backend. No attempt has been made to make it thread-safe, so it probably isn’t.
- class ska_sdp_config.backend.memory.MemoryBackend[source]
In-memory backend implementation, principally for testing.
- create(path: str, value: str, lease: Lease | None = None) None [source]
Create a key and initialise it with the value.
- Parameters:
path – Path to create
value – Value to set
lease – Lease to associate
- Raises:
ConfigCollision if the key already exists
- delete(path: str, must_exist: bool = True, recursive: bool = False, prefix: bool = False, max_depth: int = 16) None [source]
Delete the given key or key range.
- Parameters:
path – path (prefix) of keys to remove
must_exist – Fail if path does not exist?
recursive – Delete children keys at lower levels recursively
max_depth – Recursion limit
prefix – Delete all keys at given level with prefix
- get(path: str, revision: DbRevision | None = None) tuple[str, DbRevision] [source]
Get value of a key.
- Parameters:
path – Path of key to query
revision – Database revision to query (optional)
- Returns:
value and revision
- lease(ttl: float = 10) Lease [source]
Generate a dummy lease object.
- Parameters:
ttl – time to live
- Returns:
dummy lease object
- list_keys(path: str, recurse: int = 0) list[str] [source]
Get a list of the keys at the given path.
In common with the etcd backend, the structure is “flat” rather than a real hierarchy, even though it looks like one.
- Parameters:
path – prefix of keys to query
recurse – maximum recursion level to query
- Returns:
list of keys
- txn(max_retries: int = 64) Iterable[MemoryTransaction] [source]
Create an in-memory “transaction”.
- Parameters:
max_retries – Maximum number of transaction loops
- Returns:
transaction object
- class ska_sdp_config.backend.memory.MemoryTransaction(backend: Backend)[source]
Transaction wrapper around the backend implementation.
Transactions always succeed if they are valid, so there is no need to loop; however the iterator is supported for compatibility with the etcd backend.
- create(path: str, value: str, lease: Lease | None = None) None [source]
Create a key and initialise it with the value.
Fails if the key already exists. If a lease is given, the key will automatically get deleted once it expires.
- Parameters:
path – Path to create
value – Value to set
lease – Lease to associate
- Raises:
ConfigCollision if the key already exists
- delete(path: str, must_exist: bool = True, recursive: bool = False)[source]
Delete the given key.
- Parameters:
path – Path of key to remove
must_exist – Fail if path does not exist?
recursive – Delete children keys at lower levels recursively (not used yet)
- get(path: str) str [source]
Get value of a key.
- Parameters:
path – Path of key to query
- Returns:
Key value or None if it doesn’t exist.
- class ska_sdp_config.backend.memory.MemoryWatcher(backend: Backend, timeout: float | None = None, txn_wrapper: TxnWrapper | None = None)[source]
Watcher wrapper around the backend implementation (Etcd3Watcher).
- txn() Iterable[MemoryTransaction] [source]
Yield the wrapped MemoryTransaction object.
It does not implement the commit check that is part of Etcd3Watcher.txn(), hence it acts as MemoryBackend.txn()