Backends

Common

Common functionality for implementing backends.

exception ska_sdp_config.backend.common.ConfigCollision(path: str, message: str)[source]

Exception generated if key to create already exists.

exception ska_sdp_config.backend.common.ConfigVanished(path: str, message: str)[source]

Exception generated if key to update that does not exist.

ska_sdp_config.backend.common.depth_of_path(path: str) int[source]

Get the depth of a path, this is the number of “/” in it.

Returns:

the depth

Etcd3 backend

Etcd3 backend for SKA SDP configuration DB.

class ska_sdp_config.backend.etcd3.Etcd3Backend(host='127.0.0.1', port='2379', max_retries: int = 15, retry_time: float = 0.1, **kw_args)[source]

Highly consistent database backend store.

See https://github.com/kragniz/python-etcd3

close() None[source]

Close the client connection.

create(path: str, value: str, lease: Lease | None = None) None[source]

Create a key and initialise it with the value.

Parameters:
  • path – Path to create

  • value – Value to set

  • lease – Lease to associate

Raises:

ConfigCollision if the key already exists

delete(path: str, must_exist: bool = True, recursive: bool = False, prefix: bool = False, max_depth: int = 16)[source]

Delete the given key or key range.

Parameters:
  • path – path (prefix) of keys to remove

  • must_exist – Fail if path does not exist?

  • recursive – Delete children keys at lower levels recursively

  • max_depth – Recursion limit

  • prefix – Delete all keys at given level with prefix

get(path: str, revision: DbRevision | None = None) tuple[str, DbRevision][source]

Get value of a key.

Parameters:
  • path – Path of key to query

  • revision – to get

Returns:

value and revision

lease(ttl: float = 10) Lease[source]

Generate a new lease.

Once entered, it can be associated with keys which will be kept alive until the end of the lease.

Note that this involves starting a daemon thread that will refresh the lease periodically (default seems to be TTL/4).

Parameters:

ttl – Time to live for lease

Returns:

lease object

list_keys(path: str, recurse: int | Iterable[int] = 0, revision: DbRevision | None = None, with_values: bool = False) tuple[list[str], DbRevision][source]

List keys under given path.

Parameters:
  • path – Prefix of keys to query. Append ‘/’ to list child paths.

  • recurse – Maximum recursion level to query. If iterable, cover exactly the recursion levels specified.

  • revision – Database revision for which to list

  • with_values – Also return key values and mod revisions (i.e. sorted list of key-value-rev tuples)

Returns:

(sorted key list, DbRevision object)

txn(max_retries: int = 64) Iterable[Etcd3Transaction][source]

Create a new transaction.

Note that this uses an optimistic STM-style implementation, which cannot guarantee that a transaction runs through successfully. Therefore, this function returns an iterator, which loops until the transaction succeeds:

for txn in etcd3.txn():
    # ... transaction steps ...

Note that this will in most cases only execute one iteration. If you actually want to loop - for instance because you intend to wait for something to happen in the configuration - use watcher() instead.

Parameters:

max_retries – Maximum number of transaction loops

Returns:

Transaction iterator

update(path: str, value: str) None[source]

Update an existing key. Fails if the key does not exist.

Parameters:
  • path – Path to update

  • value – New value of key

Raises:

ConfigVanished if the key does not exist

watcher(timeout=None, txn_wrapper: Callable[['Etcd3Transaction'], object] = None, requery_progress: float = 0.2) Iterable[Etcd3Watcher][source]

Create a new watcher.

Useful for waiting for changes in the configuration. See etcd3_watcher.Etcd3Watcher.

Parameters:
  • timeout – Timeout for waiting. Watcher will loop after this time.

  • txn_wrapper – Function to wrap transactions returned by the wrapper.

  • requery_progress – How often we “refresh” the current database state for watcher transactions even without watcher notification (upper bound on how “stale” non-watched values retrieved in transactions can be)

Returns:

Watcher iterator

class ska_sdp_config.backend.etcd3.Etcd3Transaction(backend: Etcd3Backend, client: client, max_retries: int = 64)[source]

A series of queries and updates to be executed atomically.

commit() bool[source]

Commit the transaction to the database.

This can fail, in which case the transaction must get reset and built again.

