High-Level API

High-level API for SKA SDP configuration.

class ska_sdp_config.config.Config(backend=None, global_prefix: str = '', owner: dict | Owner | None = None, component_name: str | None = None, wrapper: TxnWrapper | None = None, owned_entity: tuple[str, str] | None = None, **cargs)

Connection to SKA SDP configuration.

property backend: Backend

Get the backend database object.

property client_lease: Lease

Return the lease associated with the client.

It will be kept alive until the client gets closed.

close() → None

Close the client connection.

is_alive() → bool

Check whether the connection is alive, i.e. whether the keep-alive key exists.

Returns:

True if the keep-alive key exists

lease(ttl=10) → Lease

Generate a new lease.

Once entered, the lease can be associated with keys, which will be kept alive until the end of the lease. On entry, a daemon thread is started automatically to refresh the lease periodically (the default interval seems to be TTL/4).

Parameters:

ttl – Time to live for lease

Returns:

lease object
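
The keep-alive behaviour described above can be pictured with a small toy model. This is only a sketch under stated assumptions, not the library's Lease: ToyLease and its refresh_count attribute are hypothetical names, the TTL/4 refresh interval is taken from the tentative remark above, and a "refresh" here merely increments a counter instead of contacting a database.

```python
import threading
import time


class ToyLease:
    """Hypothetical stand-in for a lease; not part of ska_sdp_config."""

    def __init__(self, ttl=10.0):
        self.ttl = ttl
        self.refresh_count = 0
        self._stop = threading.Event()
        self._thread = None

    def _keep_alive(self):
        # Wake up every TTL/4 seconds (assumed interval) until asked to stop.
        while not self._stop.wait(self.ttl / 4):
            self.refresh_count += 1  # a real lease would be refreshed here

    @property
    def refreshing(self):
        return self._thread is not None and self._thread.is_alive()

    def __enter__(self):
        # Entering the lease starts the daemon refresh thread.
        self._thread = threading.Thread(target=self._keep_alive, daemon=True)
        self._thread.start()
        return self

    def __exit__(self, *exc_info):
        # Leaving the lease stops the refresh thread.
        self._stop.set()
        self._thread.join()
        return False


with ToyLease(ttl=0.04) as lease:
    running_inside = lease.refreshing
    time.sleep(0.2)  # long enough for several refresh intervals

refreshes = lease.refresh_count
running_after = lease.refreshing
```

Because the thread is a daemon, it would not keep the process alive if the lease were never exited.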

revoke_lease() → None

Revoke the lease internally held by this client, if any. Users should not normally need to call this, but it is useful in tests.

set_alive() → None

Set the keep-alive key.

txn(max_retries: int = 64) → Iterable[Transaction | TxnWrapper]

Create a Transaction for atomic configuration query/change.

As we do not use locks, transactions might have to be repeated in order to guarantee atomicity. Suggested usage is as follows:

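from ska_sdp_config.config import Config
config = Config()  # default backend and settings
for txn in config.txn():
    # Use txn to read+write configuration
    # [Possibly call txn.loop()]
    ...  # placeholder so the loop body is valid Python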

As the for loop suggests, the code might get run multiple times even if not forced by calling Transaction.loop(). Any writes using the transaction will be discarded if the transaction fails, but the application must make sure that the loop body has no other observable side effects.

See also Usage Guide for best practices for using transactions.

Parameters:

max_retries – Number of transaction retries before a RuntimeError gets raised.
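
To see why the loop body may run more than once, it can help to look at a toy model of a lock-free transaction loop. The sketch below is an illustration only and not how ska_sdp_config is implemented: ToyStore, ToyTxn and the single revision counter are hypothetical, and the conflict is simulated by hand inside the loop.

```python
class ToyTxn:
    """Hypothetical transaction: buffers writes until commit."""

    def __init__(self, store):
        self._store = store
        self._start_revision = store.revision
        self._writes = {}

    def get(self, key):
        return self._writes.get(key, self._store.data.get(key))

    def set(self, key, value):
        self._writes[key] = value

    def commit(self):
        # Fail (and discard the buffered writes) if anything changed meanwhile.
        if self._store.revision != self._start_revision:
            return False
        if self._writes:
            self._store.data.update(self._writes)
            self._store.revision += 1
        return True


class ToyStore:
    """Hypothetical in-memory store with a global revision counter."""

    def __init__(self):
        self.data = {}
        self.revision = 0

    def txn(self, max_retries=64):
        for _ in range(max_retries):
            txn = ToyTxn(self)
            yield txn             # the for-loop body runs here
            if txn.commit():      # resumed once the body has finished
                return
        raise RuntimeError("transaction did not succeed after max_retries")


store = ToyStore()
store.data["counter"] = 0
attempts = 0  # counting attempts is itself a side effect; done here only to demonstrate

for txn in store.txn():
    attempts += 1
    value = txn.get("counter")
    if attempts == 1:
        # Simulate another client committing between our read and our commit.
        store.data["counter"] = 10
        store.revision += 1
    txn.set("counter", value + 1)
```

In this run the first attempt is discarded because the store changed underneath it, and the second attempt reads the new value and commits. Any side effect placed in the loop body would have happened twice.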

watcher(timeout: float | None = None) → Iterable[Watcher]

Create a new watcher.

Useful for waiting for changes in the configuration. Calling Etcd3Watcher.txn() on the returned watchers will create Transaction objects just like txn().

See also Usage Guide for best practices for using watchers.

Parameters:

timeout – Timeout for waiting. Watcher will loop after this time.
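
A typical use is to nest a transaction loop inside a watcher loop and leave once a condition holds. The following is a minimal sketch, assuming the watcher and txn() iteration described above. The helper wait_for_status, the "status" key and the Stub classes are hypothetical and exist only so that the example runs without a database.

```python
def wait_for_status(config, pb_id, wanted, timeout=None):
    """Return the processing block state once its "status" equals `wanted`."""
    for watcher in config.watcher(timeout=timeout):
        state = None
        for txn in watcher.txn():
            state = txn.get_processing_block_state(pb_id)
        if state is not None and state.get("status") == wanted:
            return state
    return None  # only reached if the watcher stops iterating


# Hypothetical stand-ins that replay a fixed sequence of states.
class StubTxn:
    def __init__(self, state):
        self._state = state

    def get_processing_block_state(self, pb_id):
        return self._state


class StubWatcher:
    def __init__(self, state):
        self._state = state

    def txn(self):
        yield StubTxn(self._state)


class StubConfig:
    def __init__(self, states):
        self._states = states
        self.iterations = 0

    def watcher(self, timeout=None):
        # One watcher iteration per recorded state, as if each were a change.
        for state in self._states:
            self.iterations += 1
            yield StubWatcher(state)


config = StubConfig(
    [None, {"status": "RUNNING"}, {"status": "FINISHED"}, {"status": "IGNORED"}]
)
result = wait_for_status(config, "pb-example-001", "FINISHED")
```

The stub ends after a fixed number of iterations, whereas a real watcher would keep waiting for changes or for the timeout.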

exception ska_sdp_config.config.SdpConfigDeprecationWarning

Warning used to mark an sdp-config-db method as deprecated.

class ska_sdp_config.config.Transaction(owner: Owner, owned_entity: tuple[str, str] | None, lease_getter: Callable[[], Lease], txn: DbTransaction, global_prefix: str)

High-level configuration queries and updates to execute atomically.

arbitrary: ArbitraryOperations

Operations over arbitrary paths

component: ComponentOperations

Operations over SDP components

controller: ControllerOperations

Operations over the LMC Controller

create_controller(state: dict) → None

Create controller state.

Parameters:

state – controller state

create_deployment(dpl: Deployment) → None

Request a change to cluster configuration.

Parameters:

dpl – Deployment to add to database

create_deployment_state(deploy_id: str, state: dict) → None

Create Deployment state.

Parameters:
  • deploy_id – Deployment ID

  • state – Deployment state to create

create_execution_block(eb_id: str, state: dict) → None

Create execution block.

Parameters:
  • eb_id – execution block ID

  • state – execution block state

create_is_alive(key: str) → str

Create an “is_alive” entry.

Parameters:

key – “is alive” key in database e.g. “lmc-controller/owner”

Returns:

the full path of the entry

create_processing_block(pblock: ProcessingBlock) → None

Add a new ProcessingBlock to the configuration.

Parameters:

pblock – Processing block to create

create_processing_block_state(pb_id: str, state: dict) → None

Create processing block state.

Parameters:
  • pb_id – Processing block ID

  • state – Processing block state to create

create_script(kind: str, name: str, version: str, script: dict) → None

Create processing script definition.

Parameters:
  • kind – script kind

  • name – script name

  • version – script version

  • script – script definition

create_subarray(subarray_id: str, state: dict) → None

Create subarray state.

Parameters:
  • subarray_id – subarray ID

  • state – subarray state

delete_deployment(dpl: Deployment) → None

Undo a change to cluster configuration.

Parameters:

dpl – Deployment to remove

delete_execution_block(eb_id: str, recurse: bool = True) → None

Delete an execution block (eb)

Parameters:
  • eb_id – Execution block ID

  • recurse – if True, run recursive query and delete all objects

delete_processing_block(pb_id: str, recurse: bool = True) → None

Delete a processing block (pb)

Parameters:
  • pb_id – Processing block ID

  • recurse – if True, run a recursive query and delete all objects; this includes deleting the /state and /owner entries of the processing block, if they exist

delete_script(kind: str, name: str, version: str) → None

Delete processing script definition.

Parameters:
  • kind – script kind

  • name – script name

  • version – script version

deployment: DeploymentOperations

Operations over Deployments

execution_block: ExecutionBlockOperations

Operations over Execution Blocks

get_controller() → dict | None

Get controller state.

Returns:

controller state

get_deployment(deploy_id: str) → Deployment | None

Retrieve details about a cluster configuration change.

Parameters:

deploy_id – Name of the deployment

Returns:

Deployment details

get_deployment_state(deploy_id: str) → dict | None

Get the current Deployment state.

Parameters:

deploy_id – Deployment ID

Returns:

Deployment state, or None if not present

get_execution_block(eb_id: str) → dict

Get execution block.

Parameters:

eb_id – execution block ID

Returns:

execution block state

get_processing_block(pb_id: str) → ProcessingBlock | None

Look up processing block data.

Parameters:

pb_id – Processing block ID to look up

Returns:

Processing block entity, or None if it doesn’t exist

get_processing_block_owner(pb_id: str) → dict | None

Look up the current processing block owner.

Parameters:

pb_id – Processing block ID to look up

Returns:

Processing block owner data, or None if not claimed

get_processing_block_state(pb_id: str) → dict | None

Get the current processing block state.

Parameters:

pb_id – Processing block ID

Returns:

Processing block state, or None if not present

get_script(kind: str, name: str, version: str) → dict | None

Get processing script definition.

Parameters:
  • kind – script kind

  • name – script name

  • version – script version

Returns:

script definition

get_subarray(subarray_id: str) → dict | None

Get subarray state.

Parameters:

subarray_id – subarray ID

Returns:

subarray state

is_alive(key: str) → bool

Check if the “is alive” key still exists.

Parameters:

key – “is alive” key in database e.g. “lmc-controller/owner”

Returns:

True if it does

is_processing_block_owner(pb_id: str) → bool

Check whether this client is owner of the processing block.

Parameters:

pb_id – Processing block ID to look up

Returns:

Whether the processing block exists and is claimed by this client

list_deployments(prefix: str = '') → list[str]

List all current deployments.

Parameters:

prefix – if given, only list deployment IDs with the given prefix

Returns:

Deployment IDs

list_execution_blocks(prefix: str = '') → list[str]

Query execution block IDs from the configuration.

Parameters:

prefix – if given, only search for execution block IDs with the given prefix

Returns:

execution block IDs, in lexicographical order

list_processing_blocks(prefix: str = '') → list[str]

Query processing block IDs from the configuration.

Parameters:

prefix – If given, only search for processing block IDs with the given prefix

Returns:

Processing block ids, in lexicographical order
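
The prefix filter and the lexicographical ordering can be illustrated on a plain list of strings. This is a sketch of the semantics only, not the library's query: list_ids and the example IDs are hypothetical.

```python
def list_ids(keys, prefix=""):
    """Return the IDs that start with `prefix`, in lexicographical order."""
    return sorted(key for key in keys if key.startswith(prefix))


stored = ["pb-test-002", "pb-real-001", "pb-test-001"]

all_ids = list_ids(stored)              # empty prefix matches everything
test_ids = list_ids(stored, "pb-test")  # only IDs starting with "pb-test"

# Lexicographical order compares character by character, so numbers that are
# not zero-padded do not sort numerically.
unpadded = list_ids(["pb-9", "pb-10"])
```

Here unpadded comes out as ["pb-10", "pb-9"], which is one reason to zero-pad numeric parts of IDs when a numeric ordering is wanted.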

list_scripts(kind: str = '', name: str = '') → list[tuple[str]]

List processing script definitions.

Parameters:
  • kind – script kind. Default empty

  • name – script name. Default empty

Returns:

list of script definitions

list_subarrays(prefix: str = '') → list[str]

Query subarray IDs from the configuration.

Parameters:

prefix – if given, only search for subarray IDs with the given prefix

Returns:

subarray IDs, in lexicographical order

loop(wait: bool = False, timeout: float | None = None) → None

Repeat transaction regardless of whether commit succeeds.

Parameters:
  • wait – If transaction succeeded, wait for any read values to change before repeating it.

  • timeout – Maximum time to wait, in seconds

new_execution_block_id(generator: str) → str

Generate a new execution block ID that is not yet in use.

Parameters:

generator – Name of the generator

Returns:

execution block ID

new_processing_block_id(generator: str) → str

Generate a new processing block ID that is not yet in use.

Parameters:

generator – Name of the generator

Returns:

Processing block ID

processing_block: ProcessingBlockOperations

Operations over Processing Blocks

property raw: DbTransaction

Return transaction object for accessing database directly.

script: ScriptOperations

Operations over Scripts

property self: EntityOperations | None

Fast access to entity identified when creating the parent Config

subarray: SubarrayOperations

Operations over Subarrays

take_processing_block(pb_id: str) → None

Take ownership of the processing block.

Parameters:

pb_id – Processing block ID to take ownership of

Raises:

backend.ConfigCollision
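
One way to avoid the collision is to check the current owner inside the same transaction before claiming. The sketch below uses only method names documented on this page. The helper claim_if_unowned, the owner dictionaries and the Stub classes are hypothetical, and the stub raises a plain RuntimeError where the real call would raise backend.ConfigCollision.

```python
def claim_if_unowned(config, pb_id):
    """Return True if this client owns the processing block afterwards."""
    owned = False
    for txn in config.txn():
        if txn.is_processing_block_owner(pb_id):
            owned = True
        elif txn.get_processing_block_owner(pb_id) is None:
            txn.take_processing_block(pb_id)
            owned = True
        else:
            owned = False  # claimed by somebody else
    return owned


# Hypothetical in-memory stand-ins so that the sketch runs without a database.
class StubTxn:
    def __init__(self, owners, me):
        self._owners = owners
        self._me = me

    def get_processing_block_owner(self, pb_id):
        return self._owners.get(pb_id)

    def is_processing_block_owner(self, pb_id):
        return self._owners.get(pb_id) == self._me

    def take_processing_block(self, pb_id):
        if pb_id in self._owners:
            raise RuntimeError("already claimed")  # stands in for ConfigCollision
        self._owners[pb_id] = self._me


class StubConfig:
    def __init__(self, owners, me):
        self._owners = owners
        self._me = me

    def txn(self, max_retries=64):
        yield StubTxn(self._owners, self._me)  # single attempt, always commits


owners = {}  # shared "database" of owner entries
first = claim_if_unowned(StubConfig(owners, {"pid": 1}), "pb-example-001")
second = claim_if_unowned(StubConfig(owners, {"pid": 2}), "pb-example-001")
```

The first client claims the block and the second one finds it already taken, so no exception is raised in either case.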

update_controller(state: dict) → None

Update controller state.

Parameters:

state – controller state

update_deployment_state(deploy_id: str, state: dict) → None

Update Deployment state.

Parameters:
  • deploy_id – Deployment ID

  • state – Deployment state to update

update_execution_block(eb_id: str, state: dict) → None

Update execution block.

Parameters:
  • eb_id – execution block ID

  • state – execution block state

update_processing_block(pblock: ProcessingBlock) → None

Update a ProcessingBlock in the configuration.

Parameters:

pblock – Processing block to update

update_processing_block_state(pb_id: str, state: dict) → None

Update processing block state.

Parameters:
  • pb_id – Processing block ID

  • state – Processing block state to update
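
The create and update calls are often combined into a "create or update" step inside one transaction. The following is a sketch under the assumption that creating expects no existing state and updating expects one, which this page does not state explicitly. The helper set_pb_status, the "status" key, the example ID and the Stub classes are hypothetical.

```python
def set_pb_status(config, pb_id, status):
    """Create or update a processing block state so that it carries `status`."""
    for txn in config.txn():
        state = txn.get_processing_block_state(pb_id)
        if state is None:
            txn.create_processing_block_state(pb_id, {"status": status})
        else:
            # Copy the old state rather than mutating what the read returned.
            txn.update_processing_block_state(pb_id, {**state, "status": status})


# Hypothetical in-memory stand-ins so that the sketch runs without a database.
class StubTxn:
    def __init__(self, states):
        self._states = states

    def get_processing_block_state(self, pb_id):
        return self._states.get(pb_id)

    def create_processing_block_state(self, pb_id, state):
        self._states[pb_id] = state

    def update_processing_block_state(self, pb_id, state):
        self._states[pb_id] = state


class StubConfig:
    def __init__(self):
        self.states = {}

    def txn(self, max_retries=64):
        yield StubTxn(self.states)  # single attempt, always commits


config = StubConfig()
set_pb_status(config, "pb-example-001", "RUNNING")   # creates the state
set_pb_status(config, "pb-example-001", "FINISHED")  # updates the state
```

Reading and writing in the same transaction means the decision between create and update is based on the same snapshot that the commit is checked against.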

update_script(kind: str, name: str, version: str, script: dict) → None

Update processing script definition.

Parameters:
  • kind – script kind

  • name – script name

  • version – script version

  • script – script definition

update_subarray(subarray_id: str, state: dict) → None

Update subarray state.

Parameters:
  • subarray_id – subarray ID

  • state – subarray state

ska_sdp_config.config.deprecated(subobject_name)

Marks a method as deprecated in favour of using the Transaction sub-objects
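
A decorator of this kind could look roughly as follows. This is a hypothetical reimplementation for illustration, not the library's code: ToyDeprecationWarning stands in for SdpConfigDeprecationWarning (that it derives from DeprecationWarning is an assumption), and ToyTransaction.get_thing is an invented method.

```python
import functools
import warnings


class ToyDeprecationWarning(DeprecationWarning):
    """Hypothetical stand-in for SdpConfigDeprecationWarning."""


def deprecated(subobject_name):
    """Mark a method as deprecated in favour of the named sub-object."""

    def decorator(method):
        @functools.wraps(method)
        def wrapper(self, *args, **kwargs):
            warnings.warn(
                f"{method.__name__}() is deprecated, use the "
                f"'{subobject_name}' sub-object instead",
                ToyDeprecationWarning,
                stacklevel=2,  # point the warning at the caller
            )
            return method(self, *args, **kwargs)

        return wrapper

    return decorator


class ToyTransaction:
    @deprecated("thing")
    def get_thing(self, thing_id):
        return {"id": thing_id}


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    result = ToyTransaction().get_thing("01")
```

The wrapped method still returns its normal result, and the caller additionally sees one warning that names the sub-object to use instead.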