For several years, the most advanced analytics features for Apache Cassandra™ have been restricted to DataStax Enterprise customers. In our continuing commitment to lead with code in our community, DataStax is releasing Spark Cassandra Connector (SCC) 2.5 for any user of Cassandra. This is not only a big change in licensing; the release is also full of great new enhancements we are excited to tell you about. The switch to a new Java driver, support for DataStax Astra, and better support for container orchestration platforms are additional improvements. Let's look at the biggest changes one by one - the full list of changes is available in CHANGES.txt.

Use of new Java Driver

The new version of Spark Cassandra Connector is based on Java driver 4.x, which represents a major architectural change from Java driver 3.x. The new driver contains a lot of changes, providing better performance, better stability, extensibility, ease of use, and the ability to use the same driver for Cassandra and DSE.

One of the biggest user-visible differences is the way the Java driver is configured - it's now possible (and recommended) to configure the driver using a configuration file. This method helps when we need more control over the driver's behavior, without the need to expose configuration options via SCC itself. You can prepare a custom configuration file and pass its location to SCC using the configuration parameter spark.cassandra.connection.config.profile.path. Please note that if you provide this file, all other configuration options will be ignored. The location of the file can be any of the following (more information is in the documentation):

  • a file located on a distributed filesystem, such as, S3/HDFS/DSEFS/… (specified as full path s3a://..., hdfs://..., etc.)
  • a file distributed by Spark with the --files option - in this case, a configuration parameter should specify only the file name
  • a full path to file on every executor
  • a file on the Spark Classpath

The new Java driver uses different interfaces from the older versions, so any manual usage of the Java driver, such as with CassandraConnector.withSessionDo(), will need to be updated.
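As a minimal sketch of what such an update might look like, the snippet below reads the Cassandra release version through withSessionDo. With driver 4.x the function receives a CqlSession rather than the driver 3.x Session class. The helper names releaseVersion and clusterVersion are made up for this illustration, and the code needs a reachable cluster to actually run.

```scala
import com.datastax.oss.driver.api.core.CqlSession
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.SparkConf

// Hypothetical helper: runs a simple query using the driver 4.x session type.
def releaseVersion(session: CqlSession): String =
  session
    .execute("SELECT release_version FROM system.local")
    .one()
    .getString("release_version")

// Hypothetical helper: borrows a session from the connector and closes nothing
// manually, since withSessionDo manages the session's lifecycle.
def clusterVersion(conf: SparkConf): String =
  CassandraConnector(conf).withSessionDo(session => releaseVersion(session))
```

In spark-shell this could be called as clusterVersion(sc.getConf). Code written against driver 3.x typically needs its imports changed from com.datastax.driver.core to the com.datastax.oss.driver.api.core packages.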

Support for DataStax Astra

DataStax Astra is a database-as-a-service based on Apache Cassandra™. You can easily and quickly create Cassandra database(s) and connect to them from your existing applications using the "secure bundle" provided by Astra - a file that contains SSL certificates, and other information necessary for secure connection to your database.

Spark Cassandra Connector 2.5.0 fully supports DataStax Astra - you just need to point to the secure bundle location and provide user credentials. The location of the secure bundle is specified via the configuration parameter spark.cassandra.connection.config.cloud.path and, similarly to the driver configuration file, can be one of the following (more information is in the documentation):

  • a file located on a distributed filesystem, such as, S3/HDFS/DSEFS/… (specified as full path s3a://..., hdfs://..., etc.)
  • a file distributed by Spark with the --files option - in this case, a configuration parameter should specify only the file name
  • a full path to file on every executor
  • a file on the Spark Classpath

For example, we can pass the secure bundle via the --files option:

spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.11:2.5.0 \
  --files _path_to_secure/secure-connect.zip \
  --conf spark.cassandra.connection.config.cloud.path=secure-connect.zip \
  --conf spark.cassandra.auth.username=MyUserName \
  --conf spark.cassandra.auth.password=MyPassword

and then access the data in the database, in the keyspace named test (we're using the spacecraft_location_over_time table from the demo bundled with DataStax Astra):

import org.apache.spark.sql.cassandra._
val data = spark.read.cassandraFormat("spacecraft_location_over_time", "test").load
data.printSchema
data.show(5)

In this case we'll get the following output:

root
 |-- spacecraft_name: string (nullable = false)
 |-- journey_id: string (nullable = false)
 |-- reading_time: timestamp (nullable = false)
 |-- location: struct (nullable = true)
 |    |-- x_coordinate: double (nullable = true)
 |    |-- y_coordinate: double (nullable = true)
 |    |-- z_coordinate: double (nullable = true)
 |-- location_unit: string (nullable = true)
+---------------+--------------------+-------------------+--------------------+-------------+
|spacecraft_name|          journey_id|       reading_time|            location|location_unit|
+---------------+--------------------+-------------------+--------------------+-------------+
|       apollo11|fc01e800-9954-11b...|1969-07-16 14:49:39|[14994.0, 14997.0...|     km,km,km|
|       apollo11|fc01e800-9954-11b...|1969-07-16 14:49:38|[14985.0, 14996.0...|     km,km,km|
|       apollo11|fc01e800-9954-11b...|1969-07-16 14:49:37|[14976.0, 14994.0...|     km,km,km|
|       apollo11|fc01e800-9954-11b...|1969-07-16 14:49:36|[14971.0, 14993.0...|     km,km,km|
|       apollo11|fc01e800-9954-11b...|1969-07-16 14:49:35|[14964.0, 14983.0...|     km,km,km|
+---------------+--------------------+-------------------+--------------------+-------------+
only showing top 5 rows

Native Support for Spark Structured Streaming

Previously, Spark Cassandra Connector had support for Spark DStreams streaming, but support for Spark Structured Streaming was available only in DataStax Enterprise. To write data to Cassandra from Spark Structured Streaming jobs, open source users needed either to use a custom Sink implementation or to use foreachBatch, available starting with Spark 2.4, which made it harder to use Spark Structured Streaming with Cassandra. With the new version of Spark Cassandra Connector, we can do it easily:

import org.apache.spark.sql.cassandra._
import org.apache.spark.sql.streaming.OutputMode
val query = streamingCountsDF.writeStream
  .outputMode(OutputMode.Update)
  .option("checkpointLocation", "hdfs:///checkpoint")
  .cassandraFormat("sttest", "test")
  .start()

Direct Join Optimizations for Dataframes

This functionality was initially released in DSE 6.0, and it optimizes dataframe joins in which one side of the join is a dataframe backed by a Cassandra table. This can give a significant boost in performance, as you don't need to read all data from Cassandra or perform a shuffle and join in Spark. The direct join optimization replaces these costly operations with parallel requests to individual Cassandra partitions, fetching only the data that is necessary. More detailed information can be found in the original blog post by Russell Spitzer; here we'll look at a specific example.

To get this functionality in open source Spark (or in Spark bundled with commercial distributions other than DSE), you need to pass the configuration parameter spark.sql.extensions with the value com.datastax.spark.connector.CassandraSparkExtensions.
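One possible way to set this parameter programmatically is sketched below, using a plain SparkConf; the variable name extensionsConf is just a placeholder for this illustration.

```scala
import org.apache.spark.SparkConf

// Register the connector's Catalyst extensions. The same key/value pair can
// also be passed on the command line with --conf.
val extensionsConf = new SparkConf()
  .set("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions")
```

Spark reads spark.sql.extensions when the session is created, so the setting has to be in place before the SparkSession is built; in spark-shell that means passing it with --conf at startup.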

For example, if we have a table in Cassandra with the following structure, populated with data:

CREATE TABLE test.jtest (
    id int PRIMARY KEY,
    t text
);

we can perform a join of the dataframe (toJoin in our example) with data in Cassandra:

val toJoin = spark.range(1, 1000).map(x => x.intValue).withColumnRenamed("value", "id")
val dataset = spark.read
    .format("org.apache.spark.sql.cassandra")
    .options(Map("table" -> "jtest", "keyspace" -> "test"))
    .load
val joined = toJoin.join(dataset, dataset("id") === toJoin("id"))

If we look at the execution plan using joined.explain, we can see the following output:

== Physical Plan ==
Cassandra Direct Join [id = id#7] test.jtest - Reading (id, t) Pushed {}
+- *(1) Project [value#5 AS id#7]
   +- *(1) SerializeFromObject [input[0, int, false] AS value#5]
      +- *(1) MapElements $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$Lambda$1934/191004666@3d1c7e7b, obj#4: int
         +- *(1) DeserializeToObject staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, id#0L, true, false), obj#3: java.lang.Long
            +- *(1) Range (1, 1000, step=1, splits=8)

where "Cassandra Direct Join" indicates that we've performed an optimized join by querying only the necessary data. Without the CassandraSparkExtensions class, we would perform a normal Spark join, reading all data from Cassandra and joining it in Spark:

== Physical Plan ==
*(2) BroadcastHashJoin [id#20], [id#22], Inner, BuildLeft
:- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
:  +- *(1) Project [value#18 AS id#20]
:     +- *(1) SerializeFromObject [input[0, int, false] AS value#18]
:        +- *(1) MapElements $line21.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$Lambda$2503/411171709@7e325a4, obj#17: int
:           +- *(1) DeserializeToObject staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, id#13L, true, false), obj#16: java.lang.Long
:              +- *(1) Range (1, 1000, step=1, splits=8)
+- *(2) Scan org.apache.spark.sql.cassandra.CassandraSourceRelation [id#22,t#23] PushedFilters: [], ReadSchema: struct<id:int,t:string>

Time-To-Live (TTL) and Writetime in Dataframes

Sometimes we need access to column metadata, such as WriteTime or TTL. Most often this happens when we want to migrate data between two tables and don't want to overwrite newer data with an older version, or when we need to explicitly set this metadata based on values from external sources. For a long time, this functionality was available only in the RDD API, so with Dataframes you had to use lower-level APIs to read or write this metadata.

With the new release of Spark Cassandra Connector, you can also access this metadata via Dataframe APIs.

For open source Spark, you need to pass the configuration parameter spark.sql.extensions with the value com.datastax.spark.connector.CassandraSparkExtensions.

To read metadata, SCC provides two functions: ttl, which returns the TTL of a specific column (as an integer specifying the number of seconds), and writeTime, which returns the modification time of a specific column (as a long specifying the time in microseconds). Please note that metadata is available only for regular columns, and may not be available for non-frozen collection types and UDTs.

For example:

import org.apache.spark.sql.cassandra._
val data = spark.read.cassandraFormat("sttest", "test").load
data.select(ttl("count").as("count_ttl"), writeTime("count").as("count_wt"), 
  $"count", $"location").show

outputs something like this (the count column has no TTL set):

+---------+----------------+-----+--------------------+
|count_ttl|        count_wt|count|            location|
+---------+----------------+-----+--------------------+
|     null|1584366701102528|15358|         Mount Hagen|
|     null|1584366701194987|13192|      Thule Air Base|
+---------+----------------+-----+--------------------+

We can also access this metadata via Spark SQL. If you're using Spark < 3.0.0, you first need to execute some additional code to register the functions (this is a limitation of existing Spark versions):

import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.cassandra.CassandraMetadataFunction
import org.apache.spark.sql.catalyst.expressions.Expression
spark.sessionState.functionRegistry.registerFunction(FunctionIdentifier("ttl"),
        (input: Seq[Expression]) => CassandraMetadataFunction.cassandraTTLFunctionBuilder(input))
spark.sessionState.functionRegistry.registerFunction(FunctionIdentifier("writetime"),
        (input: Seq[Expression]) => CassandraMetadataFunction.cassandraWriteTimeFunctionBuilder(input))

When Spark SQL functions are registered, we can access metadata:

spark.sql("""CREATE TEMPORARY VIEW sttest
     USING org.apache.spark.sql.cassandra
     OPTIONS (
     table "sttest",
     keyspace "test",
     pushdown "true")""")
spark.sql("SELECT location, count, ttl(count), writetime(count) FROM sttest").show

If we want to write a dataframe with metadata, we can specify which columns hold the TTL and WriteTime values. This is done either via options, like here:

data.select(ttl("count").as("count_ttl"), writeTime("count").as("count_wt"), $"count", $"location")
  .write.cassandraFormat("sttest2", "test")
  .option("writetime", "count_wt")
  .option("ttl", "count_ttl")
  .mode("APPEND")
  .save()

or via two auxiliary functions: withTTL(column) and withWriteTime(column):

import org.apache.spark.sql.cassandra._
data.select(ttl("count").as("count_ttl"), writeTime("count").as("count_wt"), $"count", $"location")
  .write.cassandraFormat("sttest2", "test")
  .withTTL("count_ttl")
  .withWriteTime("count_wt")
  .mode("APPEND")
  .save()

Either way, the TTL or WriteTime value from the specified column is applied to the whole row. If you need different values for different columns, you'll need to transform your dataframe and write it in several passes: in each pass, save only the columns that comprise the primary key, plus the columns on which you need to set that metadata. This is a limitation of Cassandra itself, not Spark, as you can set only one timestamp or TTL per query.
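A rough sketch of such a transformation is shown below. Everything about the table is an assumption made for illustration: a table test.readings with primary key id and two regular columns a and b, and a dataframe that carries the desired TTLs (in seconds) in the integer columns a_ttl and b_ttl. The helper name writeWithPerColumnTtl is hypothetical as well.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.cassandra._

// Hypothetical layout: test.readings(id PRIMARY KEY, a, b); the dataframe also
// carries a_ttl and b_ttl columns with the TTL to apply to a and b respectively.
def writeWithPerColumnTtl(df: DataFrame): Unit = {
  // First pass: primary key plus column a, using the TTL stored in a_ttl.
  df.select("id", "a", "a_ttl")
    .write.cassandraFormat("readings", "test")
    .withTTL("a_ttl")
    .mode("APPEND")
    .save()

  // Second pass: primary key plus column b, using the TTL stored in b_ttl.
  df.select("id", "b", "b_ttl")
    .write.cassandraFormat("readings", "test")
    .withTTL("b_ttl")
    .mode("APPEND")
    .save()
}
```

Each pass results in its own set of write requests, so each column group gets its own TTL; the same pattern would apply to withWriteTime.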

Other optimizations & new functionality

Better support for Cassandra running in container orchestration platforms

When Cassandra is running inside a container orchestration platform, such as Kubernetes or DC/OS, a common problem is that the IP address of a Cassandra container may change after a restart. This can lead to situations where, after several nodes restart, Spark jobs aren't able to reconnect to the Cassandra cluster even if contact points were specified as host names, not IPs. The primary reason for this problem was that the previous version of the driver performed host name resolution on initial start, and subsequently always tried to use the same IP.

To mitigate this problem, the implementation of SPARKC-571 added a configuration property, spark.cassandra.connection.resolveContactPoints, that, when set to false, delays the resolution of host names to IPs, so a Spark job can safely reconnect to a Cassandra or DSE cluster.
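As an illustration, the property could be set together with a host-name contact point as sketched below. The host name is a made-up Kubernetes service name, and k8sConf is a placeholder variable name.

```scala
import org.apache.spark.SparkConf

// "cassandra.default.svc.cluster.local" is a hypothetical service name; use the
// host name under which your Cassandra nodes are actually reachable.
val k8sConf = new SparkConf()
  .set("spark.cassandra.connection.host", "cassandra.default.svc.cluster.local")
  // Keep the contact point as a host name instead of resolving it to an IP up front.
  .set("spark.cassandra.connection.resolveContactPoints", "false")
```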

Parallelization of IN clauses in Spark SQL

Previously, when we used the IN clause on partition/primary keys in Spark SQL, or performed the corresponding filtering on Dataframes, Spark Cassandra Connector pushed the predicate into Cassandra as is, without converting it into individual requests. As a result, there could be increased load on coordinator nodes, and potentially timeouts, especially if you put hundreds or thousands of values for the partition key column(s). The 2.5.0 release of SCC tries to optimize such queries by converting them into a call to the joinWithCassandraTable function, so only the necessary partitions/rows are read from the database, in parallel. This decreases the load on the coordinating node, and can make query execution faster. Whether or not the IN clause is parallelized depends on how many values are in the IN clause(s). If the number of values isn't very big, then SCC works as before, issuing a single request to Cassandra without a conversion to joinWithCassandraTable. This behavior is controlled by two configuration parameters (see documentation):

  • spark.cassandra.sql.inClauseToJoinConversionThreshold (default is 2500) - queries with more than this number of partition keys specified by IN clauses, or by combinations of IN clauses, will be converted to joinWithCassandraTable. To disable all IN clause conversion, set this setting to 0;
  • spark.cassandra.sql.inClauseToFullScanConversionThreshold (default is 20000000) - sets an upper bound on doing a joinWithCassandraTable operation. If the number of partition keys is above this number, a full scan and shuffle will be used instead of a joinWithCassandraTable operation.
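The sketch below shows one way these two parameters might be set, together with the kind of filter they affect. The threshold values are arbitrary examples rather than recommendations, the helper readByIds is hypothetical, and the jtest table is the one from the direct join example above.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.cassandra._
import org.apache.spark.sql.functions.col

// Illustrative values only: convert to a join above 1000 keys, and fall back
// to a full scan above 5,000,000 keys.
val inClauseConf = new SparkConf()
  .set("spark.cassandra.sql.inClauseToJoinConversionThreshold", "1000")
  .set("spark.cassandra.sql.inClauseToFullScanConversionThreshold", "5000000")

// Hypothetical helper: filters test.jtest by a list of partition key values,
// which Spark expresses as an IN predicate on the id column.
def readByIds(spark: SparkSession, ids: Seq[Int]): DataFrame =
  spark.read
    .cassandraFormat("jtest", "test")
    .load()
    .where(col("id").isin(ids: _*))
```

Calling explain on the returned dataframe is a simple way to check which strategy was chosen for a given number of keys.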

More control on creation of tables

Spark Cassandra Connector has long supported the creation of new tables based on a Dataframe's or RDD's structure, using the createCassandraTable function for Dataframes or saveAsCassandraTable for RDDs. But both methods had limitations:

  • There was no way to specify sorting direction for clustering columns
  • These functions always generated a CREATE TABLE statement that would fail if the table already existed
  • There was no way to specify additional options for a table

SPARKC-524 provides new functions that mitigate the problems listed above: saveAsCassandraTableEx for RDDs and, based on it, createCassandraTableEx for Dataframes. Both of them allow customization of a table's parameters:

  • Specification of sorting direction via ClusteringColumn.Descending or ClusteringColumn.Ascending
  • Generation of CREATE TABLE IF NOT EXISTS if parameter ifNotExists is set to true
  • Specification of additional table options by passing them as Map[String, String]

For example, if we have a Dataframe with the following structure:

root
 |-- id: integer (nullable = false)
 |-- c: integer (nullable = false)
 |-- t: string (nullable = true)

Then the following code:

import com.datastax.spark.connector.cql.ClusteringColumn
import org.apache.spark.sql.cassandra._
import com.datastax.spark.connector._
data.createCassandraTableEx("test", "test_new", Seq("id"), 
  Seq(("c", ClusteringColumn.Descending)),
  ifNotExists = true, tableOptions = Map("gc_grace_seconds" -> "1000"))

will create a table if it doesn't exist:

CREATE TABLE test.test_new (
    id int,
    c int,
    t text,
    PRIMARY KEY (id, c)
) WITH CLUSTERING ORDER BY (c DESC)
  AND gc_grace_seconds = 1000;

Support for DSE-specific optimizations

The new version of Spark Cassandra Connector includes functionality that supports DSE-specific extensions, so SCC can be used with DSE without performance loss or loss of other optimizations. The new functionality includes:

  • Support for Continuous Paging that significantly speeds up reading data from DSE. Paging is controlled with the spark.dse.continuousPagingEnabled configuration parameter, and enabled by default. See the original blog post for more information about this feature and benchmark results;
  • DSE Search Optimizations - SCC is able to perform predicates pushdown for conditions that pass filtering conditions in the solr_query column. In this case, actual filtering is performed by DSE Search, so the results Spark receives from DSE are already filtered. These optimizations are enabled by default if SCC detects that it is connected with DSE, and the table has DSE Search enabled;
  • Conversions for DSE Types - DSE provides support for several data types not available in Cassandra, for example, geo-spatial types, such as, point, line, and polygon. Spark Cassandra Connector implements converters for these data types so they are represented as columns in Dataframes (as string type, as Spark SQL doesn't have support for these data types), or items of corresponding classes in the RDDs. When saving data back to DSE, these values will be converted into corresponding DSE data types.

To demonstrate the last two items, let's use the following table with data:

CREATE TABLE test.gen_events (
  day date,
  point 'PointType',
  line 'LineStringType',
  poly 'PolygonType',
  time_of_occurrence timestamp,
  event_type text,
  a_ map<text, double>,
  PRIMARY KEY((day, event_type), time_of_occurrence)
) WITH CLUSTERING ORDER BY (time_of_occurrence DESC);
CREATE SEARCH INDEX ON test.gen_events;
INSERT INTO test.gen_events (day, point, time_of_occurrence, event_type, a_ ) 
  VALUES ('2018-10-22', 'POINT(10 10)', '2018-07-18T12:00:00Z', 'accident', {'a_speed':100});
INSERT INTO test.gen_events (day, line, time_of_occurrence, event_type, a_ )
  VALUES ('2018-10-22', 'LINESTRING(10 10, 20 20)', '2018-07-18T11:00:00Z', 
   'trafficjam', {'a_speed':10, 'a_length':10});
INSERT INTO test.gen_events (day, poly, time_of_occurrence, event_type, a_ ) 
  VALUES ('2018-10-22', 'POLYGON((30 10, 40 40, 20 40, 10 20, 30 10))', 
   '2018-07-18T10:00:00Z', 'fog', {'a_speed':50, 'a_visibility':100});

and then read that data, selecting only rows with event_type equal to accident:

import org.apache.spark.sql.cassandra._
val data = spark.read.cassandraFormat("gen_events", "test").load
val filtered = data.where($"solr_query" === "event_type:accident")
filtered.explain
filtered.printSchema
filtered.select("day", "event_type", "time_of_occurrence", "point").show

we can see that SCC pushes filters down to DSE (see the * marker next to the filter name, such as EqualTo), and converts columns into a representation suitable for use in Dataframes (string):

== Physical Plan ==
*(1) Scan org.apache.spark.sql.cassandra.CassandraSourceRelation [day#85,event_type#86,time_of_occurrence#87,a_#88,line#89,point#90,poly#91,solr_query#92] PushedFilters: [*IsNotNull(solr_query), *EqualTo(solr_query,event_type:accident)], ReadSchema: struct<day:date,event_type:string,time_of_occurrence:timestamp,a_:map<string,double>,line:string,...
root
 |-- day: date (nullable = false)
 |-- event_type: string (nullable = false)
 |-- time_of_occurrence: timestamp (nullable = false)
 |-- a_: map (nullable = true)
 |    |-- key: string
 |    |-- value: double (valueContainsNull = true)
 |-- line: string (nullable = true)
 |-- point: string (nullable = true)
 |-- poly: string (nullable = true)
 |-- solr_query: string (nullable = true)
+----------+----------+-------------------+-------------+
|       day|event_type| time_of_occurrence|        point|
+----------+----------+-------------------+-------------+
|2018-10-22|  accident|2018-07-18 14:00:00|POINT (10 10)|
+----------+----------+-------------------+-------------+

With all this new functionality, it's now possible to replace the BYOS (Bring Your Own Spark) jar with the new version of SCC. This swap requires adjustments in the Spark Connector configuration, as some properties and classes were renamed. The biggest change is the renaming of the DseSparkExtensions class to CassandraSparkExtensions, which affects the value of the spark.sql.extensions configuration parameter. Some configuration properties also changed their prefix from spark.sql.dse to spark.cassandra.sql. For a full list of available properties, refer to the reference.

Improvements of the development process

One of the largest changes that will be invisible to most users is that the project has been reorganized for ease of development and testing. There are now three core modules:

  • Connector - contains all of the base code for the Spark Cassandra Connector that is Spark specific. Here you can find all of the RDD, Dataframe and Catalyst code. Any code which is not used with a generic connector ended up in this module;
  • Driver - contains all of the code related to working with the DataStax Java Driver. This includes code for type conversions and connection caching. Basically, everything that is required for production but is not strictly tied to Spark;
  • Test-Support - contains all of the code for supporting SCC’s integration tests. Rather than using an embedded Cassandra JVM as in prior versions of the connector, we now use CCM to launch both Cassandra and DSE installations for testing. Using CCM makes it much simpler to test a wider matrix of configurations, ensuring stability regardless of your C* target.

 

Conclusion

With the release of the new version, using Cassandra with Spark has become easier and more efficient. But these new features are just the start of a stream of new functionality - stay tuned! Keep track by joining the Spark Cassandra Connector mailing list and DataStax Community.