我有一组作为集群运行的 Kafka 代理实例。我有一个向 Kafka 生成数据的客户端:
props.put("metadata.broker.list", "broker1:9092,broker2:9092,broker3:9092");
当我们使用 tcpdump 监控时,我可以看到只有与 broker1 和 broker2 的连接是 ESTABLISHED,而对于 broker3,我的生产者没有连接。我只有一个分区的单个主题。
我的问题:
-
经纪人数与主题分区之间的关系如何?我应该总是有经纪人数=分数吗?
-
为什么在我的情况下,我无法连接到 broker3?或至少我的网络监控没有显示我的生产者与 broker3 建立的连接?
如果我能从生产者的立场深入了解与经纪人的联系是如何工作的,那将是很棒的。
1 回答
显然,你的制作人不需要连接到
broker3
:)我将尝试向您解释当您向 Kafka 生成数据时会发生什么:
你启动了一些经纪人,比方说 3,然后用 2 个分区创建一些主题
foo
,复制因子 2.很简单的例子,但对于某人来说可能是一个真实的案例。您可以使用配置到这些代理的
metadata.broker.list
(或新生成器中的bootstrap.servers
)创建生产者。值得一提的是,您不一定要指定群集中的所有代理,实际上您只能指定其中的一个代理,它仍然可以工作。我也会解释一下。您使用生产者向主题
foo
发送消息。生产者查找其本地元数据缓存,以查看哪些代理是主题
foo
的每个分区的领导者以及您的foo
主题有多少分区。由于这是第一次发送给生产者,因此本地缓存不包含任何内容。生产者按顺序向
metadata.broker.list
中的每个代理发送TopicMetadataRequest
,直到第一次成功响应。这就是为什么我提到该列表中的 1 个经纪人只要活着就会工作。返回
TopicMetadataResponse
将包含有关所请求主题的信息,在您的情况下,它是foo
和群集中的代理。基本上,此响应包含以下内容:集群中的代理列表,其中每个代理都有 ID,主机和端口。此列表可能不包含群集中的整个代理列表,但至少应包含负责服务主题主题的代理列表。
主题元数据列表,其中每个条目都有主题名称,分区数,每个分区的领导者代理 ID 以及每个分区的 ISR 代理 ID。
基于
TopicMetadataResponse
,您的生成器构建其本地缓存,现在确切地知道主题foo
partition0
的请求应该转到代理 X.根据主题中的分区数量,生产者对您的消息进行分区并累积它,并知道它应作为批处理的一部分发送给某个代理。
批处理已满或
linger.ms
超时通过时,生产者将批处理刷新到代理。通过“刷新”我的意思是“打开与代理的新连接或重用现有的连接,然后发送ProduceRequest
”。生产者不需要打开与所有代理的不必要的连接,因为您生成的主题可能无法由某些代理提供服务,并且您的群集可能非常大。想象一下 1000 个代理群集有很多主题,但其中一个主题只有一个分区 - 你只需要一个连接,而不是 1000 个。
在您的特定情况下,我不能 100%确定为什么您有两个开放连接到代理,如果您只有一个分区,但我假设在元数据发现期间打开了一个连接并缓存以便重用,第二个是实际的代理连接产生数据。但是,在这种情况下我可能错了。
但无论如何,根本没有必要为第三个经纪人建立连接。
关于你的问题“我应该总是有多少经纪人=分数?”答案很可能是没有。如果你解释一下你想要实现的目标,也许我能够指出你正确的方向,但这一点太宽泛而无法解释。我建议阅读这个以澄清事情。
UPD 回答评论中的问题:
元数据缓存更新为 2 种情况:
如果生产者由于任何原因未能与代理进行通信 - 这包括代理根本无法访问且代理响应错误的情况(例如“我不再是该分区的领导者,请离开”)
如果没有发生故障,客户端仍会每隔
metadata.max.age.ms
(https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java#L42-L43)刷新元数据以发现新的代理和分区本身。