Returns:

Whether the commit succeeded

create(path: str, value: str, lease: Lease | None = None) None[source]

Create a key and initialise it with the value.

Fails if the key already exists. If a lease is given, the key will automatically get deleted once it expires.

Parameters:
  • path – Path to create

  • value – Value to set

  • lease – Lease to associate

Raises:

ConfigCollision if the key already exists

delete(path: str, must_exist: bool = True, recursive: bool = False, max_depth: int = 16, prefix: bool = False) None[source]

Delete the given key.

Parameters:
  • path – Path of key to remove

  • must_exist – Fail if path does not exist?

  • recursive – Delete children keys at lower levels recursively (not used yet)

get(path: str) str | None[source]

Get value of a key.

Parameters:

path – Path of key to query

Returns:

Key value. None if it doesn’t exist.

list_keys(path: str, recurse: int | Iterable[int] = 0) list[str][source]

List keys under given path.

Parameters:
  • path – Prefix of keys to query. Append ‘/’ to list child paths.

  • recurse – Children depths to include in search

Returns:

sorted key list

on_commit(callback: Callable[[], None]) None[source]

Register a callback to call when the transaction succeeds.

Exists mostly to enable test cases.

Parameters:

callback – Callback to call

reset(revision: DbRevision | None = None) None[source]

Reset the transaction, so it can be restarted after commit().

Parameters:

revision – to reset

Raises:

RuntimeError if the transaction is not committed.

property revision: int

The last-committed database revision.

Only valid to call after the transaction has been committed.

Returns:

revision from DbRevision

update(path: str, value: str) None[source]

Update an existing key.

Fails if the key does not exist.

Parameters:
  • path – Path to update

  • value – Value to set

Raises:

ConfigVanished if the key is not found

Etcd3 backend revolution 1

Etcd3 backend for SKA SDP configuration DB, using client from https://github.com/Revolution1/etcd3-py

class ska_sdp_config.backend.etcd3_revolution1.Etcd3BackendRevolution1(*args, max_retries: int = 15, retry_time: float = 0.1, **kw_args)[source]

Highly consistent database backend store.

See https://github.com/etcd-io/etcd

All parameters will be passed on to etcd3.Client().

close() None[source]

Close the client connection.

create(path: str, value: str, lease: Lease | None = None) None[source]

Create a key and initialise it with the value.

Fails if the key already exists. If a lease is given, the key will automatically get deleted once it expires.

Parameters:
  • path – Path to create

  • value – Value to set

  • lease – Lease to associate

Raises:

ConfigCollision

delete(path: str, must_exist: bool = True, recursive: bool = False, prefix: bool = False, max_depth: int = 16) None[source]

Delete the given key or key range.

Parameters:
  • path – Path (prefix) of keys to remove

  • must_exist – Fail if path does not exist?

  • recursive – Delete children keys at lower levels recursively

  • prefix – Delete all keys at given level with prefix

  • max_depth – Recursion limit

Returns:

Whether transaction was successful

get(path: str, revision: DbRevision | None = None) tuple[str, DbRevision][source]

Get value of a key.

Parameters:
  • path – Path of key to query

  • revision – Database revision for which to read key

Returns:

(value, revision). value is None if it doesn’t exist

lease(ttl: int = 10) Lease[source]

Generate a new lease.

Once entered can be associated with keys, which will be kept alive until the end of the lease. Note that this involves starting a daemon thread that will refresh the lease periodically (default seems to be TTL/4).

Parameters:

ttl – Time to live for lease

Returns:

lease object

list_keys(path: str, recurse: int = 0, revision: DbRevision | None = None) tuple[list[str], DbRevision][source]

List keys under given path.

Parameters:
  • path – Prefix of keys to query. Append ‘/’ to list child paths.

  • recurse – Maximum recursion level to query. If iterable, cover exactly the recursion levels specified.

  • revision – Database revision for which to list

Returns:

(sorted key list, revision)

txn(max_retries: int = 64) Iterable[Etcd3Transaction][source]

Create a new transaction.

Note that this uses an optimistic STM-style implementation, which cannot guarantee that a transaction runs through successfully. Therefore, this function returns an iterator, which loops until the transaction succeeds:

