首页 文章

无法通过火花流消耗kafka消息

提问于
浏览
1

我试图通过火花流程序消费来自kafka制作人的消息 .

这是我的计划

val Array(zkQuorum, group, topics, numThreads) = args
      val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local")
      val ssc = new StreamingContext(sparkConf, Seconds(5))

      val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
      val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
     // lines.print()
lines.foreachRDD(rdd=>{
            rdd.foreach(message=>
      println(message))
    })

上述程序运行成功 . 但我看不到任何消息被打印出来 .

1 回答

  • 1

    使用 "local[*]" 设置主网址

    val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[*]")
    

    您也可以尝试调用collect()并查看是否收到消息 .

    lines.foreachRDD { rdd =>
          rdd.collect().foreach(println)
    }
    

相关问题