SKA LMC Queue Connector Tango Device
This project implements a Tango device that provides services to exchange data between components internal and external to the Science Data Processor (SDP). Initially this is limited to exchanging data between Tango devices external to the SDP and Kafka topics available only internally to SDP components, though it may reasonably expand to other sources and sinks in future.
The LMC Queue Connector Device sits within the Science Data Processor and interacts with various other components both inside and outside that boundary. These components interact directly by reading from or subscribing to attributes on the Queue Connector device. They also interact indirectly by publishing data as a Tango attribute (if they are Tango devices) or to a Kafka topic, which the Queue Connector device subscribes to.
The diagram below describes the expected flow of data between these components, with the Queue Connector device acting as a middle-man between Tango and Kafka.
One example workflow this is expected to support is calculating and publishing pointing offsets to the telescope. The Queue Connector Device subscribes to the telescope sub-array device to obtain pointing data and writes it to a Kafka topic. This is consumed by a processor in the real-time receive pipeline, which performs calculations using the pointing data and the visibilities it is receiving. The processor then writes the resulting offsets to another Kafka topic, to which the Queue Connector Device is subscribed, and the offsets are exposed as a Tango attribute. The telescope sub-array subscribes to this attribute to obtain the pointing offsets, completing the feedback loop.
Architecture
Exchange Model
The LMC Queue Connector Device processes data in the form of one or more configurable exchanges/streams that process continuously while the device is in the ON state.
The simplified class diagram for this can be expressed by the following:
Data Flow
An Exchange is fundamentally an object that asynchronously pulls data from one or more source streams (DataSource), through a pipe operator (DataPipe), and asynchronously pushes it to a sink stream (DataSink). The asynchronous interface between sources and sinks allows a single device to process multiple exchanges/streams indefinitely, without threads being blocked waiting on I/O, until instructed to stop.
All data entering an exchange shares a single dtype and input shape; pipe operators may change the output data shape.
Example
Two common configurations for exchanges are:
Publishing a Tango subscription to a Kafka topic
Publishing a Kafka topic as a Tango attribute
In this example there are two runtime implementations for each of DataSource and DataSink. Data is sourced from either another Tango device attribute or a Kafka topic. This data is then propagated to either a local attribute on the Queue Connector device or to a Kafka topic. It is reasonable to expect the need for other implementations too (e.g. an in-memory source and sink for unit testing), which is accomplished by implementing the DataSource and DataSink interfaces.
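As an illustrative sketch of the first configuration (the device, attribute, topic and server names here are hypothetical), an exchange publishing a Tango subscription to a Kafka topic could be described as:
{
  "exchanges": [
    {
      "dtype": "float64",
      "shape": [],
      "source": {
        "type": "TangoSubscriptionSource",
        "device_name": "test/device/01",
        "attribute_name": "measurement"
      },
      "sink": {
        "type": "KafkaProducerSink",
        "servers": "localhost:9092",
        "topic": "measurement-topic",
        "encoding": "npy"
      }
    }
  ]
}
The second configuration simply reverses this, using a KafkaConsumerSource and a TangoLocalAttributeSink; see the Getting Started page for a complete example.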
Tango Device
Properties
Property | Type | Default Value | Example Value
---|---|---|---
exchanges_json | String | |
exchanges_config_path | String | |
Commands
Command | Args | Return Type
---|---|---
IsMonitoringDB() | Void | Bool
Configure() | String | Void
 | Void | Void
Note
Additional commands Standby(), Start(), Stop() and Abort() are intended for testing purposes and may be disabled or removed in future releases.
State Model
The present implementation utilizes the following native Tango operational states, which can be read using the state attribute.
INIT: the device is currently initialising.
FAULT: the device has experienced an unexpected error from which it needs to be reset.
STANDBY: the device is not configured.
OFF: the device is configured, but not currently in use.
OPEN: the device is opening stream connections.
ON: the device is streaming data.
CLOSE: the device is closing stream connections.
Note
Transition arrows are triggered by a corresponding tango command, with the exception of underlined transitions, which are triggered automatically.
Configuring
The Queue Connector Tango Device only streams data when it has been configured at runtime. This can be performed via multiple approaches.
Configure by Property
To configure via properties, set either the exchanges_json or the exchanges_config_path property in the Tango database and the device will configure itself on initialization.
Note
If both the exchanges_json and exchanges_config_path properties are provided, exchanges_config_path takes priority.
Configuration Database Path
To configure via SDP Config, set the exchanges_config_path property to an SDP Config path (including the leading slash). The device will fetch the JSON value at the given path and parse it as a QueueConnectorDescriptor.
JSON
To configure via JSON, set the exchanges_json property to JSON that will be parsed as a QueueConnectorDescriptor.
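As a minimal sketch, assuming the exchanges_json property is available in your device version (the device name here is hypothetical and the property must be set before the device initialises), the property can be written with the PyTango database API:
import json
import tango

descriptor = {
    "exchanges": [
        {
            "dtype": "str",
            "shape": [],
            "source": {
                "type": "KafkaConsumerSource",
                "servers": "localhost:9092",
                "topic": "test_topic",
                "encoding": "utf-8"
            },
            "sink": {"type": "TangoLocalAttributeSink", "attribute_name": "message"}
        }
    ]
}

# Device properties are stored in the Tango database as lists of strings
db = tango.Database()
db.put_device_property(
    "test-sdp/queueconnector/01", {"exchanges_json": [json.dumps(descriptor)]}
)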
Configure by Command
Configuration may be performed after initialization using the Configure() command.
JSON
Pass a raw JSON string of the QueueConnectorDescriptor to the Configure() command when in the STANDBY state. Configuring the device successfully transitions it to the OFF state.
Configuration Database Path
Pass a configuration database path string to the Configure() command.
Note
The command checks for a leading ‘/’ as a heuristic for detecting a config path to configure from; otherwise it attempts to parse the string as JSON.
Note
Configuration by command is deprecated and scheduled for removal in version 4.0.0.
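Although this command is deprecated, a minimal sketch of both variants (device name and configuration path hypothetical) looks like:
import tango

proxy = tango.DeviceProxy("test-sdp/queueconnector/01")

# Either: configure from a raw QueueConnectorDescriptor JSON string (device in STANDBY)
proxy.Configure('{"exchanges": []}')

# Or: configure from an SDP configuration database path, detected by the leading '/'
# proxy.Configure("/component/lmc-queueconnector-01/owner/my_id")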
Configuration
The LMC Queue Connector is configured using a QueueConnectorDescriptor containing one or more ExchangeDescriptor instances in JSON, where each exchange represents an independent stream for processing.
The Tango device can be configured after initialization via multiple approaches; see Tango Device.
Queue Connector Descriptor
A typical QueueConnectorDescriptor with a single ExchangeDescriptor consists of:
{
"exchanges": [
{
"dtype": "<data type>",
"shape": [ ... ],
"source": { "type": "<source type>" } | [ ... ],
"pipe": { "type": "<pipe type>" },
"sink": { "type": "<sink type>" },
}
]
}
Where:
dtype: Numpy dtype or Python type to deserialize to.
shape: Optional data dimensions of the source if data is arraylike.
source: Exchange input description for reading data.
pipe: Optional pipe function for stream operations.
sink: Exchange output description for writing data.
A compatible combination of dtype, shape and encoding must be chosen at runtime to initialize the exchange instance ready for streaming.
See below for summaries of each, Configuration Schema for schema and API for further details on available sources, sinks and pipes.
Data Dtype
The data type provided to an exchange may be either a native Python built-in type (bool, int, float, str, object, bytes) or the name of any supported numpy generic class, as listed below:

Descriptor Literal | DType | Tango ArgType | Notes
---|---|---|---
bytes | | DevEncoded | Shape must be scalar. Data is not validated to ensure it matches the encoding.
object | | N/A | Shape must be scalar. This type is reserved largely for container types such as list, tuple and dict, but is compatible with all dtypes. For Tango, only compatible with the TangoObjectScatterAttributeSink sink.
str | | DevString | Uses UTF-8 encoding.
numeric numpy generics (int16/int32/int64, uint8/uint16/uint32/uint64, float32/float64, complex64/complex128, datetime64) | | |
structured datatype (e.g. [["alt", "float64"], ["az", "float64"]]) | | | For Tango, only compatible with the TangoObjectScatterAttributeSink sink.
Note
Tango DevEnum and DevPipeBlob are currently not supported.
In addition, the dtype may be specified as a numpy structured datatype, though this is limited to the In-Memory and Kafka data sources and sinks. For example, a list of Alt/Az coordinates might have a descriptor dtype of [["alt", "float64"], ["az", "float64"]].
Note
When using a structured datatype and serialising to Kafka with "carray" encoding, a limited subset of field data types is allowed. Notably, datetime64 types are not supported (see this issue). There is no such limitation with "npy" encoding.
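As a sketch of a structured-dtype exchange (the device, attribute, topic and server names are hypothetical), pointing data converted to an Alt/Az record by TangoPointingSubscriptionSource could be streamed to Kafka with "npy" encoding:
{
  "exchanges": [
    {
      "dtype": [["alt", "float64"], ["az", "float64"]],
      "shape": [],
      "source": {
        "type": "TangoPointingSubscriptionSource",
        "device_name": "test/subarray/01",
        "attribute_name": "pointing"
      },
      "sink": {
        "type": "KafkaProducerSink",
        "servers": "localhost:9092",
        "topic": "pointing-data",
        "encoding": "npy"
      }
    }
  ]
}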
Data Shape
The optional exchange shape describes the maximum dimension sizes for an array of values. An empty list (the default) indicates a scalar, and up to one negative shape value may be used to indicate a dynamically sized dimension.
Using a Tango source or sink limits the number of dimensions to 2, as Tango only supports the SCALAR format, the SPECTRUM format for 1D arrays, and the IMAGE format for 2D arrays. The specialized sink TangoArrayScatterAttributeSink can work around this limitation by flattening arrays and generating a separate attribute carrying the original shape.
Note
Tango attributes require a maximum dimension size, thus Tango sinks currently do not support negative values in the data shape.
Note
When working with multi-dimensional str data, the Tango DeviceProxy will return data as a list rather than a numpy.ndarray.
Data Source(s)
A DataSource is an extensible component of the LMC Queue Connector representing a location that data can be asynchronously pulled from. The data source converts incoming data to the dtype specified in the exchange descriptor.
Multiple data sources are supported on a single exchange and are merged into a single stream.
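For example (topic names hypothetical), two Kafka topics can feed a single exchange by specifying source as a list in the exchange descriptor:
{
  "dtype": "str",
  "shape": [],
  "source": [
    {
      "type": "KafkaConsumerSource",
      "servers": "localhost:9092",
      "topic": "topic-a",
      "encoding": "utf-8"
    },
    {
      "type": "KafkaConsumerSource",
      "servers": "localhost:9092",
      "topic": "topic-b",
      "encoding": "utf-8"
    }
  ],
  "sink": {
    "type": "TangoLocalAttributeSink",
    "attribute_name": "latest_message"
  }
}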
Data Pipe
An optional DataPipe is an extensible component of the LMC Queue Connector representing a stream operation performed on data between the source and sink. Beyond simple element-wise functions, this interface also allows changing the data shape itself using stream buffering, aggregation and splitting functions.
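As a sketch, a BufferWithTimePipe with a timespan of 1.0 (assumed here to be seconds) collects incoming scalars into a dynamically sized array; because the buffered output gains a dynamic leading dimension, a Kafka sink is used rather than a Tango sink:
{
  "dtype": "float64",
  "shape": [],
  "source": {
    "type": "KafkaConsumerSource",
    "servers": "localhost:9092",
    "topic": "samples",
    "encoding": "npy"
  },
  "pipe": {
    "type": "BufferWithTimePipe",
    "timespan": 1.0
  },
  "sink": {
    "type": "KafkaProducerSink",
    "servers": "localhost:9092",
    "topic": "samples-buffered",
    "encoding": "npy"
  }
}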
Data Sink(s)
A DataSink is an extensible component of the LMC Queue Connector representing a location that data can be asynchronously (or synchronously) pushed to. The type of data into the sink is determined by the dtype specified in the exchange descriptor, and its shape is calculated by the Data Pipe.
Note
Multiple data sinks are currently not supported on a single exchange (though this would be trivial to implement). A temporary workaround is to duplicate the exchange, with each copy differing only by its sink, as shown below.
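For example (topic and attribute names hypothetical), fanning one Kafka topic out to both a Tango attribute and another Kafka topic currently requires two exchanges with the same source:
{
  "exchanges": [
    {
      "dtype": "str",
      "shape": [],
      "source": {
        "type": "KafkaConsumerSource",
        "servers": "localhost:9092",
        "topic": "input-topic",
        "encoding": "utf-8"
      },
      "sink": {
        "type": "TangoLocalAttributeSink",
        "attribute_name": "latest_value"
      }
    },
    {
      "dtype": "str",
      "shape": [],
      "source": {
        "type": "KafkaConsumerSource",
        "servers": "localhost:9092",
        "topic": "input-topic",
        "encoding": "utf-8"
      },
      "sink": {
        "type": "KafkaProducerSink",
        "servers": "localhost:9092",
        "topic": "output-topic",
        "encoding": "utf-8"
      }
    }
  ]
}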
Source/Sink Compatibility
Below is the compatibility matrix of dtypes currently supported by the various source and sink implementations.

Sink/Source | bytes | object | int16 int32 int64 | uint8 uint16 uint32 uint64 | float32 float64 | complex64 complex128 | datetime64 | struct
---|---|---|---|---|---|---|---|---
KafkaConsumerSource | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓
TangoSubscriptionSource | ✓ | | ✓ | ✓ | ✓ | | |
TangoPointingSubscriptionSource | | | | | | | | ✓
KafkaProducerSink | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓
TangoLocalAttributeSink | ✓ | | ✓ | ✓ | ✓ | | |
TangoArrayScatterSink | | | ✓ | ✓ | ✓ | | |
TangoObjectScatterAttributeSink | | ✓ | ✓ | ✓ | ✓ | | | ✓
Data Encoding
Certain sinks, sources and data using raw byte buffers may require selecting a suitable Encoding string.
Bytes Data Encoding
The bytes data format is unique in that an encoding string is paired and travels with the bytes value, similar to tango.DevEncoded.
Note
The encoding parameter is optional for KafkaProducerSink when using the bytes data type. No encoding or decoding is performed when bytes data is used with KafkaConsumerSource or KafkaProducerSink.
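As a sketch (device, attribute, topic and server names hypothetical), a DevEncoded Tango attribute can therefore be forwarded to Kafka as raw bytes without specifying an encoding:
{
  "dtype": "bytes",
  "shape": [],
  "source": {
    "type": "TangoSubscriptionSource",
    "device_name": "test/device/01",
    "attribute_name": "encoded_data"
  },
  "sink": {
    "type": "KafkaProducerSink",
    "servers": "localhost:9092",
    "topic": "encoded-data"
  }
}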
Kafka Encoding
Since Kafka topics only store raw bytes, KafkaProducerSink and KafkaConsumerSource must provide an encoding describing how to read from and write to raw bytes. Supported encoding/dtype/shape combinations are described in the table below:
Encoding | Compatible Dtypes | Compatible Shapes | Notes
---|---|---|---
utf-8 | | [] |
ascii | | [] |
python | | [], [x], [x, y] (only [] for bytes and dict types) |
json | | [], [x], [x, y] (only [] for bytes and dict types) |
msgpack_numpy | | [], [x], [x, y] (only [] for bytes and dict types) |
npy | | [], [x], [x, y] |
carray | | [], [x], [x, y] | Writes and zero-copy reads raw numpy buffer in row-major order, without any dimension information.
any string | bytes | [] |
Configuration Schema
The JSON configuration provided to the Queue Connector Device via Configuration Database, Property or Command Parameter must conform to the schema outlined by QueueConnectorDescriptor.
Note
Whilst it is valid to configure with no exchanges, doing so will keep the device in the STANDBY state. This behaviour may change in future.
Schema
QueueConnectorDescriptor
Primary JSON serializable descriptor for configuring a queue connector device. Note: exchanges as dictionary of groups is experimental and not fully supported via configuration database.
Property | Type | Default
---|---|---
exchanges | mapping of group name to list of ExchangeDescriptor, a list of ExchangeDescriptor, or null | null

additionalProperties: False
BufferWithTimePipeDescriptor
Property | Type | Default
---|---|---
type | const "BufferWithTimePipe" | BufferWithTimePipe
timespan | number | 0

additionalProperties: False
DefaultPipeDescriptor
Property | Type | Default
---|---|---
type | const "DefaultPipe" | DefaultPipe

additionalProperties: False
ExchangeDescriptor
Descriptor for instantiating an exchange.
Property | Type | Default
---|---|---
dtype | string, or a structured datatype given as an array of [name, dtype] or [name, dtype, shape] entries | str
shape | array | []
**source** | one of the source descriptors, or an array of source descriptors |
**sink** | one of the sink descriptors |
pipe | one of the pipe descriptors | {"type": "DefaultPipe"}

additionalProperties: False
InMemorySinkDescriptor
Descriptor for instantiating an InMemorySink
Property | Type | Default
---|---|---
type | const "InMemorySink" | InMemorySink
**key** | string |

additionalProperties: False
InMemorySourceDescriptor
Descriptor for instantiating an InMemorySource
Property | Type | Default
---|---|---
type | const "InMemorySource" | InMemorySource
**data** | array of values (two-item [encoding, bytes] pairs, arrays, objects, integers, numbers, booleans or strings) |
delay | number | 0

additionalProperties: False
KafkaConsumerSourceDescriptor
Property | Type | Default
---|---|---
type | const "KafkaConsumerSource" | KafkaConsumerSource
**servers** | string or array of strings |
**topic** | string |
encoding | string enum: utf-8, ascii, python, json, msgpack_numpy, npy, carray | python

additionalProperties: False
KafkaProducerSinkDescriptor
Property | Type | Default
---|---|---
type | const "KafkaProducerSink" | KafkaProducerSink
**servers** | string or array of strings |
**topic** | string |
encoding | string enum: utf-8, ascii, python, json, msgpack_numpy, npy, carray | python
message_max_bytes | integer | 1048576
timestamp_options | TimestampOptions or null | null

additionalProperties: False
TangoArrayScatterAttributeSinkDescriptor
A Tango Attribute Sink for splitting and scattering of ndarray data.
Property | Type | Default
---|---|---
type | string enum: TangoArrayScatterAttributeSink, TangoSplitAttributeSink | TangoArrayScatterAttributeSink
**attribute_names** | array of strings |
axis | integer | 0
default_value | scalar, array, object or two-item [encoding, bytes] pair | ''
attribute_shape_names | array of strings, or null | null
indices | array of integers, or null | null

additionalProperties: False
TangoAttributeDescriptor
An attribute descriptor describing a filter and path to a value in JSON.
Property | Type | Default
---|---|---
**attribute_name** | string |
**dtype** | string, or a structured datatype given as an array of [name, dtype] or [name, dtype, shape] entries |
shape | array of integers | []
path | string | @
filter | string or null | null
default_value | scalar, array, object or two-item [encoding, bytes] pair | ''

additionalProperties: False
TangoLocalAttributeSinkDescriptor
Property | Type | Default
---|---|---
type | const "TangoLocalAttributeSink" | TangoLocalAttributeSink
**attribute_name** | string |
default_value | scalar, array, object or two-item [encoding, bytes] pair | ''

additionalProperties: False
TangoObjectScatterAttributeSinkDescriptor
A Tango Attribute Sink for splitting and scattering object hierarchy data.
Property | Type | Default
---|---|---
type | const "TangoObjectScatterAttributeSink" | TangoObjectScatterAttributeSink
**attributes** | array of TangoAttributeDescriptor |

additionalProperties: False
TangoPointingSubscriptionSourceDescriptor
Property | Type | Default
---|---|---
type | const "TangoPointingSubscriptionSource" | TangoPointingSubscriptionSource
**device_name** | string |
**attribute_name** | string |
etype | integer | 0
stateless | boolean | True

additionalProperties: False
TangoRemoteAttributeSinkDescriptor
Property | Type | Default
---|---|---
type | const "TangoRemoteAttributeSink" | TangoRemoteAttributeSink
**device_name** | string |
**attribute_name** | string |

additionalProperties: False
TangoSubscriptionSourceDescriptor
Property | Type | Default
---|---|---
type | const "TangoSubscriptionSource" | TangoSubscriptionSource
**device_name** | string |
**attribute_name** | string |
etype | integer | 0
stateless | boolean | True

additionalProperties: False
TimestampOptions
A set of kafka producer options related to extracting Kafka timestamps from dynamic data. Timestamps in dynamic data must be one of:
An offset to unix epoch in milliseconds.
A numpy datetime64 on TAI scale.
Property | Type | Default
---|---|---
slices | array whose items are integers or three-item arrays of integer/null values (representing slices) | []
key | string or null | null
reducer | enum: min, max, mean; or null | null

additionalProperties: False
Note
bold indicates a required key-value pair.
Examples
Empty Config
{}
Image Config
{
"exchanges": [
{
"dtype": "float32",
"shape": [2,2],
"source": {
"type": "InMemorySource",
"data": [
[[0,0],
[0,1]],
[1.0, 2.0],
2.0,
2.1,
2.5
],
"delay": 0.5
},
"sink": {
"type": "KafkaProducerSink",
"topic": "test-topic",
"servers": "localhost",
"encoding": "carray"
}
},
{
"dtype": "float32",
"shape": [2,2],
"source": {
"type": "KafkaConsumerSource",
"topic": "test-topic",
"servers": "localhost",
"encoding": "carray"
},
"sink": {
"type": "TangoLocalAttributeSink",
"attribute_name": "matrix",
"default_value": 0
}
}
]
}
API
SDP Queue Connector Device
- class ska_sdp_lmc_queue_connector.sdp_queue_connector.SDPQueueConnector(*args: Any, **kwargs: Any)[source]
A dynamically configured tango device for sending and receiving data to and from data services.
- exchanges_config_path: str
Configuration Database Path to a JSON configuration
- generic_read(attr: tango.Attr)[source]
Generic method for reading the internal device values into the output attribute argument.
- Parameters:
attr (tango.Attr) – output tango attribute
- async group_configure(configuration: str, group_name_override: str | None = None)[source]
Commands the device to load exchanges using a provided config json string.
- Parameters:
configuration (str | None) – QueueConnectorDescriptor JSON. A None value is considered as an empty config.
group_name_override (str | None) – When provided, overrides the group name inside the QueueConnectorDescriptor.
- async reconfigure(group_name: str, config_value: str | None)[source]
Tries commanding to stop, reconfigure and start the device depending on the current state.
- Parameters:
config_value (str | None) – the new configuration JSON string.
- set_attr(name: str, value: Tuple[str, bytes] | numpy.ndarray | Dict[str, object] | int | float | bool | str, push_event=True)[source]
Sets the internal attribute and optionally pushes a change event.
- Parameters:
name (str) – name of attribute to set
value (DataType) – value to set
push_event (bool | None, optional) – Whether to push an event. Setting to None will push if polling is not active. Defaults to None.
- class ska_sdp_lmc_queue_connector.sdp_queue_connector.QueueConnectorDescriptor(*args: Any, **kwargs: Any)[source]
Primary JSON serializable descriptor for configuring a queue connector device. Note: exchanges as dictionary of groups is experimental and not fully supported via configuration database.
- exchanges: Dict[str | None, List[ExchangeDescriptor]] | List[ExchangeDescriptor] | None = None
Mapping or list of exchanges for a queue connector to process when running.
Exchange Interfaces
- class ska_sdp_lmc_queue_connector.exchange.ExchangeDescriptor(*args: Any, **kwargs: Any)[source]
Descriptor for instantiating an exchange.
- dtype: numpy.dtype = 'str'
Python primitive, numpy dtype or tango dtype of the dynamic attribute
- pipe: DefaultPipeDescriptor | BufferWithTimePipeDescriptor
A pipe operator to be applied between source and sink read and write
- shape: list = []
Data shape used by numpy and tango
- sink: InMemorySinkDescriptor | TangoLocalAttributeSinkDescriptor | TangoArrayScatterAttributeSinkDescriptor | TangoObjectScatterAttributeSinkDescriptor | TangoRemoteAttributeSinkDescriptor | KafkaProducerSinkDescriptor
A data sink descriptor to be written to by the exchange
- source: InMemorySourceDescriptor | TangoSubscriptionSourceDescriptor | TangoPointingSubscriptionSourceDescriptor | KafkaConsumerSourceDescriptor | list[InMemorySourceDescriptor | TangoSubscriptionSourceDescriptor | TangoPointingSubscriptionSourceDescriptor | KafkaConsumerSourceDescriptor]
One or more data source descriptors to be read by the exchange
- class ska_sdp_lmc_queue_connector.exchange.Exchange(sources: Sequence[DataSource], sink: DataSink, pipe: DataPipe)[source]
A container representing a connection between a source and sink and handles asynchronous streaming between them.
Data Source, Sink and Pipe Interfaces
- class ska_sdp_lmc_queue_connector.sourcesink.DataSource[source]
Interface for an object containing data that can be asynchronously read.
- class ska_sdp_lmc_queue_connector.sourcesink.DataSink[source]
Interface for an object that receives data that can be asynchronously written to.
- abstract async awrite(value: Tuple[str, bytes] | numpy.ndarray | Dict[str, object] | int | float | bool | str)[source]
Writes a single data entry to the sink
- class ska_sdp_lmc_queue_connector.sourcesink.DataPipe[source]
Functor interface for pipe operators to perform on python data between a DataSource and DataSink.
- abstract property output_dtype: numpy.dtype
Dtype of the functor stream output
- abstract property output_shape: list[int]
Shape of the functor stream output
Tango
TangoSubscriptionSource
- class ska_sdp_lmc_queue_connector.tango_sourcesink.TangoSubscriptionSourceDescriptor(*args: Any, **kwargs: Any)[source]
- attribute_name: str
Attribute name the subscription is to
- device_name: str
Device name containing the attribute the subscription is to
- etype: SchemaEventType
The type of attribute event to listen for
- stateless: bool = True
When True, will retry subscribing every 10 seconds after a failed subscription; when False, will raise an exception on start.
- class ska_sdp_lmc_queue_connector.tango_sourcesink.TangoSubscriptionSource(desc: TangoSubscriptionSourceDescriptor, device: SDPQueueConnector, dtype: np.dtype, shape: list[int])[source]
A DataSource populated from a subscription to tango attribute events.
- start()
Asynchronously performs all additional initialization before reading (e.g. connecting to socket endpoints)
- stop()
Asynchronously performs all end of stream destruction after reading (e.g. closing sockets)
TangoPointingSubscriptionSource
- class ska_sdp_lmc_queue_connector.tango_pointing_source.TangoPointingSubscriptionSourceDescriptor(*args: Any, **kwargs: Any)[source]
- class ska_sdp_lmc_queue_connector.tango_pointing_source.TangoPointingSubscriptionSource(desc: TangoPointingSubscriptionSourceDescriptor, device: SDPQueueConnector, dtype: np.dtype, shape: list[int])[source]
A specialized tango attribute source for converting pointings to a structured type.
TangoAttributeSink
- class ska_sdp_lmc_queue_connector.tango_sourcesink.TangoLocalAttributeSinkDescriptor(*args: Any, **kwargs: Any)[source]
- attribute_name: str
Attribute name to dynamically add to the SDPQueueConnector
- default_value: SchemaDataType = ''
Starting value of the dynamic attribute before a source is read
- class ska_sdp_lmc_queue_connector.tango_sourcesink.TangoLocalAttributeSink(desc: TangoLocalAttributeSinkDescriptor, device: SDPQueueConnector, dtype: np.dtype, shape: list[int])[source]
A DataSink that publishes data to a tango attribute on the Tango Device member.
- awrite(value: Tuple[str, bytes] | numpy.ndarray | Dict[str, object] | int | float | bool | str)
Writes a single data entry to the sink
- start()
Asynchronously performs all additional initialization before writing (e.g. waiting for socket connections)
- stop()
Asynchronously performs all end of stream destruction after reading (e.g. closing sockets)
TangoArrayScatterAttributeSink
- class ska_sdp_lmc_queue_connector.tango_array_scatter_sink.TangoArrayScatterAttributeSinkDescriptor(*args: Any, **kwargs: Any)[source]
A Tango Attribute Sink for splitting and scattering of ndarray data.
- attribute_names: list[str]
Ordered attribute names to dynamically add to the SDPQueueConnector
- attribute_shape_names: list[str] | None = None
Optional attribute shape names
- axis: int = 0
Axis to split on. Non-zero values may reduce performance.
- default_value: SchemaDataType = ''
Starting value of the dynamic attribute before a source is read
- indices: list[int] | None = None
Optional ordered indexes of the split boundaries
- class ska_sdp_lmc_queue_connector.tango_array_scatter_sink.TangoArrayScatterAttributeSink(desc: TangoArrayScatterAttributeSinkDescriptor, device: SDPQueueConnector, dtype: np.dtype, shape: list[int])[source]
- awrite(value: Tuple[str, bytes] | numpy.ndarray | Dict[str, object] | int | float | bool | str)
Writes a single data entry to the sink
- start()
Asynchronously performs all additional initialization before writing (e.g. waiting for socket connections)
- stop()
Asynchronously performs all end of stream destruction after reading (e.g. closing sockets)
TangoObjectScatterAttributeSink
- class ska_sdp_lmc_queue_connector.tango_object_scatter_sink.TangoObjectScatterAttributeSinkDescriptor(*args: Any, **kwargs: Any)[source]
A Tango Attribute Sink for splitting and scattering object hierarchy data.
- attributes: list[TangoAttributeDescriptor]
Attribute names to dynamically add to the SDPQueueConnector
- class ska_sdp_lmc_queue_connector.tango_object_scatter_sink.TangoAttributeDescriptor(*args: Any, **kwargs: Any)[source]
An attribute descriptor describing a filter and path to a value in JSON.
- attribute_name: str
Attribute name to dynamically add to the SDPQueueConnector
- default_value: SchemaDataType = ''
Starting value of the dynamic attribute before a source is read
- dtype: DType
Python primitive, numpy dtype or tango dtype of the dynamic attribute
- filter: str | None = None
JMESPath predicate expression for whether to update the attribute
- path: str = '@'
JMESPath expression for the location of the data to extract
- shape: list[int] = []
Maximum shape of the dynamic attribute
- class ska_sdp_lmc_queue_connector.tango_object_scatter_sink.TangoObjectScatterAttributeSink(desc: TangoObjectScatterAttributeSinkDescriptor, device: SDPQueueConnector, dtype: np.dtype, shape: list[int])[source]
- awrite(value: Tuple[str, bytes] | numpy.ndarray | Dict[str, object] | int | float | bool | str)
Writes a single data entry to the sink
- start()
Asynchronously performs all additional initialization before writing (e.g. waiting for socket connections)
- stop()
Asynchronously performs all end of stream destruction after reading (e.g. closing sockets)
Kafka
KafkaConsumerSource
- class ska_sdp_lmc_queue_connector.kafka_sourcesink.KafkaConsumerSourceDescriptor(*args: Any, **kwargs: Any)[source]
- encoding: Literal['utf-8', 'ascii', 'python', 'json', 'msgpack_numpy', 'npy', 'carray'] = 'python'
Encoding of the message bytes
- servers: str | list[str]
The Kafka broker(s) to query for metadata and setup the connection
- topic: str
The Kafka topic to read from
- class ska_sdp_lmc_queue_connector.kafka_sourcesink.KafkaConsumerSource(descriptor: KafkaConsumerSourceDescriptor, dtype: numpy.dtype, shape: list[int])[source]
A DataSource which consumes messages from a Kafka topic
- start()
Asynchronously performs all additional initialization before reading (e.g. connecting to socket endpoints)
KafkaProducerSink
- class ska_sdp_lmc_queue_connector.kafka_sourcesink.KafkaProducerSinkDescriptor(*args: Any, **kwargs: Any)[source]
- class TimestampOptions(*args: Any, **kwargs: Any)[source]
A set of kafka producer options related to extracting Kafka timestamps from dynamic data. Timestamps in dynamic data must be one of:
An offset to unix epoch in milliseconds.
A numpy datetime64 on TAI scale.
- key: str | None = None
Timestamp key for dictionary-like data types.
- reducer: Literal['min', 'max', 'mean'] | None = None
Axes reduce operation for timestamps.
- slices: tuple[int | slice, ...] = ()
Timestamp slice location for multidimensional data. Size must match number of dimensions.
- encoding: Literal['utf-8', 'ascii', 'python', 'json', 'msgpack_numpy', 'npy', 'carray'] = 'python'
The encoding of the kafka message.
- message_max_bytes: int = 1048576
The max size of encoded messages in bytes. NOTE: the 1MiB default is recommended by Kafka; sending messages larger than this will linearly increase latency.
- servers: str | list[str]
The Kafka broker(s) to query for metadata and setup the connection
- timestamp_options: TimestampOptions | None = None
An optional group of settings related to extracting Kafka timestamps from dynamic data. None results in using the current time.
- topic: str
The Kafka topic to write messages to
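As a sketch (topic, server and key names hypothetical), a producer sink that takes the Kafka message timestamp from a "time" key of dictionary-like data could be declared as:
{
  "type": "KafkaProducerSink",
  "servers": "localhost:9092",
  "topic": "example-topic",
  "encoding": "json",
  "timestamp_options": {
    "key": "time"
  }
}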
- class ska_sdp_lmc_queue_connector.kafka_sourcesink.KafkaProducerSink(descriptor: KafkaProducerSinkDescriptor, dtype: numpy.dtype, shape: list[int])[source]
A DataSink which produces messages for a Kafka topic
- awrite(value: Tuple[str, bytes] | numpy.ndarray | Dict[str, object] | int | float | bool | str)
Writes a single data entry to the sink
- start()
Asynchronously performs all additional initialization before writing (e.g. waiting for socket connections)
- stop()
Asynchronously performs all end of stream destruction after reading (e.g. closing sockets)
In-Memory
InMemorySource
- class ska_sdp_lmc_queue_connector.in_memory_sourcesink.InMemorySourceDescriptor(*args: Any, **kwargs: Any)[source]
Descriptor for instantiating an InMemorySource
- data: list[Tuple[str, bytes] | numpy.ndarray | Dict[str, Any] | int | float | bool | str]
Data values to be read from the source to a sink
- delay: float = 0
Time delay in seconds before the next data value is available to read
- class ska_sdp_lmc_queue_connector.in_memory_sourcesink.InMemorySource(desc: InMemorySourceDescriptor, dtype: numpy.dtype, shape: list[int] | None = None)[source]
An in-memory implementation of a DataSource for testing.
- start()
Asynchronously performs all additional initialization before reading (e.g. connecting to socket endpoints)
- stop()
Asynchronously performs all end of stream destruction after reading (e.g. closing sockets)
InMemorySink
- class ska_sdp_lmc_queue_connector.in_memory_sourcesink.InMemorySinkDescriptor(*args: Any, **kwargs: Any)[source]
Descriptor for instantiating an InMemorySink
- key: str
Key for accessing the stored data queue via classmethod
- class ska_sdp_lmc_queue_connector.in_memory_sourcesink.InMemorySink(desc: InMemorySinkDescriptor)[source]
An in-memory implementation of a DataSink for testing. Instances of sink data queues exist as class members referenced via a lookup key.
- awrite(value: Tuple[str, bytes] | numpy.ndarray | Dict[str, object] | int | float | bool | str)
Writes a single data entry to the sink
- start()
Asynchronously performs all additional initialization before writing (e.g. waiting for socket connections)
- stop()
Asynchronously performs all end of stream destruction after reading (e.g. closing sockets)
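As a sketch, a purely in-memory exchange useful for unit testing (the key name is hypothetical) can be configured as:
{
  "exchanges": [
    {
      "dtype": "int32",
      "shape": [],
      "source": {
        "type": "InMemorySource",
        "data": [1, 2, 3],
        "delay": 0.1
      },
      "sink": {
        "type": "InMemorySink",
        "key": "test-output"
      }
    }
  ]
}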
Pipes
- class ska_sdp_lmc_queue_connector.pipe.default_pipe.DefaultPipe(dtype: numpy.dtype, shape: list[int])[source]
Interface for pipeline operators to perform on python data between a DataSource and DataSink
- property output_dtype: numpy.dtype
Dtype of the functor stream output
- property output_shape: list[int]
Shape of the functor stream output
- class ska_sdp_lmc_queue_connector.pipe.buffer_pipe.BufferWithTimePipe(desc: BufferWithTimePipeDescriptor, dtype: numpy.dtype, shape: list[int])[source]
Buffers data within a time window by adding a dynamic dimension to the output shape and appending data until the time window closes.
- property output_dtype: numpy.dtype
Dtype of the functor stream output
- property output_shape: list[int]
Shape of the functor stream output
Getting Started
This page outlines how to set up an example pipeline using the Queue Connector Device in the SDP Integration context.
Configuration
A JSON instantiation of QueueConnectorDescriptor serves as the configuration for the LMC Queue Connector and is required to turn the device on. A minimal example follows:
{
"exchanges": [
{
"dtype": "string",
"shape": [],
"source": {
"type": "KafkaConsumerSource",
"servers": "localhost:9092",
"topic": "test_topic",
"encoding": "utf-8"
},
"sink": {
"type": "TangoLocalAttributeSink",
"attribute_name": "message"
}
}
]
}
This instructs the Queue Connector to stream string values read from the test_topic topic on the Kafka server running on localhost:9092 to a new scalar Tango attribute named message.
Write to Configuration Database
The configuration database is the recommended approach to configuring the LMC Queue Connector, by populating the exchanges_config_path device property. The SDP Integration Helm chart is set up to create one Queue Connector per subarray, each of which is configured to monitor JSON configs at direct child paths of '/component/lmc-queueconnector-<subarray-id>/owner' in the database.
Using ska-sdp-config, write the above configuration to the database location from a container inside the same cluster as the device, e.g.:
import ska_sdp_config
import json
configdb = ska_sdp_config.Config()
subarray_id = 1
for txn in configdb.txn():
txn._create(
f'/component/lmc-queueconnector-{subarray_id:02d}/owner/my_id',
json.dumps({
"exchanges": [
{
"dtype": "string",
"shape": [],
"source": {
"type": "KafkaConsumerSource",
"servers": "localhost:9092",
"topic": "test_topic",
"encoding": "utf-8"
},
"sink": {
"type": "TangoLocalAttributeSink",
"attribute_name": "message"
}
}
]
})
)
Note
This approach does not require the software performing configuration to install and use the tango API, only ska-sdp-config.
Subscribe to Exchange Sinks
Since the Queue Connector streams data indefinitely until instructed to stop by a new configuration, a component intended to receive data from the Queue Connector should subscribe to the endpoint the device writes to. In the case of a Tango attribute this can be done using Tango subscriptions:
>>> import tango
>>> proxy = tango.DeviceProxy('test-sdp/queueconnector/01')
>>> proxy.subscribe_event("message", tango.EventType.CHANGE_EVENT, print)
Stream Data
For this example, data published to the specified Kafka topic is now automatically pushed to the subscribed Queue Connector, which starts streaming immediately after the new config is detected and read.
>>> import aiokafka
>>> import asyncio
>>> async def stream():
... async with aiokafka.AIOKafkaProducer(bootstrap_servers="localhost:9092") as producer:
... for message in ["Hello", "world!"]:
... await producer.send_and_wait("test_topic", message.encode("utf-8"))
... await asyncio.sleep(1)
...
>>> asyncio.run(stream())
As data is sent to Kafka, the Tango subscription handler triggers on a background thread and prints each received event.
End Streaming
When the stream becomes idle (in this example after 2 seconds), any Queue Connector attributes will remain on the device, e.g.
>>> print(proxy.read_attribute("message"))
"World!"
At this point the device can either be stopped by writing an empty config to the configuration database or be reconfigured by writing a new configuration. Any existing tango attributes from the previous configuration will be removed (even if a newly detected config also defines them).
Developer guide
Get Started
Install dependencies
Before running poetry install to install the Python dependencies, you will need a system Tango library installed on your system (this is required by pytango).
For Debian/Ubuntu:
$ sudo apt update
$ sudo apt install -y curl git build-essential libboost-python-dev libtango-dev
Please note that:
libtango-dev will install an old version of the TANGO-controls framework (9.2.5); the best way to get the latest version of the framework is to compile it (instructions can be found here)
MacOS is not supported
Windows users will need to use WSL
The above script has been tested with Ubuntu 22.04.
During this step, the libtango-dev installation might ask for the Tango Server IP:PORT. Just accept the default proposed value.
Once you have that available you can install the python dependencies. Note that on some systems, you may need to explicitly provide the path to the tango C++ headers:
CPPFLAGS=-I/usr/include/tango poetry install
Run linting and testing
Since this project supports interfacing with Kafka, we need to spin up an instance for testing. For this we use Docker Compose, so you will need to install Docker Engine and Docker Compose.
When these are available you can run the tests using
$ poetry run make python-tests
Linting can be run in a similar way:
$ poetry run make python-lint
Other
Makefile targets
This project contains a Makefile which acts as a UI for building Docker images, testing images, and for launching interactive developer environments.
For the documentation of the Makefile run make help
.
TANGO References
https://pytango.readthedocs.io/en/stable/contents.html
https://pytango.readthedocs.io/en/stable/green_modes/green_modes_server.html
https://pytango.readthedocs.io/en/stable/testing.html
https://pytango.readthedocs.io/en/stable/client_api/index.html
https://pytango.readthedocs.io/en/stable/server_api/server.html
Change Log
All notable changes to this project will be documented in this file. This project adheres to Semantic Versioning.
[Development]
Added
Added thread-safe async stream utilities.
Enabled more warnings as errors in tests.
Changed
BREAKING Renamed Exchange.stream to Exchange.run.
Updated to pydantic ^2.6
Updated to pytest ^8.1
[4.0.4]
Fixed
Fixed dimensionality of 2D attributes created by the TangoLocalAttributeSink; dimensions were swapped.
[4.0.3]
Fixed
TangoArrayScatterSink now correctly creating value attributes.
[4.0.2]
Fixed
Device now automatically leaves INIT state when monitoring is configured.
[4.0.1]
Fixed
Python logging setup now correctly follows the -v command line option.
[4.0.0]
Changed
BREAKING Tango attributes representing exchange sinks are always read-only. The access option that allowed them to become read-write has been removed.
TangoObjectScatterAttributeSink can now process arrays.
Removed
BREAKING Removed all device commands
BREAKING Removed exchanges_json tango property
BREAKING Removed poll_period, abs_change and rel_change parameters on TangoLocalAttributeSink as polled Tango attributes are unable to be removed from the device.
[3.0.1]
Changed
Changed configuration database state format to JSON dictionary
Changed configuration database fault format to JSON dictionary
[3.0.0] [YANKED]
Added
Added exchange groups for supporting multiple configs
Added configuration database watching from multiple keys
Added separate state machine per exchange group
Added forwarding of exchange group state and exceptions to configuration database
Changed
BREAKING Changed Configure() to no longer accept a SDP configuration path. Use Monitor() instead.
BREAKING Changed config monitoring to monitor only direct child paths of exchanges_config_path.
[2.0.0]
Added
Added TimestampOptions to KafkaProducerSink
Added python encoding option
Added DataPipe interface with DefaultPipe and BufferWithTimePipe
Added msgpack_numpy to the list of available kafka encodings
Added Getting Started section to online documentation
Changed
BREAKING Changed TangoJsonScatterAttributeSink to TangoObjectScatterAttributeSink
BREAKING Changed to python as default encoding for KafkaProducerSink and KafkaConsumerSource
Updated ska-sdp-config dependency from 0.5.1 to 0.5.2 for improved etcd performance. This removes the dependency on git.
Removed
BREAKING Removed “bytearray” as an alias for “bytes” dtype
BREAKING Removed json as a supported dtype. Use object with encoding json
BREAKING Removed “dict” and “list” as supported dtypes. Use object instead.
[1.2.0]
Added
Added json encoding type
Changed
Updated ska-sdp-config dependency to 0.5.1 for improved etcd performance. This raises the minimum supported version of etcd from 3.3 to 3.4.
Fixed
Fixed TangoJsonScatterAttributeSink default filter behaviour
[1.1.1]
Fixed
Fixed default string values on TangoJsonScatterAttributeSink
[1.1.0]
Added
Add limited support for numpy structured arrays when using InMemory and Kafka sources and sinks. Specify the structured dtype in the exchanges[].dtype field
Added JSON/Dictionary types to exchanges
Added TangoJsonScatterAttributeSink
Added changelog to sphinx docs
Deprecated
Deprecated TangoSplitAttributeSink in favor of TangoArrayScatterAttributeSink
Removed
Removed ska-tango-base dependency
[1.0.0]
Added
Added TangoSplitAttributeSink
Added etcd3 path monitoring
Added IsMonitoringDB command
Added Tango command docstrings
Added thread-safe SubscribeEventConditionContext
Changed
Set correct server logging format
Set default server logging level to INFO
Updated Configure command behaviour
Renamed “ExchangeDescriptorList” to “QueueConnectorDescriptor”
Fixed
Bugfix for docs not displaying correctly
[0.2.2]
Added
Support manual pushing of change events when polling not defined
Fixed
Kafka consumer source performance improvements
[0.2.1]
Fixed
Entrypoint module path bugfix
Only log warning if a kafka sink is written to after being stopped
[0.2.0]
Added
Extended documentation around configuration and data types
Changed
Renamed project from ska-sdp-data-exchange to ska-sdp-lmc-queue-connector
Renamed device class from DynamicExchangeDevice to SDPQueueConnector
Renamed entrypoint from SDPExchange to SDPQueueConnector
Removed
Removed “Data” and “Descriptor” from config type discriminators
Removed “Data” from long class names
Fixed
Bugfix for when stopping a kafka sink whilst it is sending
[0.1.0]
Added
string, bytes, primitive and nparray data type support
Tango subscription source and tango attribute sink
Kafka Consumer source and Kafka Producer sink
In-memory source and sink
Dynamic Exchange Device with configuration via a Tango property or using the Configure() command providing JSON or an SDP Config path
Empty Python project directory structure