Usage Guide

In the SKA SDP architecture, processing is meant to be orchestrated by execution engines, which perform I/O and delegate any actual work to processing functions. The data access library is therefore geared towards this way of working, with:

  • Processors (see processor) passively providing processing functions,

  • Callers (see caller) actively managing the Plasma store and issuing calls to processors.

Processing functions are called by name; each name maps to a parameter schema (see make_call_schema()).

A simple processor

A simple call schema could look as follows:

import ska_sdp_dal as dal
import numpy as np
import pyarrow

TEST_SCHEMAS = [
    dal.make_call_schema('simple_fn', [
        dal.make_par('x', pyarrow.int64()),
        dal.make_tensor_input_par('ys', pyarrow.int64(), ['i']),
        dal.make_tensor_output_par('zs', pyarrow.int64(), ['i']),
    ])
]

This declares a processing function that takes an integer value for parameter 'x' and a one-dimensional integer tensor for parameter 'ys', and writes a one-dimensional integer tensor to parameter 'zs'. We could define this processor by sub-classing Processor:

class TestProcessor(dal.Processor):
    def __init__(self, **kwargs):
        super(TestProcessor, self).__init__(TEST_SCHEMAS, **kwargs)
    def simple_fn(self, x: int, ys: dal.TensorRef, zs: dal.TensorRef):
        # Add x to every element of ys and write the result to zs
        zs.put(x + ys.get())

The get() method returns a numpy.ndarray, so the above method simply adds x to all values in ys and writes the result to zs using put().
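
As get() returns a plain numpy array, arbitrary numpy operations can be used inside a processing function. For illustration, a hypothetical variant might look like this (a sketch only: the name clipped_fn is made up, and it would need a matching call schema to be callable):

# Hypothetical method on a Processor subclass
def clipped_fn(self, x: int, ys: dal.TensorRef, zs: dal.TensorRef):
    # get() returns a numpy.ndarray, so numpy expressions apply
    values = ys.get()
    # Clip the shifted values and store the result tensor via put()
    zs.put(np.clip(values + x, 0, 100))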

To allow running the processor, we could define a simple event loop:

import sys

if __name__ == "__main__":
    proc = TestProcessor(plasma_path=sys.argv[1])
    # Serve incoming calls until interrupted
    while True:
        try:
            proc.process()
        except KeyboardInterrupt:
            sys.exit(0)

Assuming we have started the processor with a backing Plasma store, we can now issue calls using a Caller:

store = dal.Store(plasma_path=sys.argv[1])
caller = dal.Caller(TEST_SCHEMAS, store)

result = caller.simple_fn(1, np.arange(100))
print(result['zs'].get())

Parameters can be passed either by name or by position here – in the latter case parameters are expected in the order they appear in the schema. If output parameters (here zs) are omitted, suitable references are automatically created and passed to the call. In either case, a dictionary with all output parameters is returned.
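
For instance, assuming that by-name passing maps to Python keyword arguments, the call above could equivalently be written as:

result = caller.simple_fn(x=1, ys=np.arange(100))
print(result['zs'].get())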

References

Both the parameters passed to the processor and the values returned by the caller are references to objects stored in Plasma. These objects are only copied into the shared memory space once; from then on we can pass them around and use them as numpy arrays without incurring another copy.

This conversion happens automatically, but we can also allocate manually using the Store object:

ys = store.put_new_tensor(np.arange(100))
print(caller.simple_fn(1, ys)['zs'].get())
print(caller.simple_fn(2, ys)['zs'].get())
print(caller.simple_fn(3, ys)['zs'].get())

In this case, ys is only allocated in Plasma once, and passed to all calls without making a copy.

As the same principle applies to returned tensors, we can also do the following:

zs1 = caller.simple_fn(1, np.arange(100))['zs']
zs2 = caller.simple_fn(2, zs1)['zs']
zs3 = caller.simple_fn(3, zs2)['zs']
print(zs3.get())

In this case the tensor is passed from one function to the next purely via the Plasma store – the caller never touches it. Note in particular that the caller will only wait once get() is called – the returned reference effectively works like a future in this context. Put another way: we are effectively writing a small graph of processing function calls into the Plasma store. This is quite desirable for reducing the overhead of individual calls, but it reduces control over the amount of memory used.
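
To make the future-like behaviour explicit, consider the following sketch (reusing simple_fn from above):

zs = caller.simple_fn(1, np.arange(100))['zs']
# The call returns immediately; other work can happen here while
# the processor executes in the background.
values = zs.get()  # blocks until the processor has written 'zs'
print(values)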

Batch Calls

To further reduce calling overhead, we can also issue many calls to the same processing function at the same time. The first example from the last section could also be written as follows:

ys = store.put_new_tensor(np.arange(100))
results = caller.simple_fn_batch([
    dict(x=1, ys=ys), dict(x=2, ys=ys), dict(x=3, ys=ys)
])
for result in results:
    print(result['zs'].get())

This will submit all three requests at the same time to the processor, which is slightly more efficient.

Broadcast

A Caller can issue calls to many processors at the same time:

import sys
import time

store = dal.Store(plasma_path=sys.argv[1])
caller = dal.Caller(TEST_SCHEMAS, store, broadcast=True, minimum_processors=0)

ys = store.put_new_tensor(np.arange(100))
for i in range(1000):
    caller.find_processors()
    results = caller.simple_fn(i, ys)
    print(f"Have {len(results} processors")
    time.sleep(1)

This caller will refresh its list of processors every second, selecting all those that accept the calls specified in TEST_SCHEMAS. The broadcast flag passed to the Caller means that simple_fn will now return a list with one result per call target.
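
Each entry of the returned list is the usual output dictionary, so results could be collected as follows (a sketch based on the example above):

results = caller.simple_fn(1, ys)
for result in results:  # one result dictionary per call target
    print(result['zs'].get())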

Note that this clearly makes little sense for the processor we have constructed here. However, it can be useful where we want to stream data to an unknown number of consumers without caring about the result. In that case, the recommended practice is to have zero-dimensional tag output parameters, as sketched below.
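
A streaming schema along those lines might look like this (hypothetical: the names are made up, and it assumes an empty dimension list declares a zero-dimensional tensor):

STREAM_SCHEMAS = [
    dal.make_call_schema('consume_data', [
        dal.make_tensor_input_par('data', pyarrow.int64(), ['i']),
        # Zero-dimensional 'tag' output: carries no payload, it only
        # signals completion and keeps the call referenced (see below)
        dal.make_tensor_output_par('done', pyarrow.int8(), []),
    ])
]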

Scoping

Why do we need such a tag output parameter?

It is important not to forget that the caller manages the lifecycle of all objects – if something goes out of scope at the caller, it is removed from the Plasma store. Consider what would happen if we did not capture the result of a call:

caller.simple_fn(1, np.arange(100))

As this call happens asynchronously, the code will issue the call in the store and return immediately. Since we do not hold on to the returned references, their reference count drops to zero, which in turn means there is no reference to the issued call left. The issued call will therefore be deleted immediately, possibly before a processor can pick it up.
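
Conversely, holding on to the returned references – as in the earlier examples – keeps the issued call alive until we are done with it:

result = caller.simple_fn(1, np.arange(100))  # references kept alive
print(result['zs'].get())  # the call stays in the store until collected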

This gives the caller a lot of control over what gets executed. For instance, we could do the following:

ys = store.put_new_tensor(np.arange(100))
results = [caller.simple_fn(i, ys) for i in range(100)]

time.sleep(0.01)
results = [result['zs'].get(timeout=0) for result in results]

This will run for 10 ms, then collect any results that have finished so far. Additionally, by overwriting the only remaining references to the outstanding calls, any that have not finished will be implicitly cancelled. Note that this would not work with batch calls, as the processor would pick up all invocations at the same time.