Source code for ska_sdp_config.entity.pb

"""Processing block configuration entities."""

import copy
import re

# Permit identifiers of 1 to 64 characters drawn from ASCII letters, digits
# and "-" (all single-byte, so this is also at most 64 bytes).
_PB_ID_RE = re.compile(r"^[A-Za-z0-9\-]{1,64}$")


class ProcessingBlock:
    """Processing block entity.

    Collects configuration information relating to a processing job
    for the SDP. This might be either real-time (supporting a running
    observation) or batch (to process data after the fact).

    Actual execution of processing steps will be performed by a
    (parameterised) processing script interpreting processing block
    information.

    Instances compare equal when their dictionary representations are
    equal. Because ``__eq__`` is defined without ``__hash__``, instances
    are unhashable.
    """

    def __init__(
        self,
        pb_id,
        eb_id,
        script,
        parameters=None,
        dependencies=None,
        **kwargs,
    ):
        """
        Create a new processing block structure.

        ``script``, ``parameters`` and ``dependencies`` are deep-copied, so
        later changes to the objects passed in do not affect the entity.

        :param pb_id: Processing block ID (converted with ``str``)
        :param eb_id: Execution block ID (None if not associated with an EB)
        :param script: Processing script description (dictionary for now);
            must contain the keys ``kind``, ``name`` and ``version``
        :param parameters: Processing script parameters (defaults to an
            empty dictionary)
        :param dependencies: Dependencies on other processing blocks (not
            for real-time processing; defaults to an empty list)
        :param kwargs: Any additional entries, merged as-is into the
            entity dictionary
        :raises ValueError: if the script lacks a required key or the
            processing block ID is not permissible
        """
        # pylint: disable=too-many-arguments

        # Get parameter dictionary
        self._dict = {
            "pb_id": str(pb_id),
            "eb_id": None if eb_id is None else str(eb_id),
            "script": dict(copy.deepcopy(script)),
            "parameters": {}
            if parameters is None
            else dict(copy.deepcopy(parameters)),
            "dependencies": []
            if dependencies is None
            else list(copy.deepcopy(dependencies)),
        }
        self._dict.update(kwargs)

        # Validate
        if not set(self.script) >= {"kind", "name", "version"}:
            raise ValueError(
                "Processing script must specify kind, name and version!"
            )
        # fullmatch rather than match: "$" also matches just before a
        # trailing newline, which would let an ID such as "pb-1\n" through.
        if not _PB_ID_RE.fullmatch(self.pb_id):
            raise ValueError(
                f"Processing block ID {self.pb_id} not permissible!"
            )

    def to_dict(self):
        """Return data as dictionary.

        The returned object is the entity's internal dictionary, not a
        copy, so modifying it modifies the entity.
        """
        return self._dict

    @property
    def pb_id(self):
        """Return the processing block ID."""
        return self._dict["pb_id"]

    @property
    def eb_id(self):
        """Return execution block instance ID, if associated with one."""
        return self._dict.get("eb_id")

    @property
    def script(self):
        """Return information identifying the processing script."""
        return self._dict["script"]

    @property
    def parameters(self):
        """Return processing script-specific parameters."""
        return self._dict["parameters"]

    @property
    def dependencies(self):
        """Return dependencies on other processing blocks."""
        return self._dict["dependencies"]

    def __repr__(self):
        """Build string representation."""
        content = ", ".join(f"{k}={v!r}" for k, v in self._dict.items())
        return f"ProcessingBlock({content})"

    def __eq__(self, other):
        """Equality check.

        Returns ``NotImplemented`` for operands that are not processing
        blocks, so that comparing against unrelated objects evaluates to
        ``False`` instead of raising ``AttributeError``.
        """
        if not isinstance(other, ProcessingBlock):
            return NotImplemented
        return self.to_dict() == other.to_dict()