Dask Scaling

Dask is a Python library for distributing computation across multiple workers. We use it to parallelise processing.

Overview

A dask cluster is made of:

  • A scheduler, a lightweight process that coordinates task execution.

  • Workers, which execute the tasks. They perform the heavy processing and may run on remote nodes.

Note

The Dask scheduler, the Dask workers and the batch pre-processing pipeline (BPP) application must all run in the same Python environment.

Using BPP with dask

Here we outline how to start a dask cluster on your machine and have BPP use it.

Scheduler

In one terminal, activate the batch pre-processing Python environment and launch the scheduler. Assuming you followed the instructions in Installation to set up the Python environment:

cd <BPP_REPOSITORY>
source .venv/bin/activate
dask scheduler

This starts the scheduler on the default port 8786.

Workers

In another terminal, launch the desired number of workers (4 in this example). They will share the CPU cores and RAM of your machine evenly.

cd <BPP_REPOSITORY>
source .venv/bin/activate
dask worker localhost:8786 --nworkers 4 --resources "process=1"

Warning

--resources "process=1" is required; see "Why worker resources?" below for the explanation.

Dashboard

You can now open the Dask dashboard in a browser to observe the progress of tasks in real time. It runs on port 8787 by default; simply navigate to http://localhost:8787.

Pipeline App

Batch pre-processing can now be launched on the cluster we just started. Use the usual command line and add the -d or --dask-scheduler option, followed by the scheduler address:

ska-sdp-batch-preprocess run -d localhost:8786 <OPTIONS> <INPUT>

Why worker resources?

The pipeline CLI app runs DP3 tasks, which use multiple threads. Dask has no mechanism to detect how many threads a task uses, and assumes that every task uses 1 thread from the worker’s own Python thread pool. This is problematic for C/C++ code such as DP3 that spawns its own pool of threads on the side: a worker that owns several threads may run several DP3 tasks at once, each starting its own thread pool, and over-subscribe the CPU.
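To make the scale of the problem concrete, here is a rough back-of-the-envelope sketch. The 16-core machine is hypothetical, and so is the assumption that each task spawns as many threads as its worker owns; the real numbers depend on your hardware and on how DP3 is configured.

```python
def peak_native_threads(n_workers: int, tasks_per_worker: int, threads_per_task: int) -> int:
    """Worst-case number of simultaneously busy threads across all workers."""
    return n_workers * tasks_per_worker * threads_per_task


cores = 16                               # hypothetical machine size
n_workers = 4                            # as in the worker launch command
threads_per_worker = cores // n_workers  # cores are shared evenly: 4 each

# No resource limit: a worker may run one task per thread it owns, and each
# task is assumed to spawn `threads_per_worker` threads of its own.
unrestricted = peak_native_threads(n_workers, threads_per_worker, threads_per_worker)

# With "process=1": at most one task per worker at any time.
restricted = peak_native_threads(n_workers, 1, threads_per_worker)

print(unrestricted, restricted)  # 64 16
```

Under these assumptions, the unrestricted case asks for four times more threads than there are cores, while the restricted case matches the core count exactly.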

The only reliable solution is to use worker resources. The batch pre-processing pipeline assumes that all workers define a resource called process; each worker holds 1, and each DP3 task is defined as requiring 1. When a DP3 task reaches a worker, DP3 is launched with the same number of threads as the worker officially owns. A worker thus only ever runs one task at a time, and all threads are used without risk of over-subscription.
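As an illustration of the mechanism, the sketch below shows how a task can be tied to the process resource using the resources keyword of Dask's Client.submit. The helper name submit_exclusive is hypothetical and this is not necessarily how the pipeline itself submits DP3 tasks; only the resource name and amount come from the description above.

```python
from typing import Any, Callable

# Resource name and amount as described above; each worker advertises
# process=1 and each task consumes 1 while it runs.
PROCESS_RESOURCE = {"process": 1}


def submit_exclusive(client: Any, func: Callable[..., Any], *args: Any) -> Any:
    """Submit `func` so that it occupies a worker's single "process" unit.

    `client` is expected to be a dask.distributed.Client. The `resources`
    keyword restricts the task to workers that advertise the resource and
    counts the requested amount against what each worker holds.
    """
    return client.submit(func, *args, resources=PROCESS_RESOURCE)
```

With the cluster started as described earlier, this could be called as submit_exclusive(Client("localhost:8786"), some_function, some_argument), where Client is imported from dask.distributed. A task submitted this way stays pending if no worker was launched with --resources "process=1".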

The drawback is that resources can only be defined when the workers are launched; hence the need to add --resources "process=1" to the worker launch command.