Configuration
The LMC Queue Connector is configured using a QueueConnectorDescriptor containing one or more ExchangeDescriptor instances in JSON, where each exchange represents an independent stream to process. The Tango device can be configured after initialization via several approaches; see Tango Device.
Queue Connector Descriptor
A typical QueueConnectorDescriptor
with a single ExchangeDescriptor
consists of:
{
    "exchanges": [
        {
            "dtype": "<data type>",
            "shape": [ ... ],
            "source": { "type": "<source type>" } | [ ... ],
            "pipe": { "type": "<pipe type>" },
            "sink": { "type": "<sink type>" }
        }
    ]
}
Where:
dtype: Numpy dtype or Python type to deserialize to.
shape: Optional data dimensions of the source if the data is array-like.
source: Exchange input description for reading data.
pipe: Optional pipe function for stream operations.
sink: Exchange output description for writing data.
A compatible combination of dtype, shape and encoding must be chosen at runtime to initialize the exchange instance ready for streaming.
See below for a summary of each, the Configuration Schema for the full schema, and the API for further details on the available sources, sinks and pipes.
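As a concrete illustration, a minimal descriptor streaming scalar values from a Kafka source to a local Tango attribute might look like the sketch below. It assumes the type field takes the source/sink class name; in practice each source and sink also requires its own type-specific properties (for example the Kafka topic and servers, or the attribute name), as defined in the Configuration Schema.

{
    "exchanges": [
        {
            "dtype": "int64",
            "shape": [],
            "source": { "type": "KafkaConsumerSource" },
            "sink": { "type": "TangoLocalAttributeSink" }
        }
    ]
}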
Data Dtype
The data type provided to an exchange may be either a Python native built-in type (bool, int, float, str, object, bytes) or the name of any supported numpy generic class, as listed below:
Descriptor Literal | DType | Tango ArgType | Notes |
---|---|---|---|
"bytes" | bytes | DevEncoded | Shape must be scalar. Data is not validated to ensure it matches the encoding. |
"object" | object | N/A | Shape must be scalar. This type is reserved largely for container types such as list, tuple and dict, but is compatible with all dtypes. For Tango, only compatible with sink TangoObjectScatterAttributeSink. |
"str" | str | DevString | Uses UTF-8 encoding. |
"bool", "int16"/"int32"/"int64", "uint8"/"uint16"/"uint32"/"uint64", "float32"/"float64" | corresponding numpy generic | corresponding Tango numeric ArgType (e.g. DevShort for int16, DevDouble for float64) | |
"complex64"/"complex128" | complex64, complex128 | | |
"datetime64" | datetime64 | None | |
structured datatype ([["<name>", "<dtype>"], ...]) | Any | | For Tango, only compatible with sink TangoObjectScatterAttributeSink |
Note
Tango DevEnum and DevPipeBlob are currently not supported.
In addition, the dtype may be specified as a numpy structured datatype, though this is limited to the In-Memory and Kafka data sources and sinks. For example, a list of Alt/Az coordinates might have a descriptor dtype of [["alt", "float64"], ["az", "float64"]].
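As an illustrative sketch, an exchange carrying a dynamically sized list of such Alt/Az records between Kafka topics could be declared as follows (type names assumed to be the source/sink class names; their required properties are given in the Configuration Schema):

{
    "exchanges": [
        {
            "dtype": [["alt", "float64"], ["az", "float64"]],
            "shape": [-1],
            "source": { "type": "KafkaConsumerSource" },
            "sink": { "type": "KafkaProducerSink" }
        }
    ]
}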
Note
When using a structured datatype and serialising to Kafka with "carray" encoding, a limited subset of field data types is allowed. Notably, datetime64 types are not supported (See this issue). There is no such limitation with "npy" encoding.
Data Shape
The optional exchange shape describes the maximum size of each dimension for an array of values. An empty list (the default) indicates a scalar, and at most one negative shape value may be used to indicate a dynamically sized dimension.
Using a Tango source or sink limits the number of dimensions to 2, as Tango only supports the SCALAR format, the SPECTRUM format for 1D arrays, and the IMAGE format for 2D arrays. The specialized sink TangoArrayScatterAttributeSink can work around this limitation by flattening arrays and generating a separate attribute holding the original shape.
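For illustration, some example shape values and their meanings:

Shape | Meaning |
---|---|
[] | scalar value (default) |
[4] | 1D array of up to 4 elements |
[4, 1024] | 2D array of up to 4 × 1024 elements |
[-1] | 1D array with a dynamically sized dimension (not supported by Tango sinks, see below) |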
Note
Tango attributes require a maximum dimension size, thus Tango sinks currently do not support negative values in the data shape.
Note
When working with multi-dimensional str data, the Tango DeviceProxy will return data as a list rather than a numpy.ndarray.
Data Source(s)
A DataSource is an extensible component of the LMC Queue Connector representing a location from which data can be asynchronously pulled. The data source converts incoming data to the dtype specified in the exchange descriptor.
Multiple data sources are supported on a single exchange and are merged into a single stream.
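For example, a single exchange might merge a Tango subscription and a Kafka topic into one stream, along the lines of the sketch below (type names assumed to be the source/sink class names; each source also takes its own properties from the Configuration Schema):

{
    "dtype": "int64",
    "shape": [],
    "source": [
        { "type": "TangoSubscriptionSource" },
        { "type": "KafkaConsumerSource" }
    ],
    "sink": { "type": "KafkaProducerSink" }
}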
Data Pipe
An optional DataPipe is an extensible component of the LMC Queue Connector representing a stream operation performed on data between the source and sink. Beyond simple functions, this interface also allows changing the data shape itself using stream buffering, aggregation and splitting functions.
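In a descriptor, a pipe simply slots between the source and sink of an exchange, as in the sketch below; the placeholder pipe type stands in for one of the pipe types listed in the API and Configuration Schema:

{
    "dtype": "float64",
    "shape": [-1],
    "source": { "type": "KafkaConsumerSource" },
    "pipe": { "type": "<buffering, aggregation or splitting pipe type>" },
    "sink": { "type": "KafkaProducerSink" }
}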
Data Sink(s)
A DataSink is an extensible component of the LMC Queue Connector representing a location that data can be asynchronously (or synchronously) pushed to. The type of data entering the sink is determined by the dtype specified in the exchange descriptor, and its shape is calculated by the Data Pipe.
Note
Multiple data sinks are currently not supported on a single exchange (though this would be trivial to implement). A temporary workaround is to define duplicate exchanges that differ only by sink, as shown below.
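For instance, to push the same stream to both a local Tango attribute and a Kafka topic, the exchange can be duplicated with only the sink changed (an illustrative sketch; type names and required properties follow the Configuration Schema):

{
    "exchanges": [
        {
            "dtype": "int64",
            "shape": [],
            "source": { "type": "KafkaConsumerSource" },
            "sink": { "type": "TangoLocalAttributeSink" }
        },
        {
            "dtype": "int64",
            "shape": [],
            "source": { "type": "KafkaConsumerSource" },
            "sink": { "type": "KafkaProducerSink" }
        }
    ]
}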
Source/Sink Compatibility
Below is the complete compatibility matrix of dtypes currently supported by the various source and sink types.
Sink/Source | bytes | object | int16 int32 int64 | uint8 uint16 uint32 uint64 | float32 float64 | complex64 complex128 | datetime64 | struct |
---|---|---|---|---|---|---|---|---|
KafkaConsumerSource | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
TangoSubscriptionSource | ✓ | ✓ | ✓ | ✓ | | | | |
TangoPointingSubscriptionSource | ✓ | | | | | | | |
KafkaProducerSink | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
TangoLocalAttributeSink | ✓ | ✓ | ✓ | ✓ | | | | |
TangoArrayScatterSink | ✓ | ✓ | ✓ | | | | | |
TangoObjectScatterAttributeSink | ✓ | ✓ | ✓ | ✓ | ✓ | | | |
Data Encoding
Certain sinks, sources and data types using raw byte buffers may require selecting a suitable Encoding string.
Bytes Data Encoding
The bytes data format is unique in that an encoding string is paired and travels with the bytes value, similar to tango.DevEncoded.
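For example, an exchange declared with the bytes dtype (sketch below, type names assumed to be the class names) produces values that carry their encoding string alongside the raw bytes, so a Tango attribute sink exposes them as a DevEncoded-style (encoding, value) pair:

{
    "dtype": "bytes",
    "shape": [],
    "source": { "type": "KafkaConsumerSource" },
    "sink": { "type": "TangoLocalAttributeSink" }
}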
Note
The encoding parameter is optional for KafkaProducerSink when using the bytes data type. No encoding or decoding is performed when used by KafkaConsumerSource or KafkaProducerSink.
Kafka Encoding
Since Kafka topics only store raw bytes, KafkaProducerSink and KafkaConsumerSource must provide an encoding that describes how to write to and read from raw bytes. Supported encoding/dtype/shape combinations are described in the table below:
Encoding | Compatible Dtypes | Compatible Shapes | Notes |
---|---|---|---|
 | | [], [x], [x, y] (only [] for bytes and dict types) | Data is serialized using … |
 | | [] | Data is serialized using … |
 | | [] | Data is serialized using … |
 | | [], [x], [x, y] (only [] for bytes and dict types) | Data is serialized using … |
 | | [], [x], [x, y] (only [] for bytes and dict types) | Data is serialized using … |
"carray" | | [], [x], [x, y] | Writes and zero-copy reads raw numpy buffer in row-major order, without any dimension information. |
 | | [], [x], [x, y] | Uses … |
any string | | [] | If specified, the … |
 | | [] | |
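As a final illustration, a Kafka-to-Kafka exchange for array data might select the "npy" encoding mentioned in the note above. The sketch assumes an encoding property on the Kafka source and sink configuration and class names as type literals; the exact property names are given in the Configuration Schema.

{
    "exchanges": [
        {
            "dtype": "float64",
            "shape": [-1],
            "source": { "type": "KafkaConsumerSource", "encoding": "npy" },
            "sink": { "type": "KafkaProducerSink", "encoding": "npy" }
        }
    ]
}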