"""Processing block configuration entities."""
import copy
import re
# Permit identifiers up to 64 bytes in length
_PB_ID_RE = re.compile("^[A-Za-z0-9\\-]{1,64}$")
[docs]
class ProcessingBlock:
"""Processing block entity.
Collects configuration information relating to a processing job for the
SDP. This might be either real-time (supporting a running observation) or
batch (to process data after the fact).
Actual execution of processing steps will be performed by a (parameterised)
processing script interpreting processing block information.
"""
def __init__(
self,
pb_id,
eb_id,
script,
parameters=None,
dependencies=None,
**kwargs,
):
"""
Create a new processing block structure.
:param pb_id: Processing block ID
:param eb_id: Execution block ID (None if not associated with an EB)
:param script: Processing script description (dictionary for now)
:param parameters: Processing script parameters
:param dependencies: Dependencies on other processing blocks (not for
real-time processing)
:param dct: Dictionary to load from (will ignore other parameters)
:returns: ProcessingBlock object
"""
# pylint: disable=too-many-arguments
# Get parameter dictionary
self._dict = {
"pb_id": str(pb_id),
"eb_id": None if eb_id is None else str(eb_id),
"script": dict(copy.deepcopy(script)),
"parameters": {}
if parameters is None
else dict(copy.deepcopy(parameters)),
"dependencies": []
if dependencies is None
else list(copy.deepcopy(dependencies)),
}
self._dict.update(kwargs)
# Validate
if not set(self.script) >= {"kind", "name", "version"}:
raise ValueError(
"Processing script must specify kind, name and version!"
)
if not _PB_ID_RE.match(self.pb_id):
raise ValueError(
f"Processing block ID {self.pb_id} not permissible!"
)
[docs]
def to_dict(self):
"""Return data as dictionary."""
return self._dict
@property
def pb_id(self):
"""Return the processing block ID."""
return self._dict["pb_id"]
@property
def eb_id(self):
"""Return execution block instance ID, if associated with one."""
return self._dict.get("eb_id")
@property
def script(self):
"""Return information identifying the processing script."""
return self._dict["script"]
@property
def parameters(self):
"""Return processing script-specific parameters."""
return self._dict["parameters"]
@property
def dependencies(self):
"""Return dependencies on other processing blocks."""
return self._dict["dependencies"]
def __repr__(self):
"""Build string representation."""
content = ", ".join([f"{k}={v!r}" for k, v in self._dict.items()])
return f"ProcessingBlock({content})"
def __eq__(self, other):
"""Equality check."""
return self.to_dict() == other.to_dict()