for txn in etcd3.txn():
    # ... transaction steps ...

Note that this will in most cases only execute one iteration. If you actually want to loop - for instance because you intend to wait for something to happen in the configuration - use watcher() instead.

Parameters:

max_retries – Maximum number of transaction loops

Returns:

Transaction iterator

update(path: str, value: str, must_be_rev: DbRevision | None = None) None[source]

Update an existing key. Fails if the key does not exist.

Parameters:
  • path – Path to update

  • value – Value to set

  • must_be_rev – Fail if found value does not match given revision (atomic update)

Raises:

ConfigVanished

watch(path: str, prefix: bool = False, revision: DbRevision | None = None, depth: int | None = None)[source]

Watch key or key range.

Use a path ending with ‘/’ in combination with prefix to watch all child keys.

Parameters:
  • path – Path of key to query, or prefix of keys.

  • prefix – Watch for keys with given prefix if set

  • revision – Database revision from which to watch

  • depth – tag depth

Returns:

Etcd3Watch object for watch request

watcher(timeout: float | None = None, txn_wrapper: TxnWrapper | None = None) Iterable[Etcd3Watcher][source]

Create a new watcher.

Useful for waiting for changes in the configuration. See Etcd3Watcher.

Parameters:
  • timeout – Timeout for waiting. Watcher will loop after this time.

  • txn_wrapper – Function to wrap transactions returned by the wrapper.

Returns:

Watcher iterator

class ska_sdp_config.backend.etcd3_revolution1.Etcd3Transaction(backend: Etcd3BackendRevolution1, client: Client, max_retries: int = 64)[source]

A series of queries and updates to be executed atomically.

Use Etcd3Backend.txn() or Etcd3Watcher.txn() to construct transactions.

clear_watch() None[source]

Stop all currently active watchers.

Deprecated: Use Etcd3Watcher instead.

commit() bool[source]

Commit the transaction to the database.

This can fail, in which case the transaction must get reset and built again.

Returns:

Whether the commit succeeded

create(path: str, value: str, lease: Lease | None = None) None[source]

Create a key and initialise it with the value.

Fails if the key already exists. If a lease is given, the key will automatically get deleted once it expires.

Parameters:
  • path – Path to create

  • value – Value to set

  • lease – Lease to associate

Raises:

ConfigCollision

delete(path: str, must_exist: bool = True, recursive: bool = False) None[source]

Delete the given key.

Parameters:
  • path – Path of key to remove

  • must_exist – Fail if path does not exist?

  • recursive – Delete children keys at lower levels recursively (not used)

get(path: str) str[source]

Get value of a key.

Parameters:

path – Path of key to query

Returns:

Key value. None if it doesn’t exist.

list_keys(path: str, recurse: int = 0) list[str][source]

List keys under given path.

Parameters:
  • path – Prefix of keys to query. Append ‘/’ to list child paths.

  • recurse – Children depths to include in search

Returns:

sorted key list

loop(watch: bool = False, watch_timeout: float | None = None)[source]

Repeat transaction execution, even if it succeeds.

Deprecated: Use Etcd3Watcher instead, or loop manually.

Parameters:
  • watch – Once the transaction succeeds, block until one of the values read changes, then loop the transaction

  • watch_timeout – timeout value

on_commit(callback: Callable[[], None]) None[source]

Register a callback to call when the transaction succeeds.

A bit of a hack, but occassionally useful to add additional side-effects to a transaction that are guaranteed to not get duplicated.

Parameters:

callback – Callback to call

reset(revision: DbRevision | None = None) None[source]

Reset the transaction, so it can be restarted after commit().

property revision: int

The last-committed database revision.

Only valid to call after the transaction has been comitted.

trigger_loop() None[source]

Manually triggers a loop

Effectively makes loop(True) behave like loop(False), looping immediately. This is useful for interrupting a blocking watch() from a different thread.

update(path: str, value: str) None[source]

Update an existing key. Fails if the key does not exist.

Parameters:
  • path – Path to update

  • value – Value to set

Raises:

ConfigVanished

watch() None[source]

Wait for a change on one of the values read.

Deprecated: Use Etcd3Watcher instead.

Returns:

The revision at which a change was detected.

