首页 文章

当kafka服务器关闭时,Kafka 生产环境 者无限期地发送块

提问于
浏览
2

我正在使用Kafka 0.11.0.0 . 我有一个发布到Kafka主题的测试程序;如果zookeeper和Kafka服务器关闭(这在我的开发环境中是正常的;我根据需要启动它们),那么对KafkaProducer <> . send()的调用将无限期挂起 .

我要么需要send()返回,最好指出错误;或者我需要一种方法来检查服务器是启动还是关闭 . 基本上,我希望我的测试工具能够告诉我,“嘿,假,启动Kafka!”而不是挂 .

我的 生产环境 者任务是否有办法确定服务器是启动还是关闭?

我正在调用send(),如下所示:

kafkaProducer.send(new ProducerRecord<>(KAFKA_TOPIC, KAFKA_KEY,
    message), (rm, ex) -> {
        System.out.println("**** " + rm + "\n**** " +ex);
});

我有linger.ms = 1;我尝试重试= 0,1和2,并发送()静止块 . 我从来没有见过回调 .

较旧的消息建议将metadata.fetch.timeout.ms设置为较小的值,但这在0.11中消失了 . 其他人建议调用命令行实用程序来查看服务器是否正常......但引用的实用程序似乎也已消失 .

完成这项工作的优雅方式是什么?

2 回答

  • 2

    这很奇怪 . 它应返回错误,指出“无法更新元数据”或“使x个记录失效” .

    检查 生产环境 者的 request.timeout.msmax.block.ms 设置 . 默认情况下 request.timeout.ms 为60秒

  • 2

    我们可以通过三种方式向经纪人发送消息:

    Fire-and-forget : 我们向服务器发送消息,并不关心它是否成功到达 . 大多数情况下,它会成功到达,因为Kafka具有高可用性,并且 生产环境 者将自动重试发送消息 . 但是,使用此方法会丢失一些消息 .

    Asynchronous send 我们使用回调函数调用send()方法,该函数在收到Kafka代理的响应时被触发 .

    Synchronous send 我们发送一条消息,send()方法返回一个Future对象,我们使用get()等待将来查看send()是否成功 .

    同步发送消息的最简单方法如下:

    ProducerRecord<String, String> record =
                new ProducerRecord<>(KAFKA_TOPIC, KEY, message);
        try {
                producer.send(record).get();
        } catch (Exception e) {
                 e.printStackTrace();
        }
    

    在这里,我们使用 Future.get() 等待Kafka的回复 . 如果记录未成功发送到Kafka,此方法将引发异常 . 如果没有错误,我们将获得一个RecordMetadata对象,我们可以使用它来检索消息写入的偏移量 .

    希望这可以帮助 .

相关问题