Pages

Sunday 4 March 2012

BulkLoad To the Cassandra with the Hadoop



BulkOutputFormat
To bulkload the data to the Cassandra using Hadoop Cassandra introduces new OutputFormat that is BulkOutputFormat. Cassandra has implemented it in such a way that each map or reduce(depends on implementation) will generate sstables with data provided and then stream them to Cassandra with sstableloader. Don’t worry you need not know about all this implementation details to use BulkOutputFormat, all you need to know is some job configuration  and basic thrift call to create columns and mutations.
  1. Initial Setup to write Hadoop job with BulkOutputFormat
  2. Cassandra Related configuration
  3. Using BulkOutputFormat

1)Initial Setup to write Hadoop job with BulkOutputFormat

  1. To start with the development you need to have all the jars from Cassandra-1.1.x into classpath.
  2. All the Hadoop related jars should be in classpath.
  3. For execution of this job you should have all the Cassandra jars in classpath of Hadoop.

2)Cassandra Related configuration

We have to set following properties to Hadoop configuration Object.
      Keyspace name and ColumnFamily name:
      Name of the keyspace and column family for which we are creating sstables and streaming to Cassandra.
Use ConfigHelper class from Cassandra to set both these properties to Configuraion object of hadoop.
e.g

ConfigHelper.setOutputColumnFamily(conf,“keyspace_name”, columnFamily_name”);
      ThriftPort :
port for Thrift to listen for clients on
ConfigHelper.setOutputRpcPort(conf, “9160”);
(Note that you have to provide it as string)

    Seed node:
At least one node of the cluster you want to load data in should be configured as seed.

ConfigHelper.setOutputInitialAddress(conf, “10.10.10.10”);
(Note- ip address also have to specify as string)

     Partitioner:
 The partitioner is responsible for distributing rows (by key) across  nodes in the cluster.  Any  Partitioner may be used, including your  own as long as it is on the classpath.

ConfigHelper.setOutputPartitioner(conf,“org.apache.cassandra.dht.RandomPartitioner”);
    Buffer Size:
      SSTableSimpleUnsortedWriter buffers whatever input it gets in memory and “flush” everything in one sstable once the buffer size is reached.   That is, the resulting sstables will be approximately of size equal to buffer in MB.
You have to set this directly in hadoop configuration object
conf.set(“mapreduce.output.bulkoutputformat.buffersize
”, “64”);
(Note-  Buffer size is specified in string)
    OutputFormat class:
Set output format class for the job as BulkOutPutFormat

job.setOutputFormatClass(BulkOutputFormat.class);
So you have done with the basic configuration needed for the BulkOutputFormat.
Sample code:
Configuration conf = new Configuration();

ConfigHelper.setOutputColumnFamily(conf, “keyspace_name”, “columnFamily_name”);
ConfigHelper.setOutputRpcPort(conf, “9160”);
ConfigHelper.setOutputInitialAddress(conf, “10.10.10.10”);
ConfigHelper.setOutputPartitioner(conf,“org.apache.cassandra.dht.RandomPartitioner”);

conf.set(“mapreduce.output.bulkoutputformat.buffersize”, “64”);

Job job ;
job = new Job(conf);
job.setOutputFormatClass(BulkOutputFormat.class);

/*Rest of your job related configuration like input format , input path and all will go here*/

3)Using  BulkOutputFormat

If you look at the declaration of BulkOutputFormat  It looks like following

public class BulkOutputFormat extends OutputFormat<ByteBuffer,List<Mutation>>
That means you should have output key as Bytebuffer and Output value as list of Mutation.
It means in map or reduce you have to process  data to produce the rowkey as bytebuffer and columns corresponding to this row key as list of mutations. And when Hadoop calls

 context.write(ByteBuffer,List<Mutation>)
Sstables will be generated into map tmp directory and once the  record.close() is invoked it will start loading sstables to Cassandra.
You don’t need to know all this implementation details , all we need to know is create row-key of type ByteBuffer and columns in the form of List<Mutation> , and write them using context.write().Everything else will be taken care by Hadoop and Cassandra.

Creating row-key and list of mutations
ByteBuffer rowkey = org.apache.cassandra.utils.ByteBufferUtil.bytes(“rowkey”);
Column column = new Column();
            column.name = name;
            column.value = value;
            column.timestamp = timestamp;
            column.ttl = ttl;
List<Mutation> mutationList;
Mutation mutation = new Mutation();
            mutation.column_or_supercolumn = new ColumnOrSuperColumn();
            mutation.column_or_supercolumn.column = column;
            mutationList.add(mutation);
context.write(rowkey, mutationList);
IF you want to write the supercolumn
Create a list of all columns those are related to supercolumn.Add these columns to column list.Create supercolumn using this columnlist , create mutation out of this supercolumn and add this mutation to mutation list.

Example:
 
List<Column> columnList;
Column column1 = new Column();
            column1.name = name;
            column1.value = value;
            column1.timestamp = timestamp;
            column1.ttl = ttl;
