From Grid5000
Revision as of 16:22, 9 June 2023

Dask-jobqueue

Dask-jobqueue is a Python library that makes it easy to deploy Dask on the job queuing systems typically found in high performance supercomputers, academic research institutions, and other clusters. Dask is a Python library for parallel computing which scales Python code from multi-core local machines to large distributed clusters in the cloud. Since Dask-jobqueue provides interfaces for both OAR and Slurm based clusters, it can ease the transition between OAR and Slurm based resource managers. Source code, issues and pull requests can be found in the project's GitHub repository (https://github.com/dask/dask-jobqueue).


Dask-jobqueue installation

Note.png Note

The version of Dask-jobqueue currently available in the pip and conda repositories does not contain the most recently developed features. In particular, the latest version available on the GitHub repository is able to manage memory in OAR job submissions (see changelog).

Note.png Note

Dask-jobqueue should be installed in a folder accessible from both the frontend and the nodes. For example, installing it in your home directory guarantees that Dask-jobqueue is available on both the frontend and the nodes.

Using pip

pip can be used to install dask-jobqueue and its dependencies in your home directory:

Terminal.png frontend.site:
pip install --user dask-jobqueue

As stated above, the latest version can be installed from the GitHub repository:

Terminal.png frontend.site:

Note that it is recommended to install Python dependencies in a virtual environment. To do so, before running the pip command:

Terminal.png frontend.site:
cd my_dask_jobqueue_project
Terminal.png frontend.site:
python3 -m venv env
Terminal.png frontend.site:
source env/bin/activate
Terminal.png frontend.site:

Using conda

conda can be used to install dask-jobqueue from the conda-forge channel:

Terminal.png frontend.site:
conda install dask-jobqueue -c conda-forge


To install the latest version of dask-jobqueue available in the GitHub repository, you need to create a conda environment file (e.g. "conda-env-dask-jobq.yml") such as:

name: dask-jobq
dependencies:
  - pip:
    - git+https://github.com/dask/dask-jobqueue

Then install dask-jobqueue using this environment file:

Terminal.png frontend.site:
conda env create --file conda-env-dask-jobq.yml
Terminal.png frontend.site:
source activate dask-jobq

Basic usage

Executing a bash script with dask-jobqueue

Here is an example Python script that runs a batch script on a well-defined resource (2 cores, 24GB of memory, on a specific cluster, chifflet, for 1 hour; the GPU request is left commented out in the script):

from dask_jobqueue import OARCluster as Cluster
from dask.distributed import Client
import os

cluster = Cluster(
  queue='default',
  # Should be specified if you belong to more than one GGA
  #project='<your grant access group>',
  # cores per job, required parameter
  cores=2,
  # memory per job, required parameter
  memory='24GB',
  memory_per_core_property_name='memcore',
  # walltime for each worker job
  walltime='1:0:0',

  job_extra_directives=[
    # Besteffort job
    #'-t besteffort',
    # reserve node from specific cluster
    '-p chifflet',
  ],

  # reserve node with GPU
  #resource_spec='gpu=1'
)

cluster.scale(1)
client = Client(cluster)

# call your favorite batch script
client.submit(os.system, './hello-world.sh').result()

client.close()
cluster.close()

The example script can be launched on the frontend as follows:

Terminal.png flille:
python3 this-script.py

How does Dask-jobqueue interact with OAR in practice?

Dask-jobqueue first creates a Dask Scheduler when the Cluster object is instantiated (the cluster = Cluster(...) call in the previous example). The Dask Scheduler interacts with OAR/Slurm to submit jobs and reserve nodes according to the specification you gave. For instance, for the previous example, the Dask Scheduler creates the following job script and submits it to OAR:

#!/usr/bin/env bash

#OAR -n dask-worker
#OAR -q default
#OAR -l /nodes=1/core=2,walltime=1:0:0
#OAR -p 'chifflet AND memcore>=11445'

/home/ychi/source-dask-jobq/env/bin/python3 -m distributed.cli.dask_worker tcp://172.16.47.106:36611 --nthreads 1 --nworkers 2 --memory-limit 11.18GiB --name dummy-name --nanny --death-timeout 60

This generated OAR file can be displayed by adding the following line to the Python script:

print(cluster.job_script())

Note that this command can help you debug Dask-jobqueue resource reservations when Dask does not reserve the nodes you specified and you do not understand why.
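
The figures in the generated script can be checked by hand. The following is a minimal sketch of that arithmetic. It assumes (this is inferred from the numbers shown above, not taken from the Dask-jobqueue source) that the memory request is split evenly across cores, expressed in MiB and rounded up for the memcore property, and split evenly across worker processes for --memory-limit. The helper names are hypothetical.

```python
import math

GB = 10**9    # decimal gigabyte, as in memory='24GB'
MIB = 2**20   # mebibyte
GIB = 2**30   # gibibyte


def memcore_mib(memory_bytes, cores):
    """Memory per core in MiB, rounded up (hypothetical helper)."""
    return math.ceil(memory_bytes / cores / MIB)


def memory_limit_gib(memory_bytes, processes):
    """Memory per worker process in GiB (hypothetical helper)."""
    return memory_bytes / processes / GIB


memory = 24 * GB
print(memcore_mib(memory, cores=2))                        # 11445
print(f"{memory_limit_gib(memory, processes=2):.2f}GiB")   # 11.18GiB
```

Applying the same helper to 64GiB shared by 4 cores gives 16384, the memcore value that appears in the second generated script further down this page.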


The job request is sent to OAR and executed on the requested resources. OAR starts a Dask Worker on the remote nodes. The Dask Scheduler then interacts with these Workers to tell them which computations to perform. In the previous example, this is done by executing the bash script passed to the submit method (the client.submit call).

Note that:

  • a Worker in Dask is a Python object that serves data and performs computations, while a Job here means a request submitted to, and managed by, a resource manager (OAR for instance). To summarize, in the example above dask-jobqueue starts a single OAR job, which may include one or more Dask Workers, each of which may execute one or more scripts/tasks.
  • when the Cluster object goes away, either because the Python program is killed or because the Dask object has been deleted, a signal is sent to the Workers to shut them down. If for some reason the signal does not get through, the Workers shut themselves down after trying for 60 seconds to reconnect to the Dask Scheduler (the timeout can be adjusted with the death-timeout parameter presented in the next section).
  • to run the computation on the requested resources, you need to tell the Dask Scheduler how many job(s) to start using the scale method. At this step, the Dask Scheduler starts to interact with OAR. In the example above, only one job is launched, with only one worker inside. For advanced usage, please refer to the Advanced usage section.

The diagram below illustrates the different layers involved: Dask, OAR and the resources.

Schema dask oar.png

The diagram below describes the workflow between the different entities (Dask, OAR and resources):

Workflow dask jobq.png

Advanced usage

Use a configuration file to specify resources

Instead of specifying resources in the Python script, it is possible to use a configuration file. This can be useful when the same script is executed on different G5K sites.


Below is an example of a Dask configuration file stored in ~/.config/dask/jobqueue.yaml:

jobqueue:
  oar:
    name: dask-worker
    # Dask worker options
    cores: 2 # Total number of cores per job
    memory: '24GB' # Total amount of memory per job
    #processes: 1 # Number of Python processes per job, ~= sqrt(cores) 
    #interface: null # Network interface to use: eth0 or ib0
    death-timeout: 60 # Number of seconds to wait if a worker can not find a scheduler
    #local-directory: null # Location of fast local storage like /scratch or $TMPDIR
    #extra: [] # Extra arguments to pass to Dask worker
    worker-extra-args: [] # Additional arguments to pass to `dask-worker`

    # OAR resource manager options
    #shebang: "#!/usr/bin/env bash"
    queue: 'default'
    #project: null
    walltime: '1:00:00'
    #env-extra: []
    job-script-prologue: []
    #resource-spec: null
    #job-extra: null
    #job-extra-directives: []
    #job-directives-skip: []
    log-directory: null

    # Scheduler options
    scheduler-options: {}

When a resource configuration file exists, the cluster can then be instantiated with a single line as follows:

cluster = OARCluster()

Cluster parameters

The following table gives the mapping between Dask-jobqueue parameters and OAR/Slurm parameters.

dask-jobqueue parameter | OAR command example | Slurm command example | Description
queue | #OAR -q | #SBATCH -p | Destination queue for each worker job
project | #OAR --project | #SBATCH -A | Accounting group associated with each worker job
cores | #OAR -l core=2 | #SBATCH --cpus-per-task=2 | Total cores per job
memory | #OAR -p memcore>=16000 | #SBATCH --mem=24GB if job_mem is None | Workers' --memory-limit parameter
walltime | #OAR -l walltime=hh:mm:ss | #SBATCH -t hh:mm:ss | Walltime for each worker job
name | #OAR -n | #SBATCH -J | Name of worker, always set to the default value dask-worker
resource_spec | #OAR -l host=1/core=2, gpu=1 | Not supported | Request resources and specify job placement
job_mem | Not supported | #SBATCH --mem=24GB | Amount of memory to request (if None, defaults to memory * Worker processes)
job_cpu | Not supported | #SBATCH --cpus-per-task=2 | Number of CPUs to book (if None, defaults to Worker threads * processes)
job_extra_directives | #OAR -O, -E | #SBATCH -o, -e | Log directory
job_extra_directives | #OAR -p parasilo | #SBATCH -C sirocoo | Property request
job_extra_directives | #OAR -t besteffort | #SBATCH -t besteffort | Besteffort job
job_extra_directives | #OAR -r now | #SBATCH --begin=now | Advance reservation
job_extra_directives | #OAR --checkpoint 150 | #SBATCH --checkpoint 150 | Checkpoint
job_extra_directives | #OAR -a jobid | #SBATCH --dependency state:jobid | Jobs dependency

Start multiple computations at once using the 'scale' method

Dask-jobqueue makes it possible to seamlessly deploy Dask on clusters that use a variety of job queuing systems such as OAR and Slurm. With Dask-jobqueue's Pythonic interface, users can easily manage the submission, execution and deletion of jobs across different resource and job management systems. Dask also lets users scale their jobs for parallel computing, in coordination with Python's existing scientific libraries such as NumPy, Pandas and Scikit-Learn.

Dask-distributed, which we imported in the example above, is an extension of Dask that facilitates parallel computing. Its Client class is used to connect to a given Dask Cluster and to submit computations to it, through 'submit' or 'map' calls. Details of the class can be found in the Dask distributed documentation.
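
The 'submit' and 'map' calls follow the same futures pattern as Python's standard concurrent.futures module. As a local analogy only (it uses a thread pool on the current machine, not a Dask Cluster), the pattern can be sketched as follows; the increment function is a made-up example:

```python
from concurrent.futures import ThreadPoolExecutor


def increment(x):
    """Toy task used only for illustration."""
    return x + 1


with ThreadPoolExecutor(max_workers=2) as executor:
    # submit: schedule a single call and get a future back
    future = executor.submit(increment, 41)
    single = future.result()
    # map: schedule one call per input; results come back in input order
    many = list(executor.map(increment, range(6)))

print(single)  # 42
print(many)    # [1, 2, 3, 4, 5, 6]
```

One difference worth keeping in mind: with Dask, client.map returns a list of futures that is then collected with client.gather, whereas executor.map yields the results directly.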

As explained above, a single Job may include one or more Workers. If your job can be split into several processes, the number of Workers per Job can be set with the processes parameter (see the configuration section).

To specify the number of Jobs, use the scale method. The number of Jobs can either be specified directly, as in the example above, or indirectly through a total cores or memory request:

# 2 jobs with 1 worker each will be launched
cluster.scale(2)
# specify total cores 
cluster.scale(cores=4)
# specify total memory
cluster.scale(memory="48GB")
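
How a total cores or memory request translates into a number of jobs can be pictured with the following sketch. It is a simplified model, assuming that the total request is divided by the per-job size and rounded up; the function names are hypothetical and the actual implementation may differ in details.

```python
import math


def jobs_for_cores(total_cores, cores_per_job):
    """Number of jobs needed to reach a total core count (hypothetical helper)."""
    return math.ceil(total_cores / cores_per_job)


def jobs_for_memory(total_memory_gb, memory_per_job_gb):
    """Number of jobs needed to reach a total memory amount (hypothetical helper)."""
    return math.ceil(total_memory_gb / memory_per_job_gb)


# Per-job resources of the basic example: cores=2, memory='24GB'
print(jobs_for_cores(4, cores_per_job=2))          # 2
print(jobs_for_memory(48, memory_per_job_gb=24))   # 2
# A request that is not a multiple of the job size is rounded up
print(jobs_for_cores(5, cores_per_job=2))          # 3
```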

A Dask Cluster can also scale automatically ("autoscale") through the adapt interface:

cluster.adapt(minimum=2, maximum=20)

The more complicated example below can help in understanding the Worker and Job notions of Dask-jobqueue:

from dask_jobqueue import OARCluster as Cluster
from dask.distributed import Client
from pprint import pprint
import logging
import os
import socket
import time

cluster = Cluster(
    queue='default',
    # Should be specified if you belong to more than one GGA
    #project='<your grant access group>',
    cores=4,
    memory='64GiB',
    memory_per_core_property_name='memcore',
    # walltime for each worker job
    walltime='1:0:0',
    processes=2,
)
cluster.scale(6)
print(cluster.job_script())

client = Client(cluster)

def slow_increment(x):
    time.sleep(5)
    return {'result': x + 1,
            'host': socket.gethostname(),
            'pid': os.getpid(),
            'time': time.asctime(time.localtime(time.time()))}

nb_workers = 0
while True:
    nb_workers = len(client.scheduler_info()["workers"])
    if nb_workers >= 2:
        print('Finally got {} workers '.format(nb_workers), 'with client: ', client)
        break
    time.sleep(1)


futures = client.map(slow_increment, range(6))
results = client.gather(futures)
print('results: ')
pprint(results)

client.close()
cluster.close()

The job script is generated as follows, followed by the output of the program:

#!/usr/bin/env bash

#OAR -n dask-worker
#OAR -q default
#OAR -l /nodes=1/core=4,walltime=1:0:0
#OAR -p memcore>=16384

/home/ychi/source-dask-jobq/env/bin/python3 -m distributed.cli.dask_worker tcp://172.16.47.106:40605 --nthreads 2 --nprocs 2 --memory-limit 32.00GiB --name dummy-name --nanny --death-timeout 60 --protocol tcp://

Finally got 6 workers  with client:  <Client: 'tcp://172.16.47.106:40605' processes=6 threads=12, memory=192.00 GiB>
results:
[{'host': 'chetemi-13.lille.grid5000.fr',
  'pid': 6780,
  'result': 1,
  'time': 'Fri Aug  5 17:08:03 2022'},
 {'host': 'chetemi-13.lille.grid5000.fr',
  'pid': 6767,
  'result': 2,
  'time': 'Fri Aug  5 17:08:03 2022'},
 {'host': 'chetemi-13.lille.grid5000.fr',
  'pid': 6768,
  'result': 3,
  'time': 'Fri Aug  5 17:08:03 2022'},
 {'host': 'chetemi-13.lille.grid5000.fr',
  'pid': 6777,
  'result': 4,
  'time': 'Fri Aug  5 17:08:03 2022'},
 {'host': 'chetemi-13.lille.grid5000.fr',
  'pid': 6769,
  'result': 5,
  'time': 'Fri Aug  5 17:08:03 2022'},
 {'host': 'chetemi-13.lille.grid5000.fr',
  'pid': 6776,
  'result': 6,
  'time': 'Fri Aug  5 17:08:03 2022'}]

Here, each job runs 2 Workers (--nprocs 2) with 2 threads each (--nthreads 2). Since cluster.scale(6) asks for 6 Workers and there are 2 Workers per job, Dask-jobqueue submits 3 OAR jobs, which together provide the 6 Dask Workers reported by the client (processes=6).
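
The arithmetic behind this output can be sketched as follows. This is only an illustration of the relationships between the values shown above, assuming that Workers are packed evenly into jobs; the function is hypothetical and not part of Dask-jobqueue.

```python
import math


def describe_deployment(workers, cores, memory_gib, processes):
    """Derive per-worker and total figures from the Cluster parameters.

    workers: number passed to cluster.scale()
    cores, memory_gib, processes: per-job values given to the Cluster
    """
    threads_per_worker = cores // processes
    memory_per_worker = memory_gib / processes
    return {
        "oar_jobs": math.ceil(workers / processes),
        "threads_per_worker": threads_per_worker,
        "memory_per_worker_gib": memory_per_worker,
        "total_threads": workers * threads_per_worker,
        "total_memory_gib": workers * memory_per_worker,
    }


summary = describe_deployment(workers=6, cores=4, memory_gib=64, processes=2)
print(summary)
```

With the parameters of the example, this gives 3 OAR jobs, 2 threads and 32 GiB per Worker, and totals of 12 threads and 192 GiB, which matches the client summary line printed above.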