我使用以下XML片段:
<int-amqp:inbound-channel-adapter acknowledge-mode="MANUAL" channel="commandQueue" concurrent-consumers="${commandConsumers:10}"
queue-names="commands" connection-factory="connectionFactory"/>
<int:channel id="commandQueue"/>
<int:channel id="commands"/>
<int:chain input-channel="commandQueue" output-channel="commands">
<int:delayer id="commandDelayer" default-delay="30000"/>
<int:json-to-object-transformer id="commandTransformer" type="com.airwatch.chat.command.Command"/>
</int:chain>
<int:payload-type-router input-channel="commands">
....
....
它正在执行这些任务:
-
从名为'commands'的RabbitMQ队列中消耗消息 .
-
将消息执行延迟30秒 .
-
在指定的延迟后继续执行消息 .
如果在启动上述代码的应用程序之前该消息已存在于命令队列中,则在启动时,应用程序将在单独的线程中执行两次消息 .
我想我知道为什么会这样 .
一旦应用程序上下文完全初始化,Spring会重新安排DelayHandler的消息存储中持久存储的消息 . 请参阅 DelayHandler.java
中的以下代码段:
public void onApplicationEvent(ContextRefreshedEvent event) {
if (!this.initialized.getAndSet(true)) {
this.reschedulePersistedMessages();
}
}
因此,如果消息在应用程序启动之前已经存在于RabbitMQ队列中,则在Spring上下文初始化期间,将从队列中拾取消息并将其添加到DelayHandler的消息存储库中 . 完成上下文初始化后,如果同时消息未从消息存储库中释放,则上面的代码片段重新安排相同的消息 .
现在,当两个单独的线程正在执行相同的消息时,如果一个线程已执行,则应从消息存储库中删除该消息,而另一个线程不应继续执行 .
在执行该线程时, DelayHandler.java
下面的代码允许第二个线程释放重复的消息,导致同一消息的重复执行,因为消息存储是SimpleMessageStore的一个实例,并且没有进一步的检查来停止执行 .
private void doReleaseMessage(Message<?> message) {
if (this.messageStore instanceof SimpleMessageStore
|| ((MessageStore) this.messageStore).removeMessage(message.getHeaders().getId()) != null) {
this.messageStore.removeMessageFromGroup(this.messageGroupId, message);
this.handleMessageInternal(message);
}
else {
if (logger.isDebugEnabled()) {
logger.debug("No message in the Message Store to release: " + message +
". Likely another instance has already released it.");
}
}
}
这是Spring Integration中的错误吗?
1 回答
那好吧!
那真是个好消息 .
谢谢你指出来了!
请举一个JIRA issue,我们将在下一个版本中对此进行处理 .
我可以解释发生了什么 .
所有Spring Integration都从
Lifecycle.start()
开始工作 . 在您的情况下<int-amqp:inbound-channel-adapter>
从RabbitMQ接收消息并将其发送到集成流程 . 他们是delayed
.只有在
start
之后,应用程序上下文才会引发ContextRefreshedEvent
. 捕获甚至DelayHandler
从messageStore
中获取所有消息,并且,当你知道它们时,reschedules
它们 .因此,是的,我们可能有两个针对同一消息的预定任务 .
有趣的是它仅适用于
SimpleMessageStore
,因为它没有removeMessage
函数用于存储到groups
的消息 .我看到几种变体作为解决方法:
延迟
start
for<int-amqp:inbound-channel-adapter>
. 例如,从<inbound-channel-adapter>
处理相同的ContextRefreshedEvent
并将@amqpAdapter.start()
命令消息发送到<control-bus>
自Spring Integration 4.1起,另一个选项可用,其名称为Idempotent Receiver . 使用它你可以丢弃
duplicate
消息,我想idempotentKey
正是messageId
. 清洁Idempotent接收器模式!还有一个选项位于
persistent
MessageStore
,我们真的可以依赖removeMessage
操作 .关于此事的JIRA机票:https://jira.spring.io/browse/INT-3560