API

Processing block

class ska_sdp_scripting.processing_block.ProcessingBlock(pb_id: str | None = None)

This is the core class of the API. It handles the following processes:

  • Claiming and Releasing Processing Blocks:

    The constructor claims a processing block identified by pb_id from the configuration database. The exit method (and the context manager protocol via __enter__ and __exit__) releases the claim.

  • Managing Processing Block State:

    The state of the processing block in the configuration database is continuously updated with error messages as they occur. Additionally, the publish_receive_addresses() method updates the state of the processing block in the configuration database to record the receive addresses, which are genearted with generate_receive_addresses().

  • Handling Script Parameters:

    Methods get_parameters(), update_parameters(), and validate_parameters() manage script parameters, including validation against a provided Pydantic model.

  • Defining Data Flows:

    The create_data_flow() method allows defining data flows within the processing block, specifying source and sink types.

  • Handling Dependencies:

    In __init__() all FlowDependencies are automatically registered with the class, which are returned by the get_dependencies() method. create_flow_dependency() allows users to register custom dependencies as well.

  • Creating Execution Phases:

    The create_phase() method creates execution phases, which represent steps in the processing block, and associates them with resource requests (like buffer reservations).

Parameters:

pb_id (str, optional) – Processing block ID.

add_phase(phase_name: str, length: int, status: str | None = None) None
Parameters:
  • phase_name – Phase name.

  • length – Expected phase duration in seconds.

  • status – The current phase status.

Raises:

ValueError – If any parameter is invalid.

create_data_flow(name: str, data_model: str, sink: ska_sdp_config.entity.flow.SharedMem | ska_sdp_config.entity.flow.DataQueue | ska_sdp_config.entity.flow.Display | ska_sdp_config.entity.flow.DataProduct | ska_sdp_config.entity.flow.TangoAttribute | ska_sdp_config.entity.flow.TangoAttributeMap | ska_sdp_config.entity.flow.DataProductPersist | ska_sdp_config.entity.flow.SpeadStream, sources: list | None = None, expiry_time: int = -1) DataFlow

Create and register a DataFlow to this processing block.

Parameters:
  • name (str) – Name of the DataFlow object.

  • data_model (str) – Data model.

  • sink (see SINKS in data_flow.py for available options) – Sink used in this flow.

  • sources (list or None) – List of sources to be used.

  • expiry_time (int) – Time to keep flow after completions. default: DEFAULT_FLOW_TIMEOUT

Returns:

Data flow.

Return type:

dataflow.DataFlow

create_flow_dependency(pb_id: str, name: str, origin: str, kind: str = 'data-product', expiry_time: float = -1, description: str = '') ska_sdp_config.entity.Dependency

Create and register a Flow Dependency to this processing block.

create_phase(name: str, requests: list[BufferRequest]) Phase

Create a script phase for deploying execution engines.

The phase is created with a list of resource requests which must be satisfied before the phase can start executing. For the time being the only resource requests are (placeholder) buffer reservations, but eventually this will include compute requests too.

Parameters:
  • name (str) – Name of the phase.

  • requests (list of BufferRequest.) – Resource requests.

  • dependencies (List of :py:class: Dependency.) – Flow dependencies.

Returns:

The script phase object.

Return type:

phase.Phase

create_sdm_flow(name: str, expiry_time: int = -1) DataFlow

Create a request to an LSM.

Parameters:
  • name – Name of the the DataFlow object.

  • output_path – The subpath within the dataproduct path.

  • parameters – The search parameters to use.

  • expiry_time – Time to keep flow after completions. default: DEFAULT_FLOW_TIMEOUT

Returns:

Data flow.

property eb: ska_sdp_config.entity.ExecutionBlock

The Execution Block for to this ProcessingBlock.

property eb_id: str

The ID of the Execution Block associated to this ProcessingBlock.

generate_receive_addresses(function: str, num_hosts: int = 1, port_start: int = 21000, channels_per_port: int = 1) PartialReceiveAddresses

Generate and return the partial receive addresses for this ProcessingBlock’s scan types. At least @par function needs to be provided. See PartialReceiveAddresses.generate() for details on all parameters.

