SKA SDP Data Queue Classes
A collection of classes that provides access to Kafka.
- enum ska_sdp_dataqueues.data_queue.Encoding(value)
The encode/decode method to use
- Member Type:
str
Valid values are as follows:
- UTF8 = <Encoding.UTF8: 'utf-8'>
- ASCII = <Encoding.ASCII: 'ascii'>
- MSGPACK_NUMPY = <Encoding.MSGPACK_NUMPY: 'msgpack_numpy'>
- NPY = <Encoding.NPY: 'npy'>
- XARRAY = <Encoding.XARRAY: 'xarray'>
- JSON = <Encoding.JSON: 'json'>
- BYTES = <Encoding.BYTES: 'bytes'>
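Because the member type is str, each member compares equal to its string value and can be looked up by that value. The sketch below is a minimal illustration under stated assumptions. It re-creates the enum locally instead of importing ska_sdp_dataqueues, and its encode()/decode() helpers are hypothetical. They cover only the stdlib-backed members (UTF8, ASCII, JSON, BYTES) and are not the library's actual codec logic.

```python
import json
from enum import Enum


class Encoding(str, Enum):
    """Illustrative local re-creation of the documented members (str-valued)."""

    UTF8 = "utf-8"
    ASCII = "ascii"
    MSGPACK_NUMPY = "msgpack_numpy"
    NPY = "npy"
    XARRAY = "xarray"
    JSON = "json"
    BYTES = "bytes"


def encode(value, encoding: Encoding) -> bytes:
    """Hypothetical encoder covering only the stdlib-backed encodings."""
    if encoding in (Encoding.UTF8, Encoding.ASCII):
        # For these two members the value is also a valid Python codec name.
        return value.encode(encoding.value)
    if encoding is Encoding.JSON:
        return json.dumps(value).encode("utf-8")
    if encoding is Encoding.BYTES:
        return bytes(value)
    raise NotImplementedError(f"{encoding.value} needs third-party libraries")


def decode(payload: bytes, encoding: Encoding):
    """Inverse of encode() for the same subset of encodings."""
    if encoding in (Encoding.UTF8, Encoding.ASCII):
        return payload.decode(encoding.value)
    if encoding is Encoding.JSON:
        return json.loads(payload)
    if encoding is Encoding.BYTES:
        return payload
    raise NotImplementedError(f"{encoding.value} needs third-party libraries")
```

MSGPACK_NUMPY, NPY and XARRAY deliberately raise in this sketch because they depend on non-stdlib packages.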
- class ska_sdp_dataqueues.data_queue.DataQueueProducer(server: str | list[str], message_max_bytes: int = 1048576, topic: str | None = None, encoding: Encoding | None = None)
A Producer object makes a connection to a Kafka server and allows messages to be pushed to specified topics, and optionally specific partitions.
There is an option to set the maximum message size, which defaults to 1 MiB (2**20 bytes) as recommended by Kafka. Sending messages larger than this increases latency linearly.
- Parameters:
server – The address(es) for the Kafka Broker to query for metadata and set up the connection.
topic – The default Kafka topic to write messages to.
message_max_bytes – Maximum message size, in bytes, for push.
encoding – The default encoding to use (or None to specify it for each message).
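The default message_max_bytes of 1048576 is exactly 2**20 bytes (1 MiB). When payload sizes vary, a pre-send check against that limit can surface oversized messages early. The guard below is a hypothetical helper and is not part of ska_sdp_dataqueues.

```python
DEFAULT_MESSAGE_MAX_BYTES = 2**20  # 1 MiB, the documented default (1048576)


def check_message_size(payload: bytes, message_max_bytes: int = DEFAULT_MESSAGE_MAX_BYTES) -> int:
    """Return the payload size, or raise ValueError if it exceeds the limit.

    Hypothetical helper for illustration; not a library function.
    """
    size = len(payload)
    if size > message_max_bytes:
        raise ValueError(f"payload is {size} bytes, limit is {message_max_bytes} bytes")
    return size
```

Kafka also counts some per-record overhead (key, headers, batch framing), so treat a raw payload-length check as an approximate guard rather than an exact one.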
- async astart()
Start this producer
- async astop()
Stop this producer
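Both DataQueueProducer and DataQueueConsumer expose an astart()/astop() pair. A common pattern is to wrap such a pair so that the stop call always runs, even if the body fails. The sketch below assumes nothing about the real classes beyond those two coroutine methods. StubProducer and started() are hypothetical names, and this document does not say whether the real classes already support `async with`.

```python
import asyncio
from contextlib import asynccontextmanager


class StubProducer:
    """Hypothetical stand-in exposing the same astart()/astop() pair."""

    def __init__(self):
        self.running = False

    async def astart(self):
        self.running = True

    async def astop(self):
        self.running = False


@asynccontextmanager
async def started(client):
    """Start a client on entry and always stop it on exit."""
    await client.astart()
    try:
        yield client
    finally:
        await client.astop()


async def main():
    producer = StubProducer()
    async with started(producer) as p:
        assert p.running  # started inside the block
    return producer.running  # stopped once the block exits


print(asyncio.run(main()))  # False
```

Any object with the same two coroutine methods, including a real producer or consumer, could be passed to started() in place of the stub.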
- async send(data: tuple[str, bytes] | ndarray | dict[str, object] | int | float | bool | str, encoding: Encoding | None = None, schema: AnyDataclass | None = None, validate: bool = False, topic: str | None = None, partition: int | None = None) → RecordMetadata
Sends data to the Kafka topic
- Parameters:
data – Data to send to Kafka topic
encoding – Encoding type
schema – Schema to validate data (either xradio DatasetSchema or a dataclass)
validate – Whether to validate data using schema.
topic – The topic to send data to. Optional. If not provided, the default topic (from the constructor) will be used.
partition – The explicit partition to send data to. Optional. If not provided, partitions will be set automatically.
- Returns:
AIOKafka RecordMetadata
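The topic parameter falls back to the constructor default, and encoding behaves the same way when the producer was given a default. The precedence can be sketched as a small pure function. resolve_send_target() is hypothetical. Raising when neither a per-call nor a default value exists is an assumption of this sketch, because this reference does not say what send() does in that case.

```python
def resolve_send_target(default_topic, default_encoding, topic=None, encoding=None):
    """Return the (topic, encoding) pair a send() call would use.

    Hypothetical helper: per-call values take precedence over constructor defaults.
    """
    resolved_topic = topic if topic is not None else default_topic
    resolved_encoding = encoding if encoding is not None else default_encoding
    # Assumption: with nothing to fall back on, fail early rather than guess.
    if resolved_topic is None:
        raise ValueError("no topic passed and no default topic configured")
    if resolved_encoding is None:
        raise ValueError("no encoding passed and no default encoding configured")
    return resolved_topic, resolved_encoding
```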
- class ska_sdp_dataqueues.data_queue.DataQueueConsumer(server: str | list[str], topics: list[str] | None = None, encoding: Encoding = Encoding.BYTES, set_group_id: bool = True, auto_offset_reset='earliest', set_consumer_offsets: bool = True)
A Consumer object makes a connection to a Kafka server and allows messages to be streamed from specified topics, and optionally specific partitions.
- Parameters:
server – The address(es) for the Kafka Broker to query for metadata and set up the connection.
topics – The Kafka topics to stream messages from.
encoding – The encoding to use; use None to get back bytes.
set_group_id – Whether to set a group ID or not (based on server and topics).
auto_offset_reset – From where the consumer should start (earliest|latest).
set_consumer_offsets – Whether to set the consumer offsets.
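In Kafka, consumers that share a group ID divide a topic's partitions among themselves and share committed offsets. auto_offset_reset only takes effect when the group has no valid committed offset for a partition. A group ID derived deterministically from the server and topics therefore lets a restarted consumer rejoin the same group. The sketch below shows one hypothetical derivation. The hashing scheme, the "dataqueue-" prefix and the example host and topic names are illustrative assumptions, not the ID that DataQueueConsumer actually generates.

```python
import hashlib


def derive_group_id(server, topics=None):
    """Build a deterministic consumer group ID from broker address(es) and topics.

    Hypothetical naming scheme for illustration only.
    """
    # Accept either a single "host:port" string or a list of them.
    servers = [server] if isinstance(server, str) else list(server)
    # Sort both lists so that their ordering does not change the resulting ID.
    key = ",".join(sorted(servers)) + "|" + ",".join(sorted(topics or []))
    digest = hashlib.sha256(key.encode("utf-8")).hexdigest()[:16]
    return f"dataqueue-{digest}"


a = derive_group_id("localhost:9092", ["metrics", "visibilities"])
b = derive_group_id(["localhost:9092"], ["visibilities", "metrics"])
c = derive_group_id("localhost:9092", ["metrics"])
```

Here a and b are equal because only the set of servers and topics matters, while c differs because it subscribes to a different topic list.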
- async astart()
Start this consumer and adjust its offsets
- async astop()
Stop this consumer
- update_topics(topics: list[str]) → None
Update the list of topics that can be consumed.
Note this will replace all topics subscribed to.
- assign_topics(topic_partitions: list[tuple[str, int]]) → None
Update the list of topics (and partitions) that can be consumed.
Note this will replace all topics subscribed to.
Example call:
consumer.assign_topics([("topic-name", 0), ("topic-name", 3)])
This would then consume messages from topic-name using only partitions 0 and 3.
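The argument to assign_topics() is a flat list of (topic, partition) pairs, so one topic can appear several times. The pure-Python sketch below groups such a list by topic to make the resulting assignment explicit. partitions_by_topic() is a hypothetical helper and not part of the library.

```python
from collections import defaultdict


def partitions_by_topic(topic_partitions):
    """Group (topic, partition) pairs into {topic: sorted list of partitions}."""
    grouped = defaultdict(set)
    for topic, partition in topic_partitions:
        grouped[topic].add(partition)  # a set drops duplicate pairs
    return {topic: sorted(parts) for topic, parts in grouped.items()}


assignment = partitions_by_topic([("topic-name", 0), ("topic-name", 3)])
```

For the example call above, assignment is {"topic-name": [0, 3]}.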
- class ska_sdp_dataqueues.data_queue.DataQueueAdmin(server: str | list[str])
An Admin object allows privileged access to the cluster; it can be used to create new topics/partitions and to list the current topics/partitions.
- Parameters:
server – The address(es) for the Kafka Broker to query for metadata and set up the connection.
- async list_topics() → list[dict]
List the currently created topics
- async create_topics(topics: list[str] | list[tuple[str, int]], topic_configuration: dict | None = None)
Create a list of topics, or create topics with partition counts.
topics should be either:
- topics=["topic1", "topic2"]
- topics=[("topic1", 1), ("topic2", 10)]
The two types can be mixed, and in most cases the first should be used.
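Because the two forms can be mixed, a caller-side helper may find it convenient to normalise the list into (name, partitions) pairs first. The sketch below is hypothetical. It assumes a bare topic name means a single partition (default_partitions=1), whereas the real create_topics() may instead defer to the broker's configured default.

```python
def normalise_topics(topics, default_partitions=1):
    """Turn a mixed list of names and (name, partitions) tuples into tuples.

    Hypothetical helper; default_partitions=1 is an assumption.
    """
    normalised = []
    for entry in topics:
        if isinstance(entry, str):
            normalised.append((entry, default_partitions))
        else:
            name, partitions = entry
            if partitions < 1:
                raise ValueError(f"{name}: partition count must be >= 1")
            normalised.append((name, partitions))
    return normalised
```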
- class ska_sdp_dataqueues.data_queue.DeprecatedAwaitable(value, message: str)
Stub Awaitable that raises DeprecationWarning when awaited.
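An awaitable stub of this kind wraps a plain value and signals deprecation only at the point of use. The sketch below is an assumption-laden illustration of that behaviour, not the library's class. DeprecatedAwaitableSketch is a hypothetical name, and it emits the DeprecationWarning through warnings.warn rather than raising it as an exception. Which of the two the real stub does is not stated here.

```python
import asyncio
import warnings


class DeprecatedAwaitableSketch:
    """Hypothetical stand-in: awaiting it warns, then returns the wrapped value."""

    def __init__(self, value, message: str):
        self.value = value
        self.message = message

    def __await__(self):
        async def resolve():
            # Emitted when awaited, not when the object is constructed.
            warnings.warn(self.message, DeprecationWarning, stacklevel=2)
            return self.value

        return resolve().__await__()


async def main():
    return await DeprecatedAwaitableSketch(42, "this call no longer needs await")


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    result = asyncio.run(main())
```

After this runs, result is 42 and caught holds the recorded DeprecationWarning.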