Illustration Image


The best knowledge base on Apache Cassandra®

Helping platform leaders, architects, engineers, and operators build scalable real time data platforms.


Reading time:4 min

The Correct Way to Connect Spark Shell to Cassandra

by John Doe

tl;drusing the cassandra connector in the spark-shell is fairly straightforward setting up the connection in a way that doens’t break the existing sc is not documented anywhere the correct solution is to not call sc.stop but provide the cassandra host on startup of the shellApache Cassandra is a NoSQL distributed database that’s been gaining popularity recently. It’s also pretty high performance, scoring very high in a (not so) recent comparison of key-value stores (PDF) for different workloads. Among the contenders were HBase, Cassandra, Voldemort, Redis, VoltDB and MySQL, HBase tends to be the winner (by one to two orders of magnitude) when it comes to latency and Cassandra when it comes to throughput - depending on the number of nodes in cluster. A key-value store is nice, but it isn’t much use unless you have something doing reads and writes into it. That’s where spark comes in.Every data scientist’s[1] [2][3] favourite new toy spark is a distributed in-memory data processing framework. Cassandra very helpfully comes with a spark connector that allows you to pull data into spark as RDDs or DataFrames directly from Cassandra.Connecting to a Cassandra host from spark isn’t all that complicated, just import the connector and tell SparkConf where to find the Cassandra host from and you’re off to the races. The Cassandra connector docs cover the basic usage pretty well. Aside from the bazillion different versions of the connector getting everything up and running is fairly straightforward.Start the spark shell with the necessary Cassandra connector dependencies bin/spark-shell --packages datastax:spark-cassandra-connector:1.6.0-M2-s_2.10.123456789101112131415import org.apache.spark.{SparkConf, SparkContext}import org.apache.spark.sql.SQLContextimport com.datastax.spark.connector._# connect to a local cassandra instanceval conf = new SparkConf(true) .set("", "")val sc = new SparkContext(conf)val sqlContext = new SQLContext(sc)# read in some data as a DataFrameval df = sqlContext .read .format("org.apache.spark.sql.cassandra") .options(Map("table" -> "fooTable", "keyspace" -> "bar")).load.cache()Lovely, you now have a DataFrame that acts just like any other spark DataFrame. So far so good. Now let’s say you wanted to test something in the spark-shell and pull in data from Cassandra. No problem, just do what you did before, except that you need to stop the existing SparkContext that is created automagically when the shell starts up, before you can create a new one. This isn’t really documented anywhere, except sporadically on StackOverflow. The accepted answer is actually the wrong way to do this.123456789101112131415161718// DO NOT DO THISsc.stop // NOOOoooimport org.apache.spark.{SparkConf, SparkContext}import org.apache.spark.sql.SQLContextimport com.datastax.spark.connector._// connect to a local cassandra instanceval conf = new SparkConf(true) .set("", "")val sc = new SparkContext(conf)val sqlContext = new SQLContext(sc)// read in some data as a DataFrameval df = sqlContext .read .format("org.apache.spark.sql.cassandra") .options(Map("table" -> "fooTable", "keyspace" -> "bar")).load.cache()The SparkContext created above will not function like the old SparkContext created when the shell started up. This doesn’t actually have anything to do the Cassandra connector perse, it’s just that the setup for the Cassandra connector brings up this issue. To see the problem consider the following simplified code without the Cassandra connector.1234567891011import org.apache.spark.SparkConfimport org.apache.spark.SparkContextsc.stopval conf = sc.getConfval sc = new SparkContext(conf)val rdd = sc.parallelize(Array(0, 1), (1, 10), (2, 15), (3, 7))val map = Map(0->3, 1->2, 2->1, 3->0)val BVmap = sc.broadcast(map) r => BVmap.value(r._1))The above doesn’t do anything particularly worthwhile, but it illustrates the problem. Because the SparkContext was recreated the code will fail in the shell, due to sc being not serialisable anymore.The solution is extremely simple, but suprisingly difficult to find. Instead of calling sc.stop and then recreating the conf with the Cassandra host details added, just add the Cassandra host details to the configuration defaults in $SPARK_HOME/conf/spark-defaults.conf. Should you not have access to the default conf you can also provide the connection host in the call to spark-shellbin/spark-shell --packages datastax:spark-cassandra-connector:1.6.0-M2-s_2.10 --conf not being included in the official Cassandra connector documentation is bizarre.[3] I don’t like the term either but that’s what we seem to have settled for.Spark Cassandra Spark Cassandra Connector

