首页 文章

在spark中的大数据上运行reduceByKey

提问于
浏览
7

我在spark中运行reduceByKey . 我的程序是spark的最简单的例子:

val counts = textFile.flatMap(line => line.split(" ")).repartition(20000).
                 .map(word => (word, 1))
                 .reduceByKey(_ + _, 10000)
counts.saveAsTextFile("hdfs://...")

但它总是耗尽内存......

我使用50台服务器,每台服务器35个 Actuator ,每台服务器140GB内存 .

文件量为:8TB文件,20亿文件,总计1000亿字 . 减少后的字数将约为1亿 .

我想知道如何设置spark的配置?

我想知道这些参数应该是什么 Value ?

1. the number of the maps ? 20000 for example?
2. the number of the reduces ? 10000 for example?
3. others parameters?

2 回答

  • 0

    如果您发布日志会有所帮助,但是一个选项是在读取初始文本文件时指定更多数量的分区(例如 sc.textFile(path, 200000) ),而不是在读取后重新分区 . 另一个重要的事情是确保您的输入文件是可拆分的(某些压缩选项使其不可拆分,在这种情况下,Spark可能必须在单个机器上读取它,从而导致OOM) .

    其他一些选项,因为你没有缓存任何数据,会减少Spark为缓存留出的内存量(用 spark.storage.memoryFraction 控制),因为你只使用我建议的字符串元组使用 org.apache.spark.serializer. KryoSerializer 序列化程序 .

  • 5

    您是否尝试使用 partionner ,它可以帮助减少每个节点的密钥数量,如果我们假设密钥字的权重平均为1ko,则意味着100Go的内存专用于每个节点的密钥 . 通过分区,您可以按节点数近似减少每个节点的密钥数量,从而相应地减少每个节点的必要的memery数量 . @Holden提到的 spark.storage.memoryFraction 选项也是一个关键因素 .

相关问题