"""Flow entity Model and related sub-entities."""
from __future__ import annotations
import warnings
from typing import Annotated, Any, Literal, Sequence, TypeVar
from annotated_types import Ge
from pydantic import (
AnyUrl,
AwareDatetime,
Field,
NonNegativeInt,
computed_field,
model_validator,
)
from ska_sdp_config.entity.common import (
AnyPurePath,
KafkaUrl,
ProcessingBlockId,
PVCPath,
TangoArgType,
TangoAttributeUrl,
TangoDataType,
)
from ska_sdp_config.entity.common.tango_trl import AnyTRL, AttributeTRL
from .base import EntityBaseModel, EntityKeyBaseModel, MultiEntityBaseModel
ScanTypeId = Annotated[
    str,
    Field(description="Observation ScanType ID.", pattern=r"^[a-z0-9-:]+$"),
]

BeamId = Annotated[
    str,
    Field(description="Observation Beam ID.", pattern=r"^[a-z0-9]+$"),
]

ReceiverId = NonNegativeInt
"""SPEAD Receiver ID."""

ChannelAddressMapping = dict[
    NonNegativeInt, tuple[ReceiverId, NonNegativeInt, NonNegativeInt]
]
"""A contiguous, ordered mapping of observation channels to receivers.

Entries have the form
`dict[start_channel, (receiver_id, port_start, port_increment)]`, e.g.

>>> from pydantic import TypeAdapter
>>> from ska_sdp_config.entity.flow import ChannelAddressMapping
>>> channel_map = TypeAdapter(ChannelAddressMapping).validate_python({
...     0: (0, 8000, 1),
...     400: (1, 8000, 1),
...     800: (2, 8000, 1),
... })
"""
class SpeadStream(EntityBaseModel):
    """SPEAD stream flow entity."""

    kind: Literal["spead"] = "spead"

    channel_map: dict[ScanTypeId, dict[BeamId, ChannelAddressMapping]]
    """The channel to network address mapping for scans and beams."""

    receiver_version: str
    """Receiver OCI image version."""

    streams: int = 1
    """Total number of streams per receiver."""

    transport_protocol: Literal["tcp", "udp"] = "udp"
    """The spead stream transport protocol."""

    continuous_mode: bool = False
    """
    Flag for enabling receivers to re-create the streams and resume receiving
    data after all end of streams are reached.
    """

    @computed_field
    @property
    def instances(self) -> int:
        """Number of receiver instances required for all scans.

        Computed as the largest number of entries found in any single
        beam's channel address mapping, over every scan type. Evaluates
        to ``0`` when the channel map contains no beams at all.
        """
        # One size per (scan type, beam) pair; the outer keys are not needed.
        mapping_sizes = [
            len(address_map)
            for beams_by_id in self.channel_map.values()
            for address_map in beams_by_id.values()
        ]
        return max(mapping_sizes, default=0)
class SharedMem(EntityBaseModel):
    """Shared memory flow entity, e.g. Plasma."""

    kind: Literal["sharedmem"] = "sharedmem"

    impl: Literal["plasma"]
    """The shared memory implementation."""

    host_path: AnyPurePath
    """The path of the shared memory socket."""
KafkaTopicName = Annotated[
    str,
    Field(
        description="""
A Kafka topic name. Legal characters including alphanumeric,
`-`, `_`, and `.`""",
        pattern=r"^[A-Za-z0-9-_.]+$",
    ),
]

_T = TypeVar("_T")

ChannelMap = Sequence[tuple[NonNegativeInt, _T]]
"""A contiguous, ordered mapping of channels to a value.

Each tuple represents `(start_channel, value)`, e.g.

>>> from ipaddress import IPv4Address
>>> from pydantic import TypeAdapter
>>> from ska_sdp_config.entity.flow import ChannelMap
>>> channel_map = TypeAdapter(ChannelMap[IPv4Address]).validate_python([
...     (0, '192.168.0.1'),
...     (400, '192.168.0.2')
... ])
"""
class DataQueue(EntityBaseModel):
    """Data queue flow entity, e.g. a Kafka topic."""

    kind: Literal["data-queue"] = "data-queue"

    topics: KafkaTopicName | ChannelMap[KafkaTopicName]
    """The topic names: a single name, or a channel map of names."""

    host: KafkaUrl
    """The host URL of the data queue service."""

    format: Literal["json", "npy", "npz", "msgpack_numpy"]
    """The data encoded format."""
