API
SDP Queue Connector Device
- class ska_sdp_lmc_queue_connector.sdp_queue_connector.SDPQueueConnector(*args: Any, **kwargs: Any)[source]
A dynamically configured tango device for sending and receiving data to and from data services.
- exchanges_config_path: str
Configuration Database Path to a JSON configuration
- generic_read(attr: tango.Attr)[source]
Generic method for reading the internal device values into the output attribute argument.
- Parameters:
attr (tango.Attr) – output tango attribute
- async group_configure(configuration: str, group_name_override: str | None = None)[source]
Commands the device to load exchanges using a provided config json string.
- Parameters:
configuration (str | None) – QueueConnectorDescriptor json. None value is considered as an empty config.
group_name_override (str | None) – When provided, overrides the group name inside the QueueConnectorDescriptor.
- async reconfigure(group_name: str, config_value: str | None)[source]
Attempts to stop, reconfigure and start the device, depending on its current state.
- Parameters:
config_value (str | None) – the new configuration json string.
- set_attr(name: str, value: Tuple[str, bytes] | numpy.ndarray | Dict[str, object] | int | float | bool | str, push_event=True)[source]
Sets the internal attribute and optionally pushes a change event.
- Parameters:
name (str) – name of attribute to set
value (DataType) – value to set
push_event (bool | None, optional) – Whether to push an event. Setting to None will push if polling is not active. Defaults to None.
- class ska_sdp_lmc_queue_connector.sdp_queue_connector.QueueConnectorDescriptor(*args: Any, **kwargs: Any)[source]
Primary JSON serializable descriptor for configuring a queue connector device. Note: specifying exchanges as a dictionary of groups is experimental and not fully supported via the configuration database.
- exchanges: Dict[str | None, List[ExchangeDescriptor]] | List[ExchangeDescriptor] | None = None
Mapping or list of exchanges for a queue connector to process when running.
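As a rough illustration of the descriptor above, the following sketch builds a configuration string containing a list with one exchange. The keys `exchanges`, `dtype`, `shape`, `source`, `sink`, `servers`, `topic`, `encoding` and `attribute_name` come from the descriptors in this reference. The `"type"` discriminator keys and every concrete value (broker address, topic, attribute name) are assumptions made for illustration and should be checked against the package's JSON schema.

```python
import json

# Hypothetical configuration: one exchange reading from Kafka and
# publishing to a dynamic tango attribute. The "type" keys are an
# assumed discriminator, not confirmed by this reference.
config = {
    "exchanges": [
        {
            "dtype": "str",
            "shape": [],
            "source": {
                "type": "KafkaConsumerSource",  # assumed discriminator value
                "servers": "localhost:9092",    # placeholder broker
                "topic": "example-topic",       # placeholder topic
                "encoding": "utf-8",
            },
            "sink": {
                "type": "TangoLocalAttributeSink",  # assumed discriminator value
                "attribute_name": "message",        # placeholder attribute
            },
        }
    ]
}

# group_configure is documented as taking the configuration as a JSON string.
configuration = json.dumps(config)
```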
Exchange Interfaces
- class ska_sdp_lmc_queue_connector.exchange.ExchangeDescriptor(*args: Any, **kwargs: Any)[source]
Descriptor for instantiating an exchange.
- dtype: numpy.dtype = 'str'
Python primitive, numpy dtype or tango dtype of the dynamic attribute
- pipe: DefaultPipeDescriptor | BufferWithTimePipeDescriptor
A pipe operator to be applied between source and sink read and write
- shape: list = []
Data shape used by numpy and tango
- sink: InMemorySinkDescriptor | TangoLocalAttributeSinkDescriptor | TangoArrayScatterAttributeSinkDescriptor | TangoObjectScatterAttributeSinkDescriptor | TangoRemoteAttributeSinkDescriptor | KafkaProducerSinkDescriptor
A data sink descriptor to be written to by the exchange
- source: InMemorySourceDescriptor | TangoSubscriptionSourceDescriptor | TangoPointingSubscriptionSourceDescriptor | KafkaConsumerSourceDescriptor | list[InMemorySourceDescriptor | TangoSubscriptionSourceDescriptor | TangoPointingSubscriptionSourceDescriptor | KafkaConsumerSourceDescriptor]
One or more data source descriptors to be read by the exchange
- class ska_sdp_lmc_queue_connector.exchange.Exchange(sources: Sequence[DataSource], sink: DataSink, pipe: DataPipe)[source]
A container that represents a connection between one or more sources and a sink, and handles asynchronous streaming between them.
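The streaming role of an exchange can be pictured with a small asyncio sketch. This is not the package's implementation: `numbers`, `ListSink` and `run_exchange` are hypothetical names, a plain callable stands in for the pipe, and only a single source is shown, whereas Exchange accepts a sequence of sources.

```python
import asyncio
from typing import AsyncIterator, Callable, List


async def numbers() -> AsyncIterator[int]:
    """Hypothetical source: yields three values asynchronously."""
    for value in (1, 2, 3):
        await asyncio.sleep(0)
        yield value


class ListSink:
    """Hypothetical sink exposing an awrite coroutine, like DataSink."""

    def __init__(self) -> None:
        self.values: List[int] = []

    async def awrite(self, value: int) -> None:
        self.values.append(value)


async def run_exchange(
    source: AsyncIterator[int], pipe: Callable[[int], int], sink: ListSink
) -> None:
    """Stream every value from the source through the pipe into the sink."""
    async for value in source:
        await sink.awrite(pipe(value))


sink = ListSink()
asyncio.run(run_exchange(numbers(), lambda v: v * 10, sink))
print(sink.values)  # [10, 20, 30]
```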
Data Source, Sink and Pipe Interfaces
- class ska_sdp_lmc_queue_connector.sourcesink.DataSource[source]
Interface for an object containing data that can be asynchronously read.
- class ska_sdp_lmc_queue_connector.sourcesink.DataSink[source]
Interface for an object that receives data that can be asynchronously written to.
- abstract async awrite(value: Tuple[str, bytes] | numpy.ndarray | Dict[str, object] | int | float | bool | str)[source]
Writes a single data entry to the sink
- class ska_sdp_lmc_queue_connector.sourcesink.DataPipe[source]
Functor interface for pipe operators to perform on python data between a DataSource and DataSink.
- abstract property output_dtype: numpy.dtype
Dtype of the functor stream output
- abstract property output_shape: list[int]
Shape of the functor stream output
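To make the two abstract properties concrete, here is a minimal stand-alone sketch of a pipe interface and a pass-through implementation. `PipeSketch` and `PassThroughPipe` are hypothetical stand-ins rather than the package's classes, and plain strings are used in place of numpy.dtype to keep the example dependency-free.

```python
from abc import ABC, abstractmethod
from typing import List


class PipeSketch(ABC):
    """Hypothetical stand-in for the DataPipe interface."""

    @property
    @abstractmethod
    def output_dtype(self) -> str:
        """Dtype of the values leaving the pipe."""

    @property
    @abstractmethod
    def output_shape(self) -> List[int]:
        """Shape of the values leaving the pipe."""


class PassThroughPipe(PipeSketch):
    """Forwards data unchanged, so the output matches the input."""

    def __init__(self, dtype: str, shape: List[int]) -> None:
        self._dtype = dtype
        self._shape = shape

    @property
    def output_dtype(self) -> str:
        return self._dtype

    @property
    def output_shape(self) -> List[int]:
        return self._shape


pipe = PassThroughPipe("float64", [2, 3])
```

A sink can then be sized from `pipe.output_dtype` and `pipe.output_shape` rather than from the source's dtype and shape, which matters for pipes that change either one.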
Tango
TangoSubscriptionSource
- class ska_sdp_lmc_queue_connector.tango_sourcesink.TangoSubscriptionSourceDescriptor(*args: Any, **kwargs: Any)[source]
- attribute_name: str
Attribute name the subscription is to
- device_name: str
Device name containing the attribute the subscription is to
- etype: SchemaEventType
The type of attribute event to listen for
- stateless: bool = True
When True, a failed subscription is retried every 10 seconds. When False, a failed subscription raises an exception on start.
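The difference between the two `stateless` settings can be modelled with a short retry loop. This is a conceptual sketch only: the real retrying is expected to happen inside the tango event system rather than in a loop like this one, and `subscribe`, `try_subscribe`, `retry_interval` and `flaky` are hypothetical names.

```python
import asyncio
from typing import Awaitable, Callable


async def subscribe(
    try_subscribe: Callable[[], Awaitable[None]],
    stateless: bool = True,
    retry_interval: float = 10.0,
) -> int:
    """Return the number of attempts made before subscribing succeeded."""
    attempts = 0
    while True:
        attempts += 1
        try:
            await try_subscribe()
            return attempts
        except ConnectionError:
            if not stateless:
                raise  # stateless=False: fail immediately on start
            await asyncio.sleep(retry_interval)  # stateless=True: retry later


calls = {"count": 0}


async def flaky() -> None:
    """Hypothetical subscription that fails twice, then succeeds."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("device not reachable")


# A short interval keeps the example fast; the documented interval is 10 s.
attempts = asyncio.run(subscribe(flaky, stateless=True, retry_interval=0.01))
```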
- class ska_sdp_lmc_queue_connector.tango_sourcesink.TangoSubscriptionSource(desc: TangoSubscriptionSourceDescriptor, device: SDPQueueConnector, dtype: np.dtype, shape: list[int])[source]
A DataSource populated from a subscription to tango attribute events.
- start()
Asynchronously performs all additional initialization before reading (e.g. connecting to socket endpoints)
- stop()
Asynchronously performs all end of stream destruction after reading (e.g. closing sockets)
TangoPointingSubscriptionSource
- class ska_sdp_lmc_queue_connector.tango_pointing_source.TangoPointingSubscriptionSourceDescriptor(*args: Any, **kwargs: Any)[source]
- class ska_sdp_lmc_queue_connector.tango_pointing_source.TangoPointingSubscriptionSource(desc: TangoPointingSubscriptionSourceDescriptor, device: SDPQueueConnector, dtype: np.dtype, shape: list[int])[source]
A specialized tango attribute source for converting pointings to a structured type.
TangoAttributeSink
- class ska_sdp_lmc_queue_connector.tango_sourcesink.TangoLocalAttributeSinkDescriptor(*args: Any, **kwargs: Any)[source]
- attribute_name: str
Attribute name to dynamically add to the SDPQueueConnector
- default_value: SchemaDataType = ''
Starting value of the dynamic attribute before a source is read
- class ska_sdp_lmc_queue_connector.tango_sourcesink.TangoLocalAttributeSink(desc: TangoLocalAttributeSinkDescriptor, device: SDPQueueConnector, dtype: np.dtype, shape: list[int])[source]
A DataSink that publishes data to a tango attribute on the Tango Device member.
- awrite(value: Tuple[str, bytes] | numpy.ndarray | Dict[str, object] | int | float | bool | str)
Writes a single data entry to the sink
- start()
Asynchronously performs all additional initialization before writing (e.g. waiting for socket connections)
- stop()
Asynchronously performs all end of stream destruction after reading (e.g. closing sockets)
TangoArrayScatterAttributeSink
- class ska_sdp_lmc_queue_connector.tango_array_scatter_sink.TangoArrayScatterAttributeSinkDescriptor(*args: Any, **kwargs: Any)[source]
A Tango Attribute Sink for splitting and scattering of ndarray data.
- attribute_names: list[str]
Ordered attribute names to dynamically add to the SDPQueueConnector
- attribute_shape_names: list[str] | None = None
Optional attribute shape names
- axis: int = 0
Axis to split on. Non-zero values may reduce performance.
- default_value: SchemaDataType = ''
Starting value of the dynamic attribute before a source is read
- indices: list[int] | None = None
Optional ordered indexes of the split boundaries
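The interplay of `attribute_names`, `axis` and `indices` can be sketched in plain Python. The example assumes that `indices` are split boundaries in the style of numpy.split and that the resulting pieces are assigned to `attribute_names` in order; both points are assumptions, and `split_rows` is a hypothetical helper handling only axis 0.

```python
from typing import List, Sequence


def split_rows(rows: Sequence, indices: Sequence[int]) -> List[list]:
    """Split rows along axis 0 at the given boundary indices."""
    bounds = [0, *indices, len(rows)]
    return [list(rows[start:stop]) for start, stop in zip(bounds, bounds[1:])]


attribute_names = ["first", "second", "third"]  # placeholder names
value = [[1, 2], [3, 4], [5, 6], [7, 8]]

# Two boundaries produce three pieces, one per attribute name.
parts = split_rows(value, indices=[1, 3])
scattered = dict(zip(attribute_names, parts))
```

With these inputs `scattered["second"]` holds rows 1 and 2 of the original array, while the first and third attributes each receive a single row.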
- class ska_sdp_lmc_queue_connector.tango_array_scatter_sink.TangoArrayScatterAttributeSink(desc: TangoArrayScatterAttributeSinkDescriptor, device: SDPQueueConnector, dtype: np.dtype, shape: list[int])[source]
- awrite(value: Tuple[str, bytes] | numpy.ndarray | Dict[str, object] | int | float | bool | str)
Writes a single data entry to the sink
- start()
Asynchronously performs all additional initialization before writing (e.g. waiting for socket connections)
- stop()
Asynchronously performs all end of stream destruction after reading (e.g. closing sockets)
TangoObjectScatterAttributeSink
- class ska_sdp_lmc_queue_connector.tango_object_scatter_sink.TangoObjectScatterAttributeSinkDescriptor(*args: Any, **kwargs: Any)[source]
A Tango Attribute Sink for splitting and scattering object hierarchy data.
- attributes: list[TangoAttributeDescriptor]
Attribute descriptors to dynamically add to the SDPQueueConnector
- class ska_sdp_lmc_queue_connector.tango_object_scatter_sink.TangoAttributeDescriptor(*args: Any, **kwargs: Any)[source]
An attribute descriptor describing a filter and path to a value in JSON.
- attribute_name: str
Attribute name to dynamically add to the SDPQueueConnector
- default_value: SchemaDataType = ''
Starting value of the dynamic attribute before a source is read
- dtype: DType
Python primitive, numpy dtype or tango dtype of the dynamic attribute
- filter: str | None = None
JMESPath predicate expression for whether to update the attribute
- path: str = '@'
JMESPath expression for the location of the data to extract
- shape: list[int] = []
Maximum shape of the dynamic attribute
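How `filter` and `path` might combine for one attribute can be sketched as follows. The helper below is not JMESPath: it is a deliberately tiny stand-in that understands only `@` and dotted keys (both of which are valid JMESPath expressions), and `search`, `is_truthy` and `extract` are hypothetical names. The truthiness rule follows the JMESPath convention that null, false, empty strings, empty lists and empty objects are false-like. How the sink actually applies the filter is an assumption.

```python
from typing import Any, Optional, Tuple


def search(path: str, data: Any) -> Any:
    """Resolve '@' or a dotted key path; a tiny subset of JMESPath."""
    if path == "@":
        return data
    for key in path.split("."):
        if not isinstance(data, dict) or key not in data:
            return None
        data = data[key]
    return data


def is_truthy(value: Any) -> bool:
    """JMESPath-style truthiness: 0 is truthy, empty containers are not."""
    if value is None or value is False:
        return False
    if isinstance(value, (str, list, dict)) and len(value) == 0:
        return False
    return True


def extract(
    data: Any, path: str = "@", filter_expr: Optional[str] = None
) -> Tuple[bool, Any]:
    """Return (should_update, value) for one attribute descriptor."""
    if filter_expr is not None and not is_truthy(search(filter_expr, data)):
        return False, None
    return True, search(path, data)


message = {"scan": {"id": 7, "active": True}, "status": "ok"}
print(extract(message, path="scan.id", filter_expr="scan.active"))  # (True, 7)
print(extract(message, path="status", filter_expr="scan.missing"))  # (False, None)
```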
- class ska_sdp_lmc_queue_connector.tango_object_scatter_sink.TangoObjectScatterAttributeSink(desc: TangoObjectScatterAttributeSinkDescriptor, device: SDPQueueConnector, dtype: np.dtype, shape: list[int])[source]
- awrite(value: Tuple[str, bytes] | numpy.ndarray | Dict[str, object] | int | float | bool | str)
Writes a single data entry to the sink
- start()
Asynchronously performs all additional initialization before writing (e.g. waiting for socket connections)
- stop()
Asynchronously performs all end of stream destruction after reading (e.g. closing sockets)
Kafka
KafkaConsumerSource
- class ska_sdp_lmc_queue_connector.kafka_sourcesink.KafkaConsumerSourceDescriptor(*args: Any, **kwargs: Any)[source]
- encoding: Literal['utf-8', 'ascii', 'python', 'json', 'msgpack_numpy', 'npy', 'carray'] = 'python'
Encoding of the message bytes
- servers: str | list[str]
The Kafka broker(s) to query for metadata and setup the connection
- topic: str
The Kafka topic to read from
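The `encoding` option decides how raw message bytes become Python values. The sketch below covers only the `utf-8`, `ascii` and `json` options, whose meaning is unambiguous; the remaining encodings are left out because their exact behaviour is not described here. `decode` is a hypothetical helper, not part of the package.

```python
import json
from typing import Any


def decode(payload: bytes, encoding: str) -> Any:
    """Decode message bytes for three of the listed encodings."""
    if encoding in ("utf-8", "ascii"):
        return payload.decode(encoding)
    if encoding == "json":
        return json.loads(payload)
    raise NotImplementedError(f"encoding {encoding!r} is not covered by this sketch")


text = decode(b"hello", "utf-8")
record = decode(b'{"scan_id": 3}', "json")
```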
- class ska_sdp_lmc_queue_connector.kafka_sourcesink.KafkaConsumerSource(descriptor: KafkaConsumerSourceDescriptor, dtype: numpy.dtype, shape: list[int])[source]
A DataSource which consumes messages from a Kafka topic
- start()
Asynchronously performs all additional initialization before reading (e.g. connecting to socket endpoints)
KafkaProducerSink
- class ska_sdp_lmc_queue_connector.kafka_sourcesink.KafkaProducerSinkDescriptor(*args: Any, **kwargs: Any)[source]
- class TimestampOptions(*args: Any, **kwargs: Any)[source]
A set of kafka producer options related to extracting Kafka timestamps from dynamic data. Timestamps in dynamic data must be one of:
An offset to unix epoch in milliseconds.
A numpy datetime64 on TAI scale.
- key: str | None = None
Timestamp key for dictionary-like data types.
- reducer: Literal['min', 'max', 'mean'] | None = None
Axes reduce operation for timestamps.
- slices: tuple[int | slice, ...] = ()
Timestamp slice location for multidimensional data. Size must match number of dimensions.
- encoding: Literal['utf-8', 'ascii', 'python', 'json', 'msgpack_numpy', 'npy', 'carray'] = 'python'
The encoding of the kafka message.
- message_max_bytes: int = 1048576
The maximum size of encoded messages in bytes. NOTE: The 1 MiB default is recommended by Kafka; sending messages larger than this will linearly increase latency.
- servers: str | list[str]
The Kafka broker(s) to query for metadata and setup the connection
- timestamp_options: TimestampOptions | None = None
An optional group of settings related to extracting Kafka timestamps from dynamic data. None results in using the current time.
- topic: str
The Kafka topic to write messages to
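The `key` and `reducer` fields of TimestampOptions can be illustrated with dictionary-like data carrying millisecond offsets from the unix epoch. This is a sketch under assumptions: `kafka_timestamp_ms` is a hypothetical helper, raising an error when several timestamps are present without a reducer is a choice made for this example, and `slices` and datetime64 handling are not covered.

```python
from statistics import mean
from typing import Dict, Optional

REDUCERS = {"min": min, "max": max, "mean": mean}


def kafka_timestamp_ms(
    data: Dict[str, object], key: str, reducer: Optional[str] = None
) -> int:
    """Pick the timestamp field and reduce it to one millisecond offset."""
    value = data[key]
    if isinstance(value, list):
        if reducer is None:
            # Sketch-only behaviour: refuse to guess which timestamp to use.
            raise ValueError("a reducer is needed for more than one timestamp")
        value = REDUCERS[reducer](value)
    return int(value)


sample = {"time": [1700000000000, 1700000000500], "power": [0.1, 0.2]}
earliest = kafka_timestamp_ms(sample, key="time", reducer="min")
average = kafka_timestamp_ms(sample, key="time", reducer="mean")
```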
- class ska_sdp_lmc_queue_connector.kafka_sourcesink.KafkaProducerSink(descriptor: KafkaProducerSinkDescriptor, dtype: numpy.dtype, shape: list[int])[source]
A DataSink which produces messages for a Kafka topic
- awrite(value: Tuple[str, bytes] | numpy.ndarray | Dict[str, object] | int | float | bool | str)
Writes a single data entry to the sink
- start()
Asynchronously performs all additional initialization before writing (e.g. waiting for socket connections)
- stop()
Asynchronously performs all end of stream destruction after reading (e.g. closing sockets)
In-Memory
InMemorySource
- class ska_sdp_lmc_queue_connector.in_memory_sourcesink.InMemorySourceDescriptor(*args: Any, **kwargs: Any)[source]
Descriptor for instantiating an InMemorySource
- data: list[Tuple[str, bytes] | numpy.ndarray | Dict[str, Any] | int | float | bool | str]
Data values to be read from the source to a sink
- delay: float = 0
Time delay in seconds before the next data value is available to read
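The `data` and `delay` fields describe a source that replays fixed values at a fixed pace. A minimal asyncio sketch of that idea follows; `replay` and `collect` are hypothetical names, and applying the delay before every value, including the first, is an assumption.

```python
import asyncio
from typing import Any, AsyncIterator, List


async def replay(data: List[Any], delay: float = 0) -> AsyncIterator[Any]:
    """Yield each configured value, waiting `delay` seconds before each one."""
    for value in data:
        await asyncio.sleep(delay)
        yield value


async def collect() -> List[Any]:
    """Read the whole stream into a list."""
    return [value async for value in replay(["a", "b", "c"], delay=0.01)]


received = asyncio.run(collect())
```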
- class ska_sdp_lmc_queue_connector.in_memory_sourcesink.InMemorySource(desc: InMemorySourceDescriptor, dtype: numpy.dtype, shape: list[int] | None = None)[source]
An in-memory implementation of a DataSource for testing.
- start()
Asynchronously performs all additional initialization before reading (e.g. connecting to socket endpoints)
- stop()
Asynchronously performs all end of stream destruction after reading (e.g. closing sockets)
InMemorySink
- class ska_sdp_lmc_queue_connector.in_memory_sourcesink.InMemorySinkDescriptor(*args: Any, **kwargs: Any)[source]
Descriptor for instantiating an InMemorySink
- key: str
Key for accessing the stored data queue via classmethod
- class ska_sdp_lmc_queue_connector.in_memory_sourcesink.InMemorySink(desc: InMemorySinkDescriptor)[source]
An in-memory implementation of a DataSink for testing. Instances of sink data queues exist as class members referenced via a lookup key.
- awrite(value: Tuple[str, bytes] | numpy.ndarray | Dict[str, object] | int | float | bool | str)
Writes a single data entry to the sink
- start()
Asynchronously performs all additional initialization before writing (e.g. waiting for socket connections)
- stop()
Asynchronously performs all end of stream destruction after reading (e.g. closing sockets)
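The idea of sink data stored on the class and looked up by key can be sketched as below. `KeyedSink` and `get_queue` are hypothetical names chosen for the example, and a plain list stands in for whatever queue type the package uses.

```python
import asyncio
from typing import Any, ClassVar, Dict, List


class KeyedSink:
    """Hypothetical sink whose data queues live on the class, indexed by key."""

    _queues: ClassVar[Dict[str, List[Any]]] = {}

    def __init__(self, key: str) -> None:
        # Sinks created with the same key share one queue.
        self._queue = self._queues.setdefault(key, [])

    async def awrite(self, value: Any) -> None:
        self._queue.append(value)

    @classmethod
    def get_queue(cls, key: str) -> List[Any]:
        """Look up stored data by key, e.g. from a test assertion."""
        return cls._queues[key]


async def main() -> None:
    sink = KeyedSink("results")
    await sink.awrite(1)
    await sink.awrite(2)


asyncio.run(main())
stored = KeyedSink.get_queue("results")
```

Keeping the queues on the class lets a test read the written values without holding a reference to the sink instance that the device created.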
Pipes
- class ska_sdp_lmc_queue_connector.pipe.default_pipe.DefaultPipe(dtype: numpy.dtype, shape: list[int])[source]
Interface for pipeline operators to perform on python data between a DataSource and DataSink
- property output_dtype: numpy.dtype
Dtype of the functor stream output
- property output_shape: list[int]
Shape of the functor stream output
- class ska_sdp_lmc_queue_connector.pipe.buffer_pipe.BufferWithTimePipe(desc: BufferWithTimePipeDescriptor, dtype: numpy.dtype, shape: list[int])[source]
Buffers data within a time window by adding a dynamic dimension to the output shape and appending data until the time window closes.
- property output_dtype: numpy.dtype
Dtype of the functor stream output
- property output_shape: list[int]
Shape of the functor stream output
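The effect of time-window buffering can be sketched offline on timestamped values. This is not the package's implementation: `buffer_with_time` and its `window` parameter are hypothetical, and starting each window at the arrival of its first value is an assumption. Each emitted batch has a variable length, which corresponds to the extra dynamic dimension on the output shape.

```python
from typing import Any, List, Optional, Sequence, Tuple


def buffer_with_time(
    events: Sequence[Tuple[float, Any]], window: float
) -> List[List[Any]]:
    """Group (arrival_time, value) pairs into consecutive time windows."""
    batches: List[List[Any]] = []
    window_end: Optional[float] = None
    for arrival, value in events:
        if window_end is None or arrival >= window_end:
            # The previous window has closed: start a new batch.
            batches.append([])
            window_end = arrival + window
        batches[-1].append(value)
    return batches


events = [(0.0, 1), (0.4, 2), (0.9, 3), (1.2, 4), (2.5, 5)]
batches = buffer_with_time(events, window=1.0)
print(batches)  # [[1, 2, 3], [4], [5]]
```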