In a Hadoop MapReduce program, the reducer receives an Iterable over all the values that share the same key. Most of the time, we iterate through the values once. But there are always exceptions. What if we want to iterate through the values twice?
The code below was tested on Hadoop 1.0.3.
0. An Example
We’ll use a simplified version of the example in How to Write a Simple MapReduce Program. Suppose we are given several files, each containing 5 key-value pairs. The key is a year in the range [2000, 2001], and the value is a randomly generated number. The task is to find the maximum number for each year. Note that each file may contain values for multiple years.
The script below (rand.sh) generates one input file.
#!/bin/bash
for i in {1..5}
do
    echo $(($RANDOM%2+2000)) $RANDOM
done
To generate the input, make the script executable and run it once per input file:
$ chmod +x rand.sh
$ mkdir input
$ ./rand.sh > input/1.txt
$ ./rand.sh > input/2.txt
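Before running the job, it can help to know the expected answer. The following plain-Java sketch computes the maximum number per year from lines in the "<year> <number>" format that rand.sh writes, so the MapReduce output can be compared against it. LocalMaxPerYear is a hypothetical helper for illustration, not part of the original program, and it assumes Java 8 or later.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical local checker: per-year maximum over lines "<year> <number>".
class LocalMaxPerYear {
    static Map<String, Long> maxPerYear(List<String> lines) {
        Map<String, Long> max = new TreeMap<>(); // sorted by year for readable output
        for (String line : lines) {
            String trimmed = line.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate blank lines
            }
            String[] tokens = trimmed.split("\\s+");
            long number = Long.parseLong(tokens[1]);
            // Store the number, or keep the larger one if the year is already present.
            max.merge(tokens[0], number, Math::max);
        }
        return max;
    }

    public static void main(String[] args) throws IOException {
        List<String> lines = new ArrayList<>();
        for (String file : args) {
            lines.addAll(Files.readAllLines(Paths.get(file), StandardCharsets.UTF_8));
        }
        System.out.println(maxPerYear(lines));
    }
}
```

For example, `java LocalMaxPerYear input/1.txt input/2.txt` prints a map such as `{2000=..., 2001=...}` that should agree with the maxima the MapReduce job is meant to find.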
1. The Wrong Code
One might think that iterating twice is easy: simply loop over the Iterable twice, as in the code below.
private static class MaxNumReducerWrong1 extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    public void reduce(Text pKey, Iterable<LongWritable> pValues, Context pContext)
            throws IOException, InterruptedException {
        System.out.println("----------" + pKey + "----------");
        for (LongWritable aNum : pValues) {
            System.out.println("first iteration: " + aNum);
        }
        for (LongWritable aNum : pValues) {
            System.out.println("second iteration: " + aNum);
        }
    }
}
However, this won’t work. Only the "first iteration" lines are printed: the second loop is never executed. In other words, the Iterable is exhausted after the first iteration.
Obtaining two separate Iterators by calling pValues.iterator() twice gives the same result.
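The behavior can be reproduced without Hadoop. This minimal sketch assumes, as observed above, that the reducer's Iterable hands out the same underlying iterator from every iterator() call. OneShotIterable and OneShotDemo are hypothetical names that mimic that behavior; this is not Hadoop's actual class.

```java
import java.util.Arrays;
import java.util.Iterator;

// Hypothetical stand-in for the reducer's values: iterator() always returns the
// same underlying iterator, so the elements can be traversed only once.
class OneShotIterable<T> implements Iterable<T> {
    private final Iterator<T> shared;

    OneShotIterable(Iterator<T> source) {
        this.shared = source;
    }

    @Override
    public Iterator<T> iterator() {
        return shared; // not a fresh iterator: once drained, it stays drained
    }
}

class OneShotDemo {
    // Returns how many elements the first and the second for-each loop see.
    static int[] countTwoPasses(Iterable<?> values) {
        int first = 0;
        for (Object ignored : values) {
            first++;
        }
        int second = 0;
        for (Object ignored : values) {
            second++;
        }
        return new int[] {first, second};
    }

    public static void main(String[] args) {
        Iterable<Long> values = new OneShotIterable<>(Arrays.asList(1L, 2L, 3L).iterator());
        System.out.println(Arrays.toString(countTwoPasses(values))); // prints [3, 0]
    }
}
```

Because a for-each loop calls iterator() on each pass, the second loop receives an iterator whose hasNext() is already false, which matches the empty second iteration in MaxNumReducerWrong1.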
2. Another Wrong Code
The natural workaround is to cache the values during the first iteration and then iterate over the cache, as in the code below.
private static class MaxNumReducerWrong2 extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    public void reduce(Text pKey, Iterable<LongWritable> pValues, Context pContext)
            throws IOException, InterruptedException {
        System.out.println("----------" + pKey + "----------");
        ArrayList<LongWritable> cache = new ArrayList<LongWritable>();
        for (LongWritable aNum : pValues) {
            System.out.println("first iteration: " + aNum);
            cache.add(aNum);
        }
        int size = cache.size();
        for (int i = 0; i < size; ++i) {
            System.out.println("second iteration " + i + ": " + cache.get(i));
        }
    }
}
But this doesn’t work either: every "second iteration" line prints the same value, namely the last value seen in the first iteration.
This indicates that the Iterable gives us the same object reference every time, refilled with a different value. Since we cache only the reference, all cached entries point to that one object, which at the end of the iteration holds the value of the last element.
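Object reuse can also be demonstrated in plain Java. In this sketch, MutableLong (a hypothetical stand-in for LongWritable) is refilled in place, and ReusingIterable (also hypothetical) returns that same instance from every next() call, which is the behavior described above. Unlike the Iterable observed in the first wrong example, it creates a fresh iterator on each call, so only the reuse effect is isolated.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical mutable holder standing in for LongWritable.
class MutableLong {
    private long value;

    long get() { return value; }

    void set(long v) { value = v; }

    @Override
    public String toString() { return Long.toString(value); }
}

// Hypothetical Iterable whose iterators return one shared MutableLong,
// overwritten with the next value on every next() call.
class ReusingIterable implements Iterable<MutableLong> {
    private final long[] data;
    private final MutableLong shared = new MutableLong();

    ReusingIterable(long... data) { this.data = data; }

    @Override
    public Iterator<MutableLong> iterator() {
        return new Iterator<MutableLong>() {
            private int index = 0;

            @Override
            public boolean hasNext() { return index < data.length; }

            @Override
            public MutableLong next() {
                shared.set(data[index++]); // refill the shared object in place
                return shared;
            }

            @Override
            public void remove() { throw new UnsupportedOperationException(); }
        };
    }
}

class ObjectReuseDemo {
    // Caches references, as MaxNumReducerWrong2 does.
    static List<MutableLong> cacheReferences(Iterable<MutableLong> values) {
        List<MutableLong> cache = new ArrayList<>();
        for (MutableLong v : values) {
            cache.add(v); // every entry is the same object
        }
        return cache;
    }

    public static void main(String[] args) {
        System.out.println(cacheReferences(new ReusingIterable(10, 20, 30))); // prints [30, 30, 30]
    }
}
```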
3. The Code That Works
We need to cache copies of the values rather than references to the reused object. Below is the code that works.
private static class MaxNumReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    public void reduce(Text pKey, Iterable<LongWritable> pValues, Context pContext)
            throws IOException, InterruptedException {
        System.out.println("----------" + pKey + "----------");
        ArrayList<LongWritable> cache = new ArrayList<LongWritable>();
        for (LongWritable aNum : pValues) {
            System.out.println("first iteration: " + aNum);
            LongWritable writable = new LongWritable();
            writable.set(aNum.get());
            cache.add(writable);
        }
        int size = cache.size();
        for (int i = 0; i < size; ++i) {
            System.out.println("second iteration: " + cache.get(i));
        }
    }
}
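The fix can be checked without Hadoop as well. In this sketch, ReusedLong and reusingValues are hypothetical stand-ins for a reused LongWritable and the reducer's values. The first pass copies the primitive value out of the reused object into a List<Long>, which plays the same role as the new LongWritable copies in MaxNumReducer, and the cached list can then be traversed as many times as needed.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical mutable value reused by the iterator, standing in for LongWritable.
class ReusedLong {
    long value;
}

class CopyCacheDemo {
    // Hypothetical value stream: every next() refills and returns the same ReusedLong.
    static Iterable<ReusedLong> reusingValues(final long... data) {
        final ReusedLong shared = new ReusedLong();
        return new Iterable<ReusedLong>() {
            @Override
            public Iterator<ReusedLong> iterator() {
                return new Iterator<ReusedLong>() {
                    private int index = 0;

                    @Override
                    public boolean hasNext() { return index < data.length; }

                    @Override
                    public ReusedLong next() {
                        shared.value = data[index++]; // overwrite the shared object
                        return shared;
                    }

                    @Override
                    public void remove() { throw new UnsupportedOperationException(); }
                };
            }
        };
    }

    // First pass: copy each value out of the reused object before it is overwritten.
    static List<Long> cacheCopies(Iterable<ReusedLong> values) {
        List<Long> cache = new ArrayList<>();
        for (ReusedLong v : values) {
            cache.add(v.value); // the primitive is copied, not the reference
        }
        return cache;
    }

    public static void main(String[] args) {
        List<Long> cache = cacheCopies(reusingValues(10, 20, 30));
        // Second pass over the cache, which can be repeated as often as needed.
        for (Long v : cache) {
            System.out.println("second iteration: " + v);
        }
    }
}
```

Caching plain Long values is enough here because only the number matters. For Writable types without a convenient copy constructor, Hadoop's WritableUtils.clone(orig, conf) in org.apache.hadoop.io makes a copy through serialization. Either way, the whole cache has to fit in memory.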
Here is another version, which uses an explicit Iterator and the LongWritable(long) constructor:
private static class MaxNumReducer2 extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    public void reduce(Text pKey, Iterable<LongWritable> pValues, Context pContext)
            throws IOException, InterruptedException {
        System.out.println("----------" + pKey + "----------");
        ArrayList<LongWritable> cache = new ArrayList<LongWritable>();
        Iterator<LongWritable> iterator = pValues.iterator();
        while (iterator.hasNext()) {
            LongWritable writable = iterator.next();
            System.out.println("MaxNumReducer2 first iteration: " + writable);
            cache.add(new LongWritable(writable.get()));
        }
        int size = cache.size();
        for (int i = 0; i < size; ++i) {
            System.out.println("MaxNumReducer2 second iteration: " + cache.get(i));
        }
    }
}
Running either reducer prints each value twice, and the second iteration now shows the same values, in the same order, as the first.
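Why iterate twice at all? One hypothetical case is a computation where every element needs a statistic of its whole group, such as expressing each value as a fraction of the key's maximum. In the sketch below, TwoPassDemo and fractionOfMax are illustrative names, and the values are assumed to be already-copied Long objects (not reused Writables) coming from an Iterable that may be single-use, so the input is read exactly once and the second pass runs over the cache.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class TwoPassDemo {
    // Hypothetical example: each value as a fraction of its group's maximum.
    // Assumes nonnegative values (rand.sh produces 0..32767).
    static List<Double> fractionOfMax(Iterable<Long> values) {
        List<Long> cache = new ArrayList<>();
        long max = Long.MIN_VALUE;
        // First pass: the only pass over the input; cache and track the maximum.
        for (Long v : values) {
            cache.add(v);
            max = Math.max(max, v);
        }
        // Second pass: over the cache, now that the maximum is known.
        List<Double> fractions = new ArrayList<>();
        for (long v : cache) {
            fractions.add(max == 0 ? 0.0 : (double) v / max);
        }
        return fractions;
    }

    public static void main(String[] args) {
        System.out.println(fractionOfMax(Arrays.asList(8000L, 16000L, 4000L))); // prints [0.5, 1.0, 0.25]
    }
}
```

If only the maximum itself is needed, as in the original task, a single pass without any cache is sufficient.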
4. The Complete Code
The complete code is listed below.
package test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class HadoopReducerIteratorTest {
    // Parses "<year> <number>" lines and emits (year, number).
    private static class MaxNumMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        @Override
        public void map(LongWritable pKey, Text pValue, Context pContext)
                throws IOException, InterruptedException {
            String line = pValue.toString();
            String[] tokens = line.split(" ");
            Long number = Long.parseLong(tokens[1]);
            pContext.write(new Text(tokens[0]), new LongWritable(number));
        }
    }

    // Wrong: the second loop sees no values.
    private static class MaxNumReducerWrong1 extends Reducer<Text, LongWritable, Text, LongWritable> {
        @Override
        public void reduce(Text pKey, Iterable<LongWritable> pValues, Context pContext)
                throws IOException, InterruptedException {
            System.out.println("----------" + pKey + "----------");
            for (LongWritable aNum : pValues) {
                System.out.println("first iteration: " + aNum);
            }
            for (LongWritable aNum : pValues) {
                System.out.println("second iteration: " + aNum);
            }
        }
    }

    // Wrong: caches references to a single reused object.
    private static class MaxNumReducerWrong2 extends Reducer<Text, LongWritable, Text, LongWritable> {
        @Override
        public void reduce(Text pKey, Iterable<LongWritable> pValues, Context pContext)
                throws IOException, InterruptedException {
            System.out.println("----------" + pKey + "----------");
            ArrayList<LongWritable> cache = new ArrayList<LongWritable>();
            for (LongWritable aNum : pValues) {
                System.out.println("first iteration: " + aNum);
                cache.add(aNum);
            }
            int size = cache.size();
            for (int i = 0; i < size; ++i) {
                System.out.println("second iteration " + i + ": " + cache.get(i));
            }
        }
    }

    // Works: caches copies made with set().
    private static class MaxNumReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        @Override
        public void reduce(Text pKey, Iterable<LongWritable> pValues, Context pContext)
                throws IOException, InterruptedException {
            System.out.println("----------" + pKey + "----------");
            ArrayList<LongWritable> cache = new ArrayList<LongWritable>();
            for (LongWritable aNum : pValues) {
                System.out.println("first iteration: " + aNum);
                LongWritable writable = new LongWritable();
                writable.set(aNum.get());
                cache.add(writable);
            }
            int size = cache.size();
            for (int i = 0; i < size; ++i) {
                System.out.println("second iteration: " + cache.get(i));
            }
        }
    }

    // Works: explicit Iterator, copies made with the LongWritable(long) constructor.
    private static class MaxNumReducer2 extends Reducer<Text, LongWritable, Text, LongWritable> {
        @Override
        public void reduce(Text pKey, Iterable<LongWritable> pValues, Context pContext)
                throws IOException, InterruptedException {
            System.out.println("----------" + pKey + "----------");
            ArrayList<LongWritable> cache = new ArrayList<LongWritable>();
            Iterator<LongWritable> iterator = pValues.iterator();
            while (iterator.hasNext()) {
                LongWritable writable = iterator.next();
                System.out.println("MaxNumReducer2 first iteration: " + writable);
                cache.add(new LongWritable(writable.get()));
            }
            int size = cache.size();
            for (int i = 0; i < size; ++i) {
                System.out.println("MaxNumReducer2 second iteration: " + cache.get(i));
            }
        }
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.out.println("Number of arguments: " + args.length);
            System.out.println("Usage: HadoopReducerIteratorTest <input folder path> <output folder path>");
            System.exit(1);
        }
        Job job = new Job();
        job.setJarByClass(HadoopReducerIteratorTest.class);
        job.setJobName("Max Number of Year");
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(MaxNumMapper.class);
        // Uncomment exactly one reducer to try it.
        // job.setReducerClass(MaxNumReducerWrong1.class);
        // job.setReducerClass(MaxNumReducerWrong2.class);
        job.setReducerClass(MaxNumReducer.class);
        // job.setReducerClass(MaxNumReducer2.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
One comment on “Hadoop Reducer Iterable: How to Iterate Twice”
What if the Iterable pValues is huge and cannot fit in main memory?