首页 文章

在 Kafka 寻求行动

提问于
浏览
2

我有一个方案可以使用Kafka执行一些管理任务 .

有一个主题A有10个分区 . 有5个不同的应用程序使用来自主题A的消息,每个应用程序(app1,app2,app3,app4,app5)都有自己的消费者“group.id”(group1,group2,group3,group4,group5),因此每个应用程序都可以与他们自己的抵消单独工作 . auto offset commit设置为true . 每个应用程序的使用者组中的消费者数量是不同的(例如app1在group1中有10个使用者,app2在group2中有5个使用者) .

现在管理员想要,

case-1:将app2的偏移量从当前偏移位置(X)向前转移5个位置 . 对于这个任务,你应该停止app2并创建一个单独的KafkaConsumer(不是spring),其组ID与app2相同,即“group2”并提交新的偏移量X 5并启动app2吗?

case-2:将主题A的所有应用程序的偏移量从其当前偏移位置(X)转发5个位置,假设所有应用程序都处于相同的偏移量X.对于此任务,是否应停止所有应用程序并创建一个KafkaConsumer空组ID“”,并提交新的偏移量X 5并启动所有应用程序?或者类似于case-1但是对于每个应用程序单独进行?

或者有没有更好的方式与 Kafka ?

1 回答

  • 1

    ConsumerSeekAware 不帮你吗?

    为了寻求,您的监听器必须实现具有以下方法的ConsumerSeekAware:

    void registerSeekCallback(ConsumerSeekCallback callback);
    
    void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);
    
    void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);
    

    要在运行时任意搜索,请使用registerSeekCallback中的回调引用来获取相应的线程 .

    https://docs.spring.io/spring-kafka/docs/2.0.1.RELEASE/reference/html/_reference.html#seek

相关问题