I have the following code, where the fault is at sc.parallelize()
val pairs = ret.cartesian(ret)
.map {
case ((k1, v1), (k2, v2)) => ((k1, k2), (v1.toList, v2.toList))
}
for (pair <- pairs) {
val test = sc.parallelize(pair._2._1.map(_._1 ))
}
Where
- k1, k2 are strings
- v1, v2 are lists of doubles
I am getting the following error whenever I try to access sc. What am I doing wrong here?
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1893)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:869)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:868)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:868)
at CorrelationCalc$.main(CorrelationCalc.scala:33)
at CorrelationCalc.main(CorrelationCalc.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
– object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@40bee8c5)
– field (class: CorrelationCalc$$anonfun$main$1, name: sc$1, type: class org.apache.spark.SparkContext)
– object (class CorrelationCalc$$anonfun$main$1, )
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)
… 20 more
Best Answer
The for-comprehension is just doing a pairs.map()
RDD operations are performed by the workers and to have them do that work, anything you send to them must be serializable. The SparkContext is attached to the master: it is responsible for managing the entire cluster.
If you want to create an RDD, you have to be aware of the whole cluster (that's the 2nd "D" --- distributed) so you can't create a new RDD on the workers. And you probably don't want to turn each row in pairs into an RDD (and each with the same name!) anyway.
It's difficult to tell from your code what you'd like to do, but it will probably look like
which would be an RDD where each row is whatever was in the v1.toList's