Source code for ska_sdp_piper.piper.runners.default_runner
from __future__ import annotations
import logging
from contextlib import ExitStack
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from ..pipeline import Pipeline
logger = logging.getLogger(__name__)
[docs]
class DefaultRunner:
"""
The base runner class for sequential pipeline execution.
Parameters
----------
_pipeline_: Pipeline
The pipeline instance
kwargs
Any additional key-word arguments. In the ``Pipeline.run`` call,
these kwargs are all of the cli arguments that
were parsed by the pipeline.
Examples
--------
>>> with DefaultRunner(_pipeline_=pipeline_instance) as runner:
... runner.execute()
"""
[docs]
@classmethod
def cli_args(cls):
"""
Return an empty list of CLI arguments for the default runner.
Returns
-------
list
An empty list.
"""
return []
def __init__(self, *, _pipeline_: Pipeline, **kwargs):
"""
Initialize the runner with a pipeline and context stack.
"""
self.pipeline = _pipeline_
self._stack = ExitStack()
[docs]
def execute(self) -> dict:
"""
Execute pipeline stages sequentially and log progress.
This function is called by pipeline in its ``run`` subcommand.
Returns
-------
The output from the final pipeline stage.
"""
upstream_output = dict()
for stage in self.pipeline.executable_stages:
logger.info(
"Starting %s",
stage.name,
extra={"tags": f"sdpPhase:{stage.name.upper()},state:START"},
)
upstream_output = stage(upstream_output)
logger.info(
"Finished %s",
stage.name,
extra={
"tags": f"sdpPhase:{stage.name.upper()},state:FINISHED"
},
)
return upstream_output
def __enter__(self) -> DefaultRunner:
"""
Enter the runtime context for the runner.
Returns
-------
The current runner instance.
"""
return self
def __exit__(self, *args):
"""
Close the context stack and release resources.
Parameters
----------
*args : tuple
Exception type, value, and traceback.
"""
self._stack.close()