Latest revision as of 13:54, 28 August 2024
|   | Note | 
|---|---|
| This page is actively maintained by the Grid'5000 team. If you encounter problems, please report them (see the Support page). Additionally, as it is a wiki page, you are free to make minor corrections yourself if needed. If you would like to suggest a more fundamental change, please contact the Grid'5000 team. | |
Dask-jobqueue
Dask-jobqueue is a Python library that simplifies the deployment of Dask on job queuing systems commonly found in high-performance supercomputers, academic research institutions, and other clusters. Dask itself is a Python library for parallel computing that lets Python code scale from multi-core local machines to large distributed clusters in the cloud.
Since Dask-jobqueue provides interfaces for both OAR and Slurm-based clusters, it can be used to facilitate the switch between OAR and Slurm-based resource managers.
The source code, issues and pull requests can be found at https://github.com/dask/dask-jobqueue.
The OARCluster implementation in Dask-jobqueue has been improved to address a limitation. Previously, it did not consider the memory parameter, leading to inconsistencies in memory allocation. To overcome this, an extension was proposed and accepted into the Dask-jobqueue library.
This extension enables OAR to consider the memory parameter, ensuring that the allocated resources fulfill the specified memory requirements of Dask workers. By making OAR aware of the memory request, the extension facilitates a more intuitive and efficient allocation of resources.
Additionally, documentation work has been undertaken to help users move between OAR and Slurm-based clusters. The table in the Cluster parameters section gives the mapping of commands and concepts between OAR and Slurm-based resource managers.
Dask-jobqueue installation
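|   | Note | 
|---|---|
| Dask-jobqueue has to be installed on both the frontend and the nodes. If you install it in your home directory, it is available on both, as your home directory is stored on an NFS server. | |
Using pip
On the frontend, pip can be used to install dask-jobqueue and its dependencies in your home directory:
pip install --user dask-jobqueue==0.8.2 distributed==2023.8.0 --upgrade
Note that it is recommended to install Python dependencies in a virtual environment. To do so, create and activate the virtual environment before running pip:
python3 -m venv env
source env/bin/activate
pip install dask-jobqueue==0.8.2 distributed==2023.8.0
Using conda
On the frontend, conda can be used to install dask-jobqueue from the conda-forge channel:
module load conda
conda install dask-jobqueue=0.8.2 distributed=2023.8.0 -c conda-forge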
|   | Note | 
|---|---|
| The examples below have been tested with specific versions of the packages: dask-jobqueue==0.8.2 and distributed==2023.8.0. Without these version pins, you may encounter the error described at https://intranet.grid5000.fr/bugzilla/show_bug.cgi?id=15889. We recommend using these versions until a verified fix is released. | |
Basic usage
Executing a bash script with dask-jobqueue
Here is an example Python script that uses dask-jobqueue to execute a batch script (hello-world.sh) on specific resources: 2 cores, 24GB of memory, a node of the chifflet cluster, for 1 hour. The commented-out resource_spec line shows how to additionally require at least 1 GPU.
from dask_jobqueue import OARCluster as Cluster
from dask.distributed import Client
import os
cluster = Cluster(
  queue='default',
  # Should be specified if you belong to more than one GGA
  #project='<your grant access group>',
  # cores per job, required parameter
  cores=2,
  # memory per job, required parameter
  memory='24GB',
  # The memory per core property name of your OAR cluster (usually memcore or mem_core).
  memory_per_core_property_name='memcore',
  # walltime for each worker job
  walltime='1:0:0',
  job_extra_directives=[
    # Besteffort job
    #'-t besteffort',
    # reserve node from specific cluster
    '-p chifflet',
  ],
  # reserve node with GPU
  #resource_spec='gpu=1'
)
cluster.scale(1)
client = Client(cluster)
# call your favorite batch script
client.submit(os.system, './hello-world.sh').result()
client.close()
cluster.close()
A sample hello-world.sh could be the following:
echo "hello from $(hostname)"
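The Python script has to be launched on the frontend:
python3 this-script.py
|   | Warning | 
|---|---|
| The OARCluster extension introduces a new parameter: memory_per_core_property_name. As OAR does not provide a generic property name for the amount of memory associated with a resource, administrators are free to set up their own naming convention (usually memcore or mem_core). Setting this parameter is required; otherwise Dask-jobqueue cannot ensure that the nodes have the specified amount of memory. If the property does not exist on your cluster, set it to not_applicable to avoid getting a warning. On Grid'5000, the property is named memcore, which is why the examples pass memory_per_core_property_name='memcore'. | |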
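Note that os.system only returns the exit status of the script; the text it prints is not sent back to the client. A minimal sketch, assuming you want that text on the frontend, is to submit a small wrapper built on subprocess.run instead. The name run_and_capture is an illustrative choice and not part of Dask-jobqueue.

```python
import subprocess
import sys


def run_and_capture(command):
    """Run a command and return its exit status and captured output."""
    completed = subprocess.run(command, capture_output=True, text=True)
    return {
        'returncode': completed.returncode,
        'stdout': completed.stdout,
        'stderr': completed.stderr,
    }


if __name__ == '__main__':
    # Local check with a stand-in command; no cluster is needed.
    print(run_and_capture([sys.executable, '-c', 'print("hello")']))
```

With a running client, the wrapper could be submitted as client.submit(run_and_capture, ['bash', './hello-world.sh']).result(). The script is passed to bash explicitly because the sample hello-world.sh has no shebang line.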
Advanced usage
Start multiple computations in parallel using 'scale' parameter
Dask provides the ability (through Dask-distributed) to scale jobs for parallel computing, which is very convenient for interacting with scientific libraries like NumPy, Pandas and Scikit-Learn. The Client class enables users to submit computations (through submit or map calls) to a defined Dask Cluster. More details about the Client class can be found at https://distributed.dask.org/en/stable/client.html.
To specify the number of Dask workers or OAR jobs, you can use the scale method. The number of workers or jobs can be specified directly, or indirectly by specifying the cores or memory request:
# specify total workers
cluster.scale(2)
# specify total OAR jobs
cluster.scale(jobs=2)
# specify total cores 
cluster.scale(cores=4)
# specify total memory
cluster.scale(memory="48GB")
Dask Cluster also has the ability to "autoscale", with the adapt interface:
cluster.adapt(minimum=2, maximum=20)
Note that a single Job may include one or more Workers.
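The relation between Jobs and Workers can be sketched with a little arithmetic. This is a simplified model, assuming each job starts `processes` worker processes that share the job's cores and memory evenly; the helper name describe_job is purely illustrative.

```python
def describe_job(cores, memory_gib, processes, jobs):
    """Summarise the worker layout for a simple even-split model."""
    threads_per_worker = cores // processes
    memory_per_worker_gib = memory_gib / processes
    return {
        'workers_per_job': processes,
        'threads_per_worker': threads_per_worker,
        'memory_per_worker_gib': memory_per_worker_gib,
        'total_workers': processes * jobs,
    }


# 4 cores and 16 GiB per job, 2 worker processes per job, 2 jobs.
layout = describe_job(cores=4, memory_gib=16, processes=2, jobs=2)
print(layout)
```

Under these assumptions, two jobs with two processes each give four workers in total, each with two threads and half of the job's memory.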
The more complete example below illustrates the Worker and Job notions of Dask-jobqueue:
from dask_jobqueue import OARCluster as Cluster
from dask.distributed import Client
from pprint import pprint
import os
import socket
import time
cluster = Cluster(
    queue='default',
    # Should be specified if you belong to more than one GGA
    #project='<your grant access group>',
    cores=4,
    memory='16GiB',
    # The memory per core property name of your OAR cluster (usually memcore or mem_core).
    memory_per_core_property_name='memcore',
    # walltime for each worker job
    walltime='1:0:0',
    processes=2,
)
cluster.scale(jobs=2)
print('OAR submission file: \n', cluster.job_script())
client = Client(cluster)
def slow_increment(x):
    time.sleep(5)
    return {'result': x + 1,
            'host': socket.gethostname(),
            'pid': os.getpid(),
            'time': time.asctime(time.localtime(time.time())),
            'jobId': os.environ.get('OAR_JOB_ID')}
# cluster.scale() doesn't wait for the workers to spin up.
# sleep here and wait for workers to be up.
# more details here: https://github.com/dask/dask-jobqueue/issues/206
while len(client.scheduler_info()["workers"]) < 4:
    time.sleep(1)
futures = client.map(slow_increment, range(6))
results = client.gather(futures)
print('results: ')
pprint(results)
client.close()
cluster.close()
The previous code generates the following output:
results:
[{'host': 'chifflet-7.lille.grid5000.fr',
  'jobId': '1952956',
  'pid': 16502,
  'result': 1,
  'time': 'Thu Jun 29 09:56:06 2023'},
 {'host': 'chifflet-7.lille.grid5000.fr',
  'jobId': '1952955',
  'pid': 16501,
  'result': 2,
  'time': 'Thu Jun 29 09:56:06 2023'},
 {'host': 'chifflet-7.lille.grid5000.fr',
  'jobId': '1952956',
  'pid': 16508,
  'result': 3,
  'time': 'Thu Jun 29 09:56:06 2023'},
 {'host': 'chifflet-7.lille.grid5000.fr',
  'jobId': '1952955',
  'pid': 16507,
  'result': 4,
  'time': 'Thu Jun 29 09:56:06 2023'},
 {'host': 'chifflet-7.lille.grid5000.fr',
  'jobId': '1952956',
  'pid': 16502,
  'result': 5,
  'time': 'Thu Jun 29 09:56:06 2023'},
 {'host': 'chifflet-7.lille.grid5000.fr',
  'jobId': '1952955',
  'pid': 16501,
  'result': 6,
  'time': 'Thu Jun 29 09:56:06 2023'}]
As shown in the output, we got 4 workers (corresponding to the 4 different PIDs: 16501, 16502, 16507, and 16508). We can also see that, as requested, the workers ran in 2 different OAR jobs (1952955 and 1952956), with 2 workers in each job.
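The same reading of the output can be done programmatically. The sketch below uses only the (jobId, pid) pairs copied from the output above and groups the worker PIDs by OAR job; the variable names are illustrative.

```python
from collections import defaultdict

# (jobId, pid) pairs copied from the output above, in result order.
observed = [
    ('1952956', 16502), ('1952955', 16501), ('1952956', 16508),
    ('1952955', 16507), ('1952956', 16502), ('1952955', 16501),
]

# Collect the distinct worker PIDs seen in each OAR job.
workers_by_job = defaultdict(set)
for job_id, pid in observed:
    workers_by_job[job_id].add(pid)

for job_id in sorted(workers_by_job):
    print(job_id, sorted(workers_by_job[job_id]))
```

With the real results list, the pairs could be built with [(r['jobId'], r['pid']) for r in results].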
Understand the interaction between Dask-jobqueue and OAR
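Dask-jobqueue first creates a Dask Scheduler when the Cluster object is instantiated (the cluster = Cluster(...) call in the basic example). The Dask Scheduler interacts with the resource manager (i.e., OAR or Slurm) to submit jobs according to the requested specification. This interaction between Dask-jobqueue and OAR, hidden from the end user, goes through an inline command for the job submission.
|   | Note | 
|---|---|
| If needed, you can display the generated OAR file by adding the following line to the Python script: print(cluster.job_script()). This can be useful for debugging if you do not understand why Dask is not reserving the nodes you specified. | |
In the basic example, the Dask Scheduler generated this file: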
#!/usr/bin/env bash
#OAR -n dask-worker
#OAR -q default
#OAR -l /nodes=1/core=2,walltime=1:0:0
#OAR -p 'chifflet AND memcore>=11445'
/home/ychi/source-dask-jobq/env/bin/python3 -m distributed.cli.dask_worker tcp://172.16.47.106:36611 --nthreads 1 --nworkers 2 --memory-limit 11.18GiB --name dummy-name --nanny --death-timeout 60
The job request is sent to OAR and executed on the requested resources. OAR starts a Dask Worker on the remote nodes. The Dask Scheduler then interacts with these Workers to tell them which computations to perform. In the basic example, this is done by executing the bash script passed to the submit method (the client.submit(...) call).
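The numbers in the generated file can be reproduced by hand. The sketch below is an assumption about the arithmetic, not a copy of the library's code: it supposes that the memcore threshold is the job memory divided by the number of cores, expressed in MiB and rounded up, and that the per-worker limit is the job memory divided by the number of worker processes. With the 24GB mentioned in the description of the basic example, 2 cores and 2 worker processes, it yields the 11445 and 11.18GiB values seen in the file.

```python
import math

job_memory_bytes = 24 * 10**9   # '24GB' (decimal gigabytes)
cores = 2
worker_processes = 2            # --nworkers 2 in the generated file

# Assumed rule: memory per core in MiB, rounded up.
memcore_mib = math.ceil(job_memory_bytes / cores / 2**20)

# Assumed rule: job memory split evenly between worker processes, in GiB.
memory_limit_gib = job_memory_bytes / worker_processes / 2**30

print(memcore_mib)
print(f'{memory_limit_gib:.2f}GiB')
```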
The schema below describes the workflow of the different entities (Dask, OAR and resources):
[Figure: Workflow dask jobq.png]
Note that:
- when the Cluster object goes away, either because the Python program is killed or because the Dask object has been deleted, a signal is sent to the Workers to shut them down. If, for some reason, the signal does not get through, the Workers shut themselves down after trying for 60 seconds to reconnect to the Dask Scheduler (the timeout can be adjusted with the death-timeout parameter presented in the next section).
- to start jobs for the computation, you need to tell the Dask Scheduler how many you want using the scale method. At this step, the Dask Scheduler starts to interact with OAR. In the basic example, only one job is launched; the generated file shown above starts its workers with --nworkers 2. For more advanced usage, refer to the example in the section "Start multiple computations in parallel using 'scale' parameter".
- dask-jobqueue does not handle multi-node jobs. For those, you probably want to use Dask-mpi (https://mpi.dask.org/en/latest/).
Use a configuration file to specify resources
Instead of specifying resources in the Python script, it is possible to use a configuration file. This can be useful when the same script is executed on different Grid'5000 sites.
Below is an example of a Dask configuration file stored in ~/.config/dask/jobqueue.yaml:
jobqueue:
  oar:
    name: dask-worker
    # Dask worker options
    cores: 2 # Total number of cores per job
    memory: '24GB' # Total amount of memory per job
    #processes: 1 # Number of Python processes per job, ~= sqrt(cores) 
    #interface: null # Network interface to use: eth0 or ib0
    death-timeout: 60 # Number of seconds to wait if a worker can not find a scheduler
    #local-directory: null # Location of fast local storage like /scratch or $TMPDIR
    #extra: [] # Extra arguments to pass to Dask worker
    worker-extra-args: [] # Additional arguments to pass to `dask-worker`
    # OAR resource manager options
    #shebang: "#!/usr/bin/env bash"
    queue: 'default'
    #project: null
    walltime: '1:00:00'
    #env-extra: []
    job-script-prologue: []
    #resource-spec: null
    #job-extra: null
    #job-extra-directives: []
    #job-directives-skip: []
    log-directory: null
    # Scheduler options
    scheduler-options: {}
When a resource configuration file exists, the Dask cluster object can then be instantiated without parameters:
cluster = OARCluster()
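Values from the configuration file act as defaults, and arguments passed explicitly to the constructor take precedence over them. A minimal sketch of that lookup order, using a plain dictionary in place of the YAML file (resolve_option is a hypothetical helper, not a Dask-jobqueue function):

```python
# Stand-in for the 'jobqueue.oar' section of jobqueue.yaml.
file_defaults = {
    'cores': 2,
    'memory': '24GB',
    'queue': 'default',
    'walltime': '1:00:00',
}


def resolve_option(name, explicit=None, defaults=file_defaults):
    """Return the explicit value if given, else the configured default."""
    return explicit if explicit is not None else defaults[name]


print(resolve_option('cores'))                # value from the file
print(resolve_option('walltime', '2:00:00'))  # explicit value wins
```

In practice this corresponds to calling OARCluster(walltime='2:00:00') while leaving the other settings to the file.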
Cluster parameters
The following table gives the mapping between Dask-jobqueue parameters and OAR/Slurm parameters.
| dask-jobqueue parameter | OAR command example | Slurm command example | Description | 
|---|---|---|---|
| queue | #OAR -q | #SBATCH -p | Destination queue for each worker job | 
| project | #OAR --project | #SBATCH -A | Accounting group associated with each worker job | 
| cores | #OAR -l core=2 | #SBATCH --cpus-per-task=2 | Total cores per job | 
| memory | #OAR -p memcore>=16000 | #SBATCH --mem=24GB if job_mem is None | Workers' --memory-limit parameter | 
| walltime | #OAR -l walltime=hh:mm:ss | #SBATCH -t hh:mm:ss | Walltime for each worker job | 
| name | #OAR -n | #SBATCH -J | Name of worker, always set to the default value dask-worker | 
| resource_spec | #OAR -l host=1/core=2, gpu=1 | Not supported | Request resources and specify job placement | 
| job_mem | Not supported | #SBATCH --mem=24GB | Amount of memory to request (If None, defaults to memory * Worker processes) | 
| job_cpu | Not supported | #SBATCH --cpus-per-task=2 | Number of CPU to book (If None, defaults to Worker threads * processes) | 
| job_extra_directives | #OAR -O, -E | #SBATCH -o, -e | Log directory | 
| job_extra_directives | #OAR -p parasilo | #SBATCH -C sirocoo | Property request | 
| job_extra_directives | #OAR -t besteffort | #SBATCH -t besteffort | Besteffort job | 
| job_extra_directives | #OAR -r now | #SBATCH --begin=now | Advance reservation | 
| job_extra_directives | #OAR --checkpoint 150 | #SBATCH --checkpoint 150 | Checkpoint | 
| job_extra_directives | #OAR -a jobid | #SBATCH --dependency state:jobid | Jobs dependency | 
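To make the mapping concrete, the sketch below renders a few of the table's rows as header lines for either resource manager. The DIRECTIVES dictionary and the render_header function are illustrative only; they cover just the queue, project and walltime rows and are not how Dask-jobqueue builds its job scripts.

```python
# (OAR template, Slurm template) per dask-jobqueue parameter, from the table.
DIRECTIVES = {
    'queue': ('#OAR -q {}', '#SBATCH -p {}'),
    'project': ('#OAR --project {}', '#SBATCH -A {}'),
    'walltime': ('#OAR -l walltime={}', '#SBATCH -t {}'),
}


def render_header(manager, **params):
    """Render one header line per known parameter for 'oar' or 'slurm'."""
    column = {'oar': 0, 'slurm': 1}[manager]
    return [DIRECTIVES[name][column].format(value)
            for name, value in params.items()]


print(render_header('oar', queue='default', walltime='1:00:00'))
print(render_header('slurm', queue='default', walltime='1:00:00'))
```

The same keyword arguments produce the OAR or the Slurm form of each line, which is the idea behind using Dask-jobqueue to ease the switch between the two resource managers.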


