My understanding of how Map Reduce works

Map/Reduce

A picture is worth a thousand words. Well, that really depends on how good the picture is, but in my search for an understanding of how map/reduce works, I found the following set of pictures, and they went further than any others in helping me get a better grasp of the process.

In this simple word count example, a 200MB text file containing some sentences is initially loaded into Hadoop HDFS. Hadoop splits it into three 64MB (default) blocks and one remaining 8MB block. Then, depending on how many data nodes (machines) you have in your cluster, these blocks get distributed across the data nodes. In this post I have not investigated the exact mechanism of how hadoop distributes the data file, which I intend to do later. For simplicity's sake, we can assume that we have four data nodes in the cluster and that hadoop distributes one block to each data node.

When the map/reduce job runs, hadoop will also distribute your mapper and reducer, which we will examine shortly, to each of the nodes, so that each block gets a mapper and a reducer and all the blocks get processed in parallel. Awesome! Now I understand what they mean when they say hadoop takes your logic to the data rather than taking your data to the logic. This reduces network traffic, as the logic is much smaller than the data it would otherwise have to push through the network.

Java is the default language for writing Map/Reduce programs, but you can write them in other languages using the Hadoop Streaming API. However, to understand the protocol, it's better to look at a program written in Java.




Mapper

The mapper is a function you write to process a single record from the source data. In this word count example, a record is a single line from one of the 64MB (or smaller) blocks of the 200MB text file, presented as a key/value pair by the hadoop RecordReader. The key is the byte offset of the current line and the value is the entire line as text. So your mapper is called by the hadoop framework once for each line in the text file.
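
For example, if a block started with the two lines below, the RecordReader would hand your mapper key/value pairs along these lines (the byte offsets here are purely illustrative):

  (0,  "the quick brown fox")
  (20, "the lazy dog")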

You write a class for the mapper by inheriting from the generic Mapper type, imported from the org.apache.hadoop.mapreduce package. You then override its map method with your own logic. In the map method you write the logic to process a single record from the source data; in the case of the word count program, this is a single line from the text file. The map method takes three parameters: the first is the key, the second is the value and the third is the output collection where the results of the mapper are stored.

In the word count program, the key is the byte offset of each line, and the value is the entire line of the text file starting at that byte offset. The data types of these parameters are hadoop wrapper classes. So if a key is of the primitive type int, it must be represented by the wrapper type IntWritable; if it's a long, it would be LongWritable, and so on. The value, which is a line of text of type string, is represented by the wrapper type Text. The collection parameter is represented by the type Context.
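
As a minimal standalone sketch of how these wrapper types behave (this snippet is not part of the word count program itself; the class name is made up for illustration):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

public class WritableSketch {
  public static void main(String[] args) {
    IntWritable count = new IntWritable(1);        // wraps a primitive int
    LongWritable offset = new LongWritable(132L);  // wraps a primitive long
    Text line = new Text("the quick brown fox");   // wraps a string

    int c = count.get();           // back to the primitive value
    String s = line.toString();    // back to a plain java String
    System.out.println(s + " -> " + c + " (line offset " + offset.get() + ")");
  }
}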

So in the word count example, the RecordReader in the hadoop framework reads a line from a text file block, converts it into a key/value pair, and then calls the map method of your mapper class, passing the key/value pair in as the first and second parameters respectively. Here is the mapper code.

public static class TokenizerMapper
     extends Mapper<Object, Text, Text, IntWritable> {

  private final static IntWritable one = new IntWritable(1);
  private Text word = new Text();

  // Called once per line in the block: key is the byte offset,
  // value is the line of text.
  public void map(Object key, Text value, Context context
                  ) throws IOException, InterruptedException {
    StringTokenizer itr = new StringTokenizer(value.toString());
    while (itr.hasMoreTokens()) {
      // Emit (word, 1) for every token found in the line.
      word.set(itr.nextToken());
      context.write(word, one);
    }
  }
}

The StringTokenizer breaks the value parameter into separate words, which are then iterated over, and each word is added to the output context with a value of 1. That's all the mapper has to do in the word count program. The framework then performs shuffling and sorting, which essentially prepares an intermediate data set to be consumed by the reducer.
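
Using the two illustrative lines from the mapper section, the mappers would write the following key/value pairs to the context:

  (the, 1)
  (quick, 1)
  (brown, 1)
  (fox, 1)
  (the, 1)
  (lazy, 1)
  (dog, 1)

Note that "the" appears twice; the mapper does no aggregation of its own, it simply emits a 1 for every occurrence it sees.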

The following picture helps in understanding the shuffling, sorting and reducing process.



Shuffle and Sort

The result of the mappers is an intermediate data set for every data block, held in a Context object. For the word count program, the shuffle and sort process, which is handled by the framework, first groups the intermediate data set by unique word, collecting the 1s emitted by the mappers for each word into a list of values. At this stage the unique words become the keys and the lists of 1s become the values. The framework then sorts these unique keys prior to calling the reducers.
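
For the same illustrative lines, the shuffle and sort step would turn the mapper output into something like the following, with the keys in sorted order:

  (brown, [1])
  (dog, [1])
  (fox, [1])
  (lazy, [1])
  (quick, [1])
  (the, [1, 1])

Each reducer call then receives one of these keys together with its list of 1s.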

Reducer

The reducer is also a function you write, in this case to process each unique word record from the above result set. The reducer is called by the framework once for each unique word. Depending on your cluster configuration and the number of data nodes in the cluster, the reducers may get distributed across the nodes so that, just like the mappers, they can run in parallel over the unique words. Here is the reducer code.

public static class IntSumReducer
     extends Reducer<Text, IntWritable, Text, IntWritable> {

  private IntWritable result = new IntWritable();

  // Called once per unique word; values holds the 1s collected for that word.
  public void reduce(Text key, Iterable<IntWritable> values,
                     Context context
                     ) throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable val : values) {
      sum += val.get();
    }
    result.set(sum);
    // Emit (word, total count).
    context.write(key, result);
  }
}

The reducer is a class you write by inheriting from the generic Reducer type, imported from the org.apache.hadoop.mapreduce package. You then override its reduce method and write your own logic to process each unique word sent to it by the framework. The reduce method takes three parameters, the first of which is the key. The key in this instance is a unique word resulting from the mapping process, represented by the hadoop type Text, which is the wrapper type for string. The second parameter is a collection of ints, each represented by the hadoop wrapper type IntWritable. This collection contains an element with the value 1 for every occurrence of the key word found in the source data file. The final parameter is the output collection, represented by the class Context, where the final result of the reduce process is stored.

In the above word count example, the reducer simply iterates over the values collection, summing the 1s together to produce a final total count for the unique word represented by the key parameter.
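
For the illustrative lines used above, the reducer would therefore write the following final pairs to the context:

  brown 1
  dog 1
  fox 1
  lazy 1
  quick 1
  the 2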

Putting it all together in the main method

The mapper and reducer can be contained in a single class, which can then be submitted as a map/reduce job in a jar file. Here is the code for the container class with the main method.

package org.apache.hadoop.examples;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

  /* the mapper goes here */

  /* the reducer goes here */

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: wordcount <in> <out>");
      System.exit(2);
    }
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

The word count program takes two parameters in the main method's String[] args parameter. The first is the input path where the text file resides and the second is the output path where the file containing the final result is created. Note that one of the requirements of the framework is that the output path must not already exist, otherwise it will throw an error. A new job is created by instantiating the Job class, imported from the org.apache.hadoop.mapreduce package. This job is then configured as shown above using a set of methods on the Job class. Notice that the reducer class is also registered as the combiner, which lets hadoop pre-aggregate the mapper output on each node before it is shuffled across the network. The input and output paths passed into the main method are given to the static methods of the FileInputFormat and FileOutputFormat classes. Finally, the program waits until hadoop completes the map/reduce job.
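
To submit the job you would package the class into a jar and run it with the hadoop jar command, along the lines of hadoop jar wordcount.jar org.apache.hadoop.examples.WordCount /user/me/input /user/me/output, where the jar name and paths are of course just placeholders. Assuming the job completes successfully, the output path will typically contain a part file (usually named something like part-r-00000) holding one word and its total count per line, much like the reducer output shown earlier.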
