import functools
import logging
from typing import Any, Callable
from .command import CLIArgument, Command
from .configurations import RuntimeConfig
from .constants import CONFIG_CLI_ARGS, RUN_CLI_ARGS
from .named_instance import NamedInstance
from .piper_base_model import PiperBaseModel
from .runners import DefaultRunner
from .stage import Stage, Stages
from .utils import LogUtil, create_output_dir, timestamp, write_yml
# Module-level logger named after this module; used by ``Pipeline.run`` to
# report the start, metadata and finish of a pipeline execution.
logger = logging.getLogger(__name__)
class Pipeline(Command, metaclass=NamedInstance):
    """
    A named collection of stages exposed as a CLI command.

    On construction the pipeline registers two sub-commands on itself:
    ``install-config`` (handled by :meth:`install_config`) and ``run``
    (handled by :meth:`run`).  The ``run`` sub-command can be re-registered
    with extra CLI arguments and a custom runner via :meth:`overide_run`.
    """

    def __init__(
        self,
        name: str,
        *stages: Stage | Callable,
        global_config_model: type[PiperBaseModel] = PiperBaseModel,
        version: str | None = None,
    ):
        """
        Initialize the pipeline with stages and configuration models.

        Parameters
        ----------
        name : str
            The name of the pipeline.
        *stages : Stage | Callable
            Stages or callables to include in the pipeline.  They are
            wrapped in a ``Stages`` collection.
        global_config_model : type[PiperBaseModel], optional
            Model for global configurations, defaults to PiperBaseModel.
        version : str, optional
            Version string for the pipeline.
        """
        super().__init__(version)
        self.name = name
        self._stages = Stages(*stages)
        self.GlobalConfigModel = global_config_model
        # Start from the model's defaults; ``set_runtime`` re-validates the
        # global configuration once runtime values are known.
        self._global_config = self.GlobalConfigModel.model_validate({})
        self.Runner = DefaultRunner
        # Populated during ``run`` by ``create_output_dir`` / ``resolve_qa_dir``.
        self.output_dir = None
        self.qa_dir = None
        self._qa_path_resolver = None
        self.sub_command(
            "install-config",
            *CONFIG_CLI_ARGS,
            help="Installs the default config at --config-install-path",
        )(self.install_config)
        self.sub_command(
            "run",
            *RUN_CLI_ARGS,
            help="Run the pipeline",
        )(self.run)

    @property
    def config(self) -> dict:
        """
        Get all the configuration of the pipeline.

        This returns a dictionary with following top level keys:

        - ``global_parameters``
        - ``parameters``
        - ``pipeline``
        - ``version``
        """
        return dict(
            version=self.version,
            global_parameters=self._global_config.model_dump(mode="json"),
            # Merge every stage's config into one mapping; on duplicate keys
            # the later stage wins.
            parameters=functools.reduce(
                lambda config, stage: {**config, **stage.config},
                self._stages,
                {},
            ),
            # Stage name -> whether the stage is currently enabled.
            pipeline={stage.name: stage.is_enabled for stage in self._stages},
        )

    def with_qa_path_resolver(self, resolver: Callable) -> "Pipeline":
        """
        Register a custom function used to resolve the QA directory.

        Parameters
        ----------
        resolver : Callable
            Called by :meth:`resolve_qa_dir` with ``output_dir`` plus the
            keyword arguments given to that method; its return value becomes
            ``qa_dir``.

        Returns
        -------
        Pipeline
            The current pipeline instance, to allow chaining.
        """
        self._qa_path_resolver = resolver
        return self

    def overide_run(
        self,
        *cli_args: CLIArgument,
        runner: type[DefaultRunner] | None = None,
    ) -> "Pipeline":
        """
        Override the ``run`` subcommand with custom CLI arguments and runner.

        The method name is misspelled (``overide``); it is kept as is because
        it is part of the public interface.

        Parameters
        ----------
        *cli_args
            Additional CLI arguments to register
        runner
            The runner class to use for execution of stages.  When omitted,
            the currently configured runner is kept.

        Returns
        -------
        The current pipeline instance.
        """
        self.Runner = runner or self.Runner
        self.sub_command(
            "run",
            *RUN_CLI_ARGS,
            *cli_args,
            *self.Runner.cli_args(),
            help="Run the pipeline",
        )(self.run)
        return self

    def create_output_dir(
        self, output_path: str, unique_output_subdir: bool, **kwargs
    ) -> None:
        """
        Create the directory for pipeline output files.

        The resulting path is stored in ``self.output_dir``.

        Parameters
        ----------
        output_path : str
            The root path for outputs.
        unique_output_subdir : bool
            Whether to create a unique timestamped subdirectory.
        **kwargs : dict
            Ignored.  Accepted so the complete set of CLI arguments can be
            passed in, as :meth:`run` does.
        """
        # The bare name resolves to the module-level helper imported from
        # ``.utils``, not to this method.
        self.output_dir = create_output_dir(
            output_path, unique_output_subdir, self.name
        )

    def set_runtime(
        self, config_path, override_defaults, stages, **kwargs
    ) -> None:
        """
        Set the runtime state based on CLI and configuration files.

        Parameters
        ----------
        config_path : str
            Path to the YAML configuration file.
        override_defaults
            Overrides for default settings.  Forwarded to
            ``RuntimeConfig.update_from_cli_overrides``, the same call that
            :meth:`install_config` makes with its ``override_defaults``.
        stages : list[str]
            Specific stages to enable for this run.  A falsy value is
            treated as an empty list.
        **kwargs : dict
            Remaining CLI arguments; handed to the stages as ``_cli_args_``.
        """
        stages = stages or []
        # Layer the sources in order: current defaults, then the YAML file,
        # then CLI overrides, then the CLI stage selection.
        runtime_config = (
            RuntimeConfig(**self.config)
            .update_from_yaml(config_path)
            .update_from_cli_overrides(override_defaults)
            .update_from_cli_stages(stages)
        )
        self._stages.update_stage_state(runtime_config.stages_to_run)
        self._stages.update_stage_parameters(runtime_config.parameters)
        self._global_config = self.GlobalConfigModel.model_validate(
            runtime_config.global_parameters, context={"allow_unset": False}
        )
        self._stages.add_additional_parameters(
            _output_dir_=self.output_dir,
            _qa_dir_=self.qa_dir,
            _cli_args_=kwargs,
            _global_parameters_=self._global_config,
        )
        self._stages.validate()

    def install_config(
        self,
        override_defaults: list[tuple[str, str]],
        config_install_path: str,
        **kwargs,
    ) -> None:
        """
        Write the default configuration, with overrides applied, to YAML.

        The file is written to ``{config_install_path}/{name}.yaml`` where
        ``name`` is the pipeline name.

        Parameters
        ----------
        override_defaults
            A list of tuple/list, where each element contains 2 sub-elements.
            First element is a string key to the parameter to override.
            This can represent a path to a nested key, where each key
            in the hierarchy is separated by a ``.`` character.
            Second element is also a string, which is then parsed using
            YAML rules, and converted to a rich object.
        config_install_path
            Directory path to store the default config
        **kwargs : dict
            Ignored.  Accepted so additional CLI arguments can be passed in.
        """
        RuntimeConfig(**self.config).update_from_cli_overrides(
            override_defaults
        ).write_yml(f"{config_install_path}/{self.name}.yaml")

    def write_runtime_params(self, cli_args: dict) -> None:
        """
        Persist current configuration and CLI arguments to disk.

        Two files are written into ``self.qa_dir``:
        ``{name}_{timestamp}.config.yaml`` and ``{name}_{timestamp}.cli.yaml``.

        Parameters
        ----------
        cli_args
            The raw CLI arguments used for the run.
        """
        # Take the timestamp once so both files of a run share the same
        # stamp even if the clock advances between the two writes.
        run_stamp = timestamp()
        config_output_path = (
            f"{self.qa_dir}/{self.name}_{run_stamp}.config.yaml"
        )
        write_yml(config_output_path, self.config)
        cli_output_file = f"{self.qa_dir}/{self.name}_{run_stamp}.cli.yaml"
        write_yml(cli_output_file, cli_args)

    def resolve_qa_dir(self, **kwargs) -> None:
        """
        Determine and set the QA directory path.

        Without a registered resolver the QA directory is the output
        directory.

        Parameters
        ----------
        **kwargs : dict
            Arbitrary keyword arguments passed to the QA path resolver,
            alongside ``output_dir``.
        """
        if self._qa_path_resolver is not None:
            self.qa_dir = self._qa_path_resolver(
                output_dir=self.output_dir, **kwargs
            )
        else:
            self.qa_dir = self.output_dir

    @property
    def executable_stages(self) -> list[Stage]:
        """
        Get the list of stages currently enabled for execution.

        Returns
        -------
        List of stage instances of all "enabled" stages.
        """
        return self._stages.executable_stages

    def run(self, **cli_args) -> Any:
        """
        Execute the pipeline lifecycle.

        Steps, in order: create the output directory, resolve the QA
        directory, configure logging, apply the runtime configuration,
        persist the configuration and CLI arguments, then execute the
        stages through the configured runner.

        Parameters
        ----------
        **cli_args
            Arguments parsed from the command line.  Must contain
            ``output_path``, ``unique_output_subdir``, ``config_path``,
            ``override_defaults`` and ``stages``, which the lifecycle
            methods require.

        Returns
        -------
        The result returned by the runner's execution.
        """
        # Order matters: the QA directory depends on the output directory,
        # and both must be set before ``set_runtime`` hands them to stages.
        self.create_output_dir(**cli_args)
        self.resolve_qa_dir(**cli_args)
        # ``configure_logging`` is not defined in this class; presumably
        # inherited from ``Command`` - confirm.
        self.configure_logging(**cli_args)
        self.set_runtime(**cli_args)
        self.write_runtime_params(cli_args)
        logger.info("=============== START =====================")
        logger.info(f"Executing {self.name} pipeline with metadata:")
        logger.info(f"Configuration Path: {cli_args.get('config_path')}")
        logger.info(f"Current run output path : {self.output_dir}")
        selected_stage_names = ", ".join(
            stage.name for stage in self.executable_stages
        )
        logger.info(f"Selected stages to run: {selected_stage_names}")
        with self.Runner(_pipeline_=self, **cli_args) as runner:
            result = runner.execute()
        # Only reached when the runner completes without raising.
        logger.info("=============== FINISH =====================")
        return result