get_dependencies() list

Get the list of processing block dependencies.

Returns:

Processing block dependencies.

Return type:

list

get_parameters() dict

Get script parameters from processing block. The schema checking is not currently implemented.

Returns:

Processing block parameters.

Return type:

dict

get_scan_types() list[dict]

Get scan types from the execution block. Updates the scan types with the default parameters and channels. This is only supported for real-time scripts

Returns:

Scan types.

Return type:

list

property pb_id: str

The ID of this Processing Block.

publish_receive_addresses(receive_addresses: ReceiveAddresses)

Update the processing block state with the given receive addresses. The receive addresses are updated to add beam information from the Execution Block.

Parameters:

receive_addresses – The receive addresses to write.

request_buffer(size: float, kind: str, name: str, phases: list[str] | None = None) BufferRequest | None

Request a buffer reservation.

Parameters:
  • size – Size of the buffer.

  • kind – Kind of resource (e.g., ‘capacity-buffer-storage’).

  • name – Request name (e.g., ‘reserve’).

  • phases – List of phase names this buffer is associated with.

Returns:

Buffer reservation request, or None if resource management toggle is disabled.

set_phases()

Update the phase status in the processing block state.

property subarray_id: str | None

The Subarray ID to which this ProcessingBlok belongs.

update_parameters(default_parameters: dict, parameters: dict | Mapping) dict

Overwrite (nested) values in default_parameter mapping with those from parameters mapping.

Parameters:
  • default_parameters (dict) – Default parameter values.

  • parameters (dict) – Script specific parameters.

Returns:

Processing block additional parameters.

Return type:

dict

update_phase(phase_name: str, status: str | None = None, length: int | None = None) None

Update one or more fields of a specific phase by name.

Parameters:
  • phase_name – The name of the phase to update.

  • status – The status that the phase should be set to.

  • length – The length in seconds the phase will be active for.

Raises:

ValueError – If the phase is not found or status/length is invalid.

validate_parameters(model)

Validate processing block parameters against a given Pydantic model.

Raises Validation error if validation fails else returns the Pydantic class initialized with the processing block parameters.

Parameters:

model – Model to validate against

Returns:

Parameters

Return type:

ParameterBaseModel

Raises:

ValidationError

Buffer request

class ska_sdp_scripting.buffer_request.BufferRequest(size: int, pb_id: str, kind: str, name: str, phases: list[str] | None = None)

Request a buffer reservation.

This represents a request for a resource (typically buffer storage), depending on whether the RESOURCE_MANAGEMENT_TOGGLE feature toggle is enabled.

Parameters:
  • size (int) – Size of the request.

  • pb_id (str) – Processing block ID the request belongs to.

  • kind (str) – Type of resource (e.g., “performance-buffer-storage”).

  • name (str) – Name of request (e.g., “reserve”).

  • phases (list[str]) – List of phase names this buffer is associated with.

property request_key

Create request key.

resource_request()

Create a resource request for buffer storage, including phases.

Script phase

class ska_sdp_scripting.phase.Phase(name: str, list_requests: list, config: ska_sdp_config.Config, pb_id: str, eb_id: str, script_kind: str, flow_data: list[DataFlow], dependencies: list[ska_sdp_config.entity.Dependency] | None = None)

This class encapsulates the logic for managing a single phase within a processing block.

Objects of this class should not be created directly. use the ProcessingBlock.create_phase method instead.

The key functionalities include:

  • Resource Management:

    Handles the allocation and release of resources required by the phase, including interacting with the queue connector.

  • Execution Engine Deployment:

    Provides methods for deploying various types of execution engines (e.g., ee_deploy_helm(), ee_deploy_dask(), ee_deploy_test()).

  • State Management:

    Monitors the state of deployments and data flows, updating the processing block state accordingly. Includes methods like check_state(), update_pb_state(), monitor_deployments(), and monitor_data_flows().

  • Context Management:

    Designed to be used with the with statement (__enter__() and __exit__() methods) to ensure proper setup and teardown of resources.

