首页 文章

Spring kafka不支持大消息使用者

提问于
浏览
0

我使用spring Kafka消耗LinkedIn large message supported Kafka client产生的消息

鉴于此Kafka客户端始终将 AUTO_OFFSET_RESET_CONFIG 覆盖为无,如其构造函数中所示 .

private LiKafkaConsumerImpl(LiKafkaConsumerConfig configs,
    Deserializer<K> keyDeserializer,
    Deserializer<V> valueDeserializer,
    Deserializer<LargeMessageSegment> largeMessageSegmentDeserializer,
    Auditor<K, V> consumerAuditor) {
        _kafkaConsumer = new KafkaConsumer<>(configs.configForVanillaConsumer(),
        byteArrayDeserializer,
        byteArrayDeserializer);
    }
Map<String, Object> configForVanillaConsumer() {
    Map<String, Object> newConfigs = new HashMap<>();
    newConfigs.putAll(this.originals());
    newConfigs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    newConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
    return newConfigs;
}

因此,一旦我开始使用批量提交并将 ENABLE_AUTO_COMMIT_CONFIG 设置为false,它将引发以下错误:

[org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] ERROR oakcciConsumerCoordinator - 用户提供收听com.linkedin.kafka.clients.consumer.LiKafkaConsumerRebalanceListener用于组文档事件消费者未能上分区分配org.apache .kafka.clients.consumer.NoOffsetForPartitionException:DocumentEvents-2 org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:369)在org.apache.kafka:具有用于分区的任何重置策略未定义偏移.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:247)在org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1602)在org.apache.kafka.clients.consumer.KafkaConsumer .position(KafkaConsumer.java:1265)at com.linkedin.kafka.clients.consumer.LiKafkaConsumerImpl.position(LiKafkaConsumerImpl.java:403)at org.springframework.kafka.listener.KafkaMessageListenerContainer $ ListenerConsumer $ 1.onPartitionsAssigned(KafkaMessageListenerContainer.ja va:447)在org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255)的com.linkedin.kafka.clients.consumer.LiKafkaConsumerRebalanceListener.onPartitionsAssigned(LiKafkaConsumerRebalanceListener.java:62)位于org.apache.kafka的org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)的.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339) .clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)在org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)在org.apache.kafka.clients.consumer.KafkaConsumer .poll(KafkaConsumer.java:995)at com.linkedin.kafka.clients.consumer.LiKafkaConsumerImpl.poll(LiKafkaConsumerImpl.java:231)at org.springframework.kafka.listener.KafkaMessageListenerContainer $ ListenerConsumer.run(KafkaMessageListenerContainer.java:558 )在java.util.concurrent.E xecutors $ RunnableAdapter.call(Executors.java:511)在java.util.concurrent.FutureTask.run(FutureTask.java:266)在java.lang.Thread.run(Thread.java:745)

出现此问题的原因是此使用者组首次使用此主题的消息,因此它尝试使用偏移重置策略 .

虽然我把它设置为“最早的”,但它被底层的LinkedIn kafka客户端覆盖为“无”

我还试图覆盖ConsumerRebalanceListener以在这种情况下手动搜索到开头,但实际上它并没有达到这一点 .

我该如何解决这个问题?

1 回答

  • 0

    有趣;请在GitHub中打开一个问题 .

    如果政策是 none ,我们应该 grab 那个例外 .

    与此同时,您可以通过仅使用常规客户端来解决该问题,实际设置组的初始偏移量(您不必实际接收任何消息,只需分配和设置分区)小组的初始职位) .

相关问题