Friday, June 29, 2012

Bulk-Loading Data to Cassandra with sstable or JMX

The 'sstableloader' tool, available from Apache Cassandra 0.8.1 onwards, provides a powerful way to load huge volumes of data into a Cassandra cluster. If you are moving from a cloud cluster to a dedicated cluster or vice versa, or from a different database to Cassandra, you will be interested in this tool. Whatever the case, if you can generate sstables from the data to be loaded into Cassandra, you can load them into the cluster in bulk using 'sstableloader'. I have tried it with version 1.1.2 here.

In this post I'll share my experience of creating sstables from a .csv file and loading them into a Cassandra instance running on the same machine, which acts as the cluster here. The post covers:
  1. sstable generation
  2. Bulk loading Cassandra using sstableloader
  3. Using JMX

'sstable' generation

To use 'SSTableSimpleUnsortedWriter', the 'cassandra.yaml' file must be available to the program, either on the class path or through a VM option. In IntelliJ IDEA you can set the VM option under Run --> Edit Configurations --> Application --> Configuration --> VM Options. There you should give the path to cassandra.yaml as follows.

-Dcassandra-foreground -Dcassandra.config=file:///<path to/apache-cassandra-1.1.2/conf/cassandra.yaml> -ea -Xmx1G
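
Before running the generator it can help to confirm that the JVM actually received the setting. The following is only a small sketch of such a check; the class and method names are made up for illustration and are not part of Cassandra.

```java
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/** Sanity check for the -Dcassandra.config VM option (hypothetical helper, not part of Cassandra). */
final class ConfigCheck {

    /**
     * Returns true when the value is a file: URL that points at an existing, readable regular file.
     * A null value means the VM option was not passed at all.
     */
    static boolean isUsableConfig(String configUrl) {
        if (configUrl == null || !configUrl.startsWith("file:")) {
            return false;
        }
        try {
            Path yaml = Paths.get(URI.create(configUrl));
            return Files.isRegularFile(yaml) && Files.isReadable(yaml);
        } catch (IllegalArgumentException e) {
            // Malformed URL, for example one that contains spaces.
            return false;
        }
    }

    public static void main(String[] args) {
        String configUrl = System.getProperty("cassandra.config");
        System.out.println("cassandra.config = " + configUrl);
        System.out.println("usable: " + isUsableConfig(configUrl));
    }
}
```

Running it with the same VM options as the generator shows quickly whether the path is mistyped.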

Here is the simple code to generate the sstables for the case I tried, written with reference to the DataStax documentation. With just a few modifications you should be able to use it. I'll try to explain parts of the code below.

SSTableSimpleUnsortedWriter eventWriter = new SSTableSimpleUnsortedWriter(directory, partitioner, keySpace, "Events", AsciiType.instance,null, 64);
This writer does not assume any order in the rows. Instead it buffers the rows in memory and writes them in sorted order. You can define a threshold on the amount of data to be buffered, to avoid loading the entire data set into memory. Each time the threshold is reached, one sstable is created and the buffer is reset.

directory - The directory to write sstables
partitioner - the strategy used to distribute data over the nodes. I have used RandomPartitioner, which uses an MD5 hash value to distribute data. This blogpost may help you decide between RandomPartitioner and OrderPreservingPartitioner according to your context. There are two more partitioners available.
keySpace - the Keyspace name
"Events" - name of column family
AsciiType.instance - the column family comparator
null - the subComparator is set to null as this is not a super column family
64 - buffer size in MB. This should be tuned to your context to achieve the best performance
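
The buffering behaviour described above can be illustrated with a toy model. This is not how Cassandra implements the writer; it is a minimal sketch that assumes plain string keys sorted alphabetically (Cassandra sorts by the partitioner's ordering) and a rough byte count. All names in it are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Toy model of an unsorted writer: rows arrive in any order and are flushed in sorted order. */
final class BufferedSortedWriter {
    private final long thresholdBytes;
    private final TreeMap<String, String> buffer = new TreeMap<>(); // keeps keys sorted
    private long bufferedBytes = 0;

    /** Each inner list stands in for one sstable written to disk. */
    final List<List<String>> flushed = new ArrayList<>();

    BufferedSortedWriter(long thresholdBytes) {
        this.thresholdBytes = thresholdBytes;
    }

    void addRow(String key, String value) {
        buffer.put(key, value);
        bufferedBytes += key.length() + value.length(); // rough size estimate
        if (bufferedBytes >= thresholdBytes) {
            flush();
        }
    }

    /** Writes the buffered rows in key order as one "sstable" and resets the buffer. */
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        flushed.add(new ArrayList<>(buffer.keySet()));
        buffer.clear();
        bufferedBytes = 0;
    }

    /** Flushes whatever is left; call once after the last row. */
    void close() {
        flush();
    }

    public static void main(String[] args) {
        BufferedSortedWriter writer = new BufferedSortedWriter(6);
        writer.addRow("c", "1");
        writer.addRow("a", "2");
        writer.addRow("b", "3"); // threshold reached here, first flush
        writer.addRow("z", "9");
        writer.close();          // second flush with the remaining row
        System.out.println(writer.flushed);
    }
}
```

A larger threshold means fewer, bigger sstables at the cost of more memory, which is the trade-off behind the buffer size parameter.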

With the following code we create the rows and add the columns of each row according to the entry read from the .csv file. According to the Cassandra wiki, one row can have up to 2 billion columns like this.

eventWriter.newRow(uuid);
eventWriter.addColumn(bytes("sourceAdd"),bytes(entry.sourceAdd), timestamp);
eventWriter.addColumn(bytes("sourceChannelType"),bytes(entry.sourceChannelType), timestamp);
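
The snippet above does not show where 'uuid', 'timestamp' and 'bytes' come from. Presumably 'bytes' is Cassandra's ByteBufferUtil.bytes helper, 'uuid' is the row key as a ByteBuffer and 'timestamp' is in microseconds, but those are my assumptions. The sketch below shows equivalents built only on the JDK, with a made-up class name.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.UUID;

/** JDK-only stand-ins for the values used in the snippet above (hypothetical helper class). */
final class ColumnBytes {

    /** UTF-8 bytes of a string, suitable for a column name or value. */
    static ByteBuffer bytes(String s) {
        return ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8));
    }

    /** 16-byte big-endian form of a UUID, usable as a row key. */
    static ByteBuffer bytes(UUID uuid) {
        ByteBuffer buf = ByteBuffer.allocate(16);
        buf.putLong(uuid.getMostSignificantBits());
        buf.putLong(uuid.getLeastSignificantBits());
        buf.flip(); // position back to 0 so readers see all 16 bytes
        return buf;
    }

    /** Current time in microseconds, a common convention for column timestamps. */
    static long nowMicros() {
        return System.currentTimeMillis() * 1000L;
    }

    public static void main(String[] args) {
        ByteBuffer rowKey = bytes(UUID.randomUUID());
        System.out.println("row key bytes: " + rowKey.remaining());
        System.out.println("column name bytes: " + bytes("sourceAdd").remaining());
        System.out.println("timestamp: " + nowMicros());
    }
}
```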

The static nested class CsvEntry is used to read just the relevant fields from the csv row.
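
A class of that shape could look like the following minimal sketch. The two field names come from the snippet above; the column positions (0 and 1), the comma separator without quoting, and the outer class name are assumptions for illustration.

```java
/** Outer class only exists to host the nested entry type (hypothetical name). */
final class CsvDemo {

    /** Holds just the fields needed from one CSV row. */
    static final class CsvEntry {
        final String sourceAdd;
        final String sourceChannelType;

        CsvEntry(String sourceAdd, String sourceChannelType) {
            this.sourceAdd = sourceAdd;
            this.sourceChannelType = sourceChannelType;
        }

        /** Parses one line; returns null when the line has too few columns. */
        static CsvEntry parse(String line) {
            String[] cols = line.split(",", -1); // -1 keeps trailing empty fields
            if (cols.length < 2) {
                return null;
            }
            return new CsvEntry(cols[0].trim(), cols[1].trim());
        }
    }

    public static void main(String[] args) {
        CsvEntry entry = CsvEntry.parse("10.0.0.1, SIP ,ignored");
        System.out.println(entry.sourceAdd + " / " + entry.sourceChannelType);
    }
}
```

Returning null for short lines lets the reading loop skip malformed rows instead of failing halfway through a large file.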

Once you run the code pointing to the csv file, a directory will be created in the location you specified as 'directory'. Inside it you will find something similar to the following, which contains the created sstables.

Bulk loading Cassandra using sstableloader

Inside the bin directory of Cassandra you can find the sstableloader tool. You can run it from the command line, pointing it to the sstables generated above. Good guidance on that can be found in the DataStax documentation and this blog. You can also use the class 'org.apache.cassandra.tools.BulkLoader' directly in Java code to load the sstables into a Cassandra cluster.

If you are testing all this on localhost, the following steps need to be taken to try out sstableloader.

  • Get a copy of the running Cassandra instance
  • Set another loop-back address. In Linux you can do it using,
sudo ifconfig lo:0 127.0.0.2 netmask 255.0.0.0 up
  • Set the rpc address and listen address in the copy's /conf/cassandra.yaml to 127.0.0.2. Of course you can set the rpc address to 0.0.0.0 if you want to listen on all interfaces.
  • Then, from the copied Cassandra, run sstableloader from the command line as follows,
./sstableloader -d 127.0.0.2 <path to generated sstables>
  • Note that the path should end with /keyspace_name/columnfamily_name (e.g. ...../CDRs/Events for the above screenshot)
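
The path requirement in the last step is easy to get wrong, so a quick check before calling sstableloader can save a failed run. This is only a sketch with a hypothetical class name; it checks the shape of the path, not whether the directory really contains sstables.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

/** Checks that an sstable directory ends with keyspace_name/columnfamily_name (hypothetical helper). */
final class SstablePathCheck {

    static boolean endsWithKeyspaceAndColumnFamily(String dir, String keyspace, String columnFamily) {
        Path path = Paths.get(dir).normalize();
        // Path.endsWith compares whole name elements, so "xCDRs/Events" does not match "CDRs/Events".
        return path.endsWith(Paths.get(keyspace, columnFamily));
    }

    public static void main(String[] args) {
        System.out.println(endsWithKeyspaceAndColumnFamily("/tmp/out/CDRs/Events", "CDRs", "Events"));
        System.out.println(endsWithKeyspaceAndColumnFamily("/tmp/out/Events", "CDRs", "Events"));
    }
}
```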

Using JMX bulk load

You can also use this code to bulk load Cassandra from the generated sstables. I received it from Brian Jeltema on the Cassandra user mailing list. Run the main method with the path to the generated sstables, as above, as an argument.
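
For readers who want a feel for what such a JMX client involves, here is a minimal sketch using only the JDK's JMX classes. It is not the code from the mailing list. The MBean name "org.apache.cassandra.db:type=StorageService", the "bulkLoad" operation taking one String, and JMX port 7199 are my assumptions about a default Cassandra 1.1 setup, so verify them against your version (jConsole lists the MBeans and their operations).

```java
import java.util.HashMap;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

/** Sketch of a JMX client that asks a Cassandra node to bulk load a directory of sstables. */
final class JmxBulkLoad {

    /** Builds the standard RMI-based JMX service URL for a host and port. */
    static String jmxUrl(String host, int port) {
        return "service:jmx:rmi:///jndi/rmi://" + host + ":" + port + "/jmxrmi";
    }

    static void bulkLoad(String host, int port, String sstableDir) throws Exception {
        JMXServiceURL url = new JMXServiceURL(jmxUrl(host, port));
        try (JMXConnector connector = JMXConnectorFactory.connect(url, new HashMap<String, Object>())) {
            MBeanServerConnection connection = connector.getMBeanServerConnection();
            // Assumed MBean and operation names; check them in jConsole for your version.
            ObjectName storageService = new ObjectName("org.apache.cassandra.db:type=StorageService");
            connection.invoke(storageService, "bulkLoad",
                    new Object[] { sstableDir },
                    new String[] { String.class.getName() });
        }
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 1) {
            System.err.println("usage: JmxBulkLoad <path to generated sstables>");
            return;
        }
        bulkLoad("127.0.0.1", 7199, args[0]); // host and port are assumptions for a local node
    }
}
```

Because the operation is invoked by name, the client does not need the Cassandra jars on its class path. As far as I understand, the load runs inside the Cassandra node, so the path has to be readable from that node.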

Monday, June 11, 2012

Running Cassandra in a Multi-node Cluster

This post gathers the steps I followed in setting up a multi-node Apache Cassandra cluster. I referred to the Cassandra wiki and the DataStax documentation while setting up my cluster. The procedure below is described in detail, sharing my experience of setting up the cluster.
  1. Setting up first node
  2. Adding other nodes
  3. Monitoring the cluster - nodetool, jConsole, Cassandra GUI

I used Cassandra 1.1.0 and Cassandra GUI version cassandra-gui-0.8.0-beta1 (as the older release had problems in showing data) on Ubuntu.

Setting up first node

Open cassandra.yaml, which is in 'apache-cassandra-1.1.0/conf', and make the following changes:
listen_address: localhost --> listen_address: <node IP address>
rpc_address: localhost --> rpc_address: <node IP address>
- seeds: "127.0.0.1" --> - seeds: "<node IP address>"
The listen address defines where the other nodes in the cluster should connect. So in a multi-node cluster it should be changed to the node's own address on its Ethernet interface.
The rpc address defines where the node listens for clients. So it can be the same as the node IP address, or it can be set to the wildcard 0.0.0.0 if we want to listen for Thrift clients on all available interfaces.
The seeds act as the communication points. When a new node joins the cluster, it contacts the seeds and gets information about the ring and the basics of the other nodes. So in a multi-node cluster it needs to be changed to a routable address as above, which makes this node a seed.

Note: In a multi-node cluster it is better to have multiple seeds. Using one node as the seed does not create a single point of failure, but it will delay the spreading of status messages around the ring. A list of nodes to act as seeds can be defined as follows,

- seeds: "<ip1>,<ip2>,<ip3>"

For the moment let's go forward with the previous configuration with a single seed. Now we can simply start Cassandra on this node, which will run perfectly well without the rest of the nodes. Let's imagine our cluster needs increased performance and more data is being fed into the system. So it's time to add another node to the cluster.

Adding other nodes


Simply copy the Apache Cassandra folder of the first node to each of the new nodes. Now replace the listen_address: <node IP address> and rpc_address: <node IP address> values as relevant for each node. (There is no need to touch the seeds section.) When we start each node now, it will join the ring, using the seeds as hubs of the gossip network. The logs will show information about the other nodes in the cluster that it can see.
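
If there are many nodes, the per-node edit can be scripted. The sketch below rewrites only the two address lines in the text of a cassandra.yaml and leaves everything else, including the seeds, as it is. The class and method names are hypothetical, and it assumes both keys start at the beginning of a line, as they do in the stock file as far as I know.

```java
import java.util.regex.Matcher;

/** Rewrites listen_address and rpc_address in cassandra.yaml text for one node (hypothetical helper). */
final class NodeConfig {

    static String forNode(String yaml, String nodeIp) {
        String ip = Matcher.quoteReplacement(nodeIp);
        // (?m) makes ^ and $ match at line boundaries, so only the two target lines change.
        return yaml
                .replaceAll("(?m)^listen_address:.*$", "listen_address: " + ip)
                .replaceAll("(?m)^rpc_address:.*$", "rpc_address: " + ip);
    }

    public static void main(String[] args) {
        String firstNode = "listen_address: 10.0.0.1\n"
                + "rpc_address: 10.0.0.1\n"
                + "          - seeds: \"10.0.0.1\"\n";
        System.out.print(forNode(firstNode, "10.0.0.2"));
    }
}
```

The IP addresses in main are placeholders. In practice the input would be the contents of the copied conf/cassandra.yaml.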


Monitoring the cluster

Nodetool - This is shipped with Apache Cassandra. We can run it from inside the Cassandra folder with bin/nodetool. With the ring command of nodetool we can check some information about the ring as follows.

bin/nodetool -host <node IP address> ring


It has a lot more useful functionality, which is documented at the site.

jConsole - We can use this to monitor memory usage, thread behavior etc. It is very helpful for analysing the cluster in detail and fine-tuning the performance. This guide also carries good information on using jConsole if you are not familiar with it already.


Cassandra GUI - This satisfies the need to visualize the data inside the cluster. With it we can see the content distributed across the cluster in one place.