首页 文章

spring-cloud-stream使用者分区重新分配

提问于
浏览
0

场景:

  • 运行spring-boot项目,该项目使用名为'test'的分区,该分区有10个分区 . 分区分配发生在13:00:00

  • 在~13:00:30使用以下内容向主题添加分区:
    ./kafka-topics.sh --alter --zookeeper zookeeper:2181 --topic test --partitions 100

  • 在~13:05:30时触发分区重新分配 .

我跑了几次这样的步骤,看起来每隔约5分钟重新分配一次 .

  • 有没有办法改变重新分配检查的操作频率?

  • 我们希望它每隔几秒钟 . 这个操作是否很重,这是每5分钟发生一次的原因?或者它几乎可以忽略不计?

EDIT:

我的用例如下:我们有集成测试,可以启动我们的微服务 . 当主题的使用者首先引导时,如果主题不存在,则创建该主题,并且它创建的分区数等于配置的 concurrency (例如10) . 然后,该主题的 生产环境 者启动并且他配置的 partitonCount (例如20)大于创建的分区的数量,因此spring-cloud-stream添加丢失的分区,同时消费者分配的分区没有改变,它从前10个分区(1-10)继续消耗 . 问题是 生产环境 者正在向所有20个分区发布消息,因此在为消费者分配新分区之前,不会消耗发送到最后10个分区(11-20)的消息 .
此行为会导致测试出现问题,我们不能等待5分钟,直到将所有分区分配给使用者 . 此外,我们不希望事先创建具有所需分区数量的主题,我们希望它仍然可以由spring-cloud-stream处理 .

EDIT 2:

似乎控制"reassignment"的相关属性是 metadata.max.age.ms .

即使我们没有看到任何分区领导层更改以主动发现任何新的代理或分区,我们强制刷新元数据的时间段(以毫秒为单位) .

1 回答

  • 2

    所以这里有几个问题 .

    首先,"spring-cloud-stream"和/或"spring-kafka"没有进行任何类型的重新 balancer ,分区重新分配等 . 这都是在Kafka内完成的 . 在Kafka有一个客户端属性默认为5分钟(我相信)如果消费者没有轮询那么多时间认为它已经死了等等 . 无论如何我会推荐你到apache-kafka Channels 来获取有关Kafka的更多信息内部 .

    此外,添加分区,重新分配和重新 balancer 是昂贵的操作,如果不认真考虑其影响,则不应尝试 . 所以,我很想知道你不断添加分区的用例是什么?

相关问题