Getting Started

This page outlines how to set up an example pipeline using the Queue Connector Device in the SDP Integration context.

Configuration

A JSON instantiation of QueueConnectorDescriptor serves as the configuration for the LMC Queue Connector and is required to turn the device on. A minimal example follows:

{
    "exchanges": [
        {
            "dtype": "string",
            "shape": [],
            "source": {
                "type": "KafkaConsumerSource",
                "servers": "localhost:9092",
                "topic": "test_topic",
                "encoding": "utf-8"
            },
            "sink": {
                "type": "TangoLocalAttributeSink",
                "attribute_name": "message"
            }
        }
    ]
}

This configuration instructs the Queue Connector to stream string values read from the test_topic topic on the Kafka server running at localhost:9092 to a new scalar Tango attribute named message.
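Before writing such a configuration anywhere, it can be parsed and sanity-checked with only the standard library. The sketch below uses the exact example above; the field checks are illustrative and are not part of the Queue Connector API:

```python
import json

# The minimal exchange configuration from the example above.
config_text = """
{
    "exchanges": [
        {
            "dtype": "string",
            "shape": [],
            "source": {
                "type": "KafkaConsumerSource",
                "servers": "localhost:9092",
                "topic": "test_topic",
                "encoding": "utf-8"
            },
            "sink": {
                "type": "TangoLocalAttributeSink",
                "attribute_name": "message"
            }
        }
    ]
}
"""

config = json.loads(config_text)
for exchange in config["exchanges"]:
    # Every exchange pairs a source with a sink and declares its dtype/shape.
    assert {"dtype", "shape", "source", "sink"} <= exchange.keys()
    print(exchange["source"]["topic"], "->", exchange["sink"]["attribute_name"])
```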

Write to Configuration Database

Populating the exchanges_config_path device property via the configuration database is the recommended approach to configuring the LMC Queue Connector. The SDP Integration Helm chart is set up to create one Queue Connector per subarray, each of which monitors JSON configs at direct child paths of '/component/lmc-queueconnector-<subarray-id>/owner' in the database.
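The monitored path can be built from the subarray id; note the two-digit zero padding used by SDP Integration. The helper below is an illustrative sketch (the owner_key, such as 'my_id', is an arbitrary child key chosen by whoever writes the config):

```python
def exchanges_config_path(subarray_id: int, owner_key: str) -> str:
    """Build the config-database path monitored by a subarray's Queue Connector."""
    # Subarray ids are zero-padded to two digits in the device path.
    return f"/component/lmc-queueconnector-{subarray_id:02d}/owner/{owner_key}"

print(exchanges_config_path(1, "my_id"))
# /component/lmc-queueconnector-01/owner/my_id
```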

Using ska-sdp-config, write the above configuration to the database location from a container inside the same cluster as the device, e.g.:

import ska_sdp_config
import json

# Connect to the SDP configuration database.
configdb = ska_sdp_config.Config()
subarray_id = 1
for txn in configdb.txn():
    txn.create(
        f'/component/lmc-queueconnector-{subarray_id:02d}/owner/my_id',
        json.dumps({
            "exchanges": [
                {
                    "dtype": "string",
                    "shape": [],
                    "source": {
                        "type": "KafkaConsumerSource",
                        "servers": "localhost:9092",
                        "topic": "test_topic",
                        "encoding": "utf-8"
                    },
                    "sink": {
                        "type": "TangoLocalAttributeSink",
                        "attribute_name": "message"
                    }
                }
            ]
        })
    )

Note

This approach does not require the software performing configuration to install and use the tango API, only ska-sdp-config.

Subscribe to Exchange Sinks

Since the Queue Connector streams data indefinitely until instructed to stop via a new configuration, a component intended to receive data from the Queue Connector should subscribe to the endpoint the device writes to. For a Tango attribute this can be done using Tango subscriptions:

>>> import tango
>>> proxy = tango.DeviceProxy('test-sdp/queueconnector/01')
>>> proxy.subscribe_event("message", tango.EventType.CHANGE_EVENT, print)

Stream Data

For this example, data pushed to the specified Kafka topic is now automatically streamed to the subscribed Queue Connector; streaming starts immediately after the new config is detected and read.

>>> import aiokafka
>>> import asyncio
>>> async def stream():
...     async with aiokafka.AIOKafkaProducer(bootstrap_servers="localhost:9092") as producer:
...         for message in ["Hello", "world!"]:
...             await producer.send_and_wait("test_topic", message.encode("utf-8"))
...             await asyncio.sleep(1)
...
>>> asyncio.run(stream())

As data is sent to Kafka, the Tango subscription handler triggers on a background thread and prints each received event.
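Printing from the callback is fine for a demo, but a handler that buffers events for the main thread is often more useful. A minimal sketch using a thread-safe queue (the events here are plain strings standing in for the tango.EventData objects a real subscription delivers):

```python
import queue

# Thread-safe buffer shared between the callback and the main thread.
received = queue.Queue()

def on_change(event):
    # In a real subscription this runs on a Tango background thread,
    # so only thread-safe operations belong here.
    received.put(event)

# Stand-in for events delivered by proxy.subscribe_event(...):
for value in ["Hello", "world!"]:
    on_change(value)

print(received.get())  # first buffered event
```

The callback would be passed to subscribe_event in place of print in the example above.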

End Streaming

When the stream becomes idle (in this example, after about 2 seconds), any Queue Connector attributes remain on the device, e.g.

>>> print(proxy.read_attribute("message").value)
"world!"

At this point the device can either be stopped by writing an empty config to the configuration database or reconfigured by writing a new configuration. Any existing Tango attributes from the previous configuration will be removed (even if a newly detected config also defines them).
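Stopping the device amounts to writing a configuration with an empty exchange list to the same database path. A sketch of just the JSON payload (writing it uses the same ska-sdp-config transaction shown earlier):

```python
import json

# An empty exchange list instructs the Queue Connector to stop streaming.
empty_config = json.dumps({"exchanges": []})
print(empty_config)  # {"exchanges": []}
```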