#!/usr/bin/env python3
"""
The `batchlet` is a tool useful to run dask-distributed batch processing
pipelines on Slurm HPC clusters.
Batchlet acts as a wrapper tool over the pipeline process, and provides
following abilities:
1. Managing the dask cluster used by the pipeline to perform dask-based
computations
2. Monitoring the hardware resources
3. Monitoring the pipeline logs and generating events for the consumers
"""
import argparse
import importlib.metadata as importlib_metadata
import json
import logging
import sys
from ska_ser_logging import configure_logging
from ska_sdp_batchlet import __version__
from ska_sdp_batchlet.log_config import LOGGING_CONFIG
from ska_sdp_batchlet.services.pipeline_service import PipelineService
configure_logging(level=logging.INFO, overrides=LOGGING_CONFIG)
[docs]
def run_pipeline(config: str) -> None:
"""
Run batchlet with given configuration.
Parameters
----------
config
Either a path to a JSON config file or literal ``"-"``.
If the string is ``"-"`` the configuration is read from ``stdin``.
"""
if config == "-":
raw = sys.stdin.read()
else:
with open(config, "r", encoding="utf-8") as f:
raw = f.read()
run_config = json.loads(raw)
PipelineService.run_pipeline(**run_config)
[docs]
def register_sub_commands(parser):
"""
Register entry points as sub-commands for a given argument parser.
Parameters
----------
parser : argparse._SubParsersAction
The sub-parser object to which new commands will be added.
Raises
------
TypeError
If a loaded entry point is not an instance of argparse.ArgumentParser.
"""
for ep in importlib_metadata.entry_points(group="batchlet_cli"):
subcommand = ep.load()
if not isinstance(subcommand, argparse.ArgumentParser):
raise TypeError(f"{ep.name} not of type `ArgumentParser`")
parser.add_parser(
ep.name,
parents=[subcommand],
help=subcommand.description,
conflict_handler="resolve",
)
[docs]
def main() -> None:
"""
Command line entrypoint for batchlet.
"""
parser = argparse.ArgumentParser(
description=__doc__,
formatter_class=argparse.RawTextHelpFormatter,
)
parser.add_argument(
"-V",
"--version",
action="version",
version=f"%(prog)s {__version__}",
help="Show batchlet version and exit",
)
subparsers = parser.add_subparsers(
title="subcommands", required=True, dest="subcommand"
)
run_parser = subparsers.add_parser(
"run",
help="Run batchlet with given config.",
formatter_class=argparse.RawTextHelpFormatter,
)
run_parser.add_argument(
"config",
nargs="?",
default="-",
help="""Path to json config file.
If '-' or not provided, then pipeline reads config from stdin.
""",
)
run_parser.set_defaults(func=lambda args: run_pipeline(args.config))
register_sub_commands(subparsers)
args = parser.parse_args()
getattr(args, "func")(args)
if __name__ == "__main__":
main()