首页 文章

如何在Spark中展平元组?

提问于
浏览
3

我想要扁平化一个元组的RDD(使用无操作映射),但我得到一个类型错误:

val fromTuples = sc.parallelize( List((1,"a"), (2, "b"), (3, "c")) )
val flattened = fromTuples.flatMap(x => x)
println(flattened.collect().toNiceString)

错误:类型不匹配; found:(Int,String)required:TraversableOnce [?] val flattened = fromMap.flatMap(x => x)

List s或 Array 的等效列表可以正常工作,例如:

val fromList = sc.parallelize(List(List(1, 2), List(3, 4)))
val flattened = fromList.flatMap(x => x)
println(flattened.collect().toNiceString)

Scala可以处理这个吗?如果没有,为什么不呢?

5 回答

  • 0

    元组不是集合 . 与Python不同,其中元组本质上只是一个不可变列表,Scala中的元组更像是一个类(或者更像是Python namedtuple ) . 你可以't 1697055 a tuple, because it'是一组异类字段 .

    您可以通过调用 .productIterator 将元组转换为可迭代的元素,但是您得到的是 Iterable[Any] . 你当然可以压扁这样的东西,但你已经失去了所有编译时类型的保护 . (大多数Scala程序员都想到了 Any 类型的集合 . )

  • 0

    没有一个好方法,但你可以用这种方法坚持一点类型的安全:

    val fromTuples = session.sparkContext.parallelize(List((1, "a"), (2, "b"), (3, "c")))
    val flattened = fromTuples.flatMap(t => Seq(t._1, t._2))
    println(flattened.collect().mkString)
    

    flatten的类型将是元组中所有类型的父类的 RDD . 哪个,是的,在这种情况下是 Any 但是如果列表是: List(("1", "a"), ("2", "b")) 它将保留 String 类型 .

  • 0
    val fromTuples = sc.parallelize(List((1, "a"), (2, "b"), (3, "c")))
      val flattened = fromTuples.flatMap(x => Array(x))
      flattened.collect()
    

    你的错误的原因是

    flatMap(func)与map类似,但每个输入项可以映射到0个或更多输出项(因此func应返回Seq而不是单个项) .

  • 2

    来自Lyuben的comment,这实际上可以偷偷摸摸地完成:

    sc.parallelize(List(("a", 1), ("c", 2), ("e", 4))).flatMap(_.productIterator).collect()
    

    所有人都荣幸 . (虽然Brian notes,这将放弃类型安全 . )

  • 7

    正如其他人所说,没有很好的方法可以做到这一点,特别是在类型安全方面 .

    但是,如果您只想以漂亮的平面格式打印 RDD ,您只需映射 RDD 并使用 mkString

    scala> val myRDD = sc.parallelize( List((1,"a"), (2, "b"), (3, "c")) )
    myRDD: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[3] at parallelize at <console>:24
    
    scala> myRDD.map{case (a,b) => s"$a,$b"}.collect.mkString(",")
    res0: String = 1,a,2,b,3,c
    

相关问题