首页 文章

经纪人倒闭时, Kafka 制片人正在失去信息

提问于
浏览
0

鉴于以下情况:

我在本地启动了zookeeper和一个kafka经纪人,并按照kafka快速入门中的描述创建了"test"主题:https://kafka.apache.org/quickstart

然后,我运行一个简单的java程序,每秒都会向“test”主题生成一条消息 . 一段时间后,我关闭了我的本地kafka经纪人,看到 生产环境 者继续产生消息,它不会抛出任何异常 . 最后,我再次启动kafka代理, 生产环境 者能够重新连接到代理并继续生成消息,但是,在kafka代理停机期间生成的所有消息都将丢失 . 当检测到 Health 的kafka经纪人时,制作人不会重播它们 .

我怎么能阻止这个?我希望kafka 生产环境 商在检测到kafka经纪人重新联机时重播这些消息 . 这是我的 生产环境 者配置:

props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("linger.ms", 0);
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());

2 回答

  • 1

    Kafka Producer库内置了重试机制,但默认情况下它已关闭 . 将 retries Producer配置更改为大于0(默认值)的值以将其打开 . 您还应该尝试使用 retry.backoff.msrequest.timetout.ms 来自定义Producer重试 .

    示例Kafka Producer配置启用重试:

    retries=2147483647         //Integer.MAX_VALUE 
    retry.backoff.ms=1000
    request.timeout.ms=305000  //5 minutes
    max.block.ms=2147483647    //Integer.MAX_VALUE
    

    您可以在Apache Kafka documentation中找到有关这些属性的更多信息 .

  • 0

    由于您只运行一个经纪人,我担心当您的经纪人关闭时,您将无法存储消息 .

    但是,当您将经纪人关闭时,您不会收到任何异常/警告/错误,这很奇怪 .

    我期望“无法更新元数据”或“过期消息”错误,因为当 生产环境 者向提到的针对bootstrap.servers属性的代理发送消息时,它首先检查zookeeper以查找活动控制器(或领导者)和分区 . 因此,在您的情况下,因为您在独立模式下运行kafka,并且当代理关闭时, 生产环境 者不应该收到领导者信息和错误输出 .

    您能否查看以下属性设置为:

    request.timeout.ms
    max.block.ms
    

    用这些值来玩(减少,可能)?并检查结果?

    您可能想要尝试的另一个选项是以同步方式向Kafka发送消息(阻止send()方法,直到收到消息),这是一个可能有用的代码片段(取自this documentation reference):

    如果要模拟简单的阻塞调用,可以立即调用get()方法:

    byte[] key = "key".getBytes();
    byte[] value = "value".getBytes();
    ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("my-topic", key, value)
    producer.send(record).get();
    

    在这种情况下,如果由于任何原因未成功发送消息,kafka应该抛出异常 .

    我希望这有帮助 .

相关问题