是否有人使用单节点多代理设置工作 kafka python?
我能够使用单节点单个代理设置生成和使用数据,但是当我将其更改为单个节点时,生成了多个代理数据并将其存储在主题中,但是当我运行消费者代码时,数据未被消耗。
对上述任何建议都将不胜感激。提前致谢!
注意:生产者,消费者和服务器属性等所有配置都经过验证,并且没有问题。
制片人代码:
from kafka.producer import KafkaProducer
def producer():
data = {'desc' : 'testing', 'data' : 'testing single node multi broker'}
topic = 'INTERNAL'
producer = KafkaProducer(value_serializer=lambda v:json.dumps(v).encode('utf-8'), bootstrap_servers=["localhost:9092","localhost:9093","localhost:9094"])
producer.send(topic, data)
producer.flush()
消费者代码:
from kafka.consumer import KafkaConsumer
def consumer():
topic = 'INTERNAL'
consumer = KafkaConsumer(topic,bootstrap_servers=["localhost:9092","localhost:9093","localhost:9094"])
for data in consumer:
print data
服务器 1 配置:我添加了两个这样的服务器文件,其他代理的参数相同,但broker.id
,log.dirs
值不同。
broker.id=1
port=9092
num.network.threads=3
log.dirs=/tmp/kafka-logs-1
num.partitions=3
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=localhost:2181
delete.topic.enable=true
制片人配置:
metadata.broker.list=localhost:9092,localhost:9093,localhost:9094
消费者配置:
zookeeper.connect=127.0.0.1:2181
zookeeper.connection.timeout.ms=6000
1 回答
你收到一个简单的卡夫卡消费者的消息吗?
或者用这个:
如果使用第二个命令获取消息,请尝试删除
/tmp/log.dir
中的/tmp/log.dir
代理和日志文件。然后重新启动 zookeeper 和您的代理并再次创建主题。