Parameters:
  • name (str) – Name of the phase.

  • list_requests (list) – List of requests.

  • config (ska_sdp_config.Config) – SDP configuration client.

  • pb_id (str) – Processing block ID.

  • eb_id (str) – Execution block ID.

  • script_kind (str) – Script kind.

  • flow_data (list[DataFlow]) – Dataflow list.

  • dependencies (list[Dependency]) – Dependency list.

check_resource_allocation() bool

Check whether resource requests have corresponding allocations.

Returns:

True if all resources are allocated, False otherwise

Return type:

bool

check_state(txn: ska_sdp_config.config.Transaction, check_realtime: bool = True) None

Check the state of the processing block.

Check if the ProcessingBlock state is finished or cancelled, and for real-time scripts check if the ExecutionBlock is finished or cancelled.

Parameters:
  • txn (Transaction) – SDP configuration transaction.

  • check_realtime (bool) – Whether to check execution block state if the processing block is realtime (i.e. cancel processing script for FINISHED/CANCELLED)

ee_deploy_dask(name: str, n_workers: int, func: Callable, f_args: tuple[Any]) DaskDeploy

Deploy a Dask execution engine.

Parameters:
  • name (str) – Deployment name.

  • n_workers (int) – Number of Dask workers.

  • func (callable) – Function to execute.

  • f_args (tuple) – Function arguments.

Returns:

Dask execution engine deployment

Return type:

DaskDeploy

ee_deploy_helm(deploy_name: str, values: dict | None = None, version: str = '1.0.0') HelmDeploy

Deploy a Helm execution engine.

This can be used to deploy any Helm chart.

Parameters:
  • deploy_name (str) – Name of Helm chart.

  • values (dict, optional) – Values to pass to Helm chart.

  • version (str) – Helm chart version, defaults to 1.0.0

Returns:

Helm execution engine deployment

Return type:

HelmDeploy

ee_deploy_slurm(deploy_name: str, slargs: dict | None = None) SlurmDeploy

Deploy a Slurm execution engine.

Parameters:
  • deploy_name (str) – name of slurm deployment

  • slargs (dict, optional) – slurm arguments

Returns:

Slurm execution engine deployment

Return type:

SlurmDeploy

ee_deploy_test(deploy_name: str, func: Callable | None = None, f_args: list[Any] | None = None) FakeDeploy

Deploy a fake execution engine.

This is used for testing and example purposes.

Parameters:
  • deploy_name (str) – Deployment name.

  • func (function) – Function to execute.

  • f_args (tuple) – Function arguments.

Returns:

Fake execution engine deployment

Return type:

FakeDeploy

is_eb_and_processing_finished(txn: ska_sdp_config.config.Transaction, grace_period: float = 60.0) bool

Check if the ExecutionBlock is finished or cancelled. - If finished or cancelled, wait for all data product flows to reach a final state (COMPLETED, INCOMPLETE, or FAILED) within a grace period. - If timeout is reached and not all flows are in a final state, log an error and proceed to clean up. - If there are no data product flows, proceed to clean up immediately.

Parameters:
  • txn (Transaction) – Config db transaction.

  • grace_period – Time in seconds to wait for flows to complete after EB is finished or cancelled.

Return type:

bool

monitor_data_flows(txn: ska_sdp_config.config.Transaction, iteration: int = 0) dict[str, list[ska_sdp_config.entity.Flow.Key]]

Monitor the data flow state and aggregate any error messages into the processing block’s ‘error_messages’ key.

If any of the flow statuses is FAILED, updates the pb status to FAILED.

Parameters:
  • txn (Transaction) – Transaction object.

  • iteration (int) – Number of txn iteration.

Returns:

Data flow keys grouped by status

monitor_deployments(txn: ska_sdp_config.config.Transaction, iteration: int = 0) dict[str, list[str]]

Monitor deployments, update the deployment status in the processing block state based on the deployments’ pods’ state.

Also update the deployments_ready pb state key.

At the moment deployments_ready = True only if all deployments are RUNNING. Else, it is False.

Parameters:
  • txn (Transaction) – Transaction object.

  • iteration (int) – Number of txn iteration.

