在启动汇合点网络消费者时,在调用订阅和后续轮询之后,似乎需要很长时间才能从服务器接收“分配已分配”事件,因此消息(大约10-15秒) .
起初我认为有一个自动主题创建开销,但无论消费者的主题/消费者群体是否已经存在,时间都是相同的 .
我使用此配置启动我的使用者,其余代码与汇合的高级消费者示例中的相同:
var kafkaConfig = new Dictionary<string, object>
{
{"group.id", config.ConsumerGroup},
{"statistics.interval.ms", 60000},
{"fetch.wait.max.ms", 10},
{"bootstrap.servers", config.BrokerList},
{"enable.auto.commit", config.AutoCommit},
{"socket.blocking.max.ms",1},
{"fetch.error.backoff.ms",1 },
{"socket.nagle.disable",true },
{"auto.commit.interval.ms", 5000},
{
"default.topic.config", new Dictionary<string, object>()
{
{"auto.offset.reset", "smallest"}
}
}
};
kafka群集由具有默认设置的远程数据中心中的3台中低端规格机器组成 . 是否有可以调整的代理或客户端设置以降低启动时间?
编辑:在启动时间约为2秒的情况下,自己使用Assign而不是Subscribe结果分配分区
1 回答
Kafka消费者按设计分组工作 - 您看到的延迟是群组协调员(位于群集,而不是客户端)等待任何现有/先前会话超时并允许任何其他消费者在为具有活动连接的所有使用者分配分区之前,要启动相同的组 .
实际上,如果你足够快地重新启动你的测试消费者,你会看到延迟跳到差不多30秒,因为
session.timeout.ms
的默认值是30000,并且群集仍然没有"noticed",前一个消费者已经离开这个超时如果你在重新启动之间更改group.id
,那么'll see the delay drop drastically as the cluster won'等待属于不同组的现有消费者 .最后,在再次启动之前尝试干净地退出您的消费者(调用Unsubscribe()并确保消费者被处置) .
看起来
session.timeout.ms
可以降低到6000以减少任何现有消费者群体连接的超时,但不会降低 .即使一切都开始“干净”,看起来你仍然会延迟长达7秒(我猜测标准连接设置加上等待同一组中的任何其他消费者开始) . 如果您使用Assign()而不是Subscribe(),那么您选择自己将分区分配给您的消费者,并且自动组 balancer 不适用 .