我正在尝试使用 kafka-clients v.0.10.2.1 创建具有不同消费者群组的多个消费者到kafka主题 . 虽然我无法检索消费者群体提交的最后一个偏移量 .
目前我的消费者属性看起来像这样
Properties cproperties = new Properties();
cproperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
cproperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, my-broker));
cproperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
cproperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
cproperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, taskDecoder.getClass());
cproperties.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "60000");
并且没有属性自动重置偏移我不能从主题消耗,但我不能使用此配置,我需要在zookeeper上注册的消费者组 . 所以,我还需要在zookeeper / consumer上创建一个消费者群体 .
1 回答
您需要将属性
auto.offset.reset
包含到earliest
(或latest
,具体取决于您要实现的目标),以避免在未找到偏移时抛出异常(可能是因为数据已删除) .您还需要确保手动提交偏移量,因为您已禁用自动提交 .
为此,您可以使用commitSync()
或commitAsync()
请注意,如果您不提交偏移量,则在
auto.offset.reset
设置为none
时将引发异常 .