首页 文章

Spark Scala:任务不可序列化错误

提问于
浏览
4

我正在使用带有Scala插件和spark库的IntelliJ社区版 . 我还在学习Spark并且正在使用Scala工作表 .

我写了下面的代码,删除字符串中的标点符号:

def removePunctuation(text: String): String = {
  val punctPattern = "[^a-zA-Z0-9\\s]".r
  punctPattern.replaceAllIn(text, "").toLowerCase
}

然后我读了一个文本文件并尝试删除标点符号:

val myfile = sc.textFile("/home/ubuntu/data.txt",4).map(removePunctuation)

这给出了如下错误,任何帮助将不胜感激:

org.apache.spark.SparkException:在org.apache.spark的org.apache.spark.util.ClosureCleaner $ .ensureSerializable(/home/ubuntu/src/main/scala/Test.sc:294)中无法序列化的任务 . util.ClosureCleaner $ .org $ apache $ spark $ util $ ClosureCleaner $$ clean(/home/ubuntu/src/main/scala/Test.sc:284)org.apache.spark.util.ClosureCleaner $ .clean(/ home / ubuntu / src / main / scala / Test.sc:104)org.apache.Spark.Spark.span./研究 . 在org.apache.span. / / / / / / / / / / / / / / / / / / / / / / / / / / / / / / / / / / / / / / / / / / / / / / / / / / / / / / / .rdd.RDD $$ anonfun $ map $ 1.apply(/home/ubuntu/src/main/scala/Test.sc:366)org.apache.spark.rdd.RDD $$ anonfun $ map $ 1.apply(/ home / ubuntu / src / main / scala / Test.sc:365)atg.apache.spark.rdd.RDDOperationScope $ .withScope(/home/ubuntu/src/main/scala/Test.sc:147)at#worksheet # . #worksheet#(/ home / ubuntu / src / main / scala / Test.sc:108)引起:java.io.NotSerializableException:A $ A21 $ A $ A21序列化堆栈: - 对象不可序列化(类:A $ A21 $ A $ A21, Value :A $ A21 $ A $ A21 @ 62db3891) - 字段(等级:A $ A21 $ A $ A21 $$ anonfun $ words $ 1,姓名: $ outer,类型:class A $ A21 $ A $ A21) - 在org.apache.spark.serializer.SerializationDebugger $ .improveException(SerializationDebugger.scala)中的对象(A $ A21 $ A $ A21 $$ anonfun $ words $ 1) :40)org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)位于org.apache.spark.urial上的org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100) . 在org.apache.spark.util的org.apache.spark.util.ClosureCleaner $ .org $ apache $ spark $ util $ ClosureCleaner $$ clean(ClosureCleaner.scala:288)中的ClosureCleaner $ .ensureSerializable(ClosureCleaner.scala:295) .ClosureCleaner $ .clean(ClosureCleaner.scala:108)org.apache.spark.SparkContext.clean(SparkContext.scala:2094)at org.apache.spark.rdd.RDD $$ anonfun $ map $ 1.apply(RDD . scala:370)atg.apache.spark.rdd.RDD $$ anonfun $ map $ 1.apply(RDD.scala:369)at org.apache.spark.rdd.RDDOperationScope $ .withScope(RDDOperationScope.scala:151)at at org.apache.spark.rdd.RDDOperationScope $ .withScope(RDDOperationScope.scala:112)at org . apache.spark.rdd.RDD.withScope(RDD.scala:362)at org.apache.spark.rdd.RDD.map(RDD.scala:369)at A $ A21 $ A $ A21.words $ lzycompute(Test . sc:27)A $ A21 $ A $ A21.words(Test.sc:27)A $ A21 $ A $ A21.get $$实例$$字(Test.sc:27)A $ A21 $ . 在Sun.reflect的sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)的sun.reflect.NativeMethodAccessorImpl.invoke0(本地方法)处的A $ A21.main(Test.sc)处的main(Test.sc:73) . 在org.jetbrains.plugins.scala.worksheet.MyWorksheetRunner.main(MyWorksheetRunner.java:22)上的java.lang.reflect.Method.invoke(Method.java:498)中删除了MethodAethorAccess.Ivin(EntatingMethodAccessorImpl.java:43)

3 回答

  • 4

    正如@TGaweda建议的那样,Spark的 SerializationDebugger 非常有助于识别"the serialization path leading from the given object to the problematic object."堆栈跟踪中"Serialization stack"之前的所有美元符号表示方法的容器对象是问题所在 .

    虽然最容易在容器类上使用 Serializable ,但我更喜欢利用Scala是一种函数式语言并将您的函数用作一等公民:

    sc.textFile("/home/ubuntu/data.txt",4).map { text =>
      val punctPattern = "[^a-zA-Z0-9\\s]".r
      punctPattern.replaceAllIn(text, "").toLowerCase
    }
    

    或者如果你真的想把事情分开:

    val removePunctuation: String => String = (text: String) => {
      val punctPattern = "[^a-zA-Z0-9\\s]".r
      punctPattern.replaceAllIn(text, "").toLowerCase
    }
    sc.textFile("/home/ubuntu/data.txt",4).map(removePunctuation)
    

    这些选项当然是 Regex is serializable,你应该确认 .

    在次要但非常重要的注意事项中,构造 Regex 是昂贵的,因此为了性能而将其从转换中分解出来 - 可能使用broadcast .

  • 2

    阅读stacktrace,有:

    $ outer,类型:A $ A21 $ A $ A21

    这是一个非常好的提示 . 您的lambda是可序列化的,但您的类不可序列化 .

    当你创建lambda表达式时,这个表达式引用了外部类 . 您的案例中的外类不可序列化,即未实现Serializable或其中一个字段不是Serializable的实例

  • 4

    正如T. Gaweda已经指出的那样,你're most likely defining your function in a class that'不可序列化 . 因为它是一个纯函数,即它不依赖于封闭类的任何上下文,我建议你把它放到一个应该扩展 Serializable 的伴随对象中 . 这将是Scala相当于Java静态方法:

    object Helper extends Serializable {
      def removePunctuation(text: String): String = {
        val punctPattern = "[^a-zA-Z0-9\\s]".r
        punctPattern.replaceAllIn(text, "").toLowerCase
      }
    }
    

相关问题