
Exporting Cassandra time series data to S3 for data analysis with Spark



Ivan Vasquez

Nov 29, 2018 · 6 min read

This article describes a way to periodically move on-premises Cassandra data to S3 for analysis. We've been using this approach successfully over the last few months to get the best of both worlds for an early-stage platform such as 1200.aero: the cost effectiveness of on-premises hosting for a stable, live workload, and the on-demand scalability of AWS for data analysis and machine learning.

Our end goal was to upload data to S3 organized as follows:

/adsb/messages/year=2018/month=11/day=06

To that effect, we performed the following steps, described in detail in this article:

  1. Reorganize the data using Cassandra materialized views
  2. Use Spark to read Cassandra data efficiently as a time series
  3. Partition the Spark dataset as a time series
  4. Save the dataset to S3 as Parquet
  5. Analyze the data in AWS

For your reference, we used Cassandra 3.11 and Spark 2.3.1, both straight open source versions.

Step 1: Reorganize the data

The data we want to analyze consists of millions of low-altitude air traffic messages captured from ADS-B receivers at a number of regional airports. The original table definition is:

CREATE TABLE messages (
icao text,
gentime timestamp,
alert boolean,
alt int,
callsign text,
emerg boolean,
gndspd int,
ident boolean,
isgnd boolean,
lat float,
lon float,
sqwk text,
station text,
trk int,
vspd int,
PRIMARY KEY (icao, gentime)
) WITH CLUSTERING ORDER BY (gentime ASC)

Notice the primary key is (icao, gentime). Cassandra data models are designed with a specific data retrieval use case in mind. Our applications query data by aircraft identifier, so our partitioning column is the aircraft’s icao (a 6-character hex transponder code), followed by gentime (a timestamp) as the clustering column that orders records chronologically within each partition.

Problem: In order to periodically export a day/week/month’s worth of data, we needed to be able to retrieve records by date. Since such an attribute did not originally exist, we had to:

  1. Create and populate a new day column on the existing table.
  2. Define the right Cassandra data structure to enable efficient data retrieval using the new day column.
  3. Update our backend to populate the new field for incoming data.

Defining a new date column in Cassandra is straightforward:

alter table messages add day date;

However, populating it proved more complex than anticipated: We had to update several hundred million records and did not find a suitable CQL UPDATE statement to do so in place (i.e. deriving day from the gentime timestamp). After evaluating all alternatives, we ended up doing this:

Export the primary key for every record to a CSV file using CQL:

copy messages(icao, gentime) to 'messages_pk_all.csv' with header = false;

Output file:

A3A23C,2017-10-06 15:37:06.000+0000
A3A23C,2017-10-06 15:37:07.000+0000
A3A23C,2017-10-06 15:37:08.000+0000
A3A23C,2017-10-06 15:37:09.000+0000
A3A23C,2017-10-06 15:37:11.000+0000

Update the file in Unix to create a new field based on the existing timestamp:

cat messages_pk_all.csv | tr -dc '[:print:]\n' | awk -F "," '{ split($2, subfield, " "); print $1 "," $2 "," subfield[1]  }'  > messages_pk_day.csv

The above yielded a file with the new attribute:

A3A23C,2017-10-06 15:37:06.000+0000,2017-10-06
A3A23C,2017-10-06 15:37:07.000+0000,2017-10-06
A3A23C,2017-10-06 15:37:08.000+0000,2017-10-06
A3A23C,2017-10-06 15:37:09.000+0000,2017-10-06
A3A23C,2017-10-06 15:37:11.000+0000,2017-10-06

Import the modified file into the same table, which updates every record with just the new column value, leaving all other fields intact:

copy messages(icao, gentime, day) from 'messages_pk_day.csv';

The import took several hours, but worked flawlessly. In hindsight, it’s still far from ideal and required some research to guarantee the safety of the data, but given Cassandra’s lack of support for this kind of in-place update, it’s as good a solution as we could find.

Prior to creating our first Cassandra materialized view, we considered secondary indexes. However, the many caveats and words of caution from experienced users led us in the direction of materialized views.

CREATE MATERIALIZED VIEW messages_by_day AS
SELECT *
FROM messages
WHERE day IS NOT NULL AND icao IS NOT NULL AND gentime IS NOT NULL
PRIMARY KEY (day, icao, gentime)
WITH CLUSTERING ORDER BY (icao ASC, gentime ASC)

Materialized view creation ran in the background for ~12 hours as it duplicated and organized the data. This query reports any builds in progress:

SELECT view_name, generation_number, last_token
FROM system.views_builds_in_progress;

 view_name       | generation_number | last_token
-----------------+-------------------+---------------------
 messages_by_day |                 0 | 2527298856676644032

To date, we have not observed a significant performance overhead associated with the materialized view. DataStax found that every materialized view takes away ~10% of the insert performance on a given table.

Step 2: Read Cassandra data efficiently as a time series using Spark

Once we had the means to retrieve the data by date, we had to ensure that the Spark Cassandra connector issued the right queries to retrieve the data efficiently.

Spark configuration parameters for the Cassandra connector (passed when creating a SparkConf, via spark-submit, via sparkmagic in Jupyter, etc.):

spark.cassandra.connection.host: "host1,host2",
spark.cassandra.auth.username: "username",
spark.cassandra.auth.password: "password",
spark.cassandra.input.consistency.level: ALL
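
For reference, here is a minimal sketch of passing the same settings programmatically when building the SparkSession (the application name is hypothetical, and the Spark Cassandra connector is assumed to be on the classpath):

import org.apache.spark.sql.SparkSession

// Equivalent to passing the connector settings via spark-submit or a notebook kernel
val spark = SparkSession.builder()
  .appName("adsb-export")
  .config("spark.cassandra.connection.host", "host1,host2")
  .config("spark.cassandra.auth.username", "username")
  .config("spark.cassandra.auth.password", "password")
  .config("spark.cassandra.input.consistency.level", "ALL")
  .getOrCreate()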

Import the right packages and define a dataset on the Cassandra table:

import org.apache.spark.sql._
import org.apache.spark.sql.cassandra._

val messagesDf = spark.read.cassandraFormat("messages_by_day", "adsb").load()

Filter data for a specific date:

import java.sql.Date

val messagesLastDayDf = messagesDf.filter(messagesDf("day") === Date.valueOf("2018-11-06"))

…or for a number of dates (discrete dates must be specified in an IN condition):

val messagesLastWeekDf = messagesDf.filter(messagesDf("day") isin (Date.valueOf("2018-11-06"), Date.valueOf("2018-11-05"), Date.valueOf("2018-11-04"), ... ))
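
If listing each date by hand gets tedious, the IN list can also be built programmatically. This is a small sketch (not from the original article) that produces a filter equivalent to the explicit list above:

import java.sql.Date
import java.time.LocalDate

// Build the last seven days as java.sql.Date values and pass them to isin
val endDay = LocalDate.of(2018, 11, 6)
val lastWeek = (0 until 7).map(i => Date.valueOf(endDay.minusDays(i)))
val messagesLastWeekDf = messagesDf.filter(messagesDf("day").isin(lastWeek: _*))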

To verify that your filter will be applied to the Cassandra table, print the physical plan with df.explain(). There should be predicates (i.e. actual values) being pushed down to the data source, as in these examples:

messagesLastDayDf.explain()
PushedFilters: [IsNotNull(day), *EqualTo(day,2018-11-06)]
messagesLastWeekDf.explain()
PushedFilters: [*In(day, [2018-11-06,2018-11-05,2018-11-04])]

Important: Attempts to filter the dataset that don’t conform to the connector documentation will cause Spark to issue a full table scan to Cassandra (and thus defeat the purpose of reorganizing your data as done in Step 1).
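
As a hypothetical illustration of the kind of filter that typically cannot be pushed down: wrapping the partition column in an expression forces Spark to evaluate the predicate itself, so the connector scans the whole table.

import org.apache.spark.sql.functions.year

// Anti-pattern (illustrative only): applying a function to the partition column
// prevents predicate pushdown on day, triggering a full table scan in Cassandra.
val fullScanDf = messagesDf.filter(year(messagesDf("day")) === 2018)
fullScanDf.explain()   // PushedFilters will not contain an EqualTo/In predicate on day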

Step 3: Partition the Spark dataset as a time series

As a reminder, our goal was to lay out the data in S3 partitioned by date, for example:

/adsb/messages/year=2018/month=11/day=06

So, rather than partitioning the dataset by the whole date string (e.g. "2018-11-06"), we needed to partition by its components (year, month, day). Hence, we had to first convert the day column into year, month and day columns:

import org.apache.spark.sql.functions.{col, dayofmonth, month, year}

val partMsgsLastWeekDf = messagesLastWeekDf
  .withColumnRenamed("day", "date")
  .withColumn("year", year(col("date")))
  .withColumn("month", month(col("date")))
  .withColumn("day", dayofmonth(col("date")))
  .drop("date")

At long last, we had our data available in Spark with the necessary attributes and partitioned by the criteria we needed.

[Image: Dataset with three new columns (year, month, day), ready to be partitioned and written as Parquet]
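
In lieu of the original screenshot, a similar sanity check can be reproduced with something like:

// Confirm the partition columns exist and hold the expected values
partMsgsLastWeekDf.printSchema()
partMsgsLastWeekDf.select("year", "month", "day").distinct().orderBy("year", "month", "day").show()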

Step 4: Save your Spark dataset to S3 as Parquet

Setting up Spark to push data to S3 was pretty easy. Required AWS libraries are included in the Hadoop distribution Spark depends upon. All we had to configure were the following five entries in $SPARK_HOME/conf/spark-defaults.conf:

spark.executor.extraClassPath /opt/hadoop-2.7.7/share/hadoop/tools/lib/aws-java-sdk-1.7.4.jar:/opt/hadoop-2.7.7/share/hadoop/tools/lib/hadoop-aws-2.7.7.jar
spark.driver.extraClassPath /opt/hadoop-2.7.7/share/hadoop/tools/lib/aws-java-sdk-1.7.4.jar:/opt/hadoop-2.7.7/share/hadoop/tools/lib/hadoop-aws-2.7.7.jar
spark.hadoop.fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem
spark.hadoop.fs.s3a.awsAccessKeyId <your AWS access key id>
spark.hadoop.fs.s3a.awsSecretAccessKey <your AWS secret access key>

Now we were ready to partition and push the data to S3 in a single instruction:

import org.apache.spark.sql.SaveMode

partMsgsLastWeekDf.write.partitionBy("year", "month", "day").mode(SaveMode.Append).parquet("s3a://mybucket/adsb/messages")

The above statement runs surprisingly fast (in part because Parquet files are much smaller than their CSV equivalents). Loading a day’s worth of records (~500,000 rows) completes in less than one minute.

We use SaveMode.Append to make it possible to load additional days without deleting existing data.
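
Putting Steps 2 through 4 together, a recurring daily export could look roughly like the sketch below. The exportDay helper is hypothetical (not from the original article); spark is the active SparkSession and the bucket path matches the one used above.

import java.sql.Date
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.cassandra._
import org.apache.spark.sql.functions.{col, dayofmonth, month, year}

// Hypothetical helper: export a single day of messages from Cassandra to S3
def exportDay(day: Date): Unit = {
  spark.read.cassandraFormat("messages_by_day", "adsb").load()
    .filter(col("day") === day)                  // predicate pushed down to Cassandra
    .withColumnRenamed("day", "date")
    .withColumn("year", year(col("date")))
    .withColumn("month", month(col("date")))
    .withColumn("day", dayofmonth(col("date")))
    .drop("date")
    .write
    .partitionBy("year", "month", "day")
    .mode(SaveMode.Append)                       // append new days, keep existing data
    .parquet("s3a://mybucket/adsb/messages")
}

exportDay(Date.valueOf("2018-11-07"))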

Note: The s3a URL prefix has desirable performance and capacity implications for large file operations such as writing Parquet files.

Step 5: Analyze your data in AWS

From here, the sky is the limit. You can spin up an EMR cluster and query S3 directly with an arbitrary number of cluster nodes. Alternatively, you can launch a SageMaker notebook and point it to EMR Spark, following steps similar to those outlined in my previous article. And, as of the time of writing, Boto3, the AWS SDK for Python, makes it possible to issue basic SQL queries against Parquet files in S3.
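
For instance, reading the exported data back into Spark on EMR is a one-liner; the year/month/day partition columns are reconstructed from the S3 directory layout (a small sketch, reusing the bucket path from Step 4):

// Read the Parquet dataset back from S3 and query it by partition columns
val s3Messages = spark.read.parquet("s3a://mybucket/adsb/messages")
s3Messages.filter("year = 2018 AND month = 11").count()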

Future articles will describe how 1200.aero is using these data to predict potentially hazardous situations for general aviation aircraft.