class ska_sdp_config.backend.etcd3_revolution1.Etcd3Watch(backend: Etcd3BackendRevolution1, tagged_path: str, start_revision: int, prefix: bool, max_retries: int = 20, retry_time: float = 0.1)[source]

Wrapper for etc3 watch requests.

Entering the watcher using a with block yields a queue of (key, val, rev) triples.

start(queue: Queue | None = None) None[source]

Activates the watcher, yielding a queue for updates.

stop()[source]

Deactivates the watcher.

class ska_sdp_config.backend.etcd3_revolution1.Etcd3Watcher(backend: Etcd3BackendRevolution1, client: Client, timeout: float | None = None, txn_wrapper: TxnWrapper | None = None)[source]

Watch for database changes by using nested transactions

Use as follows:

for watcher in config.watcher():
    for txn in watcher.txn():
        # ... do something
    for txn in watcher.txn():
        # ... do something else

At the end of a for loop iteration, the watcher will start watching all values read by transactions started through txn(), and only repeat the execution of the loop body once one of these values has changed.

trigger() None[source]

Manually triggers a loop

Can be called from a different thread to force a loop, even if the watcher is currently waiting.

txn(max_retries: int = 64) Iterable[Etcd3Transaction | TxnWrapper][source]

Create nested transaction.

The watcher loop will iterate when any value read by transactions created by this method have changed in the database.

Note that these transactions otherwise behave exactly as normal transactions: As long as they are internally consistent, they will be commited. This means there is no consistency guarantees between transactions created from the same watcher, i.e. one transaction might read one value from the database while a later one reads another.

Parameters:

max_retries – Maximum number of times the transaction will be tried before giving up.

Etcd3 watcher

Etcd3 backend for SKA SDP configuration DB, implementating of caching watcher.

class ska_sdp_config.backend.etcd3_watcher.Etcd3TransactionWatcher(watcher: Etcd3Watcher, backend: Etcd3Backend, client: Etcd3Client, revision: DbRevision, max_retries: int = 64)[source]

A series of queries and updates to be executed atomically.

This transaction offers the same interface as Etcd3Transaction, but utilises the watcher’s cache to return results more efficiently.

commit() bool[source]

Commit the transaction to the database.

This can fail, in which case the transaction must get reset and built again.

Returns:

Whether the commit succeeded

get(path: str) str[source]

Get value of a key.

Parameters:

path – Path of key to query

Returns:

Key value. None if it doesn’t exist.

list_keys(path: str, recurse: int = 0)[source]

List keys under given path.

Parameters:
  • path – Prefix of keys to query. Append ‘/’ to list child paths.

  • recurse – Children depths to include in search

Returns:

sorted key list

reset(revision: DbRevision | None = None) None[source]

Reset the transaction, so it can be restarted after commit()

class ska_sdp_config.backend.etcd3_watcher.Etcd3Watcher(backend: Etcd3Backend, client: Etcd3Client, timeout: float | None = None, txn_wrapper: Callable[[Etcd3Transaction], object] | None = None, requery_progress: float = 0.2, max_retries: int = 15, retry_time: float = 0.1)[source]

Watch for database changes by using nested transactions

Use as follows:

for watcher in config.watcher():
    for txn in watcher.txn():
        # ... do something
    for txn in watcher.txn():
        # ... do something else

At the end of a for loop iteration, the watcher will start watching all values read by transactions started through txn(), and only repeat the execution of the loop body once one of these values has changed.

trigger()[source]

Manually triggers a loop

Can be called from a different thread to force a loop, even if the watcher is currently waiting.

txn(max_retries: int = 64) Etcd3Transaction[source]

Create nested transaction.

The watcher loop will iterate when any value read by transactions created by this method have changed in the database.

Note that these transactions otherwise behave exactly as normal transactions: As long as they are internally consistent, they will be commited. This means there is no consistency guarantees between transactions created from the same watcher, i.e. one transaction might read one value from the database while a later one reads another.

Parameters:

max_retries – Maximum number of times the transaction will be tried before giving up.

Memory backend

Memory backend for SKA SDP configuration DB.

The main purpose of this is for use in testing. In principle, it should behave in the same way as the etcd backend. No attempt has been made to make it thread-safe, so it probably isn’t.

