Configuration API

High-Level API

High-level API for SKA SDP configuration.

class ska_sdp_config.config.Config(backend=None, global_prefix: str = '', owner: dict | None = None, component_name: str | None = None, wrapper: TxnWrapper | None = None, **cargs)[source]

Connection to SKA SDP configuration.

property alive_key: str | None

Get the alive key.

Returns:

alive key or None if not set

property backend: Backend

Get the backend database object.

property client_lease: Lease

Return the lease associated with the client.

It will be kept alive until the client gets closed.

close() None[source]

Close the client connection.

is_alive() bool[source]

Check whether the connection is alive, i.e. whether the keep-alive key exists.

Returns:

True if the keep-alive key exists, False otherwise

lease(ttl=10) Lease[source]

Generate a new lease.

Once entered, the lease can be associated with keys, which will be kept alive until the end of the lease. Entering the lease also starts a daemon thread that refreshes it periodically (the default interval seems to be TTL/4).

Parameters:

ttl – Time to live for lease

Returns:

lease object
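The keep-alive mechanism can be pictured with a small standalone sketch. This is not the library's implementation: `ToyLease` and `refresh_count` are hypothetical names, and the refresh interval of TTL/4 is only the value the description above suggests.

```python
import threading
import time


class ToyLease:
    """Hypothetical lease that is refreshed by a daemon thread while entered."""

    def __init__(self, ttl: float = 10) -> None:
        self.ttl = ttl
        self.refresh_count = 0
        self._stop = threading.Event()
        self._thread = None

    def _keep_alive(self) -> None:
        # Event.wait returns False on timeout and True once stop was requested
        while not self._stop.wait(self.ttl / 4):
            # A real backend would contact the database here
            self.refresh_count += 1

    def __enter__(self) -> "ToyLease":
        self._stop.clear()
        self._thread = threading.Thread(target=self._keep_alive, daemon=True)
        self._thread.start()
        return self

    def __exit__(self, *exc_info) -> None:
        # Leaving the block ends the lease: stop refreshing and wait for the thread
        self._stop.set()
        self._thread.join()


with ToyLease(ttl=0.2) as lease:
    time.sleep(0.3)  # long enough for several refreshes at ttl / 4 = 0.05 s
refreshes = lease.refresh_count
```

Because the thread is a daemon, a lease that is never left cleanly does not keep the interpreter alive at shutdown.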

set_alive() None[source]

Set the keep-alive key.

txn(max_retries: int = 64) Iterable[Transaction | TxnWrapper][source]

Create a Transaction for atomic configuration query/change.

As we do not use locks, transactions might have to be repeated in order to guarantee atomicity. Suggested usage is as follows:

for txn in config.txn():
    # Use txn to read+write configuration
    # [Possibly call txn.loop()]

As the for loop suggests, the code might get run multiple times even if not forced by calling Transaction.loop(). Any writes using the transaction will be discarded if the transaction fails, but the application must make sure that the loop body has no other observable side effects.

See also Usage Guide for best practices for using transactions.

Parameters:

max_retries – Number of transaction retries before a RuntimeError gets raised.
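To see why the loop body can run more than once, here is a minimal standalone sketch of an optimistic retry loop. It is not the library's code: `ToyStore`, `ToyTxn` and `toy_txn` are hypothetical names, and detecting conflicts by comparing a single store-wide revision is a simplifying assumption.

```python
from typing import Dict, Iterator, Optional


class ToyStore:
    """Hypothetical in-memory store with a single global revision."""

    def __init__(self) -> None:
        self.data: Dict[str, str] = {}
        self.revision = 0


class ToyTxn:
    """Buffers writes and commits only if the store has not changed."""

    def __init__(self, store: ToyStore) -> None:
        self._store = store
        self._start_rev = store.revision
        self._writes: Dict[str, str] = {}

    def get(self, key: str) -> Optional[str]:
        # Reads see the transaction's own pending writes first
        if key in self._writes:
            return self._writes[key]
        return self._store.data.get(key)

    def put(self, key: str, value: str) -> None:
        self._writes[key] = value

    def commit(self) -> bool:
        # Conflict: somebody else committed since this attempt started
        if self._store.revision != self._start_rev:
            return False
        if self._writes:
            self._store.data.update(self._writes)
            self._store.revision += 1
        return True


def toy_txn(store: ToyStore, max_retries: int = 64) -> Iterator[ToyTxn]:
    """Yield fresh transactions until one of them commits."""
    for _ in range(max_retries):
        txn = ToyTxn(store)
        yield txn
        if txn.commit():
            return
    raise RuntimeError(f"Transaction failed after {max_retries} attempts")


store = ToyStore()
attempts = 0
for txn in toy_txn(store):
    attempts += 1
    if attempts == 1:
        # Simulate a concurrent writer so the first attempt is discarded
        store.data["counter"] = "10"
        store.revision += 1
    current = int(txn.get("counter") or 0)
    txn.put("counter", str(current + 1))
```

In this sketch the body runs twice: the writes of the first attempt are dropped, while the `attempts` counter, a side effect outside the transaction, is not rolled back. That is exactly the kind of side effect the loop body should avoid.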

watcher(timeout: float | None = None) Iterable[Watcher][source]

Create a new watcher.

Useful for waiting for changes in the configuration. Calling Etcd3Watcher.txn() on the returned watchers will create Transaction objects just like txn().

See also Usage Guide for best practices for using watchers.

Parameters:

timeout – Timeout for waiting. Watcher will loop after this time.

class ska_sdp_config.config.Transaction(config: Any, txn: DbTransaction, paths: dict[str, str])[source]

High-level configuration queries and updates to execute atomically.

create_controller(state: dict) None[source]

Create controller state.

Parameters:

state – controller state

create_deployment(dpl: Deployment) None[source]

Request a change to cluster configuration.

Parameters:

dpl – Deployment to add to database

create_deployment_state(deploy_id: str, state: dict) None[source]

Create Deployment state.

Parameters:
  • deploy_id – Deployment ID

  • state – Deployment state to create

create_execution_block(eb_id: str, state: dict) None[source]

Create execution block.

Parameters:
  • eb_id – execution block ID

  • state – execution block state

create_is_alive(key: str, lease: Lease) str[source]

Create an “is_alive” entry.

Parameters:
  • key – “is alive” key in database e.g. “lmc-controller/owner”

  • lease – to associate with the entry

Returns:

the full path of the entry

create_processing_block(pblock: ProcessingBlock) None[source]

Add a new ProcessingBlock to the configuration.

Parameters:

pblock – Processing block to create

create_processing_block_state(pb_id: str, state: dict) None[source]

Create processing block state.

Parameters:
  • pb_id – Processing block ID

  • state – Processing block state to create

create_script(kind: str, name: str, version: str, script: dict) None[source]

Create processing script definition.

Parameters:
  • kind – script kind

  • name – script name

  • version – script version

  • script – script definition

create_subarray(subarray_id: str, state: dict) None[source]

Create subarray state.

Parameters:
  • subarray_id – subarray ID

  • state – subarray state

delete_deployment(dpl: Deployment) None[source]

Undo a change to cluster configuration.

Parameters:

dpl – Deployment to remove

delete_execution_block(eb_id: str, recurse: bool = True) None[source]

Delete an execution block (eb).

Parameters:
  • eb_id – Execution block ID

  • recurse – if True, run recursive query and delete all objects

delete_processing_block(pb_id: str, recurse: bool = True) None[source]

Delete a processing block (pb).

Parameters:
  • pb_id – Processing block ID

  • recurse – if True, run recursive query and delete all objects; this includes deleting /state and /owner of the processing block if they exist

delete_script(kind: str, name: str, version: str) None[source]

Delete processing script definition.

Parameters:
  • kind – script kind

  • name – script name

  • version – script version

get_controller() dict | None[source]

Get controller state.

Returns:

controller state

get_deployment(deploy_id: str) Deployment | None[source]

Retrieve details about a cluster configuration change.

Parameters:

deploy_id – Name of the deployment

Returns:

Deployment details

get_deployment_state(deploy_id: str) dict | None[source]

Get the current Deployment state.

Parameters:

deploy_id – Deployment ID

Returns:

Deployment state, or None if not present

get_execution_block(eb_id: str) dict[source]

Get execution block.

Parameters:

eb_id – execution block ID

Returns:

execution block state

get_processing_block(pb_id: str) ProcessingBlock | None[source]

Look up processing block data.

Parameters:

pb_id – Processing block ID to look up

Returns:

Processing block entity, or None if it doesn’t exist

get_processing_block_owner(pb_id: str) dict | None[source]

Look up the current processing block owner.

Parameters:

pb_id – Processing block ID to look up

Returns:

Processing block owner data, or None if not claimed

get_processing_block_state(pb_id: str) dict | None[source]

Get the current processing block state.

Parameters:

pb_id – Processing block ID

Returns:

Processing block state, or None if not present

get_script(kind: str, name: str, version: str) dict | None[source]

Get processing script definition.

Parameters:
  • kind – script kind

  • name – script name

  • version – script version

Returns:

script definition

get_subarray(subarray_id: str) dict | None[source]

Get subarray state.

Parameters:

subarray_id – subarray ID

Returns:

subarray state

is_alive(key: str) bool[source]

Check if the “is alive” key still exists.

Parameters:

key – “is alive” key in database e.g. “lmc-controller/owner”

Returns:

True if it does

is_processing_block_owner(pb_id: str) bool[source]

Check whether this client is owner of the processing block.

Parameters:

pb_id – Processing block ID to look up

Returns:

Whether the processing block exists and is claimed by this client

list_deployments(prefix: str = '') list[str][source]

List all current deployments.

Returns:

Deployment IDs

list_execution_blocks(prefix: str = '') list[str][source]

Query execution block IDs from the configuration.

Parameters:

prefix – if given, only search for execution block IDs with the given prefix

Returns:

execution block IDs, in lexicographical order

list_processing_blocks(prefix: str = '') list[str][source]

Query processing block IDs from the configuration.

Parameters:

prefix – If given, only search for processing block IDs with the given prefix

Returns:

Processing block ids, in lexicographical order

list_scripts(kind: str = '', name: str = '') list[tuple[str]][source]

List processing script definitions.

Parameters:
  • kind – script kind. Default empty

  • name – script name. Default empty

Returns:

list of script definitions

list_subarrays(prefix: str = '') list[str][source]

Query subarray IDs from the configuration.

Parameters:

prefix – if given, only search for subarray IDs with the given prefix

Returns:

subarray IDs, in lexicographical order

loop(wait: bool = False, timeout: float | None = None) None[source]

Repeat transaction regardless of whether commit succeeds.

Parameters:
  • wait – If transaction succeeded, wait for any read values to change before repeating it.

  • timeout – Maximum time to wait, in seconds

new_execution_block_id(generator: str) str[source]

Generate a new execution block ID that is not yet in use.

Parameters:

generator – Name of the generator

Returns:

execution block ID

new_processing_block_id(generator: str) str[source]

Generate a new processing block ID that is not yet in use.

Parameters:

generator – Name of the generator

Returns:

Processing block ID
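The idea of picking an ID "that is not yet in use" can be sketched as a search over candidate IDs. Everything here is an illustrative assumption: `toy_new_block_id` is a hypothetical helper, and the ID layout (prefix, generator, date, five-digit counter) is chosen for the example only and should not be read as the format the library produces.

```python
from typing import Iterable


def toy_new_block_id(existing: Iterable[str], generator: str, date: str,
                     prefix: str = "pb") -> str:
    """Return the first candidate ID that is not already taken."""
    taken = set(existing)
    for index in range(100000):
        # Assumed layout: <prefix>-<generator>-<date>-<5-digit counter>
        candidate = f"{prefix}-{generator}-{date}-{index:05d}"
        if candidate not in taken:
            return candidate
    raise RuntimeError("No free ID left for this generator and date")


in_use = ["pb-test-20240101-00000", "pb-test-20240101-00001"]
new_id = toy_new_block_id(in_use, "test", "20240101")
```

Running such a search inside a transaction ensures that the list of existing IDs and the subsequent create are checked together.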

property raw: DbTransaction

Return transaction object for accessing database directly.

take_processing_block(pb_id: str, lease: Lease) None[source]

Take ownership of the processing block.

Parameters:
  • pb_id – Processing block ID to take ownership of

  • lease – lease

Raises:

backend.ConfigCollision

update_controller(state: dict) None[source]

Update controller state.

Parameters:

state – controller state

update_deployment_state(deploy_id: str, state: dict) None[source]

Update Deployment state.

Parameters:
  • deploy_id – Deployment ID

  • state – Deployment state to update

update_execution_block(eb_id: str, state: dict) None[source]

Update execution block.

Parameters:
  • eb_id – execution block ID

  • state – execution block state

update_processing_block(pblock: ProcessingBlock) None[source]

Update a ProcessingBlock in the configuration.

Parameters:

pblock – Processing block to update

update_processing_block_state(pb_id: str, state: dict) None[source]

Update processing block state.

Parameters:
  • pb_id – Processing block ID

  • state – Processing block state to update

update_script(kind: str, name: str, version: str, script: dict) None[source]

Update processing script definition.

Parameters:
  • kind – script kind

  • name – script name

  • version – script version

  • script – script definition

update_subarray(subarray_id: str, state: dict) None[source]

Update subarray state.

Parameters:
  • subarray_id – subarray ID

  • state – subarray state

ska_sdp_config.config.dict_to_json(obj: dict) str[source]

Format a dictionary for writing it into the database.

Parameters:

obj – Dictionary object to format

Returns:

String representation
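As a rough illustration of what such a formatter might do, the sketch below serialises a dictionary with the standard `json` module. `toy_dict_to_json` is a hypothetical stand-in, and the choice of sorted keys and two-space indentation is an assumption, not a statement about the library's actual output.

```python
import json


def toy_dict_to_json(obj: dict) -> str:
    """Serialise a dictionary deterministically (assumed formatting)."""
    # Sorting keys makes equal dictionaries produce identical strings
    return json.dumps(obj, indent=2, sort_keys=True)


text = toy_dict_to_json({"status": "RUNNING", "resources": {"cpu": 2}})
restored = json.loads(text)
```

A deterministic representation is convenient for a database of string values, because two equal states compare equal as strings as well.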

Entities

Processing Block

Processing block configuration entities.

class ska_sdp_config.entity.pb.ProcessingBlock(pb_id, eb_id, script, parameters=None, dependencies=None, **kwargs)[source]

Processing block entity.

Collects configuration information relating to a processing job for the SDP. This might be either real-time (supporting a running observation) or batch (to process data after the fact).

Actual execution of processing steps will be performed by a (parameterised) processing script interpreting processing block information.

property dependencies

Return dependencies on other processing blocks.

property eb_id

Return execution block instance ID, if associated with one.

property parameters

Return processing script-specific parameters.

property pb_id

Return the processing block ID.

property script

Return information identifying the processing script.

to_dict()[source]

Return data as dictionary.

Deployment

Deployment configuration entities.

class ska_sdp_config.entity.deployment.Deployment(dpl_id, kind, args)[source]

Deployment entity.

Collects configuration information relating to a cluster configuration change.

property args

Return deployment arguments.

property dpl_id

Return the deployment id.

property kind

Return deployment kind.

to_dict()[source]

Return data as dictionary.

Backends

Common

Common functionality for implementing backends.

exception ska_sdp_config.backend.common.ConfigCollision(path: str, message: str)[source]

Exception generated if key to create already exists.

exception ska_sdp_config.backend.common.ConfigVanished(path: str, message: str)[source]

Exception generated if key to update does not exist.

ska_sdp_config.backend.common.depth_of_path(path: str) int[source]

Get the depth of a path, i.e. the number of “/” characters in it.

Returns:

the depth
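Following the description above, the depth is simply a count of separators. A one-line sketch (the name `toy_depth_of_path` and the example paths are hypothetical):

```python
def toy_depth_of_path(path: str) -> int:
    """Count the '/' separators in a path."""
    return path.count("/")


depth_root = toy_depth_of_path("/pb")
depth_state = toy_depth_of_path("/pb/pb-test-00000/state")
```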

Etcd3 backend

Etcd3 backend for SKA SDP configuration DB.

class ska_sdp_config.backend.etcd3.Etcd3Backend(host='localhost', port='2379', max_retries: int = 15, retry_time: float = 0.1, **kw_args)[source]

Highly consistent database backend store.

See https://github.com/kragniz/python-etcd3

close() None[source]

Close the client connection.

create(path: str, value: str, lease: etcd3.Lease | None = None) None[source]

Create a key and initialise it with the value.

Parameters:
  • path – Path to create

  • value – Value to set

  • lease – Lease to associate

Raises:

ConfigCollision if the key already exists

delete(path: str, must_exist: bool = True, recursive: bool = False, prefix: bool = False, max_depth: int = 16)[source]

Delete the given key or key range.

Parameters:
  • path – path (prefix) of keys to remove

  • must_exist – Fail if path does not exist?

  • recursive – Delete children keys at lower levels recursively

  • max_depth – Recursion limit

  • prefix – Delete all keys at given level with prefix

get(path: str, revision: DbRevision | None = None) tuple[str, DbRevision][source]

Get value of a key.

Parameters:
  • path – Path of key to query

  • revision – Database revision for which to read key

Returns:

value and revision

lease(ttl: float = 10) Lease[source]

Generate a new lease.

Once entered, it can be associated with keys which will be kept alive until the end of the lease.

Note that this involves starting a daemon thread that will refresh the lease periodically (default seems to be TTL/4).

Parameters:

ttl – Time to live for lease

Returns:

lease object

list_keys(path: str, recurse: int | Iterable[int] = 0, revision: DbRevision | None = None, with_values: bool = False) tuple[list[str], DbRevision][source]

List keys under given path.

Parameters:
  • path – Prefix of keys to query. Append ‘/’ to list child paths.

  • recurse – Maximum recursion level to query. If iterable, cover exactly the recursion levels specified.

  • revision – Database revision for which to list

  • with_values – Also return key values and mod revisions (i.e. sorted list of key-value-rev tuples)

Returns:

(sorted key list, DbRevision object)
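The interplay of path prefix and recursion levels can be sketched over a plain list of keys. This is an illustration under stated assumptions, not the backend's query logic: `toy_list_keys` is a hypothetical function, level 0 is taken to mean keys with the same number of “/” as the queried path, and an integer `recurse` is taken to cover levels 0 up to and including that value.

```python
from typing import Iterable, List, Union


def toy_list_keys(keys: Iterable[str], path: str,
                  recurse: Union[int, Iterable[int]] = 0) -> List[str]:
    """List keys starting with `path`, restricted to the requested levels."""
    if isinstance(recurse, int):
        levels = set(range(recurse + 1))  # levels 0 .. recurse inclusive
    else:
        levels = set(recurse)  # exactly the levels given
    base_depth = path.count("/")
    return sorted(
        key for key in keys
        if key.startswith(path) and key.count("/") - base_depth in levels
    )


flat_keys = ["/pb/a", "/pb/a/state", "/pb/a/owner", "/pb/b", "/eb/x"]
children = toy_list_keys(flat_keys, "/pb/")
with_grandchildren = toy_list_keys(flat_keys, "/pb/", recurse=1)
only_grandchildren = toy_list_keys(flat_keys, "/pb/", recurse=[1])
```

Note that the key space stays flat throughout: the hierarchy exists only in how the “/” characters are counted.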

txn(max_retries: int = 64) Iterable[Etcd3Transaction][source]

Create a new transaction.

Note that this uses an optimistic STM-style implementation, which cannot guarantee that a transaction runs through successfully. Therefore, this function returns an iterator, which loops until the transaction succeeds:

for txn in etcd3.txn():
    # ... transaction steps ...

Note that this will in most cases only execute one iteration. If you actually want to loop - for instance because you intend to wait for something to happen in the configuration - use watcher() instead.

Parameters:

max_retries – Maximum number of transaction loops

Returns:

Transaction iterator

update(path: str, value: str) None[source]

Update an existing key. Fails if the key does not exist.

Parameters:
  • path – Path to update

  • value – New value of key

Raises:

ConfigVanished if the key does not exist

watcher(timeout=None, txn_wrapper: Callable[['Etcd3Transaction'], object] = None, requery_progress: float = 0.2) Iterable[Etcd3Watcher][source]

Create a new watcher.

Useful for waiting for changes in the configuration. See Etcd3Watcher.

Parameters:
  • timeout – Timeout for waiting. Watcher will loop after this time.

  • txn_wrapper – Function to wrap transactions returned by the watcher.

  • requery_progress – How often we “refresh” the current database state for watcher transactions even without watcher notification (upper bound on how “stale” non-watched values retrieved in transactions can be)

Returns:

Watcher iterator

class ska_sdp_config.backend.etcd3.Etcd3Transaction(backend: Etcd3Backend, client: etcd3.client, max_retries: int = 64)[source]

A series of queries and updates to be executed atomically.

commit() bool[source]

Commit the transaction to the database.

This can fail, in which case the transaction must get reset and built again.

Returns:

Whether the commit succeeded

create(path: str, value: str, lease: etcd3.Lease | None = None) None[source]

Create a key and initialise it with the value.

Fails if the key already exists. If a lease is given, the key will automatically get deleted once it expires.

Parameters:
  • path – Path to create

  • value – Value to set

  • lease – Lease to associate

Raises:

ConfigCollision if the key already exists

delete(path: str, must_exist: bool = True, recursive: bool = False, max_depth: int = 16, prefix: bool = False) None[source]

Delete the given key.

Parameters:
  • path – Path of key to remove

  • must_exist – Fail if path does not exist?

  • recursive – Delete children keys at lower levels recursively (not used yet)

get(path: str) str | None[source]

Get value of a key.

Parameters:

path – Path of key to query

Returns:

Key value. None if it doesn’t exist.

list_keys(path: str, recurse: int | Iterable[int] = 0) list[str][source]

List keys under given path.

Parameters:
  • path – Prefix of keys to query. Append ‘/’ to list child paths.

  • recurse – Children depths to include in search

Returns:

sorted key list

on_commit(callback: Callable[[], None]) None[source]

Register a callback to call when the transaction succeeds.

Exists mostly to enable test cases.

Parameters:

callback – Callback to call

reset(revision: DbRevision | None = None) None[source]

Reset the transaction, so it can be restarted after commit().

Parameters:

revision – to reset

Raises:

RuntimeError if the transaction is not committed.

property revision: int

The last-committed database revision.

Only valid to call after the transaction has been committed.

Returns:

revision from DbRevision

update(path: str, value: str) None[source]

Update an existing key.

Fails if the key does not exist.

Parameters:
  • path – Path to update

  • value – Value to set

Raises:

ConfigVanished if the key is not found

Etcd3 backend revolution 1

Etcd3 backend for SKA SDP configuration DB, using client from https://github.com/Revolution1/etcd3-py

class ska_sdp_config.backend.etcd3_revolution1.Etcd3BackendRevolution1(*args, max_retries: int = 15, retry_time: float = 0.1, **kw_args)[source]

Highly consistent database backend store.

See https://github.com/etcd-io/etcd

All parameters will be passed on to etcd3.Client().

close() None[source]

Close the client connection.

create(path: str, value: str, lease: etcd3_revolution1.Lease | None = None) None[source]

Create a key and initialise it with the value.

Fails if the key already exists. If a lease is given, the key will automatically get deleted once it expires.

Parameters:
  • path – Path to create

  • value – Value to set

  • lease – Lease to associate

Raises:

ConfigCollision

delete(path: str, must_exist: bool = True, recursive: bool = False, prefix: bool = False, max_depth: int = 16) None[source]

Delete the given key or key range.

Parameters:
  • path – Path (prefix) of keys to remove

  • must_exist – Fail if path does not exist?

  • recursive – Delete children keys at lower levels recursively

  • prefix – Delete all keys at given level with prefix

  • max_depth – Recursion limit

Returns:

Whether transaction was successful

get(path: str, revision: DbRevision | None = None) tuple[str, DbRevision][source]

Get value of a key.

Parameters:
  • path – Path of key to query

  • revision – Database revision for which to read key

Returns:

(value, revision). value is None if it doesn’t exist

lease(ttl: int = 10) Lease[source]

Generate a new lease.

Once entered, it can be associated with keys, which will be kept alive until the end of the lease. Note that this involves starting a daemon thread that will refresh the lease periodically (default seems to be TTL/4).

Parameters:

ttl – Time to live for lease

Returns:

lease object

list_keys(path: str, recurse: int = 0, revision: DbRevision | None = None) tuple[list[str], DbRevision][source]

List keys under given path.

Parameters:
  • path – Prefix of keys to query. Append ‘/’ to list child paths.

  • recurse – Maximum recursion level to query. If iterable, cover exactly the recursion levels specified.

  • revision – Database revision for which to list

Returns:

(sorted key list, revision)

txn(max_retries: int = 64) Iterable[Etcd3Transaction][source]

Create a new transaction.

Note that this uses an optimistic STM-style implementation, which cannot guarantee that a transaction runs through successfully. Therefore, this function returns an iterator, which loops until the transaction succeeds:

for txn in etcd3.txn():
    # ... transaction steps ...

Note that this will in most cases only execute one iteration. If you actually want to loop - for instance because you intend to wait for something to happen in the configuration - use watcher() instead.

Parameters:

max_retries – Maximum number of transaction loops

Returns:

Transaction iterator

update(path: str, value: str, must_be_rev: DbRevision | None = None) None[source]

Update an existing key. Fails if the key does not exist.

Parameters:
  • path – Path to update

  • value – Value to set

  • must_be_rev – Fail if found value does not match given revision (atomic update)

Raises:

ConfigVanished
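The `must_be_rev` parameter describes a compare-and-swap style update. The sketch below shows the idea on a toy store; `ToyRevisionStore` is a hypothetical class, and signalling a revision mismatch through a `False` return value (and a missing key through `KeyError`) is an assumption made for the example, since the text above does not say how the real method reports it.

```python
from typing import Dict, Optional, Tuple


class ToyRevisionStore:
    """Hypothetical store that remembers the revision of each key's last write."""

    def __init__(self) -> None:
        self._data: Dict[str, Tuple[str, int]] = {}
        self._revision = 0

    def put(self, path: str, value: str) -> int:
        self._revision += 1
        self._data[path] = (value, self._revision)
        return self._revision

    def get(self, path: str) -> Tuple[Optional[str], Optional[int]]:
        return self._data.get(path, (None, None))

    def update(self, path: str, value: str,
               must_be_rev: Optional[int] = None) -> bool:
        _, mod_rev = self.get(path)
        if mod_rev is None:
            raise KeyError(path)  # stand-in for ConfigVanished
        if must_be_rev is not None and mod_rev != must_be_rev:
            return False  # key changed since the caller read it
        self.put(path, value)
        return True


store = ToyRevisionStore()
seen_rev = store.put("/pb/a/state", "READY")
first = store.update("/pb/a/state", "RUNNING", must_be_rev=seen_rev)
second = store.update("/pb/a/state", "FAILED", must_be_rev=seen_rev)
```

The second update is rejected because the first one already moved the key to a newer revision, so a stale reader cannot overwrite it.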

watch(path: str, prefix: bool = False, revision: DbRevision | None = None, depth: int | None = None)[source]

Watch key or key range.

Use a path ending with ‘/’ in combination with prefix to watch all child keys.

Parameters:
  • path – Path of key to query, or prefix of keys.

  • prefix – Watch for keys with given prefix if set

  • revision – Database revision from which to watch

  • depth – tag depth

Returns:

Etcd3Watch object for watch request

watcher(timeout: float | None = None, txn_wrapper: TxnWrapper | None = None) Iterable[Etcd3Watcher][source]

Create a new watcher.

Useful for waiting for changes in the configuration. See Etcd3Watcher.

Parameters:
  • timeout – Timeout for waiting. Watcher will loop after this time.

  • txn_wrapper – Function to wrap transactions returned by the watcher.

Returns:

Watcher iterator

class ska_sdp_config.backend.etcd3_revolution1.Etcd3Transaction(backend: Etcd3BackendRevolution1, client: etcd3_revolution1.Client, max_retries: int = 64)[source]

A series of queries and updates to be executed atomically.

Use Etcd3Backend.txn() or Etcd3Watcher.txn() to construct transactions.

clear_watch() None

Stop all currently active watchers.

Deprecated: Use Etcd3Watcher instead.

commit() bool[source]

Commit the transaction to the database.

This can fail, in which case the transaction must get reset and built again.

Returns:

Whether the commit succeeded

create(path: str, value: str, lease: etcd3_revolution1.Lease | None = None) None[source]

Create a key and initialise it with the value.

Fails if the key already exists. If a lease is given, the key will automatically get deleted once it expires.

Parameters:
  • path – Path to create

  • value – Value to set

  • lease – Lease to associate

Raises:

ConfigCollision

delete(path: str, must_exist: bool = True, recursive: bool = False) None[source]

Delete the given key.

Parameters:
  • path – Path of key to remove

  • must_exist – Fail if path does not exist?

  • recursive – Delete children keys at lower levels recursively (not used)

get(path: str) str[source]

Get value of a key.

Parameters:

path – Path of key to query

Returns:

Key value. None if it doesn’t exist.

list_keys(path: str, recurse: int = 0) list[str][source]

List keys under given path.

Parameters:
  • path – Prefix of keys to query. Append ‘/’ to list child paths.

  • recurse – Children depths to include in search

Returns:

sorted key list

loop(watch: bool = False, watch_timeout: float | None = None)

Repeat transaction execution, even if it succeeds.

Deprecated: Use Etcd3Watcher instead, or loop manually.

Parameters:
  • watch – Once the transaction succeeds, block until one of the values read changes, then loop the transaction

  • watch_timeout – timeout value

on_commit(callback: Callable[[], None]) None[source]

Register a callback to call when the transaction succeeds.

A bit of a hack, but occasionally useful for adding side effects to a transaction that are guaranteed not to be duplicated.

Parameters:

callback – Callback to call
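The guarantee that commit callbacks are not duplicated can be illustrated with a toy transaction whose commit results are scripted. `ToyCallbackTxn` and its `outcomes` argument are hypothetical and exist only for this demonstration; clearing callbacks in `reset()` is likewise an assumption of the sketch.

```python
from typing import Callable, List


class ToyCallbackTxn:
    """Hypothetical transaction whose commit outcome is scripted for the demo."""

    def __init__(self, outcomes: List[bool]) -> None:
        self._outcomes = list(outcomes)  # commit results, in order
        self._callbacks: List[Callable[[], None]] = []

    def on_commit(self, callback: Callable[[], None]) -> None:
        self._callbacks.append(callback)

    def commit(self) -> bool:
        succeeded = self._outcomes.pop(0)
        if succeeded:
            # Callbacks only run for the attempt that actually committed
            for callback in self._callbacks:
                callback()
        return succeeded

    def reset(self) -> None:
        # A failed attempt is thrown away, including its callbacks
        self._callbacks.clear()


notifications: List[str] = []
txn = ToyCallbackTxn(outcomes=[False, True])
while True:
    txn.on_commit(lambda: notifications.append("committed"))
    if txn.commit():
        break
    txn.reset()
```

Even though the body ran twice, the callback fired once, which is what makes this pattern suitable for side effects that must not repeat.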

reset(revision: DbRevision | None = None) None[source]

Reset the transaction, so it can be restarted after commit().

property revision: int

The last-committed database revision.

Only valid to call after the transaction has been committed.

trigger_loop() None[source]

Manually triggers a loop.

Effectively makes loop(True) behave like loop(False), looping immediately. This is useful for interrupting a blocking watch() from a different thread.

update(path: str, value: str) None[source]

Update an existing key. Fails if the key does not exist.

Parameters:
  • path – Path to update

  • value – Value to set

Raises:

ConfigVanished

watch() None[source]

Wait for a change on one of the values read.

Deprecated: Use Etcd3Watcher instead.

Returns:

The revision at which a change was detected.

class ska_sdp_config.backend.etcd3_revolution1.Etcd3Watch(backend: Etcd3BackendRevolution1, tagged_path: str, start_revision: int, prefix: bool, max_retries: int = 20, retry_time: float = 0.1)[source]

Wrapper for etcd3 watch requests.

Entering the watcher using a with block yields a queue of (key, val, rev) triples.

start(queue: Queue | None = None) None[source]

Activates the watcher, yielding a queue for updates.

stop()[source]

Deactivates the watcher.
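The "queue of (key, val, rev) triples" can be pictured with a standalone toy store that pushes every matching write into a `queue.Queue`. This is a sketch of the concept only: `ToyWatchStore` is hypothetical, it always matches by prefix, and it involves no network or threads.

```python
import queue
from typing import Dict, List, Tuple


class ToyWatchStore:
    """Hypothetical store that reports writes to registered watch queues."""

    def __init__(self) -> None:
        self._data: Dict[str, str] = {}
        self._revision = 0
        self._watches: List[Tuple[str, "queue.Queue"]] = []

    def watch(self, prefix: str) -> "queue.Queue":
        """Register a watch and return the queue that will receive updates."""
        updates: "queue.Queue" = queue.Queue()
        self._watches.append((prefix, updates))
        return updates

    def put(self, key: str, value: str) -> None:
        self._revision += 1
        self._data[key] = value
        for prefix, updates in self._watches:
            if key.startswith(prefix):
                # Each update is reported as a (key, value, revision) triple
                updates.put((key, value, self._revision))


store = ToyWatchStore()
pb_updates = store.watch("/pb/")
store.put("/pb/a/state", "RUNNING")  # matches the watched prefix
store.put("/eb/x", "ignored")        # different prefix, not reported
first_update = pb_updates.get_nowait()
```

A consumer would normally block on `get(timeout=...)` rather than `get_nowait()`, which is what lets a watcher sleep until something it cares about changes.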

class ska_sdp_config.backend.etcd3_revolution1.Etcd3Watcher(backend: Etcd3BackendRevolution1, client: etcd3_revolution1.Client, timeout: float | None = None, txn_wrapper: TxnWrapper | None = None)[source]

Watch for database changes by using nested transactions.

Use as follows:

for watcher in config.watcher():
    for txn in watcher.txn():
        # ... do something
    for txn in watcher.txn():
        # ... do something else

At the end of a for loop iteration, the watcher will start watching all values read by transactions started through txn(), and only repeat the execution of the loop body once one of these values has changed.

trigger() None[source]

Manually triggers a loop.

Can be called from a different thread to force a loop, even if the watcher is currently waiting.

txn(max_retries: int = 64) Iterable[Etcd3Transaction | TxnWrapper][source]

Create nested transaction.

The watcher loop will iterate when any value read by transactions created by this method has changed in the database.

Note that these transactions otherwise behave exactly as normal transactions: as long as they are internally consistent, they will be committed. This means there are no consistency guarantees between transactions created from the same watcher, i.e. one transaction might read one value from the database while a later one reads another.

Parameters:

max_retries – Maximum number of times the transaction will be tried before giving up.

Etcd3 watcher

Memory backend

Memory backend for SKA SDP configuration DB.

The main purpose of this is for use in testing. In principle, it should behave in the same way as the etcd backend. No attempt has been made to make it thread-safe, so it probably isn’t.

class ska_sdp_config.backend.memory.MemoryBackend[source]

In-memory backend implementation, principally for testing.

close() None[source]

Close the resource. This does nothing.

create(path: str, value: str, lease: Lease | None = None) None[source]

Create a key and initialise it with the value.

Parameters:
  • path – Path to create

  • value – Value to set

  • lease – Lease to associate

Raises:

ConfigCollision if the key already exists

delete(path: str, must_exist: bool = True, recursive: bool = False, prefix: bool = False, max_depth: int = 16) None[source]

Delete the given key or key range.

Parameters:
  • path – path (prefix) of keys to remove

  • must_exist – Fail if path does not exist?

  • recursive – Delete children keys at lower levels recursively

  • max_depth – Recursion limit

  • prefix – Delete all keys at given level with prefix

get(path: str, revision: DbRevision | None = None) tuple[str, DbRevision][source]

Get value of a key.

Parameters:
  • path – Path of key to query

  • revision – Database revision for which to read key

Returns:

value and revision

lease(ttl: float = 10) Lease[source]

Generate a dummy lease object.

Parameters:

ttl – time to live

Returns:

dummy lease object

list_keys(path: str, recurse: int = 0) list[str][source]

Get a list of the keys at the given path.

In common with the etcd backend, the structure is “flat” rather than a real hierarchy, even though it looks like one.

Parameters:
  • path – prefix of keys to query

  • recurse – maximum recursion level to query

Returns:

list of keys

txn(max_retries: int = 64) Iterable[MemoryTransaction][source]

Create an in-memory “transaction”.

Parameters:

max_retries – Maximum number of transaction loops

Returns:

transaction object

update(path: str, value: str) None[source]

Update an existing key. Fails if the key does not exist.

Parameters:
  • path – Path to update

  • value – New value of key

Raises:

ConfigVanished if the key does not exist
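The create and update contracts shared by the backends can be summarised in a few lines. The sketch below is a standalone illustration, not the `MemoryBackend` source: `ToyMemoryStore`, `ToyCollision` and `ToyVanished` are hypothetical stand-ins for the backend and for `ConfigCollision` and `ConfigVanished`.

```python
from typing import Dict, List, Optional


class ToyCollision(Exception):
    """Stand-in for ConfigCollision: the key to create already exists."""


class ToyVanished(Exception):
    """Stand-in for ConfigVanished: the key to update does not exist."""


class ToyMemoryStore:
    """Hypothetical dictionary-backed store with create/update semantics."""

    def __init__(self) -> None:
        self._data: Dict[str, str] = {}

    def create(self, path: str, value: str) -> None:
        if path in self._data:
            raise ToyCollision(path)
        self._data[path] = value

    def update(self, path: str, value: str) -> None:
        if path not in self._data:
            raise ToyVanished(path)
        self._data[path] = value

    def get(self, path: str) -> Optional[str]:
        return self._data.get(path)


store = ToyMemoryStore()
outcomes: List[str] = []

store.create("/pb/a", "first")
try:
    store.create("/pb/a", "second")  # same key again
except ToyCollision:
    outcomes.append("collision")

store.update("/pb/a", "third")
try:
    store.update("/pb/missing", "value")  # key was never created
except ToyVanished:
    outcomes.append("vanished")
```

Keeping create and update as separate operations means a caller always states whether it expects the key to exist, so an unexpected state surfaces as an exception instead of a silent overwrite.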

watcher(timeout: float | None = None, txn_wrapper: TxnWrapper | None = None) Watcher[source]

Create an in-memory “watcher”.

Parameters:
  • timeout – timeout in seconds

  • txn_wrapper – wrapper (factory) to return transaction

Returns:

MemoryWatcher object (mock of Etcd3Watcher)

class ska_sdp_config.backend.memory.MemoryTransaction(backend: Backend)[source]

Transaction wrapper around the backend implementation.

Transactions always succeed if they are valid, so there is no need to loop; however the iterator is supported for compatibility with the etcd backend.

commit() bool[source]

Commit the transaction. This does nothing.

create(path: str, value: str, lease: Lease | None = None) None[source]

Create a key and initialise it with the value.

Fails if the key already exists. If a lease is given, the key will automatically get deleted once it expires.

Parameters:
  • path – Path to create

  • value – Value to set

  • lease – Lease to associate

Raises:

ConfigCollision if the key already exists

delete(path: str, must_exist: bool = True, recursive: bool = False)[source]

Delete the given key.

Parameters:
  • path – Path of key to remove

  • must_exist – Fail if path does not exist?

  • recursive – Delete children keys at lower levels recursively (not used yet)

get(path: str) str[source]

Get value of a key.

Parameters:

path – Path of key to query

Returns:

Key value or None if it doesn’t exist.

list_keys(path: str, recurse: int = 0) list[str][source]

List keys under given path.

Parameters:
  • path – Prefix of keys to query. Append ‘/’ to list child paths.

  • recurse – Children depths to include in search

Returns:

sorted key list

loop(*_args, **_kwargs) None[source]

Loop the transaction. This does nothing.

reset(revision: DbRevision | None = None) None[source]

Reset the transaction. This does nothing.

update(path: str, value: str) None[source]

Update an existing key.

Fails if the key does not exist.

Parameters:
  • path – Path to update

  • value – Value to set

Raises:

ConfigVanished if the key is not found

class ska_sdp_config.backend.memory.MemoryWatcher(backend: Backend, timeout: float | None = None, txn_wrapper: TxnWrapper | None = None)[source]

Watcher wrapper around the backend implementation (Etcd3Watcher).

txn() Iterable[MemoryTransaction][source]

Yield the wrapped MemoryTransaction object.

It does not implement the commit check that is part of Etcd3Watcher.txn(), hence it behaves like MemoryBackend.txn().