API

SDP Queue Connector Device

class ska_sdp_lmc_queue_connector.sdp_queue_connector.SDPQueueConnector(*args: Any, **kwargs: Any)[source]

A dynamically configured tango device for sending and receiving data to and from data services.

exchanges_config_path: str

Configuration database path to a JSON configuration

generic_read(attr: tango.Attr)[source]

Generic method for reading the internal device values into the output attribute argument.

Parameters:

attr (tango.Attr) – output tango attribute

async group_configure(configuration: str, group_name_override: str | None = None)[source]

Commands the device to load exchanges using a provided config json string.

Parameters:
  • configuration (str | None) – QueueConnectorDescriptor json. A None value is considered as an empty config.

  • group_name_override (str | None) – When provided, overrides the group name inside the QueueConnectorDescriptor.

async reconfigure(group_name: str, config_value: str | None)[source]

Attempts to stop, reconfigure and restart the device, depending on its current state.

Parameters:
  • group_name (str) – name of the exchange group to reconfigure

  • config_value (str | None) – the new configuration json string

set_attr(name: str, value: Tuple[str, bytes] | numpy.ndarray | Dict[str, object] | int | float | bool | str, push_event=True)[source]

Sets the internal attribute and optionally pushes a change event.

Parameters:
  • name (str) – name of attribute to set

  • value (DataType) – value to set

  • push_event (bool | None, optional) – Whether to push an event. Setting to None will push if polling is not active. Defaults to None.

async watch_database_and_reconfigure(database_path: str)[source]

Long-running awaitable that watches the configuration database and automatically tries reconfiguring the device.

Parameters:

database_path (str) – database path to watch

class ska_sdp_lmc_queue_connector.sdp_queue_connector.QueueConnectorDescriptor(*args: Any, **kwargs: Any)[source]

Primary JSON serializable descriptor for configuring a queue connector device. Note: specifying exchanges as a dictionary of groups is experimental and not fully supported via the configuration database.

exchanges: Dict[str | None, List[ExchangeDescriptor]] | List[ExchangeDescriptor] | None = None

Mapping or list of exchanges for a queue connector to process when running.
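A minimal, illustrative configuration payload in this shape might look as follows. This is a sketch only: the discriminator key (`type` here) and its values, the attribute name and the sample data are all assumptions, not the schema's confirmed spelling.

```python
import json

# Hypothetical QueueConnectorDescriptor: one exchange reading test data from
# an in-memory source and publishing it to a local tango attribute.
config = {
    "exchanges": [
        {
            "dtype": "str",
            "shape": [],
            "source": {
                "type": "InMemorySource",   # discriminator key/value assumed
                "data": ["hello", "world"],
                "delay": 0.1,
            },
            "sink": {
                "type": "TangoLocalAttributeSink",
                "attribute_name": "message",
                "default_value": "",
            },
        }
    ]
}

config_json = json.dumps(config)
print(config_json)
```

A string like `config_json` is what `group_configure` would receive; consult the library's schema for the exact field spellings before use.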

Exchange Interfaces

class ska_sdp_lmc_queue_connector.exchange.ExchangeDescriptor(*args: Any, **kwargs: Any)[source]

Descriptor for instantiating an exchange.

dtype: numpy.dtype = 'str'

Python primitive, numpy dtype or tango dtype of the dynamic attribute

pipe: DefaultPipeDescriptor | BufferWithTimePipeDescriptor

A pipe operator to be applied between source and sink read and write

shape: list = []

Data shape used by numpy and tango

sink: InMemorySinkDescriptor | TangoLocalAttributeSinkDescriptor | TangoArrayScatterAttributeSinkDescriptor | TangoObjectScatterAttributeSinkDescriptor | TangoRemoteAttributeSinkDescriptor | KafkaProducerSinkDescriptor

A data sink descriptor to be written to by the exchange

source: InMemorySourceDescriptor | TangoSubscriptionSourceDescriptor | TangoPointingSubscriptionSourceDescriptor | KafkaConsumerSourceDescriptor | list[InMemorySourceDescriptor | TangoSubscriptionSourceDescriptor | TangoPointingSubscriptionSourceDescriptor | KafkaConsumerSourceDescriptor]

One or more data source descriptors to be read by the exchange

class ska_sdp_lmc_queue_connector.exchange.Exchange(sources: Sequence[DataSource], sink: DataSink, pipe: DataPipe)[source]

A container representing a connection between a source and a sink, handling asynchronous streaming between them.

async run()[source]

Asynchronously runs the exchange connecting source payloads to the sink.

Running this to completion without exceptions guarantees no payloads are lost between sources and sinks.

async start()[source]

Invokes start on the sinks and sources.

async stop()[source]

Invokes stop on the sinks and sources.
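The start/run/stop lifecycle can be illustrated with a stdlib-only sketch. This is not the library's implementation; the `ListSource`/`ListSink` classes and the forwarding loop are hypothetical stand-ins for the DataSource/DataSink interfaces.

```python
import asyncio

class ListSource:
    """Toy async-iterable source yielding preset items."""
    def __init__(self, items):
        self._items = list(items)
    async def start(self): pass
    async def stop(self): pass
    def __aiter__(self):
        return self
    async def __anext__(self):
        if not self._items:
            raise StopAsyncIteration
        return self._items.pop(0)

class ListSink:
    """Toy sink collecting written values."""
    def __init__(self):
        self.received = []
    async def start(self): pass
    async def stop(self): pass
    async def awrite(self, value):
        self.received.append(value)

async def run_exchange(source, sink, pipe=lambda v: v):
    # Mirror the Exchange contract: start everything, stream each payload
    # through the pipe into the sink, and stop everything even on error.
    await source.start()
    await sink.start()
    try:
        async for value in source:
            await sink.awrite(pipe(value))
    finally:
        await source.stop()
        await sink.stop()

sink = ListSink()
asyncio.run(run_exchange(ListSource([1, 2, 3]), sink, pipe=lambda v: v * 2))
print(sink.received)  # [2, 4, 6]
```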

Data Source, Sink and Pipe Interfaces

class ska_sdp_lmc_queue_connector.sourcesink.DataSource[source]

Interface for an object containing data that can be asynchronously read.

abstract async start()[source]

Asynchronously performs all additional initialization before reading (e.g. connecting to socket endpoints)

abstract async stop()[source]

Asynchronously performs all end of stream destruction after reading (e.g. closing sockets)

class ska_sdp_lmc_queue_connector.sourcesink.DataSink[source]

Interface for an object that receives data that can be asynchronously written to.

abstract async awrite(value: Tuple[str, bytes] | numpy.ndarray | Dict[str, object] | int | float | bool | str)[source]

Writes a single data entry to the sink

abstract async start()[source]

Asynchronously performs all additional initialization before writing (e.g. waiting for socket connections)

abstract async stop()[source]

Asynchronously performs all end of stream destruction after writing (e.g. closing sockets)

class ska_sdp_lmc_queue_connector.sourcesink.DataPipe[source]

Functor interface for pipe operators to perform on python data between a DataSource and DataSink.

abstract property output_dtype: numpy.dtype

Dtype of the functor stream output

abstract property output_shape: list[int]

Shape of the functor stream output

Tango

TangoSubscriptionSource

class ska_sdp_lmc_queue_connector.tango_sourcesink.TangoSubscriptionSourceDescriptor(*args: Any, **kwargs: Any)[source]
attribute_name: str

Attribute name the subscription is to

device_name: str

Device name containing the attribute the subscription is to

etype: SchemaEventType

The type of attribute event to listen for

stateless: bool = True

When True, retries subscribing every 10 seconds after a failed subscription attempt; when False, raises an exception on start if the subscription fails.

class ska_sdp_lmc_queue_connector.tango_sourcesink.TangoSubscriptionSource(desc: TangoSubscriptionSourceDescriptor, device: SDPQueueConnector, dtype: np.dtype, shape: list[int])[source]

A DataSource populated from a subscription to tango attribute events.

start()

Asynchronously performs all additional initialization before reading (e.g. connecting to socket endpoints)

stop()

Asynchronously performs all end of stream destruction after reading (e.g. closing sockets)

TangoPointingSubscriptionSource

class ska_sdp_lmc_queue_connector.tango_pointing_source.TangoPointingSubscriptionSourceDescriptor(*args: Any, **kwargs: Any)[source]
class ska_sdp_lmc_queue_connector.tango_pointing_source.TangoPointingSubscriptionSource(desc: TangoPointingSubscriptionSourceDescriptor, device: SDPQueueConnector, dtype: np.dtype, shape: list[int])[source]

A specialized tango attribute source for converting pointings to a structured type.

TangoAttributeSink

class ska_sdp_lmc_queue_connector.tango_sourcesink.TangoLocalAttributeSinkDescriptor(*args: Any, **kwargs: Any)[source]
attribute_name: str

Attribute name to dynamically add to the SDPQueueConnector

default_value: SchemaDataType = ''

Starting value of the dynamic attribute before a source is read

class ska_sdp_lmc_queue_connector.tango_sourcesink.TangoLocalAttributeSink(desc: TangoLocalAttributeSinkDescriptor, device: SDPQueueConnector, dtype: np.dtype, shape: list[int])[source]

A DataSink that publishes data to a tango attribute on the Tango Device member.

awrite(value: Tuple[str, bytes] | numpy.ndarray | Dict[str, object] | int | float | bool | str)

Writes a single data entry to the sink

start()

Asynchronously performs all additional initialization before writing (e.g. waiting for socket connections)

stop()

Asynchronously performs all end of stream destruction after writing (e.g. closing sockets)

TangoArrayScatterAttributeSink

class ska_sdp_lmc_queue_connector.tango_array_scatter_sink.TangoArrayScatterAttributeSinkDescriptor(*args: Any, **kwargs: Any)[source]

A Tango Attribute Sink for splitting and scattering of ndarray data.

attribute_names: list[str]

Ordered attribute names to dynamically add to the SDPQueueConnector

attribute_shape_names: list[str] | None = None

Optional attribute shape names

axis: int = 0

Axis to split on. Non-zero values may reduce performance.

default_value: SchemaDataType = ''

Starting value of the dynamic attribute before a source is read

indices: list[int] | None = None

Optional ordered indexes of the split boundaries
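The split semantics of `axis` and `indices` can be pictured with plain numpy; the assumption here is that scattering behaves like `numpy.split`, with each resulting piece written to the corresponding entry of `attribute_names` (the names below are hypothetical).

```python
import numpy as np

# An incoming 1-D payload of 10 samples.
data = np.arange(10)

# indices = [3, 7] splits along axis 0 into [0:3], [3:7] and [7:10].
pieces = np.split(data, indices_or_sections=[3, 7], axis=0)

# Each piece would land on the matching attribute, in order.
for name, piece in zip(["beam1", "beam2", "beam3"], pieces):
    print(name, piece.tolist())
```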

class ska_sdp_lmc_queue_connector.tango_array_scatter_sink.TangoArrayScatterAttributeSink(desc: TangoArrayScatterAttributeSinkDescriptor, device: SDPQueueConnector, dtype: np.dtype, shape: list[int])[source]
awrite(value: Tuple[str, bytes] | numpy.ndarray | Dict[str, object] | int | float | bool | str)

Writes a single data entry to the sink

start()

Asynchronously performs all additional initialization before writing (e.g. waiting for socket connections)

stop()

Asynchronously performs all end of stream destruction after writing (e.g. closing sockets)

TangoObjectScatterAttributeSink

class ska_sdp_lmc_queue_connector.tango_object_scatter_sink.TangoObjectScatterAttributeSinkDescriptor(*args: Any, **kwargs: Any)[source]

A Tango Attribute Sink for splitting and scattering object hierarchy data.

attributes: list[TangoAttributeDescriptor]

Attribute names to dynamically add to the SDPQueueConnector

class ska_sdp_lmc_queue_connector.tango_object_scatter_sink.TangoAttributeDescriptor(*args: Any, **kwargs: Any)[source]

An attribute descriptor describing a filter and path to a value in JSON.

attribute_name: str

Attribute name to dynamically add to the SDPQueueConnector

default_value: SchemaDataType = ''

Starting value of the dynamic attribute before a source is read

dtype: DType

Python primitive, numpy dtype or tango dtype of the dynamic attribute

filter: str | None = None

JMESPath predicate expression for whether to update the attribute

path: str = '@'

JMESPath expression for the location of the data to extract

shape: list[int] = []

Maximum shape of the dynamic attribute
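Two illustrative TangoAttributeDescriptor entries might be written as below. The attribute and field names are hypothetical; the `filter` string is an ordinary JMESPath comparison and `path` uses `'@'` to select the whole incoming object.

```python
import json

attributes = [
    {
        # Updated only when the incoming object satisfies the filter.
        "attribute_name": "azimuth",
        "dtype": "float64",
        "shape": [],
        "filter": "kind == 'pointing'",  # JMESPath predicate (names assumed)
        "path": "azimuth_deg",           # location of the value to extract
        "default_value": 0.0,
    },
    {
        # '@' is the JMESPath current-node expression: pass the object through.
        "attribute_name": "raw_json",
        "dtype": "str",
        "shape": [],
        "path": "@",
    },
]
print(json.dumps(attributes, indent=2))
```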

class ska_sdp_lmc_queue_connector.tango_object_scatter_sink.TangoObjectScatterAttributeSink(desc: TangoObjectScatterAttributeSinkDescriptor, device: SDPQueueConnector, dtype: np.dtype, shape: list[int])[source]
awrite(value: Tuple[str, bytes] | numpy.ndarray | Dict[str, object] | int | float | bool | str)

Writes a single data entry to the sink

start()

Asynchronously performs all additional initialization before writing (e.g. waiting for socket connections)

stop()

Asynchronously performs all end of stream destruction after writing (e.g. closing sockets)

Kafka

KafkaConsumerSource

class ska_sdp_lmc_queue_connector.kafka_sourcesink.KafkaConsumerSourceDescriptor(*args: Any, **kwargs: Any)[source]
encoding: Literal['utf-8', 'ascii', 'python', 'json', 'msgpack_numpy', 'npy', 'carray'] = 'python'

Encoding of the message bytes

servers: str | list[str]

The Kafka broker(s) to query for metadata and setup the connection

topic: str

The Kafka topic to read from
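A consumer descriptor in this shape might look as follows; the broker addresses, topic name and the `type` discriminator are assumptions for illustration.

```python
import json

# Hypothetical KafkaConsumerSourceDescriptor: consume json-encoded messages
# from one topic via two brokers.
source = {
    "type": "KafkaConsumerSource",       # discriminator key/value assumed
    "servers": ["broker-0:9092", "broker-1:9092"],
    "topic": "pointing-offsets",
    "encoding": "json",
}
print(json.dumps(source))
```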

class ska_sdp_lmc_queue_connector.kafka_sourcesink.KafkaConsumerSource(descriptor: KafkaConsumerSourceDescriptor, dtype: numpy.dtype, shape: list[int])[source]

A DataSource which consumes messages from a Kafka topic

start()

Asynchronously performs all additional initialization before reading (e.g. connecting to socket endpoints)

async stop()[source]

Asynchronously performs all end of stream destruction after reading (e.g. closing sockets)

KafkaProducerSink

class ska_sdp_lmc_queue_connector.kafka_sourcesink.KafkaProducerSinkDescriptor(*args: Any, **kwargs: Any)[source]
class TimestampOptions(*args: Any, **kwargs: Any)[source]

A set of kafka producer options related to extracting Kafka timestamps from dynamic data. Timestamps in dynamic data must be one of:

  • An offset to unix epoch in milliseconds.

  • A numpy datetime64 on TAI scale.

key: str | None = None

Timestamp key for dictionary-like data types.

reducer: Literal['min', 'max', 'mean'] | None = None

Axes reduce operation for timestamps.

slices: tuple[int | slice, ...] = ()

Timestamp slice location for multidimensional data. Its length must match the number of dimensions.
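A pure-python sketch of how `slices` and `reducer` are intended to combine, as an assumption based on the field descriptions above (the payload values are made up):

```python
# A 2-D payload whose entries are millisecond offsets to the unix epoch.
payload = [
    [1700000000000, 1700000000050],
    [1700000000100, 1700000000150],
]

# slices = (slice(None), 0): take column 0 from every row.
column = [row[0] for row in payload]

# reducer = "min": the earliest timestamp becomes the Kafka message timestamp.
timestamp_ms = min(column)
print(timestamp_ms)  # 1700000000000
```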

encoding: Literal['utf-8', 'ascii', 'python', 'json', 'msgpack_numpy', 'npy', 'carray'] = 'python'

The encoding of the kafka message.

message_max_bytes: int = 1048576

The max size of encoded messages in bytes. NOTE: The 1 MiB default is recommended by Kafka; sending messages larger than this will linearly increase latency.

servers: str | list[str]

The Kafka broker(s) to query for metadata and setup the connection

timestamp_options: TimestampOptions | None = None

An optional group of settings related to extracting Kafka timestamps from dynamic data. None results in using the current time.

topic: str

The Kafka topic to write messages to

class ska_sdp_lmc_queue_connector.kafka_sourcesink.KafkaProducerSink(descriptor: KafkaProducerSinkDescriptor, dtype: numpy.dtype, shape: list[int])[source]

A DataSink which produces messages for a Kafka topic

awrite(value: Tuple[str, bytes] | numpy.ndarray | Dict[str, object] | int | float | bool | str)

Writes a single data entry to the sink

start()

Asynchronously performs all additional initialization before writing (e.g. waiting for socket connections)

stop()

Asynchronously performs all end of stream destruction after writing (e.g. closing sockets)

In-Memory

InMemorySource

class ska_sdp_lmc_queue_connector.in_memory_sourcesink.InMemorySourceDescriptor(*args: Any, **kwargs: Any)[source]

Descriptor for instantiating an InMemorySource

data: list[Tuple[str, bytes] | numpy.ndarray | Dict[str, Any] | int | float | bool | str]

Data values to be read from the source to a sink

delay: float = 0

Time delay in seconds before the next data value is available to read

class ska_sdp_lmc_queue_connector.in_memory_sourcesink.InMemorySource(desc: InMemorySourceDescriptor, dtype: numpy.dtype, shape: list[int] | None = None)[source]

An in-memory implementation of a DataSource for testing.

start()

Asynchronously performs all additional initialization before reading (e.g. connecting to socket endpoints)

stop()

Asynchronously performs all end of stream destruction after reading (e.g. closing sockets)

InMemorySink

class ska_sdp_lmc_queue_connector.in_memory_sourcesink.InMemorySinkDescriptor(*args: Any, **kwargs: Any)[source]

Descriptor for instantiating an InMemorySink

key: str

Key for accessing the stored data queue via classmethod

class ska_sdp_lmc_queue_connector.in_memory_sourcesink.InMemorySink(desc: InMemorySinkDescriptor)[source]

An in-memory implementation of a DataSink for testing. Instances of sink data queues exist as class members referenced via a lookup key.

awrite(value: Tuple[str, bytes] | numpy.ndarray | Dict[str, object] | int | float | bool | str)

Writes a single data entry to the sink

start()

Asynchronously performs all additional initialization before writing (e.g. waiting for socket connections)

stop()

Asynchronously performs all end of stream destruction after writing (e.g. closing sockets)

Pipes

class ska_sdp_lmc_queue_connector.pipe.default_pipe.DefaultPipe(dtype: numpy.dtype, shape: list[int])[source]

Interface for pipeline operators to perform on python data between a DataSource and DataSink

property output_dtype: numpy.dtype

Dtype of the functor stream output

property output_shape: list[int]

Shape of the functor stream output

class ska_sdp_lmc_queue_connector.pipe.buffer_pipe.BufferWithTimePipe(desc: BufferWithTimePipeDescriptor, dtype: numpy.dtype, shape: list[int])[source]

Buffers data over a time window by adding a dynamic dimension to the output shape and appending data until the window closes.

property output_dtype: numpy.dtype

Dtype of the functor stream output

property output_shape: list[int]

Shape of the functor stream output
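The buffer-with-time behaviour can be sketched with stdlib asyncio alone. This is not the library's implementation, just an illustration of the windowing semantics: items accumulate in a buffer and the whole buffer is emitted as one batch whenever the window elapses.

```python
import asyncio

async def buffer_with_time(source, window: float):
    """Collect items from an async iterable into time-windowed batches."""
    buffer, batches = [], []
    loop = asyncio.get_running_loop()
    deadline = loop.time() + window
    async for item in source:
        buffer.append(item)
        if loop.time() >= deadline:
            batches.append(buffer)          # window closed: emit the batch
            buffer, deadline = [], loop.time() + window
    if buffer:
        batches.append(buffer)              # flush the final partial batch
    return batches

async def slow_numbers():
    for i in range(5):
        await asyncio.sleep(0.02)
        yield i

batches = asyncio.run(buffer_with_time(slow_numbers(), window=0.05))
print(batches)  # e.g. [[0, 1], [2, 3], [4]], exact grouping depends on timing
```

Note that no item is dropped: batching only changes the shape of the stream, matching the dynamic output dimension described above.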