Source code for ska_sdp_config.entity.common.kafka_url

"""DSN (Data Service Name) type definitions."""

from __future__ import annotations

from typing import Any

from pydantic import GetCoreSchemaHandler, GetJsonSchemaHandler
from pydantic.json_schema import JsonSchemaValue
from pydantic_core import Url, core_schema


[docs] class KafkaUrl(Url): """Kafka client specific URL for connecting to bootstrap servers. Supports initialization using either a URL or bootstrap address. >>> from ska_sdp_config.entity.common import KafkaUrl >>> from pydantic import TypeAdapter >>> assert TypeAdapter(KafkaUrl).validate_python( ... "kafka://localhost:9092" ... ).bootstrap_address == "localhost:9092" >>> assert TypeAdapter(KafkaUrl).validate_python( ... "localhost:9092" ... ).bootstrap_address == "localhost:9092" """ def __new__(cls, uri: str): return super().__new__(cls, uri) @property def bootstrap_address(self): """Bootstrap address format used by python Consumer and Producers.""" return f"{self.host}:{self.port}" @classmethod def __validate(cls, dsn: Any) -> KafkaUrl: if not isinstance(dsn, KafkaUrl): dsn = KafkaUrl(str(dsn)) assert ( dsn.username is None ), f"username not supported in kafka url: {dsn.username}" assert ( dsn.password is None ), f"password not supported in kafka url: {dsn.password}" assert dsn.path is None, f"path not supported in kafka url: {dsn.path}" assert ( dsn.query is None ), f"query not supported in kafka url: {dsn.query}" assert ( dsn.fragment is None ), f"fragment not supported in kafka url: {dsn.fragment}" return dsn @classmethod def __url_conversion(cls, uri: str) -> str: return "kafka://" + uri @classmethod def __get_pydantic_core_schema__( cls, _source_type: Any, handler: GetCoreSchemaHandler ) -> core_schema.CoreSchema: schema = core_schema.union_schema( [ core_schema.chain_schema( [ core_schema.url_schema( host_required=True, default_port=9092, allowed_schemes=["kafka"], ), core_schema.no_info_plain_validator_function( cls.__validate ), ] ), # for backwards compatibility, try prepending a scheme core_schema.chain_schema( [ core_schema.str_schema(), core_schema.no_info_plain_validator_function( cls.__url_conversion ), core_schema.url_schema( host_required=True, default_port=9092, allowed_schemes=["kafka"], ), core_schema.no_info_plain_validator_function( cls.__validate ), ] ), ], serialization=core_schema.to_string_ser_schema(), ) return handler(schema) @classmethod def __get_pydantic_json_schema__( cls, _core_schema: core_schema.CoreSchema, _handler: GetJsonSchemaHandler, ) -> JsonSchemaValue: return { "format": "uri", "minLength": 1, "type": "string", }