

12/10/2020


End-to-end streaming from Kafka to Cassandra

by John Doe



I have been building a research product that stores server logs in secure storage (obviously on a blockchain :)). I named this project Octopus. As part of this project we had to take logs from different services and store them in Cassandra. First I shipped the logs from the different services to Kafka with Filebeat (I have written about that story here). Then I streamed the data from Kafka to Cassandra using Akka Streams.

In this post I'm going to show how I did the streaming from Kafka to Cassandra. All the source code related to this post is available in this GitLab repo. Please clone it and follow the steps below.

First I need to run Cassandra on my local machine. Following is the docker-compose file entry for Cassandra.
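A minimal entry along these lines will do; the image tag and settings here are assumptions, so check the repo's docker-compose.yml for the real ones.

cassandra:
  image: cassandra:3.11        # assumed image tag
  ports:
    - "9042:9042"              # expose the native CQL port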

I can simply run it with the command below. It will start a single-node Cassandra cluster on port 9042 on my local machine.

docker-compose up -d cassandra

Then I need to create the Cassandra keyspace and table. The data coming from the Kafka source will be sunk into this table.
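The keyspace is mystiko and the table is log (both are referenced later in the sink). The schema below is a plausible sketch based on the sample messages; the timestamp clustering column is an assumption, and the repo may define different columns.

CREATE KEYSPACE IF NOT EXISTS mystiko
  WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};

CREATE TABLE IF NOT EXISTS mystiko.log (
  service   text,       -- service that produced the log line
  timestamp timestamp,  -- assumed clustering column
  msg       text,       -- raw log message
  PRIMARY KEY (service, timestamp)
);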

The next thing is to set up Kafka. Following is the content of docker-compose.yml that runs ZooKeeper and Kafka.
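A typical single-node setup looks like this; the images and environment variables are assumptions (the wurstmeister images were common at the time), so the repo's file may differ.

zookeeper:
  image: wurstmeister/zookeeper   # assumed image
  ports:
    - "2181:2181"

kafka:
  image: wurstmeister/kafka       # assumed image
  ports:
    - "9092:9092"
  environment:
    KAFKA_ADVERTISED_HOST_NAME: localhost
    KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
  depends_on:
    - zookeeper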

I can start ZooKeeper and Kafka with the commands below. Here I'm running a single instance of each. Kafka will be available on port 9092 on my local machine.

docker-compose up -d zookeeper
docker-compose up -d kafka

In my sbt application I need to define the akka-streams dependency and the Alpakka connector dependencies (Cassandra and Kafka). Following is the build.sbt file.
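The key dependencies look roughly like this. The version numbers are illustrative for the Akka 2.5 era of this post, the project name is an assumption, and spray-json is an assumed choice of JSON library; check the repo for the exact build.sbt.

name := "octopus-stream"   // assumed project name
scalaVersion := "2.12.8"

libraryDependencies ++= Seq(
  "com.typesafe.akka"  %% "akka-stream"                   % "2.5.23",
  "com.typesafe.akka"  %% "akka-stream-kafka"             % "1.0.4",  // Alpakka Kafka connector
  "com.lightbend.akka" %% "akka-stream-alpakka-cassandra" % "1.1.0",  // Alpakka Cassandra connector
  "io.spray"           %% "spray-json"                    % "1.3.5"   // assumed JSON library
)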

Then we need to define the Kafka consumer configuration in application.conf. Following is the configuration.
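With Alpakka Kafka the consumer is configured under akka.kafka.consumer; a minimal configuration for this setup would be (the group id is an assumption):

akka.kafka.consumer {
  kafka-clients {
    bootstrap.servers = "localhost:9092"
    group.id = "octopus-log-consumer"   # assumed group id
    auto.offset.reset = "earliest"
  }
}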

Now all the service configuration is done. The next thing is to implement the Akka Streams source, flows, and sink, and build the graph.

Here I have first defined a Kafka source and a Cassandra sink. The Kafka source is subscribed to the log topic (the Filebeat service publishes log messages as JSON strings to this topic). The Cassandra sink writes to the log table in the mystiko keyspace. Inside the sink I have added the CQL query and a statementBinder, which binds the Scala case class fields to the Java field types.

Next I have defined two flows. One flow converts the Kafka JSON messages to Message objects; the other converts Message objects to Log objects, and it is these Log objects that get saved to Cassandra. Finally I have defined the graph that connects the source with the flows and the sink; a condensed sketch of the whole pipeline follows.
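The repo has the full code; the sketch below only shows how the pieces connect. It assumes the Alpakka 1.x Cassandra API with the DataStax 3.x driver, spray-json for parsing, and Message/Log shapes inferred from the sample messages, so the actual types in the repo may differ.

import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.ActorMaterializer
import akka.stream.alpakka.cassandra.scaladsl.CassandraSink
import akka.stream.scaladsl.Flow
import com.datastax.driver.core.{Cluster, PreparedStatement}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import spray.json._
import DefaultJsonProtocol._

object LogStream extends App {
  implicit val system = ActorSystem("octopus")
  implicit val materializer = ActorMaterializer()

  // assumed shapes of the two intermediate types
  case class Message(msg: String, service: String)
  case class Log(service: String, timestamp: java.util.Date, msg: String)
  implicit val messageFormat: RootJsonFormat[Message] = jsonFormat2(Message)

  // Cassandra session, required implicitly by CassandraSink
  implicit val session =
    Cluster.builder.addContactPoint("127.0.0.1").withPort(9042).build.connect()

  // Kafka source subscribed to the `log` topic; broker and group id come from application.conf
  val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
  val source = Consumer.plainSource(consumerSettings, Subscriptions.topics("log"))

  // flow 1: JSON string -> Message; flow 2: Message -> Log
  val toMessage = Flow[ConsumerRecord[String, String]]
    .map(record => record.value.parseJson.convertTo[Message])
  val toLog = Flow[Message].map(m => Log(m.service, new java.util.Date(), m.msg))

  // Cassandra sink: the CQL insert plus a statementBinder from Log to a bound statement
  val statement = session.prepare("INSERT INTO mystiko.log(service, timestamp, msg) VALUES (?, ?, ?)")
  val binder = (log: Log, ps: PreparedStatement) => ps.bind(log.service, log.timestamp, log.msg)
  val sink = CassandraSink[Log](parallelism = 2, statement, binder)

  // connect source -> flows -> sink and run the graph
  source.via(toMessage).via(toLog).runWith(sink)
}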

To test this application I have used kafkacat. I'm publishing some sample log messages (JSON strings) to Kafka with kafkacat; you can get the kafkacat command-line tool from here. These messages will go through the stream pipeline and be saved to the Cassandra log table.

# connecting to kafka with kafkacat
# kafka broker - localhost:9092
# kafka topic  - log
kafkacat -P -b localhost:9092 -t log

# publish messages
{"msg": "udpz.go:25 server started 7070", "service":"storage"}
  1. https://medium.com/@itseranga/publish-logs-to-kafka-with-filebeat-74497ef7dafe
  2. https://medium.com/@itseranga/kafka-consumer-with-scala-akka-streams-7e3237d6acc
  3. http://safdarhusain.com/DataIntegration2.html
  4. https://github.com/calvinlfer/alpakka-cassandra-sink-usage/blob/master/src/main/scala/com/experiments/calvin/ExampleApp.scala
