首页 文章

单节点多代理配置上的 kafka 使用者

提问于
浏览
0

是否有人使用单节点多代理设置工作 kafka python?

我能够使用单节点单个代理设置生成和使用数据,但是当我将其更改为单个节点时,生成了多个代理数据并将其存储在主题中,但是当我运行消费者代码时,数据未被消耗。

对上述任何建议都将不胜感激。提前致谢!

注意:生产者,消费者和服务器属性等所有配置都经过验证,并且没有问题。

制片人代码:

from kafka.producer import KafkaProducer

def producer():
    data = {'desc' : 'testing', 'data' : 'testing single node multi broker'}
    topic = 'INTERNAL' 
    producer = KafkaProducer(value_serializer=lambda v:json.dumps(v).encode('utf-8'), bootstrap_servers=["localhost:9092","localhost:9093","localhost:9094"])

    producer.send(topic, data)

    producer.flush()

消费者代码:

from kafka.consumer import KafkaConsumer

def consumer():  
    topic = 'INTERNAL'
    consumer = KafkaConsumer(topic,bootstrap_servers=["localhost:9092","localhost:9093","localhost:9094"])

    for data in consumer:
        print data

服务器 1 配置:我添加了两个这样的服务器文件,其他代理的参数相同,但broker.idlog.dirs值不同。

broker.id=1
port=9092
num.network.threads=3
log.dirs=/tmp/kafka-logs-1
num.partitions=3
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=localhost:2181
delete.topic.enable=true

制片人配置:

metadata.broker.list=localhost:9092,localhost:9093,localhost:9094

消费者配置:

zookeeper.connect=127.0.0.1:2181
zookeeper.connection.timeout.ms=6000

1 回答

  • 1

    你收到一个简单的卡夫卡消费者的消息吗?

    bin/kafka-console-consumer.sh –bootstrap-server localhost:9092,localhost:9093,localhost:9094 –topic INTERNAL –from-beginning
    

    或者用这个:

    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic INTERNAL
    

    如果使用第二个命令获取消息,请尝试删除/tmp/log.dir中的/tmp/log.dir代理和日志文件。然后重新启动 zookeeper 和您的代理并再次创建主题。

相关问题