DaskLocalCluster
Purpose
DaskLocalCluster is a wrapper function around distributed.LocalCluster, configured
with batchlet's dask parameters. It shares its worker-configuration resolution logic
with DaskSlurmCluster, so behavior stays consistent between local and SLURM execution modes.
Working
The function maps batchlet parameters to distributed.LocalCluster arguments:
- workers_per_node -> n_workers
- threads_per_worker -> threads_per_worker
- memory_per_worker -> memory_limit
- worker_scratch_directory -> local_directory
- resources_per_worker -> worker resources
The parameters nodes and use_entry_node are ignored for local mode.
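The mapping above can be sketched as a small translation step. This is an illustration only, not batchlet's actual implementation: to_local_cluster_kwargs and PARAM_MAP are hypothetical names, and the sketch assumes that worker resources are passed under the keyword "resources" and that any parameter outside the table is silently dropped.

```python
from typing import Any

# Hypothetical lookup table built from the mapping listed above.
# "resources" as the target keyword for worker resources is an assumption.
PARAM_MAP = {
    "workers_per_node": "n_workers",
    "threads_per_worker": "threads_per_worker",
    "memory_per_worker": "memory_limit",
    "worker_scratch_directory": "local_directory",
    "resources_per_worker": "resources",
    "silence_logs": "silence_logs",
    "name": "name",
}


def to_local_cluster_kwargs(**batchlet_params: Any) -> dict[str, Any]:
    """Translate batchlet-style parameters into LocalCluster-style keywords.

    Parameters that are not in PARAM_MAP (for example "nodes" and
    "use_entry_node") are dropped, mirroring how they are ignored in local mode.
    """
    kwargs: dict[str, Any] = {}
    for key, value in batchlet_params.items():
        target = PARAM_MAP.get(key)
        if target is None:
            continue  # SLURM-only or unknown parameter: ignored in this sketch
        kwargs[target] = value
    return kwargs
```

Calling the helper with workers_per_node=4, memory_per_worker="2GB" and nodes=3 yields a dict holding only n_workers and memory_limit; the SLURM-only parameter is dropped.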
Usage
from ska_sdp_batchlet.utils.dask_cluster.local_cluster import DaskLocalCluster

with DaskLocalCluster(
    workers_per_node=4,
    threads_per_worker=2,
    memory_per_worker="2GB",
    resources_per_worker={"CPU": 2},
    worker_scratch_directory="/tmp/dask-workers",
    name="BatchletLocal",
) as cluster:
    client = cluster.get_client()
    # submit dask tasks with client
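
The call above passes resources_per_worker as a dict. The API description also allows a comma-separated string such as "TPU=1,MEM=10e9". A minimal sketch of how such a string could be normalised into a dict is given here. parse_resources is a hypothetical helper, not batchlet's actual function, and converting every value to float is an assumption made for the illustration.

```python
from typing import Optional, Union


def parse_resources(spec: Union[dict, str, None]) -> Optional[dict]:
    """Normalise a resources specification into a dict.

    Accepts None, a dict, or a string of comma-separated KEY=VALUE pairs.
    """
    if spec is None:
        return None
    if isinstance(spec, dict):
        return dict(spec)  # shallow copy so the caller's dict is not shared

    resources = {}
    for pair in spec.split(","):
        key, sep, value = pair.partition("=")
        key = key.strip()
        if not sep or not key:
            raise ValueError(f"expected KEY=VALUE, got {pair!r}")
        # float() accepts both "2" and scientific notation such as "10e9"
        resources[key] = float(value)
    return resources
```

With this sketch, "GPU=2" and {"GPU": 2.0} describe the same worker resources, so either form could be handed to the same downstream logic.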
API
- ska_sdp_batchlet.utils.dask_cluster.local_cluster.DaskLocalCluster(workers_per_node=None, threads_per_worker=None, memory_per_worker='auto', resources_per_worker=None, worker_scratch_directory=None, silence_logs=20, name='DaskLocalCluster', **_)
Create a local Dask cluster for parallel computing.
This function converts the input dask parameters to their equivalents among the distributed.LocalCluster class arguments. When no arguments are passed, the returned cluster is equivalent to running LocalCluster().
This function shares the same worker-configuration resolution logic with DaskSlurmCluster.
The parameters "nodes" and "use_entry_node" are ignored in this function, as they are irrelevant for LocalCluster.
Please also refer to the documentation of distributed.LocalCluster.
- Parameters:
workers_per_node (int | None, default: None) -- Number of workers to be created. Equivalent to the "n_workers" parameter of LocalCluster.
threads_per_worker (int | None, default: None) -- Number of threads allocated per worker. Equivalent to the "threads_per_worker" parameter of LocalCluster.
memory_per_worker (str | float | int, default: 'auto') -- Memory limit per worker. Equivalent to the "memory_limit" parameter of LocalCluster.
resources_per_worker (dict | str | None, default: None) -- Resources or task constraints per dask worker. If a string, it must consist of comma-separated key-value pairs, with each key and value separated by "=". Examples: "GPU=2"; "TPU=1,MEM=10e9"; {"CPU": 10}
worker_scratch_directory (str | None, default: None) -- Directory for temporary worker files. Equivalent to the "local_directory" parameter of LocalCluster.
silence_logs (int, default: 20) -- An integer corresponding to one of the standard logging levels in the logging module. Determines the logging level for scheduler and worker processes. Equivalent to the "silence_logs" parameter of LocalCluster.
name (str, default: 'DaskLocalCluster') -- Name of the dask cluster. Equivalent to the "name" parameter of LocalCluster.
- Returns:
An instance of dask LocalCluster configured with the specified parameters.
- Return type: