首页 文章

发送给具有相同使用者组名称的所有使用者的消息

提问于
浏览
2

有以下消费者代码:

from kafka.client import KafkaClient
from kafka.consumer import SimpleConsumer

kafka = KafkaClient("localhost", 9092)

consumer = SimpleConsumer(kafka, "my-group", "my-topic")
consumer.seek(0, 2)
for message in consumer:
  print message

kafka.close()

然后我用脚本生成消息:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-topic

问题在于,当我将消费者作为两个不同的流程启动时,我会在每个流程中收到新消息 . 但是我希望它只发送给一个消费者,而不是广播 .

在Kafka(https://kafka.apache.org/documentation.html)的文档中写道:

如果所有消费者实例都具有相同的消费者群组,那么这就像传统的队列 balancer 对消费者的负载一样 .

我看到这些消费者的群体是相同的 - my-group .

如何使新消息被一个消费者读取而不是广播?

3 回答

  • 2

    直到kafka v . 0 . 0 . 1(2014年3月12日发布)之前,才正式支持消费者组API . 对于先前的服务器版本,使用者组无法正常工作 . 截至本文,kafka-python库目前不会尝试发送组偏移数据:

    https://github.com/mumrah/kafka-python/blob/c9d9d0aad2447bb8bad0e62c97365e5101001e4b/kafka/consumer.py#L108-L115

  • 1

    很难从上面的例子中看出你的Zookeeper配置是什么,或者根本没有 . 您需要一个Zookeeper集群来保持消费者组信息的持久性WRT每个组中的消费者已经消耗给给定的偏移量 .

    一个坚实的例子在这里:Official Kafka documentation - Consumer Group Example

  • 1

    这不应该发生 - 确保两个使用者都在zookeeper znodes中的同一个使用者组下注册 . 一个主题的每条消息应该由消费者组使用一次,因此组中每个人中的一个消费者应该接收消息,而不是您正在经历的消息 . 您使用的是什么版本的Kafka?

相关问题