Lambda Architecture with Spark Streaming, Kafka, Cassandra, Akka, Scala
- 1. @helenaedelson Helena Edelson Lambda Architecture with Spark Streaming, Kafka, Cassandra, Akka, Scala 1
- 2. Who Is This Person? • Spark Cassandra Connector committer • Akka contributor - 2 new features in Akka Cluster • Big Data & Scala conference speaker • Currently Sr Software Engineer, Analytics @ DataStax • Previously Sr Cloud Engineer at VMware, CrowdStrike, SpringSource… • Prev Spring committer - Spring AMQP, Spring Integration
- 3. Talk Roadmap: What - Lambda Architecture & Delivering Meaning • Why - Spark, Kafka, Cassandra & Akka integration • How - Composable Pipelines (Code). helena.edelson@datastax.com
- 4. I need fast access to historical data on the fly for predictive modeling with real time data from the stream
- 5. Lambda Architecture A data-processing architecture designed to handle massive quantities of data by taking advantage of both batch and stream processing methods. • Spark is one of the few data processing frameworks that allows you to seamlessly integrate batch and stream processing • Of petabytes of data • In the same application
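A minimal sketch (not from the deck) of what "batch and stream in the same application" can look like with Spark: one aggregation function reused for a historical RDD and for every micro-batch of a DStream. The Cassandra host, keyspace, tables, HDFS path and socket source are placeholders.

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.rdd.RDD
  import org.apache.spark.streaming.{Seconds, StreamingContext}
  import com.datastax.spark.connector._
  import com.datastax.spark.connector.streaming._

  val conf = new SparkConf().setAppName("lambda-sketch")
    .set("spark.cassandra.connection.host", "127.0.0.1") // assumed local C*
  val sc  = new SparkContext(conf)
  val ssc = new StreamingContext(sc, Seconds(5))

  // One function, shared by both layers
  def countWords(rdd: RDD[String]): RDD[(String, Long)] =
    rdd.flatMap(_.split("\\s+")).map(w => (w.toLowerCase, 1L)).reduceByKey(_ + _)

  // Batch layer: recompute over the accumulated history (path is a placeholder)
  countWords(sc.textFile("hdfs:///archive/events"))
    .saveToCassandra("lambda", "word_counts_batch")

  // Speed layer: the same function applied to each micro-batch as it arrives
  ssc.socketTextStream("localhost", 9999)
    .transform(countWords _)
    .saveToCassandra("lambda", "word_counts_speed")

  ssc.start()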
- 6. Your Code
- 7. Moving Data Between Systems Is Difficult, Risky and Expensive @helenaedelson
- 8. How Do We Approach This?
- 9. Strategies • Scalable Infrastructure • Partition For Scale • Replicate For Resiliency • Share Nothing • Asynchronous Message Passing • Parallelism • Isolation • Data Locality • Location Transparency
- 10. My Nerdy Chart: Strategy / Technologies
  • Scalable Infrastructure / Elastic: Spark, Cassandra, Kafka
  • Partition For Scale, Network Topology Aware: Cassandra, Spark, Kafka, Akka Cluster
  • Replicate For Resiliency: Spark, Cassandra, Akka Cluster (all hash the node ring)
  • Share Nothing, Masterless: Cassandra, Akka Cluster (both Dynamo style)
  • Fault Tolerance / No Single Point of Failure: Spark, Cassandra, Kafka
  • Replay From Any Point Of Failure: Spark, Cassandra, Kafka, Akka + Akka Persistence
  • Failure Detection: Cassandra, Spark, Akka, Kafka
  • Consensus & Gossip: Cassandra & Akka Cluster
  • Parallelism: Spark, Cassandra, Kafka, Akka
  • Asynchronous Data Passing: Kafka, Akka, Spark
  • Fast, Low Latency, Data Locality: Cassandra, Spark, Kafka
  • Location Transparency: Akka, Spark, Cassandra, Kafka
- 11. Apache Spark • Fast, distributed, scalable and fault tolerant cluster compute system • Enables low-latency, complex analytics • Developed in 2009 at UC Berkeley AMPLab, open sourced in 2010 • Became an Apache project in February 2014
- 12. Apache Kafka • High Throughput Distributed Messaging • Decouples Data Pipelines • Handles Massive Data Load • Supports a Massive Number of Consumers • Distribution & partitioning across cluster nodes • Automatic recovery from broker failures
- 13. Speaking Of Fault Tolerance…
- 14. The one thing in your infrastructure you can always rely on.
- 15. Availability: "During Hurricane Sandy, we lost an entire data center. Completely. Lost. It. Our data in Cassandra never went offline."
- 16. • Massively Scalable • High Performance • Always On • Masterless
- 17. • Fault tolerant • Hierarchical Supervision • Customizable Failure Strategies & Detection • Asynchronous Data Passing • Parallelization - Balancing Pool Routers • Akka Cluster • Adaptive / Predictive • Load-Balanced Across Cluster Nodes
- 18. I’ve used Scala with these every single time.
- 19. • Stream data from Kafka to Cassandra • Stream data from Kafka to Spark and write to Cassandra • Stream from Cassandra to Spark - coming soon! • Read data from Spark/Spark Streaming Source and write to C* • Read data from Cassandra to Spark
- 20. HADOOP vs. Spark • Distributed Analytics Platform • Easy Abstraction for Datasets • Support in several languages • Streaming • Machine Learning • Graph • Integrated SQL Queries • Generalized DAG execution • All in one package • And it uses Akka
- 21. Most Active OSS In Big Data
- 22. Apache Spark - Easy to Use API
  // Returns the top (k) highest temps for any location in the year
  def topK(aggregate: Seq[Double]): Seq[Double] =
    sc.parallelize(aggregate).top(k)

  // Returns the top (k) highest temps … in a Future (async take on a descending sort)
  def topK(aggregate: Seq[Double]): Future[Seq[Double]] =
    sc.parallelize(aggregate).sortBy(identity, ascending = false).takeAsync(k)
- 23. Spark Shell: use the Spark Shell to quickly try out code samples. Available in Scala (spark-shell) and Python (PySpark).
- 24. Collection To RDD
  scala> val data = Array(1, 2, 3, 4, 5)
  data: Array[Int] = Array(1, 2, 3, 4, 5)
  scala> val distributedData = sc.parallelize(data)
  distributedData: spark.RDD[Int] = spark.ParallelCollection@10d13e3e
- 25. Not Just MapReduce
- 26. Spark Basic Word Count
  val conf = new SparkConf().setMaster(host).setAppName(app)
  val sc = new SparkContext(conf)
  sc.textFile(words)
    .flatMap(_.split("\\s+"))
    .map(word => (word.toLowerCase, 1))
    .reduceByKey(_ + _)
    .collect
- 27. RDDs Can Be Generated from a Variety of Sources: text files, Scala collections
- 28. RDD Operations: Transformations and Actions
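A quick illustration of the split, runnable in the Spark shell (where sc is predefined; data.txt is a placeholder file): transformations only build the lineage, the action triggers the job.

  val lines   = sc.textFile("data.txt")   // transformation: lazy, nothing runs yet
  val lengths = lines.map(_.length)       // transformation: still lazy
  val total   = lengths.reduce(_ + _)     // action: schedules and runs the job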
- 29. Setting up C* and Spark. DSE > 4.5.0: just start your nodes with dse cassandra -k. Apache Cassandra: follow the excellent guide by Al Tobey http://tobert.github.io/post/2014-07-15-installing-cassandra-spark-stack.html
- 30. When Batch Is Not Enough
- 31. Your Data Is Like Candy. Delicious: you want it now
- 32. Your Data Is Like Candy. Delicious: you want it now. Batch Analytics: analysis after data has accumulated; the value of the data has decreased by the time it is processed. Streaming Analytics: analytics as data arrives; the data won't be stale and neither will our analytics. Both in the same app = Lambda
- 33. Spark Streaming • I want results continuously in the event stream • I want to run computations in my event-driven async apps • Exactly-once message guarantees
- 34. DStream (Discretized Stream): a continuous stream of micro batches • RDD (time 0 to time 1), RDD (time 1 to time 2), RDD (time 2 to time 3), … • A transformation on a DStream = transformations on its RDDs • Complex processing models with minimal effort • Streaming computations on small time intervals
- 35. Basic Streaming: FileInputDStream
  val conf = new SparkConf().setMaster(SparkMaster).setAppName(AppName)
  // Milliseconds(500) is the batch streaming interval
  val ssc = new StreamingContext(conf, Milliseconds(500))
  ssc.textFileStream("s3n://raw_data_bucket/")
    .flatMap(_.split("\\s+"))
    .map(_.toLowerCase)
    .countByValue()
    .saveToCassandra(keyspace, table)
  ssc.checkpoint(checkpointDir)
  // Starts the streaming application, piping raw incoming data to a Sink
  ssc.start()
  ssc.awaitTermination()
- 36. ReceiverInputDStreams. DStreams - the stream of raw data received from streaming sources: • Basic Source - in the StreamingContext API • Advanced Source - in external modules and separate Spark artifacts. Receivers: • Reliable Receivers - for data sources supporting acks (like Kafka) • Unreliable Receivers - for data sources not supporting acks
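A small sketch of both kinds of source, assuming an existing StreamingContext `ssc`; the topic, consumer group and ZooKeeper address are placeholders. The socket stream ships with the core streaming API; the Kafka receiver lives in the separate spark-streaming-kafka artifact.

  import kafka.serializer.StringDecoder
  import org.apache.spark.storage.StorageLevel
  import org.apache.spark.streaming.kafka.KafkaUtils

  // Basic source: built into the StreamingContext API
  val socketStream = ssc.socketTextStream("localhost", 9999)

  // Advanced source: ack-based Kafka receiver from an external module
  val kafkaParams = Map(
    "zookeeper.connect" -> "localhost:2181", // placeholder
    "group.id"          -> "raw-data-consumers")
  val kafkaStream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, Map("raw_weather" -> 1), StorageLevel.MEMORY_ONLY)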
- 37. Spark Streaming External Source/Sink
- 38. Streaming Window Operations
  kvStream
    .map { case (k, v) => (k, v.value) }
    .reduceByKeyAndWindow((a: Int, b: Int) => (a + b), Seconds(30), Seconds(10))
    .saveToCassandra(keyspace, table)
  // Window Length: Duration of data in each window = 30s
  // Sliding Interval: interval at which the window operation is performed = every 10s
- 39. Scale Apache Cassandra • Scales Linearly to as many nodes as you need • Scales whenever you need
- 40. Performance Apache Cassandra • It’s Fast • Built to sustain massive data insertion rates in irregular pattern spikes
- 41. Fault Tolerance & Availability Apache Cassandra • Automatic Replication • Multi Datacenter • Decentralized - no single point of failure • Survive regional outages • New nodes automatically add themselves to the cluster • DataStax drivers automatically discover new nodes
- 42. Fault Tolerance & Replication. How many copies of the data should exist in the cluster? ReplicationFactor=3. [Ring diagram: nodes A, B, C, D, each holding replicas for its neighbors' ranges, in US-East and Europe]
- 43. Fault Tolerance & Replication (multi-datacenter). A Cassandra cluster spanning US-East and Europe with ReplicationFactor=3: every row is replicated on three nodes. [Ring diagram: nodes A, B, C, D in each datacenter]
- 44. Strategies Apache Cassandra • Consensus - Paxos Protocol • Sequential Read / Write - Timeseries • Tunable Consistency • Gossip: Did you hear node 1 was down??
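Tunable consistency also surfaces on the Spark side: a sketch of setting per-application read and write consistency levels for the connector via configuration (property names from the connector docs of this era; host and levels are examples).

  import org.apache.spark.SparkConf

  val conf = new SparkConf()
    .set("spark.cassandra.connection.host", "127.0.0.1")
    .set("spark.cassandra.input.consistency.level",  "LOCAL_QUORUM") // reads
    .set("spark.cassandra.output.consistency.level", "LOCAL_ONE")    // writes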
- 45. Architecture Apache Cassandra • Distributed, Masterless Ring Architecture • Network Topology Aware • Flexible, Schemaless - your data structure can evolve seamlessly over time
- 46. C* At CERN: Large Hadron Collider • ATLAS - largest of several detectors along the Large Hadron Collider • Measures particle production when protons collide at a very high center of mass energy • Bursty traffic • Volume of data from sensors requires a very large trigger and data acquisition system • 30,000 applications on 2,000 nodes
- 47. Genetics / Biological Computations
- 48. IoT
- 49. CQL - Easy • Familiar syntax • Many Tools & Drivers • Many Languages • Friendly to programmers • Paxos for locking
  CREATE TABLE users (
    username varchar,
    firstname varchar,
    lastname varchar,
    email list<varchar>,
    password varchar,
    created_date timestamp,
    PRIMARY KEY (username)
  );
  INSERT INTO users (username, firstname, lastname, email, password, created_date)
  VALUES ('hedelson', 'Helena', 'Edelson', ['helena.edelson@datastax.com'],
          'ba27e03fd95e507daf2937c937d499ab', '2014-11-15 13:50:00')
  IF NOT EXISTS;
- 50. C* Clustering Columns: Timeseries Data. Writes by most recent, reads return most recent first; Cassandra will automatically sort by most recent for both write and read.
  CREATE TABLE weather.raw_data (
    wsid text, year int, month int, day int, hour int,
    temperature double, dewpoint double, pressure double,
    wind_direction int, wind_speed double, one_hour_precip double,
    PRIMARY KEY ((wsid), year, month, day, hour)
  ) WITH CLUSTERING ORDER BY (year DESC, month DESC, day DESC, hour DESC);
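Because of the DESC clustering order, simply limiting a read returns the most recent hours first. A sketch with the connector (the station id value is a placeholder):

  import com.datastax.spark.connector._

  val latestHours = sc.cassandraTable("weather", "raw_data")
    .where("wsid = ?", "10010:99999")
    .limit(10)       // the 10 most recent hourly rows for that station
    .collect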
- 51. A record of every event, in the order in which it happened, per URL:
  CREATE TABLE IF NOT EXISTS requests_ks.timeline (
    timesegment bigint,
    url text,
    t_uuid timeuuid,
    method text,
    headers map<text, text>,
    body text,
    PRIMARY KEY ((url, timesegment), t_uuid)
  );
  // timeuuid protects simultaneous events from over-writing one another.
  // timesegment protects from writing unbounded partitions.
  val multipleStreams = (1 to numDstreams).map { i =>
    streamingContext.receiverStream[HttpRequest](new HttpReceiver(port))
  }
  streamingContext.union(multipleStreams)
    .map { httpRequest => TimelineRequestEvent(httpRequest) }
    .saveToCassandra("requests_ks", "timeline")
- 52. Spark Cassandra Connector @helenaedelson
- 53. Spark Cassandra Connector •NOSQL JOINS! •Write & Read data between Spark and Cassandra •Compatible with Spark 1.3 •Handles Data Locality for Speed •Implicit type conversions •Server-Side Filtering - SELECT, WHERE, etc. •Natural Timeseries Integration https://github.com/datastax/spark-cassandra-connector
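The "NOSQL JOIN" above refers to joinWithCassandraTable: each element of a Spark RDD is joined against the Cassandra rows that share its partition key. A sketch, assuming `user` is the partition key of github.commits (the case class and sample values are illustrative):

  import com.datastax.spark.connector._

  case class CommitKey(user: String)

  val users  = sc.parallelize(Seq(CommitKey("helena"), CommitKey("pkolaczk")))
  val joined = users.joinWithCassandraTable("github", "commits")
  // joined: RDD[(CommitKey, CassandraRow)] - one pair per matching Cassandra row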
- 54. Spark Cassandra Connector [Architecture diagram: the User Application runs in the Spark Executor; the Spark-Cassandra Connector uses the C* Driver to read from and write to the Cassandra cluster]
- 55. Writing and Reading. SparkContext: import com.datastax.spark.connector._ • StreamingContext: import com.datastax.spark.connector.streaming._
- 56. Write from Spark to Cassandra
  // SparkContext; keyspace, table
  sc.parallelize(Seq(0, 1, 2)).saveToCassandra("keyspace", "raw_data")
  // Spark RDD JOIN with NOSQL!
  predictionsRdd.join(music).saveToCassandra("music", "predictions")
- 57. Read From C* to Spark: server-side column and row filtering
  // SparkContext; keyspace "github", table "commits"; returns CassandraRDD[CassandraRow]
  val rdd = sc.cassandraTable("github", "commits")
    .select("user", "count", "year", "month")
    .where("commits >= ? and year = ?", 1000, 2015)
- 58. Rows: Custom Objects
  // StreamingContext; keyspace "github", table "commits_aggregate"; rows mapped to MonthlyCommits
  val rdd = ssc.cassandraTable[MonthlyCommits]("github", "commits_aggregate")
    .where("user = ? and project_name = ? and year = ?",
      "helena", "spark-cassandra-connector", 2015)
- 59. Rows
  val tuplesRdd = sc.cassandraTable[(Int, Date, String)](db, tweetsTable)
    .select("cluster_id", "time", "cluster_name")
    .where("time > ? and time < ?", "2014-07-12 20:00:01", "2014-07-12 20:00:03")
  val rdd = ssc.cassandraTable[MyDataType]("stats", "clustering_time")
    .where("key = 1").limit(10).collect
  val rdd2 = ssc.cassandraTable[(Int, DateTime, String)]("stats", "clustering_time")
    .where("key = 1").withDescOrder.collect
- 60. Cassandra User Defined Types. UDT = your custom field type in Cassandra
  CREATE TYPE address (
    street text,
    city text,
    zip_code int,
    country text,
    cross_streets set<text>
  );
- 61. Cassandra UDTs With JSON
  CREATE TYPE dimensions (units text, length float, width float, height float);
  CREATE TYPE category (catalogPage int, url text);
  CREATE TABLE product (
    productId int,
    name text,
    price float,
    description text,
    dimensions frozen<dimensions>,
    categories map<text, frozen<category>>,
    PRIMARY KEY (productId)
  );
  {
    "productId": 2,
    "name": "Kitchen Table",
    "price": 249.99,
    "description": "Rectangular table with oak finish",
    "dimensions": { "units": "inches", "length": 50.0, "width": 66.0, "height": 32 },
    "categories": {
      "Home Furnishings":    { "catalogPage": 45,  "url": "/home/furnishings" },
      "Kitchen Furnishings": { "catalogPage": 108, "url": "/kitchen/furnishings" }
    }
  }
- 62. Data Locality • Spark asks an RDD for a list of its partitions (splits) • Each split consists of one or more token ranges • For every partition, Spark asks the RDD for a list of preferred nodes to process on • Spark creates a task and sends it to one of those nodes for execution. Every Spark task uses a CQL-like query to fetch data for the given token range:
  SELECT "key", "value" FROM "test"."kv"
  WHERE token("key") > 595597420921139321 AND token("key") <= 595597431194200132
  ALLOW FILTERING
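A related locality trick in the connector, sketched here: repartitionByCassandraReplica shuffles an RDD of keys so each partition lands on a node that owns a replica of those keys, making the subsequent join a local read (keyspace, table and key values are placeholders).

  import com.datastax.spark.connector._

  case class WeatherKey(wsid: String)

  sc.parallelize(Seq(WeatherKey("10010:99999"), WeatherKey("72075:99999")))
    .repartitionByCassandraReplica("weather", "raw_data")
    .joinWithCassandraTable("weather", "raw_data")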
- 63. Cassandra Locates a Row Based on Partition Key and Token Range. All of the rows in a Cassandra cluster are stored based on their location in the token range.
- 64. Cassandra Locates a Row Based on Partition Key and Token Range. Each of the nodes in a Cassandra cluster is primarily responsible for one set of tokens. [Ring diagram: nodes in New York City/Manhattan (Helena), Warsaw (Piotr & Jacek), San Francisco (Brian, Russell & Alex), St. Petersburg (Artem)]
- 65. Cassandra Locates a Row Based on Partition Key and Token Range. Each node owns a contiguous token range of the ring (e.g. 750 - 99, 100 - 349, 350 - 749) across New York City, Warsaw, San Francisco and St. Petersburg.
- 66. Cassandra Locates a Row Based on Partition Key and Token Range. The CQL schema designates at least one column to be the Partition Key. [Example row: Jacek | 514 | Red]
- 67. Cassandra Locates a Row Based on Partition Key and Token Range. The hash of the Partition Key tells us where a row should be stored. [Example row: Helena | 514 | Red]
- 68. The Spark Executor uses the Connector to pull rows from the local Cassandra instance. The C* Driver pages spark.cassandra.input.page.row.size CQL rows at a time: SELECT * FROM keyspace.table WHERE pk = … [Diagram: Amsterdam node, Spark Executor co-located with Cassandra]
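The page size mentioned above is just a Spark property; a sketch of overriding it (property name as on the slide, value is an example):

  import org.apache.spark.SparkConf

  val conf = new SparkConf()
    .set("spark.cassandra.input.page.row.size", "1000") // CQL rows fetched per page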
- 69. DataStax Enterprise enables this same machinery with Solr pushdown:
  SELECT * FROM keyspace.table
  WHERE solr_query = 'title:b'
  AND token(pk) > 780 and token(pk) <= 830
  [Diagram: Amsterdam node, Spark Executor (Superman), DataStax Enterprise, tokens 780 - 830]
- 70. Composable Pipelines With Spark, Kafka & Cassandra @helenaedelson
- 71. Spark SQL with Cassandra
  import org.apache.spark.sql.cassandra.CassandraSQLContext
  val cc = new CassandraSQLContext(sparkContext)
  cc.setKeyspace(keyspaceName)
  cc.sql("""
    SELECT table1.a, table1.b, table1.c, table2.a
    FROM table1 AS table1
    JOIN table2 AS table2 ON table1.a = table2.a
      AND table1.b = table2.b AND table1.c = table2.c
  """)
    .map(Data(_))
    .saveToCassandra(keyspace1, table3)
- 72. Spark SQL with Cassandra & JSON
  cqlsh> CREATE TABLE github_stats.commits_aggr (user VARCHAR PRIMARY KEY, commits INT…);
  val sql = new SQLContext(sparkContext)
  val json = Seq(
    """{"user":"helena","commits":98, "month":3, "year":2015}""",
    """{"user":"jacek-lewandowski", "commits":72, "month":3, "year":2015}""",
    """{"user":"pkolaczk", "commits":42, "month":3, "year":2015}""")
  // write
  sql.jsonRDD(sparkContext.parallelize(json))
    .map(CommitStats(_))
    .flatMap(compute)
    .saveToCassandra("stats", "monthly_commits")
  // read
  val rdd = sc.cassandraTable[MonthlyCommits]("stats", "monthly_commits")
- 73. Spark Streaming, Kafka, C* and JSON
  cqlsh> select * from github_stats.commits_aggr;
   user              | commits | month | year
  -------------------+---------+-------+------
            pkolaczk |      42 |     3 | 2015
   jacek-lewandowski |      43 |     3 | 2015
              helena |      98 |     3 | 2015
  (3 rows)
  KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicMap, StorageLevel.MEMORY_ONLY)
    .map { case (_, json) => JsonParser.parse(json).extract[MonthlyCommits] }
    .saveToCassandra("github_stats", "commits_aggr")
- 74. Kafka Streaming Word Count
  sparkConf.set("spark.cassandra.connection.host", "10.20.3.45")
  val streamingContext = new StreamingContext(sparkConf, Seconds(30))
  KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
      streamingContext, kafkaParams, topicMap, StorageLevel.MEMORY_ONLY)
    .map(_._2)
    .countByValue()
    .saveToCassandra("my_keyspace", "wordcount")
- 75. Spark Streaming, Twitter & Cassandra
  CREATE TABLE IF NOT EXISTS keyspace.table (
    topic text, interval text, mentions counter,
    PRIMARY KEY(topic, interval)
  ) WITH CLUSTERING ORDER BY (interval DESC);
  /** Cassandra is doing the sorting for you here. */
  TwitterUtils.createStream(ssc, auth, tags, StorageLevel.MEMORY_ONLY_SER_2)
    .flatMap(_.getText.toLowerCase.split("""\s+"""))
    .filter(tags.contains(_))
    .countByValueAndWindow(Seconds(5), Seconds(5))
    .transform((rdd, time) => rdd.map { case (term, count) => (term, count, now(time)) })
    .saveToCassandra(keyspace, table)
- 76. Spark MLlib [Pipeline diagram: Your Data → Extract Data To Analyze → Feature Extraction → Training Data → Model Training; Test Data → Model Testing; train your model to predict]
- 77. Spark Streaming ML, Kafka & C*
  val ssc = new StreamingContext(new SparkConf()…, Seconds(5))
  val testData = ssc.cassandraTable[String](keyspace, table).map(LabeledPoint.parse)
  val trainingStream = KafkaUtils.createStream[K, V, KDecoder, VDecoder](
      ssc, kafkaParams, topicMap, StorageLevel.MEMORY_ONLY)
    .map(_._2).map(LabeledPoint.parse)
  trainingStream.saveToCassandra("ml_keyspace", "raw_training_data")
  val model = new StreamingLinearRegressionWithSGD()
    .setInitialWeights(Vectors.dense(weights))
  model.trainOn(trainingStream)
  // Making predictions on testData
  model
    .predictOnValues(testData.map(lp => (lp.label, lp.features)))
    .saveToCassandra("ml_keyspace", "predictions")
- 78. KillrWeather • Global sensors & satellites collect data • Cassandra stores in sequence • Application reads in sequence Apache Cassandra
- 79. Data model should look like your queries
- 80. Queries I Need - design the data model to support the queries: • Get data by ID • Get data for a single date and time • Get data for a window of time • Compute, store and retrieve daily, monthly, annual aggregations. To support them: • Store raw data per ID • Store time series data in order: most recent to oldest • Compute and store aggregate data in the stream • Set TTLs on historic data (a sketch of a TTL write follows)
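For "compute and store aggregate data in the stream" together with "set TTLs on historic data", the connector lets a write carry a TTL. A sketch under assumptions: dailyTemps is an assumed DStream of per-day aggregates, and the target table is a hypothetical non-counter aggregate table (counter columns cannot have TTLs).

  import com.datastax.spark.connector._
  import com.datastax.spark.connector.streaming._
  import com.datastax.spark.connector.writer.{TTLOption, WriteConf}

  // dailyTemps: DStream[(String, Int, Int, Int, Double)] = (wsid, year, month, day, high)
  dailyTemps.saveToCassandra(
    "weather", "daily_aggregate_temperature",
    writeConf = WriteConf(ttl = TTLOption.constant(60 * 60 * 24 * 365))) // expire after 1 year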
- 81. Data Model • Weather Station Id and Time are unique • Store as many as needed
  CREATE TABLE daily_temperature (
    weather_station text, year int, month int, day int, hour int,
    temperature double,
    PRIMARY KEY (weather_station, year, month, day, hour)
  );
  INSERT INTO daily_temperature (weather_station, year, month, day, hour, temperature)
  VALUES ('10010:99999', 2005, 12, 1, 7, -5.6);
  INSERT INTO daily_temperature (weather_station, year, month, day, hour, temperature)
  VALUES ('10010:99999', 2005, 12, 1, 8, -5.1);
  INSERT INTO daily_temperature (weather_station, year, month, day, hour, temperature)
  VALUES ('10010:99999', 2005, 12, 1, 9, -4.9);
  INSERT INTO daily_temperature (weather_station, year, month, day, hour, temperature)
  VALUES ('10010:99999', 2005, 12, 1, 10, -5.3);
- 82. Load-Balanced Data Ingestion
  class HttpNodeGuardian extends ClusterAwareNodeGuardianActor {
    cluster.joinSeedNodes(Vector(..))
    context.actorOf(BalancingPool(PoolSize).props(Props(
      new KafkaPublisherActor(KafkaHosts, KafkaBatchSendSize))))
    Cluster(context.system) registerOnMemberUp {
      context.actorOf(BalancingPool(PoolSize).props(Props(
        new HttpReceiverActor(KafkaHosts, KafkaBatchSendSize))))
    }
    def initialized: Actor.Receive = { … }
  }
- 83. Client: HTTP Receiver Akka Actor
  class HttpDataIngestActor(kafka: ActorRef) extends Actor with ActorLogging {
    implicit val system = context.system
    implicit val askTimeout: Timeout = settings.timeout
    implicit val materializer = ActorFlowMaterializer(
      ActorFlowMaterializerSettings(system))
    val requestHandler: HttpRequest => HttpResponse = {
      case HttpRequest(HttpMethods.POST, Uri.Path("/weather/data"), headers, entity, _) =>
        headers.toSource collect { case s: Source =>
          kafka ! KafkaMessageEnvelope[String, String](topic, group, s.data: _*)
          HttpResponse(200, entity = HttpEntity(MediaTypes.`text/html`))
        } getOrElse HttpResponse(404, entity = "Unsupported request")
      case _: HttpRequest =>
        HttpResponse(400, entity = "Unsupported request")
    }
    Http(system).bind(HttpHost, HttpPort).map { case connection =>
      log.info("Accepted new connection from " + connection.remoteAddress)
      connection.handleWithSyncHandler(requestHandler)
    }
    def receive: Actor.Receive = { case e => }
  }
- 84. Client: Kafka Producer Akka Actor
  class KafkaProducerActor[K, V](config: ProducerConfig) extends Actor {
    override val supervisorStrategy =
      OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) {
        case _: ActorInitializationException   => Stop
        case _: FailedToSendMessageException   => Restart
        case _: ProducerClosedException        => Restart
        case _: NoBrokersForPartitionException => Escalate
        case _: KafkaException                 => Escalate
        case _: Exception                      => Escalate
      }
    private val producer = new KafkaProducer[K, V](config)
    override def postStop(): Unit = producer.close()
    def receive = {
      case e: KafkaMessageEnvelope[K, V] => producer.send(e)
    }
  }
- 85. Store raw data on ingestion
- 86. Store Raw Data From Kafka Stream To C*
  val kafkaStream = KafkaUtils.createStream[K, V, KDecoder, VDecoder](
      ssc, kafkaParams, topicMap, StorageLevel.DISK_ONLY_2)
    .map(transform)
    .map(RawWeatherData(_))
  /** Saves the raw data to Cassandra. */
  kafkaStream.saveToCassandra(keyspace, raw_ws_data)
  /** Now proceed with computations from the same stream.. */
  kafkaStream…
  Now we can replay on failure for later computation, etc.
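That replay path is the batch side of the Lambda story, sketched here: since every raw event was persisted first, any aggregate can be recomputed later straight from the raw table (same names as the snippet above; RawWeatherData fields as used on the later slides).

  import com.datastax.spark.connector._

  ssc.sparkContext.cassandraTable[RawWeatherData](keyspace, raw_ws_data)
    .map(d => (d.wsid, d.year, d.month, d.day, d.oneHourPrecip))
    .saveToCassandra(keyspace, "daily_aggregate_precip")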
- 87. Let's See Our Data Model Again
  CREATE TABLE weather.raw_data (
    wsid text, year int, month int, day int, hour int,
    temperature double, dewpoint double, pressure double,
    wind_direction int, wind_speed double, one_hour_precip double,
    PRIMARY KEY ((wsid), year, month, day, hour)
  ) WITH CLUSTERING ORDER BY (year DESC, month DESC, day DESC, hour DESC);
  CREATE TABLE daily_aggregate_precip (
    wsid text, year int, month int, day int,
    precipitation counter,
    PRIMARY KEY ((wsid), year, month, day)
  ) WITH CLUSTERING ORDER BY (year DESC, month DESC, day DESC);
- 88. Efficient Stream Computation
  // The connector gets the partition key for data locality and feeds it to Spark.
  // `precipitation` is a Cassandra counter column in our schema, so no expensive
  // `reduceByKey` is needed. Simply let C* do it: not expensive, and fast.
  class KafkaStreamingActor(kafkaParams: Map[String, String], ssc: StreamingContext, settings: WeatherSettings)
    extends AggregationActor {
    import settings._
    val kafkaStream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, Map(KafkaTopicRaw -> 1), StorageLevel.DISK_ONLY_2)
      .map(_._2.split(","))
      .map(RawWeatherData(_))
    kafkaStream.saveToCassandra(CassandraKeyspace, CassandraTableRaw)
    /** RawWeatherData: wsid, year, month, day, oneHourPrecip */
    kafkaStream.map(hour => (hour.wsid, hour.year, hour.month, hour.day, hour.oneHourPrecip))
      .saveToCassandra(CassandraKeyspace, CassandraTableDailyPrecip)
    /** Now the [[StreamingContext]] can be started. */
    context.parent ! OutputStreamInitialized
    def receive: Actor.Receive = {…}
  }
- 89.
  /** For a given weather station, calculates annual cumulative precip - or year to date. */
  class PrecipitationActor(ssc: StreamingContext, settings: WeatherSettings) extends AggregationActor {
    def receive: Actor.Receive = {
      case GetPrecipitation(wsid, year)        => cumulative(wsid, year, sender)
      case GetTopKPrecipitation(wsid, year, k) => topK(wsid, year, k, sender)
    }
    /** Computes annual aggregation. Precipitation values are 1 hour deltas from the previous. */
    def cumulative(wsid: String, year: Int, requester: ActorRef): Unit =
      ssc.cassandraTable[Double](keyspace, dailytable)
        .select("precipitation")
        .where("wsid = ? AND year = ?", wsid, year)
        .collectAsync()
        .map(AnnualPrecipitation(_, wsid, year)) pipeTo requester
    /** Returns the k highest precipitation values for any station in the `year`. */
    def topK(wsid: String, year: Int, k: Int, requester: ActorRef): Unit = {
      val toTopK = (aggregate: Seq[Double]) => TopKPrecipitation(wsid, year,
        ssc.sparkContext.parallelize(aggregate).top(k).toSeq)
      ssc.cassandraTable[Double](keyspace, dailytable)
        .select("precipitation")
        .where("wsid = ? AND year = ?", wsid, year)
        .collectAsync().map(toTopK) pipeTo requester
    }
  }
- 90. Efficient Batch Analytics
  // C* data is automatically sorted by most recent, due to our data model.
  // An additional Spark or collection sort is not needed.
  class TemperatureActor(sc: SparkContext, settings: WeatherSettings) extends AggregationActor {
    import akka.pattern.pipe
    def receive: Actor.Receive = {
      case e: GetMonthlyHiLowTemperature => highLow(e, sender)
    }
    def highLow(e: GetMonthlyHiLowTemperature, requester: ActorRef): Unit =
      sc.cassandraTable[DailyTemperature](keyspace, daily_temperature_aggr)
        .where("wsid = ? AND year = ? AND month = ?", e.wsid, e.year, e.month)
        .collectAsync()
        .map(MonthlyTemperature(_, e.wsid, e.year, e.month)) pipeTo requester
  }
- 91. @helenaedelson github.com/helena slideshare.net/helenaedelson
- 92. Learn More Online and at Cassandra Summit https://academy.datastax.com/