DaskSlurmCluster

Purpose

DaskSlurmCluster is the SLURM-specific cluster implementation used by batchlet.

  • Creates a dask cluster inside an existing SLURM allocation created with sbatch or salloc.

  • Uses srun to start scheduler and worker processes across allocated nodes.

  • Exposes a clean interface similar to distributed.LocalCluster while respecting allocated CPU and memory limits.

Working

Once DaskSlurmCluster is initialised on the head node (1st node) of the allocated compute nodes, it starts a local dask scheduler as a subprocess. Then it uses srun commmand to start local and remote workers on all the nodes, based on user configuration.

../_images/daskslurmcluster.drawio.svg

The DaskSlurmCluster extends the standard distributed.SpecCluster class.

Similar to distributed.LocalCluster, the DaskSlurmCluster can take decision on the number of workers, number of threads per worker and memory limit per worker, based on the allocated slurm resources. User can override these values while initialising the constructor.

Usage

Following slurm (python) script shows basic example of how DaskSlurmCluster can be used in a python application.

#!/usr/bin/env python3
#SBATCH -n 2
#SBATCH --nodes 2
#SBATCH --ntasks-per-node 1
#SBATCH --cpus-per-task 12
#SBATCH --mem 32GB

from ska_sdp_batchlet.utils.dask_cluster.slurm import DaskSlurmCluster

if __name__ == "__main__":
    with DaskSlurmCluster() as cluster:
        cluster.wait_for_all_workers()
        scheduler_address = cluster.scheduler_address

        client = cluster.get_client()
        # client code

API

class ska_sdp_batchlet.utils.dask_cluster.slurm.cluster.DaskSlurmCluster(nodes=None, workers_per_node=None, threads_per_worker=None, memory_per_worker='auto', resources_per_worker=None, worker_scratch_directory=None, use_entry_node=True, silence_logs=20, name='DaskSlurmCluster', **_)[source]

Starts a dask cluster inside existing slurm allocation.

This class extends the distributed.SpecCluster class. Thus this has a lot of functionalities that come with other standard clusters like distributed.LocalCluster.

Parameters:
  • nodes (int | None, default: None) -- Number of slurm nodes on which SlurmWorkers will start. If None, cluster will try to get slurm nodes using the SLURM_JOB_NUM_NODES environmet variable, else set to 1.

  • workers_per_node (int | None, default: None) -- Number of dask worker processes per node. If this is None and memory_per_worker is specified, then cluster will set workers_per_node such that each worker can use at least memory_per_worker amount of memory. If this is None and memory_per_worker is either "auto" or None, then number of workers per node is decided using dask's default heuristic for setting number of dask workers and threads per worker, while respecting slurm's limit on CPUs (if any).

  • threads_per_worker (int | None, default: None) -- Number of threads per dask worker process. If this is None and workers_per_node is also None, then number of threads per worker is decided using dask's default heuristic for setting number of dask workers and threads per worker, while respecting slurm's limit on CPUs (if any).

  • memory_per_worker (str | int | None, default: 'auto') --

    Memory limit per dask worker

    Notes regarding argument data type:

    • If "auto", cluster find out total usable memory per node, then divides the memory equally among all workers.

    • If None or 0, no limit is applied. The memory_per_worker is set to total usable memory per node.

    • If string, it should be a number followed by a unit. e.g. "128GB", "1 TiB"

    • If an int, it should indicate number of bytes per worker.

  • resources_per_worker (dict | str | None, default: None) --

    Resources or task constraints per dask worker.

    If string, it can be a , seperated key-value pairs, where key and value is seperated using =.

    e.g. "GPU=2"; "TPU=1,MEM=10e9" ; {"CPU":10}

  • worker_scratch_directory (str | None, default: None) -- The directory to store dask workers' temporary files. Equivalent to local_directory option in distributed.Worker class.

  • use_entry_node (bool, default: True) -- Whether to include the node running the dask scheduler to also launch dask workers. If true, dask workers are also launched on the node running dask scheduler. If false dask workers would be launched on nodes relative to the dask scheduler node. In this case, the number of allocated slurm nodes must be at least one more than the nodes parameter.

  • silence_logs (int, default: 20) -- An integer corresponding to the standard logging levels in logging module. Determines the logging levels for scheduler and worker processes.

  • name (str, default: 'DaskSlurmCluster') -- Name of the dask cluster. This also acts as the prefix for worker names.

Example

>>> cluster = DaskSlurmCluster(1, 2, 3, "4GB")
>>> # get scheduler address
>>> address = cluster.scheduler_address
>>> # get a client
>>> client = cluster.get_client()
wait_for_all_workers()[source]

Wait for all workers to start