首页 文章

任务不可序列化:java.io.NotSerializableException仅在类而不是对象上调用闭包外的函数时

提问于
浏览
193

在闭包之外调用函数时会出现奇怪的行为:

  • 当函数在一个对象中时,一切正常

  • 当函数在类get时:

任务不可序列化:java.io.NotSerializableException:testing

问题是我需要在类中的代码而不是对象 . 知道为什么会这样吗? Scala对象是否已序列化(默认?)?

这是一个有效的代码示例:

object working extends App {
    val list = List(1,2,3)

    val rddList = Spark.ctx.parallelize(list)
    //calling function outside closure 
    val after = rddList.map(someFunc(_))

    def someFunc(a:Int)  = a+1

    after.collect().map(println(_))
}

这是一个非工作的例子:

object NOTworking extends App {
  new testing().doIT
}

//adding extends Serializable wont help
class testing {  
  val list = List(1,2,3)  
  val rddList = Spark.ctx.parallelize(list)

  def doIT =  {
    //again calling the fucntion someFunc 
    val after = rddList.map(someFunc(_))
    //this will crash (spark lazy)
    after.collect().map(println(_))
  }

  def someFunc(a:Int) = a+1
}

6 回答

  • 31

    我不认为其他答案是完全正确的 . RDDs are indeed serializable,所以这不是导致任务失败的原因 .

    Spark是一个分布式计算引擎,它的主要抽象是一个弹性分布式数据集( RDD ),可以看作是一个分布式集合 . 基本上,RDD的元素在集群的节点之间进行分区,但Spark将其从用户抽象出来,让用户与RDD(集合)进行交互,就好像它是本地的一样 .

    不要涉及太多细节,但是当你在RDD( mapflatMapfilter 和其他)上运行不同的转换时,你的转换代码(闭包)是:

    • 在驱动程序节点上序列化,

    • 发送到集群中的相应节点,

    • 反序列化,

    • 并最终在节点上执行

    您当然可以在本地运行(如您的示例所示),但所有这些阶段(除了通过网络传输)仍然会发生 . [这使您可以在部署到 生产环境 之前捕获任何错误]

    在第二种情况下会发生的情况是,您正在调用一个方法,该方法在map函数内部的 testing 类中定义 . Spark看到了这一点,并且由于方法无法自行序列化,因此Spark会尝试序列化整个 testing 类,以便代码在另一个JVM中执行时仍然可以工作 . 你有两种可能性:

    要么使类测试可序列化,所以Spark可以序列化整个类:

    import org.apache.spark.{SparkContext,SparkConf}
    
    object Spark {
      val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
    }
    
    object NOTworking extends App {
      new Test().doIT
    }
    
    class Test extends java.io.Serializable {
      val rddList = Spark.ctx.parallelize(List(1,2,3))
    
      def doIT() =  {
        val after = rddList.map(someFunc)
        after.collect().foreach(println)
      }
    
      def someFunc(a: Int) = a + 1
    }
    

    或者您使用 someFunc 函数而不是方法(函数是Scala中的对象),以便Spark能够序列化它:

    import org.apache.spark.{SparkContext,SparkConf}
    
    object Spark {
      val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
    }
    
    object NOTworking extends App {
      new Test().doIT
    }
    
    class Test {
      val rddList = Spark.ctx.parallelize(List(1,2,3))
    
      def doIT() =  {
        val after = rddList.map(someFunc)
        after.collect().foreach(println)
      }
    
      val someFunc = (a: Int) => a + 1
    }
    

    类序,但类序列化不同的问题可能是您感兴趣的,您可以阅读它in this Spark Summit 2013 presentation .

    作为旁注,您可以将 rddList.map(someFunc(_)) 重写为 rddList.map(someFunc) ,它们完全相同 . 通常,第二种是优选的,因为它不那么冗长和清晰 .

    编辑(2015-03-15):SPARK-5307介绍 SerializationDebugger 和Spark 1.3.0是第一个使用它的版本 . 它添加了NotSerializableException的序列化路径 . 遇到NotSerializableException时,调试器访问对象图以查找无法序列化的对象的路径,并构造信息以帮助用户查找对象 .

    在OP的情况下,这是打印到stdout的内容:

    Serialization stack:
        - object not serializable (class: testing, value: testing@2dfe2f00)
        - field (class: testing$$anonfun$1, name: $outer, type: class testing)
        - object (class testing$$anonfun$1, <function1>)
    
  • 24

    Grega's answer很好地解释了为什么原始代码不起作用以及解决问题的两种方法 . 但是,这个解决方案不是很灵活;考虑一下你的闭包包含一个你无法控制的非 Serializable 类的方法调用的情况 . 您既不能将 Serializable 标记添加到此类,也不能更改底层实现以将方法更改为函数 .

    Nilesh为此提供了一个很好的解决方法,但解决方案可以更加简洁和通用:

    def genMapper[A, B](f: A => B): A => B = {
      val locker = com.twitter.chill.MeatLocker(f)
      x => locker.get.apply(x)
    }
    

    然后,此函数序列化程序可用于自动包装闭包和方法调用:

    rdd map genMapper(someFunc)
    

    这项技术的好处是不需要额外的Shark依赖来访问 KryoSerializationWrapper ,因为Twitter的Chill已经被核心Spark所吸引

  • 6

    完整的讲话充分解释了这个问题,提出了一个很好的范式转换方法来避免这些序列化问题:https://github.com/samthebest/dump/blob/master/sams-scala-tutorial/serialization-exceptions-and-memory-leaks-no-ws.md

    最高投票的答案基本上是建议丢掉整个语言功能 - 不再使用方法而只使用功能 . 确实在函数式编程中应该避免类中的方法,但是将它们转换为函数并不能解决这里的设计问题(参见上面的链接) .

    作为这种特殊情况的快速修复,你可以使用 @transient 注释告诉它不要尝试序列化有问题的值(这里, Spark.ctx 是一个自定义类而不是Spark 's one following OP'命名):

    @transient
    val rddList = Spark.ctx.parallelize(list)
    

    您还可以重新构建代码,以便rddList存在于其他位置,但这也是讨厌 .

    未来可能是孢子

    在未来,Scala将包含这些被称为“孢子”的东西,这些东西应该允许我们精细控制粒子控制什么做和不完全被闭合拉入 . 此外,这应该将所有错误意外地将非可序列化类型(或任何不需要的值)引入编译错误,而不是现在这是可怕的运行时异常/内存泄漏 .

    http://docs.scala-lang.org/sips/pending/spores.html

    关于Kryo序列化的提示

    使用kyro时,请将其设置为必须注册,这意味着您会收到错误而不是内存泄漏:

    “最后,我知道kryo有kryo.setRegistrationOptional(true)但是我很难找到如何使用它 . 当这个选项打开时,如果我没有注册,kryo似乎仍然会抛出异常类“ .

    Strategy for registering classes with kryo

    当然,这只能为您提供类型级控制而不是值级控制 .

    ......更多想法来了 .

  • 7

    我用不同的方法解决了这个问题 . 您只需要在通过闭包之前序列化对象,然后进行反序列化 . 即使您的类不是Serializable,这种方法也可以正常工作,因为它在幕后使用Kryo . 你需要的只是一些咖喱 . ;)

    这是我如何做到的一个例子:

    def genMapper(kryoWrapper: KryoSerializationWrapper[(Foo => Bar)])
                   (foo: Foo) : Bar = {
        kryoWrapper.value.apply(foo)
    }
    val mapper = genMapper(KryoSerializationWrapper(new Blah(abc))) _
    rdd.flatMap(mapper).collectAsMap()
    
    object Blah(abc: ABC) extends (Foo => Bar) {
        def apply(foo: Foo) : Bar = { //This is the real function }
    }
    

    随意使Blah像你想要的那样复杂,类,伴随对象,嵌套类,对多个第三方库的引用 .

    KryoSerializationWrapper是指:https://github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala

  • 284

    我不完全确定这适用于Scala,但在Java中,我通过重构我的代码解决了 NotSerializableException ,因此闭包没有访问不可序列化的 final 字段 .

  • 8

    我遇到了类似的问题,我从Grega's answer得到的是

    object NOTworking extends App {
     new testing().doIT
    }
    //adding extends Serializable wont help
    class testing {
    
    val list = List(1,2,3)
    
    val rddList = Spark.ctx.parallelize(list)
    
    def doIT =  {
      //again calling the fucntion someFunc 
      val after = rddList.map(someFunc(_))
      //this will crash (spark lazy)
      after.collect().map(println(_))
    }
    
    def someFunc(a:Int) = a+1
    
    }
    

    您的 doIT 方法正在尝试序列化 someFunc(_) 方法,但由于方法不可序列化,它会尝试序列化类 testing ,这也是不可序列化的 .

    所以让你的代码工作,你应该在 doIT 方法中定义 someFunc . 例如:

    def doIT =  {
     def someFunc(a:Int) = a+1
      //function definition
     }
     val after = rddList.map(someFunc(_))
     after.collect().map(println(_))
    }
    

    如果有多个功能出现在图片中,那么所有这些功能都应该可用于父上下文 .

相关问题