Illustration Image

Cassandra.Link

The best knowledge base on Apache Cassandra®

Helping platform leaders, architects, engineers, and operators build scalable real time data platforms.

8/5/2020

Reading time:3 min

polomarcus/Spark-Structured-Streaming-Examples

by John Doe

Stream the number of time Drake is broadcasted on each radio.And also, see how easy is Spark Structured Streaming to use using Spark SQL's Dataframe APIRun the ProjectStep 1 - Start containersStart the ZooKeeper, Kafka, Cassandra containers in detached mode (-d)./start-docker-compose.shIt will run these 2 commands together so you don't have todocker-compose up -d# create Cassandra schemadocker-compose exec cassandra cqlsh -f /schema.cql;# confirm schemadocker-compose exec cassandra cqlsh -e "DESCRIBE SCHEMA;"Step 2 - start spark structured streamingsbt runRun the project after another timeAs checkpointing enables us to process our data exactly once, we need to delete the checkpointing folders to re run our examples.rm -rf checkpoint/sbt runMonitorSpark : http://localhost:4040/SQL/Kibana (index "test") : http://localhost:5601/app/kibana#/discoverKafka : Read all messages sentdocker-compose exec kafka \ kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginningExamples:{"radio":"nova","artist":"Drake","title":"From Time","count":18}{"radio":"nova","artist":"Drake","title":"4pm In Calabasas","count":1}RequirementsLinuxcurl -L https://github.com/docker/compose/releases/download/1.17.1/docker-compose-`uname -s`-`uname -m` -o /usr/local/bin/docker-composechmod +x /usr/local/bin/docker-composeMacOSbrew install docker-composeInput dataComing from radio stations stored inside a parquet file, the stream is emulated with .option("maxFilesPerTrigger", 1) option.The stream is after read to be sink into Kafka.Then, Kafka to CassandraOutput dataStored inside Kafka and Cassandra for example only.Cassandra's Sinks uses the ForeachWriter and also the StreamSinkProvider to compare both sinks.One is using the Datastax's Cassandra saveToCassandra method. The other another method, messier (untyped), that uses CQL on a custom foreach loop.From Spark's doc about batch duration:Trigger interval: Optionally, specify the trigger interval. If it is not specified, the system will check for availability of new data as soon as the previous processing has completed. If a trigger time is missed because the previous processing has not completed, then the system will attempt to trigger at the next trigger point, not immediately after the processing has completed.Kafka topicOne topic test with only one partitionList all topicsdocker-compose exec kafka \ kafka-topics --list --zookeeper zookeeper:32181Send a message to be processeddocker-compose exec kafka \ kafka-console-producer --broker-list localhost:9092 --topic test> {"radio":"skyrock","artist":"Drake","title":"Hold On We’Re Going Home","count":38}Cassandra TableThere are 3 tables. 2 used as sinks, and another to save kafka metadata.Have a look to schema.cql for all the details. docker-compose exec cassandra cqlsh -e "SELECT * FROM structuredstreaming.radioOtherSink;" radio | title | artist | count---------+--------------------------+--------+------- skyrock | Controlla | Drake | 1 skyrock | Fake Love | Drake | 9 skyrock | Hold On We’Re Going Home | Drake | 35 skyrock | Hotline Bling | Drake | 1052 skyrock | Started From The Bottom | Drake | 39 nova | 4pm In Calabasas | Drake | 1 nova | Feel No Ways | Drake | 2 nova | From Time | Drake | 34 nova | Hype | Drake | 2Kafka Metadata@TODO Verify this below information. Cf this SO commentWhen doing an application upgrade, we cannot use checkpointing, so we need to store our offset into a external datasource, here Cassandra is chosen.Then, when starting our kafka source we need to use the option "StartingOffsets" with a json string like""" {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}} """Learn more in the official Spark's doc for Kafka.In the case, there is not Kafka's metadata stored inside Cassandra, earliest is used.docker-compose exec cassandra cqlsh -e "SELECT * FROM structuredstreaming.kafkametadata;" partition | offset-----------+-------- 0 | 171Useful linksKafka tutorial #8 - Spark Structured StreamingProcessing Data in Apache Kafka with Structured Streaming in Apache Spark 2.2https://databricks.com/blog/2017/04/04/real-time-end-to-end-integration-with-apache-kafka-in-apache-sparks-structured-streaming.htmlhttps://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreachhttps://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modesElastic Structured Streamin docStructured Streaming - “Failed to find data source: es” Arbitrary Stateful Processing in Apache Spark’s Structured StreamingDeep dive stateful stream processingOfficial documentationDocker-composeInspired by