class Display(EntityBaseModel):
    """Data display flow entity, e.g. QA Display."""

    kind: Literal["display"] = "display"

    widget: str
    """The type of display widget."""

    endpoint: AnyUrl
    """The display URL endpoint."""
class DataProduct(EntityBaseModel):
    """Data product flow entity, e.g. Measurement Set."""

    kind: Literal["data-product"] = "data-product"

    # The union is tried left to right, so a PVCPath match takes precedence
    # over the more general AnyPurePath.
    data_dir: Annotated[
        PVCPath | AnyPurePath,
        Field(
            description="Output directory of data products.",
            union_mode="left_to_right",
        ),
    ]

    paths: list[AnyPurePath]
    """Channel mapping of output data products relative to the persistent
    volume directory."""
class DataProductPersist(EntityBaseModel):
    """Data Product Persist flow entity.

    Represents an external persistent storage endpoint.
    """

    kind: Literal["data-product-persist"] = "data-product-persist"

    phase: Literal["SOLID", "LIQUID", "GAS"]
    """Level of resilience in archival heuristic."""

    expires_at: AwareDatetime | None = None
    """UTC timestamp for the data product archive to expire at.

    If `None`, no archive expiry."""
class TangoAttribute(EntityBaseModel):
    """Tango attribute flow entity.

    This entity instructs to flow data into a single Tango attribute.
    """

    kind: Literal["tango"] = "tango"

    attribute_trl: Annotated[
        AttributeTRL,
        Field(
            alias="attribute_url",
            description="""
Tango attribute TRL.
e.g. `"tango://mid-sdp/subarray/01/some_attribute"`.
""",
        ),
    ]

    dtype: TangoArgType
    """Attribute data type corresponding to a `tango.ArgType`."""

    max_dim_x: NonNegativeInt = 0
    """Size of the slowest changing dimension."""

    max_dim_y: NonNegativeInt = 0
    """Size of the fastest changing dimension."""

    default_value: TangoDataType | None = None
    """Initial attribute value, cast to the configured `dtype` to allow
    support for non-JSON Tango types.

    If ``None``, the attribute is initialized with quality `ATTR_INVALID`.
    """

    @property
    def attribute_url(self) -> TangoAttributeUrl:
        """
        Fully qualified tango attribute URL.

        e.g. `"tango://tangodb:10000/mid-sdp/subarray/01/some_attribute"`.
        """
        return TangoAttributeUrl(self.attribute_trl.url)

    @model_validator(mode="before")
    @classmethod
    def warn_deprecated(cls, values: Any) -> Any:
        """Warn for deprecations during validation.

        Emits a ``DeprecationWarning`` when the raw input mapping contains
        the ``attribute_url`` key.

        :param values: the raw, not yet validated input. A "before"
            validator may receive any object here, not only a mapping.
        :returns: ``values`` unchanged, so validation proceeds normally.
        """
        # Imported locally so this block does not rely on a module-level
        # import that the file does not currently have.
        from collections.abc import Mapping

        # Only mappings can carry the deprecated key. Testing membership on
        # arbitrary input would raise TypeError for non-iterables (hiding the
        # regular validation error) and substring-match on plain strings.
        if isinstance(values, Mapping) and "attribute_url" in values:
            warnings.warn(
                "Field 'attribute_url' is deprecated. "
                "Use 'attribute_trl' instead.",
                DeprecationWarning,
                stacklevel=2,
            )
        return values
class TangoAttributeMap(EntityBaseModel):
    """Tango attribute map flow entity.

    This entity instructs to flow data into multiple Tango attributes.
    """

    class DataQuery(EntityBaseModel):
        """Decomposed JMESPath query for locating data within a data model."""

        when: str | None = None
        """JMESPath when query clause on the flow data model."""

        select: str = "@"
        """JMESPath select query clause on the flow data model."""

    kind: Literal["tangomap"] = "tangomap"

    attributes: list[tuple[TangoAttribute, DataQuery]]
    """
    Mapping of TangoAttributes with unique attribute URL to a data query.
    """

    def model_post_init(self, context: Any, /) -> None:
        """Check that no attribute TRL occurs more than once.

        :param context: post-init context; not used by this check.
        :raises ValueError: if two entries of ``attributes`` share the same
            ``attribute_trl``.
        """
        # Duplicates collapse in the set, so a size mismatch reveals them.
        distinct_trls = {pair[0].attribute_trl for pair in self.attributes}
        if len(distinct_trls) == len(self.attributes):
            return
        raise ValueError(
            f"attribute_trls not unique for {self.attributes}"
        )
