我正在使用kafka主题消息,其中密钥和值是avro编码的,试图通过使用KafkaAvroDecoder将消息的值部分转换为java [],完整代码如下(如果有一个更好的工作方式,请让我知道使用java api从kafka消费avro消息,但在map方法中,我得到stackover流异常,并且异常被粘贴在代码下方,需要帮助才能继续 .

属性decoderProps = new Properties(); decoderProps.put(“schema.registry.url”,SCHEMA_REG_URL); decoderProps.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG,“true”);

KafkaAvroDecoder decoder = new KafkaAvroDecoder(new VerifiableProperties(decoderProps));


SparkSession spark = SparkSession
    .builder()
    .appName("JavaCount1").master("local[2]")
    .config("spark.driver.extraJavaOptions", "-Xss4M")
    .getOrCreate();

Dataset<Message> ds1 = spark
    .readStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", "ip address")
    .option("subscribe", "topicName")
    .option("startingOffsets", "earliest")
    .option("maxOffsetsPerTrigger", 1)
    .load().as(Encoders.bean(Message.class));

Dataset<UserEvent> ds2 = ds1.map(m-> {
    UserEvent data = (UserEvent)decoder.fromBytes(m.getValue());
     return data;

},Encoders.bean(UserEvent.class));

ds2.printSchema();

StreamingQuery query = ds2.writeStream()
    .outputMode("append")
    .format("console")
    .trigger(ProcessingTime.apply(15))
    .start();

query.awaitTermination();

