Flow Configuration

SDP LMC Queue Connector is configurable via Flow entities in the SDP configuration database. Flows of matching pb_id are converted to a single collection of Exchange instance that share a common state machine (see State Model).

Flow instances are detected automatically as they apear in the configuration database that meet the following conditions:

Flow Compatibility

DataQueue

A DataQueue flow represents data flowing through a Message Queue service such as Kafka. DataQueue’s are capable of transfering any data serializable to and from raw bytes.

Supported as either a source or a sink.

>>> from ska_sdp_config.entity.flow import DataQueue, Flow, FlowSource
>>> pb_id = "pb-test-00000000-0000"
>>> flow = Flow(
...   key=Flow.Key(pb_id=pb_id, name="my-dataqueue"),
...   sink=DataQueue(  # sink example
...     host="kafka://localhost:9092",
...     topics="my-topic",
...     format="json"
...   ),
...   data_model="Metrics",
...   sources=[
...     FlowSource(  # source key example
...       uri=Flow.Key(pb_id=pb_id, kind="dataqueue", name="other-dataqueue"),
...       function="ska-sdp-lmc-queue-connector:exchange"
...     ),
...   ]
... )

TangoAttribute

A TangoAttribute flow represents data flowing through a Tango attribute using subscriptions. Tango attributes support only primitives and up to 2D fixed-dimensional arrays of primitives listed in the pytango documentation.

Supported as either a source or a sink.

>>> from ska_sdp_config.entity.flow import Flow, FlowSource, TangoAttribute
>>> pb_id = "pb-test-00000000-0000"
>>> flow = Flow(
...   key=Flow.Key(pb_id=pb_id, name="my-tango-attribute"),
...   sink=TangoAttribute(  # sink example
...     attribute_trl="tango://mid-sdp/queueconnector/01/my_tango_attribute",
...     dtype="DevString",
...     max_dim_x=1,
...     max_dim_y=0,
...   ),
...   data_model="str[]",
...   sources=[
...     FlowSource(  # source key example
...       uri=Flow.Key(pb_id=pb_id, kind="tango", name="other-tango-attribute"),
...       function="ska-sdp-lmc-queue-connector:exchange"
...     ),
...     FlowSource(  # source url example
...       uri="tango://mid-sdp/subarray/01/status",
...       function="ska-sdp-lmc-queue-connector:exchange"
...     ),
...   ]
... )

TangoAttributeMap

A TangoAttributeMap map represents data flowing through multiple Tango attributes. It is required for moving hierarchical and structured data through Tango, such as JSON, xarray Datasets, structured numpy arrays.

Supported only as a sink.

>>> from ska_sdp_config.entity.flow import Flow, FlowSource, TangoAttribute, TangoAttributeMap
>>> pb_id = "pb-test-00000000-0000"
>>> flow = Flow(
...   key=Flow.Key(pb_id=pb_id, name="my-tango-attribute-map"),
...   sink=TangoAttributeMap(  # sink example
...     attributes=[
...       (
...         TangoAttribute(  # sink example
...           attribute_trl="tango://mid-sdp/queueconnector/01/my_attribute",
...           dtype="DevString",
...           max_dim_x=1,
...         ),
...         TangoAttributeMap.DataQuery(
...           select="@.my_attribute",
...           when="@.my_attribute != null",
...         )
...       ),
...       (
...         TangoAttribute(  # sink example
...           attribute_trl="tango://mid-sdp/queueconnector/01/my_array",
...           dtype="DevDouble",
...           max_dim_x=2,
...           max_dim_y=2,
...         ),
...         TangoAttributeMap.DataQuery(
...           select="@.xarray_dataset.data_vars.gain",
...         )
...       ),
...     ],
...   ),
...   data_model="Visibility",
...   sources=[
...     # not supported as a source
...   ]
... )