鉴于以下情况:
我在本地启动了zookeeper和一个kafka经纪人,并按照kafka快速入门中的描述创建了"test"主题:https://kafka.apache.org/quickstart
然后,我运行一个简单的java程序,每秒都会向“test”主题生成一条消息 . 一段时间后,我关闭了我的本地kafka经纪人,看到 生产环境 者继续产生消息,它不会抛出任何异常 . 最后,我再次启动kafka代理, 生产环境 者能够重新连接到代理并继续生成消息,但是,在kafka代理停机期间生成的所有消息都将丢失 . 当检测到 Health 的kafka经纪人时,制作人不会重播它们 .
我怎么能阻止这个?我希望kafka 生产环境 商在检测到kafka经纪人重新联机时重播这些消息 . 这是我的 生产环境 者配置:
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("linger.ms", 0);
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
2 回答
Kafka Producer库内置了重试机制,但默认情况下它已关闭 . 将
retries
Producer配置更改为大于0(默认值)的值以将其打开 . 您还应该尝试使用retry.backoff.ms
和request.timetout.ms
来自定义Producer重试 .示例Kafka Producer配置启用重试:
您可以在Apache Kafka documentation中找到有关这些属性的更多信息 .
由于您只运行一个经纪人,我担心当您的经纪人关闭时,您将无法存储消息 .
但是,当您将经纪人关闭时,您不会收到任何异常/警告/错误,这很奇怪 .
我期望“无法更新元数据”或“过期消息”错误,因为当 生产环境 者向提到的针对bootstrap.servers属性的代理发送消息时,它首先检查zookeeper以查找活动控制器(或领导者)和分区 . 因此,在您的情况下,因为您在独立模式下运行kafka,并且当代理关闭时, 生产环境 者不应该收到领导者信息和错误输出 .
您能否查看以下属性设置为:
用这些值来玩(减少,可能)?并检查结果?
您可能想要尝试的另一个选项是以同步方式向Kafka发送消息(阻止send()方法,直到收到消息),这是一个可能有用的代码片段(取自this documentation reference):
如果要模拟简单的阻塞调用,可以立即调用get()方法:
在这种情况下,如果由于任何原因未成功发送消息,kafka应该抛出异常 .
我希望这有帮助 .