Vote count: 1

I am trying to read a CSV file with Spark and then save it to Cassandra. Saving to Cassandra works when I'm using trivial values.

I have a file with the following values:

id,name,tag1|tag2|tag3

I want to store it in a Cassandra table:

id bigint, name varchar, tags set

I defined a case class for this:

case class Item(id: Integer, name: String, tag: Set[String])

Then I use this expression to get an RDD out of the CSV file:

val items = sc.textFile("items.csv").map(l => l.split(",") match { case Array(a, b, c) => Item(Integer.parseInt(a), b, c.split("\\|").toSet) })

When I now call collect or saveToCassandra on items (which starts the processing) I get the following error:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 29.0 failed 1 times, most recent failure: Lost task 1.0 in stage 29.0 (TID 38, localhost): scala.MatchError: [Ljava.lang.String;@6030bbe6 (of class [Ljava.lang.String;)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:33)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:33)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:249)
    at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:172)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:79)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

asked Apr 30 '15 at 11:48
mniehoff

2 Answers

Vote count: 2 (accepted)

As mentioned, the issue is that splitting some inputs generates an array with fewer or more than the three elements used in the match.

But the partial function used to do the match can also be used to filter for the elements that do fit the match criteria. rdd.collect{partialFunction} is meant exactly for that:

val data = sc.textFile("items.csv")
val arrayData = data.map(l => l.split(","))
val items = arrayData.collect { case Array(a, b, c) => Item(Integer.parseInt(a), b, c.split("\\|").toSet) }
items.saveToCassandra(...)
  • Note 1: you should also protect against dirty values, e.g. parseInt on a value that's not an integer.
  • Note 2: rdd.collect{partialFunc} (which filters/maps data using a partial function) should not be confused with rdd.collect() (which brings the data back to the driver).
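To illustrate Note 1, here is one way to protect against dirty values: a hypothetical helper (parseItem is my name, not from the answer) that wraps the integer parsing in scala.util.Try and returns an Option, so malformed rows are dropped rather than crashing the job. This is a sketch, assuming the case class from the question:

```scala
import scala.util.Try

// The case class from the question.
case class Item(id: Integer, name: String, tag: Set[String])

// Returns None for rows that don't have exactly three fields
// or whose id is not a valid integer.
def parseItem(line: String): Option[Item] =
  line.split(",") match {
    case Array(a, b, c) =>
      Try(Integer.valueOf(a)).toOption.map(id => Item(id, b, c.split("\\|").toSet))
    case _ => None
  }

// With an RDD this could be used as:
//   sc.textFile("items.csv").flatMap(parseItem).saveToCassandra(...)
```

Using flatMap with an Option-returning parser combines the filtering and the mapping in one step, much like collect with a partial function, while also catching bad numeric values.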
answered Apr 30 '15 at 14:57
maasg

Vote count: 1

You'll get that match error if your input isn't an array of 3 entries, e.g.

"a,b".split(",") match {
   case Array(a,b,c) => ....
}

So I suspect this is an input data issue, and you need to cater for it in your match.
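One way to cater for it (a sketch of my own, not from the answer; toItem is a hypothetical name) is to add a catch-all case, so malformed rows become a value you can inspect or skip instead of throwing a scala.MatchError:

```scala
// The case class from the question.
case class Item(id: Integer, name: String, tag: Set[String])

// A defensive match: rows that don't split into exactly three fields
// fall through to the wildcard case instead of throwing a MatchError.
def toItem(line: String): Either[String, Item] =
  line.split(",") match {
    case Array(a, b, c) => Right(Item(Integer.parseInt(a), b, c.split("\\|").toSet))
    case _              => Left(s"malformed row: $line")
  }

// With an RDD this could be used as, e.g.:
//   sc.textFile("items.csv").map(toItem).collect { case Right(item) => item }
```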

answered Apr 30 '15 at 11:55
Brian Agnew