Source code for ska_sdp_batchlet.services.pipeline_service

# pylint: disable=too-few-public-methods

from contextlib import ExitStack
from typing import Dict, Optional

from ska_sdp_batchlet.utils.dask_cluster.handler import DaskHandler

from ..utils.monitor.handler import MonitorHandler
from ..utils.pipeline_executer import PipelineExecuter


[docs] class PipelineService: """ Pipeline service. """
[docs] @staticmethod def run_pipeline( command: list[str], dask_params: dict = None, monitor: Optional[Dict] = None, generate_reports_on_failure=True, ): """ Update pipeline into config. Parameters ---------- command: list[str] Command to be executed within the dask cluster dask_params: dict User provided dask parameters. Given to batchlet as json input, in the key "dask_params". monitor: Optional(dict) Monitor Configuration generate_reports_on_failure : bool Whether to generate reports even if some exception occurs during exit """ monitor = monitor or {} dask_params = dask_params or {} with ExitStack() as stack: handler = MonitorHandler( stack, generate_reports_on_failure, **monitor ) dask_handler = DaskHandler( stack, command, generate_reports_on_failure, **dask_params ) command = dask_handler.append_scheduler_address(command) PipelineExecuter.run( handler, command, )