So I experimented a bit with the Spark Streaming module, but a batch approach is less work and I believe a Spark batch job fits my task better, so I decided to switch from KafkaUtils' createDirectStream() to createRDD(). Everything I did follows what is documented in this guide: https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#creating-an-rdd

I got the following error:

<console>:64: error: overloaded method value createRDD with alternatives:
  (jsc: org.apache.spark.api.java.JavaSparkContext, kafkaParams: java.util.Map[String,Object], offsetRanges: Array[org.apache.spark.streaming.kafka010.OffsetRange], locationStrategy: org.apache.spark.streaming.kafka010.LocationStrategy)org.apache.spark.api.java.JavaRDD[org.apache.kafka.clients.consumer.ConsumerRecord[String,String]]
  (sc: org.apache.spark.SparkContext, kafkaParams: java.util.Map[String,Object], offsetRanges: Array[org.apache.spark.streaming.kafka010.OffsetRange], locationStrategy: org.apache.spark.streaming.kafka010.LocationStrategy)org.apache.spark.rdd.RDD[org.apache.kafka.clients.consumer.ConsumerRecord[String,String]]
 cannot be applied to (org.apache.spark.SparkContext, scala.collection.immutable.Map[String,Object], Array[org.apache.spark.streaming.kafka010.OffsetRange], org.apache.spark.streaming.kafka010.LocationStrategy)
       val rdd = KafkaUtils.createRDD[String, String](sc, kafkaParams, offsetRanges, PreferConsistent)

You can see my code sample below. I'm using a Zeppelin notebook that initializes a Spark shell (so Spark is set up by the interpreter), and I've already added the dependency for the Kafka integration.
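In case the dependency matters: what I added under the interpreter settings is the artifact named in that guide, along these lines (the Scala suffix and version below are placeholders for whatever matches the cluster):

// added under Zeppelin's spark interpreter dependencies;
// artifact name is from the integration guide, version is a placeholder
org.apache.spark:spark-streaming-kafka-0-10_2.11:<spark-version>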

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming._
//import org.apache.spark.rdd.RDD
//import org.apache.spark.internal.Logging

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "10.104.17.9:9093,10.103.46.79:9093,10.98.220.217:9093",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "prova_id",
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)


val offsetRanges = Array(
  // topic, partition, inclusive starting offset, exclusive ending offset
  OffsetRange("prova", 0, 0, 100),
  OffsetRange("prova", 1, 0, 100)
  // OffsetRange("prova", 2, 0, 100)
)

val rdd = KafkaUtils.createRDD[String, String](sc, kafkaParams, offsetRanges, PreferConsistent)

rdd.take(100).foreach(println)
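Reading the error message again, the only mismatch I can spot is the second argument: both createRDD alternatives expect a java.util.Map[String, Object], while the kafkaParams defined above is a Scala immutable Map (the streaming examples in the same guide pass a Scala Map, which is probably where I picked it up). If that is the actual problem, a minimal, untested sketch of the conversion would be:

// Sketch based purely on the error text above: convert the Scala Map to the
// java.util.Map[String, Object] that both createRDD overloads declare.
import scala.collection.JavaConverters._

val javaKafkaParams: java.util.Map[String, Object] = kafkaParams.asJava

val rdd = KafkaUtils.createRDD[String, String](sc, javaKafkaParams, offsetRanges, PreferConsistent)

Is a conversion like that really required here, or am I calling the wrong overload?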

What exactly is going on? Thanks!