class FlowSource(EntityBaseModel):
    """A generic flow source.

    It points to either:

    - an existing flow entry in the configuration database
    - external source, such as a tango attribute
    - an external url
    """

    # Union members are tried in the order written (left to right).
    uri: Annotated[
        Flow.Key | AttributeTRL | AnyTRL | AnyUrl,
        Field(
            description="""
Reference to the data source:
- Flow.Key: key of a data flow entry
- TangoAttributeUrl: tango attribute reference
- AnyURL: url of any other kind
""",
            union_mode="left_to_right",
        ),
    ]

    function: str | None = None
    """
    Python function, class or OCI image that will flow data from
    source to sink
    """

    parameters: dict | None = None
    """Function, class or OCI image parameters"""
class Flow(MultiEntityBaseModel):
    """Flow datastream entity.

    Contains configuration information relating to streaming data connections
    related to a processing job for SDP. Flows are associated with and persist
    for the lifetime of a ProcessingBlock.
    """

    class Key(EntityKeyBaseModel):
        """An SDP Flow primary key."""

        pb_id: ProcessingBlockId
        """The ID of the Processing Block that this flow belongs to."""

        kind: Annotated[
            str | None,
            Field(
                pattern=r"^[A-Za-z0-9-]{1,96}$",
                description="The kind of sink this flow moves data to.",
            ),
        ] = None

        name: Annotated[
            str,
            Field(
                pattern=r"^[A-Za-z0-9-]{1,96}$",
                description="The name of this flow.",
            ),
        ]

    key: Annotated[Key, Field(exclude=True)]

    # The concrete sink model is selected by the value of its `kind` field.
    sink: Annotated[
        SharedMem
        | DataQueue
        | Display
        | DataProduct
        | DataProductPersist
        | TangoAttribute
        | TangoAttributeMap
        | SpeadStream,
        Field(
            discriminator="kind",
            description="The kind of sink this flow moves data to.",
        ),
    ]

    sources: list[FlowSource]
    """A list containing sources of flow data."""

    data_model: Annotated[
        str,
        Field(
            pattern=r"^[A-Za-z0-9\,\[\]]+$",
            description="The DataModel that the flow data represents.",
        ),
    ]

    expiry_time: int = -1
    """
    The amount of time in seconds to wait to expire. Negative numbers mean
    never expire.
    """

    def model_post_init(self, context: Any, /) -> None:
        """Reconcile the key's kind with the sink's kind.

        If the key carries no kind, it is filled in from the sink.
        Otherwise the two must agree.

        :param context: post-init context, forwarded to the parent class.
        :raises ValueError: if the key's kind is set and differs from the
            sink's kind.
        """
        super().model_post_init(context)
        if self.key.kind is None:
            self.key.kind = self.sink.kind
        elif self.key.kind != self.sink.kind:
            # Raised explicitly rather than asserted: assert statements are
            # removed when Python runs with -O, which would silently accept
            # an inconsistent key.
            raise ValueError(
                f"Flow key kind {self.key.kind!r} does not match "
                f"sink kind {self.sink.kind!r}"
            )
class Dependency(MultiEntityBaseModel):
    """
    Dependencies are used to lock flows,
    preventing deletion until the dependency is cleared.
    """

    class Key(Flow.Key):
        """A Dependency primary key."""

        kind: Annotated[
            str,
            Field(
                pattern=r"^[A-Za-z0-9-]{1,96}$",
                description="""
The kind of sink this flow moves data to.
In flow, this is post-initialized with sink kind.
provided.""",
            ),
        ]

        # The pattern is anchored at both ends, like the other key fields.
        # Without the trailing `$` only a prefix would be checked, so the
        # character class and the 96-character limit would not be enforced.
        origin: Annotated[
            str,
            Field(
                pattern=r"^[A-Za-z0-9-]{1,96}$",
                description="""
The name of the origin component that issued this lock.
This could also be a processing block.""",
            ),
        ]

    key: Key
    """The reservation key."""

    expiry_time: Annotated[int, Ge(-1)]
    """After this time the dependency should be released.

    The time is specified in seconds. -1 == infinity."""

    description: str | None = None
    """A free form description."""