Cassandra.Link

The best knowledge base on Apache Cassandra®

Helping platform leaders, architects, engineers, and operators build scalable real time data platforms.

2/27/2020

Reading time: 8 min

Apache Spark, Cassandra and Game of Thrones

by John Doe

[Illustration: Spark Cassandra tutorial]

Apache Spark with Cassandra is a powerful combination in data processing pipelines.  In this tutorial, we will build a Scala application with Spark and Cassandra with battle data from Game of Thrones.  Now, we’re not going to make any show predictions!  But, we will show the most aggressive kings as well as the kings which were attacked the most.  So, you’ve got that goin’ for you, which is kind of nice.

Spark Cassandra Overview

Our primary focus is the technical highlights of Spark Cassandra integration with Scala.  To do so, we will load up Cassandra with Game of Thrones battle data and then query it from Spark using Scala.  We’ll use Spark from both a shell as well as deploying a Spark driver program to a cluster.  We’ll have examples of Scala case class marshalling courtesy of the DataStax connector, as well as using SparkSQL to produce DataFrames.  We’ll also mix in some sbt configuration.

There are screencasts and relevant links at the bottom of this post in the “Resources” section.

The intended audience of this Spark Cassandra tutorial is someone with beginning to intermediate experience with Scala and Apache Spark.  If you would like to reach this level quickly and efficiently, please check out our On-Demand Apache Spark with Scala Training Course.

Spark Cassandra Pre-Requisites

  1. Apache Cassandra (see resources below)
  2. Downloaded Game of Thrones data (see resources below)
  3. Apache Spark

Tutorial Outline

  1. Import data into Cassandra
  2. Write Scala code
  3. Test Spark Cassandra code in SBT shell
  4. Deploy Spark Cassandra to Spark Cluster with SBT and `spark-submit`

Spark Cassandra Example

Part 1: Prepare Cassandra

Let’s import the GOT battle data into Cassandra.  To keep things simple, I’m going to use a locally running Cassandra instance.  I started Cassandra with the bin/cassandra script on my Mac.  (Use cassandra.bat on Windows, but you knew that already.)

Next, connect to Cassandra with cqlsh and create a keyspace to use.  This tutorial creates a “gameofthrones” keyspace:

CREATE KEYSPACE gameofthrones WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 1};
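(SimpleStrategy with a replication factor of 1 is fine for a single-node local setup like this one; on a real cluster you would typically use NetworkTopologyStrategy with a higher replication factor.)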

From here, we create a table for the battle data.

CREATE TABLE gameofthrones.battles (
name TEXT,
year INT,
battle_number INT,
attacker_king TEXT,
defender_king TEXT,
attacker_1 TEXT,
attacker_2 TEXT,
attacker_3 TEXT,
attacker_4 TEXT,
defender_1 TEXT,
defender_2 TEXT,
defender_3 TEXT,
defender_4 TEXT,
attacker_outcome TEXT,
battle_type TEXT,
major_death TEXT,
major_capture TEXT,
attacker_size TEXT,
defender_size TEXT,
attacker_commander TEXT,
defender_commander TEXT,
summer TEXT,
location TEXT,
region TEXT,
note TEXT,
PRIMARY KEY(battle_number)
);
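Since battle_number is the primary key, each battle maps to exactly one row; it also means that importing a second row with the same battle_number would silently overwrite the first.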

Then import the battles data using the Cassandra COPY command shown below.  (See the Resources section below for where to download the data.)  BTW, I needed to run a Perl script to update the end-of-line encodings from Mac to Unix on the CSV file using perl -pi -e 's/\r/\n/g'.  Your mileage may vary.

COPY battles (
name,
year,
battle_number,
attacker_king,
defender_king,
attacker_1,
attacker_2,
attacker_3,
attacker_4,
defender_1,
defender_2,
defender_3,
defender_4,
attacker_outcome,
battle_type,
major_death,
major_capture,
attacker_size,
defender_size,
attacker_commander,
defender_commander,
summer,
location,
region,
note)
FROM 'battles.csv'  // update this location as necessary
WITH HEADER = true;
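As a quick sanity check (my own habit, not a required step), running SELECT COUNT(*) FROM gameofthrones.battles; in cqlsh should report one row for each battle in the CSV.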

That wraps Part 1.  Let’s move on to Part 2 where we’ll write some Scala code.

Part 2: Spark Cassandra Scala Code

(Note: All of the following sample code is available from GitHub.  Link in Resources section below.)

To begin, let’s lay out the skeleton structure of the project:

mkdir got-battles // name it anything you'd like
cd got-battles  // if you named it got-battles
mkdir src
mkdir src/main
mkdir src/main/scala
mkdir src/main/scala/com
mkdir src/main/scala/com/supergloo
mkdir project
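This is sbt’s standard source layout: sbt picks up src/main/scala by convention, and the project/ directory holds build-level configuration such as plugin definitions.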

Next, we’re going to add some files for sbt and the sbt-assembly plugin.

First the build file for sbt

got-battles/build.sbt file:

name := "spark-cassandra-example"
version := "1.0"
assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)
// https://groups.google.com/a/lists.datastax.com/forum/#!topic/spark-connector-user/5muNwRaCJnU
assemblyMergeStrategy in assembly <<= (assemblyMergeStrategy in assembly) {
  (old) => {
    case PathList("META-INF", "io.netty.versions.properties") => MergeStrategy.last
    case x => old(x)
  }
}
scalaVersion := "2.10.6"
resolvers += "jitpack" at "https://jitpack.io"
libraryDependencies ++= Seq(
// use provided line when building assembly jar
// "org.apache.spark" %% "spark-sql" % "1.6.1" % "provided",
// comment above line and uncomment the following to run in sbt
   "org.apache.spark" %% "spark-sql" % "1.6.1",
   "com.datastax.spark" %% "spark-cassandra-connector" % "1.5.0"
)
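Note that the Scala version (2.10.6) has to line up with the Spark and connector artifacts: the %% operator appends the Scala binary version suffix to the artifact name, so sbt resolves spark-sql_2.10 and spark-cassandra-connector_2.10 here.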

and the 1 line got-battles/project/assembly.sbt file:

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.12.0")
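(The sbt-assembly plugin is what provides the sbt assembly task we’ll use in Part 4 to build a single “fat” jar containing the connector and the other non-provided dependencies.)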

And now let’s create the Spark driver code in got-battles/src/main/scala/com/supergloo called SparkCassandra.scala

package com.supergloo
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
import com.datastax.spark.connector._
import com.datastax.spark.connector.rdd._
import org.apache.spark.sql.cassandra._
/**
  * Simple Spark Cassandra 
  * One example with Scala case class marshalling
  * Another example using Spark SQL 
  */
object SparkCassandra {
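  // The field names below must match the Cassandra column names,
  // since the DataStax connector maps columns to case class fields by name.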
  case class Battle(    
    battle_number: Integer,
    year: Integer,
    attacker_king: String,
    defender_king: String
  )
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("SparkCassandraExampleApp")
    if (args.length > 0) conf.setMaster(args(0)) // in case we're running in sbt shell such as: run local[5]
    conf.set("spark.cassandra.connection.host", "127.0.0.1")  // so yes, I'm assuming Cassandra is running locally here.
                                                              // adjust as needed
    val sc = new SparkContext(conf)
    // Spark Cassandra Example one which marshalls to Scala case classes
    val battles: Array[Battle] = sc.cassandraTable[Battle]("gameofthrones", "battles").
                                        select("battle_number", "year", "attacker_king", "defender_king").collect()
    battles.foreach { b: Battle =>
        println("Battle Number %s was defended by %s.".format(b.battle_number, b.defender_king))
    }
    // Spark Cassandra Example Two - Create DataFrame from Spark SQL read
    val sqlContext = new SQLContext(sc)
    val df = sqlContext.read
              .format("org.apache.spark.sql.cassandra")
              .options(Map( "table" -> "battles", "keyspace" -> "gameofthrones" ))
              .load()
    df.show
    // Game of Thrones Battle analysis 
    // Who were the most aggressive kings?  (most attacker_kings)
    val countsByAttack = df.groupBy("attacker_king").count().sort(desc("count"))
    countsByAttack.show()
    // Which kings were attacked the most?  (most defender_kings)
    val countsByDefend = df.groupBy("defender_king").count().sort(desc("count"))
    countsByDefend.show()
    sc.stop()
  }
}
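As a possible next step, you could persist one of the aggregates back into Cassandra with the connector’s saveToCassandra.  Below is a minimal sketch (my addition, not part of the tutorial code) that would go inside main() before sc.stop().  It assumes a hypothetical gameofthrones.battle_counts table that you’d have to create first; the existing import com.datastax.spark.connector._ at the top of the file already brings saveToCassandra and SomeColumns into scope.

    // Hypothetical follow-on: write the attacker counts back to Cassandra.
    // Assumes a table created beforehand, e.g.:
    //   CREATE TABLE gameofthrones.battle_counts (king TEXT PRIMARY KEY, battles BIGINT);
    val attackerCounts = countsByAttack.rdd
      .filter(row => !row.isNullAt(0))                 // skip rows with no recorded attacker king
      .map(row => (row.getString(0), row.getLong(1)))  // (king, battle count) pairs
    attackerCounts.saveToCassandra("gameofthrones", "battle_counts", SomeColumns("king", "battles"))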

Part 3: Run Spark Cassandra Scala Code from SBT Console

Start up the sbt console via sbt.  Once ready, you can issue the run command with an argument for your Spark master location; e.g. run local[5]

(Again, there’s a screencast at the end of this post which shows an example of running this command.  See Resources section below.)

Depending on your log level, you should see various outputs from the SparkCassandra code.  This console output from Cassandra is the indicator of success.  Oh yeah.  Say it with me now.  Oh yeahhhhh.

Running code in the sbt console is a convenient way to make and test changes rapidly.  As I developed this code, there was a terminal open in one window and an editor open in another window.  Whenever I made a Scala source code change and saved, I could simply re-run in the sbt console.
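(To tighten that loop even further, sbt’s triggered execution should work here too: prefixing the command with a tilde, as in ~run local[5], re-runs it automatically on every save.)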

So now, let’s say we’ve reached the point of wanting to deploy this Spark program to a cluster.  Let’s see how in the next section.

Part 4: Assemble Spark Cassandra Scala Code and Deploy to Spark Cluster

To build a jar and deploy to a Spark cluster, we need to make a small change to our build.sbt file.  As you may have noticed from the code above, there are comments in the file which indicate what needs to be changed.  We should uncomment this line:

// "org.apache.spark" %% "spark-sql" % "1.6.1" % "provided",

and comment out this line:

   "org.apache.spark" %% "spark-sql" % "1.6.1",

Then, we can run sbt assembly from the command line to produce a Spark deployable jar.  If you use the sample build.sbt file, this will produce target/scala-2.10/spark-cassandra-example-assembly-1.0.jar.

To deploy, use spark-submit with the appropriate arguments; e.g.

spark-submit --class "com.supergloo.SparkCassandra" --master spark://todd-mcgraths-macbook-pro.local:7077 ./target/scala-2.10/spark-cassandra-example-assembly-1.0.jar
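One thing to keep in mind: the driver hard-codes spark.cassandra.connection.host to 127.0.0.1, so point that setting at your actual Cassandra node(s) before assembling for a real cluster.  (A value set programmatically on the SparkConf takes precedence over one passed to spark-submit via --conf.)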

Conclusion

So, what do you think?  When you run the code, you can see the most aggressive kings and the kings which were attacked the most.  Without giving it away, I think one could argue whether Mance Rayder should be tied with Renly Baratheon on the most attacked list.  But, that’s not really the point of this tutorial.  As for the code and setup, do you have any questions, opinions or suggestions for next steps?   

Spark Cassandra Tutorial Screencast

In the following screencast, I run through the steps described in this tutorial.  Stay tuned because there is blooper footage at the end of the screencast.  Because I mean, why not bloopers.

[Embedded screencast: Spark Cassandra Tutorial]

Spark Cassandra Tutorial Resources

  1. All source code, including the battles.csv file I scrubbed with the Perl script described above, at Apache Spark Cassandra Example code
  2. DataStax Spark Cassandra Connector: https://github.com/datastax/spark-cassandra-connector
  3. DataFrames with Cassandra Connector
  4. Game of Thrones data: https://github.com/chrisalbon/war_of_the_five_kings_dataset

And don’t forget the Spark Scala tutorials.  Speaking of Cassandra, you may find the Spark Thrift Server with Cassandra tutorial interesting too.

Featured image credit https://flic.kr/p/dvpku1
