我有以下序列的Spark Java API调用(Spark 2.0.2):

  • 通过map函数处理的Kafka流,它返回来自tuple2._2()的字符串值,用于JavaDStream,如

return tuple2._2();

  • 返回的JavaDStream然后由foreachPartition处理,foreachPartition由foreachRDD包装 .

  • foreachPartition的调用函数执行Iterator inputParams.next();

当接收到数据时,执行步骤1,这是正确的 . 但是,步骤3中的inputParams.next()在步骤1中对map函数进行了重复调用 . 因此,每个消息都会调用两次map函数:第一次从Kafka流接收消息时,第二次当从foreachPartition的调用函数调用Iterator inputParams.next()时 .

我也尝试过转换数据

公共TestTransformedClass调用(Tuple2 tuple2)为步骤1

第3步的public void call(Iterator inputParams)

并出现同样的问题 . 因此,无论此Spark API调用序列是否涉及数据转换,都会出现此问题 .

问题:

  • 由于消息已在步骤1中处理,为什么步骤3中的inputParams.next()在步骤1中对map函数进行了重复调用?

  • 如何修复它以避免每条消息的重复调用?

谢谢 .