我一直在玩 Zookeeper 和 Kafka 的基本设置来学习如何使用它,但我遇到了消费者的问题。当 Kafka 不可用时,对poll()
方法的调用会挂起,直到它重新联机。
卡夫卡版:0.10.1.0
我的代码看起来像这样:
KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
consumer.subscribe(topics);
while (!stopped) {
// If by any reason Kafka is not available this call will hang
// until Kafka is back online.
records = consumer.poll(timeout);
for (ConsumerRecord<String, byte[]> record : records) {
process(record);
}
Thread.sleep(sleepTime);
}
我已经读过,当我打电话给poll()
时,消费者将尝试无限期地连接到 Kafka,直到它重新上线或直到consumer.wakeup()
被调用。
当 Kafka 不在线时,我希望代码行为不同。 在从 non-existent kafka 进行投票时,有没有办法限制消费者重试或使其失败?
1 回答
不幸的是,这仍然是个问题。许多消费者方法可以挂起各种场景。
正在进行的 Kafka 改进建议KIP-266,为消费者方法添加超时以避免挂起。
据我所知,从另一个线程调用
wakeup()
是最好的解决方法编辑:从 Kafka 2.0.0 开始,所有 Consumer 调用都可以接受超时。这样可以在经纪人倒闭的情况下恢复控制权。