
3/14/2018

Reading time: 3 min

PatrickCallaghan/datastax-eventsourcing

by John Doe

README.md

This demo shows how Cassandra and DSE can be used to store and replay events.

To use Spark with plain Apache Cassandra you will need to provide your own Cassandra and Spark deployments. In this demo we will use DSE, as Spark and Cassandra are already integrated.

First we start DSE in SearchAnalytics mode to allow us to use both Spark and DSE Search - http://docs.datastax.com/en/dse/5.1/dse-admin/datastax_enterprise/operations/startStop/startDseStandalone.html?hl=starting
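
On a tarball install this typically means starting DSE with both flags enabled (-s for Search, -k for Spark analytics); package installs set the equivalent options in /etc/default/dse instead:

dse cassandra -s -k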

The implementation uses bucketing to group all data into particular time buckets for replay. The time bucket used in this example is 1 minute, but any bucket size can be used. Also, depending on how many days, months, or years of events need to be kept, it may be beneficial to spread the events over different tiers of tables.

To create the schema, run the following:

mvn clean compile exec:java -Dexec.mainClass="com.datastax.demo.SchemaSetup" -DcontactPoints=localhost
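
The actual DDL is created by SchemaSetup; purely for orientation, a bucketed table covering the fields used later in this demo might look something like this (an illustrative sketch - the real schema, including the exact primary key, is in the code):

CREATE TABLE datastax.eventsource (
    date text,            -- day of the event, e.g. '20160805'
    bucket int,           -- 1-minute bucket within the day
    time timestamp,
    id uuid,
    eventtype text,
    aggregatetype text,
    loglevel text,
    host text,
    data text,
    PRIMARY KEY ((date, bucket), time, id)
);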

To create the Solr core to make our table searchable, run the following:

dsetool create_core datastax.eventsource generateResources=true
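
Once the core exists, a quick smoke test from cqlsh is a wildcard Solr query (it will return nothing until events are created in the next step):

select * from datastax.eventsource where solr_query = '{"q":"*:*"}' limit 10;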

To create events, run the following (default of 10 million events):

mvn clean compile exec:java -Dexec.mainClass="com.datastax.events.Main"  -DcontactPoints=localhost -DnoOfEvents=10000000

To replay a sample event set, run:

mvn clean compile exec:java -Dexec.mainClass="com.datastax.events.ReadEvents"  -DcontactPoints=localhost -Dfrom=yyyyMMdd-hhmmss -Dto=yyyyMMdd-hhmmss

e.g.

mvn clean compile exec:java -Dexec.mainClass="com.datastax.events.ReadEvents"  -DcontactPoints=localhost -Dfrom=20160805-000000 -Dto=20160805-010000

This replays two scenarios (a conceptual sketch of the bucket enumeration follows the list):

1. Replay all events for a specified time range
2. Replay all events for a specified time range and a specific event type.
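
Conceptually, the replay walks the 1-minute buckets that cover the requested window and reads each bucket's partition in time order. A minimal sketch of that enumeration in Scala (the minute-of-day bucket numbering here is an assumption; the real logic lives in com.datastax.events.ReadEvents):

import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

val fmt = DateTimeFormatter.ofPattern("yyyyMMdd-HHmmss")

// Enumerate every (date, bucket) pair between from and to, inclusive
def bucketsBetween(from: String, to: String): Seq[(String, Int)] = {
  val start = LocalDateTime.parse(from, fmt)
  val end = LocalDateTime.parse(to, fmt)
  Iterator.iterate(start)(_.plusMinutes(1))
    .takeWhile(!_.isAfter(end))
    .map(t => (t.format(DateTimeFormatter.BASIC_ISO_DATE),   // partition date, e.g. "20160805"
               t.getHour * 60 + t.getMinute))                // minute-of-day bucket (assumed)
    .toSeq
}

// Each (date, bucket) pair maps to one partition, readable with a single query such as
// SELECT * FROM datastax.eventsource WHERE date = ? AND bucket = ?
bucketsBetween("20160805-000000", "20160805-010000").take(5).foreach(println)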

To run the web service:

mvn jetty:run -Djetty.port=8081 

To run a REST query, go to the browser and enter a URL in the format http://localhost:8081/datastax-eventsourcing/rest/getevents/from/to, where the date format is 'yyyyMMdd-hhmmss'. E.g. for all events from midnight to 1:00 am on the 1st of August 2016:

http://localhost:8081/datastax-eventsourcing/rest/getevents/20160801-000000/20160801-010000/
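
The same query can be issued from the command line with curl, assuming the service is running on port 8081 as above:

curl "http://localhost:8081/datastax-eventsourcing/rest/getevents/20160801-000000/20160801-010000/"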

We can also use CQL to query via the solr_query column from DSE Search.

Get all LOGIN events from 9th Aug 2016 at 12:30 to 11th Aug 2016 at 12:30:

select * from datastax.eventsource where solr_query = '{"q":"eventtype:LOGIN", "fq": "time:[2016-08-09T12:30:00.000Z TO 2016-08-11T12:30:00.000Z]", "sort":"time desc"}' limit 10000;
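
For scenario 1 (every event in the range, regardless of type), the eventtype filter can simply be dropped - this is plain Solr syntax, nothing specific to the demo:

select * from datastax.eventsource where solr_query = '{"q":"*:*", "fq": "time:[2016-08-09T12:30:00.000Z TO 2016-08-11T12:30:00.000Z]", "sort":"time desc"}' limit 10000;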

To use Spark with DSE, we can just run 'dse spark' to start the REPL.

First we will create an Event case class to hold our event objects:

case class Event(date: String, bucket: Int, id: java.util.UUID, data: String, eventtype: String,
  aggregatetype: String, time: java.util.Date, loglevel: String, host: String)

// Load the events table as an RDD of Event and cache it for re-use
val events = sc.cassandraTable[Event]("datastax", "eventsource").cache
events.count
val max = events.map(_.time).max
val min = events.map(_.time).min

We can query our data and return events before or after a certain time.

// A reference point in the recent past (200,000,000 ms is roughly 2.3 days)
val yesterday = new java.util.Date(java.util.Calendar.getInstance().getTime().getTime() - 200000000)
yesterday
val before = events.filter(_.time.before(yesterday))
before.take(10).foreach(print)
before.count

val after = events.filter(_.time.after(yesterday))
after.take(10).foreach(print)
after.count

Or we can use filtering to get just the events between two dates.

val start = new java.util.Date(java.util.Calendar.getInstance().getTime().getTime() - 200000000)
val end = new java.util.Date(java.util.Calendar.getInstance().getTime().getTime() - 190000000)
val filtered = events.filter(_.time.after(start)).filter(_.time.before(end)).cache
filtered.count

Let's get the number of events per host and a list of all distinct hosts.

val hostCounts = events.map(f => (f.host, 1)).reduceByKey(_ + _)
hostCounts.collect().foreach(println)
val hosts = hostCounts.map(f => f._1)
hosts.collect().foreach(println)
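
Equivalently, the distinct hosts can be pulled straight from the events RDD without going via the counts:

val distinctHosts = events.map(_.host).distinct
distinctHosts.collect().foreach(println)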

To use Spark SQL, try the following with a valid date:

val results = sqlContext.sql("SELECT * FROM datastax.eventsource WHERE date = '20161019'")
results.take(5).foreach(println)

val results = sqlContext.sql("SELECT * FROM datastax.eventsource WHERE time > '2016-10-22 16:18:07'")
results.take(5).foreach(println)

val results = sqlContext.sql("SELECT * FROM datastax.eventsource WHERE time > '2016-10-22 16:18:07' AND time < '2016-10-23 16:18:07'")
results.count
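
Spark SQL can also handle aggregations that CQL alone cannot, for example counting events per day (a sketch against the same table; the 'events' alias is just for readability):

val perDay = sqlContext.sql("SELECT date, count(*) AS events FROM datastax.eventsource GROUP BY date")
perDay.collect().foreach(println)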

To remove the tables and the schema, run the following:

mvn clean compile exec:java -Dexec.mainClass="com.datastax.demo.SchemaTeardown"
