API
Processing block
- class ska_sdp_scripting.processing_block.ProcessingBlock(pb_id: str | None = None)
This is the core class of the API. It handles the following processes:
- Claiming and Releasing Processing Blocks:
The constructor claims the processing block identified by pb_id from the configuration database. The exit method (and the context manager protocol via __enter__ and __exit__) releases the claim.
- Managing Processing Block State:
The receive_addresses() method updates the state of the processing block in the configuration database, including the receive addresses.
- Handling Script Parameters:
The methods get_parameters(), update_parameters(), and validate_parameters() manage script parameters, including validation against a provided Pydantic model.
- Defining Data Flows:
The create_data_flow() method defines data flows within the processing block, specifying source and sink types.
- Creating Execution Phases:
The create_phase() method creates execution phases, which represent steps in the processing block, and associates them with resource requests (like buffer reservations).
- Retrieving Dependencies and Scan Types:
get_dependencies() retrieves the dependencies of the processing block, and get_scan_types() retrieves and updates scan types from the associated execution block(s).
- Parameters:
pb_id (str, optional) – Processing block ID.
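The claim-and-release lifecycle can be pictured with a small stand-in class. This is a hedged sketch rather than the library's implementation: `ClaimedBlock`, its in-memory `claims` set, and the example ID are hypothetical and only stand in for `ProcessingBlock` and the configuration database.

```python
class ClaimedBlock:
    """Hypothetical stand-in for ProcessingBlock; models only claim/release."""

    # Assumption: an in-memory set replaces the configuration database.
    claims = set()

    def __init__(self, pb_id):
        if pb_id in self.claims:
            raise RuntimeError(f"{pb_id} is already claimed")
        self.claims.add(pb_id)
        self.pb_id = pb_id

    def exit(self):
        """Release the claim explicitly."""
        self.claims.discard(self.pb_id)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        # Leaving the with-block releases the claim, even after an error.
        self.exit()


with ClaimedBlock("pb-example-0001") as pb:
    claimed_inside = pb.pb_id in ClaimedBlock.claims

claimed_after = "pb-example-0001" in ClaimedBlock.claims
```

Using the `with` form means the claim is released even if the script raises part-way through, which is the usual reason to prefer it over calling the exit method by hand.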
- static config_host_port_channel_map(scan_types: list[dict], port_start: int, channels_per_port: int, *, num_hosts: int = 1, max_ports_per_host: int | None = None) tuple[dict, dict]
Configure host and port channel maps for a given list of scan types. This is a wrapper around receive_addresses.generate_host_port_channel_map().
- Parameters:
scan_types (list[dict]) – The list of scan types for which receive addresses should be calculated.
port_start (int) – The first port to allocate on each host.
channels_per_port (int) – The number of channels that are sent per port.
num_hosts (int) – If given, the exact number of hosts to allocate.
max_ports_per_host (int) – If given, the maximum number of ports to allocate to a host before a new host is allocated (per beam).
- Returns:
Two dictionaries keyed on the scan type id, one for hosts and one for ports of the channel maps for a given list of scan types. Hosts are identified by running, 0-indexed numbers, so that actual hostnames/IPs can be easily assigned later.
- Return type:
tuple[dict, dict]
- create_data_flow(name: str, data_model: str, sink: ska_sdp_config.entity.flow.TelModel | ska_sdp_config.entity.flow.SharedMem | ska_sdp_config.entity.flow.DataQueue | ska_sdp_config.entity.flow.Display | ska_sdp_config.entity.flow.DataProduct | ska_sdp_config.entity.flow.TangoAttribute | ska_sdp_config.entity.flow.TangoAttributeMap | ska_sdp_config.entity.flow.SpeadStream, sources: list | None = None) DataFlow
Create and register a DataFlow to this processing block.
- create_phase(name: str, requests: list[BufferRequest]) Phase
Create a script phase for deploying execution engines.
The phase is created with a list of resource requests which must be satisfied before the phase can start executing. For the time being the only resource requests are (placeholder) buffer reservations, but eventually this will include compute requests too.
- Parameters:
name (str) – Name of the phase.
requests (list of BufferRequest) – Resource requests.
- Returns:
The script phase object.
- Return type:
Phase
- get_dependencies() list
Get the list of processing block dependencies.
- Returns:
Processing block dependencies.
- Return type:
list
- get_parameters() dict
Get script parameters from the processing block. Schema checking is not currently implemented.
- Returns:
Processing block parameters.
- Return type:
dict
- get_scan_types() list[dict]
Get scan types from the execution block and update them with the default parameters and channels. This is only supported for real-time scripts.
- Returns:
Scan types.
- Return type:
list
- receive_addresses(configured_host_port: dict, chart_name: str | None = None, service_name: str | None = None, namespace: str | None = None, update_dns: bool = True) None
Generate receive addresses and update the processing block state. chart_name and service_name are only needed if configured_host_port needs updating with DNS names. Currently this only applies to the vis-receive script.
- Parameters:
- static request_buffer(size: float, tags: list[str]) BufferRequest
Request a buffer reservation.
This returns a buffer reservation request that is used to create a script phase. These are currently only placeholders.
- Parameters:
- Returns:
Buffer reservation request.
- Return type:
BufferRequest
- update_parameters(default_parameters: dict, parameters: dict | Mapping) dict
Overwrite (nested) values in the default_parameters mapping with those from the parameters mapping.
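A nested overwrite of this kind is commonly written as a recursive merge. The sketch below illustrates that technique under stated assumptions: `deep_update` is a hypothetical helper, not the library's code, and returning a new dictionary (rather than modifying the defaults in place) is a choice made here for the example.

```python
from collections.abc import Mapping
from copy import deepcopy


def deep_update(defaults, overrides):
    """Return a copy of `defaults` with nested values replaced from `overrides`."""
    merged = deepcopy(dict(defaults))
    for key, value in overrides.items():
        if isinstance(value, Mapping) and isinstance(merged.get(key), Mapping):
            # Both sides are mappings: merge one level deeper.
            merged[key] = deep_update(merged[key], value)
        else:
            # Scalars, lists, and new keys are taken from the overrides.
            merged[key] = deepcopy(value)
    return merged


# Hypothetical parameter names, for illustration only.
defaults = {"image": {"name": "receiver", "tag": "1.0"}, "replicas": 1}
overrides = {"image": {"tag": "2.0"}, "debug": True}
merged = deep_update(defaults, overrides)
```

Here only `image.tag` is replaced; `image.name` and `replicas` keep their default values, and `debug` is added.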
- validate_parameters(model)
Validate processing block parameters against a given Pydantic model.
Raises ValidationError if validation fails; otherwise returns the Pydantic model instance initialized with the processing block parameters.
- Parameters:
model – Model to validate against
- Returns:
Parameters
- Return type:
ParameterBaseModel
- Raises:
ValidationError
Buffer request
Script phase
- class ska_sdp_scripting.phase.Phase(name: str, list_requests: list, config: ska_sdp_config.Config, pb_id: str, eb_id: str, script_kind: str, flow_data: list[DataFlow])
This class encapsulates the logic for managing a single phase within a processing block.
Objects of this class should not be created directly. Use the ProcessingBlock.create_phase() method instead.
The key functionalities include:
- Resource Management:
Handles the allocation and release of resources required by the phase, including interacting with the queue connector.
- Execution Engine Deployment:
Provides methods for deploying various types of execution engines (e.g., ee_deploy_helm(), ee_deploy_dask(), ee_deploy_test()).
- State Management:
Monitors the state of deployments and data flows, updating the processing block state accordingly. Includes methods like check_state(), update_pb_state(), monitor_deployments(), and monitor_data_flows().
- Context Management:
Designed to be used with the with statement (__enter__() and __exit__() methods) to ensure proper setup and teardown of resources.
- Parameters:
- check_state(txn: ska_sdp_config.config.Transaction, check_realtime: bool = True) None
Check the state of the processing block.
Check if the ProcessingBlock state is finished or cancelled, and for real-time scripts check if the ExecutionBlock is finished or cancelled.
- ee_deploy_dask(name: str, n_workers: int, func: Callable, f_args: tuple[Any]) DaskDeploy
Deploy a Dask execution engine.
- Parameters:
- Returns:
Dask execution engine deployment
- Return type:
DaskDeploy
- ee_deploy_helm(deploy_name: str, values: dict | None = None, version: str = '1.0.0') HelmDeploy
Deploy a Helm execution engine.
This can be used to deploy any Helm chart.
- Parameters:
- Returns:
Helm execution engine deployment
- Return type:
HelmDeploy
- ee_deploy_slurm(deploy_name: str, slargs: dict | None = None) SlurmDeploy
Deploy a Slurm execution engine.
- ee_deploy_test(deploy_name: str, func: Callable | None = None, f_args: list[Any] | None = None) FakeDeploy
Deploy a fake execution engine.
This is used for testing and example purposes.
- Parameters:
- Returns:
Fake execution engine deployment
- Return type:
FakeDeploy
- is_eb_finished(txn: ska_sdp_config.config.Transaction) bool
Check if the ExecutionBlock is finished or cancelled.
- Parameters:
txn (Transaction) – Config db transaction.
- Return type:
bool
- monitor_data_flows(txn: ska_sdp_config.config.Transaction, iteration: int = 0) None
Monitor the data flow state and aggregate any error messages into the processing block’s ‘error_messages’ key.
- monitor_deployments(txn: ska_sdp_config.config.Transaction, iteration: int = 0) None
Monitor deployments and update the deployment status in the processing block state based on the state of the deployments’ pods.
Also update the deployments_ready pb state key.
At the moment, deployments_ready is True only if all deployments are RUNNING; otherwise it is False.
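The readiness rule amounts to an "all deployments RUNNING" check. The function below is a minimal sketch of that rule only: the helper name, the shape of the input dictionary, and the `PENDING` state name are assumptions for illustration, and treating an empty set of deployments as not ready is a choice made here, not documented behaviour.

```python
def deployments_ready(deployment_states):
    """Return True only when every deployment reports RUNNING."""
    # Assumption: with no deployments at all, this sketch reports False.
    return bool(deployment_states) and all(
        state == "RUNNING" for state in deployment_states.values()
    )


# Hypothetical deployment IDs mapped to their current state.
all_running = deployments_ready({"proc-a": "RUNNING", "proc-b": "RUNNING"})
one_pending = deployments_ready({"proc-a": "RUNNING", "proc-b": "PENDING"})
```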
- update_pb_state(status: ProcessingBlockStatus = ProcessingBlockStatus.UNSET) None
Update processing block state.
If the status is UNSET, it is marked as finished.
- Parameters:
status (ProcessingBlockStatus, optional) – Status.
- wait_loop(func: Callable[[ska_sdp_config.config.Transaction], bool], time_to_ready: int = 0) None
Wait loop to check the status of the processing block. For real-time scripts, it also updates the processing block state with deployment statuses and errors captured in data flows.
- Parameters:
func – Function to check condition for exiting the watcher loop.
time_to_ready – Set deployments_ready to true after this amount of time has passed (seconds). Only applies to deployments created with ee_deploy_test().
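Taken together, the methods above suggest a typical script shape: request resources, create a phase, deploy inside the phase's `with` block, and wait until the deployment finishes. The sketch below follows the documented signatures, but everything else is an assumption: `run_work_phase`, the chart name, the buffer size and tag, and the three `Stub*` classes are hypothetical and exist only so the example runs without a configuration database.

```python
def run_work_phase(pb, chart_name, values):
    """Deploy one Helm chart in a phase and wait until it finishes."""
    # Buffer size and tag are placeholders; the units are not stated above.
    request = pb.request_buffer(1.0, tags=["example"])
    work_phase = pb.create_phase("Work", [request])
    with work_phase:
        deploy = work_phase.ee_deploy_helm(chart_name, values)
        # wait_loop takes a callable that receives a transaction and returns bool.
        work_phase.wait_loop(deploy.is_finished)


# --- Hypothetical stubs standing in for the real classes ---
class StubDeploy:
    def is_finished(self, txn):
        return True


class StubPhase:
    def __init__(self, log):
        self.log = log

    def __enter__(self):
        self.log.append("enter")
        return self

    def __exit__(self, exc_type, exc, tb):
        self.log.append("exit")

    def ee_deploy_helm(self, deploy_name, values=None, version="1.0.0"):
        self.log.append(f"deploy:{deploy_name}")
        return StubDeploy()

    def wait_loop(self, func, time_to_ready=0):
        # The real loop watches the configuration database; this checks once.
        self.log.append(f"finished:{func(None)}")


class StubProcessingBlock:
    def __init__(self):
        self.log = []

    def request_buffer(self, size, tags):
        return ("buffer", size, tuple(tags))

    def create_phase(self, name, requests):
        self.log.append(f"phase:{name}")
        return StubPhase(self.log)


pb = StubProcessingBlock()
run_work_phase(pb, "my-chart", {"replicas": 1})
```

Because `run_work_phase` only relies on the method names and arguments listed in this reference, the same function body should apply to a real `ProcessingBlock`, subject to checking it against the installed library version.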
Execution Engine (EE) deployment
- class ska_sdp_scripting.ee_base_deploy.EEDeploy(pb_id: str, config: ska_sdp_config.Config)
Base class for execution engine deployment.
- Parameters:
- property deploy_id: str
Get the deployment ID.
- Returns:
deployment ID
- Return type:
str
- is_finished(txn: ska_sdp_config.config.Transaction) bool
Check if the deployment is finished.
- Parameters:
txn (ska_sdp_config.Transaction) – Configuration transaction.
- Return type:
bool
Helm EE Deployment
- class ska_sdp_scripting.helm_deploy.HelmDeploy(pb_id: str, config: ska_sdp_config.Config, deploy_name: str, values: dict | None = None, version: str = '1.0.0')
Deploy Helm execution engine.
This should not be created directly; use the Phase.ee_deploy_helm() method instead.
Slurm EE Deployment
- class ska_sdp_scripting.slurm_deploy.SlurmDeploy(pb_id: str, config: ska_sdp_config.Config, deploy_name: str, slargs: dict | None = None)
Deploy Slurm execution engine.
This should not be created directly; use the Phase.ee_deploy_slurm() method instead.
Dask EE deployment
- class ska_sdp_scripting.dask_deploy.DaskDeploy(pb_id: str, config: ska_sdp_config.Config, deploy_name: str, n_workers: int, func: Callable, f_args: Tuple[Any])
Deploy a Dask execution engine.
The function (func) when called with the arguments (f_args) should return a Dask graph. The graph is then executed by calling the compute method:
    result = func(*f_args)
    result.compute()
This happens in a separate thread so the constructor can return immediately.
This should not be created directly; use the Phase.ee_deploy_dask() method instead.
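The "build the graph, then compute it in a separate thread" pattern can be sketched with the standard library alone. This is an illustration of the pattern, not the `DaskDeploy` implementation: `LazySum` is a hypothetical stand-in for a Dask graph (anything with a `compute()` method), and `BackgroundCompute` and `build_graph` are names invented for the example.

```python
import threading


class LazySum:
    """Hypothetical stand-in for a Dask graph: work happens in compute()."""

    def __init__(self, values):
        self.values = values

    def compute(self):
        return sum(self.values)


def build_graph(values):
    """Plays the role of `func`: returns an object that can be computed."""
    return LazySum(values)


class BackgroundCompute:
    """Runs func(*f_args).compute() in a thread so __init__ returns at once."""

    def __init__(self, func, f_args):
        self.result = None
        self._thread = threading.Thread(
            target=self._run, args=(func, f_args), daemon=True
        )
        self._thread.start()

    def _run(self, func, f_args):
        graph = func(*f_args)
        self.result = graph.compute()

    def join(self):
        """Block until the background computation has finished."""
        self._thread.join()


job = BackgroundCompute(build_graph, ([1, 2, 3],))
job.join()
```

Note that `f_args` is a tuple of positional arguments, so a single list argument is passed as `([1, 2, 3],)`.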
Fake EE deployment
- class ska_sdp_scripting.fake_deploy.FakeDeploy(pb_id: str, config: ska_sdp_config.Config, deploy_name: str, func: Callable | None = None, f_args: Tuple[Any] | None = None)
Deploy a fake execution engine.
The function is called with the arguments in a separate thread so the constructor can return immediately.
This should not be created directly; use the Phase.ee_deploy_test() method instead.
- Parameters:
Queue connector functions
Common functions used to configure the QueueConnector.
- ska_sdp_scripting.queue_connector.wait_for_queue_connector_state(config: ska_sdp_config.Config, pb_id: str | None, expected_state: str | None, timeout: float) None
Wait for all queue connector flows to reach an expected state.
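Waiting for every flow to reach a state before a timeout is a standard polling pattern, sketched below. The details are assumptions rather than the library's behaviour: `wait_for_states`, the `get_states` callable, the `ON`/`OFF` state names, and raising `TimeoutError` are all hypothetical, and the real function may watch the configuration database instead of polling.

```python
import time


def wait_for_states(get_states, expected_state, timeout, poll_interval=0.01):
    """Poll until every flow reports expected_state, or raise TimeoutError."""
    deadline = time.monotonic() + timeout
    while True:
        states = get_states()
        if states and all(s == expected_state for s in states.values()):
            return
        if time.monotonic() >= deadline:
            raise TimeoutError(f"flows not in state {expected_state!r}: {states}")
        time.sleep(poll_interval)


# Simulated snapshots of flow states: the second poll sees every flow ON.
snapshots = iter([
    {"flow-a": "OFF", "flow-b": "ON"},
    {"flow-a": "ON", "flow-b": "ON"},
])
polls = []


def get_states():
    states = next(snapshots)
    polls.append(states)
    return states


wait_for_states(get_states, "ON", timeout=1.0)
```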
Receive Addresses utilities
Generate Receive Addresses
- ska_sdp_scripting.receive_addresses.generate_host_port_channel_map(scan_types: list, port_start: int, channels_per_port: int, *, num_hosts: int = 1, max_ports_per_host: int | None = None) tuple[dict, dict]
Returns the dictionary of host and port channel maps for a given list of scan types. The dictionary is indexed by Scan Type ID and then by Beam ID, each entry at this last level has two lists under “host” and “port” with the corresponding channel maps.
- Parameters:
scan_types – The list of scan types for which receive addresses should be calculated.
port_start – The first port to allocate on each host.
channels_per_port – The number of channels that are sent per port.
num_hosts – If given, the exact number of hosts to allocate.
max_ports_per_host – If given, the maximum number of ports to allocate to a host before a new host is allocated (per beam).
- Returns:
The dictionary of host and port channels maps for a given list of scan types. Hosts are identified by running, 0-indexed numbers, so that actual hostnames/IPs can be easily assigned later.
- ska_sdp_scripting.receive_addresses.generate_host_port_per_beam(spectral_windows: list, port_start: int, channels_per_port: int, *, num_hosts: int | None = None, max_ports_per_host: int | None = None) tuple[list, list, list]
Returns the list of host and port channel maps for a given beam’s spectral window list. The calculation starts at a given starting port for convenience and takes into account that more than one channel can be sent per port. Additionally, users can specify either a fixed number of pods that the algorithm should allocate, or (alternatively) the maximum number of ports after which a new pod should be allocated by the algorithm.
- Parameters:
spectral_windows – The list of Spectral Windows for which receive addresses should be calculated.
port_start – The first port to allocate on each host.
channels_per_port – The number of channels that are sent per port.
num_hosts – If given, the exact number of hosts to allocate.
max_ports_per_host – If given, the maximum number of ports to allocate to a host before a new host is allocated.
- Returns:
A three-tuple with: the list of host channel maps, the list of port channel maps, and a list of port counts for each host. Hosts are identified by running, 0-indexed numbers, so that actual hostnames/IPs can be easily assigned later.
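To make the allocation idea concrete, here is a simplified sketch for one contiguous run of channels using the `max_ports_per_host` rule. It is not the library's algorithm: `allocate_ports` is a hypothetical helper, it takes a channel count instead of spectral windows, and the `[first_channel, value]` entry format for the maps is an assumption made for illustration.

```python
import math


def allocate_ports(num_channels, port_start, channels_per_port, max_ports_per_host):
    """Return (host_map, port_map, ports_per_host) for one run of channels."""
    num_ports = math.ceil(num_channels / channels_per_port)
    host_map, port_map, ports_per_host = [], [], []
    for index in range(num_ports):
        # Hosts are 0-indexed; ports restart at port_start on each host.
        host, slot = divmod(index, max_ports_per_host)
        first_channel = index * channels_per_port
        if slot == 0:
            # First port on a new host: record where its channels begin.
            host_map.append([first_channel, host])
            ports_per_host.append(0)
        port_map.append([first_channel, port_start + slot])
        ports_per_host[host] += 1
    return host_map, port_map, ports_per_host


host_map, port_map, ports_per_host = allocate_ports(
    num_channels=10, port_start=9000, channels_per_port=2, max_ports_per_host=3
)
```

With these inputs, ten channels need five ports; the first host takes three of them and a second host takes the remaining two, each starting again at port 9000.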