首页 文章

在PySpark中展平嵌套词典列表[重复]

提问于
浏览
1

这个问题在这里已有答案:

我需要使用PySpark来展平包含嵌套dicts的以下RDD,示例如下:

x = [{1: {345: 2}, 2: {33: 9}}, {5: {3: 2}, 2: {45, 9}}, {2: {33:5}}]

在我的实际数据中,每个嵌套的dict可能具有不同的长度和项目数 . x 只是结构的代表 .

我需要输出是一个单独的字典,其中键和值根据需要合并:

x_out = {1: {345:2}, 2: {33: 14, 45:9}, 5: {3, 2}}

我怎样才能做到这一点?我试过扁平化RDD,但我无法弄明白 . 我知道我必须以同样的方式使用combine和reduceByKey .

这与处理元组和列表的链接问题不同 .

1 回答

  • 1
    val listOfMap = List(Map(1 -> Map(345 -> 2), 2 -> Map(33 -> 9)),
      Map(5 -> Map(3 -> 2), 2 -> Map(45 -> 9)),
      Map(2 -> Map(33 -> 5))
    )
    
    var listOfTuples = new ListBuffer[((Int, Int), Int)]()
    
    for(map <- listOfMap) {
      for(k1 <- map.keys) {
        for(k2 <- map(k1).keys) {
          listOfTuples += Tuple2((k1, k2), map(k1)(k2))
        }
      }
    }
    
    listOfTuples.toList.foreach(println(_))    
    
    // transform input to be list of tuple 
    val listOfTuples = List((1, (345, 2)), (2, (33, 9)), (5, (3, 2)), (2, (45, 9)), (2, (33, 5)))
    
    // make rdd from input
    val rdd = spark.sparkContext.makeRDD(listOfTuples)
    
    // make the key of rdd as (first level key, second level key)
    // the rdd becomes ((k1, k2), val)
    // reduce the key by summing the vals
    val rdd2 = rdd.map(pair => ((pair._1, pair._2._1), pair._2._2)).reduceByKey(_ + _)
    
    // the key is the first level key, re-construct the map entry as val
    // aggregate them by grouping key
    val rdd3 = rdd2.map(pair => (pair._1._1, Map(pair._1._1 -> Map(pair._1._2 -> pair._2)))).groupByKey()
    
    // flat map the previous rdd
    val rdd4 = rdd3.flatMap(pair => pair._2)
    
    rdd4.take(10).foreach(print(_))
    
    // result
    Map(1 -> Map(345 -> 2))Map(5 -> Map(3 -> 2))Map(2 -> Map(45 -> 9))Map(2 -> Map(33 -> 14))
    

相关问题