# Source code for ska_sdp_lmc_queue_connector.exchange

"""Serializable descriptors for configuring queue connector devices"""

import asyncio
import logging
from collections.abc import Sequence
from contextlib import aclosing
from typing import Annotated

from pydantic import Field

import ska_sdp_lmc_queue_connector.astream_utils as astream
from ska_sdp_lmc_queue_connector.dataqueue_sourcesink import (
    DataQueueConsumerSourceDescriptor,
    DataQueueProducerSinkDescriptor,
)
from ska_sdp_lmc_queue_connector.in_memory_sourcesink import (
    InMemorySinkDescriptor,
    InMemorySourceDescriptor,
)
from ska_sdp_lmc_queue_connector.kafka_sourcesink import (
    KafkaConsumerSourceDescriptor,
    KafkaProducerSinkDescriptor,
)
from ska_sdp_lmc_queue_connector.pipe.buffer_pipe import BufferWithTimePipeDescriptor
from ska_sdp_lmc_queue_connector.pipe.default_pipe import DefaultPipeDescriptor
from ska_sdp_lmc_queue_connector.pydantic import DType, SkaBaseModel
from ska_sdp_lmc_queue_connector.sourcesink import DataPipe, DataSink, DataSource
from ska_sdp_lmc_queue_connector.tango_array_scatter_sink import (
    TangoArrayScatterAttributeSinkDescriptor,
)
from ska_sdp_lmc_queue_connector.tango_object_scatter_sink import (
    TangoObjectScatterAttributeSinkDescriptor,
)
from ska_sdp_lmc_queue_connector.tango_pointing_source import (
    TangoDishPointingSubscriptionSourceDescriptor,
    TangoMccsPointingSubscriptionSourceDescriptor,
    TangoMccsTrackingSubscriptionSourceDescriptor,
)
from ska_sdp_lmc_queue_connector.tango_sourcesink import (
    TangoLocalAttributeSinkDescriptor,
    TangoRemoteAttributeSinkDescriptor,
    TangoSubscriptionSourceDescriptor,
)

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


# Union of every supported data source descriptor. The pydantic ``Field``
# declares ``type`` as the discriminator, i.e. the field used to decide which
# member of the union a serialized descriptor corresponds to.
SourceDescriptor = Annotated[
    InMemorySourceDescriptor
    | TangoSubscriptionSourceDescriptor
    | TangoDishPointingSubscriptionSourceDescriptor
    | TangoMccsPointingSubscriptionSourceDescriptor
    | TangoMccsTrackingSubscriptionSourceDescriptor
    | KafkaConsumerSourceDescriptor
    | DataQueueConsumerSourceDescriptor,
    Field(discriminator="type"),
]
# Union of every supported data sink descriptor, discriminated on ``type``
# in the same way as ``SourceDescriptor``.
SinkDescriptor = Annotated[
    InMemorySinkDescriptor
    | TangoLocalAttributeSinkDescriptor
    | TangoArrayScatterAttributeSinkDescriptor
    | TangoObjectScatterAttributeSinkDescriptor
    | TangoRemoteAttributeSinkDescriptor
    | KafkaProducerSinkDescriptor
    | DataQueueProducerSinkDescriptor,
    Field(discriminator="type"),
]
# Union of the supported pipe descriptors (default or time-buffered),
# discriminated on ``type``.
PipeDescriptor = Annotated[
    DefaultPipeDescriptor | BufferWithTimePipeDescriptor,
    Field(discriminator="type"),
]


class ExchangeDescriptor(SkaBaseModel):
    """Descriptor for instantiating an exchange."""

    # NOTE(review): SkaBaseModel's configuration is not visible from this
    # module. If it enables attribute docstrings, the bare strings following
    # each field are runtime field descriptions and must not be reworded
    # casually -- confirm before editing them.

    # Element type of the exchanged data; a plain string payload by default.
    dtype: DType = "str"
    """Python primitive, numpy dtype or tango dtype of the dynamic attribute"""

    # Empty list by default.
    # NOTE(review): presumably an empty shape denotes a scalar -- confirm.
    shape: list = []
    """The in-memory data shape in C-order"""

    # Required. Accepts either a single source descriptor or a list of them.
    source: SourceDescriptor | list[SourceDescriptor]
    """One or more data source descriptors to be read by the exchange"""

    # Required. Exactly one sink descriptor per exchange.
    sink: SinkDescriptor
    """A data sink descriptor to be written to by the exchange"""

    # Optional. Falls back to a default-constructed DefaultPipeDescriptor.
    pipe: PipeDescriptor = DefaultPipeDescriptor()
    """A pipe operator to be applied between source and sink read and write"""
[docs] class Exchange: """ A container representing a connection between a source and sink and handles asynchronous streaming between them. """ def __init__( self, sources: Sequence[DataSource], sink: DataSink, pipe: DataPipe, ): self.sources = sources self.sink = sink self.pipe = pipe
[docs] async def start(self): """Invokes start on the sinks and sources.""" await self.sink.start() await asyncio.gather(*[source.start() for source in self.sources])
[docs] async def stop(self): """Invokes stop on the sinks and sources.""" await asyncio.gather(*[source.stop() for source in self.sources]) await self.sink.stop()
[docs] async def run(self): """Asynchronously runs the exchange connecting source payloads to the sink. Successful completion without exceptions guarantees that no payloads are lost between sources and sinks. """ async with aclosing(self.pipe(astream.merge(*self.sources))) as stream: async for value in stream: logger.debug( "Writing new value to sink(%s): %s", type(self.sink), value, ) await self.sink.awrite(value)