Source code for ska_sdp_batchlet.services.pipeline_service
# pylint: disable=too-few-public-methods
from contextlib import ExitStack
from typing import Dict, Optional
from ska_sdp_batchlet.utils.dask_cluster.handler import DaskHandler
from ..utils.monitor.handler import MonitorHandler
from ..utils.pipeline_executer import PipelineExecuter
[docs]
class PipelineService:
"""
Pipeline service.
"""
[docs]
@staticmethod
def run_pipeline(
command: list[str],
dask_params: dict = None,
monitor: Optional[Dict] = None,
generate_reports_on_failure=True,
):
"""
Update pipeline into config.
Parameters
----------
command: list[str]
Command to be executed within the dask cluster
dask_params: dict
User provided dask parameters.
Given to batchlet as json input,
in the key "dask_params".
monitor: Optional(dict)
Monitor Configuration
generate_reports_on_failure : bool
Whether to generate reports even if
some exception occurs during exit
"""
monitor = monitor or {}
dask_params = dask_params or {}
with ExitStack() as stack:
handler = MonitorHandler(
stack, generate_reports_on_failure, **monitor
)
dask_handler = DaskHandler(
stack, command, generate_reports_on_failure, **dask_params
)
command = dask_handler.append_scheduler_address(command)
PipelineExecuter.run(
handler,
command,
)