# Source code for ska_sdp_config.entity.flow

"""Flow entity Model and related sub-entities."""

from __future__ import annotations

import warnings
from collections.abc import Mapping
from typing import Annotated, Any, Literal, Sequence, TypeVar

from annotated_types import Ge
from pydantic import (
    AnyUrl,
    AwareDatetime,
    Field,
    NonNegativeInt,
    computed_field,
    model_validator,
)

from ska_sdp_config.entity.common import (
    AnyPurePath,
    KafkaUrl,
    ProcessingBlockId,
    PVCPath,
    TangoArgType,
    TangoAttributeUrl,
    TangoDataType,
)
from ska_sdp_config.entity.common.tango_trl import AnyTRL, AttributeTRL

from .base import EntityBaseModel, EntityKeyBaseModel, MultiEntityBaseModel

# Scan type identifiers: lower-case alphanumerics, `-` and `:`.
ScanTypeId = Annotated[
    str,
    Field(
        pattern=r"^[a-z0-9-:]+$",
        description="Observation ScanType ID.",
    ),
]

# Beam identifiers: lower-case alphanumerics only.
BeamId = Annotated[
    str,
    Field(
        pattern=r"^[a-z0-9]+$",
        description="Observation Beam ID.",
    ),
]

ReceiverId = NonNegativeInt
"""SPEAD Receiver ID."""

# Keys are start channels; values are (receiver_id, port_start,
# port_increment) triples, all non-negative integers.
ChannelAddressMapping = dict[
    NonNegativeInt, tuple[ReceiverId, NonNegativeInt, NonNegativeInt]
]
"""A contiguous, ordered mapping of observation channels to receivers.

`dict[start_channel, (receiver_id, port_start, port_increment)]`, e.g.

>>> from pydantic import TypeAdapter
>>> from ska_sdp_config.entity.flow import ChannelAddressMapping
>>> channel_map = TypeAdapter(ChannelAddressMapping).validate_python({
...     0: (0, 8000, 1),
...     400: (1, 8000, 1),
...     800: (2, 8000, 1),
... })
"""


class SpeadStream(EntityBaseModel):
    """A SPEAD flow entity."""

    # `kind` is the tag value used to discriminate Flow.sink variants.
    kind: Literal["spead"] = "spead"

    # scan type id -> beam id -> channel address mapping
    channel_map: dict[ScanTypeId, dict[BeamId, ChannelAddressMapping]]
    """The channel to network address mapping for scans and beams."""

    receiver_version: str
    """Receiver OCI image version."""

    streams: int = 1
    """Total number of streams per receiver."""

    transport_protocol: Literal["tcp", "udp"] = "udp"
    """The spead stream transport protocol."""

    continuous_mode: bool = False
    """
    Flag for enabling receivers to re-create the streams and resume
    receiving data after all end of streams are reached.
    """

    @computed_field
    @property
    def instances(self) -> int:
        """Number of receiver instances required for all scans."""
        # One entry per channel-address mapping in every (scan, beam) pair;
        # the largest of these sizes is what must be provisioned.
        mapping_sizes = [
            len(address_mapping)
            for beams in self.channel_map.values()
            for address_mapping in beams.values()
        ]
        # An empty channel_map needs no receivers.
        return max(mapping_sizes) if mapping_sizes else 0
class SharedMem(EntityBaseModel):
    """Shared Memory flow entity e.g. Plasma."""

    # Discriminator value for Flow.sink.
    kind: Literal["sharedmem"] = "sharedmem"

    # Only one backend is currently accepted by validation.
    impl: Literal["plasma"]
    """The shared memory implementation."""

    host_path: AnyPurePath
    """The Path of the shared memory socket."""
# Kafka topic names: letters, digits, `-`, `_` and `.` (the `-` after the
# `0-9` range is a literal hyphen).
KafkaTopicName = Annotated[
    str,
    Field(
        pattern=r"^[A-Za-z0-9-_.]+$",
        description=(
            "A Kafka topic name. Legal characters include alphanumerics, "
            "`-`, `_`, and `.`."
        ),
    ),
]

_T = TypeVar("_T")

# Generic alias; parameterize with the value type, e.g. ChannelMap[str].
ChannelMap = Sequence[tuple[NonNegativeInt, _T]]
"""A contiguous, ordered mapping of channels to a value.

Tuples represent `(start_channel, value)`, e.g.

>>> from ipaddress import IPv4Address
>>> from pydantic import TypeAdapter
>>> from ska_sdp_config.entity.flow import ChannelMap
>>> channel_map = TypeAdapter(ChannelMap[IPv4Address]).validate_python([
...     (0, '192.168.0.1'),
...     (400, '192.168.0.2')
... ])
"""
class DataQueue(EntityBaseModel):
    """Data Queue flow entity e.g. Kafka topic."""

    # Discriminator value for Flow.sink.
    kind: Literal["data-queue"] = "data-queue"

    # Either a single topic name, or a sequence of
    # (start_channel, topic_name) pairs.
    topics: KafkaTopicName | ChannelMap[KafkaTopicName]
    """The topic names."""

    host: KafkaUrl
    """The host URL of the data queue service."""

    format: Literal["json", "npy", "npz", "msgpack_numpy"]
    """The data encoded format."""
