DaskLocalCluster

Purpose

DaskLocalCluster is a wrapper function around distributed.LocalCluster that is configured from batchlet's dask parameters. It reuses the worker-configuration resolution logic of DaskSlurmCluster, so that behavior stays consistent between local and SLURM execution modes.

Working

The function maps batchlet parameters to distributed.LocalCluster arguments:

  • workers_per_node -> n_workers

  • threads_per_worker -> threads_per_worker

  • memory_per_worker -> memory_limit

  • worker_scratch_directory -> local_directory

  • resources_per_worker -> worker resources

The parameters nodes and use_entry_node are ignored for local mode.
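The mapping above can be sketched as a plain renaming step. This is a minimal illustration, not the batchlet implementation: the helper name to_local_cluster_kwargs is hypothetical, and forwarding worker resources under the "resources" keyword is an assumption about how they reach the workers.

```python
from typing import Any, Optional, Union


def to_local_cluster_kwargs(
    workers_per_node: Optional[int] = None,
    threads_per_worker: Optional[int] = None,
    memory_per_worker: Union[str, float, int] = "auto",
    resources_per_worker: Optional[dict] = None,
    worker_scratch_directory: Optional[str] = None,
    **_: Any,  # swallows "nodes", "use_entry_node" and any other non-local option
) -> dict:
    """Hypothetical helper: rename batchlet parameters to LocalCluster keywords."""
    kwargs = {
        "n_workers": workers_per_node,
        "threads_per_worker": threads_per_worker,
        "memory_limit": memory_per_worker,
        "local_directory": worker_scratch_directory,
    }
    # Assumption: worker resources are forwarded as the "resources" worker keyword.
    if resources_per_worker is not None:
        kwargs["resources"] = resources_per_worker
    return kwargs


# One parameter set can serve both modes; the SLURM-only keys are dropped here.
shared = {
    "nodes": 4,
    "use_entry_node": True,
    "workers_per_node": 2,
    "memory_per_worker": "2GB",
}
kwargs = to_local_cluster_kwargs(**shared)
```

Accepting and discarding extra keyword arguments is what lets the same configuration be passed unchanged to either execution mode.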

Usage

from ska_sdp_batchlet.utils.dask_cluster.local_cluster import DaskLocalCluster

with DaskLocalCluster(
    workers_per_node=4,
    threads_per_worker=2,
    memory_per_worker="2GB",
    resources_per_worker={"CPU": 2},
    worker_scratch_directory="/tmp/dask-workers",
    name="BatchletLocal",
) as cluster:
    client = cluster.get_client()
    # submit dask tasks with client

API

ska_sdp_batchlet.utils.dask_cluster.local_cluster.DaskLocalCluster(workers_per_node=None, threads_per_worker=None, memory_per_worker='auto', resources_per_worker=None, worker_scratch_directory=None, silence_logs=20, name='DaskLocalCluster', **_)

Create a local Dask cluster for parallel computing. This function converts the input dask parameters to the equivalent distributed.LocalCluster arguments. When no arguments are passed, the returned cluster is equivalent to running LocalCluster().

This function shares the same worker-configuration resolution logic as DaskSlurmCluster.

The parameters "nodes" and "use_entry_node" are ignored by this function, as they are irrelevant for LocalCluster.

Please also refer to the documentation of distributed.LocalCluster.

Parameters:
  • workers_per_node (int | None, default: None) -- Number of workers to be created. Equivalent to "n_workers" parameter of LocalCluster.

  • threads_per_worker (int | None, default: None) -- Number of threads allocated per worker. Equivalent to "threads_per_worker" parameter of LocalCluster.

  • memory_per_worker (str | float | int, default: 'auto') -- Memory limit per worker. Equivalent to "memory_limit" parameter of LocalCluster.

  • resources_per_worker (dict | str | None, default: None) --

    Resources or task constraints per dask worker.

    If a string, it must be a comma-separated list of key-value pairs, with each key and value separated by =.

    e.g. "GPU=2"; "TPU=1,MEM=10e9"; {"CPU": 10}

  • worker_scratch_directory (str | None, default: None) -- Directory for temporary worker files. Equivalent to "local_directory" parameter of LocalCluster.

  • silence_logs (int, default: 20) -- An integer corresponding to one of the standard levels in the logging module (the default, 20, is logging.INFO). Determines the logging level for scheduler and worker processes. Equivalent to "silence_logs" parameter of LocalCluster.

  • name (str, default: 'DaskLocalCluster') -- Name of the dask cluster. Equivalent to "name" parameter of LocalCluster.
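As an illustration of the two accepted forms of resources_per_worker, the sketch below normalizes either a dict or a "KEY=VALUE,KEY=VALUE" string into a dict of numbers. The function name parse_resources is hypothetical, and converting every value to float is an assumption; the actual batchlet parsing may differ.

```python
from typing import Dict, Optional, Union


def parse_resources(spec: Union[dict, str, None]) -> Optional[Dict[str, float]]:
    """Hypothetical parser for the resources_per_worker formats described above."""
    if spec is None:
        return None
    if isinstance(spec, dict):
        # A dict is already structured; only the values are normalized.
        return {key: float(value) for key, value in spec.items()}
    resources = {}
    for pair in spec.split(","):
        # Split each pair on the first "=" only.
        key, _, value = pair.partition("=")
        if not key.strip() or not value.strip():
            raise ValueError(f"expected KEY=VALUE, got {pair!r}")
        resources[key.strip()] = float(value)
    return resources


single = parse_resources("GPU=2")
multiple = parse_resources("TPU=1,MEM=10e9")
from_dict = parse_resources({"CPU": 10})
```

A malformed entry such as "GPU" without a value raises ValueError in this sketch rather than being silently dropped.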

Returns:

An instance of dask LocalCluster configured with the specified parameters.

Return type:

distributed.LocalCluster