Returns:

Deployment IDs grouped by status.

update_pb_state(status: ProcessingBlockStatus = ProcessingBlockStatus.UNSET) None

Update processing block state.

If the status is UNSET, it is marked as finished.

Parameters:

status (str, optional) – Status.

wait_loop(func: Callable[[ska_sdp_config.config.Transaction], bool], time_to_ready: int = 0) None

Wait loop to check the status of the processing block. It also updates the processing block state with deployment statuses and errors captured in data flows.

Parameters:
  • func – Function to check condition for exiting the watcher loop.

  • time_to_ready – Set deployments_ready to true after this amount of time has passed (seconds). Only for deployments deployed with ee_deploy_test().

Execution Engine (EE) deployment

class ska_sdp_scripting.ee_base_deploy.EEDeploy(pb_id: str, config: ska_sdp_config.Config)

Base class for execution engine deployment.

Parameters:
  • pb_id (str) – processing block ID

  • config (ska_sdp_config.Client) – SDP configuration client

property deploy_id: str

Get the deployment ID.

Returns:

deployment ID

Return type:

str

is_finished(txn: ska_sdp_config.config.Transaction) bool

Check if the deployment is finished.

Parameters:

txn (ska_sdp_config.Transaction) – configuration transaction

Return type:

bool

remove(deploy_id: str) None

Remove the execution engine.

Parameters:

deploy_id (str) – deployment ID

update_deploy_status(status: str) None

Update deployment status.

Parameters:

status (str) – status

Helm EE Deployment

class ska_sdp_scripting.helm_deploy.HelmDeploy(pb_id: str, config: ska_sdp_config.Config, deploy_name: str, values: dict | None = None, version: str = '1.0.0')

Deploy Helm execution engine.

This should not be created directly, use the Phase.ee_deploy_helm() method instead.

Parameters:
  • pb_id (str) – processing block ID

  • config (ska_sdp_config.Config) – SDP configuration client

  • deploy_name (str) – name of Helm chart to deploy

  • values (dict, optional) – values to pass to Helm chart

Slurm EE Deployment

class ska_sdp_scripting.slurm_deploy.SlurmDeploy(pb_id: str, config: ska_sdp_config.Config, deploy_name: str, slargs: dict | None = None)

Deploy Slurm execution engine.

This should not be created directly, use the Phase.ee_deploy_slurm() method instead.

Parameters:
  • pb_id (str) – processing block ID

  • config (ska_sdp_config.Config) – SDP configuration client

  • deploy_name (str) – deployment name

  • slargs (dict, optional) – slurm arguments to pass

Dask EE deployment

class ska_sdp_scripting.dask_deploy.DaskDeploy(pb_id: str, config: ska_sdp_config.Config, deploy_name: str, n_workers: int, func: Callable, f_args: Tuple[Any])

Deploy a Dask execution engine.

The function (func) when called with the arguments (f_args) should return a Dask graph. The graph is then executed by calling the compute method:

result = func(*f_args)
result.compute()

This happens in a separate thread so the constructor can return immediately.

This should not be created directly, use the Phase.ee_deploy_dask() method instead.

Parameters:
  • pb_id (str) – processing block ID

  • config (ska_sdp_config.Client) – configuration DB client

  • deploy_name (str) – deployment name

  • n_workers (int) – number of Dask workers

  • func (callable) – function to execute

  • f_args (tuple) – function arguments

Fake EE deployment

class ska_sdp_scripting.fake_deploy.FakeDeploy(pb_id: str, config: ska_sdp_config.Config, deploy_name: str, func: Callable | None = None, f_args: Tuple[Any] | None = None)

Deploy a fake execution engine.

The function is called with the arguments in a separate thread so the constructor can return immediately.

This should not be created directly, use the Phase.ee_deploy_test() method instead.

Parameters:
  • pb_id (str) – processing block ID

  • config (ska_sdp_config.Client) – SDP configuration client

  • deploy_name (str) – deployment name

  • func (callable) – function to execute

  • f_args (tuple) – function arguments

set_deployments_ready(time_to_ready: int = 0) None

