For bulk-loading external data into a cluster, Cassandra 0.8 introduced the sstableloader. The sstableloader streams a set of sstables into a live cluster; instead of simply copying every table to every node, it transfers only the relevant part of the data to each node, conforming to the replication strategy of the cluster.
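To illustrate the routing idea, here is a toy sketch of SimpleStrategy replica placement (this is not Cassandra's actual code; the node names and tokens are made up): each node owns a token on a ring, and a row is placed on the first node whose token is at or after the row's token, plus the next replication_factor - 1 nodes on the ring.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.TreeMap;

// Toy sketch of SimpleStrategy replica placement (simplified, not
// Cassandra's implementation): walk the token ring clockwise from the
// row's token and collect the next rf nodes, wrapping around at the end.
public class RingSketch {

    static List<String> replicasFor(long rowToken, TreeMap<Long, String> ring, int rf) {
        List<String> replicas = new ArrayList<String>();
        // first node at or after the row's token, wrapping to the start of the ring
        Long start = ring.ceilingKey(rowToken);
        if (start == null) start = ring.firstKey();
        Iterator<Long> it = ring.tailMap(start).keySet().iterator();
        while (replicas.size() < rf) {
            if (!it.hasNext()) it = ring.keySet().iterator(); // wrap around
            replicas.add(ring.get(it.next()));
        }
        return replicas;
    }

    public static void main(String[] args) {
        TreeMap<Long, String> ring = new TreeMap<Long, String>();
        ring.put(0L, "node-a");
        ring.put(100L, "node-b");
        ring.put(200L, "node-c");
        // token 150 falls between node-b and node-c, so with rf=2 the
        // replicas are node-c and (wrapping around) node-a
        System.out.println(replicasFor(150L, ring, 2)); // [node-c, node-a]
    }
}
```

This is exactly the knowledge the sstableloader exploits: placement is a pure function of the key and the ring, so each sstable slice can be sent straight to the nodes that will own it.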
Using this tool with Hadoop makes a lot of sense, since it should put less strain on the cluster than having multiple Mappers or Reducers communicate with the cluster directly. Although the P2P concept of Cassandra should assist in such an approach, simple tests have shown that throughput could be increased by 20-25% by using the BulkOutputFormat, which makes use of the sstableloader, in Hadoop [Improved Hadoop output in Cassandra 1.1].
This article describes the use of the Cassandra BulkOutputFormat with Hadoop and provides the source code of a simple example. The sample application implements the word count example – what else – with Cassandra 1.1.6 and Hadoop 0.20.205.
Cassandra Wordcount Schema
The result of counting words should be a word/count pair. Here the word is used as the row key, and each row contains a single count column holding the count for that word. For convenience the default_validation_class is UTF8Type, so the result of the counting is also serialized as a string. The schema used for this sample application is very simple and looks as follows:
create keyspace cassa_word_count
  with strategy_options = [{replication_factor:1}]
  and placement_strategy = 'org.apache.cassandra.locator.SimpleStrategy';

use cassa_word_count;

create column family words
  with comparator = UTF8Type
  and default_validation_class = UTF8Type;
Job Configuration
Besides setting the job output format to org.apache.cassandra.hadoop.BulkOutputFormat, there are a few more settings to consider when writing a Cassandra bulk loading job with Hadoop. First you’ll have to provide some information about the Keyspace and ColumnFamily you plan to write the data to:
cassandra.output.keyspace
cassandra.output.columnfamily
Next you have to set the required information about the cluster so that the sstableloader can do its work. It needs to be able to connect to the cluster, and it needs to know the partitioner, which is used to distribute rows by their key across the cluster. A last, optional parameter throttles the streaming throughput in MBits:
cassandra.output.partitioner.class
cassandra.output.thrift.port
cassandra.output.thrift.address
mapreduce.output.bulkoutputformat.streamthrottlembits
// cassandra bulk loader config
conf.set("cassandra.output.keyspace", "cassa_word_count");
conf.set("cassandra.output.columnfamily", "words");
// OrderPreservingPartitioner as an alternative
conf.set("cassandra.output.partitioner.class", "org.apache.cassandra.dht.RandomPartitioner");
conf.set("cassandra.output.thrift.port", "9160"); // default
conf.set("cassandra.output.thrift.address", "127.0.0.1");
conf.set("mapreduce.output.bulkoutputformat.streamthrottlembits", "400");
job.setOutputFormatClass(org.apache.cassandra.hadoop.BulkOutputFormat.class);
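As an aside on the partitioner choice above: RandomPartitioner places rows by a token derived from an MD5 hash of the row key. A minimal sketch of that idea (simplified, not Cassandra's exact implementation):

```java
import java.math.BigInteger;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Simplified sketch of RandomPartitioner's token computation: the MD5
// digest of the row key, interpreted as a non-negative BigInteger.
public class TokenSketch {

    static BigInteger token(byte[] rowKey) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            return new BigInteger(md5.digest(rowKey)).abs();
        } catch (NoSuchAlgorithmException e) {
            throw new RuntimeException(e); // MD5 is always available in the JDK
        }
    }

    public static void main(String[] args) {
        // the same key always maps to the same token, so placement can be
        // computed locally -- this is what the sstableloader relies on
        System.out.println(token("hamlet".getBytes()));
    }
}
```

Because the token is a deterministic function of the key, the job and the cluster must agree on the partitioner class; that is why it has to be configured explicitly here.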
Mapper
There is nothing special about the Mapper. The keys and values for input and output are the same as any other Hadoop job would have.
public class CassaWordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // ... tokenize the line and emit a <word, 1> pair per token ...
    }
}
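The elided map body boils down to splitting each input line into tokens and emitting a <word, 1> pair per token. Stripped of the Hadoop types, the logic is roughly the following (plain Java, class and method names invented for this sketch):

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of the map step: one <word, 1> pair per whitespace-
// separated token of the input line (a simplification of what the real
// Mapper does with Text/IntWritable via context.write).
public class MapSketch {

    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<Map.Entry<String, Integer>>();
        for (String token : line.split("\\s+")) {
            if (!token.isEmpty()) {
                pairs.add(new AbstractMap.SimpleEntry<String, Integer>(token, 1));
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        System.out.println(map("to be or not to be"));
    }
}
```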
Reducer
While there is nothing special about the Mapper, the output of this Reducer differs from that of a “usual” Reducer. In this sample application the Reducer creates the keys and columns, which are then emitted and finally written to the BulkOutputFormat.
Reducer<Text, IntWritable, ByteBuffer, List<Mutation>>
Thrift is used to create the key and columns of the row. Here the row key is a word, and the row has only one column – ‘count’ – which holds the total count for that word. Both the key and the column value get serialized as strings, which makes sense for the key but not as much for the value.
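This string serialization can be sketched without any Cassandra dependencies; the helper names rowKey and countValue are invented here for illustration:

```java
import java.nio.ByteBuffer;
import java.nio.charset.Charset;

// How the reducer's output maps to bytes: the word becomes the row key,
// and the count is serialized as a UTF-8 string to match the column
// family's UTF8Type default_validation_class. Helper names are invented.
public class RowSerializationSketch {

    private static final Charset UTF8 = Charset.forName("UTF-8");

    // row key: the word itself as UTF-8 bytes
    static ByteBuffer rowKey(String word) {
        return ByteBuffer.wrap(word.getBytes(UTF8));
    }

    // column value: the count rendered as a UTF-8 string
    static ByteBuffer countValue(int count) {
        return ByteBuffer.wrap(Integer.toString(count).getBytes(UTF8));
    }

    public static void main(String[] args) {
        // round-trip: decoding the buffers restores the original strings
        System.out.println(new String(rowKey("hamlet").array(), UTF8)); // hamlet
        System.out.println(new String(countValue(42).array(), UTF8));   // 42
    }
}
```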
public class CassaWordCountReducer extends Reducer<Text, IntWritable, ByteBuffer, List<Mutation>> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // sum up the partial counts for this word
        int wordCount = 0;
        for (IntWritable value : values) {
            wordCount += value.get();
        }

        // build the single 'count' column for this row
        Column countCol = new Column(ByteBuffer.wrap("count".getBytes()));
        countCol.setValue(ByteBuffer.wrap(Integer.toString(wordCount).getBytes()));
        countCol.setTimestamp(new Date().getTime());

        ColumnOrSuperColumn wordCosc = new ColumnOrSuperColumn();
        wordCosc.setColumn(countCol);

        Mutation countMut = new Mutation();
        countMut.column_or_supercolumn = wordCosc;

        List<Mutation> columnsToAdd = new ArrayList<Mutation>();
        columnsToAdd.add(countMut);

        // the row key is the word itself
        context.write(ByteBuffer.wrap(key.toString().getBytes()), columnsToAdd);
    }
}
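For completeness, the driver wiring these pieces together might look like the sketch below. Treat it as a configuration sketch, not the article's original code: the class names CassaWordCountMapper and CassaWordCountReducer come from this example, but the driver class itself and the input path handling are assumptions (the full source is in the download linked below).

```java
// Sketch of a Hadoop 0.20.x job driver for the Cassandra bulk-load word
// count. The driver class and argument handling are assumptions; the
// conf keys and output format are the ones shown in this article.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class CassaWordCount {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // cassandra bulk loader config (see above)
        conf.set("cassandra.output.keyspace", "cassa_word_count");
        conf.set("cassandra.output.columnfamily", "words");
        conf.set("cassandra.output.partitioner.class", "org.apache.cassandra.dht.RandomPartitioner");
        conf.set("cassandra.output.thrift.port", "9160");
        conf.set("cassandra.output.thrift.address", "127.0.0.1");

        Job job = new Job(conf, "cassa word count");
        job.setJarByClass(CassaWordCount.class);
        job.setMapperClass(CassaWordCountMapper.class);
        job.setReducerClass(CassaWordCountReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputFormatClass(org.apache.cassandra.hadoop.BulkOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```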
This simple example should get you started with Cassandra and its Hadoop BulkOutputFormat. You can download the source of the example here. It comes with the Shakespeare repository for counting words. Please also consider reading the provided README.
Further information about the sstableloader and the BulkOutputFormat can be found here:
Hi Henning,
I was able to run your code. At first I got an exception saying that “you must set the output using setOutputColumnFamily”, so I added the following configurations:
ConfigHelper.setOutputInitialAddress(job.getConfiguration(), "192.168.1.3");
ConfigHelper.setOutputColumnFamily(job.getConfiguration(), "cassa_word_count", "words");
String query = "INSERT INTO cassa_word_count.words (words, count) VALUES ('?','?')";
CqlConfigHelper.setOutputCql(job.getConfiguration(), query);
ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
job.setOutputFormatClass(CqlOutputFormat.class);
With that I was able to run the code without any problem. BUT to my great surprise, when I query Cassandra in its shell with the command
select * from cassa_word_count.words; I get the following result:
 words | count
-------+-------
That is, my database is empty.
I think I am missing something simple but don't know what. Any help will be welcome. Thanks.