Configuration

The LMC Queue Connector is configured using a QueueConnectorDescriptor containing one or more ExchangeDescriptor instances in JSON, where each exchange represents an independent stream for processing.

The Tango device can be configured after initialization via multiple approaches; see Tango Device.

Queue Connector Descriptor

A typical QueueConnectorDescriptor with a single ExchangeDescriptor consists of:

{
    "exchanges": [
        {
            "dtype": "<data type>",
            "shape": [ ... ],
            "source": { "type": "<source type>" } | [ ... ],
            "pipe": { "type": "<pipe type>" },
            "sink": { "type": "<sink type>" },
        }
    ]
}

Where:

  • dtype: Numpy dtype or Python type to deserialize to.

  • shape: Optional data dimensions of the source if data is arraylike.

  • source: Exchange input description for reading data.

  • pipe: Optional pipe function for stream operations.

  • sink: Exchange output description for writing data.

A compatible combination of dtype, shape and encoding must be chosen at runtime so that the exchange instance can be initialized ready for streaming.

See below for a summary of each, Configuration Schema for the full schema, and API for further details on the available sources, sinks and pipes.
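For illustration, a filled-in descriptor might look like the following sketch, written here as a Python dict. It assumes the type field accepts the source and sink class names listed in the compatibility matrix below; the additional keys (such as the Kafka topic and attribute name) are hypothetical and should be checked against the Configuration Schema.

# A sketch of a concrete QueueConnectorDescriptor, expressed as a Python dict.
# The "type" values reuse class names from the compatibility matrix below;
# the remaining source/sink keys (e.g. "topic", "attribute_name") are
# hypothetical placeholders; consult the Configuration Schema for the
# actual parameters of each source and sink.
descriptor = {
    "exchanges": [
        {
            "dtype": "float64",   # numpy dtype to deserialize to
            "shape": [2],         # 1D array of up to two values
            "source": {
                "type": "KafkaConsumerSource",
                "topic": "pointing-offsets",           # hypothetical parameter
            },
            "sink": {
                "type": "TangoLocalAttributeSink",
                "attribute_name": "pointing_offsets",  # hypothetical parameter
            },
        }
    ]
}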

Data Dtype

The data type provided to an exchange may be either a Python built-in type (bool, int, float, str, object, bytes) or the name of any supported numpy generic class, as listed below:

"bytes"
    DType: tuple[str, dtype('S')]
    Tango ArgType: DevEncoded
    Notes: Shape must be scalar. Data is not validated to ensure it matches the encoding.

"object_"
    DType: numpy.dtype('O')
    Tango ArgType: N/A
    Notes: Shape must be scalar. This type is reserved largely for container types such as list, tuple and dict, but is compatible with all dtypes. For Tango, only compatible with the sink TangoObjectScatterAttributeSink.

"str_"
    DType: numpy.dtype('<U')
    Tango ArgType: DevString
    Notes: Uses UTF-8 encoding.

"bool"
    DType: numpy.dtype('bool')
    Tango ArgType: DevBoolean

"uint8"
    DType: numpy.dtype('uint8')
    Tango ArgType: DevUChar

"uint16"
    DType: numpy.dtype('uint16')
    Tango ArgType: DevUShort

"uint32"
    DType: numpy.dtype('uint32')
    Tango ArgType: DevULong

"uint64"
    DType: numpy.dtype('uint64')
    Tango ArgType: DevULong64

"int16"
    DType: numpy.dtype('int16')
    Tango ArgType: DevShort

"int32"
    DType: numpy.dtype('int32')
    Tango ArgType: DevLong

"int64" or "int"
    DType: numpy.dtype('int64')
    Tango ArgType: DevLong64

"float32"
    DType: numpy.dtype('float32')
    Tango ArgType: DevFloat

"float64" or "float"
    DType: numpy.dtype('float64')
    Tango ArgType: DevDouble

"datetime64" or "datetime64[<time precision>]"
    DType: numpy.dtype('<M8[<time precision>]')
    Tango ArgType: None

structured datatype (List[tuple(str, Scalar)])
    DType: numpy.dtype('[...]')
    Tango ArgType: Any
    Notes: For Tango, only compatible with the source TangoPointingSubscriptionSource.

Note

Tango DevEnum and DevPipeBlob are currently not supported.

In addition, the dtype may be specified as a numpy structured datatype, though this is limited to the In-Memory and Kafka data sources and sinks. For example, a list of Alt/Az coordinates might have a descriptor dtype of [["alt", "float64"], ["az", "float64"]].
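As a quick illustration of how such a descriptor dtype maps onto numpy, the following sketch builds the equivalent structured dtype and a small array of Alt/Az samples (the field names simply follow the example above):

import numpy as np

# Structured dtype equivalent to the descriptor dtype
# [["alt", "float64"], ["az", "float64"]]
altaz_dtype = np.dtype([("alt", "float64"), ("az", "float64")])

# A small array of Alt/Az coordinate samples using that dtype
samples = np.array([(45.0, 180.0), (46.2, 181.5)], dtype=altaz_dtype)

print(samples["alt"])  # [45.  46.2]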

Note

When using a structured datatype and serialising to Kafka with "carray" encoding, a limited subset of field data types is allowed. Notably, datetime64 types are not supported (See this issue). There is no such limitation with "npy" encoding.

Data Shape

The optional exchange shape describes the maximum dimension sizes for an array of values. An empty list (the default) indicates a scalar, and at most one negative shape value may be used to indicate a dynamically sized dimension.
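For example, the following shape values (shown as Python lists with explanatory comments) illustrate the convention:

# Illustrative shape values for an exchange descriptor
scalar_shape = []          # empty list (default): a scalar value
vector_shape = [100]       # 1D array of up to 100 elements
image_shape = [512, 512]   # 2D array of up to 512 x 512 elements
dynamic_shape = [4, -1]    # 4 rows with a dynamically sized second dimension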

Using a Tango source or sink limits the number of dimensions to 2, as Tango only supports the SCALAR format, the SPECTRUM format for 1D arrays, and the IMAGE format for 2D arrays. The specialized sink TangoArrayScatterAttributeSink can work around this limitation by flattening the array and generating a separate attribute for the original shape.

Note

Tango attributes require a maximum dimension size, thus Tango sinks currently do not support negative values in the data shape.

Note

When working with multi-dimensional str data, the Tango DeviceProxy will return data as a list rather than a numpy.ndarray.

Data Source(s)

A DataSource is an extensible component of the LMC Queue Connector representing a location the data can be asynchronously pulled from. The data source will convert incoming data to the dtype specified in the exchange descriptor.

Multiple data sources are supported on a single exchange and are merged into a single stream.
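As a sketch of what this looks like in a descriptor, the source field may be given as a list; the type names below are placeholders in the same style as the template above:

# Sketch of an exchange whose data is merged from two sources into one stream.
# The "<...>" values are placeholders, as in the descriptor template above.
exchange = {
    "dtype": "<data type>",
    "shape": [],
    "source": [
        {"type": "<first source type>"},
        {"type": "<second source type>"},
    ],
    "sink": {"type": "<sink type>"},
}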

Data Pipe

An optional DataPipe is an extensible component of the LMC Queue Connector representing a stream operation performed on data between the source and sink. Beyond simple functions, this interface also allows changing the data shape itself using stream buffering, aggregation and splitting functions.

Data Sink(s)

A DataSink is an extensible component of the LMC Queue Connector representing a location that data can be asynchronously (or synchronously) pushed to. The type of data going into the sink is determined by the dtype specified in the exchange descriptor, and its shape is calculated by the Data Pipe.

Note

Multiple data sinks are currently not supported on a single exchange (though they would be trivial to implement). A temporary workaround is to use duplicate exchanges that differ only by sink, as sketched below.
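A minimal sketch of that workaround, with one source fanned out to two different sinks via two otherwise identical exchanges (placeholders as in the descriptor template):

# Sketch of the duplicate-exchange workaround: two exchanges identical except
# for their sink, so the same source data is pushed to both destinations.
# The "<...>" values are placeholders, as in the descriptor template above.
descriptor = {
    "exchanges": [
        {
            "dtype": "<data type>",
            "source": {"type": "<source type>"},
            "sink": {"type": "<first sink type>"},
        },
        {
            "dtype": "<data type>",
            "source": {"type": "<source type>"},
            "sink": {"type": "<second sink type>"},
        },
    ]
}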

Source/Sink Compatibility

Each source and sink supports its own subset of dtypes. The sources, sinks and dtypes covered by the compatibility matrix are listed below; see API for the exact combinations supported by each.

  • Sources: KafkaConsumerSource, TangoSubscriptionSource, TangoPointingSubscriptionSource

  • Sinks: KafkaProducerSink, TangoLocalAttributeSink, TangoArrayScatterSink, TangoObjectScatterAttributeSink

  • Dtypes: bytes, object, int16, int32, int64, uint8, uint16, uint32, uint64, float32, float64, complex64, complex128, datetime64, struct

Data Encoding

Certain sources, sinks and data types that use raw byte buffers may require selecting a suitable Encoding string.

Bytes Data Encoding

The bytes data format is unique in that an encoding string is paired with, and travels alongside, the bytes value, similar to tango.DevEncoded.
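Purely as an illustration of the pairing, the value can be pictured as a tuple of the encoding string and the raw payload, analogous to the two parts of tango.DevEncoded:

# Illustrative only: a bytes value travelling together with its encoding string,
# analogous to the (format, data) pair of tango.DevEncoded.
encoded_value = ("json", b'{"alt": 45.0, "az": 180.0}')

encoding, payload = encoded_value
print(encoding)                  # json
print(payload.decode("utf-8"))   # {"alt": 45.0, "az": 180.0}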

Note

The encoding parameter is optional for KafkaProducerSink when using the bytes data type. No encoding or decoding is performed on bytes data by KafkaConsumerSource or KafkaProducerSink.

Kafka Encoding

Since Kafka Topics only store raw bytes, KafkaProducerSink and KafkaConsumerSource must provide an encoding to describe how to read from/write to raw bytes.

Supported encoding/dtype/shape combinations are described in the below table:

"python" (default)
    Compatible dtypes: "str", "dict", "bytes", "bool", "int[16|32|64]", "uint[8|16|32|64]", "float[32|64]"
    Compatible shapes: [], [x], [x, y] (only [] for bytes and dict types)
    Notes: Data is serialized using repr(data).encode('utf-8') and deserialized using ast.literal_eval(data.decode('utf-8')).

"utf-8"
    Compatible dtypes: "str", "bytes", "bool", "int[16|32|64]", "uint[8|16|32|64]", "float[32|64]"
    Compatible shapes: []
    Notes: Data is serialized using str(data).encode('utf-8') and deserialized using dtype(data).

"ascii"
    Compatible dtypes: "str", "bytes", "bool", "int[16|32|64]", "uint[8|16|32|64]", "float[32|64]"
    Compatible shapes: []
    Notes: Data is serialized using str(data).encode('ascii') and deserialized using dtype(data).

"json"
    Compatible dtypes: "str", "dict", "bytes", "bool", "int[16|32|64]", "uint[8|16|32|64]", "float[32|64]"
    Compatible shapes: [], [x], [x, y] (only [] for bytes and dict types)
    Notes: Data is serialized using json.dumps(data).encode('utf-8') and deserialized using json.loads(data).

"msgpack_numpy"
    Compatible dtypes: "str", "dict", "bytes", "bool", "int[16|32|64]", "uint[8|16|32|64]", "float[32|64]"
    Compatible shapes: [], [x], [x, y] (only [] for bytes and dict types)
    Notes: Data is serialized using msgpack.packb(value, default=msgpack_numpy.encode) and deserialized using msgpack.unpackb(value, object_hook=msgpack_numpy.decode).

"carray"
    Compatible dtypes: "bool", "int[16|32|64]", "uint[8|16|32|64]", "float[32|64]", List[tuple(str, Scalar)]
    Compatible shapes: [], [x], [x, y]
    Notes: Writes and zero-copy reads the raw numpy buffer in row-major order, without any dimension information. KafkaConsumerSource will attempt to reshape according to the provided shape information.

"npy"
    Compatible dtypes: "str", "bool", "int[16|32|64]", "uint[8|16|32|64]", "float[32|64]", List[tuple(str, Scalar)]
    Compatible shapes: [], [x], [x, y]
    Notes: Uses np.save() and np.load(); includes dimension information.

any string
    Compatible dtypes: "bytes"
    Compatible shapes: []
    Notes: If specified, the KafkaProducerSink will validate that the encoding of the data matches the encoding from the associated DataSource. For TangoSubscriptionSource, the encoding is set by Tango clients when they write a value; for KafkaConsumerSource it is specified in the configuration.

None
    Compatible dtypes: "bytes"
    Compatible shapes: []
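To make the serialization behaviour above concrete, the following sketch reproduces the round trips quoted in the table for the "python" and "json" encodings on a plain value; it only illustrates the quoted expressions, not the connector's internal code:

import ast
import json

value = [1.5, 2.5, 3.5]  # e.g. a small float64 spectrum

# "python" encoding: repr(...) to serialize, ast.literal_eval(...) to deserialize
python_payload = repr(value).encode("utf-8")
assert ast.literal_eval(python_payload.decode("utf-8")) == value

# "json" encoding: json.dumps(...) to serialize, json.loads(...) to deserialize
json_payload = json.dumps(value).encode("utf-8")
assert json.loads(json_payload) == value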