外部模块将数千条消息发送到消息代理 . 每条消息的TimeToLive属性等于5秒 . 另一个模块应该使用和处理所有消息 .
从Spring Integration文档中我发现,分阶段事件驱动架构(消费者)对负载中的显着峰值做出了更好的响应 .
我目前的实现使用EDA(甚至驱动架构),例如,
<si:channel id="inputChannel"/>
<!-- get messages from PRESENCE_ENGINE queue -->
<int-jms:message-driven-channel-adapter id="messageDrivenAdapter"
channel="inputChannel" destination="sso" connection-factory="connectionFactory"
max-concurrent-consumers="1" auto-startup="true" acknowledge="transacted" extract-payload="true"/>
<si:service-activator id ="activatorClient" input-channel="inputChannel" ref="messageService" method="processMessage"/>
<bean id="messageService" class="com.my.messaging.MessageService"/>
<bean id="sso"
class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="SSO" />
</bean>
显然是在重载下,例如 . 传入数千条消息,processMessage()可能需要超过5秒 . 并且MessageService可能无法处理所有消息 .
我的想法如下:
-
修改processMessage(),以便消息而不是被处理的消息仅存储在MongoDB中 . 然后我可以独立处理单独任务中的消息 . 在这种情况下,MongoDB将充当CACHE .
-
使用大量消费者(SEDA模型) . inputChannel是直接通道 .
-
异步处理消息,例如inputChannel是队列通道,并且异步处理消息 .
在做出决定之前,我想问你哪种情况更有效 . 也许方案2)和3)提供了一种机制来满足我的要求,即所有消息都应该被处理,即使是重负载也是如此 .
EDIT:
我已经实现了方案2,我每秒发送1000条消息 . 这是统计有多少消息丢失的参数:
最大并发消费者;传输TimeToLive = 5secs .;空闲消费限制;发送的消息数量;收到的消息数量
10 ; Yes ; 1 ; 1001 ; 297
100 ; Yes ; 1 ; 1001 ; 861
150 ; Yes ; 1 ; 1001 ; 859
300 ; Yes ; 1 ; 1001 ; 861
300 ; No ; 1 ; 1001 ; 860
300 ; No ; 100 ; 1001 ; 1014
300 ; No ; 50 ; 1001 ; 1011
似乎闲置 - 消费者限制比最大并发消费者更具侵略性地创造消费者 . 这是在这种情况下使用idle-consumer-limit的好方法吗?
这是发件人/消费者的配置文件:
<!-- SENDER
Keep Alive Sender sends messages to backup server -->
<si:channel id="sendToChannel"/>
<si:channel id="presChannel"/>
<si:inbound-channel-adapter id="senderEntity" channel="sendToChannel" method="sendMessage">
<bean class="com.ucware.ucpo.sso.cache.CacheSender"/>
<si:poller fixed-rate="${sender.sendinterval}"></si:poller>
</si:inbound-channel-adapter>
<si:router id="messageRouter" method="routeMessage" input-channel="sendToChannel">
<bean class="com.ucware.ucpo.sso.messaging.MessageRouter"/>
</si:router>
<!-- Subscriber to a channel dispatcher, Send messages to JMS -->
<int-jms:outbound-channel-adapter explicit-qos-enabled="${jms.qos.enabled}" time-to-live="${jms.message.lifetime}"
channel="presChannel" connection-factory="connectionFactory" destination="pres" extract-payload="false"/>
<bean id="pres"
class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="PRES" />
</bean>
<!-- RECEIVER -->
<si:channel id="receiveChannel"/>
<!-- get messages from PRES queue -->
<int-jms:message-driven-channel-adapter id="messageDrivenAdapter"
channel="receiveChannel" destination="presence" connection-factory="connectionFactory" idle-consumer-limit="50"
max-concurrent-consumers="300" auto-startup="true" acknowledge="transacted" extract-payload="true"/>
<si:service-activator id ="activatorClient" input-channel="receiveChannel" ref="messageService" method="processMessage"/>
<bean id="messageService" class="com.cache.MessageService"/>
1 回答
首先,您可以尝试使用
max-concurrent-consumers
属性 . 如你所见,在你的情况下1
真的不够 . 您应该调查MessageService
为什么这么慢 . 任何其他情况看起来都是开销,因为JMS已经是持久性的并且具有异步性质 - 基于队列 . 如果它没有用,那么使用<queue>
channel with presistenceMessageStore
,例如MongoDB的