API

Processing block

Buffer request

class ska_sdp_scripting.buffer_request.BufferRequest(size: float, tags: List[str])[source]

Request a buffer reservation.

This is currently just a placeholder.

Parameters:
  • size (float) – size of the buffer

  • tags (list of str) – tags describing the type of buffer required

size: float
tags: List[str]

Script phase

class ska_sdp_scripting.phase.Phase(name: str, list_requests: List, config: ska_sdp_config.Config, pb_id: str, eb_id: str, script_kind: str, qc_config: str, flow_data: list)[source]

Script phase.

This should not be created directly, use the ProcessingBlock.create_phase() method instead.

Parameters:
  • name (str) – name of the phase

  • list_requests (list) – list of requests

  • config (ska_sdp_config.Config) – SDP configuration client

  • pb_id (str) – processing block ID

  • eb_id (str) – execution block ID

  • script_kind (str) – script kind

  • qc_config (str) – Queue Connector config string

check_state(txn: ska_sdp_config.config.Transaction, check_realtime: bool = True) None[source]

Check the state of the processing block.

Check if the PB is finished or cancelled, and for real-time scripts check if the EB is finished or cancelled.

Parameters:
  • txn (ska_sdp_config.Transaction) – SDP configuration transaction

  • check_realtime (bool) – Whether to check execution block state if the processing block is realtime (i.e. cancel processing script for FINISHED/CANCELLED)

ee_deploy_dask(name: str, n_workers: int, func: Callable, f_args: Tuple[Any]) EEDeploy[source]

Deploy a Dask execution engine.

Parameters:
  • name (str) – deployment name

  • n_workers (int) – number of Dask workers

  • func (function) – function to execute

  • f_args (tuple) – function arguments

Returns:

Dask execution engine deployment

Return type:

DaskDeploy

ee_deploy_helm(deploy_name: str, values: dict | None = None) EEDeploy[source]

Deploy a Helm execution engine.

This can be used to deploy any Helm chart.

Parameters:
  • deploy_name (str) – name of Helm chart

  • values (dict, optional) – values to pass to Helm chart

Returns:

Helm execution engine deployment

Return type:

HelmDeploy

ee_deploy_test(deploy_name: str, func: Callable | None = None, f_args: List[Any] | None = None) EEDeploy[source]

Deploy a fake execution engine.

This is used for testing and example purposes.

Parameters:
  • deploy_name (str) – deployment name

  • func (function) – function to execute

  • f_args (tuple) – function arguments

Returns:

fake execution engine deployment

Return type:

FakeDeploy

ee_remove() None[source]

Remove execution engines deployments.

is_eb_finished(txn: ska_sdp_config.config.Transaction) bool[source]

Check if the EB is finished or cancelled.

Parameters:

txn (ska_sdp_config.Transaction) – config db transaction

Return type:

bool

monitor_deployments(txn: ska_sdp_config.config.Transaction, iteration: int = 0) None[source]

Monitor deployments, update the deployment status in the processing block state based on the deployments’ pods’ state.

Also update the deployments_ready pb state key.

At the moment deployments_ready = True only if all deployments are RUNNING. Else, it is False.

Parameters:
  • txn – Transaction object (config.txn)

  • iteration – number of txn iteration

update_pb_state(status: ProcessingBlockStatus = ProcessingBlockStatus.UNSET) None[source]

Update processing block state.

If the status is UNSET, it is marked as finished.

Parameters:

status (str, optional) – status

wait_loop(func: Callable[[ska_sdp_config.config.Transaction], bool], time_to_ready: int = 0) None[source]

Wait loop to check the status of the processing block. It also updates the processing block state with deployment statuses for realtime scripts.

Parameters:
  • func – function to check condition for exiting the watcher loop

  • time_to_ready – set deployments_ready to true after this amount of time has passed (seconds). Only for deployments deployed with phase.ee_deploy_test

Execution engine deployment

class ska_sdp_scripting.ee_base_deploy.EEDeploy(pb_id: str, config: ska_sdp_config.Config)[source]

