"""Tango TRL definitions for SDP config entity models."""
from __future__ import annotations
import os
import re
from typing import Any, Literal
from pydantic import (
AnyUrl,
GetCoreSchemaHandler,
GetJsonSchemaHandler,
TypeAdapter,
UrlConstraints,
)
from pydantic.json_schema import JsonSchemaValue
from pydantic_core import InitErrorDetails, Url, ValidationError, core_schema
from typing_extensions import Self
TRL_RE = re.compile(
# optional [scheme://]
r"^(?:(?P<scheme>[A-Za-z][A-Za-z0-9+.-]*)://)?"
# optional [host:port/]
r"(?:(?P<host>[A-Za-z0-9.-]+):(?P<port>\d+)/)?"
# required 3-part device
r"(?P<device>[A-Za-z0-9_.-]+/[A-Za-z0-9_.-]+/[A-Za-z0-9_.-]+)"
# optional [/attribute]
r"(?:/(?P<attribute>[A-Za-z0-9_.-]+))?"
# optional [->property] or [-%3Eproperty]
r"(?:(?:->|-%3E)(?P<property>[A-Za-z0-9_.-]+))?"
# optional #[dbase=yes|no]
r"(?:#dbase=(?P<dbase>yes|no))?$"
)
TangoDataType = tuple[str, bytes] | int | float | bool | str
"""Supported tango attribute value types."""
TangoArgType = Literal[
"DevEncoded",
"DevString",
"DevBoolean",
"DevUChar",
"DevUShort",
"DevULong",
"DevULong64",
"DevShort",
"DevLong",
"DevLong64",
"DevFloat",
"DevDouble",
]
"""String literal corresponding to a tango.ArgType value."""
[docs]
class TRL:
"""Tango Resource Locator (TRL) of the form:
``[protocol://][host:port/]device-name[/attribute][->property][#dbase=xx]``
For more information see:
https://tango-controls.readthedocs.io/projects/rfc/en/latest/16/TangoResourceLocator.html
""" # noqa: E501
_trl_matches: re.Match[str]
def __init__(self, value: Any):
self._value = str(value)
matches = TRL_RE.match(self._value)
if matches is None:
raise ValidationError.from_exception_data(
"TRL",
[
InitErrorDetails(
type="value_error",
loc=("TRL",),
input=value,
ctx={
"error": "No TRL pattern match for "
f"{TRL_RE.pattern}"
},
)
],
)
if matches["port"] and (
int(matches["port"]) < 1 or 65535 < int(matches["port"])
):
raise ValidationError.from_exception_data(
"TRL",
[
InitErrorDetails(
type="value_error",
loc=("TRL",),
input=value,
ctx={"error": "Invalid port number"},
)
],
)
self._trl_matches = matches
@property
def scheme(self) -> str | None:
"""The scheme part of the TRL, or `None`."""
return self._trl_matches["scheme"]
@property
def host(self) -> str | None:
"""The host part of the TRL, or `None`."""
return self._trl_matches["host"]
@property
def port(self) -> int | None:
"""The port part of the TRL, or `None`."""
return (
int(self._trl_matches["port"])
if self._trl_matches["port"]
else None
)
@property
def device_name(self) -> str:
"""The Tango device name part of the TRL."""
return self._trl_matches["device"]
@property
def attribute_name(self) -> str | None:
"""The Tango attribute name part of the TRL, or `None`."""
return self._trl_matches["attribute"]
@property
def property_name(self) -> str | None:
"""Property part of the TRL, or `None`."""
return self._trl_matches["property"]
@property
def dbase(self) -> bool:
"""The using database flag. `True` for 'dbase=yes'."""
return self._trl_matches["dbase"] != "no"
@property
def eval(self) -> TRL:
"""TRL with all defaults evaluated (i.e TANGO_HOST, #dbase)."""
# use url constraints for default host and port
url = self.url
host_port = (
f"{url.host}:{str(url.port)}/" if url.host and self.dbase else None
)
attribute = f"/{self.attribute_name}" if self.attribute_name else None
prop = f"->{self.property_name}" if self.property_name else None
fragment = f"#dbase={'yes' if self.dbase else 'no'}"
return TRL(
f"tango://{host_port or ''}{self.device_name}"
f"{attribute or ''}{prop or ''}{fragment}"
)
@property
def url(self) -> Url:
"""The Uniform Resource Locator (URL) with all defaults evaluated
(i.e TANGO_HOST, dbase).
Is RFC 3986 compliant for wider library and tool compatibility."""
path = self.device_name
if self.attribute_name:
path = f"{path}/{self.attribute_name}"
if self.property_name:
path = f"{path}->{self.property_name}"
class _TangoUrl(AnyUrl):
_constraints = self._make_constraints()
return _TangoUrl.build( # pylint: disable=protected-access
scheme=self.scheme if self.scheme is not None else "tango",
host=self.host if self.host and self.dbase else "",
port=self.port,
path=path,
fragment="dbase=yes" if self.dbase else "dbase=no",
)._url
def __str__(self) -> str:
return self._value
def __repr__(self) -> str:
return f"{self.__class__.__name__}({str(self._value)!r})"
def __deepcopy__(self, memo: dict) -> Self:
return self.__class__(self._value)
def __eq__(self, other: Any) -> bool:
return (
self.__class__ is other.__class__ and self._value == other._value
)
def __lt__(self, other: Any) -> bool:
return self.__class__ is other.__class__ and self._value < other._value
def __hash__(self) -> int:
return hash(self._value)
def __len__(self) -> int:
return len(str(self._value))
def _make_constraints(self) -> UrlConstraints:
env = os.environ.get("TANGO_HOST", ":")
host, _, port = env.partition(":")
return UrlConstraints(
allowed_schemes=["tango"],
default_host=host if host and self.dbase else "",
default_port=int(port) if port and self.dbase else None,
host_required=False,
)
[docs]
class AnyTRL:
"""Base type for all TRL models."""
_trl: TRL
def __init__(self, trl: str | TRL | AnyTRL) -> None:
self._trl = TypeAdapter(self.__class__).validate_python(trl)._trl
@property
def scheme(self) -> str | None:
"""The scheme part of the TRL, or `None`."""
return self._trl.scheme
@property
def host(self) -> str | None:
"""The host part of the TRL, or `None`."""
return self._trl.host
@property
def port(self) -> int | None:
"""The port part of the TRL, or `None`."""
return self._trl.port
@property
def device_name(self) -> str:
"""The Tango device name part of the TRL."""
return self._trl.device_name
@property
def attribute_name(self) -> str | None:
"""The Tango attribute name part of the TRL, or `None`."""
return self._trl.attribute_name
@property
def property_name(self) -> str | None:
"""Property part of the TRL, or `None`."""
return self._trl.property_name
@property
def dbase(self) -> bool:
"""The using database flag. `True` for 'dbase=yes'."""
return self._trl.dbase
@property
def url(self) -> AnyUrl:
"""The Uniform Resource Locator (URL) with all defaults evaluated
(i.e TANGO_HOST, dbase).
Is RFC 3986 compliant for wider library and tool compatibility."""
return AnyUrl(self._trl.url)
@property
def eval(self) -> TRL:
"""TRL with all defaults evaluated (i.e TANGO_HOST, #dbase)."""
return self._trl.eval
def __str__(self) -> str:
return str(self._trl)
def __repr__(self) -> str:
return f"{self.__class__.__name__}({str(self._trl)!r})"
def __deepcopy__(self, memo: dict) -> Self:
return self.__class__(self._trl)
def __eq__(self, other: Any) -> bool:
return self.__class__ is other.__class__ and self._trl == other._trl
def __lt__(self, other: Any) -> bool:
return self.__class__ is other.__class__ and self._trl < other._trl
def __hash__(self) -> int:
return hash(self._trl)
def __len__(self) -> int:
return len(str(self._trl))
@classmethod
def __get_pydantic_core_schema__(
cls, source_type: type[AnyTRL], _handler: GetCoreSchemaHandler
) -> core_schema.CoreSchema:
def validate(value: str | AnyUrl | AnyTRL, handler) -> AnyTRL:
instance = source_type.__new__(source_type)
instance._trl = handler( # pylint: disable=protected-access
TRL(str(value))
)
return instance
return core_schema.no_info_wrap_validator_function(
validate,
schema=core_schema.union_schema(
[
core_schema.str_schema(pattern=TRL_RE.pattern),
core_schema.is_instance_schema(TRL),
]
),
serialization=core_schema.plain_serializer_function_ser_schema(
cls.__str__
),
)
PYTHON_TO_JSON_RE = re.compile(r"\?P<[^>]+>")
"""Python keyword matched regex not supported in json schema."""
[docs]
class AttributeTRL(AnyTRL):
"""Base type for a TRL to a Tango Attribute."""
@classmethod
def __get_pydantic_core_schema__(
cls, source_type: Any, base_handler: GetCoreSchemaHandler
) -> core_schema.CoreSchema:
def validate_trl(
value: TRL | str,
handler: core_schema.ValidatorFunctionWrapHandler,
) -> AttributeTRL:
trl = TRL(handler(str(value)))
if trl.attribute_name is None:
raise ValueError("Attribute missing")
if trl.property_name is not None:
raise ValueError("Property not supported")
instance = source_type.__new__(source_type)
instance._trl = trl # pylint: disable=protected-access
return instance
return core_schema.no_info_wrap_validator_function(
validate_trl, schema=base_handler.generate_schema(AnyTRL)
)
@classmethod
def __get_pydantic_json_schema__(
cls,
_core_schema: core_schema.CoreSchema,
_handler: GetJsonSchemaHandler,
) -> JsonSchemaValue:
return {
"format": "uri",
"minLength": 1,
"type": "string",
"pattern": PYTHON_TO_JSON_RE.sub("", TRL_RE.pattern),
}