Cassandra.Link

The best knowledge base on Apache Cassandra®

Helping platform leaders, architects, engineers, and operators build scalable real time data platforms.

12/14/2020

Reading time: 9 min

Spark + Cassandra Best Practices | Official Pythian® Blog

by Valerie Parham-Thompson


Spark was created in 2009 as a response to difficulties with map-reduce in Hadoop, particularly in supporting machine learning and other interactive data analysis. Spark simplifies the processing and analysis of data, reducing the number of steps and allowing ease of development. It also provides for reduced latency, since processing is done in-memory.

Spark can be used to process and analyze data to and from a variety of data storage sources and destinations. In this blog, we will discuss Spark in conjunction with data stored in Cassandra.

Querying and manipulating data in Spark has several advantages over doing so directly in Cassandra, not the least of which is being able to join data performantly. This feature is useful for analytics projects.

Spark Use Cases

Typical use cases for Spark when used with Cassandra are: aggregating data (for example, calculating averages by day or grouping counts by category) and archiving data (for example, sending data to cold storage before deleting it from Cassandra). Spark is also used for batched inserts to Cassandra. Other use cases not particular to Cassandra include a variety of machine learning topics.

A data analysis project starts with data ingestion into data storage. From there, data is cleansed and otherwise processed. The resulting data is then analyzed for patterns and other qualities, and it may be analyzed further using a variety of machine learning methods. End-users can run ad hoc queries and use interfaces to visualize data patterns. Spark has a star role within this data flow architecture.

Ingestion

Spark can be used independently to load data in batches from a variety of data sources (including Cassandra tables) into distributed data structures (RDDs) used in Spark to parallelize analytic jobs. Since one of the key features of RDDs is the ability to do this processing in memory, loading large amounts of data without server-side filtering will slow down your project. The spark-cassandra-connector has this filtering and other capabilities. (See https://github.com/datastax/spark-cassandra-connector/.) The limitation on memory resources also implies that, once the data is analyzed, it should be persisted (e.g., to a file or database).
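As a minimal sketch of that read path, assuming the spark-cassandra-connector package is on the classpath and using a hypothetical keyspace ks and table events, the connector's DataFrame API lets simple filters be pushed down to Cassandra rather than pulling the whole table into memory:

    import org.apache.spark.sql.SparkSession

    // A SparkSession configured with a Cassandra contact point (address is hypothetical).
    val spark = SparkSession.builder()
      .appName("cassandra-ingest")
      .config("spark.cassandra.connection.host", "10.0.0.1")
      .getOrCreate()

    // Read a Cassandra table as a DataFrame; simple predicates like this one
    // can be pushed down by the connector so Cassandra does the filtering.
    val events = spark.read
      .format("org.apache.spark.sql.cassandra")
      .options(Map("keyspace" -> "ks", "table" -> "events"))
      .load()
      .filter("sensor_id = 'abc123'")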

To avoid some of the limitations of this batch processing, streaming functionality was added to Spark. In Spark 1, this functionality was offered through DStreams. (See https://spark.apache.org/docs/latest/streaming-programming-guide.html.)

Spark 2 — a more robust version of Spark in general — includes Structured Streaming. (See https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html.) With Structured Streaming, instead of creating a static table from a batch input, you treat the input as a table that is continuously appended to as new data arrives from the source; the data is exposed as a data frame that keeps updating as the stream progresses. (Another benefit of using dataframes over RDDs is that the data is intuitively abstracted into columns and rows.)

Streaming sources include the commonly used Apache Kafka. Kafka buffers the ingest, which is key for high-volume streams. See https://kafka.apache.org/ for more details on Kafka.
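A sketch of reading such a stream, assuming a hypothetical broker address and topic name and the spark-sql-kafka-0-10 package on the classpath (it reuses the SparkSession from the earlier sketch):

    // Each Kafka record arrives as a row with binary key/value columns
    // that can be cast and parsed downstream.
    val stream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "kafka1:9092") // hypothetical broker
      .option("subscribe", "events")                     // hypothetical topic
      .load()
      .selectExpr("CAST(value AS STRING) AS payload", "timestamp")

    // The stream only runs once a sink is started; the console sink is handy for inspection.
    val query = stream.writeStream.format("console").start()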

Data Storage

Although we are focusing on Cassandra as the data storage in this presentation, other storage sources and destinations are possible. Another frequently used data storage option is Hadoop HDFS. The previously mentioned spark-cassandra-connector has capabilities to write results to Cassandra, and in the case of batch loading, to read data directly from Cassandra.

Native data output formats available include both JSON and Parquet. The Parquet format in particular is useful for writing to AWS S3. See https://aws.amazon.com/about-aws/whats-new/2018/09/amazon-s3-announces-new-features-for-s3-select/ for more information on querying S3 files stored in Parquet format. A good use case for this is archiving data from Cassandra.
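For example (a sketch, with a hypothetical destination table and S3 bucket, and assuming the hadoop-aws module and credentials are configured for s3a access), a DataFrame can be written back to Cassandra or archived to S3 as Parquet:

    // `events` is the DataFrame read in the ingestion sketch above.
    // Write results back to a Cassandra table (the table must already exist).
    events.write
      .format("org.apache.spark.sql.cassandra")
      .options(Map("keyspace" -> "ks", "table" -> "events_by_day"))
      .mode("append")
      .save()

    // Archive the same data to S3 in Parquet format (bucket name is hypothetical).
    events.write
      .mode("overwrite")
      .parquet("s3a://my-archive-bucket/events/2020/12/")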

Data Cleansing

Data cleansing involves dealing with questionable data (such as null values) and other preprocessing tasks (such as converting categorical data to mapped integers). Once data is stored in a data frame, it can be transformed into new dataframes based on filters. Beyond the convenience of doing this cleansing within the same code (e.g., the Scala script running the Spark job), Spark provides no magic for cleaning data; it takes knowledge of the data and the business to understand and code the particular transformation tasks.
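A minimal cleansing sketch, building on the DataFrame above and assuming hypothetical columns temperature and category: drop rows with nulls, filter out implausible values, and map a categorical column to integers:

    import org.apache.spark.sql.functions.{col, when}

    val cleaned = events
      .na.drop(Seq("temperature"))                    // drop rows with null readings
      .filter(col("temperature").between(-50, 60))    // discard out-of-range values
      .withColumn("category_id",                      // map categories to integers
        when(col("category") === "sensor", 0)
          .when(col("category") === "manual", 1)
          .otherwise(-1))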

Pattern Analysis

Spark dataframes can be easily explored for basic patterns using commands like describe, which will show the count, mean, standard deviation, minimum value, and maximum value of selected columns. Dataframes can be further transformed with functions like groupBy, pivot, and agg (aggregate). Spark SQL can be used for complex joins and similar tasks, using the SQL language that will be familiar to many data analysts.
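For instance, continuing the sketch above (column names are hypothetical):

    // Quick statistical summary of selected columns.
    cleaned.describe("temperature", "humidity").show()

    // Aggregate by day and category.
    import org.apache.spark.sql.functions.{avg, col, count, to_date}
    cleaned
      .groupBy(to_date(col("timestamp")).as("day"), col("category"))
      .agg(avg("temperature").as("avg_temp"), count(col("category")).as("readings"))
      .show()

    // The same data can be queried with Spark SQL.
    cleaned.createOrReplaceTempView("readings")
    spark.sql("SELECT category, AVG(temperature) FROM readings GROUP BY category").show()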

Machine Learning and Data Mining

Machine learning and data mining encompass a broad range of data modeling algorithms intended to make predictions or to discover unknown meaning within data.

From Spark 2 onward, the main library for Spark machine learning is based on data frames instead of RDDs. You may see this new data frame-based library referred to as Spark ML, but the library name hasn’t changed — it is still MLlib. (See https://spark.apache.org/docs/latest/ml-guide.html.) Some things that are possible with Spark in this area are recommendation engines, anomaly detection, semantic analysis, and risk analysis.
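As one illustrative sketch (not from the original post, and with hypothetical column names), the DataFrame-based MLlib API composes feature transformers and an estimator into a Pipeline:

    import org.apache.spark.ml.Pipeline
    import org.apache.spark.ml.classification.LogisticRegression
    import org.apache.spark.ml.feature.VectorAssembler

    // Assemble numeric columns into the single feature vector MLlib expects.
    val assembler = new VectorAssembler()
      .setInputCols(Array("temperature", "humidity", "category_id"))
      .setOutputCol("features")

    val lr = new LogisticRegression()
      .setLabelCol("anomaly")     // hypothetical 0/1 label column
      .setFeaturesCol("features")

    val model = new Pipeline().setStages(Array(assembler, lr)).fit(cleaned)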

Ad Hoc Queries

Spark SQL is available to use within any code used with Spark, or from the command line interface; however, the requirement to run ad hoc queries generally implies that business end-users want to access a GUI to both ask questions of the data and create visualizations. This activity could take place using the eventual destination datastore as the backend. Directly from Spark, there are enterprise options such as Tableau, which has a Spark connector. For query speed, a memory-based cache such as Apache Ignite could be used as the analytics backend; to maintain that speed by avoiding disk i/o, the data being used for queries should fit into memory.

Visualization

Depending on the programming language and platform used, there may be libraries available to directly visualize results. For example, if Python is used in a Jupyter notebook, then Matplotlib will be available. (See https://matplotlib.org/.) In general, except for investigating data in order to clean it, etc., visualization will be done on the data after it is written to the destination. For business end-users, the above discussion in Ad Hoc Queries applies.

The general architecture for a Spark + Cassandra project is apparent from the discussion of the Data Lifecycle above. The core elements are source data storage, a queueing technology, the Spark cluster, and destination data storage.

In the case of Cassandra, the source data storage is of course a cluster. Spark will only query a single data center, and to avoid load on the production cluster, that is what you want. Most installations set up a second data center that replicates data from the production data center and attach Spark to this second data center. If you are unable to set up a separate data center for Spark (and we strongly recommend setting one up), be sure to carefully tune the write variables in the spark-cassandra-connector. In addition, if DataStax Enterprise is being used, then DSE Search should be isolated on a third data center.
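For example, the connector can be pointed at the analytics data center explicitly (host address and DC name below are hypothetical); the spark.cassandra.connection.localDC setting keeps requests from being routed to the production data center:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("analytics")
      .config("spark.cassandra.connection.host", "10.0.1.10")     // node in the analytics DC (hypothetical)
      .config("spark.cassandra.connection.localDC", "analytics")  // restrict routing to that DC
      .getOrCreate()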

Another consideration is whether to set up Spark on dedicated machines. It is possible to run Spark on the same nodes as Cassandra, and this is the default with DSE Analytics. Again, this is not advised on the main production cluster, but can be done on a second, separate cluster.

Spark in the Cloud

Spark at Google Cloud

Google Cloud offers Dataproc as a fully managed service for Spark (and Hadoop):

https://cloud.google.com/dataproc/

Spark at AWS

AWS supports Spark on EMR: https://aws.amazon.com/emr/features/spark/.

Coding Language Options

Spark code can be written in Python, Scala, Java, or R. SQL can also be used within much of Spark code.

Spark Connector Configuration

Throttling the write throughput (spark.cassandra.output.throughput_mb_per_sec) can alleviate write latency.

For writing, the Spark batch size (spark.cassandra.output.batch.size.bytes) should stay within the batch size configured in Cassandra (batch_size_fail_threshold_in_kb).

Writing more frequently reduces the size of each write, which reduces latency. Increasing the batch size reduces the number of times Cassandra has to deal with timestamp management. If the batch size is too big, Spark can cause allocation failures in Cassandra.
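A sketch of how these write-tuning settings might be set in code; the values are illustrative only and should be tuned against your cluster and the batch_size_fail_threshold_in_kb set in cassandra.yaml, and the parameter names follow the connector version referenced here (newer connector releases rename some of them):

    import org.apache.spark.sql.SparkSession

    val tunedSpark = SparkSession.builder()
      .appName("cassandra-writes")
      .config("spark.cassandra.output.throughput_mb_per_sec", "5")  // cap write throughput (illustrative)
      .config("spark.cassandra.output.batch.size.bytes", "4096")    // keep batches under the Cassandra threshold (illustrative)
      .getOrCreate()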

For further documentation on connector settings, see https://github.com/datastax/spark-cassandra-connector/blob/master/doc/reference.md.

Spark Security

Security has to be explicitly configured in Spark; it is not on by default. However, the configuration doesn't cover all risk vectors, so review the options carefully. Also, most of these settings can be overridden in code accessing Spark, so it is important to audit your codebase and, above all, to limit connections to specific hosts and further protect them with user authentication.

Enable authentication

Authentication is turned off by default. It is enabled through two configuration variables, using a shared secret.

The two configuration variables are spark.authenticate (default is false; set to true) and spark.authenticate.secret (set to the shared secret string). If YARN is used, then much of this is done by default. If not, then set these two in the spark-defaults.conf file.

Then use this secret key when submitting jobs. Note that anyone who holds the key can submit jobs with it, so protect it well.
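A sketch of the relevant spark-defaults.conf entries (the secret value is a placeholder):

    # spark-defaults.conf (when not running under YARN)
    spark.authenticate        true
    spark.authenticate.secret <shared-secret-placeholder>

Jobs submitted with spark-submit pick up these defaults; the same secret can also be supplied at submission time with --conf, which is why it must be handled carefully.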

Enable logging for all submitted jobs

You can set spark.eventLog.enabled in the spark-defaults.conf file, but it can be overridden in a user’s code (e.g., in the SparkConf) or in shell commands, so it has to be enforced by business policy.

Note also that the log file itself (configured via spark.eventLog.dir) should be protected with filesystem permissions to avoid users snooping data within it.

Block Java debugging options in JVM

Make sure the JVM configuration does not include the following options: -Xdebug, -Xrunjdwp, -agentlib:jdwp.

Redact environment data in WebUI

You can disable the whole UI via spark.ui.enabled, but other than that, or overriding the EnvironmentListener with alternate custom code, there is no way to redact the information specifically in the Environment tab of the UI.

It is recommended to enable ACLs for both the WebUI and the history server, which will protect the entirety of the web-based information.

Enable and enforce SASL RPC encryption

Since version 2.2, Spark's recommendation is to enable AES encryption rather than SASL, unless an external shuffle service is in use. To enable SASL, set the following to true: spark.authenticate.enableSaslEncryption and spark.network.sasl.serverAlwaysEncrypt.

Enable encryption on all UIs and clients

To enable AES encryption for data going across the wire, in addition to turning on authentication as above, also set the following to true: spark.network.crypto.enabled. Choose a key length and set via spark.network.crypto.keyLength, and choose an algorithm from those available in your JRE and set via spark.network.crypto.keyFactoryAlgorithm.

Don't forget to also configure encryption between Spark and any database it talks to (e.g., Cassandra), so that traffic is encrypted as well.

Enable encryption on Shuffle service

In addition to the above encryption configuration, set the following to true: spark.network.crypto.saslFallback.

To encrypt the temporary files created by the Shuffle service, set this to true: spark.io.encryption.enabled. The key size and algorithm can also be set via spark.io.encryption.keySizeBits and spark.io.encryption.keygen.algorithm, but these have reasonable defaults.

Disable Spark REST server

The REST server presents a serious risk, as it does not allow for encryption. Set the following to false: spark.master.rest.enabled.
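Pulling the settings from the preceding security sections together, a spark-defaults.conf sketch might look like the following; the key length and algorithm values are illustrative, and the event log directory still needs filesystem permissions as noted above:

    # RPC authentication and SASL encryption
    spark.authenticate                        true
    spark.authenticate.enableSaslEncryption   true
    spark.network.sasl.serverAlwaysEncrypt    true

    # AES encryption for data in transit
    spark.network.crypto.enabled              true
    spark.network.crypto.keyLength            256
    spark.network.crypto.keyFactoryAlgorithm  PBKDF2WithHmacSHA256
    spark.network.crypto.saslFallback         true

    # Encrypt temporary files created by the shuffle service
    spark.io.encryption.enabled               true

    # Event logging (protect spark.eventLog.dir with filesystem permissions)
    spark.eventLog.enabled                    true

    # Disable the unencrypted REST submission server
    spark.master.rest.enabled                 false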

Monitoring Spark

Spark has built-in monitoring: https://spark.apache.org/docs/latest/monitoring.html

Interested in working with Valerie? Schedule a tech call.
