# Source code for ska_sdp_lmc_queue_connector.sourcesink
import logging
from abc import ABCMeta, abstractmethod
from collections.abc import AsyncGenerator, AsyncIterator
from typing import Annotated, Any, Dict, Tuple
import numpy as np
from pydantic import Field
from ska_sdp_lmc_queue_connector.pydantic import B64Bytes, NDArray
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
# Union of the in-memory Python value types that exchange streams carry.
# It is the item type of ``DataSource`` (an ``AsyncIterator[DataType]``) and
# the argument type of ``DataSink.awrite``.
# NOTE: the ``|`` between typing constructs is evaluated when the module is
# imported, which needs Python 3.10 or newer.
DataType = Tuple[str, bytes] | np.ndarray | Dict[str, object] | int | float | bool | str
"""
Python compatible datatypes for exchange streams.
"""
# Schema-facing counterpart of ``DataType``: same union members, except that
# ``bytes`` is replaced by ``B64Bytes`` and ``np.ndarray`` by ``NDArray``
# (both imported from ``ska_sdp_lmc_queue_connector.pydantic``; presumably
# pydantic-serializable stand-ins for those types -- confirm there).
#
# ``Field(union_mode="left_to_right")`` tells pydantic to try the members in
# the order written and accept the first one that validates, so the member
# order below is significant and must not be rearranged casually.
# NOTE(review): in lax mode an earlier member can win through coercion (e.g.
# a numeric string or a bool may be accepted by ``int`` before ``str`` /
# ``bool`` are tried) -- confirm this ordering is intended.
SchemaDataType = Annotated[
tuple[str, B64Bytes] | NDArray | dict[str, Any] | int | float | bool | str,
Field(union_mode="left_to_right"),
]
"""
Pydantic schema compatible datatypes for exchange streams.
"""
# [docs]
class DataSource(AsyncIterator[DataType], metaclass=ABCMeta):
    """
    Interface for an object containing data that can be
    asynchronously read.

    A source is an async iterator over ``DataType`` items, so it can be
    consumed with ``async for`` (``__aiter__`` is inherited from
    :class:`collections.abc.AsyncIterator`). Concrete subclasses must
    implement :meth:`start`, :meth:`stop` and :meth:`__anext__`.
    """

    @abstractmethod
    async def start(self) -> None:
        """
        Asynchronously performs all additional initialization
        before reading (e.g. connecting to socket endpoints).
        """

    @abstractmethod
    async def stop(self) -> None:
        """
        Asynchronously performs all end of stream destruction
        after reading (e.g. closing sockets).
        """

    @abstractmethod
    async def __anext__(self) -> DataType:
        """
        Asynchronously pulls the next item in the source stream
        backbuffer.

        When the backbuffer is empty, will suspend until a new item
        arrives, otherwise will raise StopAsyncIteration after
        :meth:`stop` is called.

        Returns:
            The next item of the stream.

        Raises:
            StopAsyncIteration: the stream has ended, i.e. :meth:`stop`
                was called and no further item is available.
        """
# [docs]
class DataSink(metaclass=ABCMeta):
    """
    Interface for an object that data can be asynchronously
    written to.

    Concrete subclasses must implement :meth:`start`, :meth:`stop`
    and :meth:`awrite`.
    """

    @abstractmethod
    async def start(self) -> None:
        """
        Asynchronously performs all additional initialization
        before writing (e.g. waiting for socket connections).
        """

    @abstractmethod
    async def stop(self) -> None:
        """
        Asynchronously performs all end of stream destruction
        after writing (e.g. closing sockets).
        """

    @abstractmethod
    async def awrite(self, value: DataType) -> None:
        """
        Writes a single data entry to the sink.

        Args:
            value: the entry to write.
        """
# [docs]
class DataPipe(metaclass=ABCMeta):
    """
    Abstract functor describing the pipe operators applied to python
    data travelling between a DataSource and a DataSink.

    Subclasses provide the two output descriptors as properties and
    make the instance callable on an async stream.
    """

    @property
    @abstractmethod
    def output_dtype(self) -> np.dtype:
        """Dtype of the stream this functor outputs."""

    @property
    @abstractmethod
    def output_shape(self) -> list[int]:
        """Shape of the stream this functor outputs."""

    @abstractmethod
    def __call__(self, iterator: AsyncIterator) -> AsyncGenerator:
        """
        Run the pipeline operators over an async data stream,
        adapting it to a stream of ``output_dtype`` and
        ``output_shape``.

        Args:
            iterator: input async iterator.

        Returns:
            output async iterator.
        """