In this blog post we will see how to set up a simple search and analytics pipeline on streaming data in Scala.
- For sample time-series data, we will use the Twitter stream.
- For data pipelining, we will use Kafka.
- For search, we will use Solr, with Banana as a UI query interface for the Solr data.
- For analytics, we will store data in Cassandra. We will see an example of using Spark for running analytics queries, with Zeppelin as a UI query interface.
Full code for this post is available at https://github.com/saumitras/twitter-analysis
Create a new project and add the following dependencies in build.sbt. Note that there are a few conflicting dependencies in Kafka, so exclude them:
```scala
libraryDependencies ++= Seq(
  "org.twitter4j" % "twitter4j-core" % "4.0.4",
  "org.twitter4j" % "twitter4j-stream" % "4.0.4",
  "com.typesafe.akka" % "akka-actor_2.11" % "2.4.17",
  "org.apache.kafka" % "kafka_2.11" % "0.10.0.0" withSources() exclude("org.slf4j","slf4j-log4j12") exclude("javax.jms", "jms") exclude("com.sun.jdmk", "jmxtools") exclude("com.sun.jmx", "jmxri"),
  "org.apache.avro" % "avro" % "1.7.7" withSources(),
  "org.apache.solr" % "solr-solrj" % "6.4.1" withSources(),
  "com.typesafe.scala-logging" %% "scala-logging" % "3.1.0",
  "ch.qos.logback" % "logback-classic" % "1.1.2",
  "com.datastax.cassandra" % "cassandra-driver-core" % "3.0.2",
  "org.apache.cassandra" % "cassandra-clientutil" % "3.0.2",
  "org.apache.spark" %% "spark-core" % "2.1.0",
  "org.apache.spark" %% "spark-sql" % "2.1.0",
  "org.apache.spark" %% "spark-hive" % "2.1.0",
  "com.datastax.spark" %% "spark-cassandra-connector" % "2.0.0"
)
```
Setting up the Twitter stream
For streaming data from Twitter you need access keys and tokens. You can go to https://apps.twitter.com and create a new app to get these. After creating an app, click on "Keys and access tokens" and copy the following:
- Consumer Key (API Key)
- Consumer Secret (API Secret)
- Access Token
- Access Token Secret
We will use twitter4j. Build a configuration using the tokens and keys.
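A minimal sketch; the placeholder strings stand in for the credentials you copied above:

```scala
import twitter4j.conf.{Configuration, ConfigurationBuilder}

// Build a twitter4j Configuration from the app credentials
val config: Configuration = new ConfigurationBuilder()
  .setOAuthConsumerKey("<consumer-key>")
  .setOAuthConsumerSecret("<consumer-secret>")
  .setOAuthAccessToken("<access-token>")
  .setOAuthAccessTokenSecret("<access-token-secret>")
  .build()
```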
You can now open a stream and listen for tweets containing specific keywords or hashtags. StatusListener provides a couple of callbacks to handle different scenarios; onStatus is the one which receives the tweet and its metadata. stream.filter(fq) starts the stream.
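Here is a sketch of that; the tracked keywords and the println handler are just placeholders:

```scala
import twitter4j._

val stream = new TwitterStreamFactory(config).getInstance()

val listener = new StatusListener {
  // Called for every tweet matching the filter query
  override def onStatus(status: Status): Unit =
    println(s"${status.getUser.getScreenName}: ${status.getText}")

  override def onDeletionNotice(notice: StatusDeletionNotice): Unit = ()
  override def onTrackLimitationNotice(numberOfLimitedStatuses: Int): Unit = ()
  override def onScrubGeo(userId: Long, upToStatusId: Long): Unit = ()
  override def onStallWarning(warning: StallWarning): Unit = ()
  override def onException(ex: Exception): Unit = ex.printStackTrace()
}

stream.addListener(listener)

// Track tweets containing these keywords/hashtags
val fq = new FilterQuery()
fq.track("#scala", "#kafka", "#bigdata")
stream.filter(fq)
```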
If you run this, you should start seeing the tweets.
Let's define a type and extract the tweet metadata.
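A sketch of the Tweet type; the exact fields in the original project may differ, but these are the ones the rest of this post's examples will use:

```scala
import twitter4j.Status

// The subset of tweet metadata we will push through the pipeline
case class Tweet(
  id: Long,
  userName: String,
  text: String,
  createdAt: Long, // epoch millis
  lang: String
)

// Extract the fields we care about from a twitter4j Status
def toTweet(status: Status): Tweet = Tweet(
  id        = status.getId,
  userName  = status.getUser.getScreenName,
  text      = status.getText,
  createdAt = status.getCreatedAt.getTime,
  lang      = status.getLang
)
```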
Next we will send these tweets to Kafka.
ZooKeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services. All of these kinds of services are used in some form or another by distributed applications. In our example, both Kafka and Solr will need ZooKeeper for their state and config management, so you need to start ZooKeeper first.
- Download it from the Apache ZooKeeper releases page
- Extract it and go inside the conf directory
- Make a copy of zoo_sample.cfg as zoo.cfg
- Run it using bin/zkServer.sh start
- Verify it started successfully by running bin/zkServer.sh status
Putting data in Kafka
Here are the steps to send data to Kafka:
- Start the Kafka server and broker(s)
- Create a topic in Kafka to which data will be sent
- Define an Avro schema for the tweets
- Create a Kafka producer which will serialize tweets using the Avro schema and send them to Kafka
Download Kafka from the Apache Kafka downloads page, extract it, and start the server using bin/kafka-server-start.sh config/server.properties.
Create a topic with bin/kafka-topics.sh --create; we will call the topic tweet1.
You can check whether the topic was created successfully with bin/kafka-topics.sh --list.
Avro is a data serialization system. It has a JSON-like data model, but can be represented as either JSON or in a compact binary form. It comes with a very sophisticated schema description language that describes data. Let's define an Avro schema for our tweets.
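Here is a sketch, defined as a JSON string and parsed with Avro's Schema.Parser; the namespace and field names simply mirror the hypothetical Tweet case class above:

```scala
import org.apache.avro.Schema

val tweetSchemaJson =
  """{
    |  "namespace": "com.example.tweets",
    |  "type": "record",
    |  "name": "Tweet",
    |  "fields": [
    |    {"name": "id",        "type": "long"},
    |    {"name": "userName",  "type": "string"},
    |    {"name": "text",      "type": "string"},
    |    {"name": "createdAt", "type": "long"},
    |    {"name": "lang",      "type": "string"}
    |  ]
    |}""".stripMargin

val tweetSchema: Schema = new Schema.Parser().parse(tweetSchemaJson)
```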
Kafka supports a lot of other formats too, but Avro is the preferred format for streaming data. You can read more about it in the Avro documentation.
Next, create a producer. Create a Schema object using the Avro schema definition, then serialize the tweet and send it to the producer.
We will create an ActorSystem and put all this inside a KafkaTweetProducer actor. We will then send a message to KafkaTweetProducer whenever a new tweet is received.
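A sketch of the producer side under the assumptions above: the tweet1 topic, a broker on localhost:9092, and the schema and Tweet type from the earlier sketches. The serialize helper and actor names are illustrative, not taken from the original project:

```scala
import java.io.ByteArrayOutputStream
import java.util.Properties

import akka.actor.{Actor, ActorSystem, Props}
import org.apache.avro.generic.{GenericData, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.EncoderFactory
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

class KafkaTweetProducer extends Actor {
  private val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")

  private val producer = new KafkaProducer[String, Array[Byte]](props)
  private val writer = new GenericDatumWriter[GenericRecord](tweetSchema)

  // Serialize a Tweet to Avro binary using the schema defined earlier
  private def serialize(t: Tweet): Array[Byte] = {
    val record = new GenericData.Record(tweetSchema)
    record.put("id", java.lang.Long.valueOf(t.id))
    record.put("userName", t.userName)
    record.put("text", t.text)
    record.put("createdAt", java.lang.Long.valueOf(t.createdAt))
    record.put("lang", t.lang)

    val out = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get().binaryEncoder(out, null)
    writer.write(record, encoder)
    encoder.flush()
    out.toByteArray
  }

  def receive = {
    case t: Tweet =>
      producer.send(new ProducerRecord[String, Array[Byte]]("tweet1", serialize(t)))
  }
}

// Create the actor system and the producer actor; the stream listener's
// onStatus can then simply do: kafkaProducer ! toTweet(status)
val system = ActorSystem("twitter-pipeline")
val kafkaProducer = system.actorOf(Props[KafkaTweetProducer], "kafka-tweet-producer")
```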
To test whether this data is getting written to Kafka properly or not, you can use the command-line console consumer (bin/kafka-console-consumer.sh) and watch the topic.
Next we will consume this data in Solr and Cassandra.
Putting data in Solr
Here are the steps for writing data to Solr:
- Define a Solr schema (config set) corresponding to the Tweet type
- Upload the schema to ZooKeeper
- Create a collection in Solr using this config set
- Create a Solr consumer which will read from the tweet1 topic in Kafka
- Deserialize the data read from Kafka and create Solr documents from it
- Send the documents to Solr
Here's what the schema definition will look like:
Upload the configset to ZooKeeper, then create the collection.
Next, create a SolrWriter actor which will receive a Tweet message from the Kafka consumer (which we will define next), convert it to a SolrInputDocument and send it to Solr.
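A sketch of such an actor, assuming a SolrCloud instance whose ZooKeeper is at localhost:2181 and a collection named tweet; the field names mirror the earlier Tweet sketch, and a per-document commit is used only to keep the example short:

```scala
import akka.actor.Actor
import org.apache.solr.client.solrj.impl.CloudSolrClient
import org.apache.solr.common.SolrInputDocument

class SolrWriter extends Actor {
  // Connect to SolrCloud through ZooKeeper and target the tweet collection
  private val solr = new CloudSolrClient.Builder()
    .withZkHost("localhost:2181")
    .build()
  solr.setDefaultCollection("tweet")

  def receive = {
    case t: Tweet =>
      // Convert the Tweet into a SolrInputDocument
      val doc = new SolrInputDocument()
      doc.addField("id", t.id.toString)
      doc.addField("userName", t.userName)
      doc.addField("text", t.text)
      doc.addField("createdAt", java.lang.Long.valueOf(t.createdAt))
      doc.addField("lang", t.lang)

      // Index it (committing per document for simplicity)
      solr.add(doc)
      solr.commit()
  }
}
```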
Now we need to define the Kafka consumer. It will read data from Kafka, deserialize it using the Avro schema, convert it to the Tweet type, and forward the message to a destination actor; in this case the SolrWriter. We will keep the consumer generic so that any destination actor (Solr or Cassandra) can be passed to it.
Create the consumer and an Avro schema object, convert the binary data read from Kafka to the Tweet type, then start consuming and send messages to the destination, Solr in this specific case.
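A sketch of that generic consumer; the broker address, group id, and Tweet fields are assumptions carried over from the earlier sketches:

```scala
import java.util.{Collections, Properties}

import scala.collection.JavaConverters._
import akka.actor.{ActorRef, Props}
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory
import org.apache.kafka.clients.consumer.KafkaConsumer

class KafkaTweetConsumer(topic: String, destination: ActorRef, schema: Schema) extends Runnable {
  private val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("group.id", s"tweet-consumer-${destination.path.name}")
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")

  private val consumer = new KafkaConsumer[String, Array[Byte]](props)
  private val reader = new GenericDatumReader[GenericRecord](schema)

  // Convert the binary Avro payload back into our Tweet type
  private def deserialize(bytes: Array[Byte]): Tweet = {
    val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
    val r = reader.read(null, decoder)
    Tweet(
      id        = r.get("id").toString.toLong,
      userName  = r.get("userName").toString,
      text      = r.get("text").toString,
      createdAt = r.get("createdAt").toString.toLong,
      lang      = r.get("lang").toString
    )
  }

  // Poll Kafka in a loop and forward every Tweet to the destination actor
  override def run(): Unit = {
    consumer.subscribe(Collections.singletonList(topic))
    while (true) {
      val records = consumer.poll(500)
      records.asScala.foreach(record => destination ! deserialize(record.value()))
    }
  }
}

// Wire it up for Solr: start a consumer with the SolrWriter as destination
val solrWriter = system.actorOf(Props[SolrWriter], "solr-writer")
new Thread(new KafkaTweetConsumer("tweet1", solrWriter, tweetSchema)).start()
```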
You should now start seeing data in Solr.
Querying Solr data with Banana
Banana is a data visualization tool that uses Solr for data analysis and display. We will run it in the same container as Solr. Here's how to set it up for our tweet data:
Download Banana and put it in Solr's webapp directory.
To save dashboards and settings, Banana expects a collection named banana-int. Let's go ahead and create it. The configset for that collection can be found in the Banana distribution.
Upload the Banana config to ZooKeeper, then create the collection.
Navigate to the Banana UI at http://localhost:8983/solr/banana/src/index.html and change the collection in the dashboard settings to point to the tweet collection.
Here's what it will look like for our tweet data:
Next, we will create a Cassandra consumer.
Putting data in Cassandra
- Download Cassandra from http://archive.apache.org/dist/cassandra/3.0.12/apache-cassandra-3.0.12-bin.tar.gz and uncompress it
- Run bin/cassandra to start it
We first need to create a keyspace and a table for storing tweets.
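Here's a sketch using the DataStax Java driver; the keyspace name (tweets_ks) and column names are illustrative, not the exact ones from the original project:

```scala
import com.datastax.driver.core.Cluster

// Connect to the local Cassandra node
val cluster = Cluster.builder().addContactPoint("127.0.0.1").build()
val session = cluster.connect()

// Keyspace and table for the tweets
session.execute(
  """CREATE KEYSPACE IF NOT EXISTS tweets_ks
    |WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}""".stripMargin)

session.execute(
  """CREATE TABLE IF NOT EXISTS tweets_ks.tweet (
    |  id bigint PRIMARY KEY,
    |  user_name text,
    |  text text,
    |  created_at timestamp,
    |  lang text
    |)""".stripMargin)
```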
Then we will create a CassWriter actor, similar to the Solr one, which will accept a Tweet message and write it to Cassandra. Connect to the cluster. Since we will be running the same insert query repeatedly with different parameters, we will use a prepared statement to improve performance. For each Tweet, create a BoundStatement by setting values for all the fields and write it to Cassandra.
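A sketch of such an actor against the illustrative table above:

```scala
import akka.actor.Actor
import com.datastax.driver.core.Cluster

class CassWriter extends Actor {
  private val cluster = Cluster.builder().addContactPoint("127.0.0.1").build()
  private val session = cluster.connect("tweets_ks")

  // Prepare the insert once; bind per-tweet values for every message
  private val insertStmt = session.prepare(
    "INSERT INTO tweet (id, user_name, text, created_at, lang) VALUES (?, ?, ?, ?, ?)")

  def receive = {
    case t: Tweet =>
      val bound = insertStmt.bind(
        java.lang.Long.valueOf(t.id),
        t.userName,
        t.text,
        new java.util.Date(t.createdAt),
        t.lang)
      session.execute(bound)
  }
}
```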
We will create a new instance of this actor and then create a new KafkaTweetConsumer whose destination will be this CassWriter actor.
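Wiring it together might look like this, reusing the actor system, consumer, and schema from the earlier sketches:

```scala
import akka.actor.Props

// One more consumer instance, this time feeding the CassWriter actor
val cassWriter = system.actorOf(Props[CassWriter], "cass-writer")
new Thread(new KafkaTweetConsumer("tweet1", cassWriter, tweetSchema)).start()
```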
You should start seeing data in Cassandra.
Next we will set up Spark and use it to query the Cassandra data.
Querying Cassandra data with Spark
We will use the DataStax Spark Cassandra Connector: https://github.com/datastax/spark-cassandra-connector. Download the jar for the correct connector version and place it in the lib directory of your project.
The first thing we need is a Spark context:
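A sketch, pointing the Cassandra connector at the local node; the app name and local[*] master are placeholders:

```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("tweet-analytics")
  .setMaster("local[*]") // or your cluster's master URL
  .set("spark.cassandra.connection.host", "127.0.0.1")

val sc = new SparkContext(conf)
```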
Then you can query and apply different aggregations. The query will be submitted as a Spark job and executed on your Spark cluster:
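For example, counting tweets per language over the illustrative tweets_ks.tweet table sketched earlier:

```scala
import com.datastax.spark.connector._

// Load the Cassandra table as an RDD and aggregate on the lang column
val tweets = sc.cassandraTable("tweets_ks", "tweet")

val countsByLang = tweets
  .map(row => (row.getString("lang"), 1))
  .reduceByKey(_ + _)
  .collect()

countsByLang.foreach { case (lang, count) => println(s"$lang -> $count") }
```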
If the job is successful, you will see the result:
Visualizing Cassandra data with Zeppelin
Zeppelin is a web-based notebook that can be used for interactive data analytics on Cassandra data using Spark.
Download the binary from https://zeppelin.apache.org/download.html and uncompress it.
The default port it uses is 8080, which conflicts with the Spark master web UI port, so change the port in conf/zeppelin-site.xml.
Create a new notebook and select the Spark interpreter.
Create a view of our tweet table from Cassandra:
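In a notebook paragraph, register the table as a temporary view; this sketch assumes the illustrative keyspace and table names from earlier and uses the spark SparkSession that Zeppelin's Spark interpreter provides:

```scala
// Load the Cassandra table through the connector's Data Source API
val tweetsDf = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "tweets_ks", "table" -> "tweet"))
  .load()

// Expose it to SQL queries
tweetsDf.createOrReplaceTempView("tweet")
```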
We can now run aggregations or other analytics queries on this view:
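For example, a per-language tweet count, which Zeppelin can render as a bar or pie chart:

```scala
spark.sql(
  "SELECT lang, count(*) AS tweet_count FROM tweet GROUP BY lang ORDER BY tweet_count DESC"
).show()
```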
Here's what the output of the above query will look like:
I hope this gave you an idea of how to get started with building a search and analytics pipeline.