Descriptor Configuration

Warning

Configuration via class descriptors is disabled by default in 5.0.0. To re-enable it, set the experimental_flow device attribute to False. For configuration in SDP 0.24 onwards, see Flow Configuration.

The LMC Queue Connector is configured using a QueueConnectorDescriptor containing one or more ExchangeDescriptor instances in JSON, where each exchange represents an independent stream for processing.

The Tango device can be configured after initialization via multiple approaches; see Tango Device.

Queue Connector Descriptor

The Queue Connector Descriptor is the root-level configuration document with which IO is performed.

It describes a collection of ExchangeDescriptor instances that start, run, and stop together automatically.

More on exchanges below.

Exchange Descriptor

A typical QueueConnectorDescriptor with a single ExchangeDescriptor consists of:

{
    "exchanges": [
        {
            "dtype": "<data type>",
            "shape": [ ... ],
            "source": { "type": "<source type>" } | [ ... ],
            "pipe": { "type": "<pipe type>" },
            "sink": { "type": "<sink type>" }
        }
    ]
}
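As a minimal sketch, the template can be filled in as a Python dictionary and serialized to JSON. The "type" strings below are placeholders rather than real source or sink names, and any further fields a real source or sink would need are omitted. The optional "pipe" is left out, and "source" uses the list form.

```python
import json

# Placeholder "type" values: real sources/sinks need their own names and extra fields.
descriptor = {
    "exchanges": [
        {
            "dtype": "float64",
            "shape": [3],
            # "source" may be a single object or a list of objects.
            "source": [{"type": "<source type A>"}, {"type": "<source type B>"}],
            "sink": {"type": "<sink type>"},
            # "pipe" is optional and omitted here.
        }
    ]
}

# json.dumps always emits valid JSON (no trailing commas).
text = json.dumps(descriptor, indent=4)
print(text)
```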

Exchanges contain the following attributes:

  • dtype: Numpy dtype or Python type to deserialize to.

  • shape: Optional data dimensions of the source if the data is array-like.

  • source: Input description for reading data.

  • pipe: Optional pipe function for stream operations.

  • sink: Output description for writing data.
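The sketch below uses a hypothetical normalize_exchange helper (not part of the Queue Connector API) to check for the required attributes and fill in the optional ones: shape defaults to an empty list (a scalar), pipe defaults to None, and a single source object is wrapped in a list so that one or many sources are handled the same way.

```python
import copy
from typing import Any, Dict

REQUIRED_KEYS = ("dtype", "source", "sink")


def normalize_exchange(exchange: Dict[str, Any]) -> Dict[str, Any]:
    """Check required keys and fill in optional ones (hypothetical helper)."""
    missing = [key for key in REQUIRED_KEYS if key not in exchange]
    if missing:
        raise ValueError(f"exchange is missing required keys: {missing}")
    result = copy.deepcopy(exchange)  # leave the caller's dict untouched
    result.setdefault("shape", [])    # empty list means scalar
    result.setdefault("pipe", None)   # no stream operation
    if isinstance(result["source"], dict):
        result["source"] = [result["source"]]  # always a list of sources
    return result


exchange = normalize_exchange(
    {"dtype": "int32", "source": {"type": "<source type>"}, "sink": {"type": "<sink type>"}}
)
print(exchange)
```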

A compatible combination of dtype, shape, and format must be chosen for the exchange to initialize at runtime, ready for streaming.

See below for detailed summaries of each, and see API for further details on the available sources, sinks, and pipes.

Data Dtype

The data type provided to an exchange may be either a Python built-in type (bool, int, float, str, object, bytes) or the name of any supported numpy generic class, as listed below:

Descriptor Literal | DType | Tango ArgType | Notes
"bytes" | tuple[str, dtype('S')] | DevEncoded | Shape must be scalar. Data is not validated to ensure it matches the format.
"object_" | numpy.dtype('O') | N/A | Shape must be scalar. This type is reserved largely for container types such as list, tuple, and dict, but is compatible with all dtypes. For Tango, it is only compatible with sink TangoObjectScatterAttributeSink.
"str_" | numpy.dtype('<U') | DevString | Uses UTF-8 format.
"bool" | numpy.dtype('bool') | DevBoolean |
"uint8" | numpy.dtype('uint8') | DevUChar |
"uint16" | numpy.dtype('uint16') | DevUShort |
"uint32" | numpy.dtype('uint32') | DevULong |
"uint64" | numpy.dtype('uint64') | DevULong64 |
"int16" | numpy.dtype('int16') | DevShort |
"int32" | numpy.dtype('int32') | DevLong |
"int64" or "int" | numpy.dtype('int64') | DevLong64 |
"float32" | numpy.dtype('float32') | DevFloat |
"float64" or "float" | numpy.dtype('float64') | DevDouble |
"datetime64" or "datetime64[<time precision>]" | numpy.dtype('<M8[<time precision>]') | None |
structured datatype (List[tuple(str, Scalar)]) | numpy.dtype('[…]') | Any | For Tango, it is only compatible with source TangoDishPointingSubscriptionSource.

Note

Tango DevEnum and DevPipeBlob are currently not supported.
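To illustrate the table, here is a minimal sketch that resolves descriptor literals with numpy, using a hypothetical resolve_dtype helper and a TANGO_ARGTYPE lookup copied from the rows above. It assumes "int" and "float" are pinned to 64 bits as listed, rather than left to np.dtype("int"), which is platform-dependent on older numpy releases. "bytes" is rejected because it maps to a (format, payload) pair rather than a single dtype.

```python
import numpy as np

# Literals resolved through explicit numpy types rather than by string lookup.
SPECIAL_LITERALS = {
    "int": np.int64,
    "float": np.float64,
    "object_": np.object_,
    "str_": np.str_,
}

# Tango ArgType per descriptor literal, copied from the table above.
TANGO_ARGTYPE = {
    "bytes": "DevEncoded",
    "str_": "DevString",
    "bool": "DevBoolean",
    "uint8": "DevUChar",
    "uint16": "DevUShort",
    "uint32": "DevULong",
    "uint64": "DevULong64",
    "int16": "DevShort",
    "int32": "DevLong",
    "int64": "DevLong64",
    "int": "DevLong64",
    "float32": "DevFloat",
    "float64": "DevDouble",
    "float": "DevDouble",
}


def resolve_dtype(literal: str) -> np.dtype:
    """Map a descriptor dtype literal to a numpy dtype (hypothetical helper)."""
    if literal == "bytes":
        raise ValueError('"bytes" is a (format, payload) pair, not a single numpy dtype')
    return np.dtype(SPECIAL_LITERALS.get(literal, literal))


print(resolve_dtype("int"), resolve_dtype("datetime64[ms]"), TANGO_ARGTYPE["int"])
```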

In addition, the dtype may be specified as a numpy structured datatype, though this is limited to the In-Memory and Kafka data sources and sinks. For example, a list of Alt/Az coordinates might have a descriptor dtype of [["alt", "float64"], ["az", "float64"]].
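Because JSON has no tuple type, the descriptor spells each field as a two-element list. A minimal sketch of turning that into a numpy structured dtype, assuming each [name, type] pair is simply converted to a tuple before being passed to numpy:

```python
import json

import numpy as np

# The structured dtype exactly as it would appear in the JSON descriptor.
descriptor_dtype = json.loads('[["alt", "float64"], ["az", "float64"]]')

# Convert each [name, type] list into the (name, type) tuple numpy expects.
structured = np.dtype([tuple(field) for field in descriptor_dtype])

coords = np.array([(45.0, 180.0), (30.5, 90.25)], dtype=structured)
print(coords["alt"], coords["az"])
```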

Note

When using a structured datatype and serialising to Kafka with "carray" format, a limited subset of field data types is allowed. Notably, datetime64 types are not supported (See this issue). There is no such limitation with "npy" format.

Data Shape

The optional exchange shape describes the maximum size of each dimension for an array or matrix of values. An empty list (the default) indicates a scalar, and at most one negative value may be used to indicate a dynamically sized dimension.
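The two shape rules above can be checked with a short sketch. validate_shape is a hypothetical helper; it only checks the descriptor value itself and does not compare it against incoming data.

```python
from typing import Sequence


def validate_shape(shape: Sequence[int]) -> str:
    """Return "scalar" or "array", raising ValueError for an invalid shape."""
    if any(not isinstance(dim, int) for dim in shape):
        raise ValueError("shape values must be integers")
    # At most one dimension may be dynamically sized (negative).
    if sum(1 for dim in shape if dim < 0) > 1:
        raise ValueError("at most one negative (dynamic) dimension is allowed")
    return "scalar" if len(shape) == 0 else "array"


print(validate_shape([]), validate_shape([-1, 3]))
```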

Using a Tango source or sink limits the number of dimensions to 2, as Tango only supports the SCALAR format, the SPECTRUM format for 1D arrays, and the IMAGE format for 2D arrays. The specialized TangoArrayScatterAttributeSink can work around this limitation by flattening arrays and generating a separate attribute for the original shape.

Note

Tango attributes require a maximum dimension size, thus Tango sinks currently do not support negative values in the data shape.
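A minimal sketch of the Tango rules above, using hypothetical helpers tango_format and scatter. The first maps the number of dimensions to the Tango data format names (SCALAR, SPECTRUM, IMAGE) and rejects negative sizes for sinks; the second shows the general flatten-and-keep-the-shape idea only, not the actual attribute layout produced by TangoArrayScatterAttributeSink. The format names are plain strings here, so no tango import is needed.

```python
from typing import Sequence, Tuple

import numpy as np

# Tango data format by number of dimensions (names mirror tango.AttrDataFormat).
TANGO_FORMATS = {0: "SCALAR", 1: "SPECTRUM", 2: "IMAGE"}


def tango_format(shape: Sequence[int], is_sink: bool = True) -> str:
    """Pick the Tango data format for a descriptor shape (hypothetical helper)."""
    if len(shape) > 2:
        raise ValueError("Tango supports at most 2 dimensions")
    if is_sink and any(dim < 0 for dim in shape):
        raise ValueError("Tango sinks need a fixed maximum size for every dimension")
    return TANGO_FORMATS[len(shape)]


def scatter(array: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
    """Flatten an N-D array to 1D (C-order) and return its shape separately."""
    return array.ravel(order="C"), np.asarray(array.shape, dtype=np.int64)


cube = np.arange(24).reshape(2, 3, 4)
flat, original_shape = scatter(cube)  # flat fits a SPECTRUM attribute
print(tango_format([4, 5]), flat.shape, original_shape)
```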

Note

Multi-dimensional shape values always represent numpy ndarrays (or lists of lists for strings) in C-order. PyTango automatically swaps dimensions to always expose matrices to Python in C-order, and all available Kafka encoders/decoders serialize and deserialize to C-order. Unless pickle is adopted, there is currently no way to get an F-order numpy matrix into a queue connector device.
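A short, generic numpy illustration (not the connector's encoder) of why F-order layout does not survive a raw-buffer round trip: ndarray.tobytes() writes C-order bytes by default regardless of the array's memory layout, so the array read back with np.frombuffer is C-ordered while holding the same values.

```python
import numpy as np

# A 2x3 matrix stored in Fortran (column-major) memory order.
f_matrix = np.asfortranarray(np.arange(6, dtype=np.int32).reshape(2, 3))

# tobytes() defaults to order="C", so the bytes are row-major.
raw = f_matrix.tobytes()

# Reading the buffer back and reshaping yields a C-contiguous array.
restored = np.frombuffer(raw, dtype=np.int32).reshape(2, 3)
print(restored.flags["C_CONTIGUOUS"], restored.flags["F_CONTIGUOUS"])
```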

Note

When working with multi-dimensional str data, the Tango DeviceProxy will return data as a list rather than a numpy.ndarray.

Data Source(s)

A DataSource is an extensible component of the LMC Queue Connector representing a location from which data can be asynchronously pulled. The data source converts incoming data to the dtype specified in the exchange descriptor.

Multiple data sources are supported on a single exchange and are merged into a single stream.
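One common way to merge several asynchronous sources into a single stream is to pump each into a shared queue. The sketch below assumes sources are plain async iterators; merge and fake_source are hypothetical names, and this is not the Queue Connector's own implementation. Items from different sources interleave in arrival order, while each source's own order is preserved.

```python
import asyncio
from typing import AsyncIterator, List, Tuple, TypeVar

T = TypeVar("T")


async def merge(*sources: AsyncIterator[T]) -> AsyncIterator[T]:
    """Interleave items from several async sources in arrival order."""
    queue: "asyncio.Queue[object]" = asyncio.Queue()
    finished = object()  # sentinel marking one exhausted source

    async def pump(source: AsyncIterator[T]) -> None:
        try:
            async for item in source:
                await queue.put(item)
        finally:
            await queue.put(finished)  # signal completion even on error

    tasks = [asyncio.create_task(pump(source)) for source in sources]
    remaining = len(tasks)
    while remaining:
        item = await queue.get()
        if item is finished:
            remaining -= 1
        else:
            yield item  # type: ignore[misc]
    await asyncio.gather(*tasks)  # re-raise any error from a source


async def fake_source(name: str, values: List[int], delay: float) -> AsyncIterator[Tuple[str, int]]:
    for value in values:
        await asyncio.sleep(delay)
        yield name, value


async def main() -> List[Tuple[str, int]]:
    merged = merge(fake_source("a", [1, 2, 3], 0.01), fake_source("b", [10, 20], 0.015))
    return [item async for item in merged]


items = asyncio.run(main())
print(items)
```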

Data Pipe

An optional DataPipe is an extensible component of the LMC Queue Connector representing a stream operation performed on data between the source and sink. Beyond simple functions, this interface also allows the data shape itself to be changed using stream buffering, aggregation, and splitting functions.
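As an illustration of a shape-changing pipe, here is a minimal buffering sketch written as an async generator. buffer_pipe is a hypothetical name, not a DataPipe from the API: it turns a stream of scalars (shape []) into a stream of 1D arrays (shape [size]) and simply drops any trailing partial batch.

```python
import asyncio
from typing import AsyncIterator, List

import numpy as np


async def buffer_pipe(stream: AsyncIterator[float], size: int) -> AsyncIterator[np.ndarray]:
    """Aggregate scalars into 1D arrays of length `size` (hypothetical pipe)."""
    batch: List[float] = []
    async for value in stream:
        batch.append(value)
        if len(batch) == size:
            yield np.asarray(batch, dtype=np.float64)
            batch = []
    # A trailing partial batch is dropped in this sketch.


async def counter(n: int) -> AsyncIterator[float]:
    for i in range(n):
        yield float(i)


async def main() -> List[np.ndarray]:
    return [chunk async for chunk in buffer_pipe(counter(7), size=3)]


chunks = asyncio.run(main())
print(chunks)
```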

Data Sink(s)

A DataSink is an extensible component of the LMC Queue Connector representing a location to which data can be asynchronously (or synchronously) pushed. The type of data entering the sink is determined by the dtype specified in the exchange descriptor, and its shape is calculated by the Data Pipe.

Note

Multiple data sinks on a single exchange are not currently supported (though this would be straightforward to implement). As a temporary workaround, use duplicate exchanges that differ only by sink.
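The workaround can be scripted. In this minimal sketch, a hypothetical fan_out helper copies one exchange once per sink, changing only the "sink" entry; the "type" values are placeholders.

```python
import copy
from typing import Any, Dict, List


def fan_out(exchange: Dict[str, Any], sinks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Return one copy of `exchange` per sink, differing only in "sink"."""
    duplicates = []
    for sink in sinks:
        duplicate = copy.deepcopy(exchange)  # independent copy of dtype/shape/source/pipe
        duplicate["sink"] = copy.deepcopy(sink)
        duplicates.append(duplicate)
    return duplicates


base = {
    "dtype": "float64",
    "shape": [],
    "source": {"type": "<source type>"},
    "sink": {"type": "<sink type A>"},
}
descriptor = {"exchanges": fan_out(base, [{"type": "<sink type A>"}, {"type": "<sink type B>"}])}
print(descriptor)
```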

Source/Sink Compatibility

Below is the complete compatibility matrix for the source and sink combinations currently supported.

Sink/Source | bytes | object | int16 | int32 | int64 | uint8 | uint16 | uint32 | uint64 | float32 | float64 | complex64 | complex128 | datetime64 | struct
KafkaConsumerSource
TangoSubscriptionSource
TangoDishPointingSubscriptionSource
KafkaProducerSink
TangoLocalAttributeSink
TangoArrayScatterAttributeSink
TangoObjectScatterAttributeSink

Data Format

Certain sinks, sources, and data types that use serialized data formats may require a suitable format string.

Kafka Format

Since Kafka topics only store raw bytes, KafkaProducerSink and KafkaConsumerSource must provide a format describing how to read from and write to raw bytes.

Supported Format/dtype/shape combinations are described in the below table:

"python" (default)
  • Compatible dtypes: "str", "dict", "bytes", "bool", "int[16|32|64]", "uint[8|16|32|64]", "float[32|64]"
  • Compatible shapes: [], [x], [x, y] (only [] for bytes and dict types)
  • Notes: Data is serialized using repr(data).encode('utf-8') and deserialized using ast.literal_eval(data.decode('utf-8')).

"utf-8"
  • Compatible dtypes: "str", "bytes", "bool", "int[16|32|64]", "uint[8|16|32|64]", "float[32|64]"
  • Compatible shapes: []
  • Notes: Data is serialized using str(data).encode('utf-8') and deserialized using dtype(data).

"ascii"
  • Compatible dtypes: "str", "bytes", "bool", "int[16|32|64]", "uint[8|16|32|64]", "float[32|64]"
  • Compatible shapes: []
  • Notes: Data is serialized using str(data).encode('ascii') and deserialized using dtype(data).

"json"
  • Compatible dtypes: "str", "dict", "bytes", "bool", "int[16|32|64]", "uint[8|16|32|64]", "float[32|64]"
  • Compatible shapes: [], [x], [x, y] (only [] for bytes and dict types)
  • Notes: Data is serialized using json.dumps(data).encode('utf-8') and deserialized using json.loads(data).

"msgpack_numpy"
  • Compatible dtypes: "str", "dict", "bytes", "bool", "int[16|32|64]", "uint[8|16|32|64]", "float[32|64]"
  • Compatible shapes: [], [x], [x, y] (only [] for bytes and dict types)
  • Notes: Data is serialized using msgpack.packb(value, default=msgpack_numpy.encode) and deserialized using msgpack.unpackb(value, object_hook=msgpack_numpy.decode).

"carray"
  • Compatible dtypes: "bool", "int[16|32|64]", "uint[8|16|32|64]", "float[32|64]", List[tuple(str, Scalar)]
  • Compatible shapes: [], [x], [x, y]
  • Notes: Writes and zero-copy reads the raw numpy buffer in row-major order, without any dimension information. KafkaConsumerSource will attempt to reshape according to the provided shape information.

"npy"
  • Compatible dtypes: "str", "bool", "int[16|32|64]", "uint[8|16|32|64]", "float[32|64]", List[tuple(str, Scalar)]
  • Compatible shapes: [], [x], [x, y]
  • Notes: Uses np.save() and np.load(), and includes dimension information.

any string
  • Compatible dtypes: "bytes"
  • Compatible shapes: []
  • Notes: If specified, the KafkaProducerSink validates that the format of the data matches the format from the associated DataSource. For TangoSubscriptionSource, this is set by Tango clients when they write a value; for KafkaConsumerSource, it is specified in the configuration.

None
  • Compatible dtypes: "bytes"
  • Compatible shapes: []
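The following is a rough sketch of several encodings listed above, written with the calls named there (repr/ast.literal_eval, str/dtype(data), json.dumps/json.loads, raw numpy buffers, np.save/np.load). The function names are hypothetical, "ascii" would differ from "utf-8" only in the codec, and msgpack_numpy is omitted to avoid a third-party dependency. Converting numpy values to Python built-ins with tolist() before repr or json.dumps is an assumption of this sketch, since the repr of an ndarray is not a valid Python literal.

```python
import ast
import io
import json
from typing import Any, Sequence

import numpy as np


def _plain(value: Any) -> Any:
    # Assumption: numpy values become Python built-ins before text encoding.
    return value.tolist() if isinstance(value, (np.ndarray, np.generic)) else value


def encode_python(value: Any) -> bytes:
    return repr(_plain(value)).encode("utf-8")


def decode_python(raw: bytes) -> Any:
    return ast.literal_eval(raw.decode("utf-8"))


def encode_utf8(value: Any) -> bytes:
    return str(_plain(value)).encode("utf-8")


def decode_utf8(raw: bytes, py_type: type) -> Any:
    # "dtype(data)" shown with a plain Python type applied to the decoded text.
    return py_type(raw.decode("utf-8"))


def encode_json(value: Any) -> bytes:
    return json.dumps(_plain(value)).encode("utf-8")


def decode_json(raw: bytes) -> Any:
    return json.loads(raw)  # json.loads accepts bytes directly


def encode_carray(array: np.ndarray) -> bytes:
    # Raw row-major buffer only: no dtype or shape travels with the bytes.
    return np.ascontiguousarray(array).tobytes()


def decode_carray(raw: bytes, dtype: str, shape: Sequence[int]) -> np.ndarray:
    # frombuffer does not copy; the reader must supply dtype and shape.
    return np.frombuffer(raw, dtype=dtype).reshape(shape)


def encode_npy(array: np.ndarray) -> bytes:
    buffer = io.BytesIO()
    np.save(buffer, array)  # the .npy header records dtype and shape
    return buffer.getvalue()


def decode_npy(raw: bytes) -> np.ndarray:
    return np.load(io.BytesIO(raw))


matrix = np.arange(6, dtype=np.uint16).reshape(2, 3)
print(len(encode_carray(matrix)), len(encode_npy(matrix)))
```

Note that the "carray" payload is exactly matrix.nbytes long, so the reader has to know dtype and shape in advance, whereas the "npy" payload is larger because it carries its own header.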

Bytes Data Format

The bytes data format is unique in that a format string is paired with, and travels alongside, the bytes value, similar to tango.DevEncoded.

Note

The format parameter is optional for KafkaProducerSink when using the bytes data type. No encoding or decoding is performed when the bytes data type is used by KafkaConsumerSource or KafkaProducerSink.
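A minimal sketch of how a bytes value and its format string can travel together, and of the optional format check described for KafkaProducerSink in the Kafka format table. EncodedValue and to_kafka_payload are hypothetical names; the payload itself is passed through untouched.

```python
from typing import Optional, Tuple

# A (format, payload) pair, mirroring the shape of a tango.DevEncoded value.
EncodedValue = Tuple[str, bytes]


def to_kafka_payload(value: EncodedValue, expected_format: Optional[str] = None) -> bytes:
    """Hypothetical producer step: optional format check, then raw pass-through."""
    data_format, payload = value
    if expected_format is not None and data_format != expected_format:
        raise ValueError(f"format {data_format!r} does not match expected {expected_format!r}")
    return payload  # sent as-is: no encoding is applied


print(to_kafka_payload(("json", b'{"a": 1}'), expected_format="json"))
```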