我在制作Kafka主题的记录时遇到异常:
java.lang.RuntimeException: This server is not the leader for that topic-partition.
以下是将记录发送到Kafka主题的代码 .
AtomicReference<Exception> exRef = new AtomicReference<>();
while([some condition]) {
producer.send(new ProducerRecord<>(topic, message), (metadata, exception) -> {
if (exception != null) {
exRef.set(exception);
}
});
if (exRef.get() != null) {
throw new RuntimeException(exRef.get().getMessage(), exRef.get().getCause());
}
}
我只需要知道这个例外的原因是什么?怎么预防呢?
1 回答
每个主题分区都有一个领导者代理,负责提供来自客户端的所有读/写请求(如果复制大于一个,则多个跟随者代理仅复制来自领导代理的数据,但不提供任何读取/来自客户的写请求) . 在客户端启动时,客户端查询集群以获取其需要读取/写入的每个分区的领导者并缓存此元数据 .
可能会发生这样的情况:主题分区的领导层由于不同的原因从一个经纪人转移到另一个经纪人(即,追随者成为领导者)(例如,经纪人倒闭 - 在恢复之后它可能不再是领导者,但是要成为追随者,因为不需要转回领导 - 或者如果发出管理命令,则将分区移动到其他经纪人 .
如果发生这种情况,客户端的元数据不再正确,您将获得相应的异常 . 因此,从客户的角度来看,您无法阻止这种情况发生 . 但是,您可以简单地创建一个新的客户端实例,该实例将重新发现新的领导者,您的应用程序可以从那里恢复 .