首页 文章

Spark flatten Seq通过反转groupby,(即重复每个序列的 Headers )

提问于
浏览
2

我们有一个RDD,其形式如下:

org.apache.spark.rdd.RDD[((BigInt, String), Seq[(BigInt, Int)])]

我们想要做的是将其展平为一个制表符分隔字符串列表,以便与saveAsText文件一起保存 . 并且通过展平,我的意思是为其Seq中的每个项重复groupby元组(BigInt,String) .

所以数据看起来像..

((x1,x2), ((y1.1,y1.2), (y2.1, y2.2) .... ))

......会看起来像

x1   x2   y1.1  y1.2
x1   x2   y2.1  y2.2

到目前为止,我尝试的代码大部分都将它全部展平为一行,“x1,x2,y1.1,y1.2,y2.1,y2.2 ...”等...

任何帮助将不胜感激,提前谢谢!

2 回答

  • 4

    如果要展平groupByKey()操作的结果,以便将键和值列展平为一个元组,我建议使用flatMap:

    val grouped = sc.parallelize(Seq(((1,"two"), List((3,4), (5,6)))))
    val flattened: RDD[(Int, String, Int, Int)] = grouped.flatMap { case (key, groupValues) =>
       groupValues.map { value => (key._1, key._2, value._1, value._2) }
    }
    // flattened.collect() is Array((1,two,3,4), (1,two,5,6))
    

    从这里,您可以使用其他转换和操作将组合元组转换为制表符分隔的字符串并保存输出 .

    如果你不关心包含 Tuples 的扁平RDD,那么你可以编写更通用的

    val flattened: RDD[Array[Any]] = grouped.flatMap { case (key, groupValues) =>
       groupValues.map(value => (key.productIterator ++ value.productIterator).toArray)
     }
     // flattened.collect() is Array(Array(1, two, 3, 4), Array(1, two, 5, 6))
    

    另外,请查看 flatMapValues 转换;如果你有 RDD[(K, Seq[V]])] 并想要 RDD[(K, V)] ,那么你可以做 flatMapValues(identity) .

  • 0

    它有点蒸汽驱动,但是:

    val l :((BigInt, String), Seq[(BigInt, BigInt)]) = ((1,"two"), List((3,4), (5,6)))
            //> l  : ((BigInt, String), Seq[(BigInt, BigInt)]) = ((1,two),List((3,4), (5,6))
                                                      //| )
    
    for (t <-l._2) yield s"${l._1._1}\t${l._1._2}\t${t._1}\t${t._2}"
            //> res0: Seq[String] = List(1  two 3   4, 1    two 5   6)
    

    似乎工作 . 不过,我不知道Spark位是否会干扰它 .

相关问题