Piper

A complete installable data analysis pipeline can be defined with the ska-sdp-piper framework.

A Pipeline consists of Stage objects (grouped in Stages) and executes them in-order through a runner. A stage is created by decorating a python function (the stage definition) with ConfigurableStage.

Stage configuration is inferred from function annotations and validated with pydantic. Stage parameters are configurable via YAML and can also be overridden at runtime with --set.

Examples

Process Visibility Pipeline

This example demonstrates a pipeline with three stages:

  • initiate_data(): Builds input data

  • process_data(): Applies runtime processing parameters

  • log_average_data(): Logs final aggregate output

Defining configurable stages

A configurable stage is defined by decorating a standard python function with ConfigurableStage().

The stage function signature is:

stage_name(pipeline_metadata..., configuration_parameters...,)

The framework provides the following metadata arguments:

  • _upstream_output_: Output of upstream stage. This parameter must be present in each stage. If using the Pipeline's default runner class i.e. DefaultRunner, then the type of this parameter is a dict.

  • _global_parameters_: pipeline-level global configuration model. This is a instance of the pydantic model defined while initializing the Pipeline. The default value for this is PiperBaseModel

  • _output_dir_: resolved output path for the current pipeline run. The type is str.

Example stage definitions:

>>> from typing import Annotated, Literal
>>> import dask.array as da
>>> import numpy as np
>>> from pydantic import BaseModel, Field
>>> from ska_sdp_piper.piper.command import CLIArgument
>>> from ska_sdp_piper.piper.stage import ConfigurableStage
>>>
>>> class GlobalConfig(BaseModel):
...     div_value: float = 1.0
>>>
>>> class Limits(BaseModel):
...     start: int = Field(default=0, description="Start of the range")
...     stop: int = Field(default=1001, description="Stop of the range")
>>>
>>> @ConfigurableStage
... def initiate_data(
...     _upstream_output_,
...     _output_dir_,
...     input_path: Annotated[str, CLIArgument],
...     limits: Annotated[Limits, Field(description="Limits", default_factory=Limits)],
... ):
...     data = da.arange(limits.start, limits.stop)
...     return {"data": data}
>>>
>>> @ConfigurableStage(name="process_data")
... def process_data(
...     _upstream_output_,
...     _global_parameters_: GlobalConfig,
...     added_value: Annotated[float, CLIArgument],
...     multiplier: Annotated[
...         Literal[1.0, 2.5, 5.0],
...         Field(description="Multiplier for data processing"),
...     ] = 1.0,
... ):
...     data = _upstream_output_["data"]
...     processed_data = multiplier * data + added_value
...
...     div_value = _global_parameters_.div_value
...     if div_value:
...         processed_data = processed_data / div_value
...
...     return {**_upstream_output_, "processed_data": processed_data}
>>>
>>> @ConfigurableStage
... def log_average_data(_upstream_output_):
...     processed_data = _upstream_output_["processed_data"]
...     average = np.average(processed_data)
...     return {"average": average}

Defining the pipeline

The Pipeline constructor takes:

  • name: Name of the pipeline

  • stages: An instance of Stages

  • optional global_config_model which is a pydantic model of the global configuration

  • optional version

Example pipeline definitions:

>>> from ska_sdp_piper.piper.pipeline import Pipeline
>>> from ska_sdp_piper.piper.stage import Stages
>>>
>>> pipeline = Pipeline(
...     "mockpipe",
...     stages=Stages([
...         initiate_data,
...         process_data,
...         log_average_data,
...     ]),
...     global_config_model=GlobalConfig,
...     version="1.0",
... )

Optional run customization with overide_run

If a pipeline needs additional CLI arguments to be passed to the underlying stages, or needs a custom execution strategy, use Pipeline.overide_run().

This is the API used to "override run" behavior for a specific pipeline.

