# Module code > ska_sdp_batchlet.utils.dask_cluster.handler
#
# Source code for ska_sdp_batchlet.utils.dask_cluster.handler
import logging
from contextlib import ExitStack
from copy import deepcopy
from typing import Optional

from distributed import SpecCluster

from ..monitor.dask_monitor import dask_monitor
from .factory import DaskClusterFactory
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Disable pylint checks R0903 (too-few-public-methods), R0913
# (too-many-arguments) and R0917 (too-many-positional-arguments)
# for the remainder of this module.
# pylint:disable=R0903,R0913,R0917
class DaskHandler:
    """
    Handles all dask related operations.

    It is responsible for instantiating and starting both
    DaskCluster (based on the infrastructure) and dask's
    performance monitoring.

    Parameters
    ----------
    stack: ExitStack
        The ExitStack to register the dask cluster and monitor
        contexts with.
        NOTE: The "stack" is passed by reference, and DaskHandler
        will modify it.
    command: list[str]
        The subprocess command as passed by the user
    generate_reports_on_failure: bool, default=True
        Whether to generate reports even if
        some exception occurs during exit
    dask_cli_option: str, optional
        The cli option corresponding to the dask scheduler
        as passed to the underlying subprocess. No cluster is
        started when this is not given, or when ``command``
        already contains it (either as a separate ``option value``
        pair or as ``option=value``).
    dask_report_dir: str, optional
        Directory where the dask performance report will
        be stored. Only used when a cluster is started.
    **dask_cluster_params
        All other parameters passed in the "dask_params" key
        of the user provided configuration

    Attributes
    ----------
    cluster: SpecCluster or None
        The value returned by entering the factory-built cluster
        context on ``stack``, or None when no cluster was started.
    dask_cli_option: str or None
        The cli option given at construction time.
    """

    # Assigned in __init__; None when no cluster was started.
    cluster: "Optional[SpecCluster]"

    def __init__(
        self,
        stack: ExitStack,
        command: list[str],
        generate_reports_on_failure: bool = True,
        dask_cli_option: Optional[str] = None,
        dask_report_dir: Optional[str] = None,
        **dask_cluster_params,
    ):
        self.cluster = None
        self.dask_cli_option = dask_cli_option

        # Start our own cluster only when the subprocess accepts a
        # scheduler option and the user has not already supplied one.
        if self.dask_cli_option and not self._option_present(
            self.dask_cli_option, command
        ):
            self.cluster = stack.enter_context(
                DaskClusterFactory.get_cluster(dask_cluster_params)
            )
        else:
            logger.info(
                "Not starting dask cluster as pre-requisites are not met."
            )

        # Compare against None explicitly: the cluster object's own
        # truthiness (e.g. a __len__ over its workers) must not decide
        # whether monitoring is started.
        if self.cluster is not None and dask_report_dir:
            stack.enter_context(
                dask_monitor(
                    dask_report_dir,
                    self.cluster.scheduler_address,
                    generate_reports_on_failure,
                )
            )

    @staticmethod
    def _option_present(option: str, command: list[str]) -> bool:
        """
        Return True if ``option`` is already present in ``command``.

        Matches both the separate-token form (``[option, value]``)
        and the single-token ``option=value`` form.
        """
        prefix = f"{option}="
        return any(
            arg == option or (isinstance(arg, str) and arg.startswith(prefix))
            for arg in command
        )

    def append_scheduler_address(self, command: list[str]) -> list[str]:
        """
        Appends the dask cli option of the subprocess, and
        the dask scheduler address, to the command to be
        executed.

        If cluster has not been started, then it returns
        the original command without modifying.

        Parameters
        ----------
        command: list of str
            The subprocess command to be executed by batchlet

        Returns
        -------
        list of str
            A new list holding ``command`` followed by the dask cli
            option and the scheduler address. ``command`` itself is
            never mutated. When no cluster was started, ``command``
            is returned as-is.
        """
        if self.cluster is None:
            return command
        # Build a fresh list so the caller's command stays untouched;
        # its elements are strings, so a shallow copy is sufficient.
        return [
            *command,
            self.dask_cli_option,
            self.cluster.scheduler_address,
        ]