Thursday 14 February 2013

Writing MapReduce Program on Hadoop - Context

Map and Reduce

MapReduce works by breaking the processing into two phases: the map phase and the reduce phase. Each phase takes key-value pairs as input and produces key-value pairs as output, and the types of the keys and values can be chosen by the programmer.

Data flow in the Map and Reduce:
Input ==> Map ==> Mapper Output ==> Sort and shuffle ==> Reduce ==> Final Output
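
For example, a hypothetical run of the job below would flow like this (illustrative values only; the Mapper in this post treats each whole input line as one keyword):

[code]
Input lines:       hadoop
                   spark
                   hadoop

Map output:        (hadoop, 1), (spark, 1), (hadoop, 1)

Sort and shuffle:  (hadoop, [1, 1]), (spark, [1])

Reduce output:     (hadoop, 2), (spark, 1)
[/code]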

Steps to Write a Hadoop MapReduce Program in Java

A MapReduce program needs three things: a Mapper, a Reducer and some code to run the job (here we will call it the Invoker).

1). Create the Map class (any name). The map function is represented by org.apache.hadoop.mapreduce.Mapper, which declares a map() method that we override.
[code lang="java"]import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class Map extends Mapper<;LongWritable, Text, Text,IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException {
word.set(value.toString());
context.write(word, one);
}
}

[/code]
Explanation:
The Mapper class is a generic class with four formal type parameters (input key, input value, output key and output value). Here the input key is LongWritable (Hadoop's representation of long), the input value is Text (Hadoop's representation of String), the output key is Text (the keyword) and the output value is IntWritable (Hadoop's representation of int). These Hadoop datatypes correspond to the usual Java types, except that they are optimized for network serialization.
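
To see why these Writable types exist, here is a small standalone sketch (the class name WritableDemo and the values are just for illustration, not part of the job) that serializes a Text and an IntWritable to a byte stream and reads them back, which is essentially what Hadoop does when it ships intermediate data across the network:

[code lang="java"]import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class WritableDemo {
    public static void main(String[] args) throws IOException {
        Text word = new Text("hadoop");          // Hadoop's wrapper around String
        IntWritable count = new IntWritable(1);  // Hadoop's wrapper around int

        // Serialize both values into a compact byte stream.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        word.write(out);
        count.write(out);

        // Deserialize them back from the same bytes.
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
        Text wordCopy = new Text();
        IntWritable countCopy = new IntWritable();
        wordCopy.readFields(in);
        countCopy.readFields(in);

        System.out.println(wordCopy + " : " + countCopy.get()); // prints "hadoop : 1"
    }
}
[/code]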

2). Create the Reducer class (any name). The reduce function is represented by org.apache.hadoop.mapreduce.Reducer, which declares a reduce() method that we override.
[code lang="java"]import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.Iterator;

public class Reduce extends Reducer<Text, IntWritable, Text,IntWritable> {
@Override
protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
int sum = 0;
for(IntWritable intWritable : values){
sum += intWritable.get();
}
context.write(key, new IntWritable(sum));
}
}
[/code]
Explanation:
The Reducer class is a generic class with four formal type parameters (input key, input value, output key and output value). Here the input key and input value types must match the Mapper's output types; the output key is Text (the keyword) and the output value is IntWritable (the number of occurrences).
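
For instance, if the mappers emitted (hadoop, 1) three times, the framework groups those values by key and a single reduce call would look like this (illustrative values only):

[code]
reduce("hadoop", [1, 1, 1])  ==>  context.write("hadoop", 3)
[/code]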

3). With the Map and Reduce implementations ready, we need the Invoker to configure the Hadoop job and launch the MapReduce program.
[code lang="java"]import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount{
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
configuration.set("fs.default.name", "hdfs://localhost:10011");
configuration.set("mapred.job.tracker","localhost:10012");

Job job = new Job(configuration, "Word Count");

job.setJarByClass(WordCount.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setInputFormatClass(org.apache.hadoop.mapreduce.lib.input.TextInputFormat.class);
job.setOutputFormatClass(org.apache.hadoop.mapreduce.lib.output.TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

//Submit the job to the cluster and wait for it to finish.
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
[/code]
4). Compile the code using the following commands:
mkdir WordCount
javac -classpath ${HADOOP_HOME}/hadoop-0.20.2+228-core.jar -d WordCount path/*.java

5). Create the jar using the following command:
jar -cvf ~/WordCount.jar -C WordCount/ .

6). Create the input files in the local file system

e.g.: mkdir /home/user1/wordcount/input
cd /home/user1/wordcount/input
gedit file01
gedit file02

and so on..
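
For example, the input files could each contain a few keywords, one per line (hypothetical sample content, since the Mapper above treats every line as a single keyword):

[code]
echo -e "hadoop\nspark\nhadoop" > file01
echo -e "hive\nhadoop" > file02
[/code]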

7). Copy the input files from the local file system to HDFS
$HADOOP_HOME/bin/hadoop fs -copyFromLocal ~/wordcount/input/file01 /home/user1/dfs/input/file01
$HADOOP_HOME/bin/hadoop fs -copyFromLocal ~/wordcount/input/file02 /home/user1/dfs/input/file02

8). Execute the jar as follows:
$HADOOP_HOME/bin/hadoop jar WordCount.jar WordCount /home/user1/dfs/input /home/user1/dfs/output

9). After the execution completes, the following command lists the output files generated by the reducers:
$HADOOP_HOME/bin/hadoop fs -ls /home/user1/dfs/output/

10). To view the output, use the following commands:
$HADOOP_HOME/bin/hadoop fs -cat hdfs:///home/user1/dfs/output/part-r-00000
$HADOOP_HOME/bin/hadoop fs -cat hdfs:///home/user1/dfs/output/part-r-00001
$HADOOP_HOME/bin/hadoop fs -cat hdfs:///home/user1/dfs/output/part-r-00002

and so on...

Upcoming Post: Using Distributed Cache in Java Hadoop MapReduce.
