首页 文章

spring boot activemq使用者连接池

提问于
浏览
1

Spring Boot需要配置ActiveMQ消费者连接池吗?我在Spring启动应用程序中只有一个消费者(作为微服务), 生产环境 者在另一个应用程序中 . 我对以下内容感到困惑:(摘自http://activemq.apache.org/spring-support.html

注意:虽然PooledConnectionFactory允许创建活动消费者的集合,但它并不是消费者 . 池化对于连接,会话和 生产环境 者来说是有意义的,这些可以是很少使用的资源,创建起来很昂贵并且可以以最小的成本保持闲置 . 另一方面,消费者通常只是在启动时创建并离开,处理来的消息 . 当消费者完成时,它会被预取缓冲区,在消费者再次活动之前它们会被阻止 .

在同一页面上,我可以看到:
您可以使用activemq-pool org.apache.activemq.pool.PooledConnectionFactory为您的消费者集合有效地汇集连接和会话,或者您可以使用Spring JMS org.springframework.jms.connection.CachingConnectionFactory来实现相同的目标 . 影响

我尝试了CachingConnectionFactory(可以采用ActiveMQConnectionFactory),它只有很少的setter来保存cacheConsumers(boolean),cacheProducers(boolean),没有任何相关的池连接 . 我知道1个连接可以为您提供多个会话,然后每个会话您有多个消费者/ 生产环境 者 . 但我的问题是消费者我们如何汇总,因为上述声明说它是默认的 . 所以我只用一种方法做到了这一点:
@Bean public JmsListenerContainerFactory myFactory(ConnectionFactory connectionFactory,DefaultJmsListenerContainerFactoryConfigurer configurer){

DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    // This provides all boot's default to this factory, including the message converter
    factory.setConcurrency("3-10");
    configurer.configure(factory, connectionFactory);
    // You could still override some of Boot's default if necessary.
    return factory;
}</em><br>

Dynamic scaling此链接也暗示了这一点,但我找不到具体的解决方案 .

有人遇到过这种情况,请给出你的建议 . 感谢阅读这篇文章和任何帮助非常感谢 . 生产环境 的其他细节:

  • 此消费者每秒将收到约500条消息 .

  • 使用Spring Boot版本1.5.8.RELEASE,

  • ActiveMQ 5.5是我的JMS

1 回答

  • 0

    在activemq中有一个名为org.apache.activemq.jms.pool的包,它提供了PooledConsumer . 下面是代码 . 请检查并查看它是否适合您 . 我知道它不是 spring 方式,但您可以轻松地设置自定义轮询方法 .

    PooledConnectionFactory pooledConFactory = null;
        PooledConnection pooledConnection = null;
        PooledSession pooledSession = null;
        PooledMessageConsumer pooledConsumer = null;
        Message message = null;
        try
        {
            // Get the connection object from PooledConnectionFactory
            pooledConFactory = ( PooledConnectionFactory ) this.jmsTemplateMap.getConnectionFactory();
            pooledConnection = ( PooledConnection ) pooledConFactory.createConnection();
            pooledConnection.start();
    
            // Create the PooledSession from pooledConnection object
            pooledSession = ( PooledSession ) pooledConnection.createSession( false, 1 );
    
            // Create the PooledMessageConsumer from session with given ack mode and destination
            pooledConsumer = ( PooledMessageConsumer ) pooledSession.
                    createConsumer( this.jmsTemplateMap.getDefaultDestination(), <messageFilter if any>);
    
            while ( true )
            {
                message = pooledConsumer.receiveNoWait();
                if ( message != null) 
                    break;
            }
    
        }
        catch ( JMSException ex )
        {
            LOGGER.error("JMS Exception occured, closing the session", ex );
        }
        return message;
    

相关问题