SDP Scripting Library
The SDP scripting library is a high-level interface for writing processing scripts. Its goal is to provide abstractions to enable the developer to express the high-level organisation of a processing script without needing to interact directly with the low-level interfaces such as the SDP configuration library.
Development
Installation
The library can be installed using pip, making sure to use the SKA artefact repository as the index:
pip install \
--index-url https://artefact.skao.int/repository/pypi-all/simple \
ska-sdp-scripting
To install it using a requirements.txt file, the pip options can be added to the top of the file like this:
--index-url https://artefact.skao.int/repository/pypi-all/simple
ska-sdp-scripting
Usage
Once the SDP scripting library has been installed, use:
import ska_sdp_scripting
Develop a new script
The steps to develop and test an SDP processing script can be found at Script Development.
Functionality
The required functionality of the scripting library is as follows.
Starting, monitoring and ending a script
At the start
Claim the processing block.
Get the parameters defined in the processing block. They should be checked against the parameter schema defined for the script.
Resource requests
Make requests for input and output buffer space. The script will calculate the resources it needs based on the parameters, then request them from the processing controller. This is currently a placeholder.
Declare script phases
Scripts will be divided into phases such as preparation, processing, and clean-up. In the current implementation, only one phase can be declared, which we refer to as the ‘work’ phase.
Execute the work phase
On entry to the work phase, the script waits until the requested resources are available. Meanwhile, it monitors the processing block to see whether it has been cancelled. For real-time scripts, it also checks whether the execution block has been cancelled.
Deploys execution engines to execute a script/function.
Monitors the execution engines and processing block state. Waits until the execution is finished, or the processing block is cancelled.
Continuously updates the processing block state with the status of execution engine deployments. It provides aggregate information about these statuses to inform other components about the readiness of deployments.
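The cancellation and completion checks described above amount to a simple aggregation over the processing block and deployment states. A minimal sketch in plain Python (the function name and state representation are illustrative, not the library's API):

```python
def check_status(pb_cancelled, deployment_states):
    """Aggregate work-phase status: cancellation takes priority,
    and the phase only finishes once every deployment has finished."""
    if pb_cancelled:
        return "CANCELLED"
    if deployment_states and all(s == "FINISHED" for s in deployment_states):
        return "FINISHED"
    return "RUNNING"

print(check_status(False, ["FINISHED", "FINISHED"]))  # FINISHED
print(check_status(True, ["RUNNING"]))                # CANCELLED
```

In the real library this check happens inside a wait loop that watches the configuration database for changes, rather than polling a local data structure.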
At the end
Remove the execution engines to release the resources.
Update processing block state with information about the success or failure of the script.
Receive scripts
Get IP and MAC addresses for the receive processes.
Monitor receive processes. If any get restarted, then the addresses may need to be updated.
Write the addresses in the appropriate format into the processing block state.
Compatibility with the telescope model library
We keep the scripting library compatible with the latest version of the telescope model library. If you use a configuration string that is based on an older version of the telescope model, you may experience errors or unexpected behaviour.
Receive Process and Port Configuration
Multiple Port Configuration
The scripting library can configure multiple ports, which allows a single receiver to be deployed with multiple ports. In other words, a single receiver can receive data for a single SPEAD stream coming from multiple processes.
Now, assuming each sender sends data for one channel and all baselines, we want as many ports as channels on the receiver side. For cbf-receive, a single receiver process can already receive on multiple ports, and this is configurable via reception.receiver_port_start and reception.num_ports.
To make sense of multiple ports, the port map had to be updated from a three-value list (ADR-10) to a four-value list, where the fourth value defines the increment of the port number.
For example, if we set reception.receiver_port_start = 9000, reception.num_ports = 3, count = 3 and max_channels = 1, then the resulting port map would look like:
"port": [[0, 9000, 1, 0], [1, 9001, 1, 1], [2, 9002, 1, 2]]
API
Processing block
- class ska_sdp_scripting.processing_block.ProcessingBlock(pb_id: str = None)[source]
Claim the processing block.
- Parameters:
pb_id (str, optional) – processing block ID
- configure_recv_processes_ports(scan_types, max_channels_per_process, port_start, channels_per_port)[source]
Calculate how many receive processes and ports are required, and configure a dictionary to be fed back into the receive_addresses attribute.
- Parameters:
scan_types – scan types from EB
max_channels_per_process – maximum number of channels per process
port_start – starting port the receiver will be listening on
channels_per_port – number of channels to be sent to each port
- Returns:
tuple(configured receive dict, number of processes)
- Return type:
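The number of processes implied by these parameters can be sketched as a ceiling division. This is an illustration of the calculation, not the library's code:

```python
import math

def num_receive_processes(num_channels, max_channels_per_process):
    """Each receive process handles at most max_channels_per_process
    channels, so round the quotient up."""
    return math.ceil(num_channels / max_channels_per_process)

print(num_receive_processes(40, 20))  # 2
print(num_receive_processes(45, 20))  # 3
```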
- create_phase(name: str, requests: List[BufferRequest]) Phase [source]
Create a script phase for deploying execution engines.
The phase is created with a list of resource requests which must be satisfied before the phase can start executing. For the time being the only resource requests are (placeholder) buffer reservations, but eventually this will include compute requests too.
- Parameters:
name (str) – name of the phase
requests (list of
BufferRequest
) – resource requests
- Returns:
the phase
- Return type:
Phase
- get_dependencies()[source]
Get the list of processing block dependencies.
- Returns:
processing block dependencies
- Return type:
- get_parameters(schema=None)[source]
Get script parameters from processing block.
The schema checking is not currently implemented.
- Parameters:
schema – schema to validate the parameters
- Returns:
processing block parameters
- Return type:
- get_scan_types() List[str] [source]
Get scan types from the execution block.
Updates the scan types with the default parameters and channels.
This is only supported for real-time scripts.
- Returns:
scan types
- Return type:
- nested_parameters(parameters: dict)[source]
Convert flattened dictionary to nested dictionary.
- Parameters:
parameters – parameters to be converted
- Returns:
nested parameters
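The flattened-to-nested conversion can be illustrated in plain Python. This is a sketch that assumes dot-separated keys; the library's actual key convention may differ:

```python
def to_nested(flat):
    """Convert {'a.b': 1} style flattened keys into nested dicts."""
    nested = {}
    for key, value in flat.items():
        parts = key.split(".")
        node = nested
        for part in parts[:-1]:
            # Descend, creating intermediate dicts as needed.
            node = node.setdefault(part, {})
        node[parts[-1]] = value
    return nested

print(to_nested({"receiver.ports": 3, "receiver.host": "0.0.0.0"}))
# {'receiver': {'ports': 3, 'host': '0.0.0.0'}}
```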
- receive_addresses(configured_host_port, chart_name=None, service_name=None, namespace=None, script='vis-receive')[source]
Generate receive addresses and update the processing block state.
- Parameters:
configured_host_port – constructed host and port
chart_name – Name of the statefulset
service_name – Name of the headless service
namespace – namespace where it’s going to be deployed
script – processing script that is configuring receive addresses. Options: vis-receive, pointing-offset; default: vis-receive
- static request_buffer(size: float, tags: List[str]) BufferRequest [source]
Request a buffer reservation.
This returns a buffer reservation request that is used to create a script phase. These are currently only placeholders.
- update_parameters(default_parameters: dict, parameters: dict | Mapping)[source]
Nested overwrite of values in default_parameters with those in parameters.
- Parameters:
default_parameters (dict) – default parameter values
parameters (dict) – script-specific parameters
- Returns:
processing block additional parameters
- Return type:
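The nested overwrite can be sketched as a recursive merge. This is illustrative, not the library's implementation:

```python
def merge(defaults, overrides):
    """Recursively overwrite default values with script-specific ones,
    descending into nested dicts rather than replacing them wholesale."""
    result = dict(defaults)
    for key, value in overrides.items():
        if isinstance(value, dict) and isinstance(result.get(key), dict):
            result[key] = merge(result[key], value)
        else:
            result[key] = value
    return result

defaults = {"receiver": {"ports": 1, "host": "0.0.0.0"}}
params = {"receiver": {"ports": 3}}
print(merge(defaults, params))
# {'receiver': {'ports': 3, 'host': '0.0.0.0'}}
```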
Buffer request
Script phase
- class ska_sdp_scripting.phase.Phase(name: str, list_requests: List, config: ska_sdp_config.Config, pb_id: str, eb_id: str, script_kind: str, qc_config: str)[source]
Script phase.
This should not be created directly, use the
ProcessingBlock.create_phase()
method instead.
- Parameters:
- check_state(txn: ska_sdp_config.config.Transaction, check_realtime: bool = True) None [source]
Check the state of the processing block.
Check if the PB is finished or cancelled, and for real-time scripts check if the EB is finished or cancelled.
- Parameters:
txn (ska_sdp_config.Transaction) – SDP configuration transaction
check_realtime (bool) – Whether to check execution block state if the processing block is realtime (i.e. cancel processing script for FINISHED/CANCELLED)
- ee_deploy_dask(name: str, n_workers: int, func: Callable, f_args: Tuple[Any])[source]
Deploy a Dask execution engine.
- ee_deploy_helm(deploy_name: str, values: dict | None = None)[source]
Deploy a Helm execution engine.
This can be used to deploy any Helm chart.
- ee_deploy_test(deploy_name: str, func: Callable = None, f_args: List[Any] = None) EEDeploy [source]
Deploy a fake execution engine.
This is used for testing and example purposes.
- is_eb_finished(txn: ska_sdp_config.config.Transaction) bool [source]
Check if the EB is finished or cancelled.
- Parameters:
txn (ska_sdp_config.Transaction) – config db transaction
- Return type:
- monitor_deployments(txn: ska_sdp_config.config.Transaction, iteration: int = 0)[source]
Monitor deployments and update the deployment status in the processing block state, based on the state of the deployments’ pods.
Also update the deployments_ready processing block state key.
At the moment, deployments_ready = True only if all deployments are RUNNING; otherwise it is False.
- Parameters:
txn – Transaction object (config.txn)
iteration – number of txn iteration
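The readiness aggregation described above can be sketched in a few lines. The function name and state representation are illustrative, not the library's API:

```python
def deployments_ready(pod_states):
    """deployments_ready is True only when every deployment is RUNNING."""
    return bool(pod_states) and all(
        s == "RUNNING" for s in pod_states.values()
    )

print(deployments_ready({"ee-scheduler": "RUNNING", "ee-workers": "RUNNING"}))  # True
print(deployments_ready({"ee-scheduler": "RUNNING", "ee-workers": "PENDING"}))  # False
```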
- update_pb_state(status: ProcessingBlockStatus = ProcessingBlockStatus.UNSET)[source]
Update processing block state.
If the status is UNSET, it is marked as finished.
- Parameters:
status (str, optional) – status
- wait_loop(func: Callable[[ska_sdp_config.config.Transaction], bool], time_to_ready: int = 0)[source]
Wait loop to check the status of the processing block. It also updates the processing block state with deployment statuses for realtime scripts.
- Parameters:
func – function to check condition for exiting the watcher loop
time_to_ready – set deployments_ready to true after this amount of time has passed (seconds). Only for deployments deployed with phase.ee_deploy_test
Execution engine deployment
- class ska_sdp_scripting.ee_base_deploy.EEDeploy(pb_id: str, config: ska_sdp_config.Config)[source]
Base class for execution engine deployment.
- Parameters:
pb_id (str) – processing block ID
config (ska_sdp_config.Client) – SDP configuration client
- is_finished(txn: ska_sdp_config.config.Transaction) bool [source]
Check if the deployment is finished.
- Parameters:
txn (ska_sdp_config.Transaction) – configuration transaction
- Return type:
Helm EE Deployment
Dask EE deployment
- class ska_sdp_scripting.dask_deploy.DaskDeploy(pb_id: str, config: ska_sdp_config.Config, deploy_name: str, n_workers: int, func: Callable, f_args: Tuple[Any])[source]
Deploy a Dask execution engine.
The function when called with the arguments should return a Dask graph. The graph is then executed by calling the compute method:
result = func(*f_args)
result.compute()
This happens in a separate thread so the constructor can return immediately.
This should not be created directly, use the
Phase.ee_deploy_dask()
method instead.
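The contract for func can be illustrated with a stand-in graph object. This is a sketch; a real script would return an actual Dask graph, e.g. built with dask.delayed:

```python
class FakeGraph:
    """Stand-in for a Dask graph: the computation is deferred
    until .compute() is called."""
    def __init__(self, fn, *args):
        self.fn, self.args = fn, args

    def compute(self):
        return self.fn(*self.args)

def func(x, y):
    # A real script would build and return a Dask graph here.
    return FakeGraph(lambda a, b: a + b, x, y)

f_args = (2, 3)
result = func(*f_args)   # returns immediately; nothing computed yet
print(result.compute())  # 5
```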
Fake EE deployment
- class ska_sdp_scripting.fake_deploy.FakeDeploy(pb_id: str, config: ska_sdp_config.Config, deploy_name: str, func: Callable = None, f_args: Tuple[Any] = None)[source]
Deploy a fake execution engine.
The function is called with the arguments in a separate thread so the constructor can return immediately.
This should not be created directly, use the
Phase.ee_deploy_test()
method instead.
- Parameters:
Queue connector functions
Common functions used to configure the QueueConnector
- ska_sdp_scripting.queue_connector.configure_queue_connector(txn, qc_config, pb_id, eb_id)[source]
Function to configure queue connector.
- Parameters:
qc_config – QueueConnector configuration string
txn – Transaction
pb_id – Processing block ID
eb_id – Execution block ID
- Returns:
- ska_sdp_scripting.queue_connector.tear_down_queue_connector(config, qc_config_path)[source]
Function for tearing down the QueueConnector.
- Parameters:
config – SDP configuration client
qc_config_path – QueueConnector config path
- ska_sdp_scripting.queue_connector.wait_for_queue_connector_state(config, qc_config_path, expected_state, timeout)[source]
Function to wait for queue connector state.
- Parameters:
config – SDP configuration client
qc_config_path – Path of the QueueConnector configuration (str)
expected_state – Expected state the QueueConnector needs to reach (str)
timeout – how long to wait before a TimeoutError is raised