Cassandra.Link

The best knowledge base on Apache Cassandra®

Helping platform leaders, architects, engineers, and operators build scalable real-time data platforms.

1/15/2018

Reading time: 3 min

Read CSV File in Spark and Write it to Cassandra

by John Doe

Vote count: 1

I am trying to read a CSV file with Spark and then save it to Cassandra. Saving to Cassandra works when I'm using trivial values.

I have a file with the following values:

id,name,tag1|tag2|tag3

I want to store it in a Cassandra table:

id bigint, name varchar, tags set<text>
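
For reference, a minimal CQL sketch of that table; the set<text> element type and the choice of id as the primary key are assumptions inferred from the case class below, not stated in the question:

CREATE TABLE items (
  id bigint PRIMARY KEY,  -- id as the partition key is an assumption
  name varchar,
  tags set<text>
);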

I defined a case class for this:

case class Item(id: Integer, name: String, tag: Set[String])

Then I use this expression to get the RDD out of the CSV file:

val items = sc.textFile("items.csv").map(l => l.split(",") match {
  case Array(a, b, c) => Item(Integer.parseInt(a), b, c.split("\\|").toSet)
})

When I now call collect or saveToCassandra on items (which starts the processing) I get the following error:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 29.0 failed 1 times, most recent failure: Lost task 1.0 in stage 29.0 (TID 38, localhost): scala.MatchError: [Ljava.lang.String;@6030bbe6 (of class [Ljava.lang.String;)
  at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:33)
  at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:33)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
  at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:249)
  at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:172)
  at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:79)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
  at org.apache.spark.scheduler.Task.run(Task.scala:64)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)

asked Apr 30 '15 at 11:48, edited Apr 30 '15 at 12:15
mniehoff

2 Answers

Vote count: 2 (accepted)

As mentioned, the issue is that splitting some inputs generates an array with fewer or more than the 3 elements the match expects.

But the partial function used to do the match can also be used to filter for the elements that do fit the match criteria; rdd.collect{partialFunction} is meant exactly for that:

val data = sc.textFile("items.csv")
val arrayData = data.map(l => l.split(","))
val items = arrayData.collect { case Array(a, b, c) => Item(Integer.parseInt(a), b, c.split("\\|").toSet) }
items.saveToCassandra(...)
  • Note 1: you should also protect against dirty values, e.g. calling parseInt on a value that is not an integer (see the sketch below).
  • Note 2: rdd.collect{partialFunc}, which filters/maps data using a partial function, should not be confused with rdd.collect(), which brings the data back to the driver.
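
Following up on Note 1, here is a minimal sketch of one way to guard the parse before saving. It assumes the spark-cassandra-connector is on the classpath; the keyspace and table names ("ks", "items") are placeholders, not from the original answer:

import com.datastax.spark.connector._ // adds saveToCassandra to RDDs
import scala.util.Try

// Rows that do not split into exactly three fields, or whose first field
// is not numeric, fall outside the partial function and are simply dropped.
val safeItems = arrayData.collect {
  case Array(a, b, c) if Try(a.trim.toInt).isSuccess =>
    Item(a.trim.toInt, b, c.split("\\|").toSet)
}

// "ks" and "items" are hypothetical keyspace/table names.
safeItems.saveToCassandra("ks", "items", SomeColumns("id", "name", "tags"))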
answered Apr 30 '15 at 14:57
maasg
27.4k76093

Vote count: 1

You'll get that match error if your input isn't an array of 3 entries, e.g.

String("a,b").split(",") match {
   case Array(a,b,c) => ....
}

so I suspect this is some input data issue, and you need to cater for it in your match.
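
A minimal reproduction of that failure mode (the sample line "1,widget" is made up): a two-field line matches none of the cases, so Scala throws the MatchError at runtime.

"1,widget".split(",") match {
  case Array(a, b, c) => println(s"$a / $b / $c")
}
// throws scala.MatchError: [Ljava.lang.String;@... (of class [Ljava.lang.String;)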

answered Apr 30 '15 at 11:55
Brian Agnew

