I am new to Kafka and Spark. I have a use case where I need to consume a Kafka topic from multiple Spark Streaming windows.

Topic ...

kafka-topics.sh --create --topic feed --partitions 10 --zookeeper xxx.xxx.xxx.xxx:xxxx --replication-factor 2

Code ...

package tech.webstar.speed

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Duration, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}


/**
  * @author sasajib
  */
class FeedStream2(ssc: StreamingContext, group: String = "default") {

    def start(): Unit = {

        // xxx.xxx.xxx.xxx:xxxx is the ZooKeeper quorum (host:port); "feed" -> 10 maps the topic to 10 receiver threads
        val kafkaStream: ReceiverInputDStream[(String, String)] = {
            KafkaUtils.createStream(ssc, "xxx.xxx.xxx.xxx:xxxx", group, Map("feed" -> 10))
        }

        val window: DStream[(String, String)] = kafkaStream.window(Duration(5000))
        window.foreachRDD(_.foreach(result => {
            println("<===============Window===============================>")
            println(result)
            println(">===============Window===============================<")
        }))
    }
}


object FeedStream2 extends App {
    val sparkConf: SparkConf = {
        new SparkConf()
                .setAppName("speed-layer")
                .setMaster("local[*]")
    }

    val context: SparkContext = new SparkContext(sparkConf)
    val ssc: StreamingContext = new StreamingContext(context, Duration(1000))
    ssc.checkpoint("checkpoint")
    //    context.setLogLevel("ERROR")

    //if I comment out one of these, the code works
    new FeedStream2(ssc, "group1").start()
    new FeedStream2(ssc, "group2").start()

    // addShutdownHook takes a by-name block; passing a () => Unit literal would
    // only construct the function without ever invoking ssc.stop()
    sys.addShutdownHook {
        ssc.stop()
    }

    ssc.start()
    ssc.awaitTermination()
}

As far as I understand, both windows should work and consume the same messages, because the Kafka group IDs are different. But neither window works. If I comment out one of them, the code works and prints the topic's messages.

How can I consume the same topic's messages from different Spark Streaming windows?
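To clarify what I am after, the effect I want is something like the following untested sketch, which attaches two different windows to a single receiver stream instead of creating two receivers (host/port and window lengths here are placeholders, not my real values):

```scala
// Untested sketch against the Spark 1.6 receiver-based API:
// one Kafka receiver, two windows over the same DStream.
val kafkaStream = KafkaUtils.createStream(ssc, "xxx.xxx.xxx.xxx:xxxx", "group1", Map("feed" -> 10))

val shortWindow = kafkaStream.window(Duration(5000))   // 5-second window
val longWindow  = kafkaStream.window(Duration(10000))  // 10-second window

shortWindow.foreachRDD(rdd => rdd.foreach(result => println(s"short: $result")))
longWindow.foreachRDD(rdd => rdd.foreach(result => println(s"long: $result")))
```

I do not know whether sharing one stream like this is the idiomatic approach, or whether two receivers with distinct group IDs (as in my code above) should also work.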

Thanks.

spark-streaming-kafka version: 1.6.2