Piper
A complete installable data analysis pipeline can be defined with the ska-sdp-piper framework.
A Pipeline consists of Stage objects (grouped in
Stages) and executes them in order through a runner.
A stage is created by decorating a Python function (the stage definition)
with ConfigurableStage.
Stage configuration is inferred from function annotations and validated with
pydantic. Stage parameters are configurable via YAML and can also be overridden
at runtime with --set.
Examples
Process Visibility Pipeline
This example demonstrates a pipeline with three stages:
initiate_data(): Builds input data
process_data(): Applies runtime processing parameters
log_average_data(): Logs final aggregate output
Defining configurable stages
A configurable stage is defined by decorating a standard python function with
ConfigurableStage().
The stage function signature is:
stage_name(pipeline_metadata..., configuration_parameters...,)
The framework provides the following metadata arguments:
_upstream_output_: Output of the upstream stage. This parameter must be present in every stage. With the Pipeline's default runner class, DefaultRunner, its type is dict.
_global_parameters_: Pipeline-level global configuration model. This is an instance of the pydantic model supplied when initializing the Pipeline. The default model is PiperBaseModel.
_output_dir_: Resolved output path for the current pipeline run. The type is str.
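To make the split between metadata arguments and configuration parameters concrete, here is a minimal standard-library sketch of how a decorator could partition a stage signature. It is not the ska-sdp-piper implementation: the helper name split_signature and the rule of matching the three metadata names literally are assumptions made for illustration, and the stage body is reduced to a stub.

```python
import inspect
from typing import Annotated, get_type_hints

# The three metadata names listed above; every other parameter is
# treated as a configuration parameter in this sketch.
METADATA_ARGS = {"_upstream_output_", "_global_parameters_", "_output_dir_"}


def split_signature(func):
    """Hypothetical helper: partition parameters into metadata and config."""
    hints = get_type_hints(func, include_extras=True)
    metadata, config = [], {}
    for name, param in inspect.signature(func).parameters.items():
        if name in METADATA_ARGS:
            metadata.append(name)
        else:
            has_default = param.default is not inspect.Parameter.empty
            default = param.default if has_default else None
            # Store (annotation, default) for each configuration parameter.
            config[name] = (hints.get(name, object), default)
    return metadata, config


def process_data(
    _upstream_output_,
    _global_parameters_,
    added_value: float,
    multiplier: Annotated[float, "Multiplier for data processing"] = 1.0,
):
    return {}  # stub body; only the signature matters here


metadata, config = split_signature(process_data)
print(metadata)        # ['_upstream_output_', '_global_parameters_']
print(sorted(config))  # ['added_value', 'multiplier']
```

A real implementation would go on to build a validation model from the collected annotations and defaults; that step is left out here.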
Example stage definitions:
>>> from typing import Annotated, Literal
>>> import dask.array as da
>>> import numpy as np
>>> from pydantic import BaseModel, Field
>>> from ska_sdp_piper.piper.command import CLIArgument
>>> from ska_sdp_piper.piper.stage import ConfigurableStage
>>>
>>> class GlobalConfig(BaseModel):
...     div_value: float = 1.0
>>>
>>> class Limits(BaseModel):
...     start: int = Field(default=0, description="Start of the range")
...     stop: int = Field(default=1001, description="Stop of the range")
>>>
>>> @ConfigurableStage
... def initiate_data(
...     _upstream_output_,
...     _output_dir_,
...     input_path: Annotated[str, CLIArgument],
...     limits: Annotated[Limits, Field(description="Limits", default_factory=Limits)],
... ):
...     data = da.arange(limits.start, limits.stop)
...     return {"data": data}
>>>
>>> @ConfigurableStage(name="process_data")
... def process_data(
...     _upstream_output_,
...     _global_parameters_: GlobalConfig,
...     added_value: Annotated[float, CLIArgument],
...     multiplier: Annotated[
...         Literal[1.0, 2.5, 5.0],
...         Field(description="Multiplier for data processing"),
...     ] = 1.0,
... ):
...     data = _upstream_output_["data"]
...     processed_data = multiplier * data + added_value
...
...     div_value = _global_parameters_.div_value
...     if div_value:
...         processed_data = processed_data / div_value
...
...     return {**_upstream_output_, "processed_data": processed_data}
>>>
>>> @ConfigurableStage
... def log_average_data(_upstream_output_):
...     processed_data = _upstream_output_["processed_data"]
...     average = np.average(processed_data)
...     return {"average": average}
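The data flow through the three stages can be traced without the framework. The sketch below re-implements the stage bodies as plain functions, using Python lists instead of dask arrays, and calls them in order so that each return value becomes the next stage's upstream output. The values added_value=10 and div_value=1.0 are illustrative choices; the decorator, validation and runner are intentionally left out.

```python
from statistics import mean


def initiate_data(_upstream_output_, start=0, stop=1001):
    # Stand-in for da.arange(limits.start, limits.stop)
    return {"data": list(range(start, stop))}


def process_data(_upstream_output_, added_value, multiplier=1.0, div_value=1.0):
    processed = [multiplier * x + added_value for x in _upstream_output_["data"]]
    if div_value:
        processed = [x / div_value for x in processed]
    # Earlier keys are forwarded so later stages can still read them.
    return {**_upstream_output_, "processed_data": processed}


def log_average_data(_upstream_output_):
    return {"average": mean(_upstream_output_["processed_data"])}


output = initiate_data(None)
output = process_data(output, added_value=10)
output = log_average_data(output)
print(output)  # {'average': 510.0}
```

Note that log_average_data returns a fresh dictionary, so the final output contains only the average, whereas process_data forwards the upstream keys explicitly.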
Defining the pipeline
The Pipeline constructor takes:
name: Name of the pipeline
stages: An instance of Stages
global_config_model (optional): A pydantic model of the global configuration
version (optional): Version of the pipeline
Example pipeline definition:
>>> from ska_sdp_piper.piper.pipeline import Pipeline
>>> from ska_sdp_piper.piper.stage import Stages
>>>
>>> pipeline = Pipeline(
...     "mockpipe",
...     stages=Stages([
...         initiate_data,
...         process_data,
...         log_average_data,
...     ]),
...     global_config_model=GlobalConfig,
...     version="1.0",
... )
Optional run customization with overide_run
If a pipeline needs additional CLI arguments to be passed to the underlying stages, or needs a custom execution
strategy, use Pipeline.overide_run().
This is the API used to "override run" behavior for a specific pipeline.
>>> from ska_sdp_piper.piper.runners import DaskRunner
>>>
>>> class CustomDaskRunner(DaskRunner):
...     @classmethod
...     def cli_args(cls):
...         return []
...
...     def execute(self):
...         output = None
...         for stage in self.pipeline.executable_stages:
...             output = stage(output)
...         return output
>>>
>>> pipeline = pipeline.overide_run(
...     CLIArgument(
...         "--added-value",
...         dest="added_value",
...         type=float,
...         default=0,
...         help="Value added to processed data",
...     ),
...     CLIArgument(
...         "--input",
...         dest="input_path",
...         type=str,
...         required=True,
...         help="Input data",
...     ),
...     runner=CustomDaskRunner,
... )
Notes:
overide_run is optional. Use it only when defaults are insufficient.
The provided runner must implement execute(self).
Runner-specific CLI arguments can be exposed via @classmethod cli_args.
Default run arguments (--config, --output, --stages, --set, --verbose, --[no-]unique-output-subdir) are still included.
Custom subcommands
By default, Pipeline registers built-in run and
install-config subcommands.
Custom subcommands are added by decorating python functions with
Pipeline.sub_command().
The decorator accepts variadic CLIArgument values, and the
callback receives parsed keyword arguments directly.
>>> import os
>>> import shutil
>>>
>>> @pipeline.sub_command(
...     "clean",
...     CLIArgument(
...         "--output",
...         type=str,
...         dest="output_path",
...         required=True,
...         help="Path to cleanup",
...     ),
...     help="Clean output artefacts",
... )
... def cleanup(output_path):
...     # Remove every directory under output_path whose name contains the pipeline name.
...     for content in os.listdir(output_path):
...         timestamped_path = os.path.join(output_path, content)
...         if pipeline.name in content and os.path.isdir(timestamped_path):
...             shutil.rmtree(timestamped_path)
An equivalent callback style that collects the parsed arguments into **kwargs is also valid:
>>> @pipeline.sub_command("hello", CLIArgument("name", type=str), help="Say hello")
... def hello(**cli_args):
...     print(f"Hello {cli_args['name']}")
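For readers unfamiliar with this pattern, the following standard-library sketch shows how a decorator of this kind could register a subcommand and hand the parsed arguments to the callback as keyword arguments. It uses argparse directly and makes no claim about how Pipeline.sub_command() is implemented; the names sub_command, Arg and main are hypothetical stand-ins.

```python
import argparse

parser = argparse.ArgumentParser(prog="mockpipe")
subparsers = parser.add_subparsers(dest="command", required=True)


class Arg:
    """Hypothetical stand-in for CLIArgument: stores add_argument() inputs."""

    def __init__(self, *flags, **options):
        self.flags, self.options = flags, options


def sub_command(name, *cli_args, help=None):
    """Register the decorated function as the handler of subcommand `name`."""

    def decorator(func):
        sub = subparsers.add_parser(name, help=help)
        for arg in cli_args:
            sub.add_argument(*arg.flags, **arg.options)
        # Remember which function handles this subcommand.
        sub.set_defaults(_callback=func)
        return func

    return decorator


@sub_command("hello", Arg("name", type=str), help="Say hello")
def hello(**cli_args):
    return f"Hello {cli_args['name']}"


def main(argv):
    parsed = vars(parser.parse_args(argv))
    callback = parsed.pop("_callback")
    parsed.pop("command")
    # The remaining parsed values are passed as keyword arguments.
    return callback(**parsed)


print(main(["hello", "Piper"]))  # Hello Piper
```

Because the callback is invoked with keyword arguments, both signatures shown above (named parameters or **cli_args) work with the same dispatch code.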
Entire Pipeline Definition
A complete, runnable pipeline example is available in tests/mockpipe.py.
Installing the Pipeline
If the pipeline definition is part of a larger python project, pyproject.toml can be used to manage dependencies and expose the executable pipeline.
Add the following section in pyproject.toml.
[tool.poetry.scripts]
mockpipe = "complete.import.path.to.process_vis_pipeline:pipeline"
Alternatively, use the piper install command to register the pipeline
as an executable in the current python environment,
without modifying pyproject.toml.
piper install mockpipe tests/mockpipe.py
This command creates a CLI entry point for the pipeline. The pipeline object must be instantiated and available at module level in the provided script.
To uninstall a registered pipeline:
piper uninstall mockpipe
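The requirement that the pipeline object be available at module level can be illustrated with a short standard-library sketch of how a tool might import a script by path and look up an attribute on it. This is an assumption about the general mechanism, not a description of what piper install does internally; the helper load_module_level_object and the attribute name pipeline are hypothetical.

```python
import importlib.util
import tempfile
from pathlib import Path


def load_module_level_object(script_path, attribute="pipeline"):
    """Hypothetical helper: import a script by path and return one attribute."""
    spec = importlib.util.spec_from_file_location("user_pipeline", script_path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)  # runs the script's top-level code
    if not hasattr(module, attribute):
        raise AttributeError(
            f"{script_path} defines no module-level '{attribute}'"
        )
    return getattr(module, attribute)


with tempfile.TemporaryDirectory() as tmp:
    script = Path(tmp) / "mockpipe.py"
    # A plain string stands in for a real Pipeline instance.
    script.write_text('pipeline = "stand-in for Pipeline(...)"\n')
    found = load_module_level_object(script)

print(found)  # stand-in for Pipeline(...)
```

An object created only inside a function or under an `if __name__ == "__main__":` guard would not be found by a lookup of this kind, which is why it has to be instantiated at module level.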
Pipeline Configuration File
The default configuration for the above pipeline looks like this:
global_parameters:
  div_value: 1.0
parameters:
  initiate_data:
    limits:
      start: 0
      stop: 1001
  process_data:
    multiplier: 1.0
  log_average_data: {}
pipeline:
  initiate_data: true
  process_data: true
  log_average_data: true
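The generated configuration consists of three sections:
Pipeline section (pipeline): one boolean flag per stage that enables or disables it
Parameters section (parameters): configuration parameters for each stage, keyed by stage name
Global parameters section (global_parameters): values of the pipeline-level global configuration model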
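As an illustration of how a single value in this nested structure can be overridden, the sketch below applies a dotted-path override to a dictionary that mirrors the default configuration. The dotted-path form and the helper set_by_path are assumptions made for illustration only; the syntax that --set actually accepts is not shown in this document.

```python
import copy

# Python mirror of the default YAML configuration shown above.
default_config = {
    "global_parameters": {"div_value": 1.0},
    "parameters": {
        "initiate_data": {"limits": {"start": 0, "stop": 1001}},
        "process_data": {"multiplier": 1.0},
        "log_average_data": {},
    },
    "pipeline": {
        "initiate_data": True,
        "process_data": True,
        "log_average_data": True,
    },
}


def set_by_path(config, dotted_path, value):
    """Hypothetical helper: return a copy of config with one value replaced."""
    updated = copy.deepcopy(config)
    *parents, leaf = dotted_path.split(".")
    node = updated
    for key in parents:
        node = node[key]  # raises KeyError for an unknown section or stage
    if leaf not in node:
        raise KeyError(dotted_path)
    node[leaf] = value
    return updated


config = set_by_path(default_config, "parameters.process_data.multiplier", 2.5)
print(config["parameters"]["process_data"])          # {'multiplier': 2.5}
print(default_config["parameters"]["process_data"])  # {'multiplier': 1.0}
```

Working on a deep copy keeps the defaults intact, so the same base configuration can be reused across runs with different overrides.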
Executing the pipeline
The installed CLI application provides these sub-commands:
run
install-config
any custom subcommand (for example clean)
You can run mockpipe --help to inspect available subcommands.
The pipeline can be executed as:
mockpipe run \
    --input /path/to/input \
    --added-value 10 \
    --output /path/to/store/output
Toggling pipeline stages
Stages can be toggled off during execution in either of these ways:
Use --stages and pass only the names of the stages to run.
Set stage flags under the pipeline section in the config to false.
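The two mechanisms can be pictured as filters over the ordered stage list. The sketch below is an assumption about how such a selection could work, not the framework's code: the helper select_stages is hypothetical, and so is the choice to let an explicit stage list take precedence over the config flags.

```python
def select_stages(ordered_stages, pipeline_flags, only=None):
    """Hypothetical helper: return the names of the stages that should run.

    ordered_stages: stage names in definition order
    pipeline_flags: mapping taken from the `pipeline` config section
    only: stage names given on the command line, or None if not given
    """
    if only is not None:
        unknown = set(only) - set(ordered_stages)
        if unknown:
            raise ValueError(f"Unknown stages: {sorted(unknown)}")
        # Keep definition order regardless of the order given by the user.
        return [name for name in ordered_stages if name in only]
    # Stages missing from the flags are treated as enabled in this sketch.
    return [name for name in ordered_stages if pipeline_flags.get(name, True)]


stages = ["initiate_data", "process_data", "log_average_data"]
flags = {"initiate_data": True, "process_data": True, "log_average_data": False}

print(select_stages(stages, flags))
# ['initiate_data', 'process_data']
print(select_stages(stages, flags, only=["process_data", "initiate_data"]))
# ['initiate_data', 'process_data']
```

Keep in mind that a downstream stage reads keys produced upstream (for example, process_data needs "data"), so disabling an early stage may leave later stages without their inputs.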
Sphinx Extension
Piper also provides Sphinx extensions that can auto-generate configuration tables for the entire pipeline and for individual stages.
Pipeline extension
To activate the extension, define (or append) the extensions variable in
conf.py like this:
extensions = ["ska_sdp_piper.extensions.sphinx_piper_pipeline"]
Then create an RST file with the following directive:
Test Pipeline Documentation
===========================
.. pipelineconfig:: import.path.to.process_vis_pipeline.pipeline
This auto-generates the configuration for each stage in its own table.