Source code for ska_sdp_lmc_queue_connector.sourcesink

import logging
from abc import ABCMeta, abstractmethod
from collections.abc import AsyncGenerator, AsyncIterator
from typing import Annotated, Any, Dict, Tuple

import numpy as np
from pydantic import Field

from ska_sdp_lmc_queue_connector.pydantic import B64Bytes, NDArray

logger = logging.getLogger(__name__)

DataType = Tuple[str, bytes] | np.ndarray | Dict[str, object] | int | float | bool | str
"""
Python compatible datatypes for exchange streams.
"""

SchemaDataType = Annotated[
    tuple[str, B64Bytes] | NDArray | dict[str, Any] | int | float | bool | str,
    Field(union_mode="left_to_right"),
]
"""
Pydantic schema compatible datatypes for exchange streams.
"""


[docs] class DataSource(AsyncIterator[DataType], metaclass=ABCMeta): """ Interface for an object containing data that can be asynchronously read. """
[docs] @abstractmethod async def start(self): """ Asynchronously performs all additional initialization before reading (e.g. connecting to socket endpoints) """
[docs] @abstractmethod async def stop(self): """ Asynchronously performs all end of stream destruction after reading (e.g. closing sockets) """
@abstractmethod async def __anext__(self) -> DataType: """ Asynchronously pulls the next item in the source stream backbuffer. When the backbuffer is empty, will suspend until a new item arrives, otherwise will raise StopAsyncIteration after :method:`stop()` is called. Raises: StopAsyncIteration """
[docs] class DataSink(metaclass=ABCMeta): """ Interface for an object that receives data that can be asynchronously written to. """
[docs] @abstractmethod async def start(self): """ Asynchronously performs all additional initialization before writing (e.g. waiting for socket connections) """
[docs] @abstractmethod async def stop(self): """ Asynchronously performs all end of stream destruction after reading (e.g. closing sockets) """
[docs] @abstractmethod async def awrite(self, value: DataType): """Writes a single data entry to the sink."""
[docs] class DataPipe(metaclass=ABCMeta): """ Functor interface for pipe operators to perform on python data between a DataSource and DataSink. """ @property @abstractmethod def output_dtype(self) -> np.dtype: """Dtype of the functor stream output""" @property @abstractmethod def output_shape(self) -> list[int]: """Shape of the functor stream output""" @abstractmethod def __call__(self, iterator: AsyncIterator) -> AsyncGenerator: """ Functor method that invokes pipeline operators on an async data stream to an adapted stream of output_dtype and output_shape. Args: iterator: input async iterator. Returns: output async iterator. """