Dask Scaling
Dask is a Python library for distributing computation across multiple workers. The batch pre-processing pipeline (BPP) uses it to parallelise processing.
Overview
A dask cluster is made of:
A scheduler, a lightweight process that coordinates task execution.
Workers, which execute the tasks and perform the heavy processing. They may run on remote nodes.
Note
The dask scheduler, the dask workers and the batch pre-processing pipeline application must be running in the same Python environment.
Using BPP with dask
Here we outline how to start a dask cluster on your machine and have BPP use it.
Scheduler
In one terminal, activate the batch pre-processing Python environment and launch the scheduler. Assuming you followed the instructions in Installation to set up the Python environment:
cd <BPP_REPOSITORY>
source .venv/bin/activate
dask scheduler
This starts the scheduler on the default port 8786.
Workers
In another terminal, launch the desired number of workers (4 in this example). They will share the CPU cores and RAM of your machine evenly.
cd <BPP_REPOSITORY>
source .venv/bin/activate
dask worker localhost:8786 --nworkers 4 --resources "process=1"
Warning
--resources "process=1" is required; see "Why worker resources?" below for the explanation.
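To give a feel for what an even split means in practice, the following sketch estimates the threads and memory each worker would receive. The function name per_worker_share and the 16-core, 64 GiB machine are hypothetical, and the arithmetic is a simple approximation; dask's own allocation may round differently, so the figures shown in the dashboard remain the reference.

```python
import os


def per_worker_share(total_cores, total_memory_bytes, n_workers):
    """Estimate (threads, memory in bytes) per worker for an even split.

    This is an approximation for illustration only, not dask's exact rule.
    """
    if n_workers < 1:
        raise ValueError("n_workers must be at least 1")
    # Each worker gets an equal whole number of cores, and at least one.
    threads = max(1, total_cores // n_workers)
    # Memory is divided equally between the workers.
    memory = total_memory_bytes // n_workers
    return threads, memory


if __name__ == "__main__":
    GIB = 2**30
    # Hypothetical machine: 16 cores, 64 GiB of RAM, 4 workers.
    threads, memory = per_worker_share(16, 64 * GIB, 4)
    print(f"hypothetical machine: {threads} threads, {memory // GIB} GiB per worker")
    # Estimate for the machine running this script (core count only).
    local_threads, _ = per_worker_share(os.cpu_count() or 1, 0, 4)
    print(f"this machine: about {local_threads} threads per worker")
```

On the hypothetical machine, each of the 4 workers would get 4 threads and 16 GiB of RAM.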
Dashboard
You may now open the dask dashboard in a browser; it runs on port 8787 by default, so simply navigate to http://localhost:8787. From there you can observe the progress of tasks in real time.
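If the dashboard does not load, it can help to check that something is actually listening on the expected ports. The sketch below uses only the Python standard library; the helper name is_listening is hypothetical, and the ports are the defaults mentioned above (8786 for the scheduler, 8787 for the dashboard). It only tests that a TCP connection can be opened, not that the process behind it is dask.

```python
import socket


def is_listening(host, port, timeout=1.0):
    """Return True if a TCP connection to (host, port) can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts and unreachable hosts.
        return False


if __name__ == "__main__":
    # Default ports: 8786 for the scheduler, 8787 for the dashboard.
    for name, port in (("scheduler", 8786), ("dashboard", 8787)):
        state = "listening" if is_listening("localhost", port) else "not reachable"
        print(f"{name} on port {port}: {state}")
```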
Pipeline App
Batch pre-processing can now be launched so that it uses the cluster we just started.
Use the same command line as usual, adding the -d or --dask-scheduler option followed by the scheduler address:
ska-sdp-batch-preprocess run -d localhost:8786 <OPTIONS> <INPUT>
Why worker resources?
The pipeline CLI app runs DP3 tasks, which use multiple threads.
Dask has no mechanism to detect how many threads a task uses, and assumes that
every task uses 1 thread from the worker’s own
Python ThreadPool. This is problematic when running C/C++ code spawning its
own pool of threads on the side, like DP3.
The only reliable solution is to use
worker resources.
The batch pre-processing pipeline assumes that all workers define a resource
called process; each worker holds 1 unit of it, and each DP3 task is declared as
requiring 1 unit. When a DP3 task reaches a worker, DP3 is launched with the same
number of threads as the worker was allocated. A worker thus only ever runs
one task at a time, and all threads are used without risk of over-subscription.
The drawback is that resources can only be defined when the workers are
launched; hence the need to add --resources "process=1" to the worker launch
command.
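The mechanism can be sketched with dask's public API. This is an illustration under stated assumptions, not the pipeline's actual code: the helper names submit_exclusive, threads_on_this_worker and main are hypothetical, and reading the thread count through get_worker().state.nthreads assumes a reasonably recent version of distributed.

```python
def submit_exclusive(client, func, *args):
    """Submit func so that it claims one unit of the "process" resource.

    Each worker holds exactly one unit, so a worker runs at most one
    such task at a time.
    """
    return client.submit(func, *args, resources={"process": 1})


def threads_on_this_worker():
    """Runs on a worker: report how many threads that worker was given."""
    # Imported here so the function can be shipped to the worker as-is.
    from dask.distributed import get_worker

    # Assumption: the thread count is exposed as state.nthreads.
    return get_worker().state.nthreads


def main(scheduler_address="localhost:8786"):
    """Connect to a running cluster and run one resource-constrained task."""
    from dask.distributed import Client

    with Client(scheduler_address) as client:
        future = submit_exclusive(client, threads_on_this_worker)
        print("threads available to the task:", future.result())
```

With the scheduler and workers from the earlier sections running, calling main() should print the thread count of whichever worker ran the task. If the workers were started without --resources "process=1", no worker can satisfy the request and the task would be expected to wait indefinitely.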