The reducer in a Hadoop MapReduce program receives an Iterable of all the values that share the same key. Most of the time, we iterate through the values only once. But there are always exceptions: what if we want to iterate over the values twice?

The code shown below was tested on Hadoop 1.0.3.

0. An Example

We’ll use a simplified version of the example in How to Write a Simple MapReduce Program. Suppose we are given several files, each containing 5 key-value pairs. The key is a year in the range [2000, 2001], and the value is a randomly generated number. The task is to find the maximum number for each year. Note that a single file may contain values for multiple years.
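To make the task concrete before bringing Hadoop in, here is a minimal plain-Java sketch of the same computation on a handful of lines. The class name MaxPerYearSketch and the sample lines are made up for illustration; only the "<year> <number>" line format comes from the description above.

```java
import java.util.Map;
import java.util.TreeMap;

public class MaxPerYearSketch {

    // Parses lines of the form "<year> <number>" and keeps the largest number per year.
    static Map<String, Long> maxPerYear(String[] lines) {
        Map<String, Long> max = new TreeMap<String, Long>();
        for (String line : lines) {
            String[] tokens = line.trim().split("\\s+");
            String year = tokens[0];
            long number = Long.parseLong(tokens[1]);
            Long current = max.get(year);
            if (current == null || number > current) {
                max.put(year, number);
            }
        }
        return max;
    }

    public static void main(String[] args) {
        // Made-up sample lines in the same format as the generated input files.
        String[] lines = { "2000 17", "2001 4021", "2000 923", "2001 88", "2000 5" };
        System.out.println(maxPerYear(lines)); // prints "{2000=923, 2001=4021}"
    }
}
```

In the MapReduce version, the grouping by year is done by the framework, and the reducer only sees the values for one year at a time.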

Below is a script, saved as rand.sh, that generates the contents of one input file.

#!/bin/bash
for i in {1..5}
do
   echo $(($RANDOM%2+2000)) $RANDOM
done

To generate the input, use the commands below.

$ mkdir input
$ ./rand.sh > input/1.txt
$ ./rand.sh > input/2.txt

1. The Wrong Code

One might think that iterating over the Iterable twice is easy: we simply loop over it twice, as in the code below.

private static class MaxNumReducerWrong1 extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    public void reduce(Text pKey, Iterable<LongWritable> pValues, Context pContext)
            throws IOException, InterruptedException {
        System.out.println("----------" + pKey + "----------");
        // First pass over the values for this key.
        for (LongWritable aNum : pValues) {
            System.out.println("first iteration: " + aNum);
        }
        // Second pass over the very same Iterable.
        for (LongWritable aNum : pValues) {
            System.out.println("second iteration: " + aNum);
        }
    }
}

However, this won’t work. Only the “first iteration” lines show up in the output.

Basically, the body of the second loop is never executed. In other words, the Iterable is exhausted after the first iteration.

Obtaining two Iterators from the Iterable and walking them one after the other gives the same result.
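One way to picture this behaviour, without claiming it is how Hadoop implements it, is an Iterable whose iterator() hands back the same underlying iterator on every call. The plain-Java sketch below models that assumption; SinglePassIterable and countTwoPasses are hypothetical names introduced only for this illustration.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class SinglePassIterableSketch {

    // Hypothetical stand-in for the reducer's values: iterator() always returns
    // the same underlying iterator, so the data can be consumed only once.
    static class SinglePassIterable<T> implements Iterable<T> {
        private final Iterator<T> shared;

        SinglePassIterable(List<T> values) {
            this.shared = values.iterator();
        }

        @Override
        public Iterator<T> iterator() {
            return shared; // no fresh iterator, unlike java.util collections
        }
    }

    // Runs two consecutive for-each loops and reports how many elements each one saw.
    static int[] countTwoPasses(Iterable<Long> values) {
        int first = 0;
        for (Long ignored : values) {
            first++;
        }
        int second = 0;
        for (Long ignored : values) {
            second++;
        }
        return new int[] { first, second };
    }

    public static void main(String[] args) {
        List<Long> data = Arrays.asList(3L, 7L, 5L);
        int[] counts = countTwoPasses(new SinglePassIterable<Long>(data));
        // prints "first pass: 3, second pass: 0"
        System.out.println("first pass: " + counts[0] + ", second pass: " + counts[1]);
    }
}
```

Passing the plain List itself to countTwoPasses would report 3 for both passes, because a List creates a new iterator on each call.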

2. Another Wrong Code

The next idea is to cache the values during the first iteration and then iterate over the cache, as in the code below.

private static class MaxNumReducerWrong2 extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    public void reduce(Text pKey, Iterable<LongWritable> pValues, Context pContext)
            throws IOException, InterruptedException {
        System.out.println("----------" + pKey + "----------");
        ArrayList<LongWritable> cache = new ArrayList<LongWritable>();
        for (LongWritable aNum : pValues) {
            System.out.println("first iteration: " + aNum);
            // Stores the reference handed out by the Iterable, not a copy.
            cache.add(aNum);
        }
        int size = cache.size();
        for (int i = 0; i < size; ++i) {
            System.out.println("second iteration " + i + ": " + cache.get(i));
        }
    }
}

But this doesn’t work either. Every line of the second iteration prints the same value, namely the last one seen in the first iteration.

This indicates that the Iterable gives us the same object reference every time, only with a different value stored in it. Since we cache the reference, every cached entry points to that one object. At the end of the iteration the object holds the value of the last element, so all cached references show the value of the last element.
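The effect is easy to reproduce outside Hadoop. The sketch below is plain Java under stated assumptions: Box is a hypothetical mutable holder playing the role of LongWritable, and ReusingIterable is a hypothetical iterable that refills one shared Box on every call to next(). Neither is Hadoop code; they only imitate the object reuse described above.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

public class ReusedObjectSketch {

    // Hypothetical mutable holder, playing the role of LongWritable.
    static class Box {
        long value;

        @Override
        public String toString() {
            return Long.toString(value);
        }
    }

    // Hypothetical iterable that refills one shared Box on every next() call.
    static class ReusingIterable implements Iterable<Box> {
        private final long[] data;

        ReusingIterable(long... data) {
            this.data = data;
        }

        @Override
        public Iterator<Box> iterator() {
            return new Iterator<Box>() {
                private final Box shared = new Box();
                private int index = 0;

                @Override
                public boolean hasNext() {
                    return index < data.length;
                }

                @Override
                public Box next() {
                    if (!hasNext()) {
                        throw new NoSuchElementException();
                    }
                    shared.value = data[index++]; // overwrite, do not allocate
                    return shared;
                }

                @Override
                public void remove() {
                    throw new UnsupportedOperationException();
                }
            };
        }
    }

    // Caches the references exactly as the second wrong reducer does.
    static List<Box> cacheReferences(Iterable<Box> values) {
        List<Box> cache = new ArrayList<Box>();
        for (Box b : values) {
            cache.add(b);
        }
        return cache;
    }

    public static void main(String[] args) {
        // All three entries are the same object, which now holds the last value.
        System.out.println(cacheReferences(new ReusingIterable(3, 7, 5))); // prints "[5, 5, 5]"
    }
}
```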

3. The Code That Works

Basically, we need to cache a copy of each object rather than the reference. Below is the code that works.

private static class MaxNumReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    public void reduce(Text pKey, Iterable<LongWritable> pValues, Context pContext)
            throws IOException, InterruptedException {
        System.out.println("----------" + pKey + "----------");
        ArrayList<LongWritable> cache = new ArrayList<LongWritable>();
        for (LongWritable aNum : pValues) {
            System.out.println("first iteration: " + aNum);
            // Copy the current value into a new LongWritable before caching it.
            LongWritable writable = new LongWritable();
            writable.set(aNum.get());
            cache.add(writable);
        }
        int size = cache.size();
        for (int i = 0; i < size; ++i) {
            System.out.println("second iteration: " + cache.get(i));
        }
    }
}

Here is another version that uses an Iterator explicitly:

private static class MaxNumReducer2 extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    public void reduce(Text pKey, Iterable<LongWritable> pValues, Context pContext)
            throws IOException, InterruptedException {
        System.out.println("----------" + pKey + "----------");
        ArrayList<LongWritable> cache = new ArrayList<LongWritable>();
        Iterator<LongWritable> iterator = pValues.iterator();
        while (iterator.hasNext()) {
            LongWritable writable = iterator.next();
            System.out.println("MaxNumReducer2 first iteration: " + writable);
            // Cache a new LongWritable holding the current value, not the reference.
            cache.add(new LongWritable(writable.get()));
        }
        int size = cache.size();
        for (int i = 0; i < size; ++i) {
            System.out.println("MaxNumReducer2 second iteration: " + cache.get(i));
        }
    }
}

Running either reducer prints every value in the first iteration and the same values again in the second iteration.
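The copy-before-caching idea can also be tried in isolation, without Hadoop. The sketch below is plain Java under stated assumptions: Box is a hypothetical stand-in for LongWritable, the loop that overwrites one shared Box imitates the object reuse seen in the reducer, and gapsToMax is a made-up example of a computation that genuinely needs two passes.

```java
import java.util.ArrayList;
import java.util.List;

public class CopyBeforeCachingSketch {

    // Hypothetical mutable holder, playing the role of LongWritable.
    static class Box {
        long value;

        Box(long value) {
            this.value = value;
        }
    }

    // Imitates the framework: one shared Box is overwritten for each input value,
    // and a copy of its current contents is cached at every step.
    static List<Box> cacheCopies(long[] data) {
        Box shared = new Box(0);
        List<Box> cache = new ArrayList<Box>();
        for (long d : data) {
            shared.value = d;                 // the same object is reused
            cache.add(new Box(shared.value)); // copy the contents, not the reference
        }
        return cache;
    }

    // Two passes over the cache: find the maximum, then each value's distance from it.
    static List<Long> gapsToMax(List<Box> cache) {
        long max = Long.MIN_VALUE;
        for (Box b : cache) {
            max = Math.max(max, b.value);
        }
        List<Long> gaps = new ArrayList<Long>();
        for (Box b : cache) {
            gaps.add(max - b.value);
        }
        return gaps;
    }

    public static void main(String[] args) {
        System.out.println(gapsToMax(cacheCopies(new long[] { 3, 7, 5 }))); // prints "[4, 0, 2]"
    }
}
```

Because each cached Box is a separate object, the second pass still sees 3, 7 and 5 rather than three copies of the last value.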

4. The Complete Code

The complete code is listed below.

package test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class HadoopReducerIteratorTest {

    private static class MaxNumMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        @Override
        public void map(LongWritable pKey, Text pValue, Context pContext)
                throws IOException, InterruptedException {
            // Each input line has the form "<year> <number>".
            String line = pValue.toString();
            String[] tokens = line.split(" ");
            long number = Long.parseLong(tokens[1]);
            pContext.write(new Text(tokens[0]), new LongWritable(number));
        }
    }

    private static class MaxNumReducerWrong1 extends Reducer<Text, LongWritable, Text, LongWritable> {
        @Override
        public void reduce(Text pKey, Iterable<LongWritable> pValues, Context pContext)
                throws IOException, InterruptedException {
            System.out.println("----------" + pKey + "----------");
            // First pass over the values for this key.
            for (LongWritable aNum : pValues) {
                System.out.println("first iteration: " + aNum);
            }
            // Second pass over the very same Iterable.
            for (LongWritable aNum : pValues) {
                System.out.println("second iteration: " + aNum);
            }
        }
    }

    private static class MaxNumReducerWrong2 extends Reducer<Text, LongWritable, Text, LongWritable> {
        @Override
        public void reduce(Text pKey, Iterable<LongWritable> pValues, Context pContext)
                throws IOException, InterruptedException {
            System.out.println("----------" + pKey + "----------");
            ArrayList<LongWritable> cache = new ArrayList<LongWritable>();
            for (LongWritable aNum : pValues) {
                System.out.println("first iteration: " + aNum);
                // Stores the reference handed out by the Iterable, not a copy.
                cache.add(aNum);
            }
            int size = cache.size();
            for (int i = 0; i < size; ++i) {
                System.out.println("second iteration " + i + ": " + cache.get(i));
            }
        }
    }

    private static class MaxNumReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        @Override
        public void reduce(Text pKey, Iterable<LongWritable> pValues, Context pContext)
                throws IOException, InterruptedException {
            System.out.println("----------" + pKey + "----------");
            ArrayList<LongWritable> cache = new ArrayList<LongWritable>();
            for (LongWritable aNum : pValues) {
                System.out.println("first iteration: " + aNum);
                // Copy the current value into a new LongWritable before caching it.
                LongWritable writable = new LongWritable();
                writable.set(aNum.get());
                cache.add(writable);
            }
            int size = cache.size();
            for (int i = 0; i < size; ++i) {
                System.out.println("second iteration: " + cache.get(i));
            }
        }
    }

    private static class MaxNumReducer2 extends Reducer<Text, LongWritable, Text, LongWritable> {
        @Override
        public void reduce(Text pKey, Iterable<LongWritable> pValues, Context pContext)
                throws IOException, InterruptedException {
            System.out.println("----------" + pKey + "----------");
            ArrayList<LongWritable> cache = new ArrayList<LongWritable>();
            Iterator<LongWritable> iterator = pValues.iterator();
            while (iterator.hasNext()) {
                LongWritable writable = iterator.next();
                System.out.println("MaxNumReducer2 first iteration: " + writable);
                // Cache a new LongWritable holding the current value, not the reference.
                cache.add(new LongWritable(writable.get()));
            }
            int size = cache.size();
            for (int i = 0; i < size; ++i) {
                System.out.println("MaxNumReducer2 second iteration: " + cache.get(i));
            }
        }
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.out.println("Number of arguments: " + args.length);
            System.out.println("Usage: MaxNumHadoop <input folder path> <output folder path>");
            System.exit(1);
        }
        Job job = new Job();
        job.setJarByClass(HadoopReducerIteratorTest.class);
        job.setJobName("Max Number of Year");

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(MaxNumMapper.class);
        // Uncomment exactly one reducer to try the corresponding variant.
//        job.setReducerClass(MaxNumReducerWrong1.class);
//        job.setReducerClass(MaxNumReducerWrong2.class);
        job.setReducerClass(MaxNumReducer.class);
//        job.setReducerClass(MaxNumReducer2.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

 

One comment on “Hadoop Reducer Iterable: How to Iterate Twice”

  1. Sunayan Saikia said:

    What if the Iterable pValues is huge and cannot fit in main memory?
