Cassandra and Spark: Closing the Gap Between NoSQL and Analytics (Codemotion Berlin 2015)
- 1. @doanduyhai Cassandra & Spark, closing the gap between NoSQL and analytics DuyHai DOAN, Technical Advocate
- 2. @doanduyhai Who am I? Duy Hai DOAN, Cassandra technical advocate • talks, meetups, confs • open-source dev (Achilles, …) • OSS Cassandra point of contact ☞ duy_hai.doan@datastax.com ☞ @doanduyhai
- 3. @doanduyhai Datastax • Founded in April 2010 • We contribute a lot to Apache Cassandra™ • 400+ customers (25 of the Fortune 100), 400+ employees • Headquarters in the San Francisco Bay Area • EU headquarters in London, offices in France and Germany • Datastax Enterprise = OSS Cassandra + extra features
- 4. @doanduyhai Spark – Cassandra Use Cases • Load data from various sources • Analytics (join, aggregate, transform, …) • Sanitize, validate, normalize, transform data • Schema migration, data conversion
- 5. Spark & Cassandra Presentation Spark & its eco-system Cassandra Quick Recap
- 6. @doanduyhai What is Apache Spark? Created at UC Berkeley's AMPLab, open-sourced in 2010. General data processing framework. Faster than Hadoop (in-memory processing). One-framework-many-components approach.
- 7. @doanduyhai Partitions & transformations [RDD diagram] map(tuple => (tuple._3, tuple)) is a direct transformation (stays within each partition); groupByKey() triggers a shuffle (expensive!); countByKey() is a final action.
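To make the distinction concrete, here is a minimal sketch in Scala (the sample tuples, app name and local master are made up for illustration):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object TransformationsDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("demo").setMaster("local[2]"))

    // (id, name, country) tuples, just sample data
    val rdd = sc.parallelize(Seq((1, "a", "DE"), (2, "b", "FR"), (3, "c", "DE")))

    // Direct (narrow) transformation: stays within each partition, no shuffle
    val keyed = rdd.map(tuple => (tuple._3, tuple))

    // groupByKey() forces a shuffle: rows with the same key move across the network
    val grouped = keyed.groupByKey()

    // Final action: triggers the whole pipeline and returns results to the driver
    println(grouped.countByKey())

    sc.stop()
  }
}
```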
- 8. @doanduyhai Spark eco-system [stack diagram] Cluster manager: Local, Standalone cluster, YARN, Mesos, … etc. • Spark Core Engine (Scala/Java/Python) • Components on top: Spark Streaming, Spark SQL, MLlib, GraphX • Persistence layer underneath
- 10. @doanduyhai What is Apache Cassandra? Created at Facebook, Apache project since 2009. Distributed NoSQL database. Eventual consistency. Distributed table abstraction.
- 11. @doanduyhai Cassandra data distribution reminder [ring diagram: nodes n1–n8] Random distribution: token = hash(#partition) • token space: ]-X, X] where X = 2^64/2 (a huge number)
- 12. @doanduyhai Cassandra token ranges (Murmur3 hash function) [ring diagram: nodes n1–n8 own ranges A–H] A: ]0, X/8], B: ]X/8, 2X/8], C: ]2X/8, 3X/8], D: ]3X/8, 4X/8], E: ]4X/8, 5X/8], F: ]5X/8, 6X/8], G: ]6X/8, 7X/8], H: ]7X/8, X]
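As a toy illustration of this ring split (pure Scala arithmetic, nothing Cassandra-specific; it just prints the eight ranges from the slide):

```scala
object TokenRanges extends App {
  val X = BigInt(2).pow(63) // upper bound of the Murmur3 token space, the "X" on the slide
  // Eight equal ranges ]lower, upper], one per node n1..n8
  ('A' to 'H').zipWithIndex.foreach { case (name, i) =>
    println(s"$name: ]${X * i / 8}, ${X * (i + 1) / 8}]")
  }
}
```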
- 13. @doanduyhai Linear scalability [diagram: user_id1 … user_id5 hashed onto token ranges A–H across nodes n1–n8]
- 15. @doanduyhai Cassandra Query Language (CQL) INSERT INTO users(login, name, age) VALUES('jdoe', 'John DOE', 33); UPDATE users SET age = 34 WHERE login = 'jdoe'; DELETE age FROM users WHERE login = 'jdoe'; SELECT age FROM users WHERE login = 'jdoe';
- 16. Spark & Cassandra Connector Spark Core API SparkSQL/DataFrame Spark Streaming
- 17. @doanduyhai Spark/Cassandra connector architecture • All Cassandra types supported and converted to Scala types • Server-side data filtering (SELECT … WHERE …) • Uses the Java driver underneath • Scala and Java support; Python support via PySpark (experimental)
- 18. @doanduyhai Connector architecture – Core API • Cassandra tables exposed as Spark RDDs • Read from and write to Cassandra • Mapping of C* tables and rows to Scala objects: CassandraRDD and CassandraRow, Scala case classes (object mapper), Scala tuples
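A minimal sketch of the Core API, assuming the spark-cassandra-connector is on the classpath, a node is reachable at 127.0.0.1, and a hypothetical test.users table exists with login/name/age columns:

```scala
import com.datastax.spark.connector._   // adds cassandraTable / saveToCassandra to SparkContext and RDDs
import org.apache.spark.{SparkConf, SparkContext}

// Made-up case class mirroring the hypothetical test.users table
case class User(login: String, name: String, age: Int)

object CoreApiDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("core-api-demo")
      .set("spark.cassandra.connection.host", "127.0.0.1")
    val sc = new SparkContext(conf)

    // Read: Cassandra table exposed as an RDD of CassandraRow…
    val rows = sc.cassandraTable("test", "users")
    rows.foreach(row => println(row.getString("login")))

    // …or mapped straight to a case class
    val users = sc.cassandraTable[User]("test", "users")

    // Server-side filtering: the WHERE clause is pushed down to CQL
    val jdoe = users.where("login = ?", "jdoe").collect()

    // Write: any RDD of case classes or tuples can be saved back
    users.map(u => u.copy(age = u.age + 1))
         .saveToCassandra("test", "users")

    sc.stop()
  }
}
```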
- 19. @doanduyhai Spark Core https://github.com/doanduyhai/incubator-zeppelin/tree/ApacheBigData
- 20. @doanduyhai Connector architecture – DataFrame Mapping of Cassandra table to DataFrame • CassandraSQLContext → org.apache.spark.sql.SQLContext • CassandraSQLRow → org.apache.spark.sql.catalyst.expressions.Row • Mapping of Cassandra types to Catalyst types • CassandraCatalog → Catalog (used by the Catalyst Analyzer)
- 21. @doanduyhai Connector architecture – DataFrame Mapping of Cassandra table to SchemaRDD • CassandraSourceRelation • extends BaseRelation with InsertableRelation with PrunedFilteredScan • custom query plan • pushes predicates down to CQL for early filtering (when possible) SELECT * FROM user_emails WHERE login = 'jdoe';
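A minimal sketch of the SQL route, assuming a connector version of that era (CassandraSQLContext was later superseded) and a hypothetical test.user_emails table:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.cassandra.CassandraSQLContext

object SqlDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf()
        .setAppName("sql-demo")
        .set("spark.cassandra.connection.host", "127.0.0.1"))

    // CassandraSQLContext extends the regular SQLContext and resolves
    // "keyspace.table" names through the CassandraCatalog
    val cc = new CassandraSQLContext(sc)

    // The login predicate can be pushed down to CQL (PrunedFilteredScan)
    val df = cc.sql("SELECT * FROM test.user_emails WHERE login = 'jdoe'")
    df.show()

    sc.stop()
  }
}
```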
- 22. @doanduyhai Spark SQL https://github.com/doanduyhai/incubator-zeppelin/tree/ApacheBigData
- 23. @doanduyhai Connector architecture – Spark Streaming Streaming data INTO a Cassandra table • trivial setup • be careful with your Cassandra data model when writing an infinite stream! Streaming data OUT of Cassandra tables (CASSANDRA-8844)? • notification system (publish/subscribe) • at-least-once delivery semantics • work in progress …
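A minimal sketch of the INTO direction, assuming a socket text stream and a hypothetical test.user_events table; host, port and column names are made up:

```scala
import com.datastax.spark.connector._            // SomeColumns
import com.datastax.spark.connector.streaming._  // adds saveToCassandra to DStreams
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("streaming-demo")
      .set("spark.cassandra.connection.host", "127.0.0.1")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Each line is "login,event"; tuples map onto the table's columns.
    // With an infinite stream, make sure the partition key spreads writes
    // across the cluster instead of hammering a single partition.
    ssc.socketTextStream("localhost", 9999)
       .map(_.split(","))
       .map(fields => (fields(0), fields(1)))
       .saveToCassandra("test", "user_events", SomeColumns("login", "event"))

    ssc.start()
    ssc.awaitTermination()
  }
}
```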
- 24. @doanduyhai Spark Streaming https://github.com/doanduyhai/incubator-zeppelin/tree/ApacheBigData
- 25. @doanduyhai Q & A
- 26. Spark/Cassandra operations Cluster deployment & job lifecycle Data locality
- 27. @doanduyhai Cluster deployment [diagram: stand-alone cluster, one Spark Master plus Spark Workers co-located with the Cassandra (C*) nodes]
- 28. @doanduyhai Cassandra – Spark placement [diagram: one Spark Master, four Spark Workers, one per Cassandra node] 1 Cassandra process ⟷ 1 Spark Worker
- 29–33. @doanduyhai Cassandra – Spark job lifecycle [animated diagram across five slides: a Spark Client running the Driver Program (Spark Context), the Spark Master, and four Spark Workers co-located with Cassandra nodes. ① Define your business logic in the driver program ② the driver contacts the Spark Master ③ the master allocates resources on the Spark Workers ④ each worker launches a Spark Executor ⑤ the executors run the job's tasks against their local Cassandra nodes]
- 34. @doanduyhai Data Locality – Cassandra token ranges [ring diagram, as on slide 12: nodes n1–n8 own ranges A: ]0, X/8] through H: ]7X/8, X]]
- 35. @doanduyhai Data Locality – How To [diagram: each Spark RDD partition maps to a Cassandra token range on its co-located node]
- 36. @doanduyhai Data Locality – How To [same diagram] Use the Murmur3Partitioner
- 37. @doanduyhai Read data locality [diagram: reading from Cassandra]
- 38. @doanduyhai Read data locality [diagram: Spark shuffle operations]
- 39. @doanduyhai Write to Cassandra without data locality Async batches fan out writes to Cassandra. Because of the shuffle, the original data locality is lost.
- 40. @doanduyhai Write to Cassandra with data locality Write to Cassandra with rdd.repartitionByCassandraReplica("keyspace", "table")
- 41. @doanduyhai Write data locality • either stream data within the Spark layer using repartitionByCassandraReplica() • or flush data to Cassandra with async batches • in any case, there will be data movement over the network (sorry, no magic)
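A minimal sketch of the repartitionByCassandraReplica() variant, assuming a hypothetical test.users table whose partition key is login:

```scala
import com.datastax.spark.connector._
import org.apache.spark.{SparkConf, SparkContext}

object WriteLocalityDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf()
        .setAppName("write-locality-demo")
        .set("spark.cassandra.connection.host", "127.0.0.1"))

    // Some computed results keyed by the target table's partition key (login)
    val results = sc.parallelize(Seq(("jdoe", 34), ("asmith", 28)))

    // Move each row to a Spark partition on a node that is a Cassandra
    // replica for its partition key, so the following writes stay local
    results
      .repartitionByCassandraReplica("test", "users")
      .saveToCassandra("test", "users", SomeColumns("login", "age"))

    sc.stop()
  }
}
```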
- 42. @doanduyhai Joins with data locality
CREATE TABLE artists(name text, style text, … PRIMARY KEY(name));
CREATE TABLE albums(title text, artist text, year int, … PRIMARY KEY(title));
val join: CassandraJoinRDD[(String,Int), (String,String)] =
  sc.cassandraTable[(String,Int)](KEYSPACE, ALBUMS)
    // Select only the columns useful for the join and the processing
    .select("artist", "year")
    .as((_: String, _: Int))
    // Repartition the RDD by the "artists" PK, which is "name"
    .repartitionByCassandraReplica(KEYSPACE, ARTISTS)
    // Join with the "artists" table, selecting only the "name" and "country" columns
    .joinWithCassandraTable[(String,String)](KEYSPACE, ARTISTS, SomeColumns("name", "country"))
    .on(SomeColumns("name"))
- 43. @doanduyhai Joins pipeline with data locality [diagram: LOCAL READ FROM CASSANDRA]
- 44. @doanduyhai Joins pipeline with data locality [diagram: REPARTITION TO MAP CASSANDRA REPLICAS]
- 45. @doanduyhai Joins pipeline with data locality [diagram: JOIN WITH DATA LOCALITY]
- 46. @doanduyhai Perfect data locality scenario • read locally from Cassandra • use operations that do not require a shuffle in Spark (map, filter, …) • repartitionByCassandraReplica() → to a table having the same partition key as the original table • save back into this Cassandra table USE CASE: sanitize, validate, normalize, transform data (see the sketch below)
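A minimal sketch of that shuffle-free pipeline, assuming a hypothetical test.users table keyed by login (column names are made up):

```scala
import com.datastax.spark.connector._
import org.apache.spark.{SparkConf, SparkContext}

object SanitizeDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf()
        .setAppName("sanitize-demo")
        .set("spark.cassandra.connection.host", "127.0.0.1"))

    sc.cassandraTable[(String, String)]("test", "users")
      .select("login", "name")
      // map/filter are narrow operations: no shuffle, locality preserved
      .map { case (login, name) => (login, name.trim.capitalize) }
      .filter { case (login, _) => login.nonEmpty }
      // same partition key (login), so writes land on the local replica
      .saveToCassandra("test", "users", SomeColumns("login", "name"))

    sc.stop()
  }
}
```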
- 47. Spark/Cassandra use-case demos Data cleaning Schema migration Analytics
- 48. @doanduyhai Use Cases • Load data from various sources • Analytics (join, aggregate, transform, …) • Sanitize, validate, normalize, transform data • Schema migration, data conversion
- 49. @doanduyhai Data cleaning use-cases • Bug in your application? • Dirty input data? ☞ Spark job to clean it up! (perfect data locality) Sanitize, validate, normalize, transform data
- 50. @doanduyhai Data Cleaning https://github.com/doanduyhai/incubator-zeppelin/tree/ApacheBigData
- 51. @doanduyhai Schema migration use-cases • Business requirements change with time? • Current data model no longer relevant? ☞ Spark job to migrate data! Schema migration, data conversion
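A minimal sketch of such a migration job, assuming hypothetical test.users_v1 and test.users_v2 tables (the schemas and the name-splitting rule are made up):

```scala
import com.datastax.spark.connector._
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical old and new schemas
case class UserV1(login: String, name: String) // name = "First LAST"
case class UserV2(login: String, firstname: String, lastname: String)

object MigrationDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf()
        .setAppName("migration-demo")
        .set("spark.cassandra.connection.host", "127.0.0.1"))

    // Read the old table, convert each row, write to the new table
    sc.cassandraTable[UserV1]("test", "users_v1")
      .map { u =>
        val parts = u.name.split(" ", 2)
        UserV2(u.login, parts(0), if (parts.length > 1) parts(1) else "")
      }
      .saveToCassandra("test", "users_v2")

    sc.stop()
  }
}
```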
- 52. @doanduyhai Data Migration https://github.com/doanduyhai/incubator-zeppelin/tree/ApacheBigData
- 53. @doanduyhai Analytics use-cases Given existing tables of performers and albums, I want to: • count the number of albums released by decade (70's, 80's, 90's, …) ☞ Spark job to compute analytics! Analytics (join, aggregate, transform, …)
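A minimal sketch of the decade count, reusing the albums table from slide 42 (the keyspace and the output table are made up):

```scala
import com.datastax.spark.connector._
import org.apache.spark.{SparkConf, SparkContext}

object AnalyticsDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf()
        .setAppName("analytics-demo")
        .set("spark.cassandra.connection.host", "127.0.0.1"))

    // ① read the production table, ② aggregate with Spark, ③ save into a
    // dedicated table for fast visualization (albums counted per decade)
    sc.cassandraTable[(String, Int)]("test", "albums")
      .select("title", "year")
      .map { case (_, year) => (year / 10 * 10, 1) } // 1973 -> 1970
      .reduceByKey(_ + _)
      .saveToCassandra("test", "albums_by_decade", SomeColumns("decade", "albums_count"))

    sc.stop()
  }
}
```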
- 54. @doanduyhai Analytics pipeline ① Read from production transactional tables ② Perform aggregation with Spark ③ Save back data into dedicated tables for fast visualization ④ Repeat step ①
- 55. @doanduyhai Analytics https://github.com/doanduyhai/incubator-zeppelin/tree/ApacheBigData
- 56. @doanduyhai Thank You @doanduyhai duy_hai.doan@datastax.com https://academy.datastax.com/