API
Processing block
- class ska_sdp_workflow.workflow.ProcessingBlock(pb_id=None)[source]
Claim the processing block.
- Parameters
pb_id (str, optional) – processing block ID
- configure_recv_processes_ports(scan_types, max_channels_per_process, port_start, channels_per_port)[source]
Calculate how many receive process(es) and ports are required.
- Parameters
scan_types – scan types from SBI
max_channels_per_process – maximum number of channels per process
port_start – starting port on which the receiver will listen
channels_per_port – number of channels to be sent to each port
- Returns
configured host and port
- Return type
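The kind of arithmetic such a calculation involves can be sketched as follows. This is not the library's implementation: the helper name `plan_receive_ports`, the use of a plain channel count in place of scan types, the ceiling-division rule, and the assumption that each process runs on its own host (so port numbers may repeat across processes) are all illustrative assumptions.

```python
import math


def plan_receive_ports(num_channels, max_channels_per_process,
                       port_start, channels_per_port):
    """Hypothetical helper: split channels over processes and ports."""
    # Number of receive processes needed to cover every channel.
    num_processes = math.ceil(num_channels / max_channels_per_process)
    plan = []
    remaining = num_channels
    for index in range(num_processes):
        # Channels handled by this process (the last one may get fewer).
        channels = min(remaining, max_channels_per_process)
        remaining -= channels
        # Ports needed by this process, each carrying channels_per_port.
        num_ports = math.ceil(channels / channels_per_port)
        plan.append({
            "process": index,
            "channels": channels,
            # Assumption: every process has its own host, so each one
            # starts counting from port_start again.
            "ports": list(range(port_start, port_start + num_ports)),
        })
    return plan


plan = plan_receive_ports(num_channels=50, max_channels_per_process=20,
                          port_start=9000, channels_per_port=10)
```

With these example values the sketch yields three processes: two handling 20 channels on two ports each, and one handling the remaining 10 channels on a single port.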
- create_phase(name, requests)[source]
Create a workflow phase for deploying execution engines.
The phase is created with a list of resource requests which must be satisfied before the phase can start executing. For the time being the only resource requests are (placeholder) buffer reservations, but eventually this will include compute requests too.
- Parameters
name (str) – name of the phase
requests (list of BufferRequest) – resource requests
- Returns
the phase
- Return type
Phase
- get_parameters(schema=None)[source]
Get workflow parameters from the processing block.
Schema checking is not currently implemented.
- Parameters
schema – schema to validate the parameters
- Returns
processing block parameters
- Return type
- get_scan_types()[source]
Get scan types from the scheduling block instance.
This is only supported for real-time workflows.
- Returns
scan types
- Return type
- nested_parameters(parameters)[source]
Convert flattened dictionary to nested dictionary.
- Parameters
parameters – parameters to be converted
- Returns
nested parameters
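A minimal sketch of this kind of conversion is shown below. It assumes that flattened keys use a dot as the separator, which the description above does not state; the function name `nest` and the example keys are hypothetical and not part of the library.

```python
def nest(flat, sep="."):
    """Hypothetical helper: turn {'a.b': 1} into {'a': {'b': 1}}."""
    nested = {}
    for key, value in flat.items():
        parts = key.split(sep)
        node = nested
        # Walk (and create) the intermediate dictionaries.
        for part in parts[:-1]:
            node = node.setdefault(part, {})
        # The last component of the key holds the value itself.
        node[parts[-1]] = value
    return nested


# Illustrative keys only; real workflow parameters will differ.
flat = {"receiver.host": "localhost", "receiver.port": 9000, "duration": 60}
nested = nest(flat)
```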
- receive_addresses(chart_name=None, service_name=None, namespace=None, configured_host_port=None)[source]
Generate receive addresses and update the processing block state.
- Parameters
scan_types – Scan types
chart_name – Name of the statefulset
service_name – Name of the headless service
namespace – namespace in which it will be deployed
configured_host_port – constructed host and port
- request_buffer(size, tags)[source]
Request a buffer reservation.
This returns a buffer reservation request that is used to create a workflow phase. These are currently only placeholders.
- Parameters
size (float) – size of the buffer
tags (list of str) – tags describing the type of buffer required
- Returns
buffer reservation request
- Return type
BufferRequest
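The call order implied by request_buffer() and create_phase() can be sketched as below. The real classes need a running SDP configuration database, so this example uses small stand-in classes that copy only the documented method signatures; their internals, the buffer sizes, and the tag names are assumptions made for illustration.

```python
from dataclasses import dataclass, field


@dataclass
class BufferRequest:
    """Stand-in for a placeholder buffer reservation request."""
    size: float
    tags: list


@dataclass
class Phase:
    """Stand-in for a workflow phase holding its resource requests."""
    name: str
    requests: list = field(default_factory=list)


class ProcessingBlock:
    """Stand-in exposing only the two documented method signatures."""

    def request_buffer(self, size, tags):
        # Build a reservation request; nothing is actually reserved.
        return BufferRequest(size, tags)

    def create_phase(self, name, requests):
        # The phase records the requests it was created with.
        return Phase(name, list(requests))


pb = ProcessingBlock()
# Request the buffers first, then pass them to the phase as a list.
in_buffer = pb.request_buffer(100e6, tags=["visibilities"])
out_buffer = pb.request_buffer(10e6, tags=["images"])
work_phase = pb.create_phase("work", [in_buffer, out_buffer])
```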
Buffer request
Workflow phase
- class ska_sdp_workflow.phase.Phase(name, list_requests, config, pb_id, sbi_id, workflow_type)[source]
Workflow phase.
This should not be created directly; use the ProcessingBlock.create_phase() method instead.
- Parameters
- check_state(txn)[source]
Check the state of the processing block.
Check if the PB is finished or cancelled, and for real-time workflows check if the SBI is finished or cancelled.
- Parameters
txn (ska_sdp_config.Transaction) – SDP configuration transaction
- ee_deploy_helm(deploy_name, values=None)[source]
Deploy a Helm execution engine.
This can be used to deploy any Helm chart.
- ee_deploy_test(deploy_name, func=None, f_args=None)[source]
Deploy a fake execution engine.
This is used for testing and example purposes.
- is_sbi_finished(txn)[source]
Check if the SBI is finished or cancelled.
- Parameters
txn (ska_sdp_config.Transaction) – config db transaction
- Return type
Execution engine deployment
- class ska_sdp_workflow.ee_base_deploy.EEDeploy(pb_id, config)[source]
Base class for execution engine deployment.
- Parameters
pb_id (str) – processing block ID
config (ska_sdp_config.Client) – SDP configuration client
Helm EE deployment
Dask EE deployment
- class ska_sdp_workflow.dask_deploy.DaskDeploy(pb_id, config, deploy_name, n_workers, func, f_args)[source]
Deploy a Dask execution engine.
The function when called with the arguments should return a Dask graph. The graph is then executed by calling the compute method:
result = func(*f_args)
result.compute()
This happens in a separate thread so the constructor can return immediately.
This should not be created directly; use the Phase.ee_deploy_dask() method instead.
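The pattern of building a graph and computing it in a background thread can be sketched without Dask as follows. This is an illustration of the idea only, not the DaskDeploy implementation: `FakeGraph`, `BackgroundCompute`, `build_graph`, and the `wait()` method are hypothetical names, and the stand-in graph simply sums a list.

```python
import threading


class FakeGraph:
    """Stand-in for a Dask graph: any object with a compute() method."""

    def __init__(self, values):
        self.values = values

    def compute(self):
        return sum(self.values)


class BackgroundCompute:
    """Hypothetical illustration of running func(*f_args).compute() in a thread."""

    def __init__(self, func, f_args):
        self.result = None
        self._thread = threading.Thread(
            target=self._run, args=(func, f_args), daemon=True
        )
        # Start the work and return straight away.
        self._thread.start()

    def _run(self, func, f_args):
        # Build the graph, then execute it, as in the two lines above.
        graph = func(*f_args)
        self.result = graph.compute()

    def wait(self, timeout=None):
        # Block until the background computation has finished.
        self._thread.join(timeout)
        return self.result


def build_graph(values):
    return FakeGraph(values)


deploy = BackgroundCompute(build_graph, ([1, 2, 3],))
total = deploy.wait()
```

Because the constructor only starts the thread, the caller stays free to do other work and collects the result later.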
Fake EE deployment
- class ska_sdp_workflow.fake_deploy.FakeDeploy(pb_id, config, deploy_name, func=None, f_args=None)[source]
Deploy a fake execution engine.
The function is called with the arguments in a separate thread so the constructor can return immediately.
This should not be created directly; use the Phase.ee_deploy_test() method instead.