"""Serializeable descriptors for configuring queue connector devices"""
import asyncio
import logging
from collections.abc import Sequence
from contextlib import aclosing
from typing import Annotated
from pydantic import Field
import ska_sdp_lmc_queue_connector.astream_utils as astream
from ska_sdp_lmc_queue_connector.dataqueue_sourcesink import (
DataQueueConsumerSourceDescriptor,
DataQueueProducerSinkDescriptor,
)
from ska_sdp_lmc_queue_connector.in_memory_sourcesink import (
InMemorySinkDescriptor,
InMemorySourceDescriptor,
)
from ska_sdp_lmc_queue_connector.kafka_sourcesink import (
KafkaConsumerSourceDescriptor,
KafkaProducerSinkDescriptor,
)
from ska_sdp_lmc_queue_connector.pipe.buffer_pipe import BufferWithTimePipeDescriptor
from ska_sdp_lmc_queue_connector.pipe.default_pipe import DefaultPipeDescriptor
from ska_sdp_lmc_queue_connector.pydantic import DType, SkaBaseModel
from ska_sdp_lmc_queue_connector.sourcesink import DataPipe, DataSink, DataSource
from ska_sdp_lmc_queue_connector.tango_array_scatter_sink import (
TangoArrayScatterAttributeSinkDescriptor,
)
from ska_sdp_lmc_queue_connector.tango_object_scatter_sink import (
TangoObjectScatterAttributeSinkDescriptor,
)
from ska_sdp_lmc_queue_connector.tango_pointing_source import (
TangoDishPointingSubscriptionSourceDescriptor,
TangoMccsPointingSubscriptionSourceDescriptor,
TangoMccsTrackingSubscriptionSourceDescriptor,
)
from ska_sdp_lmc_queue_connector.tango_sourcesink import (
TangoLocalAttributeSinkDescriptor,
TangoRemoteAttributeSinkDescriptor,
TangoSubscriptionSourceDescriptor,
)
logger = logging.getLogger(__name__)
SourceDescriptor = Annotated[
InMemorySourceDescriptor
| TangoSubscriptionSourceDescriptor
| TangoDishPointingSubscriptionSourceDescriptor
| TangoMccsPointingSubscriptionSourceDescriptor
| TangoMccsTrackingSubscriptionSourceDescriptor
| KafkaConsumerSourceDescriptor
| DataQueueConsumerSourceDescriptor,
Field(discriminator="type"),
]
SinkDescriptor = Annotated[
InMemorySinkDescriptor
| TangoLocalAttributeSinkDescriptor
| TangoArrayScatterAttributeSinkDescriptor
| TangoObjectScatterAttributeSinkDescriptor
| TangoRemoteAttributeSinkDescriptor
| KafkaProducerSinkDescriptor
| DataQueueProducerSinkDescriptor,
Field(discriminator="type"),
]
PipeDescriptor = Annotated[
DefaultPipeDescriptor | BufferWithTimePipeDescriptor,
Field(discriminator="type"),
]
[docs]
class ExchangeDescriptor(SkaBaseModel):
"""Descriptor for instantiating an exchange."""
dtype: DType = "str"
"""Python primitive, numpy dtype or tango dtype of the dynamic attribute"""
shape: list = []
"""The in-memory data shape in C-order"""
source: SourceDescriptor | list[SourceDescriptor]
"""One or more data source descriptors to be read by the exchange"""
sink: SinkDescriptor
"""A data sink descriptor to be written to by the exchange"""
pipe: PipeDescriptor = DefaultPipeDescriptor()
"""A pipe operator to be applied between source and sink read and write"""
[docs]
class Exchange:
"""
A container representing a connection between a source
and sink and handles asynchronous streaming between them.
"""
def __init__(
self,
sources: Sequence[DataSource],
sink: DataSink,
pipe: DataPipe,
):
self.sources = sources
self.sink = sink
self.pipe = pipe
[docs]
async def start(self):
"""Invokes start on the sinks and sources."""
await self.sink.start()
await asyncio.gather(*[source.start() for source in self.sources])
[docs]
async def stop(self):
"""Invokes stop on the sinks and sources."""
await asyncio.gather(*[source.stop() for source in self.sources])
await self.sink.stop()
[docs]
async def run(self):
"""Asynchronously runs the exchange connecting source payloads to the sink.
Successful completion without exceptions guarantees that no payloads
are lost between sources and sinks.
"""
async with aclosing(self.pipe(astream.merge(*self.sources))) as stream:
async for value in stream:
logger.debug(
"Writing new value to sink(%s): %s",
type(self.sink),
value,
)
await self.sink.awrite(value)