# Source code for ska_sdp_batchlet.utils.dask_cluster.handler

import logging
from contextlib import ExitStack
from copy import deepcopy

from distributed import SpecCluster

from ..monitor.dask_monitor import dask_monitor
from .factory import DaskClusterFactory

logger = logging.getLogger(__name__)


# pylint:disable=R0903,R0913,R0917
class DaskHandler:
    """
    Handles all dask related operations.

    It is responsible for instantiating and starting both DaskCluster (based
    on the infrastructure) and dask's performance monitoring.

    Parameters
    ----------
    stack: ExitStack
        The ExitStack to register the dask cluster and monitor contexts with.
        NOTE: The "stack" is passed by reference, and DaskHandler will modify
        it (contexts are entered on it and are exited when the stack closes).
    command: list[str]
        The subprocess command as passed by the user
    generate_reports_on_failure: bool, default=True
        Whether to generate reports even if some exception occurs during exit
    dask_cli_option: str, optional
        The cli option corresponding to the dask scheduler as passed to the
        underlying subprocess
    dask_report_dir: str, optional
        Directory where the dask performance report will be stored
    **dask_cluster_params
        All other parameters passed in the "dask_params" key of the user
        provided configuration

    Attributes
    ----------
    cluster: SpecCluster or None
        The cluster entered on ``stack``, or None when no cluster was started.
    dask_cli_option: str or None
        The ``dask_cli_option`` argument, stored for use when building the
        subprocess command.
    """

    # None whenever the start-up conditions in __init__ are not met.
    # Quoted so the annotation is not evaluated at class-creation time.
    cluster: "SpecCluster | None"

    def __init__(
        self,
        stack: ExitStack,
        command: list[str],
        generate_reports_on_failure: bool = True,
        dask_cli_option: "str | None" = None,
        dask_report_dir: "str | None" = None,
        **dask_cluster_params,
    ):
        self.cluster = None
        self.dask_cli_option = dask_cli_option

        # A cluster is started only when a CLI option name was configured AND
        # that option is not already present in the user's command
        # (presumably because the user is then supplying their own scheduler
        # address -- NOTE(review): confirm intent).
        if self.dask_cli_option and self.dask_cli_option not in command:
            self.cluster = stack.enter_context(
                DaskClusterFactory.get_cluster(dask_cluster_params)
            )
        else:
            logger.info(
                "Not starting dask cluster as pre-requisites are not met."
            )

        # Test for existence with `is not None` rather than truthiness: a
        # cluster object could define __len__/__bool__ and evaluate as falsy
        # (e.g. zero workers) while still being a live cluster.
        if self.cluster is not None and dask_report_dir:
            stack.enter_context(
                dask_monitor(
                    dask_report_dir,
                    self.cluster.scheduler_address,
                    generate_reports_on_failure,
                )
            )

    def append_scheduler_address(self, command: list[str]) -> list[str]:
        """
        Appends the dask cli option of the subprocess, and the dask scheduler
        address, to the command to be executed.

        If the cluster has not been started, the original ``command`` object
        is returned as-is (not a copy). Otherwise a new list is returned and
        ``command`` itself is left unmodified.

        Parameters
        ----------
        command: list of str
            The subprocess command to be executed by batchlet

        Returns
        -------
        list of str
            The command with ``[dask_cli_option, scheduler_address]``
            appended, or the original command if no cluster is running
        """
        # Identity check, for the same reason as in __init__: a live cluster
        # must not be mistaken for "no cluster" because it is falsy.
        if self.cluster is None:
            return command

        new_cmd = deepcopy(command)
        new_cmd.extend(
            [
                self.dask_cli_option,
                self.cluster.scheduler_address,
            ]
        )
        return new_cmd