3/7/2019


Lambda Architecture with Spark Streaming, Kafka, Cassandra, Akka, Scala

by Helena Edelson

1. Lambda Architecture with Spark Streaming, Kafka, Cassandra, Akka, Scala
Helena Edelson, @helenaedelson

2. Who Is This Person?
• Spark Cassandra Connector committer
• Akka contributor - 2 new features in Akka Cluster
• Big Data & Scala conference speaker
• Currently Sr Software Engineer, Analytics @ DataStax
• Sr Cloud Engineer, VMware, CrowdStrike, SpringSource…
• Prev Spring committer - Spring AMQP, Spring Integration

3. Talk Roadmap
What: Lambda Architecture & Delivering Meaning
Why: Spark, Kafka, Cassandra & Akka integration
How: Composable Pipelines - Code
helena.edelson@datastax.com

4. I need fast access to historical data on the fly for predictive modeling with real time data from the stream

5. Lambda Architecture
A data-processing architecture designed to handle massive quantities of data by taking advantage of both batch and stream processing methods.
• Spark is one of the few data processing frameworks that allows you to seamlessly integrate batch and stream processing
• Of petabytes of data
• In the same application

6. Your Code

7. Moving Data Between Systems Is Difficult, Risky and Expensive

8. How Do We Approach This?

9. Strategies
• Scalable Infrastructure
• Partition For Scale
• Replicate For Resiliency
• Share Nothing
• Asynchronous Message Passing
• Parallelism
• Isolation
• Data Locality
• Location Transparency
10. My Nerdy Chart
Strategy | Technologies
Scalable Infrastructure / Elastic | Spark, Cassandra, Kafka
Partition For Scale, Network Topology Aware | Cassandra, Spark, Kafka, Akka Cluster
Replicate For Resiliency | Spark, Cassandra, Akka Cluster all hash the node ring
Share Nothing, Masterless | Cassandra, Akka Cluster both Dynamo style
Fault Tolerance / No Single Point of Failure | Spark, Cassandra, Kafka
Replay From Any Point Of Failure | Spark, Cassandra, Kafka, Akka + Akka Persistence
Failure Detection | Cassandra, Spark, Akka, Kafka
Consensus & Gossip | Cassandra & Akka Cluster
Parallelism | Spark, Cassandra, Kafka, Akka
Asynchronous Data Passing | Kafka, Akka, Spark
Fast, Low Latency, Data Locality | Cassandra, Spark, Kafka
Location Transparency | Akka, Spark, Cassandra, Kafka

11. (Apache Spark)
• Fast, distributed, scalable and fault tolerant cluster compute system
• Enables low latency with complex analytics
• Developed in 2009 at UC Berkeley AMPLab, open sourced in 2010
• Became a top-level Apache project in February 2014

12. (Apache Kafka)
• High Throughput Distributed Messaging
• Decouples Data Pipelines
• Handles Massive Data Load
• Supports Massive Number of Consumers
• Distribution & partitioning across cluster nodes
• Automatic recovery from broker failures

13. Speaking Of Fault Tolerance…

14. The one thing in your infrastructure you can always rely on.

15. Availability
"During Hurricane Sandy, we lost an entire data center. Completely. Lost. It. Our data in Cassandra never went offline."
© 2014 DataStax, All Rights Reserved. Company Confidential.

16. (Apache Cassandra)
• Massively Scalable
• High Performance
• Always On
• Masterless

17. (Akka)
• Fault tolerant
• Hierarchical Supervision
• Customizable Failure Strategies & Detection
• Asynchronous Data Passing
• Parallelization - Balancing Pool Routers
• Akka Cluster
• Adaptive / Predictive
• Load-Balanced Across Cluster Nodes

18. I’ve used Scala with these every single time.
19.
• Stream data from Kafka to Cassandra
• Stream data from Kafka to Spark and write to Cassandra
• Stream from Cassandra to Spark - coming soon!
• Read data from Spark/Spark Streaming Source and write to C*
• Read data from Cassandra to Spark

20. HADOOP
• Distributed Analytics Platform
• Easy Abstraction for Datasets
• Support in several languages
• Streaming
• Machine Learning
• Graph
• Integrated SQL Queries
• Has Generalized DAG execution
All in one package
And it uses Akka

21. Most Active OSS In Big Data

22. Apache Spark - Easy to Use API
Returns the top (k) highest temps for any location in the year:
    def topK(aggregate: Seq[Double]): Seq[Double] =
      sc.parallelize(aggregate).top(k).toSeq
Returns the top (k) highest temps … in a Future:
    def topK(aggregate: Seq[Double]): Future[Seq[Double]] =
      Future(sc.parallelize(aggregate).top(k).toSeq)  // needs an implicit ExecutionContext in scope

23. Spark Shell
Use the Spark Shell to quickly try out code samples.
Available in Scala and PySpark.

24. Collection To RDD
    scala> val data = Array(1, 2, 3, 4, 5)
    data: Array[Int] = Array(1, 2, 3, 4, 5)
    scala> val distributedData = sc.parallelize(data)
    distributedData: spark.RDD[Int] = spark.ParallelCollection@10d13e3e

25. Not Just MapReduce

26. Spark Basic Word Count
    val conf = new SparkConf().setMaster(host).setAppName(app)
    val sc = new SparkContext(conf)
    sc.textFile(words)
      .flatMap(_.split("\\s+"))             // split each line on whitespace
      .map(word => (word.toLowerCase, 1))
      .reduceByKey(_ + _)
      .collect

27. RDDs Can be Generated from a Variety of Sources
• Text files
• Scala Collections

28. RDD Operations
• Transformation
• Action

29. Setting up C* and Spark
DSE > 4.5.0: just start your nodes with
    dse cassandra -k
Apache Cassandra: follow the excellent guide by Al Tobey
http://tobert.github.io/post/2014-07-15-installing-cassandra-spark-stack.html

30. When Batch Is Not Enough

31. Your Data Is Like Candy
Delicious: you want it now

32. Your Data Is Like Candy
Delicious: you want it now
Batch Analytics: analysis after data has accumulated. Decreases the weight of the data by the time it is processed.
Streaming Analytics: analytics as data arrives. The data won’t be stale and neither will our analytics.
Both in same app = Lambda

33. Spark Streaming
• I want results continuously in the event stream
• I want to run computations in my event-driven async apps
• Exactly once message guarantees

34. DStream (Discretized Stream)
RDD (time 0 to time 1), RDD (time 1 to time 2), RDD (time 2 to time 3)
A transformation on a DStream = transformations on its RDDs
DStream: continuous stream of micro batches
• Complex processing models with minimal effort
• Streaming computations on small time intervals

35. Basic Streaming: FileInputDStream
    val conf = new SparkConf().setMaster(SparkMaster).setAppName(AppName)
    val ssc = new StreamingContext(conf, Milliseconds(500))  // the batch streaming interval
    ssc.textFileStream("s3n://raw_data_bucket/")
      .flatMap(_.split("\\s+"))
      .map(_.toLowerCase)
      .countByValue()
      .saveToCassandra(keyspace, table)
    ssc.checkpoint(checkpointDir)
    ssc.start()             // starts the streaming application piping raw incoming data to a sink
    ssc.awaitTermination()
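The DStream idea on slides 34 and 35 (a stream is a sequence of micro batches, and a transformation on the stream is the same transformation applied to each batch) can be sketched without a cluster. The following is only an illustration on plain Scala collections, not Spark code: `MicroBatchSketch`, `Batch` and `countPerBatch` are hypothetical names, and each inner `Seq` stands in for the RDD of one batch interval.

```scala
object MicroBatchSketch {
  /** One micro batch: the records that arrived during one batch interval. */
  type Batch = Seq[String]

  /** Applies the same word count to every batch, the way a DStream
    * transformation is applied to each underlying RDD. */
  def countPerBatch(stream: Seq[Batch]): Seq[Map[String, Long]] =
    stream.map { batch =>
      batch
        .flatMap(_.split("\\s+"))   // split each record on whitespace
        .filter(_.nonEmpty)         // drop empty tokens
        .map(_.toLowerCase)
        .groupBy(identity)          // local stand-in for countByValue
        .map { case (word, occurrences) => word -> occurrences.size.toLong }
    }
}
```

Each element of the result corresponds to one batch interval, which is roughly what a sink would receive per interval in the streaming job above.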
36. ReceiverInputDStreams
DStreams - the stream of raw data received from streaming sources:
• Basic Source - in the StreamingContext API
• Advanced Source - in external modules and separate Spark artifacts
Receivers
• Reliable Receivers - for data sources supporting acks (like Kafka)
• Unreliable Receivers - for data sources not supporting acks

37. Spark Streaming External Source/Sink

38. Streaming Window Operations
    kvStream
      .map { case (k, v) => (k, v.value) }
      .reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))
      .saveToCassandra(keyspace, table)
Window Length: duration of the window = 30 s
Sliding Interval: interval at which the window operation is performed = every 10 s

39. Scale (Apache Cassandra)
• Scales linearly to as many nodes as you need
• Scales whenever you need

40. Performance (Apache Cassandra)
• It’s fast
• Built to sustain massive data insertion rates in irregular pattern spikes

41. Fault Tolerance & Availability (Apache Cassandra)
• Automatic Replication
• Multi Datacenter
• Decentralized - no single point of failure
• Survive regional outages
• New nodes automatically add themselves to the cluster
• DataStax drivers automatically discover new nodes

42. Fault Tolerance & Replication
How many copies of the data should exist in the cluster? ReplicationFactor=3
[Diagram: nodes A, B, C, D in US-East and in Europe; each node holds three of the four ranges: ACD, ABC, ABD, BCD]

43. Fault Tolerance & Replication
How many copies of the data should exist in the cluster? ReplicationFactor=3
[Diagram: the same rings for US-East and Europe, labelled as one Cassandra Cluster]

44. Strategies (Apache Cassandra)
• Consensus - Paxos Protocol
• Sequential Read / Write - Timeseries
• Tunable Consistency
• Gossip: "Did you hear node 1 was down??"

45. Architecture (Apache Cassandra)
• Distributed, Masterless Ring Architecture
• Network Topology Aware
• Flexible, Schemaless - your data structure can evolve seamlessly over time
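The replication diagrams on slides 42 and 43 (four nodes, ReplicationFactor=3, each node holding three of the four ranges) are consistent with placing each range on its primary node and on the next nodes clockwise around the ring. The sketch below assumes that simple clockwise placement; it ignores datacenters and racks, and `ReplicaSketch` and `replicas` are hypothetical names used only for illustration.

```scala
object ReplicaSketch {
  /** Picks up to `rf` distinct nodes, starting at the primary and walking clockwise. */
  def replicas(ring: Vector[String], primaryIndex: Int, rf: Int): Vector[String] =
    Vector.tabulate(math.min(rf, ring.size)) { i =>
      ring((primaryIndex + i) % ring.size)  // wrap around the end of the ring
    }
}
```

With a ring of A, B, C, D and a range whose primary is C, this yields C, D and A, so losing any single node still leaves two copies of that range.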
46. C* At CERN: Large Hadron Collider
• ATLAS - largest of several detectors along the Large Hadron Collider
• Measures particle production when protons collide at a very high center of mass energy
  - Bursty traffic
  - Volume of data from sensors requires a very large trigger and data acquisition system
  - 30,000 applications on 2,000 nodes

47. Genetics / Biological Computations

48. IoT

49. CQL - Easy
    CREATE TABLE users (
      username varchar,
      firstname varchar,
      lastname varchar,
      email list<varchar>,
      password varchar,
      created_date timestamp,
      PRIMARY KEY (username)
    );
    INSERT INTO users (username, firstname, lastname, email, password, created_date)
    VALUES ('hedelson', 'Helena', 'Edelson', ['helena.edelson@datastax.com'],
            'ba27e03fd95e507daf2937c937d499ab', '2014-11-15 13:50:00')
    IF NOT EXISTS;
• Familiar syntax
• Many Tools & Drivers
• Many Languages
• Friendly to programmers
• Paxos for locking

50. Timeseries Data: C* Clustering Columns
    CREATE TABLE weather.raw_data (
      wsid text, year int, month int, day int, hour int,
      temperature double, dewpoint double, pressure double,
      wind_direction int, wind_speed double, one_hour_precip double,
      PRIMARY KEY ((wsid), year, month, day, hour)
    ) WITH CLUSTERING ORDER BY (year DESC, month DESC, day DESC, hour DESC);
• Writes by most recent
• Reads return most recent first
Cassandra will automatically sort by most recent for both write and read.

51.
    val multipleStreams = (1 to numDstreams).map { i =>
      streamingContext.receiverStream[HttpRequest](new HttpReceiver(port))
    }
    streamingContext.union(multipleStreams)
      .map { httpRequest => TimelineRequestEvent(httpRequest) }
      .saveToCassandra("requests_ks", "timeline")
A record of every event, in the order in which it happened, per URL:
    CREATE TABLE IF NOT EXISTS requests_ks.timeline (
      timesegment bigint, url text, t_uuid timeuuid, method text,
      headers map<text, text>, body text,
      PRIMARY KEY ((url, timesegment), t_uuid)
    );
timeuuid protects from simultaneous events over-writing one another.
timesegment protects from writing unbounded partitions.

52. Spark Cassandra Connector

53. Spark Cassandra Connector
• NOSQL JOINS!
• Write & Read data between Spark and Cassandra
• Compatible with Spark 1.3
• Handles Data Locality for Speed
• Implicit type conversions
• Server-Side Filtering - SELECT, WHERE, etc.
• Natural Timeseries Integration
https://github.com/datastax/spark-cassandra-connector

54. Spark Cassandra Connector
[Diagram: User Application, Spark-Cassandra Connector, C* Driver, Spark Executor, Cassandra nodes (C*)]

55. Writing and Reading
SparkContext:
    import com.datastax.spark.connector._
StreamingContext:
    import com.datastax.spark.connector.streaming._

56. Write from Spark to Cassandra
    sc.parallelize(Seq(0, 1, 2)).saveToCassandra("keyspace", "raw_data")
Called on the SparkContext's RDD with the keyspace and the table.
Spark RDD JOIN with NOSQL!
    predictionsRdd.join(music).saveToCassandra("music", "predictions")

57. Read From C* to Spark
    val rdd = sc.cassandraTable("github", "commits")   // keyspace, table
      .select("user", "count", "year", "month")
      .where("commits >= ? and year = ?", 1000, 2015)
Returns a CassandraRDD[CassandraRow], with server-side column and row filtering.

58. Rows: Custom Objects
    val rdd = ssc.cassandraTable[MonthlyCommits]("github", "commits_aggregate")
      .where("user = ? and project_name = ? and year = ?",
             "helena", "spark-cassandra-connector", 2015)

59. Rows
    val tuplesRdd = sc.cassandraTable[(Int, Date, String)](db, tweetsTable)
      .select("cluster_id", "time", "cluster_name")
      .where("time > ? and time < ?", "2014-07-12 20:00:01", "2014-07-12 20:00:03")

    val rdd = ssc.cassandraTable[MyDataType]("stats", "clustering_time")
      .where("key = 1").limit(10).collect

    val rdd = ssc.cassandraTable[(Int, DateTime, String)]("stats", "clustering_time")
      .where("key = 1").withDescOrder.collect

60. Cassandra User Defined Types
    CREATE TYPE address (
      street text,
      city text,
      zip_code int,
      country text,
      cross_streets set<text>
    );
UDT = Your Custom Field Type In Cassandra

61. Cassandra UDT’s With JSON
    {
      "productId": 2,
      "name": "Kitchen Table",
      "price": 249.99,
      "description": "Rectangular table with oak finish",
      "dimensions": { "units": "inches", "length": 50.0, "width": 66.0, "height": 32 },
      "categories": {
        "Home Furnishings": { "catalogPage": 45, "url": "/home/furnishings" },
        "Kitchen Furnishings": { "catalogPage": 108, "url": "/kitchen/furnishings" }
      }
    }

    CREATE TYPE dimensions (units text, length float, width float, height float);
    CREATE TYPE category (catalogPage int, url text);
    CREATE TABLE product (
      productId int,
      name text,
      price float,
      description text,
      dimensions frozen<dimensions>,
      categories map<text, frozen<category>>,
      PRIMARY KEY (productId)
    );
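One way to picture the `product` table and its two user defined types from slide 61 on the Scala side is as nested case classes, with the `map<text, frozen<category>>` column becoming a `Map` keyed by category name. This is a sketch only: the class names and `ProductModelSketch` are hypothetical, and whether a given connector version maps such classes automatically is not something the slides state.

```scala
object ProductModelSketch {
  // Mirrors CREATE TYPE dimensions (units text, length float, width float, height float)
  final case class Dimensions(units: String, length: Float, width: Float, height: Float)

  // Mirrors CREATE TYPE category (catalogPage int, url text)
  final case class Category(catalogPage: Int, url: String)

  // Mirrors CREATE TABLE product (...), with the UDT columns as nested values
  final case class Product(
      productId: Int,
      name: String,
      price: Float,
      description: String,
      dimensions: Dimensions,
      categories: Map[String, Category])

  // The sample row from the slide, written as a value
  val kitchenTable: Product = Product(
    productId = 2,
    name = "Kitchen Table",
    price = 249.99f,
    description = "Rectangular table with oak finish",
    dimensions = Dimensions("inches", 50.0f, 66.0f, 32.0f),
    categories = Map(
      "Home Furnishings" -> Category(45, "/home/furnishings"),
      "Kitchen Furnishings" -> Category(108, "/kitchen/furnishings")))
}
```

Keeping the nested types frozen in CQL means each one is written and read as a single value, which lines up with treating them as immutable case classes here.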
62. Data Locality
• Spark asks an RDD for a list of its partitions (splits)
• Each split consists of one or more token-ranges
• For every partition:
  • Spark asks the RDD for a list of preferred nodes to process on
  • Spark creates a task and sends it to one of the nodes for execution
Every Spark task uses a CQL-like query to fetch data for the given token range:
    SELECT "key", "value"
    FROM "test"."kv"
    WHERE
      token("key") > 595597420921139321 AND
      token("key") <= 595597431194200132
    ALLOW FILTERING

63. Cassandra Locates a Row Based on Partition Key and Token Range
All of the rows in a Cassandra Cluster are stored based on their location in the Token Range.

64. Cassandra Locates a Row Based on Partition Key and Token Range
Each of the Nodes in a Cassandra Cluster is primarily responsible for one set of Tokens.
[Diagram: token ring labelled 0, 500, 999. Nodes: New York City/Manhattan (Helena), Warsaw (Piotr & Jacek), San Francisco (Brian, Russell & Alex), St. Petersburg (Artem)]

65. Cassandra Locates a Row Based on Partition Key and Token Range
Each of the Nodes in a Cassandra Cluster is primarily responsible for one set of Tokens.
[Diagram: the same ring with nodes New York City, Warsaw, San Francisco, St. Petersburg and token ranges 750 - 99, 350 - 749, 100 - 349]

66. Cassandra Locates a Row Based on Partition Key and Token Range
The CQL Schema designates at least one column to be the Partition Key.
[Diagram: row "Jacek | 514 | Red" on the ring of New York City, Warsaw, San Francisco, St. Petersburg]

67. Cassandra Locates a Row Based on Partition Key and Token Range
The hash of the Partition Key tells us where a row should be stored.
[Diagram: row "Helena | 514 | Red" on the same ring]

68. The Spark Executor uses the Connector to Pull Rows from the Local Cassandra Instance
The C* Driver pages spark.cassandra.input.page.row.size CQL rows at a time.
    SELECT * FROM keyspace.table WHERE pk =
[Diagram: Spark Executor on the Amsterdam node]

69. DataStax Enterprise Enables This Same Machinery with Solr Pushdown
    SELECT * FROM keyspace.table
    WHERE solr_query = 'title:b'
    AND token(pk) > 780 and token(pk) <= 830
[Diagram: Spark Executor (Superman) and DataStax Enterprise on the Amsterdam node, tokens 780 - 830]

70. Composable Pipelines With Spark, Kafka & Cassandra

71. Spark SQL with Cassandra
    import org.apache.spark.sql.cassandra.CassandraSQLContext
    val cc = new CassandraSQLContext(sparkContext)
    cc.setKeyspace(keyspaceName)
    cc.sql("""
      SELECT table1.a, table1.b, table1.c, table2.a
      FROM table1 AS table1
      JOIN table2 AS table2 ON table1.a = table2.a
      AND table1.b = table2.b
      AND table1.c = table2.c
      """)
      .map(Data(_))
      .saveToCassandra(keyspace1, table3)

72. Spark SQL with Cassandra & JSON
    val sql = new SQLContext(sparkContext)
    val json = Seq(
      """{"user":"helena","commits":98, "month":3, "year":2015}""",
      """{"user":"jacek-lewandowski", "commits":72, "month":3, "year":2015}""",
      """{"user":"pkolaczk", "commits":42, "month":3, "year":2015}""")
    // write (jsonRDD expects an RDD[String], so the Seq is parallelized first)
    sql.jsonRDD(sparkContext.parallelize(json))
      .map(CommitStats(_)).flatMap(compute)
      .saveToCassandra("stats", "monthly_commits")
    // read
    val rdd = sc.cassandraTable[MonthlyCommits]("stats", "monthly_commits")

    cqlsh> CREATE TABLE github_stats.commits_aggr(user VARCHAR PRIMARY KEY, commits INT…);

73. Spark Streaming, Kafka, C* and JSON
    cqlsh> select * from github_stats.commits_aggr;

     user              | commits | month | year
    -------------------+---------+-------+------
     pkolaczk          |      42 |     3 | 2015
     jacek-lewandowski |      43 |     3 | 2015
     helena            |      98 |     3 | 2015
    (3 rows)

    KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, topicMap, StorageLevel.MEMORY_ONLY)
      .map { case (_, json) => JsonParser.parse(json).extract[MonthlyCommits] }
      .saveToCassandra("github_stats", "commits_aggr")

74. Kafka Streaming Word Count
    sparkConf.set("spark.cassandra.connection.host", "10.20.3.45")
    val streamingContext = new StreamingContext(sparkConf, Seconds(30))
    KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
        streamingContext, kafkaParams, topicMap, StorageLevel.MEMORY_ONLY)
      .map(_._2)
      .countByValue()
      .saveToCassandra("my_keyspace", "wordcount")

75. Spark Streaming, Twitter & Cassandra
    /** Cassandra is doing the sorting for you here. */
    TwitterUtils.createStream(ssc, auth, tags, StorageLevel.MEMORY_ONLY_SER_2)
      .flatMap(_.getText.toLowerCase.split("""\s+"""))
      .filter(tags.contains(_))
      .countByValueAndWindow(Seconds(5), Seconds(5))
      .transform((rdd, time) => rdd.map { case (term, count) => (term, count, now(time)) })
      .saveToCassandra(keyspace, table)

    CREATE TABLE IF NOT EXISTS keyspace.table (
      topic text, interval text, mentions counter,
      PRIMARY KEY(topic, interval)
    ) WITH CLUSTERING ORDER BY (interval DESC);

76. Spark MLLib
[Diagram labels: Your Data; Training Data; Test Data; Feature Extraction (Extract Data To Analyze); Model Training (Train your model to predict); Model Testing]

77. Spark Streaming ML, Kafka & C*
    val ssc = new StreamingContext(new SparkConf()…, Seconds(5))
    val testData = ssc.cassandraTable[String](keyspace, table).map(LabeledPoint.parse)
    val trainingStream = KafkaUtils.createStream[K, V, KDecoder, VDecoder](
        ssc, kafkaParams, topicMap, StorageLevel.MEMORY_ONLY)
      .map(_._2).map(LabeledPoint.parse)
    trainingStream.saveToCassandra("ml_keyspace", "raw_training_data")
    val model = new StreamingLinearRegressionWithSGD()
      .setInitialWeights(Vectors.dense(weights))
    model.trainOn(trainingStream)  // trainOn returns Unit, so it is called on the model rather than chained
    // Making predictions on testData
    model.predictOnValues(testData.map(lp => (lp.label, lp.features)))
      .saveToCassandra("ml_keyspace", "predictions")

78. KillrWeather
• Global sensors & satellites collect data
• Cassandra stores in sequence
• Application reads in sequence

79. Data model should look like your queries

80. Design Data Model to support queries
• Store raw data per ID
• Store time series data in order: most recent to oldest
• Compute and store aggregate data in the stream
• Set TTLs on historic data
Queries I Need
• Get data by ID
• Get data for a single date and time
• Get data for a window of time
• Compute, store and retrieve daily, monthly, annual aggregations

81. Data Model
• Weather Station Id and Time are unique
• Store as many as needed
    CREATE TABLE temperature (
      weather_station text,
      year int,
      month int,
      day int,
      hour int,
      temperature double,
      PRIMARY KEY (weather_station, year, month, day, hour)
    );
    INSERT INTO temperature (weather_station, year, month, day, hour, temperature)
    VALUES ('10010:99999', 2005, 12, 1, 7, -5.6);
    INSERT INTO temperature (weather_station, year, month, day, hour, temperature)
    VALUES ('10010:99999', 2005, 12, 1, 8, -5.1);
    INSERT INTO temperature (weather_station, year, month, day, hour, temperature)
    VALUES ('10010:99999', 2005, 12, 1, 9, -4.9);
    INSERT INTO temperature (weather_station, year, month, day, hour, temperature)
    VALUES ('10010:99999', 2005, 12, 1, 10, -5.3);

82. Load-Balanced Data Ingestion
    class HttpNodeGuardian extends ClusterAwareNodeGuardianActor {
      cluster.joinSeedNodes(Vector(..))
      context.actorOf(BalancingPool(PoolSize).props(
        Props(new KafkaPublisherActor(KafkaHosts, KafkaBatchSendSize))))
      Cluster(context.system) registerOnMemberUp {
        context.actorOf(BalancingPool(PoolSize).props(
          Props(new HttpReceiverActor(KafkaHosts, KafkaBatchSendSize))))
      }
      def initialized: Actor.Receive = { … }
    }

83. Client: HTTP Receiver Akka Actor
    class HttpDataIngestActor(kafka: ActorRef) extends Actor with ActorLogging {
      implicit val system = context.system
      implicit val askTimeout: Timeout = settings.timeout
      implicit val materializer = ActorFlowMaterializer(
        ActorFlowMaterializerSettings(system))

      val requestHandler: HttpRequest => HttpResponse = {
        case HttpRequest(HttpMethods.POST, Uri.Path("/weather/data"), headers, entity, _) =>
          headers.toSource.collect { case s: Source =>
            // publish the posted data to Kafka, then acknowledge the request
            kafka ! KafkaMessageEnvelope[String, String](topic, group, s.data: _*)
            HttpResponse(200, entity = HttpEntity(MediaTypes.`text/html`))
          }.getOrElse(HttpResponse(404, entity = "Unsupported request"))
        case _: HttpRequest =>
          HttpResponse(400, entity = "Unsupported request")
      }

      Http(system).bind(HttpHost, HttpPort).map { case connection =>
        log.info("Accepted new connection from " + connection.remoteAddress)
        connection.handleWithSyncHandler(requestHandler)
      }

      def receive: Actor.Receive = {
        case e =>
      }
    }

84. Client: Kafka Producer Akka Actor
    class KafkaProducerActor[K, V](config: ProducerConfig) extends Actor {
      override val supervisorStrategy =
        OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) {
          case _: ActorInitializationException => Stop
          case _: FailedToSendMessageException => Restart
          case _: ProducerClosedException => Restart
          case _: NoBrokersForPartitionException => Escalate
          case _: KafkaException => Escalate
          case _: Exception => Escalate
        }
      private val producer = new KafkaProducer[K, V](config)
      override def postStop(): Unit = producer.close()
      def receive = {
        case e: KafkaMessageEnvelope[K, V] => producer.send(e)
      }
    }

85. Store raw data on ingestion

86. Store Raw Data From Kafka Stream To C*
    val kafkaStream = KafkaUtils.createStream[K, V, KDecoder, VDecoder](
        ssc, kafkaParams, topicMap, StorageLevel.DISK_ONLY_2)
      .map(transform)
      .map(RawWeatherData(_))
    /** Saves the raw data to Cassandra. */
    kafkaStream.saveToCassandra(keyspace, raw_ws_data)
    /** Now proceed with computations from the same stream.. */
    kafkaStream…
Now we can replay on failure, for later computation, etc.

87. Let’s See Our Data Model Again
    CREATE TABLE weather.raw_data (
      wsid text, year int, month int, day int, hour int,
      temperature double, dewpoint double, pressure double,
      wind_direction int, wind_speed double, one_hour_precip double,
      PRIMARY KEY ((wsid), year, month, day, hour)
    ) WITH CLUSTERING ORDER BY (year DESC, month DESC, day DESC, hour DESC);

    CREATE TABLE daily_aggregate_precip (
      wsid text,
      year int,
      month int,
      day int,
      precipitation counter,
      PRIMARY KEY ((wsid), year, month, day)
    ) WITH CLUSTERING ORDER BY (year DESC, month DESC, day DESC);

88. Efficient Stream Computation
• Gets the partition key: Data Locality. The Spark C* Connector feeds this to Spark.
• Cassandra Counter column in our schema, no expensive `reduceByKey` needed. Simply let C* do it: not expensive and fast.
    class KafkaStreamingActor(kafkaParams: Map[String, String], ssc: StreamingContext, settings: WeatherSettings)
      extends AggregationActor {
      import settings._
      val kafkaStream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
          ssc, kafkaParams, Map(KafkaTopicRaw -> 1), StorageLevel.DISK_ONLY_2)
        .map(_._2.split(","))
        .map(RawWeatherData(_))
      kafkaStream.saveToCassandra(CassandraKeyspace, CassandraTableRaw)
      /** RawWeatherData: wsid, year, month, day, oneHourPrecip */
      kafkaStream.map(hour => (hour.wsid, hour.year, hour.month, hour.day, hour.oneHourPrecip))
        .saveToCassandra(CassandraKeyspace, CassandraTableDailyPrecip)
      /** Now the [[StreamingContext]] can be started. */
      context.parent ! OutputStreamInitialized
      def receive: Actor.Receive = {…}
    }

89.
    /** For a given weather station, calculates annual cumulative precip - or year to date. */
    class PrecipitationActor(ssc: StreamingContext, settings: WeatherSettings) extends AggregationActor {
      def receive: Actor.Receive = {
        case GetPrecipitation(wsid, year) => cumulative(wsid, year, sender)
        case GetTopKPrecipitation(wsid, year, k) => topK(wsid, year, k, sender)
      }
      /** Computes annual aggregation. Precipitation values are 1 hour deltas from the previous. */
      def cumulative(wsid: String, year: Int, requester: ActorRef): Unit =
        ssc.cassandraTable[Double](keyspace, dailytable)
          .select("precipitation")
          .where("wsid = ? AND year = ?", wsid, year)
          .collectAsync()
          .map(AnnualPrecipitation(_, wsid, year)) pipeTo requester
      /** Returns the k highest precipitation values for the station in the `year`. */
      def topK(wsid: String, year: Int, k: Int, requester: ActorRef): Unit = {
        val toTopK = (aggregate: Seq[Double]) => TopKPrecipitation(wsid, year,
          ssc.sparkContext.parallelize(aggregate).top(k).toSeq)
        ssc.cassandraTable[Double](keyspace, dailytable)
          .select("precipitation")
          .where("wsid = ? AND year = ?", wsid, year)
          .collectAsync().map(toTopK) pipeTo requester
      }
    }

90. Efficient Batch Analytics
    class TemperatureActor(sc: SparkContext, settings: WeatherSettings)
      extends AggregationActor {
      import akka.pattern.pipe
      def receive: Actor.Receive = {
        case e: GetMonthlyHiLowTemperature => highLow(e, sender)
      }
      def highLow(e: GetMonthlyHiLowTemperature, requester: ActorRef): Unit =
        sc.cassandraTable[DailyTemperature](keyspace, daily_temperature_aggr)
          .where("wsid = ? AND year = ? AND month = ?", e.wsid, e.year, e.month)
          .collectAsync()
          .map(MonthlyTemperature(_, e.wsid, e.year, e.month)) pipeTo requester
    }
C* data is automatically sorted by most recent - due to our data model. Additional Spark or collection sort not needed.

91. @helenaedelson
github.com/helena
slideshare.net/helenaedelson

92. Learn More Online and at Cassandra Summit
https://academy.datastax.com/
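As a closing illustration of the PrecipitationActor on slide 89: once the daily precipitation values for one station and year have been read back, the two aggregations reduce to a sum and a top-k selection. The sketch below shows that logic on a plain Scala collection, with no Spark or Akka involved; `PrecipitationSketch` and its method names are hypothetical and the input values are made up.

```scala
object PrecipitationSketch {
  /** Year-to-date total: the sum of the daily precipitation values. */
  def cumulative(daily: Seq[Double]): Double = daily.sum

  /** The k largest daily values, highest first. */
  def topK(daily: Seq[Double], k: Int): Seq[Double] =
    daily.sortWith(_ > _).take(k)
}
```

In the actor these results would be wrapped in a response message and piped back to the requester; here they are simply returned.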

  "commits_aggregate")	
  
	
  	
  	
  	
  	
  	
  	
  	
...
Rows
val	
  tuplesRdd	
  =	
  sc.cassandraTable[(Int,Date,String)](db,	
  tweetsTable)	
  
	
  .select("cluster_id","time"...
Cassandra User Defined Types
CREATE TYPE address (
street text,
city text,
zip_code int,
country text,
cross_streets set<t...
Cassandra UDT’s With JSON
{
"productId": 2,
"name": "Kitchen Table",
"price": 249.99,
"description" : "Rectangular table w...
Data Locality
● Spark asks an RDD for a list of its partitions (splits)
● Each split consists of one or more token-ranges
...
All of the rows in a Cassandra Cluster
are stored based based on their
location in the Token Range.
Cassandra Locates a Ro...
New York City/
Manhattan:
Helena
Warsaw:
Piotr & Jacek
San Francisco:
Brian,Russell &
Alex
Each of the Nodes in a 

Cassan...
New York City
Warsaw
San Francisco
Each of the Nodes in a 

Cassandra Cluster is primarily
responsible for one set of
Toke...
Jacek 514 Red
The CQL Schema designates
at least one column to be the
Partition Key.
New York City
Warsaw
San Francisco
Ca...
Helena 514 Red
The hash of the Partition Key
tells us where a row
should be stored.
New York City
Warsaw
San Francisco
Cas...
Amsterdam
Spark Executor
The C* Driver pages spark.cassandra.input.page.row.size
CQL rows at a time
SELECT * FROM keyspace...
Amsterdam
Spark Executor (Superman)
DataStax
Enterprise
SELECT * FROM keyspace.table
WHERE solr_query = 'title:b'
AND
toke...
Composable Pipelines
With Spark, Kafka & Cassandra
77
@helenaedelson
Spark SQL with Cassandra
import org.apache.spark.sql.cassandra.CassandraSQLContext
val cc = new CassandraSQLContext(sparkC...


val sql = new SQLContext(sparkContext)
val json = Seq(

"""{"user":"helena","commits":98, "month":3, "year":2015}""",

"...
Analytic
Analytic
Search
Spark Streaming, Kafka, C* and JSON
cqlsh>	
  select	
  *	
  from	
  github_stats.commits_aggr;	
...
Kafka Streaming Word Count
sparkConf.set("spark.cassandra.connection.host", "10.20.3.45")

val streamingContext = new Stre...
Spark Streaming, Twitter & Cassandra
/** Cassandra is doing the sorting for you here. */

TwitterUtils.createStream(
ssc, ...
Training
Data
Feature
Extraction
Model
Training
Model
Testing
Test
Data
Your Data Extract Data To Analyze
Train your model...
val ssc = new StreamingContext(new SparkConf()…, Seconds(5)

val testData = ssc.cassandraTable[String](keyspace,table).map...
KillrWeather
• Global sensors & satellites collect data
• Cassandra stores in sequence
• Application reads in sequence
Apa...
Data model should look like your queries
• Store raw data per ID
• Store time series data in order: most recent to oldest
• Compute and store aggregate data in the...
Data Model
• Weather Station Id and Time
are unique
• Store as many as needed
CREATE TABLE daily_temperature (
weather_sta...
class HttpNodeGuardian extends ClusterAwareNodeGuardianActor {

cluster.joinSeedNodes(Vector(..))


context.actorOf(Balanc...
class HttpDataIngestActor(kafka: ActorRef) extends Actor with ActorLogging {

implicit val system = context.system

implic...
class KafkaProducerActor[K, V](config: ProducerConfig) extends Actor {



override val supervisorStrategy =

OneForOneStra...
Store raw data on ingestion
val kafkaStream = KafkaUtils.createStream[K, V, KDecoder, VDecoder]
(ssc, kafkaParams, topicMap, StorageLevel.DISK_ONLY_2)...
CREATE	
  TABLE	
  weather.raw_data	
  (

	
  	
  	
  wsid	
  text,	
  year	
  int,	
  month	
  int,	
  day	
  int,	
  hou...
Gets the partition key: Data Locality
Spark C* Connector feeds this to Spark
Cassandra Counter column in our schema,
no ex...
/** For a given weather station, calculates annual cumulative precip - or year to date. */

class PrecipitationActor(ssc: ...
class TemperatureActor(sc: SparkContext, settings: WeatherSettings)
extends AggregationActor {

import akka.pattern.pipe

...
99
@helenaedelson
github.com/helena
slideshare.net/helenaedelson
Learn More Online and at Cassandra Summit
https://academy.datastax.com/
  1. 1. Lambda Architecture with Spark Streaming, Kafka, Cassandra, Akka, Scala. Helena Edelson (@helenaedelson)
  2. 2. Who Is This Person? • Spark Cassandra Connector committer • Akka contributor - 2 new features in Akka Cluster • Big Data & Scala conference speaker • Currently Sr Software Engineer, Analytics @ DataStax • Sr Cloud Engineer, VMware, CrowdStrike, SpringSource… • Prev Spring committer - Spring AMQP, Spring Integration
  3. 3. Talk Roadmap • What: Lambda Architecture & Delivering Meaning • Why: Spark, Kafka, Cassandra & Akka integration • How: Composable Pipelines - Code. helena.edelson@datastax.com
  4. 4. I need fast access to historical data on the fly for predictive modeling with real time data from the stream
  5. 5. Lambda Architecture A data-processing architecture designed to handle massive quantities of data by taking advantage of both batch and stream processing methods. • Spark is one of the few data processing frameworks that allows you to seamlessly integrate batch and stream processing • Of petabytes of data • In the same application
  6. 6. Your Code
  7. 7. Moving Data Between Systems Is Difficult, Risky and Expensive
  8. 8. How Do We Approach This?
  9. 9. Strategies • Scalable Infrastructure • Partition For Scale • Replicate For Resiliency • Share Nothing • Asynchronous Message Passing • Parallelism • Isolation • Data Locality • Location Transparency
  10. 10. My Nerdy Chart (Strategy: Technologies)
 • Scalable Infrastructure / Elastic: Spark, Cassandra, Kafka
 • Partition For Scale, Network Topology Aware: Cassandra, Spark, Kafka, Akka Cluster
 • Replicate For Resiliency: Spark, Cassandra, Akka Cluster all hash the node ring
 • Share Nothing, Masterless: Cassandra, Akka Cluster both Dynamo style
 • Fault Tolerance / No Single Point of Failure: Spark, Cassandra, Kafka
 • Replay From Any Point Of Failure: Spark, Cassandra, Kafka, Akka + Akka Persistence
 • Failure Detection: Cassandra, Spark, Akka, Kafka
 • Consensus & Gossip: Cassandra & Akka Cluster
 • Parallelism: Spark, Cassandra, Kafka, Akka
 • Asynchronous Data Passing: Kafka, Akka, Spark
 • Fast, Low Latency, Data Locality: Cassandra, Spark, Kafka
 • Location Transparency: Akka, Spark, Cassandra, Kafka
  11. 11. Apache Spark • Fast, distributed, scalable and fault tolerant cluster compute system • Enables low latency with complex analytics • Developed in 2009 at UC Berkeley AMPLab, open sourced in 2010 • Became an Apache project in February, 2014
  12. 12. • High Throughput Distributed Messaging • Decouples Data Pipelines • Handles Massive Data Load • Support Massive Number of Consumers • Distribution & partitioning across cluster nodes • Automatic recovery from broker failures
  13. 13. Speaking Of Fault Tolerance…
  14. 14. The one thing in your infrastructure you can always rely on.
  15. 15. Availability: "During Hurricane Sandy, we lost an entire data center. Completely. Lost. It. Our data in Cassandra never went offline." © 2014 DataStax, All Rights Reserved. Company Confidential.
  16. 16. •Massively Scalable • High Performance • Always On • Masterless
  17. 17. • Fault tolerant • Hierarchical Supervision • Customizable Failure Strategies & Detection • Asynchronous Data Passing • Parallelization - Balancing Pool Routers • Akka Cluster • Adaptive / Predictive • Load-Balanced Across Cluster Nodes
  18. 18. I’ve used Scala with these every single time.
  19. 19. • Stream data from Kafka to Cassandra • Stream data from Kafka to Spark and write to Cassandra • Stream from Cassandra to Spark - coming soon! • Read data from Spark/Spark Streaming Source and write to C* • Read data from Cassandra to Spark
  20. 20. HADOOP • Distributed Analytics Platform • Easy Abstraction for Datasets • Support in several languages • Streaming • Machine Learning • Graph • Integrated SQL Queries • Has Generalized DAG execution All in one package And it uses Akka
  21. 21. Most Active OSS In Big Data
  22. 22. Apache Spark - Easy to Use API
 Returns the top (k) highest temps for any location in the year:
 def topK(aggregate: Seq[Double]): Seq[Double] =
   sc.parallelize(aggregate).top(k).toSeq
 Returns the top (k) highest temps … in a Future:
 def topK(aggregate: Seq[Double]): Future[Seq[Double]] =
   sc.parallelize(aggregate).sortBy(x => x, ascending = false).takeAsync(k)
  23. 23. Spark Shell: use the Spark Shell to quickly try out code samples. Available in Scala and Python (PySpark)
  24. 24. Collection To RDD
 scala> val data = Array(1, 2, 3, 4, 5)
 data: Array[Int] = Array(1, 2, 3, 4, 5)
 
 scala> val distributedData = sc.parallelize(data)
 distributedData: spark.RDD[Int] = spark.ParallelCollection@10d13e3e
  25. 25. Not Just MapReduce
  26. 26. Spark Basic Word Count
 val conf = new SparkConf()
   .setMaster(host).setAppName(app)
 
 val sc = new SparkContext(conf)
 sc.textFile(words)
   .flatMap(_.split("\\s+"))            // split on whitespace (the backslashes were lost in the transcript)
   .map(word => (word.toLowerCase, 1))
   .reduceByKey(_ + _)
   .collect
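The same flatMap, map and reduce-by-key pipeline can be tried without a cluster. The following is a minimal sketch in plain Scala collections, not Spark code: `LocalWordCount` is a hypothetical name, and `groupBy` stands in for the shuffle that `reduceByKey` performs across partitions.

```scala
object LocalWordCount {
  /** Counts lowercase words, mirroring flatMap -> map -> reduceByKey on a local Seq. */
  def count(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split("\\s+"))            // split each line on whitespace
      .filter(_.nonEmpty)                  // drop empty tokens from leading whitespace
      .map(word => (word.toLowerCase, 1))  // pair each word with a count of 1
      .groupBy(_._1)                       // local stand-in for the shuffle
      .map { case (word, pairs) => word -> pairs.map(_._2).sum }
}
```

Calling `LocalWordCount.count(Seq("Spark spark", "kafka"))` yields a map with `spark` counted twice and `kafka` once.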
  27. 27. RDDs Can be Generated from a Variety of Sources Textfiles Scala Collections
  28. 28. RDD Operations: Transformation, Action
  29. 29. Setting up C* and Spark • DSE > 4.5.0: just start your nodes with dse cassandra -k • Apache Cassandra: follow the excellent guide by Al Tobey http://tobert.github.io/post/2014-07-15-installing-cassandra-spark-stack.html
  30. 30. When Batch Is Not Enough
  31. 31. Your Data Is Like Candy. Delicious: you want it now
  32. 32. Your Data Is Like Candy. Delicious: you want it now • Batch Analytics: analysis after data has accumulated. Decreases the weight of the data by the time it is processed • Streaming Analytics: analytics as data arrives. The data won’t be stale and neither will our analytics • Both in same app = Lambda
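One way to picture "both in same app" is a query that combines a precomputed batch view with the increments that have streamed in since the last batch run. The sketch below is plain Scala under that assumption; `LambdaView` and its maps are hypothetical and are not part of the talk's code.

```scala
object LambdaView {
  /**
   * Merges a precomputed batch view with the real-time increments
   * that arrived after the last batch run.
   */
  def merge(batch: Map[String, Long], realtime: Map[String, Long]): Map[String, Long] =
    realtime.foldLeft(batch) { case (acc, (key, delta)) =>
      // add the streaming delta to whatever the batch layer already knows
      acc.updated(key, acc.getOrElse(key, 0L) + delta)
    }
}
```

Keys seen only by the streaming side appear in the merged view with their streaming count, so fresh data is visible before the next batch run.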
  33. 33. Spark Streaming • I want results continuously in the event stream • I want to run computations in my event-driven async apps • Exactly once message guarantees
  34. 34. DStream (Discretized Stream): continuous stream of micro batches • RDD (time 0 to time 1), RDD (time 1 to time 2), RDD (time 2 to time 3) • A transformation on a DStream = transformations on its RDDs • Complex processing models with minimal effort • Streaming computations on small time intervals
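The idea of discretizing a stream into micro batches can be sketched with plain Scala collections. This is an illustration only, not how Spark Streaming is implemented: `MicroBatches`, `Event` and `discretize` are hypothetical names, and timestamps are assumed to be non-negative milliseconds.

```scala
object MicroBatches {
  final case class Event(timestampMs: Long, value: String)

  /** Groups events into consecutive batches of `intervalMs`, keyed by batch start time. */
  def discretize(events: Seq[Event], intervalMs: Long): Seq[(Long, Seq[String])] = {
    require(intervalMs > 0, "batch interval must be positive")
    events
      .groupBy(e => (e.timestampMs / intervalMs) * intervalMs) // start of the batch the event falls into
      .toSeq
      .sortBy(_._1)                                            // batches in time order
      .map { case (start, batch) => start -> batch.sortBy(_.timestampMs).map(_.value) }
  }
}
```

Each resulting batch plays the role of one RDD in the DStream: a transformation applied to the stream is applied to every batch in turn.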
  35. 35. Basic Streaming: FileInputDStream
 val conf = new SparkConf().setMaster(SparkMaster).setAppName(AppName)
 val ssc = new StreamingContext(conf, Milliseconds(500)) // the batch streaming interval
 ssc.textFileStream("s3n://raw_data_bucket/")
   .flatMap(_.split("\\s+"))
   .map(_.toLowerCase)
   .countByValue()
   .saveToCassandra(keyspace,table)
 ssc.checkpoint(checkpointDir)
 ssc.start() // starts the streaming application piping raw incoming data to a Sink
 ssc.awaitTermination
  36. 36. ReceiverInputDStreams • DStreams - the stream of raw data received from streaming sources: • Basic Source - in the StreamingContext API • Advanced Source - in external modules and separate Spark artifacts • Receivers: • Reliable Receivers - for data sources supporting acks (like Kafka) • Unreliable Receivers - for data sources not supporting acks
  37. 37. Spark Streaming External Source/Sink
  38. 38. Streaming Window Operations
 kvStream
   .map { case (k,v) => (k,v.value) }
   .reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))
   .saveToCassandra(keyspace,table)
 Window Length: duration of the window = 30 s
 Sliding Interval: interval at which the window operation is performed = every 10 s
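A 30 s window sliding every 10 s over 10 s batches covers three batches and advances one batch at a time. The sketch below imitates that with plain Scala collections. `WindowedCounts` is a hypothetical helper, and unlike Spark it emits nothing for the leading partial windows when there are at least `windowBatches` batches.

```scala
object WindowedCounts {
  /**
   * Sums per-key counts over a sliding window of `windowBatches` batches,
   * advancing `slideBatches` batches at a time.
   */
  def reduceByKeyAndWindow(
      batches: Seq[Map[String, Int]],
      windowBatches: Int,
      slideBatches: Int): Seq[Map[String, Int]] =
    batches
      .sliding(windowBatches, slideBatches)
      .map { window =>
        window
          .flatMap(_.toSeq)                 // all (key, count) pairs inside the window
          .groupBy(_._1)
          .map { case (k, kvs) => k -> kvs.map(_._2).sum }
      }
      .toList
}
```

With four batches and a window of three, the function returns two windowed results, one per slide.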
  39. 39. Scale Apache Cassandra • Scales Linearly to as many nodes as you need • Scales whenever you need
  40. 40. Performance Apache Cassandra • It’s Fast • Built to sustain massive data insertion rates in irregular pattern spikes
  41. 41. Fault Tolerance & Availability Apache Cassandra • Automatic Replication • Multi Datacenter • Decentralized - no single point of failure • Survive regional outages • New nodes automatically add themselves to the cluster • DataStax drivers automatically discover new nodes
  42. 42. Fault Tolerance & Replication: How many copies of the data should exist in the cluster? ReplicationFactor=3 [Diagram: nodes in the US-East and Europe data centers, labeled with the replica sets ABC, ABD, ACD and BCD for data A, B, C, D]
  43. 43. Fault Tolerance & Replication: How many copies of the data should exist in the cluster? ReplicationFactor=3 [Diagram: the same Cassandra Cluster spanning US-East and Europe, with replica sets ABC, ABD, ACD and BCD for data A, B, C, D]
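The replica sets in the diagram can be approximated with a simple placement rule: the owner of a row plus the next nodes clockwise on the ring until the replication factor is reached. This is a simplified sketch in plain Scala, assuming a single data center; multi-data-center placement as drawn on the slide follows additional rules, and `ReplicaPlacement` is a hypothetical name.

```scala
object ReplicaPlacement {
  /** Returns the `rf` distinct nodes holding a row whose primary owner sits at `ownerIndex` on the ring. */
  def replicas(ring: Vector[String], ownerIndex: Int, rf: Int): Vector[String] = {
    require(ownerIndex >= 0 && ownerIndex < ring.size, "owner must be a node on the ring")
    require(rf >= 1 && rf <= ring.size, "replication factor must be between 1 and the ring size")
    // walk clockwise from the owner, wrapping around the end of the ring
    Vector.tabulate(rf)(i => ring((ownerIndex + i) % ring.size))
  }
}
```

For a four-node ring `A, B, C, D` and `rf = 3`, a row owned by `C` is also stored on `D` and `A`.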
  44. 44. Strategies Apache Cassandra • Consensus - Paxos Protocol • Sequential Read / Write - Timeseries • Tunable Consistency • Gossip: Did you hear node 1 was down??
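Tunable consistency comes down to simple arithmetic: a quorum is a majority of the replicas, and a read is guaranteed to see the latest acknowledged write when the read and write replica counts together exceed the replication factor. A small sketch in plain Scala, with hypothetical names:

```scala
object Consistency {
  /** Number of replicas in a quorum for a given replication factor. */
  def quorum(rf: Int): Int = rf / 2 + 1

  /** True when any read set must overlap any write set (R + W > RF). */
  def overlaps(readReplicas: Int, writeReplicas: Int, rf: Int): Boolean =
    readReplicas + writeReplicas > rf
}
```

With `rf = 3`, quorum reads and quorum writes touch two replicas each and always overlap, while reading and writing at a single replica do not.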
  45. 45. Architecture Apache Cassandra • Distributed, Masterless Ring Architecture • Network Topology Aware • Flexible, Schemaless - your data structure can evolve seamlessly over time
  46. 46. C* At CERN: Large Hadron Collider • ATLAS - Largest of several detectors along the Large Hadron Collider • Measures particle production when protons collide at a very high center of mass energy - Bursty traffic - Volume of data from sensors requires a very large trigger and data acquisition system - 30,000 applications on 2,000 nodes
  47. 47. Genetics / Biological Computations
  48. 48. IoT
  49. 49. CQL - Easy • Familiar syntax • Many Tools & Drivers • Many Languages • Friendly to programmers • Paxos for locking
 CREATE TABLE users (
   username varchar,
   firstname varchar,
   lastname varchar,
   email list<varchar>,
   password varchar,
   created_date timestamp,
   PRIMARY KEY (username)
 );
 INSERT INTO users (username, firstname, lastname, email, password, created_date)
 VALUES ('hedelson','Helena','Edelson', ['helena.edelson@datastax.com'],'ba27e03fd95e507daf2937c937d499ab','2014-11-15 13:50:00')
 IF NOT EXISTS;
  50. 50. Timeseries Data: C* Clustering Columns. Writes by most recent; reads return most recent first. Cassandra will automatically sort by most recent for both write and read.
 CREATE TABLE weather.raw_data (
   wsid text, year int, month int, day int, hour int,
   temperature double, dewpoint double, pressure double,
   wind_direction int, wind_speed double, one_hour_precip double,
   PRIMARY KEY ((wsid), year, month, day, hour)
 ) WITH CLUSTERING ORDER BY (year DESC, month DESC, day DESC, hour DESC);
  51. 51. val multipleStreams = (1 to numDstreams).map { i =>
   streamingContext.receiverStream[HttpRequest](new HttpReceiver(port))
 }
 streamingContext.union(multipleStreams)
   .map { httpRequest => TimelineRequestEvent(httpRequest)}
   .saveToCassandra("requests_ks", "timeline")
 
 A record of every event, in order in which it happened, per URL:
 CREATE TABLE IF NOT EXISTS requests_ks.timeline (
   timesegment bigint, url text, t_uuid timeuuid, method text,
   headers map <text, text>, body text,
   PRIMARY KEY ((url, timesegment) , t_uuid)
 );
 timeuuid protects from simultaneous events over-writing one another. timesegment protects from writing unbounded partitions.
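The slide does not show how `timesegment` is derived. One common approach, assumed here, is to truncate the event time to a fixed-size bucket so that each `(url, timesegment)` partition holds a bounded span of events. `TimeSegment` and the hourly bucket size are hypothetical choices for illustration.

```scala
object TimeSegment {
  val HourMs: Long = 60L * 60L * 1000L

  /** Truncates an epoch-millisecond timestamp to the start of its bucket. */
  def of(epochMs: Long, bucketMs: Long = HourMs): Long = {
    require(bucketMs > 0, "bucket size must be positive")
    Math.floorDiv(epochMs, bucketMs) * bucketMs
  }
}
```

Two requests to the same URL within the same hour land in the same partition, and the next hour starts a new one.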
  52. 52. Spark Cassandra Connector
  53. 53. Spark Cassandra Connector •NOSQL JOINS! •Write & Read data between Spark and Cassandra •Compatible with Spark 1.3 •Handles Data Locality for Speed •Implicit type conversions •Server-Side Filtering - SELECT, WHERE, etc. •Natural Timeseries Integration https://github.com/datastax/spark-cassandra-connector
  54. 54. Spark Cassandra Connector [Diagram labels: Spark Executor, User Application, Spark-Cassandra Connector, C* Driver, Cassandra (C*) nodes]
  55. 55. Writing and Reading
 SparkContext: import com.datastax.spark.connector._
 StreamingContext: import com.datastax.spark.connector.streaming._
  56. 56. Write from Spark to Cassandra
 sc.parallelize(Seq(0,1,2)).saveToCassandra("keyspace", "raw_data") // callouts: SparkContext, Keyspace, Table
 Spark RDD JOIN with NOSQL!
 predictionsRdd.join(music).saveToCassandra("music", "predictions")
  57. 57. Read From C* to Spark: Server-Side Column and Row Filtering
 val rdd = sc.cassandraTable("github", "commits") // SparkContext, Keyspace, Table; returns CassandraRDD[CassandraRow]
   .select("user","count","year","month")
   .where("commits >= ? and year = ?", 1000, 2015)
  58. 58. Rows: Custom Objects
 val rdd = ssc.cassandraTable[MonthlyCommits]("github", "commits_aggregate") // StreamingContext, Keyspace, Table
   .where("user = ? and project_name = ? and year = ?",
     "helena", "spark-cassandra-connector", 2015)
  59. 59. Rows
 val tuplesRdd = sc.cassandraTable[(Int,Date,String)](db, tweetsTable)
   .select("cluster_id","time", "cluster_name")
   .where("time > ? and time < ?",
     "2014-07-12 20:00:01", "2014-07-12 20:00:03")
 
 val rdd = ssc.cassandraTable[MyDataType]("stats", "clustering_time")
   .where("key = 1").limit(10).collect
 
 val rdd = ssc.cassandraTable[(Int,DateTime,String)]("stats", "clustering_time")
   .where("key = 1").withDescOrder.collect
  60. 60. Cassandra User Defined Types CREATE TYPE address ( street text, city text, zip_code int, country text, cross_streets set<text> ); UDT = Your Custom Field Type In Cassandra
  61. 61. Cassandra UDT’s With JSON
 {
   "productId": 2,
   "name": "Kitchen Table",
   "price": 249.99,
   "description" : "Rectangular table with oak finish",
   "dimensions": { "units": "inches", "length": 50.0, "width": 66.0, "height": 32 },
   "categories": {
     "Home Furnishings": { "catalogPage": 45, "url": "/home/furnishings" },
     "Kitchen Furnishings": { "catalogPage": 108, "url": "/kitchen/furnishings" }
   }
 }
 CREATE TYPE dimensions ( units text, length float, width float, height float );
 CREATE TYPE category ( catalogPage int, url text );
 CREATE TABLE product (
   productId int, name text, price float, description text,
   dimensions frozen <dimensions>,
   categories map <text, frozen <category>>,
   PRIMARY KEY (productId)
 );
  62. 62. Data Locality ● Spark asks an RDD for a list of its partitions (splits) ● Each split consists of one or more token-ranges ● For every partition: ● Spark asks RDD for a list of preferred nodes to process on ● Spark creates a task and sends it to one of the nodes for execution
 Every Spark task uses a CQL-like query to fetch data for the given token range:
 SELECT "key", "value"
 FROM "test"."kv"
 WHERE
   token("key") > 595597420921139321 AND
   token("key") <= 595597431194200132 ALLOW FILTERING
  63. 63. Cassandra Locates a Row Based on Partition Key and Token Range: All of the rows in a Cassandra Cluster are stored based on their location in the Token Range.
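The per-split token-range query from the Data Locality slide can be illustrated by cutting a token range into contiguous sub-ranges and rendering one query per sub-range. This is a simplified sketch in plain Scala: the real connector sizes its splits differently, and `TokenRangeSplits`, `split` and `query` are hypothetical names.

```scala
object TokenRangeSplits {
  /** Splits (start, end] into `n` contiguous (lower, upper] sub-ranges. */
  def split(start: BigInt, end: BigInt, n: Int): Seq[(BigInt, BigInt)] = {
    require(n > 0 && end > start, "need a non-empty range and at least one split")
    val bounds = (0 to n).map(i => start + (end - start) * i / n)
    bounds.zip(bounds.tail) // adjacent bounds form the sub-ranges
  }

  /** Renders the token-range query one task would run for a sub-range. */
  def query(keyspace: String, table: String, pk: String, range: (BigInt, BigInt)): String =
    s"""SELECT * FROM "$keyspace"."$table" WHERE token("$pk") > ${range._1} AND token("$pk") <= ${range._2}"""
}
```

Because every sub-range is exclusive at the lower bound and inclusive at the upper bound, each token is read by exactly one task.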
  64. 64. Cassandra Locates a Row Based on Partition Key and Token Range: Each of the Nodes in a Cassandra Cluster is primarily responsible for one set of Tokens. [Diagram: token ring from 0 to 999 with nodes New York City/Manhattan (Helena), Warsaw (Piotr & Jacek), San Francisco (Brian, Russell & Alex) and St. Petersburg (Artem)]
  65. 65. Cassandra Locates a Row Based on Partition Key and Token Range: Each of the Nodes in a Cassandra Cluster is primarily responsible for one set of Tokens. [Diagram: token ring from 0 to 999 with nodes New York City, Warsaw, San Francisco and St. Petersburg; token ranges shown: 750 - 99, 350 - 749, 100 - 349]
  66. 66. Jacek 514 Red The CQL Schema designates at least one column to be the Partition Key. New York City Warsaw San Francisco Cassandra Locates a Row Based on Partition Key and Token Range St. Petersburg
  67. 67. Helena 514 Red The hash of the Partition Key tells us where a row should be stored. New York City Warsaw San Francisco Cassandra Locates a Row Based on Partition Key and Token Range St. Petersburg
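The lookup these slides describe, hashing the partition key to a token and finding the node that owns that token, can be sketched on a toy ring. Everything here is an assumption for illustration: the 0 to 999 token space echoes the diagram, `String.hashCode` stands in for Cassandra's real partitioner, and the node names and range boundaries are hypothetical.

```scala
object TokenRing {
  // (inclusive upper token bound, node) in ascending order; toy 0-999 token space
  val ring: Vector[(Int, String)] =
    Vector(249 -> "node-1", 499 -> "node-2", 749 -> "node-3", 999 -> "node-4")

  /** Toy partitioner: maps a partition key to a token in 0..999. */
  def token(partitionKey: String): Int = Math.floorMod(partitionKey.hashCode, 1000)

  /** Finds the node whose range contains the token `t`. */
  def ownerOf(t: Int): String =
    ring.find { case (upper, _) => t <= upper }.map(_._2).getOrElse(ring.head._2)

  /** Hashes the partition key, then looks up the owning node. */
  def locate(partitionKey: String): String = ownerOf(token(partitionKey))
}
```

The same partition key always hashes to the same token, so every read and write for that key is routed to the same owner.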
  68. 68. Amsterdam Spark Executor The C* Driver pages spark.cassandra.input.page.row.size CQL rows at a time SELECT * FROM keyspace.table WHERE pk = The Spark Executor uses the Connector to Pull Rows from the Local Cassandra Instance
  69. 69. Amsterdam Spark Executor (Superman) DataStax Enterprise SELECT * FROM keyspace.table WHERE solr_query = 'title:b' AND token(pk) > 780 and token(pk) <= 830 Tokens 780 - 830 DataStax Enterprise Enables This Same Machinery 
 with Solr Pushdown
  70. 70. Composable Pipelines With Spark, Kafka & Cassandra
  71. 71. Spark SQL with Cassandra
 import org.apache.spark.sql.cassandra.CassandraSQLContext
 val cc = new CassandraSQLContext(sparkContext)
 cc.setKeyspace(keyspaceName)
 cc.sql("""
   SELECT table1.a, table1.b, table1.c, table2.a
   FROM table1 AS table1
   JOIN table2 AS table2 ON table1.a = table2.a
   AND table1.b = table2.b
   AND table1.c = table2.c
   """)
   .map(Data(_))
   .saveToCassandra(keyspace1, table3)
  72. 72. Spark SQL with Cassandra & JSON
 val sql = new SQLContext(sparkContext)
 val json = Seq(
   """{"user":"helena","commits":98, "month":3, "year":2015}""",
   """{"user":"jacek-lewandowski", "commits":72, "month":3, "year":2015}""",
   """{"user":"pkolaczk", "commits":42, "month":3, "year":2015}""")
 // write (jsonRDD expects an RDD[String], so the Seq is parallelized first)
 sql.jsonRDD(sparkContext.parallelize(json))
   .map(CommitStats(_))
   .flatMap(compute)
   .saveToCassandra("stats","monthly_commits")
 // read
 val rdd = sparkContext.cassandraTable[MonthlyCommits]("stats","monthly_commits")
 cqlsh> CREATE TABLE github_stats.commits_aggr(user VARCHAR PRIMARY KEY, commits INT…);
  73. 73. Spark Streaming, Kafka, C* and JSON
 cqlsh> select * from github_stats.commits_aggr;
 
  user              | commits | month | year
 -------------------+---------+-------+------
  pkolaczk          |      42 |     3 | 2015
  jacek-lewandowski |      43 |     3 | 2015
  helena            |      98 |     3 | 2015
 
 (3 rows)
 
 KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
   ssc, kafkaParams, topicMap, StorageLevel.MEMORY_ONLY)
   .map { case (_,json) => JsonParser.parse(json).extract[MonthlyCommits]}
   .saveToCassandra("github_stats","commits_aggr")
  74. 74. Kafka Streaming Word Count sparkConf.set("spark.cassandra.connection.host", "10.20.3.45")
 val streamingContext = new StreamingContext(sparkConf, Seconds(30))
 KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
 streamingContext, kafkaParams, topicMap, StorageLevel.MEMORY_ONLY) .map(_._2) .countByValue() .saveToCassandra("my_keyspace","wordcount")
  75. 75. Spark Streaming, Twitter & Cassandra /** Cassandra is doing the sorting for you here. */
 TwitterUtils.createStream( ssc, auth, tags, StorageLevel.MEMORY_ONLY_SER_2)
 .flatMap(_.getText.toLowerCase.split("""\s+"""))
 .filter(tags.contains(_))
 .countByValueAndWindow(Seconds(5), Seconds(5))
 .transform((rdd, time) => rdd.map { case (term, count) => (term, count, now(time))})
 .saveToCassandra(keyspace, table) CREATE TABLE IF NOT EXISTS keyspace.table (
 topic text, interval text, mentions counter,
 PRIMARY KEY(topic, interval)
 ) WITH CLUSTERING ORDER BY (interval DESC)
  76. 76. Spark MLlib [Diagram labels: Your Data, Extract Data To Analyze, Training Data, Test Data, Feature Extraction, Model Training, Model Testing, Train your model to predict]
  77. 77. Spark Streaming ML, Kafka & C*
 val ssc = new StreamingContext(new SparkConf()…, Seconds(5))
 val testData = ssc.cassandraTable[String](keyspace,table).map(LabeledPoint.parse)
 
 val trainingStream = KafkaUtils.createStream[K, V, KDecoder, VDecoder](
   ssc, kafkaParams, topicMap, StorageLevel.MEMORY_ONLY)
   .map(_._2).map(LabeledPoint.parse)
 trainingStream.saveToCassandra("ml_keyspace", "raw_training_data")
 
 val model = new StreamingLinearRegressionWithSGD()
   .setInitialWeights(Vectors.dense(weights))
 model.trainOn(trainingStream) // trainOn returns Unit, so it is called on the model rather than assigned
 
 // Making predictions on testData
 model
   .predictOnValues(testData.map(lp => (lp.label, lp.features)))
   .saveToCassandra("ml_keyspace", "predictions")
  78. 78. KillrWeather • Global sensors & satellites collect data • Cassandra stores in sequence • Application reads in sequence Apache Cassandra
  79. 79. Data model should look like your queries
  80. 80. Design Data Model to support queries: • Store raw data per ID • Store time series data in order: most recent to oldest • Compute and store aggregate data in the stream • Set TTLs on historic data. Queries I Need: • Get data by ID • Get data for a single date and time • Get data for a window of time • Compute, store and retrieve daily, monthly, annual aggregations
  81. 81. Data Model • Weather Station Id and Time are unique • Store as many as needed
 CREATE TABLE daily_temperature (
   weather_station text, year int, month int, day int, hour int, temperature double,
   PRIMARY KEY (weather_station,year,month,day,hour)
 );
 INSERT INTO daily_temperature(weather_station,year,month,day,hour,temperature) VALUES ('10010:99999',2005,12,1,7,-5.6);
 INSERT INTO daily_temperature(weather_station,year,month,day,hour,temperature) VALUES ('10010:99999',2005,12,1,8,-5.1);
 INSERT INTO daily_temperature(weather_station,year,month,day,hour,temperature) VALUES ('10010:99999',2005,12,1,9,-4.9);
 INSERT INTO daily_temperature(weather_station,year,month,day,hour,temperature) VALUES ('10010:99999',2005,12,1,10,-5.3);
  82. 82. class HttpNodeGuardian extends ClusterAwareNodeGuardianActor {
 cluster.joinSeedNodes(Vector(..)) 
 context.actorOf(BalancingPool(PoolSize).props(Props( new KafkaPublisherActor(KafkaHosts, KafkaBatchSendSize))))
 
 Cluster(context.system) registerOnMemberUp { context.actorOf(BalancingPool(PoolSize).props(Props( new HttpReceiverActor(KafkaHosts, KafkaBatchSendSize)))) } def initialized: Actor.Receive = { … } 
 } Load-Balanced Data Ingestion
  83. 83. class HttpDataIngestActor(kafka: ActorRef) extends Actor with ActorLogging {
 implicit val system = context.system
 implicit val askTimeout: Timeout = settings.timeout
 implicit val materializer = ActorFlowMaterializer(
 ActorFlowMaterializerSettings(system))
 
 val requestHandler: HttpRequest => HttpResponse = {
 case HttpRequest(HttpMethods.POST, Uri.Path("/weather/data"), headers, entity, _) =>
 headers.toSource collect { case s: Source =>
 kafka ! KafkaMessageEnvelope[String, String](topic, group, s.data:_*)
 HttpResponse(200, entity = HttpEntity(MediaTypes.`text/html`))
 }.getOrElse(HttpResponse(404, entity = "Unsupported request"))
 case _: HttpRequest =>
 HttpResponse(400, entity = "Unsupported request")
 }
 
 Http(system).bind(HttpHost, HttpPort).map { case connection =>
 log.info("Accepted new connection from " + connection.remoteAddress)
 connection.handleWithSyncHandler(requestHandler) } 
 def receive : Actor.Receive = {
 case e =>
 }
 } Client: HTTP Receiver Akka Actor
  84. 84. class KafkaProducerActor[K, V](config: ProducerConfig) extends Actor {
 
 override val supervisorStrategy =
 OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) {
 case _: ActorInitializationException => Stop
 case _: FailedToSendMessageException => Restart case _: ProducerClosedException => Restart case _: NoBrokersForPartitionException => Escalate case _: KafkaException => Escalate
 case _: Exception => Escalate
 } 
 private val producer = new KafkaProducer[K, V](config)
 
 override def postStop(): Unit = producer.close() 
 def receive = {
 case e: KafkaMessageEnvelope[K,V] => producer.send(e)
 }
 } Client: Kafka Producer Akka Actor
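The supervision policy above can be read as a function from failure to directive, plus a retry budget. The sketch below models that logic in plain Scala without Akka: the directive objects, the exception classes and `decide` are hypothetical stand-ins, and the time window (`withinTimeRange`) is left out for brevity.

```scala
object SupervisionSketch {
  sealed trait Directive
  case object Stop extends Directive
  case object Restart extends Directive
  case object Escalate extends Directive

  // Hypothetical stand-ins for the exceptions named on the slide.
  class InitializationFailed(msg: String) extends RuntimeException(msg)
  class FailedToSend(msg: String) extends RuntimeException(msg)
  class NoBrokers(msg: String) extends RuntimeException(msg)

  /** Maps a failure to a directive; the most specific cases come first. */
  val decider: Throwable => Directive = {
    case _: InitializationFailed => Stop
    case _: FailedToSend         => Restart
    case _: NoBrokers            => Escalate
    case _                       => Escalate
  }

  /** Applies the retry budget: once `maxRetries` restarts are used up, the child is stopped. */
  def decide(failure: Throwable, restartsSoFar: Int, maxRetries: Int): Directive =
    decider(failure) match {
      case Restart if restartsSoFar >= maxRetries => Stop
      case other                                  => other
    }
}
```

Recoverable send failures restart the producer until the budget runs out, while failures the actor cannot fix on its own are passed up to its parent.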
  85. 85. Store raw data on ingestion
  86. 86. val kafkaStream = KafkaUtils.createStream[K, V, KDecoder, VDecoder] (ssc, kafkaParams, topicMap, StorageLevel.DISK_ONLY_2)
 .map(transform)
 .map(RawWeatherData(_))
 
 /** Saves the raw data to Cassandra. */
 kafkaStream.saveToCassandra(keyspace, raw_ws_data)
 
 /** Now proceed with computations from the same stream.. */
 kafkaStream…
 Store Raw Data From Kafka Stream To C*: Now we can replay on failure for later computation, etc
  87. 87. Let’s See Our Data Model Again
 CREATE TABLE weather.raw_data (
   wsid text, year int, month int, day int, hour int,
   temperature double, dewpoint double, pressure double,
   wind_direction int, wind_speed double, one_hour_precip double,
   PRIMARY KEY ((wsid), year, month, day, hour)
 ) WITH CLUSTERING ORDER BY (year DESC, month DESC, day DESC, hour DESC);
 
 CREATE TABLE daily_aggregate_precip (
   wsid text,
   year int,
   month int,
   day int,
   precipitation counter,
   PRIMARY KEY ((wsid), year, month, day)
 ) WITH CLUSTERING ORDER BY (year DESC, month DESC, day DESC);
  88. 88. Efficient Stream Computation • Gets the partition key: Data Locality • Spark C* Connector feeds this to Spark • Cassandra Counter column in our schema, no expensive `reduceByKey` needed. Simply let C* do it: not expensive and fast.
 class KafkaStreamingActor(kafkaParams: Map[String, String], ssc: StreamingContext, settings: WeatherSettings) extends AggregationActor {
 import settings._ 
 val kafkaStream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
 ssc, kafkaParams, Map(KafkaTopicRaw -> 1), StorageLevel.DISK_ONLY_2)
 .map(_._2.split(","))
 .map(RawWeatherData(_))
 
 kafkaStream.saveToCassandra(CassandraKeyspace, CassandraTableRaw)
 /** RawWeatherData: wsid, year, month, day, oneHourPrecip */
 kafkaStream.map(hour => (hour.wsid, hour.year, hour.month, hour.day, hour.oneHourPrecip))
 .saveToCassandra(CassandraKeyspace, CassandraTableDailyPrecip)
 
 /** Now the [[StreamingContext]] can be started. */
 context.parent ! OutputStreamInitialized
 
 def receive : Actor.Receive = {…} }
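The counter-column idea is that each hourly reading is written as an increment to the row for its day, so the database accumulates the daily total and no `reduceByKey` is needed in the stream. The sketch below models that accumulation in plain Scala. `DailyPrecipCounter` is a hypothetical name, and because Cassandra counters hold 64-bit integers, the readings are assumed to be expressed in whole units such as tenths of a millimetre.

```scala
object DailyPrecipCounter {
  type Key = (String, Int, Int, Int) // wsid, year, month, day

  /** Applies one reading as an increment, the way a counter column update would. */
  def increment(counters: Map[Key, Long], key: Key, delta: Long): Map[Key, Long] =
    counters.updated(key, counters.getOrElse(key, 0L) + delta)

  /** Folds a stream of hourly readings into per-day totals. */
  def applyAll(readings: Seq[(Key, Long)]): Map[Key, Long] =
    readings.foldLeft(Map.empty[Key, Long]) { case (acc, (k, d)) => increment(acc, k, d) }
}
```

Each write is independent of the others, which is why the stream only has to map readings to `(wsid, year, month, day, value)` tuples before saving them.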
 89. /** For a given weather station, calculates annual cumulative precip - or year to date. */
 class PrecipitationActor(ssc: StreamingContext, settings: WeatherSettings) extends AggregationActor {
   import akka.pattern.pipe // needed for `pipeTo`

   def receive : Actor.Receive = {
     case GetPrecipitation(wsid, year) => cumulative(wsid, year, sender)
     case GetTopKPrecipitation(wsid, year, k) => topK(wsid, year, k, sender)
   }

   /** Computes annual aggregation. Precipitation values are 1 hour deltas from the previous. */
   def cumulative(wsid: String, year: Int, requester: ActorRef): Unit =
     ssc.cassandraTable[Double](keyspace, dailytable)
       .select("precipitation")
       .where("wsid = ? AND year = ?", wsid, year)
       .collectAsync()
       .map(AnnualPrecipitation(_, wsid, year)) pipeTo requester

   /** Returns the `k` highest daily precipitation values for the given station in the `year`. */
   def topK(wsid: String, year: Int, k: Int, requester: ActorRef): Unit = {
     val toTopK = (aggregate: Seq[Double]) => TopKPrecipitation(wsid, year,
       ssc.sparkContext.parallelize(aggregate).top(k).toSeq)

     ssc.cassandraTable[Double](keyspace, dailytable)
       .select("precipitation")
       .where("wsid = ? AND year = ?", wsid, year)
       .collectAsync().map(toTopK) pipeTo requester
   }
 }
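
The two computations above can be sketched on a plain Scala collection, without Spark or Akka. This assumes `AnnualPrecipitation` sums the daily values it is given, which the slides do not show; `PrecipitationSketch` and its method names are hypothetical.

```scala
object PrecipitationSketch {
  // Assumed meaning of the annual aggregate: the sum of the daily totals for the year.
  def cumulative(daily: Seq[Double]): Double = daily.sum

  // Same result shape as RDD.top(k): the k largest values, largest first.
  def topK(daily: Seq[Double], k: Int): Seq[Double] =
    daily.sortWith(_ > _).take(k)

  def main(args: Array[String]): Unit = {
    val daily = Seq(0.5, 2.0, 0.25, 1.5)
    println(cumulative(daily))
    println(topK(daily, 2))
  }
}
```

In the actor, the same logic runs on the values returned by `collectAsync()`, and the result is piped to the requester once the future completes.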
 90. Efficient Batch Analytics
 C* data is automatically sorted by most recent, due to our data model. Additional Spark or collection sort not needed.

 class TemperatureActor(sc: SparkContext, settings: WeatherSettings) extends AggregationActor {
   import akka.pattern.pipe

   def receive: Actor.Receive = {
     case e: GetMonthlyHiLowTemperature => highLow(e, sender)
   }

   def highLow(e: GetMonthlyHiLowTemperature, requester: ActorRef): Unit =
     sc.cassandraTable[DailyTemperature](keyspace, daily_temperature_aggr)
       .where("wsid = ? AND year = ? AND month = ?", e.wsid, e.year, e.month)
       .collectAsync()
       .map(MonthlyTemperature(_, e.wsid, e.year, e.month)) pipeTo requester
 }
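
As a rough sketch of the monthly high/low step, assuming the daily rows arrive already ordered by most recent day: `DailyTemp`, `MonthlyHiLow` and `TemperatureSketch` are hypothetical, trimmed-down shapes, since the slides do not show the fields of `DailyTemperature` or `MonthlyTemperature`.

```scala
// Hypothetical, simplified shapes for illustration only.
final case class DailyTemp(day: Int, high: Double, low: Double)
final case class MonthlyHiLow(high: Double, low: Double)

object TemperatureSketch {
  // Rows are assumed to arrive newest first, so the latest day is simply the head.
  def mostRecent(daily: Seq[DailyTemp]): Option[DailyTemp] = daily.headOption

  // Monthly high is the largest daily high, monthly low the smallest daily low.
  def highLow(daily: Seq[DailyTemp]): Option[MonthlyHiLow] =
    if (daily.isEmpty) None
    else Some(MonthlyHiLow(
      daily.map(_.high).reduce((a, b) => math.max(a, b)),
      daily.map(_.low).reduce((a, b) => math.min(a, b))))

  def main(args: Array[String]): Unit = {
    val march = Seq(DailyTemp(3, 10.0, 1.0), DailyTemp(2, 12.5, -2.0), DailyTemp(1, 9.0, 0.5))
    println(mostRecent(march))
    println(highLow(march))
  }
}
```

No sort appears anywhere in the sketch: the ordering is taken as given by the table's clustering order.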
 91. @helenaedelson github.com/helena slideshare.net/helenaedelson
 92. Learn More Online and at Cassandra Summit: https://academy.datastax.com/
