我正在使用Kafka 0.11.0.0 . 我有一个发布到Kafka主题的测试程序;如果zookeeper和Kafka服务器关闭(这在我的开发环境中是正常的;我根据需要启动它们),那么对KafkaProducer <> . send()的调用将无限期挂起 .
我要么需要send()返回,最好指出错误;或者我需要一种方法来检查服务器是启动还是关闭 . 基本上,我希望我的测试工具能够告诉我,“嘿,假,启动Kafka!”而不是挂 .
我的 生产环境 者任务是否有办法确定服务器是启动还是关闭?
我正在调用send(),如下所示:
kafkaProducer.send(new ProducerRecord<>(KAFKA_TOPIC, KAFKA_KEY,
message), (rm, ex) -> {
System.out.println("**** " + rm + "\n**** " +ex);
});
我有linger.ms = 1;我尝试重试= 0,1和2,并发送()静止块 . 我从来没有见过回调 .
较旧的消息建议将metadata.fetch.timeout.ms设置为较小的值,但这在0.11中消失了 . 其他人建议调用命令行实用程序来查看服务器是否正常......但引用的实用程序似乎也已消失 .
完成这项工作的优雅方式是什么?
2 回答
这很奇怪 . 它应返回错误,指出“无法更新元数据”或“使x个记录失效” .
检查 生产环境 者的
request.timeout.ms
和max.block.ms
设置 . 默认情况下request.timeout.ms
为60秒我们可以通过三种方式向经纪人发送消息:
Fire-and-forget : 我们向服务器发送消息,并不关心它是否成功到达 . 大多数情况下,它会成功到达,因为Kafka具有高可用性,并且 生产环境 者将自动重试发送消息 . 但是,使用此方法会丢失一些消息 .
Asynchronous send 我们使用回调函数调用send()方法,该函数在收到Kafka代理的响应时被触发 .
Synchronous send 我们发送一条消息,send()方法返回一个Future对象,我们使用get()等待将来查看send()是否成功 .
同步发送消息的最简单方法如下:
在这里,我们使用 Future.get() 等待Kafka的回复 . 如果记录未成功发送到Kafka,此方法将引发异常 . 如果没有错误,我们将获得一个RecordMetadata对象,我们可以使用它来检索消息写入的偏移量 .
希望这可以帮助 .