
Cassandra.Link


12/2/2020

Reading time: 12 mins

Maximum Overdrive: Tuning the Spark Cassandra Connector (Russell Spit…

by DataStax

  36. Now Let's Talk About Reading!
  37. Read Tuning is Mostly About Partitioning
      • RDDs are a large dataset broken into bits
      • These bits are called partitions
      • Cassandra partitions != Spark partitions
      • Spark partitions are sized based on the estimated data size of the underlying C* table
      • input.split.size_in_mb
      [Diagram: token range divided into Spark partitions]
  38. OOMs Caused by Spark Partitions Holding Too Much Data
      [Diagram: executor JVM heap shared by Core 1, Core 2, Core 3]
      As a general rule of thumb your executor should be set to hold
        Number of Cores * Size of Partition * 1.2
      See a lot of GC? OOM? Increase the number of partitions.
      Some caveats:
      • We don't know the actual partition size until runtime
      • Cassandra on-disk size != in-memory size
  39. OOMs Caused by Spark Partitions Holding Too Much Data
      input.split.size_in_mb (default 64): approximate amount of data to be fetched into a Spark partition. Minimum number of resulting Spark partitions is 1 + 2 * SparkContext.defaultParallelism.
      split.size_in_mb uses the system table size_estimates to determine how many Cassandra partitions should be in a Spark partition. Due to compression and inflation, the actual in-memory size can be much larger.
  40-42. Certain Queries Can't Be Broken Up
      • Hot spots make a Spark partition OOM
      • Full C* partition in Spark partition
      • Single partition lookups
      • Can't do anything about this
      • Don't know how partition is distributed
      • IN clauses
      • Replace with joinWithCassandraTable
      • If all else fails use CassandraConnector
  43. Read Speed is Mostly Dictated by Cassandra's Paging Speed
      input.fetch.size_in_rows (default 1000): number of CQL rows fetched per driver request
  44. Cassandra of the Future, As Fast as CSV!?!
      https://issues.apache.org/jira/browse/CASSANDRA-9259 : Bulk Reading from Cassandra (Stefania Alborghetti)
  45. In Summation, Know your Data
      • Write Tuning
      • Batching key
      • Sorting
      • Turning off batching when beneficial
      • Having enough data in a task
      • Read Tuning
      • Number of partitions
      • Some queries can't be broken up
      • Changing paging from Cassandra
      • Future bulk read speedup!
  46. The End
  47. Don't Let it End Like That! Contribute to the Spark Cassandra Connector
      • OSS project that loves community involvement
      • Bug reports
      • Feature requests
      • Write code
      • Doc improvements
      • Come join us! https://github.com/datastax/spark-cassandra-connector
  48. See you on the mailing list! https://github.com/datastax/spark-cassandra-connector
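The two sizing rules quoted on the read-tuning slides (the executor heap rule of thumb and the minimum number of Spark partitions) are plain arithmetic. A minimal sketch, assuming a hypothetical helper object `ReadSizing` and the figures that appear on the slides (3 cores, 64 MB splits); it only restates the formulas and does not query Spark or Cassandra:

```scala
// Hypothetical helper, not part of the connector: restates the slide formulas.
object ReadSizing {
  // Rule of thumb from the slides: Number of Cores * Size of Partition * 1.2
  def executorHeapMb(cores: Int, splitSizeMb: Int): Double =
    cores * splitSizeMb * 1.2

  // Minimum number of Spark partitions quoted on the slides:
  // 1 + 2 * SparkContext.defaultParallelism
  def minSparkPartitions(defaultParallelism: Int): Int =
    1 + 2 * defaultParallelism

  def main(args: Array[String]): Unit = {
    // 3 cores as drawn on the slide, 64 MB as the quoted split size.
    println(f"heap to hold in-flight partitions: ${executorHeapMb(3, 64)}%.1f MB")
    // defaultParallelism = 12 is an arbitrary example value.
    println(s"minimum Spark partitions: ${minSparkPartitions(12)}")
  }
}
```

As the slides caution, the real in-memory size is only known at runtime, so a figure computed this way is a starting point rather than a guarantee.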

Maximum Overdrive: Tuning the Spark Cassandra Connector (Russell Spitzer, DataStax) | C* Summit 2016

  1. Maximum Overdrive: Tuning the Spark Cassandra Connector
     Russell Spitzer, DataStax
  2. © DataStax, All Rights Reserved. Who is this guy and why should I listen to him?
     Russell Spitzer, Passing Software Engineer
     • Been working at DataStax since 2013
     • Worked in Test Engineering and now Analytics Dev
     • Working with Spark since 0.9
     • Working with Cassandra since 1.2
     • Main focus: the Spark Cassandra Connector
     • Surgically grafted to the Spark Cassandra Connector Mailing List
  3. The Spark Cassandra Connector Connects Spark to Cassandra. It's all there in the name.
     • Provides a DataSource for Datasets/DataFrames
     • Provides methods for writing Datasets/DataFrames
     • Reading and writing RDDs
     • Connection pooling
     • Type conversions and mapping
     • Data locality
     • Open source software! https://github.com/datastax/spark-cassandra-connector
  4. WARNING: THIS TALK WILL CONTAIN TECHNICAL DETAILS AND EXPLICIT SCALA. DISTRIBUTED SYSTEMS. Tuning the Spark Cassandra Connector
  5. (1) Lots of Write Tuning (2) A Bit of Read Tuning
  6. Context is Very Important. Knowing your Data is Key for Maximum Performance
  7. Write Tuning in the SCC Is All About Batching
     "Batches aren't good for performance in Cassandra."
     "Not when the writes within the batch are in the same partition and they are unlogged! I keep telling you this!"
  8-10. Multi-Partition Key Batches Put Load on the Coordinator
     [Diagram: THE BATCH, several rows, sent to the Cassandra cluster]
     • A batch moves as a single entity to the coordinator for that write.
     • The batch has to sit there until all portions of it are confirmed at their set consistency level.
     • Even when some portions of the batch finish early, we have to wait until the entire thing is done before we can respond to the client.
  11. We end up with a lot of rows just sitting around in memory waiting for others to get out of the way.
  12-13. Single Partition Batches are Treated as a Single Mutation in Cassandra
     [Diagram: THE BATCH, many rows, sent to the Cassandra cluster]
     Now the entire batch can be treated as a single mutation. We also only have to wait for one set of replicas.
  14. When all of the rows are going to the same place, writing to Cassandra is fast.
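The difference between a multi-partition batch and single-partition batches can be illustrated without a cluster. A minimal sketch in plain Scala, assuming a hypothetical `WriteRow` type whose `pk` field stands in for the Cassandra partition key; it only counts how many partitions a batch touches and is not how the connector or Cassandra is implemented:

```scala
// Hypothetical row type for illustration; `pk` stands in for the partition key.
final case class WriteRow(pk: String, ck: Int, value: String)

object SinglePartitionBatches {
  // Group rows so that every batch holds rows for exactly one partition key.
  def groupByPartitionKey(rows: Seq[WriteRow]): Map[String, Seq[WriteRow]] =
    rows.groupBy(_.pk)

  // Number of distinct partitions a batch touches. A value of 1 means the
  // coordinator only has to wait on one set of replicas.
  def partitionsTouched(batch: Seq[WriteRow]): Int =
    batch.map(_.pk).distinct.size

  def main(args: Array[String]): Unit = {
    val rows = Seq(
      WriteRow("a", 1, "x"), WriteRow("b", 1, "y"),
      WriteRow("a", 2, "z"), WriteRow("c", 1, "w"), WriteRow("b", 2, "v")
    )
    println(s"one mixed batch touches ${partitionsTouched(rows)} partitions")
    groupByPartitionKey(rows).foreach { case (pk, batch) =>
      println(s"batch for '$pk': ${batch.size} rows, ${partitionsTouched(batch)} partition")
    }
  }
}
```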
  15. The Connector Will Automatically Batch Writes

      RDD:
        import com.datastax.spark.connector._   // adds saveToCassandra to RDDs
        rdd.saveToCassandra("bestkeyspace", "besttable")

      DataFrame:
        df.write
          .format("org.apache.spark.sql.cassandra")
          .options(Map("table" -> "besttable", "keyspace" -> "bestkeyspace"))
          .save()

      DataFrame, using the connector's helper:
        import org.apache.spark.sql.cassandra._   // adds cassandraFormat
        df.write
          .cassandraFormat("besttable", "bestkeyspace")
          .save()
  16. By Default Batching Happens on Identical Partition Key
      https://github.com/datastax/spark-cassandra-connector/blob/master/doc/reference.md#write-tuning-parameters
      WriteConf(batchGroupingKey = ?)
      Change it as a SparkConf or DataFrame parameter, or directly pass in a WriteConf.
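As a sketch of the SparkConf route mentioned on this slide: the connector's reference documentation lists the grouping key under `output.batch.grouping.key`, with the values `partition`, `replica_set` and `none`. The object name `BatchGroupingConf` is hypothetical, and the settings are held in a plain `Map` so the snippet runs without Spark on the classpath; check the exact property name against the reference page for your connector version.

```scala
// Hypothetical holder for the setting; key name as listed in the connector's
// reference documentation (verify for your connector version).
object BatchGroupingConf {
  // "partition" is the default behaviour described on the slide;
  // "replica_set" and "none" are the other values discussed in this talk.
  val groupingKey: String = "partition"

  val settings: Map[String, String] = Map(
    "spark.cassandra.output.batch.grouping.key" -> groupingKey
  )

  def main(args: Array[String]): Unit =
    // With Spark available these pairs could be applied with
    // `new SparkConf().setAll(settings)`.
    settings.foreach { case (k, v) => println(s"$k=$v") }
}
```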
  17. Batches are Placed in Holding Until Certain Thresholds are Hit
  18-21. Batches are Placed in Holding Until Certain Thresholds are Hit
      • output.batch.grouping.buffer.size
      • output.batch.size.bytes / output.batch.size.rows
      • output.concurrent.writes
      • output.consistency.level
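To make "placed in holding until certain thresholds are hit" concrete, here is a simplified model in plain Scala. It is an assumption-laden sketch, not the connector's implementation: `maxRowsPerBatch` plays the role of `output.batch.size.rows`, `maxOpenBatches` plays the role of `output.batch.grouping.buffer.size`, and the byte-size threshold, concurrency and consistency level are left out.

```scala
import scala.collection.mutable

// Simplified model of a grouping batch buffer. NOT the connector's code.
object BatchBufferSketch {
  def batches[K, V](rows: Iterator[(K, V)],
                    maxRowsPerBatch: Int,
                    maxOpenBatches: Int): Vector[(K, Vector[V])] = {
    // Batches currently "in holding", one per grouping key.
    val open = mutable.LinkedHashMap.empty[K, Vector[V]]
    val out = Vector.newBuilder[(K, Vector[V])]

    for ((key, value) <- rows) {
      val batch = open.getOrElse(key, Vector.empty[V]) :+ value
      if (batch.size >= maxRowsPerBatch) {
        // Size threshold hit: emit this batch now.
        open.remove(key)
        out += (key -> batch)
      } else {
        open.update(key, batch)
        if (open.size > maxOpenBatches) {
          // Too many batches in holding: emit the largest one to make room.
          val (k, b) = open.maxBy(_._2.size)
          open.remove(k)
          out += (k -> b)
        }
      }
    }
    // End of the task: flush whatever is still in holding.
    open.foreach { case (k, b) => out += (k -> b) }
    out.result()
  }

  def main(args: Array[String]): Unit = {
    val rows = Iterator("a" -> 1, "a" -> 2, "b" -> 3, "a" -> 4)
    batches(rows, maxRowsPerBatch = 2, maxOpenBatches = 10).foreach(println)
  }
}
```

In this model, rows that share a key and arrive close together end up in full batches, while keys that appear only once are flushed as one-row batches at the end of the task.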
  22. Spark Cassandra Stress for Running Basic Benchmarks
      https://github.com/datastax/spark-cassandra-stress
      Running benchmarks on a bunch of AWS machines:
      • 5 M3.2XLarge
      • DSE 5.0.1
      • Spark 1.6.1
      • Spark CC 1.6.0
      • RF = 3
      • 2M writes / 100K C* partitions and 400 Spark partitions
      Caveat: don't benchmark exactly like this. I'm making some bad decisions to make some broad points.
  23. Depending on Your Use Case, Sorting within Partitions can Greatly Increase Write Performance
      [Bar chart, kOps/s: Rows Out of Order vs. Rows In Order; series: Default Conf; values shown: 77, 28]
      Grouping on Partition Key: The Safest Thing You Can Do
  24. Including Everything in the Batch
      [Bar chart, kOps/s: Rows Out of Order vs. Rows In Order; series: Default Conf, No Batch Key; values shown: 125, 69, 77, 28]
      May be safe for short durations BUT WILL LEAD TO SYSTEM INSTABILITY
  25. Grouping on Replica Set
      [Bar chart, kOps/s: Rows Out of Order vs. Rows In Order; series: Default Conf, Grouped on Replica Set; values shown: 125, 70, 77, 28]
      Safer, but still will put extra load on the coordinator
  26. Remember the Tortoise vs the Hare
      Overwhelming Cassandra will slow you down.
      • Limit the amount of writes per executor: output.throughput_mb_per_sec
      • Limit maximum executor cores: spark.cores.max
      • Lower concurrency: output.concurrent.writes
      DEPENDING ON DISK PERFORMANCE YOUR INITIAL SPEEDS IN BENCHMARKING MAY NOT BE SUSTAINABLE
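The three throttles named on this slide are ordinary configuration properties. A minimal sketch, assuming a hypothetical `ThrottledWriteConf` object and placeholder values that are not recommendations; `spark.cores.max` is the Spark property for capping the cores an application takes, and the `spark.cassandra.output.*` names follow the connector 1.6-era reference documentation (later connector versions renamed some of these properties, so check the reference page for your version):

```scala
// Hypothetical holder; values are placeholders, not tuning advice.
object ThrottledWriteConf {
  val settings: Map[String, String] = Map(
    // Cap write throughput so a long job does not overwhelm Cassandra.
    "spark.cassandra.output.throughput_mb_per_sec" -> "5",
    // Cap the total cores the Spark application may use.
    "spark.cores.max" -> "12",
    // Fewer batches in flight per task.
    "spark.cassandra.output.concurrent.writes" -> "2"
  )

  def main(args: Array[String]): Unit =
    // With Spark available: `new SparkConf().setAll(settings)`.
    settings.foreach { case (k, v) => println(s"$k=$v") }
}
```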
  27-28. For Example, Let's Run with Batch Key None for a Longer Test (20M writes)
      [Stage 0:=========================> (191 + 15) / 400]
      WARN 2016-08-19 21:11:55,817 org.apache.spark.scheduler.TaskSetManager: Lost task 192.0 in stage 0.0 (TID 193, ip-172-31-13-127.us-west-1.compute.internal): java.io.IOException: Failed to write statements to ks.tab.
        at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:166)
        at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:134)
        at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:110)
        at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:109)
        at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:139)
        at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:109)
        at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:134)
        at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:37)
        at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:37)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
  29. Back to Default PartitionKey Batching. [Chart: write throughput in kOps/s, axis 0 to 200, for Rows Out of Order and Rows In Order; data labels 190 and 39 for the 10X Length Run, 77 and 28 for the Initial Run.] So why are we doing so much better over a longer run?
  30. Back to Default PartitionKey Batching (same chart). There were 400 Spark partitions in both cases, so the longer run gives each task ten times as many rows:
 2M / 400 = 5,000 rows per task
 20M / 400 = 50,000 rows per task
  31. Having Too Many Partitions Will Slow Down Your Writes. Every task has setup and teardown costs, and we can only build up good batches if there are enough elements to build them from.
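The arithmetic behind the two runs can be sketched in plain Scala. The object and method names are hypothetical; the row counts and the 400 Spark partitions come from the slides, and the sketch assumes rows are spread evenly across tasks.

```scala
// Hypothetical helper: rows each Spark task gets to write,
// assuming the rows are spread evenly over the Spark partitions.
object RowsPerTask {
  def rowsPerTask(totalRows: Long, sparkPartitions: Int): Long = {
    require(sparkPartitions > 0, "need at least one Spark partition")
    totalRows / sparkPartitions
  }

  def main(args: Array[String]): Unit = {
    println(rowsPerTask(2000000L, 400))  // initial run: 5000
    println(rowsPerTask(20000000L, 400)) // 10X length run: 50000
  }
}
```

If tasks turn out to be too small, reducing the number of Spark partitions before the save (for example with Spark's `coalesce`) is one way to give each task more rows to batch.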
  32. Depending on your use case, Sorting within Partitions can Greatly Increase Write Performance. https://github.com/datastax/spark-cassandra-connector/blob/master/doc/reference.md#write-tuning-parameters [Chart: write throughput in kOps/s, axis 0 to 200, for Rows Out of Order and Rows In Order; data labels 190 and 39 for the 10X Length Run.] A Spark sort on the partition key may speed up your total operation several-fold.
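Why ordering matters can be illustrated with a deliberately simplified model. It assumes a batcher that can only group consecutive rows sharing a partition key; the connector's real grouping is more capable, so treat this as intuition only. `BatchRuns` and `countRuns` are hypothetical names, and the keys are made-up sample data.

```scala
object BatchRuns {
  // In this simplified model, each run of consecutive equal keys is one batch.
  def countRuns[K](keys: Seq[K]): Int =
    if (keys.isEmpty) 0
    else 1 + keys.zip(keys.tail).count { case (a, b) => a != b }

  def main(args: Array[String]): Unit = {
    val partitionKeys = Seq(1, 2, 1, 3, 2, 1) // made-up keys, out of order
    println(countRuns(partitionKeys))         // 6 single-row batches
    println(countRuns(partitionKeys.sorted))  // 3 batches: (1,1,1), (2,2), (3)
  }
}
```

Sorting the same six rows halves the number of batches in this toy case, which is the effect a sort on the partition key is meant to produce at scale.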
  34. Maximizing Performance for Out of Order Writes or No Clustering Keys. https://github.com/datastax/spark-cassandra-connector/blob/master/doc/reference.md#write-tuning-parameters [Chart: write throughput in kOps/s, axis 0 to 200, for Rows Out of Order and Rows In Order; data labels 59 and 47 for Modified Conf, 190 and 39 for the 10X Length Run.] Turn off batching and increase concurrency:
 spark.cassandra.output.batch.size.rows 1
 spark.cassandra.output.concurrent.writes 2000
 Single Partition Batches are good! I keep telling you!
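As a small sketch, the two settings from the slide can be kept as plain key/value pairs and rendered as `spark-submit --conf` arguments. `NoBatchingConf` and `toSubmitArgs` are hypothetical names; only the property names and values come from the slide.

```scala
object NoBatchingConf {
  // The two settings shown on the slide, as plain key/value pairs.
  val settings: Seq[(String, String)] = Seq(
    "spark.cassandra.output.batch.size.rows" -> "1",
    "spark.cassandra.output.concurrent.writes" -> "2000"
  )

  // Render each pair as two spark-submit arguments: --conf key=value
  def toSubmitArgs(conf: Seq[(String, String)]): Seq[String] =
    conf.flatMap { case (key, value) => Seq("--conf", s"$key=$value") }

  def main(args: Array[String]): Unit =
    println(toSubmitArgs(settings).mkString(" "))
}
```

The same pairs could instead be applied with `SparkConf.set(key, value)` when the context is built in code.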
  35. This turns the connector into a Multi-Machine Cassandra Loader (basically just executeAsync as fast as possible). https://github.com/brianmhess/cassandra-loader
  36. Now Let's Talk About Reading!
  37. Read Tuning Is Mostly About Partitioning
 • RDDs are a large dataset broken into bits
 • These bits are called partitions
 • Cassandra partitions != Spark partitions
 • Spark partitions are sized based on the estimated data size of the underlying C* table
 • input.split.size_in_mb
 [Diagram: TokenRange, Spark Partitions]
  38. OOMs Caused by Spark Partitions Holding Too Much Data. [Diagram: Executor JVM Heap shared by Core 1, Core 2, Core 3] As a general rule of thumb, your executor heap should be sized to hold:

 Number of Cores * Size of Partition * 1.2

 Seeing a lot of GC? OOMs? Increase the number of partitions. Some caveats:
 • We don't know the actual partition size until runtime
 • Cassandra on-disk size != in-memory size
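The rule of thumb is easy to turn into a quick calculation. This is a minimal sketch with hypothetical names; the 3 cores and 64 MB partition size are example inputs only, and the partition size that matters is the in-memory one, which the caveats above warn is not known in advance.

```scala
object ExecutorHeap {
  // Rule of thumb from the slide: cores * partition size * 1.2
  def ruleOfThumbHeapMb(cores: Int, partitionSizeMb: Double): Double =
    cores * partitionSizeMb * 1.2

  def main(args: Array[String]): Unit = {
    // Example inputs, not recommendations.
    val heapMb = ruleOfThumbHeapMb(cores = 3, partitionSizeMb = 64.0)
    println(s"heap should hold roughly ${math.ceil(heapMb).toLong} MB of partition data")
  }
}
```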
  39. OOMs Caused by Spark Partitions Holding Too Much Data. [Diagram: Executor JVM Heap shared by Core 1, Core 2, Core 3]
 input.split.size_in_mb (default 64): approximate amount of data to be fetched into a Spark partition. The minimum number of resulting Spark partitions is 1 + 2 * SparkContext.defaultParallelism.
 split.size_in_mb uses the system table size_estimates to determine how many Cassandra partitions should be in a Spark partition.

 Due to compression and inflation, the actual in-memory size can be much larger.
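A rough way to reason about the resulting partition count is sketched below. The floor of `1 + 2 * defaultParallelism` is taken from the slide; dividing the estimated table size by the split size and rounding up is an assumed model rather than the connector's actual algorithm, and all names are hypothetical.

```scala
object SplitEstimate {
  // Floor stated on the slide: 1 + 2 * SparkContext.defaultParallelism
  def minPartitions(defaultParallelism: Int): Int = 1 + 2 * defaultParallelism

  // Assumed model: ceil(estimated table size / split size), never below the floor.
  def estimatedPartitions(tableSizeMb: Long, splitSizeMb: Int, defaultParallelism: Int): Long = {
    require(splitSizeMb > 0, "split size must be positive")
    val bySize = (tableSizeMb + splitSizeMb - 1) / splitSizeMb // integer ceiling
    math.max(bySize, minPartitions(defaultParallelism).toLong)
  }

  def main(args: Array[String]): Unit = {
    println(estimatedPartitions(6400L, 64, 8)) // 100: table size dominates
    println(estimatedPartitions(100L, 64, 8))  // 17: the floor dominates
  }
}
```

Lowering the split size raises the first term, which is the lever to reach for when partitions hold too much data.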
  42. Certain Queries Can't Be Broken Up
 • Hot spots make a Spark partition OOM
   • Full C* partition in a Spark partition
 • Single partition lookups
   • Can't do anything about this
   • Don't know how the partition is distributed
 • IN clauses
   • Replace with joinWithCassandraTable
 • If all else fails, use CassandraConnector
  43. Read Speed Is Mostly Dictated by Cassandra's Paging Speed. input.fetch.size_in_rows (default 1000): number of CQL rows fetched per driver request.
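To get a feel for what the fetch size means, the number of driver requests needed to page through a set of rows can be estimated. This sketch assumes one request per page and ignores everything else that affects read speed; the names and the 50,000-row figure are hypothetical.

```scala
object PagingEstimate {
  // Requests needed to page through `rows` CQL rows at the given fetch size.
  def driverRequests(rows: Long, fetchSizeInRows: Int): Long = {
    require(fetchSizeInRows > 0, "fetch size must be positive")
    (rows + fetchSizeInRows - 1) / fetchSizeInRows // integer ceiling
  }

  def main(args: Array[String]): Unit = {
    println(driverRequests(50000L, 1000)) // 50 requests at the fetch size shown on the slide
    println(driverRequests(50000L, 5000)) // 10 requests with a larger page
  }
}
```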
  44. Cassandra of the Future, As Fast as CSV!?! https://issues.apache.org/jira/browse/CASSANDRA-9259 : Bulk Reading from Cassandra (Stefania Alborghetti)
  45. In Summation, Know Your Data
 • Write Tuning
   • Batching key
   • Sorting
   • Turning off batching when beneficial
   • Having enough data in a task
 • Read Tuning
   • Number of partitions
   • Some queries can't be broken up
   • Changing paging from Cassandra
   • Future bulk read speedup!
  46. The End
  47. Don't Let It End Like That! Contribute to the Spark Cassandra Connector
 • OSS project that loves community involvement
 • Bug reports
 • Feature requests
 • Write code
 • Doc improvements
 • Come join us! https://github.com/datastax/spark-cassandra-connector
  48. See you on the mailing list! https://github.com/datastax/spark-cassandra-connector
