Entities
Common
Common type definitions for SDP config entity models.
- class ska_sdp_config.entity.common.AnyTRL(trl: str | TRL | AnyTRL)[source]
Base type for all TRL models.
- property attribute_name: str | None
The Tango attribute name part of the TRL, or None.
- property dbase: bool
The using database flag. True for ‘dbase=yes’.
- property device_name: str
The Tango device name part of the TRL.
- property host: str | None
The host part of the TRL, or None.
- property port: int | None
The port part of the TRL, or None.
- property property_name: str | None
Property part of the TRL, or None.
- property scheme: str | None
The scheme part of the TRL, or None.
- property url: AnyUrl
The Uniform Resource Locator (URL) with all defaults evaluated (i.e TANGO_HOST, dbase).
Is RFC 3986 compliant for wider library and tool compatibility.
- class ska_sdp_config.entity.common.AttributeTRL(trl: str | TRL | AnyTRL)[source]
Base type for a TRL to a Tango Attribute.
- class ska_sdp_config.entity.common.KafkaUrl(uri: str)[source]
Kafka client specific URL for connecting to bootstrap servers.
Supports initialization using either a URL or bootstrap address.
>>> from ska_sdp_config.entity.common import KafkaUrl >>> from pydantic import TypeAdapter >>> assert TypeAdapter(KafkaUrl).validate_python( ... "kafka://localhost:9092" ... ).bootstrap_address == "localhost:9092" >>> assert TypeAdapter(KafkaUrl).validate_python( ... "localhost:9092" ... ).bootstrap_address == "localhost:9092"
- property bootstrap_address
Bootstrap address format used by python Consumer and Producers.
- class ska_sdp_config.entity.common.PVCPath(*, k8s_namespaces: str | Sequence[str], k8s_pvc_name: str, pvc_mount_path: Annotated[PurePath, PlainValidator(func=_validate_pure_path, json_schema_input_type=str), PlainSerializer(func=str, return_type=str, when_used=always), WrapValidator(func=_validate_is_absolute, json_schema_input_type=str), PlainSerializer(func=str, return_type=str, when_used=always)], pvc_subpath: Annotated[PurePath, PlainValidator(func=_validate_pure_path, json_schema_input_type=str), PlainSerializer(func=str, return_type=str, when_used=always), WrapValidator(func=_validate_is_relative, json_schema_input_type=str), PlainSerializer(func=str, return_type=str, when_used=always)])[source]
Mounted path within a Kubernetes Persistent Volume Claim.
- k8s_namespaces: str | Sequence[str]
Kuberentes pvc namespaces.
- k8s_pvc_name: str
Kubernetes persistent volume claim name.
- model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': False, 'populate_by_name': True, 'serialize_by_alias': True, 'strict': True, 'validate_assignment': True, 'validate_by_alias': True, 'validate_by_name': True}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- property path: PlainSerializer(func=str, return_type=str, when_used=always), FieldInfo(annotation=NoneType, required=True, json_schema_extra={'format': 'path', 'type': 'string'})]
The container mounted absolute path to subpath.
- pvc_mount_path: PlainSerializer(func=str, return_type=str, when_used=always), FieldInfo(annotation=NoneType, required=True, json_schema_extra={'format': 'path', 'type': 'string'})]
Mount path of the persistent volume claim.
- pvc_subpath: PlainSerializer(func=str, return_type=str, when_used=always), FieldInfo(annotation=NoneType, required=True, json_schema_extra={'format': 'path', 'type': 'string'})]
Subpath within the persistent volume claim.
- class ska_sdp_config.entity.common.TRL(value: Any)[source]
Tango Resource Locator (TRL) of the form:
[protocol://][host:port/]device-name[/attribute][->property][#dbase=xx]For more information see: https://tango-controls.readthedocs.io/projects/rfc/en/latest/16/TangoResourceLocator.html
- property attribute_name: str | None
The Tango attribute name part of the TRL, or None.
- property dbase: bool
The using database flag. True for ‘dbase=yes’.
- property device_name: str
The Tango device name part of the TRL.
- property host: str | None
The host part of the TRL, or None.
- property port: int | None
The port part of the TRL, or None.
- property property_name: str | None
Property part of the TRL, or None.
- property scheme: str | None
The scheme part of the TRL, or None.
- property url: Url
The Uniform Resource Locator (URL) with all defaults evaluated (i.e TANGO_HOST, dbase).
Is RFC 3986 compliant for wider library and tool compatibility.
- class ska_sdp_config.entity.common.TangoAttributeUrl(url: str | Url | _BaseUrl)[source]
Tango URL to an attribute on a device instance.
Supported input expressions are of the form:
tango://<host>[:<port>]/<device_domain>/<device_family>/<device_member>/<device_attribute>[#dbase=yes|no]Defaults: - port=10000 - dbase=yes
Pre-conditions: - host required if dbase=yes
For more information see: https://tango-controls.readthedocs.io/en/9.2.5/manual/C-naming.html
- classmethod validate_url(value: AnyUrl | TRL | str, handler: ValidatorFunctionWrapHandler) TangoAttributeUrl[source]
Wrap validate a URL string or TRL instance to a Tango attribute URL.
- class ska_sdp_config.entity.common.TangoUrl(url: str | Url | _BaseUrl)[source]
Tango URL compliant to RFC 3986 parsers.
Supported input expressions are of the form:
tango://<host>[:<port>]/<device_domain>/<device_family>/<device_member>/[<device_attribute>][#dbase=yes|no]Defaults: - port=10000 - dbase=yes
Pre-conditions: - host required if dbase=yes
Additionally supports construction from a TRL.
For more info see: https://tango-controls.readthedocs.io/en/9.2.5/manual/C-naming.html
- property attribute_name: str
The device name part of the URL.
- property dbase: bool
The using database flag. True for ‘dbase=yes’.
- property device_name: str
The Tango device name part of the TRL.
- property domain_name: str
The device family part of the URL.
- property family_name: str
The device family part of the URL.
- property member_name: str
The device member part of the URL.
- property property_name: str
The Tango attribute name part of the TRL, or None.
Owner
Deployment configuration entities.
- class ska_sdp_config.entity.owner.Owner(*, pid: int, hostname: str, command: list[str])[source]
Owner information for an entity, uniquely identifying the process owning the entity on a distributed system.
- command: list[str]
The command line used to start the process
- hostname: str
The name of the host where the process is running
- pid: int
The process ID of the entity’s owner
System
System config model entities
- class ska_sdp_config.entity.system.System(*, version: Annotated[str, _PydanticGeneralMetadata(pattern='^[a-zA-Z0-9_\\.-]+$')], components: dict[str, SystemComponent], dependencies: dict[str, SystemDependency])[source]
Configuration details for SDP including components, dependencies and their versions.
- components: dict[str, SystemComponent]
The components comprising SDP.
- dependencies: dict[str, SystemDependency]
The dependencies of SDP.
- version: Annotated[str, FieldInfo(annotation=NoneType, required=True, metadata=[_PydanticGeneralMetadata(pattern='^[a-zA-Z0-9_\\.-]+$')])]
The version of SDP.
- class ska_sdp_config.entity.system.SystemComponent(*, devicename: Annotated[str | None, _PydanticGeneralMetadata(pattern='^[^/]+/[^/]+/[^/]+$')] = None, image: Annotated[str, _PydanticGeneralMetadata(pattern='^[a-zA-Z0-9_:\\./-]+$')], version: Annotated[str, _PydanticGeneralMetadata(pattern='^[a-zA-Z0-9_\\.-]+$')])[source]
SDP component entity e.g. lmc-controller.
- devicename: str | None
The tango device name of this component.
- image: Annotated[str, FieldInfo(annotation=NoneType, required=True, metadata=[_PydanticGeneralMetadata(pattern='^[a-zA-Z0-9_:\\./-]+$')])]
The image used to launch this component.
- version: Annotated[str, FieldInfo(annotation=NoneType, required=True, metadata=[_PydanticGeneralMetadata(pattern='^[a-zA-Z0-9_\\.-]+$')])]
The version of this component.
- class ska_sdp_config.entity.system.SystemDependency(*, repository: Annotated[Url, UrlConstraints(max_length=None, allowed_schemes=['https', 'file'], host_required=None, default_host=None, default_port=None, default_path=None, preserve_empty_path=None)] | Annotated[str, FieldInfo(annotation=NoneType, required=True, metadata=[_PydanticGeneralMetadata(pattern='(alias:|\\@)\\w([-_]?\\w){0,}$')])], version: Annotated[str, _PydanticGeneralMetadata(pattern='^[a-zA-Z0-9_\\.-]+$')])[source]
SDP dependency entity e.g. ska-tango-base.
- repository: Annotated[Url, UrlConstraints(max_length=None, allowed_schemes=['https', 'file'], host_required=None, default_host=None, default_port=None, default_path=None, preserve_empty_path=None)] | Annotated[str, FieldInfo(annotation=NoneType, required=True, metadata=[_PydanticGeneralMetadata(pattern='(alias:|\\@)\\w([-_]?\\w){0,}$')])]
The repository containing this dependency.
- version: Annotated[str, FieldInfo(annotation=NoneType, required=True, metadata=[_PydanticGeneralMetadata(pattern='^[a-zA-Z0-9_\\.-]+$')])]
The version of this dependency.
Execution Block
Execution block configuration entities.
- class ska_sdp_config.entity.eb.ExecutionBlock(*, key: ~typing.Annotated[str, _PydanticGeneralMetadata(pattern='^eb\\-[a-z0-9]+\\-[0-9]{8}\\-[a-z0-9]+$')], beams: list = <factory>, channels: list = <factory>, context: dict = <factory>, fields: list = <factory>, max_length: float | None = None, pb_batch: list[~typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, metadata=[_PydanticGeneralMetadata(pattern='^pb\\-[a-z0-9]+\\-[0-9]{8}\\-[a-z0-9]+$')])]] = <factory>, pb_realtime: list[~typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, metadata=[_PydanticGeneralMetadata(pattern='^pb\\-[a-z0-9]+\\-[0-9]{8}\\-[a-z0-9]+$')])]] = <factory>, polarisations: list = <factory>, resources: dict = <factory>, scan_types: list = <factory>, subarray_id: ~types.Annotated[str | None, _PydanticGeneralMetadata(pattern='^[0-9]+$')] = None, telmodel: ~ska_sdp_config.entity.eb.TelModel | None = None)[source]
Execution block entity.
Collects configuration information relating to an execution block for the SDP.
- beams: Annotated[list, FieldInfo(annotation=NoneType, required=False, default_factory=list)]
Beam parameters for the purpose of the Science Data Processor
- channels: Annotated[list, FieldInfo(annotation=NoneType, required=False, default_factory=list)]
Channels. This is basically a list of spectral windows per channel configuration.
- context: Annotated[dict, FieldInfo(annotation=NoneType, required=False, default_factory=dict)]
Free-form information from OET to describe the observing context
- fields: Annotated[list, FieldInfo(annotation=NoneType, required=False, default_factory=list)]
List of fields/targets
- key: Annotated[str, FieldInfo(annotation=NoneType, required=True, metadata=[_PydanticGeneralMetadata(pattern='^eb\\-[a-z0-9]+\\-[0-9]{8}\\-[a-z0-9]+$')])]
ID of the Execution Block
- max_length: float | None
Maximum duration of the execution block in seconds.
- pb_batch: Annotated[list[Annotated[str, FieldInfo(annotation=NoneType, required=True, metadata=[_PydanticGeneralMetadata(pattern='^pb\\-[a-z0-9]+\\-[0-9]{8}\\-[a-z0-9]+$')])]], FieldInfo(annotation=NoneType, required=False, default_factory=list)]
Batch processing blocks.
- pb_realtime: Annotated[list[Annotated[str, FieldInfo(annotation=NoneType, required=True, metadata=[_PydanticGeneralMetadata(pattern='^pb\\-[a-z0-9]+\\-[0-9]{8}\\-[a-z0-9]+$')])]], FieldInfo(annotation=NoneType, required=False, default_factory=list)]
Real-time processing blocks.
- polarisations: Annotated[list, FieldInfo(annotation=NoneType, required=False, default_factory=list)]
Polarisation definitions
- resources: Annotated[dict, FieldInfo(annotation=NoneType, required=False, default_factory=dict)]
Free-form Resources allocated to the execution block.
- scan_types: Annotated[list, FieldInfo(annotation=NoneType, required=False, default_factory=list)]
Scan types. Associates scans with per-beam fields & channel configurations
- subarray_id: Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, metadata=[_PydanticGeneralMetadata(pattern='^[0-9]+$')])]
Subarray with which this EB is associated.
- class ska_sdp_config.entity.eb.TelModel(*, source_uris: list[str], array_layout_path: str, station_data_path_prefix: str | None)[source]
Telescope model data.
- array_layout_path: str
Path of array layout and static delays in telescope model.
- source_uris: list[str]
List of source URIs to use to look up telescope model data.
- station_data_path_prefix: str | None
Prefix of paths to station data in telescope model.
Processing Block
Processing block configuration entities.
- class ska_sdp_config.entity.pb.FlowDependency(*, purpose: list[str], flow_key: Key)[source]
A flow dependency
- purpose: list[str]
what the flow is to be used for (e.g. calibration)
- class ska_sdp_config.entity.pb.PBDependency(*, kind: list[str], pb_id: Annotated[str, _PydanticGeneralMetadata(pattern='^pb\\-[a-z0-9]+\\-[0-9]{8}\\-[a-z0-9]+$')])[source]
A Processing Block dependency.
- kind: list[str]
How this dependency is meant to be interpreted.
- pb_id: Annotated[str, FieldInfo(annotation=NoneType, required=True, metadata=[_PydanticGeneralMetadata(pattern='^pb\\-[a-z0-9]+\\-[0-9]{8}\\-[a-z0-9]+$')])]
The ID of the Processing Block that is depended on.
- class ska_sdp_config.entity.pb.ProcessingBlock(*, key: ~typing.Annotated[str, _PydanticGeneralMetadata(pattern='^pb\\-[a-z0-9]+\\-[0-9]{8}\\-[a-z0-9]+$')], eb_id: ~typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, metadata=[_PydanticGeneralMetadata(pattern='^eb\\-[a-z0-9]+\\-[0-9]{8}\\-[a-z0-9]+$')])] | None, script: ~ska_sdp_config.entity.script.Script.Key, parameters: dict = <factory>, dependencies: list[~ska_sdp_config.entity.pb.FlowDependency | ~ska_sdp_config.entity.pb.PBDependency] = <factory>)[source]
Processing block entity.
Collects configuration information relating to a processing job for the SDP. This might be either real-time (supporting a running observation) or batch (to process data after the fact).
Actual execution of processing steps will be performed by a (parameterised) processing script interpreting processing block information.
- dependencies: Annotated[list[FlowDependency | PBDependency], FieldInfo(annotation=NoneType, required=False, default_factory=list)]
Dependencies of this Processing Block on other Processing Blocks and Flows.
- eb_id: Annotated[str, FieldInfo(annotation=NoneType, required=True, metadata=[_PydanticGeneralMetadata(pattern='^eb\\-[a-z0-9]+\\-[0-9]{8}\\-[a-z0-9]+$')])] | None
ID of the Execution Block associated with this Processing Block, if any.
- key: Annotated[str, FieldInfo(annotation=NoneType, required=True, metadata=[_PydanticGeneralMetadata(pattern='^pb\\-[a-z0-9]+\\-[0-9]{8}\\-[a-z0-9]+$')])]
The primary key to this entity.
- parameters: Annotated[dict, FieldInfo(annotation=NoneType, required=False, default_factory=dict)]
Parameters for the Processing Block.
Deployment
Deployment configuration entities.
- class ska_sdp_config.entity.deployment.Deployment(*, key: Annotated[str, _PydanticGeneralMetadata(pattern='^[A-Za-z0-9\\-]{1,96}$')], kind: Literal['helm', 'slurm'], args: dict)[source]
Deployment entity.
Collects configuration information relating to a cluster configuration change.
- args: dict
A dictionary of values used to customise the deployment. In the case of helm deployments, these are Helm values. For slurm deployment these are used to construct a Slurm script.
- key: Annotated[str, FieldInfo(annotation=NoneType, required=True, metadata=[_PydanticGeneralMetadata(pattern='^[A-Za-z0-9\\-]{1,96}$')])]
The primary key to this entity.
- kind: Literal['helm', 'slurm']
The kind of deployment, currently “helm” and “slurm” are supported.
Script
Script model.
- class ska_sdp_config.entity.script.Script(*, key: ~ska_sdp_config.entity.script.Script.Key, image: ~typing.Annotated[str, _PydanticGeneralMetadata(pattern='^[a-zA-Z0-9_:\\./-]+$')], parameters: dict = <factory>, sdp_version: ~types.Annotated[str | None, _PydanticGeneralMetadata(pattern='^(?:[><]=?|==)\\d+\\.\\d+\\.\\d+(?:,\\s*(?:[><]=?|==)\\d+\\.\\d+\\.\\d+)*$')] = None, resources: list[~ska_sdp_config.entity.script.ScriptResources] = [])[source]
An SDP Script.
- class Key(*, kind: Annotated[str, _PydanticGeneralMetadata(pattern='^(realtime)|(batch)$')], name: Annotated[str, _PydanticGeneralMetadata(pattern='^[a-zA-Z0-9_-]+$')], version: Annotated[str, _PydanticGeneralMetadata(pattern='^[a-zA-Z0-9_\\.-]+$')])[source]
An SDP Script primary key.
- kind: Annotated[str, FieldInfo(annotation=NoneType, required=True, metadata=[_PydanticGeneralMetadata(pattern='^(realtime)|(batch)$')])]
The kind of this script (realtime or batch).
- name: Annotated[str, FieldInfo(annotation=NoneType, required=True, metadata=[_PydanticGeneralMetadata(pattern='^[a-zA-Z0-9_-]+$')])]
The name of this script.
- version: Annotated[str, FieldInfo(annotation=NoneType, required=True, metadata=[_PydanticGeneralMetadata(pattern='^[a-zA-Z0-9_\\.-]+$')])]
The version of this script.
- image: Annotated[str, FieldInfo(annotation=NoneType, required=True, metadata=[_PydanticGeneralMetadata(pattern='^[a-zA-Z0-9_:\\./-]+$')])]
The OCI image used to launch this script.
- key: Annotated[Key, FieldInfo(annotation=NoneType, required=True, exclude=True)]
The primary key to this entity.
- parameters: Annotated[dict, FieldInfo(annotation=NoneType, required=False, default_factory=dict)]
Parameters for the script.
- resources: list[ScriptResources]
- sdp_version: Annotated[str | None, FieldInfo(annotation=NoneType, required=True, metadata=[_PydanticGeneralMetadata(pattern='^(?:[><]=?|==)\\d+\\.\\d+\\.\\d+(?:,\\s*(?:[><]=?|==)\\d+\\.\\d+\\.\\d+)*$')])]
The range of SDP versions this script is compatible with.
- class ska_sdp_config.entity.script.ScriptResources(*, kind: Annotated[Literal['performance-buffer-storage', 'capacity-buffer-storage', 'processing-node'], _PydanticGeneralMetadata(pattern='^[a-z-]{1,96}$')], name: Annotated[str, _PydanticGeneralMetadata(pattern='^[a-zA-Z0-9_-]+$')], phases: list[str] = [], quantity: str)[source]
The Resources estimated to be required by an SDP Script.
- kind: Annotated[Literal['performance-buffer-storage', 'capacity-buffer-storage', 'processing-node'], FieldInfo(annotation=NoneType, required=True, metadata=[_PydanticGeneralMetadata(pattern='^[a-z-]{1,96}$')])]
The kind of resource.
- name: Annotated[str, FieldInfo(annotation=NoneType, required=True, metadata=[_PydanticGeneralMetadata(pattern='^[a-zA-Z0-9_-]+$')])]
The name of the resource definition.
- phases: list[str]
The phases that this applies to.
- quantity: str
The quantity as a python parsable expression.
Flow
Flow entity Model and related sub-entities.
- ska_sdp_config.entity.flow.ChannelAddressMapping
A contiguous, ordered mapping of observation channels to receivers.
dict[start_channel, (receiver_id, port_start, port_increment)), e.g.
>>> from pydantic import TypeAdapter >>> from ska_sdp_config.entity.flow import ChannelAddressMapping >>> channel_map = TypeAdapter(ChannelAddressMapping).validate_python({ ... 0: (0, 8000, 1), ... 400: (1, 8000, 1), ... 800: (2, 8000, 1), ... })
- ska_sdp_config.entity.flow.ChannelMap
A contiguous, ordered mapping of channels to a value. Tuples represent (start_channel, value), e.g.
>>> from ipaddress import IPv4Address >>> from pydantic import TypeAdapter >>> from ska_sdp_config.entity.flow import ChannelMap >>> channel_map = TypeAdapter(ChannelMap[IPv4Address]).validate_python([ ... (0, '192.168.0.1'), ... (400, '192.168.0.2') ... ])
alias of
Sequence[tuple[Annotated[int,Ge(ge=0)],_T]]
- class ska_sdp_config.entity.flow.DataProduct(*, kind: Literal['data-product'] = 'data-product', data_dir: PlainSerializer(func=str, return_type=str, when_used=always), FieldInfo(annotation=NoneType, required=True, json_schema_extra={'format': 'path', 'type': 'string'})], _PydanticGeneralMetadata(union_mode='left_to_right')], paths: list[~pathlib.Annotated[~pathlib.PurePath, ~pydantic.functional_validators.PlainValidator(func=~ska_sdp_config.entity.common.path._validate_pure_path, json_schema_input_type=str), ~pydantic.functional_serializers.PlainSerializer(func=str, return_type=str, when_used=always), FieldInfo(annotation=NoneType, required=True, json_schema_extra={'format': 'path', 'type': 'string'})]])[source]
Data Product flow entity e.g. Measurement Set.
- data_dir: Annotated[PVCPath | AnyPurePath, Field(union_mode='left_to_right', description='Output directory of data products.')]
- kind: Literal['data-product']
- paths: list[AnyPurePath]
Channel mapping of output data products relative to the persistent volume directory.
- class ska_sdp_config.entity.flow.DataProductPersist(*, kind: Literal['data-product-persist'] = 'data-product-persist', phase: Literal['SOLID', 'LIQUID', 'GAS'], expires_at: AwareDatetime | None = None)[source]
Data Product Persist flow entity.
Represents an external persistent storage endpoint.
- expires_at: AwareDatetime | None
UTC timestamp for the data product archive to expire at.
If None, no archive expiry.
- kind: Literal['data-product-persist']
- phase: Literal['SOLID', 'LIQUID', 'GAS']
Level of resilience in archival heuristic.
- class ska_sdp_config.entity.flow.DataQueue(*, kind: Literal['data-queue'] = 'data-queue', topics: Annotated[str, FieldInfo(annotation=NoneType, required=True, description='\n A Kafka topic name. Legal characters including alphanumeric,\n `-`, `_`, and `.`', metadata=[_PydanticGeneralMetadata(pattern='^[A-Za-z0-9-_.]+$')])] | Sequence[tuple[Annotated[int, Ge(ge=0)], Annotated[str, FieldInfo(annotation=NoneType, required=True, description='\n A Kafka topic name. Legal characters including alphanumeric,\n `-`, `_`, and `.`', metadata=[_PydanticGeneralMetadata(pattern='^[A-Za-z0-9-_.]+$')])]]], host: KafkaUrl, format: Literal['json', 'npy', 'npz', 'msgpack_numpy'])[source]
Data Queue flow entity e.g. Kafka topic.
- format: Literal['json', 'npy', 'npz', 'msgpack_numpy']
The data encoded format.
- kind: Literal['data-queue']
- topics: KafkaTopicName | ChannelMap[KafkaTopicName]
The topic names.
- class ska_sdp_config.entity.flow.Dependency(*, key: Key, expiry_time: Annotated[int, Ge(ge=-1)], description: str | None = None)[source]
Dependencies are used to lock flows, preventing deletion until the dependency is cleared.
- class Key(*, pb_id: Annotated[str, _PydanticGeneralMetadata(pattern='^pb\\-[a-z0-9]+\\-[0-9]{8}\\-[a-z0-9]+$')], kind: Annotated[str, _PydanticGeneralMetadata(pattern='^[A-Za-z0-9-]{1,96}$')], name: Annotated[str, _PydanticGeneralMetadata(pattern='^[A-Za-z0-9-]{1,96}$')], origin: Annotated[str, _PydanticGeneralMetadata(pattern='^[A-Za-z0-9-]{1,96}')])[source]
A Dependency primary key.
- kind: Annotated[str, Field(pattern='^[A-Za-z0-9-]{1,96}$', description='\n The kind of sink this flow moves data to.\n In flow, this is post-initialized with sink kind.\n provided.')]
- origin: Annotated[str, Field(pattern='^[A-Za-z0-9-]{1,96}', description='\n The name of the origin component that issued this lock.\n This could also be a processing block.')]
- description: str | None
A free form description.
- expiry_time: Annotated[int, Ge(-1)]
After this time the dependency should be released. The time is specified in seconds. -1 == infinity.
- class ska_sdp_config.entity.flow.Display(*, kind: Literal['display'] = 'display', widget: str, endpoint: AnyUrl)[source]
Data Display flow entity e.g. QA Display.
- endpoint: AnyUrl
The display URL endpoint.
- kind: Literal['display']
- widget: str
The type of display widget.
- class ska_sdp_config.entity.flow.Flow(*, key: Key, sink: SharedMem | DataQueue | Display | DataProduct | DataProductPersist | TangoAttribute | TangoAttributeMap | SpeadStream, sources: list[FlowSource], data_model: Annotated[str, _PydanticGeneralMetadata(pattern='^[A-Za-z0-9\\,\\[\\]]+$')], expiry_time: int = -1)[source]
Flow datastream entity.
Contains configuration information relating to streaming data connections related to a processing job for SDP. Flows are associated with and persist for the lifetime of a ProcessingBlock.
- class Key(*, pb_id: Annotated[str, _PydanticGeneralMetadata(pattern='^pb\\-[a-z0-9]+\\-[0-9]{8}\\-[a-z0-9]+$')], kind: Annotated[str | None, _PydanticGeneralMetadata(pattern='^[A-Za-z0-9-]{1,96}$')] = None, name: Annotated[str, _PydanticGeneralMetadata(pattern='^[A-Za-z0-9-]{1,96}$')])[source]
An SDP Flow primary key.
- kind: Annotated[str | None, Field(pattern='^[A-Za-z0-9-]{1,96}$', description='The kind of sink this flow moves data to.')]
- name: Annotated[str, Field(pattern='^[A-Za-z0-9-]{1,96}$', description='The name of this flow.')]
- pb_id: ProcessingBlockId
The ID of the Processing Block that this flow belongs to.
- data_model: Annotated[str, Field(pattern='^[A-Za-z0-9\\,\\[\\]]+$', description='The DataModel that the flow data represents.')]
- expiry_time: int
The amount of time in seconds to wait to expire. Negative numbers mean never expire.
- model_post_init(context: Any, /) None[source]
Override this method to perform additional initialization after __init__ and model_construct. This is useful if you want to do some validation that requires the entire model to be initialized.
- sink: Annotated[SharedMem | DataQueue | Display | DataProduct | DataProductPersist | TangoAttribute | TangoAttributeMap | SpeadStream, Field(discriminator='kind', description='The kind of sink this flow moves data to.')]
- sources: list[FlowSource]
A list containing sources of flow data.
- class ska_sdp_config.entity.flow.FlowSource[source]
A generic flow source pointing to either: - an existing flow entry in the configuration database - external source, such as a tango attribute - an external url
- function: str | None
Python function, class or OCI image that will flow data from source to sink
- parameters: dict | None
Function, class or OCI image parameters
- uri: Annotated[Flow.Key | AttributeTRL | AnyTRL | AnyUrl, Field(union_mode='left_to_right', description='\n Reference to the data source:\n - Flow.Key: key of a data flow entry\n - TangoAttributeUrl: tango attribute reference\n - AnyURL: url of any other kind\n ')]
- ska_sdp_config.entity.flow.ReceiverId
SPEAD Receiver ID.
alias of
Annotated[int,Ge(ge=0)]
Shared Memory flow entity e.g. Plasma.
The Path of the shared memory socket.
The shared memory implementation.
- class ska_sdp_config.entity.flow.SpeadStream(*, kind: Literal['spead'] = 'spead', channel_map: dict[Annotated[str, FieldInfo(annotation=NoneType, required=True, description='Observation ScanType ID.', metadata=[_PydanticGeneralMetadata(pattern='^[a-z0-9-:]+$')])], dict[Annotated[str, FieldInfo(annotation=NoneType, required=True, description='Observation Beam ID.', metadata=[_PydanticGeneralMetadata(pattern='^[a-z0-9]+$')])], dict[Annotated[int, Ge(ge=0)], tuple[Annotated[int, Ge(ge=0)], Annotated[int, Ge(ge=0)], Annotated[int, Ge(ge=0)]]]]], receiver_version: str, streams: int = 1, transport_protocol: Literal['tcp', 'udp'] = 'udp', continuous_mode: bool = False)[source]
A SPEAD flow entity.
- channel_map: dict[ScanTypeId, dict[BeamId, ChannelAddressMapping]]
The channel to network address mapping for scans and beams.
- continuous_mode: bool
Flag for enabling receivers to re-create the streams and resume receiving data after all end of streams are reached.
- property instances: int
Number of receiver instances required for all scans.
- kind: Literal['spead']
- receiver_version: str
Receiver OCI image version.
- streams: int
Total number of streams per receiver.
- transport_protocol: Literal['tcp', 'udp']
The spead stream transport protocol.
- class ska_sdp_config.entity.flow.TangoAttribute(*, kind: Literal['tango'] = 'tango', attribute_url: AttributeTRL, dtype: Literal['DevEncoded', 'DevString', 'DevBoolean', 'DevUChar', 'DevUShort', 'DevULong', 'DevULong64', 'DevShort', 'DevLong', 'DevLong64', 'DevFloat', 'DevDouble'], max_dim_x: Annotated[int, Ge(ge=0)] = 0, max_dim_y: Annotated[int, Ge(ge=0)] = 0, default_value: tuple[str, bytes] | int | float | bool | str | None = None)[source]
Tango attribute flow entity.
This entity instructs to flow data into a single Tango attribute.
- attribute_trl: Annotated[AttributeTRL, Field(alias='attribute_url', description='\n Tango attribute TRL.\n\n e.g. `"tango://mid-sdp/subarray/01/some_attribute"`.\n ')]
- property attribute_url: TangoAttributeUrl
Fully qualified tango attribute URL.
e.g. “tango://tangodb:10000/mid-sdp/subarray/01/some_attribute”.
- default_value: TangoDataType | None
Initial attribute value, cast to the configured dtype to allow support for non-JSON Tango types.
If
None, the attribute is initialized with quality ATTR_INVALID.
- dtype: TangoArgType
Attribute data type corresponding to a tango.ArgType.
- kind: Literal['tango']
- max_dim_x: NonNegativeInt
Size of the slowest changing dimension.
- max_dim_y: NonNegativeInt
Size of the fastest changing dimension.
- class ska_sdp_config.entity.flow.TangoAttributeMap(*, kind: Literal['tangomap'] = 'tangomap', attributes: list[tuple[TangoAttribute, DataQuery]])[source]
Tango attribute map flow entity.
This entity instructs to flow data into multiple Tango attributes.
- class DataQuery(*, when: str | None = None, select: str = '@')[source]
Decomposed JMESPath query for locating data within a data model.
- select: str
JMESPath select query clause on the flow data model.
- when: str | None
JMESPath when query clause on the flow data model.
- attributes: list[tuple[TangoAttribute, DataQuery]]
Mapping of TangoAttributes with unique attribute URL to a data query.
- kind: Literal['tangomap']
Resource Management
Entities related to resource management.
- class ska_sdp_config.entity.resource_management.Allocation(*, key: Key, quantity: Annotated[int, Ge(ge=0)] = 0, tags: list[str] = [], instances: list[str] = [], resource_link: Key | None = None, request_link: Key | None = None)[source]
Allocation for a resource request, created by the processing controller.
- class Key(*, pb_id: Annotated[str, _PydanticGeneralMetadata(pattern='^pb\\-[a-z0-9]+\\-[0-9]{8}\\-[a-z0-9]+$')], kind: Annotated[str, _PydanticGeneralMetadata(pattern='performance-buffer-storage|capacity-buffer-storage|processing-node')], name: Annotated[str, _PydanticGeneralMetadata(pattern='^[A-Za-z0-9-]{1,96}$')])[source]
A request primary key. No overrides. Needed to be redefined here for Key pattern validation
- request_link: Request.Key | None
Key to the Request this Allocation fulfills.
- resource_link: Resource.Key | None
Key to Resource this Allocation is taken from.
- validate_data() Allocation[source]
Run post-init validation
- class ska_sdp_config.entity.resource_management.Request(*, key: ~ska_sdp_config.entity.resource_management.Request.Key, quantity: ~typing.Annotated[int, ~annotated_types.Ge(ge=0)] = 0, tags: list[str] = [], instances: list[str] = [], phases: list[~typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, metadata=[_PydanticGeneralMetadata(pattern='^\\d+-[a-zA-Z]+$')])]] = <factory>)[source]
Indicates a request for a resource. Created by processing scripts to request resources of a certain kind.
- class Key(*, pb_id: Annotated[str, _PydanticGeneralMetadata(pattern='^pb\\-[a-z0-9]+\\-[0-9]{8}\\-[a-z0-9]+$')], kind: Annotated[str, _PydanticGeneralMetadata(pattern='performance-buffer-storage|capacity-buffer-storage|processing-node')], name: Annotated[str, _PydanticGeneralMetadata(pattern='^[A-Za-z0-9-]{1,96}$')])[source]
A request primary key. No overrides. Needed to be redefined here for Key pattern validation
- phases: Annotated[list[Annotated[str, Field(pattern=PHASES_NAME_PATTERN)]], Field(default_factory=list)]
- class ska_sdp_config.entity.resource_management.RequestAllocationKey(*, pb_id: Annotated[str, _PydanticGeneralMetadata(pattern='^pb\\-[a-z0-9]+\\-[0-9]{8}\\-[a-z0-9]+$')], kind: Annotated[str, _PydanticGeneralMetadata(pattern='performance-buffer-storage|capacity-buffer-storage|processing-node')], name: Annotated[str, _PydanticGeneralMetadata(pattern='^[A-Za-z0-9-]{1,96}$')])[source]
Primary key for Request or Allocation.
- kind: Annotated[str, Field(pattern=KIND_PATTERN)]
The kind of this particular request/allocation/resource.
- name: Annotated[str, Field(pattern='^[A-Za-z0-9-]{1,96}$')]
Unique name of a request/allocation/resource of the given kind. In case of allocations, they need to match the name of the corresponding request.
- pb_id: ProcessingBlockId
The ID of the Processing Block that the request/allocation belongs to.
- class ska_sdp_config.entity.resource_management.Resource(*, key: Key, quantity: Annotated[int, Ge(ge=0)] = 0, tags: list[str] = [], instances: list[str] = [], information: dict | None = None)[source]
Indicates platform availability of a resource. The resource manager will update free space quantity and status on each cycle. Resources types can include, but not limited to: buffer, cpu, memory, nodes etc.
- class Key(*, kind: Annotated[str, _PydanticGeneralMetadata(pattern='performance-buffer-storage|capacity-buffer-storage|processing-node')], name: Annotated[str, _PydanticGeneralMetadata(pattern='^[A-Za-z0-9-]{1,96}$')])[source]
A resource primary key.
- kind: Annotated[str, Field(pattern=KIND_PATTERN)]
The kind of this particular resource.
- name: Annotated[str, Field(pattern='^[A-Za-z0-9-]{1,96}$')]
The name of this resource (e.g. a PVC of choice).
- information: dict | None
Optional specifications such as contingency reserve
- class ska_sdp_config.entity.resource_management.ResourceBaseModel(*, key: Annotated[str, _PydanticGeneralMetadata(pattern='.+')], quantity: Annotated[int, Ge(ge=0)] = 0, tags: list[str] = [], instances: list[str] = [])[source]
Shared model between resource management entities.
- are_tags_subset_of(other_entity: ResourceBaseModel) bool[source]
Are the tags of this entity a subset of another?
- instances: list[str]
List of identifiers of a certain resource type, e.g. node names, subnets, etc.
- is_quantity_subset_of(other_entity: ResourceBaseModel) bool[source]
Is this entity’s quantity <= than the quanity of another?
- is_same_kind_as(other_entity: ResourceBaseModel) bool[source]
Does this entity share a kind with another one?
- is_subset_of(other_entity: ResourceBaseModel) bool[source]
Is this entity a subset of another based on kind, quantity, and tags?
- quantity: NonNegativeInt
Quantity of request/allocation/resource.
- tags: list[str]
Tags specifying the sub-resource types. E.g. “cpu_node”, “gpu_node”.
- property unit: str
Unit of the quantity.
- validate_data() ResourceBaseModel[source]
Run post-init validation