DaskSlurmCluster
Purpose
DaskSlurmCluster is the SLURM-specific cluster implementation used by batchlet.
- Creates a dask cluster inside an existing SLURM allocation created with sbatch or salloc.
- Uses srun to start scheduler and worker processes across the allocated nodes.
- Exposes a clean interface similar to distributed.LocalCluster while respecting the allocated CPU and memory limits.
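The allocated limits mentioned above come from SLURM itself. As a rough sketch of where they can be read from, the hypothetical helper below inspects the standard SLURM job environment variables SLURM_JOB_NUM_NODES, SLURM_CPUS_PER_TASK (set only when --cpus-per-task is requested) and SLURM_MEM_PER_NODE (megabytes, set only when --mem is requested). It is an illustration under those assumptions, not batchlet's actual code, which may gather the allocation differently.

```python
import os
from typing import Mapping, Optional


def read_slurm_allocation(env: Optional[Mapping[str, str]] = None) -> dict:
    """Summarise the current SLURM allocation (hypothetical helper, not batchlet API)."""
    env = os.environ if env is None else env

    def _int(name: str) -> Optional[int]:
        # Missing or empty variables are reported as None.
        value = env.get(name)
        return int(value) if value else None

    return {
        # Nodes in the job; fall back to 1, as the `nodes` parameter documents.
        "nodes": _int("SLURM_JOB_NUM_NODES") or 1,
        # CPUs per task; only present when --cpus-per-task was requested.
        "cpus_per_task": _int("SLURM_CPUS_PER_TASK"),
        # Memory per node in megabytes; only present when --mem was requested.
        "mem_per_node_mb": _int("SLURM_MEM_PER_NODE"),
    }
```

Outside a SLURM job every value falls back to its default, which makes the helper easy to exercise on a laptop by passing a plain dict as `env`.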
Working
Once DaskSlurmCluster is initialised on the head node (the first of the allocated compute nodes), it starts a dask scheduler as a local subprocess. It then uses the srun command to start workers on the local node and on the remote nodes, according to the user configuration.
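To make the srun step concrete, the sketch below assembles one possible worker-launch command as an argv list. It is hypothetical: the function name and the choice of flags are assumptions, not batchlet's actual command line. It runs one srun task per node, and each task starts `workers_per_node` worker processes through the `dask worker --nworkers` option.

```python
from typing import List, Optional


def build_worker_srun_command(
    scheduler_address: str,
    nodes: int,
    workers_per_node: int,
    threads_per_worker: int,
    memory_per_worker: str,
    local_directory: Optional[str] = None,
) -> List[str]:
    """Return an argv list that starts dask workers on `nodes` SLURM nodes (illustrative)."""
    cmd = [
        "srun",
        f"--nodes={nodes}",
        "--ntasks-per-node=1",  # one launcher task per node
        "dask", "worker", scheduler_address,
        "--nworkers", str(workers_per_node),  # worker processes per node
        "--nthreads", str(threads_per_worker),  # threads in each worker process
        "--memory-limit", memory_per_worker,  # limit applies to each worker process
    ]
    if local_directory is not None:
        # Scratch space for worker temporary files (cf. worker_scratch_directory).
        cmd += ["--local-directory", local_directory]
    return cmd
```

On the head node, such a list could be handed to `subprocess.Popen` after the scheduler subprocess has reported its address.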
DaskSlurmCluster extends the standard distributed.SpecCluster class.
Like distributed.LocalCluster, DaskSlurmCluster can choose the number of workers, the number of threads per worker and the memory limit per worker based on the allocated SLURM resources. Users can override these values by passing them to the constructor.
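The default choice of workers and threads can be sketched as follows. This is a minimal sketch modelled on the heuristic dask's LocalCluster uses (up to four CPUs become single-threaded workers; above that, the number of workers is the smallest divisor of the CPU count that is at least its square root), applied here to the CPUs available on one node. The function names are hypothetical, and batchlet's real logic also takes memory_per_worker into account, which this sketch ignores.

```python
import math
from typing import Optional, Set, Tuple


def _factors(n: int) -> Set[int]:
    """Return all positive divisors of n."""
    result = set()
    for i in range(1, math.isqrt(n) + 1):
        if n % i == 0:
            result.update((i, n // i))
    return result


def default_workers_threads(cpus: int) -> Tuple[int, int]:
    """Split `cpus` into (workers, threads_per_worker), LocalCluster-style."""
    if cpus <= 4:
        workers = cpus  # small machines: one single-threaded worker per CPU
    else:
        # Smallest divisor >= sqrt(cpus): balanced, with workers >= threads.
        workers = min(f for f in _factors(cpus) if f >= math.sqrt(cpus))
    return workers, cpus // workers


def resolve_workers_threads(
    cpus: int,
    workers_per_node: Optional[int] = None,
    threads_per_worker: Optional[int] = None,
) -> Tuple[int, int]:
    """Apply user overrides, deriving the missing value from the CPU count."""
    if workers_per_node is None and threads_per_worker is None:
        return default_workers_threads(cpus)
    if workers_per_node is None:
        return max(1, cpus // threads_per_worker), threads_per_worker
    if threads_per_worker is None:
        return workers_per_node, max(1, cpus // workers_per_node)
    return workers_per_node, threads_per_worker
```

With the 12 CPUs per task requested in the Usage script, `default_workers_threads(12)` returns `(4, 3)`: four workers with three threads each.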
Usage
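The following SLURM batch script, written in Python, shows a basic example of using DaskSlurmCluster in a Python application. It can be submitted with sbatch, which reads the #SBATCH directives from the comment header.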
#!/usr/bin/env python3
#SBATCH -n 2
#SBATCH --nodes 2
#SBATCH --ntasks-per-node 1
#SBATCH --cpus-per-task 12
#SBATCH --mem 32GB
from ska_sdp_batchlet.utils.dask_cluster.slurm import DaskSlurmCluster

if __name__ == "__main__":
    # Start the scheduler and workers inside the current SLURM allocation.
    with DaskSlurmCluster() as cluster:
        # Block until every requested worker has connected.
        cluster.wait_for_all_workers()
        scheduler_address = cluster.scheduler_address
        client = cluster.get_client()
        # client code
API
- class ska_sdp_batchlet.utils.dask_cluster.slurm.cluster.DaskSlurmCluster(nodes=None, workers_per_node=None, threads_per_worker=None, memory_per_worker='auto', resources_per_worker=None, worker_scratch_directory=None, use_entry_node=True, silence_logs=20, name='DaskSlurmCluster', **_)
Starts a dask cluster inside an existing SLURM allocation.
This class extends the distributed.SpecCluster class, so it inherits much of the functionality of other standard clusters such as distributed.LocalCluster.
Parameters:
  - nodes (int | None, default: None) -- Number of SLURM nodes on which SlurmWorkers will start. If None, the cluster tries to read the number of nodes from the SLURM_JOB_NUM_NODES environment variable, and falls back to 1 if it is not set.
  - workers_per_node (int | None, default: None) -- Number of dask worker processes per node. If this is None and memory_per_worker is specified, the cluster sets workers_per_node so that each worker can use at least memory_per_worker of memory. If this is None and memory_per_worker is either "auto" or None, the number of workers per node is chosen with dask's default heuristic for the number of workers and threads per worker, while respecting SLURM's CPU limit (if any).
  - threads_per_worker (int | None, default: None) -- Number of threads per dask worker process. If this is None and workers_per_node is also None, the number of threads per worker is chosen with dask's default heuristic for the number of workers and threads per worker, while respecting SLURM's CPU limit (if any).
  - memory_per_worker (str | int | None, default: 'auto') -- Memory limit per dask worker. Accepted values:
    - "auto": the cluster determines the total usable memory per node, then divides it equally among the workers on that node.
    - None or 0: no limit is applied; memory_per_worker is set to the total usable memory per node.
    - A string: a number followed by a unit, e.g. "128GB" or "1 TiB".
    - An int: the number of bytes per worker.
  - resources_per_worker (dict | str | None, default: None) -- Resources or task constraints per dask worker. If a string, it can contain comma-separated key-value pairs, with each key and value separated by =. Examples: "GPU=2"; "TPU=1,MEM=10e9"; {"CPU": 10}.
  - worker_scratch_directory (str | None, default: None) -- Directory for the dask workers' temporary files. Equivalent to the local_directory option of the distributed.Worker class.
  - use_entry_node (bool, default: True) -- Whether to also launch dask workers on the node running the dask scheduler. If True, dask workers are launched on the scheduler node as well. If False, dask workers are launched only on nodes other than the scheduler node; in this case, the number of allocated SLURM nodes must be at least one more than the nodes parameter.
  - silence_logs (int, default: 20) -- An integer corresponding to one of the standard levels of the logging module (20 is logging.INFO). Determines the logging level of the scheduler and worker processes.
  - name (str, default: 'DaskSlurmCluster') -- Name of the dask cluster. This also acts as the prefix for worker names.
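The accepted forms of memory_per_worker and resources_per_worker, and the rule that derives workers_per_node from memory_per_worker, can be illustrated with the stdlib-only sketch below. All function names are hypothetical and the unit table is an assumed subset (decimal kB to TB, binary KiB to TiB). dask itself provides dask.utils.parse_bytes for size strings; batchlet may rely on that or on something else.

```python
import re
from typing import Dict, Mapping, Union

# Assumed unit table: decimal (kB..TB) and binary (KiB..TiB) multipliers.
_UNITS = {
    "": 1, "b": 1,
    "kb": 10**3, "mb": 10**6, "gb": 10**9, "tb": 10**12,
    "kib": 2**10, "mib": 2**20, "gib": 2**30, "tib": 2**40,
}


def parse_memory(text: str) -> int:
    """Convert a size string such as "128GB" or "1 TiB" into bytes."""
    match = re.fullmatch(r"\s*(\d*\.?\d+)\s*([A-Za-z]*)\s*", text)
    unit = match.group(2).lower() if match else None
    if unit not in _UNITS:
        raise ValueError(f"cannot parse memory size {text!r}")
    return int(float(match.group(1)) * _UNITS[unit])


def resolve_memory_per_worker(
    memory_per_worker: Union[str, int, None],
    usable_per_node: int,
    workers_per_node: int,
) -> int:
    """Turn the documented memory_per_worker forms into bytes per worker."""
    if memory_per_worker == "auto":
        # Divide the node's usable memory equally among its workers.
        return usable_per_node // workers_per_node
    if memory_per_worker is None or memory_per_worker == 0:
        # Documented behaviour: use the node's total usable memory.
        return usable_per_node
    if isinstance(memory_per_worker, str):
        return parse_memory(memory_per_worker)
    return int(memory_per_worker)  # already a byte count


def workers_for_memory(memory_per_worker: int, usable_per_node: int) -> int:
    """Most workers per node that each still get memory_per_worker bytes (minimum 1)."""
    return max(1, usable_per_node // memory_per_worker)


def parse_resources(resources: Union[Mapping[str, float], str, None]) -> Dict[str, float]:
    """Normalise "GPU=2", "TPU=1,MEM=10e9" or {"CPU": 10} into a dict of floats."""
    if resources is None:
        return {}
    if not isinstance(resources, str):
        return {str(key): float(value) for key, value in resources.items()}
    parsed = {}
    for pair in resources.split(","):
        if not pair.strip():
            continue  # tolerate stray or trailing commas
        key, _, value = pair.partition("=")
        parsed[key.strip()] = float(value)  # ValueError if the value is missing
    return parsed
```

For example, a node with 32 GB of usable memory and memory_per_worker="10GB" would get three workers under this rule, each guaranteed at least 10 GB.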
Example
>>> cluster = DaskSlurmCluster(1, 2, 3, "4GB")
>>> # get scheduler address
>>> address = cluster.scheduler_address
>>> # get a client
>>> client = cluster.get_client()
- wait_for_all_workers()
Wait for all workers to start.
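A generic polling loop of the kind such a waiting method might use is sketched below. The helper name and its get_count callable are hypothetical, and batchlet's actual implementation is not described in this document.

```python
import time
from typing import Callable


def wait_for_count(
    get_count: Callable[[], int],
    expected: int,
    timeout: float = 60.0,
    interval: float = 0.5,
) -> int:
    """Poll get_count() until it reports at least `expected`, else raise TimeoutError."""
    deadline = time.monotonic() + timeout
    while True:
        count = get_count()
        if count >= expected:
            return count
        if time.monotonic() >= deadline:
            raise TimeoutError(f"only {count} of {expected} workers started within {timeout} s")
        time.sleep(interval)  # back off before asking again
```

With a live distributed client, get_count could be `lambda: len(client.nthreads())`, since Client.nthreads() maps each connected worker to its thread count; distributed's own Client.wait_for_workers(n_workers, timeout) is a built-in alternative.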