首页 文章

从Kafka Stream解析JSON RDD时的Java Execption

提问于
浏览
1

我正在尝试使用Spark流库从kafka读取json字符串 . 代码能够连接到kafka代理,但在解码消息时失败 . 代码的灵感源自

https://github.com/killrweather/killrweather/blob/master/killrweather-examples/src/main/scala/com/datastax/killrweather/KafkaStreamingJson.scala

val kStream = KafkaUtils.createDirectStream[String, String, StringDecoder, 
         StringDecoder](ssc, kParams, kTopic).map(_._2)
  println("Starting to read from kafka topic:" + topicStr)
kStream.foreachRDD { rdd =>

   if (rdd.toLocalIterator.nonEmpty) {

          val sqlContext = new org.apache.spark.sql.SQLContext(sc)
            sqlContext.read.json(rdd).registerTempTable("mytable")
            if (firstTime) {
                sqlContext.sql("SELECT * FROM mytable").printSchema()
            }
            val df = sqlContext.sql(selectStr)
            df.collect.foreach(println)
            df.rdd.saveAsTextFile(fileName)
            mergeFiles(fileName, firstTime)
            firstTime = false
           println(rdd.name)
        }

java.lang.NoSuchMethodError:kafka.message.MessageAndMetadata . (Ljava / lang / String; ILkafka / message / Message; JLkafka / serializer / Decoder; Lkafka / serializer / Decoder;)V org.apache.spark.streaming.kafka . kafkaRDD $ KafkaRDDIterator.getNext(KafkaRDD.scala:222)at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)at scala.collection.Iterator $$ anon $ 11.hasNext(Iterator.scala:327) at scala.collection.Iterator $ class.foreach(Iterator.scala:727)at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)at scala.collection.generic.Growable $ class . $ plus $ plus $ eq( Growable.scala:48)scala.collection.mutable.ArrayBuffer . $ plus $ plus $ eq(ArrayBuffer.scala:103)at scala.collection.mutable.ArrayBuffer . $ plus $ plus $ eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce $ class.to(TraversableOnce.scala:273)at scala.collection.AbstractIterator.to(Iterator.scala:1157)at scala.collection.TraversableOnce $ class.toBuffer(TraversableOnce.scala:265)

1 回答

  • 0

    问题在于使用的Kafka jar 的版本,使用0.9.0.0修复了问题 . 类kafka.message.MessageAndMetadata是在0.8.2.0中引入的 .

相关问题