The following program tries to call three functions for each row (inside an RDD map):
import org.json4s._
import org.json4s.jackson.JsonMethods._
implicit val formats = DefaultFormats
class TagCalculation extends Serializable {
  def test1(x: String) = x + " test1"
  def test2(x: String) = x + "test2"
  def test3(x: String) = x + "test3"
  def test5(arg1: java.lang.Integer, arg2: String, arg3: scala.collection.immutable.$colon$colon[Any]) = "test mix2"
}
val df = sqlContext.createDataFrame(Seq((1, "Android"), (2, "iPhone")))
val get_test = new TagCalculation
val field = Array("test1", "test2", "test3")
val bb = df.rdd.map(row => {
  val reValue1 = "start"
  val ret = for (every <- field)
    yield {
      val test_para = Array(reValue1)
      val argtypes = test_para.map(_.getClass)
      val method4 = get_test.getClass.getMethod(every, argtypes: _*)
      val bbq = method4.invoke(get_test, test_para: _*)
      if (field.last == every)
        bbq
    }
  ret.last
})
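Outside Spark, the per-row work in the loop above is ordinary Java reflection: look up a method by name and parameter types with getMethod, then call it with invoke. A minimal sketch, runnable without Spark, assuming a top-level copy of the question's test1 to test3 (callAll and ReflectiveDispatch are hypothetical names used only for illustration):

```scala
// Top-level copy of the question's class (test5 omitted for brevity).
class TagCalculation extends Serializable {
  def test1(x: String) = x + " test1"
  def test2(x: String) = x + "test2"
  def test3(x: String) = x + "test3"
}

object ReflectiveDispatch {
  // Hypothetical helper: calls each named one-String-argument method on `target`
  // and returns the results in the same order as `names`.
  def callAll(target: AnyRef, names: Seq[String], arg: String): Seq[AnyRef] =
    names.map { name =>
      // Look the method up by name and parameter type, like getMethod(every, argtypes: _*).
      val method = target.getClass.getMethod(name, classOf[String])
      method.invoke(target, arg)
    }

  def main(args: Array[String]): Unit = {
    val results = callAll(new TagCalculation, Seq("test1", "test2", "test3"), "start")
    println(results.mkString(", ")) // start test1, starttest2, starttest3
  }
}
```

Note that the question's loop keeps only ret.last, so each row maps to the result of test3 ("starttest3" for the input "start").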
But it fails with the following error:
org.apache.spark.SparkException: Task not serializable
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
  at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2032)
  at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:314)
  at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:313)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
  at org.apache.spark.rdd.RDD.map(RDD.scala:313)
  ........
  at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
  at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
  at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.json4s.DefaultFormats$
Any pointers?
It may be caused by "implicit val formats = DefaultFormats". But I need it to extract values before the "map".
1 Answer
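The problem is that you defined the TagCalculation class inside the calling class, the same class in which you initialize and use the object. A class nested inside another class keeps a hidden reference to its enclosing instance, so when Spark serializes the task it also tries to serialize that enclosing instance, including non-serializable members such as "implicit val formats = DefaultFormats". Just move TagCalculation outside the calling class, or make it a separate class, and that should resolve the NotSerializableException.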