class ska_sdp_config.backend.memory.MemoryBackend[source]

In-memory backend implementation, principally for testing.

close() None[source]

Close the resource. This does nothing.

create(path: str, value: str, lease: Lease | None = None) None[source]

Create a key and initialise it with the value.

Parameters:
  • path – Path to create

  • value – Value to set

  • lease – Lease to associate

Raises:

ConfigCollision if the key already exists

delete(path: str, must_exist: bool = True, recursive: bool = False, prefix: bool = False, max_depth: int = 16) None[source]

Delete the given key or key range.

Parameters:
  • path – path (prefix) of keys to remove

  • must_exist – Fail if path does not exist?

  • recursive – Delete children keys at lower levels recursively

  • max_depth – Recursion limit

  • prefix – Delete all keys at given level with prefix

get(path: str, revision: DbRevision | None = None) tuple[str, DbRevision][source]

Get value of a key.

Parameters:
  • path – Path of key to query

  • revision – to get

Returns:

value and revision

lease(ttl: float = 10) Lease[source]

Generate a dummy lease object.

Parameters:

ttl – time to live

Returns:

dummy lease object

list_keys(path: str, recurse: int = 0) list[str][source]

Get a list of the keys at the given path.

In common with the etcd backend, the structure is “flat” rather than a real hierarchy, even though it looks like one.

Parameters:
  • path – prefix of keys to query

  • recurse – maximum recursion level to query

Returns:

list of keys

txn(max_retries: int = 64) Iterable[MemoryTransaction][source]

Create an in-memory “transaction”.

Parameters:

max_retries – Maximum number of transaction loops

Returns:

transaction object

update(path: str, value: str) None[source]

Update an existing key. Fails if the key does not exist.

Parameters:
  • path – Path to update

  • value – New value of key

Raises:

ConfigVanished if the key does not exist

watcher(timeout: float | None = None, txn_wrapper: TxnWrapper | None = None) Watcher[source]

Create an in-memory “watcher”.

Parameters:
  • timeout – timeout in seconds

  • txn_wrapper – wrapper (factory) to return transaction

Returns:

MemoryWatcher object (mock of Etcd3Watcher)

class ska_sdp_config.backend.memory.MemoryTransaction(backend: Backend)[source]

Transaction wrapper around the backend implementation.

Transactions always succeed if they are valid, so there is no need to loop; however the iterator is supported for compatibility with the etcd backend.

commit() bool[source]

Commit the transaction. This does nothing.

create(path: str, value: str, lease: Lease | None = None) None[source]

Create a key and initialise it with the value.

Fails if the key already exists. If a lease is given, the key will automatically get deleted once it expires.

Parameters:
  • path – Path to create

  • value – Value to set

  • lease – Lease to associate

Raises:

ConfigCollision if the key already exists

delete(path: str, must_exist: bool = True, recursive: bool = False)[source]

Delete the given key.

Parameters:
  • path – Path of key to remove

  • must_exist – Fail if path does not exist?

  • recursive – Delete children keys at lower levels recursively (not used yet)

get(path: str) str[source]

Get value of a key.

Parameters:

path – Path of key to query

Returns:

Key value or None if it doesn’t exist.

list_keys(path: str, recurse: int = 0) list[str][source]

List keys under given path.

Parameters:
  • path – Prefix of keys to query. Append ‘/’ to list child paths.

  • recurse – Children depths to include in search

Returns:

sorted key list

loop(*_args, **_kwargs) None[source]

Loop the transaction. This does nothing.

reset(revision: DbRevision | None = None) None[source]

Reset the transaction. This does nothing.

update(path: str, value: str) None[source]

Update an existing key.

Fails if the key does not exist.

Parameters:
  • path – Path to update

  • value – Value to set

Raises:

ConfigVanished if the key is not found

class ska_sdp_config.backend.memory.MemoryWatcher(backend: Backend, timeout: float | None = None, txn_wrapper: TxnWrapper | None = None)[source]

Watcher wrapper around the backend implementation (Etcd3Watcher).

txn() Iterable[MemoryTransaction][source]

Yield the wrapped MemoryTransaction object.

It does not implement the commit check that is part of Etcd3Watcher.txn(), hence it acts as MemoryBackend.txn()