API
SDP Queue Connector Device
- class ska_sdp_lmc_queue_connector.sdp_queue_connector.SDPQueueConnector(*args: Any, **kwargs: Any)[source]
A dynamically configured tango device for sending and receiving data to and from data services.
- exchanges_config_path: str
Configuration Database template parent path for exchange configs.
Brace templates are substituted with the device subarray_id.
Examples: * /component/lmc-queueconnector-01/owner * /component/lmc-queueconnector-{}/owner
- experimental_flow: Annotated[bool, deprecated.deprecated]
Enable to automatically configure using configuration database flow configs.
- generic_read(attr: tango.Attribute)[source]
Generic read function conforming to attribute.fread interface.
- Parameters:
attr – output tango attribute
- generic_write(wattr: tango.WAttribute)[source]
Generic read function conforming to attribute.fwrite interface.
- Parameters:
wattr – input tango write attribute
- async group_configure(configuration_json: str, group_name_override: str | None = None)[source]
Commands the device to load exchanges using a provided config json string.
- Parameters:
configuration_json – QueueConnectorDescriptor json. None value is considered as an empty config.
group_name_override – When provided, overrides the group name inside the QueueConnectorDescriptor.
- async reconfigure(group_name: str, config_json: str | None)[source]
Tries commanding to stop, reconfigure and start the device depending on the current state.
- Parameters:
group_name – name of group to reconfigure.
config_json – new group config json or None for empty config.
- set_attr(name: str, value: Tuple[str, bytes] | numpy.ndarray | Dict[str, object] | int | float | bool | str | None, *, push_event=True)[source]
Sets the internal attribute and optionally pushes a change event.
- Parameters:
name – name of attribute to set
value – value to set
push_event – Whether to push an event. Setting to None will push if polling is not active. Defaults to None.
- class ska_sdp_lmc_queue_connector.sdp_queue_connector.QueueConnectorDescriptor(*args: Any, **kwargs: Any)[source]
Primary JSON serializable descriptor for configuring a queue connector device.
- exchanges: dict[str | None, list[ExchangeDescriptor]] | list[ExchangeDescriptor] | None = None
Mapping or list of exchanges for a queue connector to process when running.
Exchange Interfaces
- class ska_sdp_lmc_queue_connector.exchange.ExchangeDescriptor(*args: Any, **kwargs: Any)[source]
Descriptor for instantiating an exchange.
- dtype: Annotated[numpy.dtype, _DType] = 'str'
Python primitive, numpy dtype or tango dtype of the dynamic attribute
- pipe: Annotated[DefaultPipeDescriptor | BufferWithTimePipeDescriptor, pydantic.Field]
A pipe operator to be applied between source and sink read and write
- sink: Annotated[InMemorySinkDescriptor | TangoLocalAttributeSinkDescriptor | TangoArrayScatterAttributeSinkDescriptor | TangoObjectScatterAttributeSinkDescriptor | TangoRemoteAttributeSinkDescriptor | KafkaProducerSinkDescriptor | DataQueueProducerSinkDescriptor, pydantic.Field]
A data sink descriptor to be written to by the exchange
- source: Annotated[InMemorySourceDescriptor | TangoSubscriptionSourceDescriptor | TangoDishPointingSubscriptionSourceDescriptor | TangoMccsPointingSubscriptionSourceDescriptor | TangoMccsTrackingSubscriptionSourceDescriptor | KafkaConsumerSourceDescriptor | DataQueueConsumerSourceDescriptor, pydantic.Field] | list[Annotated[InMemorySourceDescriptor | TangoSubscriptionSourceDescriptor | TangoDishPointingSubscriptionSourceDescriptor | TangoMccsPointingSubscriptionSourceDescriptor | TangoMccsTrackingSubscriptionSourceDescriptor | KafkaConsumerSourceDescriptor | DataQueueConsumerSourceDescriptor, pydantic.Field]]
One or more data source descriptors to be read by the exchange
- class ska_sdp_lmc_queue_connector.exchange.Exchange(sources: Sequence[DataSource], sink: DataSink, pipe: DataPipe)[source]
A container representing a connection between a source and sink and handles asynchronous streaming between them.
Data Source, Sink and Pipe Interfaces
- class ska_sdp_lmc_queue_connector.sourcesink.DataSource[source]
Interface for an object containing data that can be asynchronously read.
- class ska_sdp_lmc_queue_connector.sourcesink.DataSink[source]
Interface for an object that receives data that can be asynchronously written to.
- abstract async awrite(value: Tuple[str, bytes] | numpy.ndarray | Dict[str, object] | int | float | bool | str)[source]
Writes a single data entry to the sink.
- class ska_sdp_lmc_queue_connector.sourcesink.DataPipe[source]
Functor interface for pipe operators to perform on python data between a DataSource and DataSink.
- abstract property output_dtype: numpy.dtype
Dtype of the functor stream output
Tango
TangoSubscriptionSource
- class ska_sdp_lmc_queue_connector.tango_sourcesink.TangoSubscriptionSourceDescriptor(*args: Any, **kwargs: Any)[source]
-
- etype: SchemaEventType = 0
The type of attribute event to listen for.
- class ska_sdp_lmc_queue_connector.tango_sourcesink.TangoSubscriptionSource(desc: TangoSubscriptionSourceDescriptor, device: SDPQueueConnector, dtype: np.dtype, shape: list[int])[source]
A DataSource populated from a subscription to tango attribute events.
- start()
Asynchronously performs all additional initialization before reading (e.g. connecting to socket endpoints)
- stop()
Asynchronously performs all end of stream destruction after reading (e.g. closing sockets)
TangoDishPointingSubscriptionSource
- class ska_sdp_lmc_queue_connector.tango_pointing_source.TangoDishPointingSubscriptionSourceDescriptor(*args: Any, **kwargs: Any)[source]
Descriptor to TangoDishPointingSubscriptionSource.
- class ska_sdp_lmc_queue_connector.tango_pointing_source.TangoDishPointingSubscriptionSource(desc: TangoDishPointingSubscriptionSourceDescriptor, device: SDPQueueConnector, dtype: np.dtype, shape: list[int])[source]
Specialized tango attribute source for converting Dish pointings to a structured ndarray.
TangoMccsPointingSubscriptionSource
- class ska_sdp_lmc_queue_connector.tango_pointing_source.TangoMccsPointingSubscriptionSourceDescriptor(*args: Any, **kwargs: Any)[source]
Descriptor to TangoMccsPointingSubscriptionSource.
- class ska_sdp_lmc_queue_connector.tango_pointing_source.TangoMccsPointingSubscriptionSource(desc: TangoMccsPointingSubscriptionSourceDescriptor, device: SDPQueueConnector, dtype: np.dtype, shape: list[int])[source]
Specialized tango attribute source for converting MCCS station beam pointings to a structured ndarray.
TangoMccsTrackingSubscriptionSource
- class ska_sdp_lmc_queue_connector.tango_pointing_source.TangoMccsTrackingSubscriptionSourceDescriptor(*args: Any, **kwargs: Any)[source]
Descriptor to TangoMccsTrackingSubscriptionSource.
- class ska_sdp_lmc_queue_connector.tango_pointing_source.TangoMccsTrackingSubscriptionSource(desc: TangoMccsTrackingSubscriptionSourceDescriptor, device: SDPQueueConnector, dtype: np.dtype, shape: list[int])[source]
Specialized tango attribute source for converting MCCS station beam tracking flags to a structured ndarray.
TangoAttributeSink
- class ska_sdp_lmc_queue_connector.tango_sourcesink.TangoLocalAttributeSinkDescriptor(*args: Any, **kwargs: Any)[source]
- class ska_sdp_lmc_queue_connector.tango_sourcesink.TangoLocalAttributeSink(desc: TangoLocalAttributeSinkDescriptor, device: SDPQueueConnector, dtype: np.dtype, shape: list[int])[source]
A DataSink that publishes data to a tango attribute on the Tango Device member.
- awrite(value: Tuple[str, bytes] | numpy.ndarray | Dict[str, object] | int | float | bool | str)
Writes a single data entry to the sink.
- start()
Asynchronously performs all additional initialization before writing (e.g. waiting for socket connections)
- stop()
Asynchronously performs all end of stream destruction after reading (e.g. closing sockets)
TangoArrayScatterAttributeSink
- class ska_sdp_lmc_queue_connector.tango_array_scatter_sink.TangoArrayScatterAttributeSinkDescriptor(*args: Any, **kwargs: Any)[source]
A Tango Attribute Sink for splitting and scattering of ndarray data.
- default_value: SchemaDataType = ''
Starting value of the dynamic attribute before a source is read
- class ska_sdp_lmc_queue_connector.tango_array_scatter_sink.TangoArrayScatterAttributeSink(desc: TangoArrayScatterAttributeSinkDescriptor, device: SDPQueueConnector, dtype: np.dtype, shape: list[int])[source]
- awrite(value: Tuple[str, bytes] | numpy.ndarray | Dict[str, object] | int | float | bool | str)
Writes a single data entry to the sink.
- start()
Asynchronously performs all additional initialization before writing (e.g. waiting for socket connections)
- stop()
Asynchronously performs all end of stream destruction after reading (e.g. closing sockets)
TangoObjectScatterAttributeSink
- class ska_sdp_lmc_queue_connector.tango_object_scatter_sink.TangoObjectScatterAttributeSinkDescriptor(*args: Any, **kwargs: Any)[source]
A Tango Attribute Sink for splitting and scattering object heirarchy data.
- attributes: list[TangoAttributeDescriptor]
Attribute names to dynamically add the SDPQueueConnector
- class ska_sdp_lmc_queue_connector.tango_object_scatter_sink.TangoAttributeDescriptor(*args: Any, **kwargs: Any)[source]
An attribute descriptor describing a filter and path to a value in JSON.
- default_value: SchemaDataType | None = None
Starting value of the dynamic attribute before a source is read
- dtype: DType
Python primitive, numpy dtype or tango dtype of the dynamic attribute
- filter: str | None = None
JMESPath predicate expression for whether to update the attribute.
For testing on https://play.jmespath.org, the deserialized message can be converted to the equivalent JSON using the following:
>>> import json, numpy >>> class NumpyArrayJSONEncoder(json.JSONEncoder): ... def default(self, obj): ... if isinstance(obj, numpy.ndarray): ... return obj.tolist() ... return json.JSONEncoder.default(self, obj) ... >>> example = numpy.array([("a", 1)], dtype=[("name", object),("value", int)]) >>> json.dumps(example, cls=NumpyArrayJSONEncoder) '[["a", 1]]'
- class ska_sdp_lmc_queue_connector.tango_object_scatter_sink.TangoObjectScatterAttributeSink(desc: TangoObjectScatterAttributeSinkDescriptor, device: SDPQueueConnector, dtype: np.dtype, shape: list[int])[source]
- awrite(value: Tuple[str, bytes] | numpy.ndarray | Dict[str, object] | int | float | bool | str)
Writes a single data entry to the sink.
- start()
Asynchronously performs all additional initialization before writing (e.g. waiting for socket connections)
- stop()
Asynchronously performs all end of stream destruction after reading (e.g. closing sockets)
Kafka
KafkaConsumerSource
- class ska_sdp_lmc_queue_connector.kafka_sourcesink.KafkaConsumerSourceDescriptor(*args: Any, **kwargs: Any)[source]
- class ska_sdp_lmc_queue_connector.kafka_sourcesink.KafkaConsumerSource(descriptor: KafkaConsumerSourceDescriptor, dtype: numpy.dtype, shape: list[int])[source]
A DataSource which consumes messages from a Kafka topic
- start()
Asynchronously performs all additional initialization before reading (e.g. connecting to socket endpoints)
KafkaProducerSink
- class ska_sdp_lmc_queue_connector.kafka_sourcesink.KafkaProducerSinkDescriptor(*args: Any, **kwargs: Any)[source]
- class TimestampOptions(*args: Any, **kwargs: Any)[source]
A set of kafka producer options related to overriding Kafka message timestamps from dynamic data instead of the system time when messages are produced. Timestamps in dynamic data must be one of:
An offset to unix epoch in milliseconds.
A numpy datetime64 on TAI scale.
- format: Literal['utf-8', 'ascii', 'python', 'json', 'msgpack_numpy', 'npy', 'carray'] = 'python'
The format of the Kafka message.
- message_max_bytes: int = 1048576
The max size of encoded messages in bytes. NOTE: The 1MiB default is recommended by Kafka, sending larger messages higher than this will linearly increase latency.
- timestamp_options: TimestampOptions | None = None
An optional group of settings related to extracting Kafka timestamps from dynamic data. None results in using the current time.
- class ska_sdp_lmc_queue_connector.kafka_sourcesink.KafkaProducerSink(descriptor: KafkaProducerSinkDescriptor, dtype: numpy.dtype, shape: list[int])[source]
A DataSink which produces messages for a Kafka topic
- awrite(value: Tuple[str, bytes] | numpy.ndarray | Dict[str, object] | int | float | bool | str)
Writes a single data entry to the sink.
- start()
Asynchronously performs all additional initialization before writing (e.g. waiting for socket connections)
- stop()
Asynchronously performs all end of stream destruction after reading (e.g. closing sockets)
In-Memory
InMemorySource
- class ska_sdp_lmc_queue_connector.in_memory_sourcesink.InMemorySourceDescriptor(*args: Any, **kwargs: Any)[source]
Descriptor for instantiating an InMemorySource
- class ska_sdp_lmc_queue_connector.in_memory_sourcesink.InMemorySource(desc: InMemorySourceDescriptor, dtype: numpy.dtype, shape: list[int] | None = None)[source]
An in-memory implementation of a DataSource for testing.
- start()
Asynchronously performs all additional initialization before reading (e.g. connecting to socket endpoints)
- stop()
Asynchronously performs all end of stream destruction after reading (e.g. closing sockets)
InMemorySink
- class ska_sdp_lmc_queue_connector.in_memory_sourcesink.InMemorySinkDescriptor(*args: Any, **kwargs: Any)[source]
Descriptor for instantiating an InMemorySink
- class ska_sdp_lmc_queue_connector.in_memory_sourcesink.InMemorySink(desc: InMemorySinkDescriptor)[source]
- An in-memory implementation of a DataSink for testing. Instances of
sink data queues exist as class members referenced via a lookup key.
- awrite(value: Tuple[str, bytes] | numpy.ndarray | Dict[str, object] | int | float | bool | str)
Writes a single data entry to the sink.
- start()
Asynchronously performs all additional initialization before writing (e.g. waiting for socket connections)
- stop()
Asynchronously performs all end of stream destruction after reading (e.g. closing sockets)
Pipes
- class ska_sdp_lmc_queue_connector.pipe.default_pipe.DefaultPipe(dtype: numpy.dtype, shape: list[int])[source]
Simple pipe that does not modify the data stream.
- property output_dtype: numpy.dtype
Dtype of the functor stream output
- class ska_sdp_lmc_queue_connector.pipe.buffer_pipe.BufferWithTimePipe(desc: BufferWithTimePipeDescriptor, dtype: numpy.dtype, shape: list[int])[source]
Buffers data with time window by dynamically by adding a dynamic dimension to the ouput shape and appending data until the time window closes.
- property output_dtype: numpy.dtype
Dtype of the functor stream output
Format
- ska_sdp_lmc_queue_connector.serialization.Format
Supported serialized format of structured data.
utf-8: string data format using python __str__ method encoded as utf-8
ascii: string data format using python __str__ method encoded as ascii
python: python literal from python __repr__ method encoded as utf-8
json: json format using utf-8 format
msgpack_numpy: binary format for JSON-like data combined with numpy support
npy: binary format for numpy arrays containing type and shape
carray: raw data buffer in C/row major order