Column column2 = new Column();
            column2.name = name;
            column2.value = value;
            column2.timestamp = timestamp;
            column2.ttl = ttl;
columnList.add(column1);
columnList.add(column2);
List<Mutation> mutationList;
SuperColumn superColumn = new SuperColumn(supercolumn, columnList);
            Mutation mutation = new Mutation();
            mutation.column_or_supercolumn = new ColumnOrSuperColumn();
            mutation.column_or_supercolumn.super_column = superColumn;
            mutationList.add(mutation);
context.write(rowkey, mutationList);

4)Tuning  generation and loading for optimize performance of the Cassandra

  •  Deciding on an appropriate value of the Buffer is very important part of the sstable generation.
  •  As setting this buffer size low results into faster generation and loading, but it puts load on cassndra and one should not set it to smaller value.
  • With small buffer size the large no of small size sstables will be generated which when loaded into the Cassandra takes too much time to complete compaction and puts load on Cassandra cluster.
  • Also you can turn minor compaction off while loading data into Cassandra for faster loading. (You can do this with nodetool setcompactionthreshoul)

25 comments:

Kannan said...

hi samarth, we are setting up cassandra on RHEL we are able to create table, insert, and select rows from a table now we are trying to load csv file. can you help us how to set up sstableloader in layman perspective

SAMARTH GAHIRE said...

Hi kannan,
What version of cassandra are you using?
Do you want to load this data from csv using Hadoop or just sstableloader?

Pushpalanka said...

Hi Samarth,
Thanks for the so useful post! We are hoping to use Cassandra for our final year project which need near real time execution for a big data load. It will be so useful if you can share which is more faster from Hadoop or just sstableloader. Thanks in advance!

SAMARTH GAHIRE said...

Hi Pushpalanka,
It depends how much data you have and in how much time you want to load it.
Of course With Hadoop it is faster but again it comes with Cost of Hadoop cluster and Large Cassandra cluster.

Kannan said...

hi samarth,

thanks for your reply, I am using version 1.1.0 and i am trying to load using sstableloader not with hadoop.

thanks
Kannan

SAMARTH GAHIRE said...

Hi Kannan,
What issues are you facing in sstableloader? I will surely help you throughout sstableloader setup process.

Ralph Pigweed said...

Is it possible to configure BulkOutputFormat so that it does not stream them to Cassandra, but rather leaves them intact on HDFS?

SAMARTH GAHIRE said...

Hi Ralph,
If you just want to create sstable and not to load them to cassandra you can easily do this with using Cassandra's SimpleUnsortedWriter API as I have explained in my previous post "http://www.shareitexploreit.blogspot.in/2012/02/cassandra-bulk-loader.html"

If you wish to do this with Hadoop using BulkOutPutFormat, there is no option to configure it.But you can write your own OutPutFormat which will simply write the sstable to HDFS and will not stream it to Cassandra.

Kannan said...

Hi Samarth,

Thanks and that would be better and convinient. I will send an email with more details on it to samarth.gahire@gmail.com

Thanks
Kannan.

Ralph said...

Hello Samarth,

Thank you for this useful post. I am trying to bulkload data into Cassandra with Hadoop. I am using BulkOutputFormat, cassandra 1.1.3, and hadoop 0.20.2. Something weird is happening as my sstables are generated in the tmp directory; however, during the streaming process, sstables are not streamed into Cassandra, and my log is stuck on: INFO JobClient:1288 - map 100% reduce 100%. I have also tried to stream them manually with sstableloader, but it is not working as well.
Do you have any ideas about this issue?

Thank you,
Ralph

SAMARTH GAHIRE said...

At the end of every map it should load the sstables created in tmp directory to the cassandra.
For me its working for Cassandra-1.1.0 and have tried it with different versions of Hadoop.
But in my case we do not have the reduce part, Its map only job.
As in your case sstables are generated then issue must be in configuration, double check the configuration.
Are you trying on single machine or you have different cluster for Cassandra and Hadoop?

Ralph said...

Samarth,

I am working on 5 machines, 4 machines have both Cassandra and Hadoop, and the last machine has Hadoop namenode and jobtracker. I think I understood what is the issue here. Let's say that I have a keyspace KS, and a column family CF. When I remove the folder KS in /tmp, and I execute my program, the bulkload works fine. However, if I run my program again without removing the folder /tmp/KS, it gets stuck on: 1288 - map 100% reduce 100% and the sstables in /tmp are not loaded into Cassandra.
Is this folder supposed to be removed after each load? and for some reasons it is not done with me, so I need to do it manually.

Ralph

SAMARTH GAHIRE said...

The tmp directory is created by hadoop for every map execution.It is cleaned up by hadoop after the completion of the map function. So looks like something is wrong with hadoop setup. see "java.io.tmpdir" this is the property to setup tmp directory for hadoop.
See "Directory Structure " in http://hadoop.apache.org/docs/r0.20.2/mapred_tutorial.html

Ralph said...

All right, you were right, I had to setup the tmp directory. It works fine now.

Thank you Samarth.

Ralph said...

Hello Samarth,

