首页 文章

使用Spring Integration启动Spring Boot应用程序Kafka消费者暂停了

提问于
浏览
1

我正在开发一个Spring Boot应用程序,该应用程序使用以Kafka主题为源的Spring Integration流程 . 我们的集成流程开始使用包含SubscribableChannels的接口和springframework.cloud.stream.annotation.Input和Output注释 . 这些配置为使用spring.cloud.stream.kafka.bindings通过Cloud Config从Kafka读取 .

当应用程序首次启动时,它会立即开始阅读Kafka主题 . 这是一个问题,因为应用程序需要初始化一些本地的,不可持久的数据库才能开始正确处理传入的Kafka消息 .

我们目前正在使用@PostConstruct在Kafka启动之前填充这些内存数据库,但这不是最理想的,因为应用程序无法使用Eureka,Feign等来可靠地找到具有内存最新数据的 Health 服务数据库 .

由于各种原因,无法更改体系结构,以便共享或预填充内存数据库 . 只要知道当我称之为内存数据库时,我会简化一些事情,它实际上是另一种服务 .

启动Spring Boot应用程序的最佳方法是,从Kafka读取的Integration Flow是否在暂停状态下启动,并且在其他某个进程完成后可以取消暂停?

1 回答

  • 1

    我假设您使用 KafkaMessageDrivenChannelAdapter 并且根据您提到的Spring Integration Java DSL - Kafka.messageDrivenChannelAdapter() 确切地说 . 可以使用 idautoStartup(false) 配置一个 . 因此,它不会立即开始消耗Kafka主题 . 每当您准备好使用时,您都可以 start() 此组件使用提到的ID从应用程序上下文中获取它作为 Lifecycle .

    或者您可以向Control Bus发送相应的消息 .

    UPDATE

    如果您处理Spring Cloud Stream和Kafka Binder,您应该考虑注入一个 BindingsEndpoint bean并执行其 changeState(@Selector String name, State state) 作为绑定名称和 State.STOPPED . 当你的内存数据库准备就绪时,你用 State.STARTED 回拨它:https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#_binding_visualization_and_control

相关问题