首页 文章

在Spark Scala中添加具有等级的列到rdd

提问于
浏览
1

不幸的是,我们仍然需要使用spark 1.0.0并且需要使用RDD . 我有一个从CSV文件创建的RDD .

val serialRDD = sc.textFile(path)

如果我们打印RDD的每一行,我们得到这样的东西(一个id和一个字符串):

1929  abc
2384  def
8753  ghi
3893  jkl

我希望能够添加另一个列作为另一个id,这将是一个像“SERIAL-”的字符串,其中RANK将是1,2,3等自动递增1

输出应该像:

1929  abc  SERIAL-1
2384  def  SERIAL-2
8753  ghi  SERIAL-3
3893  jkl  SERIAL-4

我怎么能用RDD完成这个?

1 回答

  • 4

    您可以使用 zipWithIndexmap 来完成它:

    serialRDD.zipWithIndex.map{ case (r, i) => (r._1, r._2, s"SERIAL-${i+1}") }
    

    我使用字符串插值来获取 SERIAL-X 字符串 . 我也增加了索引,因为 zipWithIndex 从索引0开始 .

相关问题