Source code for ska_sdp_batchlet.utils.dask_cluster.local_cluster

import logging

from distributed import LocalCluster

from .resources import parse_dask_resource_spec, resolve_worker_configuration

# Module-level logger; DaskLocalCluster sets its level from ``silence_logs``.
logger = logging.getLogger(__name__)


# pylint: disable=R0913,R0917,C0103
def DaskLocalCluster(
    workers_per_node: int | None = None,
    threads_per_worker: int | None = None,
    memory_per_worker: str | float | int = "auto",
    resources_per_worker: dict | str | None = None,
    worker_scratch_directory: str | None = None,
    silence_logs: int = logging.INFO,
    name: str = "DaskLocalCluster",
    **ignored_kwargs,
) -> LocalCluster:
    """
    Create a local Dask cluster for parallel computing.

    This function converts the input dask parameters to their equivalent
    :py:class:`distributed.LocalCluster` class arguments. When no arguments
    are passed, the returned cluster is equivalent to running
    ``LocalCluster()``.

    This function shares the same worker-configuration resolution logic
    with :py:class:`DaskSlurmCluster`.

    The parameters "nodes" and "use_entry_node" are ignored in this
    function, as those are irrelevant for LocalCluster. Any other
    unrecognised keyword argument is ignored as well; the names of all
    ignored arguments are reported at ``DEBUG`` level.

    Please also refer to the documentation of
    :py:class:`distributed.LocalCluster`.

    Calling this function also sets the level of this module's logger to
    ``silence_logs``.

    Parameters
    ----------
    workers_per_node
        Number of workers to be created.
        Equivalent to "n_workers" parameter of LocalCluster.

    threads_per_worker
        Number of threads allocated per worker.
        Equivalent to "threads_per_worker" parameter of LocalCluster.

    memory_per_worker
        Memory limit per worker.
        Equivalent to "memory_limit" parameter of LocalCluster.

    resources_per_worker
        Resources or task constraints per dask worker.
        If a string, it can be ``,`` separated key-value pairs, where key
        and value are separated using ``=``.
        e.g. ``"GPU=2"``; ``"TPU=1,MEM=10e9"`` ; ``{"CPU":10}``

    worker_scratch_directory
        Directory for temporary worker files.
        Equivalent to "local_directory" parameter of LocalCluster.

    silence_logs
        An integer corresponding to the standard logging levels in
        ``logging`` module. Determines the logging levels for scheduler
        and worker processes.
        Equivalent to "silence_logs" parameter of LocalCluster.

    name
        Name of the dask cluster.
        Equivalent to "name" parameter of LocalCluster.

    **ignored_kwargs
        Accepted for call-compatibility with other cluster factories and
        otherwise unused.

    Returns
    -------
    :py:class:`distributed.LocalCluster`
        An instance of dask ``LocalCluster`` configured with the specified
        parameters.
    """
    logger.setLevel(silence_logs)

    # Surface dropped arguments so that a misspelt parameter name does not
    # vanish without a trace. Only the names are logged, never the values.
    if ignored_kwargs:
        logger.debug(
            "Ignoring unsupported arguments: %s", sorted(ignored_kwargs)
        )

    _memory_limit, _n_workers, _nthreads = resolve_worker_configuration(
        logger, workers_per_node, threads_per_worker, memory_per_worker
    )

    local_cluster_params = {
        "n_workers": _n_workers,
        "threads_per_worker": _nthreads,
        "memory_limit": _memory_limit,
        "silence_logs": silence_logs,
        "name": name,
    }
    logger.info("Local Cluster params: %s", local_cluster_params)

    worker_params = {
        "resources": parse_dask_resource_spec(resources_per_worker),
        "local_directory": worker_scratch_directory,
    }
    logger.info("Dask worker params: %s", worker_params)

    # NOTE(review): port 0 for the scheduler and ":0" for the dashboard are
    # presumably there so that free ports are picked automatically - confirm
    # against the distributed documentation.
    cluster = LocalCluster(
        **local_cluster_params,
        **worker_params,
        scheduler_port=0,
        dashboard_address=":0",
    )

    logger.info("Scheduler is live at: %s", cluster.scheduler_address)
    logger.info("Scheduler dashboard is live at: %s", cluster.dashboard_link)

    return cluster