API

Processing block

class ska_sdp_workflow.workflow.ProcessingBlock(pb_id=None)[source]

Claim the processing block.

Parameters

pb_id (str, optional) – processing block ID

configure_recv_processes_ports(scan_types, max_channels_per_process, port_start, channels_per_port)[source]

Calculate how many receive process(es) and ports are required.

Parameters
  • scan_types – scan types from SBI

  • max_channels_per_process – maximum number of channels per process

  • port_start – starting port the receiver will be listening in

  • channels_per_port – number of channels to be sent to each port

Returns

configured host and port

Return type

dict

create_phase(name, requests)[source]

Create a workflow phase for deploying execution engines.

The phase is created with a list of resource requests which must be satisfied before the phase can start executing. For the time being the only resource requests are (placeholder) buffer reservations, but eventually this will include compute requests too.

Parameters
  • name (str) – name of the phase

  • requests (list of BufferRequest) – resource requests

Returns

the phase

Return type

Phase

exit()[source]

Close connection to the configuration.

get_parameters(schema=None)[source]

Get workflow parameters from processing block.

The schema checking is not currently implemented.

Parameters

schema – schema to validate the parameters

Returns

processing block parameters

Return type

dict

get_scan_types()[source]

Get scan types from the scheduling block instance.

This is only supported for real-time workflows

Returns

scan types

Return type

list

nested_parameters(parameters)[source]

Convert flattened dictionary to nested dictionary.

Parameters

parameters – parameters to be converted

Returns

nested parameters

receive_addresses(chart_name=None, service_name=None, namespace=None, configured_host_port=None)[source]

Generate receive addresses and update the processing block state.

Parameters
  • scan_types – Scan types

  • chart_name – Name of the statefulset

  • service_name – Name of the headless service

  • namespace – namespace where its going to be deployed

  • configured_host_port – constructed host and port

request_buffer(size, tags)[source]

Request a buffer reservation.

This returns a buffer reservation request that is used to create a workflow phase. These are currently only placeholders.

Parameters
  • size (float) – size of the buffer

  • tags (list of str) – tags describing the type of buffer required

Returns

buffer reservation request

Return type

BufferRequest

update_parameters(default_parameters, parameters)[source]

Nested overwrite of default_parameter values with ones in parameters.

Parameters
  • default_parameters

    dict

    default parameter values

  • parameters

    dict

    workflow specific parameters

Returns

processing block additional parameters

Return type

dict

Buffer request

class ska_sdp_workflow.buffer_request.BufferRequest(size, tags)[source]

Request a buffer reservation.

This is currently just a placeholder.

Parameters
  • size (float) – size of the buffer

  • tags (list of str) – tags describing the type of buffer required

Workflow phase

class ska_sdp_workflow.phase.Phase(name, list_requests, config, pb_id, sbi_id, workflow_type)[source]

Workflow phase.

This should not be created directly, use the ProcessingBlock.create_phase() method instead.

Parameters
  • name (str) – name of the phase

  • list_requests (list) – list of requests

  • config (ska_sdp_config.Config) – SDP configuration client

  • pb_id (str) – processing block ID

  • sbi_id (str) – scheduling block instance ID

  • workflow_type (str) – workflow type

check_state(txn)[source]

Check the state of the processing block.

Check if the PB is finished or cancelled, and for real-time workflows check if the SBI is finished or cancelled.

Parameters

txn (ska_sdp_config.Transaction) – SDP configuration transaction

ee_deploy_dask(name, n_workers, func, f_args)[source]

Deploy a Dask execution engine.

Parameters
  • name (str) – deployment name

  • n_workers (int) – number of Dask workers

  • func (function) – function to execute

  • f_args (tuple) – function arguments

Returns

Dask execution engine deployment

Return type

DaskDeploy

ee_deploy_helm(deploy_name, values=None)[source]

Deploy a Helm execution engine.

This can be used to deploy any Helm chart.

Parameters
  • deploy_name (str) – name of Helm chart

  • values (dict, optional) – values to pass to Helm chart

Returns

Helm execution engine deployment

Return type

HelmDeploy

ee_deploy_test(deploy_name, func=None, f_args=None)[source]

Deploy a fake execution engine.

This is used for testing and example purposes.

Parameters
  • deploy_name (str) – deployment name

  • func (function) – function to execute

  • f_args (tuple) – function arguments

Returns

fake execution engine deployment

Return type

FakeDeploy

ee_remove()[source]

Remove execution engines deployments.

is_sbi_finished(txn)[source]

Check if the SBI is finished or cancelled.

Parameters

txn (ska_sdp_config.Transaction) – config db transaction

Return type

bool

update_pb_state(status=None)[source]

Update processing block state.

If the status is not provided, it is marked as finished.

Parameters

status (str, optional) – status

wait_loop()[source]

Wait loop to check the status of the processing block.

Execution engine deployment

class ska_sdp_workflow.ee_base_deploy.EEDeploy(pb_id, config)[source]

Base class for execution engine deployment.

Parameters
  • pb_id (str) – processing block ID

  • config (ska_sdp_config.Client) – SDP configuration client

get_id()[source]

Get the deployment ID.

Returns

deployment ID

Return type

str

is_finished(txn)[source]

Check if the deployment is finished.

Parameters

txn (ska_sdp_config.Transaction) – configuration transaction

Return type

bool

remove(deploy_id)[source]

Remove the execution engine.

Parameters

deploy_id (str) – deployment ID

update_deploy_status(status)[source]

Update deployment status.

Parameters

status (str) – status

Helm EE Deployment

class ska_sdp_workflow.helm_deploy.HelmDeploy(pb_id, config, deploy_name, values=None)[source]

Deploy Helm execution engine.

This should not be created directly, use the Phase.ee_deploy_helm() method instead.

Parameters
  • pb_id (str) – processing block ID

  • config (ska_sdp_config.Config) – SDP configuration client

  • deploy_name (str) – name of Helm chart to deploy

  • values (dict, optional) – values to pass to Helm chart

Dask EE deployment

class ska_sdp_workflow.dask_deploy.DaskDeploy(pb_id, config, deploy_name, n_workers, func, f_args)[source]

Deploy a Dask execution engine.

The function when called with the arguments should return a Dask graph. The graph is then executed by calling the compute method:

result = func(*f_args)
result.compute()

This happens in a separate thread so the constructor can return immediately.

This should not be created directly, use the Phase.ee_deploy_dask() method instead.

Parameters
  • pb_id (str) – processing block ID

  • config (ska_sdp_config.Client) – configuration DB client

  • deploy_name (str) – deployment name

  • n_workers (int) – number of Dask workers

  • func (function) – function to execute

  • f_args (tuple) – function arguments

Fake EE deployment

class ska_sdp_workflow.fake_deploy.FakeDeploy(pb_id, config, deploy_name, func=None, f_args=None)[source]

Deploy a fake execution engine.

The function is called with the arguments in a separate thread so the constructor can return immediately.

This should not be created directly, use the Phase.ee_deploy_test() method instead.

Parameters
  • pb_id (str) – processing block ID

  • config (ska_sdp_config.Client) – SDP configuration client

  • deploy_name (str) – deployment name

  • func (function) – function to execute

  • f_args (tuple) – function arguments