
Spark: Requested array size exceeds VM limit at BufferHolder.grow


I'm hitting this error on Spark 2.1 running on a Hadoop cluster, in a mixed Scala-Python application (similar to Zeppelin):

18/04/09 08:19:34 ERROR Utils: Uncaught exception in thread stdout writer for /x/python/miniconda/bin/python
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
    at org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder.grow(BufferHolder.java:73)
    at org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(UnsafeRowWriter.java:214)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply6_4$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply7_16$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection$1.apply(AggregationIterator.scala:232)
    at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection$1.apply(AggregationIterator.scala:221)
    at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:159)
    at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:29)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1076)
    at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1091)
    at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1129)
    at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1132)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:504)
    at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:328)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1951)
    at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)

It seems strange that BufferHolder.grow would raise this kind of error, since it contains an explicit check:

if (neededSize > Integer.MAX_VALUE - totalSize()) {
  throw new UnsupportedOperationException(
    "Cannot grow BufferHolder by size " + neededSize + " because the size after growing " +
      "exceeds size limitation " + Integer.MAX_VALUE);
}

At runtime, however, it gets past this assertion and initializes an array whose size is greater than Integer.MAX_VALUE (line 73). The error does not seem related to configuration tuning (correct me if I'm wrong), so I'll skip the application/cluster specifics, other than that it runs on 150 executors with 2 cores each. spark.sql.shuffle.partitions is set to 8000 in an attempt to eliminate shuffle skew.
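For what it's worth, the gap described above can be reproduced outside Spark: the guard compares against Integer.MAX_VALUE, but on typical HotSpot JVMs the largest array that can actually be allocated is a few elements smaller than that. The following is a minimal standalone sketch of that mismatch (the object name is made up for illustration; the exact ceiling and error message depend on the JVM and heap size):

```scala
// Minimal sketch, independent of Spark: a length that passes an
// Integer.MAX_VALUE-based guard can still be refused by the VM, because
// HotSpot's real array-size ceiling sits slightly below Int.MaxValue.
object ArrayLimitDemo {
  def main(args: Array[String]): Unit = {
    val requested = Int.MaxValue - 1 // small enough to pass a BufferHolder-style check
    try {
      val buf = new Array[Byte](requested)
      println(s"allocated ${buf.length} bytes")
    } catch {
      case e: OutOfMemoryError =>
        // On typical HotSpot JVMs this prints "Requested array size exceeds VM limit"
        // (or "Java heap space" if the heap is simply too small).
        println(s"OOM: ${e.getMessage}")
    }
  }
}
```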

The parent RDD of the PythonRDD is actually a DataFrame, the result of a shuffle, with about 30 columns, one of which is a very large String (up to 100MB, but 150KB on average). I mention this because, judging from the stack trace, the error is raised somewhere between the shuffle read and the PythonRDD. Also, it always happens in the last 10% of the partitions (the input data is static), and the first 90% complete without errors.
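Since spark.sql.shuffle.partitions and per-partition skew come up above, here is a self-contained Scala sketch (hypothetical names such as `ShuffleSkewCheck`, `key`, and `payload`, with toy data; not the actual job) of applying that setting and inspecting per-partition row counts after a shuffle. Heavily unbalanced counts would point at the kind of skew the 8000-partition setting is meant to spread out:

```scala
// Sketch only: raise spark.sql.shuffle.partitions and check post-shuffle
// partition sizes for skew. Names and data are stand-ins.
import org.apache.spark.sql.SparkSession

object ShuffleSkewCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("shuffle-skew-check")
      .config("spark.sql.shuffle.partitions", "8000") // mirrors the setting in the question
      .getOrCreate()
    import spark.implicits._

    // Stand-in for the real ~30-column DataFrame with one very large String column.
    val df = Seq((1, "a" * 1000), (1, "b" * 1000), (2, "c")).toDF("key", "payload")

    // The groupBy forces a shuffle, as in the job described above.
    val shuffled = df.groupBy($"key").count()

    // Per-partition row counts after the shuffle; a handful of partitions that
    // are far larger than the rest would indicate skew.
    val counts = shuffled.rdd
      .mapPartitionsWithIndex((i, it) => Iterator((i, it.size)))
      .collect()
      .sortBy { case (_, n) => -n }

    counts.take(10).foreach { case (i, n) => println(s"partition $i: $n rows") }
    spark.stop()
  }
}
```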

Has anyone run into this, or can anyone shed some light on it?

1 Answer
