#####
Piper
#####

A complete, installable data analysis pipeline can be defined with the ``ska-sdp-piper`` framework.
A :py:class:`Pipeline` consists of :py:class:`Stage` objects (grouped in :py:class:`Stages`) and executes them in order through a runner.
A stage is created by decorating a Python function (the stage definition) with :py:class:`ConfigurableStage`.
Stage configuration is inferred from function annotations and validated with ``pydantic``.
Stage parameters are configurable via YAML and can also be overridden at runtime with ``--set``.

********
Examples
********

===========================
Process Visibility Pipeline
===========================

This example demonstrates a pipeline with three stages:

- :py:func:`initiate_data`: Builds the input data
- :py:func:`process_data`: Applies runtime processing parameters
- :py:func:`log_average_data`: Logs the final aggregate output

----------------------------
Defining configurable stages
----------------------------

A configurable stage is defined by decorating a standard Python function with :py:func:`ConfigurableStage`.
The stage function signature is::

    stage_name(pipeline_metadata..., configuration_parameters...)

The framework provides the following metadata arguments:

* :py:attr:`_upstream_output_`: Output of the upstream stage. This parameter **must be present** in every stage.
  With the pipeline's default runner class, :py:class:`DefaultRunner`, this parameter is a :py:class:`dict`.
* :py:attr:`_global_parameters_`: The pipeline-level global configuration. This is an instance of the pydantic model
  passed when initializing the ``Pipeline``. The default model is :py:class:`PiperBaseModel`.
* :py:attr:`_output_dir_`: The resolved output path for the current pipeline run, as a :py:class:`str`.
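
The contract around ``_upstream_output_`` is easiest to see without the framework.
The plain-Python sketch below is *not* Piper's :py:class:`DefaultRunner`; it only mimics the data flow described above, where each stage receives the previous stage's output and returns a new :py:class:`dict`.
The helper ``run_in_order``, the toy stages, and the empty dict handed to the first stage are illustrative assumptions.

.. code-block:: python

   from typing import Any, Callable, Dict, List

   StageFn = Callable[[Dict[str, Any]], Dict[str, Any]]


   def run_in_order(stages: List[StageFn]) -> Dict[str, Any]:
       """Call each stage with the previous stage's output (hypothetical helper)."""
       upstream_output: Dict[str, Any] = {}  # assumption: first stage sees an empty dict
       for stage in stages:
           upstream_output = stage(upstream_output)
       return upstream_output


   def make_data(_upstream_output_):
       # First stage: nothing has run yet, so the upstream dict is ignored.
       return {"data": [1, 2, 3]}


   def double_data(_upstream_output_):
       # Later stages read from the upstream dict and pass its keys along.
       doubled = [2 * value for value in _upstream_output_["data"]]
       return {**_upstream_output_, "doubled": doubled}


   result = run_in_order([make_data, double_data])
   print(result)  # {'data': [1, 2, 3], 'doubled': [2, 4, 6]}

Spreading ``**_upstream_output_`` into the returned dict, as :py:func:`process_data` does in the following example, keeps earlier keys available to later stages.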
Example stage definitions:

>>> from typing import Annotated, Literal
>>> import dask.array as da
>>> import numpy as np
>>> from pydantic import BaseModel, Field
>>> from ska_sdp_piper.piper.command import CLIArgument
>>> from ska_sdp_piper.piper.stage import ConfigurableStage
>>>
>>> class GlobalConfig(BaseModel):
...     div_value: float = 1.0
>>>
>>> class Limits(BaseModel):
...     start: int = Field(default=0, description="Start of the range")
...     stop: int = Field(default=1001, description="Stop of the range")
>>>
>>> @ConfigurableStage
... def initiate_data(
...     _upstream_output_,
...     _output_dir_,
...     input_path: Annotated[str, CLIArgument],
...     limits: Annotated[Limits, Field(description="Limits", default_factory=Limits)],
... ):
...     data = da.arange(limits.start, limits.stop)
...     return {"data": data}
>>>
>>> @ConfigurableStage(name="process_data")
... def process_data(
...     _upstream_output_,
...     _global_parameters_: GlobalConfig,
...     added_value: Annotated[float, CLIArgument],
...     multiplier: Annotated[
...         Literal[1.0, 2.5, 5.0],
...         Field(description="Multiplier for data processing"),
...     ] = 1.0,
... ):
...     data = _upstream_output_["data"]
...     processed_data = multiplier * data + added_value
...
...     # Skip the division when div_value is 0 to avoid dividing by zero.
...     div_value = _global_parameters_.div_value
...     if div_value:
...         processed_data = processed_data / div_value
...
...     return {**_upstream_output_, "processed_data": processed_data}
>>>
>>> @ConfigurableStage
... def log_average_data(_upstream_output_):
...     processed_data = _upstream_output_["processed_data"]
...     average = np.average(processed_data)
...     return {"average": average}

---------------------
Defining the pipeline
---------------------

The :py:class:`Pipeline` constructor takes:

- ``name``: Name of the pipeline
- ``stages``: An instance of :py:class:`Stages`
- ``global_config_model`` (optional): A pydantic model of the global configuration
- ``version`` (optional): Version of the pipeline

Example pipeline definition:

>>> from ska_sdp_piper.piper.pipeline import Pipeline
>>> from ska_sdp_piper.piper.stage import Stages
>>>
>>> pipeline = Pipeline(
...     "mockpipe",
...     stages=Stages([
...         initiate_data,
...         process_data,
...         log_average_data,
...     ]),
...     global_config_model=GlobalConfig,
...     version="1.0",
... )

-------------------------------------------
Optional run customization with overide_run
-------------------------------------------

If a pipeline needs additional CLI arguments to be passed to its stages, or needs a custom execution strategy, use :py:func:`Pipeline.overide_run`.
This is the API for overriding the ``run`` behavior of a specific pipeline.

>>> from ska_sdp_piper.piper.runners import DaskRunner
>>>
>>> class CustomDaskRunner(DaskRunner):
...     @classmethod
...     def cli_args(cls):
...         return []
...
...     def execute(self):
...         output = None
...         for stage in self.pipeline.executable_stages:
...             output = stage(output)
...         return output
>>>
>>> pipeline = pipeline.overide_run(
...     CLIArgument(
...         "--added-value",
...         dest="added_value",
...         type=float,
...         default=0,
...         help="Value added to processed data",
...     ),
...     CLIArgument(
...         "--input",
...         dest="input_path",
...         type=str,
...         required=True,
...         help="Input data",
...     ),
...     runner=CustomDaskRunner,
... )

In this example, each ``dest`` matches the name of a stage parameter annotated with ``CLIArgument``
(``added_value`` in :py:func:`process_data` and ``input_path`` in :py:func:`initiate_data`).

Notes:

- ``overide_run`` is optional. Use it only when the defaults are insufficient.
- The provided ``runner`` must implement ``execute(self)``.
- Runner-specific CLI arguments can be exposed by defining a ``cli_args`` classmethod on the runner, as ``CustomDaskRunner`` does.
- The default run arguments (``--config``, ``--output``, ``--stages``, ``--set``, ``--verbose``, ``--[no-]unique-output-subdir``) are still included.

------------------
Custom subcommands
------------------

By default, :py:class:`Pipeline` registers the built-in ``run`` and ``install-config`` subcommands.
Custom subcommands are added by decorating Python functions with :py:func:`Pipeline.sub_command`.
The decorator accepts variadic :py:class:`CLIArgument` values, and the callback receives the parsed arguments as keyword arguments.

>>> import os
>>> import shutil
>>>
>>> @pipeline.sub_command(
...     "clean",
...     CLIArgument(
...         "--output",
...         type=str,
...         dest="output_path",
...         required=True,
...         help="Path to clean up",
...     ),
...     help="Clean output artefacts",
... )
... def cleanup(output_path):
...     # Remove the run directories created by this pipeline under output_path.
...     for content in os.listdir(output_path):
...         timestamped_path = os.path.join(output_path, content)
...         if pipeline.name in content and os.path.isdir(timestamped_path):
...             shutil.rmtree(timestamped_path)

Alternatively, the callback can collect all parsed arguments in a single ``**kwargs`` mapping:

>>> @pipeline.sub_command("hello", CLIArgument("name", type=str), help="Say hello")
... def hello(**cli_args):
...     print(f"Hello {cli_args['name']}")

--------------------------
Entire Pipeline Definition
--------------------------

A complete, runnable pipeline example is available in :file:`tests/mockpipe.py`.

-----------------------
Installing the Pipeline
-----------------------

If the pipeline definition is part of a larger Python project, :file:`pyproject.toml` can be used to manage dependencies and expose the pipeline as an executable.
Add the following section to :file:`pyproject.toml`:

.. code-block:: toml

   [tool.poetry.scripts]
   mockpipe = "complete.import.path.to.process_vis_pipeline:pipeline"

Alternatively, use the :command:`piper install` command to register the pipeline as an executable in the current Python environment without modifying :file:`pyproject.toml`:

.. code-block:: bash

   piper install mockpipe tests/mockpipe.py

This command creates a CLI entry point for the pipeline.
The pipeline object must be instantiated and available at module level in the provided script.

To uninstall a registered pipeline:

.. code-block:: bash

   piper uninstall mockpipe

---------------------------
Pipeline Configuration File
---------------------------

The default configuration for the above pipeline looks like this:

.. code-block:: yaml

   global_parameters:
     div_value: 1.0

   parameters:
     initiate_data:
       limits:
         start: 0
         stop: 1001
     process_data:
       multiplier: 1.0
     log_average_data: {}

   pipeline:
     initiate_data: true
     process_data: true
     log_average_data: true

The generated configuration consists of three sections:

1. ``pipeline``: Enables (``true``) or disables (``false``) each stage.
2. ``parameters``: Configuration parameters of each stage, keyed by stage name.
3. ``global_parameters``: Values of the pipeline-level global configuration model.

----------------------
Executing the pipeline
----------------------

The installed CLI application provides these subcommands:

1. :command:`run`
2. :command:`install-config`
3. Any custom subcommands (for example, :command:`clean`)

Run :command:`mockpipe --help` to list the available subcommands.
The pipeline can be executed as:

.. code-block:: bash

   mockpipe run \
     --input /path/to/input \
     --added-value 10 \
     --output /path/to/store/output

------------------------
Toggling pipeline stages
------------------------

Stages can be toggled off during execution in either of these ways:

1. Pass only the names of the stages to run with ``--stages``.
2. Set the stage's flag under the ``pipeline`` section of the configuration file to ``false``.

****************
Sphinx Extension
****************

Piper also provides Sphinx extensions that can auto-generate configuration tables for the entire pipeline and for individual stages.

==================
Pipeline extension
==================

To activate the extension, define (or append to) the ``extensions`` variable in ``conf.py``:

.. code-block:: python

   extensions = ["ska_sdp_piper.extensions.sphinx_piper_pipeline"]

Then create an RST file with the following directive:

.. code-block:: rst

   Test Pipeline Documentation
   ===========================

   .. pipelineconfig:: import.path.to.process_vis_pipeline.pipeline

This auto-generates a configuration table for each stage.
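
When ``extensions`` is already defined in :file:`conf.py`, append Piper's extension rather than replacing the list.
The minimal sketch below assumes, hypothetically, that the package defining the pipeline lives one directory above the documentation sources.
Adding that directory to ``sys.path`` is a common Sphinx pattern for making project modules importable during the build, which the ``pipelineconfig`` directive presumably needs in order to resolve the dotted path it is given.
``sphinx.ext.autodoc`` merely stands in for whatever extensions the project already uses.

.. code-block:: python

   # conf.py (sketch)
   import os
   import sys

   # Assumption: the project package sits one level above the docs directory,
   # so the dotted path passed to ``pipelineconfig`` can be imported.
   sys.path.insert(0, os.path.abspath(".."))

   # Extensions the project may already use (placeholder example).
   extensions = ["sphinx.ext.autodoc"]

   # Append Piper's pipeline extension instead of overwriting the list.
   extensions.append("ska_sdp_piper.extensions.sphinx_piper_pipeline")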