我使用Spark 1.0.1处理大量数据 . 每行包含一个ID号,一些具有重复的ID . 我想在同一位置保存具有相同ID号的所有行,但我无法有效地执行此操作 . 我创建了(ID号,数据行)对的RDD [(String,String)]:
val mapRdd = rdd.map{ x=> (x.split("\\t+")(1), x)}
一种有效但不具备性能的方法是收集ID号,过滤每个ID的RDD,并使用与文本文件相同的ID保存值的RDD .
val ids = rdd.keys.distinct.collect
ids.foreach({ id =>
val dataRows = mapRdd.filter(_._1 == id).values
dataRows.saveAsTextFile(id)
})
我还尝试了groupByKey或reduceByKey,以便RDD中的每个元组包含一个唯一的ID号作为键,以及由该ID号的新行分隔的一组组合数据行 . 我想只使用foreach迭代RDD一次以保存数据,但它不能将值作为RDD给出
groupedRdd.foreach({ tup =>
val data = sc.parallelize(List(tup._2)) //nested RDD does not work
data.saveAsTextFile(tup._1)
})
基本上,我想通过ID号将RDD拆分为多个RDD,并将该ID号的值保存到它们自己的位置 .
3 回答
我认为这个问题类似于Write to multiple outputs by key Spark - one Spark job
请在那里参考答案 .
刚看到上面的类似答案,但实际上我们不需要自定义分区 . MultipleTextOutputFormat将为每个键创建文件 . 可以将具有相同键的多个记录放入同一分区中 .
new HashPartitioner(num),其中num是您想要的分区号 . 如果您有大量不同的密钥,您可以将数字设置为大 . 在这种情况下,每个分区都不会打开太多的hdfs文件处理程序 .
您可以直接在分组RDD上调用saveAsTextFile,这里它将基于分区保存数据,我的意思是,如果您有4个distinctID,并且您将groupsRDD的分区数指定为4,则spark将每个分区数据存储到一个文件中(所以通过它你只能有一个文件管理器ID)你甚至可以将数据看作文件系统中eachId的可迭代 .
这将保存每个用户ID的数据
如果您需要根据用户ID再次检索数据,您可以执行类似的操作
请注意,在这种情况下我没有特别的理由保存到文件,因为OP要求它,所以说保存到文件确实允许您在初始分组完成后随时加载RDD .
最后一件事,
lookup
比访问id的过滤方法快,但如果你愿意从spark中取消一个pull请求,你可以查看this answer以获得更快的方法