API

Processing block

class ska_sdp_scripting.processing_block.ProcessingBlock(pb_id: str | None = None)

This is the core class of the API. It handles the following processes:

  • Claiming and Releasing Processing Blocks:

    The constructor claims a processing block identified by pb_id from the configuration database. The exit method (and the context manager protocol via __enter__ and __exit__) releases the claim.

  • Managing Processing Block State:

    The receive_addresses() method updates the state of the processing block in the configuration database, including the receive addresses.

  • Handling Script Parameters:

    Methods get_parameters(), update_parameters(), and validate_parameters() manage script parameters, including validation against a provided Pydantic model.

  • Defining Data Flows:

    The create_data_flow() method allows defining data flows within the processing block, specifying source and sink types.

  • Creating Execution Phases:

    The create_phase() method creates execution phases, which represent steps in the processing block, and associates them with resource requests (like buffer reservations).

  • Retrieving Dependencies and Scan Types:

    get_dependencies() retrieves the dependencies of the processing block, and get_scan_types() retrieves and updates scan types from the associated execution block(s).

Parameters:

pb_id (str, optional) – Processing block ID.
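The claim/release lifecycle described above can be illustrated without a configuration database. InMemoryConfigDB and ClaimedBlock are hypothetical names invented for this sketch; they are not part of ska_sdp_scripting or ska_sdp_config. The sketch only mirrors the documented pattern: claim in the constructor, release in the exit method or when leaving a with block.

```python
from contextlib import AbstractContextManager


class InMemoryConfigDB:
    """Hypothetical stand-in for the configuration database (not ska_sdp_config)."""

    def __init__(self) -> None:
        self.owners: dict = {}  # processing block ID -> owner name

    def claim(self, pb_id: str, owner: str) -> None:
        # Refuse a second claim on the same processing block.
        if pb_id in self.owners:
            raise RuntimeError(f"{pb_id} is already claimed by {self.owners[pb_id]}")
        self.owners[pb_id] = owner

    def release(self, pb_id: str) -> None:
        self.owners.pop(pb_id, None)


class ClaimedBlock(AbstractContextManager):
    """Hypothetical class: claim in the constructor, release in exit() or on leaving a with block."""

    def __init__(self, db: InMemoryConfigDB, pb_id: str, owner: str = "script") -> None:
        self._db = db
        self.pb_id = pb_id
        db.claim(pb_id, owner)  # the claim happens at construction time, not in __enter__

    def exit(self) -> None:
        self._db.release(self.pb_id)

    def __exit__(self, exc_type, exc_value, traceback):
        self.exit()  # release even if the body of the with block raised
        return None  # do not suppress the exception
```

AbstractContextManager supplies an __enter__ that returns self, so only the release logic needs to be written.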

static config_host_port_channel_map(scan_types: list[dict], port_start: int, channels_per_port: int, *, num_hosts: int = 1, max_ports_per_host: int | None = None) tuple[dict, dict]

Configures host and port channel maps for a given list of scan types. This is a wrapper for the receive_addresses.generate_host_port_channel_map() function, which computes the host and port channel maps for the given scan types.

Parameters:
  • scan_types (list[dict]) – The list of scan types for which receive addresses should be calculated.

  • port_start (int) – The first port to allocate on each host.

  • channels_per_port (int) – The number of channels that are sent per port.

  • num_hosts (int) – If given, the exact number of hosts to allocate.

  • max_ports_per_host (int) – If given, the maximum number of ports to allocate to a host before a new host is allocated (per beam).

Returns:

Two dictionaries keyed on the scan type id, one for hosts and one for ports of the channel maps for a given list of scan types. Hosts are identified by running, 0-indexed numbers, so that actual hostnames/IPs can be easily assigned later.

Return type:

tuple[dict, dict]

create_data_flow(name: str, data_model: str, sink: ska_sdp_config.entity.flow.TelModel | ska_sdp_config.entity.flow.SharedMem | ska_sdp_config.entity.flow.DataQueue | ska_sdp_config.entity.flow.Display | ska_sdp_config.entity.flow.DataProduct | ska_sdp_config.entity.flow.TangoAttribute | ska_sdp_config.entity.flow.TangoAttributeMap | ska_sdp_config.entity.flow.SpeadStream, sources: list | None = None) DataFlow

Create and register a DataFlow to this processing block.

Parameters:
  • name (str) – Name of the DataFlow object.

  • data_model (str) – Data model.

  • sink (see SINKS in data_flow.py for available options) – Sink used in this flow.

  • sources (list or None) – List of sources to be used.

Returns:

Data flow.

Return type:

dataflow.DataFlow

create_phase(name: str, requests: list[BufferRequest]) Phase

Create a script phase for deploying execution engines.

The phase is created with a list of resource requests which must be satisfied before the phase can start executing. For the time being the only resource requests are (placeholder) buffer reservations, but eventually this will include compute requests too.

Parameters:
  • name (str) – Name of the phase.

  • requests (list of BufferRequest) – Resource requests.

Returns:

The script phase object.

Return type:

phase.Phase

get_dependencies() list

Get the list of processing block dependencies.

Returns:

Processing block dependencies.

Return type:

list

get_parameters() dict

Get the script parameters from the processing block. Schema checking is not currently implemented.

Returns:

Processing block parameters.

Return type:

dict

get_scan_types() list[dict]

Get scan types from the execution block and update them with the default parameters and channels. This is only supported for real-time scripts.

Returns:

Scan types.

Return type:

list[dict]

receive_addresses(configured_host_port: dict, chart_name: str | None = None, service_name: str | None = None, namespace: str | None = None, update_dns: bool = True) None

Generate receive addresses and update the processing block state. The chart_name and service_name arguments are only needed if configured_host_port needs updating with DNS names; currently this only applies to the vis-receive script.

Parameters:
  • configured_host_port (dict) – Constructed host and port.

  • chart_name (str) – Name of the StatefulSet.

  • service_name (str) – Name of the headless service.

  • namespace (str) – Namespace in which it will be deployed.

  • update_dns (bool) – Whether to update the addresses with DNS names; True by default.
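Because hosts in a channel map are plain 0-indexed numbers, turning them into DNS names is a simple substitution. The sketch below assumes the standard Kubernetes naming for StatefulSet pods behind a headless service (<statefulset>-<ordinal>.<service>.<namespace>.svc.<cluster-domain>), a cluster domain of cluster.local, and channel-map entries of the form [start_channel, host_index]. to_dns_host_map is a hypothetical helper, and whether receive_addresses() builds names exactly this way is an assumption.

```python
def to_dns_host_map(host_map, chart_name, service_name, namespace, cluster_domain="cluster.local"):
    """Hypothetical helper: replace 0-indexed host numbers with StatefulSet pod DNS names.

    Assumes each host_map entry is a [start_channel, host_index] pair; this layout
    and the default cluster domain are assumptions, not taken from ska_sdp_scripting.
    """
    dns_map = []
    for start_channel, host_index in host_map:
        # Kubernetes names StatefulSet pods <statefulset>-<ordinal>; a headless service
        # gives each pod the DNS name <pod>.<service>.<namespace>.svc.<cluster-domain>.
        pod_name = f"{chart_name}-{host_index}"
        dns_map.append([start_channel, f"{pod_name}.{service_name}.{namespace}.svc.{cluster_domain}"])
    return dns_map
```

For instance, host index 1 with chart_name "receive", service_name "receive-svc" and namespace "sdp" becomes receive-1.receive-svc.sdp.svc.cluster.local.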

static request_buffer(size: float, tags: list[str]) BufferRequest

Request a buffer reservation.

This returns a buffer reservation request that is used to create a script phase. These are currently only placeholders.

Parameters:
  • size (float) – Size of the buffer.

  • tags (list of str) – Tags describing the type of buffer required.

Returns:

Buffer reservation request.

Return type:

BufferRequest

update_parameters(default_parameters: dict, parameters: dict | Mapping) dict

Overwrite (nested) values in the default_parameters mapping with those from the parameters mapping.

Parameters:
  • default_parameters (dict) – Default parameter values.

  • parameters (dict) – Script specific parameters.

Returns:

Processing block additional parameters.

Return type:

dict
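The nested overwrite performed by update_parameters() can be sketched as a recursive merge. merge_parameters is a hypothetical name, and two behaviours here are assumptions rather than documented: the defaults are copied instead of modified in place, and non-mapping values such as lists replace the default wholesale.

```python
import copy
from collections.abc import Mapping


def merge_parameters(default_parameters: dict, parameters: Mapping) -> dict:
    """Hypothetical nested overwrite; returns a new dict and leaves the defaults untouched."""
    merged = copy.deepcopy(default_parameters)
    for key, value in parameters.items():
        if isinstance(value, Mapping):
            base = merged.get(key)
            # Recurse so that sibling keys of a nested default survive the override.
            merged[key] = merge_parameters(dict(base) if isinstance(base, Mapping) else {}, value)
        else:
            # Scalars, lists, etc. replace the default value as a whole.
            merged[key] = copy.deepcopy(value)
    return merged
```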

validate_parameters(model)

Validate processing block parameters against a given Pydantic model.

Raises ValidationError if validation fails; otherwise returns an instance of the Pydantic model initialized with the processing block parameters.

Parameters:

model – Pydantic model to validate against.

Returns:

Parameters

Return type:

ParameterBaseModel

Raises:

ValidationError

Buffer request

class ska_sdp_scripting.buffer_request.BufferRequest(size: float, tags: List[str])

Request a buffer reservation.

This is currently just a placeholder.

Parameters:
  • size (float) – size of the buffer

  • tags (list of str) – tags describing the type of buffer required

size: float
tags: List[str]

Script phase

class ska_sdp_scripting.phase.Phase(name: str, list_requests: list, config: ska_sdp_config.Config, pb_id: str, eb_id: str, script_kind: str, flow_data: list[DataFlow])

This class encapsulates the logic for managing a single phase within a processing block.

Objects of this class should not be created directly. Use the ProcessingBlock.create_phase() method instead.

The key functionalities include:

  • Resource Management:

    Handles the allocation and release of resources required by the phase, including interacting with the queue connector.

  • Execution Engine Deployment:

    Provides methods for deploying various types of execution engines (e.g., ee_deploy_helm(), ee_deploy_dask(), ee_deploy_test()).

  • State Management:

    Monitors the state of deployments and data flows, updating the processing block state accordingly. Includes methods like check_state(), update_pb_state(), monitor_deployments(), and monitor_data_flows().

  • Context Management:

    Designed to be used with the with statement (__enter__() and __exit__() methods) to ensure proper setup and teardown of resources.

Parameters:
  • name (str) – Name of the phase.

  • list_requests (list) – List of requests.

  • config (ska_sdp_config.Config) – SDP configuration client.

  • pb_id (str) – Processing block ID.

  • eb_id (str) – Execution block ID.

  • script_kind (str) – Script kind.

  • flow_data (list[DataFlow]) – Dataflow list.

check_state(txn: ska_sdp_config.config.Transaction, check_realtime: bool = True) None

Check the state of the processing block.

Check if the ProcessingBlock state is finished or cancelled, and for real-time scripts check if the ExecutionBlock is finished or cancelled.

Parameters:
  • txn (Transaction) – SDP configuration transaction.

  • check_realtime (bool) – If True and the processing block is real-time, also check the execution block state (i.e. cancel the processing script when the execution block is FINISHED or CANCELLED).
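The stop condition described for check_state() can be written as a small predicate. should_stop and the plain status strings are assumptions made for illustration; the real method works on a configuration transaction and returns None, so how it actually signals cancellation is not modelled here.

```python
from typing import Optional

# Assumed terminal states, taken from the FINISHED/CANCELLED wording above.
TERMINAL_STATES = {"FINISHED", "CANCELLED"}


def should_stop(pb_status: str, eb_status: Optional[str], *, realtime: bool, check_realtime: bool = True) -> bool:
    """Hypothetical decision rule modelled on check_state()."""
    if pb_status in TERMINAL_STATES:
        return True
    # Only real-time scripts follow the lifecycle of their execution block.
    if realtime and check_realtime and eb_status in TERMINAL_STATES:
        return True
    return False
```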

ee_deploy_dask(name: str, n_workers: int, func: Callable, f_args: tuple[Any]) DaskDeploy

Deploy a Dask execution engine.

Parameters:
  • name (str) – Deployment name.

  • n_workers (int) – Number of Dask workers.

  • func (callable) – Function to execute.

  • f_args (tuple) – Function arguments.

Returns:

Dask execution engine deployment

Return type:

DaskDeploy

ee_deploy_helm(deploy_name: str, values: dict | None = None, version: str = '1.0.0') HelmDeploy

Deploy a Helm execution engine.

This can be used to deploy any Helm chart.

Parameters:
  • deploy_name (str) – Name of Helm chart.

  • values (dict, optional) – Values to pass to Helm chart.

  • version (str) – Helm chart version, defaults to 1.0.0

Returns:

Helm execution engine deployment

Return type:

HelmDeploy

ee_deploy_slurm(deploy_name: str, slargs: dict | None = None) SlurmDeploy

Deploy a Slurm execution engine.

Parameters:
  • deploy_name (str) – name of the Slurm deployment

  • slargs (dict, optional) – Slurm arguments

Returns:

Slurm execution engine deployment

Return type:

SlurmDeploy

ee_deploy_test(deploy_name: str, func: Callable | None = None, f_args: list[Any] | None = None) FakeDeploy

Deploy a fake execution engine.

This is used for testing and example purposes.

Parameters:
  • deploy_name (str) – Deployment name.

  • func (function) – Function to execute.

  • f_args (tuple) – Function arguments.

Returns:

Fake execution engine deployment

Return type:

FakeDeploy

is_eb_finished(txn: ska_sdp_config.config.Transaction) bool

Check if the ExecutionBlock is finished or cancelled.

Parameters:

txn (Transaction) – Config db transaction.

Return type:

bool

monitor_data_flows(txn: ska_sdp_config.config.Transaction, iteration: int = 0) None

Monitor the data flow state and aggregate any error messages into the processing block’s ‘error_messages’ key.

Parameters:
  • txn (Transaction) – Transaction object.

  • iteration (int) – Iteration number of the transaction loop.
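A minimal sketch of the aggregation step, assuming each data flow's state is a dict that may carry its own error_messages list and that errors are grouped by flow name. Both assumptions, like the helper names aggregate_flow_errors and with_flow_errors, are hypothetical; only the processing block key 'error_messages' comes from the description above.

```python
def aggregate_flow_errors(flow_states: dict) -> dict:
    """Hypothetical: collect non-empty error lists, keyed by flow name."""
    errors = {}
    for flow_name, state in flow_states.items():
        messages = state.get("error_messages") or []
        if messages:
            errors[flow_name] = list(messages)
    return errors


def with_flow_errors(pb_state: dict, flow_states: dict) -> dict:
    """Return a copy of the processing block state with an 'error_messages' entry."""
    new_state = dict(pb_state)
    new_state["error_messages"] = aggregate_flow_errors(flow_states)
    return new_state
```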

monitor_deployments(txn: ska_sdp_config.config.Transaction, iteration: int = 0) None

Monitor deployments and update each deployment’s status in the processing block state, based on the state of its pods.

Also update the deployments_ready key of the processing block state.

Currently, deployments_ready is True only if all deployments are RUNNING; otherwise it is False.

Parameters:
  • txn (Transaction) – Transaction object.

  • iteration (int) – Iteration number of the transaction loop.
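The documented readiness rule reduces to an all() check over deployment statuses. The helper names and the 'deployments' key below are hypothetical, and treating an empty set of deployments as not ready is an assumption (a bare all() would return True).

```python
def compute_deployments_ready(deployment_statuses: dict) -> bool:
    """Hypothetical: ready only if there is at least one deployment and all are RUNNING."""
    return bool(deployment_statuses) and all(
        status == "RUNNING" for status in deployment_statuses.values()
    )


def with_deployment_state(pb_state: dict, deployment_statuses: dict) -> dict:
    """Return a copy of the processing block state with statuses and the readiness flag."""
    new_state = dict(pb_state)
    new_state["deployments"] = dict(deployment_statuses)  # hypothetical key name
    new_state["deployments_ready"] = compute_deployments_ready(deployment_statuses)
    return new_state
```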

update_pb_state(status: ProcessingBlockStatus = ProcessingBlockStatus.UNSET) None

Update processing block state.

If status is UNSET, the processing block is marked as finished.

Parameters:

status (ProcessingBlockStatus, optional) – Status.

wait_loop(func: Callable[[ska_sdp_config.config.Transaction], bool], time_to_ready: int = 0) None

Wait loop to check the status of the processing block. It also updates the processing block state with deployment statuses and errors captured in data flows for realtime scripts.

Parameters:
  • func – Function to check condition for exiting the watcher loop.

  • time_to_ready – Set deployments_ready to true after this amount of time has passed (seconds). Only for deployments deployed with ee_deploy_test().
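The shape of wait_loop() can be sketched as a loop that calls func once per transaction and stops when it returns True. Here each transaction is stood in for by a plain dict, and wait_loop_sketch and its injectable clock are hypothetical; the deployment and data-flow monitoring that the real loop performs is omitted. Since is_eb_finished(txn) returns a bool for a transaction, its signature fits the func parameter.

```python
import time
from typing import Callable, Iterable


def wait_loop_sketch(
    transactions: Iterable[dict],
    func: Callable[[dict], bool],
    time_to_ready: float = 0,
    clock: Callable[[], float] = time.monotonic,
) -> int:
    """Hypothetical loop: each dict stands in for one configuration transaction.

    Returns the index of the iteration on which func() asked to exit.
    """
    start = clock()
    for iteration, txn in enumerate(transactions):
        # The real loop also monitors deployments and data flows here (not modelled).
        if clock() - start >= time_to_ready:
            txn["deployments_ready"] = True  # mimics time_to_ready for test deployments
        if func(txn):
            return iteration
    raise RuntimeError("transactions ran out before the exit condition was met")
```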

Execution Engine (EE) deployment

class ska_sdp_scripting.ee_base_deploy.EEDeploy(pb_id: str, config: ska_sdp_config.Config)

Base class for execution engine deployment.

Parameters:
  • pb_id (str) – processing block ID

  • config (ska_sdp_config.Config) – SDP configuration client

property deploy_id: str

Get the deployment ID.

Returns:

deployment ID

Return type:

str

is_finished(txn: ska_sdp_config.config.Transaction) bool

Check if the deployment is finished.

Parameters:

txn (ska_sdp_config.Transaction) – configuration transaction

Return type:

bool

remove(deploy_id: str) None

Remove the execution engine.

Parameters:

deploy_id (str) – deployment ID

update_deploy_status(status: str) None

Update deployment status.

Parameters:

status (str) – status

Helm EE Deployment

class ska_sdp_scripting.helm_deploy.HelmDeploy(pb_id: str, config: ska_sdp_config.Config, deploy_name: str, values: dict | None = None, version: str = '1.0.0')

Deploy Helm execution engine.

This should not be created directly; use the Phase.ee_deploy_helm() method instead.

Parameters:
  • pb_id (str) – processing block ID

  • config (ska_sdp_config.Config) – SDP configuration client

  • deploy_name (str) – name of Helm chart to deploy

  • values (dict, optional) – values to pass to Helm chart

  • version (str) – Helm chart version, defaults to 1.0.0

Slurm EE Deployment

class ska_sdp_scripting.slurm_deploy.SlurmDeploy(pb_id: str, config: ska_sdp_config.Config, deploy_name: str, slargs: dict | None = None)

Deploy Slurm execution engine.

This should not be created directly; use the Phase.ee_deploy_slurm() method instead.

Parameters:
  • pb_id (str) – processing block ID

  • config (ska_sdp_config.Config) – SDP configuration client

  • deploy_name (str) – deployment name

  • slargs (dict, optional) – slurm arguments to pass

Dask EE deployment

class ska_sdp_scripting.dask_deploy.DaskDeploy(pb_id: str, config: ska_sdp_config.Config, deploy_name: str, n_workers: int, func: Callable, f_args: Tuple[Any])

Deploy a Dask execution engine.

The function func, when called with the arguments f_args, should return a Dask graph. The graph is then executed by calling its compute method:

result = func(*f_args)
result.compute()

This happens in a separate thread so the constructor can return immediately.

This should not be created directly; use the Phase.ee_deploy_dask() method instead.

Parameters:
  • pb_id (str) – processing block ID

  • config (ska_sdp_config.Config) – configuration DB client

  • deploy_name (str) – deployment name

  • n_workers (int) – number of Dask workers

  • func (callable) – function to execute

  • f_args (tuple) – function arguments
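The run-in-a-thread behaviour described above can be sketched with the standard library alone. SumGraph stands in for a Dask collection (any object with a compute() method) and BackgroundCompute is a hypothetical class; no Dask cluster and no n_workers handling are modelled.

```python
import threading
from typing import Any, Callable, Optional


class SumGraph:
    """Hypothetical stand-in for a Dask collection: any object with a compute() method."""

    def __init__(self, values):
        self.values = values

    def compute(self):
        return sum(self.values)


class BackgroundCompute:
    """Hypothetical: build a graph with func(*f_args) and compute it on a worker thread."""

    def __init__(self, func: Callable[..., Any], f_args: tuple) -> None:
        self.result = None
        self._thread = threading.Thread(target=self._run, args=(func, f_args), daemon=True)
        self._thread.start()  # the constructor returns immediately

    def _run(self, func, f_args) -> None:
        graph = func(*f_args)  # build the (lazy) graph
        self.result = graph.compute()  # then execute it; errors here are not re-raised

    def join(self, timeout: Optional[float] = None) -> bool:
        """Wait for the computation; return True if it has finished."""
        self._thread.join(timeout)
        return not self._thread.is_alive()
```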

Fake EE deployment

class ska_sdp_scripting.fake_deploy.FakeDeploy(pb_id: str, config: ska_sdp_config.Config, deploy_name: str, func: Callable | None = None, f_args: Tuple[Any] | None = None)

Deploy a fake execution engine.

The function is called with the arguments in a separate thread so the constructor can return immediately.

This should not be created directly; use the Phase.ee_deploy_test() method instead.

Parameters:
  • pb_id (str) – processing block ID

  • config (ska_sdp_config.Config) – SDP configuration client

  • deploy_name (str) – deployment name

  • func (callable) – function to execute

  • f_args (tuple) – function arguments

set_deployments_ready(time_to_ready: int = 0) None

Set deployments_ready to True

Parameters:

time_to_ready – set deployments_ready to true after this amount of time has passed (seconds)
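One way to implement a delayed readiness flag is threading.Timer. ReadyFlagSketch is hypothetical, and whether FakeDeploy uses a timer, a sleeping thread or something else is not documented.

```python
import threading
from typing import Optional


class ReadyFlagSketch:
    """Hypothetical: flips deployments_ready to True after time_to_ready seconds."""

    def __init__(self) -> None:
        self.state = {"deployments_ready": False}
        self._lock = threading.Lock()

    def _mark_ready(self) -> None:
        with self._lock:
            self.state["deployments_ready"] = True

    def set_deployments_ready(self, time_to_ready: float = 0) -> Optional[threading.Timer]:
        if time_to_ready <= 0:
            self._mark_ready()  # no delay requested: set the flag straight away
            return None
        timer = threading.Timer(time_to_ready, self._mark_ready)
        timer.daemon = True  # do not keep the process alive just for this flag
        timer.start()
        return timer
```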

Queue connector functions

Common functions used to configure the QueueConnector

ska_sdp_scripting.queue_connector.wait_for_queue_connector_state(config: ska_sdp_config.Config, pb_id: str | None, expected_state: str | None, timeout: float) None

Wait for all queue connector flows to reach an expected state.

Parameters:
  • config – SDP configuration client

  • pb_id – Processing Block ID of flows

  • expected_state – Expected state the QueueConnector needs to reach

  • timeout – Time in seconds to wait before raising TimeoutError

Raises:

TimeoutError – If the flows do not reach expected_state within timeout seconds.
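A wait with a timeout is typically a polling loop against a monotonic deadline. In this sketch get_flow_states is a hypothetical callable returning the current state of each flow (the real function reads them from the configuration database for pb_id), the state names in the test are invented, and requiring at least one flow before returning is an assumption.

```python
import time
from typing import Callable, List


def wait_for_state_sketch(
    get_flow_states: Callable[[], List[str]],
    expected_state: str,
    timeout: float,
    poll_interval: float = 0.1,
) -> None:
    """Hypothetical polling helper: return once every flow reports expected_state."""
    deadline = time.monotonic() + timeout
    while True:
        states = get_flow_states()
        # Assumption: an empty list of flows does not count as "all flows ready".
        if states and all(state == expected_state for state in states):
            return
        if time.monotonic() >= deadline:
            raise TimeoutError(f"flows did not reach {expected_state!r} within {timeout} s: {states}")
        time.sleep(poll_interval)
```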

Receive Addresses utilities

Generate Receive Addresses

ska_sdp_scripting.receive_addresses.generate_host_port_channel_map(scan_types: list, port_start: int, channels_per_port: int, *, num_hosts: int = 1, max_ports_per_host: int | None = None) tuple[dict, dict]

Returns the dictionary of host and port channel maps for a given list of scan types. The dictionary is indexed by Scan Type ID and then by Beam ID, each entry at this last level has two lists under “host” and “port” with the corresponding channel maps.

Parameters:
  • scan_types – The list of scan types for which receive addresses should be calculated.

  • port_start – The first port to allocate on each host.

  • channels_per_port – The number of channels that are sent per port.

  • num_hosts – If given, the exact number of hosts to allocate.

  • max_ports_per_host – If given, the maximum number of ports to allocate to a host before a new host is allocated (per beam).

Returns:

The dictionary of host and port channel maps for a given list of scan types. Hosts are identified by running, 0-indexed numbers, so that actual hostnames/IPs can easily be assigned later.

ska_sdp_scripting.receive_addresses.generate_host_port_per_beam(spectral_windows: list, port_start: int, channels_per_port: int, *, num_hosts: int | None = None, max_ports_per_host: int | None = None) tuple[list, list, list]

Returns the lists of host and port channel maps for a given beam’s list of spectral windows. The calculation starts at a given starting port for convenience and takes into account that more than one channel can be sent per port. Additionally, users can specify either a fixed number of pods that the algorithm should allocate, or the maximum number of ports after which the algorithm should allocate a new pod.

Parameters:
  • spectral_windows – The list of Spectral Windows for which receive addresses should be calculated.

  • port_start – The first port to allocate on each host.

  • channels_per_port – The number of channels that are sent per port.

  • num_hosts – If given, the exact number of hosts to allocate.

  • max_ports_per_host – If given, the maximum number of ports to allocate to a host before a new host is allocated.

Returns:

A three-tuple with: the list of host channel maps, the list of port channel maps, and a list of port counts for each host. Hosts are identified by running, 0-indexed numbers, so that actual hostnames/IPs can be easily assigned later.
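The allocation described above can be sketched as follows: split each spectral window into port-sized blocks of channels, then assign consecutive ports to a host, starting again at port_start on each new host. This is a simplified, hypothetical version, not the library's algorithm or output format; sketch_host_port_per_beam, the {"start", "count"} spectral-window shape and the channel-map entry layout are all assumptions.

```python
import math
from typing import Optional


def sketch_host_port_per_beam(
    spectral_windows: list,
    port_start: int,
    channels_per_port: int,
    *,
    num_hosts: Optional[int] = None,
    max_ports_per_host: Optional[int] = None,
) -> tuple:
    """Hypothetical, simplified allocation; not the ska_sdp_scripting implementation.

    Assumes each spectral window is {"start": first_channel, "count": n_channels}.
    Returns (host_map, port_map, port_counts): host_map holds [start_channel, host_index],
    port_map holds [start_channel, port], port_counts holds the number of ports per host.
    """
    # One entry per port: the first channel that port carries.
    port_first_channels = []
    for spw in spectral_windows:
        for offset in range(0, spw["count"], channels_per_port):
            port_first_channels.append(spw["start"] + offset)

    n_ports = len(port_first_channels)
    if num_hosts is not None:  # assumption: num_hosts wins if both are given
        # With fewer ports than hosts, fewer than num_hosts hosts end up being used.
        ports_per_host = max(1, math.ceil(n_ports / num_hosts))
    elif max_ports_per_host is not None:
        ports_per_host = max_ports_per_host
    else:
        ports_per_host = max(1, n_ports)  # everything on one host

    host_map, port_map, port_counts = [], [], []
    for i, first_channel in enumerate(port_first_channels):
        host, port_index = divmod(i, ports_per_host)
        if port_index == 0:  # first port on a new host
            host_map.append([first_channel, host])
            port_counts.append(0)
        port_map.append([first_channel, port_start + port_index])
        port_counts[host] += 1
    return host_map, port_map, port_counts
```

For example, 8 channels sent 2 per port with port_start=9000 and max_ports_per_host=3 give four ports: three on host 0 (ports 9000 to 9002) and one on host 1 (port 9000).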