Apache Solr: Basic Set Up and Integration with Apache Nutch

Apache Solr is an open source enterprise search platform based on Apache Lucene. It provides full-text search, database integration, rich document (Word, PDF, etc.) handling and more. Apache Solr is written in Java and runs within a servlet container such as Tomcat or Jetty. Its REST-like HTTP/XML and JSON APIs make it accessible from almost any programming language. This post covers setting up Apache Solr and an example of using it with web pages crawled by Apache Nutch.

Set Up Apache Solr

0. Download Apache Solr binaries from http://lucene.apache.org/solr/.

1. Uncompress the Solr binaries.

2. The example folder of the uncompressed Solr directory contains an installation of Jetty, so we can run the Solr WAR file through start.jar using the commands below.

$ cd ${apache-solr-root}/example
$ java -jar start.jar

3. Verification. We should be able to access the following links if everything is all right.

http://localhost:8983/solr/admin/
http://localhost:8983/solr/admin/stats.jsp

Integration with Apache Nutch

0. Follow the post Apache Nutch 1.x: Set up and Basic Usage to set up Apache Nutch if you haven’t done so.

1. Copy the schema file ${nutch root directory}/conf/schema.xml to Apache Solr with the command below.

$ cp ${nutch root directory}/conf/schema.xml ${apache-solr-root}/example/solr/conf/

2. Start/restart Apache Solr with the command below.

$ java -jar start.jar

3. Edit the solrconfig.xml file under ${apache-solr-root}/example/solr/conf/ and change the "df" line under <requestHandler name="/select" class="solr.SearchHandler"> to the line below. Restart Solr again for the change to take effect.

<str name="df">content</str>

Note that content is set according to the defaultSearchField in ${apache-solr-root}/example/solr/conf/schema.xml.

<defaultSearchField>content</defaultSearchField>

4. Follow the post Apache Nutch 1.x: Set up and Basic Usage or Apache Nutch 1.x: Crawling HTTPS to crawl some data. Make sure the invertlinks step has been run at the end of the crawl, since the link database is needed for indexing.

5. Index the crawled data with solrindex by the command below.

$ bin/nutch solrindex http://127.0.0.1:8983/solr/ crawl/crawldb -linkdb crawl/linkdb crawl/segments/*

6. Go to http://localhost:8983/solr/admin/ to start searching.

6.1 Set the query string to "*:*" and click search. The request URL becomes "http://localhost:8983/solr/select/?q=*%3A*&version=2.2&start=0&rows=10&indent=on". All pages should match the query, but by default at most 10 records are shown. We can change rows=10 to a larger value (e.g., rows=100) to list more records.

6.2 Set the query to "video" and click search. The request URL becomes "http://localhost:8983/solr/select/?q=video&version=2.2&start=0&rows=10&indent=on".
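The same queries can also be issued programmatically. Below is a minimal sketch using the SolrJ client library, which is optional and not required for the setup above; HttpSolrServer is the SolrJ 4.x class name (3.x releases call it CommonsHttpSolrServer), and the url and title fields come from the Nutch schema.xml copied earlier.

import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrDocument;

public class SolrSearchDemo {
    public static void main(String[] args) throws Exception {
        // Point the client at the local Solr instance started with start.jar.
        HttpSolrServer server = new HttpSolrServer("http://localhost:8983/solr");

        // Equivalent to q=video&rows=10 issued from the admin page.
        SolrQuery query = new SolrQuery("video");
        query.setRows(10);

        QueryResponse response = server.query(query);
        for (SolrDocument doc : response.getResults()) {
            System.out.println(doc.getFieldValue("url") + " - " + doc.getFieldValue("title"));
        }
    }
}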

If we can see the returned records, we have set up a small search engine with Apache Solr and Apache Nutch successfully.

Apache Nutch 1.x: Crawling HTTPS

This is a follow-up to the post Apache Nutch 1.x: Set up and Basic Usage. Please read it first if you don’t have Apache Nutch set up on your machine.

The default configuration of Apache Nutch 1.5 doesn’t support HTTPS crawling. However, this can be easily enabled by including protocol-httpclient as a plugin. This is done by adding the following content to conf/nutch-site.xml.

<property>
    <name>plugin.includes</name>
    <value>protocol-httpclient|urlfilter-regex|parse-(html|tika)|index-(basic|anchor)|scoring-opic|urlnormalizer-(pass|regex|basic)</value>
</property>

This overrides the default setting in conf/nutch-default.xml.

<property>
    <name>plugin.includes</name>
    <value>protocol-http|urlfilter-regex|parse-(html|tika)|index-(basic|anchor)|scoring-opic|urlnormalizer-(pass|regex|basic)</value>
</property>

Below we show an example of using Apache Nutch to crawl Google Play pages.

1. Update conf/regex-urlfilter.txt.

1.1 Comment out the line -[?*!@=]. This line filters out URLs containing a few special characters. Since Google Play URLs contain the "?" character, we need to comment this line out (or modify it) so those URLs can be fetched.

1.2 Change the line below "accept anything else" to something like the line below. This constrains the crawl to Google Play app detail pages. Note that "." and "?" are regex metacharacters, so they are escaped to match literally.

+^https://play\.google\.com/store/apps/details\?id

2. Start Crawling. We can use two methods.

2.1 Use the crawl command shown as below.

bin/nutch crawl urls -dir crawl -depth 10 -topN 10

2.2 Run the steps one by one, for example with the shell script below.

#!/bin/bash
# Seed the crawldb with the URLs in the urls directory.
bin/nutch inject crawl/crawldb urls
# Run 10 rounds of generate/fetch/parse/updatedb.
for i in {1..10}
do
   # Generate a fetch list of the top 10 URLs into a new segment.
   bin/nutch generate crawl/crawldb crawl/segments -topN 10
   # Pick up the most recently created segment directory.
   s2=`ls -dtr crawl/segments/2* | tail -1`
   echo $s2
   bin/nutch fetch $s2
   bin/nutch parse $s2
   # Update the crawldb with the newly discovered outlinks.
   bin/nutch updatedb crawl/crawldb $s2
done
# Build the link database across all segments.
bin/nutch invertlinks crawl/linkdb -dir crawl/segments

Apache Nutch 1.x: Set up and Basic Usage

0. Set up

Below are the steps to set up Nutch on Linux.

  • Download the latest 1.x version of Nutch from http://nutch.apache.org/
  • Set the JAVA_HOME environment variable. One can add the following line to the ~/.bashrc file.

export JAVA_HOME=<path to Java jdk>

  • Make sure bin/nutch is executable by the command below.

chmod +x bin/nutch

  • Add an agent name in conf/nutch-site.xml as below.

<property>
    <name>http.agent.name</name>
    <value>Nutch Test Spider</value>
</property>

1. An Example

Below are the steps to run Nutch on this blog site.

  • Create a directory named urls, and put a file named seed.txt under the directory with the content below.

http://www.roman10.net/

  • Edit the conf/regex-urlfilter.txt file. Change the line below "accept anything else" to something like the line below. This constrains the crawl to a specific domain.

+^http://([a-z0-9]*.)*roman10.net/

  • Seed the crawldb with URLs. The command below converts the URLs into db entries and puts them into the crawldb.

bin/nutch inject crawl/crawldb urls

  • Generate the fetch list. A segment directory named after the creation timestamp will be created, and the URLs to be fetched will be stored in its crawl_generate sub-directory.

bin/nutch generate crawl/crawldb crawl/segments

  • Fetch the content. The command below will fetch the content. Two more sub directories will be created under the segment directory, including crawl_fetch (status of fetching each URL) and content (raw content retrieved).

bin/nutch fetch `ls -d crawl/segments/* | tail -1`

  • Parse the content. The command below parses the fetched content. Three more folders are created: crawl_parse (outlink URLs for updating the crawldb), parse_text (the parsed text) and parse_data (the parsed outlinks and metadata).

bin/nutch parse `ls -d crawl/segments/* | tail -1`

  • Update the crawldb database. We can use the command below.

bin/nutch updatedb crawl/crawldb `ls -d crawl/segments/* | tail -1`

  • After updating the database, the crawldb contains updated entries for the initial pages plus the newly discovered outlinks. We can then fetch a new segment with the top-scoring 10 pages using the commands below.

bin/nutch generate crawl/crawldb crawl/segments -topN 10
s2=`ls -d crawl/segments/2* | tail -1`
echo $s2
bin/nutch fetch $s2
bin/nutch parse $s2
bin/nutch updatedb crawl/crawldb $s2

  • We can repeat the above commands to fetch more segments.
  • Invertlinks. We can use the command below. This creates a crawl/linkdb folder, which contains the list of known links to each URL, including both the source URL and the anchor text of each link.

bin/nutch invertlinks crawl/linkdb -dir crawl/segments

  • Indexing. This can be done with Apache Solr. The details are not covered in this post.

2. How It Works…

The essential crawling procedure in Nutch is as below.

inject URLs

loop:
    generate fetch list
    fetch
    parse
    updatedb

invert links

index

References:

1. Nutch Tutorial: http://wiki.apache.org/nutch/NutchTutorial#A3._Crawl_your_first_website

2. Nutch set up and use blog: http://nutch.wordpress.com/

Apache Cassandra Understand Replication

Apache Cassandra places copies of the same data on multiple nodes to ensure fault tolerance and no single point of failure. This operation is called replication and the data copies are called replicas. Replication is done on a row basis.

1. Replication Strategy

Replication is defined by the replication strategy when creating a keyspace. One important attribute of the replication strategy is the replication factor, which indicates the number of replicas within a cluster. Cassandra provides two replication strategies out of the box.

SimpleStrategy

SimpleStrategy is usually used for simple single data center clusters. It distributes replicas without considering data center or rack location.

  • It places the first replica on a node according to the partitioner.
  • It then places the rest of the replicas on the next nodes (one replica per node) moving clockwise around the Cassandra ring.

NetworkTopologyStrategy

NetworkTopologyStrategy is used for applications deployed across multiple data centers or racks. It relies on snitches to determine the nodes' locations in the network. Different replication factors can be specified for different data centers (e.g., strategy_options = {us-east : 2, us-west : 3}).

  • It places the first replica for each data center on a node according to the partitioner.
  • It then distributes the rest of the replicas for each data center based on that data center's replication factor. It searches clockwise and places the next replica on a node in a different rack. If no such node is found, a node in the same rack is used.

Note that in this way, each data center holds a complete copy of the data. If one data center fails, the other data centers can still serve all of it. Within a data center, Cassandra tries to distribute replicas across different racks, since nodes in the same rack are more likely to fail at the same time (due to power, network issues, etc.).
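To make this concrete, below is a minimal sketch that creates one keyspace per strategy using CQL through the DataStax Java driver (2.x or later); the keyspace and data center names are assumptions. On the Cassandra versions contemporary with this post, the same is done through cassandra-cli using placement_strategy and strategy_options.

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;

public class ReplicationDemo {
    public static void main(String[] args) {
        // Connect to a local Cassandra node.
        Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
        Session session = cluster.connect();

        // SimpleStrategy: one replication factor for the whole cluster.
        session.execute("CREATE KEYSPACE demo_simple WITH replication = "
                + "{'class': 'SimpleStrategy', 'replication_factor': 3}");

        // NetworkTopologyStrategy: a replication factor per data center,
        // mirroring the strategy_options example above (DC names must match
        // what the snitch reports).
        session.execute("CREATE KEYSPACE demo_multi_dc WITH replication = "
                + "{'class': 'NetworkTopologyStrategy', 'us-east': 2, 'us-west': 3}");

        cluster.close();
    }
}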

2. Replication Factor

The replication factor, together with the ConsistencyLevel, affects both writes and reads in Cassandra. The following should be considered when setting the replication factor.

  • Reads should be served from the local data center, without consulting a remote data center.
  • The impact of a node or data center failure should be minimized.
  • The replication factor should not exceed the number of nodes in the cluster. Otherwise, writes are rejected. Reads can still be served if the desired ConsistencyLevel can be met.

3. Replication Factor Update

It is possible to update the replication factor. However, a repair is required afterwards so that replicas are redistributed according to the new replication factor.
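As an illustrative sketch (same driver and assumed keyspace name as the previous example), updating the replication factor is just an ALTER KEYSPACE followed by a repair:

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;

public class UpdateReplicationFactorDemo {
    public static void main(String[] args) {
        Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
        Session session = cluster.connect();

        // Raise the per-data-center replication factors (hypothetical keyspace).
        session.execute("ALTER KEYSPACE demo_multi_dc WITH replication = "
                + "{'class': 'NetworkTopologyStrategy', 'us-east': 3, 'us-west': 3}");

        // Existing data is not streamed automatically; run "nodetool repair"
        // on the affected nodes so the additional replicas receive the data.
        cluster.close();
    }
}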

References:

1. Understanding Data Partitioning and Replication in Apache Cassandra
http://www.slideshare.net/DataStax/understanding-data-partitioning-and-replication-in-apache-cassandra

2. Cassandra Operations Replication: http://wiki.apache.org/cassandra/Operations#Replication

3. About Replication in Cassandra:
http://www.datastax.com/docs/1.0/cluster_architecture/replication#replication

Apache Cassandra How Compaction Works

Compaction in Apache Cassandra refers to the operation of merging multiple SSTables into a single new one. It mainly deals with the following.

  • Merge keys
  • Combine columns
  • Discard tombstones

Compaction is done for two purposes.

  • Bound the number of SSTables to consult on reads. Cassandra's write model allows multiple versions of a row to exist in different SSTables. At read time, the different versions are read from their SSTables and merged into one. Compaction reduces the number of SSTables to consult and therefore improves read performance.
  • Reclaim space taken by obsolete data in SSTables.

After compaction, the old SSTables are marked as obsolete. They are deleted asynchronously when the JVM performs a GC, or when Cassandra restarts, whichever happens first. Cassandra may force a deletion if it detects that disk space is running low. It is also possible to force a deletion from JConsole.

Size Tiered Compaction

When there are N (default 4) similar-sized SSTables for a column family, a compaction occurs to merge them into one. Suppose we have four SSTables of size X; a compaction merges them into one SSTable of size Y. When we have four SSTables of size Y, a compaction merges them into another SSTable of size Z. This process repeats itself.

There’re a few problems with this approach.

  • Space. A compaction creates a new SSTable, and both the new and old SSTables exist during the compaction. In the worst case (no deletes or overwrites), a compaction requires roughly twice the on-disk space of the SSTables being compacted.
  • Performance. There's no guarantee of how many SSTables a row may be spread across.

Leveled Compaction

This strategy was introduced in Cassandra 1.0 and is based on leveldb (reference 4). It creates fixed-size SSTables (5MB by default) and groups them into levels of different size capacities. Each level is 10 times as large as the previous level.

When an SSTable is created at L0, it is immediately compacted with the SSTables in L1. When L1 fills up, extra SSTables in L1 are promoted to L2, and any subsequent SSTables appearing at L1 are compacted with the SSTables at L2. Within a level, leveled compaction guarantees there's no overlap between SSTables, meaning a row can appear at most once per level. However, a row can appear on different levels.

Leveled compaction addresses the concerns of size-tiered compaction.

  • Space. It only requires about 10 times the fixed SSTable size of temporary space for a compaction.
  • Performance. It guarantees that 90% of all reads will be satisfied from a single SSTable, assuming row sizes are nearly uniform. The worst case is bounded by the total number of levels (for 10TB of data, there are 7 levels).

When to Use Which

Leveled compaction essentially trades more disk I/O for a guarantee on how many SSTables a row may be spread across. Below are the cases where leveled compaction can be a good option.

  • Low latency read is required
  • High read/write ratio
  • Rows are frequently updated. With size-tiered compaction, a frequently updated row will be spread across multiple SSTables.

Below are the cases where size-tiered compaction can be a good option (a configuration sketch follows these lists).

  • Disk I/O is expensive
  • Write heavy workloads.
  • Rows are written once. If rows are written once and never updated, they naturally end up contained in a single SSTable, and there's no point in using the more I/O-intensive leveled compaction.
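To make the choice concrete, the compaction strategy is configured per column family (table). The sketch below uses CQL (Cassandra 1.2+ syntax) through the DataStax Java driver; the demo.users and demo.events tables are assumptions. On the Cassandra 1.0/1.1 line, the same thing is done by setting the column family's compaction strategy through cassandra-cli.

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;

public class CompactionStrategyDemo {
    public static void main(String[] args) {
        Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
        Session session = cluster.connect();

        // Read-heavy, frequently updated rows: switch to leveled compaction.
        session.execute("ALTER TABLE demo.users WITH compaction = "
                + "{'class': 'LeveledCompactionStrategy', 'sstable_size_in_mb': 160}");

        // Write-heavy, write-once rows: stay with (or switch back to) size-tiered.
        session.execute("ALTER TABLE demo.events WITH compaction = "
                + "{'class': 'SizeTieredCompactionStrategy'}");

        cluster.close();
    }
}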

References:

1. Cassandra SSTable: http://wiki.apache.org/cassandra/MemtableSSTable

2. Compaction in Apache Cassandra: http://www.datastax.com/dev/blog/leveled-compaction-in-apache-cassandra

3. Cassandra, the Definitive Guide.

4. leveldb: http://code.google.com/p/leveldb/

5. When to use leveled compaction: http://www.datastax.com/dev/blog/when-to-use-leveled-compaction

How Apache Cassandra Read Works

This is a follow-up to the previous post, How Apache Cassandra Write Works.

0. Partitioners and Snitches

Partitioners and snitches affect Cassandra reads. We briefly describe them first.

Partitioners

The partitioner determines how row keys are sorted, which affects how data is distributed across Cassandra nodes. At read time, it affects range queries. Cassandra provides several partitioners out of the box, including the ones below, and it is also possible to create our own partitioner.

  • Random Partitioner: The default partitioner. As its name suggests, it distributes row keys in a random fashion, which gives it the advantage of spreading row keys evenly across Cassandra nodes. However, with this partitioner a range query can only be used to fetch the entire column family; any other key range may give unpredictable results, because the keys between the start and end key are not kept in any deterministic order. Most of the time, the Random Partitioner is the proper choice.
  • Order Preserving Partitioner (OPP): It requires the row keys to be UTF-8 strings. Rows are sorted and stored according to the row keys. This partitioner facilitates efficient range queries. However, it often leads to a lopsided ring where some nodes hold significantly more data than others.
  • Byte-Order Partitioner (BOP): Like OPP, except it treats row keys as raw bytes. Because it doesn't need to validate the row keys as UTF-8 strings, it can bring a performance improvement over OPP when UTF-8 keys are not required.

Snitches

A snitch determines the proximity of Cassandra nodes. It gathers network topology information and tries to route requests efficiently. The snitch is configured in the cassandra.yaml configuration file, and all nodes in a cluster must use the same snitch configuration. Below is a list of available snitches.

  • SimpleSnitch: The default snitch. It does not use data center or rack information and is only appropriate for single data center deployments.
  • PropertyFileSnitch: This snitch reads the topology from a user-supplied configuration file.
  • A few other snitches (e.g., RackInferringSnitch, which infers data center and rack from a node's IP address) are available depending on the Cassandra version.

By default, a dynamic snitch layer wraps the configured snitch. It monitors read performance and routes requests away from slow nodes. It's recommended to keep the dynamic snitch enabled for most deployments.

1. The Read Process

The Cassandra read process can be briefly illustrated by the diagram below.

The following steps describe how a read is performed in Cassandra.

  • A client sends a read request to one of the Cassandra nodes
  • The node that receives the read request acts as the StorageProxy. Based on the ReplicationStrategy, it gets the list of N nodes that are responsible for replicas of the requested keys and are currently alive.
  • The StorageProxy node sorts the N nodes by proximity, which is determined by the snitch as introduced earlier.
  • Based on the proximity, different messages are sent to the N nodes.
    • A data read command is sent to the closest node A (measured by proximity). A is supposed to read the actual data and return it to the StorageProxy node.
    • Based on the consistency level, a few more nodes may be sent digest read commands. These nodes perform the same read but return only a digest of the data to the StorageProxy node. A digest normally costs less network traffic than the actual data.
    • Depending on configuration, a read repair may be triggered. In this case, the remaining nodes responsible for the key's replicas are also sent a digest read command.
  • Based on the consistency level, a copy of the data (which can be stale or up to date) is returned from the StorageProxy node to the client. If a read repair is triggered, it can happen in the background after the data is returned.

When a node reads data locally, it checks both the Memtable and the SSTables. Because Cassandra does not update data in place on disk, a typical read needs to merge data from 2-4 SSTables, which makes reads in Cassandra usually slower than writes. To read data from an SSTable, the node first gets the position of the row using a binary search on the SSTable index. It then uses a row-level column index and a row-level bloom filter to find the exact data blocks to read, and deserializes only those blocks. After retrieving data from multiple SSTables, the results are combined.
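The consistency level mentioned in the steps above is chosen by the client per request. Below is a minimal, illustrative sketch using the DataStax Java driver (a newer, CQL-based client than the tools contemporary with this post); the keyspace and table names are assumptions.

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SimpleStatement;

public class ReadConsistencyDemo {
    public static void main(String[] args) {
        Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
        Session session = cluster.connect("demo_multi_dc");

        // QUORUM: the StorageProxy node waits for a majority of replicas
        // (one data read plus digest reads) before returning to the client.
        SimpleStatement stmt = new SimpleStatement("SELECT * FROM users WHERE id = 1");
        stmt.setConsistencyLevel(ConsistencyLevel.QUORUM);
        session.execute(stmt);

        cluster.close();
    }
}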

2. Performance Tuning

Read performance can be improved by adjusting the Memtable thresholds, lowering the compaction priority, fine-tuning the JVM memory options, etc.

References:
1. Cassandra Internals – Reading: http://www.mikeperham.com/2010/03/17/cassandra-internals-reading/

2. Cassandra Reads Performance Explained: http://nosql.mypopescu.com/post/474623402/cassandra-reads-performance-explained

3. Cassandra Architecture Internals: http://wiki.apache.org/cassandra/ArchitectureInternals

4. Cassandra Architecture Overview: http://wiki.apache.org/cassandra/ArchitectureOverview

5. Cassandra Performance Tuning: http://wiki.apache.org/cassandra/PerformanceTuning

How Apache Cassandra Write Works

Apache Cassandra is known for good write performance, mainly because all writes are sequential and there's no reading or seeking before a write. This post covers how writes work in Apache Cassandra.

1. The Write Process

Below is a diagram that illustrates Cassandra write process.

As shown in the diagram above, a client sends a write request to a random node of the Cassandra ring. The node that receives the request obtains from the ReplicationStrategy the list of N nodes responsible for replicas of the keys in the write request, and then sends a RowMutation message to each of the N nodes.

Each of the N nodes performs the following upon receiving the RowMutation message.

  • Append the write to the commit log. The commit log is the crash recovery mechanism and there’s a single commit log for each node.
  • Update the in-memory Memtable data structure with the write. Memtables are organized in sorted order by the row key and maintained per column family. It is possible to have multiple Memtables for a single column family, but only one of them will be current. The rest can be waiting to be flushed, which we’ll discuss later.

That's it: the write request is served. Because a write consists of an append to a file plus an in-memory update, with no update in place on disk, writes in Cassandra are fast.
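From the client side, a write is simply a statement executed at a chosen consistency level, which determines how many of the N replica acknowledgements the coordinating StorageProxy node waits for before answering. A minimal, illustrative sketch using the DataStax Java driver (a newer, CQL-based client; keyspace and table names are assumptions):

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SimpleStatement;

public class WriteConsistencyDemo {
    public static void main(String[] args) {
        Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
        Session session = cluster.connect("demo_multi_dc");

        // With QUORUM and a replication factor of 3, the coordinator returns
        // as soon as 2 of the 3 replicas have appended the write to their
        // commit log and updated their Memtable.
        SimpleStatement insert = new SimpleStatement(
                "INSERT INTO users (id, name) VALUES (1, 'roman10')");
        insert.setConsistencyLevel(ConsistencyLevel.QUORUM);
        session.execute(insert);

        cluster.close();
    }
}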

2. SSTable

But that's not the entire story. Memory is limited and volatile. When a Memtable is full, it is flushed sequentially to a disk-based structure called an SSTable. SSTables are also maintained per column family, and multiple SSTables can exist for a single column family. Each SSTable has a bloom filter associated with it, which we'll discuss separately.

When there’re lots of SSTables, a process called compaction can happen to merge multiple SSTables into one. The old SSTables will be deleted after compaction. Because the data in each SSTable is sorted, the merge process is essentially similar to the “merge” step of a mergesort. We’ll cover compaction in detail in another post.

3. Hinted Handoff

If one of the N nodes responsible for the write has failed, the node that receives the write creates a hint. When the failed node comes back online, the hint is sent to the recovered node. This is called Hinted Handoff.

One concern with hinted handoff is that a lot of hints may accumulate for a node that goes offline for quite some time, and once the node is back, it can be flooded by the hints. It is possible to disable hinted handoff, or to reduce the priority of hinted handoff messages, to avoid this issue.

4. Performance Tuning for Writes

As suggested by reference 6, there's one simple strategy to improve write performance: use separate disks for the commit logs and the SSTables. Because writes append to the commit log all the time, it is important to ensure the commit log disk is not affected by other disk operations such as flushing data to SSTables.

References:
1. Cassandra Wiki Architecture Internals: http://wiki.apache.org/cassandra/ArchitectureInternals
2. Cassandra Internals – writing: http://www.mikeperham.com/2010/03/13/cassandra-internals-writing/
3. Cassandra Operations: http://wiki.apache.org/cassandra/Operations
4. Cassandra: The Definitive Guide
5. About Writes in Cassandra: http://www.datastax.com/docs/1.0/dml/about_writes

6. Cassandra Performance Tuning: http://wiki.apache.org/cassandra/PerformanceTuning

Apache Avro: Java Generic Mapping

The previous post covered Java code generation and the specific mapping for Apache Avro. This post discusses using Apache Avro without code generation, which is needed when the schema is not known before runtime. In Java, this is called the generic mapping.

We'll use the same schema as the previous post, shown below.

{
  "namespace": "avro",
  "type": "record",
  "name": "DemoRecord",
  "aliases": ["LinkedLongs"],                        // old name for this
  "fields" : [
    {"name": "desp", "type": "string"},
    {"name": "value", "type": "int"},                // each element has an int
    {"name": "next", "type": ["DemoRecord", "null"]} // optional next element
  ]
}
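The writeData() and readData() methods below refer to a parsed Schema object and an output file name FNAME, which are defined elsewhere in the full source (see the download link at the end). As an illustrative sketch, they could be set up along these lines; the class name and schema file name are assumptions.

import java.io.File;
import java.io.IOException;
import org.apache.avro.Schema;

public class GenericMappingDemo {
    // Output file for the serialized records (named test1 further below).
    private static final String FNAME = "test1";
    // Parsed at runtime from the schema file shown above.
    private static Schema schema;

    public static void main(String[] args) throws IOException {
        schema = new Schema.Parser().parse(new File("demorecord.avsc"));
        // ... call writeData() and readData(), shown below ...
    }
}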

To serialize the data and store it to a file, we use the code below.

private static void writeData() throws IOException {
    FileOutputStream out = new FileOutputStream(FNAME);
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    GenericDatumWriter writer = new GenericDatumWriter(schema);

    GenericRecord record1 = new GenericData.Record(schema);
    record1.put("desp", new Utf8("record 1"));
    record1.put("value", 1);
    record1.put("next", null);

    GenericRecord record2 = new GenericData.Record(schema);
    record2.put("desp", new Utf8("record 2"));
    record2.put("value", 2);
    record2.put("next", record1);

    GenericRecord record3 = new GenericData.Record(schema);
    record3.put("desp", new Utf8("record 3"));
    record3.put("value", 3);
    record3.put("next", record2);

    writer.write(record3, encoder);
    encoder.flush();
}

The data will be stored in a file named test1. We can compare it with the file named test generated in the previous post.

diff test test1

To deserialize the data, we use the code below.

private static void readData() throws IOException {
    FileInputStream in = new FileInputStream(FNAME);
    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(in, null);
    GenericDatumReader<GenericRecord> reader =
            new GenericDatumReader<GenericRecord>(schema);

    GenericRecord record = reader.read(null, decoder);
    System.out.println(record.get("desp") + ":" + record.get("value"));

    while (record.get("next") != null) {
        record = (GenericRecord) record.get("next");
        System.out.println(record.get("desp") + ":" + record.get("value"));
    }
}

The output of running the code is the same as in the previous post, shown below.

record 3:3
record 2:2
record 1:1

You can download the source code here.

Apache Avro: Java Code Generation and Specific Mapping

Although code generation is not required to use Apache Avro, the Java and C++ implementations can generate code to represent data for an Avro schema. If we have the schema before reading or writing data, code generation can optimize performance. In Java, this is called the specific mapping.

1. Code Generation

Suppose we have a schema file demorecord.avsc as below.

{
  "namespace": "avro",
  "type": "record",
  "name": "DemoRecord",
  "aliases": ["LinkedLongs"],                        // old name for this
  "fields" : [
    {"name": "desp", "type": "string"},
    {"name": "value", "type": "int"},                // each element has an int
    {"name": "next", "type": ["DemoRecord", "null"]} // optional next element
  ]
}

We can use the following command to generate the Java class.

java -cp <path_to_avro_tools>/avro-tools-1.6.3.jar org.apache.avro.tool.Main compile schema <schema_file> <output_directory>

In our example, we have the avro-tools-1.6.3.jar in the local directory and we simply output to the current directory. Then the command is as below.

java -cp avro-tools-1.6.3.jar org.apache.avro.tool.Main compile schema demorecord.avsc .

The generated code is as below.

/**

 * Autogenerated by Avro

 * 

 * DO NOT EDIT DIRECTLY

 */

package avro;  

@SuppressWarnings("all")

public class DemoRecord extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {

  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"DemoRecord\",\"namespace\":\"avro\",\"fields\":[{\"name\":\"desp\",\"type\":\"string\"},{\"name\":\"value\",\"type\":\"int\"},{\"name\":\"next\",\"type\":[\"DemoRecord\",\"null\"]}],\"aliases\":[\"LinkedLongs\"]}");

  @Deprecated public java.lang.CharSequence desp;

  @Deprecated public int value;

  @Deprecated public avro.DemoRecord next;

  public org.apache.avro.Schema getSchema() { return SCHEMA$; }

  // Used by DatumWriter.  Applications should not call. 

  public java.lang.Object get(int field$) {

    switch (field$) {

    case 0: return desp;

    case 1: return value;

    case 2: return next;

    default: throw new org.apache.avro.AvroRuntimeException("Bad index");

    }

  }

  // Used by DatumReader.  Applications should not call. 

  @SuppressWarnings(value="unchecked")

  public void put(int field$, java.lang.Object value$) {

    switch (field$) {

    case 0: desp = (java.lang.CharSequence)value$; break;

    case 1: value = (java.lang.Integer)value$; break;

    case 2: next = (avro.DemoRecord)value$; break;

    default: throw new org.apache.avro.AvroRuntimeException("Bad index");

    }

  }

 

  /**

   * Gets the value of the 'desp' field.

   */

  public java.lang.CharSequence getDesp() {

    return desp;

  }

 

  /**

   * Sets the value of the 'desp' field.

   * @param value the value to set.

   */

  public void setDesp(java.lang.CharSequence value) {

    this.desp = value;

  }

 

  /**

   * Gets the value of the 'value' field.

   */

  public java.lang.Integer getValue() {

    return value;

  }

 

  /**

   * Sets the value of the 'value' field.

   * @param value the value to set.

   */

  public void setValue(java.lang.Integer value) {

    this.value = value;

  }

 

  /**

   * Gets the value of the 'next' field.

   */

  public avro.DemoRecord getNext() {

    return next;

  }

 

  /**

   * Sets the value of the 'next' field.

   * @param value the value to set.

   */

  public void setNext(avro.DemoRecord value) {

    this.next = value;

  }

 

  /** Creates a new DemoRecord RecordBuilder */

  public static avro.DemoRecord.Builder newBuilder() {

    return new avro.DemoRecord.Builder();

  }

  

  /** Creates a new DemoRecord RecordBuilder by copying an existing Builder */

  public static avro.DemoRecord.Builder newBuilder(avro.DemoRecord.Builder other) {

    return new avro.DemoRecord.Builder(other);

  }

  

  /** Creates a new DemoRecord RecordBuilder by copying an existing DemoRecord instance */

  public static avro.DemoRecord.Builder newBuilder(avro.DemoRecord other) {

    return new avro.DemoRecord.Builder(other);

  }

  

  /**

   * RecordBuilder for DemoRecord instances.

   */

  public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<DemoRecord>

    implements org.apache.avro.data.RecordBuilder<DemoRecord> {

 

    private java.lang.CharSequence desp;

    private int value;

    private avro.DemoRecord next;

 

    /** Creates a new Builder */

    private Builder() {

      super(avro.DemoRecord.SCHEMA$);

    }

    

    /** Creates a Builder by copying an existing Builder */

    private Builder(avro.DemoRecord.Builder other) {

      super(other);

    }

    

    /** Creates a Builder by copying an existing DemoRecord instance */

    private Builder(avro.DemoRecord other) {

            super(avro.DemoRecord.SCHEMA$);

      if (isValidValue(fields()[0], other.desp)) {

        this.desp = (java.lang.CharSequence) data().deepCopy(fields()[0].schema(), other.desp);

        fieldSetFlags()[0] = true;

      }

      if (isValidValue(fields()[1], other.value)) {

        this.value = (java.lang.Integer) data().deepCopy(fields()[1].schema(), other.value);

        fieldSetFlags()[1] = true;

      }

      if (isValidValue(fields()[2], other.next)) {

        this.next = (avro.DemoRecord) data().deepCopy(fields()[2].schema(), other.next);

        fieldSetFlags()[2] = true;

      }

    }

 

    /** Gets the value of the 'desp' field */

    public java.lang.CharSequence getDesp() {

      return desp;

    }

    

    /** Sets the value of the 'desp' field */

    public avro.DemoRecord.Builder setDesp(java.lang.CharSequence value) {

      validate(fields()[0], value);

      this.desp = value;

      fieldSetFlags()[0] = true;

      return this; 

    }

    

    /** Checks whether the 'desp' field has been set */

    public boolean hasDesp() {

      return fieldSetFlags()[0];

    }

    

    /** Clears the value of the 'desp' field */

    public avro.DemoRecord.Builder clearDesp() {

      desp = null;

      fieldSetFlags()[0] = false;

      return this;

    }

 

    /** Gets the value of the 'value' field */

    public java.lang.Integer getValue() {

      return value;

    }

    

    /** Sets the value of the 'value' field */

    public avro.DemoRecord.Builder setValue(int value) {

      validate(fields()[1], value);

      this.value = value;

      fieldSetFlags()[1] = true;

      return this; 

    }

    

    /** Checks whether the 'value' field has been set */

    public boolean hasValue() {

      return fieldSetFlags()[1];

    }

    

    /** Clears the value of the 'value' field */

    public avro.DemoRecord.Builder clearValue() {

      fieldSetFlags()[1] = false;

      return this;

    }

 

    /** Gets the value of the 'next' field */

    public avro.DemoRecord getNext() {

      return next;

    }

    

    /** Sets the value of the 'next' field */

    public avro.DemoRecord.Builder setNext(avro.DemoRecord value) {

      validate(fields()[2], value);

      this.next = value;

      fieldSetFlags()[2] = true;

      return this; 

    }

    

    /** Checks whether the 'next' field has been set */

    public boolean hasNext() {

      return fieldSetFlags()[2];

    }

    

    /** Clears the value of the 'next' field */

    public avro.DemoRecord.Builder clearNext() {

      next = null;

      fieldSetFlags()[2] = false;

      return this;

    }

 

    @Override

    public DemoRecord build() {

      try {

        DemoRecord record = new DemoRecord();

        record.desp = fieldSetFlags()[0] ? this.desp : (java.lang.CharSequence) defaultValue(fields()[0]);

        record.value = fieldSetFlags()[1] ? this.value : (java.lang.Integer) defaultValue(fields()[1]);

        record.next = fieldSetFlags()[2] ? this.next : (avro.DemoRecord) defaultValue(fields()[2]);

        return record;

      } catch (Exception e) {

        throw new org.apache.avro.AvroRuntimeException(e);

      }

    }

  }

}

Another way to generate the code is to use Maven. Below is a pom.xml file that generates Java code for all schemas under the src/main/avro directory of our project, writing the output to the target/generated-sources/avro/ directory.

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

 

    <groupId>avro</groupId>

    <artifactId>avroDemo</artifactId>

    <version>0.0.1-SNAPSHOT</version>

    <packaging>jar</packaging>

 

    <name>avroDemo</name>

    <url>http://maven.apache.org</url>

 

    <properties>

        <java.version>1.6</java.version>

        <maven.compiler.source>${java.version}</maven.compiler.source>

        <maven.compiler.target>${java.version}</maven.compiler.target>

 

        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

    </properties>

 

    <build>

        <plugins>

            <plugin>

                <groupId>org.apache.avro</groupId>

                <artifactId>avro-maven-plugin</artifactId>

                <version>1.7.1</version>

                <configuration>

                </configuration>

                <executions>

                    <execution>

                        <phase>generate-sources</phase>

                        <goals>

                            <goal>schema</goal>

                        </goals>

                        <configuration>

                            <sourceDirectory>src/main/avro</sourceDirectory>

                            <outputDirectory>target/generated-sources/avro/</outputDirectory>

                        </configuration>

                    </execution>

                </executions>

            </plugin>

        </plugins>

    </build>

 

 

    <dependencies>

        <dependency>

            <groupId>org.apache.avro</groupId>

            <artifactId>avro</artifactId>

            <version>1.7.1</version>

        </dependency>

        <dependency>

            <groupId>org.apache.avro</groupId>

            <artifactId>avro-tools</artifactId>

            <version>1.7.1</version>

        </dependency>

    </dependencies>

</project>

With this pom.xml file, we can run the "mvn compile" command to generate the Java code for the schemas in the src/main/avro folder.

2. Use the Generated Code

We can use the generated code to serialize data and store it into a file. Below is the sample code.

private static void writeData() throws IOException {
    FileOutputStream out = new FileOutputStream(FNAME);
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    SpecificDatumWriter<DemoRecord> writer =
            new SpecificDatumWriter<DemoRecord>(DemoRecord.class);

    DemoRecord record1 = new DemoRecord();
    record1.desp = new Utf8("record 1");
    record1.value = 1;
    record1.next = null;

    DemoRecord record2 = new DemoRecord();
    record2.desp = new Utf8("record 2");
    record2.value = 2;
    record2.next = record1;

    DemoRecord record3 = new DemoRecord();
    record3.desp = new Utf8("record 3");
    record3.value = 3;
    record3.next = record2;

    writer.write(record3, encoder);
    encoder.flush();
}

Note that there's no need for us to write all three records explicitly, since record3 links to record2, which in turn links to record1. Avro follows the linked records and serializes all three objects.
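The generated class also ships a Builder (shown in the generated code in section 1), so the records could equally be constructed as below; this fragment is a drop-in replacement for the record construction inside writeData() above.

DemoRecord record1 = DemoRecord.newBuilder()
        .setDesp("record 1")
        .setValue(1)
        .setNext(null)
        .build();

DemoRecord record2 = DemoRecord.newBuilder()
        .setDesp("record 2")
        .setValue(2)
        .setNext(record1)
        .build();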

We can later on de-serialize the data as shown below.

private static void readData() throws IOException {
    FileInputStream in = new FileInputStream(FNAME);
    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(in, null);
    SpecificDatumReader<DemoRecord> reader =
            new SpecificDatumReader<DemoRecord>(DemoRecord.class);

    DemoRecord record = new DemoRecord();
    reader.read(record, decoder);
    System.out.println(record.desp + ":" + record.value);

    while (record.next != null) {
        record = record.next;
        System.out.println(record.desp + ":" + record.value);
    }
}

The output of running the code is as below.

record 3:3
record 2:2
record 1:1

You can download the source code here.

Apache Avro: Understand Schema

Apache Avro is a serialization framework designed with compactness, speed, extensibility and interoperability in mind. It was originally started to provide a better serialization mechanism for Apache Hadoop, the open source distributed computing framework.

Avro provides mechanisms to store object data or send it over the network for RPC. In both cases, the data is always serialized with its schema. Because the schema is always present at serialization and deserialization time, there's no need to tag the data, which results in more compact serialized output. This post covers the Avro schema.

An Avro schema defines how the data associated with it is serialized and deserialized. Schemas are represented in JSON format.

0. Primitive Types

Avro defines eight primitive types: null (no value), boolean (a binary value), int (32-bit signed integer), long (64-bit signed integer), float (32-bit floating point), double (64-bit floating point), bytes (sequence of 8-bit unsigned bytes), and string (Unicode character sequence). Primitive types have no attributes.

Note that Avro types are language independent. Different languages can have different representations for the same Avro data type. For instance, the double type is represented as double in C/C++ and Java, but as float in Python.

1. Complex Types

Six complex types are supported: records, enums, arrays, maps, unions and fixed. Complex types have attributes and can be formed from primitive types or other complex types.

Record: The record is the most commonly used complex type. An example of a record defined in a schema file is shown below.

{
  "namespace": "avro",
  "type": "record",
  "name": "DemoRecord",
  "aliases": ["LinkedRecord"],                       // old name for this
  "fields" : [
    {"name": "desp", "type": "string"},
    {"name": "value", "type": "long"},               // each element has a long
    {"name": "next", "type": ["DemoRecord", "null"]} // optional next element
  ]
}

The following attributes are required in a record schema.

  • name: the name of the record
  • type: a JSON string “record”
  • fields: a JSON array describing the fields that form the record. Each field can have the following attributes.
    • name: required. The name of the field.
    • type: required. The type of the field; it can be a primitive or complex type.
    • default: optional. However, it is required when we deserialize data that doesn't have this field.
    • order: optional. Specifies how this field affects the sort ordering of the record.
    • doc: optional. A JSON string describing the field.
    • aliases: optional. A JSON array of strings providing alternative names for this field.

The following attributes are optional for a record schema.

  • namespace: used to avoid name conflicts; corresponds to a Java package.
  • doc: a JSON string describing the record.
  • aliases: a JSON array of strings giving alternative names for the record.
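Schemas are plain JSON and can be loaded and inspected programmatically. Below is a small, illustrative Java sketch; it assumes the record schema above is saved as demorecord.avsc and the Avro library is on the classpath.

import java.io.File;
import java.io.IOException;
import org.apache.avro.Schema;

public class SchemaInspectDemo {
    public static void main(String[] args) throws IOException {
        Schema schema = new Schema.Parser().parse(new File("demorecord.avsc"));

        System.out.println(schema.getName());       // DemoRecord
        System.out.println(schema.getNamespace());  // avro
        System.out.println(schema.getType());       // RECORD

        // List each field together with its type.
        for (Schema.Field field : schema.getFields()) {
            System.out.println(field.name() + " : " + field.schema());
        }
    }
}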

Enum: An example of enum defined in a schema file is shown as below.

{
  "namespace": "avro",
  "type": "enum",
  "name": "DemoEnum",
  "symbols" : ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"]
}

It has the following required attributes.

  • name: the name of the enum
  • type: a JSON string “enum”
  • symbols: a JSON array of JSON strings listing all the symbols defined for the enum. The symbols must be unique.

Similar to records, namespace, doc and aliases are optional.

Arrays: An example of a record containing an array as one of its fields is shown below.

{
  "namespace": "avro",
  "type": "record",
  "name": "DemoArray",
  "fields" : [
    {"name": "anArr", "type": {"type": "array", "items": "string"}}
  ]
}

The following attributes are required.

  • type: must be a JSON string “array”
  • items: the schema of the array’s items. In the example above, the items are string.

Map: An example of a schema file defining a record with a map as one of its fields is shown below.

{
  "namespace": "avro",
  "type": "record",
  "name": "DemoMap",
  "fields" : [
    {"name": "aMap", "type": {"type": "map", "values": "long"}}
  ]
}

The following attributes are required.

  • type: a JSON string “map”
  • values: the schema of the map’s values. The keys are assumed to be strings. In the example above, the values are of long type.

Union: Below is a sample schema of a record containing a union as one of its fields.

{
  "namespace": "avro",
  "type": "record",
  "name": "DemoUnion",
  "fields" : [
    {"name": "aUnion", "type": ["string", "null"]}
  ]
}

Unions are defined as a JSON array. In the example above, the field aUnion can either be a string or null.

Fixed: fixed defines data with a fixed number of bytes. Below is a sample schema for a fixed type.

{"namespace": "avro", "type": "fixed", "size": 16, "name": "DemoFixed"}

It has the following required attributes.

  • name: a JSON string containing the name of the fixed type.
  • type: a JSON string “fixed”
  • size: the number of bytes in a value of the fixed type.

It has the following optional attributes.

  • namespace: similar to record.
  • aliases: similar to record.

References:
Apache Avro specification 1.7.1: http://avro.apache.org/docs/current/spec.html