Set deployments_ready to True

Parameters:

time_to_ready – set deployments_ready to true after this amount of time has passed (seconds)

Queue connector functions

Common functions used to configure the QueueConnector

ska_sdp_scripting.queue_connector.wait_for_queue_connector_state(config: ska_sdp_config.Config, pb_id: str | None, expected_state: str | None, timeout: float) None

Wait for all queue connector flows to reach an expected state.

Parameters:
  • config – SDP configuration client

  • pb_id – Processing Block ID of flows

  • expected_state – Expected state the QueueConnector needs to reach

  • timeout – Time in seconds to elapse before TimeoutError

Raises:

TimeoutError – Timeout exceeded wait

Receive Addresses utilities

Generate Receive Addresses

class ska_sdp_scripting.receive_addresses.receive_addresses.PartialReceiveAddresses(channels_per_port: int = 1, allocations: dict[str, ~ska_sdp_scripting.receive_addresses.receive_addresses._ScanTypePortAllocT] = <factory>)

Bases: ReceiveAddressesBase[PartialScanTypePortAllocation, PartialBeamChannelAddresses, int]

Receive addresses generated by SDP. They have placeholders for hostnames, and include only host and port information.

static generate(scan_types: list[dict] | list[ScanType], port_start: int, channels_per_port: int, *, function: str = 'visibilities', num_hosts: int = 1) PartialReceiveAddresses

Creates a PartialReceiveAddresses object with the results of calling _allocate_ports().

replace_hosts(hosts: list[str]) ReceiveAddresses

Replaces the hosts in this PartialReceiveAddresses object using replace_hosts(), returning a new ReceiveAddresses object.

class ska_sdp_scripting.receive_addresses.receive_addresses.ReceiveAddresses(channels_per_port: int = 1, allocations: dict[str, ~ska_sdp_scripting.receive_addresses.receive_addresses._ScanTypePortAllocT] = <factory>, extra_values: dict[str, dict[str, ~typing.Any]] = <factory>)

Bases: ReceiveAddressesBase[ScanTypePortAllocation, BeamChannelAddresses, str]

Receive addresses generated and published by SDP. They indicate endpoints that other subsystems are interested in (e.g., visibility receive UDP hosts and ports, tango attributes of interest, etc).

extra_values: dict[str, dict[str, Any]]

Extra per-scan type, per-beam dictionary values.

to_recv_addresses_dict() dict[str, dict[str, BeamChannelAddresses]]

Returns a dictionary representation of these receive addresses, suitable for storage in the SDP Config DB.

class ska_sdp_scripting.receive_addresses.receive_addresses.ReceiveAddressesBase(channels_per_port: int = 1, allocations: dict[str, ~ska_sdp_scripting.receive_addresses.receive_addresses._ScanTypePortAllocT] = <factory>)

Bases: Generic[_ScanTypePortAllocT, _BeamChanAddrT, _HostT]

Base classes for partial and full receive addresses.

allocations: dict[str, _ScanTypePortAllocT]

Per-ScanType partial allocations (i.e., with host placeholders).

channels_per_port: int = 1

Channels to receive per port.

property host_count: int

The number of hosts contained in these receive addresses.

property hosts: set[_HostT]

The set of hosts contained in these receive addresses.

property max_port_count: int

Maximum port count for any given host.

property port_counts: dict[_HostT, int]

Port count per host, across all scan types.

to_recv_addresses_dict() dict[str, dict[str, _BeamChanAddrT]]

Returns a dictionary representation of these receive addresses, suitable for storage in the SDP Config DB.

ska_sdp_scripting.receive_addresses.receive_addresses.copy_beam_info_from_eb(receive_addresses: dict[str, dict[str, BeamChannelAddresses]], eb_beam_map: dict[str, Any])

Add the values for the “function”, and if present the “{x}_beam_id”, keys from the given Execution Block beams to the appropriate receive addresses beams.

“{x}_beam_id” can be visibility_beam_id, search_beam_id, timing_beam_id, vlbi_beam_id depending on the type of beam.

Updates receive_addresses in place.