尝试编写消耗来自Kafka的消息的Spark Streaming作业 . 这是我到目前为止的情况:
1)启动Zookeeper .
2)启动Kafka Server .
3)向服务器发送了一些消息 . 当我运行以下内容时,我可以看到它们:
bin / kafka-console-consumer.sh --zookeeper localhost:2181 - topic mytopic --from-beginning
4)现在尝试编写一个程序来计算在5分钟内进入的消息数量 .
代码看起来像这样:
Map<String, Integer> map = new HashMap<String, Integer>();
map.put("mytopic", new Integer(1));
JavaStreamingContext ssc = new JavaStreamingContext(
sparkUrl, " Spark Streaming", new Duration(60 * 5 * 1000), sparkHome, new String[]{jarFile});
JavaPairReceiverInputDStream tweets = KafkaUtils.createStream(ssc, "localhost:2181", "1", map);
不确定第3个参数(使用者组)使用什么值 . 当我运行它时,我得到“无法连接到zookeeper服务器” . 但Zookeeper正在2181端口上运行;否则步骤#3就行不通了 .
好像我没有正确使用KafkaUtils.createStream . 有任何想法吗?
5 回答
我认为你应该为zookeeper而不是localhost指定ip . 此外,第三个参数是消费者组名称 . 它可以是你喜欢的任何名字 . 当您有多个消费者绑定到同一个组时,主题分区会相应地分发 . 您的推文应该是:
没有默认的消费者群体这样的东西 . 您可以在那里使用任意非空字符串 . 如果您只有一个消费者,那么其消费者群体并不重要 . 如果有两个或更多消费者,他们可以是同一个消费者群体的一部分,也可以属于不同的消费者群体 .
来自http://kafka.apache.org/documentation.html:
...
我认为问题可能出在'topics'参数中 . 来自Spark docs:
您只为主题指定了一个分区,即“1” . 根据代理的设置(num.partitions),可能有多个分区,您的消息可能会被发送到您的程序无法读取的其他分区 .
此外,我相信partitionIds是基于0的 . 因此,如果您只有一个分区,则其id将等于0 .
我面临同样的问题 . 这是适合我的解决方案 .
分配给Spark Streaming应用程序的核心数必须大于接收者数 . 否则系统将接收数据,但无法处理它 . 因此,Spark Streaming至少需要两个核心 . 所以在我的火花提交中,我应该提到至少两个核心 .
kafka-clients-version.jar应该包含在spark-submit的依赖jar列表中 .
我认为,在你的代码中,调用KafkaUtils.createStream的第二个参数应该是kafka服务器的host:port,而不是zookeeper主机和端口 . 检查一次 .
编辑:Kafka Utils API Documentation
根据上面的文件,它应该是动物园管理员的法定人数 . 因此应该使用Zookeeper主机名和端口 .
zkQuorum Zookeeper仲裁(主机名:端口,主机名:端口,..) .
如果zookeeper与您的流应用程序在同一台计算机上运行,则“localhost:2181”将起作用 . 否则,您必须提及运行zookeeper的主机的地址,并确保运行流应用程序的计算机能够与端口2181上的zookeeper主机通信 .