首页 文章

无法获取kafka主题中的消息数

提问于
浏览
0

我对 Kafka 很新 . 我在java中创建了一个示例 生产环境 者和使用者 . 使用 生产环境 者,我能够将数据发送到kafka主题,但我无法使用以下使用者代码获取主题中的记录数 .

public class ConsumerTests {
        public static void main(String[] args) throws Exception {
            BasicConfigurator.configure();

            String topicName = "MobileData";
            String groupId = "TestGroup";
            Properties properties = new Properties();
            properties.put("bootstrap.servers", "localhost:9092");
            properties.put("group.id", groupId);
            properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
            kafkaConsumer.subscribe(Arrays.asList(topicName));


try {
  while (true) {
    ConsumerRecords<String, String> consumerRecords = consumer.poll(100);
    System.out.println("Record count is " + records.count());
  }
} catch (WakeupException e) {
  // ignore for shutdown
} finally {
  consumer.close();
}

}
}

我在控制台中没有任何异常,但是consumerRecords.count()总是返回0,即使主题中有消息也是如此 . 如果我遗漏了一些记录细节,请告诉我 .

1 回答

  • 0

    poll(...) 调用通常应该处于循环中 . 在分区分配正在进行时,初始 poll(...) 总是可以不返回数据(取决于超时) . 这是一个例子:

    try {
      while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        System.out.println("Record count is " + records.count());
      }
    } catch (WakeupException e) {
      // ignore for shutdown
    } finally {
      consumer.close();
    }
    

    有关详细信息,请参阅this relevant article

相关问题