I wrote a Spark job that reads from Kafka and commits offsets manually. It worked fine, but once I introduced a broadcast variable I started getting a NotSerializableException, because Spark tries to serialize the KafkaInputDStream. Here is a minimal piece of code that shows the problem (it is written in Kotlin, but I believe the same thing happens in Java):

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.api.java.JavaInputDStream
import org.apache.spark.streaming.api.java.JavaStreamingContext
import org.apache.spark.streaming.kafka010.CanCommitOffsets
import org.apache.spark.streaming.kafka010.HasOffsetRanges

fun testBroadCast(jsc: JavaStreamingContext, kafkaStream: JavaInputDStream<ConsumerRecord<String, SomeSerializableEvent>>) {
    // Broadcast a plain String to the executors
    val keyPrefix = jsc.sparkContext().broadcast("EVENT:")
    kafkaStream.foreachRDD { rdd ->
        val offsetRanges = (rdd.rdd() as HasOffsetRanges).offsetRanges()
        // Referencing keyPrefix.value inside the map lambda is what triggers the exception
        val prefixedIds = rdd.map { "${keyPrefix.value}:$it" }.collect()
        // Committing offsets requires referencing the DStream inside foreachRDD
        (kafkaStream.dstream() as CanCommitOffsets).commitAsync(offsetRanges)
    }
}

fun main(args: Array<String>) {
    val jsc = JavaStreamingContext(SparkConf().setAppName("test simple prefixer").setMaster("local[*]"), Duration(5000))
    val stream = makeStreamFromSerializableEventTopic(jsc)
    testBroadCast(jsc, stream)
    jsc.start()
    jsc.awaitTermination()
}

If I remove keyPrefix and hard-code "EVENT:" inside the map lambda, the job works as expected.
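
For clarity, this is the variant that runs fine; only the map line differs, with the literal "EVENT:" substituted for keyPrefix.value (the function name is just for this sketch):

fun testBroadCastNoBroadcast(jsc: JavaStreamingContext, kafkaStream: JavaInputDStream<ConsumerRecord<String, SomeSerializableEvent>>) {
    kafkaStream.foreachRDD { rdd ->
        val offsetRanges = (rdd.rdd() as HasOffsetRanges).offsetRanges()
        // Literal prefix in place of keyPrefix.value: no serialization problem
        val prefixedIds = rdd.map { "EVENT::$it" }.collect()
        (kafkaStream.dstream() as CanCommitOffsets).commitAsync(offsetRanges)
    }
}

With the broadcast variable in place, however, I get: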

java.io.NotSerializableException: Object of org.apache.spark.streaming.kafka010.DirectKafkaInputDStream is being serialized possibly as a part of closure of an RDD operation. This is because the DStream object is being referred to from within the closure. Please rewrite the RDD operation inside this DStream to avoid this. This has been enforced to avoid bloating of Spark tasks with unnecessary objects.
    at org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply$mcV$sp(DStream.scala:525)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply(DStream.scala:512)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply(DStream.scala:512)
    at org.apache.spark.streaming.dstream.DStream.writeObject(DStream.scala:512)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDD.map(RDD.scala:369)
    at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:93)
    at org.apache.spark.api.java.AbstractJavaRDDLike.map(JavaRDDLike.scala:45)
    at ir.pegahtech.tapsell.brain.engine.jobs.Test$testBroadCast$1.call(Test.kt:226)
    at ir.pegahtech.tapsell.brain.engine.jobs.Test$testBroadCast$1.call(Test.kt)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:272)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
    ...
    at java.lang.Thread.run(Thread.java:745)

How does using or not using a broadcast variable have anything to do with serializing the KafkaInputDStream? The Spark version is 2.2.0.
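
For what it's worth, the workaround I am currently experimenting with is sketched below. It assumes (and this is only my guess) that the map lambda, by capturing keyPrefix from the enclosing scope, somehow drags the enclosing foreachRDD closure, and with it kafkaStream, into the serialized task. Copying the broadcast reference into a local val inside foreachRDD is meant to give the map lambda a single, plainly serializable capture. localPrefix and the function name are names I made up for this sketch, and I have not confirmed that it avoids the exception:

fun testBroadCastLocalCopy(jsc: JavaStreamingContext, kafkaStream: JavaInputDStream<ConsumerRecord<String, SomeSerializableEvent>>) {
    val keyPrefix = jsc.sparkContext().broadcast("EVENT:")
    kafkaStream.foreachRDD { rdd ->
        val offsetRanges = (rdd.rdd() as HasOffsetRanges).offsetRanges()
        // Hypothetical fix: copy the broadcast handle into a local val so the
        // map lambda below captures only this val, nothing else from the scope
        val localPrefix = keyPrefix
        val prefixedIds = rdd.map { "${localPrefix.value}:$it" }.collect()
        (kafkaStream.dstream() as CanCommitOffsets).commitAsync(offsetRanges)
    }
}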