Apache Cassandra: Understanding Replication

Apache Cassandra places copies of the same data on multiple nodes to ensure fault tolerance and no single point of failure. This operation is called replication and the data copies are called replicas. Replication is done on a row basis.

1. Replication Strategy

Replication is defined by the replication strategy chosen when creating a keyspace. One important attribute of a replication strategy is the replication factor, which indicates the number of replicas within a cluster. Cassandra provides two replication strategies out of the box.

SimpleStrategy

SimpleStrategy is usually used for simple single data center clusters. It distributes replicas without considering data center or rack location.

  • It places the first replica on a node according to the partitioner.
  • It then places the remaining replicas on the next nodes clockwise in the Cassandra ring, one replica per node.
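
The two placement steps above can be sketched in a few lines of Java. This is only a simplified model, not Cassandra's own code: the ring is a hypothetical TreeMap from integer token to node name, and every node is assumed to own exactly one token.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

class SimpleStrategySketch {
    // ring: token -> node name, sorted by token (hypothetical, simplified model of the ring).
    static List<String> replicasFor(TreeMap<Integer, String> ring, int keyToken, int replicationFactor) {
        List<String> replicas = new ArrayList<>();
        if (ring.isEmpty()) {
            return replicas;
        }
        // First replica: the node owning the first token >= keyToken, wrapping around the ring.
        Integer token = ring.ceilingKey(keyToken);
        if (token == null) {
            token = ring.firstKey();
        }
        // Remaining replicas: keep walking clockwise, one replica per node.
        int wanted = Math.min(replicationFactor, ring.size());
        while (replicas.size() < wanted) {
            replicas.add(ring.get(token));
            token = ring.higherKey(token);
            if (token == null) {
                token = ring.firstKey();
            }
        }
        return replicas;
    }

    public static void main(String[] args) {
        TreeMap<Integer, String> ring = new TreeMap<>();
        ring.put(0, "node-A");
        ring.put(25, "node-B");
        ring.put(50, "node-C");
        ring.put(75, "node-D");
        System.out.println(replicasFor(ring, 60, 3)); // [node-D, node-A, node-B]
    }
}
```

In this sketch a replication factor larger than the number of nodes is simply capped at the ring size.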

NetworkTopologyStrategy

NetworkTopologyStrategy is used for applications deployed across multiple data centers or racks. It relies on snitches to determine where each node sits in the network. Different replication factors can be specified for different data centers (e.g. strategy_options = {us-east : 2, us-west : 3}).

  • It places the first replica for each data center on a node according to the partitioner.
  • It then distributes the rest of the replicas for each data center based on the replication factor of that data center. It searches clockwise and places the next replica on a node in a different rack. If no such node is found, a different node in the same rack is used.

Note that in this way, each data center (with a replication factor of at least 1) holds the complete data. If a data center fails, the other data centers can still serve the complete data. Within a data center, Cassandra tries to distribute replicas across different racks, since nodes in the same rack are more likely to fail at the same time (due to power or network issues, etc.).
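
The rack-aware search described above can be sketched roughly as follows. This is an illustrative model under stated assumptions, not the real NetworkTopologyStrategy: the Node class, its fields, and the integer tokens are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class NetworkTopologySketch {
    // Hypothetical node description; real topology information comes from the snitch.
    static final class Node {
        final String name, dc, rack;
        final int token;
        Node(String name, String dc, String rack, int token) {
            this.name = name; this.dc = dc; this.rack = rack; this.token = token;
        }
    }

    // Picks up to rf replicas for one data center, preferring racks not used yet.
    static List<String> replicasFor(List<Node> nodes, int keyToken, String dc, int rf) {
        List<Node> ring = new ArrayList<>(nodes);
        ring.sort(Comparator.comparingInt((Node n) -> n.token));
        int start = 0;
        while (start < ring.size() && ring.get(start).token < keyToken) {
            start++;
        }
        List<Node> chosen = new ArrayList<>();
        List<Node> sameRack = new ArrayList<>();
        Set<String> racksUsed = new HashSet<>();
        // Walk clockwise once around the ring, starting at the node that owns the key.
        for (int i = 0; i < ring.size() && chosen.size() < rf; i++) {
            Node n = ring.get((start + i) % ring.size());
            if (!n.dc.equals(dc)) continue;
            if (racksUsed.add(n.rack)) chosen.add(n); else sameRack.add(n);
        }
        // Not enough distinct racks: fall back to nodes in racks that are already used.
        for (Node n : sameRack) {
            if (chosen.size() >= rf) break;
            chosen.add(n);
        }
        List<String> names = new ArrayList<>();
        for (Node n : chosen) names.add(n.name);
        return names;
    }
}
```

Calling replicasFor once per data center, each with its own replication factor, gives the full replica set for a key in this model.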

2. Replication Factor

The replication factor, together with the ConsistencyLevel, affects both writes and reads in Cassandra. The following should be considered when setting replication factors.

  • Reads should be served at local data center, without consulting remote data center.
  • The impact of a failure should be minimized.
  • The replication factor should not exceed the number of nodes in the cluster. Otherwise, writes are rejected. Reads can still be served if the desired ConsistencyLevel can be met.

3. Replication Factor Update

It is possible to update the Replication Factor. However, a repair is required so that the replicas can be distributed according to the new Replication Factor.

References:

1. Understanding Data Partitioning and Replication in Apache Cassandra
http://www.slideshare.net/DataStax/understanding-data-partitioning-and-replication-in-apache-cassandra

2. Cassandra Operations Replication: http://wiki.apache.org/cassandra/Operations#Replication

3. About Replication in Cassandra:
http://www.datastax.com/docs/1.0/cluster_architecture/replication#replication

Apache Cassandra: How Compaction Works

Compaction in Apache Cassandra refers to the operation of merging multiple SSTables into a single new one. It mainly deals with the following.

  • Merge keys
  • Combine columns
  • Discard tombstones
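
A minimal sketch of these three tasks, assuming a hypothetical Cell class and modelling each SSTable as a sorted map, might look like this. Real Cassandra keeps tombstones around for a grace period before discarding them; the sketch ignores that and drops them immediately.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class CompactionMergeSketch {
    // Hypothetical cell: a value with a write timestamp, or a tombstone marking a delete.
    static final class Cell {
        final String value;
        final long timestamp;
        final boolean tombstone;
        Cell(String value, long timestamp, boolean tombstone) {
            this.value = value; this.timestamp = timestamp; this.tombstone = tombstone;
        }
    }

    static TreeMap<String, Cell> compact(List<TreeMap<String, Cell>> sstables) {
        TreeMap<String, Cell> merged = new TreeMap<>();
        for (TreeMap<String, Cell> sstable : sstables) {
            for (Map.Entry<String, Cell> e : sstable.entrySet()) {
                // Merge keys and combine versions: the cell with the newest timestamp wins.
                merged.merge(e.getKey(), e.getValue(), (a, b) -> a.timestamp >= b.timestamp ? a : b);
            }
        }
        // Discard tombstones (simplified: no grace period).
        merged.values().removeIf(c -> c.tombstone);
        return merged;
    }
}
```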

Compaction is done for two purposes.

  • Bound the number of SSTables to consult on reads. Cassandra’s write model allows multiple versions of a row to exist in different SSTables. On a read, the different versions are read from different SSTables and merged into one. Compaction reduces the number of SSTables to consult and therefore improves read performance.
  • Reclaim space taken by obsolete data in SSTables.

After compaction, the old SSTables are marked as obsolete. These SSTables are deleted asynchronously when the JVM performs a GC or when Cassandra restarts, whichever happens first. Cassandra may force a deletion if it detects that disk space is running low. It is also possible to force a deletion from JConsole.

Size Tiered Compaction

When there are N (4 by default) similar-sized SSTables for a column family, a compaction merges them into one. Suppose we have four SSTables of size X: a compaction merges them into one SSTable of size Y. Once we have four SSTables of size Y, a compaction merges them into another SSTable of size Z. This process repeats itself.
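
The cascade of merges can be simulated with a toy model. The sketch below makes two simplifying assumptions that are not in the description above: "similar-sized" is reduced to "exactly the same size", and a merge produces a table whose size is the sum of its inputs (no deletes or overwrites).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class SizeTieredSketch {
    static final int THRESHOLD = 4; // number of same-sized SSTables that triggers a merge

    // Adds a freshly flushed SSTable, then keeps merging while THRESHOLD same-sized tables exist.
    static void addAndCompact(List<Long> sizes, long newSize) {
        sizes.add(newSize);
        boolean merged = true;
        while (merged) {
            merged = false;
            for (Long size : new ArrayList<>(sizes)) {
                if (Collections.frequency(sizes, size) >= THRESHOLD) {
                    for (int i = 0; i < THRESHOLD; i++) {
                        sizes.remove(size); // remove(Object): drops one table of this size
                    }
                    sizes.add(size * THRESHOLD);
                    merged = true;
                    break;
                }
            }
        }
    }

    public static void main(String[] args) {
        List<Long> sizes = new ArrayList<>();
        for (int flush = 1; flush <= 16; flush++) {
            addAndCompact(sizes, 1L);
        }
        System.out.println(sizes); // [16]
    }
}
```

Sixteen flushes of size 1 first produce four tables of size 4, which are then merged again into a single table of size 16.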

There are a few problems with this approach.

  • Space. A compaction creates a new SSTable, and both the new and the old SSTables exist during the compaction. In the worst case (with no deletes or overwrites), a compaction requires twice the on-disk space used by the SSTables.
  • Performance. There is no guarantee on how many SSTables a row may be spread across.

Leveled Compaction

Leveled compaction was introduced in Cassandra 1.0 and is based on leveldb (reference 4). This compaction strategy creates fixed-size SSTables (5 MB by default) and groups them into levels of different capacities. Each level is 10 times as large as the previous one.

When an SSTable is created at L0, it is immediately compacted with the SSTables in L1. When L1 fills up, the extra SSTables generated in L1 are promoted to L2, and any SSTables that subsequently appear at L1 are compacted with the SSTables at L2. Within a level, Leveled Compaction guarantees that SSTables do not overlap, meaning a row can appear at most once on a single level. However, a row can appear on different levels.

Leveled Compaction resolves the concerns of size-tiered compaction.

  • Space. Compaction only requires free space of about 10 times the SSTable size.
  • Performance. It guarantees that 90% of all reads will be satisfied from a single SSTable, assuming row sizes are nearly uniform. The worst case is bounded by the total number of levels (for 10 TB of data, there are 7 levels).
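
The figure of 7 levels for 10 TB can be checked with a little arithmetic. The sketch below assumes that L1 holds 10 SSTables of 5 MB (50 MB) and that every further level is 10 times larger; the L1 capacity is an assumption made for the calculation, and the method name is hypothetical.

```java
class LeveledCompactionLevels {
    // Smallest number of levels whose combined capacity can hold dataMb megabytes.
    static int levelsNeeded(long dataMb, long sstableMb, int fanout) {
        long levelCapacityMb = sstableMb * fanout; // assumed L1 capacity: `fanout` SSTables
        long totalMb = 0;
        int levels = 0;
        while (totalMb < dataMb) {
            levels++;
            totalMb += levelCapacityMb;
            levelCapacityMb *= fanout; // each level is `fanout` times larger than the previous one
        }
        return levels;
    }

    public static void main(String[] args) {
        long tenTerabytesInMb = 10L * 1024 * 1024;
        System.out.println(levelsNeeded(tenTerabytesInMb, 5, 10)); // 7
    }
}
```

Under these assumptions the first six levels hold roughly 5.5 TB in total, so a seventh level is needed for 10 TB.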

When to Use Which

Leveled compaction essentially trades more disk I/O for a guarantee on how many SSTables a row may be spread across. Below are the cases where Leveled Compaction can be a good option.

  • Low latency read is required
  • High read/write ratio
  • Rows are frequently updated. If size-tiered compaction is used, a row will spread across multiple SSTables.

Below are the cases where size-tiered compaction can be a good option.

  • Disk I/O is expensive
  • Write heavy workloads.
  • Rows are write-once. If rows are written once and never updated, each row is naturally contained in a single SSTable, so there is no point in using the more I/O-intensive leveled compaction.

References:

1. Cassandra SSTable: http://wiki.apache.org/cassandra/MemtableSSTable

2. Compaction in Apache Cassandra: http://www.datastax.com/dev/blog/leveled-compaction-in-apache-cassandra

3. Cassandra, the Definitive Guide.

4. leveldb: http://code.google.com/p/leveldb/

5. When to use leveled compaction: http://www.datastax.com/dev/blog/when-to-use-leveled-compaction

How Apache Cassandra Read Works

This is a follow-up to the previous post, How Apache Cassandra Write Works.

0. Partitioners and Snitches

Partitioners and snitches affect Cassandra reads, so we briefly describe them first.

Partitioners

A partitioner lets us specify how row keys are sorted, which affects how data is distributed across Cassandra nodes. On reads, it affects range queries. Cassandra provides three partitioners out of the box, namely the random partitioner, the order-preserving partitioner, and the byte-order partitioner. It is also possible to create our own partitioner.

  • Random Partitioner: The default partitioner. As its name suggests, it orders row keys in a random fashion. Consequently, it has the advantage of distributing row keys evenly across Cassandra nodes. However, if this partitioner is used, a range query can only be used to fetch the entire column family. Any other key range may give unpredictable results, because no order is maintained and the keys that fall between the start and end key are therefore not deterministic. Most of the time, Random Partitioner is the proper choice.
  • Order Preserving Partitioner (OPP): It requires the row keys to be UTF-8 strings. Rows are sorted and stored according to the row keys. This partitioner facilitates efficient range queries. However, it often leads to a lopsided ring where some nodes hold significantly more data than others.
  • Byte-Order Partitioner (BOP): It is like OPP except that it treats row keys as raw bytes. Because it does not need to validate that row keys are UTF-8 strings, it can perform better than OPP when the row keys are not required to be UTF-8 strings.
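
The difference between hash-based and order-preserving placement can be illustrated with a small sketch. It assumes an MD5-based token for the random case and plain string order for the order-preserving case; the class and method names are hypothetical and are not Cassandra's partitioner API.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class PartitionerSketch {
    // Random-partitioner style token: the MD5 hash of the key as a non-negative integer.
    static BigInteger md5Token(String key) {
        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            return new BigInteger(1, md.digest(key.getBytes(StandardCharsets.UTF_8)));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available", e);
        }
    }

    // Ring order under hashing: unrelated to the natural order of the keys.
    static List<String> orderByToken(List<String> keys) {
        List<String> sorted = new ArrayList<>(keys);
        sorted.sort(Comparator.comparing(PartitionerSketch::md5Token));
        return sorted;
    }

    // Ring order under an order-preserving scheme: the keys themselves are the sort order.
    static List<String> orderByKey(List<String> keys) {
        List<String> sorted = new ArrayList<>(keys);
        sorted.sort(Comparator.naturalOrder());
        return sorted;
    }
}
```

Because the token order has nothing to do with key order, a range such as "user-10" to "user-20" is a contiguous slice only in the order-preserving case.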

Snitches

A snitch determines the proximity of Cassandra nodes. It gathers network topology information and tries to route requests efficiently. The snitch is configured in the cassandra.yaml configuration file, and all nodes in a cluster must use the same snitch configuration. Below is a list of available snitches.

  • SimpleSnitch: The default snitch. It uses the IP addresses of each node to determine the proximity.
  • PropertyFileSnitch: This snitch reads the topology from a user supplied configuration file.
  • A few other snitches are available, depending on the Cassandra version.

By default, a dynamic snitch layer is used for all snitches. It monitors the read performance and routes the request away from the slow nodes. It’s recommended to keep dynamic snitches enabled for most deployments.

1. The Read Process

The Cassandra read process can be briefly illustrated by the diagram below.

The following steps describe how a read is performed in Cassandra.

  • A client sends a read request to one of the Cassandra nodes
  • The node that receives the read request acts as the StorageProxy. Based on the ReplicationStrategy, it gets a list of the N nodes that hold replicas of the requested keys and are currently alive.
  • The StorageProxy node sorts the N nodes according to proximity, which is determined by the snitch as introduced earlier.
  • Based on the proximity, different messages are sent to the N nodes.
    • A data read command is sent to the closest node A, as measured by proximity. A is expected to read the actual data and return it to the StorageProxy node.
    • Based on consistency level, a few more nodes may be sent digest read commands. These nodes are supposed to perform the data read but only return a digest of the data to StorageProxy node. A digest normally costs less network traffic than actual data.
    • Based on configuration, a read repair may be triggered. In this case, the remaining nodes which are responsible for the key replicas will also be sent a digest read command.
  • Based on consistency level, a copy of the data (can be stale or up-to-date) is returned from the StorageProxy to the client. If the read repair is triggered, it can happen in the background after data is returned.
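
The coordination steps above can be sketched in a simplified, single-process form. Everything here is an assumption made for illustration: the Replica class, the integer distance standing in for snitch proximity, the hash-code digest, and the blockFor parameter standing in for the consistency level. On a digest mismatch the sketch simply prefers the newest timestamp.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;

class ReadCoordinatorSketch {
    // Hypothetical replica: its distance from the coordinator and the version of the row it holds.
    static final class Replica {
        final String name;
        final int distance;
        final String value;
        final long timestamp;
        Replica(String name, int distance, String value, long timestamp) {
            this.name = name; this.distance = distance; this.value = value; this.timestamp = timestamp;
        }
        int digest() { return Objects.hash(value, timestamp); } // stand-in for a real digest
    }

    // blockFor: how many replicas must answer before replying (derived from the consistency level).
    static String read(List<Replica> liveReplicas, int blockFor) {
        if (liveReplicas.isEmpty()) {
            throw new IllegalArgumentException("no live replicas");
        }
        List<Replica> byProximity = new ArrayList<>(liveReplicas);
        byProximity.sort(Comparator.comparingInt((Replica r) -> r.distance));
        Replica dataNode = byProximity.get(0); // data read goes to the closest replica
        Replica newest = dataNode;
        int consulted = Math.max(1, Math.min(blockFor, byProximity.size()));
        for (Replica r : byProximity.subList(1, consulted)) {
            // Digest read: compare digests, and on a mismatch prefer the newer version.
            if (r.digest() != dataNode.digest() && r.timestamp > newest.timestamp) {
                newest = r;
            }
        }
        return newest.value;
    }
}
```

With blockFor set to 1 only the closest replica is consulted, so a stale value can be returned; a larger blockFor lets the digest comparison catch the newer version.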

When a node reads data locally, it checks both the Memtable and the SSTables. Because Cassandra does not update data in place on disk, a typical read needs to merge data from 2-4 SSTables, which usually makes reads slower than writes in Cassandra. To read data from an SSTable, the node first finds the position of the row using a binary search on the SSTable index. It then uses the row-level column index and row-level bloom filter to find the exact data blocks to read, and deserializes only those blocks. After data is retrieved from multiple SSTables, the results are combined.
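
A rough sketch of the local lookup, under simplifying assumptions: each SSTable is modelled as parallel arrays sorted by row key, the index lookup is a plain binary search, and versions are combined by keeping the newest timestamp. Bloom filters and column indexes are left out, and the class names are hypothetical.

```java
import java.util.Arrays;

class LocalReadSketch {
    // Hypothetical SSTable: row keys in sorted order with one value and timestamp per row.
    static final class SSTable {
        final String[] sortedKeys;
        final String[] values;
        final long[] timestamps;
        SSTable(String[] sortedKeys, String[] values, long[] timestamps) {
            this.sortedKeys = sortedKeys; this.values = values; this.timestamps = timestamps;
        }
        // Binary search on the index; a negative result means the row is not in this table.
        int find(String key) {
            return Arrays.binarySearch(sortedKeys, key);
        }
    }

    // Consults every table and keeps the version with the newest timestamp.
    static String read(String key, SSTable... tables) {
        String best = null;
        long bestTimestamp = Long.MIN_VALUE;
        for (SSTable table : tables) {
            int pos = table.find(key);
            if (pos >= 0 && table.timestamps[pos] > bestTimestamp) {
                best = table.values[pos];
                bestTimestamp = table.timestamps[pos];
            }
        }
        return best; // null if no table contains the row
    }
}
```

In this model the Memtable could be treated as one more sorted table passed to read.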

2. Performance Tuning

Read performance can be improved by adjusting the Memtable thresholds, lowering the compaction priority, fine-tuning JVM memory options, etc.

References:
1. Cassandra Internals – Reading: http://www.mikeperham.com/2010/03/17/cassandra-internals-reading/

2. Cassandra Reads Performance Explained: http://nosql.mypopescu.com/post/474623402/cassandra-reads-performance-explained

3. Cassandra Architecture Internals: http://wiki.apache.org/cassandra/ArchitectureInternals

4. Cassandra Architecture Overview: http://wiki.apache.org/cassandra/ArchitectureOverview

5. Cassandra Performance Tuning: http://wiki.apache.org/cassandra/PerformanceTuning

How Apache Cassandra Write Works

Apache Cassandra is known for good write performance, mainly because all writes are sequential and there is no reading or seeking before a write. This post covers how writes work in Apache Cassandra.

1. The Write Process

Below is a diagram that illustrates Cassandra write process.

As shown in the diagram above, a client sends a write request to a random node of the Cassandra ring. The node that receives the request gets from the ReplicationStrategy a list of the N nodes responsible for replicas of the keys in the write request. It then sends a RowMutation message to each of the N nodes.

Each of the N nodes performs the following upon receiving the RowMutation message.

  • Append the write to the commit log. The commit log is the crash recovery mechanism and there’s a single commit log for each node.
  • Update the in-memory Memtable data structure with the write. Memtables are organized in sorted order by the row key and maintained per column family. It is possible to have multiple Memtables for a single column family, but only one of them will be current. The rest can be waiting to be flushed, which we’ll discuss later.

That’s it. The write request is served. Because a write consists of an append to a file and an update to an in-memory data structure, with no update in place on disk, writes in Cassandra are fast.
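
The two steps on each replica can be sketched as follows. This is a toy model: the commit log is an in-memory list standing in for an append-only file, the Memtable is a sorted map, and the class and field names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentSkipListMap;

class WritePathSketch {
    // Stand-in for the append-only commit log file (one per node).
    final List<String> commitLog = new ArrayList<>();
    // Stand-in for the current Memtable of one column family, sorted by row key.
    final ConcurrentSkipListMap<String, String> memtable = new ConcurrentSkipListMap<>();

    void write(String rowKey, String value) {
        commitLog.add(rowKey + "=" + value); // 1. sequential append for crash recovery
        memtable.put(rowKey, value);         // 2. in-memory update, no disk seek
    }
}
```

Note that an overwrite appends a second entry to the log but replaces the value in the Memtable; nothing on disk is modified in place.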

2. SSTable

But that’s not the entire story. Memory space is limited and transient. When a Memtable is full, it is flushed sequentially to a disk-based structure called an SSTable. SSTables are also maintained per column family, and multiple SSTables can exist for a single column family. Each SSTable has a bloom filter associated with it, which we’ll discuss separately.
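
A minimal sketch of the flush, assuming a hypothetical row-count threshold in place of Cassandra's real flush triggers and an in-memory list in place of the on-disk file:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class MemtableFlushSketch {
    private final int maxRows; // hypothetical flush threshold, counted in rows
    private TreeMap<String, String> memtable = new TreeMap<>();
    // Each SSTable is an immutable list of "key=value" lines in row-key order.
    final List<List<String>> sstables = new ArrayList<>();

    MemtableFlushSketch(int maxRows) {
        this.maxRows = maxRows;
    }

    void write(String key, String value) {
        memtable.put(key, value);
        if (memtable.size() >= maxRows) {
            flush();
        }
    }

    private void flush() {
        List<String> sstable = new ArrayList<>();
        // The Memtable is already sorted, so the SSTable is written in one sequential pass.
        for (Map.Entry<String, String> e : memtable.entrySet()) {
            sstable.add(e.getKey() + "=" + e.getValue());
        }
        sstables.add(Collections.unmodifiableList(sstable));
        memtable = new TreeMap<>(); // start a fresh Memtable
    }
}
```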

When there are many SSTables, a process called compaction merges multiple SSTables into one. The old SSTables are deleted after compaction. Because the data in each SSTable is sorted, the merge process is essentially the “merge” step of a mergesort. We’ll cover compaction in detail in another post.

3. Hinted Handoff

If one of the N nodes that the write belongs to has failed, the node that receives the write creates a hint. When the failed node comes back online, that node sends the hint to the recovered node. This is called Hinted Handoff.

One concern with hinted handoff is that a lot of hints may accumulate for a node that stays offline for a long time. Once the node is back, it will be flooded with hints. It is possible to disable hinted handoff or to reduce the priority of hinted handoff messages to avoid this issue.
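
The mechanism can be modelled in a few lines. The sketch below is a single-process toy with hypothetical names: node state is tracked in plain collections, and a "write" is just a string.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class HintedHandoffSketch {
    final Set<String> downNodes = new HashSet<>();
    final Map<String, List<String>> delivered = new HashMap<>(); // node -> writes it has applied
    final Map<String, List<String>> hints = new HashMap<>();     // node -> writes waiting for it

    // The coordinator delivers the write to live replicas and stores a hint for each dead one.
    void write(List<String> replicas, String mutation) {
        for (String node : replicas) {
            Map<String, List<String>> target = downNodes.contains(node) ? hints : delivered;
            target.computeIfAbsent(node, n -> new ArrayList<>()).add(mutation);
        }
    }

    // When a node comes back, every hint stored for it is replayed in order.
    void nodeRecovered(String node) {
        downNodes.remove(node);
        List<String> pending = hints.remove(node);
        if (pending != null) {
            delivered.computeIfAbsent(node, n -> new ArrayList<>()).addAll(pending);
        }
    }
}
```

The hints map also shows where the flooding concern comes from: the longer a node stays down, the longer its list of pending writes grows.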

4. Performance Tuning for Writes

As suggested by reference 6, there is one main strategy for improving write performance: use separate disks for commit logs and SSTables. Because every write appends to the commit log, it is important to ensure that this is not affected by other disk operations such as flushing data to SSTables.

References:
1. Cassandra Wiki Architecture Internals: http://wiki.apache.org/cassandra/ArchitectureInternals
2. Cassandra Internals – writing: http://www.mikeperham.com/2010/03/13/cassandra-internals-writing/
3. Cassandra Operations: http://wiki.apache.org/cassandra/Operations
4. Cassandra: The Definitive Guide
5. About Writes in Cassandra: http://www.datastax.com/docs/1.0/dml/about_writes

6. Cassandra Performance Tuning: http://wiki.apache.org/cassandra/PerformanceTuning

Apache Cassandra: How to Retrieve All Rows from a Column Family

I’m using the Hector client to fetch data from an Apache Cassandra database, and I need to fetch all rows from a column family. The Hector client API has a RangeSlicesQuery class, which calls the get_range_slices method of the Cassandra API to retrieve multiple rows at a time.

If the column family contains more than a few thousand rows, it is common practice to get the results in several batches. This is known as paging.

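Assume all data stored in our Cassandra database is of string type. Below is a Java method that accepts the keyspace (as a Hector Keyspace object) and the column family name as input parameters, and retrieves all rows from the column family. It outputs a Map from row key to retrieval order.

import java.util.HashMap;
import java.util.Map;

import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.beans.OrderedRows;
import me.prettyprint.hector.api.beans.Row;
import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.query.QueryResult;
import me.prettyprint.hector.api.query.RangeSlicesQuery;

private Map<String, Integer> retrieveAllRows(Keyspace keyspace, String columnFamilyName) {
    final int pageSize = 100;    //number of rows fetched per execute() call
    Map<String, Integer> resultMap = new HashMap<String, Integer>();
    String lastKeyForMissing = "";
    StringSerializer s = StringSerializer.get();
    RangeSlicesQuery<String, String, String> allRowsQuery = HFactory.createRangeSlicesQuery(keyspace, s, s, s);
    allRowsQuery.setColumnFamily(columnFamilyName);
    allRowsQuery.setRange("", "", false, 3);    //retrieve 3 columns, no reverse
    //allRowsQuery.setReturnKeysOnly();    //enable this line if we want key only
    allRowsQuery.setRowCount(pageSize);
    int rowCnt = 0;
    while (true) {
        //start each page at the last key of the previous page (inclusive)
        allRowsQuery.setKeys(lastKeyForMissing, "");
        QueryResult<OrderedRows<String, String, String>> res = allRowsQuery.execute();
        OrderedRows<String, String, String> rows = res.get();
        if (rows.getCount() == 0) {
            //empty column family; peekLast() would return null
            break;
        }
        lastKeyForMissing = rows.peekLast().getKey();
        for (Row<String, String, String> aRow : rows) {
            //skip the row already seen at the end of the previous page
            if (!resultMap.containsKey(aRow.getKey())) {
                resultMap.put(aRow.getKey(), ++rowCnt);
                System.out.println(aRow.getKey() + ":" + rowCnt);
            }
        }
        if (rows.getCount() != pageSize) {
            //end of the column family
            break;
        }
    }
    return resultMap;
}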

In the code above, we retrieve 100 rows per page. Note that the setKeys method determines which rows are selected. Because the selected rows include both the start key and the end key, we need to exclude the first row from the second execute() call onwards. This is done by checking whether the row key has already been put into our resultMap structure. There are other ways to do this check.
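
The paging logic can be tried out without a running cluster. The sketch below replaces Cassandra with a hypothetical in-memory sorted set and shows one of the other ways to skip the duplicated first row: comparing it against the last key of the previous page. It uses plain key order for simplicity, whereas a real cluster returns rows in partitioner order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

class PagingSketch {
    // Returns up to pageSize keys starting at startKey, inclusive (mimics an inclusive range start).
    static List<String> fetchPage(TreeSet<String> allKeys, String startKey, int pageSize) {
        List<String> page = new ArrayList<>();
        for (String key : allKeys.tailSet(startKey, true)) {
            if (page.size() == pageSize) break;
            page.add(key);
        }
        return page;
    }

    static List<String> fetchAll(TreeSet<String> allKeys, int pageSize) {
        if (pageSize < 2) {
            // With an inclusive start key, a page of one row would never make progress.
            throw new IllegalArgumentException("pageSize must be at least 2");
        }
        List<String> result = new ArrayList<>();
        String start = "";
        String lastSeen = null;
        while (true) {
            List<String> page = fetchPage(allKeys, start, pageSize);
            for (String key : page) {
                if (!key.equals(lastSeen)) { // skip the row repeated from the previous page
                    result.add(key);
                }
            }
            if (page.size() < pageSize) break; // a short page means the end was reached
            lastSeen = page.get(page.size() - 1);
            start = lastSeen;
        }
        return result;
    }
}
```

Each page after the first contributes at most pageSize - 1 new rows, which is why the guard rejects a page size of 1.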

How to Install Apache Cassandra on Ubuntu Linux

Apache Cassandra is a column-oriented distributed database system. This post covers how to install Apache Cassandra on a single machine for development purposes.

Download
0. Make sure Java JDK 1.6 or higher is installed and configured properly.
1. Download Apache Cassandra from its website at http://cassandra.apache.org/download/.
2. Extract the downloaded file using the command below,

tar xvf apache-cassandra-x.x.x-bin.tar.gz

3. Append the following line to ~/.bashrc

export PATH=$PATH:<path to cassandra directory>/bin

As an example, on my machine it looks like this:

export PATH=$PATH:/home/jicheng/Downloads/apache-cassandra-1.1.2/bin

Configure
1. The configuration files are located in the conf folder of the Cassandra root directory.

  • cassandra.yaml: cassandra storage configuration file
  • log4j-tools.properties: log configuration file
  • log4j-server.properties: cassandra server log configuration file
  • cassandra-env.sh: cassandra running environment configuration file
  • commitlog_archiving.properties: commitlog archiving configuration file
  • cassandra-topology.properties: topology configuration file

2. Make sure the paths configured for data_file_directories, commitlog_directory and saved_caches_directory exist. If not, create the directories. In Cassandra 1.1.2 the defaults are all located under /var/lib/cassandra, so we can use the commands below to create the directory.

$ sudo mkdir -p /var/lib/cassandra
$ sudo chown -R `whoami` /var/lib/cassandra

3. Also ensure the log directory exists. The default log directory is /var/log/cassandra. Use the commands below to create it.

$ sudo mkdir -p /var/log/cassandra
$ sudo chown -R `whoami` /var/log/cassandra

Run
Since we have set the path for cassandra, we can start a new terminal, and type the following command to run it:

$ cassandra -f

-f means to run cassandra in foreground mode, so we can stop it by pressing Ctrl+C.

Connect and Play with Cassandra-cli
Cassandra comes with a command line interface tool called cassandra-cli. We can connect to Cassandra and play with it using cassandra-cli.

Figure 1. Connect to Cassandra with cassandra-cli