我正在开发一个Spring Boot应用程序,该应用程序使用以Kafka主题为源的Spring Integration流程 . 我们的集成流程开始使用包含SubscribableChannels的接口和springframework.cloud.stream.annotation.Input和Output注释 . 这些配置为使用spring.cloud.stream.kafka.bindings通过Cloud Config从Kafka读取 .
当应用程序首次启动时,它会立即开始阅读Kafka主题 . 这是一个问题,因为应用程序需要初始化一些本地的,不可持久的数据库才能开始正确处理传入的Kafka消息 .
我们目前正在使用@PostConstruct在Kafka启动之前填充这些内存数据库,但这不是最理想的,因为应用程序无法使用Eureka,Feign等来可靠地找到具有内存最新数据的 Health 服务数据库 .
由于各种原因,无法更改体系结构,以便共享或预填充内存数据库 . 只要知道当我称之为内存数据库时,我会简化一些事情,它实际上是另一种服务 .
启动Spring Boot应用程序的最佳方法是,从Kafka读取的Integration Flow是否在暂停状态下启动,并且在其他某个进程完成后可以取消暂停?
1 回答
我假设您使用
KafkaMessageDrivenChannelAdapter
并且根据您提到的Spring Integration Java DSL -Kafka.messageDrivenChannelAdapter()
确切地说 . 可以使用id
和autoStartup(false)
配置一个 . 因此,它不会立即开始消耗Kafka主题 . 每当您准备好使用时,您都可以start()
此组件使用提到的ID从应用程序上下文中获取它作为Lifecycle
.或者您可以向Control Bus发送相应的消息 .
UPDATE
如果您处理Spring Cloud Stream和Kafka Binder,您应该考虑注入一个
BindingsEndpoint
bean并执行其changeState(@Selector String name, State state)
作为绑定名称和State.STOPPED
. 当你的内存数据库准备就绪时,你用State.STARTED
回拨它:https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#_binding_visualization_and_control