Cassandra and SparkSQL: You Don't Need Functional Programming for Fun with Russell Spitzer
- 1. You don't need Functional Programming for Fun! Cassandra and SparkSQL
- 2. Russell (left) and Cara (right) • Software Engineer • Spark-Cassandra Integration since Spark 0.9 • Cassandra since Cassandra 1.2 • 2 Year Scala Convert • Still not comfortable talking about Monads in public @Evanfchan
- 3. A Story in 3 Parts • Why SparkSQL? • The Spark SQL Thrift Server • Writing SQL for Spark
- 4. You have lots of options, so why Spark SQL? • Scala? • Java? • Python? • R? • Notebooks? • Erlang?
- 5. Spark is a Powerful Analytics Tool Built on Scala. A distributed analytics platform with in-memory capabilities. Lots of new concepts: RDDs, DataSets, Streaming, Serialization, Functional Programming
- 6. Functional Programming Is Awesome Side-effect Free Functions Monads Easy Parallelization Anonymous Functions Scala Async Models Type Matching rdd.map(y => y+1) Endofunctors
- 7. Functional Programming can be Hard blah-blah blah Blah Easy blahilization baaaaah blahala Asybc blah Blahblahhing rdd.map(y => y+1) Aren't Endofunctors from ghostbusters? Endofunctors
- 8. Learning Takes Time Compile Time Type Safety! Catalyst! Tungsten! We get to learn all sorts of fun new things! SBT is probably great! Usually Me Less Excitable Dev We ship next week
- 9. Spark SQL Provides A Familiar and Easy API Use SQL to access the Power of Spark
- 10. Spark Sql Provides A Familiar and Easy API Catalyst Codegen! Optimization! Predicate Pushdowns Distributed Work SQL
- 11. It still takes Scala/Java/Python/… Code. import org.apache.spark.sql.cassandra._ val df = spark .read .cassandraFormat("tab", "ks") .load df.createTempView("tab") spark.sql("SELECT * FROM tab").show +---+---+---+ | k| c| v| +---+---+---+ | 1| 1| 1| | 1| 2| 2| Let me color code that by parts I like vs parts I don't like.
- 12. It still takes Scala/Java/Python/… Code. import org.apache.spark.sql.cassandra._ val df = spark .read .cassandraFormat("tab", "ks") .load df.createTempView("tab") spark.sql("SELECT * FROM tab").show +---+---+---+ | k| c| v| +---+---+---+ | 1| 1| 1| | 1| 2| 2| Also, your import has an underscore in it…
- 13. For exploration we have the Spark-SQL Shell spark-sql> SELECT * FROM ks.tab; 1 2 2 1 3 3
- 14. For exploration we have the Spark-SQL Shell spark-sql> SELECT * FROM ks.tab; 1 2 2 1 3 3 SparkSession
- 15. For exploration we have the Spark-SQL Shell spark-sql> SELECT * FROM ks.tab; 1 2 2 1 3 3 SparkSession Executor Executor Executor Executor Executor
- 16. Not really good for multiple-users spark-sql> SELECT * FROM ks.tab; 1 2 2 1 3 3 SparkSession Executor Executor Executor Executor Executor
- 17. Enter Spark Thrift Server Spark Sql Thrift Server Executor Executor Executor Executor Executor JDBC Client JDBC Client JDBC Client
- 18. The Spark Sql Thrift Server is a Spark Application • Built on HiveServer2 • Single Spark Context • Clients Communicate with it via JDBC • Can use all SparkSQL • Fair Scheduling • Clients can share Cached Resources • Security
- 19. The Spark Sql ThriftServer is a Spark Application • Built on HiveServer2 • Single Spark Context • Clients Communicate with it via JDBC • Can use all SparkSQL • Fair Scheduling • Clients can share Cached Resources • Security
- 20. Fair Scheduling is Sharing FIFO Time
- 21. Fair Scheduling is Sharing FIFO Time
- 22. Fair Scheduling is Sharing FIFO Time
- 23. Fair Scheduling is Sharing FIFO Time
- 24. Fair Scheduling is Sharing FIFO FAIR Time
- 25. Fair Scheduling is Sharing FIFO FAIR Time
- 26. Fair Scheduling is Sharing FIFO FAIR Time
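As a sketch of what the FAIR-vs-FIFO slides describe: once the Thrift Server is launched with fair scheduling enabled, a JDBC session can choose its scheduler pool with a plain SET statement. The pool name "accounting" and the table are placeholders, and this assumes a pool of that name was defined in fairscheduler.xml.

```sql
-- Assumes the Thrift Server was launched with spark.scheduler.mode=FAIR and a
-- pool named "accounting" defined in fairscheduler.xml (placeholder names).
SET spark.sql.thriftserver.scheduler.pool=accounting;

-- Queries from this session now run in that pool rather than in FIFO order.
SELECT count(*) FROM ks.tab;
```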
- 27. Single Context can Share Cached Data Spark Sql Thrift Server Executor Executor Executor Executor Executor cache TABLE today select * from ks.tab where date = today;
- 28. Single Context can Share Cached Data Spark Sql Thrift Server Executor Executor Executor Executor Executor CACHED CACHED CACHED CACHED CACHED cache TABLE today select * from ks.tab where date = today;
- 29. Single Context can Share Cached Data Spark Sql Thrift Server Executor Executor Executor Executor Executor CACHED CACHED CACHED CACHED CACHED cache TABLE today select * from ks.tab where date = today; SELECT * from TODAY where age > 5
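The shared-cache flow on the slides above can be written as two statements. This is a sketch: current_date() stands in for the slides' "today" value, and it assumes ks.tab has date and age columns as drawn.

```sql
-- One session builds the cache (ks.tab is assumed to have a date column):
CACHE TABLE today AS SELECT * FROM ks.tab WHERE date = current_date();

-- Because there is a single SparkContext, every other JDBC client on the same
-- Thrift Server can read the cached copy:
SELECT * FROM today WHERE age > 5;
```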
- 30. How to use it Starts from the command line and can use all Spark Submit Args • ./sbin/start-thriftserver.sh • dse spark-sql-thriftserver start Use with all of your favorite Spark Packages like the Spark Cassandra Connector! --packages com.datastax.spark:spark-cassandra-connector_2.11:2.0.2 --conf spark.cassandra.connection.host=127.0.0.1
- 31. How to use it Starts from the command line and can use all Spark Submit Args • ./sbin/start-thriftserver.sh • dse spark-sql-thriftserver start starting org.apache.spark.sql.hive.thriftserver.HiveThriftServer2
- 32. Hive? Wait, I thought we were doing Spark! starting org.apache.spark.sql.hive.thriftserver.HiveThriftServer2 Why does it say Hive everywhere? • Built on HiveServer2
- 33. A Brief History of the Spark Thrift Server • Thrift? • Hive?
- 34. They are not the Same Cassandra Thrift Hive Thrift
- 35. Time for a quick history: have you heard of the "Ship of Theseus"? More Greek stuff…
- 36. When you replace all the parts of a thing Does it Remain the Same? Greek Boat
- 37. When you replace all the parts of a thing Does it Remain the Same? SharkServer Hive Parser Hive Optimization Map-Reduce Spark Execution JDBC Results
- 38. When you replace all the parts of a thing Does it Remain the Same? SharkServer ThriftServer Hive Parser Map-Reduce Spark Execution JDBC Results Hive Optimization
- 39. When you replace all the parts of a thing Does it Remain the Same? ThriftServer Hive Parser Catalyst Schema RDDs Spark Execution JDBC Results
- 40. When you replace all the parts of a thing Does it Remain the Same? ThriftServer Hive Parser Catalyst DataFrames Spark Execution JDBC Results
- 41. When you replace all the parts of a thing Does it Remain the Same? ThriftServer Hive Parser Catalyst Datasets Spark Execution JDBC Results
- 42. When you replace all the parts of a thing Does it Remain the Same? ThriftServer Spark Parser Catalyst DataSets Spark Execution JDBC Results
- 43. Almost all Spark now ThriftServer Spark Parser Catalyst DataSets Spark Execution JDBC Results
- 44. Connecting with Beeline (JDBC Client) ./bin/beeline dse beeline !connect jdbc:hive2://localhost:10000 Even More Hive!
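A minimal sketch of a beeline session against a local Thrift Server; the port is the default from the slides, and the statements after connecting are ordinary Spark SQL.

```sql
-- At the beeline prompt (started with ./bin/beeline or dse beeline):
-- !connect jdbc:hive2://localhost:10000
SHOW TABLES IN ks;
SELECT * FROM ks.tab LIMIT 10;
```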
- 45. Connect Tableau to Cassandra
- 46. The Full JDBC/ODBC Ecosystem Can Connect to ThriftServer
- 47. Incremental Collect - Because BI Tools are Mean SELECT * FROM TABLE Spark Sql Thrift Server ALL THE DATA
- 48. Spark Sql Thrift Server ALL THE DATA OOM Incremental Collect - Because BI Tools are Mean
- 49. Spark Sql Thrift Server spark.sql.thriftServer.incrementalCollect=true ALL THE DATA Spark Partition 1 Spark Partition 2 Spark Partition 3 Incremental Collect - Because BI Tools are Mean
- 50. Spark Sql Thrift Server spark.sql.thriftServer.incrementalCollect=true ALL THE DATA Spark Partition 1 Spark Partition 2 Spark Partition 3 Incremental Collect - Because BI Tools are Mean
- 51. Spark Sql Thrift Server spark.sql.thriftServer.incrementalCollect=true ALL THE DATA Spark Partition 1 Spark Partition 2 Spark Partition 3 Incremental Collect - Because BI Tools are Mean
- 52. Spark Sql Thrift Server spark.sql.thriftServer.incrementalCollect=true ALL THE DATA Spark Partition 1 Spark Partition 2 Spark Partition 3 Incremental Collect - Because BI Tools are Mean
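The setting shown on the slides can be flipped per session; here is a minimal sketch, with ks.big standing in for a hypothetical table too large to collect on the driver at once.

```sql
-- Stream results back partition-by-partition instead of materializing the whole
-- result on the driver (also settable at startup with --conf):
SET spark.sql.thriftServer.incrementalCollect=true;
SELECT * FROM ks.big;
```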
- 53. Getting things done with SQL • Registering Sources • Writing to Tables • Examining Query Plans • Debugging Predicate pushdowns • Caching Views
- 54. Registering Sources using SQL CREATE TEMPORARY VIEW words USING format.goes.here OPTIONS ( key "value" )
- 55. Registering Sources using SQL CREATE TEMPORARY VIEW words USING org.apache.spark.sql.cassandra OPTIONS ( table "tab", keyspace "ks") Not a single monad…
- 56. CREATE TEMPORARY VIEW words USING org.apache.spark.sql.cassandra OPTIONS ( table "tab", keyspace "ks") Registering Sources using SQL CassandraSourceRelation
- 57. We Can Still Use a HiveMetaStore DSE auto registers C* Tables in a C* based Metastore MetaStore Thrift Server
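Outside of DSE's auto-registration, one way to get the same effect is to drop the TEMPORARY keyword so the definition is persisted in the metastore, surviving restarts and becoming visible to every session. A sketch using the names from the earlier slides:

```sql
-- Persisted in the HiveMetaStore rather than scoped to one session:
CREATE TABLE words
USING org.apache.spark.sql.cassandra
OPTIONS ( table "tab", keyspace "ks");
```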
- 58. Writing DataFrames using SQL INSERT INTO arrow SELECT * FROM words; CassandraSourceRelation words read CassandraSourceRelation arrow write
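The INSERT above assumes "arrow" was registered the same way as "words"; the full round trip might look like this (a Cassandra table named arrow with a schema matching words is an assumption of the sketch).

```sql
CREATE TEMPORARY VIEW arrow
USING org.apache.spark.sql.cassandra
OPTIONS ( table "arrow", keyspace "ks");

-- Every row read from the words source is written back out through arrow:
INSERT INTO arrow SELECT * FROM words;
```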
- 59. Explain to Analyze Query Plans EXPLAIN SELECT * FROM arrow WHERE C > 2; Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@6069193a [k#18,c#19,v#20] PushedFilters: [IsNotNull(c), GreaterThan(c,2)], ReadSchema: struct<k:int,c:int,v:int> We can analyze the inside of Catalyst just like with Scala/Java/…
- 60. Predicates get Pushed Down Automatically EXPLAIN SELECT * FROM arrow WHERE C > 2; Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@6069193a [k#18,c#19,v#20] PushedFilters: [IsNotNull(c), GreaterThan(c,2)], ReadSchema: struct<k:int,c:int,v:int> CassandraSourceRelation Filter [GreaterThan(c,2)]
- 61. EXPLAIN SELECT * FROM arrow WHERE C > 2; Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@6069193a [k#18,c#19,v#20] PushedFilters: [IsNotNull(c), GreaterThan(c,2)], ReadSchema: struct<k:int,c:int,v:int> CassandraSourceRelation Filter [GreaterThan(c,2)] Internal Request to Cassandra: CQL SELECT * FROM ks.bat WHERE C > 2 Automatic Pushdowns! Predicates get Pushed Down Automatically
- 62. Common Cases where Predicates Don't Push SELECT * from troubles WHERE c < '2017-05-27' *Filter (cast(c#76 as string) < 2017-05-27) +- *Scan CassandraSourceRelation@53e82b30 [k#75,c#76,v#77] PushedFilters: [IsNotNull(c)], ReadSchema: struct<k:int,c:date,v:int> Why is my date clustering column not being pushed down?
- 63. Common Cases where Predicates Don't Push CassandraSourceRelation Filter [LessThan(c,'2017-05-27')] SELECT * from troubles WHERE c < '2017-05-27' *Filter (cast(c#76 as string) < 2017-05-27) +- *Scan CassandraSourceRelation@53e82b30 [k#75,c#76,v#77] PushedFilters: [IsNotNull(c)], ReadSchema: struct<k:int,c:date,v:int>
- 64. Common Cases where Predicates Don't Push CassandraSourceRelation ReadSchema: struct<k:int,c:date,v:int> Filter [LessThan(c,'2017-05-27')] Date != String SELECT * from troubles WHERE c < '2017-05-27' *Filter (cast(c#76 as string) < 2017-05-27) +- *Scan CassandraSourceRelation@53e82b30 [k#75,c#76,v#77] PushedFilters: [IsNotNull(c)], ReadSchema: struct<k:int,c:date,v:int>
- 65. Make Sure we Cast Correctly EXPLAIN SELECT * from troubles WHERE c < cast('2017-05-27' as date); *Scan C*Relation PushedFilters: [IsNotNull(c), LessThan(c, 2017-05-27)] CassandraSourceRelation ReadSchema: struct<k:int,c:date,v:int> Filter [LessThan(c,Date('2017-05-27'))] Date == Date
- 66. Make Sure we Cast Correctly CassandraSourceRelation ReadSchema: struct<k:int,c:date,v:int> Filter [LessThan(c,Date('2017-05-27'))] Automatic Pushdowns! EXPLAIN SELECT * from troubles WHERE c < cast('2017-05-27' as date); *Scan C*Relation PushedFilters: [IsNotNull(c), LessThan(c, 2017-05-27)]
- 67. DSE Search Automatic Pushdowns! EXPLAIN SELECT * from troubles WHERE v < 6; *Scan C*Relation PushedFilters: [IsNotNull(v), LessThan(v, 6)] CassandraSourceRelation ReadSchema: struct<k:int,c:date,v:int> Solr_Query
- 68. DSE Search Automatic Pushdowns!
- 69. DSE Search Automatic Pushdowns! Count Happens in the Index DSE Continuous Paging
- 70. Cache a whole table CassandraSourceRelation InMemoryRelation CACHE TABLE ks.tab; explain SELECT * FROM ks.tab; == Physical Plan == InMemoryTableScan [k#0, c#1, v#2] : +- InMemoryRelation StorageLevel(disk, memory, deserialized, 1 replicas), `ks`.`tab` : : +- *Scan CassandraSourceRelation
- 71. Uncache CassandraSourceRelation UNCACHE TABLE ks.tab; explain SELECT * FROM ks.tab; == Physical Plan == *Scan CassandraSourceRelation
- 72. Cache a fraction of Data CassandraSourceRelation CACHE TABLE somedata SELECT * FROM ks.tab WHERE c > 2; explain SELECT * from somedata; == Physical Plan == InMemoryTableScan : +- InMemoryRelation `somedata` : : +- *Scan CassandraSourceRelation PushedFilters: [IsNotNull(c), GreaterThan(c,2)] Filter [GreaterThan(c,2)] InMemoryRelation somedata
- 73. Let this be a starting point • https://github.com/datastax/spark-cassandra-connector • https://github.com/datastax/spark-cassandra-connector/blob/master/doc/14_data_frames.md • https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-sql-thrift-server.html • http://docs.datastax.com/en/dse/5.1/dse-dev/datastax_enterprise/spark/sparkSqlThriftServer.html • https://spark.apache.org/docs/latest/sql-programming-guide.html#distributed-sql-engine • https://www.datastax.com/dev/blog/dse-5-1-automatic-optimization-of-spark-sql-queries-using-dse-search • https://www.datastax.com/dev/blog/dse-continuous-paging-tuning-and-support-guide
- 74. Thank You. http://www.russellspitzer.com/ @RussSpitzer Come chat with us at DataStax Academy: https://academy.datastax.com/slack