I have been building a research product that stores server logs in secure storage (obviously in a blockchain :)). I named this project Octopus. As a part of this project we had to take logs from different services and store them in Cassandra. First, I shipped the logs of the different services from Filebeat to Kafka (I have written about that story here). Then I streamed the data from Kafka to Cassandra using Akka Streams.
In this post I'm going to show how I did the streaming from Kafka to Cassandra. All the source code related to this post is available in this GitLab repo. Please clone it and follow the steps below.
First I need to run Cassandra on my local machine. Following is the docker-compose entry for Cassandra.
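A minimal sketch of that entry, assuming the official cassandra image (the version tag here is my assumption; the exact file is in the repo):

```yaml
# docker-compose.yml - cassandra service entry (sketch, image tag is an assumption)
cassandra:
  image: cassandra:3.11
  ports:
    - "9042:9042"   # expose the CQL native transport port
```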
I can simply run it with the below command. It will start a single-node Cassandra cluster on port 9042 on my local machine.
docker-compose up -d cassandra
Then I need to create the Cassandra keyspace and table. The data coming from the Kafka source will be sunk into this table.
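The keyspace is mystiko and the table is log. A minimal CQL sketch (the column set here is an assumption based on the JSON log messages; the exact schema is in the repo):

```sql
-- create keyspace and table (a sketch; the exact schema is in the repo)
CREATE KEYSPACE IF NOT EXISTS mystiko
  WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};

-- the columns here are an assumption based on the JSON log messages
CREATE TABLE IF NOT EXISTS mystiko.log (
  service   text,
  timestamp timestamp,
  msg       text,
  PRIMARY KEY (service, timestamp)
);
```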
The next thing is to set up Kafka. Following is the content of docker-compose.yml which corresponds to running Zookeeper and Kafka.
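A sketch of those entries, assuming the wurstmeister Zookeeper/Kafka images (the images and environment variables are assumptions; the exact file is in the repo):

```yaml
# zookeeper and kafka service entries (sketch; images and env vars are assumptions)
zookeeper:
  image: wurstmeister/zookeeper
  ports:
    - "2181:2181"

kafka:
  image: wurstmeister/kafka
  ports:
    - "9092:9092"
  environment:
    KAFKA_ADVERTISED_HOST_NAME: localhost      # advertise the broker on localhost
    KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181    # zookeeper connection string
    KAFKA_CREATE_TOPICS: "log:1:1"             # auto-create the log topic (1 partition, 1 replica)
```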
I can start Zookeeper and Kafka with the below commands. Here I'm running a single instance of Zookeeper and Kafka. Kafka will be available on port 9092 on my local machine.
docker-compose up -d zookeeper
docker-compose up -d kafka
In my sbt application I need to define the akka-streams dependency and the Alpakka connector dependencies (Cassandra and Kafka). Following is the build.sbt file.
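A sketch of the dependency section (the version numbers and the choice of spray-json for JSON parsing are assumptions; the exact build.sbt is in the repo):

```scala
// build.sbt - dependency sketch (versions are assumptions, see the repo for the exact ones)
libraryDependencies ++= Seq(
  "com.typesafe.akka"  %% "akka-stream"                   % "2.5.19",
  "com.typesafe.akka"  %% "akka-stream-kafka"             % "0.22",   // alpakka kafka connector
  "com.lightbend.akka" %% "akka-stream-alpakka-cassandra" % "0.20",   // alpakka cassandra connector
  "io.spray"           %% "spray-json"                    % "1.3.5"   // JSON parsing for log messages
)
```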
Then we need to define the Kafka consumer configuration in application.conf. Following are the configurations.
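A sketch of the akka.kafka.consumer section (the group id and offset settings are assumptions; the exact application.conf is in the repo):

```hocon
# application.conf - kafka consumer configuration (sketch; group.id and offsets are assumptions)
akka.kafka.consumer {
  # settings passed straight to the underlying kafka client
  kafka-clients {
    bootstrap.servers = "localhost:9092"
    group.id = "octopus"
    enable.auto.commit = true
    auto.offset.reset = "earliest"
  }
}
```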
Now all the service configurations are done. The next thing is to implement the Akka Streams source, flows and sink, and build the graph.
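The full implementation is in the repo; below is a condensed sketch of the stream, assuming spray-json for parsing and a simple Log(timestamp, service, msg) schema (the case class fields, column names and versions are assumptions):

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Flow
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.alpakka.cassandra.scaladsl.CassandraSink
import com.datastax.driver.core.{BoundStatement, Cluster, PreparedStatement, Session}
import org.apache.kafka.common.serialization.StringDeserializer
import spray.json._
import DefaultJsonProtocol._

// message coming from kafka and row going to cassandra (field names are assumptions)
case class Message(msg: String, service: String)
case class Log(timestamp: java.util.Date, service: String, msg: String)

object LogStream extends App {
  implicit val system: ActorSystem = ActorSystem("octopus")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  implicit val messageFormat: RootJsonFormat[Message] = jsonFormat2(Message)

  // cassandra session used by the alpakka sink
  implicit val session: Session =
    Cluster.builder.addContactPoint("127.0.0.1").withPort(9042).build.connect()

  // kafka source - consumes JSON strings from the `log` topic
  // (broker and group id come from the akka.kafka.consumer section in application.conf)
  val consumerSettings =
    ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
  val kafkaSource = Consumer.plainSource(consumerSettings, Subscriptions.topics("log"))

  // cassandra sink - cql insert plus a statementBinder that binds case class fields
  val preparedStatement: PreparedStatement =
    session.prepare("INSERT INTO mystiko.log(timestamp, service, msg) VALUES (?, ?, ?)")
  val statementBinder: (Log, PreparedStatement) => BoundStatement =
    (log, stmt) => stmt.bind(log.timestamp, log.service, log.msg)
  val cassandraSink = CassandraSink[Log](parallelism = 2, preparedStatement, statementBinder)

  // flow 1: kafka JSON string -> Message
  val messageFlow = Flow[String].map(json => json.parseJson.convertTo[Message])
  // flow 2: Message -> Log
  val logFlow = Flow[Message].map(m => Log(new java.util.Date(), m.service, m.msg))

  // graph: source -> flows -> sink
  kafkaSource
    .map(record => record.value)
    .via(messageFlow)
    .via(logFlow)
    .runWith(cassandraSink)
}
```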
In here, first I have defined the Kafka source and a Cassandra sink. The Kafka source is connected to the log topic (the Filebeat service publishes log messages as JSON strings to the log topic). The Cassandra sink is connected to the log table in the mystiko keyspace (Cassandra saves the logs in the log table of the mystiko keyspace). Inside the sink I have added the CQL query and a statementBinder which binds Scala case class fields to Java field types. Next I have defined two flows. One flow converts the Kafka JSON messages to Message objects. The other flow converts Message objects to Log objects. These Log objects will be saved in Cassandra. Finally I have defined the graph that connects the source with the flows and the sink.
In order to test this application I have used kafkacat. I'm publishing some sample log messages (JSON format strings) to Kafka with kafkacat. You can get the kafkacat command line tool from here. These messages will go through the stream pipeline and be saved in the Cassandra log table.
# connecting to kafka with kafkacat
# kafka broker - localhost:9092
# kafka topic - log
kafkacat -P -b localhost:9092 -t log

# publish messages
{"msg": "udpz.go:25 server started 7070", "service":"storage"}
- https://medium.com/@itseranga/publish-logs-to-kafka-with-filebeat-74497ef7dafe
- https://medium.com/@itseranga/kafka-consumer-with-scala-akka-streams-7e3237d6acc
- http://safdarhusain.com/DataIntegration2.html
- https://github.com/calvinlfer/alpakka-cassandra-sink-usage/blob/master/src/main/scala/com/experiments/calvin/ExampleApp.scala