I am new to Kafka and Spark, and I am trying to get something working, but without success. The details of the problem are below. Thanks!
The code is as follows:
JavaPairDStream<String,Integer> counts = wordCounts.reduceByKeyAndWindow(new AddIntegers(), new SubtractIntegers(), Durations.seconds(8000), Durations.seconds(4000));
The exception is as follows:

Exception in thread "Thread-3" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1623)
    at org.apache.spark.streaming.dstream.PairDStreamFunctions.reduceByKeyAndWindow(PairDStreamFunctions.scala:333)
    at org.apache.spark.streaming.dstream.PairDStreamFunctions.reduceByKeyAndWindow(PairDStreamFunctions.scala:299)
    at org.apache.spark.streaming.api.java.JavaPairDStream.reduceByKeyAndWindow(JavaPairDStream.scala:352)
    at KafkaAndDstreamWithIncrement.KDDConsumer.run(KDDConsumer.java:110)
Caused by: java.io.NotSerializableException: KafkaAndDstreamWithIncrement.KDDConsumer
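For context, the `Caused by: java.io.NotSerializableException: KafkaAndDstreamWithIncrement.KDDConsumer` line is the key: when `AddIntegers` and `SubtractIntegers` are non-static inner classes, each instance carries a hidden reference to the enclosing `KDDConsumer`, so serializing the reducer drags the whole consumer with it. The mechanism can be reproduced with plain `java.io` serialization, no Spark required (class names here are illustrative stand-ins, not the original code):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class SerializableDemo {
    // Stand-in for KDDConsumer: a class that is NOT Serializable.
    static class Consumer {
        // Non-static inner class: carries an implicit this$0 reference
        // to the enclosing Consumer, which is not Serializable.
        class InnerAdd implements Serializable {
            int call(int a, int b) { return a + b; }
        }
        // Static nested class: no reference to any Consumer instance.
        static class StaticAdd implements Serializable {
            int call(int a, int b) { return a + b; }
        }
    }

    // Attempts a Java serialization round-trip; returns whether it succeeds.
    static boolean serializes(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false; // the hidden enclosing instance is not Serializable
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Consumer c = new Consumer();
        System.out.println(serializes(c.new InnerAdd()));         // false
        System.out.println(serializes(new Consumer.StaticAdd())); // true
    }
}
```

This is the same failure Spark's ClosureCleaner reports: the reducer object itself is fine, but the object graph it pulls in is not serializable.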
1 Answer
The code is as follows (define the functions as static):
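A minimal sketch of what "define them as static" could look like. The bodies of `AddIntegers` and `SubtractIntegers` are assumptions (the original code is not shown); in a real project they would implement `org.apache.spark.api.java.function.Function2`, which already extends `Serializable`. The local `Function2` interface below is a stand-in mirroring Spark's so the snippet compiles without Spark on the classpath:

```java
import java.io.Serializable;

public class KDDReducers {
    // Stand-in for org.apache.spark.api.java.function.Function2,
    // which extends Serializable. With the real Spark dependency on
    // the classpath, import and implement that interface instead.
    interface Function2<T1, T2, R> extends Serializable {
        R call(T1 v1, T2 v2) throws Exception;
    }

    // Declared static: the instances hold no implicit reference to an
    // enclosing (non-serializable) KDDConsumer, so Spark can serialize
    // them and ship them to executors for reduceByKeyAndWindow.
    public static class AddIntegers implements Function2<Integer, Integer, Integer> {
        @Override public Integer call(Integer a, Integer b) { return a + b; }
    }

    public static class SubtractIntegers implements Function2<Integer, Integer, Integer> {
        @Override public Integer call(Integer a, Integer b) { return a - b; }
    }
}
```

With the classes static (or defined as top-level classes), the `reduceByKeyAndWindow(new AddIntegers(), new SubtractIntegers(), ...)` call from the question no longer tries to serialize the enclosing `KDDConsumer`. Note that the inverse-function variant of `reduceByKeyAndWindow` also requires checkpointing to be enabled on the streaming context.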