首页 文章

如何在spring集成中推迟消费消息

提问于
浏览
1

我正在使用Spring Integration 5.0.1和Spring Boot 2.0.0.RC1开发应用程序

目前,应用程序响应 ApplicationReadyEvent 并运行一些可能需要一段时间才能完成的初始化代码 . 这不使用任何 spring 集成组件 .

我还有一些非常基本的集成流程,使用java dsl编写并在配置中声明为bean .

当流程开始消费消息时,有什么方法可以推迟吗?我希望能够在初始化完成后手动启动它们 .

似乎配置 ControlBus 将是解决方案,但我不知道如何将这样的东西与其他流程连接起来 .

以下是流消耗消息的示例:

IntegrationFlows.from(sourceGateway)
                .transform(Transformers.fromJson(IncomingTask.class, jsonObjectMapper))

                .handle(IncomingTask.class, (incomingTask, headers) -> {

                //stuff with the task here.

                })

                .get();

1 回答

  • 1

    对,你肯定可以在这件事上使用 ControlBus . 使用Java DSL,它看起来像:

    @Bean
    public IntegrationFlow controlBus() {
        return IntegrationFlowDefinition::controlBus;
    }
    

    要使用它,你需要这个:

    @Autowired
    @Qualifier("controlBus.input")
    private MessageChannel controlBusChannel;
    

    现在我们需要知道你的目标 IntegraionFlow 如何开始 . 什么消息消息 . 例如我有这个:

    @Bean
    public IntegrationFlow fileFlow1() {
        return IntegrationFlows.from("fileFlow1Input")
                .handle(Files.outboundAdapter(tmpDir.getRoot()),
                            c -> c.id("fileWriting").autoStartup(false))
                    .get();
        }
    

    注意 c.id("fileWriting").autoStartup(false) . id 用于 endpoints bean,可以通过发送到控制总线的命令进行访问 . autoStartup(false) 意味着它不会立即消耗消息,但只有当我们调用 start() 时才会消耗消息 . 我这样做:

    this.controlBusChannel.send(new GenericMessage<>("@fileWriting.start()"));
    

    在您的配置中应该确保类似的东西,以便将消息消耗推迟到您需要的时间 .

相关问题