I am trying to read JSON strings from Kafka using the Spark Streaming library. The code connects to the Kafka broker, but fails when decoding the messages. The code was inspired by
val kStream = KafkaUtils.createDirectStream[String, String, StringDecoder,
  StringDecoder](ssc, kParams, kTopic).map(_._2)
println("Starting to read from kafka topic:" + topicStr)
kStream.foreachRDD { rdd =>
  if (rdd.toLocalIterator.nonEmpty) {
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    // Parse the JSON strings and register them as a temp table for SQL queries
    sqlContext.read.json(rdd).registerTempTable("mytable")
    if (firstTime) {
      sqlContext.sql("SELECT * FROM mytable").printSchema()
    }
    val df = sqlContext.sql(selectStr)
    df.collect.foreach(println)
    df.rdd.saveAsTextFile(fileName)
    mergeFiles(fileName, firstTime)
    firstTime = false
    println(rdd.name)
  }
}
java.lang.NoSuchMethodError: kafka.message.MessageAndMetadata.&lt;init&gt;(Ljava/lang/String;ILkafka/message/Message;JLkafka/serializer/Decoder;Lkafka/serializer/Decoder;)V
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:222)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
1 Answer
The problem was the version of the Kafka jar being used; switching to 0.9.0.0 fixed the issue. The constructor of kafka.message.MessageAndMetadata that the NoSuchMethodError complains about was introduced in 0.8.2.0, so an older Kafka jar on the classpath cannot satisfy it.
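For reference, a minimal build.sbt sketch of how the Kafka client version can be pinned alongside the Spark Streaming Kafka integration. The Spark and Scala version numbers here are assumptions for a Spark 1.x setup and must be matched to your own cluster:

```scala
// build.sbt -- version numbers are illustrative; align them with your cluster
scalaVersion := "2.10.5"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming"       % "1.6.0" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0",
  // Pin the Kafka client explicitly so that an older transitive 0.8.x jar
  // (which lacks the MessageAndMetadata constructor in the stack trace)
  // cannot end up on the classpath
  "org.apache.kafka" %% "kafka" % "0.9.0.0"
)
```

After changing the version, it is worth verifying with `sbt dependencyTree` (or your build tool's equivalent) that only one kafka jar is actually resolved.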