
Cassandra.Link

The best knowledge base on Apache Cassandra®

Helping platform leaders, architects, engineers, and operators build scalable real-time data platforms.

12/2/2020

Reading time: 11 mins

Cassandra and SparkSQL: You Don't Need Functional Programming for Fun with Russell Spitzer

by Databricks


1. You don't need Functional Programming for Fun! Cassandra and SparkSQL
2. Russell (left) and Cara (right) • Software Engineer • Spark-Cassandra Integration since Spark 0.9 • Cassandra since Cassandra 1.2 • 2 Year Scala Convert • Still not comfortable talking about Monads in public • @Evanfchan
3. A Story in 3 Parts • Why SparkSQL? • The Spark SQL Thrift Server • Writing SQL for Spark
4. You have lots of options, so why Spark SQL? • Scala? • Java? • Python? • R? • Notebooks? • Erlang?
5. Spark is a Powerful Analytics Tool Built on Scala: a Distributed Analytics Platform with In-Memory Capabilities. Lots of new concepts: RDDs, DataSets, Streaming, Serialization, Functional Programming
6. Functional Programming Is Awesome: Side-effect Free Functions, Monads, Easy Parallelization, Anonymous Functions, Scala, Async Models, Type Matching, rdd.map(y => y+1), Endofunctors
7. Functional Programming can be Hard: blah-blah blah, Blah, Easy blahilization, baaaaah, blahala, Asybc blah, Blahblahhing, rdd.map(y => y+1) "Aren't Endofunctors from Ghostbusters?"
8. Learning Takes Time. Usually me: "Compile Time Type Safety! Catalyst! Tungsten! We get to learn all sorts of fun new things! SBT is probably great!" Less excitable dev: "We ship next week."
9. Spark SQL Provides a Familiar and Easy API: Use SQL to access the Power of Spark
10. Spark SQL Provides a Familiar and Easy API: SQL feeds Catalyst (Codegen! Optimization! Predicate Pushdowns), which feeds the Distributed Work
11. It still takes Scala/Java/Python/… Code.
    import org.apache.spark.sql.cassandra._
    val df = spark.read.cassandraFormat("tab", "ks").load
    df.createTempView("tab")
    spark.sql("SELECT * FROM tab").show
    +---+---+---+
    | k | c | v |
    +---+---+---+
    | 1 | 1 | 1 |
    | 1 | 2 | 2 |
    "Let me color-code that by parts I like vs parts I don't like."
12. It still takes Scala/Java/Python/… Code. (the same snippet, color-coded) "Also, your import has an underscore in it.."
13. For exploration we have the Spark-SQL Shell:
    spark-sql> SELECT * FROM ks.tab;
    1  2  2
    1  3  3
14. For exploration we have the Spark-SQL Shell (diagram: the shell is a single SparkSession)
15. For exploration we have the Spark-SQL Shell (diagram: one SparkSession driving five Executors)
16. Not really good for multiple users: that SparkSession and its Executors belong to one shell
17. Enter Spark Thrift Server: JDBC Clients connect to one Spark SQL Thrift Server, which drives the Executors
18.–19. The Spark SQL Thrift Server is a Spark Application • Built on HiveServer2 • Single Spark Context • Clients Communicate with it via JDBC • Can use all SparkSQL • Fair Scheduling • Clients can share Cached Resources • Security
20.–26. Fair Scheduling is Sharing: the same jobs are shown over time under FIFO and then FAIR scheduling; under FAIR, work from multiple clients interleaves instead of queuing strictly behind whoever submitted first.
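The FAIR behavior on slides 20-26 is standard Spark scheduling, so one plausible way to enable it is at Thrift Server startup; these are stock Spark configuration keys, and the pool file is optional:

    ./sbin/start-thriftserver.sh \
      --conf spark.scheduler.mode=FAIR \
      --conf spark.scheduler.allocation.file=/path/to/fairscheduler.xml  # optional: named pools with weights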
27. Single Context can Share Cached Data: cache TABLE today select * from ks.tab where date = today;
28. Single Context can Share Cached Data: after the statement runs, every Executor holds CACHED partitions
29. Single Context can Share Cached Data: a follow-up query, SELECT * from TODAY where age > 5, reads from that cache
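Because every JDBC client shares the one SparkContext, caching a catalog table from any session benefits all of them. A sketch in plain Spark SQL; note that a named subset like the slide's cache TABLE today ... creates a session-scoped temp view, so sharing it by name across JDBC connections generally also needs spark.sql.hive.thriftServer.singleSession=true:

    -- from any connected client
    CACHE TABLE ks.tab;
    -- later, from a different client connected to the same Thrift Server
    SELECT * FROM ks.tab WHERE c > 2;   -- served from the shared in-memory copy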
30. How to use it: starts from the command line and can use all Spark Submit Args
    ./sbin/start-thriftserver.sh
    dse spark-sql-thriftserver start
    Use it with all of your favorite Spark Packages, like the Spark Cassandra Connector!
    --packages com.datastax.spark:spark-cassandra-connector_2.11:2.0.2
    --conf spark.cassandra.connection.host=127.0.0.1
31. How to use it: on startup you'll see
    starting org.apache.spark.sql.hive.thriftserver.HiveThriftServer2
32. Hive? Wait, I thought we were doing Spark. Why does it say Hive everywhere? • Built on HiveServer2
33. A Brief History of the Spark Thrift Server • Thrift? • Hive?
34. They are not the Same: Cassandra Thrift vs Hive Thrift
35. Have you heard of the "Ship of Theseus"? Time for a quick history. More Greek stuff..
36. When you replace all the parts of a thing, does it remain the same? (a Greek boat)
37. When you replace all the parts of a thing, does it remain the same? SharkServer: Hive Parser, Hive Optimization, Map-Reduce, Spark Execution, JDBC Results
38. When you replace all the parts of a thing, does it remain the same? SharkServer becomes ThriftServer: Hive Parser, Hive Optimization, Map-Reduce, Spark Execution, JDBC Results
39. When you replace all the parts of a thing, does it remain the same? ThriftServer: Hive Parser, Catalyst, Schema RDDs, Spark Execution, JDBC Results
40. When you replace all the parts of a thing, does it remain the same? ThriftServer: Hive Parser, Catalyst, DataFrames, Spark Execution, JDBC Results
41. When you replace all the parts of a thing, does it remain the same? ThriftServer: Hive Parser, Catalyst, Datasets, Spark Execution, JDBC Results
42. When you replace all the parts of a thing, does it remain the same? ThriftServer: Spark Parser, Catalyst, DataSets, Spark Execution, JDBC Results
43. Almost all Spark now: ThriftServer, Spark Parser, Catalyst, DataSets, Spark Execution, JDBC Results
44. Connecting with Beeline (JDBC Client):
    ./bin/beeline
    dse beeline
    !connect jdbc:hive2://localhost:10000
    Even More Hive!
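Spelled out, the beeline flow from slide 44 looks like this; port 10000 is the Thrift Server default, and any authentication prompts depend on your setup:

    ./bin/beeline                              # the JDBC shell that ships with Spark (or: dse beeline)
    !connect jdbc:hive2://localhost:10000      # attach to the running Thrift Server
    SELECT * FROM ks.tab;                      -- executes on the shared SparkContext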
45. Connect Tableau to Cassandra
46. The Full JDBC/ODBC Ecosystem Can Connect to ThriftServer
47. Incremental Collect: Because BI Tools are Mean. SELECT * FROM TABLE sends ALL THE DATA to the Spark SQL Thrift Server
48. Incremental Collect: Because BI Tools are Mean. ALL THE DATA arriving at once means OOM
49.–52. Incremental Collect: Because BI Tools are Mean. With spark.sql.thriftServer.incrementalCollect=true, the Thrift Server returns ALL THE DATA one Spark Partition at a time (Partition 1, then 2, then 3) instead of collecting it all at once.
53. Getting things done with SQL • Registering Sources • Writing to Tables • Examining Query Plans • Debugging Predicate Pushdowns • Caching Views
54. Registering Sources using SQL:
    CREATE TEMPORARY VIEW words
    USING format.goes.here
    OPTIONS ( key "value" )
55. Registering Sources using SQL:
    CREATE TEMPORARY VIEW words
    USING org.apache.spark.sql.cassandra
    OPTIONS ( table "tab", keyspace "ks" )
    Not a single monad…
56. Registering Sources using SQL: the view above resolves to a CassandraSourceRelation
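The same registration can be issued from Scala, which is handy when mixing SQL with DataFrame code. A minimal sketch, assuming a live SparkSession and the connector on the classpath:

    // Register the Cassandra table ks.tab under the SQL name "words"
    spark.sql("""
      CREATE TEMPORARY VIEW words
      USING org.apache.spark.sql.cassandra
      OPTIONS ( table "tab", keyspace "ks" )
    """)
    spark.sql("SELECT * FROM words").show()  // scans the CassandraSourceRelation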
57. We Can Still Use a Hive MetaStore: DSE auto-registers C* Tables in a C*-based Metastore (MetaStore plus Thrift Server)
58. Writing DataFrames using SQL:
    INSERT INTO arrow SELECT * FROM words;
    reads through the words CassandraSourceRelation and writes through the arrow CassandraSourceRelation
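End to end, the write path from slide 58 needs both views registered first. A sketch, assuming ks.words and ks.arrow are existing Cassandra tables with compatible schemas (the table names here are illustrative):

    spark.sql("""CREATE TEMPORARY VIEW words
                 USING org.apache.spark.sql.cassandra
                 OPTIONS ( table "words", keyspace "ks" )""")
    spark.sql("""CREATE TEMPORARY VIEW arrow
                 USING org.apache.spark.sql.cassandra
                 OPTIONS ( table "arrow", keyspace "ks" )""")

    // Reads through the words relation and writes through the arrow relation
    spark.sql("INSERT INTO arrow SELECT * FROM words")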
59. Explain to Analyze Query Plans:
    EXPLAIN SELECT * FROM arrow WHERE C > 2;
    Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@6069193a [k#18,c#19,v#20]
    PushedFilters: [IsNotNull(c), GreaterThan(c,2)],
    ReadSchema: struct<k:int,c:int,v:int>
    "We can analyze the inside of the Catalyst just like with Scala/Java/…"
60. Predicates get Pushed Down Automatically: the same EXPLAIN output corresponds to the plan CassandraSourceRelation with Filter [GreaterThan(c,2)]
61. Predicates get Pushed Down Automatically: the pushed filter becomes an internal CQL request to Cassandra, SELECT * FROM ks.tab WHERE C > 2. Automatic Pushdowns!
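The same plan inspection works from code rather than the SQL shell; Dataset.explain prints the physical plan with its PushedFilters, which is often easier to read than the EXPLAIN result set:

    val q = spark.sql("SELECT * FROM arrow WHERE c > 2")
    q.explain()                 // physical plan only
    q.explain(extended = true)  // parsed, analyzed, optimized, and physical plans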
62. Common Cases where Predicates Don't Push:
    SELECT * from troubles WHERE c < '2017-05-27'
    *Filter (cast(c#76 as string) < 2017-05-27)
    +- *Scan CassandraSourceRelation@53e82b30 [k#75,c#76,v#77] PushedFilters: [IsNotNull(c)], ReadSchema: struct<k:int,c:date,v:int>
    "Why is my date clustering column not being pushed down?"
63. Common Cases where Predicates Don't Push: the plan keeps Filter [LessThan(c,'2017-05-27')] in Spark, above the CassandraSourceRelation
64. Common Cases where Predicates Don't Push: ReadSchema says struct<k:int,c:date,v:int>, but the literal is a string. Date != String, so Catalyst casts the column and the predicate cannot push.
65. Make Sure we Cast Correctly:
    EXPLAIN SELECT * from troubles WHERE c < cast('2017-05-27' as date);
    *Scan C*Relation PushedFilters: [IsNotNull(c), LessThan(c, 2017-05-27)]
    Now the filter is [LessThan(c,Date('2017-05-27'))] and Date == Date
66. Make Sure we Cast Correctly: with the types aligned, the filter reaches the CassandraSourceRelation. Automatic Pushdowns!
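The before and after of slides 62-66 side by side; the point is that the cast has to land on the literal, not the column, for the connector to see a date comparison:

    // Not pushed: Catalyst casts the date column c to string to compare it
    spark.sql("SELECT * FROM troubles WHERE c < '2017-05-27'").explain()

    // Pushed: the literal is cast to date, so the filter stays in c's own type
    spark.sql("SELECT * FROM troubles WHERE c < cast('2017-05-27' as date)").explain()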
67. DSE Search Automatic Pushdowns!
    EXPLAIN SELECT * from troubles WHERE v < 6;
    *Scan C*Relation PushedFilters: [IsNotNull(v), LessThan(v, 6)]
    ReadSchema: struct<k:int,c:date,v:int> (served through Solr_Query)
68. DSE Search Automatic Pushdowns!
69. DSE Search Automatic Pushdowns! Count Happens in the Index; DSE Continuous Paging
70. Cache a whole table (CassandraSourceRelation becomes InMemoryRelation):
    CACHE TABLE ks.tab;
    explain SELECT * FROM ks.tab;
    == Physical Plan ==
    InMemoryTableScan [k#0, c#1, v#2]
    : +- InMemoryRelation StorageLevel(disk, memory, deserialized, 1 replicas), `ks`.`tab`
    : : +- *Scan CassandraSourceRelation
71. Uncache:
    UNCACHE TABLE ks.tab;
    explain SELECT * FROM ks.tab;
    == Physical Plan ==
    *Scan CassandraSourceRelation
72. Cache a fraction of Data:
    CACHE TABLE somedata SELECT * FROM ks.tab WHERE c > 2;
    explain SELECT * from somedata;
    == Physical Plan ==
    InMemoryTableScan
    : +- InMemoryRelation `somedata`
    : : +- *Scan CassandraSourceRelation PushedFilters: [IsNotNull(c), GreaterThan(c,2)]
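Taken together, the cache lifecycle from slides 70-72 in one SQL session; the only assumption is the same ks.tab table used throughout (Spark also accepts an optional AS before the query):

    CACHE TABLE ks.tab;                                        -- whole table into an InMemoryRelation
    SELECT count(*) FROM ks.tab;                               -- served by InMemoryTableScan
    UNCACHE TABLE ks.tab;                                      -- back to scanning Cassandra

    CACHE TABLE somedata AS SELECT * FROM ks.tab WHERE c > 2;  -- cache only a filtered slice
    SELECT * FROM somedata;                                    -- the pushed filter ran once, at cache fill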
73. Let this be a starting point:
    • https://github.com/datastax/spark-cassandra-connector
    • https://github.com/datastax/spark-cassandra-connector/blob/master/doc/14_data_frames.md
    • https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-sql-thrift-server.html
    • http://docs.datastax.com/en/dse/5.1/dse-dev/datastax_enterprise/spark/sparkSqlThriftServer.html
    • https://spark.apache.org/docs/latest/sql-programming-guide.html#distributed-sql-engine
    • https://www.datastax.com/dev/blog/dse-5-1-automatic-optimization-of-spark-sql-queries-using-dse-search
    • https://www.datastax.com/dev/blog/dse-continuous-paging-tuning-and-support-guide
74. Thank You. http://www.russellspitzer.com/ @RussSpitzer. Come chat with us at DataStax Academy: https://academy.datastax.com/slack
