首页 文章

kafka new producer无法在其中一个代理关闭后更新元数据

提问于
浏览
5

我有一个kafka环境,有2个经纪人和1个动物园管理员 .

当我试图向kafka发送消息时,如果我停止代理1(这是领导者1),客户端会停止生成消息并给我以下错误,尽管代理2被选为主题和分区的新领导者 .

org.apache.kafka.common.errors.TimeoutException:60000毫秒后无法更新元数据 .

经过10分钟后,由于经纪人2是新的领导者,我希望 生产环境 者向经纪人2发送数据,但是由于给出了上述异常,它继续失败 . lastRefreshMs和lastSuccessfullRefreshMs仍然相同,尽管metadataExpireMs对于 生产环境 者来说是300000 .

我在 生产环境 者方面使用kafka new Producer实现 .

似乎生成器启动时,它绑定到一个代理,如果该代理发生故障,它甚至不会尝试连接到集群中的另一个代理 .

但我的期望是,如果经纪人倒闭,它应该直接检查可用的另一个经纪人的元数据并向他们发送数据 .

顺便说一句,我的主题是4分区,复制因子为2.在有意义的情况下提供此信息 .

配置参数 .

{request.timeout.ms=30000, retry.backoff.ms=100, buffer.memory=33554432, ssl.truststore.password=null, batch.size=16384, ssl.keymanager.algorithm=SunX509, receive.buffer.bytes=32768, ssl.cipher.suites=null, ssl.key.password=null, sasl.kerberos.ticket.renew.jitter=0.05, ssl.provider=null, sasl.kerberos.service.name=null, max.in.flight.requests.per.connection=5, sasl.kerberos.ticket.renew.window.factor=0.8, bootstrap.servers=[10.201.83.166:9500, 10.201.83.167:9500], client.id=rest-interface, max.request.size=1048576, acks=1, linger.ms=0, sasl.kerberos.kinit.cmd=/usr/bin/kinit, ssl.enabled.protocols=[TLSv1.2, TLSv1.1, TLSv1], metadata.fetch.timeout.ms=60000, ssl.endpoint.identification.algorithm=null, ssl.keystore.location=null, value.serializer=class org.apache.kafka.common.serialization.ByteArraySerializer, ssl.truststore.location=null, ssl.keystore.password=null, key.serializer=class org.apache.kafka.common.serialization.ByteArraySerializer, block.on.buffer.full=false, metrics.sample.window.ms=30000, metadata.max.age.ms=300000, security.protocol=PLAINTEXT, ssl.protocol=TLS, sasl.kerberos.min.time.before.relogin=60000, timeout.ms=30000, connections.max.idle.ms=540000, ssl.trustmanager.algorithm=PKIX, metric.reporters=[], compression.type=none, ssl.truststore.type=JKS, max.block.ms=60000, retries=0, send.buffer.bytes=131072, partitioner.class=class org.apache.kafka.clients.producer.internals.DefaultPartitioner, reconnect.backoff.ms=50, metrics.num.samples=2, ssl.keystore.type=JKS}

使用案例:

1-启动BR1和BR2生成数据(Leader为BR1)

2-停止BR2产生数据(罚款)

3-停止BR1(这意味着此时集群中没有活动的工作代理)然后启动BR2并生成数据(尽管领导者是BR2但是失败)

4-启动BR1生成数据(领导者仍然是BR2,但数据生成得很好)

5-停止BR2(现在BR1是领导者)

6-停止BR1(BR1仍是领先者)

7-启动BR1生成数据(消息再次生成)

如果 生产环境 商将最新的成功数据发送给BR1,然后所有经纪人都下降,那么 生产环境 商希望BR1再次起床,尽管BR2已经上升并且是新领导者 . 这是预期的行为吗?

2 回答

  • 2

    花了好几个小时后,我在我的情况下想出了 Kafka 的行为 . 可能是这是一个错误或可能是这需要以这种方式完成由于原因在引擎盖下,但实际上,如果我会做这样的实现我不会这样做:)

    当所有经纪人都倒闭时,如果你只能起床一个经纪人,那么这个经纪人必须是最后才能成功产生消息的经纪人 .

    假设你有5个经纪人; BR1,BR2,BR3,BR4和BR5 . 如果一切都失败了,如果最后死亡的经纪人是BR3(这是最后一位领导者),虽然你启动所有经纪人BR1,BR2,BR4和BR5,但除非你启动BR3,否则没有任何意义 .

  • 7

    您需要增加重试次数 . 在您的情况下,您需要将其设置为> = 5 .

    这是您的制作人知道您的群集有新领导者的唯一方法 .

    除此之外,请确保您的所有经纪人都有您的分区副本 . 否则你不会得到新的领导者 .

相关问题