首页 文章

Spark Streaming中的Kafka消费者

提问于
浏览
4

尝试编写消耗来自Kafka的消息的Spark Streaming作业 . 这是我到目前为止的情况:

1)启动Zookeeper .
2)启动Kafka Server .
3)向服务器发送了一些消息 . 当我运行以下内容时,我可以看到它们:

bin / kafka-console-consumer.sh --zookeeper localhost:2181 - topic mytopic --from-beginning

4)现在尝试编写一个程序来计算在5分钟内进入的消息数量 .

代码看起来像这样:

Map<String, Integer> map = new HashMap<String, Integer>();
    map.put("mytopic", new Integer(1));

    JavaStreamingContext ssc = new JavaStreamingContext(
            sparkUrl, " Spark Streaming", new Duration(60 * 5 * 1000), sparkHome, new String[]{jarFile});


    JavaPairReceiverInputDStream tweets = KafkaUtils.createStream(ssc, "localhost:2181", "1", map);

不确定第3个参数(使用者组)使用什么值 . 当我运行它时,我得到“无法连接到zookeeper服务器” . 但Zookeeper正在2181端口上运行;否则步骤#3就行不通了 .

好像我没有正确使用KafkaUtils.createStream . 有任何想法吗?

5 回答

  • 0

    我认为你应该为zookeeper而不是localhost指定ip . 此外,第三个参数是消费者组名称 . 它可以是你喜欢的任何名字 . 当您有多个消费者绑定到同一个组时,主题分区会相应地分发 . 您的推文应该是:

    JavaPairReceiverInputDStream tweets = KafkaUtils.createStream(ssc, "x.x.x.x", "dummy-group", map);
    
  • 2

    没有默认的消费者群体这样的东西 . 您可以在那里使用任意非空字符串 . 如果您只有一个消费者,那么其消费者群体并不重要 . 如果有两个或更多消费者,他们可以是同一个消费者群体的一部分,也可以属于不同的消费者群体 .

    来自http://kafka.apache.org/documentation.html

    消费者

    ...

    如果所有消费者实例都具有相同的消费者群组,那么这就像传统的队列 balancer 对消费者的负载一样 . 如果所有消费者实例具有不同的消费者组,则其工作方式类似于发布 - 订阅,并且所有消息都广播给所有消费者 .

    我认为问题可能出在'topics'参数中 . 来自Spark docs

    要使用的(topic_name - > numPartitions)的映射 . 每个分区都在其自己的线程中使用

    您只为主题指定了一个分区,即“1” . 根据代理的设置(num.partitions),可能有多个分区,您的消息可能会被发送到您的程序无法读取的其他分区 .

    此外,我相信partitionIds是基于0的 . 因此,如果您只有一个分区,则其id将等于0 .

  • -2

    我面临同样的问题 . 这是适合我的解决方案 .

    • 分配给Spark Streaming应用程序的核心数必须大于接收者数 . 否则系统将接收数据,但无法处理它 . 因此,Spark Streaming至少需要两个核心 . 所以在我的火花提交中,我应该提到至少两个核心 .

    • kafka-clients-version.jar应该包含在spark-submit的依赖jar列表中 .

  • 0

    我认为,在你的代码中,调用KafkaUtils.createStream的第二个参数应该是kafka服务器的host:port,而不是zookeeper主机和端口 . 检查一次 .

    编辑:Kafka Utils API Documentation

    根据上面的文件,它应该是动物园管理员的法定人数 . 因此应该使用Zookeeper主机名和端口 .

    zkQuorum Zookeeper仲裁(主机名:端口,主机名:端口,..) .

  • 0

    如果zookeeper与您的流应用程序在同一台计算机上运行,则“localhost:2181”将起作用 . 否则,您必须提及运行zookeeper的主机的地址,并确保运行流应用程序的计算机能够与端口2181上的zookeeper主机通信 .

相关问题