I was wondering, if we use BulkOutputFormat to bulk load data into Cassandra, and we set the number of map and reduce tasks to 1. Then should it take approximately the same time as if we were using Cassandra Bulk Loader to generate SSTables and stream them to the cluster? Because I tried both methods, when I used Cassandra's Bulk loader, it took about 4 minutes to load a 70 MB csv file, whereas with Cassandra's BulkOutputFormat, it took about 20 minutes to load the same csv file.

Thanks,
Ralph

SAMARTH GAHIRE said...

Hi Ralph,
You should know that Hadoop is all about carrying heavy load.

"An Elephan can't jump. But can carry heavy load"....

So for the small data size you will not experience the power of Hadoop and it is not recommended to use hadoop to process some MBs of data as it will spend much time in dividing and mapping task rather than actually processing data.
If you are processing (in this case bulkloading) say some several 100GiB of data then using BulkOutput Format make sense.

Ralph said...

Hello Samarth,

I managed to understand why my hadoop job was not working properly.

Here is the thing:
I am using BulkOutputFormat to load data from a .csv file into cassandra. I have 7 hadoop nodes: 1 namenode/jobtracker and 6 datanodes/tasktrackers. Cassandra is installed on 4 of these 6 datanodes/tasktrackers. My reduce task is to generate the SSTables and to stream them to the cluster. If I have 1 reducer per node everything goes right and it works fine. However, if one of my nodes has 2 reducers, SSTables are generated however when they are being streamed, I get a java.io.EOFException in the tasktracker's logs and a java.io.IOException: Broken pipe in the Cassandra logs. I managed to find what was causing this error. This happens when a node tries to stream SSTables generated while it's already streaming SSTables from the previous reduce task.

This error can be reproduced using SSTableloader to stream two sets of SSTables from the same node at the same time.

Is this a Cassandra bug? or is it just not possible in my case to set more than 1 reducer per machine to generate SSTables faster as they cannot be streamed at the same time? It would be a bit disappointing for me as I would just be using one core on machines that have 24 cores.

Cheers,
Ralph

SAMARTH GAHIRE said...

Hi Ralph,
Looks like your loader machines and Hadoop cluster is pretty strong.But what about the Cassandra cluster?
See, the size of the Cassandra cluster also impact the Loading performance, I have observe broken pipes when I try to load with multiple loader to small cluster.
But in production Cassandra cluster of 30 nodes, there is no problem loading data with multiple reducers.

Ralph said...

Samarth,

Thank you for your answer,
I might be wrong but I am not sure this issue has to do with the size of the cassandra cluster. I can reproduce this error by streaming simultaneously SSTables from the same node to the cassandra cluster using SSTableloader. It seems to me that Cassandra cannot handle receiving simultaneously SSTables from the same node. However, when it receives simultaneously SSTables from two different nodes, everything works fine.

In the production Cassandra cluster you are talking about, does a given node stream simultaneously several SSTables to the cassandra cluster or is it queued up?

Ralph

SAMARTH GAHIRE said...

Hi Ralph,
We are loading data into Cassandra cluster which runs Cassandra-1.1 and using Map only job, and multiple mappers running on same hadoop node to load data to cassandra and.
If you think this is the issue with cassandra you can always report this to cassandra jira.

Ralph said...

Alright, I see. Anyway, I will try to figure it out. Thank you for your answers and this helpful article!

Bhalchandra said...

Hi Samarth , I am using Cassandra 1.1.5 for my MTech project . I am trying to bulk load the data into 2 node cluster of cassandra with the help of SSTableLoader . I wrote the program to generate the SSTables. The problem with this program is it is generating the SSTables of size 4 times greater than the original data size. I have 1 GB data with 20 Million records in it and 5 columns , after this program finishes its execution , I have SSTables with me of size 4GB . This is so embarrasing. Again it is taking around 13 minutes to geneerate SSTables of data size 1GB ( Original) with 20 million records. Please help me regarding the issue.

Bhalchandra said...

Hi Samarth , I am using Cassandra 1.1.5 for my MTech project . I am trying to bulk load the data into 2 node cluster of cassandra with the help of SSTableLoader . I wrote the program to generate the SSTables. The problem with this program is it is generating the SSTables of size 4 times greater than the original data size. I have 1 GB data with 20 Million records in it and 5 columns , after this program finishes its execution , I have SSTables with me of size 4GB . This is so embarrasing. Again it is taking around 13 minutes to geneerate SSTables of data size 1GB ( Original) with 20 million records. Please help me regarding the issue.

SAMARTH GAHIRE said...

Regarding the size difference, the sstable data structure, the index and filter files created and the way you are writing this data to sstables requires extra space that will add to your original data size.
So you could expect difference in the generated sstables size and the original data size. In my project I saw the sstable size double that of raw data size.

Kannan said...

Hi samarth,

We are loading a new type of data in the csv format, in this format 95% of the columns will be null and cassandra is consuming lot space for these null values. id there a way to ignore null values while generation SSTables using SStableloader.

thanks
Kannan.