Illustration Image

Codacy Badge

Stream the number of time Drake is broadcasted on each radio. And also, see how easy is Spark Structured Streaming to use using Spark SQL's Dataframe API

Run the Project

Step 1 - Start containers

Start the ZooKeeper, Kafka, Cassandra containers in detached mode (-d)

./start-docker-compose.sh

It will run these 2 commands together so you don't have to

docker-compose up -d
# create Cassandra schema
docker-compose exec cassandra cqlsh -f /schema.cql;
# confirm schema
docker-compose exec cassandra cqlsh -e "DESCRIBE SCHEMA;"

Step 2 - start spark structured streaming

sbt run

Run the project after another time

As checkpointing enables us to process our data exactly once, we need to delete the checkpointing folders to re run our examples.

rm -rf checkpoint/
sbt run

Monitor

docker-compose exec kafka  \
 kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning

Examples:

{"radio":"nova","artist":"Drake","title":"From Time","count":18}
{"radio":"nova","artist":"Drake","title":"4pm In Calabasas","count":1}

Requirements

Linux

curl -L https://github.com/docker/compose/releases/download/1.17.1/docker-compose-`uname -s`-`uname -m` -o /usr/local/bin/docker-compose
chmod +x /usr/local/bin/docker-compose

MacOS

brew install docker-compose

Input data

Coming from radio stations stored inside a parquet file, the stream is emulated with .option("maxFilesPerTrigger", 1) option.

The stream is after read to be sink into Kafka. Then, Kafka to Cassandra

Output data

Stored inside Kafka and Cassandra for example only. Cassandra's Sinks uses the ForeachWriter and also the StreamSinkProvider to compare both sinks.

One is using the Datastax's Cassandra saveToCassandra method. The other another method, messier (untyped), that uses CQL on a custom foreach loop.

From Spark's doc about batch duration:

Trigger interval: Optionally, specify the trigger interval. If it is not specified, the system will check for availability of new data as soon as the previous processing has completed. If a trigger time is missed because the previous processing has not completed, then the system will attempt to trigger at the next trigger point, not immediately after the processing has completed.

Kafka topic

One topic test with only one partition

List all topics

docker-compose exec kafka  \
  kafka-topics --list --zookeeper zookeeper:32181

Send a message to be processed

docker-compose exec kafka  \
 kafka-console-producer --broker-list localhost:9092 --topic test
> {"radio":"skyrock","artist":"Drake","title":"Hold On We’Re Going Home","count":38}

Cassandra Table

There are 3 tables. 2 used as sinks, and another to save kafka metadata. Have a look to schema.cql for all the details.

 docker-compose exec cassandra cqlsh -e "SELECT * FROM structuredstreaming.radioOtherSink;"
 radio   | title                    | artist | count
---------+--------------------------+--------+-------
 skyrock |                Controlla |  Drake |     1
 skyrock |                Fake Love |  Drake |     9
 skyrock | Hold On We’Re Going Home |  Drake |    35
 skyrock |            Hotline Bling |  Drake |  1052
 skyrock |  Started From The Bottom |  Drake |    39
    nova |         4pm In Calabasas |  Drake |     1
    nova |             Feel No Ways |  Drake |     2
    nova |                From Time |  Drake |    34
    nova |                     Hype |  Drake |     2

Kafka Metadata

@TODO Verify this below information. Cf this SO comment

When doing an application upgrade, we cannot use checkpointing, so we need to store our offset into a external datasource, here Cassandra is chosen. Then, when starting our kafka source we need to use the option "StartingOffsets" with a json string like

""" {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}} """

Learn more in the official Spark's doc for Kafka.

In the case, there is not Kafka's metadata stored inside Cassandra, earliest is used.

docker-compose exec cassandra cqlsh -e "SELECT * FROM structuredstreaming.kafkametadata;"
 partition | offset
-----------+--------
         0 |    171

Useful links

Docker-compose

Inspired by

Related Articles

mongo
nocode
elasticsearch

GitHub - ibagroup-eu/Visual-Flow: Visual-Flow main repository

ibagroup-eu

12/2/2024

Checkout Planet Cassandra

Claim Your Free Planet Cassandra Contributor T-shirt!

Make your contribution and score a FREE Planet Cassandra Contributor T-Shirt! 
We value our incredible Cassandra community, and we want to express our gratitude by sending an exclusive Planet Cassandra Contributor T-Shirt you can wear with pride.

Join Our Newsletter!

Sign up below to receive email updates and see what's going on with our company

Explore Related Topics

AllKafkaSparkScyllaSStableKubernetesApiGithubGraphQl

Explore Further

elasticsearch