Source code for ska_sdp_batchlet.utils.pipeline_executer
# pylint: disable=too-few-public-methods,unused-argument
import logging
import subprocess
from subprocess import CalledProcessError
from .monitor.handler import MonitorHandler
logger = logging.getLogger(__name__)
[docs]
class PipelineExecuter:
"""
Pipeline Executor
"""
[docs]
@staticmethod
def run(
monitor_handler: MonitorHandler,
command: list[str],
):
"""
Run pipeline
Parameters
----------
monitor_handler: MonitorHandler
Instance of MonitorHandler. This is used to
get stream objects to watch stdout of the subprocess
command: list[str]
Command to be executed within the dask cluster
"""
logger.info("Running command: %s", " ".join(command))
with monitor_handler.log_stream as stream:
try:
subprocess.run(
command,
stdout=stream,
check=True,
)
logger.info("Completed subprocess execution successfully")
except CalledProcessError as e:
logger.error(
"Subprocess failed with return code %d. Command: %s",
e.returncode,
" ".join(command),
)
raise