class Display(EntityBaseModel):
    """Data Display flow entity e.g. QA Display."""

    # Discriminator value for Flow.sink.
    kind: Literal["display"] = "display"

    # Free-form string; no set of widget names is enforced here.
    widget: str
    """The type of display widget."""

    endpoint: AnyUrl
    """The display URL endpoint."""
class DataProduct(EntityBaseModel):
    """Data Product flow entity e.g. Measurement Set."""

    # Discriminator value for Flow.sink.
    kind: Literal["data-product"] = "data-product"

    # PVCPath is tried before a plain path; the first successful
    # validation wins.
    data_dir: Annotated[
        PVCPath | AnyPurePath,
        Field(
            union_mode="left_to_right",
            description="Output directory of data products.",
        ),
    ]

    # NOTE(review): presumably relative to `data_dir`; confirm with users.
    paths: list[AnyPurePath]
    """Channel mapping of output data products relative to the persistent
    volume directory."""
class DataProductPersist(EntityBaseModel):
    """
    Data Product Persist flow entity.

    Represents an external persistent storage endpoint.
    """

    # Discriminator value for Flow.sink.
    kind: Literal["data-product-persist"] = "data-product-persist"

    phase: Literal["SOLID", "LIQUID", "GAS"]
    """Level of resilience in archival heuristic."""

    # AwareDatetime rejects naive datetimes; it does not itself convert
    # the value to UTC.
    expires_at: AwareDatetime | None = None
    """UTC timestamp for the data product archive to expire at.
    If `None`, no archive expiry."""
class TangoAttribute(EntityBaseModel):
    """Tango attribute flow entity.

    This entity instructs to flow data into a single Tango attribute.
    """

    # Discriminator value for Flow.sink.
    kind: Literal["tango"] = "tango"

    # Input key "attribute_url" (the alias) is deprecated; see
    # warn_deprecated below.
    attribute_trl: Annotated[
        AttributeTRL,
        Field(
            alias="attribute_url",
            description="""
            Tango attribute TRL.
            e.g. `"tango://mid-sdp/subarray/01/some_attribute"`.
            """,
        ),
    ]

    dtype: TangoArgType
    """Attribute data type corresponding to a `tango.ArgType`."""

    max_dim_x: NonNegativeInt = 0
    """Size of the slowest changing dimension."""

    max_dim_y: NonNegativeInt = 0
    """Size of the fastest changing dimension."""

    default_value: TangoDataType | None = None
    """Initial attribute value, cast to the configured `dtype` to allow
    support for non-JSON Tango types.

    If ``None``, the attribute is initialized with quality `ATTR_INVALID`.
    """

    @property
    def attribute_url(self) -> TangoAttributeUrl:
        """
        Fully qualified tango attribute URL.
        e.g. `"tango://tangodb:10000/mid-sdp/subarray/01/some_attribute"`.
        """
        return TangoAttributeUrl(self.attribute_trl.url)

    @model_validator(mode="before")
    @classmethod
    def warn_deprecated(cls, values):
        """Warn for deprecations during validation.

        Emits a ``DeprecationWarning`` when the raw input supplies the
        deprecated ``attribute_url`` key. The input is returned unchanged.
        """
        # A "before" validator receives the raw input, which is not
        # necessarily a mapping. A bare `in` test would raise TypeError on
        # e.g. None or an int (which pydantic does not convert into a
        # ValidationError) and would do a substring test on a str.
        if isinstance(values, Mapping) and "attribute_url" in values:
            warnings.warn(
                "Field 'attribute_url' is deprecated. "
                "Use 'attribute_trl' instead.",
                DeprecationWarning,
                stacklevel=2,
            )
        return values
class TangoAttributeMap(EntityBaseModel):
    """Tango attribute map flow entity.

    This entity instructs to flow data into multiple Tango attributes.
    """

    class DataQuery(EntityBaseModel):
        """
        Decomposed JMESPath query for locating data within a data model.
        """

        when: str | None = None
        """JMESPath when query clause on the flow data model."""

        select: str = "@"
        """JMESPath select query clause on the flow data model."""

    # Discriminator value for Flow.sink.
    kind: Literal["tangomap"] = "tangomap"

    attributes: list[tuple[TangoAttribute, DataQuery]]
    """
    Mapping of TangoAttributes with unique attribute URL to a data query.
    """

    def model_post_init(self, context: Any, /) -> None:
        """Reject maps that target the same Tango attribute more than once.

        Raises:
            ValueError: if two entries in ``attributes`` share the same
                ``attribute_trl``.
        """
        # Run any base-class post-init hook first (Flow.model_post_init
        # does the same), so that it is not silently skipped here.
        super().model_post_init(context)
        attribute_trls = [
            tango_attribute.attribute_trl
            for tango_attribute, _query in self.attributes
        ]
        # Duplicates collapse in the set, shrinking it below the list size.
        if len(set(attribute_trls)) != len(attribute_trls):
            raise ValueError(
                f"attribute_trls not unique for {self.attributes}"
            )
