我试图在Kafka中创建新主题时启动动态消费者,但动态启动的消费者总是丢失开始/第一个消息但是从那里消费消息 . 我正在使用kafka-python模块,并使用更新的KafkaConsumer和KafkaProducer .
制片人代码是
producer = KafkaProducer(bootstrap_servers='localhost:9092')
record_metadata = producer.send(topic, data)
和消费者的代码是
consumer = KafkaConsumer(topic,group_id="abc",bootstrap_servers='localhost:9092',auto_offset_reset='earliest')
请提出建议,以解决此问题或我必须包含在我的 生产环境 者和消费者实例中的任何配置 .
1 回答
你能将auto_offset_reset设置为最早 .
创建新的使用者流时,它从最新的偏移量(auto_offset_reset的默认值)开始,您将错过在未启动使用者时发送的消息 .
你可以在kafka python doc中阅读它 . 相关部分如下