Using the Cassandra Bulk Loader with Hadoop BulkOutputFormat

Cassandra 0.8 introduced the sstableloader for bulk-loading external data into a cluster. Instead of simply copying every sstable to every node, the sstableloader streams a set of sstables into a live cluster, transferring only the relevant part of the data to each node in accordance with the replication strategy of the cluster.
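Run standalone, the tool takes one or more initial contact points and a directory of sstables to stream. A minimal sketch, assuming Cassandra 1.1 and the keyspace and column family used later in this article (the path is hypothetical):

# stream pre-built sstables into the cluster; -d names a contact point
bin/sstableloader -d 127.0.0.1 /path/to/cassa_word_count/words/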

Using this tool with Hadoop makes a lot of sense, since it should put less strain on the cluster than having multiple Mappers or Reducers communicate with the cluster directly. Although the P2P concept of Cassandra should assist in such an approach, simple tests have shown that throughput in Hadoop can be increased by 20-25% using the BulkOutputFormat, which makes use of the sstableloader [Improved Hadoop output in Cassandra 1.1].

This article describes the use of the Cassandra BulkOutputFormat with Hadoop and provides the source code of a simple example. The sample application implements the word count example – what else – using Cassandra 1.1.6 and Hadoop 0.20.205.

Cassandra Wordcount Schema

The result of counting words is a word/count pair. Here the word is used as the row key, and each row contains a single count column, which holds the count for that word. For convenience the default_validation_class is UTF8Type, so the result of the counting is serialized as a string as well. The schema used for this sample application is very simple and looks as follows:

create keyspace cassa_word_count
    with strategy_options = {replication_factor:1}
    and placement_strategy = 'org.apache.cassandra.locator.SimpleStrategy';
use cassa_word_count;
create column family words 
    with comparator = UTF8Type
    and default_validation_class = UTF8Type;
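
Once the job described below has completed, the counted rows can be inspected in cassandra-cli, e.g. with something like the following (the limit is arbitrary):

use cassa_word_count;
list words limit 3;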

Job Configuration

Besides setting the job output format to org.apache.cassandra.hadoop.BulkOutputFormat, there are a few more settings to consider when writing a Cassandra bulk loading job with Hadoop. First, you’ll have to provide some information about the Keyspace and ColumnFamily you plan to write the data to:

  • cassandra.output.keyspace
  • cassandra.output.columnfamily

Next, you have to set some required information about the cluster to make the sstableloader work: it needs to be able to connect to the cluster, and it needs to know the partitioner that is used to distribute rows by their key. A last, optional parameter throttles the streaming throughput in Mbits/s.

  • cassandra.output.partitioner.class
  • cassandra.output.thrift.port
  • cassandra.output.thrift.address
  • mapreduce.output.bulkoutputformat.streamthrottlembits

// cassandra bulk loader config
conf.set("cassandra.output.keyspace", "cassa_word_count");
conf.set("cassandra.output.columnfamily", "words");
// OrderPreservingPartitioner as an alternative
conf.set("cassandra.output.partitioner.class", "org.apache.cassandra.dht.RandomPartitioner");
conf.set("cassandra.output.thrift.port","9160");    // default
conf.set("cassandra.output.thrift.address", "127.0.0.1");
conf.set("mapreduce.output.bulkoutputformat.streamthrottlembits", "400");

job.setOutputFormatClass(org.apache.cassandra.hadoop.BulkOutputFormat.class);
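
Alternatively, Cassandra ships org.apache.cassandra.hadoop.ConfigHelper with typed setters for these properties. A sketch of the equivalent configuration, assuming the same cluster settings as above:

import org.apache.cassandra.hadoop.ConfigHelper;

// equivalent to the conf.set(...) calls above
ConfigHelper.setOutputColumnFamily(job.getConfiguration(), "cassa_word_count", "words");
ConfigHelper.setOutputPartitioner(job.getConfiguration(), "org.apache.cassandra.dht.RandomPartitioner");
ConfigHelper.setOutputRpcPort(job.getConfiguration(), "9160");
ConfigHelper.setOutputInitialAddress(job.getConfiguration(), "127.0.0.1");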

Mapper

There is nothing special about the Mapper; the keys and values for input and output are the same as in any other Hadoop word count job. A standard tokenize-and-emit map will do:

public class CassaWordCountMapper extends 
        Mapper<LongWritable, Text, Text, IntWritable> {

    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    public void map(LongWritable key, Text value, Context context) 
            throws IOException, InterruptedException {
        // tokenize the line and emit each word with a count of 1
        StringTokenizer tokenizer = new StringTokenizer(value.toString());
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            context.write(word, ONE);
        }
    }
}

Reducer

While there is nothing special about the Mapper, the output of this Reducer differs from that of a “usual” Reducer. In this sample application the Reducer creates the row keys and columns, which are then emitted and finally written to the BulkOutputFormat.

Reducer<Text, IntWritable, ByteBuffer, List<Mutation>>

Thrift is used to create the key and the columns of a row. Here the row key is a word, and the row has a single column – ‘count’ – holding the total count for that word. Both the key and the column value are serialized as strings, which makes sense for the key but not as much for the value.

public class CassaWordCountReducer extends 
        Reducer<Text, IntWritable, ByteBuffer, List<Mutation>> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // sum up all partial counts for this word
        int wordCount = 0;
        for (IntWritable value : values) {
            wordCount += value.get();
        }

        // build the single 'count' column via the Thrift types
        Column countCol = new Column(ByteBuffer.wrap("count".getBytes()));
        countCol.setValue(ByteBuffer.wrap(String.valueOf(wordCount).getBytes()));
        countCol.setTimestamp(System.currentTimeMillis());

        ColumnOrSuperColumn wordCosc = new ColumnOrSuperColumn();
        wordCosc.setColumn(countCol);

        // wrap the column in a mutation and emit it under the word's row key
        Mutation countMut = new Mutation();
        countMut.setColumn_or_supercolumn(wordCosc);

        List<Mutation> columnsToAdd = new ArrayList<Mutation>();
        columnsToAdd.add(countMut);
        context.write(ByteBuffer.wrap(key.toString().getBytes()), columnsToAdd);
    }
}
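
For completeness, a minimal driver wiring the pieces together could look as follows. This is a sketch: the class name CassaWordCount and the use of a command line argument for the input path are assumptions, the rest follows from the snippets above.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class CassaWordCount {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // cassandra bulk loader config, as discussed above
        conf.set("cassandra.output.keyspace", "cassa_word_count");
        conf.set("cassandra.output.columnfamily", "words");
        conf.set("cassandra.output.partitioner.class", "org.apache.cassandra.dht.RandomPartitioner");
        conf.set("cassandra.output.thrift.port", "9160");
        conf.set("cassandra.output.thrift.address", "127.0.0.1");
        conf.set("mapreduce.output.bulkoutputformat.streamthrottlembits", "400");

        Job job = new Job(conf, "cassa_word_count");
        job.setJarByClass(CassaWordCount.class);
        job.setMapperClass(CassaWordCountMapper.class);
        job.setReducerClass(CassaWordCountReducer.class);

        // the map output types differ from the reducer's final output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputFormatClass(org.apache.cassandra.hadoop.BulkOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}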

This simple example should get you started with Cassandra and its Hadoop BulkOutputFormat. You can download the source of the example here. It comes with the Shakespeare corpus for counting words. Please also consider reading the provided README.

Further information about the sstableloader and the BulkOutputFormat can be found in the DataStax post “Improved Hadoop output in Cassandra 1.1” referenced above.


2 thoughts on “Using the Cassandra Bulk Loader with Hadoop BulkOutputFormat”

  1. Hi Henning,

     I was able to run your code. At first, I got an exception saying “you must set the output ColumnFamily using setOutputColumnFamily”, so I added the following configuration:

     ConfigHelper.setOutputInitialAddress(job.getConfiguration(), "192.168.1.3");
     ConfigHelper.setOutputColumnFamily(job.getConfiguration(), "cassa_word_count", "words");
     String query = "INSERT INTO cassa_word_count.words (words, count) VALUES ('?','?')";
     CqlConfigHelper.setOutputCql(job.getConfiguration(), query);
     ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
     job.setOutputFormatClass(CqlOutputFormat.class);

     With that I was able to run the code without any problem. BUT to my great surprise, when I query Cassandra in its shell with the command select * from cassa_word_count.words; I get the following result:

     words | count
     ------+-------

     That is, my database is empty.

     I think I am missing something simple but don’t know what. Any help will be welcome. Thanks.

