import logging
from distributed import LocalCluster
from .resources import parse_dask_resource_spec, resolve_worker_configuration
logger = logging.getLogger(__name__)
# pylint: disable=R0913,R0917,C0103
# Sphinx "[docs]" link marker (rendering artifact, not executable code).
def DaskLocalCluster(
    workers_per_node: int | None = None,
    threads_per_worker: int | None = None,
    memory_per_worker: str | float | int = "auto",
    resources_per_worker: dict | str | None = None,
    worker_scratch_directory: str | None = None,
    silence_logs: int = logging.INFO,
    name: str = "DaskLocalCluster",
    **_,
) -> LocalCluster:
    """
    Create a local Dask cluster for parallel computing.

    This function converts the input dask parameters to
    their equivalent in the :py:class:`distributed.LocalCluster`
    class arguments.

    When no arguments are passed, the returned cluster
    is equivalent to running ``LocalCluster()``.

    This function shares the same worker-configuration resolution
    logic with :py:class:`DaskSlurmCluster`.

    The parameters "nodes" and "use_entry_node" are ignored
    in this function, as those are irrelevant for LocalCluster.
    Any other unrecognised keyword argument is likewise accepted
    and silently discarded.

    As a side effect, the level of this module's logger is set to
    ``silence_logs`` on every call.

    Please also refer to the documentation of
    :py:class:`distributed.LocalCluster`.

    Parameters
    ----------
    workers_per_node
        Number of workers to be created. Equivalent to "n_workers"
        parameter of LocalCluster.
    threads_per_worker
        Number of threads allocated per worker. Equivalent to
        "threads_per_worker" parameter of LocalCluster.
    memory_per_worker
        Memory limit per worker. Equivalent to
        "memory_limit" parameter of LocalCluster.
    resources_per_worker
        Resources or task constraints per dask worker.
        If string, it can be a ``,`` separated list of key-value
        pairs, where key and value are separated using ``=``.
        e.g. ``"GPU=2"``; ``"TPU=1,MEM=10e9"`` ; ``{"CPU":10}``
    worker_scratch_directory
        Directory for temporary worker files. Equivalent to
        "local_directory" parameter of LocalCluster.
    silence_logs
        An integer corresponding to the standard logging levels in
        ``logging`` module. Determines the logging levels for
        scheduler and worker processes. Equivalent to
        "silence_logs" parameter of LocalCluster.
    name
        Name of the dask cluster. Equivalent to
        "name" parameter of LocalCluster.
    **_
        Extra keyword arguments; accepted for call-signature
        compatibility and ignored.

    Returns
    -------
    :py:class:`distributed.LocalCluster`
        An instance of dask ``LocalCluster`` configured with the
        specified parameters.
    """
    # Keep this module's own log output at the same level that is
    # forwarded to the cluster via "silence_logs".
    logger.setLevel(silence_logs)

    # NOTE: the helper returns the memory limit first, then the worker
    # count and the thread count.
    _memory_limit, _n_workers, _nthreads = resolve_worker_configuration(
        logger, workers_per_node, threads_per_worker, memory_per_worker
    )

    local_cluster_params = {
        "n_workers": _n_workers,
        "threads_per_worker": _nthreads,
        "memory_limit": _memory_limit,
        "silence_logs": silence_logs,
        "name": name,
    }
    logger.info("Local Cluster params: %s", local_cluster_params)

    # Keyword arguments that are not cluster-level settings; they are
    # passed alongside the cluster parameters to LocalCluster.
    worker_params = {
        "resources": parse_dask_resource_spec(resources_per_worker),
        "local_directory": worker_scratch_directory,
    }
    logger.info("Dask worker params: %s", worker_params)

    # Port 0 for both scheduler and dashboard asks for any free port,
    # so several clusters can coexist on the same host.
    cluster = LocalCluster(
        **local_cluster_params,
        **worker_params,
        scheduler_port=0,
        dashboard_address=":0",
    )

    logger.info("Scheduler is live at: %s", cluster.scheduler_address)
    logger.info("Scheduler dashboard is live at: %s", cluster.dashboard_link)
    return cluster