Getting Started

This page outlines instructions for how to setup an example pipeline using Queue Connector Device in the SDP Integration context.

Configuration

Flow instances in the SDP configuration database serve as the primary definition for connecting two streams of data together via the LMC Queue Connector.

>>> from ska_sdp_config.entity.flow import DataQueue, Flow, FlowSource
>>> pb_id = "pb-test-00000000-0000"
>>> flow = Flow(
...   key=Flow.Key(pb_id=pb_id, name="my-tango-attribute"),
...   sink=TangoAttribute(
...     attribute_trl="tango://mid-sdp/queueconnector/01/my_tango_attribute",
...     dtype="DevString",
...     max_dim_x=1,
...     max_dim_y=0,
...   ),
...   data_model="Metrics",
...   sources=[
...     FlowSource(
...       uri=Flow.Key(pb_id=pb_id, kind="dataqueue", name="my-dataqueue"),
...       function="ska-sdp-lmc-queue-connector:exchange"
...     ),
...   ]
... )

This indicates to the queue connector to stream string values on the "my-dataqueue" topic at Kafka server localhost:9092 to a new scalar tango attribute named my_tango_attribute.

Write to Configuration Database

The configuration database is the recommended approach to configuring the LMC Queue Connector at runtime. The SDP Integration Helm chart is setup to create one Queue Connector per subarray, each of which is configured to monitor |Flow| entities with pb_id belonging to the subarrays active execution block.

Using ska-sdp-scripting, the equivalent to the above configuration can be published to the configuration database using the following:

>>> import ska_sdp_scripting
>>> from ska_sdp_config.entity.flow import DataQueue, FlowSource
>>> with ska_sdp_scripting.ProcessingBlock() as pb:
...   for txn in pb._config.txn():
...     subarray_id = txn.execution_block.get(pb._eb_id).subarray_id
...   pb.create_data_flow(
...     name="my-tango-attribute",
...     data_model="Metrics",
...     sink=TangoAttribute(
...       attribute_trl=f"tango://mid-sdp/queueconnector/{subarray_id}/my_tango_attribute",
...       dtype="DevString",
...       max_dim_x=1,
...       max_dim_y=0,
...     ),
...     sources=[
...       FlowSource(
...         uri=Flow.Key(pb_id=pb_id, kind="dataqueue", name="my-dataqueue"),
...         function="ska-sdp-lmc-queue-connector:exchange"
...       ),
...     ]
...   )
...   with pb.create_phase("Work", []) as work_phase:
...     work_phase.update_pb_state(status=ProcessingBlockStatus.READY)
...     log.info("Done, now idling...")
...     work_phase.wait_loop(work_phase.is_eb_finished)

This will publish the corresponding flow to the configuration database and wait until the execution block is finished before automatically removing it from the database.

Note

This approach does not require the script performing configuration to install and use the pytango API.

Testing

The following snippets describe how to test the LMC Queue Connector is streaming the above flow definition.

Subscribe to Exchange Sinks

Since the Queue Connector indefinitely streams data until instructed to stop via providing a new configuration, a component intended to receive data from the Queue Connector should subscribe to the endpoint the device writes to. In the case of a Tango attribute this can be done using tango subscriptions:

>>> import tango
>>> proxy = tango.DeviceProxy('tango://mid-sdp/queueconnector/01')
>>> proxy.subscribe_event("my_tango_attribute", tango.EventType.CHANGE_EVENT, print)

Stream Data

For this example, the specified Kafka topic now automatically pushes data to the subscribed Queue Connector, and will start immediately after the new config is detected and read.

>>> import aiokafka
>>> import asyncio
>>> async def stream():
...   async with aiokafka.AIOKafkaProducer(bootstrap_servers="localhost:9092") as producer:
...     for message in ["Hello", "world!"]:
...         await producer.send_and_wait("my-dataqueue", message)
...         await asyncio.sleep(1)
...
>>> asyncio.run(stream())

As data is sent to Kafka, the Tango subscription handler will trigger on background thread and print.

End Streaming

When the stream becomes idle (in this example after 2 seconds), any Queue Connector attributes will remain on the device, e.g.

>>> print(proxy.read_attribute("my-dataqueue"))
"World!"

At this point the queue connector will keep streaming to the tango attribute until the flow disappears from the database when the execution block is finished. Existing tango attributes from previous configuration will be removed (even if a newly detected config also defines them).