. 位于org.spark_project.guava.base.Preconditions.checkPositionIndex(Preconditions.java:334)的org.spark_project.guava.base.Preconditions.checkPositionIndex(Preconditions.java:354)中的线程“main”java.lang.StackOverflowError中的异常org.spark_project.guava.collect.AbstractIndexedListIterator . (AbstractIndexedListIterator.java:69)org.spark_project.guava.collect.Iterators $ 12 . (Iterators.java:1125)at org.spark_project.guava.collect.Iterators.forArray(Iterators) .java:1125)org.spark_project.guava.collect.RegularImmutableList.listIterator(RegularImmutableList.java:96)位于org.spark_project.guava的org.spark_project.guava.collect.RegularImmutableAsList.listIterator(RegularImmutableAsList.java:54) . org.spark_project.guava.collect上的org.spark_project.guava.collect.ImmutableList.iterator(ImmutableList.java:330)中的collect.ImmutableList.listIterator(ImmutableList.java:334).RegularImmutableMap $ EntrySet.iterator(RegularImmutableMap.java: 181)在org.spark_project.guava.collect.RegularImmut org.spark_project.guava.refava.Ra返回org.spark_project.guava.refava.Rovolver.accordingTo(TypeResolver.java:65)的org.spark_project.guava.reflect.TypeResolver.where(TypeResolver.java:97)中的ableMap $ EntrySet.iterator(RegularImmutableMap.java:173) org.spark_project.guava.reflect.TypeToken.resolveType(TypeToken.java:266)位于org.spark_project.guava.refava.refava.refava.refava.refava.refava.Rot . 的org.spark_project.guava.reflect.TypeToken.resolveSupertype(TypeToken.java:279) (TypeToken.java:401)org.apache.spark.sql.catalyst.JavaTypeInference $ .elementType(JavaTypeInference.scala:142)at org.apache.spark.sql.catalyst.JavaTypeInference $ .org $ apache $ spark $ sql @ catalyst.JavaTypeInference $ inferDataType(JavaTypeInference.scala:111)atg.apache.spark.sql.catalyst.JavaTypeInference $ anonfun $ 2.apply(JavaTypeInference.scala:127)at org.apache.spark.sql.catalyst.JavaTypeInference $ anonfun $ 2.apply(JavaTypeInference.scala:125)at scala.collection.TraversableLike $ anonfun $ map $ 1.apply(TraversableLike.scala:245)at scala.collection.TraversableLike $ an onfun $ map $ 1.apply(TraversableLike.scala:245)at scala.collection.IndexedSeqOptimized $ class.foreach(IndexedSeqOptimized.scala:33)at scala.collection.mutable.ArrayOps $ ofRef.foreach(ArrayOps.scala:186)at at scala.collection.TraversableLike $ class.map(TraversableLike.scala:245)位于org.apache.spark.sql.catalyst.JavaTypeInference $ .org的scala.collection.mutable.ArrayOps $ ofRef.map(ArrayOps.scala:186)位于org.apache.spark的org.apache.spark.sql.catalyst.JavaTypeInference $ anonfun $ 2.apply(JavaTypeInference.scala:127)中的$ apache $ spark $ sql $ catalyst $ JavaTypeInference $ inferDataType(JavaTypeInference.scala:125) . sql.catalyst.JavaTypeInference $ anonfun $ 2.apply(JavaTypeInference.scala:125)at scala.collection.TraversableLike $ anonfun $ map $ 1.apply(TraversableLike.scala:245)at scala.collection.TraversableLike $ anonfun $ map $ 1.apply (TraversableLike.scala:245)scala.collection.IndexedSeqOptimized $ class.foreach(IndexedSeqOptimized.scala:33)at scala.collection.mutable.ArrayOps $ ofRef.foreach(ArrayOps.scala:186)at s cala.collection.TraversableLike $ class.map(TraversableLike.scala:245)位于org.apache.spark.sql.catalyst.JavaTypeInference $ .org的scala.collection.mutable.ArrayOps $ ofRef.map(ArrayOps.scala:186)位于org.apache.spark的org.apache.spark.sql.catalyst.JavaTypeInference $ anonfun $ 2.apply(JavaTypeInference.scala:127)中的$ apache $ spark $ sql $ catalyst $ JavaTypeInference $ inferDataType(JavaTypeInference.scala:125) . sql.catalyst.JavaTypeInference $ anonfun $ 2.apply(JavaTypeInference.scala:125)at scala.collection.TraversableLike $ anonfun $ map $ 1.apply(TraversableLike.scala:245)at scala.collection.TraversableLike $ anonfun $ map $ 1.apply (TraversableLike.scala:245)atscala.collection.IndexedSeqOptimized $ class.foreach(IndexedSeqOptimized.scala:33)at scala.collection.mutable.ArrayOps $ ofRef.foreach(ArrayOps.scala:186)at scala.collection.TraversableLike $ class.map(TraversableLike.scala: 245)在org.apache.spark.sql.catalyst.JavaTypeInference的scala.collection.mutable.ArrayOps $ ofRef.map(ArrayOps.scala:186)$ .org $ apache $ spark $ sql $ catalyst $ JavaTypeInference $ inferDataType(JavaTypeInference) .scala:125)org.apache.spark.sql.catalyst.JavaTypeInference $ anonfun $ 2.apply(JavaTypeInference.scala:127)at org.apache.spark.sql.catalyst.JavaTypeInference $ anonfun $ 2.apply(JavaTypeInference.scala :125)scala.collection.TraversableLike $ anonfun $ map $ 1.apply(TraversableLike.scala:245)at scala.collection.TraversableLike $ anonfun $ map $ 1.apply(TraversableLike.scala:245)at scala.collection.IndexedSeqOptimized $ class.foreach(IndexedSeqOptimized.scala:33)at scala.collection.mutable.ArrayOps $ ofRef.foreach(ArrayOps.scala:186)at scala.collection.TraversableLike $ class.map(Traver sableLike.scala:245)在org.apache.spark.sql.catalyst.JavaTypeInference的scala.collection.mutable.ArrayOps $ ofRef.map(ArrayOps.scala:186)$ .org $ apache $ spark $ sql $ catalyst $ JavaTypeInference $ inferDataType(JavaTypeInference.scala:125)atg.apache.spark.sql.catalyst.JavaTypeInference $ anonfun $ 2.apply(JavaTypeInference.scala:127)at org.apache.spark.sql.catalyst.JavaTypeInference $ anonfun $ 2.apply (JavaTypeInference.scala:125)at scala.collection.TraversableLike $ anonfun $ map $ 1.apply(TraversableLike.scala:245)at scala.collection.TraversableLike $ anonfun $ map $ 1.apply(TraversableLike.scala:245)at scala . collection.IndexedSeqOptimized $ class.foreach(IndexedSeqOptimized.scala:33)at scala.collection.mutable.ArrayOps $ ofRef.foreach(ArrayOps.scala:186)at scala.collection.TraversableLike $ class.map(TraversableLike.scala:245) at scala.collection.mutable.ArrayOps $ ofRef.map(ArrayOps.scala:186)at org.apache.spark.sql.catalyst.JavaTypeInference $ .org $ apache $ spark $ sql $ catalyst $ JavaTypeInference $ inferDataType(Ja vaTypeInference.scala:125)org.apache.spark.sql.catalyst.JavaTypeInference $ anonfun $ 2.apply(JavaTypeInference.scala:127)at org.apache.spark.sql.catalyst.JavaTypeInference $ anonfun $ 2.apply(JavaTypeInference . scala:125)scala.collection.TraversableLike $ anonfun $ map $ 1.apply(TraversableLike.scala:245)at scala.collection.TraversableLike $ anonfun $ map $ 1.apply(TraversableLike.scala:245)at scala.collection.IndexedSeqOptimized $ class.foreach(IndexedSeqOptimized.scala:33)scala.collection.mutable.ArrayOps $ ofRef.foreach(ArrayOps.scala:186)at scala.collection.TraversableLike $ class.map(TraversableLike.scala:245)at scala . org.apache.spark.sql.catalyst.JavaTypeInference中的collection.mutable.ArrayOps $ ofRef.map(ArrayOps.scala:186)$ .org $ apache $ spark $ sql $ catalyst $ JavaTypeInference $ inferDataType(JavaTypeInference.scala:125) at org.apache.spark.sql.catalyst.JavaTypeInference $ anonfun $ 2.apply(JavaTypeInference.scala:127)at org.apache.spark.sql.catalyst.JavaTypeInference $ anonfun $ 2.apply(JavaTypeInfere nce.scala:125)在scala.collection上的scala.collection.TraversableLike $ anonfun $ map $ 1.apply(TraversableLike.scala:245)scala.collection.TraversableLike $ anonfun $ map $ 1.apply(TraversableLike.scala:245) .IndexedSeqOptimized $ class.foreach(IndexedSeqOptimized.scala:33)at scala.collection.mutable.ArrayOps $ ofRef.foreach(ArrayOps.scala:186)at scala.collection.TraversableLike $ class.map(TraversableLike.scala:245)at at scala.collection.mutable.ArrayOps $ ofRef.map(ArrayOps.scala:186)at org.apache.spark.sql.catalyst.JavaTypeInference $ .org $ apache $ spark $ sql $ catalyst $ JavaTypeInference $ inferDataType(JavaTypeInference.scala: 125)org.apache.spark.sql.catalyst.JavaTypeInference $ anonfun $ 2.apply(JavaTypeInference.scala:127)at org.apache.spark.sql.catalyst.JavaTypeInference $ anonfun $ 2.apply(JavaTypeInference.scala:125) at scala.collection.TraversableLike $ anonfun $ map $ 1.apply(TraversableLike.scala:245)at scala.collection.TraversableLike $ anonfun $ map $ 1.apply(TraversableLike.scala:245)at scala.collection.Ind exedSeqOptimized $ class.foreach(IndexedSeqOptimized.scala:33)at scala.collection.mutable.ArrayOps $ ofRef.foreach(ArrayOps.scala:186)at scala.collection.TraversableLike $ class.map(TraversableLike.scala:245)at scala .collection.mutable.ArrayOps $ ofRef.map(ArrayOps.scala:186)at org.apache.spark.sql.catalyst.JavaTypeInference $ .org $ apache $ spark $ sql $ catalyst $ JavaTypeInference $ inferDataType(JavaTypeInference.scala:125 ) 在org.apache.spark.sql.catalyst.JavaTypeInference $ anonfun $ 2.apply(JavaTypeInference.scala:127)位于scala的org.apache.spark.sql.catalyst.JavaTypeInference $ anonfun $ 2.apply(JavaTypeInference.scala:125) . collection.TraversableLike $ anonfun $ map $ 1.apply(TraversableLike.scala:245)at scala.collection.TraversableLike $ anonfun $ map $ 1.apply(TraversableLike.scala:245)at scala.collection.IndexedSeqOptimized $ class.foreach(IndexedSeqOptimized . scala:33)scala.collection.mutable.ArrayOps $ ofRef.foreach(ArrayOps.scala:186)at scala.collection.TraversableLike $ class.map(TraversableLike.scala:245)at scala.collection.mutable.ArrayOps $ ofRef .map(ArrayOps.scala:186)org.apache.spark.sql.catalyst.JavaTypeInference $ .org $ apache $ spark $ sql $ catalyst $ JavaTypeInference $ inferDataType(JavaTypeInference.scala:125)at org.apache.spark . scala.collec上的org.apache.spark.sql.catalyst.JavaTypeInference $ anonfun $ 2.apply(JavaTypeInference.scala:125)中的sql.catalyst.JavaTypeInference $ anonfun $ 2.apply(JavaTypeInference.scala:127) .TraversableLike $ anonfun $ map $ 1.apply(TraversableLike.scala:245)at scala.collection.TraversableLike $ anonfun $ map $ 1.apply(TraversableLike.scala:245)at scala.collection.IndexedSeqOptimized $ class.foreach(IndexedSeqOptimized . scala:33)scala.collection.mutable.ArrayOps $ ofRef.foreach(ArrayOps.scala:186)at scala.collection.TraversableLike $ class.map(TraversableLike.scala:245)at scala.collection.mutable.ArrayOps $ ofRef .map(ArrayOps.scala:186)org.apache.spark.sql.catalyst.JavaTypeInference $ .org $ apache $ spark $ sql $ catalyst $ JavaTypeInference $ inferDataType(JavaTypeInference.scala:125)at org.apache.spark . sql.catalyst.JavaTypeInference $ anonfun $ 2.apply(JavaTypeInference.scala:127)at org.apache.spark.sql.catalyst.JavaTypeInference $ anonfun $ 2.apply(JavaTypeInference.scala:125)at scala.collection.TraversableLike $ anonfun $在scala.collection.TraversableLike $ anonfun $ map $ 1.apply(TraversableLike.scala:245)在scala.collection.IndexedSeqOptimized $ class.foreach上映射$ 1.apply(TraversableLike.scala:245) (indexedSeqOptimized.scala:33)scala.collection.mutable.mutable上的scala.collection.mutable.ArrayOps $ ofRef.foreach(ArrayOps.scala:186)scala.collection.TraversableLike $ class.map(TraversableLike.scala:245) . org.apache上的org.apache.spark.sql.catalyst.JavaTypeInference $ .org $ apache $ Spark $ sql $ catalyst $ JavaTypeInference $ inferDataType(JavaTypeInference.scala:125)org.apache中的ArrayOps $ ofRef.map(ArrayOps.scala:186) .spark.sql.catalyst.JavaTypeInference $ anonfun $ 2.apply(JavaTypeInference.scala:127)at org.apache.spark.sql.catalyst.JavaTypeInference $ anonfun $ 2.apply(JavaTypeInference.scala:125)at scala.collection.TraversableLike $ anonfun $ map $ 1.apply(TraversableLike.scala:245)at scala.collection.TraversableLike $ anonfun $ map $ 1.apply(TraversableLike.scala:245)at scala.collection.IndexedSeqOptimized $ class.foreach(IndexedSeqOptimized.scala:33在scala.collection.mutable.mutable.A的scala.collection.mutable.ArrayOps $ ofRef.foreach(ArrayOps.scala:186)scala.collection.TraversableLike $ class.map(TraversableLike.scala:245)at scala.collection.mutable.A rrayOps $ ofRef.map(ArrayOps.scala:186)atg.apache.spark.sql.catalyst.JavaTypeInference $ .org $ apache $ spark $ sql $ catalyst $ JavaTypeInference $ inferDataType(JavaTypeInference.scala:125)at org.apache .spark.sql.catalyst.JavaTypeInference $ anonfun $ 2.apply(JavaTypeInference.scala:127)at org.apache.spark.sql.catalyst.JavaTypeInference $ anonfun $ 2.apply(JavaTypeInference.scala:125)at scala.collection.TraversableLike $ anonfun $ map $ 1.apply(TraversableLike.scala:245)at scala.collection.TraversableLike $ anonfun $ map $ 1.apply(TraversableLike.scala:245)at scala.collection.IndexedSeqOptimized $ class.foreach(IndexedSeqOptimized.scala:33 at scala.collection.mutable.ArrayOps $ ofRef.foreach(ArrayOps.scala:186)at scala.collection.TraversableLike $ class.map(TraversableLike.scala:245)at scala.collection.mutable.ArrayOps $ ofRef.map( ArrayOps.scala:186)org.apache.spark.sql.catalyst.JavaTypeInference $ .org $ apache $ spark $ sql $ catalyst $ JavaTypeInference $ inferDataType(JavaTypeInference.scala:125)at org.apache.spark.sql .catalyst.JavaTypeInference $ anonfun $ 2.apply(JavaTypeInference.scala:127)at org.apache.spark.sql.catalyst.JavaTypeInference $ anonfun $ 2.apply(JavaTypeInference.scala:125)at scala.collection.TraversableLike $ anonfun $ map $ 1.apply(TraversableLike.scala:245)at scala.collection.TraversableLike $ anonfun $ map $ 1.apply(TraversableLike.scala:245)scala.collection.IndexedSeqOptimized $ class.foreach(IndexedSeqOptimized.scala:33)at scala . collection.mutable.ArrayOps $ ofRef.foreach(ArrayOps.scala:186)atscala.collection.TraversableLike $ class.map(TraversableLike.scala:245)位于org.apache.spark.sql.catalyst.JavaTypeInference $ .org的scala.collection.mutable.ArrayOps $ ofRef.map(ArrayOps.scala:186)位于org.apache.spark的org.apache.spark.sql.catalyst.JavaTypeInference $ anonfun $ 2.apply(JavaTypeInference.scala:127)中的$ apache $ spark $ sql $ catalyst $ JavaTypeInference $ inferDataType(JavaTypeInference.scala:125) . sql.catalyst.JavaTypeInference $ anonfun $ 2.apply(JavaTypeInference.scala:125)at scala.collection.TraversableLike $ anonfun $ map $ 1.apply(TraversableLike.scala:245)at scala.collection.TraversableLike $ anonfun $ map $ 1.apply (TraversableLike.scala:245)scala.collection.IndexedSeqOptimized $ class.foreach(IndexedSeqOptimized.scala:33)scala.collection.mutable.ArrayOps $ ofRef.foreach(ArrayOps.scala:186)at scala.collection.TraversableLike $ class.map(TraversableLike.scala:245)位于org.apache.spark.sql.catalyst.JavaTypeInference $ .org $ apache $ spar的scala.collection.mutable.ArrayOps $ ofRef.map(ArrayOps.scala:186)位于org.apache.spark.sql.catalyst的org.apache.spark.sql.catalyst.JavaTypeInference $ anonfun $ 2.apply(JavaTypeInference.scala:127)中的k $ sql $ catalyst $ JavaTypeInference $ inferDataType(JavaTypeInference.scala:125)在scala.collection.TraversableLike $ anonfun $ map $ 1.apply(TraversableLike.scala:245)scala.collection.TraversableLike $ anonfun $ map $ 1.apply(TraversableLike . scala:245)scala.collection.IndexedSeqOptimized $ class.foreach(IndexedSeqOptimized.scala:33)at scala.collection.mutable.ArrayOps $ ofRef.foreach(ArrayOps.scala:186)at scala.collection.TraversableLike $ class.map (TraversableLike.scala:245)at sca.aplection.mutable.ArrayOps $ ofRef.map(ArrayOps.scala:186)at org.apache.spark.sql.catalyst.JavaTypeInference $ .org $ apache $ spark $ sql $ catalyst $位于org.apache.spark.sql.catalyst的org.apache.spark.sql.catalyst.JavaTypeInference $ anonfun $ 2.apply(JavaTypeInference.scala:127)中的JavaTypeInference $ inferDataType(JavaTypeInference.scala:125)在scala.collection.TraversableLike $ anonfun $ map $ 1.apply(TraversableLike.scala:245)scala.collection.TraversableLike $ anonfun $ map $ 1.apply(TraversableLike . scala:245)scala.collection.IndexedSeqOptimized $ class.foreach(IndexedSeqOptimized.scala:33)at scala.collection.mutable.ArrayOps $ ofRef.foreach(ArrayOps.scala:186)at scala.collection.TraversableLike $ class.map (TraversableLike.scala:245)at sca.aplection.mutable.ArrayOps $ ofRef.map(ArrayOps.scala:186)at org.apache.spark.sql.catalyst.JavaTypeInference $ .org $ apache $ spark $ sql $ catalyst $ JavaTypeInference $