首页 文章

永远运行kafka使用者(新的Consumer API)

提问于
浏览
0

我在 Apache Kafka 上建了一个排队系统 . 应用程序将生成特定于 Kafka topic 的消息,在消费者端,我必须使用为该主题生成的所有记录 .
我使用新的Java Consumer Api编写了消费者 . 代码看起来像

Properties props = new Properties();  
                     props.put("bootstrap.servers", kafkaBrokerIp+":9092");  
                     props.put("group.id",groupId);  
                     props.put("enable.auto.commit", "true");
                     props.put("session.timeout.ms", "30000");
                     props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer(props);
                     consumer.subscribe(Arrays.asList("consumertest"));  
                     while (true) {  
                         ConsumerRecords<String, String> records = consumer.poll(100);  
                         for (ConsumerRecord<String, String> record : records){  
                             System.out.println("Data recieved : "+record.value());  
                             }  
                     }

在这里,我需要永远运行消费者,以便 生产环境 者推送到kafka主题的任何记录都应该立即消费和处理 .
所以我的困惑是,使用无限循环(如示例代码中)来使用数据是一种正确的方法吗?

4 回答

  • 0

    它适用于我,但你可能想把你的内部循环放在try / catch块中,以防你抛出任何异常 . 如果断开连接,还要考虑定期重新连接任务 .

  • 0

    是的,你可以使用无限循环 . 实际上,这不是一个繁忙的循环 . 在每次轮询期间,如果数据不可用,则呼叫将等待给定的时间段 .

    long millisToWait = 100;
    consumer.poll(millisToWait);
    

    新消费者自动处理网络通信问题 . 确保在关机时消费者优雅地关闭 .

  • 0

    是的,它是使用无限循环消耗数据的正确方法 .

    消费者通常是长期运行的应用程序,不断轮询Kafka以获取更多数据 . 消费者必须继续对Kafka进行投票,否则他们将被视为死亡,他们正在消费的分区将被传递给该组中的另一个消费者以继续消费 .

    poll()返回记录列表 . 每条记录包含记录来自的主题和分区,分区内记录的偏移量,以及记录的键和值 . 记录处理是特定于应用程序的 .

    如果退出循环,请在退出之前关闭()消费者 . 这将关闭网络连接和套接字,并立即触发重新 balancer .

  • 0

    虽然可以使用无限循环,但在Kafka使用者_3028305中可以找到更优雅的方法,如下所示:

    public class KafkaConsumerRunner implements Runnable {
        private final AtomicBoolean closed = new AtomicBoolean(false);
        private final KafkaConsumer consumer;
    
        public void run() {
            try {
                consumer.subscribe(Arrays.asList("topic"));
                while (!closed.get()) {
                    ConsumerRecords records = consumer.poll(10000);
                    // Handle new records
                }
            } catch (WakeupException e) {
                // Ignore exception if closing
                if (!closed.get()) throw e;
            } finally {
               consumer.close();
            }
        }
    
        // Shutdown hook which can be called from a separate thread
        public void shutdown() {
            closed.set(true);
            consumer.wakeup();
        }
    }
    

    这允许您使用钩子选择正常关闭 .

相关问题