API

Processing block

class ska_sdp_scripting.processing_block.ProcessingBlock(pb_id: str | None = None)[source]

Claim the processing block.

Parameters:

pb_id (str, optional) – processing block ID

configure_recv_processes_ports(scan_types, max_channels_per_process, port_start, channels_per_port)[source]

Calculate how many receive processes and ports are required, and build a dictionary to be fed back into the receive_addresses attribute.

Parameters:
  • scan_types – scan types from EB

  • max_channels_per_process – maximum number of channels per process

  • port_start – starting port the receiver will listen on

  • channels_per_port – number of channels to be sent to each port

Returns:

tuple(configured receive dict, number of processes)

Return type:

tuple
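
The counting step described above can be illustrated with a small sketch. This is not the library's implementation: the helper name count_processes_and_ports, the num_channels argument, and the assumption that each process numbers its ports from port_start are hypothetical, and the sketch does not try to reproduce the layout of the returned receive dictionary.

```python
import math


def count_processes_and_ports(
    num_channels, max_channels_per_process, port_start, channels_per_port
):
    """Hypothetical helper: estimate receive processes and their ports."""
    # Enough processes that none exceeds the per-process channel limit.
    num_processes = math.ceil(num_channels / max_channels_per_process)

    ports_per_process = []
    remaining = num_channels
    for _ in range(num_processes):
        channels_here = min(remaining, max_channels_per_process)
        num_ports = math.ceil(channels_here / channels_per_port)
        # Assumption: every process counts its ports up from port_start.
        ports_per_process.append(
            list(range(port_start, port_start + num_ports))
        )
        remaining -= channels_here

    return num_processes, ports_per_process
```

With 50 channels, at most 20 channels per process and 10 channels per port, this sketch yields three processes, the last of which needs only one port.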

create_phase(name: str, requests: List[BufferRequest]) Phase[source]

Create a script phase for deploying execution engines.

The phase is created with a list of resource requests which must be satisfied before the phase can start executing. For the time being the only resource requests are (placeholder) buffer reservations, but eventually this will include compute requests too.

Parameters:
  • name (str) – name of the phase

  • requests (list of BufferRequest) – resource requests

Returns:

the phase

Return type:

Phase

exit()[source]

Perform clean-up.

get_dependencies()[source]

Get the list of processing block dependencies.

Returns:

processing block dependencies

Return type:

list

get_parameters(schema=None)[source]

Get script parameters from processing block.

Schema checking is not currently implemented.

Parameters:

schema – schema to validate the parameters

Returns:

processing block parameters

Return type:

dict

get_scan_types() List[str][source]

Get scan types from the execution block.

Updates the scan types with the default parameters and channels.

This is only supported for real-time scripts.

Returns:

scan types

Return type:

list

nested_parameters(parameters: dict)[source]

Convert flattened dictionary to nested dictionary.

Parameters:

parameters – parameters to be converted

Returns:

nested parameters
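
A minimal sketch of this kind of conversion, assuming that flattened keys use "." as the separator (the separator is not stated here). The function name nest is hypothetical and this is not the library's code.

```python
def nest(flat, sep="."):
    """Hypothetical helper: turn {"a.b": 1} into {"a": {"b": 1}}."""
    nested = {}
    for key, value in flat.items():
        # Everything before the last separator names the parent dictionaries.
        *parents, leaf = key.split(sep)
        node = nested
        for part in parents:
            # Create intermediate dictionaries on demand.
            node = node.setdefault(part, {})
        node[leaf] = value
    return nested
```

For example, nest({"a.b": 1, "a.c": 2, "d": 3}) groups the two "a" entries under one nested dictionary and leaves "d" at the top level.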

receive_addresses(configured_host_port, chart_name=None, service_name=None, namespace=None, script='vis-receive')[source]

Generate receive addresses and update the processing block state.

Parameters:
  • configured_host_port – constructed host and port

  • chart_name – Name of the statefulset

  • service_name – Name of the headless service

  • namespace – namespace where it’s going to be deployed

  • script – processing script that is configuring the receive addresses. Options: vis-receive, pointing-offset. Default: vis-receive.

static request_buffer(size: float, tags: List[str]) BufferRequest[source]

Request a buffer reservation.

This returns a buffer reservation request that is used to create a script phase. These are currently only placeholders.

Parameters:
  • size (float) – size of the buffer

  • tags (list of str) – tags describing the type of buffer required

Returns:

buffer reservation request

Return type:

BufferRequest

update_parameters(default_parameters: dict, parameters: dict | Mapping)[source]

Nested overwrite of the values in default_parameters with the ones in parameters.

Parameters:
  • default_parameters (dict) – default parameter values

  • parameters (dict) – script-specific parameters

Returns:

processing block additional parameters

Return type:

dict
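
The idea of a nested overwrite can be sketched as a recursive merge. The function nested_overwrite below is a hypothetical stand-in, not the library's implementation; in particular, it returns a new dictionary, and whether the real method modifies default_parameters in place is not stated here.

```python
import copy
from collections.abc import Mapping


def nested_overwrite(defaults, overrides):
    """Hypothetical helper: merge overrides into a copy of defaults."""
    result = copy.deepcopy(dict(defaults))
    for key, value in overrides.items():
        if isinstance(value, Mapping) and isinstance(result.get(key), Mapping):
            # Both sides are mappings: merge them key by key.
            result[key] = nested_overwrite(result[key], value)
        else:
            # Otherwise the override replaces the default outright.
            result[key] = copy.deepcopy(value)
    return result
```

Keys that appear only in the defaults are kept, keys that appear only in the overrides are added, and nested dictionaries are merged rather than replaced wholesale.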

Buffer request

class ska_sdp_scripting.buffer_request.BufferRequest(size: float, tags: List[str])[source]

Request a buffer reservation.

This is currently just a placeholder.

Parameters:
  • size (float) – size of the buffer

  • tags (list of str) – tags describing the type of buffer required

size: float
tags: List[str]

Script phase

class ska_sdp_scripting.phase.Phase(name: str, list_requests: List, config: ska_sdp_config.Config, pb_id: str, eb_id: str, script_kind: str, qc_config: str)[source]

Script phase.

This should not be created directly; use the ProcessingBlock.create_phase() method instead.

Parameters:
  • name (str) – name of the phase

  • list_requests (list) – list of requests

  • config (ska_sdp_config.Config) – SDP configuration client

  • pb_id (str) – processing block ID

  • eb_id (str) – execution block ID

  • script_kind (str) – script kind

  • qc_config (str) – Queue Connector config string

check_state(txn: ska_sdp_config.config.Transaction, check_realtime: bool = True) None[source]

Check the state of the processing block.

Check if the PB is finished or cancelled, and for real-time scripts check if the EB is finished or cancelled.

Parameters:
  • txn (ska_sdp_config.Transaction) – SDP configuration transaction

  • check_realtime (bool) – Whether to check execution block state if the processing block is realtime (i.e. cancel processing script for FINISHED/CANCELLED)

ee_deploy_dask(name: str, n_workers: int, func: Callable, f_args: Tuple[Any])[source]

Deploy a Dask execution engine.

Parameters:
  • name (str) – deployment name

  • n_workers (int) – number of Dask workers

  • func (function) – function to execute

  • f_args (tuple) – function arguments

Returns:

Dask execution engine deployment

Return type:

DaskDeploy

ee_deploy_helm(deploy_name: str, values: dict | None = None)[source]

Deploy a Helm execution engine.

This can be used to deploy any Helm chart.

Parameters:
  • deploy_name (str) – name of Helm chart

  • values (dict, optional) – values to pass to Helm chart

Returns:

Helm execution engine deployment

Return type:

HelmDeploy

ee_deploy_test(deploy_name: str, func: Callable | None = None, f_args: List[Any] | None = None) EEDeploy[source]

Deploy a fake execution engine.

This is used for testing and example purposes.

Parameters:
  • deploy_name (str) – deployment name

  • func (function) – function to execute

  • f_args (tuple) – function arguments

Returns:

fake execution engine deployment

Return type:

FakeDeploy

ee_remove() None[source]

Remove execution engine deployments.

is_eb_finished(txn: ska_sdp_config.config.Transaction) bool[source]

Check if the EB is finished or cancelled.

Parameters:

txn (ska_sdp_config.Transaction) – config db transaction

Return type:

bool

monitor_deployments(txn: ska_sdp_config.config.Transaction, iteration: int = 0)[source]

Monitor deployments and update the deployment status in the processing block state based on the state of each deployment's pods.

Also update the deployments_ready key in the processing block state.

At the moment, deployments_ready is True only if all deployments are RUNNING; otherwise it is False.

Parameters:
  • txn – Transaction object (config.txn)

  • iteration – index of the current transaction iteration
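
The readiness rule described above reduces to a single check over the deployment statuses. The sketch below assumes the statuses are available as a mapping from deployment ID to status string; that shape and the function name are hypothetical.

```python
def all_deployments_running(statuses):
    """Hypothetical helper: True only if every deployment is RUNNING."""
    # Note: with no deployments at all, all() returns True. How the library
    # treats that case is not stated here.
    return all(status == "RUNNING" for status in statuses.values())
```

A single deployment in any other state is enough to make the result False.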

update_pb_state(status: ProcessingBlockStatus = ProcessingBlockStatus.UNSET)[source]

Update processing block state.

If the status is UNSET, it is marked as finished.

Parameters:

status (ProcessingBlockStatus, optional) – status

wait_loop(func: Callable[[ska_sdp_config.config.Transaction], bool], time_to_ready: int = 0)[source]

Wait loop to check the status of the processing block. It also updates the processing block state with deployment statuses for realtime scripts.

Parameters:
  • func – function to check condition for exiting the watcher loop

  • time_to_ready – set deployments_ready to True after this amount of time has passed (seconds). Only applies to deployments created with phase.ee_deploy_test.

Execution engine deployment

class ska_sdp_scripting.ee_base_deploy.EEDeploy(pb_id: str, config: ska_sdp_config.Config)[source]

Base class for execution engine deployment.

Parameters:
  • pb_id (str) – processing block ID

  • config (ska_sdp_config.Config) – SDP configuration client

get_id() str[source]

Get the deployment ID.

Returns:

deployment ID

Return type:

str

is_finished(txn: ska_sdp_config.config.Transaction) bool[source]

Check if the deployment is finished.

Parameters:

txn (ska_sdp_config.Transaction) – configuration transaction

Return type:

bool

remove(deploy_id: str) None[source]

Remove the execution engine.

Parameters:

deploy_id (str) – deployment ID

update_deploy_status(status: str) None[source]

Update deployment status.

Parameters:

status (str) – status

Helm EE Deployment

class ska_sdp_scripting.helm_deploy.HelmDeploy(pb_id: str, config: ska_sdp_config.Config, deploy_name: str, values: dict | None = None)[source]

Deploy Helm execution engine.

This should not be created directly; use the Phase.ee_deploy_helm() method instead.

Parameters:
  • pb_id (str) – processing block ID

  • config (ska_sdp_config.Config) – SDP configuration client

  • deploy_name (str) – name of Helm chart to deploy

  • values (dict, optional) – values to pass to Helm chart

Dask EE deployment

class ska_sdp_scripting.dask_deploy.DaskDeploy(pb_id: str, config: ska_sdp_config.Config, deploy_name: str, n_workers: int, func: Callable, f_args: Tuple[Any])[source]

Deploy a Dask execution engine.

The function, when called with the arguments, should return a Dask graph. The graph is then executed by calling its compute method:

result = func(*f_args)
result.compute()

This happens in a separate thread so the constructor can return immediately.
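
The pattern of running func(*f_args).compute() in a background thread can be sketched without Dask. In the example below, FakeGraph is a hypothetical stand-in for any object with a compute() method, and run_in_background is an illustrative helper rather than part of the library.

```python
import threading


class FakeGraph:
    """Stand-in for a Dask graph: anything exposing compute()."""

    def __init__(self, value):
        self.value = value

    def compute(self):
        return self.value * 2


def run_in_background(func, f_args, results):
    """Hypothetical helper: call func(*f_args).compute() in a thread."""

    def target():
        graph = func(*f_args)
        results.append(graph.compute())

    thread = threading.Thread(target=target, daemon=True)
    thread.start()
    # Return straight away; the caller decides when (or whether) to join.
    return thread


results = []
thread = run_in_background(FakeGraph, (21,), results)
thread.join()
```

Because the work happens in the thread, the caller gets control back immediately, which mirrors the constructor behaviour described above.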

This should not be created directly; use the Phase.ee_deploy_dask() method instead.

Parameters:
  • pb_id (str) – processing block ID

  • config (ska_sdp_config.Config) – configuration DB client

  • deploy_name (str) – deployment name

  • n_workers (int) – number of Dask workers

  • func (function) – function to execute

  • f_args (tuple) – function arguments

Fake EE deployment

class ska_sdp_scripting.fake_deploy.FakeDeploy(pb_id: str, config: ska_sdp_config.Config, deploy_name: str, func: Callable | None = None, f_args: Tuple[Any] | None = None)[source]

Deploy a fake execution engine.

The function is called with the arguments in a separate thread so the constructor can return immediately.

This should not be created directly; use the Phase.ee_deploy_test() method instead.

Parameters:
  • pb_id (str) – processing block ID

  • config (ska_sdp_config.Config) – SDP configuration client

  • deploy_name (str) – deployment name

  • func (function) – function to execute

  • f_args (tuple) – function arguments

set_deployments_ready(time_to_ready: int = 0)[source]

Set deployments_ready to True.

Parameters:

time_to_ready – set deployments_ready to true after this amount of time has passed (seconds)
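
One way to picture a delayed readiness flag is with a timer, as in the sketch below. The ReadyFlag class is hypothetical, and whether the library blocks for time_to_ready seconds or schedules the update in the background is not stated here; this sketch assumes the latter.

```python
import threading


class ReadyFlag:
    """Hypothetical stand-in holding a deployments_ready flag."""

    def __init__(self):
        self.deployments_ready = False

    def _mark_ready(self):
        self.deployments_ready = True

    def set_deployments_ready(self, time_to_ready=0):
        # Assumption: flip the flag after the delay without blocking the caller.
        timer = threading.Timer(time_to_ready, self._mark_ready)
        timer.daemon = True
        timer.start()
        return timer
```

Joining the returned timer waits until the flag has been set, which is convenient in tests.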

Queue connector functions

Common functions used to configure the QueueConnector.

ska_sdp_scripting.queue_connector.configure_queue_connector(txn, qc_config, pb_id, eb_id)[source]

Configure the QueueConnector.

Parameters:
  • qc_config – QueueConnector configuration string

  • txn – Transaction

  • pb_id – Processing block ID

  • eb_id – Execution block ID

ska_sdp_scripting.queue_connector.tear_down_queue_connector(config, qc_config_path)[source]

Tear down the QueueConnector.

Parameters:
  • config – SDP configuration client

  • qc_config_path – QueueConnector config path

ska_sdp_scripting.queue_connector.wait_for_queue_connector_state(config, qc_config_path, expected_state, timeout)[source]

Wait for the QueueConnector to reach the expected state.

Parameters:
  • config – SDP configuration client

  • qc_config_path – Path of the QueueConnector configuration (str)

  • expected_state – Expected state the QueueConnector needs to reach (str)

  • timeout – how long to wait before raising TimeoutError
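
Waiting for a state with a timeout can be sketched as a simple polling loop. This is an illustration only: the function wait_for_state, the get_state callable and the poll_interval argument are hypothetical, the timeout is assumed to be in seconds, and the library may well watch the configuration database instead of polling.

```python
import time


def wait_for_state(get_state, expected_state, timeout, poll_interval=0.01):
    """Hypothetical helper: poll get_state() until it matches or time runs out."""
    deadline = time.monotonic() + timeout
    while True:
        if get_state() == expected_state:
            return
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"state {expected_state!r} not reached within {timeout} s"
            )
        time.sleep(poll_interval)
```

Using time.monotonic() for the deadline keeps the wait unaffected by changes to the system clock.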