
Cassandra.Link

The best knowledge base on Apache Cassandra®

Helping platform leaders, architects, engineers, and operators build scalable real-time data platforms.

11/22/2019

Reading time: 4 min

Using the Cassandra Bulk Loader, Updated

by John Doe


We introduced sstableloader back in version 0.8.1 to bulk load data into Cassandra.
When it was first introduced, we wrote a blog post about its usage, along with how to generate SSTables to bulk load.

Now that Cassandra 2.1.0 has been released, bulk loading has evolved since that old blog post.
Let's see how the changes make our lives easier.

What's changed?

The specific changes are:

  • sstableloader no longer participates in gossip membership to obtain schema and ring information. Instead, it simply contacts one of the nodes in the cluster and asks for them. This allows you to bulk load from the same machine where Cassandra is running, since sstableloader no longer listens on the same port as Cassandra.
  • Internally, the streaming protocol has been redesigned, so you can stream data more efficiently than before.
  • A new CQLSSTableWriter has been introduced (CASSANDRA-5894). You can now create SSTables using familiar CQL.

In the old post, we showed two scenarios where sstableloader is used. Let's see how the changes work in each of them.
I use Apache Cassandra 2.1.0 throughout this example, from the cluster itself to running sstableloader.

Example 1 - Loading existing SSTables

Usage of sstableloader has not changed much, but because it has to contact a node to get the schema for the SSTables it loads, you have to specify the address(es) of the node(s) with the -d option.

So, for example, to bulk load the SSTables in a table's data directory into the cluster:

$ bin/sstableloader -d 127.0.0.1 ~/Keyspace1/Standard1-cb5e6f30458811e49349511b628b066f

Established connection to initial hosts

Opening sstables and calculating sections to stream

Streaming relevant part of /data/Keyspace1/Standard1-cb5e6f30458811e49349511b628b066f/Keyspace1-Standard1-ka-6-Data.db /data/Keyspace1/Standard1-cb5e6f30458811e49349511b628b066f/Keyspace1-Standard1-ka-5-Data.db to [/127.0.0.1, /127.0.0.2, /127.0.0.3]

progress: [/127.0.0.1]0:2/2 100% [/127.0.0.2]0:2/2 100% [/127.0.0.3]0:2/2 100% total: 100% 0  MB/s(avg: 5 MB/s)

Summary statistics:

   Connections per host:         : 1

   Total files transferred:      : 6

   Total bytes transferred:      : 98802914

   Total duration (ms):          : 9455

   Average transfer rate (MB/s): : 5

   Peak transfer rate (MB/s):    : 11

As you can see, summary statistics are printed after the bulk load completes.

Example 2 - Loading external data

In the old post, we had an example that created SSTables from CSV using UnsortedSimpleSSTableWriter and used sstableloader to load them into a Cassandra cluster.
The schema there was created with Thrift, and it had a simple, flat table structure.

For this updated post, let's try a more complex scenario with the new CQLSSTableWriter.
We will use real data from Yahoo! Finance to load historical stock prices in a time-series manner.

Schema definition

If we take a look at the CSV file for Yahoo! (YHOO), it has 7 fields in it.

 

Date,Open,High,Low,Close,Volume,Adj Close

2014-09-25,39.56,39.80,38.82,38.95,35859400,38.95

...

Let's use the ticker symbol as our partition key, and the 'Date' field as our clustering key.
So the schema looks like:

 

CREATE TABLE historical_prices (

    ticker ascii,

    date timestamp,

    open decimal,

    high decimal,

    low decimal,

    close decimal,

    volume bigint,

    adj_close decimal,

    PRIMARY KEY (ticker, date)

) WITH CLUSTERING ORDER BY (date DESC);

We use CLUSTERING ORDER BY so that we can query the most recent data easily.
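With this DESC clustering order, rows within a partition come back newest-first. For example, a hypothetical query against the schema above to fetch the latest ten closing prices for one ticker looks like:

```cql
-- Rows within the 'YHOO' partition are stored newest-first,
-- so LIMIT simply returns the most recent trading days.
SELECT date, close
FROM historical_prices
WHERE ticker = 'YHOO'
LIMIT 10;
```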

Generating SSTable using CQLSSTableWriter

How do you bulk load data into such a schema? If you choose to use UnsortedSimpleSSTableWriter as we did in the old post, you have to manually construct each cell of a complex type to fit your CQL3 schema. This requires deep knowledge of how CQL3 works internally.
Enter CQLSSTableWriter.

All you need is the DDL for the table you want to bulk load, and an INSERT statement to insert data into it.

 

// Prepare SSTable writer 

CQLSSTableWriter.Builder builder = CQLSSTableWriter.builder();

// set output directory 

builder.inDirectory(outputDir)

       // set target schema 

       .forTable(SCHEMA)

       // set CQL statement to put data 

       .using(INSERT_STMT)

       // set partitioner if needed 

       // The default is Murmur3Partitioner, so set this only if you use a different one.

       .withPartitioner(new Murmur3Partitioner());

CQLSSTableWriter writer = builder.build();

// ...snip... 

while ((line = csvReader.read()) != null)

{

    // We use Java types here based on 

    // https://www.datastax.com/drivers/java/2.0/com/datastax/driver/core/DataType.Name.html#asJavaClass%28%29 

    writer.addRow(ticker,

                  DATE_FORMAT.parse(line.get(0)),

                  new BigDecimal(line.get(1)),

                  new BigDecimal(line.get(2)),

                  new BigDecimal(line.get(3)),

                  new BigDecimal(line.get(4)),

                  Long.parseLong(line.get(5)),

                  new BigDecimal(line.get(6)));

}

writer.close();
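
The SCHEMA, INSERT_STMT, and DATE_FORMAT constants referenced above are not shown in the snippet. A minimal sketch of what they might look like (the names follow the snippet; the exact values are assumptions based on the schema and CSV format above):

```java
import java.text.SimpleDateFormat;

// Sketch of the constants used by the CQLSSTableWriter example above.
// The original snippet does not define them; values here are assumptions.
public class BulkLoadConstants {
    // DDL passed to builder.forTable(SCHEMA) -- the same table as our schema.
    public static final String SCHEMA =
        "CREATE TABLE historical_prices ("
        + " ticker ascii, date timestamp,"
        + " open decimal, high decimal, low decimal, close decimal,"
        + " volume bigint, adj_close decimal,"
        + " PRIMARY KEY (ticker, date)"
        + ") WITH CLUSTERING ORDER BY (date DESC)";

    // Prepared INSERT passed to builder.using(INSERT_STMT);
    // one bind marker per column, in the order addRow() supplies them.
    public static final String INSERT_STMT =
        "INSERT INTO historical_prices"
        + " (ticker, date, open, high, low, close, volume, adj_close)"
        + " VALUES (?, ?, ?, ?, ?, ?, ?, ?)";

    // Yahoo! Finance CSV dates look like 2014-09-25.
    public static final SimpleDateFormat DATE_FORMAT =
        new SimpleDateFormat("yyyy-MM-dd");
}
```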

You can see the complete example on my GitHub.

After generating the SSTables, you can simply run sstableloader against the target cluster, as described above.

There are still some limitations in CQLSSTableWriter: you cannot use it from multiple threads in parallel, and user-defined types are not supported yet.
But we keep improving it, so stay tuned to the Apache JIRA.

Wrap up

Generating SSTables and bulk loading have both improved over the past releases, and there are many new features available to make your life easier.
Start experimenting for yourself today!
