首页 文章

Spring Integration:延迟后发布两次消息

提问于
浏览
4

我使用以下XML片段:

<int-amqp:inbound-channel-adapter acknowledge-mode="MANUAL" channel="commandQueue" concurrent-consumers="${commandConsumers:10}"
                                  queue-names="commands" connection-factory="connectionFactory"/>
<int:channel id="commandQueue"/>
<int:channel id="commands"/>
<int:chain input-channel="commandQueue" output-channel="commands">
    <int:delayer id="commandDelayer" default-delay="30000"/>
    <int:json-to-object-transformer id="commandTransformer" type="com.airwatch.chat.command.Command"/>
</int:chain>

<int:payload-type-router input-channel="commands">
....
....

它正在执行这些任务:

  • 从名为'commands'的RabbitMQ队列中消耗消息 .

  • 将消息执行延迟30秒 .

  • 在指定的延迟后继续执行消息 .

如果在启动上述代码的应用程序之前该消息已存在于命令队列中,则在启动时,应用程序将在单独的线程中执行两次消息 .

我想我知道为什么会这样 .

一旦应用程序上下文完全初始化,Spring会重新安排DelayHandler的消息存储中持久存储的消息 . 请参阅 DelayHandler.java 中的以下代码段:

public void onApplicationEvent(ContextRefreshedEvent event) {
    if (!this.initialized.getAndSet(true)) {
        this.reschedulePersistedMessages();
    }
}

因此,如果消息在应用程序启动之前已经存在于RabbitMQ队列中,则在Spring上下文初始化期间,将从队列中拾取消息并将其添加到DelayHandler的消息存储库中 . 完成上下文初始化后,如果同时消息未从消息存储库中释放,则上面的代码片段重新安排相同的消息 .

现在,当两个单独的线程正在执行相同的消息时,如果一个线程已执行,则应从消息存储库中删除该消息,而另一个线程不应继续执行 .

在执行该线程时, DelayHandler.java 下面的代码允许第二个线程释放重复的消息,导致同一消息的重复执行,因为消息存储是SimpleMessageStore的一个实例,并且没有进一步的检查来停止执行 .

private void doReleaseMessage(Message<?> message) {
    if (this.messageStore instanceof SimpleMessageStore
            || ((MessageStore) this.messageStore).removeMessage(message.getHeaders().getId()) != null) {
        this.messageStore.removeMessageFromGroup(this.messageGroupId, message);
        this.handleMessageInternal(message);
    }
    else {
        if (logger.isDebugEnabled()) {
            logger.debug("No message in the Message Store to release: " + message +
                    ". Likely another instance has already released it.");
        }
    }
}

这是Spring Integration中的错误吗?

1 回答

  • 3

    那好吧!

    那真是个好消息 .

    谢谢你指出来了!

    请举一个JIRA issue,我们将在下一个版本中对此进行处理 .

    我可以解释发生了什么 .

    所有Spring Integration都从 Lifecycle.start() 开始工作 . 在您的情况下 <int-amqp:inbound-channel-adapter> 从RabbitMQ接收消息并将其发送到集成流程 . 他们是 delayed .

    只有在 start 之后,应用程序上下文才会引发 ContextRefreshedEvent . 捕获甚至 DelayHandlermessageStore 中获取所有消息,并且,当你知道它们时, reschedules 它们 .

    因此,是的,我们可能有两个针对同一消息的预定任务 .

    有趣的是它仅适用于 SimpleMessageStore ,因为它没有 removeMessage 函数用于存储到 groups 的消息 .

    我看到几种变体作为解决方法:

    • 延迟 start for <int-amqp:inbound-channel-adapter> . 例如,从 <inbound-channel-adapter> 处理相同的 ContextRefreshedEvent 并将 @amqpAdapter.start() 命令消息发送到 <control-bus>

    • 自Spring Integration 4.1起,另一个选项可用,其名称为Idempotent Receiver . 使用它你可以丢弃 duplicate 消息,我想 idempotentKey 正是 messageId . 清洁Idempotent接收器模式!

    • 还有一个选项位于 persistent MessageStore ,我们真的可以依赖 removeMessage 操作 .

    关于此事的JIRA机票:https://jira.spring.io/browse/INT-3560

相关问题