ska_sdp_batchlet.utils.dask_cluster.slurm.cluster module

class ska_sdp_batchlet.utils.dask_cluster.slurm.cluster.DaskSlurmCluster(nodes=None, workers_per_node=None, threads_per_worker=None, memory_per_worker='auto', resources_per_worker=None, worker_scratch_directory=None, use_entry_node=True, silence_logs=20, name='DaskSlurmCluster', **_)

Bases: SpecCluster

Starts a dask cluster inside an existing slurm allocation.

This class extends distributed.SpecCluster, so it provides much of the functionality of other standard clusters such as distributed.LocalCluster.

Parameters:
  • nodes (int | None, default: None) -- Number of slurm nodes on which SlurmWorkers will start. If None, the cluster reads the node count from the SLURM_JOB_NUM_NODES environment variable; if that variable is not set, the value defaults to 1.

  • workers_per_node (int | None, default: None) -- Number of dask worker processes per node. If this is None and memory_per_worker is specified, the cluster sets workers_per_node such that each worker can use at least memory_per_worker of memory. If this is None and memory_per_worker is either "auto" or None, the number of workers per node is decided using dask's default heuristic for setting the number of dask workers and threads per worker, while respecting slurm's limit on CPUs (if any).

  • threads_per_worker (int | None, default: None) -- Number of threads per dask worker process. If this is None and workers_per_node is also None, the number of threads per worker is decided using dask's default heuristic for setting the number of dask workers and threads per worker, while respecting slurm's limit on CPUs (if any).

  • memory_per_worker (str | int | None, default: 'auto') --

    Memory limit per dask worker

    Notes regarding argument data type:

    • If "auto", the cluster determines the total usable memory per node, then divides that memory equally among all workers on the node.

    • If None or 0, no limit is applied: memory_per_worker is set to the total usable memory per node.

    • If a string, it should be a number followed by a unit, e.g. "128GB" or "1 TiB".

    • If an int, it is the number of bytes per worker.

  • resources_per_worker (dict | str | None, default: None) --

    Resources or task constraints per dask worker.

    If a string, it should be a comma-separated list of key-value pairs, with each key and value separated by =.

    e.g. "GPU=2", "TPU=1,MEM=10e9", or {"CPU": 10}

  • worker_scratch_directory (str | None, default: None) -- The directory in which to store the dask workers' temporary files. Equivalent to the local_directory option of the distributed.Worker class.

  • use_entry_node (bool, default: True) -- Whether to launch dask workers on the node that runs the dask scheduler. If true, dask workers are also launched on the scheduler node. If false, dask workers are launched only on nodes other than the scheduler node; in this case, the number of allocated slurm nodes must be at least one more than the nodes parameter.

  • silence_logs (int, default: 20) -- An integer corresponding to one of the standard logging levels in the logging module (20 is logging.INFO). Determines the logging level for the scheduler and worker processes.

  • name (str, default: 'DaskSlurmCluster') -- Name of the dask cluster. This also acts as the prefix for worker names.
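
The accepted argument forms described above can be illustrated with a minimal, standard-library sketch. The helper names below (resolve_nodes, parse_memory, resolve_memory, parse_resources) are hypothetical and are not part of ska_sdp_batchlet; the unit table (decimal "GB", binary "TiB") and the conversion of resource values to floats are likewise assumptions, so the actual class may resolve these arguments differently.

```python
import os
import re

# Hypothetical helpers: they illustrate the documented argument forms and
# are NOT the DaskSlurmCluster implementation.

# Assumed unit table: decimal units (GB) and binary units (GiB).
_UNITS = {
    "B": 1,
    "KB": 10**3, "MB": 10**6, "GB": 10**9, "TB": 10**12,
    "KIB": 2**10, "MIB": 2**20, "GIB": 2**30, "TIB": 2**40,
}


def resolve_nodes(nodes=None, environ=None):
    """Fall back to SLURM_JOB_NUM_NODES, then to 1, when nodes is None."""
    if nodes is not None:
        return nodes
    environ = os.environ if environ is None else environ
    return int(environ.get("SLURM_JOB_NUM_NODES", 1))


def parse_memory(text):
    """Convert a string such as "128GB" or "1 TiB" to a number of bytes."""
    match = re.fullmatch(r"\s*([0-9.]+)\s*([A-Za-z]+)\s*", text)
    if match is None:
        raise ValueError(f"cannot parse memory string: {text!r}")
    number, unit = match.groups()
    return int(float(number) * _UNITS[unit.upper()])


def resolve_memory(memory_per_worker, node_memory, workers_per_node):
    """Return the per-worker memory in bytes for each documented form."""
    if memory_per_worker == "auto":
        # Split the node's usable memory equally among its workers.
        return node_memory // workers_per_node
    if memory_per_worker is None or memory_per_worker == 0:
        # No limit: each worker may use the whole node's usable memory.
        return node_memory
    if isinstance(memory_per_worker, str):
        return parse_memory(memory_per_worker)
    return int(memory_per_worker)  # already a number of bytes


def parse_resources(resources):
    """Normalise "GPU=2", "TPU=1,MEM=10e9" or a dict to a dict of floats."""
    if resources is None:
        return {}
    if isinstance(resources, str):
        pairs = (item.split("=", 1) for item in resources.split(",") if item.strip())
        resources = {key.strip(): value for key, value in pairs}
    return {key: float(value) for key, value in resources.items()}
```

For instance, under these assumptions parse_resources("TPU=1,MEM=10e9") yields a dict with keys "TPU" and "MEM", and resolve_memory("auto", node_memory, 4) gives each of four workers a quarter of the node's usable memory.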

Example

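>>> from ska_sdp_batchlet.utils.dask_cluster.slurm.cluster import DaskSlurmCluster
>>> # 1 node, 2 workers per node, 3 threads per worker, 4 GB per worker
>>> cluster = DaskSlurmCluster(
...     nodes=1, workers_per_node=2, threads_per_worker=3, memory_per_worker="4GB"
... )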
>>> # get scheduler address
>>> address = cluster.scheduler_address
>>> # get a client
>>> client = cluster.get_client()

wait_for_all_workers()

Wait for all workers to start.