Source code for ska_sdp_piper.piper.pipeline

import functools
import logging
from typing import Any, Callable

from .command import CLIArgument, Command
from .configurations import RuntimeConfig
from .constants import CONFIG_CLI_ARGS, RUN_CLI_ARGS
from .named_instance import NamedInstance
from .piper_base_model import PiperBaseModel
from .runners import DefaultRunner
from .stage import Stage, Stages
from .utils import LogUtil, create_output_dir, timestamp, write_yml

# Module-scoped logger; named after this module so handlers/levels can be
# configured hierarchically by callers.
logger = logging.getLogger(name=__name__)


class Pipeline(Command, metaclass=NamedInstance):
    """
    A named pipeline made of stages, exposed as a command-line tool.

    On construction two sub-commands are registered:

    - ``install-config``: writes the default configuration to disk
      (see :meth:`install_config`).
    - ``run``: executes the enabled stages through ``self.Runner``
      (see :meth:`run`).
    """

    def __init__(
        self,
        name: str,
        *stages: Stage | Callable,
        global_config_model: type[PiperBaseModel] = PiperBaseModel,
        version: str | None = None,
    ):
        """
        Initialize the pipeline with stages and configuration models.

        Parameters
        ----------
        name : str
            The name of the pipeline. Also used as the prefix of every file
            the pipeline writes (logs, config dumps, installed config).
        *stages : Stage | Callable
            Stages (or callables) to include in the pipeline, in order.
            They are wrapped in a ``Stages`` collection.
        global_config_model : type[PiperBaseModel], optional
            Model for global configurations, defaults to PiperBaseModel.
        version : str, optional
            Version string for the pipeline, forwarded to ``Command``.
        """
        super().__init__(version)
        self.name = name
        self._stages = Stages(*stages)
        self.GlobalConfigModel = global_config_model
        # Placeholder global config validated from an empty mapping; it is
        # replaced by the runtime values in ``set_runtime``.
        self._global_config = self.GlobalConfigModel.model_validate({})
        self.Runner = DefaultRunner
        # The following are populated during ``run``.
        self.output_dir = None
        self.qa_dir = None
        self._qa_path_resolver = None

        self.sub_command(
            "install-config",
            *CONFIG_CLI_ARGS,
            help="Installs the default config at --config-install-path",
        )(self.install_config)

        self.sub_command(
            "run",
            *RUN_CLI_ARGS,
            help="Run the pipeline",
        )(self.run)

    @property
    def config(self) -> dict:
        """
        Get the full configuration of the pipeline.

        Returns
        -------
        dict
            A dictionary with the following top-level keys:

            - ``version``: the pipeline version.
            - ``global_parameters``: the global config dumped in JSON mode.
            - ``parameters``: the merged ``config`` of every stage. If two
              stages share a key, the later stage wins.
            - ``pipeline``: mapping of stage name to its enabled flag.
        """
        parameters = {}
        for stage in self._stages:
            # Same precedence as a left-to-right ``{**acc, **stage.config}``
            # merge, without copying the accumulator on every stage.
            parameters.update(stage.config)

        return dict(
            version=self.version,
            global_parameters=self._global_config.model_dump(mode="json"),
            parameters=parameters,
            pipeline={stage.name: stage.is_enabled for stage in self._stages},
        )

    def with_qa_path_resolver(self, resolver: Callable) -> "Pipeline":
        """
        Register a custom function that resolves the QA directory.

        The QA directory is where the run log and the config/CLI dumps are
        written.

        Parameters
        ----------
        resolver : Callable
            Called by :meth:`resolve_qa_dir` as
            ``resolver(output_dir=..., **cli_args)``. Its return value is
            used as the QA directory path.

        Returns
        -------
        Pipeline
            The current pipeline instance, for chaining.
        """
        self._qa_path_resolver = resolver
        return self

    def overide_run(
        self,
        *cli_args: CLIArgument,
        runner: type[DefaultRunner] | None = None,
    ) -> "Pipeline":
        """
        Override the ``run`` subcommand with custom CLI arguments and runner.

        Also available under the correctly spelled alias ``override_run``.

        Parameters
        ----------
        *cli_args
            Additional CLI arguments to register on the ``run`` subcommand.
        runner
            The runner class to use for executing stages. If omitted, the
            current ``self.Runner`` is kept. The runner's own
            ``cli_args()`` are registered as well.

        Returns
        -------
        Pipeline
            The current pipeline instance, for chaining.
        """
        self.Runner = runner or self.Runner

        self.sub_command(
            "run",
            *RUN_CLI_ARGS,
            *cli_args,
            *self.Runner.cli_args(),
            help="Run the pipeline",
        )(self.run)

        return self

    # Correctly spelled alias; ``overide_run`` is kept for existing callers.
    override_run = overide_run

    def create_output_dir(self, output_path, unique_output_subdir, **kwargs):
        """
        Create the directory for pipeline output files.

        The result is stored in ``self.output_dir``.

        Parameters
        ----------
        output_path : str
            The root path for outputs.
        unique_output_subdir : bool
            Whether to create a unique timestamped subdirectory.
        **kwargs : dict
            Remaining CLI arguments; accepted and ignored here.
        """
        # Inside a method body this name refers to the module-level helper
        # imported from ``.utils``, not to this method.
        self.output_dir = create_output_dir(
            output_path, unique_output_subdir, self.name
        )

    def configure_logging(self, verbose, **kwargs):
        """
        Initialize logging for the pipeline run.

        The log file is written to
        ``<qa_dir>/<name>_<timestamp>.log``, so ``self.qa_dir`` must already
        be resolved.

        Parameters
        ----------
        verbose : int
            The verbosity level (non-zero enables verbose logging).
        **kwargs : dict
            Remaining CLI arguments; accepted and ignored here.
        """
        log_file = f"{self.qa_dir}/{self.name}_{timestamp()}.log"
        LogUtil.configure(log_file, verbose=(verbose != 0))

    def set_runtime(self, config_path, override_defaults, stages, **kwargs):
        """
        Set the runtime state from defaults, a config file and CLI options.

        Precedence, lowest to highest: pipeline defaults
        (:attr:`config`), the YAML file at ``config_path``, CLI overrides,
        then the CLI stage selection.

        Parameters
        ----------
        config_path : str
            Path to the YAML configuration file.
        override_defaults : list[tuple[str, str]]
            ``(key, value)`` pairs overriding configuration values. Same
            format as accepted by :meth:`install_config`.
        stages : list[str] or None
            Specific stages to enable for this run. ``None`` is treated as
            an empty list.
        **kwargs : dict
            Remaining CLI arguments. Forwarded to every stage as
            ``_cli_args_``.
        """
        stages = stages or []
        runtime_config = (
            RuntimeConfig(**self.config)
            .update_from_yaml(config_path)
            .update_from_cli_overrides(override_defaults)
            .update_from_cli_stages(stages)
        )

        self._stages.update_stage_state(runtime_config.stages_to_run)
        self._stages.update_stage_parameters(runtime_config.parameters)
        self._global_config = self.GlobalConfigModel.model_validate(
            runtime_config.global_parameters, context={"allow_unset": False}
        )

        self._stages.add_additional_parameters(
            _output_dir_=self.output_dir,
            _qa_dir_=self.qa_dir,
            _cli_args_=kwargs,
            _global_parameters_=self._global_config,
        )

        self._stages.validate()

    def install_config(
        self,
        override_defaults: list[tuple[str, str]],
        config_install_path: str,
        **kwargs,
    ):
        """
        Write the default configuration, with optional overrides, to disk.

        The file is written to ``<config_install_path>/<name>.yaml``.

        Parameters
        ----------
        override_defaults
            A list of tuples/lists, each with 2 elements. The first element
            is a string key naming the parameter to override. It may be a
            path to a nested key, with each key in the hierarchy separated
            by a ``.`` character. The second element is also a string. It is
            parsed using YAML rules and converted to a rich object.
        config_install_path
            Directory path in which to store the default config.
        **kwargs : dict
            Remaining CLI arguments; accepted and ignored here.
        """
        RuntimeConfig(**self.config).update_from_cli_overrides(
            override_defaults
        ).write_yml(f"{config_install_path}/{self.name}.yaml")

    def write_runtime_params(self, cli_args: dict):
        """
        Persist the current configuration and CLI arguments to disk.

        Two files are written into ``self.qa_dir`` with a shared timestamp:
        ``<name>_<ts>.config.yaml`` and ``<name>_<ts>.cli.yaml``.

        Parameters
        ----------
        cli_args
            The raw CLI arguments used for the run.
        """
        # Take the timestamp once so both files from the same run share it
        # (separate ``timestamp()`` calls may straddle a clock tick).
        file_prefix = f"{self.qa_dir}/{self.name}_{timestamp()}"

        write_yml(f"{file_prefix}.config.yaml", self.config)
        write_yml(f"{file_prefix}.cli.yaml", cli_args)

    def resolve_qa_dir(self, **kwargs):
        """
        Determine and set the QA directory path (``self.qa_dir``).

        Uses the resolver registered with :meth:`with_qa_path_resolver`, if
        any. Otherwise it falls back to ``self.output_dir``.

        Parameters
        ----------
        **kwargs : dict
            Keyword arguments forwarded to the QA path resolver.
        """
        if self._qa_path_resolver is not None:
            self.qa_dir = self._qa_path_resolver(
                output_dir=self.output_dir, **kwargs
            )
        else:
            self.qa_dir = self.output_dir

    @property
    def executable_stages(self) -> list[Stage]:
        """
        Get the list of stages currently enabled for execution.

        Returns
        -------
        list[Stage]
            Stage instances of all "enabled" stages.
        """
        return self._stages.executable_stages

    def run(self, **cli_args) -> Any:
        """
        Execute the pipeline lifecycle.

        The steps are, in order: create the output directory, resolve the QA
        directory, configure logging, apply runtime configuration, persist
        runtime parameters, then execute the stages inside a
        ``self.Runner`` context.

        Parameters
        ----------
        **cli_args
            Arguments parsed from the command line.

        Returns
        -------
        Any
            The result returned by the runner's ``execute()``.
        """
        self.create_output_dir(**cli_args)
        self.resolve_qa_dir(**cli_args)
        self.configure_logging(**cli_args)
        self.set_runtime(**cli_args)
        self.write_runtime_params(cli_args)

        logger.info("=============== START =====================")
        logger.info("Executing %s pipeline with metadata:", self.name)
        logger.info("Configuration Path: %s", cli_args.get("config_path"))
        logger.info("Current run output path : %s", self.output_dir)
        logger.info(
            "Selected stages to run: %s",
            ", ".join(stage.name for stage in self.executable_stages),
        )

        with self.Runner(_pipeline_=self, **cli_args) as runner:
            result = runner.execute()

        logger.info("=============== FINISH =====================")
        return result