API
Processing block
- class ska_sdp_scripting.processing_block.ProcessingBlock(pb_id: str | None = None)[source]
Claim the processing block.
- Parameters:
pb_id (str, optional) – processing block ID
- configure_recv_processes_ports(scan_types, max_channels_per_process, port_start, channels_per_port)[source]
Calculate how many receive processes and ports are required, and configure a dictionary to be fed back into the receive_addresses attribute.
- Parameters:
scan_types – scan types from EB
max_channels_per_process – maximum number of channels per process
port_start – starting port the receiver will listen on
channels_per_port – number of channels to be sent to each port
- Returns:
tuple(configured receive dict, number of processes)
- Return type:
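The process and port arithmetic described above can be sketched as follows. This is a minimal illustration, not the library's implementation: the helper name `plan_receive_layout`, the flat `num_channels` input (the real method works from scan types), the ceiling-division rule, and the shape of the returned dictionaries are all assumptions.

```python
import math


def plan_receive_layout(num_channels, max_channels_per_process,
                        port_start, channels_per_port):
    """Hypothetical helper: split channels across processes and ports."""
    # Assumption: each process is filled up to max_channels_per_process.
    num_processes = math.ceil(num_channels / max_channels_per_process)
    layout = []
    first_channel = 0
    for _ in range(num_processes):
        count = min(max_channels_per_process, num_channels - first_channel)
        # Assumption: every process numbers its ports from port_start.
        num_ports = math.ceil(count / channels_per_port)
        ports = [port_start + i for i in range(num_ports)]
        layout.append({
            "first_channel": first_channel,
            "num_channels": count,
            "ports": ports,
        })
        first_channel += count
    return layout, num_processes


layout, num_processes = plan_receive_layout(10, 4, 9000, 2)
```

With 10 channels, at most 4 channels per process and 2 channels per port, this sketch yields three processes, the last of which handles the remaining 2 channels on a single port.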
- create_phase(name: str, requests: List[BufferRequest]) Phase [source]
Create a script phase for deploying execution engines.
The phase is created with a list of resource requests which must be satisfied before the phase can start executing. For the time being the only resource requests are (placeholder) buffer reservations, but eventually this will include compute requests too.
- Parameters:
name (str) – name of the phase
requests (list of BufferRequest) – resource requests
- Returns:
the phase
- Return type:
Phase
- get_dependencies()[source]
Get the list of processing block dependencies.
- Returns:
processing block dependencies
- Return type:
- get_parameters(schema=None)[source]
Get script parameters from processing block.
The schema checking is not currently implemented.
- Parameters:
schema – schema to validate the parameters
- Returns:
processing block parameters
- Return type:
- get_scan_types() List[str] [source]
Get scan types from the execution block.
Updates the scan types with the default parameters and channels.
This is only supported for real-time scripts.
- Returns:
scan types
- Return type:
- nested_parameters(parameters: dict)[source]
Convert flattened dictionary to nested dictionary.
- Parameters:
parameters – parameters to be converted
- Returns:
nested parameters
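As an illustration of what converting a flattened dictionary to a nested one can look like, here is a minimal sketch. It is not the library's code: the helper name `nest_flat_dict` and the use of `.` as the key separator are assumptions, since the separator is not stated here.

```python
def nest_flat_dict(flat, sep="."):
    """Hypothetical helper: {"a.b": 1} becomes {"a": {"b": 1}}."""
    nested = {}
    for key, value in flat.items():
        parts = key.split(sep)
        node = nested
        # Walk (and create) the intermediate dictionaries.
        for part in parts[:-1]:
            node = node.setdefault(part, {})
        # Assumption: no key is both a leaf and a prefix of another key.
        node[parts[-1]] = value
    return nested


nested = nest_flat_dict({"a.b": 1, "a.c": 2, "d": 3})
```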
- receive_addresses(configured_host_port, chart_name=None, service_name=None, namespace=None, script='vis-receive')[source]
Generate receive addresses and update the processing block state.
- Parameters:
configured_host_port – constructed host and port
chart_name – Name of the statefulset
service_name – Name of the headless service
namespace – namespace where it’s going to be deployed
script – processing script that is configuring the receive addresses. Options: vis-receive, pointing-offset. Default: vis-receive
- static request_buffer(size: float, tags: List[str]) BufferRequest [source]
Request a buffer reservation.
This returns a buffer reservation request that is used to create a script phase. These are currently only placeholders.
- update_parameters(default_parameters: dict, parameters: dict | Mapping)[source]
Nested overwrite of default_parameter values with ones in parameters.
- Parameters:
default_parameters (dict) – default parameter values
parameters (dict) – script-specific parameters
- Returns:
processing block additional parameters
- Return type:
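A nested overwrite of default values can be sketched as a recursive merge. The following is a minimal illustration under stated assumptions, not the library's implementation: the helper name `nested_overwrite` is hypothetical, and returning a new dictionary rather than modifying the defaults in place is a choice made for this sketch.

```python
import copy
from collections.abc import Mapping


def nested_overwrite(defaults, overrides):
    """Hypothetical helper: overlay overrides onto defaults, level by level."""
    # Assumption: the defaults are left untouched and a copy is returned.
    merged = copy.deepcopy(dict(defaults))
    for key, value in overrides.items():
        if isinstance(value, Mapping) and isinstance(merged.get(key), Mapping):
            # Both sides are mappings: merge them instead of replacing.
            merged[key] = nested_overwrite(merged[key], value)
        else:
            merged[key] = copy.deepcopy(value)
    return merged


defaults = {"a": {"b": 1, "c": 2}, "d": 3}
merged = nested_overwrite(defaults, {"a": {"c": 20}, "e": 5})
```

Only the keys present in the overrides change; sibling keys such as `a.b` keep their default values.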
Buffer request
Script phase
- class ska_sdp_scripting.phase.Phase(name: str, list_requests: List, config: ska_sdp_config.Config, pb_id: str, eb_id: str, script_kind: str, qc_config: str)[source]
Script phase.
This should not be created directly; use the ProcessingBlock.create_phase() method instead.
- Parameters:
- check_state(txn: ska_sdp_config.config.Transaction, check_realtime: bool = True) None [source]
Check the state of the processing block.
Check if the PB is finished or cancelled, and for real-time scripts check if the EB is finished or cancelled.
- Parameters:
txn (ska_sdp_config.Transaction) – SDP configuration transaction
check_realtime (bool) – whether to check the execution block state if the processing block is real-time (i.e. cancel the processing script if the EB is FINISHED or CANCELLED)
- ee_deploy_dask(name: str, n_workers: int, func: Callable, f_args: Tuple[Any])[source]
Deploy a Dask execution engine.
- ee_deploy_helm(deploy_name: str, values: dict | None = None)[source]
Deploy a Helm execution engine.
This can be used to deploy any Helm chart.
- ee_deploy_test(deploy_name: str, func: Callable | None = None, f_args: List[Any] | None = None) EEDeploy [source]
Deploy a fake execution engine.
This is used for testing and example purposes.
- is_eb_finished(txn: ska_sdp_config.config.Transaction) bool [source]
Check if the EB is finished or cancelled.
- Parameters:
txn (ska_sdp_config.Transaction) – config db transaction
- Return type:
- monitor_deployments(txn: ska_sdp_config.config.Transaction, iteration: int = 0)[source]
Monitor deployments and update the deployment status in the processing block state based on the state of each deployment’s pods.
Also update the deployments_ready key of the processing block state.
Currently, deployments_ready is True only if all deployments are RUNNING; otherwise it is False.
- Parameters:
txn – Transaction object (config.txn)
iteration – number of txn iteration
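The rule for deployments_ready stated above can be expressed in a few lines. This is only a sketch of the rule: the helper name `deployments_ready` and the mapping of deployment names to status strings are assumptions, and it does not touch the configuration database.

```python
def deployments_ready(deployment_states):
    """Hypothetical helper: True only if every deployment is RUNNING."""
    # Note: an empty mapping yields True here; how the library treats
    # the no-deployments case is not stated in this document.
    return all(state == "RUNNING" for state in deployment_states.values())


ready = deployments_ready({"proc-a": "RUNNING", "proc-b": "PENDING"})
```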
- update_pb_state(status: ProcessingBlockStatus = ProcessingBlockStatus.UNSET)[source]
Update processing block state.
If the status is UNSET, it is marked as finished.
- Parameters:
status (ProcessingBlockStatus, optional) – status
- wait_loop(func: Callable[[ska_sdp_config.config.Transaction], bool], time_to_ready: int = 0)[source]
Wait loop to check the status of the processing block. For real-time scripts, it also updates the processing block state with deployment statuses.
- Parameters:
func – function to check condition for exiting the watcher loop
time_to_ready – set deployments_ready to True after this amount of time (in seconds) has passed. Only applies to deployments created with phase.ee_deploy_test
Execution engine deployment
- class ska_sdp_scripting.ee_base_deploy.EEDeploy(pb_id: str, config: ska_sdp_config.Config)[source]
Base class for execution engine deployment.
- Parameters:
pb_id (str) – processing block ID
config (ska_sdp_config.Config) – SDP configuration client
- is_finished(txn: ska_sdp_config.config.Transaction) bool [source]
Check if the deployment is finished.
- Parameters:
txn (ska_sdp_config.Transaction) – configuration transaction
- Return type:
Helm EE deployment
Dask EE deployment
- class ska_sdp_scripting.dask_deploy.DaskDeploy(pb_id: str, config: ska_sdp_config.Config, deploy_name: str, n_workers: int, func: Callable, f_args: Tuple[Any])[source]
Deploy a Dask execution engine.
The function when called with the arguments should return a Dask graph. The graph is then executed by calling the compute method:
    result = func(*f_args)
    result.compute()
This happens in a separate thread so the constructor can return immediately.
This should not be created directly; use the Phase.ee_deploy_dask() method instead.
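The pattern described above (call the function, call compute on the result, and do both in a separate thread) can be sketched without Dask. Everything below is illustrative: `FakeGraph` stands in for a Dask graph, and `build_graph` and `run_in_background` are hypothetical names, not part of ska_sdp_scripting.

```python
import threading


class FakeGraph:
    """Stand-in for a Dask graph: any object with a compute() method."""

    def __init__(self, value):
        self.value = value

    def compute(self):
        return self.value * 2


def build_graph(value):
    """Plays the role of func: returns an object with compute()."""
    return FakeGraph(value)


def run_in_background(func, f_args, results):
    """Hypothetical helper: run func and compute its result in a thread."""

    def _work():
        result = func(*f_args)
        results.append(result.compute())

    thread = threading.Thread(target=_work, daemon=True)
    thread.start()
    # Return at once; the work continues in the background thread.
    return thread


results = []
thread = run_in_background(build_graph, (21,), results)
thread.join()  # this sketch blocks here only so the result can be read
```

Returning the thread right away is what lets a constructor hand control back to the caller while the graph is still being computed.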
Fake EE deployment
- class ska_sdp_scripting.fake_deploy.FakeDeploy(pb_id: str, config: ska_sdp_config.Config, deploy_name: str, func: Callable | None = None, f_args: Tuple[Any] | None = None)[source]
Deploy a fake execution engine.
The function is called with the arguments in a separate thread so the constructor can return immediately.
This should not be created directly; use the Phase.ee_deploy_test() method instead.
- Parameters:
Queue connector functions
Common functions used to configure the QueueConnector.
- ska_sdp_scripting.queue_connector.configure_queue_connector(txn, qc_config, pb_id, eb_id)[source]
Function to configure queue connector.
- Parameters:
qc_config – QueueConnector configuration string
txn – Transaction
pb_id – Processing block ID
eb_id – Execution block ID
- Returns:
- ska_sdp_scripting.queue_connector.tear_down_queue_connector(config, qc_config_path)[source]
Function for tearing down the QueueConnector.
- Parameters:
config – SDP configuration client
qc_config_path – QueueConnector config path
- ska_sdp_scripting.queue_connector.wait_for_queue_connector_state(config, qc_config_path, expected_state, timeout)[source]
Function to wait for queue connector state.
- Parameters:
config – SDP configuration client
qc_config_path – Path of the QueueConnector configuration (str)
expected_state – Expected state the QueueConnector needs to reach (str)
timeout – how long to wait before raising TimeoutError
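Waiting for a state with a timeout can be sketched as a polling loop. This is a generic illustration, not the library's implementation: the helper name `wait_for_state`, the `get_state` callable, the `poll_interval` parameter, and the use of seconds as the unit are all assumptions, and the real function may watch the configuration database rather than poll.

```python
import time


def wait_for_state(get_state, expected_state, timeout, poll_interval=0.01):
    """Hypothetical helper: poll get_state() until it matches or time runs out."""
    # Assumption: timeout and poll_interval are given in seconds.
    deadline = time.monotonic() + timeout
    while True:
        if get_state() == expected_state:
            return
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"state {expected_state!r} not reached within {timeout} s"
            )
        time.sleep(poll_interval)


# Usage: the state source here is a simple iterator standing in for a lookup.
states = iter(["OFF", "STANDBY", "ON"])
wait_for_state(lambda: next(states), "ON", timeout=1.0)
```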