Source code for ska_sdp_config.entity.pb

"""Processing block configuration entities."""

import re
import copy

# Permit identifiers up to 64 bytes in length
_PB_ID_RE = re.compile("^[A-Za-z0-9\\-]{1,64}$")


class ProcessingBlock:
    """Processing block entity.

    Collects configuration information relating to a processing job
    for the SDP. This might be either real-time (supporting a running
    observation) or batch (to process data after the fact).

    Actual execution of processing steps will be performed by a
    (parameterised) workflow interpreting processing block
    information.
    """

    # The mutable defaults below are never mutated or stored: every
    # argument is deep-copied before it goes into the entity dictionary.
    # pylint: disable=dangerous-default-value
    # pylint: disable=redefined-builtin
    def __init__(self, id, sbi_id, workflow,
                 parameters={}, dependencies=[],
                 **kwargs):
        """
        Create a new processing block structure.

        :param id: Processing block ID (converted to a string)
        :param sbi_id: Scheduling block ID (None if not associated
            with an SBI; otherwise converted to a string)
        :param workflow: Workflow description (dictionary for now);
            must contain the keys ``type``, ``id`` and ``version``
        :param parameters: Workflow parameters
        :param dependencies: Dependencies on other processing blocks
            (not for real-time processing)
        :param kwargs: Any additional entries, stored verbatim in the
            entity dictionary alongside the fields above
        :raises ValueError: if the workflow lacks one of the required
            keys, or if the ID does not match the permitted pattern
        :returns: ProcessingBlock object
        """
        # Build the backing dictionary from private copies, so later
        # changes to the caller's objects cannot leak into this entity.
        self._dict = {
            'id': str(id),
            'sbi_id': None if sbi_id is None else str(sbi_id),
            'workflow': dict(copy.deepcopy(workflow)),
            'parameters': dict(copy.deepcopy(parameters)),
            'dependencies': list(copy.deepcopy(dependencies))
        }
        self._dict.update(kwargs)

        # Validate
        if not set(self.workflow) >= {'type', 'id', 'version'}:
            raise ValueError("Workflow must specify type, ID and version!")
        if not _PB_ID_RE.match(self.id):
            raise ValueError("Processing block ID {} not permissible!".format(
                self.id))

    def to_dict(self):
        """Return data as dictionary.

        This is the entity's own backing dictionary, not a copy, so
        changes made to it are visible through this object.
        """
        return self._dict

    @property
    def id(self):
        """Return the processing block ID."""
        return self._dict['id']

    @property
    def sbi_id(self):
        """Return scheduling block instance ID, if associated with one."""
        return self._dict.get('sbi_id')

    @property
    def workflow(self):
        """Return information identifying the workflow."""
        return self._dict['workflow']

    @property
    def parameters(self):
        """Return workflow-specific parameters."""
        return self._dict['parameters']

    @property
    def dependencies(self):
        """Return dependencies on other processing blocks."""
        return self._dict['dependencies']

    def __repr__(self):
        """Build string representation."""
        return "ProcessingBlock({})".format(
            ", ".join("{}={}".format(k, repr(v))
                      for k, v in self._dict.items()))

    def __eq__(self, other):
        """Equality check.

        Two processing blocks are equal when their dictionaries are
        equal. Comparing against any other type yields NotImplemented,
        so ``pb == 42`` evaluates to False instead of raising
        AttributeError.
        """
        if not isinstance(other, ProcessingBlock):
            return NotImplemented
        return self.to_dict() == other.to_dict()

    # Instances wrap a mutable dictionary and compare by value, so they
    # must not be hashable. Python already does this implicitly when
    # __eq__ is defined; stating it makes the intent explicit.
    __hash__ = None