首页 文章

Spring JMS ActiveMQ - 无法让多个消费者同时处理

提问于
浏览
3

我是使用JMS / ActiveMQ的新手,我有一个Spring / Hibernate应用程序,它从ActiveMQ中的Queue中获取消息并处理这些消息以实现持久性 . 由于消息需要一段时间来处理和持久化,因此我将DefaultMessageListenerContainer配置为具有多个使用者(例如,5-10),因此可以同时处理多个消息 . 我已经查看了很多ActiveMQ和Spring API文档,我认为我需要做的就是将maxConcurrentConsumers设置为10 set concurrentConsumers为5或者在DefaultMessageListenerContainer上将并发设置为5-10 . 一旦我这样做,我可以从ActiveMQ的内置控制台看到我的队列确实有5个消费者 . 但是当我在队列中丢弃10或100条消息时,处理似乎是单线程的,我添加了一个日志行来打印线程ID,它似乎是顺序处理所有请求的相同线程ID . 从控制台上的ActiveMQ队列页面,我单击浏览活动消费者链接以查看正在发生的事情,看起来一个消费者有100个待处理的消息而其他4个消息都没有 .

我做了一些研究,从Spring(http://forum.springsource.org/showthread.php?61170-Messages-missed-using-DefaultMessageListenerContainer)发现了这篇文章,并添加了一个值为2的预取策略,认为每个消费者都在签署1000条消息 . 现在,当我发送另一批消息时,一个消费者将有2-3个消息未决,但其他4个消费者仍然处于空闲状态,并且所有消费者最终都会顺序处理所有消息 . 在这一点上,我想也许它可以做任何事情 . 希望有人可以指出我做错了什么,除了尝试一个设置(constantPendingMessageLimitStrategy)之外,我总是触及activemq.xml . 我正在使用ActiveMQ 5.8 .

<bean id="importRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
    <property name="initialRedeliveryDelay" value="15000" />
    <property name="maximumRedeliveries" value="-1" />
    <property name="useExponentialBackOff" value="true" />
    <property name="backOffMultiplier" value="2" />
</bean>

<bean id="importPrefetchPolicy" class="org.apache.activemq.ActiveMQPrefetchPolicy">
    <property name="all" value="2"></property>
</bean>

<bean id="importConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="${import.queue.url}"/>
    <property name="redeliveryPolicy" ref="importRedeliveryPolicy" />
    <property name="prefetchPolicy" ref="importPrefetchPolicy"></property>
</bean>

<bean id="importQueue" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg value="${import.queue.name}" />
</bean>

<bean id="importListener" class="com.mycompany.ImportQueueListener" >
    <property name="importService" ref="importService"></property>
    <property name="sessionFactory" ref="sessionFactory"/>
</bean>

<bean id="importJmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
   <property name="connectionFactory" ref="importConnectionFactory" />
    <property name="destination" ref="importQueue" />
    <property name="messageListener" ref="importListener" />
    <property name="sessionTransacted" value="true" />
    <property name="maxConcurrentConsumers" value="10"></property>
    <property name="concurrentConsumers" value="5"></property>
</bean>

2 回答

  • 0

    您应该尝试用池替换 ActiveMQConnectionFactory

    <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy method="stop">
    <property name="connectionFactory">
      <bean class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL">
          <value>tcp://localhost:61616</value>
          </property>
       </bean>
      </property>
    </bean>
    <bean id="myJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
       <property name="connectionFactory">
        <ref local="jmsFactory"/>
      </property>
    </bean>
    
  • 0
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:p="http://www.springframework.org/schema/p"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jms="http://www.springframework.org/schema/jms"
    xmlns:tx="http://www.springframework.org/schema/tx"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/tx
    http://www.springframework.org/schema/tx/spring-tx.xsd
    http://www.springframework.org/schema/context
    http://www.springframework.org/schema/context/spring-context.xsd
    http://www.springframework.org/schema/jms
    http://www.springframework.org/schema/jms/spring-jms.xsd">
    
    
    
    
     <!--  <bean class="org.apache.activemq.command.ActiveMQQueue" id="destination">  
         <constructor-arg value="TEST.Q1"></constructor-arg>  
      </bean>-->
    
      <bean id="destination" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="TOPIC_NAME" />
      </bean>
    
    
    
     <bean class="org.springframework.jms.core.JmsTemplate" id="producerTemplate">
      <property name="connectionFactory" ref="connectionFactory"/>
       <property name="defaultDestination" ref="destination"/>
     </bean>  
    
     <!--ActiveMq broker URL configured here-->
       <bean class="org.apache.activemq.ActiveMQConnectionFactory" id="connectionFactory" >
            <property name="brokerURL">
              <value>tcp://localhost:61616</value>
            </property>
       </bean>  
    
        <!--producer configured here-->
    
       <bean class="Producer" id="simpleMessageProducer">  
           <property name="jmsTemplate" ref="producerTemplate"></property>  
       </bean> 
    
    
        <!--listeners configured here-->
    
         <bean class="Consumer" id="simpleMessageListener">  
    
          </bean>  
         <bean class="ConsumerSecond" id="simpleMessageListenerSecond">   </bean> 
    
    
    
         <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer" id="jmsContainer"> 
         <property name="connectionFactory" ref="connectionFactory"></property>  
         <property name="destination" ref="destination"></property>  
         <property name="messageListener" ref="simpleMessageListener"></property>  
    
    
        </bean> 
         <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer" id="jmsContainer1"> 
         <property name="connectionFactory" ref="connectionFactory"></property>  
         <property name="destination" ref="destination"></property>
         <property name="messageListener" ref="simpleMessageListenerSecond"></property>
    
        </bean>  
    
    
    
    
    
    </beans>
    

相关问题