(Just a minor change in the configuration would be selecting “AMI: Ubuntu Server 16.04 LTS (HVM), SSD Volume Type as the AMI”)
Ensure that your Spark client machine has Java 8 installed and selected as the preferred Java version:
And finally select Java 8 as the preferred Java version:
Use the following commands to install sbt in order to build the example project:
Once you have configured a Cassandra + Spark cluster and a Spark client, follow the instructions here to create a Kafka cluster. Once you’ve created a Kafka cluster you will need to allow the IP addresses of the Spark client and Spark cluster through your Kafka cluster firewall. You can find the IP addresses of your Spark cluster in the cluster’s Details tab on the Instaclustr console. You can add the Spark client and Spark cluster IPs to your Kafka cluster’s allowed addresses in the cluster’s Settings tab on the Instaclustr console.
Build the sample
We have uploaded a sample project including the build, source and configuration files to Github. To build the project:
build.sbt: the project file that specifies dependencies.
cassandra-count.conf: configuration file with IPs, username, password.
kafka.properties: configuration file with Kafka connection details.
kafka.properties.template: template kafka.properties file for a Kafka cluster without Client ⇆ Broker encryption.
kafka-secure.properties.template: template kafka.properties file for a Kafka cluster with Client ⇆ Broker encryption.
src/main/scala/KafkaSparkCassandra.scala: the scala file with the actual application. The code is heavily commented to explain what is going on.
project/assembly.sbt: sbt plugin config to package dependencies in the target jar.
When executed, the application will:
Connect directly from the Spark driver to Cassandra, create a keyspace and table to store results if required.
Start a Spark streaming session connected to Kafka. Summarise messages received in each 5 second period by counting words. Save the summary result in Cassandra.
Stop the streaming session after 30 seconds.
Use Spark SQL to connect to Cassandra and extract the summary results table data that has been saved.
Build the project:
This may take a while the first time as all the required dependencies will need to be downloaded.
Setup your cassandra-count.conf configuration file like so:
Fill in the <> brackets with appropriate values. You can find the IP addresses of your Spark cluster in the cluster’s Details tab on the Instaclustr console. Similarly, you can find the connection details for your Cassandra cluster on the cluster’s Connection Details tab on the console.
Setup your kafka.properties configuration file. If your Kafka cluster does not have Client ⇆ Broker encryption enabled, copy and modify the kafka.properties.template file into kafka.properties:
Filling in the <> brackets with appropriate values. If your Kafka cluster does have Client ⇆ Broker encryption enabled, copy and modify the kafka-secure.properties.template into the kafka.properties file:
You can find all the connection details for your Kafka cluster on the cluster’s Connection Details page. If your cluster has client ⇆ broker encryption enabled make sure to use the “Download Cluster CA X.509 Certificates” button and move the truststore.jks file contained in the archive to the sample-KafkaSparkCassandra directory.
Run the sample
We need to run both the Kafka producer test harness and the Spark sample app at the same time so it’s easiest if you have two console windows open. Once you have the two windows open do the following:
Follow the guide here to start a Kafka console producer, changing the topic name to the wordcount-input topic.
In the second console window, submit your Spark job. If your Kafka cluster does not have Client ⇆ Broker encryption enabled, use the following command:
If your Kafka cluster does have Client ⇆ Broker encryption enabled, use this command instead:
Make sure the path to truststore.jks is correct. You can download the truststore.jks file by going to your cluster’s Connection Details page and using the “Download Cluster CA X.509 Certificates” button.
Switch back to the Kafka producer console window and enter some test messages for 20 seconds or so.
Switch back to the Spark console window, you should see something like the following which is the summary from a single Spark streaming batch:
After 30 seconds of streaming has passed, you should see an output like so, which is the dump of the word_count Cassandra table: