首页 文章

ReduceByKey函数在Spark中

提问于
浏览
1

我已经读过某个地方,对于作用于单个RDD的操作,例如 reduceByKey() ,在预分区的RDD上运行将导致每个键的所有值在本地计算在一台机器上,只需要最终的,本地减少要从每个工作节点发送回主节点的值 . 这意味着我必须声明一个分区器,如:

val sc = new SparkContext(...)
val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...")
             .partitionBy(new HashPartitioner(100))   // Create 100 partitions
             .persist()

为了让 reduceByKey 像我之前解释的那样工作 .

我的问题是,如果我想使用reduceByKey(最佳),我是否需要每次分区时声明或者不需要 .

3 回答

  • 2

    执行 reduceByKey 时,对RDD进行分区以避免网络流量几乎不是最佳解决方案 . 即使它不需要改组 reduceByKey ,它也必须对一个完整的数据集进行混洗以执行分区 .

    由于这通常要贵得多,因此预分区是没有意义的,除非您的目标是以增加总体延迟为代价来减少延迟,或者您可以将此分区用于其他任务 .

  • 3

    并不是的 . reduceByKey正在使用数据位置 . 来自RDD api:

    / ** 使用关联reduce函数合并每个键的值 . 这也将在将结果发送到reducer之前在每个mapper上执行本地合并,类似于MapReduce中的*“combiner” . * /

    这意味着当你有一个键值RDD时,在第一阶段,使用提供的函数减少每个分区级别的相同键,然后使用相同的函数对所有已聚合的值进行shuffle和global reduce . 无需提供分区程序 . 它只是有效 .

  • 2

    实际上,你所谈论的两个品质有点无关 .

    对于 reduceByKey() ,第一个质量聚合相同键的元素,并在每个 Actuator 上首先在本地首先提供关联reduce函数,然后最终在 Actuator 之间进行聚合 . 它封装在一个名为 mapSideCombine 的布尔参数中,如果设置为true则执行上述操作 . 如果设置为false,与 groupByKey() 一样,则每个记录将被洗牌并发送给正确的执行程序 .

    第二个质量涉及分区及其使用方式 . 根据其定义,每个RDD包含分裂列表和(可选地)分区器 . 方法 reduceByKey() 已重载,实际上有一些定义 . 例如:

    • def reduceByKey(func: (V, V) => V): RDD[(K, V)]

    该方法的这个定义实际上使用来自父RDD的默认现有分区器,并减少到设置为默认并行度级别的分区数 .

    • def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]

    该方法的定义将使用 HashPartitioner 将适当的数据发送到其相应的执行程序,并且分区的数量将为 numPartitions .

    • def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]

    最后,该方法的这个定义取代了另外两个,并且接受了一个通用(可能是自定义)分区器,它将产生由分区器如何分区键确定的分区数 .

    关键是你可以在 reduceByKey() 本身内实际编码所需的分区逻辑 . 如果你的目的是避免通过预分区来改变开销,那么它也没有任何意义,因为你仍然会在预分区上进行洗牌 .

相关问题