我在 Apache Kafka
上建了一个排队系统 . 应用程序将生成特定于 Kafka topic
的消息,在消费者端,我必须使用为该主题生成的所有记录 .
我使用新的Java Consumer Api编写了消费者 . 代码看起来像
Properties props = new Properties();
props.put("bootstrap.servers", kafkaBrokerIp+":9092");
props.put("group.id",groupId);
props.put("enable.auto.commit", "true");
props.put("session.timeout.ms", "30000");
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer(props);
consumer.subscribe(Arrays.asList("consumertest"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records){
System.out.println("Data recieved : "+record.value());
}
}
在这里,我需要永远运行消费者,以便 生产环境 者推送到kafka主题的任何记录都应该立即消费和处理 .
所以我的困惑是,使用无限循环(如示例代码中)来使用数据是一种正确的方法吗?
4 回答
它适用于我,但你可能想把你的内部循环放在try / catch块中,以防你抛出任何异常 . 如果断开连接,还要考虑定期重新连接任务 .
是的,你可以使用无限循环 . 实际上,这不是一个繁忙的循环 . 在每次轮询期间,如果数据不可用,则呼叫将等待给定的时间段 .
新消费者自动处理网络通信问题 . 确保在关机时消费者优雅地关闭 .
是的,它是使用无限循环消耗数据的正确方法 .
消费者通常是长期运行的应用程序,不断轮询Kafka以获取更多数据 . 消费者必须继续对Kafka进行投票,否则他们将被视为死亡,他们正在消费的分区将被传递给该组中的另一个消费者以继续消费 .
poll()返回记录列表 . 每条记录包含记录来自的主题和分区,分区内记录的偏移量,以及记录的键和值 . 记录处理是特定于应用程序的 .
如果退出循环,请在退出之前关闭()消费者 . 这将关闭网络连接和套接字,并立即触发重新 balancer .
虽然可以使用无限循环,但在Kafka使用者_3028305中可以找到更优雅的方法,如下所示:
这允许您使用钩子选择正常关闭 .