首页 文章

如何将 RDD [1]转换为 Map [Key,RDD [2]]

提问于
浏览
6

我搜索了很长时间的解决方案,但没有找到正确的算法。

在 Scala 中使用 Spark RDD,知道如何不能使用 collect 或其他可能将数据加载到内存的方法,如何将RDD[(Key, Value)]转换为Map[key, RDD[Value]]

实际上,我的最终目标是通过键在Map[Key, RDD[Value]]上循环并为每个RDD[Value]调用saveAsNewAPIHadoopFile

例如,如果我得到:

RDD[("A", 1), ("A", 2), ("A", 3), ("B", 4), ("B", 5), ("C", 6)]

我想要 :

Map[("A" -> RDD[1, 2, 3]), ("B" -> RDD[4, 5]), ("C" -> RDD[6])]

我想知道在RDD[(Key, Value)]的每个键 A,B,C 上使用filter是否花费太多,但是我不知道是否多次调用 filter 来获得不同的键会有效吗? (当然不是,但是可能使用cache吗?)

谢谢

3 回答

  • 2

    您应该使用以下代码(Python):

    rdd = sc.parallelize( [("A", 1), ("A", 2), ("A", 3), ("B", 4), ("B", 5), ("C", 6)] ).cache()
    keys = rdd.keys().distinct().collect()
    for key in keys:
        out = rdd.filter(lambda x: x[0] == key).map(lambda (x,y): y)
        out.saveAsNewAPIHadoopFile (...)
    

    一个 RDD 不能成为另一个 RDD 的一部分,并且您没有选择仅收集键并将其相关值转换为单独的 RDD 的选项。在我的示例中,您将遍历缓存的 RDD,这是可以的,并且可以快速运行

  • 0

    听起来您真正想要的是将每个键的 KV RDD 保存到单独的文件中。与其创建Map[Key, RDD[Value]],不如考虑使用MultipleTextOutputFormat 类似于此处的示例。。示例中的代码几乎全部存在。

    这种方法的好处是,在洗牌之后,您可以确保只对 RDD 进行一次传递,并获得与所需相同的结果。如果您按照另一个答案中的建议通过过滤并创建多个 ID(除非您的源支持下推式过滤器)来执行此操作,则最终将对每个单独的键进行一次数据集传递,这将变得更慢。

  • -1

    这是我的简单测试代码。

    val test_RDD = sc.parallelize(List(("A",1),("A",2), ("A",3),("B",4),("B",5),("C",6)))
    val groupby_RDD = test_RDD.groupByKey()
    val result_RDD = groupby_RDD.map{v => 
        var result_list:List[Int] = Nil
        for (i <- v._2) {
            result_list ::= i
        }
        (v._1, result_list)
    }
    

    结果如下

    result_RDD.take(3)
    > > res86: Array[(String, List[Int])] = Array((A,List(1, 3, 2)), (B,List(5, 4)), (C,List(6)))
    

    或者你可以这样做

    val test_RDD = sc.parallelize(List(("A",1),("A",2), ("A",3),("B",4),("B",5),("C",6)))
    val nil_list:List[Int] = Nil
    val result2 = test_RDD.aggregateByKey(nil_list)(
        (acc, value) => value :: acc,
        (acc1, acc2) => acc1 ::: acc2 )
    

    结果是这样

    result2.take(3)
    > > res209: Array[(String, List[Int])] = Array((A,List(3, 2, 1)), (B,List(5, 4)), (C,List(6)))
    

相关问题