我在spark中运行reduceByKey . 我的程序是spark的最简单的例子:
val counts = textFile.flatMap(line => line.split(" ")).repartition(20000).
.map(word => (word, 1))
.reduceByKey(_ + _, 10000)
counts.saveAsTextFile("hdfs://...")
但它总是耗尽内存......
我使用50台服务器,每台服务器35个 Actuator ,每台服务器140GB内存 .
文件量为:8TB文件,20亿文件,总计1000亿字 . 减少后的字数将约为1亿 .
我想知道如何设置spark的配置?
我想知道这些参数应该是什么 Value ?
1. the number of the maps ? 20000 for example?
2. the number of the reduces ? 10000 for example?
3. others parameters?
2 回答
如果您发布日志会有所帮助,但是一个选项是在读取初始文本文件时指定更多数量的分区(例如
sc.textFile(path, 200000)
),而不是在读取后重新分区 . 另一个重要的事情是确保您的输入文件是可拆分的(某些压缩选项使其不可拆分,在这种情况下,Spark可能必须在单个机器上读取它,从而导致OOM) .其他一些选项,因为你没有缓存任何数据,会减少Spark为缓存留出的内存量(用
spark.storage.memoryFraction
控制),因为你只使用我建议的字符串元组使用org.apache.spark.serializer. KryoSerializer
序列化程序 .您是否尝试使用
partionner
,它可以帮助减少每个节点的密钥数量,如果我们假设密钥字的权重平均为1ko,则意味着100Go的内存专用于每个节点的密钥 . 通过分区,您可以按节点数近似减少每个节点的密钥数量,从而相应地减少每个节点的必要的memery数量 . @Holden提到的spark.storage.memoryFraction
选项也是一个关键因素 .