Cassandra.Link

1/8/2020

Optimizing Spark SQL JOIN statements for High Performance

by John Doe

As with SQL performance in general, Spark SQL performance depends on several factors: hardware resources such as the size of your compute resources and network bandwidth, as well as your data model, application design, and query construction. The data model is the most critical of the non-hardware factors. Initially, I wanted to blog about the data modeling aspects of optimization. However, I’ve decided to write about the inner workings of Spark SQL first, as they are the foundation of efficient data modeling with Spark SQL.

I used DataStax Enterprise (DSE) 4.8 for this article. DSE 4.8 comes with Spark 1.4.x and Cassandra 2.1.x. Although I explain Spark SQL from the perspective of a Cassandra data source, similar concepts apply to other data sources supported by Spark SQL.

To optimize Spark SQL for high performance, we first need to understand how Spark SQL is executed by Spark’s Catalyst optimizer.

Let’s use the example Spark SQL statement below.

[Image: sample SQL statement]

Spark creates a plan similar to the one below.

[Image: Spark plan]

The Spark plan can be divided into three phases.

Phase 1: Read data from individual tables.

Phase 2: Join tables.

Phase 3: Aggregation.

Below is a hypothetical execution timeline for each phase.

[Image: execution timeline]

Optimization Rule #1: Include predicates for all tables in the Spark SQL query.

Spark SQL tries to optimize tasks for parallel execution. First, it breaks the join statement into individual SQL statements, one per table. It then reads all tables in parallel.

Total time spent in the read phase = maximum read time taken by any single table.

This is why you need to include predicates against all tables. In normal SQL against a relational database, we usually provide predicates against only one table, typically the smaller one. The RDBMS knows how to leverage the filtered data from the smaller table to read the other tables efficiently. If you write a similar query in Spark SQL, Spark will create individual per-table SELECT statements like the ones below during the read phase.

[Image: per-table SELECT statements (SQL1)]

This means only table T1 gets filtered data from its data source (Cassandra in this case) into Spark memory. All other tables become full table scans, which are bad for performance in Spark.

To achieve high read performance, we have to write the Spark SQL as shown below.

[Image: query with predicates on all tables (SQL2)]

Spark has better read performance when data is filtered at the data source.
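The original query images are not reproduced here, so the following is only a minimal sketch of the idea in plain Python (no Spark required). It mimics how a join is split into one SELECT per table during the read phase. The table names T2 and T3, the column `id`, and the helper functions are hypothetical and exist only for illustration; this is not Spark's actual planner code.

```python
def per_table_scans(tables, predicates):
    """Build one SELECT per table, mimicking the read phase described above.

    tables: list of table names taking part in the join.
    predicates: dict mapping a table name to a list of filter strings.
    """
    scans = {}
    for table in tables:
        filters = predicates.get(table, [])
        if filters:
            scans[table] = f"SELECT * FROM {table} WHERE " + " AND ".join(filters)
        else:
            # No predicate for this table: the read becomes a full table scan.
            scans[table] = f"SELECT * FROM {table}"
    return scans


def full_scan_tables(scans):
    """Return the names of tables whose scan has no WHERE clause."""
    return [table for table, sql in scans.items() if " WHERE " not in sql]


tables = ["T1", "T2", "T3"]  # T2 and T3 are hypothetical names

# Predicate on T1 only, the way one might write the query for an RDBMS.
rdbms_style = per_table_scans(tables, {"T1": ["T1.id = 42"]})

# Predicates on every table, as Rule #1 recommends.
spark_style = per_table_scans(
    tables,
    {"T1": ["T1.id = 42"], "T2": ["T2.id = 42"], "T3": ["T3.id = 42"]},
)

print(full_scan_tables(rdbms_style))  # ['T2', 'T3']
print(full_scan_tables(spark_style))  # []
```

With a predicate on T1 only, two of the three reads are unfiltered. With predicates on every table, none are.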

However, simply adding predicates against Cassandra table columns does not guarantee that the data will be filtered at the Cassandra data source. Certain predicate pushdown rules apply to Spark SQL, and these rules depend on the data source behind each table. I will cover the Cassandra predicate pushdown rules in a separate post.

Optimization Rule #2: Minimize the number of Spark tasks in the scan/read phase.

The Spark plan creates multiple stages in the read phase, one separate stage per table. The number of tasks in each stage depends on the number of data partitions Spark has to read into memory. With an efficient partitioning strategy on your tables and proper predicates against partitions, you can minimize the number of tasks in the read stage.

For example, single-partition scans using the = operator create fewer tasks than multi-partition scans using the IN operator.
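As a rough illustration of the statement above, here is a toy model in plain Python. It assumes one read task per distinct data partition selected by the predicate, which is a deliberate simplification and not how the Cassandra connector actually sizes its tasks. The function name and the `day` partition key are hypothetical.

```python
def read_tasks(selected_partitions):
    """Toy model: one read task per distinct data partition the predicate selects."""
    return len(set(selected_partitions))


# Hypothetical partition key values for a table partitioned by day.
equals_scan = read_tasks(["2020-01-08"])  # WHERE day = '2020-01-08'
in_scan = read_tasks(["2020-01-06", "2020-01-07", "2020-01-08"])  # WHERE day IN (...)

print(equals_scan, in_scan)  # 1 3
```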

Optimization Rule #3: The order of tables in the FROM clause matters. Keep the largest table at the top.

A Spark table is based on a DataFrame, which is based on an RDD. In simple terms, an RDD is a distributed collection. A Spark SQL JOIN operation is very similar to a fold left operation on a collection.

This post is not about Scala or functional programming concepts. However, it helps to know how the fold left operation works on a collection. Refer to the link below for an explanation of fold left: http://alvinalexander.com/scala/how-to-walk-scala-collections-reduceleft-foldright-cookbook

As you can see in the picture of the Spark plan, a JOIN operation requires shuffling data from the second table’s executors to the first table’s executors. A shuffle is a very expensive operation in terms of I/O and CPU. This operation is repeated until all tables to the right are merged with the result on the left.

By keeping the table with the largest data size at the top of the join, you avoid shuffling the largest data set.
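The fold left analogy can be sketched in plain Python with `functools.reduce`, which folds a list from the left. The sketch below assumes the simplified model from the text, in which each join step shuffles only the right-hand table; the table names and row counts are made up for illustration.

```python
from functools import reduce


def join_left_to_right(tables):
    """Fold tables left to right, the way the text describes Spark SQL joins.

    tables: list of (name, row_count) pairs in FROM-clause order.
    Returns (join order as a string, total rows shuffled), assuming each
    step shuffles only the right-hand table to the left-hand executors.
    """
    first_name, _ = tables[0]

    def step(acc, table):
        plan, shuffled = acc
        name, rows = table
        # The accumulated left side stays put; the right table is shuffled.
        return (f"({plan} JOIN {name})", shuffled + rows)

    # The first table seeds the fold and is never shuffled in this model.
    return reduce(step, tables[1:], (first_name, 0))


# Hypothetical tables and row counts.
largest_first = [("events", 1_000_000), ("users", 10_000), ("countries", 200)]
largest_last = list(reversed(largest_first))

print(join_left_to_right(largest_first))
# ('((events JOIN users) JOIN countries)', 10200)
print(join_left_to_right(largest_last))
# ('((countries JOIN users) JOIN events)', 1010000)
```

Under this model, putting the largest table first shuffles 10,200 rows, while putting it last shuffles 1,010,000.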

Optimization Rule #4: Keep a consistent partitioning strategy across JOINing tables.

Common partition keys make it easier to write queries with filters that can be applied across many of the joining tables.

Optimization Rule #5: Minimize the number of tables in the JOIN.

As you can see in the timeline picture of the plan, reads are parallel operations, whereas JOINs are sequential steps. That means every join step adds to the total execution time of the query.
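The timeline reasoning can be expressed as simple arithmetic. The sketch below is a back-of-the-envelope model in plain Python, not a profiler: it assumes the read phase costs as much as the slowest read, the join steps add up, and aggregation runs last. All timings are hypothetical.

```python
def query_time(read_times, join_times, aggregation_time):
    """Estimate total query time from per-phase timings (same unit throughout).

    Reads run in parallel, so the read phase takes as long as the slowest read.
    Joins run one after another, so their times add up.
    """
    read_phase = max(read_times)
    join_phase = sum(join_times)
    return read_phase + join_phase + aggregation_time


# Hypothetical timings in seconds.
three_tables = query_time([4, 9, 6], [5, 5], 2)  # 9 + 10 + 2
two_tables = query_time([4, 9], [5], 2)          # 9 + 5 + 2

print(three_tables, two_tables)  # 21 16
```

Dropping the third table leaves the read phase unchanged here, because the slowest read still dominates, but it removes one full join step from the total.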

Use proper denormalization techniques to reduce the number of tables in your data model.
