End to end streaming from Kafka to Cassandra
by John Doe
12/10/2020 · Reading time: 2 min
I have been building a research product that stores server logs in secure storage (in a blockchain, obviously :)). I named this project Octopus. As a part of this project we had to take logs from different services and store them in Cassandra. First I shipped the logs of the different services from Filebeat to Kafka (I have written about that story in an earlier post; see the references at the end). Then I streamed the data from Kafka to Cassandra using Akka Streams.

In this post I'm going to show how I did the streaming from Kafka to Cassandra. All the source code relating to this post is available in the project's GitLab repo. Please clone it and follow the steps below.

First I need to run Cassandra on my local machine, which I do with docker-compose; a sketch of the compose entry follows the command below. It starts a single-node Cassandra cluster on port 9042 on my local machine.

```shell
docker-compose up -d cassandra
```
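Here is a minimal sketch of what the Cassandra entry in docker-compose.yml could look like; the image tag, cluster name, and volume path are assumptions rather than the exact entry from the repo:

```yaml
version: "3"

services:
  cassandra:
    image: cassandra:3.11                    # image tag is an assumption
    ports:
      - "9042:9042"                          # CQL native transport port
    environment:
      - CASSANDRA_CLUSTER_NAME=octopus       # hypothetical cluster name
    volumes:
      - ./data/cassandra:/var/lib/cassandra  # persist data between restarts
```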
Then I need to create the Cassandra keyspace and table; the data coming from the Kafka source gets sunk into this table. A sketch of the CQL follows.
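The keyspace and table can be created from cqlsh. This sketch uses the mystiko keyspace and log table named later in the post; the exact column set is an assumption derived from the sample log message (a time-based id plus service and msg fields):

```sql
-- replication settings assume a local single-node cluster
CREATE KEYSPACE IF NOT EXISTS mystiko
  WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};

-- one row per log message
CREATE TABLE IF NOT EXISTS mystiko.log (
  id      timeuuid PRIMARY KEY,  -- time-based id generated in the stream
  service text,                  -- service that produced the log
  msg     text                   -- raw log message
);
```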
The next thing is to set up Kafka. The docker-compose.yml also carries the entries that run ZooKeeper and Kafka; a sketch of those entries follows the commands below. Here I'm running a single instance of ZooKeeper and a single instance of Kafka, and Kafka will be available on port 9092 on my local machine.

```shell
docker-compose up -d zookeeper
docker-compose up -d kafka
```
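A sketch of the ZooKeeper and Kafka entries, assuming the widely used wurstmeister images; the image names, the advertised-listener setting, and the topic pre-creation are assumptions:

```yaml
services:
  zookeeper:
    image: wurstmeister/zookeeper              # image is an assumption
    ports:
      - "2181:2181"

  kafka:
    image: wurstmeister/kafka                  # image is an assumption
    ports:
      - "9092:9092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_HOST_NAME: localhost    # lets clients on the host connect
      KAFKA_CREATE_TOPICS: "log:1:1"           # pre-create the log topic (assumption)
    depends_on:
      - zookeeper
```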
In my sbt application I need to define the akka-streams dependency and the Alpakka connector dependencies for Cassandra and Kafka. A sketch of the build.sbt file follows.
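The artifact names below are the standard ones for the Alpakka Kafka and Cassandra connectors; the version numbers and the choice of spray-json for JSON parsing are assumptions:

```scala
name := "octopus"           // hypothetical project name
scalaVersion := "2.12.10"   // assumption

libraryDependencies ++= Seq(
  "com.typesafe.akka"  %% "akka-stream"                   % "2.5.26", // core Akka Streams
  "com.typesafe.akka"  %% "akka-stream-kafka"             % "1.1.0",  // Alpakka Kafka connector
  "com.lightbend.akka" %% "akka-stream-alpakka-cassandra" % "1.1.2",  // Alpakka Cassandra connector
  "io.spray"           %% "spray-json"                    % "1.3.5"   // JSON parsing (assumption)
)
```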
Then we need to define the Kafka consumer configuration in application.conf. A sketch of the configuration follows.
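The Alpakka Kafka connector reads its defaults from the akka.kafka.consumer block, so a minimal configuration could look like this; the group id is a placeholder:

```hocon
akka.kafka.consumer {
  # properties in kafka-clients are passed straight to the Kafka consumer
  kafka-clients {
    bootstrap.servers = "localhost:9092"
    group.id = "octopus-group"       # placeholder group id
    auto.offset.reset = "earliest"   # read the topic from the beginning
  }
}
```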
Now all the service configuration is done. The next thing is to implement the Akka Streams source, flows, and sink, and to build the graph; a sketch of the whole pipeline follows. First I have defined a Kafka source and a Cassandra sink. The Kafka source is connected to the log topic (the Filebeat service publishes log messages as JSON strings to the log topic). The Cassandra sink is connected to the log table in the mystiko keyspace, which is where Cassandra saves the logs. Inside the sink I have added the CQL insert query and a statementBinder, which binds the Scala case class fields to the Java field types. Next I have defined two flows: one converts the Kafka JSON messages into Message objects, and the other converts Message objects into Log objects. These Log objects are the data that gets saved in Cassandra. Finally I have defined the graph that connects the source with the flows and the sink.
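The sketch below follows that description against the Alpakka 1.x APIs: a plain Kafka consumer source, two mapping flows, and a CassandraSink with a statement binder. The case class fields, the object name, and the spray-json usage are assumptions rather than the repo's exact code:

```scala
import java.util.UUID

import akka.actor.ActorSystem
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.ActorMaterializer
import akka.stream.alpakka.cassandra.scaladsl.CassandraSink
import akka.stream.scaladsl.Flow
import com.datastax.driver.core.utils.UUIDs
import com.datastax.driver.core.{BoundStatement, Cluster, PreparedStatement, Session}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import spray.json._

object LogStream extends App with DefaultJsonProtocol {
  implicit val system: ActorSystem = ActorSystem("octopus")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  // JSON message coming from Filebeat via Kafka, e.g. {"msg": "...", "service": "storage"}
  case class Message(msg: String, service: String)
  // row that gets written to the mystiko.log table
  case class Log(id: UUID, service: String, msg: String)

  implicit val messageFormat: RootJsonFormat[Message] = jsonFormat2(Message)

  // kafka source subscribed to the log topic (settings come from application.conf)
  val consumerSettings =
    ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
  val kafkaSource = Consumer.plainSource(consumerSettings, Subscriptions.topics("log"))

  // cassandra session plus a sink holding the insert query and statement binder
  implicit val session: Session =
    Cluster.builder.addContactPoint("localhost").withPort(9042).build.connect()
  val insert: PreparedStatement =
    session.prepare("INSERT INTO mystiko.log(id, service, msg) VALUES (?, ?, ?)")
  val binder: (Log, PreparedStatement) => BoundStatement =
    (log, stmt) => stmt.bind(log.id, log.service, log.msg)
  val cassandraSink = CassandraSink[Log](parallelism = 2, insert, binder)

  // flow 1: kafka JSON string -> Message
  val messageFlow = Flow[ConsumerRecord[String, String]]
    .map(record => record.value.parseJson.convertTo[Message])
  // flow 2: Message -> Log (adds a time-based id)
  val logFlow = Flow[Message]
    .map(m => Log(UUIDs.timeBased(), m.service, m.msg))

  // graph: source -> flows -> sink
  kafkaSource
    .via(messageFlow)
    .via(logFlow)
    .runWith(cassandraSink)
}
```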
In order to test this application I have used kafkacat, publishing some sample log messages (JSON strings) to Kafka. You can get the kafkacat command-line tool from its project page. These messages go through the stream pipeline and end up in the Cassandra log table.

```shell
# connect to kafka with kafkacat as a producer
# kafka broker - localhost:9092
# kafka topic  - log
kafkacat -P -b localhost:9092 -t log

# publish a message
{"msg": "udpz.go:25 server started 7070", "service":"storage"}
```
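To confirm the rows arrived, a quick look from cqlsh works; this check is my addition rather than part of the original post:

```sql
-- e.g. docker-compose exec cassandra cqlsh
SELECT id, service, msg FROM mystiko.log LIMIT 10;
```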
References

https://medium.com/@itseranga/publish-logs-to-kafka-with-filebeat-74497ef7dafe (publish logs to Kafka with Filebeat)
https://medium.com/@itseranga/kafka-consumer-with-scala-akka-streams-7e3237d6acc (Kafka consumer with Scala Akka Streams)
http://safdarhusain.com/DataIntegration2.html
https://github.com/calvinlfer/alpakka-cassandra-sink-usage/blob/master/src/main/scala/com/experiments/calvin/ExampleApp.scala (Alpakka Cassandra sink usage example)