首页 文章

如何在RDD映射操作中更新全局变量

提问于
浏览
1

我有RDD [(Int,Array [Double])]之后,我调用了一个classFunction

val rdd = spark.sparkContext.parallelize(Seq(
        (1, Array(2.0,5.0,6.3)),
        (5, Array(1.0,3.3,9.5)),
        (1, Array(5.0,4.2,3.1)),
        (2, Array(9.6,6.3,2.3)),
        (1, Array(8.5,2.5,1.2)),
        (5, Array(6.0,2.4,7.8)),
        (2, Array(7.8,9.1,4.2))
      )
    )
 val new_class = new ABC
 new_class.demo(data)

在类内部,声明了一个全局变量值= 0 . 在demo()中,声明了新变量new_value = 0 . 在map操作之后,new_value会更新并在 Map 中打印更新后的值 .

class ABC extends Serializable {
        var value  = 0
        def demo(data_new : RDD[(Int ,Array[Double])]): Unit ={
            var new_value = 0
            data_new.coalesce(1).map(x => {
                if(x._1 == 1)
                    new_value = new_value + 1
                println(new_value)
                value = new_value
            }).count()
            println("Outside-->" +value)
        }
    }

输出: -

1
1
2
2
3
3
3
Outside-->0

如何在映射操作后更新全局变量值?

3 回答

  • 2

    不,你无法从 Map 内部更改全局变量 .

    如果您尝试计算函数中的一个数,则可以使用过滤器

    val value = data_new.filter(x => (x._1 == 1)).count 
    println("Outside-->" +value)
    

    输出:

    Outside-->3
    

    此外,不建议使用可变变量 var . 你应该总是尝试使用不可变的 val

    我希望这有帮助!

  • 0

    我不确定你在做什么,但你需要使用Accumulators来执行你需要添加这样的值的操作类型 .

    这是一个例子:

    scala> val rdd = spark.sparkContext.parallelize(Seq(
         |         (1, Array(2.0,5.0,6.3)),
         |         (5, Array(1.0,3.3,9.5)),
         |         (1, Array(5.0,4.2,3.1)),
         |         (2, Array(9.6,6.3,2.3)),
         |         (1, Array(8.5,2.5,1.2)),
         |         (5, Array(6.0,2.4,7.8)),
         |         (2, Array(7.8,9.1,4.2))
         |       )
         | )
    rdd: org.apache.spark.rdd.RDD[(Int, Array[Double])] = ParallelCollectionRDD[83] at parallelize at <console>:24
    
    scala> val accum = sc.longAccumulator("My Accumulator")
    accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 46181, name: Some(My Accumulator), value: 0)
    
    scala> rdd.foreach { x => if(x._1 == 1) accum.add(1) }
    
    scala> accum.value
    res38: Long = 3
    

    正如@philantrovert所提到的,如果您希望计算每个密钥的出现次数,您可以执行以下操作:

    scala> rdd.mapValues(_ => 1L).reduceByKey(_ + _).take(3)
    res41: Array[(Int, Long)] = Array((1,3), (2,2), (5,2))
    

    您也可以使用 countByKey ,但要避免使用大数据集 .

  • 3
    OR You can do achieve your problem in this way also:
    class ABC extends Serializable {
            def demo(data_new : RDD[(Int ,Array[Double])]): Unit ={
                var new_value = 0
                data_new.coalesce(1).map(x => {
                    if(x._1 == 1)
                      var key = x._1
                 (key, 1)
                }).reduceByKey(_ + _)
    
            }
         println("Outside-->" +demo(data_new))
        }
    

相关问题