首页 文章

哪种类型的Spring Integration Channel?

提问于
浏览
0

我在ActiveMQ中有一个队列,并希望使用spring集成将消息从它中删除到我们的应用程序中 . 我们已经部署了两次应用程序(如果其中一个失败) - 每个消息应该只由其中一个应用程序处理 . 另外,在消息处理期间,如果致命的应用程序失败,我需要一个jms tx管理器 . 因此我的通道适配器看起来像这样:

<int-jms:message-driven-channel-adapter 
    channel="myChannel" 
    connection-factory="jmsConnectionFactory"
    pub-sub-domain="false"
    destination-name="MY_QUEUE" 
    transaction-manager="jmsTxManager" />

myChannel 是直接通道时,这一切都正常,但我想使用任务执行程序,以便可以一次处理许多消息 .

为了迎合致命的应用程序失败,我认为可能是一个集合点通道(我相信当一个线程在任务 Actuator 中变得空闲时,通道适配器将转移到活动的mq以检索另一个消息),因为它不会't be any messages held in memory on the channel. This doesn'似乎工作,以下代码抛出 TaskRejectException

<int:channel id="myChannel">
    <int:rendezvous-queue />
</int:channel>
<task:executor id="taskExecutor" pool-size="2" queue-capacity="0" />
<int:router input-channel="myChannel" expression="payload.getType() + 'Channel'">
    <int:poller fixed-rate="1000" task-executor="taskExecutor" />
</int:router>

位于路由器之后的服务激活器,同步处理,需要10秒才能处理,所以我希望在t = 0s时消失并检索2条消息(线程池的大小),处理它们,将线程释放到线程池在t = 10s,然后再次从活动mq请求消息 . 然而,似乎在t = 0时检索到超过2条消息 .

任何人都可以建议我应该做什么?

1 回答

  • 4

    为了使交易有效,您必须使用直接渠道 .

    您可以使用 message-driven-channel-adapterconcurrent-consumersmax-concurrent-consumers )上的并发属性来控制并发线程的数量 .

相关问题