API Documentation¶
The data access library is meant to enable the Science Data Processor to exchange tensor and table data between processing functions at high data rates. This is implemented using Apache Arrow data exchanged over an Apache Plasma shared memory object store.
Processing Functions¶
ska_sdp_dal.processor¶
Provides facilities for registering processing components.
Each Processor registers a number of processing functions in the Plasma store, see common.make_call_schema(). In particular, you can use Plasma tensor and table objects as input and output parameters, see common.make_tensor_input_par(), common.make_tensor_output_par(), common.make_table_input_par() and common.make_table_output_par().
These processing functions can then be called from other processes using a caller.Caller instance connected to the same Plasma store.
- class ska_sdp_dal.processor.Processor(procs: List[pyarrow.Schema], plasma_path: str, prefix: bytes = b'', name: Optional[str] = None)[source]¶
A high-level processor interface.
Should be subclassed for implementing a concrete processor, with methods for every processing function registered by the processor. When BasicProcessor.process() is called, the processor will wait for a matching function call to appear in the Plasma store and input parameters to become available (see BasicProcessor).
All parameters are then automatically retrieved and unpacked into Python objects. References to the Plasma store are represented using connection.TensorRef or connection.TableRef instances. These can be used for getting and setting input and output tensors and tables respectively.
- Parameters
procs – Processing functions accepted by this processor
plasma_path – Socket path of the Apache Plasma store
prefix – Prefix to use for namespace.
name – Name to use for registering the processor in the store. Defaults to the class name.
- class ska_sdp_dal.processor.LogProcessor(procs: List[pyarrow.Schema], plasma_path: str, prefix: bytes = b'', name: Optional[str] = None)[source]¶
Simple processor that just logs all calls.
- Parameters
procs – Processing functions accepted by this processor
plasma_path – Socket path of the Apache Plasma store
prefix – Prefix to use for namespace
name – Name to use for registering the processor in the store. Defaults to the class name.
- class ska_sdp_dal.processor.BasicProcessor(procs: List[pyarrow.Schema], plasma_path: str, prefix: bytes = b'', name: Optional[str] = None)[source]¶
Low-level processor interface.
Deprecated: Use Processor instead.
Should be sub-classed for implementing a concrete processor. When process() is called, the processor will wait for a matching function call to appear in the Plasma store and input parameters to become available. Once that’s the case, _process_call() will be called with the function name and a pyarrow.RecordBatch containing the parameters to the call.
Simple parameters can then be retrieved using parameter(), and input tensors using tensor_parameter() / tensor_parameters(). Output parameters can be set using output_tensor().
- Parameters
procs – Processing functions accepted by this processor
plasma_path – Socket path of the Apache Plasma store
prefix – Prefix to use for namespace
name – Name to use for registering the processor in the store. Defaults to the class name.
- oid_parameter(batch: pyarrow.RecordBatch, name: str, allow_null: bool = False) pyarrow.plasma.ObjectID [source]¶
Extract Object ID parameter from first row of record batch.
- Parameters
batch – Record batch containing parameter
name – Name of parameter to extract
allow_null – Value allowed to be null - will return None
- output_tensor(batch, name, array, typ=None)[source]¶
Write output tensor to storage
Note that this is less efficient than constructing it in-place, which we should support at some point (TODO)
- Parameters
batch – Record batch containing parameters
name – Name of parameter to extract
array – Tensor as numpy array
typ – Tensor value type
- parameter(batch: pyarrow.RecordBatch, name: str, typ: Optional[pyarrow.DataType] = None, allow_null: bool = False) Any [source]¶
Extract parameter from first row of record batch
- Parameters
batch – Record batch containing parameter
name – Name of parameter to extract
typ – Type to check (optional)
allow_null – Value allowed to be null - will return None
- property prefix¶
The Plasma prefix for calls to this processor.
- process(timeout=None, catch_exceptions=True)[source]¶
Attempts to process a call.
Blocks if no call is currently available.
- Parameters
timeout – Maximum time to block, in seconds
catch_exceptions – Whether exceptions thrown by _process_call() should be caught and logged (the default).
- Returns
False if timeout expired, otherwise True
- tensor_parameter(batch: pyarrow.RecordBatch, name: str, typ: Optional[pyarrow.DataType] = None, dim_names: Optional[List[str]] = None, allow_null: bool = False) pyarrow.Tensor [source]¶
Read tensor referred to via object ID parameter.
- Parameters
batch – Record batch containing parameters
name – Name of parameter to extract
typ – Tensor value type to check (optional)
dim_names – Tensor dimensionality to check (optional)
allow_null – Object ID allowed to be null - will return None instead of a tensor
- tensor_parameters(batch: pyarrow.RecordBatch, tensor_specs: List[Tuple[str, pyarrow.DataType, List[str], bool]]) List[pyarrow.Tensor] [source]¶
Read tensors referred to via object ID parameters.
- Parameters
batch – Record batch containing parameters
tensor_specs – Either list of strings or list of tuples of form (name, type, dimensionality, allow_null). If given, type and dimensionality will be checked. If allow_null is set, the object ID is allowed to be null, in which case None will get returned instead of a tensor.
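The two accepted forms of tensor_specs can be brought into one shape before processing. The sketch below is an illustration only, written against plain Python values rather than pyarrow types. The function name normalise_tensor_specs is hypothetical, and the defaults chosen for bare names (no type check, no dimension check, null not allowed) are assumptions, not documented behaviour.

```python
from typing import Any, List, Optional, Tuple, Union

# (name, type, dim_names, allow_null), as described for tensor_specs
TensorSpec = Tuple[str, Optional[Any], Optional[List[str]], bool]


def normalise_tensor_specs(specs: List[Union[str, tuple]]) -> List[TensorSpec]:
    """Expand bare names into (name, type, dim_names, allow_null) tuples."""
    result = []
    for spec in specs:
        if isinstance(spec, str):
            # Bare name: assumed to mean "no checks, null not allowed"
            result.append((spec, None, None, False))
        else:
            name, typ, dim_names, allow_null = spec
            result.append((name, typ, dim_names, bool(allow_null)))
    return result
```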
ska_sdp_dal.caller¶
- class ska_sdp_dal.caller.Caller(procs: List[pyarrow.Schema], store: ska_sdp_dal.store.Store, broadcast: bool = False, minimum_processors: int = 1, processor_prefix: bytes = b'', max_attempts: int = 100, verbose: bool = False)[source]¶
Base class for calls to a processor.Processor class.
The constructor will create methods according to the passed call schemas - both for single and for batch calls. The batch variant will expect a list of dictionaries, see batch_call().
- Parameters
procs – Call schemas to support. Will be used to find a compatible processor.
store – Store area to use for calls (will use its Plasma client)
broadcast – Send calls to all matching processors?
minimum_processors – Raise an error if fewer processors are available
processor_prefix – Allow changing processors after initialisation?
max_attempts – Maximum attempts at resolving ObjectID collisions
verbose – Log information about found processors
- batch_call(call_schema: pyarrow.Schema, calls: List[Dict[str, Any]]) List[Dict[str, ska_sdp_dal.connection.TensorRef]] [source]¶
Create a number of calls to a function with the given schema.
- Parameters
call_schema – Schema of the call
calls – List of parameter dictionaries
- Returns
List of output parameter dictionaries per call (if broadcasting also per processor)
- call(call_schema: pyarrow.Schema, *args, **kwargs) pyarrow.plasma.ObjectID [source]¶
Create a call to a function with the given schema.
Both positional and keyword arguments are supported, using the position and name of the parameter in the schema, respectively.
- Parameters
call_schema – Schema of the call
args – List of parameters
kwargs – Dictionary of parameters
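How positional and keyword parameters might be matched against a call schema can be sketched with plain Python. This is an illustration of the binding rule only, not the library's implementation: bind_call_args is a hypothetical helper, and the schema is reduced to an ordered list of field names.

```python
def bind_call_args(field_names, *args, **kwargs):
    """Map positional and keyword arguments onto schema field names."""
    if len(args) > len(field_names):
        raise TypeError("too many positional arguments")
    # Positional arguments are matched by position in the schema
    bound = dict(zip(field_names, args))
    # Keyword arguments are matched by field name
    for name, value in kwargs.items():
        if name not in field_names:
            raise TypeError(f"unknown parameter {name!r}")
        if name in bound:
            raise TypeError(f"parameter {name!r} given twice")
        bound[name] = value
    return bound
```

For a schema with fields x, y and out, the call bind_call_args(["x", "y", "out"], 1, out=3, y=2) yields one value per field, which is the kind of parameter dictionary that batch_call() takes in list form.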
- find_processors(verbose=False)[source]¶
Locate compatible processors.
Done automatically when the caller is constructed. Call again to refresh the list of processors to call. Typically used with broadcasting callers.
- Parameters
verbose – Log information about found processors
- property num_processors¶
The number of processors located by this caller
Storage¶
ska_sdp_dal.store¶
- class ska_sdp_dal.store.Store(plasma_path: str, max_attempts: int = 10000, name: Optional[str] = None)[source]¶
A storage namespace within a Plasma store
Used for holding shared data objects, such as tensors and tables. These can be passed to processors.
- property conn: ska_sdp_dal.connection.Connection¶
- make_tensor_ref(oid: pyarrow.plasma.ObjectID, typ: Optional[pyarrow.DataType] = None, dim_names: Optional[List[str]] = None) ska_sdp_dal.connection.TensorRef [source]¶
Create a TensorRef object for an existing object in Plasma
- Parameters
oid – Existing object ID
typ – Element datatype. If ComplexType, will convert.
dim_names – Dimension names
- Returns
Reference to tensor
- new_table_ref(schema: Optional[pyarrow.Schema] = None) ska_sdp_dal.connection.TableRef [source]¶
Allocate an Object ID for a new table in Plasma
- Parameters
schema – Table schema
- Returns
Reference to table
- new_tensor_ref(typ: Optional[pyarrow.DataType] = None, dim_names: Optional[List[str]] = None) ska_sdp_dal.connection.TensorRef [source]¶
Allocate an Object ID for a new tensor in Plasma
- Parameters
typ – Element datatype. If ComplexType, will convert.
dim_names – Dimension names
- Returns
Reference to tensor
- put_new_table(table: Union[pyarrow.Table, Mapping[str, pyarrow.ChunkedArray], Mapping[str, pyarrow.Array], Mapping[str, list]], schema: Optional[pyarrow.Schema] = None) ska_sdp_dal.connection.TableRef [source]¶
Allocate and create a new table in Plasma
See connection.TableRef.put() for notes about possible parameters.
- Parameters
table – Table data
schema – Table schema
- Returns
Reference to table
- put_new_tensor(arr: numpy.ndarray, typ: Optional[pyarrow.DataType] = None, dim_names: Optional[List[str]] = None) ska_sdp_dal.connection.TensorRef [source]¶
Allocate and create a new tensor in Plasma
- Parameters
arr – Data as numpy array
typ – Element datatype. If ComplexType, will convert.
dim_names – Dimension names
- Returns
Reference to tensor
ska_sdp_dal.connection¶
- class ska_sdp_dal.connection.Connection(plasma_path: str)[source]¶
A connection to a Plasma store.
Subscribes to events and uses them to maintain a list of objects in the store. In particular, namespaces are tracked.
- property client: pyarrow.plasma.PlasmaClient¶
- get_buffers(oids, timeout=None) pyarrow.plasma.PlasmaBuffer [source]¶
Retrieve object for given OIDs.
Uses a cache to prevent duplicated requests to the Plasma store.
- Parameters
oids – Plasma object IDs
timeout – Time to wait for buffers to become available
- Returns
Plasma buffer
- get_ref_buffers(refs: List[ska_sdp_dal.connection.Ref], timeout: Optional[float] = None, auto_delete: bool = True) None [source]¶
Retrieves the buffers for multiple Plasma references at a time.
Blocks as long as any (!) of the objects have not been created yet.
- Parameters
refs – References to retrieve buffer of
timeout – Maximum time this function is allowed to block.
auto_delete – Delete object in store when reference is dropped
- Raises
TimeoutException
- property namespace_procs: Dict[pyarrow.plasma.ObjectID, List[pyarrow.Schema]]¶
- property namespaces: List[pyarrow.plasma.ObjectID]¶
- object_exists(oid) bool [source]¶
Checks whether the given object ID is known to exist
- Parameters
oid – Object ID to check
- object_size(oid) bool [source]¶
Gets the size of the given object
- Parameters
oid – Object ID to check
- reserve_namespace(name: Optional[str] = None, procs: List[pyarrow.Schema] = [], prefix: bytes = b'') Tuple[bytes, pyarrow.plasma.PlasmaBuffer] [source]¶
Reserve a new namespace within the Plasma store
This will automatically clear all objects with the given prefix
- Parameters
name – Informative display name for namespace
procs – Call schemas supported (if any)
prefix – Leading bytes to use for the namespace prefix
- Returns
Prefix, buffer with declaration (to keep namespace alive)
- class ska_sdp_dal.connection.Ref(conn: ska_sdp_dal.connection.Connection, oid: pyarrow.plasma.ObjectID, auto_delete: bool = True, dependencies: List[ska_sdp_dal.connection.Ref] = [], references: List[ska_sdp_dal.connection.Ref] = [])[source]¶
Refers to an object in storage
Subclassed by type. Might not have been created yet. Can have two kinds of relationships with other objects:
dependency: Object is required for this object to be created. Must ensure objects stay alive until this object is found to be created. Typically refers to call objects.
reference: Object that is referenced from this object and must therefore be kept alive while this object is still needed.
- add_dependency(ref: ska_sdp_dal.connection.Ref, timeout: float = 0) None [source]¶
Registers the identified object as a dependency.
This will ensure that the object is kept alive until we have retrieved the data for this object. Blocks if the object does not yet exist in the store.
- Parameters
ref – Reference to register as dependency
timeout – Maximum time this method is allowed to block.
- Raises
TimeoutException
- add_reference(ref: Union[ska_sdp_dal.connection.Ref, pyarrow.plasma.ObjectID], timeout: float = 0) None [source]¶
Registers the identified object as referenced
This will ensure that the object is kept alive as long as this object is referenced. Might block if the object does not yet exist in the Plasma store.
- Parameters
ref – Reference to register as referenced
timeout – Maximum time this method is allowed to block.
- Raises
TimeoutException
- get_buffer(timeout: Optional[float] = None, auto_delete: bool = True) pyarrow.Buffer [source]¶
Get Arrow buffer for this Plasma reference.
Blocks if the object has not yet been created.
- Parameters
timeout – Maximum time this method is allowed to block.
auto_delete – Delete object in store when reference is dropped
- Raises
TimeoutException
- property oid: pyarrow.plasma.ObjectID¶
- class ska_sdp_dal.connection.TableRef(conn: ska_sdp_dal.connection.Connection, oid: pyarrow.plasma.ObjectID, schema: Optional[pyarrow.Schema] = None, auto_delete: bool = True)[source]¶
Refers to a Table in Plasma store.
Might not have been created yet - wraps an Object ID and expected type information.
- get(timeout=None)[source]¶
Get Arrow table
- Parameters
timeout – How long the function is allowed to block if object does not exist yet
- get_awkward()[source]¶
Get table as Awkward array
This can generally be done without copying the data.
- get_pandas(*args, **kwargs)[source]¶
Get table as Pandas dataframe
- Parameters
kwargs – Parameters to pandas conversion. See pyarrow.Table.to_pandas().
- put(table: Union[pyarrow.Table, Mapping[str, pyarrow.ChunkedArray], Mapping[str, pyarrow.Array], Mapping[str, list]], max_chunksize=None)[source]¶
Write the given table into storage.
The table can be given as pandas DataFrame, dictionary of strings to pyarrow.Array or lists, which will be converted into the equivalent table. If the arrays are chunked, the chunks of all columns must match. See also pyarrow.table().
- Parameters
table – Table to write.
max_chunksize – Maximum size of record batches to split table into
- property schema: pyarrow.Schema¶
- class ska_sdp_dal.connection.TensorRef(conn: ska_sdp_dal.connection.Connection, oid: pyarrow.plasma.ObjectID, typ: Optional[pyarrow.DataType] = None, dim_names: Optional[List[str]] = None, auto_delete: bool = True)[source]¶
Refers to a tensor in object storage.
Might not have been created yet - wraps an Object ID and expected type information.
- get(timeout=None)[source]¶
Retrieve the tensor from storage. Might block.
- Parameters
timeout – Maximum time this method is allowed to block.
- put(arr: Optional[numpy.ndarray] = None)[source]¶
Write the given value into storage.
- Parameters
arr – Array to write to storage. Empty by default.
- property typ: pyarrow.DataType¶
ska_sdp_dal.common¶
- class ska_sdp_dal.common.ComplexType(real_type, complex_dtype, real_dtype)[source]¶
Pseudo-type to refer to complex values.
Use in place of arrow types.
- ska_sdp_dal.common.NAMESPACE_DECL_SUFFIX = b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'¶
First object in namespace is expected to be its declaration
- ska_sdp_dal.common.NAMESPACE_ID_SIZE = 4¶
Bytes of the Object ID we are going to use as namespace prefix
- ska_sdp_dal.common.OBJECT_ID_SIZE = 20¶
Length of Plasma’s object IDs
- ska_sdp_dal.common.OBJECT_ID_TYPE = 20¶
Arrow type used for representing Plasma object IDs
- ska_sdp_dal.common.PROC_NAMESPACE_ARGV_META = b'proc:argv'¶
Metadata entry for namespace process arguments
- ska_sdp_dal.common.PROC_NAMESPACE_META = b'proc:namespace'¶
Metadata entry for namespace name
- ska_sdp_dal.common.PROC_NAMESPACE_PID_META = b'proc:pid'¶
Metadata entry for namespace process ID
- ska_sdp_dal.common.call_name(schema: pyarrow.Schema) str [source]¶
Get call name from call schema
- Parameters
schema – Call schema
- ska_sdp_dal.common.complex128 = <ska_sdp_dal.common.ComplexType object>¶
Double-precision complex type
- ska_sdp_dal.common.complex64 = <ska_sdp_dal.common.ComplexType object>¶
Single-precision complex type
- ska_sdp_dal.common.is_namespace_decl(oid: pyarrow.plasma.ObjectID)[source]¶
Checks whether the given object ID declares a namespace.
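The constants above suggest how a namespace declaration ID is laid out: a namespace prefix followed by an all-zero suffix. The sketch below works on plain bytes rather than pyarrow.plasma.ObjectID and is not the library's code. The helper names are hypothetical, and it assumes that the suffix fills exactly the part of the ID not taken by the prefix.

```python
NAMESPACE_ID_SIZE = 4   # bytes of the object ID used as namespace prefix
OBJECT_ID_SIZE = 20     # length of a Plasma object ID
# Assumption: the declaration suffix is all zeros and fills the rest of the ID
NAMESPACE_DECL_SUFFIX = bytes(OBJECT_ID_SIZE - NAMESPACE_ID_SIZE)


def namespace_decl_id(prefix: bytes) -> bytes:
    """Build the ID of the declaration object for a namespace prefix."""
    if len(prefix) != NAMESPACE_ID_SIZE:
        raise ValueError(f"prefix must be {NAMESPACE_ID_SIZE} bytes long")
    return prefix + NAMESPACE_DECL_SUFFIX


def looks_like_namespace_decl(oid: bytes) -> bool:
    """Check whether a raw object ID has the declaration layout."""
    return len(oid) == OBJECT_ID_SIZE and oid.endswith(NAMESPACE_DECL_SUFFIX)
```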
- ska_sdp_dal.common.make_call_schema(func_name: str, pars: List[par_spec], metadata={}) pyarrow.Schema [source]¶
Create schema for calls through the Plasma store.
- Parameters
func_name – Function name
pars – List of parameters
metadata – Metadata to associate with schema
- ska_sdp_dal.common.make_oid_input_par(name: str, nullable: bool = False, metadata: Dict[bytes, bytes] = {}) par_spec [source]¶
Create input Object ID parameter to pass to make_call_schema.
Marking the parameter as input means that the call will be delayed until an object with the given ID appears in the Plasma store.
- Parameters
name – Parameter name
nullable – Allowed to be null?
metadata – Metadata dictionary to associate with field.
- ska_sdp_dal.common.make_oid_output_par(name: str, nullable: bool = False, metadata: Dict[bytes, bytes] = {}) par_spec [source]¶
Create output Object ID parameter to pass to make_call_schema.
The call will be skipped if all outputs already exist in the Plasma store.
- Parameters
name – Parameter name
nullable – Allowed to be null?
metadata – Metadata dictionary to associate with field.
- ska_sdp_dal.common.make_oid_par(name: str, nullable: bool = False, metadata: Dict[bytes, bytes] = {}) par_spec [source]¶
Create Object ID parameter to pass to make_call_schema.
- Parameters
name – Parameter name
nullable – Allowed to be null?
metadata – Metadata dictionary to associate with field.
- ska_sdp_dal.common.make_par(name: str, typ: pyarrow.DataType, nullable: bool = False, metadata: Dict[str, str] = {}) par_spec [source]¶
Create parameter declaration to pass to make_call_schema.
- Parameters
name – Parameter name
typ – Arrow data type
nullable – Allowed to be null?
metadata – Metadata dictionary to associate with field.
- ska_sdp_dal.common.make_table_input_par(name: str, table_schema: pyarrow.Schema, nullable: bool = False) par_spec [source]¶
Create input table parameter to pass to make_call_schema.
Marking the parameter as input means that the call will be delayed until a table with the given ID appears in the Plasma store.
- Parameters
name – Parameter name
table_schema – Schema of the table
nullable – Allowed to be null?
- ska_sdp_dal.common.make_tensor_input_par(name: str, elem_type: pyarrow.DataType, dim_names: List[str], nullable: bool = False) par_spec [source]¶
Create input tensor parameter to pass to make_call_schema.
Marking the parameter as input means that the call will be delayed until a tensor with the given ID appears in the Plasma store.
- Parameters
name – Parameter name
elem_type – Tensor element type
dim_names – Dimension names
nullable – Allowed to be null?
- ska_sdp_dal.common.make_tensor_output_par(name: str, elem_type: pyarrow.DataType, dim_names: List[str], nullable: bool = False) par_spec [source]¶
Create output tensor parameter to pass to make_call_schema.
As for make_oid_output_par(), the call will be skipped if all outputs already exist in the Plasma store.
- Parameters
name – Parameter name
elem_type – Tensor element type
dim_names – Dimension names
nullable – Allowed to be null?
- ska_sdp_dal.common.object_id_hex(oid: pyarrow.plasma.ObjectID) str [source]¶
Convert Object ID into a hexadecimal string representation
- Parameters
oid – The Object ID to convert (as bytearray or ObjectID)
- ska_sdp_dal.common.objectid_generator(prefix: bytes, size: int = 20) Iterator[bytes] [source]¶
Generate ObjectIDs with a given prefix.
- Parameters
prefix – Prefix as binary string
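One way to produce IDs that share a prefix is to append a fixed-width counter. The sketch below is a hypothetical illustration, not the strategy used by objectid_generator, which is not specified here. It starts counting at 1 on the assumption that the all-zero suffix is reserved for the namespace declaration.

```python
import itertools
from typing import Iterator


def sequential_object_ids(prefix: bytes, size: int = 20) -> Iterator[bytes]:
    """Yield IDs of `size` bytes that all start with `prefix`."""
    free = size - len(prefix)
    if free <= 0:
        raise ValueError("prefix leaves no room for a suffix")
    # Start at 1: the all-zero suffix is assumed to be reserved
    for counter in itertools.count(1):
        if counter >= 256 ** free:
            return  # suffix space exhausted
        yield prefix + counter.to_bytes(free, "big")
```

Because this is a generator, the length check runs when the first ID is requested, not when the generator is created.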
- ska_sdp_dal.common.par_meta(field: pyarrow.Field) Optional[str] [source]¶
Get parameter kind metadata from schema field
- Parameters
field – Schema field
- Returns
Parameter kind, or None if not set
- ska_sdp_dal.common.par_table_schema(field: pyarrow.Field) Optional[pyarrow.Schema] [source]¶
Get table schema for a parameter
- Parameters
field – Field to read metadata from
- Returns
Table schema, or None if not set
- ska_sdp_dal.common.par_tensor_dim_names(field: pyarrow.Field) Optional[List[str]] [source]¶
Get tensor dimension names parameter
- Parameters
field – Field to read metadata from
- Returns
Dimension names, or None if not set
- ska_sdp_dal.common.par_tensor_elem_type(field: pyarrow.Field) Optional[pyarrow.DataType] [source]¶
Get tensor element type parameter
- Parameters
field – Field to read metadata from
- Returns
Parameter element type, or None if not set
- ska_sdp_dal.common.parse_hex_objectid(oid_str: str) bytes [source]¶
Parse an Object ID given as a hexadecimal string representation
Note that this allows Object IDs to have fewer than 20 bytes, i.e. partial Object IDs (prefixes) are parsed without error.
- Parameters
oid_str – String representation
- Returns
Object ID as binary string
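The round trip between binary Object IDs and their hexadecimal form can be illustrated with the standard library alone. The helpers to_hex and from_hex below are hypothetical stand-ins that operate on raw bytes; they only mirror the behaviour described for object_id_hex() and parse_hex_objectid(), including the acceptance of partial IDs.

```python
def to_hex(oid: bytes) -> str:
    """Render raw object ID bytes as a hexadecimal string."""
    return bytes(oid).hex()


def from_hex(oid_str: str) -> bytes:
    """Parse a hexadecimal string; shorter (partial) IDs are accepted too."""
    return bytes.fromhex(oid_str)
```

A full 20-byte ID renders as 40 hexadecimal characters, while a 4-byte namespace prefix renders as 8.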
- ska_sdp_dal.common.schema_compatible(expected: pyarrow.Schema, actual: pyarrow.Schema) bool [source]¶
Checks for compatibility between (call) schemas.
This means that all expected fields are there and have the same types (including relevant metadata).
- Parameters
expected – Expected schema
actual – Schema to check
- Returns
Empty list if compatible, otherwise list of mismatches
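The compatibility rule described above can be sketched without pyarrow by reducing each schema to a mapping from field name to type name. This is an illustration under stated assumptions, not the library's implementation: schema_mismatches is a hypothetical helper, metadata is ignored, and extra fields in the actual schema are tolerated because only the expected fields are said to be required.

```python
def schema_mismatches(expected: dict, actual: dict) -> list:
    """List the ways `actual` fails to provide the `expected` fields."""
    mismatches = []
    for name, typ in expected.items():
        if name not in actual:
            mismatches.append(f"missing field {name!r}")
        elif actual[name] != typ:
            mismatches.append(
                f"field {name!r}: expected {typ}, got {actual[name]}"
            )
    # An empty list means the schemas are compatible
    return mismatches
```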