Bulk-Loading Data to Cassandra with sstableloader or JMX

The 'sstableloader' tool, introduced in Apache Cassandra 0.8.1, provides a powerful way to load huge volumes of data into a Cassandra cluster. If you are moving from a cloud cluster to a dedicated cluster (or vice versa), or from a different database to Cassandra, you will be interested in this tool. Whatever the case, as long as you can generate 'sstables' from the data to be loaded into Cassandra, you can bulk load them into the cluster using 'sstableloader', as shown below. I have tried it here with version 1.1.2.

In this post I'll share my experience of creating sstables from a .csv file and loading them into a Cassandra instance running on the same machine, which acts as the cluster here. The post covers:
  1. sstable generation
  2. Bulk loading Cassandra using sstableloader
  3. Using JMX

'sstable' generation

To generate sstables with 'SSTableSimpleUnsortedWriter', the program must be able to find the 'cassandra.yaml' file, either on the class path or through the cassandra.config system property. In IntelliJ IDEA you can set this under Run --> Edit Configurations --> Application --> Configuration --> VM Options, giving the path to cassandra.yaml as follows.

-Dcassandra-foreground -Dcassandra.config=file:///<path to>/apache-cassandra-1.1.2/conf/cassandra.yaml -ea -Xmx1G
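If you would rather not depend on an IDE run configuration, the same cassandra.config property can be set from code. This is a minimal sketch, assuming it runs before any Cassandra class (such as the writer) is loaded, since as far as I know Cassandra 1.1 reads the property when its configuration is first initialized. 'CassandraConfigSetup' and 'useConfig' are hypothetical names.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

/**
 * Hypothetical helper: sets the "cassandra.config" system property to a file:/// URI
 * so the writer can find cassandra.yaml without IDE VM options.
 * Call it before touching any Cassandra class.
 */
final class CassandraConfigSetup {
    private CassandraConfigSetup() {}

    /** Converts a filesystem path to a file URI, stores it in "cassandra.config" and returns it. */
    static String useConfig(String yamlPath) {
        Path path = Paths.get(yamlPath).toAbsolutePath().normalize();
        String uri = path.toUri().toString(); // e.g. file:///.../conf/cassandra.yaml
        System.setProperty("cassandra.config", uri);
        return uri;
    }
}
```

For example, calling CassandraConfigSetup.useConfig("<path to>/apache-cassandra-1.1.2/conf/cassandra.yaml") as the first line of main has the same effect as the -Dcassandra.config VM option.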

Here is the simple code I used to generate the sstables for my context, based on the DataStax documentation. With just a few modifications you may be able to use it too. I'll explain the code bit by bit below.

SSTableSimpleUnsortedWriter eventWriter = new SSTableSimpleUnsortedWriter(
        directory, partitioner, keySpace, "Events", AsciiType.instance, null, 64);
This writer does not assume any order in the rows. Instead, it buffers the rows in memory and writes them out in sorted order. You can set a threshold on the amount of buffered data (the buffer size argument described below) to avoid loading the entire data set into memory. Each time the threshold is reached, one sstable is created and the buffer is reset.
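To make the buffering behaviour concrete, here is a simplified, stdlib-only model of it. It is not Cassandra's implementation: 'BufferedSortingWriter' is a hypothetical class, it measures the buffer in characters rather than serialized megabytes, and it sorts by the raw key, whereas to my understanding the real writer orders rows by the partitioner's token.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/**
 * Hypothetical model of an "unsorted" writer: rows arrive in any order, are kept in a
 * sorted map, and whenever the buffered size reaches the threshold the sorted rows are
 * flushed as one "sstable" (represented here as the list of its row keys).
 */
final class BufferedSortingWriter {
    private final long thresholdChars;
    private final TreeMap<String, String> buffer = new TreeMap<>();
    private final List<List<String>> flushedTables = new ArrayList<>();
    private long bufferedChars = 0;

    BufferedSortingWriter(long thresholdChars) {
        this.thresholdChars = thresholdChars;
    }

    /** Buffers a row; a row with an existing key replaces the old one. */
    void addRow(String key, String value) {
        String previous = buffer.put(key, value);
        if (previous != null) {
            bufferedChars -= key.length() + previous.length(); // replaced row no longer counts
        }
        bufferedChars += key.length() + value.length();
        if (bufferedChars >= thresholdChars) {
            flush();
        }
    }

    /** Emits the buffered rows in sorted key order as one "sstable" and resets the buffer. */
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        flushedTables.add(new ArrayList<>(buffer.keySet()));
        buffer.clear();
        bufferedChars = 0;
    }

    /** Flushes the remainder, much as close() must be called on the real writer. */
    List<List<String>> close() {
        flush();
        return flushedTables;
    }
}
```

With a threshold of 10, adding rows "c", "a" and then "b" produces two "sstables": [a, c] (sorted, even though "c" arrived first) and then [b] on close. The last partial buffer is only written on close, which is why the real writer also has to be closed after the last row.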

directory - the directory to write the sstables to
partitioner - the strategy used to distribute data over the nodes; it should match the partitioner configured for the target cluster. I have used RandomPartitioner, which uses the MD5 hash of the row key to distribute data. This blog post may help you decide between RandomPartitioner and OrderPreservingPartitioner for your context. Two more partitioners are also available.
keySpace - the keyspace name
"Events" - the name of the column family
AsciiType.instance - the column family comparator
null - the subcomparator is null because this is not a super column family
64 - the buffer size in MB. Tune it to your context to get the best performance.
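Since RandomPartitioner places rows by an MD5 hash, here is a small sketch of how I understand it turns a row key into a token in this version: the 16-byte MD5 digest is read as a signed integer and made non-negative, giving tokens between 0 and 2^127. 'Md5Token' is a hypothetical name; Cassandra's own RandomPartitioner class is the reference.

```java
import java.math.BigInteger;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Hypothetical sketch of RandomPartitioner-style tokens: abs(signed MD5 digest of the key). */
final class Md5Token {
    private Md5Token() {}

    static BigInteger tokenFor(byte[] rowKey) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5").digest(rowKey);
            // The digest is interpreted as a signed two's-complement number, then made
            // non-negative, so tokens fall in the range 0 .. 2^127.
            return new BigInteger(digest).abs();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 must be available on every Java platform", e);
        }
    }
}
```

Because the hash scatters neighbouring keys across the token range, data spreads evenly over the nodes, at the cost of not being able to scan rows in key order.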

With the following code we create the rows and add columns to each row according to the entry read from the .csv file. According to the Cassandra wiki, a single row can hold up to 2 billion columns.

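// Start a new row keyed by the UUID; the columns added next belong to this row
eventWriter.newRow(uuid);
// addColumn(name, value, timestamp): column names and values are passed as ByteBuffers
eventWriter.addColumn(bytes("sourceAdd"), bytes(entry.sourceAdd), timestamp);
eventWriter.addColumn(bytes("sourceChannelType"), bytes(entry.sourceChannelType), timestamp);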
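The snippet above uses a row key 'uuid', a 'bytes(...)' helper and a 'timestamp' that are defined elsewhere in the full code. Assuming 'bytes' behaves like Cassandra's ByteBufferUtil.bytes(String) (UTF-8 encoding) and that the key is a UUID in its 16-byte form, this stdlib-only sketch shows one way to produce those values. 'RowEncoding' and its methods are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.UUID;

/** Hypothetical helpers for the values passed to newRow/addColumn. */
final class RowEncoding {
    private RowEncoding() {}

    /** Packs a UUID into a 16-byte buffer, most significant bits first. */
    static ByteBuffer uuidKey(UUID uuid) {
        ByteBuffer buffer = ByteBuffer.allocate(16);
        buffer.putLong(uuid.getMostSignificantBits());
        buffer.putLong(uuid.getLeastSignificantBits());
        buffer.flip(); // make the 16 written bytes readable from position 0
        return buffer;
    }

    /** UTF-8 encodes a string into a ByteBuffer (assumed to match the 'bytes' helper). */
    static ByteBuffer bytes(String s) {
        return ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8));
    }

    /** Column timestamps are conventionally microseconds since the epoch. */
    static long timestampMicros() {
        return System.currentTimeMillis() * 1000L;
    }
}
```

Using the same timestamp for every column of a load keeps the import consistent; if the same column is written again later with a higher timestamp, that newer value wins.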

The static nested class CsvEntry is used to read only the relevant fields from each CSV row.
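A CsvEntry along those lines might look like the sketch below. The field positions are assumptions about the CSV layout (here the first column, such as an ID, is simply skipped), and the naive comma split does not handle quoted commas; 'CsvImport' and the index constants are hypothetical.

```java
/** Hypothetical outer class holding the CsvEntry parser. */
final class CsvImport {
    private CsvImport() {}

    // Assumed positions of the relevant fields in each CSV line; adjust to your file.
    static final int SOURCE_ADD_INDEX = 1;
    static final int SOURCE_CHANNEL_TYPE_INDEX = 2;

    /** Keeps only the fields that become columns; everything else in the row is ignored. */
    static final class CsvEntry {
        final String sourceAdd;
        final String sourceChannelType;

        private CsvEntry(String sourceAdd, String sourceChannelType) {
            this.sourceAdd = sourceAdd;
            this.sourceChannelType = sourceChannelType;
        }

        /** Parses one line with a plain comma split (no support for quoted fields). */
        static CsvEntry parse(String line) {
            String[] fields = line.split(",", -1); // -1 keeps trailing empty fields
            int needed = Math.max(SOURCE_ADD_INDEX, SOURCE_CHANNEL_TYPE_INDEX) + 1;
            if (fields.length < needed) {
                throw new IllegalArgumentException("Expected at least " + needed + " fields: " + line);
            }
            return new CsvEntry(fields[SOURCE_ADD_INDEX].trim(),
                                fields[SOURCE_CHANNEL_TYPE_INDEX].trim());
        }
    }
}
```

Each parsed entry then supplies entry.sourceAdd and entry.sourceChannelType to the addColumn calls above. For real-world CSV with quoting, a proper CSV library is the safer choice.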

Once you run the code against the CSV file, a directory is created at the location you specified as 'directory'. Inside it you will find the created sstables.
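Because sstableloader expects the sstable path to end in /keyspace_name/columnfamily_name (see the note in the localhost steps below), it is convenient to create the writer's 'directory' in that shape from the start. This is a minimal sketch; 'SstableOutputDir' and 'prepare' are hypothetical names.

```java
import java.io.File;
import java.io.IOException;

/** Hypothetical helper: creates <base>/<keyspace>/<columnFamily> for the writer's output. */
final class SstableOutputDir {
    private SstableOutputDir() {}

    /** Returns the directory, creating it (and any missing parents) if needed. */
    static File prepare(File base, String keyspace, String columnFamily) throws IOException {
        File dir = new File(new File(base, keyspace), columnFamily);
        if (!dir.isDirectory() && !dir.mkdirs()) {
            throw new IOException("Could not create " + dir.getAbsolutePath());
        }
        return dir;
    }
}
```

For example, prepare(new File("/tmp/sstables"), "CDRs", "Events") yields /tmp/sstables/CDRs/Events, which can be passed both as the writer's directory and later to sstableloader.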

Bulk loading Cassandra using sstableloader

You can find the sstableloader tool in Cassandra's bin directory. Run it from the command line, pointing it at the sstables generated above. Good guidance on this can be found in the DataStax documentation and this blog. You can also use the class 'org.apache.cassandra.tools.BulkLoader' directly in Java code to load the sstables into a Cassandra cluster.

If you are testing all this on localhost, take the following steps to try out sstableloader.

  • Make a copy of the running Cassandra installation
  • Add another loopback address. On Linux you can do it using:
sudo ifconfig lo:0 127.0.0.2 netmask 255.0.0.0 up
  • Set rpc_address and listen_address in the copied conf/cassandra.yaml to 127.0.0.2. You can of course set rpc_address to 0.0.0.0 if you want it to listen on all interfaces.
  • Then, from the copied Cassandra, run sstableloader from the command line as follows:
./sstableloader -d 127.0.0.2 <path to generated sstables>
  • Note that the path must end in /keyspace_name/columnfamily_name (e.g. ...../CDRs/Events for the screenshot above).
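If you want to drive the steps above from Java rather than a shell, a small wrapper can check the path rule and launch the same command. This is a sketch under the assumption that the copied Cassandra's bin directory contains the sstableloader script; 'SstableLoaderCommand', 'build' and 'run' are hypothetical names, and only the -d option from the command above is used.

```java
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical wrapper that builds and runs "sstableloader -d <host> <path>". */
final class SstableLoaderCommand {
    private SstableLoaderCommand() {}

    /** Validates that sstableDir ends in <keyspace>/<columnFamily> and builds the command line. */
    static List<String> build(File cassandraBin, String host, File sstableDir,
                              String keyspace, String columnFamily) {
        File parent = sstableDir.getParentFile();
        if (!sstableDir.getName().equals(columnFamily)
                || parent == null || !parent.getName().equals(keyspace)) {
            throw new IllegalArgumentException(
                    "Path must end in /" + keyspace + "/" + columnFamily + ": " + sstableDir);
        }
        List<String> command = new ArrayList<>();
        command.add(new File(cassandraBin, "sstableloader").getPath());
        command.add("-d"); // node(s) to contact, as in the command above
        command.add(host);
        command.add(sstableDir.getPath());
        return command;
    }

    /** Runs the command with its output shown on this console and returns the exit code. */
    static int run(List<String> command) throws IOException, InterruptedException {
        return new ProcessBuilder(command).inheritIO().start().waitFor();
    }
}
```

Checking the path before launching fails fast with a readable message instead of relying on the tool to reject a directory whose last two components do not name the keyspace and column family.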

Using JMX bulk load

You can also use this code to bulk load Cassandra from generated sstables. I received it from Brian Jeltema on the Cassandra user mailing list. Run its main method with the path to the generated sstables (as above) as an argument.
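For reference, here is a minimal JMX client in the same spirit. It is my own sketch, not the mailing-list code: it uses only the standard javax.management API to invoke the bulkLoad(String) operation of Cassandra's StorageService MBean (org.apache.cassandra.db:type=StorageService) on a running node, assuming JMX is reachable on the default port 7199 without authentication. 'JmxBulkLoadClient' is a hypothetical name.

```java
import java.io.IOException;
import javax.management.JMException;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

/** Hypothetical JMX client that asks a Cassandra node to bulk load a directory of sstables. */
final class JmxBulkLoadClient {
    private JmxBulkLoadClient() {}

    /** Standard RMI service URL for a JMX agent on host:port. */
    static JMXServiceURL serviceUrl(String host, int port) throws IOException {
        return new JMXServiceURL("service:jmx:rmi:///jndi/rmi://" + host + ":" + port + "/jmxrmi");
    }

    /**
     * Invokes StorageService.bulkLoad(directory). The operation runs inside the Cassandra
     * process, so the directory must exist on that node's file system.
     */
    static void bulkLoad(String host, int port, String sstableDir) throws IOException, JMException {
        try (JMXConnector connector = JMXConnectorFactory.connect(serviceUrl(host, port))) {
            MBeanServerConnection connection = connector.getMBeanServerConnection();
            ObjectName storageService = new ObjectName("org.apache.cassandra.db:type=StorageService");
            connection.invoke(storageService, "bulkLoad",
                    new Object[] {sstableDir}, new String[] {String.class.getName()});
        }
    }
}
```

Usage would be along the lines of JmxBulkLoadClient.bulkLoad("localhost", 7199, "<path to>/CDRs/Events"). As far as I can tell from the stack trace in the comments below, the "Invalid directory" error is what the node reports when the given path is not an existing directory from its point of view.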

Comments

  1. Hello Pushpalanka

    Thank you for putting up this blog.
    I was going through your code to generate sstables. I noticed that you discarded the id field while adding the values. Is it mandatory to index based on UUID?

    --Debu

  2. Hello Debu,

    It's not mandatory; Cassandra even allows building our own indexes for applications. In the context I dealt with, the ID field was not essential, so I used UUIDs to get the benefits mentioned here: http://www.datastax.com/docs/0.6/data_model/uuids.

    --Pushpalanka

  3. Thanks for your blog. It's very helpful for beginners.
    I tried your blog with Cassandra 1.2.4 on Linux, but I'm getting the following error...

    [main] INFO org.apache.cassandra.config.DatabaseDescriptor - Loading settings from file:/xyz/xyz/apache-cassandra-1.2.4/conf/cassandra.yaml
    [main] ERROR org.apache.cassandra.config.DatabaseDescriptor - Fatal configuration error error
    Can't construct a java object for tag:yaml.org,2002:org.apache.cassandra.config.Config; exception=Cannot create property=hinted_handoff_throttle_in_kb for JavaBean=org.apache.cassandra.config.Config@1d6946d4; Unable to find property 'hinted_handoff_throttle_in_kb' on class: org.apache.cassandra.config.Config
    in 'reader', line 10, column 1:
    cluster_name: 'Test Cluster'
    .....................
    ...............

    What is this? What am I missing? Do I need to configure the Xmx size?
    Please guide me. I'll be very grateful to you.
    Thank you.

  4. Can you please help? What arguments do we need to pass to the JMX program?

    Replies
    1. Hi Jibanendu,

      Use
      JMXLoader np = new JMXLoader("localhost", 7199);
      np.bulkLoad("/path/to/SSTable/Dirctory/created/from/CSV?FILE");

      After running the JMX code, log in to your local Cassandra setup
      and authenticate to your keyspace.
      Check with: list Column_Family_name (to check the columns)

      Thanks Abhijit..;)

    2. Your path name should be something like: /Keyspace_Name/Column_Family_Name

  5. It looks like SSTableSimpleUnsortedWriter internally depends on the google-collections or Guava jar. Can somebody tell me all the dependencies?

  6. Hi,
    I am trying to upload a huge CSV file to Cassandra 2.0.7, but I'm getting the error message below:
    Error: Could not find or load main class SSTableSimpleUnsortedWriter

  7. Hi,
    I am getting an "Invalid directory" error while using the JMX bulk load. I passed the path of the SSTable files as the argument and tried all approaches. Please help me out with this error. I am able to load using sstableloader. Thank you.

    The error is
    Exception in thread "main" java.lang.RuntimeException: java.lang.IllegalArgumentException: Invalid directory C:\Users\\(columnfamily)
    at org.apache.cassandra.service.StorageService.bulkLoad(StorageService.java:3818)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at sun.reflect.misc.Trampoline.invoke(MethodUtil.java:75)
    at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at sun.reflect.misc.MethodUtil.invoke(MethodUtil.java:279)
    at com.sun.jmx.mbeanserver.StandardMBeanIntrospector.invokeM2(StandardMBeanIntrospector.java:112)
    at com.sun.jmx.mbeanserver.StandardMBeanIntrospector.invokeM2(StandardMBeanIntrospector.java:46)
    at com.sun.jmx.mbeanserver.MBeanIntrospector.invokeM(MBeanIntrospector.java:237)
    at com.sun.jmx.mbeanserver.PerInterface.invoke(PerInterface.java:138)
    at com.sun.jmx.mbeanserver.MBeanSupport.invoke(MBeanSupport.java:252)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.invoke(DefaultMBeanServerInterceptor.java:819)
    at com.sun.jmx.mbeanserver.JmxMBeanServer.invoke(JmxMBeanServer.java:801)
    at javax.management.remote.rmi.RMIConnectionImpl.doOperation(RMIConnectionImpl.java:1487)
    at javax.management.remote.rmi.RMIConnectionImpl.access$300(RMIConnectionImpl.java:97)
    at javax.management.remote.rmi.RMIConnectionImpl$PrivilegedOperation.run(RMIConnectionImpl.java:1328)
    at javax.management.remote.rmi.RMIConnectionImpl.doPrivilegedOperation(RMIConnectionImpl.java:1420)
    at javax.management.remote.rmi.RMIConnectionImpl.invoke(RMIConnectionImpl.java:848)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at sun.rmi.server.UnicastServerRef.dispatch(UnicastServerRef.java:322)
    at sun.rmi.transport.Transport$1.run(Transport.java:177)
    at sun.rmi.transport.Transport$1.run(Transport.java:174)
    at java.security.AccessController.doPrivileged(Native Method)
    at sun.rmi.transport.Transport.serviceCall(Transport.java:173)
    at sun.rmi.transport.tcp.TCPTransport.handleMessages(TCPTransport.java:556)
    at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run0(TCPTransport.java:811)
    at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run(TCPTransport.java:670)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
    Caused by: java.lang.IllegalArgumentException: Invalid directory C:\Users
    at org.apache.cassandra.service.StorageService.bulkLoadInternal(StorageService.java:3832)
    at org.apache.cassandra.service.StorageService.bulkLoad(StorageService.java:3814)
    ... 36 more


