
Sunday 26 February 2012

Cassandra Bulk Loader


Bulk loading into a Cassandra cluster used to be a difficult task. But if you already have all the data to be inserted in place, loading it into Cassandra becomes a cakewalk with the utility introduced in Cassandra 0.8.1.
I tried inserting the data into Cassandra using the Java client Pelops and the Python client Pycassa. But for the heavy data load we had (70 GB of text data), we found that neither of these libraries was adequate.
After much effort with Pycassa and Pelops we tried sstableloader, and this utility proved far more robust and time-efficient. The sstableloader utility loads existing or generated sstables into Cassandra.
In this blog I am going to cover:
  1. Generating sstables
  2. Loading sstables with sstableloader
  3. Improving the performance of generation and loading
  4. Tuning generation for optimal Cassandra performance
  5. Advantages of the sstableloader utility over other bulk loading techniques
Sstableloader:
Cassandra 0.8.1 introduces a new tool, sstableloader, to load sstables into Cassandra, and this is the fastest way to insert data into Cassandra. (Later in this document we will see how to use sstableloader.)
sstableloader is a tool that, given a set of sstable data files, streams them to a live cluster. It does not simply copy the set of sstables to every node, but transfers only the relevant part of the data to each node, conforming to the replication strategy of the cluster. So sstableloader needs the data to be in the sstable format. Let's see how to generate sstables from raw or text data.

Why Use the SSTable Generator?
sstableloader is the best way to insert data into Cassandra, and it requires the data in the form of sstables.
We have raw (text) data which is not in the form of sstables, so we need to create sstables from this raw data.
The simplest way to convert this raw data into sstables is the new Java class SSTableSimpleUnsortedWriter, introduced in Cassandra 0.8.2.

1) Generating SSTables

SSTableSimpleUnsortedWriter API

org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter 

a) Creating the writer:

Here we create the writer, which builds sstables for a given column family of a given keyspace:

SSTableSimpleUnsortedWriter columnFamilyWriter = new SSTableSimpleUnsortedWriter(
        directory,          // java.io.File: where the sstables are written
        keyspace,           // String: name of the keyspace
        "ColumnFamilyName",
        UTF8Type.instance,  // column comparator
        null,               // sub-columns comparator (no super columns here)
        64);                // buffer size in MB
  • directory is where the sstables will be written; it is a java.io.File object.
  • keyspace is the keyspace of the column family (a String).
  • Next come the column family name and the comparator and sub-columns comparator; here we don't use super columns, so the sub-columns comparator is null.
  • The last parameter is a buffer size in MB: SSTableSimpleUnsortedWriter buffers whatever input it gets in memory and flushes everything into one sstable once the buffer size is reached. That is, each resulting sstable will be approximately the size of the buffer.
b) Writing to sstables:

Using the writer above, we populate rows in the column family with the specified column names and values:

for (...each row and its corresponding columns and values...)
{
    columnFamilyWriter.newRow(bytes(rowKey));  // bytes() converts a String into a ByteBuffer
    columnFamilyWriter.addColumn(bytes("ColumnName"), bytes("ColumnValue"), timestamp);
}

c) Writing to sstables with TTL:

TTL is time to live. It is given in seconds, and the column will expire that many seconds after insertion.

for (...each row and its corresponding columns and values...)
{
    columnFamilyWriter.newRow(bytes(rowKey));  // bytes() converts a String into a ByteBuffer
    columnFamilyWriter.addExpiringColumn(bytes("ColumnName"), bytes("ColumnValue"), timestamp, ttl, expirationTimestampMS);
}

Note: expirationTimestampMS is the server-time timestamp used for actually expiring the column. It should be (insertion time in milliseconds + TTL in milliseconds).
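For instance, a minimal sketch of how these three values can be computed (variable names are illustrative):

    int ttl = 3600;                                     // column lives for one hour
    long timestamp = System.currentTimeMillis() * 1000; // column timestamp, in microseconds
    long expirationTimestampMS = System.currentTimeMillis() + ttl * 1000L; // insertion time + TTL, in ms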
 
d) Closing the sstable writer:
columnFamilyWriter.close();
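
Putting steps a-d together, here is a minimal end-to-end sketch. The CSV layout, file names, and keyspace/column family names are illustrative assumptions, not from the original post:

    import java.io.*;
    import org.apache.cassandra.db.marshal.UTF8Type;
    import org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter;
    import static org.apache.cassandra.utils.ByteBufferUtil.bytes;

    public class DataImportExample
    {
        public static void main(String[] args) throws IOException
        {
            // Directory named after the keyspace, as sstableloader expects.
            File directory = new File("TestKeyspace");
            directory.mkdir();

            SSTableSimpleUnsortedWriter writer = new SSTableSimpleUnsortedWriter(
                    directory, "TestKeyspace", "ColumnFamilyName",
                    UTF8Type.instance, null, 64);

            long timestamp = System.currentTimeMillis() * 1000; // microseconds

            // Assumed input: a CSV file of rowkey,columnname,columnvalue lines,
            // sorted so that all columns of a row appear on consecutive lines
            // (see section 3 for why each row should be written only once).
            BufferedReader reader = new BufferedReader(new FileReader(args[0]));
            String line;
            String currentRow = null;
            while ((line = reader.readLine()) != null)
            {
                String[] fields = line.split(",");
                if (!fields[0].equals(currentRow))
                {
                    currentRow = fields[0];
                    writer.newRow(bytes(currentRow)); // start a new row only when the key changes
                }
                writer.addColumn(bytes(fields[1]), bytes(fields[2]), timestamp);
            }
            writer.close(); // flushes the remaining buffered rows to disk
            reader.close();
        }
    }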
 
Things to remember when compiling and running the sstable generation program:
To compile this file, the Cassandra jar (>= 0.8.2) needs to be on the classpath (javac -cp <path_to>/apache-cassandra-0.8.2.jar DataImportExample.java). To run it, the Cassandra jar must be present as well as the jars of the libraries used by Cassandra (those in the lib/ directory of the Cassandra source tree). Valid cassandra.yaml and log4j configuration files should also be accessible; typically, this means the conf/ directory of the Cassandra source tree should be on the classpath. As of 0.8.2, you will need to set the data_file_directories and commitlog_directory directives in that cassandra.yaml to accessible directories, but not those of an existing Cassandra node. (This will be fixed in 0.8.3, but in the meantime using /tmp for both is a good idea.) The only property you really need to set for SSTableSimpleUnsortedWriter is the partitioner you want to use.
Once you have the sstables in place, the next step is to load them into the Cassandra cluster. This is where sstableloader comes into the picture.

2) Loading sstables with sstableloader
Configuration:
  1. To learn the topology of the cluster, the number of nodes, which ranges of keys each node is responsible for, the schema, etc., sstableloader uses the Cassandra gossip subsystem. It therefore requires a directory containing a cassandra.yaml configuration file in the classpath.
  2. In this config file, listen_address, storage_port, rpc_address and rpc_port should be set correctly to communicate with the cluster, and at least one node of the cluster you want to load data into should be configured as a seed. The rest is ignored for the purposes of sstableloader.
  3. Note that the schema for the column families to be loaded must be defined beforehand, using your preferred method: CLI, Thrift or CQL.
  4. Remove the initial_token setting from cassandra.yaml.
  5. Change listen_address and rpc_address in cassandra.yaml to the private IP of the loader machine (see the excerpt after this list).
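For reference, a minimal excerpt of the cassandra.yaml entries that sstableloader cares about; the addresses are placeholders and the ports are the 0.8-era defaults:

    listen_address: 10.0.0.5   # private IP of the loader machine
    rpc_address: 10.0.0.5
    storage_port: 7000
    rpc_port: 9160
    seed_provider:
        - class_name: org.apache.cassandra.locator.SimpleSeedProvider
          parameters:
              - seeds: "10.0.0.1"   # at least one live node of the target cluster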
Running sstableloader:
You will need Cassandra 0.8 or a newer version to run sstableloader (we are using Cassandra 0.8.2).
  1. With sstableloader, you first need the sstables. Only the -Data and -Index components are required; the others (-Statistics and -Filter) will be ignored.
  2. These sstables have to be in a directory whose name is the name of the keyspace of the sstables. This is how they are stored in either the main data directory or a snapshot. Say TestKeyspace is the name of our keyspace; then all the sstables must be in a directory named TestKeyspace (see the example layout after this list).
  3. Then, assuming sstableloader is configured to talk to your multi-node cluster, go to CASSANDRA_HOME/bin and run:
        $ ./sstableloader TestKeyspace
     This will start streaming the sstables to the cluster that sstableloader is configured to talk to.
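For example, the directory for a column family named ColumnFamilyName might look like this (the generation numbers will vary; the file names assume the "g"-version sstable format used by the 0.8 series):

    TestKeyspace/
        ColumnFamilyName-g-1-Data.db
        ColumnFamilyName-g-1-Index.db
        ColumnFamilyName-g-2-Data.db
        ColumnFamilyName-g-2-Index.db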
3) Improving performance of generation and loading

  1. If JVM and GC tuning is not done properly, you will not experience the full power of this utility. For generation, you should provide appropriate -Xmx and -Xms values when running the program. Since the sstable writer keeps buffering rows in the heap until the buffer size is reached, providing sufficient heap is essential; too small a heap may result in an OutOfMemoryError.

    java -ea -cp $CLASSPATH -Xms3000M -Xmx3000M \
    -Dlog4j.configuration=log4j-tools.properties \
    your.program.for.SstableGenerator

    I have experienced better results with a 3000 MB heap for a 300 MB buffer size.
  2. It is also essential to write each row once, with all of its columns following it, rather than calling newRow again and again for the same row with different columns. To do this, you can iterate over a batch of input rows, collect the columns for each row in some collection (a Map, for instance), and then call newRow() once per row followed by addColumn() for all of that row's columns. I have observed a 50% performance improvement with this approach.
    For example:

    for (...each row and its corresponding columns...)
    {
        columnFamilyWriter.newRow(bytes(uuid));
        columnFamilyWriter.addColumn(bytes("firstname"), bytes(entry.firstname), timestamp);
        columnFamilyWriter.addColumn(bytes("lastname"), bytes(entry.lastname), timestamp);
        columnFamilyWriter.addColumn(bytes("password"), bytes(entry.password), timestamp);
        columnFamilyWriter.addColumn(bytes("age"), bytes(entry.age), timestamp);
        columnFamilyWriter.addColumn(bytes("email"), bytes(entry.email), timestamp);
    }
  3. You should also allocate sufficient heap to sstableloader for better loading performance. I have observed better performance with a 4000 MB heap for 60 GB of sstables.
    In our case, 60 GB of sstables were loaded in 20-25 minutes.
    To set -Xmx and -Xms for sstableloader:

        a. Open the file CASSANDRA_HOME/bin/sstableloader
        b. Search for:
            $JAVA -ea -cp $CLASSPATH -Xmx256M \
            -Dlog4j.configuration=log4j-tools.properties \
            org.apache.cassandra.tools.BulkLoader "$@"
        c. In this part of the file, change -Xmx and add -Xms, using the same value for both options, as shown below.
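For example, after the change the invocation might read as follows (4000M is illustrative; size it to your sstable volume):

            $JAVA -ea -cp $CLASSPATH -Xms4000M -Xmx4000M \
            -Dlog4j.configuration=log4j-tools.properties \
            org.apache.cassandra.tools.BulkLoader "$@"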
4) Tuning generation and loading for optimal Cassandra performance
  1. Deciding on an appropriate buffer size is a very important part of sstable generation.
  2. Setting this buffer size low results in faster generation and loading, but it puts load on Cassandra, so you should not set it too small.
  3. With a small buffer size, a large number of small sstables will be generated; once loaded into Cassandra, they take a long time to compact and put load on the Cassandra cluster.
  4. If the size of the generated sstables is X before loading, then the sstable size you will find on each node after loading into Cassandra is approximately

    (X * replication_factor) / (number_of_nodes_in_cluster)

    For example, 60 GB of generated sstables loaded into a 6-node cluster with replication factor 3 leaves roughly (60 * 3) / 6 = 30 GB per node. You can see that as the number of nodes increases, the per-node sstable size shrinks. This will help you decide the buffer size appropriate to your cluster size.
  5. You can also turn minor compaction off while loading data into Cassandra for faster loading. You can do this with nodetool setcompactionthreshold; for instance, nodetool setcompactionthreshold <keyspace> <column_family> 0 0 disables minor compaction.
  6. While loading is in progress, the data becomes available only after the rebuilding of indexes is complete (this does not take very long).
5) Advantages of the sstableloader utility over other bulk loading techniques
  1. It does not put much load on the loader machine or on the Cassandra cluster.
  2. Of course, it is faster than any other loading method.
  3. With all other bulk loading techniques you keep hitting Cassandra the whole time. With sstableloader, by contrast, generating the sstables puts no load on the Cassandra cluster (provided you are not generating the sstables on the same machine on which Cassandra is running).
  4. In our case, it took 3 hours to generate 140 GB of sstables from 70 GB of text data, and loading completed in just an hour. That is, we hit the Cassandra cluster for only an hour to load the data.

16 comments:

Amila said...

Hi,
Is there a way to load sstables into Cassandra using sstableloader from Java code rather than from the command line?

SAMARTH GAHIRE said...

Hi Amila,
You can check the bin/sstableloader file; in it, it calls "org.apache.cassandra.tools.BulkLoader "$@"".
If you set up all the configuration mentioned in this file, you should be able to invoke bulk loading easily from your Java module.

Amila Paranawithana said...

Hi Samarth,
Thanks for the reply, but I am a bit confused. Could you please help me by giving sample code showing how to invoke sstableloader for a particular keyspace from Java code? And what special configuration do I need to do?

SAMARTH GAHIRE said...

Hi Amila,
From your Java code you can call "org.apache.cassandra.tools.BulkLoader.main(args)", where "args" is the array of values you would supply when running sstableloader from the command line, and it should work.

Anonymous said...

Hi, I wrote an sstable generator and bundled it in a JAR file.

I put it into the Cassandra home dir.

But when I tried to run it, it did not see the yaml and other conf files.

Error:

log4j:WARN No appenders could be found for logger (org.apache.cassandra.config.DatabaseDescriptor).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Cannot locate cassandra.yaml
Fatal configuration error; unable to start server. See log for stacktrace.

Any idea about this problem?

SAMARTH GAHIRE said...

Hi,
Your sstable-generation program should have all of the following on the classpath while executing:

1) CASSANDRA_HOME
2) CASSANDRA_HOME/lib/*.jar
3) CASSANDRA_HOME/conf
4) JAVA

Run the program with the JVM argument "-Dlog4j.configuration=log4j-tools.properties".

Looking at the error, it seems you do not have the conf directory in your classpath.

Amila said...

Hi,
I got the same error and resolved it in the following manner: if you are using the Eclipse IDE, add the Cassandra/conf folder to the classpath (Advanced settings --> Add External Folder --> then add the apache-cassandra-x.x.x/conf folder to the path).

Anonymous said...

Hi Samarth,
I am new to Cassandra and just trying things out. I can't seem to be able to run the sstableloader utility through cmd on Windows. What should I do?

SAMARTH GAHIRE said...

The sstableloader utility is not available for Windows systems up to Cassandra 1.1.0.
For later releases I am not sure, but you can check the Cassandra/bin directory to see whether a batch file named "sstableloader" is present.

Anonymous said...

Thanks for replying. I am using v1.1.4, but there is no batch file as of now :( . Instead I am calling BulkLoader() from a Java program like you mentioned in one of the comments. But the problem is that it is quite slow (streams at 1-2 MBps).
Another question: I am using CQL JDBC batch inserts to insert a huge CSV file (1 GB) into the DB. But the problem is again slow speed (20 minutes), and I am not getting the speeds that people seem to be getting (50k rows/sec) even after multithreading. What would you suggest? Thanks.

FYI
System spec: 2 GB RAM, Windows 7

SAMARTH GAHIRE said...

Sorry for the late reply on this.
Have you tried the tuning and optimization that I described above in this post?
Have you enabled compression for the column family you are loading data into? If yes, try tuning the chunk_length_kb parameter.
The size of the Cassandra cluster also plays an important role in loading performance.
Hope this helps.

Bhalchandra said...

Hi,
I am observing better performance with a 64 MB buffer size than with 300 MB. Setting a 300 MB buffer size takes much longer to generate sstables, and setting the heap size to 3000 MB doesn't help. Any suggestions, Samarth?

SAMARTH GAHIRE said...

A 64 MB buffer size means your sstables will be around 64 MB each, so there will be more of them than with a 300 MB buffer. With sstables it is not "the more the merrier": a larger number of sstables results in more frequent minor compactions and also hampers your read performance (as data will be scattered across sstables). You see faster writes because the cost of managing memory and sorting the rows per memtable is lower with a small buffer size. I would not recommend the smaller buffer size.
Similarly with the heap, it is not "more heap, faster performance". If the allocated heap is not sufficient to run the program efficiently, then increasing it makes sense; but if your program needs 1 GB to perform optimally and you allocate 3 GB, it is not going to change the world. Try the larger buffer size and experiment with heap sizes of, say, 500 MB, 1 GB, 2 GB, and so on.

Amey said...

Hey Samarth,

I am generating Cassandra SSTables using the bulk loading technique you have described in this post.

My question is: how much disk space is ideally consumed by the SSTable files? In my case my data CSV file is 45 GB, and the total disk space consumed by the SSTables for this specific file is around 250 GB. Is there something I am missing while creating these tables? Are there any compression options available for generating sstables?

The second step, where I load the sstables using sstableloader, works perfectly fine, and the data is available for querying in CQL.

Also, I would like to know if there are any other techniques available to import large data into Cassandra besides the bulk load method mentioned above.

SAMARTH GAHIRE said...

Sstables certainly take some extra space to store data, but not as much as you are mentioning. In your case it is 5 times the actual data size, so you are definitely doing something wrong.
Make sure that you do not write the same row again and again for different columns that belong to the same row. Try to collect all the columns belonging to a particular row and write them once against that row.
Regarding compression: Cassandra provides compression that you can enable on a per-column-family basis; there is also a chunk_length_kb parameter that you may have to tweak when you enable compression. Once you have a column family with compression enabled, just bulk load the data into it and you will see reduced disk consumption.

Anonymous said...

Hi,

I followed your tutorial and I am able to generate the SSTable directory.
For loading I am using the command below:
./sstableloader -d 127.0.0.1 /Applications/apache-cassandra-1.2.5/var/lib/cassandra/data/Demo_Test/Users/
Here,
Demo_Test is the keyspace and
Users is the column family.

I am getting "null".

Please suggest what I should do.

Thanks,
Abhijit