Source code for ska_sdp_batchlet.app.cli

#!/usr/bin/env python3
"""
The `batchlet` is a tool useful to run dask-distributed batch processing
pipelines on Slurm HPC clusters.

Batchlet acts as a wrapper tool over the pipeline process, and provides
following abilities:

    1. Managing the dask cluster used by the pipeline to perform dask-based
    computations
    2. Monitoring the hardware resources
    3. Monitoring the pipeline logs and generating events for the consumers
"""

import argparse
import importlib.metadata as importlib_metadata
import json
import logging
import sys

from ska_ser_logging import configure_logging

from ska_sdp_batchlet import __version__
from ska_sdp_batchlet.log_config import LOGGING_CONFIG
from ska_sdp_batchlet.services.pipeline_service import PipelineService

configure_logging(level=logging.INFO, overrides=LOGGING_CONFIG)


[docs] def run_pipeline(config: str) -> None: """ Run batchlet with given configuration. Parameters ---------- config Either a path to a JSON config file or literal ``"-"``. If the string is ``"-"`` the configuration is read from ``stdin``. """ if config == "-": raw = sys.stdin.read() else: with open(config, "r", encoding="utf-8") as f: raw = f.read() run_config = json.loads(raw) PipelineService.run_pipeline(**run_config)
[docs] def register_sub_commands(parser): """ Register entry points as sub-commands for a given argument parser. Parameters ---------- parser : argparse._SubParsersAction The sub-parser object to which new commands will be added. Raises ------ TypeError If a loaded entry point is not an instance of argparse.ArgumentParser. """ for ep in importlib_metadata.entry_points(group="batchlet_cli"): subcommand = ep.load() if not isinstance(subcommand, argparse.ArgumentParser): raise TypeError(f"{ep.name} not of type `ArgumentParser`") parser.add_parser( ep.name, parents=[subcommand], help=subcommand.description, conflict_handler="resolve", )
[docs] def main() -> None: """ Command line entrypoint for batchlet. """ parser = argparse.ArgumentParser( description=__doc__, formatter_class=argparse.RawTextHelpFormatter, ) parser.add_argument( "-V", "--version", action="version", version=f"%(prog)s {__version__}", help="Show batchlet version and exit", ) subparsers = parser.add_subparsers( title="subcommands", required=True, dest="subcommand" ) run_parser = subparsers.add_parser( "run", help="Run batchlet with given config.", formatter_class=argparse.RawTextHelpFormatter, ) run_parser.add_argument( "config", nargs="?", default="-", help="""Path to json config file. If '-' or not provided, then pipeline reads config from stdin. """, ) run_parser.set_defaults(func=lambda args: run_pipeline(args.config)) register_sub_commands(subparsers) args = parser.parse_args() getattr(args, "func")(args)
if __name__ == "__main__": main()