Spark And Cassandra: 2 Fast, 2 Furious
- 1. Spark and Cassandra: 2 Fast, 2 Furious Russell Spitzer DataStax Inc.
- 2. Russell, ostensibly a software engineer • Earned a Ph.D. in bioinformatics at some point • Has written a great deal of automation and test-framework code • Now develops for DataStax on the Analytics Team • Focuses largely on the DataStax OSS Spark Cassandra Connector
- 3. DataStax Spark Cassandra Connector Let Spark Interact with your Cassandra Data! https://github.com/datastax/spark-cassandra-connector http://spark-packages.org/package/datastax/spark-cassandra-connector http://spark-packages.org/package/TargetHolding/pyspark-cassandra Compatible with Spark 1.6 + Cassandra 3.0 • Bulk writing to Cassandra • Distributed full table scans • Optimized direct joins with Cassandra • Secondary index pushdown • Connection and prepared statement pools
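To ground the feature list above, a minimal setup sketch. This is hedged: the contact point and the keyspace/table names (`ks`, `tracker`) are illustrative placeholders, not from the deck; the tracker schema itself is introduced on the next slides.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import com.datastax.spark.connector._

// Point Spark at any Cassandra node; the driver discovers the rest of the ring.
val conf = new SparkConf()
  .setAppName("connector-demo")
  .set("spark.cassandra.connection.host", "127.0.0.1")

val sc = new SparkContext(conf)

// A distributed full table scan, one of the features listed above.
val rows = sc.cassandraTable("ks", "tracker")
println(rows.count())
```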
- 4. Cassandra is essentially a hybrid between a key-value and a column-oriented (or tabular) database management system. Its data model is a partitioned row store with tunable consistency* *https://en.wikipedia.org/wiki/Apache_Cassandra Let's break that down: 1. What is a C* partition and row? 2. How does C* place partitions?
- 5. CQL looks a lot like SQL CREATE TABLE tracker ( vehicle_id uuid, ts timestamp, x double, y double, PRIMARY KEY (vehicle_id, ts))
- 6. INSERTs Look Almost Identical CREATE TABLE tracker ( vehicle_id uuid, ts timestamp, x double, y double, PRIMARY KEY (vehicle_id, ts)) INSERT INTO tracker (vehicle_id, ts, x, y) VALUES (1, 0, 0, 1)
- 7. Cassandra Data is Stored in Partitions CREATE TABLE tracker ( vehicle_id uuid, ts timestamp, x double, y double, PRIMARY KEY (vehicle_id, ts)) vehicle_id 1 ts: 0 x: 0 y: 1 INSERT INTO tracker (vehicle_id, ts, x, y) VALUES (1, 0, 0, 1)
- 8. C* Partitions Store Many Rows CREATE TABLE tracker ( vehicle_id uuid, ts timestamp, x double, y double, PRIMARY KEY (vehicle_id, ts)) vehicle_id 1 ts: 0 x: 0 y: 1 ts: 1 x: -1 y: 0 ts: 2 x: 0 y: -1 ts: 3 x: 1 y: 0
- 9. C* Partitions Store Many Rows CREATE TABLE tracker ( vehicle_id uuid, ts timestamp, x double, y double, PRIMARY KEY (vehicle_id, ts)) vehicle_id 1 ts: 0 x: 0 y: 1 ts: 1 x: -1 y: 0 ts: 2 x: 0 y: -1 ts: 3 x: 1 y: 0 (Partition)
- 10. C* Partitions Store Many Rows CREATE TABLE tracker ( vehicle_id uuid, ts timestamp, x double, y double, PRIMARY KEY (vehicle_id, ts)) vehicle_id 1 ts: 0 x: 0 y: 1 ts: 1 x: -1 y: 0 ts: 2 x: 0 y: -1 ts: 3 x: 1 y: 0 (Row)
- 11. Within a Partition There is Ordering Based on the Clustering Keys CREATE TABLE tracker ( vehicle_id uuid, ts timestamp, x double, y double, PRIMARY KEY (vehicle_id, ts)) vehicle_id 1 ts: 0 x: 0 y: 1 ts: 1 x: -1 y: 0 ts: 2 x: 0 y: -1 ts: 3 x: 1 y: 0 Ordered by Clustering Key
- 12. Slices Within a Partition are Very Easy CREATE TABLE tracker ( vehicle_id uuid, ts timestamp, x double, y double, PRIMARY KEY (vehicle_id, ts)) vehicle_id 1 ts: 0 x: 0 y: 1 ts: 1 x: -1 y: 0 ts: 2 x: 0 y: -1 ts: 3 x: 1 y: 0 Ordered by Clustering Key
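Because rows inside a partition are sorted by clustering key, a range predicate on ts is answered as one contiguous slice. A hedged sketch using the connector's `.where` pushdown (the keyspace name `ks` and the UUID literal are illustrative):

```scala
import com.datastax.spark.connector._
import java.util.{Date, UUID}

// One partition (vehicle_id) plus a ts range = a single contiguous slice read.
val vehicle = UUID.fromString("00000000-0000-0000-0000-000000000001")
val slice = sc.cassandraTable("ks", "tracker")
  .where("vehicle_id = ? AND ts >= ? AND ts < ?",
    vehicle, new Date(0L), new Date(3L))
```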
- 13. Cassandra is a Distributed Fault-Tolerant Database San Jose Oakland San Francisco
- 14. Data is located on a Token Range San Jose Oakland San Francisco
- 15. Data is located on a Token Range 0 San Jose Oakland San Francisco 1200 600
- 16. The Partition Key of a Row is Hashed to Determine its Token 0 San Jose Oakland San Francisco 1200 600
- 17. The Partition Key of a Row is Hashed to Determine its Token 0 1200 600 San Jose Oakland San Francisco vehicle_id 1 INSERT INTO tracker (vehicle_id, ts, x, y) VALUES (1, 0, 0, 1)
- 18. The Partition Key of a Row is Hashed to Determine its Token 0 1200 600 San Jose Oakland San Francisco vehicle_id 1 INSERT INTO tracker (vehicle_id, ts, x, y) VALUES (1, 0, 0, 1) Hash(1) = 1000
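The token is just a hash (Murmur3 by default) of the partition key, and Cassandra will tell you what it is. A hedged sketch using the connector's session pool (the keyspace name `ks` is illustrative):

```scala
import com.datastax.spark.connector.cql.CassandraConnector

// token(vehicle_id) returns the hash that places the partition on the
// ring, e.g. Hash(1) = 1000 in the diagram above.
val tokens = CassandraConnector(sc.getConf).withSessionDo { session =>
  session.execute("SELECT vehicle_id, token(vehicle_id) FROM ks.tracker").all()
}
```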
- 19. How The Spark Cassandra Connector Reads 0 500
- 20. How The Spark Cassandra Connector Reads 0 San Jose Oakland San Francisco 500
- 21. Spark Partitions are Built From the Token Range 0 San Jose Oakland San Francisco 500 500 - 600 600 - 700 500 1200
- 22. Each Partition Can be Drawn Locally from at Least One Node 0 San Jose Oakland San Francisco 500 500 - 600 600 - 700 500 1200
- 23. Each Partition Can be Drawn Locally from at Least One Node 0 San Jose Oakland San Francisco 500 500 - 600 600 - 700 500 1200 Spark Executor Spark Executor Spark Executor
- 24. No Need to Leave the Node For Data! 0 500
- 25. Data is Retrieved using the DataStax Java Driver 0 Spark Executor Cassandra
- 26. A Connection is Established 0 Spark Executor Cassandra 500 - 600
- 27. A Query is Prepared with Token Bounds 0 Spark Executor Cassandra SELECT * FROM TABLE WHERE Token(PK) > StartToken AND Token(PK) < EndToken 500 - 600
- 28. The Spark Partition's Bounds are Placed in the Query 0 Spark Executor Cassandra SELECT * FROM TABLE WHERE Token(PK) > 500 AND Token(PK) < 600 500 - 600
- 29. Data is Paged a Number of Rows at a Time 0 Spark Executor Cassandra 500 - 600 SELECT * FROM TABLE WHERE Token(PK) > 500 AND Token(PK) < 600
- 30. Data is Paged 0 Spark Executor Cassandra 500 - 600 SELECT * FROM TABLE WHERE Token(PK) > 500 AND Token(PK) < 600
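How many rows come back per page is tunable. A hedged sketch (property names per the connector's reference configuration; the values are illustrative):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  // rows fetched per round trip while paging a token range
  .set("spark.cassandra.input.fetch.size_in_rows", "1000")
  // approximate data per Spark partition, which sets the token range sizes
  .set("spark.cassandra.input.split.size_in_mb", "64")
```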
- 34. How We Write to Cassandra • Data is written via the DataStax Java Driver • Data is grouped based on Partition Key (configurable) • Batches are written
- 35. Data is Written using the DataStax Java Driver 0 Spark Executor Cassandra
- 36. A Connection is Established 0 Spark Executor Cassandra
- 37. Data is Grouped on Key 0 Spark Executor Cassandra
- 39. Once a Batch is Full, or There are Too Many Batches, the Largest Batch is Executed 0 Spark Executor Cassandra
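A hedged sketch of this write path from the application side; the keyspace name `ks` is illustrative, and the grouping behavior noted in the comments is the configurable part mentioned on slide 34:

```scala
import com.datastax.spark.connector._
import java.util.{Date, UUID}

// Rows are grouped by partition key into batches before execution;
// grouping is tunable via spark.cassandra.output.batch.grouping.key.
val points = sc.parallelize(Seq(
  (UUID.randomUUID(), new Date(0L), 0.0, 1.0)))

points.saveToCassandra("ks", "tracker",
  SomeColumns("vehicle_id", "ts", "x", "y"))
```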
- 43. Most Common Features • RDD APIs • cassandraTable • saveToCassandra • repartitionByCassandraReplica • joinWithCassandraTable • DataFrame API • DataSource
- 44. Full Table Scans, Making an RDD out of a Table import com.datastax.spark.connector._ sc.cassandraTable(KeyspaceName, TableName) import com.datastax.spark.connector._ sc.cassandraTable[MyClass](KeyspaceName, TableName)
- 45. Pushing Down CQL to Cassandra import com.datastax.spark.connector._ sc.cassandraTable[MyClass](KeyspaceName, TableName).select("vehicle_id").where("ts > 10") SELECT "vehicle_id" FROM TABLE WHERE Token(PK) > 500 AND Token(PK) < 600 AND ts > 10
- 46. Distributed Key Retrieval import com.datastax.spark.connector._ rdd.joinWithCassandraTable("keyspace", "table") San Jose Oakland San Francisco Spark Executor Spark Executor Spark Executor RDD
- 47. But our Data isn't Colocated import com.datastax.spark.connector._ rdd.joinWithCassandraTable("keyspace", "table") San Jose Oakland San Francisco Spark Executor Spark Executor Spark Executor RDD
- 48. RBCR (repartitionByCassandraReplica) Reshuffles our Data so it Will Be Local rdd .repartitionByCassandraReplica("keyspace","table") .joinWithCassandraTable("keyspace", "table") San Jose Oakland San Francisco Spark Executor Spark Executor Spark Executor RDD
- 49. The Connector Provides a Distributed Pool for Prepared Statements and Sessions val connector = CassandraConnector(sc.getConf) rdd.mapPartitions { it => connector.withSessionDo { session => val ps = session.prepare(query) it.map(x => session.executeAsync(ps.bind(x))) } } Your Pool Ready to Be Deployed
- 50. The Connector Provides a Distributed Pool for Prepared Statements and Sessions val connector = CassandraConnector(sc.getConf) rdd.mapPartitions { it => connector.withSessionDo { session => val ps = session.prepare(query) it.map(x => session.executeAsync(ps.bind(x))) } } Your Pool Ready to Be Deployed Prepared Statement Cache Session Cache
- 51. The Connector Supports the DataSources API sqlContext .read .format("org.apache.spark.sql.cassandra") .options(Map("keyspace" -> "read_test", "table" -> "simple_kv")) .load import org.apache.spark.sql.cassandra._ sqlContext .read .cassandraFormat("simple_kv", "read_test") .load https://github.com/datastax/spark-cassandra-connector/blob/master/doc/14_data_frames.md
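A hedged companion to the read example above, writing back through the same DataSource; note that cassandraFormat takes (table, keyspace) in that order:

```scala
import org.apache.spark.sql.cassandra._

df.write
  .cassandraFormat("simple_kv", "read_test")
  .mode("append") // appending, since the target table already exists
  .save()
```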
- 52. The Connector Works with Catalyst to Pushdown Predicates to Cassandra import com.datastax.spark.connector._ df.select("vehicle_id").filter("ts > 10") SELECT "vehicle_id" FROM TABLE WHERE Token(PK) > 500 AND Token(PK) < 600 AND ts > 10
- 53. The Connector Works with Catalyst to Pushdown Predicates to Cassandra import com.datastax.spark.connector._ df.select("vehicle_id").filter("ts > 10") QueryPlan Catalyst PrunedFilteredScan Only prunes (column projections) and filters (predicates) can be pushed down.
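One way to verify the pushdown is to look at the physical plan (hedged: the exact plan text varies by Spark version):

```scala
// The Cassandra scan node should report "ts > 10" among its pushed filters;
// anything Catalyst could not push stays behind as a Spark-side Filter step.
df.select("vehicle_id").filter("ts > 10").explain()
```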
- 54. Recent Features • C* 3.0 Support • Materialized Views • SASI Indexes (for pushdown) • Advanced Spark Partitioner Support • Increased Performance on JWCT (joinWithCassandraTable)
- 55. Use C* Partitioning in Spark
- 56. Use C* Partitioning in Spark • C* Data is Partitioned • Spark has Partitions and Partitioners • Spark can use a known Partitioner to speed up Cogroups (joins) • How can we leverage this?
- 57. Use C* Partitioning in Spark https://github.com/datastax/spark-cassandra-connector/blob/master/doc/16_partitioning.md If keyBy is used on a CassandraTableScanRDD and the Partition Key is included in the key, the RDD will be given a C* Partitioner val ks = "doc_example" val rdd = { sc.cassandraTable[(String, Int)](ks, "users") .select("name" as "_1", "zipcode" as "_2", "userid") .keyBy[Tuple1[Int]]("userid") } rdd.partitioner //res3: Option[org.apache.spark.Partitioner] = Some(com.datastax.spark.connector.rdd.partitioner.CassandraPartitioner@94515d3e)
- 58. Use C* Partitioning in Spark https://github.com/datastax/spark-cassandra-connector/blob/master/doc/16_partitioning.md Share partitioners between Tables for joins on Partition Key val ks = "doc_example" val rdd1 = { sc.cassandraTable[(Int, Int, String)](ks, "purchases") .select("purchaseid" as "_1", "amount" as "_2", "objectid" as "_3", "userid") .keyBy[Tuple1[Int]]("userid") } val rdd2 = { sc.cassandraTable[(String, Int)](ks, "users") .select("name" as "_1", "zipcode" as "_2", "userid") .keyBy[Tuple1[Int]]("userid") }.applyPartitionerFrom(rdd1) // Assigns the partitioner from the first rdd to this one val joinRDD = rdd1.join(rdd2)
- 59. Use C* Partitioning in Spark https://github.com/datastax/spark-cassandra-connector/blob/master/doc/16_partitioning.md Many other uses for this, try it yourself! • All self joins using the Partition key • Groups within C* Partitions • Anything formerly using SpanBy • Joins with other RDDs And much more!
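As one hedged example of those uses: rdd1 below is the keyed purchases RDD from slide 58, which already carries a CassandraPartitioner, so a key-based aggregation needs no shuffle:

```scala
// mapValues preserves the partitioner, and reduceByKey reuses it,
// so counting purchases per user avoids a full shuffle.
val purchasesPerUser = rdd1.mapValues(_ => 1).reduceByKey(_ + _)
```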
- 60. Enhanced Parallelism with JWCT • joinWithCassandraTable now has increased concurrency and parallelism! • 5X speed increases in some cases • https://datastax-oss.atlassian.net/browse/SPARKC-233 • Thanks Jaroslaw!
- 61. The Connector wants you! • OSS Project that loves community involvement • Bug Reports • Feature Requests • Doc Improvements • Come join us! Vin Diesel may or may not be a contributor
- 62. Tons of Videos at DataStax Academy https://academy.datastax.com/ https://academy.datastax.com/courses/getting-started-apache-spark
- 64. See you at Cassandra Summit! Join me and 3500 of your database peers, and take a deep dive into Apache Cassandra™, the massively scalable NoSQL database that powers global businesses like Apple, Spotify, Netflix and Sony. San Jose Convention Center September 7-9, 2016 https://cassandrasummit.org/ Build Something Disruptive