class FlowSource(EntityBaseModel):
    """
    A generic flow source pointing to either:

    - an existing flow entry in the configuration database
    - external source, such as a tango attribute
    - an external url
    """

    uri: Annotated[
        Flow.Key | AttributeTRL | AnyTRL | AnyUrl,
        Field(
            # Alternatives are tried in the listed order and the first one
            # that validates wins, so the order decides the interpretation.
            union_mode="left_to_right",
            description="""
            Reference to the data source:
            - Flow.Key: key of a data flow entry
            - AttributeTRL: Tango attribute resource locator
            - AnyTRL: any other Tango resource locator
            - AnyUrl: url of any other kind
            """,
        ),
    ]

    function: str | None = None
    """
    Python function, class or OCI image that will flow data from
    source to sink
    """

    parameters: dict | None = None
    """Function, class or OCI image parameters"""
class Flow(MultiEntityBaseModel):
    """Flow datastream entity.

    Contains configuration information relating to streaming data
    connections related to a processing job for SDP. Flows are associated
    with and persist for the lifetime of a ProcessingBlock.
    """

    class Key(EntityKeyBaseModel):
        """An SDP Flow primary key."""

        pb_id: ProcessingBlockId
        """The ID of the Processing Block that this flow belongs to."""

        # Optional on input; Flow.model_post_init fills it from sink.kind.
        kind: Annotated[
            str | None,
            Field(
                pattern=r"^[A-Za-z0-9-]{1,96}$",
                description="The kind of sink this flow moves data to.",
            ),
        ] = None

        name: Annotated[
            str,
            Field(
                pattern=r"^[A-Za-z0-9-]{1,96}$",
                description="The name of this flow.",
            ),
        ]

    # Excluded from serialization.
    key: Annotated[Key, Field(exclude=True)]

    # Tagged union selected by each sink model's `kind` literal.
    sink: Annotated[
        SharedMem
        | DataQueue
        | Display
        | DataProduct
        | DataProductPersist
        | TangoAttribute
        | TangoAttributeMap
        | SpeadStream,
        Field(
            discriminator="kind",
            description="The kind of sink this flow moves data to.",
        ),
    ]

    sources: list[FlowSource]
    """A list containing sources of flow data."""

    data_model: Annotated[
        str,
        Field(
            pattern=r"^[A-Za-z0-9\,\[\]]+$",
            description="The DataModel that the flow data represents.",
        ),
    ]

    expiry_time: int = -1
    """
    The amount of time in seconds to wait to expire. Negative numbers
    mean never expire.
    """

    def model_post_init(self, context: Any, /) -> None:
        """Derive or check ``key.kind`` against the sink kind.

        If ``key.kind`` was not supplied it is set to ``sink.kind``;
        otherwise the two must be equal.

        Raises:
            ValueError: if ``key.kind`` is set and differs from
                ``sink.kind``.
        """
        super().model_post_init(context)
        if self.key.kind is None:
            self.key.kind = self.sink.kind
        elif self.key.kind != self.sink.kind:
            # Previously an `assert`, which is stripped under `python -O`
            # and would let an inconsistent key through silently.
            raise ValueError(
                f"Flow key kind {self.key.kind!r} does not match "
                f"sink kind {self.sink.kind!r}"
            )
class Dependency(MultiEntityBaseModel):
    """
    Dependencies are used to lock flows, preventing deletion until the
    dependency is cleared.
    """

    class Key(Flow.Key):
        """A Dependency primary key."""

        # Overrides Flow.Key.kind: here it is a required string.
        kind: Annotated[
            str,
            Field(
                pattern=r"^[A-Za-z0-9-]{1,96}$",
                description="""
                The kind of sink this flow moves data to. Unlike in Flow,
                where it is post-initialized from the sink kind when not
                provided, it is required here.""",
            ),
        ]

        origin: Annotated[
            str,
            Field(
                # Anchored at both ends like every other key pattern in
                # this module. Without `$` only a valid prefix was
                # required, so values such as "a/b" or ones longer than
                # 96 characters were accepted.
                pattern=r"^[A-Za-z0-9-]{1,96}$",
                description="""
                The name of the origin component that issued this lock.
                This could also be a processing block.""",
            ),
        ]

    key: Key
    """The reservation key."""

    # Ge(-1): -1 is the only negative value allowed (meaning "never").
    expiry_time: Annotated[int, Ge(-1)]
    """After this time the dependency should be released.
    The time is specified in seconds. -1 == infinity."""

    description: str | None = None
    """A free form description."""