Source code for ska_sdp_piper.piper.runners.default_runner

from __future__ import annotations

import logging
from contextlib import ExitStack
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from ..pipeline import Pipeline

logger = logging.getLogger(__name__)


[docs] class DefaultRunner: """ The base runner class for sequential pipeline execution. Parameters ---------- _pipeline_: Pipeline The pipeline instance kwargs Any additional key-word arguments. In the ``Pipeline.run`` call, these kwargs are all of the cli arguments that were parsed by the pipeline. Examples -------- >>> with DefaultRunner(_pipeline_=pipeline_instance) as runner: ... runner.execute() """
[docs] @classmethod def cli_args(cls): """ Return an empty list of CLI arguments for the default runner. Returns ------- list An empty list. """ return []
def __init__(self, *, _pipeline_: Pipeline, **kwargs): """ Initialize the runner with a pipeline and context stack. """ self.pipeline = _pipeline_ self._stack = ExitStack()
[docs] def execute(self) -> dict: """ Execute pipeline stages sequentially and log progress. This function is called by pipeline in its ``run`` subcommand. Returns ------- The output from the final pipeline stage. """ upstream_output = dict() for stage in self.pipeline.executable_stages: logger.info( "Starting %s", stage.name, extra={"tags": f"sdpPhase:{stage.name.upper()},state:START"}, ) upstream_output = stage(upstream_output) logger.info( "Finished %s", stage.name, extra={ "tags": f"sdpPhase:{stage.name.upper()},state:FINISHED" }, ) return upstream_output
def __enter__(self) -> DefaultRunner: """ Enter the runtime context for the runner. Returns ------- The current runner instance. """ return self def __exit__(self, *args): """ Close the context stack and release resources. Parameters ---------- *args : tuple Exception type, value, and traceback. """ self._stack.close()