Hadoop On Execo

From Grid5000

The Apache Hadoop project provides an open-source framework for reliable, scalable, distributed computing. As such, it can be deployed and used on the Grid5000 platform. However, its configuration and management can sometimes be difficult, especially given the dynamic nature of clusters within Grid5000 reservations. Execo, in turn, offers a Python API to manage the execution of processes. It is well suited for the quick and easy creation of reproducible experiments on distributed hosts.

The project presented here is called hadoop_g5k. It provides a layer built on top of Execo that makes it possible to manage Hadoop clusters and prepare reproducible experiments in Hadoop. It offers a set of scripts to be used from the command line as well as a Python interface. Additionally, it provides classes and scripts to manage Apache Spark and link it to a Hadoop cluster. Detailed information about this can be found in the wiki Spark on Execo.

The code can be found and downloaded from here.


Installation

As hadoop_g5k is based on Execo, we first need to install Execo with easy_install:

Terminal.png frontend:
easy_install --user execo

Now you are ready to install hadoop_g5k using setuptools. Download the sources from the repository and unzip them.

Terminal.png frontend:
unzip master.zip

Then you can use the setup script to install hadoop_g5k. Move to the uncompressed directory and execute:

Terminal.png frontend:
python setup.py install --user

Depending on your Python configuration, the scripts will be installed in a different directory. You may need to add this directory to your PATH in order to be able to call them from any directory.
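With a `--user` install, the scripts typically land under the Python user base directory. A small standard-library snippet can locate it (this shows the usual location; the exact path depends on your Python installation):

```python
import os
import site

# With "python setup.py install --user", console scripts are usually
# placed under <user base>/bin (e.g. ~/.local/bin on Linux).
user_scripts_dir = os.path.join(site.getuserbase(), "bin")
print(user_scripts_dir)
```

If this directory is not already in your PATH, add it in your shell profile.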

Hadoop Cluster Management

Hadoop_g5k provides a main Python class, called HadoopCluster, which exposes several useful methods to manage a Hadoop cluster deployed on top of Grid5000. Alternatively, a command-line script can be used to manage the cluster from the terminal.

Support for newer versions of Hadoop (>= 2.0) is also provided with the class HadoopV2Cluster.

The documentation of the methods of HadoopCluster and HadoopV2Cluster and all the related classes can be found in Read the Docs.

hg5k Script Basic Usage

hg5k is a script that provides a command-line interface to manage a Hadoop cluster. You can list all the possible options by using its help command:

Terminal.png frontend:
hg5k -h

In the following we show an example of a basic usage workflow of hg5k, executing the randomtextwriter example job on a cluster of 4 nodes. More details are given in the following section.

First, we need to reserve a set of nodes with the oarsub command.

Terminal.png frontend:
oarsub -I -t allow_classic_ssh -l nodes=4,walltime=2

Now, we are ready to create and use a Hadoop cluster. In the simplest case, a Hadoop cluster is created by just providing the set of machines it will comprise. We can use the environment variable provided by oarsub. The option --version 2 indicates that we are using Hadoop 2.x.

Terminal.png node:
hg5k --create $OAR_FILE_NODES --version 2

Next, Hadoop needs to be installed on the nodes of the cluster. We need to provide the Hadoop binaries as downloaded from the Apache Hadoop project webpage. Here we use a version already stored in several sites of Grid5000.

Terminal.png node:
hg5k --bootstrap /home/mliroz/public/sw/hadoop/hadoop-2.6.0.tar.gz
Warning.png Warning

This is just an example and /home/mliroz/public/sw/hadoop/hadoop-2.6.0.tar.gz may not be present in all sites. It is recommended to download the desired version of Hadoop to your own user space and reference it from there.

Once we have installed Hadoop, we just need to initialize it (which configures Hadoop depending on both the parameters we specify and the characteristics of the machines in the cluster) and start the servers. We can do both steps with a single command.

Terminal.png node:
hg5k --initialize --start

Now we are ready to execute jobs. We are going to execute the randomtextwriter job, which is included in Hadoop's examples jar.

Terminal.png node:
hg5k --jarjob /home/mliroz/public/sw/hadoop/hadoop-mapreduce-examples-2.6.0.jar randomtextwriter /output

We may check that the job executed correctly by printing the contents of HDFS. The following command is a shortcut for this:

Terminal.png node:
hg5k --state files

If everything went OK, there should be as many files in the /output directory as reduce tasks in our MapReduce job.

When we are done, we should delete the cluster in order to remove all temporary files created during execution.

Terminal.png node:
hg5k --delete

hg5k Advanced Features

General Overview

hg5k allows several clusters to be used at the same time, provided their sets of nodes are disjoint. In order to identify each cluster, it internally uses a different id and serializes the corresponding Python class in a different directory under /tmp. If nothing is indicated, the last used cluster is the one loaded. However, the user can instruct the script to use the cluster corresponding to a specific identifier with the option --id.
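The id-based serialization scheme can be illustrated with a short sketch. This is not hadoop_g5k's actual code; the file layout and names here are assumptions chosen just to show the idea of an id-keyed store with a "last used" marker:

```python
import os
import pickle
import tempfile

# Demo base directory (hadoop_g5k uses its own layout under /tmp).
BASE = os.path.join(tempfile.gettempdir(), "hg5k_demo")

def save_cluster(cluster_id, cluster_obj):
    """Serialize a cluster object under its id and mark it as last used."""
    os.makedirs(BASE, exist_ok=True)
    with open(os.path.join(BASE, "cluster_%d" % cluster_id), "wb") as f:
        pickle.dump(cluster_obj, f)
    with open(os.path.join(BASE, "last"), "w") as f:
        f.write(str(cluster_id))

def load_cluster(cluster_id=None):
    """Load a cluster by id; with no id, load the last used one."""
    if cluster_id is None:
        with open(os.path.join(BASE, "last")) as f:
            cluster_id = int(f.read())
    with open(os.path.join(BASE, "cluster_%d" % cluster_id), "rb") as f:
        return pickle.load(f)

save_cluster(1, {"nodes": ["node-1", "node-2"]})
save_cluster(2, {"nodes": ["node-3", "node-4"]})
print(load_cluster()["nodes"])   # the last used cluster (id 2)
print(load_cluster(1)["nodes"])  # an explicit id, like --id 1
```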

Creation and Destruction

In the basic usage, just the set of nodes composing the cluster is passed to the script. Additional options can be used when creating a cluster:

  • --properties path_to_conf: It specifies a properties file (in INI format) that can be used to indicate general cluster properties, such as local and remote directory locations, ports, etc. The following example file contains the default values used by hg5k:
hadoop_base_dir = /tmp/hadoop
hadoop_conf_dir = /tmp/hadoop/conf
hadoop_logs_dir = /tmp/hadoop/logs
hadoop_temp_dir = /tmp/hadoop/temp
hdfs_port = 54310
mapred_port = 54311

local_base_conf_dir = ./conf
Note.png Note

/tmp is used by default because it is accessible by every user and it is usually stored in the biggest partition. In any case, the usage of NFS directories should be avoided as the different nodes of the cluster may overwrite the same files.

  • --version hadoop_version: It specifies the version of Hadoop to be used. Depending on the major version number, a different Python class adapted to the corresponding distribution is used. By default, the class compliant with the older versions of Hadoop (0.* and 1.*) is used.
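A properties file of the kind shown above can be read with Python's standard configparser. Note that the section name "[cluster]" used here is an assumption for the sake of a self-contained example; the actual section layout expected by hg5k may differ:

```python
import configparser
import io

# Inline copy of a fragment of the properties file shown above.
# The "[cluster]" section header is an assumption for this example.
properties = """
[cluster]
hadoop_base_dir = /tmp/hadoop
hadoop_conf_dir = /tmp/hadoop/conf
hdfs_port = 54310
"""

config = configparser.ConfigParser()
config.read_file(io.StringIO(properties))
print(config.get("cluster", "hadoop_base_dir"))  # /tmp/hadoop
print(config.getint("cluster", "hdfs_port"))     # 54310
```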

Cluster State

After creation, the state of the cluster can be obtained at any moment by using:

Terminal.png frontend:
hg5k --state [state_param]

Detailed state information can be obtained by passing an additional parameter to the --state option:

  • general: Shows general information about the cluster. If no state parameter is indicated, this option is used.
  • files: Shows the hierarchy of the files stored in the filesystem.
  • dfs: Shows the file system state.
  • dfsblocks: Shows the information about the blocks in the filesystem.
  • mrjobs: Shows the state of the MapReduce jobs.


Bootstrap

hg5k can be passed a Hadoop binary distribution, which is installed on all nodes in the directories specified in the properties file. Alternatively, it is also possible to use a previously created environment where a specific version of Hadoop is installed. In that case, the user should modify the properties file so that the installation directories point to the correct locations.

Initialization and Start

To start working, a Hadoop cluster needs to be configured (master and slaves, topology, etc.) and its distributed file system (dfs) formatted. To do so we use the option --initialize. In more detail, this option performs the following actions:

  • It copies the user-defined configuration specified in the local_base_conf_dir property of the configuration file to all nodes in the cluster. If the directory does not exist, nothing is done.
  • The slaves are taken from the nodes given in the --create option. The master is chosen among them. The topology is discovered automatically and the corresponding script created on the fly.
  • Hardware-dependent properties are configured individually for each node following the best practices described in [1]. This includes configuring the number of map and reduce slots and the memory allocated to each process depending on the characteristics of the machine.
  • The distributed file system is formatted.
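The hardware-dependent configuration step can be illustrated with a toy heuristic. This is an assumption for illustration, not hadoop_g5k's exact formula: it derives slot counts and per-task memory from a node's cores and RAM, in the spirit described above:

```python
# Illustrative heuristic only; the real tuning rules follow the best
# practices referenced in the text and may differ from this sketch.
def tune_node(num_cores, ram_mb, reserved_mb=1024):
    map_slots = max(1, num_cores - 1)      # leave one core for daemons
    reduce_slots = max(1, num_cores // 2)
    usable_mb = ram_mb - reserved_mb       # RAM left for task processes
    mem_per_task = usable_mb // (map_slots + reduce_slots)
    return {"mapred.tasktracker.map.tasks.maximum": map_slots,
            "mapred.tasktracker.reduce.tasks.maximum": reduce_slots,
            "task_memory_mb": mem_per_task}

# Example: an 8-core node with 16 GB of RAM.
print(tune_node(num_cores=8, ram_mb=16384))
```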

The cluster can also be returned to its original state by executing the option --clean. This will remove all the files created during cluster initialization and usage, including logs, dfs files, etc.

Finally, the --start option just starts the necessary servers: the namenode and datanodes for the file system, the jobtracker and tasktrackers for MapReduce in versions 0.* and 1.*, and the YARN servers in version 2.*.

The cluster should be started before being used.

In the same way the cluster can be stopped by using the --stop option.

Hadoop Properties

Hadoop properties are stored in a set of .xml files in the configuration directory. These properties can be changed by hg5k by using the following command:

Terminal.png frontend:
hg5k --changeconf prop1=value1 prop2=value2

A list of property names and the corresponding new values should be passed. hg5k will look for the specified properties in the configuration files and replace their values.
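Conceptually, --changeconf locates a property by name inside Hadoop's XML configuration files and replaces its value. A minimal sketch of that operation with the standard library (not hadoop_g5k's actual implementation):

```python
import xml.etree.ElementTree as ET

# A fragment in the Hadoop configuration file format.
conf = ET.fromstring("""
<configuration>
  <property><name>io.sort.mb</name><value>100</value></property>
  <property><name>io.sort.factor</name><value>10</value></property>
</configuration>
""")

def change_property(root, name, new_value):
    """Replace the value of the named property; return True if found."""
    for prop in root.findall("property"):
        if prop.find("name").text == name:
            prop.find("value").text = new_value
            return True
    return False  # property not present in this file

change_property(conf, "io.sort.mb", "500")
print(conf.findall("property")[0].find("value").text)  # 500
```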

Properties can also be read with the following command:

Terminal.png frontend:
hg5k --getconf prop1 prop2

Note that if the property is not specifically set, nothing will be returned. In this case, it should be assumed that the default value is used.


Job Execution

The basic usage of hg5k to execute jobs was described in the previous section. Additional options can also be provided. The general form of the command is the following:

Terminal.png frontend:
hg5k --jarjob program.jar job_options --lib_jars lib1.jar lib2.jar --node cluster_node

hg5k will copy the job jar and the list of libraries to the indicated node (the master if none is specified) and execute the job with the given parameters.

Alternatively, arbitrary Hadoop commands can be executed with the option --execute.

File Transfers

hg5k makes it easy to copy data efficiently to and from the distributed filesystem of the cluster. The options to be used are respectively --putindfs and --getfromdfs, which take as parameters the source and destination directories (one of them in the dfs and the other in the local filesystem).

Transfers of data to the cluster are performed efficiently by starting one thread on each node of the cluster and copying a subset of the files to that node.
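The idea of splitting the files among the nodes can be sketched as a simple round-robin assignment (an illustration of the principle, not hadoop_g5k's actual partitioning code):

```python
# Assign each file to one cluster node round-robin, so every node can
# upload its own subset to the dfs in parallel.
def partition_files(files, nodes):
    assignment = {node: [] for node in nodes}
    for i, f in enumerate(files):
        assignment[nodes[i % len(nodes)]].append(f)
    return assignment

files = ["part-%05d" % i for i in range(5)]
nodes = ["node-1", "node-2"]
print(partition_files(files, nodes))
```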

Finally, the statistics of the jobs can then be retrieved with the following command:

Terminal.png frontend:
hg5k --copyhistory local_path [job_id1 job_id2 ...]

It will find the statistics of the executions of the jobs with the specified ids (all executed jobs if none is indicated) and copy them to a local directory.

Test Automation

Test automation in hadoop_g5k can be done by using Execo's Engine class. A generic HadoopEngine is provided with hadoop_g5k. This class can be directly executed from the command line or extended and customized. Documentation about the class methods and related classes can be found in Read the Docs.

In order to use it, the following command should be executed:

Terminal.png frontend:
hadoop_engine <cluster> <num_nodes> <test_conf.ini>

This is the test_conf.ini file that will be used as an example:

test.summary_file = ./test/summary.csv
test.ds_summary_file = ./test/ds-summary.csv
test.stats_path = ./test/stats
test.output_path = ./test/output

[ds_parameters]
ds.class = hadoop_g5k.dataset.StaticDataset
ds.class.local_path = datasets/ds1
ds.dest = ${data_dir}

ds.size = 1073741824, 2147483648  # 1 | 2 GB

dfs.block.size = 67108864  # 64 MB
dfs.replication = 3

[xp_parameters]
io.sort.factor = 10, 100
io.sort.mb = 500

xp.combiner = true, false

xp.job = program.jar || ${xp.combiner} other_job_options ${xp.input} ${xp.output}
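Parameters with comma-separated values, like ds.size or io.sort.factor above, define several values each; the engine explores their combinations. Conceptually this is a cartesian product, which can be sketched with itertools (an illustration of the principle, not the engine's actual code):

```python
import itertools

# Multi-valued parameters taken from the example file above.
params = {
    "ds.size": ["1073741824", "2147483648"],
    "io.sort.factor": ["10", "100"],
    "xp.combiner": ["true", "false"],
}

# Every combination assigns one value to each parameter.
names = sorted(params)
combinations = [dict(zip(names, values))
                for values in itertools.product(*(params[n] for n in names))]
print(len(combinations))  # 2 * 2 * 2 = 8
```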

Test execution

The main workflow comprises two loops: an outer loop that traverses the different combinations of dataset parameters, cleaning the cluster and deploying the corresponding dataset for each of them; and an inner loop that traverses the combinations of experiment parameters and executes a Hadoop MapReduce job for each of them.

In order to let hadoop_engine know which parameters correspond to the datasets and which ones to the experiments, the parameters are divided into two sections: [ds_parameters] for the former and [xp_parameters] for the latter.


Test parameters

A Hadoop test has a set of general test parameters which define the global behaviour of the execution. These are the main properties used in the test:

  • test.summary_file and test.ds_summary_file: These properties indicate the paths of the files that will store the information of each executed experiment and created dataset.
  • test.stats_path: If specified, it indicates the path where the experiments' statistics will be copied.
  • test.output_path: If specified, it indicates the path where the experiments' output will be copied.

Dataset parameters

As mentioned before, some parameters are used to configure the dataset before deploying it. There are two types of parameters: general dataset parameters, which start with ds., and MapReduce parameters, which have arbitrary names, as they correspond to Hadoop properties. In the second case, these parameters are simply inserted in the Hadoop configuration files (dfs.block.size and dfs.replication in the given example). The main general parameters are the following:

  • ds.size: If specified, it indicates the desired size of the dataset deployment. It should be given in bytes.
  • ds.class: It specifies the class to be used for dataset deployment, which is loaded automatically. It should extend hadoop_g5k's class Dataset. Hadoop_g5k already provides two implementations:
    • StaticDataset: This class manages already generated datasets stored in the frontend. It uploads the files to the dfs with as much parallelization as possible.
    • DynamicDataset: This class dynamically generates a dataset by executing a MapReduce job.
  • ds.dest: The location of the dataset in the dfs.
  • ds.class.*: All the parameters starting like this are passed to the deployment class. Each class uses a different set of parameters.
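The forwarding of ds.class.* parameters to the deployment class can be sketched as a simple prefix filter (an illustration; parameter names are taken from the example file):

```python
# Parameters prefixed with "ds.class." are stripped of the prefix and
# handed to the deployment class; the rest are handled by the engine.
params = {
    "ds.class": "hadoop_g5k.dataset.StaticDataset",
    "ds.class.local_path": "datasets/ds1",
    "ds.size": "1073741824",
}

prefix = "ds.class."
class_params = {k[len(prefix):]: v for k, v in params.items()
                if k.startswith(prefix)}
print(class_params)  # {'local_path': 'datasets/ds1'}
```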

Experiment parameters

These comprise the parameters used in the execution of the experiment. Arbitrary Hadoop parameters can be used as before (io.sort.factor and io.sort.mb in the example). A special parameter should always be specified:

  • xp.job: It indicates the jar containing the job to be executed and its parameters separated by a double vertical line (||).


Macros

There are special values, called macros, that can be used in the parameter configuration. The general form of a macro is ${macro_name}. Macros are replaced either by internal variables of the engine or by other user-defined parameters. These are the macros defined in the engine:

  • ${data_base_dir}: The base dir for the datasets.
  • ${out_base_dir}: The base dir for the experiment outputs.
  • ${data_dir}: The dir of the used dataset.
  • ${comb_id}: The unique combination identifier.
  • ${ds_id}: The unique dataset identifier.
  • ${xp.input}: The experiment's input dir.
  • ${xp.output}: The experiment's output dir.

Macros referencing user defined parameters must follow certain rules:

  • A test parameter cannot reference a dataset or experiment parameter.
  • A dataset parameter cannot reference an experiment parameter.
  • Parameter definitions should not contain cycles, e.g., it is not possible to have xp.a = ${xp.b} other_stuff and xp.b = ${xp.a} other_stuff .

Macros make it possible to specify job parameters as a function of the dataset being used and the experiment being executed. In the given configuration file, for example, the job definition uses a user-defined parameter, ${xp.combiner}, and two internal variables, ${xp.input} and ${xp.output}.
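The substitution mechanism described above can be sketched as follows. This is an illustration, not the engine's actual code; it expands ${...} references repeatedly until none remain, which terminates because the rules above forbid cycles:

```python
import re

def expand(value, variables):
    """Replace ${name} macros until the string stops changing."""
    pattern = re.compile(r"\$\{([^}]+)\}")
    while True:
        new_value = pattern.sub(lambda m: variables[m.group(1)], value)
        if new_value == value:
            return new_value
        value = new_value

# Example values for user parameters and internal engine variables.
variables = {
    "xp.combiner": "true",
    "xp.input": "/data/ds1",
    "xp.output": "/out/comb_3",
}
job = "program.jar || ${xp.combiner} other_job_options ${xp.input} ${xp.output}"
print(expand(job, variables))
```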