Base class for execution engine deployment.

Parameters:
  • pb_id (str) – processing block ID

  • config (ska_sdp_config.Client) – SDP configuration client

get_id() str[source]

Get the deployment ID.

Returns:

deployment ID

Return type:

str

is_finished(txn: ska_sdp_config.config.Transaction) bool[source]

Check if the deployment is finished.

Parameters:

txn (ska_sdp_config.Transaction) – configuration transaction

Return type:

bool

remove(deploy_id: str) None[source]

Remove the execution engine.

Parameters:

deploy_id (str) – deployment ID

update_deploy_status(status: str) None[source]

Update deployment status.

Parameters:

status (str) – status

Helm EE Deployment

class ska_sdp_scripting.helm_deploy.HelmDeploy(pb_id: str, config: ska_sdp_config.Config, deploy_name: str, values: dict | None = None)[source]

Deploy Helm execution engine.

This should not be created directly, use the Phase.ee_deploy_helm() method instead.

Parameters:
  • pb_id (str) – processing block ID

  • config (ska_sdp_config.Config) – SDP configuration client

  • deploy_name (str) – name of Helm chart to deploy

  • values (dict, optional) – values to pass to Helm chart

Dask EE deployment

class ska_sdp_scripting.dask_deploy.DaskDeploy(pb_id: str, config: ska_sdp_config.Config, deploy_name: str, n_workers: int, func: Callable, f_args: Tuple[Any])[source]

Deploy a Dask execution engine.

The function when called with the arguments should return a Dask graph. The graph is then executed by calling the compute method:

result = func(*f_args)
result.compute()

This happens in a separate thread so the constructor can return immediately.

This should not be created directly, use the Phase.ee_deploy_dask() method instead.

Parameters:
  • pb_id (str) – processing block ID

  • config (ska_sdp_config.Client) – configuration DB client

  • deploy_name (str) – deployment name

  • n_workers (int) – number of Dask workers

  • func (function) – function to execute

  • f_args (tuple) – function arguments

Fake EE deployment

class ska_sdp_scripting.fake_deploy.FakeDeploy(pb_id: str, config: ska_sdp_config.Config, deploy_name: str, func: Callable | None = None, f_args: Tuple[Any] | None = None)[source]

Deploy a fake execution engine.

The function is called with the arguments in a separate thread so the constructor can return immediately.

This should not be created directly, use the Phase.ee_deploy_test() method instead.

Parameters:
  • pb_id (str) – processing block ID

  • config (ska_sdp_config.Client) – SDP configuration client

  • deploy_name (str) – deployment name

  • func (function) – function to execute

  • f_args (tuple) – function arguments

set_deployments_ready(time_to_ready: int = 0) None[source]

Set deployments_ready to True

Parameters:

time_to_ready – set deployments_ready to true after this amount of time has passed (seconds)

Queue connector functions

Common functions used to configure the QueueConnector

ska_sdp_scripting.queue_connector.configure_queue_connector(txn: ska_sdp_config.config.Transaction, qc_config: dict, pb_id: str, eb_id: str) str[source]

Function to configure queue connector.

Parameters:
  • qc_config – QueueConnector configuration string

  • txn – Transaction

  • pb_id – Processing block ID

  • eb_id – Execution block ID

Returns:

Configuration path string

ska_sdp_scripting.queue_connector.tear_down_queue_connector(config: ska_sdp_config.Config, qc_config_path: str) None[source]

Function for tearing down the QueueConnector

Parameters:
  • config – SDP configuration client

  • qc_config_path – QueueConnector config path

ska_sdp_scripting.queue_connector.wait_for_queue_connector_state(config: ska_sdp_config.Config, qc_config_path: str, expected_state: str, timeout: float) bool[source]

Function to wait for queue connector state.

Parameters:
  • config – SDP configuration client

  • qc_config_path – Path of the QueueConnector configuration (str)

  • expected_state – Expected state the QueueConnector needs to reach (str)

  • timeout – How much time to elapse before TimeoutError