我有以下序列的Spark Java API调用(Spark 2.0.2):
- 通过map函数处理的Kafka流,它返回来自tuple2._2()的字符串值,用于JavaDStream,如
return tuple2._2();
-
返回的JavaDStream然后由foreachPartition处理,foreachPartition由foreachRDD包装 .
-
foreachPartition的调用函数执行Iterator inputParams.next();
当接收到数据时,执行步骤1,这是正确的 . 但是,步骤3中的inputParams.next()在步骤1中对map函数进行了重复调用 . 因此,每个消息都会调用两次map函数:第一次从Kafka流接收消息时,第二次当从foreachPartition的调用函数调用Iterator inputParams.next()时 .
我也尝试过转换数据
公共TestTransformedClass调用(Tuple2 tuple2)为步骤1
第3步的public void call(Iterator inputParams)
并出现同样的问题 . 因此,无论此Spark API调用序列是否涉及数据转换,都会出现此问题 .
问题:
-
由于消息已在步骤1中处理,为什么步骤3中的inputParams.next()在步骤1中对map函数进行了重复调用?
-
如何修复它以避免每条消息的重复调用?
谢谢 .