Apache Avro: Java Generic Mapping

The previous post covers Java code generation and the specific mapping for Apache Avro. This post discusses using Apache Avro without code generation, which is useful when the schema is not known until runtime. In Java, this is called the generic mapping.

We’ll use the same schema as the previous post, shown below.

{

  "namespace": "avro", 

  "type": "record", 

  "name": "DemoRecord",

  "aliases": ["LinkedLongs"],                      // old name for this

  "fields" : [

      {"name": "desp", "type": "string"},

    {"name": "value", "type": "int"},             // each element has a long

    {"name": "next", "type": ["DemoRecord", "null"]} // optional next element

  ]

}

To serialize the data and store it in a file, we use the code below.

private static void writeData() throws IOException {

    FileOutputStream out = new FileOutputStream(FNAME);

    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);

    GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);

     

    GenericRecord record1 = new GenericData.Record(schema);

    record1.put("desp", new Utf8("record 1"));

    record1.put("value", 1);

    record1.put("next", null);

     

    GenericRecord record2 = new GenericData.Record(schema);

    record2.put("desp", new Utf8("record 2"));

    record2.put("value", 2);

    record2.put("next", record1);

     

    GenericRecord record3 = new GenericData.Record(schema);

    record3.put("desp", new Utf8("record 3"));

    record3.put("value", 3);

    record3.put("next", record2);

     

    writer.write(record3, encoder);

    encoder.flush();

}
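
The writeData() and readData() methods refer to a schema object and an FNAME constant set up elsewhere in the class. A minimal sketch of that setup, assuming the schema is parsed at runtime from the demorecord.avsc file used in these posts and the output file is test1:

// Requires: org.apache.avro.Schema, java.io.File, java.io.IOException
private static final String FNAME = "test1";            // output file name used in this post
private static final Schema schema = parseSchema();

private static Schema parseSchema() {
    try {
        // with the generic mapping the schema is parsed at runtime; no generated code is needed
        return new Schema.Parser().parse(new File("demorecord.avsc"));
    } catch (IOException e) {
        throw new RuntimeException("cannot parse schema file", e);
    }
}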

The data will be stored in a file named test1. We can compare it with the file named test generated in the previous post.

diff test test1

To deserialize the data, we use the code below.

private static void readData() throws IOException {

    FileInputStream in = new FileInputStream(FNAME);

    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(in, null);

    GenericDatumReader<GenericRecord> reader = 

            new GenericDatumReader<GenericRecord>(schema);

    

    GenericRecord record = reader.read(null, decoder);

    System.out.println(record.get("desp") + ":" + record.get("value"));

    

    while (record.get("next") != null) {

        record = (GenericRecord) record.get("next");

        System.out.println(record.get("desp") + ":" + record.get("value"));

    }

}

The output of running the code is the same as in the previous post, as shown below.

record 3:3
record 2:2
record 1:1

You can download the source code here.

Apache Avro: Java Code Generation and Specific Mapping

Although code generation is not required for using Apache Avro, the Java and C++ implementations can generate code to represent the data for an Avro schema. If we have the schema before reading or writing data, code generation can improve performance. In Java, this is called the specific mapping.

1. Code Generation

Suppose we have a schema file demorecord.avsc as below.

{

  "namespace": "avro", 

  "type": "record", 

  "name": "DemoRecord",

  "aliases": ["LinkedLongs"],                      // old name for this

  "fields" : [

      {"name": "desp", "type": "string"},

        {"name": "value", "type": "int"},             // each element has a long

        {"name": "next", "type": ["DemoRecord", "null"]} // optional next element

  ]

}

We can use the following command to generate the Java class.

java -cp <path_to_avro_tools>/avro-tools-1.6.3.jar org.apache.avro.tool.Main compile schema <schema_file> <output_directory>

In our example, avro-tools-1.6.3.jar is in the current directory and we output the generated code to the current directory as well, so the command is as below.

java -cp avro-tools-1.6.3.jar org.apache.avro.tool.Main compile schema demorecord.avsc .

The generated code is as below.

/**

 * Autogenerated by Avro

 * 

 * DO NOT EDIT DIRECTLY

 */

package avro;  

@SuppressWarnings("all")

public class DemoRecord extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {

  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"DemoRecord\",\"namespace\":\"avro\",\"fields\":[{\"name\":\"desp\",\"type\":\"string\"},{\"name\":\"value\",\"type\":\"int\"},{\"name\":\"next\",\"type\":[\"DemoRecord\",\"null\"]}],\"aliases\":[\"LinkedLongs\"]}");

  @Deprecated public java.lang.CharSequence desp;

  @Deprecated public int value;

  @Deprecated public avro.DemoRecord next;

  public org.apache.avro.Schema getSchema() { return SCHEMA$; }

  // Used by DatumWriter.  Applications should not call. 

  public java.lang.Object get(int field$) {

    switch (field$) {

    case 0: return desp;

    case 1: return value;

    case 2: return next;

    default: throw new org.apache.avro.AvroRuntimeException("Bad index");

    }

  }

  // Used by DatumReader.  Applications should not call. 

  @SuppressWarnings(value="unchecked")

  public void put(int field$, java.lang.Object value$) {

    switch (field$) {

    case 0: desp = (java.lang.CharSequence)value$; break;

    case 1: value = (java.lang.Integer)value$; break;

    case 2: next = (avro.DemoRecord)value$; break;

    default: throw new org.apache.avro.AvroRuntimeException("Bad index");

    }

  }

 

  /**

   * Gets the value of the 'desp' field.

   */

  public java.lang.CharSequence getDesp() {

    return desp;

  }

 

  /**

   * Sets the value of the 'desp' field.

   * @param value the value to set.

   */

  public void setDesp(java.lang.CharSequence value) {

    this.desp = value;

  }

 

  /**

   * Gets the value of the 'value' field.

   */

  public java.lang.Integer getValue() {

    return value;

  }

 

  /**

   * Sets the value of the 'value' field.

   * @param value the value to set.

   */

  public void setValue(java.lang.Integer value) {

    this.value = value;

  }

 

  /**

   * Gets the value of the 'next' field.

   */

  public avro.DemoRecord getNext() {

    return next;

  }

 

  /**

   * Sets the value of the 'next' field.

   * @param value the value to set.

   */

  public void setNext(avro.DemoRecord value) {

    this.next = value;

  }

 

  /** Creates a new DemoRecord RecordBuilder */

  public static avro.DemoRecord.Builder newBuilder() {

    return new avro.DemoRecord.Builder();

  }

  

  /** Creates a new DemoRecord RecordBuilder by copying an existing Builder */

  public static avro.DemoRecord.Builder newBuilder(avro.DemoRecord.Builder other) {

    return new avro.DemoRecord.Builder(other);

  }

  

  /** Creates a new DemoRecord RecordBuilder by copying an existing DemoRecord instance */

  public static avro.DemoRecord.Builder newBuilder(avro.DemoRecord other) {

    return new avro.DemoRecord.Builder(other);

  }

  

  /**

   * RecordBuilder for DemoRecord instances.

   */

  public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<DemoRecord>

    implements org.apache.avro.data.RecordBuilder<DemoRecord> {

 

    private java.lang.CharSequence desp;

    private int value;

    private avro.DemoRecord next;

 

    /** Creates a new Builder */

    private Builder() {

      super(avro.DemoRecord.SCHEMA$);

    }

    

    /** Creates a Builder by copying an existing Builder */

    private Builder(avro.DemoRecord.Builder other) {

      super(other);

    }

    

    /** Creates a Builder by copying an existing DemoRecord instance */

    private Builder(avro.DemoRecord other) {

            super(avro.DemoRecord.SCHEMA$);

      if (isValidValue(fields()[0], other.desp)) {

        this.desp = (java.lang.CharSequence) data().deepCopy(fields()[0].schema(), other.desp);

        fieldSetFlags()[0] = true;

      }

      if (isValidValue(fields()[1], other.value)) {

        this.value = (java.lang.Integer) data().deepCopy(fields()[1].schema(), other.value);

        fieldSetFlags()[1] = true;

      }

      if (isValidValue(fields()[2], other.next)) {

        this.next = (avro.DemoRecord) data().deepCopy(fields()[2].schema(), other.next);

        fieldSetFlags()[2] = true;

      }

    }

 

    /** Gets the value of the 'desp' field */

    public java.lang.CharSequence getDesp() {

      return desp;

    }

    

    /** Sets the value of the 'desp' field */

    public avro.DemoRecord.Builder setDesp(java.lang.CharSequence value) {

      validate(fields()[0], value);

      this.desp = value;

      fieldSetFlags()[0] = true;

      return this; 

    }

    

    /** Checks whether the 'desp' field has been set */

    public boolean hasDesp() {

      return fieldSetFlags()[0];

    }

    

    /** Clears the value of the 'desp' field */

    public avro.DemoRecord.Builder clearDesp() {

      desp = null;

      fieldSetFlags()[0] = false;

      return this;

    }

 

    /** Gets the value of the 'value' field */

    public java.lang.Integer getValue() {

      return value;

    }

    

    /** Sets the value of the 'value' field */

    public avro.DemoRecord.Builder setValue(int value) {

      validate(fields()[1], value);

      this.value = value;

      fieldSetFlags()[1] = true;

      return this; 

    }

    

    /** Checks whether the 'value' field has been set */

    public boolean hasValue() {

      return fieldSetFlags()[1];

    }

    

    /** Clears the value of the 'value' field */

    public avro.DemoRecord.Builder clearValue() {

      fieldSetFlags()[1] = false;

      return this;

    }

 

    /** Gets the value of the 'next' field */

    public avro.DemoRecord getNext() {

      return next;

    }

    

    /** Sets the value of the 'next' field */

    public avro.DemoRecord.Builder setNext(avro.DemoRecord value) {

      validate(fields()[2], value);

      this.next = value;

      fieldSetFlags()[2] = true;

      return this; 

    }

    

    /** Checks whether the 'next' field has been set */

    public boolean hasNext() {

      return fieldSetFlags()[2];

    }

    

    /** Clears the value of the 'next' field */

    public avro.DemoRecord.Builder clearNext() {

      next = null;

      fieldSetFlags()[2] = false;

      return this;

    }

 

    @Override

    public DemoRecord build() {

      try {

        DemoRecord record = new DemoRecord();

        record.desp = fieldSetFlags()[0] ? this.desp : (java.lang.CharSequence) defaultValue(fields()[0]);

        record.value = fieldSetFlags()[1] ? this.value : (java.lang.Integer) defaultValue(fields()[1]);

        record.next = fieldSetFlags()[2] ? this.next : (avro.DemoRecord) defaultValue(fields()[2]);

        return record;

      } catch (Exception e) {

        throw new org.apache.avro.AvroRuntimeException(e);

      }

    }

  }

}

Another way to generate code is to use Maven. Below is a pom.xml file that generates Java code for all schemas under the src/main/avro directory of our project, with the output written to the target/generated-sources/avro/ directory.

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

 

    <groupId>avro</groupId>

    <artifactId>avroDemo</artifactId>

    <version>0.0.1-SNAPSHOT</version>

    <packaging>jar</packaging>

 

    <name>avroDemo</name>

    <url>http://maven.apache.org</url>

 

    <properties>

        <java.version>1.6</java.version>

        <maven.compiler.source>${java.version}</maven.compiler.source>

        <maven.compiler.target>${java.version}</maven.compiler.target>

 

        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

    </properties>

 

    <build>

        <plugins>

            <plugin>

                <groupId>org.apache.avro</groupId>

                <artifactId>avro-maven-plugin</artifactId>

                <version>1.7.1</version>

                <configuration>

                </configuration>

                <executions>

                    <execution>

                        <phase>generate-sources</phase>

                        <goals>

                            <goal>schema</goal>

                        </goals>

                        <configuration>

                            <sourceDirectory>src/main/avro</sourceDirectory>

                            <outputDirectory>target/generated-sources/avro/</outputDirectory>

                        </configuration>

                    </execution>

                </executions>

            </plugin>

        </plugins>

    </build>

 

 

    <dependencies>

        <dependency>

            <groupId>org.apache.avro</groupId>

            <artifactId>avro</artifactId>

            <version>1.7.1</version>

        </dependency>

        <dependency>

            <groupId>org.apache.avro</groupId>

            <artifactId>avro-tools</artifactId>

            <version>1.7.1</version>

        </dependency>

    </dependencies>

</project>

With this pom.xml file, we can run the mvn compile command to generate Java code for the schemas in the src/main/avro folder.

2. Use the Generated Code

We can use the generated code to serialize data and store it in a file. Below is the sample code.

private static void writeData() throws IOException {

    FileOutputStream out = new FileOutputStream(FNAME);

    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);

    SpecificDatumWriter<DemoRecord> writer = 

            new SpecificDatumWriter<DemoRecord>(DemoRecord.class);

     

    DemoRecord record1 = new DemoRecord();

    record1.desp = new Utf8("record 1");

    record1.value = 1;

    record1.next = null;

     

    DemoRecord record2 = new DemoRecord();

    record2.desp = new Utf8("record 2");

    record2.value = 2;

    record2.next = record1;

     

    DemoRecord record3 = new DemoRecord();

    record3.desp = new Utf8("record 3");

    record3.value = 3;

    record3.next = record2;

     

    writer.write(record3, encoder);

    encoder.flush();

}

Note that there’s no need for us to write all three records to the file, since record3 links to record2, which in turn links to record1. Avro will follow the linked records and serialize all of the objects.
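
As an aside, the generated class also provides the Builder shown in the generated code above. A short sketch of constructing the first two linked records with it, equivalent to assigning the public fields directly as writeData() does:

// Builder-based construction using the generated DemoRecord.Builder
DemoRecord record1 = DemoRecord.newBuilder()
        .setDesp("record 1")
        .setValue(1)
        .setNext(null)
        .build();

DemoRecord record2 = DemoRecord.newBuilder()
        .setDesp("record 2")
        .setValue(2)
        .setNext(record1)
        .build();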

We can later deserialize the data as shown below.

private static void readData() throws IOException {

    FileInputStream in = new FileInputStream(FNAME);

    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(in, null);

    SpecificDatumReader<DemoRecord> reader = 

            new SpecificDatumReader<DemoRecord>(DemoRecord.class);

    

    DemoRecord record = new DemoRecord();

    reader.read(record, decoder);

    System.out.println(record.desp + ":" + record.value);

    

    while (record.next != null) {

        record = record.next;

        System.out.println(record.desp + ":" + record.value);

    }

}

The output of running the code is as below.

record 3:3
record 2:2
record 1:1

You can download the source code here.

Apache Avro: Understand Schema

Apache Avro is a serialization framework designed with compactness, speed, extensibility, and interoperability in mind. It was originally started to provide a better serialization mechanism for Apache Hadoop, the open source distributed computing framework.

Avro provides mechanisms to store object data or send it over the network for RPC. In both cases, the data is always serialized with its schema. Because the schema is always present at serialization and deserialization, there’s no need to tag the data, which results in more compact serialized data. This post covers the Avro schema.

An Avro schema defines how the data associated with it should be serialized and deserialized. It is represented in JSON format.
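
Because a schema is plain JSON, it can be parsed and inspected from Java at runtime. Below is a small sketch; the record used here is a trimmed-down, hypothetical example, not one from this post.

import org.apache.avro.Schema;

public class SchemaInspect {
    public static void main(String[] args) {
        // parse a record schema from its JSON text
        String json = "{\"type\":\"record\",\"name\":\"DemoRecord\",\"namespace\":\"avro\","
                + "\"fields\":[{\"name\":\"value\",\"type\":\"long\"}]}";
        Schema schema = new Schema.Parser().parse(json);

        System.out.println(schema.getType());       // RECORD
        System.out.println(schema.getFullName());   // avro.DemoRecord
        for (Schema.Field field : schema.getFields()) {
            System.out.println(field.name() + " -> " + field.schema().getType());
        }
    }
}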

0. Primitive Types

Avro defines eight primitive types: null (no value), boolean (a binary value), int (32-bit signed integer), long (64-bit signed integer), float (32-bit floating point), double (64-bit floating point), bytes (sequence of 8-bit unsigned bytes), and string (Unicode character sequence). Primitive types have no attributes.

Note that Avro types are language independent. Different languages can have different representations for the same Avro data type. For instance, the double type is represented as double in C/C++ and Java, but as float in Python.

1. Complex Types

Six complex types are supported: records, enums, arrays, maps, unions, and fixed. Complex types have attributes and can be composed of primitive types or other complex types.

Record: record is the most commonly used complex type. An example of a record defined in a schema file is shown below.

{

 "namespace": "avro", 

 "type": "record", 

 "name": "DemoRecord",

 "aliases": ["LinkedRecord"],                      // old name for this

 "fields" : [

{"name": "desp", "type": "string"},

{"name": "value", "type": "long"},             // each element has a long

{"name": "next", "type": ["DemoRecord", "null"]} // optional next element

 ]

}

The following attributes are required in a record schema.

  • name: the name of the record
  • type: a JSON string “record”
  • fields: a JSON array describing the data fields used to form the record. Each field can have the following attributes.
    • name: required. The name of the field.
    • type: required. The type of the field; it can be a primitive or complex type.
    • default: optional. However, it is required when we deserialize data that doesn’t have this field.
    • order: optional. Specifies how this field affects the sort ordering of the record.
    • doc: optional. A JSON string describing the field.
    • aliases: optional. A JSON array of strings providing alternative names for this field.

The following attributes are optional for a record schema.

  • namespace: used to avoid name conflicts; corresponds to a Java package.
  • doc: a JSON string describing the record.
  • aliases: a JSON array of strings providing alternate names for the record.

Enum: An example of an enum defined in a schema file is shown below.

{ 

 "namespace": "avro", 

 "type": "enum",

 "name": "DemoEnum",

 "symbols" : ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"]

}

It has the following required attributes.

  • name: the name of the enum
  • type: a JSON string “enum”
  • symbols: a JSON array of JSON strings, indicating all symbols defined for the enum. The symbols are required to be unique.

Similar to records, namespace, doc and aliases are optional.
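
With the generic mapping, an enum value can be represented with GenericData.EnumSymbol. A minimal sketch for the DemoEnum schema above, assuming Avro’s generic API:

// Requires: org.apache.avro.Schema, org.apache.avro.generic.GenericData
Schema enumSchema = new Schema.Parser().parse(
        "{\"namespace\": \"avro\", \"type\": \"enum\", \"name\": \"DemoEnum\","
        + " \"symbols\": [\"SPADES\", \"HEARTS\", \"DIAMONDS\", \"CLUBS\"]}");
// wrap one of the declared symbols as a generic enum value
GenericData.EnumSymbol symbol = new GenericData.EnumSymbol(enumSchema, "HEARTS");
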
Arrays: An example of a record containing an array as one of its fields is shown below.

{

 "namespace": "avro", 

 "type": "record", 

 "name": "DemoArray",

 "fields" : [

{"name": "anArr", "type": {"type": "array", "items": "string"}}

]

}

The following attributes are required.

  • type: must be a JSON string “array”
  • items: the schema of the array’s items. In the example above, the items are of type string.

Map: An example of a schema file defining a record with a map as one of its fields is shown below.

{

 "namespace": "avro", 

 "type": "record", 

 "name": "DemoMap",

 "fields" : [

{"name": "aMap", "type": {"type": "map", "values": "long"}}

 ]

}

The following attributes are required.

  • type: a JSON string “map”
  • values: the schema of the map’s values. The keys are assumed to be strings. In the example above, the values are of type long.
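
With the generic mapping, a map field is populated with an ordinary java.util.Map whose keys are strings. A minimal sketch for the DemoMap record above:

// Requires: org.apache.avro.Schema, org.apache.avro.generic.GenericData,
//           org.apache.avro.generic.GenericRecord, java.util.HashMap, java.util.Map
Schema demoMapSchema = new Schema.Parser().parse(
        "{\"namespace\": \"avro\", \"type\": \"record\", \"name\": \"DemoMap\","
        + " \"fields\": [{\"name\": \"aMap\", \"type\": {\"type\": \"map\", \"values\": \"long\"}}]}");

Map<String, Long> counts = new HashMap<String, Long>();
counts.put("someKey", 42L);

GenericRecord record = new GenericData.Record(demoMapSchema);
record.put("aMap", counts);   // ready to be written with a GenericDatumWriter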

Union: Below is a sample schema of a record containing a union as one of its fields.

{

 "namespace": "avro", 

 "type": "record", 

 "name": "DemoUnion",

 "fields" : [

{"name": "aUnion", "type": ["string", "null"]}

 ]

}

Unions are defined as a JSON array. In the example above, the field aUnion can either be a string or null.

Fixed: fixed defines data with a fixed number of bytes. Below is a sample schema of a fixed type.

{"namespace": "avro", "type": "fixed", "size": 16, "name": "DemoFixed"}

It has the following required attributes.

  • name: a JSON string containing the name of the fixed type.
  • type: a JSON string “fixed”
  • size: the number of bytes in a value of the fixed type.

It has the following optional attributes.

  • namespace: similar to record.
  • aliases: similar to record.

References:
Apache Avro specification 1.7.1: http://avro.apache.org/docs/current/spec.html

Hadoop Reducer Iterable: How to Iterate Twice

The reducer of a Hadoop MapReduce program receives an Iterable of values with the same key. Most of the time, we iterate through the values once, but there are always exceptions. What if we want to iterate over the values twice?

The code shown below is tested on Hadoop 1.0.3.

0. An example

We’ll use a simplified version of the example in How to Write a Simple MapReduce Program. Suppose we are given several files, each containing 5 key-value pairs. The key indicates a year in the range [2000, 2001], and the value is a randomly generated number. The task is to find the maximum number for each year. Note that each file may contain values for multiple years.

Below is a script that helps us to generate the input files.

#!/bin/bash
for i in {1..5}
do
   echo $(($RANDOM%2+2000)) $RANDOM
done

To generate the input, use the commands below.

$ mkdir input
$ ./rand.sh > input/1.txt
$ ./rand.sh > input/2.txt

1. The Wrong Code

One might think that iterating through the Iterable twice is easy: we simply loop over it twice, like the code below.

private static class MaxNumReducerWrong1 extends Reducer<Text, LongWritable, Text, LongWritable> {

        @Override

        public void reduce(Text pKey, Iterable<LongWritable> pValues, Context pContext) 

                throws IOException, InterruptedException {

            System.out.println("----------" + pKey + "----------");

              for (LongWritable aNum : pValues) {

                    System.out.println("first iteration: " + aNum);

                }

                for (LongWritable aNum : pValues) {

                    System.out.println("second iteration: " + aNum);

                }

          }

}

However, this won’t work. The output shows that the second loop is never executed; in other words, the Iterable is exhausted after the first iteration.

One can also try using two Iterators; the result is the same.
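
For reference, the two-Iterator attempt might look like the sketch below (hypothetical code, not from the original post); it fails the same way, because both calls draw from the same underlying stream of values.

Iterator<LongWritable> first = pValues.iterator();
while (first.hasNext()) {
    System.out.println("first iteration: " + first.next());
}
Iterator<LongWritable> second = pValues.iterator();
while (second.hasNext()) {
    // never reached: the values have already been consumed by the first loop
    System.out.println("second iteration: " + second.next());
}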

2. Another Wrong Code

A natural next attempt is to cache the values during the first pass, like the code below.

private static class MaxNumReducerWrong2 extends Reducer<Text, LongWritable, Text, LongWritable> {

        @Override

        public void reduce(Text pKey, Iterable<LongWritable> pValues, Context pContext) 

                throws IOException, InterruptedException {

            System.out.println("----------" + pKey + "----------");

            ArrayList<LongWritable> cache = new ArrayList<LongWritable>();

              for (LongWritable aNum : pValues) {

                System.out.println("first iteration: " + aNum);

                cache.add(aNum);

            }

              int size = cache.size();

            for (int i = 0; i < size; ++i) {

                System.out.println("second iteration " + i + ": " + cache.get(i));

            }

          }

    }

But this doesn’t work either: the output shows the same value repeated in the second loop. This is because the Iterable hands us the same object reference every time, only with different values. Since we cache the reference, all cached entries point to the same object, and at the end of the iteration that object holds the value of the last element.

3. The Code That Works

Basically, we need to cache copies of the objects. Below is the code that works.

private static class MaxNumReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

        @Override

        public void reduce(Text pKey, Iterable<LongWritable> pValues, Context pContext) 

                throws IOException, InterruptedException {

            System.out.println("----------" + pKey + "----------");

            ArrayList<LongWritable> cache = new ArrayList<LongWritable>();

            for (LongWritable aNum : pValues) {

                System.out.println("first iteration: " + aNum);

                LongWritable writable = new LongWritable();

                writable.set(aNum.get());

                cache.add(writable);

            }

            int size = cache.size();

            for (int i = 0; i < size; ++i) {

                System.out.println("second iteration: " + cache.get(i));

            }

        }

    }

Here is another version using an explicit Iterator.

private static class MaxNumReducer2 extends Reducer<Text, LongWritable, Text, LongWritable> {

        @Override

        public void reduce(Text pKey, Iterable<LongWritable> pValues, Context pContext) 

                throws IOException, InterruptedException {

            System.out.println("----------" + pKey + "----------");

            ArrayList<LongWritable> cache = new ArrayList<LongWritable>();

            Iterator<LongWritable> iterator = pValues.iterator();

            while (iterator.hasNext()) {

                LongWritable writable = iterator.next();

                System.out.println("MaxNumReducer2 first iteration: " + writable);

                cache.add(new LongWritable(writable.get()));

            }

            int size = cache.size();

            for (int i = 0; i < size; ++i) {

                System.out.println("MaxNumReducer2 second iteration: " + cache.get(i));

            }

        }

    }

Running either of these reducers produces the correct output, with both loops printing all of the values.

4. The Complete Code

You can refer to the complete code below.

package test;

 

import java.io.IOException;

import java.util.ArrayList;

import java.util.Iterator;

 

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 

public class HadoopReducerIteratorTest {

 

    private static class MaxNumMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

        @Override

        public void map(LongWritable pKey, Text pValue, Context pContext) 

            throws IOException, InterruptedException {

            String line = pValue.toString();

            String[] tokens = line.split(" ");

            Long number = Long.parseLong(tokens[1]);

            pContext.write(new Text(tokens[0]), new LongWritable(number));

        }

    }

    

    private static class MaxNumReducerWrong1 extends Reducer<Text, LongWritable, Text, LongWritable> {

        @Override

        public void reduce(Text pKey, Iterable<LongWritable> pValues, Context pContext) 

                throws IOException, InterruptedException {

            System.out.println("----------" + pKey + "----------");

              for (LongWritable aNum : pValues) {

                System.out.println("first iteration: " + aNum);

            }

            for (LongWritable aNum : pValues) {

                System.out.println("second iteration: " + aNum);

            }

          }

    }

    

    private static class MaxNumReducerWrong2 extends Reducer<Text, LongWritable, Text, LongWritable> {

        @Override

        public void reduce(Text pKey, Iterable<LongWritable> pValues, Context pContext) 

                throws IOException, InterruptedException {

            System.out.println("----------" + pKey + "----------");

            ArrayList<LongWritable> cache = new ArrayList<LongWritable>();

              for (LongWritable aNum : pValues) {

                System.out.println("first iteration: " + aNum);

                cache.add(aNum);

            }

              int size = cache.size();

            for (int i = 0; i < size; ++i) {

                System.out.println("second iteration " + i + ": " + cache.get(i));

            }

          }

    }

    

    private static class MaxNumReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

        @Override

        public void reduce(Text pKey, Iterable<LongWritable> pValues, Context pContext) 

                throws IOException, InterruptedException {

            System.out.println("----------" + pKey + "----------");

            ArrayList<LongWritable> cache = new ArrayList<LongWritable>();

            for (LongWritable aNum : pValues) {

                System.out.println("first iteration: " + aNum);

                LongWritable writable = new LongWritable();

                writable.set(aNum.get());

                cache.add(writable);

            }

            int size = cache.size();

            for (int i = 0; i < size; ++i) {

                System.out.println("second iteration: " + cache.get(i));

            }

        }

    }

    

    private static class MaxNumReducer2 extends Reducer<Text, LongWritable, Text, LongWritable> {

        @Override

        public void reduce(Text pKey, Iterable<LongWritable> pValues, Context pContext) 

                throws IOException, InterruptedException {

            System.out.println("----------" + pKey + "----------");

            ArrayList<LongWritable> cache = new ArrayList<LongWritable>();

            Iterator<LongWritable> iterator = pValues.iterator();

            while (iterator.hasNext()) {

                LongWritable writable = iterator.next();

                System.out.println("MaxNumReducer2 first iteration: " + writable);

                cache.add(new LongWritable(writable.get()));

            }

            int size = cache.size();

            for (int i = 0; i < size; ++i) {

                System.out.println("MaxNumReducer2 second iteration: " + cache.get(i));

            }

        }

    }

    

    public static void main(String[] args) throws Exception {

        if (args.length != 2) {

            System.out.println("Number of arguments: " + args.length);

            System.out.println("Usage: MaxNumHadoop <input folder path> <output folder path>");

            System.exit(1);

        }

        Job job = new Job();

        job.setJarByClass(HadoopReducerIteratorTest.class);

        job.setJobName("Max Number of Year");

        

        FileInputFormat.addInputPath(job, new Path(args[0]));

        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        

        job.setMapperClass(MaxNumMapper.class);

//        job.setReducerClass(MaxNumReducerWrong1.class);

//        job.setReducerClass(MaxNumReducerWrong2.class);

        job.setReducerClass(MaxNumReducer.class);

//        job.setReducerClass(MaxNumReducer2.class);

        

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(LongWritable.class);

        

        System.exit(job.waitForCompletion(true)?0:1);

    }

 

}

Specify the Main Class in a jar File for Hadoop

Hadoop supports execution of jar files. For an executable jar file in normal Java execution, one can specify the main class on the command line, as covered in my previous post: switch between main classes in a jar file.

However, the rules are a bit different for an executable jar file running with Hadoop. Basically, the following rules hold (tested on Hadoop 1.0.3):

  1. If a jar file contains a main class specified in its manifest file, Hadoop will use that main class even if the command specifies another one. This is different from normal Java execution, where a main class given on the command line overrides the one in the manifest file.
  2. If a jar file does not contain a main class in its manifest file, Hadoop allows us to specify the main class on the command line.

In Eclipse, when one exports a project as a runnable jar file, it always asks for a main class in the Launch configuration dialog.

The selected main class will be put in the manifest file. Below is the content of the META-INF/MANIFEST.MF file in my helloworld project, where the main class is set to hello.HelloWorld.

Manifest-Version: 1.0
Class-Path: .
Main-Class: hello.HelloWorld

One can browse the jar file using an archive manager, open the manifest file in a text editor, delete the last line to remove the main class configuration, and save the changes back to the jar file when prompted. This results in a runnable jar file without a main class.

The modified jar file can then be used with Hadoop and a user-supplied main class, as shown in the sample command below.

$ hadoop jar hello.jar hello.HelloWorld

Apache Hadoop — How to Write a Simple MapReduce Program, Part 2: Combiner

Please read the previous post first.

1. Combiner

The output of the mappers is sent to the reducers for the reduce phase. Because each reducer usually takes a portion of the data from all mappers, the amount of data transferred can be large. Hadoop MapReduce allows developers to specify an optional combiner function to process the map output on the mapper side before the data is transferred to the reducers.

Note that the combiner function is optional, and Hadoop does not guarantee whether or how many times it will call the combiner. Therefore, it is the developer’s responsibility to make sure the combined output is what the reducers expect, no matter how many times the combiner runs.

In the program, the combiner function is specified using a Reducer class, the same as the reducer. In the example from Part 1, we can use the same class defined for the reducer to process the map output at each mapper before sending it to the reducer. The example below shows why this works.

Suppose we have two mappers; without a combiner, their outputs are:

2001 31432
2001 23
2001 3234
2002 1234
2002 2134

and

2001 1234
2001 21343
2002 2
2002 12344
2002 34323

With the combiner added, the combiner output at each mapper will be:

2001 31432
2002 2134

and

2001 21343
2002 34323

Instead of sending 10 records to the reducer, we only need to send 4. This reduces bandwidth consumption and improves performance.

2. Changes to the Code

The only change is to add the following line to the job configuration code in MaxNumHadoop.java.

job.setCombinerClass(MaxNumReducer.class);

The code is as below,

package roman10.hadoop.maxnum;

 

import java.io.IOException;

 

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 

public class MaxNumHadoop {

    public static void main(String[] args) throws Exception {

        if (args.length != 2) {

            System.out.println("Number of arguments: " + args.length);

            System.out.println("Usage: MaxNumHadoop <input folder path> <output folder path>");

            System.exit(1);

        }

        Job job = new Job();

        job.setJarByClass(MaxNumHadoop.class);

        job.setJobName("Max Number of Year");

        

        FileInputFormat.addInputPath(job, new Path(args[0]));

        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        

        job.setMapperClass(MaxNumMapper.class);

        job.setCombinerClass(MaxNumReducer.class);

        job.setReducerClass(MaxNumReducer.class);

        

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(LongWritable.class);

        

        System.exit(job.waitForCompletion(true)?0:1);

    }

}

Apache Hadoop — How to Write a Simple MapReduce Program

This post covers how to write a simple MapReduce program with an example, using the new Hadoop API. I’ve run the example on Apache Hadoop 1.0.3.

0. MapReduce

MapReduce, as its name suggests, consists of two phases: map and reduce. Users specify a map function for the map phase and a reduce function for the reduce phase. Each phase uses key-value pairs as input and output, with data types specified by the user.

Hadoop MapReduce is designed to scale to thousands of nodes, organized as a jobtracker and a number of tasktrackers. The tasktrackers run the processing, while the jobtracker coordinates the system and distributes the tasks.

In the map phase, it is preferred that a tasktracker uses input stored locally or close to it; this helps reduce data transfer and improve performance. The task writes its map output to local disk, because the output is intermediate and replicating it (for example, storing it in HDFS) would be inefficient. If a node fails before the reduce phase obtains the data from it, the jobtracker reruns the map task on another node to reproduce the data. The map output is sorted by key.

In the reduce phase, the input to a single reduce node usually comes from all map nodes. The data is merged by key before processing, and the output is usually replicated (for example, stored in HDFS) for reliability. When multiple reducers exist, each map task partitions its output, creating one partition per reducer.
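
By default, the partitioning is done by hashing the key (Hadoop’s HashPartitioner). As a rough illustration only, a custom partitioner for the Text/LongWritable types used in the example below might look like this sketch:

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Sketch of a custom partitioner; it simply mirrors the default hash partitioning,
// routing all records with the same key (year) to the same reducer.
public class YearPartitioner extends Partitioner<Text, LongWritable> {
    @Override
    public int getPartition(Text key, LongWritable value, int numPartitions) {
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}

It would be registered on the job with job.setPartitionerClass(YearPartitioner.class); the example in this post simply relies on the default.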

1. An Example

Suppose we are given several files, each containing 10K key-value pairs. The key indicates a year in the range [2000, 2011], and the value is a randomly generated number. The task is to find the maximum number for each year. Note that each file may contain values for multiple years.

The script below can help to generate the input.

#!/bin/bash
for i in {1..10000}
do
   echo $(($RANDOM%12+2000)) $RANDOM
done

To generate a test input of 5 files, use the commands below.

$ mkdir input
$ ./rand.sh > input/1.txt
$ ./rand.sh > input/2.txt
$ ./rand.sh > input/3.txt
$ ./rand.sh > input/4.txt
$ ./rand.sh > input/5.txt

2. Write the Program

A typical MapReduce program consists of three parts: the mapper, the reducer, and the code to configure and run the job.

For the example above, the input key and value for the map phase are the byte offset of a line and the line content of the input text file. The output key and value are the year and the corresponding random number. In the reduce phase, we examine all values for each year to find the maximum value for that year.

Below is the code for the mapper, the reducer, and the driver that configures and runs the job.

Mapper

package roman10.hadoop.maxnum;

 

import java.io.IOException;

 

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

 

public class MaxNumMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    @Override

    public void map(LongWritable pKey, Text pValue, Context pContext) 

        throws IOException, InterruptedException {

        String line = pValue.toString();

        String[] tokens = line.split(" ");

        Long number = Long.parseLong(tokens[1]);

        pContext.write(new Text(tokens[0]), new LongWritable(number));

    }

}

Reducer

package roman10.hadoop.maxnum;

 

import java.io.IOException;

 

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

 

public class MaxNumReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

    @Override

    public void reduce(Text pKey, Iterable<LongWritable> pValues, Context pContext) 

            throws IOException, InterruptedException {

        long maxNum = Long.MIN_VALUE;

        for (LongWritable aNum : pValues) {

            maxNum = Math.max(maxNum, aNum.get());

        }

        pContext.write(pKey, new LongWritable(maxNum));

    }

}

Code to Configure and Run

package roman10.hadoop.maxnum;

 

import java.io.IOException;

 

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 

public class MaxNumHadoop {

    public static void main(String[] args) throws Exception {

        if (args.length != 2) {

            System.out.println("Number of arguments: " + args.length);

            System.out.println("Usage: MaxNumHadoop <input folder path> <output folder path>");

            System.exit(1);

        }

        Job job = new Job();

        job.setJarByClass(MaxNumHadoop.class);

        job.setJobName("Max Number of Year");

        

        FileInputFormat.addInputPath(job, new Path(args[0]));

        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        

        job.setMapperClass(MaxNumMapper.class);

        job.setReducerClass(MaxNumReducer.class);

        

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(LongWritable.class);

        

        System.exit(job.waitForCompletion(true)?0:1);

    }

}

3. Compilation and Execution

To compile the Java files and create a jar file, follow the commands below,

$ mkdir temp
$ javac -classpath <path-to-hadoop>/hadoop-core-*.jar -d temp *.java
$ jar -cvf MaxNumHadoop.jar -C temp/ .

To run the Hadoop MapReduce job, use the command below.

$ hadoop jar MaxNumHadoop.jar roman10.hadoop.maxnum.MaxNumHadoop input output

To check whether the job completed successfully, use the command below.

$ ls output/

If it ran successfully, we’ll see a file named _SUCCESS. There should be another file that contains the final output. To check the final output:

$ cat output/part-r-00000

In my sample run, the output is as below,

2000 32748
2001 32750
2002 32764
2003 32767
2004 32765
2005 32763
2006 32761
2007 32756
2008 32765
2009 32763
2010 32755
2011 32764

4. Troubleshooting

4.1. Error JAVA_HOME is not set

You can set it using the following command,

export JAVA_HOME=/usr/lib/jvm/java-6-sun

Alternatively, you can append the line to ~/.bashrc and open another terminal.

4.2 Output Directory Exists

Make sure the output directory does not exist before you run Hadoop. Otherwise, Hadoop will complain and end execution. This is to prevent carelessly overwriting data in an existing directory.

There is another post about adding a combiner to the code.

Apache Hadoop — How to Set Up for Standalone Mode on Ubuntu

Setting up Apache Hadoop to run on localhost is easy. Below are the steps to follow.

0. Prerequisites

0.1 Make sure Java JDK 6 or later is installed.

Since the JDK comes with the Java compiler javac, we can check by issuing the following command in a terminal:

$ javac -version

On my machine, it prints the following,

javac 1.6.0_26

This means JDK 6 is installed. If the output shows that javac cannot be found, then the JDK is not installed. To install Oracle JDK 6:

$ sudo apt-get install sun-java6-jdk

or if you prefer OpenJDK,

$ sudo apt-get install openjdk-6-jdk

0.2 Make sure ssh and rsync are installed.

Note that this is not required for running Hadoop in standalone mode, but it is required for other modes.

If ssh and rsync are not installed, install them with the commands below.

$ sudo apt-get install ssh
$ sudo apt-get install rsync

1. Install Hadoop

1.1 Download a Hadoop stable release from its release page at http://hadoop.apache.org/common/releases.html#Download.

1.2 Uncompress the downloaded file using the command below (replacing x.y.z with the actual version number):

$ tar xvf hadoop-x.y.z.tar.gz

1.3 Add the Hadoop bin path to the PATH environment variable. Append the following lines to the end of the ~/.bashrc file.

export HADOOP_INSTALL=<hadoop root folder path>
export PATH=$PATH:$HADOOP_INSTALL/bin

1.4 Verification. Start a new terminal and type the command,

$ hadoop version

This should print out the Hadoop version.

2. Running Hadoop in Standalone Mode (aka local mode)

No daemons are running and everything runs in a single JVM. This makes it easy for testing and debugging.

2.1 Configuration. The default configuration is set for standalone mode, so we can skip to the next step.

2.2 Write Hadoop MapReduce jobs and start running…