>>> from ska_sdp_piper.piper.runners import DaskRunner
>>>
>>> class CustomDaskRunner(DaskRunner):
...     @classmethod
...     def cli_args(cls):
...         return []
...
...     def execute(self):
...         output = None
...         for stage in self.pipeline.executable_stages:
...             output = stage(output)
...         return output
>>>
>>> pipeline = pipeline.overide_run(
...     CLIArgument(
...         "--added-value",
...         dest="added_value",
...         type=float,
...         default=0,
...         help="Value added to processed data",
...     ),
...     CLIArgument(
...         "--input",
...         dest="input_path",
...         type=str,
...         required=True,
...         help="Input data",
...     ),
...     runner=CustomDaskRunner,
... )

Notes:

  • overide_run is optional. Use it only when defaults are insufficient.

  • The provided runner must implement execute(self).

  • Runner-specific CLI arguments can be exposed via @classmethod cli_args.

  • Default run arguments (--config, --output, --stages, --set, --verbose, --[no-]unique-output-subdir) are still included.

Custom subcommands

By default, Pipeline registers built-in run and install-config subcommands.

Custom subcommands are added by decorating python functions with Pipeline.sub_command().

The decorator accepts variadic CLIArgument values, and the callback receives parsed keyword arguments directly.

>>> @pipeline.sub_command(
...     "clean",
...     CLIArgument(
...         "--output",
...         type=str,
...         dest="output_path",
...         required=True,
...         help="Path to cleanup",
...     ),
...     help="Clean output artefacts",
... )
... def cleanup(output_path):
...     folder_contents = os.listdir(output_path)
...     for content in folder_contents:
...         timestamped_path = f"{output_path}/{content}"
...         if pipeline.name in content and os.path.isdir(timestamped_path):
...             shutil.rmtree(timestamped_path)

Equivalent callback style using explicit kwargs is also valid:

>>> @pipeline.sub_command("hello", CLIArgument("name", type=str), help="Say hello")
... def hello(**cli_args):
...     print(f"Hello {cli_args['name']}")

Entire Pipeline Definition

A complete, runnable pipeline example is available in tests/mockpipe.py.

Installing the Pipeline

If the pipeline definition is part of a larger python project, pyproject.toml can be used to manage dependencies and expose the executable pipeline.

Add the following section in pyproject.toml.

[tool.poetry.scripts]
mockpipe = "complete.import.path.to.process_vis_pipeline:pipeline"

Alternatively, use the piper install command to register the pipeline as an executable in the current python environment, without modifying pyproject.toml.

piper install mockpipe tests/mockpipe.py

This command creates a CLI entry point for the pipeline. The pipeline object must be instantiated and available at module level in the provided script.

To uninstall a registered pipeline:

piper uninstall mockpipe

Pipeline Configuration File

The default configuration for the above pipeline looks like this:

global_parameters:
  div_value: 1.0
parameters:
  initiate_data:
    limits:
      start: 0
      stop: 1001
  process_data:
    multiplier: 1.0
  log_average_data: {}
pipeline:
  initiate_data: true
  process_data: true
  log_average_data: true

The generated configuration consists of three sections:

  1. Pipeline section

  2. Parameters section

  3. Global parameters section

Executing the pipeline

The installed CLI application provides these sub-commands:

  1. run

  2. install-config

  3. any custom subcommand (for example clean)

You can run mockpipe --help to inspect available subcommands.

The pipeline can be executed as:

mockpipe run \
  --input /path/to/input \
  --added-value 10 \
  --output /path/to/store/output

Toggling pipeline stages

Stages can be toggled off during execution in either of these ways:

  1. Use --stages and pass only stage names to run.

  2. Set stage flags under the pipeline section in config to false.

Sphinx Extension

Piper also provides Sphinx extensions that can auto-generate configuration tables for the entire pipeline and for individual stages.

Pipeline extension

To activate the extension, define (or append) the extensions variable in conf.py like this:

extensions = ["ska_sdp_piper.extensions.sphinx_piper_pipeline"]

Then create an RST file with the following directive:

Test Pipeline Documentation
===========================

.. pipelineconfig:: import.path.to.process_vis_pipeline.pipeline

This auto-generates the configuration for each stage in its own table.