In this blog, we will discuss a few different data operations you can perform with Apache Spark and Cassandra, along with the steps you can follow to try them out yourself.
We will cover how to use Spark to read data from one Cassandra table, transform it, and write it into another Cassandra table; how to delete data from Cassandra tables using Spark; and how to import data from one Cassandra cluster to another.
Prerequisites
To follow along, you will need Docker installed (to run Cassandra) and a local download of Apache Spark; the examples below use spark-3.0.1-bin-hadoop2.7.
Read, Transform, and Write
We will start by running a Docker container with the latest Apache Cassandra image.
docker run --name cassandra -p 9042:9042 -d cassandra:latest
Once the container has finished starting up, we will open a CQLSH terminal.
docker exec -it cassandra cqlsh
We will create a keyspace and then create a table called spacecraft_journey_catalog.
CREATE KEYSPACE demo WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1};

USE demo;

CREATE TABLE IF NOT EXISTS spacecraft_journey_catalog (
    spacecraft_name text,
    journey_id timeuuid,
    start timestamp,
    end timestamp,
    active boolean,
    summary text,
    PRIMARY KEY ((spacecraft_name), journey_id)
) WITH CLUSTERING ORDER BY (journey_id DESC);
We will need to insert some data into the newly created table. Copy the contents of this text file and paste it into your CQLSH terminal to insert 1,000 records.
Now, we will start our local instance of Apache Spark. Open a new terminal or tab and cd into the Spark directory, e.g. cd spark-3.0.1-bin-hadoop2.7/
Once in the Spark directory, we will start a cluster by launching the master.
./sbin/start-master.sh
Additionally, we will start a worker and point it to the master node. To do this, we will need the Spark master URL, which can be found at localhost:8080
./sbin/start-slave.sh <Spark master URL>
Once the worker is started, you can confirm it by going to localhost:8080 and refreshing the page. You should now see a worker listed under the "Workers" section.
Now, we can start the Spark shell. We will use the basic Scala shell; copy and paste the template below into the terminal running in the Spark directory.
./bin/spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.12:3.0.0 \
  --master <Spark master URL> \
  --conf spark.cassandra.connection.host=127.0.0.1 \
  --conf spark.cassandra.connection.port=9042 \
  --conf spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions
Notice that we are also passing in the DataStax Spark Cassandra Connector, the package needed to expose Cassandra tables as Spark RDDs and Datasets/DataFrames, write Spark RDDs and Datasets/DataFrames to Cassandra tables, and execute arbitrary CQL queries in Spark applications.
Once the Spark shell is up and running, we will need to import a few packages and functions:
import org.apache.spark.sql.functions._
import com.datastax.spark.connector._
import org.apache.spark.sql.cassandra._
import com.datastax.spark.connector.cql._
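With these in place, the connector can also run arbitrary CQL straight from the shell. Below is a minimal sketch of that capability (the keyspace listing query is just an illustration):

// A small sketch: run an ad-hoc CQL statement through the connector's managed session
val connector = CassandraConnector(sc.getConf)
connector.withSessionDo { session =>
  // Any CQL works here; this one simply lists the keyspaces on the cluster
  session.execute("SELECT keyspace_name FROM system_schema.keyspaces")
}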
There are two methods we can use to load data from Cassandra into Spark and run transformations on it: the load() method and the catalog approach. We will cover both.
load() Method
val df = spark.read.cassandraFormat("spacecraft_journey_catalog", "demo").load()
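As a quick sanity check before transforming anything, you can inspect the schema and a few rows (an optional convenience step):

// Confirm the Cassandra columns and types were picked up
df.printSchema()
df.show(5, truncate = false)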
Once the data is loaded, we can transform it in two ways. In the first table we made, each journey a spacecraft takes has a start time and an end time, but not a duration. If we want the duration calculated and written into another table, for display or BI purposes, we can do so with Spark.
The first approach uses the select() method, shown below. Here we take the summary and journey_id columns from the data frame we loaded the initial table's data into, and use the datediff function on the end and start columns to calculate the number of days each journey took, exposed as the duration_in_days column.
val newDF = df.select($"summary", $"journey_id", datediff($"end", $"start") as "duration_in_days")
Another way to do this is to register the initial df as a temp view so that we can run SQL on it, like so:
df.createOrReplaceTempView("df")
val newDF = spark.sql("select summary, journey_id, datediff(end, start) as duration_in_days from df")
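Whichever of these two you use, you can peek at newDF to confirm the computed column looks right (an optional check):

// Each row should now carry a duration_in_days value
newDF.show(5, truncate = false)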
Now we can take a look at the catalog method to generate newDF.
Catalog
Instead of loading the data with .load, we will set a config option in the shell, like below:
spark.conf.set(s"spark.sql.catalog.cassandra", "com.datastax.spark.connector.datasource.CassandraCatalog")
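You can verify the catalog is wired up by asking Spark what it can see through it; a quick check (the output should include the demo keyspace and the table we created earlier):

// List keyspaces and tables visible through the cassandra catalog
spark.sql("SHOW NAMESPACES FROM cassandra").show()
spark.sql("SHOW TABLES FROM cassandra.demo").show()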
Now we can run SQL queries directly against our Cassandra data using the catalog.keyspace.table naming convention and generate newDF that way.
val newDF = spark.sql("select summary, journey_id, datediff(end, start) as duration_in_days from cassandra.demo.spacecraft_journey_catalog")
No matter which method you choose to generate newDF, we can now write it back into Cassandra as a new table.
Writing
To write to Cassandra, we first need to create a new table from the dataframe we just built with Spark. We will call the table duration_by_journey_summary, with summary as the partition key column and journey_id as the clustering column.
newDF.createCassandraTable("demo", "duration_by_journey_summary", partitionKeyColumns = Some(Seq("summary")), clusteringKeyColumns = Some(Seq("journey_id")))
Now, we can write the dataframe into the newly created Cassandra table.
newDF.write.cassandraFormat("duration_by_journey_summary", "demo").mode("append").save()
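If you went the catalog route instead, the same write can be expressed through the catalog API; a sketch, assuming the cassandra catalog from the earlier section is still configured and the table already exists:

// Equivalent write via the Spark 3 catalog API
newDF.writeTo("cassandra.demo.duration_by_journey_summary").append()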
You can then go to the CQLSH terminal and run the below to confirm that the data was correctly written to Cassandra.
SELECT * FROM demo.duration_by_journey_summary ;
That wraps up how to quickly do read, transform, and write data operations with Spark and Cassandra. Now, we will take a look at deleting data.
Deleting Data
We will delete data from Cassandra with Spark using two methods. The first deletes an entire partition, and the second deletes selected rows from a partition.
Delete Entire Partition
To delete an entire partition, we can run the following statement. It uses the where function to filter down to the partition we want and then passes the result into the deleteFromCassandra function to delete the selected rows.
sc.cassandraTable("demo", "duration_by_journey_summary").where("summary='ISS crew rotation.'").deleteFromCassandra("demo", "duration_by_journey_summary")
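Before switching back to CQLSH, you can also check from the Spark side that the partition is now empty (an optional verification):

// Should print 0 once the delete has completed
val remaining = sc.cassandraTable("demo", "duration_by_journey_summary").where("summary='ISS crew rotation.'").count()
println(remaining)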
You can confirm this using the CQLSH terminal by running the below statement.
SELECT * FROM demo.duration_by_journey_summary where summary='ISS crew rotation.' ;
Deleting Selected Rows Within A Partition
In this method, we will delete selected rows within a partition by adding additional conditions to the where function.
sc.cassandraTable("demo", "duration_by_journey_summary").where("summary='Bring supplies to space center' and duration_in_days<200").deleteFromCassandra("demo", "duration_by_journey_summary")
To confirm whether the data was deleted, run the statement below in the CQLSH terminal. There should be fewer than ~10 records left.
SELECT * from demo.duration_by_journey_summary where summary='Bring supplies to space center' ;
If there are still records showing a duration of less than 200 days, then we may need to make some data model changes. Remember when we created the table from newDF? We only used summary and journey_id in the primary key. To change this, we can drop the table using CQLSH as seen below:
DROP TABLE demo.duration_by_journey_summary ;
Then we can rerun the create table statement from Spark with one additional item: adding duration_in_days as a clustering column. The statement can be found below:
newDF.createCassandraTable("demo", "duration_by_journey_summary", partitionKeyColumns = Some(Seq("summary")), clusteringKeyColumns = Some(Seq("duration_in_days", "journey_id")))
Then we can rerun writing to the table using the below statement:
newDF.write.cassandraFormat("duration_by_journey_summary", "demo").mode("append").save()
Now, if we go back and run the delete again, we will be able to see the changes.
sc.cassandraTable("demo", "duration_by_journey_summary").where("summary='Bring supplies to space center' and duration_in_days<200").deleteFromCassandra("demo", "duration_by_journey_summary")
and then confirm the result in CQLSH:
SELECT * from demo.duration_by_journey_summary where summary='Bring supplies to space center' ;
That wraps up a quick introduction to deleting Cassandra data using Spark. Now we will take a look at how to migrate data from one cluster to another.
Import Data from One Cassandra Cluster to Another
We will cover how to migrate data between two Cassandra clusters in two ways: using catalogs and RDDs.
Catalog Method
Using the catalog method from DataStax's spark-cassandra-connector, you can set up two different catalogs, one for each cluster, and write statements to transfer data between them. An example is provided below:
// Catalog cass100 for the cluster at 127.0.0.100
spark.conf.set(s"spark.sql.catalog.cass100", "com.datastax.spark.connector.datasource.CassandraCatalog")
spark.conf.set(s"spark.sql.catalog.cass100.spark.cassandra.connection.host", "127.0.0.100")

// Catalog cass200 for the cluster at 127.0.0.200
spark.conf.set(s"spark.sql.catalog.cass200", "com.datastax.spark.connector.datasource.CassandraCatalog")
spark.conf.set(s"spark.sql.catalog.cass200.spark.cassandra.connection.host", "127.0.0.200")

// Copy with SQL ...
spark.sql("INSERT INTO cass200.ks.tab SELECT * FROM cass100.ks.tab")

// ... or with the DataFrame API
spark.read.table("cass100.ks.tab").writeTo("cass200.ks.tab").append()
RDD Method
With the RDD API, you create one CassandraConnector per cluster and use implicit scoping to control which connection each read or write uses:

def twoClusterExample(sc: SparkContext) = {
  val connectorToClusterOne = CassandraConnector(sc.getConf.set("spark.cassandra.connection.host", "127.0.0.1"))
  val connectorToClusterTwo = CassandraConnector(sc.getConf.set("spark.cassandra.connection.host", "127.0.0.2"))

  val rddFromClusterOne = {
    // Sets connectorToClusterOne as the implicit connection for this block
    implicit val c = connectorToClusterOne
    sc.cassandraTable("ks", "table")
  }

  {
    // Sets connectorToClusterTwo as the implicit connection for this block
    implicit val c = connectorToClusterTwo
    rddFromClusterOne.saveToCassandra("ks", "table")
  }
}
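Calling it from the shell is then just a matter of passing in the active SparkContext (sc in spark-shell):

twoClusterExample(sc)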
If you are trying to connect a non-SSL cluster to an SSL-enabled one, you can use this method:
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._
import org.apache.spark.SparkContext

val connectorToClusterOne = CassandraConnector(sc.getConf.set("spark.cassandra.connection.host", "10.xxx.xxx.xx"))

val rddFromClusterOne = {
  // Sets connectorToClusterOne as the default connection for everything in this code block
  implicit val c = connectorToClusterOne
  sc.cassandraTable("keyspace1", "table1")
}

val connectorToClusterTwo = CassandraConnector(sc.getConf
  .set("spark.cassandra.connection.host", "10.xxx.xxx.xx")
  .set("spark.cassandra.auth.username", "<username>")
  .set("spark.cassandra.auth.password", "<password>")
  .set("spark.cassandra.connection.ssl.enabled", "true")
  .set("spark.cassandra.connection.ssl.trustStore.path", "/etc/dse/security/datatruststore-stage.jks")
  .set("spark.cassandra.connection.ssl.trustStore.password", "<password>"))

{
  // Sets connectorToClusterTwo as the default connection for everything in this code block
  implicit val c = connectorToClusterTwo
  rddFromClusterOne.saveToCassandra("keyspace1", "table1")
}
And with that, you can quickly get started with doing data operations with Spark and Cassandra.
Cassandra.Link is a knowledge base that we created for all things Apache Cassandra. Our goal with Cassandra.Link was to not only fill the gap of Planet Cassandra but to bring the Cassandra community together. Feel free to reach out if you wish to collaborate with us on this project in any capacity.
We are a technology company that specializes in building business platforms. If you have any questions about the tools discussed in this post or about any of our services, feel free to send us an email!