API
Processing block
- class ska_sdp_scripting.processing_block.ProcessingBlock(pb_id: str | None = None)
This is the core class of the API. It handles the following processes:
- Claiming and Releasing Processing Blocks:
The constructor claims a processing block identified by pb_id from the configuration database. The exit method (and the context manager protocol via __enter__ and __exit__) releases the claim.
- Managing Processing Block State:
The state of the processing block in the configuration database is continuously updated with error messages as they occur. Additionally, the publish_receive_addresses() method updates the processing block state in the configuration database to record the receive addresses, which are generated with generate_receive_addresses().
- Handling Script Parameters:
The methods get_parameters(), update_parameters(), and validate_parameters() manage script parameters, including validation against a provided Pydantic model.
- Defining Data Flows:
The create_data_flow() method defines data flows within the processing block, specifying their source and sink types.
- Handling Dependencies:
In __init__(), all flow dependencies are automatically registered with the class; they are returned by the get_dependencies() method. create_flow_dependency() allows users to register custom dependencies as well.
- Creating Execution Phases:
The create_phase() method creates execution phases, which represent steps in the processing block, and associates them with resource requests (such as buffer reservations).
- Parameters:
pb_id (str, optional) – Processing block ID.
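The claim/release lifecycle described above can be illustrated with a minimal, self-contained sketch. InMemoryConfigDB and ClaimedBlock are hypothetical stand-ins, not part of ska_sdp_scripting or ska_sdp_config; they only mirror the pattern of claiming in the constructor and releasing in exit() or __exit__().

```python
from typing import Optional


class InMemoryConfigDB:
    """Hypothetical stand-in for the SDP configuration database."""

    def __init__(self) -> None:
        # Maps processing block ID -> current owner (None when unclaimed).
        self.owners: dict[str, Optional[str]] = {}

    def claim(self, pb_id: str, owner: str) -> None:
        if self.owners.get(pb_id) is not None:
            raise RuntimeError(f"processing block {pb_id} is already claimed")
        self.owners[pb_id] = owner

    def release(self, pb_id: str) -> None:
        self.owners[pb_id] = None


class ClaimedBlock:
    """Hypothetical sketch: claim on construction, release on exit."""

    def __init__(self, db: InMemoryConfigDB, pb_id: str, owner: str = "script") -> None:
        self._db = db
        self.pb_id = pb_id
        db.claim(pb_id, owner)  # the claim exists as soon as the object does

    def exit(self) -> None:
        self._db.release(self.pb_id)

    def __enter__(self) -> "ClaimedBlock":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        # Release even if the with-body raised; returning None re-raises.
        self.exit()


db = InMemoryConfigDB()
with ClaimedBlock(db, "pb-example-00000") as pb:
    print(db.owners[pb.pb_id])  # script
print(db.owners["pb-example-00000"])  # None
```

Because ProcessingBlock supports the context manager protocol, using it in a with statement gives the same guarantee: the claim is released even if the script body raises.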
- add_phase(phase_name: str, length: int, status: str | None = None) None
- create_data_flow(name: str, data_model: str, sink: ska_sdp_config.entity.flow.SharedMem | ska_sdp_config.entity.flow.DataQueue | ska_sdp_config.entity.flow.Display | ska_sdp_config.entity.flow.DataProduct | ska_sdp_config.entity.flow.TangoAttribute | ska_sdp_config.entity.flow.TangoAttributeMap | ska_sdp_config.entity.flow.DataProductPersist | ska_sdp_config.entity.flow.SpeadStream, sources: list | None = None, expiry_time: int = -1) DataFlow
Create and register a DataFlow to this processing block.
- Parameters:
- Returns:
Data flow.
- Return type:
dataflow.DataFlow
- create_flow_dependency(pb_id: str, name: str, origin: str, kind: str = 'data-product', expiry_time: float = -1, description: str = '') ska_sdp_config.entity.Dependency
Create and register a Flow Dependency to this processing block.
- create_phase(name: str, requests: list[BufferRequest]) Phase
Create a script phase for deploying execution engines.
The phase is created with a list of resource requests which must be satisfied before the phase can start executing. For the time being the only resource requests are (placeholder) buffer reservations, but eventually this will include compute requests too.
- Parameters:
name (str) – Name of the phase.
requests (list of BufferRequest) – Resource requests.
dependencies (list of Dependency) – Flow dependencies.
- Returns:
The script phase object.
- Return type:
- create_sdm_flow(name: str, expiry_time: int = -1) DataFlow
Create a request to an LSM.
- property eb: ska_sdp_config.entity.ExecutionBlock
The Execution Block associated with this ProcessingBlock.
- property eb_id: str
The ID of the Execution Block associated with this ProcessingBlock.
- generate_receive_addresses(function: str, num_hosts: int = 1, port_start: int = 21000, channels_per_port: int = 1) PartialReceiveAddresses
Generate and return the partial receive addresses for this ProcessingBlock’s scan types. At least the function parameter must be provided. See PartialReceiveAddresses.generate() for details on all parameters.
- get_dependencies() list
Get the list of processing block dependencies.
- Returns:
Processing block dependencies.
- Return type:
list
- get_parameters() dict
Get script parameters from the processing block. Schema checking is not currently implemented.
- Returns:
Processing block parameters.
- Return type:
dict
- get_scan_types() list[dict]
Get scan types from the execution block and update them with the default parameters and channels. This is only supported for real-time scripts.
- Returns:
Scan types.
- Return type:
list
- property pb_id: str
The ID of this Processing Block.
- publish_receive_addresses(receive_addresses: ReceiveAddresses)
Update the processing block state with the given receive addresses. The receive addresses are updated to add beam information from the Execution Block.
- Parameters:
receive_addresses – The receive addresses to write.
- request_buffer(size: float, kind: str, name: str, phases: list[str] | None = None) BufferRequest | None
Request a buffer reservation.
- set_phases()
Update the phase status in the processing block state.
- property subarray_id: str | None
The Subarray ID to which this ProcessingBlock belongs.
- update_parameters(default_parameters: dict, parameters: dict | Mapping) dict
Overwrite (nested) values in the default_parameters mapping with those from the parameters mapping.
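A nested overwrite like the one update_parameters() describes can be sketched as a recursive merge. This is an illustrative reimplementation under assumptions, not the library's code: merge_parameters is a hypothetical name, and the sketch returns a new dict rather than modifying default_parameters, which may differ from the real method.

```python
import copy
from collections.abc import Mapping


def merge_parameters(default_parameters: dict, parameters: Mapping) -> dict:
    """Return a copy of default_parameters with (nested) values from parameters."""
    merged = copy.deepcopy(default_parameters)
    for key, value in parameters.items():
        if isinstance(value, Mapping) and isinstance(merged.get(key), dict):
            # Both sides are mappings: recurse so sibling keys survive.
            merged[key] = merge_parameters(merged[key], value)
        else:
            # Scalars, lists, or new keys: the override wins outright.
            merged[key] = copy.deepcopy(value)
    return merged


defaults = {"n_workers": 1, "resources": {"cpu": 1, "memory": "1Gi"}}
overrides = {"resources": {"cpu": 4}}
print(merge_parameters(defaults, overrides))
# {'n_workers': 1, 'resources': {'cpu': 4, 'memory': '1Gi'}}
```

The recursion is what distinguishes this from dict.update(): a shallow update would replace the whole "resources" mapping and lose the "memory" default.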
- update_phase(phase_name: str, status: str | None = None, length: int | None = None) None
Update one or more fields of a specific phase by name.
- validate_parameters(model)
Validate processing block parameters against a given Pydantic model.
Raises ValidationError if validation fails; otherwise returns an instance of the given Pydantic model initialized with the processing block parameters.
- Parameters:
model – Model to validate against.
- Returns:
Parameters
- Return type:
ParameterBaseModel
- Raises:
ValidationError
Buffer request
- class ska_sdp_scripting.buffer_request.BufferRequest(size: int, pb_id: str, kind: str, name: str, phases: list[str] | None = None)
Request a buffer reservation.
This represents a request for a resource (typically buffer storage), depending on whether the RESOURCE_MANAGEMENT_TOGGLE feature toggle is enabled.
- Parameters:
- property request_key
Create request key.
- resource_request()
Create a resource request for buffer storage, including phases.
Script phase
- class ska_sdp_scripting.phase.Phase(name: str, list_requests: list, config: ska_sdp_config.Config, pb_id: str, eb_id: str, script_kind: str, flow_data: list[DataFlow], dependencies: list[ska_sdp_config.entity.Dependency] | None = None)
This class encapsulates the logic for managing a single phase within a processing block.
Objects of this class should not be created directly; use the ProcessingBlock.create_phase() method instead. The key functionalities include:
- Resource Management:
Handles the allocation and release of resources required by the phase, including interacting with the queue connector.
- Execution Engine Deployment:
Provides methods for deploying various types of execution engines (e.g., ee_deploy_helm(), ee_deploy_dask(), ee_deploy_test()).
- State Management:
Monitors the state of deployments and data flows, updating the processing block state accordingly. Includes methods such as check_state(), update_pb_state(), monitor_deployments(), and monitor_data_flows().
- Context Management:
Designed to be used with the with statement (via the __enter__() and __exit__() methods) to ensure proper setup and teardown of resources.
- Parameters:
name (str) – Name of the phase.
list_requests (list) – List of requests.
config (ska_sdp_config.Config) – SDP configuration client.
pb_id (str) – Processing block ID.
eb_id (str) – Execution block ID.
script_kind (str) – Script kind.
flow_data (list[DataFlow]) – Data flow list.
dependencies (list[Dependency]) – Dependency list.
- check_resource_allocation() bool
Check whether resource requests have corresponding allocations.
- Returns:
True if all resources are allocated, False otherwise
- Return type:
bool
- check_state(txn: ska_sdp_config.config.Transaction, check_realtime: bool = True) None
Check the state of the processing block.
Check whether the ProcessingBlock state is finished or cancelled and, for real-time scripts, whether the ExecutionBlock is finished or cancelled.
- ee_deploy_dask(name: str, n_workers: int, func: Callable, f_args: tuple[Any]) DaskDeploy
Deploy a Dask execution engine.
- Parameters:
- Returns:
Dask execution engine deployment
- Return type:
- ee_deploy_helm(deploy_name: str, values: dict | None = None, version: str = '1.0.0') HelmDeploy
Deploy a Helm execution engine.
This can be used to deploy any Helm chart.
- Parameters:
- Returns:
Helm execution engine deployment
- Return type:
- ee_deploy_slurm(deploy_name: str, slargs: dict | None = None) SlurmDeploy
Deploy a Slurm execution engine.
- ee_deploy_test(deploy_name: str, func: Callable | None = None, f_args: list[Any] | None = None) FakeDeploy
Deploy a fake execution engine.
This is used for testing and example purposes.
- Parameters:
- Returns:
Fake execution engine deployment
- Return type:
- is_eb_and_processing_finished(txn: ska_sdp_config.config.Transaction, grace_period: float = 60.0) bool
Check whether the ExecutionBlock is finished or cancelled.
- If it is finished or cancelled, wait for all data product flows to reach a final state (COMPLETED, INCOMPLETE, or FAILED) within a grace period.
- If the grace period expires before all flows are in a final state, log an error and proceed to clean up.
- If there are no data product flows, proceed to clean up immediately.
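The grace-period logic above amounts to a bounded polling loop. The sketch below is hypothetical: wait_for_final_flows, get_flow_states, and the injectable clock and sleep are illustrative names, and it models only the flow-waiting step, not the ExecutionBlock check or the cleanup. The set of final states is the one listed above.

```python
import time
from typing import Callable

FINAL_STATES = {"COMPLETED", "INCOMPLETE", "FAILED"}


def wait_for_final_flows(
    get_flow_states: Callable[[], dict[str, str]],
    grace_period: float = 60.0,
    poll_interval: float = 1.0,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Return True if every flow is final within grace_period, else False."""
    deadline = clock() + grace_period
    while True:
        states = get_flow_states()
        # all() over an empty mapping is True: no flows means clean up at once.
        if all(state in FINAL_STATES for state in states.values()):
            return True
        if clock() >= deadline:
            # The caller would log an error here and clean up anyway.
            return False
        sleep(poll_interval)
```

Injecting clock and sleep keeps the loop testable without any real waiting.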
- monitor_data_flows(txn: ska_sdp_config.config.Transaction, iteration: int = 0) dict[str, list[ska_sdp_config.entity.Flow.Key]]
Monitor the data flow states and aggregate any error messages into the processing block’s ‘error_messages’ key.
If any flow status is FAILED, the processing block status is set to FAILED.
- monitor_deployments(txn: ska_sdp_config.config.Transaction, iteration: int = 0) dict[str, list[str]]
Monitor deployments and update the deployment status in the processing block state based on the state of the deployments’ pods.
Also update the deployments_ready key of the processing block state.
Currently, deployments_ready is True only if all deployments are RUNNING; otherwise it is False.
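The state bookkeeping described for monitor_data_flows() and monitor_deployments() can be summarised as a small pure function. This is a hypothetical sketch: summarise_monitoring is an illustrative name, and the dict layouts of pb_state, flow_states, and deployment_statuses are assumed for illustration; they do not reflect the real configuration database schema.

```python
def summarise_monitoring(
    pb_state: dict,
    flow_states: dict[str, dict],
    deployment_statuses: dict[str, str],
) -> dict:
    """Update pb_state in place from flow and deployment states (sketch)."""
    # Data flows: collect error messages and escalate any FAILED flow.
    errors = [
        f"{name}: {info['error']}"
        for name, info in flow_states.items()
        if info.get("error")
    ]
    if errors:
        pb_state.setdefault("error_messages", []).extend(errors)
    if any(info.get("status") == "FAILED" for info in flow_states.values()):
        pb_state["status"] = "FAILED"

    # Deployments: record statuses; ready only if every deployment is RUNNING.
    # all() is vacuously True with no deployments; the real behaviour in that
    # case is not documented here.
    pb_state["deployments"] = dict(deployment_statuses)
    pb_state["deployments_ready"] = all(
        status == "RUNNING" for status in deployment_statuses.values()
    )
    return pb_state


state = summarise_monitoring(
    {},
    flow_states={"vis-receive": {"status": "FAILED", "error": "disk full"}},
    deployment_statuses={"proc-0": "RUNNING", "proc-1": "PENDING"},
)
print(state["status"], state["deployments_ready"])  # FAILED False
```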
- update_pb_state(status: ProcessingBlockStatus = ProcessingBlockStatus.UNSET) None
Update processing block state.
If the status is UNSET, it is marked as finished.
- Parameters:
status (ProcessingBlockStatus, optional) – Status.
- wait_loop(func: Callable[[ska_sdp_config.config.Transaction], bool], time_to_ready: int = 0) None
Wait loop to check the status of the processing block. It also updates the processing block state with deployment statuses and errors captured in data flows.
- Parameters:
func – Function that checks the condition for exiting the watcher loop.
time_to_ready – Time in seconds after which deployments_ready is set to True. Only applies to deployments created with ee_deploy_test().
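The interaction between func and time_to_ready in wait_loop() can be sketched as follows. This is a simplified, hypothetical model: a plain dict stands in for the Transaction that the real method passes to func, and wait_loop_sketch, poll_interval, clock, and sleep are illustrative names that are not part of the documented signature.

```python
import time
from typing import Callable


def wait_loop_sketch(
    func: Callable[[dict], bool],
    state: dict,
    time_to_ready: float = 0,
    poll_interval: float = 1.0,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> None:
    """Poll func(state) until it returns True (hypothetical simplification)."""
    start = clock()
    while True:
        # Mimics time_to_ready: once enough time has passed, mark deployments
        # as ready (the real method applies this only to ee_deploy_test()).
        if clock() - start >= time_to_ready:
            state["deployments_ready"] = True
        if func(state):
            return
        sleep(poll_interval)


state: dict = {}
wait_loop_sketch(lambda s: s.get("deployments_ready", False), state, time_to_ready=0)
print(state)  # {'deployments_ready': True}
```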
Execution Engine (EE) deployment
- class ska_sdp_scripting.ee_base_deploy.EEDeploy(pb_id: str, config: ska_sdp_config.Config)
Base class for execution engine deployment.
- Parameters:
- property deploy_id: str
Get the deployment ID.
- Returns:
deployment ID
- Return type:
str
- is_finished(txn: ska_sdp_config.config.Transaction) bool
Check if the deployment is finished.
- Parameters:
txn (ska_sdp_config.Transaction) – Configuration transaction.
- Return type:
bool
Helm EE Deployment
- class ska_sdp_scripting.helm_deploy.HelmDeploy(pb_id: str, config: ska_sdp_config.Config, deploy_name: str, values: dict | None = None, version: str = '1.0.0')
Deploy Helm execution engine.
This should not be created directly; use the Phase.ee_deploy_helm() method instead.
Slurm EE Deployment
- class ska_sdp_scripting.slurm_deploy.SlurmDeploy(pb_id: str, config: ska_sdp_config.Config, deploy_name: str, slargs: dict | None = None)
Deploy Slurm execution engine.
This should not be created directly; use the Phase.ee_deploy_slurm() method instead.
Dask EE deployment
- class ska_sdp_scripting.dask_deploy.DaskDeploy(pb_id: str, config: ska_sdp_config.Config, deploy_name: str, n_workers: int, func: Callable, f_args: Tuple[Any])
Deploy a Dask execution engine.
When called with the arguments f_args, the function func should return a Dask graph. The graph is then executed by calling its compute method:
    result = func(*f_args)
    result.compute()
This happens in a separate thread so that the constructor can return immediately.
This should not be created directly; use the Phase.ee_deploy_dask() method instead.
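The background-thread pattern described for DaskDeploy (and, per its description, FakeDeploy) can be sketched without Dask itself. ThreadedGraphRunner and LazySum are hypothetical; LazySum only mimics the one property the description relies on, namely that func(*f_args) returns an object with a compute() method. A dask.delayed object satisfies the same contract.

```python
from __future__ import annotations

import threading
from typing import Any, Callable


class LazySum:
    """Stand-in for a lazy task graph: nothing runs until compute()."""

    def __init__(self, values: tuple[int, ...]) -> None:
        self.values = values

    def compute(self) -> int:
        return sum(self.values)


class ThreadedGraphRunner:
    """Hypothetical sketch: build a graph with func(*f_args), compute it in a thread."""

    def __init__(self, func: Callable[..., Any], f_args: tuple[Any, ...]) -> None:
        self.result: Any = None
        self.error: BaseException | None = None
        self._thread = threading.Thread(target=self._run, args=(func, f_args), daemon=True)
        self._thread.start()  # the constructor returns without waiting

    def _run(self, func: Callable[..., Any], f_args: tuple[Any, ...]) -> None:
        try:
            graph = func(*f_args)
            self.result = graph.compute()
        except Exception as exc:  # keep failures visible to the caller
            self.error = exc

    def join(self, timeout: float | None = None) -> bool:
        """Wait for the thread; return True if it has finished."""
        self._thread.join(timeout)
        return not self._thread.is_alive()


runner = ThreadedGraphRunner(lambda *xs: LazySum(xs), (1, 2, 3))
runner.join(timeout=5)
print(runner.result)  # 6
```

Storing the exception instead of letting it escape the thread matters because an uncaught error in a worker thread would otherwise only be printed, never seen by the code that started the deployment.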
Fake EE deployment
- class ska_sdp_scripting.fake_deploy.FakeDeploy(pb_id: str, config: ska_sdp_config.Config, deploy_name: str, func: Callable | None = None, f_args: Tuple[Any] | None = None)
Deploy a fake execution engine.
The function is called with the arguments in a separate thread so that the constructor can return immediately.
This should not be created directly; use the Phase.ee_deploy_test() method instead.
- Parameters:
Queue connector functions
Common functions used to configure the QueueConnector.
- ska_sdp_scripting.queue_connector.wait_for_queue_connector_state(config: ska_sdp_config.Config, pb_id: str | None, expected_state: str | None, timeout: float) None
Wait for all queue connector flows to reach an expected state.
Receive Addresses utilities
Generate Receive Addresses
- class ska_sdp_scripting.receive_addresses.receive_addresses.PartialReceiveAddresses(channels_per_port: int = 1, allocations: dict[str, ~ska_sdp_scripting.receive_addresses.receive_addresses._ScanTypePortAllocT] = <factory>)
Bases: ReceiveAddressesBase[PartialScanTypePortAllocation, PartialBeamChannelAddresses, int]
Receive addresses generated by SDP. They contain placeholders for hostnames and include only host and port information.
- static generate(scan_types: list[dict] | list[ScanType], port_start: int, channels_per_port: int, *, function: str = 'visibilities', num_hosts: int = 1) PartialReceiveAddresses
Creates a PartialReceiveAddresses object with the results of calling _allocate_ports().
- replace_hosts(hosts: list[str]) ReceiveAddresses
Replaces the hosts in this PartialReceiveAddresses object using replace_hosts(), returning a new ReceiveAddresses object.
- class ska_sdp_scripting.receive_addresses.receive_addresses.ReceiveAddresses(channels_per_port: int = 1, allocations: dict[str, ~ska_sdp_scripting.receive_addresses.receive_addresses._ScanTypePortAllocT] = <factory>, extra_values: dict[str, dict[str, ~typing.Any]] = <factory>)
Bases: ReceiveAddressesBase[ScanTypePortAllocation, BeamChannelAddresses, str]
Receive addresses generated and published by SDP. They indicate endpoints that other subsystems are interested in (e.g., visibility receive UDP hosts and ports, Tango attributes of interest, etc.).
- extra_values: dict[str, dict[str, Any]]
Extra per-scan type, per-beam dictionary values.
- to_recv_addresses_dict() dict[str, dict[str, BeamChannelAddresses]]
Returns a dictionary representation of these receive addresses, suitable for storage in the SDP Config DB.
- class ska_sdp_scripting.receive_addresses.receive_addresses.ReceiveAddressesBase(channels_per_port: int = 1, allocations: dict[str, ~ska_sdp_scripting.receive_addresses.receive_addresses._ScanTypePortAllocT] = <factory>)
Bases: Generic[_ScanTypePortAllocT, _BeamChanAddrT, _HostT]
Base class for partial and full receive addresses.
- allocations: dict[str, _ScanTypePortAllocT]
Per-ScanType partial allocations (i.e., with host placeholders).
- channels_per_port: int = 1
Channels to receive per port.
- property host_count: int
The number of hosts contained in these receive addresses.
- property hosts: set[_HostT]
The set of hosts contained in these receive addresses.
- property max_port_count: int
Maximum port count for any given host.
- property port_counts: dict[_HostT, int]
Port count per host, across all scan types.
- to_recv_addresses_dict() dict[str, dict[str, _BeamChanAddrT]]
Returns a dictionary representation of these receive addresses, suitable for storage in the SDP Config DB.
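The host and port properties of ReceiveAddressesBase, together with PartialReceiveAddresses.replace_hosts(), can be illustrated with a deliberately simplified model. SimpleReceiveAddresses is hypothetical: it stores each scan type as a flat list of (host, port) pairs, whereas the real classes use per-beam allocation objects. Two further assumptions are flagged in the code: port_counts counts distinct ports per host, and integer host placeholders are treated as indices into the hostname list.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Generic, TypeVar

HostT = TypeVar("HostT")


@dataclass
class SimpleReceiveAddresses(Generic[HostT]):
    """Hypothetical, simplified model: scan type -> list of (host, port)."""

    channels_per_port: int = 1
    allocations: dict[str, list[tuple[HostT, int]]] = field(default_factory=dict)

    @property
    def hosts(self) -> set[HostT]:
        return {host for pairs in self.allocations.values() for host, _ in pairs}

    @property
    def host_count(self) -> int:
        return len(self.hosts)

    @property
    def port_counts(self) -> dict[HostT, int]:
        # Assumption: a port reused by several scan types is counted once.
        ports: dict[HostT, set[int]] = {}
        for pairs in self.allocations.values():
            for host, port in pairs:
                ports.setdefault(host, set()).add(port)
        return {host: len(p) for host, p in ports.items()}

    @property
    def max_port_count(self) -> int:
        return max(self.port_counts.values(), default=0)

    def replace_hosts(self, hostnames: list[str]) -> SimpleReceiveAddresses[str]:
        # Assumption: integer placeholders are indices into hostnames.
        return SimpleReceiveAddresses(
            channels_per_port=self.channels_per_port,
            allocations={
                scan: [(hostnames[host], port) for host, port in pairs]
                for scan, pairs in self.allocations.items()
            },
        )
```

Keeping the partial (placeholder) and full (hostname) forms as the same generic type parameterised by the host type mirrors the _HostT parameter in the documented base class.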
- ska_sdp_scripting.receive_addresses.receive_addresses.copy_beam_info_from_eb(receive_addresses: dict[str, dict[str, BeamChannelAddresses]], eb_beam_map: dict[str, Any])
Add the value of the “function” key and, if present, the “{x}_beam_id” key from the given Execution Block beams to the corresponding receive address beams.
“{x}_beam_id” can be visibility_beam_id, search_beam_id, timing_beam_id, or vlbi_beam_id, depending on the type of beam.
Updates receive_addresses in place.
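The in-place update performed by copy_beam_info_from_eb() can be sketched with plain dicts. copy_beam_info_sketch is a hypothetical name; the sketch assumes eb_beam_map is keyed by the same beam identifiers as the receive addresses, skips receive-address beams with no matching Execution Block beam, and uses dicts in place of BeamChannelAddresses objects.

```python
from typing import Any

# The "{x}_beam_id" variants named in the description above.
BEAM_ID_KEYS = ("visibility_beam_id", "search_beam_id", "timing_beam_id", "vlbi_beam_id")


def copy_beam_info_sketch(
    receive_addresses: dict[str, dict[str, dict]],
    eb_beam_map: dict[str, dict[str, Any]],
) -> None:
    """Copy "function" and any beam-ID keys into receive_addresses, in place."""
    for beams in receive_addresses.values():  # one entry per scan type
        for beam_name, addresses in beams.items():
            eb_beam = eb_beam_map.get(beam_name)
            if eb_beam is None:
                continue  # assumption: unmatched beams are left untouched
            addresses["function"] = eb_beam["function"]
            for key in BEAM_ID_KEYS:
                if key in eb_beam:
                    addresses[key] = eb_beam[key]


ra = {"science": {"vis0": {"port": [[0, 21000, 1]]}}}
copy_beam_info_sketch(ra, {"vis0": {"function": "visibilities", "visibility_beam_id": 1}})
print(ra["science"]["vis0"]["function"])  # visibilities
```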