API Documentation

The data access library is meant to enable the Science Data Processor to exchange tensor and table data between processing functions at high data rates. This is implemented using Apache Arrow data exchanged over an Apache Plasma shared-memory object store.

Processing Functions

ska_sdp_dal.processor

Provides facilities for registering processing components

Each Processor registers a number of processing functions in the Plasma store, see common.make_call_schema(). In particular, Plasma tensor and table objects can be used as input and output parameters, see common.make_tensor_input_par(), common.make_tensor_output_par(), common.make_table_input_par() and common.make_table_output_par().

These processing functions can then be called from other processes using a caller.Caller instance connected to the same Plasma store.

class ska_sdp_dal.processor.Processor(procs: List[pyarrow.Schema], plasma_path: str, prefix: bytes = b'', name: Optional[str] = None)

A high-level processor interface.

Should be subclassed for implementing a concrete processor, with methods for every processing function registered by the processor. When BasicProcessor.process() is called, the processor will wait for a matching function call to appear in the Plasma store and input parameters to become available (see BasicProcessor).

All parameters are then automatically retrieved and unpacked into Python objects. References to the Plasma store are represented using connection.TensorRef or connection.TableRef instances. These can be used for getting and setting input and output tensors and tables respectively.
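The dispatch pattern described above (one method per processing function, called with the unpacked parameters) can be illustrated with a small plain-Python sketch. It is not the library's implementation: MiniProcessor, Scale and dispatch() are hypothetical, and a dictionary of already-unpacked values stands in for the Plasma call and its pyarrow.RecordBatch.

```python
from typing import Any, Dict


class MiniProcessor:
    """Hypothetical stand-in for Processor: routes calls to same-named methods."""

    def dispatch(self, func_name: str, params: Dict[str, Any]) -> Any:
        # Look up a public method named after the processing function
        method = getattr(self, func_name, None)
        if method is None or func_name.startswith("_"):
            raise ValueError(f"Unsupported processing function: {func_name}")
        # In this sketch, unpacked parameters are passed as keyword arguments
        return method(**params)


class Scale(MiniProcessor):
    # One method per processing function the processor registers
    def scale(self, values, factor):
        return [v * factor for v in values]


proc = Scale()
print(proc.dispatch("scale", {"values": [1, 2, 3], "factor": 2}))  # [2, 4, 6]
```

Keeping the method name equal to the function name in the call schema is what lets a generic dispatcher work without a separate routing table.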

Parameters
  • procs – Processing functions accepted by this processor

  • plasma_path – Socket path of the Apache Plasma store

  • prefix – Prefix to use for namespace.

  • name – Name to use for registering the processor in the store. Defaults to the class name.

class ska_sdp_dal.processor.LogProcessor(procs: List[pyarrow.Schema], plasma_path: str, prefix: bytes = b'', name: Optional[str] = None)

Simple processor that just logs all calls.

Parameters
  • procs – Processing functions accepted by this processor

  • plasma_path – Socket path of the Apache Plasma store

  • prefix – Prefix to use for namespace

  • name – Name to use for registering the processor in the store. Defaults to the class name.

class ska_sdp_dal.processor.BasicProcessor(procs: List[pyarrow.Schema], plasma_path: str, prefix: bytes = b'', name: Optional[str] = None)

Low-level processor interface.

Deprecated: Use Processor instead.

Should be subclassed to implement a concrete processor. When process() is called, the processor will wait for a matching function call to appear in the Plasma store and input parameters to become available. Once that’s the case, _process_call() will be called with the function name and a pyarrow.RecordBatch containing the parameters to the call.

Simple parameters can then be retrieved using parameter(), and input tensors using tensor_parameter()/tensor_parameters(). Output parameters can be set using output_tensor().

Parameters
  • procs – Processing functions accepted by this processor

  • plasma_path – Socket path of the Apache Plasma store

  • prefix – Prefix to use for namespace

  • name – Name to use for registering the processor in the store. Defaults to the class name.

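abstract _process_call(proc_func: str, batch: pyarrow.RecordBatch)

Process a single call. Invoked by process() with the function name and a pyarrow.RecordBatch containing the call parameters; must be implemented by subclasses.

oid_parameter(batch: pyarrow.RecordBatch, name: str, allow_null: bool = False) → pyarrow.plasma.ObjectID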

Extract Object ID parameter from first row of record batch.

Parameters
  • batch – Record batch containing parameter

  • name – Name of parameter to extract

  • allow_null – Whether the value may be null, in which case None is returned

output_tensor(batch, name, array, typ=None)

Write output tensor to storage.

This is less efficient than constructing the tensor in place, which is not yet supported.

Parameters
  • batch – Record batch containing parameters

  • name – Name of the output parameter

  • array – Tensor as numpy array

  • typ – Tensor value type

parameter(batch: pyarrow.RecordBatch, name: str, typ: Optional[pyarrow.DataType] = None, allow_null: bool = False) → Any

Extract parameter from first row of record batch.

Parameters
  • batch – Record batch containing parameter

  • name – Name of parameter to extract

  • typ – Type to check (optional)

  • allow_null – Whether the value may be null, in which case None is returned

property prefix

The Plasma prefix for calls to this processor.

process(timeout=None, catch_exceptions=True)

Attempts to process a call.

Blocks if no call is currently available.

Parameters
  • timeout – Maximum time to block, in seconds

  • catch_exceptions – Whether exceptions thrown by _process_call() should be caught and logged (the default).

Returns

False if timeout expired, otherwise True

tensor_parameter(batch: pyarrow.RecordBatch, name: str, typ: Optional[pyarrow.DataType] = None, dim_names: Optional[List[str]] = None, allow_null: bool = False) → pyarrow.Tensor

Read tensor referred to via object ID parameter.

Parameters
  • batch – Record batch containing parameters

  • name – Name of parameter to extract

  • typ – Tensor value type to check (optional)

  • dim_names – Dimension names to check (optional)

  • allow_null – Whether the object ID may be null, in which case None is returned instead of a tensor

tensor_parameters(batch: pyarrow.RecordBatch, tensor_specs: List[Tuple[str, pyarrow.DataType, List[str], bool]]) → List[pyarrow.Tensor]

Read tensors referred to via object ID parameters.

Parameters
  • batch – Record batch containing parameters

  • tensor_specs – Either a list of strings or a list of tuples of the form (name, type, dimensionality, allow_null). If given, type and dimensionality will be checked. If allow_null is set, the object ID is allowed to be null, in which case None is returned instead of a tensor.
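The two accepted forms of tensor_specs can be brought into one shape before checking. The helper below is hypothetical (normalise_tensor_specs is not part of the library), element types are plain strings instead of pyarrow.DataType so the sketch has no dependencies, and treating a bare name as "no type or dimension check, null not allowed" is an assumption.

```python
from typing import List, Optional, Sequence, Tuple, Union

# (name, element type, dimension names, allow_null)
Spec = Tuple[str, Optional[str], Optional[List[str]], bool]


def normalise_tensor_specs(specs: Sequence[Union[str, tuple]]) -> List[Spec]:
    """Hypothetical helper: expand bare names into full spec tuples."""
    result: List[Spec] = []
    for spec in specs:
        if isinstance(spec, str):
            # Bare name: no type/dimension checks, null not allowed (assumption)
            result.append((spec, None, None, False))
        else:
            name, typ, dim_names, allow_null = spec
            dims = None if dim_names is None else list(dim_names)
            result.append((name, typ, dims, bool(allow_null)))
    return result


specs = normalise_tensor_specs(["vis", ("uvw", "float64", ["baseline", "uvw"], True)])
print(specs)
# [('vis', None, None, False), ('uvw', 'float64', ['baseline', 'uvw'], True)]
```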

ska_sdp_dal.caller

class ska_sdp_dal.caller.Caller(procs: List[pyarrow.Schema], store: ska_sdp_dal.store.Store, broadcast: bool = False, minimum_processors: int = 1, processor_prefix: bytes = b'', max_attempts: int = 100, verbose: bool = False)

Base class for making calls to processor.Processor instances.

The constructor will create methods according to the passed call schemas - both for single and for batch calls. The batch variant will expect a list of dictionaries, see batch_call().

Parameters
  • procs – Call schemas to support. Will be used to find a compatible processor.

  • store – Store area to use for calls (will use its Plasma client)

  • broadcast – Send calls to all matching processors?

  • minimum_processors – Raise an error if fewer processors are available

  • processor_prefix – Allow changing processors after initialisation?

  • max_attempts – Maximum attempts at resolving ObjectID collisions

  • verbose – Log information about found processors

batch_call(call_schema: pyarrow.Schema, calls: List[Dict[str, Any]]) → List[Dict[str, ska_sdp_dal.connection.TensorRef]]

Create a number of calls to a function with the given schema.

Parameters
  • call_schema – Schema of the call

  • calls – List of parameter dictionaries

Returns

List of output parameter dictionaries, one per call (when broadcasting, also one per processor)

call(call_schema: pyarrow.Schema, *args, **kwargs) → pyarrow.plasma.ObjectID

Create a call to a function with the given schema.

Both positional and keyword arguments are supported, matched by the position and name of the parameter in the schema, respectively.

Parameters
  • call_schema – Schema of the call

  • args – List of parameters

  • kwargs – Dictionary of parameters
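Matching positional and keyword arguments to the fields of a call schema works much like Python's own argument binding. The following sketch is hypothetical (bind_call_args is not a library function) and uses a list of field names in place of a pyarrow.Schema; how the real Caller handles missing parameters is not covered here.

```python
from typing import Any, Dict, Sequence


def bind_call_args(field_names: Sequence[str], *args: Any, **kwargs: Any) -> Dict[str, Any]:
    """Hypothetical: bind positional args by schema position, keyword args by name."""
    if len(args) > len(field_names):
        raise TypeError(f"Too many positional arguments: {len(args)} > {len(field_names)}")
    bound: Dict[str, Any] = dict(zip(field_names, args))
    for name, value in kwargs.items():
        if name not in field_names:
            raise TypeError(f"Unknown parameter: {name}")
        if name in bound:
            raise TypeError(f"Parameter given twice: {name}")
        bound[name] = value
    return bound


print(bind_call_args(["vis", "uvw", "out"], "vis-ref", out="out-ref"))
# {'vis': 'vis-ref', 'out': 'out-ref'}
```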

find_processors(verbose=False)

Locate compatible processors.

Done automatically when the caller is constructed. Call again to refresh the list of processors to call. Typically used with broadcasting callers.

Parameters

verbose – Log information about found processors

property num_processors

The number of processors located by this caller

Storage

ska_sdp_dal.store

class ska_sdp_dal.store.Store(plasma_path: str, max_attempts: int = 10000, name: Optional[str] = None)

A storage namespace within a Plasma store

Used for holding shared data objects, such as tensors and tables. These can be passed to processors.

property conn: ska_sdp_dal.connection.Connection

make_tensor_ref(oid: pyarrow.plasma.ObjectID, typ: Optional[pyarrow.DataType] = None, dim_names: Optional[List[str]] = None) → ska_sdp_dal.connection.TensorRef

Create a TensorRef object for an existing object in Plasma

Parameters
  • oid – Existing object ID

  • typ – Element datatype. If ComplexType, will convert.

  • dim_names – Dimension names

Returns

Reference to tensor

new_table_ref(schema: Optional[pyarrow.Schema] = None) → ska_sdp_dal.connection.TableRef

Allocate an Object ID for a new table in Plasma

Parameters
  • schema – Table schema

Returns

Reference to table

new_tensor_ref(typ: Optional[pyarrow.DataType] = None, dim_names: Optional[List[str]] = None) → ska_sdp_dal.connection.TensorRef

Allocate an Object ID for a new tensor in Plasma

Parameters
  • typ – Element datatype. If ComplexType, will convert.

  • dim_names – Dimension names

Returns

Reference to tensor

put_new_table(table: Union[pyarrow.Table, Mapping[str, pyarrow.ChunkedArray], Mapping[str, pyarrow.Array], Mapping[str, list]], schema: Optional[pyarrow.Schema] = None) → ska_sdp_dal.connection.TableRef

Allocate and create a new table in Plasma

See connection.TableRef.put() for notes about possible parameters.

Parameters
  • table – Table data

  • schema – Table schema

Returns

Reference to table

put_new_tensor(arr: numpy.ndarray, typ: Optional[pyarrow.DataType] = None, dim_names: Optional[List[str]] = None) → ska_sdp_dal.connection.TensorRef

Allocate and create a new tensor in Plasma

Parameters
  • arr – Data as numpy array

  • typ – Element datatype. If ComplexType, will convert.

  • dim_names – Dimension names

Returns

Reference to tensor

ska_sdp_dal.connection

class ska_sdp_dal.connection.Connection(plasma_path: str)

A connection to a Plasma store.

Subscribes to store notifications and uses them to maintain a list of objects in the store, in particular tracking namespaces.

property client: pyarrow.plasma.PlasmaClient

get_buffers(oids, timeout=None) → pyarrow.plasma.PlasmaBuffer

Retrieve the objects for the given OIDs.

Uses a cache to prevent duplicated requests to the Plasma store.

Parameters
  • oids – Plasma object IDs

  • timeout – Time to wait for buffers to become available

Returns

Plasma buffer
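The caching behaviour described for get_buffers() can be illustrated with a small memoising wrapper that only forwards object IDs it has not seen before. BufferCache and the fetch callback are hypothetical stand-ins for the Plasma client, and the real cache policy (for example, when entries are dropped) is not documented here.

```python
from typing import Callable, Dict, List, Sequence


class BufferCache:
    """Hypothetical cache: only request buffers for object IDs not yet cached."""

    def __init__(self, fetch: Callable[[List[bytes]], List[bytes]]):
        self._fetch = fetch
        self._cache: Dict[bytes, bytes] = {}

    def get_buffers(self, oids: Sequence[bytes]) -> List[bytes]:
        # Deduplicate while preserving order, skipping already-cached IDs
        missing = [oid for oid in dict.fromkeys(oids) if oid not in self._cache]
        if missing:
            for oid, buf in zip(missing, self._fetch(missing)):
                self._cache[oid] = buf
        return [self._cache[oid] for oid in oids]


requests = []


def fake_fetch(oids):
    requests.append(list(oids))  # record what actually reaches the "store"
    return [b"data-" + oid for oid in oids]


cache = BufferCache(fake_fetch)
cache.get_buffers([b"a", b"b"])
cache.get_buffers([b"b", b"c"])
print(requests)  # [[b'a', b'b'], [b'c']]
```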

get_ref_buffers(refs: List[ska_sdp_dal.connection.Ref], timeout: Optional[float] = None, auto_delete: bool = True) → None

Retrieves the buffers for multiple Plasma references at a time.

Blocks as long as any of the objects has not been created yet.

Parameters
  • refs – References whose buffers to retrieve

  • timeout – Maximum time this function is allowed to block.

  • auto_delete – Delete object in store when reference is dropped

Raises

TimeoutException
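The blocking semantics of get_ref_buffers() (wait until every object exists, otherwise give up once the timeout expires) can be sketched as a polling loop. This is a simplified illustration only: the documented Connection tracks objects via store notifications rather than polling, and wait_for_all, exists and this local TimeoutException class are hypothetical stand-ins.

```python
import time
from typing import Callable, Iterable, Optional


class TimeoutException(Exception):
    """Hypothetical stand-in carrying the missing refs and the timeout."""

    def __init__(self, refs, timeout):
        super().__init__(f"Timed out after {timeout}s waiting for {refs}")
        self.refs = refs
        self.timeout = timeout


def wait_for_all(refs: Iterable[str], exists: Callable[[str], bool],
                 timeout: Optional[float] = None, poll: float = 0.01) -> None:
    """Block until exists(ref) holds for every ref; timeout=None waits forever."""
    refs = list(refs)
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        missing = [r for r in refs if not exists(r)]
        if not missing:
            return
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutException(missing, timeout)
        time.sleep(poll)


store = {"a"}
try:
    wait_for_all(["a", "b"], store.__contains__, timeout=0.05)
except TimeoutException as exc:
    print("still missing:", exc.refs)  # still missing: ['b']
```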

property namespace_meta: Dict[pyarrow.plasma.ObjectID, Dict[bytes, bytes]]

property namespace_procs: Dict[pyarrow.plasma.ObjectID, List[pyarrow.Schema]]

property namespaces: List[pyarrow.plasma.ObjectID]

object_exists(oid) → bool

Checks whether the given object ID is known to exist

Parameters

oid – Object ID to check

object_size(oid) → bool

Gets the size of the given object

Parameters

oid – Object ID to check

reserve_namespace(name: Optional[str] = None, procs: List[pyarrow.Schema] = [], prefix: bytes = b'') → Tuple[bytes, pyarrow.plasma.PlasmaBuffer]

Reserve a new namespace within the Plasma store

This will automatically clear all objects with the given prefix.

Parameters
  • name – Informative display name for namespace

  • procs – Call schemas supported (if any)

  • prefix – Prefix for the namespace prefix

Returns

Tuple of the namespace prefix and the buffer holding the namespace declaration (keep the buffer to keep the namespace alive)

update_obj_table(timeout: float = 0)

Update the object table

Parameters

timeout – If given, allow blocking for up to the given time or until the next update happens.

Returns

A list of received update notifications

class ska_sdp_dal.connection.Ref(conn: ska_sdp_dal.connection.Connection, oid: pyarrow.plasma.ObjectID, auto_delete: bool = True, dependencies: List[ska_sdp_dal.connection.Ref] = [], references: List[ska_sdp_dal.connection.Ref] = [])

Refers to an object in storage

Subclassed by object type. The object might not have been created yet. A reference can have two kinds of relationships with other objects:

  • dependency: Object that is required for this object to be created. It must be kept alive until this object is found to have been created. Typically refers to call objects.

  • reference: Object that is referenced from this object and must therefore be kept alive while this object is still needed.

add_dependency(ref: ska_sdp_dal.connection.Ref, timeout: float = 0) → None

Registers the identified object as a dependency.

This will ensure that the object is kept alive until we have retrieved the data for this object. Blocks if the object does not yet exist in the store.

Parameters
  • ref – Reference to register as dependency

  • timeout – Maximum time this method is allowed to block.

Raises

TimeoutException

add_reference(ref: Union[ska_sdp_dal.connection.Ref, pyarrow.plasma.ObjectID], timeout: float = 0) → None

Registers the identified object as referenced.

This will ensure that the object is kept alive as long as this object is referenced. Might block if the object does not yet exist in the Plasma store.

Parameters
  • ref – Reference to register as referenced

  • timeout – Maximum time this method is allowed to block.

Raises

TimeoutException

get_buffer(timeout: Optional[float] = None, auto_delete: bool = True) → pyarrow.Buffer

Get Arrow buffer for this Plasma reference.

Blocks if the object has not yet been created.

Parameters
  • timeout – Maximum time this method is allowed to block.

  • auto_delete – Delete object in store when reference is dropped

Raises

TimeoutException

property oid: pyarrow.plasma.ObjectID

class ska_sdp_dal.connection.TableRef(conn: ska_sdp_dal.connection.Connection, oid: pyarrow.plasma.ObjectID, schema: Optional[pyarrow.Schema] = None, auto_delete: bool = True)

Refers to a Table in Plasma store.

Might not have been created yet - wraps an Object ID and expected type information.

get(timeout=None)

Get Arrow table

Parameters

timeout – How long the function is allowed to block if object does not exist yet

get_awkward()

Get table as Awkward array

This can generally be done without copying the data.

get_dict()

Get table as Python dictionary

get_pandas(*args, **kwargs)

Get table as Pandas dataframe

Parameters

kwargs – Keyword arguments for the pandas conversion. See pyarrow.Table.to_pandas().

put(table: Union[pyarrow.Table, Mapping[str, pyarrow.ChunkedArray], Mapping[str, pyarrow.Array], Mapping[str, list]], max_chunksize=None)

Write the given table into storage.

The table can be given as a pandas DataFrame, or as a dictionary mapping column names to pyarrow.Array objects or lists, which will be converted into the equivalent table. If the arrays are chunked, the chunks of all columns must match. See also pyarrow.table().

Parameters
  • table – Table to write.

  • max_chunksize – Maximum size of record batches to split table into

property schema: pyarrow.Schema

class ska_sdp_dal.connection.TensorRef(conn: ska_sdp_dal.connection.Connection, oid: pyarrow.plasma.ObjectID, typ: Optional[pyarrow.DataType] = None, dim_names: Optional[List[str]] = None, auto_delete: bool = True)

Refers to a tensor in object storage.

Might not have been created yet - wraps an Object ID and expected type information.

property dim_names: List[str]

get(timeout=None)

Retrieve the tensor from storage. Might block.

Parameters

timeout – Maximum time this method is allowed to block.

put(arr: Optional[numpy.ndarray] = None)

Write the given value into storage.

Parameters

arr – Array to write to storage. Empty by default.

property typ: pyarrow.DataType

exception ska_sdp_dal.connection.TimeoutException(refs, timeout)

property refs

property timeout

ska_sdp_dal.common

class ska_sdp_dal.common.ComplexType(real_type, complex_dtype, real_dtype)

Pseudo-type to refer to complex values.

Use in place of arrow types.
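Arrow's standard type system has no complex number type, which is presumably why this pseudo-type exists. Assuming complex values are stored as interleaved real and imaginary parts of the underlying real type (float32 pairs for complex64), the conversion looks like the stdlib-only sketch below; both helper names are hypothetical and the actual storage layout used by the library may differ.

```python
import struct
from typing import List


def complex64_to_bytes(values: List[complex]) -> bytes:
    """Pack complex values as interleaved little-endian float32 (re, im) pairs."""
    flat = [part for v in values for part in (v.real, v.imag)]
    return struct.pack(f"<{len(flat)}f", *flat)


def bytes_to_complex64(data: bytes) -> List[complex]:
    """Inverse of complex64_to_bytes: rebuild complex values from float32 pairs."""
    flat = struct.unpack(f"<{len(data) // 4}f", data)
    return [complex(re, im) for re, im in zip(flat[0::2], flat[1::2])]


raw = complex64_to_bytes([1 + 2j, 3 - 4j])
print(len(raw))                 # 16 (two values x two float32 parts x 4 bytes)
print(bytes_to_complex64(raw))  # [(1+2j), (3-4j)]
```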

to_pandas_dtype()[source]
ska_sdp_dal.common.NAMESPACE_DECL_SUFFIX = b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'

First object in namespace is expected to be its declaration

ska_sdp_dal.common.NAMESPACE_ID_SIZE = 4

Number of leading Object ID bytes used as the namespace prefix

ska_sdp_dal.common.OBJECT_ID_SIZE = 20

Length of Plasma’s object IDs

ska_sdp_dal.common.OBJECT_ID_TYPE = 20

Arrow type used for representing Plasma object IDs

ska_sdp_dal.common.PROC_NAMESPACE_ARGV_META = b'proc:argv'

Metadata entry for namespace process arguments

ska_sdp_dal.common.PROC_NAMESPACE_META = b'proc:namespace'

Metadata entry for namespace name

ska_sdp_dal.common.PROC_NAMESPACE_PID_META = b'proc:pid'

Metadata entry for namespace process ID

ska_sdp_dal.common.call_name(schema: pyarrow.Schema) → str

Get call name from call schema

Parameters

schema – Call schema

ska_sdp_dal.common.complex128 = <ska_sdp_dal.common.ComplexType object>

Double-precision complex type

ska_sdp_dal.common.complex64 = <ska_sdp_dal.common.ComplexType object>

Single-precision complex type

ska_sdp_dal.common.from_numpy_dtype(dtype: Any) → pyarrow.DataType

ska_sdp_dal.common.is_namespace_decl(oid: pyarrow.plasma.ObjectID)

Checks whether the given object ID declares a namespace.
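Given the constants above (a NAMESPACE_ID_SIZE-byte prefix, 20-byte object IDs and an all-zero 16-byte NAMESPACE_DECL_SUFFIX), a declaration check can plausibly be written by comparing the bytes after the prefix. This is a sketch on plain bytes rather than pyarrow.plasma.ObjectID; is_namespace_decl_bytes is hypothetical and the library's is_namespace_decl() may differ in details.

```python
OBJECT_ID_SIZE = 20      # values as documented above
NAMESPACE_ID_SIZE = 4
NAMESPACE_DECL_SUFFIX = b"\x00" * (OBJECT_ID_SIZE - NAMESPACE_ID_SIZE)


def is_namespace_decl_bytes(oid: bytes) -> bool:
    """Sketch: an object ID declares a namespace if everything after the prefix is zero."""
    return len(oid) == OBJECT_ID_SIZE and oid[NAMESPACE_ID_SIZE:] == NAMESPACE_DECL_SUFFIX


decl = b"\x12\x34\x56\x78" + NAMESPACE_DECL_SUFFIX
obj = b"\x12\x34\x56\x78" + b"\x00" * 15 + b"\x01"
print(is_namespace_decl_bytes(decl), is_namespace_decl_bytes(obj))  # True False
```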

ska_sdp_dal.common.make_call_schema(func_name: str, pars: List[par_spec], metadata={}) → pyarrow.Schema

Create schema for calls through the Plasma store.

Parameters
  • func_name – Function name

  • pars – List of parameters

  • metadata – Metadata to associate with schema

ska_sdp_dal.common.make_oid_input_par(name: str, nullable: bool = False, metadata: Dict[bytes, bytes] = {}) → par_spec

Create input Object ID parameter to pass to make_call_schema.

Marking the parameter as input means that the call will be delayed until an object with the given ID appears in the Plasma store.

Parameters
  • name – Parameter name

  • nullable – Allowed to be null?

  • metadata – Metadata dictionary to associate with field.

ska_sdp_dal.common.make_oid_output_par(name: str, nullable: bool = False, metadata: Dict[bytes, bytes] = {}) → par_spec

Create output Object ID parameter to pass to make_call_schema.

The call will be skipped if all outputs already exist in the Plasma store.

Parameters
  • name – Parameter name

  • nullable – Allowed to be null?

  • metadata – Metadata dictionary to associate with field.
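Taken together, make_oid_input_par() and make_oid_output_par() describe a simple scheduling rule: a call waits until all of its input objects exist, and is skipped if all of its outputs already exist. The hypothetical sketch below (call_state is not a library function) uses a set of existing object IDs in place of the Plasma store; checking the skip condition first, and never skipping calls without outputs, are assumptions.

```python
from typing import Iterable, Set


def call_state(inputs: Iterable[bytes], outputs: Iterable[bytes], existing: Set[bytes]) -> str:
    """Hypothetical: classify a pending call as 'skip', 'wait' or 'ready'."""
    outputs = list(outputs)
    # Assumption: a call with no output parameters is never skipped
    if outputs and all(oid in existing for oid in outputs):
        return "skip"
    if any(oid not in existing for oid in inputs):
        return "wait"
    return "ready"


existing = {b"in1"}
print(call_state([b"in1", b"in2"], [b"out"], existing))  # wait
existing.add(b"in2")
print(call_state([b"in1", b"in2"], [b"out"], existing))  # ready
existing.add(b"out")
print(call_state([b"in1", b"in2"], [b"out"], existing))  # skip
```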

ska_sdp_dal.common.make_oid_par(name: str, nullable: bool = False, metadata: Dict[bytes, bytes] = {}) → par_spec

Create Object ID parameter to pass to make_call_schema.

Parameters
  • name – Parameter name

  • nullable – Allowed to be null?

  • metadata – Metadata dictionary to associate with field.

ska_sdp_dal.common.make_par(name: str, typ: pyarrow.DataType, nullable: bool = False, metadata: Dict[str, str] = {}) → par_spec

Create parameter declaration to pass to make_call_schema.

Parameters
  • name – Parameter name

  • typ – Arrow data type

  • nullable – Allowed to be null?

  • metadata – Metadata dictionary to associate with field.

ska_sdp_dal.common.make_table_input_par(name: str, table_schema: pyarrow.Schema, nullable: bool = False) → par_spec

Create input table parameter to pass to make_call_schema.

Marking the parameter as input means that the call will be delayed until a table with the given ID appears in the Plasma store.

Parameters
  • name – Parameter name

  • table_schema – Table schema

  • nullable – Allowed to be null?

ska_sdp_dal.common.make_tensor_input_par(name: str, elem_type: pyarrow.DataType, dim_names: List[str], nullable: bool = False) → par_spec

Create input tensor parameter to pass to make_call_schema.

Marking the parameter as input means that the call will be delayed until a tensor with the given ID appears in the Plasma store.

Parameters
  • name – Parameter name

  • elem_type – Tensor element type

  • dim_names – Dimension names

  • nullable – Allowed to be null?

ska_sdp_dal.common.make_tensor_output_par(name: str, elem_type: pyarrow.DataType, dim_names: List[str], nullable: bool = False) → par_spec

Create output tensor parameter to pass to make_call_schema.

Parameters
  • name – Parameter name

  • elem_type – Tensor element type

  • dim_names – Dimension names

  • nullable – Allowed to be null?

ska_sdp_dal.common.object_id_hex(oid: pyarrow.plasma.ObjectID) → str

Convert Object ID into a hexadecimal string representation

Parameters

oid – The Object ID to convert (as bytearray or ObjectID)

ska_sdp_dal.common.objectid_generator(prefix: bytes, size: int = 20) → Iterator[bytes]

Generate ObjectIDs with a given prefix.

Parameters

prefix – Prefix as binary string
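One way such a generator could work is to pad the prefix with random bytes up to the object ID size. This is a hypothetical sketch (prefixed_oids is illustrative, and the library may fill the remainder differently, for example sequentially); a random suffix can collide with an existing object, which is what Caller's max_attempts parameter bounds the attempts at resolving.

```python
import itertools
import os
from typing import Iterator


def prefixed_oids(prefix: bytes, size: int = 20) -> Iterator[bytes]:
    """Hypothetical generator: object IDs that start with prefix, random suffix."""
    if len(prefix) > size:
        raise ValueError("prefix longer than object ID size")
    while True:
        yield prefix + os.urandom(size - len(prefix))


gen = prefixed_oids(b"\xab\xcd\xef\x01")
oids = list(itertools.islice(gen, 3))
print([oid.hex()[:8] for oid in oids])      # ['abcdef01', 'abcdef01', 'abcdef01']
print(all(len(oid) == 20 for oid in oids))  # True
```

Because this is a generator, the prefix-length check only runs on the first next() call.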

ska_sdp_dal.common.par_meta(field: pyarrow.Field) → Optional[str]

Get parameter kind metadata from schema field

Parameters

field – Field to read metadata from

Returns

Parameter kind, or None if not set

ska_sdp_dal.common.par_table_schema(field: pyarrow.Field) → Optional[pyarrow.Schema]

Get table schema for a parameter

Parameters

field – Field to read metadata from

Returns

Table schema, or None if not set

ska_sdp_dal.common.par_tensor_dim_names(field: pyarrow.Field) → Optional[List[str]]

Get tensor dimension names of a parameter

Parameters

field – Field to read metadata from

Returns

Dimension names, or None if not set

ska_sdp_dal.common.par_tensor_elem_type(field: pyarrow.Field) → Optional[pyarrow.DataType]

Get tensor element type parameter

Parameters

field – Field to read metadata from

Returns

Parameter element type, or None if not set

ska_sdp_dal.common.parse_hex_objectid(oid_str: str) → bytes

Parse an Object ID given as a hexadecimal string representation

Note that this allows Object IDs to have fewer than 20 bytes, i.e. partial Object IDs (prefixes) are parsed without error.

Parameters

oid_str – String representation

Returns

Object ID as binary string
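Converting between object IDs and hexadecimal strings, including partial IDs, maps directly onto Python's bytes.hex() and bytes.fromhex(). The helpers below are a stdlib-only sketch on plain bytes, not the library's object_id_hex() and parse_hex_objectid(); rejecting strings longer than 20 bytes is an assumption.

```python
OBJECT_ID_SIZE = 20


def oid_to_hex(oid: bytes) -> str:
    """Hex representation of an object ID (sketch of object_id_hex on plain bytes)."""
    return bytes(oid).hex()


def parse_hex_oid(oid_str: str) -> bytes:
    """Parse a full or partial (prefix) object ID from hex (sketch)."""
    oid = bytes.fromhex(oid_str)
    if len(oid) > OBJECT_ID_SIZE:  # assumption: longer strings are invalid
        raise ValueError(f"Object ID longer than {OBJECT_ID_SIZE} bytes")
    return oid


full = bytes(range(20))
assert parse_hex_oid(oid_to_hex(full)) == full  # round trip
print(parse_hex_oid("abcdef01"))  # b'\xab\xcd\xef\x01' (a 4-byte prefix)
```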

ska_sdp_dal.common.schema_compatible(expected: pyarrow.Schema, actual: pyarrow.Schema) → bool

Checks for compatibility between (call) schemas.

This means that all expected fields are there and have the same types (including relevant metadata).

Parameters
  • expected – Expected schema

  • actual – Schema to check

Returns

Empty list if compatible, otherwise list of mismatches
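Following the description above (an empty list when compatible, otherwise a list of mismatches), the check can be sketched with field-name-to-type mappings standing in for pyarrow.Schema objects. schema_mismatches is hypothetical, compares types as strings, and ignores the field metadata that the real function also takes into account; extra fields in the actual schema are accepted, since only the expected fields must be present.

```python
from typing import Dict, List


def schema_mismatches(expected: Dict[str, str], actual: Dict[str, str]) -> List[str]:
    """Hypothetical: list expected fields that are missing or typed differently."""
    mismatches = []
    for name, typ in expected.items():
        if name not in actual:
            mismatches.append(f"missing field {name!r}")
        elif actual[name] != typ:
            mismatches.append(f"field {name!r}: expected {typ}, got {actual[name]}")
    return mismatches


expected = {"n": "int64", "scale": "double"}
print(schema_mismatches(expected, {"n": "int64", "scale": "double", "extra": "string"}))  # []
print(schema_mismatches(expected, {"n": "int32"}))
```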