Illustration Image


  • using the cassandra connector in the spark-shell is fairly straightforward
  • setting up the connection in a way that doens’t break the existing sc is not documented anywhere
  • the correct solution is to not call sc.stop but provide the cassandra host on startup of the shell

Apache Cassandra is a NoSQL distributed database that’s been gaining popularity recently. It’s also pretty high performance, scoring very high in a (not so) recent comparison of key-value stores (PDF) for different workloads. Among the contenders were HBase, Cassandra, Voldemort, Redis, VoltDB and MySQL, HBase tends to be the winner (by one to two orders of magnitude) when it comes to latency and Cassandra when it comes to throughput - depending on the number of nodes in cluster. A key-value store is nice, but it isn’t much use unless you have something doing reads and writes into it. That’s where spark comes in.

Every data scientist’s[1] [2][3] favourite new toy spark is a distributed in-memory data processing framework. Cassandra very helpfully comes with a spark connector that allows you to pull data into spark as RDDs or DataFrames directly from Cassandra.

Connecting to a Cassandra host from spark isn’t all that complicated, just import the connector and tell SparkConf where to find the Cassandra host from and you’re off to the races. The Cassandra connector docs cover the basic usage pretty well. Aside from the bazillion different versions of the connector getting everything up and running is fairly straightforward.

Start the spark shell with the necessary Cassandra connector dependencies bin/spark-shell --packages datastax:spark-cassandra-connector:1.6.0-M2-s_2.10.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import com.datastax.spark.connector._
# connect to a local cassandra instance
val conf = new SparkConf(true)
    .set("", "")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
# read in some data as a DataFrame
val df = sqlContext
    .options(Map("table" -> "fooTable", "keyspace" -> "bar")).load.cache()

Lovely, you now have a DataFrame that acts just like any other spark DataFrame. So far so good. Now let’s say you wanted to test something in the spark-shell and pull in data from Cassandra. No problem, just do what you did before, except that you need to stop the existing SparkContext that is created automagically when the shell starts up, before you can create a new one. This isn’t really documented anywhere, except sporadically on StackOverflow. The accepted answer is actually the wrong way to do this.

sc.stop // NOOOooo
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import com.datastax.spark.connector._
// connect to a local cassandra instance
val conf = new SparkConf(true)
    .set("", "")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
// read in some data as a DataFrame
val df = sqlContext
    .options(Map("table" -> "fooTable", "keyspace" -> "bar")).load.cache()

The SparkContext created above will not function like the old SparkContext created when the shell started up. This doesn’t actually have anything to do the Cassandra connector perse, it’s just that the setup for the Cassandra connector brings up this issue. To see the problem consider the following simplified code without the Cassandra connector.

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
val conf = sc.getConf
val sc = new SparkContext(conf)
val rdd = sc.parallelize(Array(0, 1), (1, 10), (2, 15), (3, 7))
val map = Map(0->3, 1->2, 2->1, 3->0)
val BVmap = sc.broadcast(map) r => BVmap.value(r._1))

The above doesn’t do anything particularly worthwhile, but it illustrates the problem. Because the SparkContext was recreated the code will fail in the shell, due to sc being not serialisable anymore.

The solution is extremely simple, but suprisingly difficult to find. Instead of calling sc.stop and then recreating the conf with the Cassandra host details added, just add the Cassandra host details to the configuration defaults in $SPARK_HOME/conf/spark-defaults.conf. Should you not have access to the default conf you can also provide the connection host in the call to spark-shell

bin/spark-shell --packages datastax:spark-cassandra-connector:1.6.0-M2-s_2.10 --conf

This not being included in the official Cassandra connector documentation is bizarre.

[3] I don’t like the term either but that’s what we seem to have settled for.

Related Articles


GitHub - andreia-negreira/Data_streaming_project: Data streaming project with robust end-to-end pipeline, combining tools such as Airflow, Kafka, Spark, Cassandra and containerized solution to easy deployment.




Checkout Planet Cassandra

Claim Your Free Planet Cassandra Contributor T-shirt!

Make your contribution and score a FREE Planet Cassandra Contributor T-Shirt! 
We value our incredible Cassandra community, and we want to express our gratitude by sending an exclusive Planet Cassandra Contributor T-Shirt you can wear with pride.

Join Our Newsletter!

Sign up below to receive email updates and see what's going on with our company

Explore Related Topics


Explore Further
