Source code for ska_sdp_config.config

"""High-level API for SKA SDP configuration."""

import os
import sys
from datetime import date
import json
from socket import gethostname

from . import backend as backend_mod, entity


[docs]class Config: """Connection to SKA SDP configuration.""" def __init__(self, backend=None, global_prefix='', owner=None, **cargs): """ Connect to configuration using the given backend. :param backend: Backend to use. Defaults to environment or etcd3 if not set. :param global_prefix: Prefix to use within the database :param owner: Dictionary used for identifying the process when claiming ownership. :param cargs: Backend client arguments """ self._backend = self._determine_backend(backend, **cargs) # Owner dictionary if owner is None: owner = { 'pid': os.getpid(), 'hostname': gethostname(), 'command': sys.argv } self.owner = dict(owner) # Prefixes assert global_prefix == '' or global_prefix[0] == '/' self.pb_path = global_prefix + '/pb/' self.sb_path = global_prefix + '/sb/' self.subarray_path = global_prefix + '/subarray/' self.deploy_path = global_prefix + '/deploy/' # Lease associated with client self._client_lease = None @property def backend(self): """ Get the backend database object. """ return self._backend @staticmethod def _determine_backend(backend, **cargs): # Determine backend if not backend: backend = os.getenv('SDP_CONFIG_BACKEND', 'etcd3') # Instantiate backend, reading configuration from environment/dotenv if backend == 'etcd3': if 'host' not in cargs: cargs['host'] = os.getenv('SDP_CONFIG_HOST', '127.0.0.1') if 'port' not in cargs: cargs['port'] = int(os.getenv('SDP_CONFIG_PORT', '2379')) if 'protocol' not in cargs: cargs['protocol'] = os.getenv('SDP_CONFIG_PROTOCOL', 'http') if 'cert' not in cargs: cargs['cert'] = os.getenv('SDP_CONFIG_CERT', None) if 'username' not in cargs: cargs['username'] = os.getenv('SDP_CONFIG_USERNAME', None) if 'password' not in cargs: cargs['password'] = os.getenv('SDP_CONFIG_PASSWORD', None) return backend_mod.Etcd3Backend(**cargs) elif backend == 'memory': return backend_mod.MemoryBackend() else: raise ValueError( "Unknown configuration backend {}!".format(backend))
[docs] def lease(self, ttl=10): """ Generate a new lease. Once entered can be associated with keys, which will be kept alive until the end of the lease. At that point a daemon thread will be started automatically to refresh the lease periodically (default seems to be TTL/4). :param ttl: Time to live for lease :returns: lease object """ return self._backend.lease(ttl)
@property def client_lease(self): """Return the lease associated with the client. It will be kept alive until the client gets closed. """ if self._client_lease is None: self._client_lease = self.lease() self._client_lease.__enter__() return self._client_lease
[docs] def txn(self, max_retries=64): """Create a :class:`Transaction` for atomic configuration query/change. As we do not use locks, transactions might have to be repeated in order to guarantee atomicity. Suggested usage is as follows: .. code-block:: python for txn in config.txn(): # Use txn to read+write configuration # [Possibly call txn.loop()] As the `for` loop suggests, the code might get run multiple times even if not forced by calling :meth:`Transaction.loop`. Any writes using the transaction will be discarded if the transaction fails, but the application must make sure that the loop body has no other observable side effects. :param max_retries: Number of transaction retries before a :class:`RuntimeError` gets raised. """ return TransactionFactory( self, self._backend.txn(max_retries=max_retries))
[docs] def close(self): """Close the client connection.""" if self._client_lease: self._client_lease.__exit__(None, None, None) self._client_lease = None self._backend.close()
def __enter__(self): """Scope the client connection.""" return self def __exit__(self, exc_type, exc_val, exc_tb): """Scope the client connection.""" self.close() return False
[docs]class TransactionFactory: """Helper object for making transactions.""" def __init__(self, config, txn): """Create transaction factory.""" self._config = config self._txn = txn def __iter__(self): """Create new transaction objects.""" for txn in self._txn: yield Transaction(self._config, txn)
[docs]def dict_to_json(obj): """Format a dictionary for writing it into the database. :param obj: Dictionary object to format :returns: String representation """ # We only write dictionaries (JSON objects) at the moment assert isinstance(obj, dict) # Export to JSON. No need to convert to ASCII, as the backend # should handle unicode. Otherwise we optimise for legibility # over compactness. return json.dumps( obj, ensure_ascii=False, indent=2, separators=(',', ': '), sort_keys=True)
[docs]class Transaction: """High-level configuration queries and updates to execute atomically.""" def __init__(self, config, txn): """Instantiate transaction.""" self._cfg = config self._txn = txn self._pb_path = config.pb_path self._sb_path = config.sb_path self._subarray_path = config.subarray_path self._deploy_path = config.deploy_path @property def raw(self): """Return transaction object for accessing database directly.""" return self._txn def _get(self, path): """Get a JSON object from the database.""" txt = self._txn.get(path) if txt is None: return None return json.loads(txt) def _create(self, path, obj, lease=None): """Set a new path in the database to a JSON object.""" self._txn.create(path, dict_to_json(obj), lease) def _update(self, path, obj): """Set a existing path in the database to a JSON object.""" self._txn.update(path, dict_to_json(obj))
[docs] def loop(self, wait=False, timeout=None): """Repeat transaction regardless of whether commit succeeds. :param wait: If transaction succeeded, wait for any read values to change before repeating it. :param timeout: Maximum time to wait, in seconds """ return self._txn.loop(wait, timeout)
[docs] def list_processing_blocks(self, prefix=""): """Query processing block IDs from the configuration. :param prefix: If given, only search for processing block IDs with the given prefix :returns: Processing block ids, in lexicographical order """ # List keys pb_path = self._pb_path keys = self._txn.list_keys(pb_path + prefix) # return list, stripping the prefix assert all([key.startswith(pb_path) for key in keys]) return list([key[len(pb_path):] for key in keys])
[docs] def new_processing_block_id(self, generator: str): """Generate a new processing block ID that is not yet in use. :param generator: Name of the generator :returns: Processing block ID """ # Find existing processing blocks with same prefix pb_id_prefix = "pb-{}-{}".format( generator, date.today().strftime('%Y%m%d')) existing_ids = self.list_processing_blocks(pb_id_prefix) # Choose ID that doesn't exist for pb_ix in range(100000): pb_id = "{}-{:05}".format(pb_id_prefix, pb_ix) if pb_id not in existing_ids: break if pb_ix >= 100000: raise RuntimeError("Exceeded daily number of processing blocks!") return pb_id
[docs] def get_processing_block(self, pb_id: str) -> entity.ProcessingBlock: """ Look up processing block data. :param pb_id: Processing block ID to look up :returns: Processing block entity, or None if it doesn't exist """ dct = self._get(self._pb_path + pb_id) if dct is None: return None return entity.ProcessingBlock(**dct)
[docs] def create_processing_block(self, pb: entity.ProcessingBlock): """ Add a new :class:`ProcessingBlock` to the configuration. :param pb: Processing block to create """ assert isinstance(pb, entity.ProcessingBlock) self._create(self._pb_path + pb.id, pb.to_dict())
[docs] def update_processing_block(self, pb: entity.ProcessingBlock): """ Update a :class:`ProcessingBlock` in the configuration. :param pb: Processing block to update """ assert isinstance(pb, entity.ProcessingBlock) self._update(self._pb_path + pb.id, pb.to_dict())
[docs] def get_processing_block_owner(self, pb_id: str) -> dict: """ Look up the current processing block owner. :param pb_id: Processing block ID to look up :returns: Processing block owner data, or None if not claimed """ dct = self._get(self._pb_path + pb_id + "/owner") if dct is None: return None return dct
[docs] def is_processing_block_owner(self, pb_id: str) -> bool: """ Check whether this client is owner of the processing block. :param pb_id: Processing block ID to look up :returns: Whether processing block exists and is claimed """ return self.get_processing_block(pb_id) is not None and \ self.get_processing_block_owner(pb_id) == self._cfg.owner
[docs] def take_processing_block(self, pb_id: str, lease): """ Take ownership of the processing block. :param pb_id: Processing block ID to take ownership of :raises: backend.ConfigCollision """ # Lease must be provided assert lease is not None # Provide information identifying this process self._create(self._pb_path + pb_id + "/owner", self._cfg.owner, lease)
[docs] def get_processing_block_state(self, pb_id: str) -> dict: """ Get the current processing block state. :param pb_id: Processing block ID :returns: Processing block state, or None if not present """ state = self._get(self._pb_path + pb_id + "/state") if state is None: return None return state
[docs] def create_processing_block_state(self, pb_id: str, state: dict): """ Create processing block state. :param pb_id: Processing block ID :param state: Processing block state to create """ self._create(self._pb_path + pb_id + "/state", state)
[docs] def update_processing_block_state(self, pb_id: str, state: dict): """ Update processing block state. :param pb_id: Processing block ID :param state: Processing block state to update """ self._update(self._pb_path + pb_id + "/state", state)
[docs] def get_deployment(self, deploy_id: str) -> entity.Deployment: """ Retrieve details about a cluster configuration change. :param deploy_id: Name of the deployment :returns: Deployment details """ dct = self._get(self._deploy_path + deploy_id) return entity.Deployment(**dct)
[docs] def list_deployments(self, prefix=""): """ List all current deployments. :returns: Deployment IDs """ # List keys keys = self._txn.list_keys(self._deploy_path + prefix) # return list, stripping the prefix assert all([key.startswith(self._deploy_path) for key in keys]) return list([key[len(self._deploy_path):] for key in keys])
[docs] def create_deployment(self, dpl: entity.Deployment): """ Request a change to cluster configuration. :param dpl: Deployment to add to database """ # Add to database assert isinstance(dpl, entity.Deployment) self._create(self._deploy_path + dpl.id, dpl.to_dict())
[docs] def delete_deployment(self, dpl: entity.Deployment): """ Undo a change to cluster configuration. :param dpl: Deployment to remove """ # Delete all data associated with deployment deploy_path = self._deploy_path + dpl.id for key in self._txn.list_keys(deploy_path, recurse=5): self._txn.delete(key)
[docs] def list_scheduling_blocks(self, prefix=""): """Query scheduling block IDs from the configuration. :param prefix: if given, only search for scheduling block IDs with the given prefix :returns: scheduling block IDs, in lexicographical order """ # List keys sb_path = self._sb_path keys = self._txn.list_keys(sb_path + prefix) # Return list, stripping the prefix assert all([key.startswith(sb_path) for key in keys]) return list([key[len(sb_path):] for key in keys])
[docs] def get_scheduling_block(self, sb_id: str) -> dict: """ Get scheduling block. :param sb_id: scheduling block ID :returns: scheduling block state """ state = self._get(self._sb_path + sb_id) return state
[docs] def create_scheduling_block(self, sb_id: str, state: dict): """ Create scheduling block. :param sb_id: scheduling block ID :param state: scheduling block state """ self._create(self._sb_path + sb_id, state)
[docs] def update_scheduling_block(self, sb_id: str, state: dict): """ Update scheduling block. :param sb_id: scheduling block ID :param state: scheduling block state """ self._update(self._sb_path + sb_id, state)
[docs] def list_subarrays(self, prefix=""): """Query subarray IDs from the configuration. :param prefix: if given, only search for subarray IDs with the given prefix :returns: subarray IDs, in lexicographical order """ # List keys subarray_path = self._subarray_path keys = self._txn.list_keys(subarray_path + prefix) # Return list, stripping the prefix assert all([key.startswith(subarray_path) for key in keys]) return list([key[len(subarray_path):] for key in keys])
[docs] def get_subarray(self, subarray_id: str) -> dict: """ Get subarray. :param subarray_id: subarray ID :returns: subarray state """ state = self._get(self._subarray_path + subarray_id) return state
[docs] def create_subarray(self, subarray_id: str, state: dict): """ Create subarray. :param subarray_id: subarray ID :param state: subarray state """ self._create(self._subarray_path + subarray_id, state)
[docs] def update_subarray(self, subarray_id: str, state: dict): """ Update subarray. :param subarray_id: subarray ID :param state: subarray state """ self._update(self._subarray_path + subarray_id, state)