Apache Avro: Java Generic Mapping

The previous post covered Java code generation and the specific mapping for Apache Avro. This post discusses using Apache Avro without code generation, which is useful when the schema is not known before runtime. In Java, this is called the generic mapping.

We’ll use the same schema as in the previous post, shown below.

{
  "namespace": "avro",
  "type": "record",
  "name": "DemoRecord",
  "aliases": ["LinkedLongs"],                       // old name for this
  "fields" : [
    {"name": "desp", "type": "string"},
    {"name": "value", "type": "int"},               // each element has an int value
    {"name": "next", "type": ["DemoRecord", "null"]} // optional next element
  ]
}
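The writeData and readData methods below refer to a schema variable. With the generic mapping, this Schema object is obtained at runtime, typically by parsing the schema JSON with org.apache.avro.Schema.Parser. A minimal sketch (the inline string mirrors the schema above; the class name ParseSchemaDemo is just for illustration, and in practice you would parse the demorecord.avsc file instead):

```java
import org.apache.avro.Schema;

public class ParseSchemaDemo {
    public static void main(String[] args) {
        // Inline copy of the schema above; a file can be parsed the same way
        // with new Schema.Parser().parse(new File("demorecord.avsc")).
        String json = "{\"namespace\": \"avro\", \"type\": \"record\", \"name\": \"DemoRecord\","
                + "\"fields\": ["
                + "{\"name\": \"desp\", \"type\": \"string\"},"
                + "{\"name\": \"value\", \"type\": \"int\"},"
                + "{\"name\": \"next\", \"type\": [\"DemoRecord\", \"null\"]}]}";
        Schema schema = new Schema.Parser().parse(json);
        System.out.println(schema.getName());          // DemoRecord
        System.out.println(schema.getFields().size()); // 3
    }
}
```

Note that the recursive reference to "DemoRecord" inside the union parses fine, because the record's name is registered before its fields are processed.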

To serialize the data and store it in a file, we use the code below.

private static void writeData() throws IOException {
    FileOutputStream out = new FileOutputStream(FNAME);
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);

    GenericRecord record1 = new GenericData.Record(schema);
    record1.put("desp", new Utf8("record 1"));
    record1.put("value", 1);
    record1.put("next", null);

    GenericRecord record2 = new GenericData.Record(schema);
    record2.put("desp", new Utf8("record 2"));
    record2.put("value", 2);
    record2.put("next", record1);

    GenericRecord record3 = new GenericData.Record(schema);
    record3.put("desp", new Utf8("record 3"));
    record3.put("value", 3);
    record3.put("next", record2);

    writer.write(record3, encoder);
    encoder.flush();
    out.close();
}

The data will be stored in a file named test1. We can compare it with the file named test generated in the previous post.

diff test test1

To deserialize the data, we use the code below.

private static void readData() throws IOException {
    FileInputStream in = new FileInputStream(FNAME);
    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(in, null);
    GenericDatumReader<GenericRecord> reader =
            new GenericDatumReader<GenericRecord>(schema);

    GenericRecord record = reader.read(null, decoder);
    System.out.println(record.get("desp") + ":" + record.get("value"));

    while (record.get("next") != null) {
        record = (GenericRecord) record.get("next");
        System.out.println(record.get("desp") + ":" + record.get("value"));
    }
    in.close();
}

The output of running the code is the same as in the previous post, shown below.

record 3:3
record 2:2
record 1:1
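One generic-mapping pitfall worth noting: with the default configuration, record.get("desp") returns an org.apache.avro.util.Utf8 rather than a java.lang.String, so string comparisons should go through toString(). A quick sketch (the class name Utf8Demo is just for illustration):

```java
import org.apache.avro.util.Utf8;

public class Utf8Demo {
    public static void main(String[] args) {
        Utf8 u = new Utf8("record 1");
        // A Utf8 never equals a String, even with identical contents:
        System.out.println(u.equals("record 1"));            // false
        // Convert to String before comparing:
        System.out.println(u.toString().equals("record 1")); // true
    }
}
```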


Apache Avro: Java Code Generation and Specific Mapping

Although code generation is not required to use Apache Avro, the Java and C++ implementations can generate code to represent data for an Avro schema. If we have the schema before reading or writing data, code generation can improve performance. In Java, this is called the specific mapping.

1. Code Generation

Suppose we have a schema file demorecord.avsc as below.

{
  "namespace": "avro",
  "type": "record",
  "name": "DemoRecord",
  "aliases": ["LinkedLongs"],                       // old name for this
  "fields" : [
    {"name": "desp", "type": "string"},
    {"name": "value", "type": "int"},               // each element has an int value
    {"name": "next", "type": ["DemoRecord", "null"]} // optional next element
  ]
}

We can use the following command to generate the Java class.

java -cp <path_to_avro_tools>/avro-tools-1.6.3.jar org.apache.avro.tool.Main compile schema <schema_file> <output_directory>

In our example, we have avro-tools-1.6.3.jar in the local directory and simply output to the current directory. The command is then as below.

java -cp avro-tools-1.6.3.jar org.apache.avro.tool.Main compile schema demorecord.avsc .

The generated code is as below.

/**
 * Autogenerated by Avro
 *
 * DO NOT EDIT DIRECTLY
 */
package avro;
@SuppressWarnings("all")
public class DemoRecord extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"DemoRecord\",\"namespace\":\"avro\",\"fields\":[{\"name\":\"desp\",\"type\":\"string\"},{\"name\":\"value\",\"type\":\"int\"},{\"name\":\"next\",\"type\":[\"DemoRecord\",\"null\"]}],\"aliases\":[\"LinkedLongs\"]}");
  @Deprecated public java.lang.CharSequence desp;
  @Deprecated public int value;
  @Deprecated public avro.DemoRecord next;
  public org.apache.avro.Schema getSchema() { return SCHEMA$; }
  // Used by DatumWriter.  Applications should not call.
  public java.lang.Object get(int field$) {
    switch (field$) {
    case 0: return desp;
    case 1: return value;
    case 2: return next;
    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
    }
  }
  // Used by DatumReader.  Applications should not call.
  @SuppressWarnings(value="unchecked")
  public void put(int field$, java.lang.Object value$) {
    switch (field$) {
    case 0: desp = (java.lang.CharSequence)value$; break;
    case 1: value = (java.lang.Integer)value$; break;
    case 2: next = (avro.DemoRecord)value$; break;
    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
    }
  }

  /**
   * Gets the value of the 'desp' field.
   */
  public java.lang.CharSequence getDesp() {
    return desp;
  }

  /**
   * Sets the value of the 'desp' field.
   * @param value the value to set.
   */
  public void setDesp(java.lang.CharSequence value) {
    this.desp = value;
  }

  /**
   * Gets the value of the 'value' field.
   */
  public java.lang.Integer getValue() {
    return value;
  }

  /**
   * Sets the value of the 'value' field.
   * @param value the value to set.
   */
  public void setValue(java.lang.Integer value) {
    this.value = value;
  }

  /**
   * Gets the value of the 'next' field.
   */
  public avro.DemoRecord getNext() {
    return next;
  }

  /**
   * Sets the value of the 'next' field.
   * @param value the value to set.
   */
  public void setNext(avro.DemoRecord value) {
    this.next = value;
  }

  /** Creates a new DemoRecord RecordBuilder */
  public static avro.DemoRecord.Builder newBuilder() {
    return new avro.DemoRecord.Builder();
  }

  /** Creates a new DemoRecord RecordBuilder by copying an existing Builder */
  public static avro.DemoRecord.Builder newBuilder(avro.DemoRecord.Builder other) {
    return new avro.DemoRecord.Builder(other);
  }

  /** Creates a new DemoRecord RecordBuilder by copying an existing DemoRecord instance */
  public static avro.DemoRecord.Builder newBuilder(avro.DemoRecord other) {
    return new avro.DemoRecord.Builder(other);
  }

  /**
   * RecordBuilder for DemoRecord instances.
   */
  public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<DemoRecord>
    implements org.apache.avro.data.RecordBuilder<DemoRecord> {

    private java.lang.CharSequence desp;
    private int value;
    private avro.DemoRecord next;

    /** Creates a new Builder */
    private Builder() {
      super(avro.DemoRecord.SCHEMA$);
    }

    /** Creates a Builder by copying an existing Builder */
    private Builder(avro.DemoRecord.Builder other) {
      super(other);
    }

    /** Creates a Builder by copying an existing DemoRecord instance */
    private Builder(avro.DemoRecord other) {
      super(avro.DemoRecord.SCHEMA$);
      if (isValidValue(fields()[0], other.desp)) {
        this.desp = (java.lang.CharSequence) data().deepCopy(fields()[0].schema(), other.desp);
        fieldSetFlags()[0] = true;
      }
      if (isValidValue(fields()[1], other.value)) {
        this.value = (java.lang.Integer) data().deepCopy(fields()[1].schema(), other.value);
        fieldSetFlags()[1] = true;
      }
      if (isValidValue(fields()[2], other.next)) {
        this.next = (avro.DemoRecord) data().deepCopy(fields()[2].schema(), other.next);
        fieldSetFlags()[2] = true;
      }
    }

    /** Gets the value of the 'desp' field */
    public java.lang.CharSequence getDesp() {
      return desp;
    }

    /** Sets the value of the 'desp' field */
    public avro.DemoRecord.Builder setDesp(java.lang.CharSequence value) {
      validate(fields()[0], value);
      this.desp = value;
      fieldSetFlags()[0] = true;
      return this;
    }

    /** Checks whether the 'desp' field has been set */
    public boolean hasDesp() {
      return fieldSetFlags()[0];
    }

    /** Clears the value of the 'desp' field */
    public avro.DemoRecord.Builder clearDesp() {
      desp = null;
      fieldSetFlags()[0] = false;
      return this;
    }

    /** Gets the value of the 'value' field */
    public java.lang.Integer getValue() {
      return value;
    }

    /** Sets the value of the 'value' field */
    public avro.DemoRecord.Builder setValue(int value) {
      validate(fields()[1], value);
      this.value = value;
      fieldSetFlags()[1] = true;
      return this;
    }

    /** Checks whether the 'value' field has been set */
    public boolean hasValue() {
      return fieldSetFlags()[1];
    }

    /** Clears the value of the 'value' field */
    public avro.DemoRecord.Builder clearValue() {
      fieldSetFlags()[1] = false;
      return this;
    }

    /** Gets the value of the 'next' field */
    public avro.DemoRecord getNext() {
      return next;
    }

    /** Sets the value of the 'next' field */
    public avro.DemoRecord.Builder setNext(avro.DemoRecord value) {
      validate(fields()[2], value);
      this.next = value;
      fieldSetFlags()[2] = true;
      return this;
    }

    /** Checks whether the 'next' field has been set */
    public boolean hasNext() {
      return fieldSetFlags()[2];
    }

    /** Clears the value of the 'next' field */
    public avro.DemoRecord.Builder clearNext() {
      next = null;
      fieldSetFlags()[2] = false;
      return this;
    }

    @Override
    public DemoRecord build() {
      try {
        DemoRecord record = new DemoRecord();
        record.desp = fieldSetFlags()[0] ? this.desp : (java.lang.CharSequence) defaultValue(fields()[0]);
        record.value = fieldSetFlags()[1] ? this.value : (java.lang.Integer) defaultValue(fields()[1]);
        record.next = fieldSetFlags()[2] ? this.next : (avro.DemoRecord) defaultValue(fields()[2]);
        return record;
      } catch (Exception e) {
        throw new org.apache.avro.AvroRuntimeException(e);
      }
    }
  }
}
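Besides setting the public fields directly, records can be constructed with the generated Builder shown above. A quick sketch (it assumes the generated avro.DemoRecord class is on the classpath; the class name BuilderDemo is just for illustration):

```java
// Assumes the generated avro.DemoRecord class is available.
import avro.DemoRecord;

public class BuilderDemo {
    public static void main(String[] args) {
        DemoRecord record1 = DemoRecord.newBuilder()
                .setDesp("record 1")   // a String is accepted as a CharSequence
                .setValue(1)
                .setNext(null)         // the union with "null" makes next optional
                .build();
        System.out.println(record1.getDesp() + ":" + record1.getValue()); // record 1:1
    }
}
```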

Another way to generate code is to use Maven. Below is a pom.xml that generates Java code for all schemas under the src/main/avro directory of our project, writing the output to the target/generated-sources/avro/ directory.

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>avro</groupId>
    <artifactId>avroDemo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>avroDemo</name>
    <url>http://maven.apache.org</url>

    <properties>
        <java.version>1.6</java.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>1.7.1</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>src/main/avro</sourceDirectory>
                            <outputDirectory>target/generated-sources/avro/</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro-tools</artifactId>
            <version>1.7.1</version>
        </dependency>
    </dependencies>
</project>

With this pom.xml file, we can run the “mvn compile” command to generate Java code for the schemas in the src/main/avro folder.

2. Use the Generated Code
We can use the generated code to serialize data and store it in a file. Below is the sample code.

private static void writeData() throws IOException {
    FileOutputStream out = new FileOutputStream(FNAME);
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    SpecificDatumWriter<DemoRecord> writer =
            new SpecificDatumWriter<DemoRecord>(DemoRecord.class);

    DemoRecord record1 = new DemoRecord();
    record1.desp = new Utf8("record 1");
    record1.value = 1;
    record1.next = null;

    DemoRecord record2 = new DemoRecord();
    record2.desp = new Utf8("record 2");
    record2.value = 2;
    record2.next = record1;

    DemoRecord record3 = new DemoRecord();
    record3.desp = new Utf8("record 3");
    record3.value = 3;
    record3.next = record2;

    writer.write(record3, encoder);
    encoder.flush();
    out.close();
}

Note that there’s no need to write all three records to the file explicitly: record3 links to record2, which in turn links to record1. Avro follows the links and serializes all three objects.

We can later deserialize the data as shown below.

private static void readData() throws IOException {
    FileInputStream in = new FileInputStream(FNAME);
    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(in, null);
    SpecificDatumReader<DemoRecord> reader =
            new SpecificDatumReader<DemoRecord>(DemoRecord.class);

    DemoRecord record = new DemoRecord();
    reader.read(record, decoder);
    System.out.println(record.desp + ":" + record.value);

    while (record.next != null) {
        record = record.next;
        System.out.println(record.desp + ":" + record.value);
    }
    in.close();
}

The output of running the code is as below.

record 3:3
record 2:2
record 1:1


Apache Avro: Understanding Schemas

Apache Avro is a serialization framework designed with compactness, speed, extensibility, and interoperability in mind. It was originally created to provide a better serialization mechanism for Apache Hadoop, the open-source distributed computing framework.

Avro provides mechanisms to store object data or send it over the network for RPC. In both cases, the data is always serialized with its schema. Because the schema is always present at serialization and deserialization time, there’s no need to tag the data, which results in more compact serialized output. This post covers the Avro schema.

An Avro schema defines how the data associated with it is serialized and deserialized. It is represented in JSON format.

0. Primitive Types

Avro defines eight primitive types: null (no value), boolean (a binary value), int (32-bit signed integer), long (64-bit signed integer), float (32-bit floating point), double (64-bit floating point), bytes (a sequence of 8-bit unsigned bytes), and string (a Unicode character sequence). Primitive types have no attributes.
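In the Java API, these types surface through org.apache.avro.Schema. A small sketch (the class name PrimitiveDemo is made up for illustration):

```java
import org.apache.avro.Schema;

public class PrimitiveDemo {
    public static void main(String[] args) {
        // A primitive schema can be written as a bare JSON string...
        Schema intSchema = new Schema.Parser().parse("\"int\"");
        // ...or created directly from the type enum.
        Schema nullSchema = Schema.create(Schema.Type.NULL);
        System.out.println(intSchema.getType() == Schema.Type.INT);   // true
        System.out.println(nullSchema.getType() == Schema.Type.NULL); // true
    }
}
```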

Note that Avro types are language independent. Different languages can represent the same Avro type differently. For instance, the double type is represented as double in C/C++ and Java, but as float in Python.

1. Complex Types

Six complex types are supported: records, enums, arrays, maps, unions, and fixed. Complex types have attributes and can be composed of primitive types or other complex types.

Record: the record is the most commonly used complex type. An example of a record defined in a schema file is shown below.

{
  "namespace": "avro",
  "type": "record",
  "name": "DemoRecord",
  "aliases": ["LinkedRecord"],                       // old name for this
  "fields" : [
    {"name": "desp", "type": "string"},
    {"name": "value", "type": "long"},               // each element has a long
    {"name": "next", "type": ["DemoRecord", "null"]} // optional next element
  ]
}

The following attributes are required in a record schema.

  • name: the name of the record
  • type: a JSON string “record”
  • fields: a JSON array describing the data fields that form the record. Each field can have the following attributes.
    • name: required. The name of the field.
    • type: required. The type of the field; it can be a primitive or complex type.
    • default: optional. However, it is required when we deserialize data that doesn’t have this field.
    • order: optional. Specifies how this field affects the sort ordering of the record.
    • doc: optional. A JSON string describing the field.
    • aliases: optional. A JSON array of strings providing alternative names for this field.

The following attributes are optional for a record schema.

  • namespace: used to avoid name conflicts; corresponds to a Java package.
  • doc: a JSON string describing the record.
  • aliases: a JSON array of strings providing alternative names.
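To illustrate the optional field attributes, here is a sketch of a hypothetical record (the name DemoDefaults and its field are invented for this example) that uses default, order, and doc:

```json
{
  "namespace": "avro",
  "type": "record",
  "name": "DemoDefaults",
  "doc": "Hypothetical record illustrating optional field attributes.",
  "fields": [
    {"name": "count", "type": "int", "default": 0,
     "order": "descending",
     "doc": "Falls back to 0 when deserializing data written without this field."}
  ]
}
```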

Enum: An example of an enum defined in a schema file is shown below.

{
  "namespace": "avro",
  "type": "enum",
  "name": "DemoEnum",
  "symbols" : ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"]
}

It has the following required attributes.

  • name: the name of the enum
  • type: a JSON string “enum”
  • symbols: a JSON array of JSON strings, indicating all symbols defined for the enum. The symbols are required to be unique.

As with records, namespace, doc, and aliases are optional.
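An enum schema can also be built and used programmatically with the generic mapping; a small sketch (the class name EnumDemo is just for illustration):

```java
import java.util.Arrays;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;

public class EnumDemo {
    public static void main(String[] args) {
        // Programmatic equivalent of the DemoEnum schema above.
        Schema enumSchema = Schema.createEnum("DemoEnum", null, "avro",
                Arrays.asList("SPADES", "HEARTS", "DIAMONDS", "CLUBS"));
        // With the generic mapping, enum values are GenericData.EnumSymbol instances.
        GenericData.EnumSymbol hearts = new GenericData.EnumSymbol(enumSchema, "HEARTS");
        System.out.println(enumSchema.getEnumOrdinal("HEARTS")); // 1
        System.out.println(hearts);                              // HEARTS
    }
}
```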
Arrays: An example of a record containing an array as one of its fields is shown below.

{
  "namespace": "avro",
  "type": "record",
  "name": "DemoArray",
  "fields" : [
    {"name": "anArr", "type": {"type": "array", "items": "string"}}
  ]
}

The following attributes are required.

  • type: must be a JSON string “array”
  • items: the schema of the array’s items. In the example above, the items are strings.
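With the generic mapping, an array field can be populated with an ordinary java.util.List; a sketch (the class name ArrayDemo is just for illustration):

```java
import java.util.Arrays;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class ArrayDemo {
    public static void main(String[] args) {
        // Inline copy of the DemoArray schema above.
        String json = "{\"namespace\": \"avro\", \"type\": \"record\", \"name\": \"DemoArray\","
                + "\"fields\": [{\"name\": \"anArr\", \"type\": {\"type\": \"array\", \"items\": \"string\"}}]}";
        Schema schema = new Schema.Parser().parse(json);
        GenericRecord rec = new GenericData.Record(schema);
        rec.put("anArr", Arrays.asList("a", "b")); // any java.util.Collection works
        System.out.println(rec.get("anArr"));      // [a, b]
    }
}
```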

Map: An example of a schema file defining a record with a map as one of its fields is shown below.

{
  "namespace": "avro",
  "type": "record",
  "name": "DemoMap",
  "fields" : [
    {"name": "aMap", "type": {"type": "map", "values": "long"}}
  ]
}

The following attributes are required.

  • type: a JSON string “map”
  • values: the schema of the map’s values. The keys are assumed to be strings. In the example above, the values are of long type.

Union: Below is a sample schema of a record containing a union as one of its fields.

{
  "namespace": "avro",
  "type": "record",
  "name": "DemoUnion",
  "fields" : [
    {"name": "aUnion", "type": ["string", "null"]}
  ]
}

Unions are defined as a JSON array. In the example above, the field aUnion can either be a string or null.
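The "either of these types" behavior can be checked programmatically with GenericData.validate; a sketch (the class name UnionDemo is just for illustration):

```java
import java.util.Arrays;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;

public class UnionDemo {
    public static void main(String[] args) {
        // Programmatic equivalent of the ["string", "null"] union above.
        Schema union = Schema.createUnion(Arrays.asList(
                Schema.create(Schema.Type.STRING),
                Schema.create(Schema.Type.NULL)));
        GenericData data = GenericData.get();
        System.out.println(data.validate(union, "some text")); // true:  matches string
        System.out.println(data.validate(union, null));        // true:  matches null
        System.out.println(data.validate(union, 42));          // false: matches neither branch
    }
}
```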

Fixed: fixed defines data with a fixed number of bytes. Below is a sample schema for a fixed type.

{"namespace": "avro", "type": "fixed", "size": 16, "name": "DemoFixed"}

It has the following required attributes.

  • name: a JSON string containing the name of the fixed type.
  • type: a JSON string “fixed”
  • size: the number of bytes in a value of the fixed type.

It has the following optional attributes.

  • namespace: similar to record.
  • aliases: similar to record.
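A fixed schema and value can also be created programmatically with the generic mapping; a sketch (the class name FixedDemo is just for illustration):

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;

public class FixedDemo {
    public static void main(String[] args) {
        // Programmatic equivalent of the DemoFixed schema above: 16 bytes.
        Schema fixedSchema = Schema.createFixed("DemoFixed", null, "avro", 16);
        // A fixed value wraps exactly that many bytes.
        GenericData.Fixed value = new GenericData.Fixed(fixedSchema, new byte[16]);
        System.out.println(fixedSchema.getFixedSize()); // 16
        System.out.println(value.bytes().length);       // 16
    }
}
```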

References:
Apache Avro specification 1.7.1: http://avro.apache.org/docs/current/spec.html