首页 文章

当卡夫卡失败时,卡夫卡的消费者依此坚持投票

提问于
浏览
3

我一直在玩 Zookeeper 和 Kafka 的基本设置来学习如何使用它,但我遇到了消费者的问题。当 Kafka 不可用时,对poll()方法的调用会挂起,直到它重新联机。

卡夫卡版:0.10.1.0

我的代码看起来像这样:

KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
consumer.subscribe(topics);

while (!stopped) {
    // If by any reason Kafka is not available this call will hang
    // until Kafka is back online.
    records = consumer.poll(timeout);

    for (ConsumerRecord<String, byte[]> record : records) {
        process(record);
    }

    Thread.sleep(sleepTime);
}

我已经读过,当我打电话给poll()时,消费者将尝试无限期地连接到 Kafka,直到它重新上线或直到consumer.wakeup()被调用。

当 Kafka 不在线时,我希望代码行为不同。 在从 non-existent kafka 进行投票时,有没有办法限制消费者重试或使其失败?

1 回答

  • 4

    不幸的是,这仍然是个问题。许多消费者方法可以挂起各种场景。

    正在进行的 Kafka 改进建议KIP-266,为消费者方法添加超时以避免挂起。

    据我所知,从另一个线程调用wakeup()是最好的解决方法


    编辑:从 Kafka 2.0.0 开始,所有 Consumer 调用都可以接受超时。这样可以在经纪人倒闭的情况下恢复控制权。

相关问题