我搜索了很长时间的解决方案,但没有找到正确的算法。
在 Scala 中使用 Spark RDD,知道如何不能使用 collect 或其他可能将数据加载到内存的方法,如何将RDD[(Key, Value)]
转换为Map[key, RDD[Value]]
?
实际上,我的最终目标是通过键在Map[Key, RDD[Value]]
上循环并为每个RDD[Value]
调用saveAsNewAPIHadoopFile
例如,如果我得到:
RDD[("A", 1), ("A", 2), ("A", 3), ("B", 4), ("B", 5), ("C", 6)]
我想要 :
Map[("A" -> RDD[1, 2, 3]), ("B" -> RDD[4, 5]), ("C" -> RDD[6])]
我想知道在RDD[(Key, Value)]
的每个键 A,B,C 上使用filter
是否花费太多,但是我不知道是否多次调用 filter 来获得不同的键会有效吗? (当然不是,但是可能使用cache
吗?)
谢谢
3 回答
您应该使用以下代码(Python):
一个 RDD 不能成为另一个 RDD 的一部分,并且您没有选择仅收集键并将其相关值转换为单独的 RDD 的选项。在我的示例中,您将遍历缓存的 RDD,这是可以的,并且可以快速运行
听起来您真正想要的是将每个键的 KV RDD 保存到单独的文件中。与其创建
Map[Key, RDD[Value]]
,不如考虑使用MultipleTextOutputFormat
类似于此处的示例。。示例中的代码几乎全部存在。这种方法的好处是,在洗牌之后,您可以确保只对 RDD 进行一次传递,并获得与所需相同的结果。如果您按照另一个答案中的建议通过过滤并创建多个 ID(除非您的源支持下推式过滤器)来执行此操作,则最终将对每个单独的键进行一次数据集传递,这将变得更慢。
这是我的简单测试代码。
结果如下
或者你可以这样做
结果是这样