Cassandra.Link

12/2/2020

Reading time: 37 mins

Spark + Cassandra All You Need to Know: Tips and Optimizations

by John Doe

In this article, I will discuss the implications of running Spark with Cassandra compared to the most common use case, which is using a deep storage system such as S3 or HDFS.

The goal is to understand the internals of Spark and Cassandra so you can write your code as efficiently as possible and really utilize the power of these two great tools.

I will give you some tips regarding Spark tuning and Cassandra optimizations so you can maximize performance and minimize costs. I assume you already have basic knowledge of Spark and Cassandra.

First, let’s review the different deployment options you have when running Spark and Cassandra together.

In general, we can think of two broad types of Spark clusters:

- Commodity Spark Clusters: These clusters are a cost-effective way to process large amounts of data. They use cheap but slow storage systems running on low-cost hardware. The idea is to take advantage of Spark parallelism to process big data in an efficient way. This is by far the most common setup, both on premises using HDFS and in the cloud using S3 or another deep storage system.
- High Performance Clusters: These special clusters use high-performance machines with high-end CPUs and lots of memory. They also use very efficient, low-latency SSDs. This is similar to the setup used in Cassandra database clusters, so these clusters can run Spark + Cassandra on the same machine types, using Cassandra instead of HDFS for storage.

High Performance Clusters are more expensive, and they can be set up in the cloud or on prem. They are very efficient in combination with Cassandra if used correctly, but they can be a waste of resources if not used properly. The goal of this article is to give you some tips on how to tune these clusters.

Spark + Cassandra

Let’s go into more detail on what these two architectures may look like…

Running Spark in the cloud

If you run in the cloud you have two options:

Managed Service

Use a managed service like AWS EMR or GCP DataProc.
These are managed solutions that use the cloud provider’s deep storage instead of HDFS, although HDFS is also available. This is the Commodity Spark Cluster that we talked about before, running in the cloud.

You have to be aware that S3 and other cheap deep storage systems have traditionally been eventually consistent and do not rely on data locality like HDFS. They use REST HTTP protocols to load the data into the Spark cluster, and HDFS is only used to cache data which does not fit into memory. HDFS is the ephemeral storage and S3 the permanent storage. To solve the eventual consistency problem, where you may read stale data, cloud providers have implemented their own solutions; you should read about them and use them when necessary.

Separating storage and compute provides a cost-effective, flexible and scalable solution which has become extremely popular, but be aware that you cannot take advantage of data locality when reading data, which is an issue when adding Cassandra into the equation.

Build your own

The other option is to spin up EC2 instances and install Spark on your own. This is a lot more tedious, but it gives you full control so you can optimize the hardware for your specific workloads. This is the option you will have to choose if you want to run Cassandra + Spark in the same cluster in the cloud.

Running Spark on premises

If you run on premises, you have the same options. You will use HDFS as the file system instead of S3. The advantage is that the data will be local to the processing nodes and no network calls are involved, achieving better I/O throughput and slightly lower latency, but these advantages are not that noticeable.

Running on prem you may get better deals on high-end servers, so in this case you should consider running Spark and Cassandra in the same cluster for high performance computing.

Regardless of where you run your workloads, there are two approaches you can use to integrate Spark and Cassandra.
You can have a cluster for each tool or run them in the same cluster, which is the main focus of this article.

Spark + Cassandra in different clusters

In this scenario, you have two clusters, one for Cassandra and one for Spark. The Cassandra cluster will be set up using good hardware and SSD drives optimized for Cassandra. On the other hand, Spark will be set up using commodity hardware and will typically have more nodes than the Cassandra cluster.

This is a low-cost solution and a lot more flexible. It can be set up on premises or in the cloud, although in the cloud it is easier and cheaper. Here you will typically use your deep storage for your data lake and run Spark jobs for your OLAP workloads. You will use Cassandra for OLTP: your online services will write to Cassandra, and overnight your Spark jobs will read from or write to your main Cassandra database.

In this case, Cassandra is not used for processing but just as a source or sink for the initial read or the final write, not for intermediate results. A typical example is reading the previous day’s worth of data from Cassandra and the rest of the data from HDFS/S3 to run OLAP workloads on Spark. Then, you will write a summary of the data back to Cassandra with the latest insights for the users, and the rest of the data back into the data lake to be analysed by your internal team.

Note that with this approach, writing to and reading from Cassandra goes over the network, sometimes across different VPCs or regions, so it is not very efficient in terms of network performance, but it is cost efficient.

This approach is typically used when you have Cassandra for your OLTP applications and you just need to query the data from Spark, but Cassandra itself is neither used for your jobs nor optimized for OLAP.

This is a popular approach, and it is easy to set up.
In the cloud, you will have your own Cassandra cluster running in your VMs and your managed Spark cluster talking to Cassandra over the network.

Use this approach when high performance is not required, or when you have huge amounts of data that Cassandra may struggle to process or that would be too expensive to run on it.

Spark + Cassandra in the same cluster

In this case, your goal is to get the best performance for your data processing pipeline and leverage Cassandra’s great throughput and low latency. Basically, you set up a Spark cluster but use Cassandra instead of HDFS. This is great for iterative data processing such as Machine Learning, where you need to read and write to disk very often.

You would need to get a set of machines for the cluster first. Use SSD drives, and balance your vertical and horizontal scaling options. What I mean is that, compared to commodity hardware Spark clusters, you would want fewer nodes with better machines, with many cores and more RAM.

Tune your Cassandra cluster for OLAP operations: you want high throughput over low latency. Remember that Spark will read and write lots of data, but most of the time it will be in batches.

You can check this article for more information regarding how to install Cassandra and Spark on the same cluster.

Spark with Cassandra. Source: https://opencredo.com/blogs/deploy-spark-apache-cassandra/

Use this approach if:

- You already have a Cassandra cluster set up and it is not fully utilized, and/or
- Your job needs to run really fast (streaming or mini batches) and does not consume huge amounts of data.

For this approach, first you will ingest your data into Cassandra.
You have several options:

- Use Spark: This is a good option if your source data is already at rest and you need to transfer it to Cassandra for your fast ETL pipeline.
- Use Cassandra’s bulk loader to import data into Cassandra.
- If you need to pull data from APIs, you can write your own application using a streaming solution such as Akka Streams. I talked about this in this article.

Once you have your data in Cassandra, you will run your ETL pipeline using Spark, reading and writing data to Cassandra. Note that although HDFS will be available, you shouldn’t use it, for two reasons:

- First, it is not performant. If you have Cassandra, use it and not the slow file system.
- Second, it will compete with Cassandra for I/O. Spark HDFS writes are quite heavy I/O operations, and they will slow down and starve your Cassandra cluster.

The rest of the article will focus mainly on running Spark with Cassandra in the same cluster, although many of the optimizations also apply if you run them in different clusters.

Use the Spark Cassandra Connector to talk to Cassandra. You have two options when using this connector:

- Use the low-level RDD API. This provides more flexibility and the ability to manually optimize your code.
- Use the Data Frame or Data Set APIs for Spark. In this case you read and write Data Frames like you would with HDFS, and the connector will do all the optimizations under the hood.

To start with, I recommend using the Data Frame/Data Set API. This way, you can leverage the same API and write to Cassandra the same way you write to other systems. You just need to be aware that your storage is Cassandra and not HDFS. You need to understand how to optimize Spark for Cassandra and also set the right settings in your connector.
We will talk about this later.

To read data from a Cassandra table you just need to specify a different format, “org.apache.spark.sql.cassandra”:

    val df = spark.read
      .format("org.apache.spark.sql.cassandra")
      .options(Map("table" -> "words", "keyspace" -> "test"))
      .load()

This will create a new data frame that matches the table “words” in the keyspace “test”.

If you import “org.apache.spark.sql.cassandra._” you can simply write:

    import org.apache.spark.sql.cassandra._
    val df = spark.read.cassandraFormat("words", "test").load()

where the first argument is the table and the second one the keyspace.

To write data from a data frame into a Cassandra table:

    df.write.cassandraFormat("words_copy", "test").mode("append").save()

Note that the schema of the Data Frame must match the table schema.

And that’s it. From the API perspective, that’s all you need to get started; of course, there are some advanced features that we will mention later.

Cassandra + Spark high performance cluster

If you run Cassandra and Spark in the same cluster, then using Spark with Cassandra is similar to using it with HDFS, but you really need to understand the subtle differences. First, data locality is important, same as with HDFS. You need to understand Spark partitions and leverage that knowledge to maximize data locality. This is critical in Cassandra: you don’t want a Spark executor on one node making network calls to get data from a different node.
This will be a big topic later in this article.

Next, we will review the Spark optimizations that you should be aware of, which also apply when you run Spark with Cassandra, and then we will review Cassandra-specific optimizations.

Many articles have been written on this topic; I will just summarize the most important points that you need to be aware of when working with Cassandra.

In Spark you write code that transforms the data. This code is lazily evaluated and, under the hood, is converted to a query plan which gets materialized when you call an action such as collect() or a write. Spark divides the data into partitions which are handled by executors; each one will handle a set of partitions. Operations that are executed within a single partition are called narrow operations and include functions such as map or filter. On the other hand, aggregations are wide operations that require moving the data across nodes, which is very expensive. The query plan itself comes in two major types, a logical plan and a physical plan, which we will discuss later.

Remember, the main rule regarding Spark performance is: Minimize Data Shuffles. Shuffles are caused by wide operations in Spark, such as joins or aggregations, and they are very expensive.

Wide vs Narrow Dependencies. Source: https://medium.com/@dvcanton/wide-and-narrow-dependencies-in-apache-spark-21acf2faf031

Always try to reduce the number of data shuffles. In fact, the goal of most of the optimizations that we are going to talk about is exactly this: reduce the amount of data sent over the network.

Source: https://luminousmen.com/

Let’s now review some of the optimizations…

Filter Early

This is really simple but very important. Filter the data as early as possible so you don’t process data that will be discarded later on. Especially when using Cassandra, the more you can push filters down to Cassandra, and particularly where you can limit queries by partition key, the better.
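
As a minimal sketch of filtering early with the connector, the program below restricts a read by the full partition key and prunes columns right after the load, then prints the plan so you can check whether the predicates reached Cassandra. The table "events", its columns ("user_id" and "day" as the partition key, plus "payload") and the contact point are hypothetical and only for illustration; how pushed filters appear in the plan output depends on the connector version.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.cassandra._
import org.apache.spark.sql.functions.col

object FilterEarlySketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("filter-early-sketch")
      // Assumed contact point; replace with one of your Cassandra nodes.
      .config("spark.cassandra.connection.host", "127.0.0.1")
      .getOrCreate()

    // Hypothetical table test.events with partition key (user_id, day).
    val events = spark.read
      .cassandraFormat("events", "test")
      .load()
      // Equality predicates on every partition key column, applied right after the read.
      .filter(col("user_id") === 42 && col("day") === "2020-12-01")
      // Column pruning: only request the columns the job needs.
      .select("user_id", "day", "payload")

    // Print the physical plan to check that the filters were pushed to the source.
    events.explain()

    spark.stop()
  }
}
```

If the plan shows the predicates being applied in a Spark Filter step instead of at the Cassandra scan, the read is pulling more data than it needs.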
When possible, specify all the components of the partition key in the filter statement. Also, if you are going to use a set of data from Cassandra more than once, make sure to use cache() to keep it in Spark memory rather than reading from Cassandra each time.

A good rule of thumb is to use the coalesce() method after filtering data to reduce the number of partitions.

Spark Partitions and Spark Joins

This is critical in Spark. I really recommend this article, which explains the different optimizations in detail.

Set the right number of partitions

You need to set the right number of partitions to maximize parallelism in your cluster. If you have too few partitions, you will not be utilizing all the cores available in the cluster and you will be wasting resources. Having too many partitions will cause excessive overhead from managing many small tasks, as well as data movement, reducing performance.

Spark partitions. Source: https://luminousmen.com/post/spark-tips-partition-tuning

The general recommendation for Spark is to have 4x as many partitions as cores available to the application in the cluster, but of course this will depend on your use case. When in doubt, it’s a good idea to err on the side of a larger number of tasks. This needs to align with the number of executors and the memory per executor, which we will review later.

Avoid Data Skew

In general, you want your data to be evenly distributed across all the nodes; this is especially important when performing joins. Your join keys should have an even distribution to avoid data skew. Skewed data leads to performance degradation of parallel processing or even OOM (out of memory) crashes, and should be avoided.

You can diagnose skew in your data by looking at the Spark UI and checking the time spent on each task. If one or more tasks take much longer than the others, you have unbalanced partitions.

Some possible solutions are:

- Repartition data on a more evenly distributed key.
This is the preferred option: use a key that naturally distributes the data across partitions before performing any wide operation.
- Use salting, which synthesizes an additional random key for better distribution of the data.
- Split the data into skewed and non-skewed data and work with them in parallel by redistributing the skewed data.
- If one of the data sets to join is small, like a dimension table, use broadcast variables, which we will discuss later on. This is useful for lookups against small tables.
- Use broadcast joins when joining two data sets and one is quite small; this has the same benefits as broadcast variables. A more advanced feature is iterative broadcast joins, where we split the data and do many small broadcast joins instead of a bigger one.

Let’s have a look at broadcast joins in a bit more detail…

Spark Joins

By default, Spark uses Sort Merge Join, which works great for large data sets.

Sort Merge Join. Source: https://towardsdatascience.com/the-art-of-joining-in-spark-dcbd33d693c

The idea is to sort the partitions before the join to reduce the amount of data shuffle. However, sorting is itself an expensive operation, and the performance of this join will change greatly depending on the source data on both sides of the join. If the data is already partitioned on the join key (already shuffled), it is very fast; if not, Spark will need to perform an exchange and sort, which causes a data shuffle.

Sort merge join when exchange is required. Source: https://towardsdatascience.com/should-i-repartition-836f7842298c

This join technique works great when both data sets are large, but when you join a large table with a small dimension table, the advantages are lost. In this case, and especially if your cluster has enough RAM available, you can use broadcast joins.

Broadcast Joins

In the case of broadcast joins, Spark will send a copy of the small data set to each executor, where it is kept in memory. This can increase performance by 70% and in some cases even more.
The concept of broadcast joins is similar to broadcast variables, which we will discuss later. However, broadcast joins are handled automatically by Spark; all you need to do is tell Spark which table you want to broadcast:

    import org.apache.spark.sql.functions.broadcast
    val df = fact_table.join(
      broadcast(dimension_table),
      fact_table.col("dimension_id") === dimension_table.col("id"))

Note that when using broadcast joins the data is shared between the cores, but each executor will have its own copy. Hence, balancing cores and executors is important, and we will discuss this later.

Broadcast Join. Source: https://towardsdatascience.com/the-art-of-joining-in-spark-dcbd33d693c

For more information about Spark joins check this article.

Repartition before expensive or multiple joins

The repartition() method allows us to change the distribution of the data on the cluster. This will cause a data shuffle, which is expensive, but it will improve performance if done before certain types of joins or before multiple joins, as long as you specify the column name. As we have seen, Spark needs to be aware of the data distribution to make use of it before the sort merge join. This can be done using bucketing, which stores the data in a pre-shuffled and possibly pre-sorted state, with the bucketing information stored in the metastore. But if the data for one of the tables is merely sorted and already shuffled on disk, without that metadata, Spark will not know about it and will still re-sort and fully shuffle both tables. This is why repartitioning your data to match the partitioning of one side of the join is needed: it can reduce the data shuffle so that only one side (table) of the join is distributed over the network. The idea is that by specifying the column, Spark adds metadata to the logical plan under the hood, so it knows that it does not need to move the data. For example, suppose one table is partitioned by ID into 100 partitions.
By repartitioning the other data set the same way, you avoid an extra data shuffle:

    import org.apache.spark.sql.functions.col
    df.repartition(100, col("id")).join(..., "id")

The goal is to have the same number of partitions on both sides of the join to avoid data exchanges. A data shuffle will also occur if the number of partitions differs from this property:

    spark.sql.shuffle.partitions // default 200

which controls the number of partitions during the shuffle and is used by the sort merge join to repartition and sort the data before the join. I really recommend reading this article, which goes into more detail on how joins and data partitioning work.

A good rule of thumb is to repartition before multiple joins or very expensive joins, to avoid the sort merge join reshuffling the data over and over again.

If you need to decrease the number of partitions, use coalesce() instead of repartition(), because it minimizes data shuffles and doesn’t trigger a data exchange. After a filter, remember to use coalesce(): you will have less data, and this is much more efficient than repartition(). Also, always persist or cache your data after a repartition so the shuffle is not repeated. But remember that repartition is itself an expensive operation that moves the data all over the cluster, so try to use it just once and only when really necessary; and always remember to do narrow operations first.

It is important to remember that some wide operations, like group by, change the number of partitions.

Repartition before writing to storage

It is always a good idea to write your data in a way that is optimized for reading. In HDFS you want to use a columnar format such as Parquet to increase the performance of reads when performing column-based operations. I talked about this in this article.
The idea is that if you create partitions using the right folder structure, read operations do not need to scan all the data on disk, just the folders/partitions specified.

You can partition your data on write using:

    df.write.partitionBy("key").json("/path...")

In Cassandra, the end table should already be partitioned. To increase write performance the same principle applies, and you can use partitionBy to achieve data locality so the data will be on the right Cassandra node when writing to disk (when using a high performance cluster). However, the Cassandra connector does this for you on the coordinator node: it already knows the Cassandra partitions and sends data in batches to the right partitions. You just need to tweak some properties that we will discuss later.

Avoid disk spills

When joining large data sets, Spark needs to store intermediate data during the data shuffle. If the executor does not have enough memory, it will spill to disk and the join will become extremely slow. Make sure you set the right amount of memory (spark.executor.memory) per executor and reduce your data size. You can also change the size of the buffer by setting: spark.shuffle.file.buffer

Set the right number of executors, cores and memory

It is important to set the number of executors according to the number of partitions. Your aim is to maximize parallelism and make sure your Spark executors are busy throughout the duration of the job and all cores are used in the nodes.

Remember that each executor handles a subset of the data, that is, a set of partitions. Also, each executor uses one or more cores, as set with the property:

    spark.executor.cores

When running on YARN, it defaults to 1.

In Spark, we achieve parallelism by splitting the data into partitions. The partitions are spread over the different nodes, and each node has a set of executors. Then each executor manages one or more partitions.
Note that at the computational level, each executor performs a set of tasks, so each executor will apply a set of functions to the same partition. This is why you may want to have more than one core per executor, so you can run independent tasks in parallel. It is important to understand that each executor will have its own local and independent data in memory, which includes broadcast variables (which we will discuss later) and accumulators. Both of these use quite a bit of memory; however, they are shared between the cores of an executor. So having just one core per executor means that all this data needs to be replicated for each executor. Also, I/O operations, especially on splittable file formats or Cassandra, can take advantage of multiple cores when reading or writing a partition, maximizing throughput.

In summary, when you use broadcast variables or accumulators, or you read/write a lot of data, you want to have more than one core per executor, but not too many, otherwise some of the cores will not be used. For more information check this article.

As a rule of thumb, 3–5 cores per executor is a good choice. Of course, this depends on the number of partitions and the size of the partitions. If you have very small partitions and you don’t use much memory for things like broadcast variables, then fewer cores are recommended.

Your goal is to find the balance between the number of executors and cores, plus the right amount of memory for each executor.
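
To make that balance concrete, here is a rough sizing sketch in plain Scala. It only encodes the rules of thumb mentioned in this article (3–5 cores per executor, about 4 partitions per core); the object, the parameter names and the idea of reserving some cores and memory on each node for Cassandra and the operating system are my own illustrative assumptions, not a formula from Spark or the connector.

```scala
object ExecutorSizing {
  // Result of the heuristic; all names are illustrative.
  final case class Layout(
      executorsPerNode: Int,
      coresPerExecutor: Int,
      memoryPerExecutorGb: Int,
      totalExecutorCores: Int,
      suggestedPartitions: Int)

  def plan(
      nodes: Int,
      coresPerNode: Int,
      memoryPerNodeGb: Int,
      reservedCoresPerNode: Int,    // assumed share kept for Cassandra and the OS
      reservedMemoryPerNodeGb: Int, // assumed share kept for Cassandra and the OS
      coresPerExecutor: Int = 4,    // within the 3-5 rule of thumb
      partitionsPerCore: Int = 4): Layout = {
    val usableCores = coresPerNode - reservedCoresPerNode
    val usableMemoryGb = memoryPerNodeGb - reservedMemoryPerNodeGb
    require(nodes > 0 && coresPerExecutor > 0 && partitionsPerCore > 0, "counts must be positive")
    require(usableCores >= coresPerExecutor, "not enough free cores for one executor")
    require(usableMemoryGb > 0, "no memory left for executors")

    // Integer division: leftover cores and memory simply stay unused.
    val executorsPerNode = usableCores / coresPerExecutor
    val memoryPerExecutorGb = usableMemoryGb / executorsPerNode
    val totalExecutorCores = nodes * executorsPerNode * coresPerExecutor

    Layout(
      executorsPerNode,
      coresPerExecutor,
      memoryPerExecutorGb,
      totalExecutorCores,
      totalExecutorCores * partitionsPerCore)
  }
}
```

For example, ExecutorSizing.plan(nodes = 6, coresPerNode = 16, memoryPerNodeGb = 64, reservedCoresPerNode = 4, reservedMemoryPerNodeGb = 24) gives 3 executors per node with 4 cores each, at most 13 GB per executor (an upper bound that still has to cover executor overhead), 72 executor cores in total and a starting point of 288 partitions. Treat these numbers as a first guess to refine by watching the Spark UI.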
You can set executor memory, and the number of executors and cores, when you run spark-submit.

    # Run on a Spark standalone cluster in cluster deploy mode with supervise
    ./bin/spark-submit \
      --class org.apache.spark.examples.SparkPi \
      --master spark://207.184.161.138:7077 \
      --deploy-mode cluster \
      --supervise \
      --executor-memory 20G \
      --total-executor-cores 100 \
      /path/to/examples.jar \
      1000

You can also pass them as Spark properties.

Setting the right amount of memory per executor is also important. It needs to be based on your use of accumulators and broadcast variables, and on the size of your data when doing joins that shuffle data, as we have seen. Make sure you set alerts so you are aware of data spills, which are very inefficient.

Broadcast Variables

We already talked about broadcast joins, which are extremely useful and quite popular because it is common to join small data sets with big data sets; for example, when you join your table with a small lookup table uploaded from a CSV file.

In this case, the idea is to copy the data to each executor so no data needs to be moved, and the join is done locally because one side of the join is stored on each node completely in memory. This is the idea of broadcasting in Spark, both for joins and for variables.

Joins are managed by Spark for you under the hood, so they are easy to use. Broadcast variables need to be created in your code. You can store any JVM object as long as it is serializable.
We will talk about serialization in the next section.

To create a broadcast variable:

    val broadcastVar = spark.sparkContext.broadcast(data)

Note that the data cannot be a data frame or data set; it needs to be a regular object, so you need to call the collect() method to get all the data before sending it to the executors.

Spark Serialization

There are several articles and books that teach you how to optimize your Spark code. However, the single most effective thing you can do to increase Spark performance across all your code is to get rid of Java serialization.

Java serialization is very inefficient and also insecure, and it can really slow down your jobs.

You need to understand how Spark runs applications. In a nutshell, the driver needs to serialize your code and send it to all the nodes, so broadcast variables and the job itself need to be transferred over the network; besides that, intermediate data and metadata need to be serialized as well. This happens very often under the hood, and it can be a bottleneck for your application.

If you have run into the famous “Task not serializable” error in Spark, then you know what I’m talking about. Understanding which parts of your code need to be serialized and which ones don’t is critical. Your goal is to identify these objects and optimize them by using another serialization format. To solve serialization issues, I really suggest having a look at this article.
In general, ensure all the objects passed to closures are serializable.

The most popular Spark alternative to Java serialization is Kryo serialization, which can increase serialization performance considerably, often by as much as an order of magnitude.

Kryo Performance vs Java. Source: https://docs.mulesoft.com/mule-runtime/3.9/improving-performance-with-the-kryo-serializer

Many Spark developers choose not to use it because, for best performance, you need to register every class you use:

    import org.apache.spark.SparkConf
    val conf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .registerKryoClasses(Array(classOf[SomeClass])) // SomeClass stands for your own type

But I really recommend you enable Kryo from the beginning of your project to take advantage of the performance boost. Note that if you rely on the Data Set API, then you may not need Kryo, since your classes will use Tungsten encoders, which are even more efficient than Kryo; we will discuss them later.

Spark Catalyst

I left the best optimization for the end. Spark has a secret weapon that increases your job efficiency tremendously, and the best part is that you “almost” don’t have to do anything to use it; it runs under the hood. We already touched upon this feature: the Spark Catalyst engine. It basically rewrites your code in an optimal way.

Catalyst is available in the Data Frame API and partially in the Data Set API. It is not available in the RDD API. It is part of the Spark SQL layer, and the idea behind it is to take all the optimizations made over many years in the RDBMS world and bring them to Spark SQL. Since SQL provides a known mathematical model, Spark Catalyst can understand the data, make assumptions and optimize the code. Under the hood, Spark runs a complicated workflow which completely rewrites your code into a form that is harder to understand but much more efficient.

Think of the Spark Data Frame API as a declarative language, not real code. You are writing on top of an engine that will interpret your code and optimize it.
This is the process that happens when you trigger an action in Spark:

Catalyst workflow

In a nutshell, it creates an optimized logical plan, which is then turned into multiple candidate physical plans. Then, it generates the code for the chosen physical plan and distributes it to the executors (remember why serialization was important?). Catalyst generates an optimized physical query plan from the logical query plan by applying a series of transformations like predicate push-down, column pruning, and constant folding on the logical plan.

The Catalyst optimizer has two types of optimizations:

- Cost-Based Optimizer: Since Data Frames are based on SQL, Catalyst can calculate the cost of each path, analyze which path is cheaper, and then execute that path to improve query execution.
- Rule-Based Optimizer: These rules include constant folding, predicate push-down, projection pruning, null propagation, Boolean simplification, and others.

Catalyst will also automatically perform broadcast joins when one side of the join is small; the threshold can be set using this property:

    spark.sql.autoBroadcastJoinThreshold

Project Tungsten

The Data Frame and Data Set APIs also benefit from Project Tungsten, which aims to fix the serialization issues that we mentioned before, as well as memory management and performance. It uses an optimized in-memory format and off-heap memory which does not need to be garbage collected.

It optimizes Spark jobs for CPU and memory efficiency by doing the following:

- Off-Heap Memory Management using a binary in-memory data representation; this is the Tungsten row format.
- Cache Locality, which is about cache-aware computations with a cache-aware layout for high cache hit rates.
- Whole-Stage Code Generation (CodeGen).

This is why you need to use encoders when using the Data Set API; these are in charge of the off-heap optimizations. You get this for free in the Data Frame and Data Set APIs, but Datasets are better optimized. For more information check this great article.

What about RDDs?
Catalyst cannot do anything about them; this is the do-it-yourself approach. In fact, Catalyst generates RDD code; this is the end result. Also, you need to manage serialization yourself.

Does Catalyst optimize the Data Set API? Yes and no. Initially, the Data Set API was slower since Catalyst “couldn’t see” what you were doing in the code, because Data Set syntax is broader than the fixed SQL syntax. But if you use the latest version of Spark (3.0), then most of the optimizations are available. Still, the Data Frame API will be a bit more efficient at optimizing your queries; with Data Sets you need to be more careful, but if you write good Spark code, the differences are minimal. Plus, Data Sets have better space and speed performance: Tungsten optimization is better for Data Sets than for Data Frames. My recommendation is that Spark SQL with Data Frames should be used by data scientists with no deep knowledge of Spark; Catalyst will optimize the code for them. The Data Set API is ideal for Scala engineers with good knowledge of Spark internals.

Spark APIs comparison. Source: https://databricks.com/blog/2016/07/14/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-datasets.html

Do I need Kryo if I use the Data Set API? Yes and no. Tungsten will manage your case classes using encoders, but there are still other parts of the code that need to be serialized, and enabling Kryo would help. If you use the Data Set API, I recommend skipping Kryo unless you need to maximize performance.

Other Spark Optimizations

- Use reduceByKey/aggregateByKey instead of groupByKey when possible, since they reduce data shuffle. They are faster because Spark combines the output for each common key on each partition before shuffling the data.
- Avoid reduceByKey when the input and output value types are different and use aggregateByKey instead; if the types are the same, use reduceByKey.
- Use the localCheckpoint() method before repartition and write.
This is to create stage barrier between shuffle stage and the write operation.Don’t use collect() method. It will try to move all data in to the driver and where it may run out of memory and crash. Only use it for debug purposes or to create broadcast variables which are small in size by definition.Use the right file format. Avro for row level operations and Parquet or ORC for column based operations such Spark SQL. Compress the data and make sure that the format is splitable, so it can be read in parallel by Spark readers. Splittable formats are lso, bzip2, snappy, etc. Non-splittable gzip, zip, lz4…Monitoring and observability is critical is Spark. Use explain() method before writing to disk as last step to verify that all the optimizations that we talked about take place. Check that the expected Catalyst optimizations are done and filters are pushed to the source, queries are re arranged, data shuffles are minimized, etc. Enable the Spark history server to analyze and observe the evolution of your jobs and detect any change that may cause performance issues.As you can see, the goal of all these optimizations is to reduce data movement. To do this, always filter as much data as you can. Try to minimize wide operations. Cache the data sets if they are going to be used multiple times. Make sure your data is partitioned evenly before performing joins or writing to disk. Be aware of data locality and remember to use Spark UI to and the explain() method to understand Spark logical and physical plan.First of all, the Spark optimizations we have mentioned also apply when using Cassandra either in the same cluster or on a separate cluster. Aim to filter out the data you don’t need, balance partitions and rely on Spark broadcast joins.The Spark Cassandra Connector, same as the Catalyst engine, also optimizes the Data Set and Data Frames APIs. 
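To see what Catalyst and the connector actually do with a query, you can print the plan with explain(). The following is a minimal sketch, not a definitive recipe. It assumes a Cassandra node reachable on 127.0.0.1 and a table test.words with a partition key column word and a regular column count; the host, the column names and the object name ExplainPushdown are assumptions made for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.cassandra._
import org.apache.spark.sql.functions.col

object ExplainPushdown {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("explain-pushdown")
      // Assumption: Cassandra is reachable on this host.
      .config("spark.cassandra.connection.host", "127.0.0.1")
      .getOrCreate()

    // Assumption: test.words has columns word (partition key) and count.
    val words = spark.read
      .cassandraFormat("words", "test")
      .load()

    val result = words
      .select("word", "count")          // candidate for column pruning
      .filter(col("word") === "spark")  // equality on the partition key, a candidate for push-down

    // Prints the parsed, analyzed and optimized logical plans plus the physical plan.
    // Look at the Cassandra scan node for the pushed filters and the selected columns;
    // the exact labels depend on the Spark and connector versions.
    result.explain(true)

    spark.stop()
  }
}
```

If the filter only shows up as a separate Filter step above the scan, it is being applied in Spark after the data has been read, which is usually a sign that the predicate cannot be served by the Cassandra table as it is modelled.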
Some of the connector methods described in this part of the article are therefore only needed with RDDs; they are applied automatically when you use the high level APIs.

Now let's review the specific details regarding Cassandra and its connector…

Spark and Cassandra Partitions

We talked a lot about Spark partitions. In case you run Spark and Cassandra in the same cluster, there are a few extra things you need to be aware of.

The most important rule is this one: match Spark partitions to Cassandra partitions. And the second rule is: Cassandra partitions are not the same as Spark partitions:

- Cassandra partitions: these are the unit at which Cassandra splits data across nodes, and they determine which Cassandra node your data is stored on. The Cassandra partition key is set when you define the schema for your Cassandra table. Each partition is contained on a single node (per replica).
- Spark partitions: these are the unit at which Spark splits data (in memory) across workers. Spark partitions also determine the degree of parallelism that Spark can apply when processing data (each partition can be processed in parallel). The number of partitions and the partition key can either be determined automatically by Spark or set explicitly.

Knowing the difference between the two, and writing your code to take advantage of partitioning, is critical. Your goal is to have the right number of Spark partitions so that Spark can process calculations efficiently in parallel.

In many cases there will be fewer Spark partitions than Cassandra partitions, because Cassandra is more granular; but when possible, you want Cassandra data to be read from and written to the Cassandra node where the Spark partition resides. As we mentioned before, you also want enough Spark partitions that each partition fits in the available executor memory and each processing step for a partition is not excessively long running, but not so many that each step is tiny and the overhead becomes excessive. The right number really depends on your use case. In Spark, and especially with Cassandra, you will have to run performance and stress tests and play with these parameters to get the right value. A good rule of thumb is to have at least 30 partitions per executor.

The good news is that in many cases the Cassandra connector will take care of this for you automatically. When you use the Spark Cassandra Connector, it automatically creates Spark partitions aligned to the Cassandra partition key. This is why setting the right partitions in Cassandra is important. The connector estimates the size of the table and divides it by the parameter spark.cassandra.input.split.sizeInMB (named spark.cassandra.input.split.size_in_mb in older connector versions). The default was 64 MB in older versions and is larger in recent ones, so check the reference for the connector version you use. Also remember that some Spark functions change the number of partitions.

You need to be careful when you are joining with a Cassandra table using a different partition key, or doing multi-step processing. Remember that some operations, like aggregations, change the number of partitions. In this case you start off with an appropriately sized set of partitions but then greatly change the size of your data, ending up with an inappropriate number of partitions. One way to address this problem is to use the connector's repartitionByCassandraReplica() method to resize and/or redistribute the data in the Spark partitions. This redistributes Spark's in-memory copy of the data to match the distribution of a specified Cassandra table, with a specified number of Spark partitions per executor.

The connector also provides an interesting method to perform joins:

joinWithCassandraTable()

It pulls from Cassandra only the partitions whose keys match the RDD entries, so it only works on the partitions it needs, which is much faster.
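With the RDD API, the two connector methods above can be combined as in the following sketch. It is only an illustration under stated assumptions: a Cassandra node on 127.0.0.1, a table test.words whose partition key is the column word and which has an integer column count, and a hypothetical case class WordKey whose field name matches the partition key column.

```scala
import com.datastax.spark.connector._
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical key class: the field name must match the partition key column of the table.
case class WordKey(word: String)

object JoinWithCassandra {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("join-with-cassandra")
      // Assumption: Cassandra is reachable on this host.
      .set("spark.cassandra.connection.host", "127.0.0.1")
    val sc = new SparkContext(conf)

    // The keys we want to look up; in a real job this would be a much larger RDD.
    val keys = sc.parallelize(Seq(WordKey("spark"), WordKey("cassandra")))

    val joined = keys
      // Move each key to a Spark partition located on a replica that owns it.
      .repartitionByCassandraReplica("test", "words", partitionsPerHost = 10)
      // Fetch only the Cassandra partitions that match the keys (no full table scan).
      .joinWithCassandraTable("test", "words")

    // take() keeps the amount of data pulled into the driver small.
    joined.take(10).foreach { case (key, row) =>
      println(s"${key.word} -> ${row.getInt("count")}")
    }

    sc.stop()
  }
}
```

The value of partitionsPerHost is just the connector default written out explicitly; it is one of the parameters to experiment with during performance tests.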
It is recommended that you call repartitionByCassandraReplica before joinWithCassandraTable to obtain data locality, so that each Spark partition only needs to query its local node.

Note that these methods are used under the hood by the connector when you use the Data Set or Data Frames API.

Write settings

Regarding reading and writing data to Cassandra, I really recommend watching this video from the DataStax conference.

There are many parameters that you can set in the connector, but in general you have two approaches when writing data from Spark to Cassandra:

- Asynchronous fire and forget: the idea is to have many parallel single writes executed asynchronously. This can perform very well when batching is not an option because you don't have a single partition key.
- Batching: in this case we create batches. For this to perform well, we need to make sure we partition the data by a single Cassandra key, so each batch goes to the same partition. Check the video for more details.

You can always use Spark's repartition() method before writing to Cassandra to achieve data locality, but this is slow and overkill, since the Spark Cassandra Connector already does this under the hood much more efficiently. The connector automatically batches the data for you in an optimal way.

As a rule of thumb, if you are not sure about something, do not do it and let the connector and Spark Catalyst optimize the code for you.

Depending on the data size and the target table partitions, you may want to play around with the following settings per job:

- spark.cassandra.output.batch.grouping.key: the connector groups batches based on this value. The default is partition, so rows that share a partition key are grouped into the same batch, which in turn goes to the same node. You can also set it to replica_set.
- spark.cassandra.output.batch.grouping.buffer.size: how many batches per Spark task the connector keeps in memory before sending them to Cassandra. Set it depending on the size of your data. Default is 1000.
- spark.cassandra.output.batch.size.rows: the batch size in rows. When set, it takes precedence over the byte-based limit spark.cassandra.output.batch.size.bytes. The default is auto.
- spark.cassandra.output.concurrent.writes: the number of concurrent writes. This property is inversely related to the batch size: the bigger the batch, the fewer concurrent writes you want. Tune the batch size and the concurrent writes together, based on the data size and partitions, to get the maximum performance.
- spark.cassandra.output.throughput_mb_per_sec (default: unlimited; named spark.cassandra.output.throughputMBPerSec in recent connector versions): the maximum write throughput per core. You need to set this together with the number of cores per executor. By default, Spark will write to Cassandra as fast as it possibly can. For long running jobs this is not great, since Cassandra nodes can be overwhelmed and crash, so you should set a limit. As a general guide, we suggest starting with a value of 5.

To use the fire and forget approach, set spark.cassandra.output.batch.size.rows to 1 and spark.cassandra.output.concurrent.writes to a large number.

Read settings

When reading huge amounts of data from Cassandra, ensure that the data is partitioned with a proper partition key. You can set several properties in the connector to increase read performance. Remember, read tuning is mostly about partitioning, which we already covered.

Match spark.cassandra.concurrent.reads to the number of cores. When reading data from Cassandra you want a bigger ratio of cores per executor than when using HDFS, since the throughput is higher; try to take advantage of Cassandra when possible.

When reading data, the connector sizes partitions based on its estimate of the data size. You can increase spark.cassandra.input.split.sizeInMB if you want to pull more data into each Spark partition, but be careful not to hold too much data or you will run into issues.
As a general rule, your executor memory should be able to hold number of cores * size of partition * 1.2, to avoid out of memory or garbage collection issues.

Hot spots caused by big partitions in Cassandra will cause issues in Spark as well, due to the data skew problems that we already mentioned.

Also remember that read speed is mostly dictated by Cassandra's paging speed, which is set using the spark.cassandra.input.fetch.sizeInRows property. It defines how many rows will be asynchronously requested while reading the current batch.

Keep an eye out for this new Cassandra feature, which will be released soon and enables bulk reading from Cassandra.

Performance Tests

Last but not least, you will have to spend a lot of time tuning all these parameters. Make sure you use a test dataset that resembles production in terms of data size and shape. Your environment should be stable and the tests repeatable.

Spark Tuning. Source: https://luminousmen.com/post/spark-tips-dataframe-api

You can use the Spark Cassandra Stress tool to test Cassandra. Also check the metrics exposed by the connector.

In this article, we unfolded the internals of Spark to understand how it works and how to optimize it. Regarding Spark, we can summarize what we learned with two key takeaways:

- Minimize data shuffles and maximize data locality.
- Use the Data Frames or Data Sets high level APIs to take advantage of the Spark optimizations.

If you are a Scala developer, remember that Spark higher order functions are not the same as the ones in Scala, and you really need to understand how Spark and the Catalyst engine work.

If you have a separate Cassandra cluster and you just use Cassandra as a source or sink, then focus on optimizing the read and write settings to maximize parallelism. Remember that setting the right number of balanced partitions is very important. If you have a high performance Spark + Cassandra cluster, you need to understand the relation between Spark and Cassandra partitions and try to take advantage of Cassandra's speed and performance.

Remember to follow Databricks for Spark updates and DataStax for Cassandra updates, since they are the companies behind these technologies. Also, check out ScyllaDB, which is an excellent drop-in replacement for Cassandra.

Good luck on your Big Data Journey!
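As a recap of the write and read settings, here is a small sketch in plain Scala that collects the two write approaches as property maps and turns the sizing rules of thumb into functions. The object and function names are hypothetical, the value 100 for concurrent writes is only a placeholder for "a large number", and the partition estimate is a rough approximation of what the connector does, not its exact algorithm. Property names follow the ones used in this article; recent connector versions may use different spellings.

```scala
object ConnectorTuning {

  /** Rule of thumb from the read settings: cores * partition size * 1.2, in MB. */
  def minExecutorMemoryMb(cores: Int, splitSizeMb: Int): Double =
    cores * splitSizeMb * 1.2

  /** Rough Spark partition count: estimated table size divided by the split size. */
  def estimatedPartitions(tableSizeMb: Long, splitSizeMb: Int): Long =
    math.max(1L, math.ceil(tableSizeMb.toDouble / splitSizeMb).toLong)

  /** Batching approach, throttled so long running jobs do not overwhelm Cassandra. */
  val throttledBatchWrites: Map[String, String] = Map(
    "spark.cassandra.output.batch.grouping.key"    -> "partition",
    "spark.cassandra.output.throughput_mb_per_sec" -> "5" // suggested starting point
  )

  /** Fire and forget approach: single-row batches and many concurrent writes. */
  val fireAndForgetWrites: Map[String, String] = Map(
    "spark.cassandra.output.batch.size.rows"   -> "1",
    "spark.cassandra.output.concurrent.writes" -> "100" // placeholder, tune with tests
  )

  def main(args: Array[String]): Unit = {
    // Example: 4 cores per executor reading 512 MB splits of a 10 GB table.
    println(s"Executor memory (MB): ${minExecutorMemoryMb(cores = 4, splitSizeMb = 512)}")
    println(s"Estimated partitions: ${estimatedPartitions(tableSizeMb = 10240L, splitSizeMb = 512)}")
    fireAndForgetWrites.foreach { case (k, v) => println(s"--conf $k=$v") }
  }
}
```

The maps can be passed to a job with SparkConf.setAll, or as --conf flags to spark-submit as printed by main. Keeping them as plain data makes it easy to switch between the two write approaches in performance tests.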
