首页 文章

如何使Spring JMSListener突发到最大并发线程?

提问于
浏览
7

我有一个使用ActiveMQ版本5.10的Spring JMS应用程序 . 我正在执行简单的并发测试 . 我使用Spring Boot,当前版本和注释来配置JMSListener和消息生成器 .

消息生成器只是尽可能快地在队列上抛出消息 . 消息侦听器将消息从队列中拉出,但在获取消息后休眠1秒钟 - 模拟消息侦听器在获取消息后需要执行的一些工作 .

我将JMSListener设置为100-1000个并发线程 . 如果我同时启动消息 生产环境 者和消费者(两者都在他们自己的JVM中运行),则消费者永远不会超过最小配置的线程,即使最大范围设置为1000 .

如果我让 生产环境 者首先开始并在队列上放置几千条消息,然后启动一个或多个消费者实例,它将稳定地提升线程,从100开始,然后每秒20个左右的线程,直到达到状态队列中有大约20-30条消息在飞行中 . 它永远不会捕获生成器 - 即使消费者没有接近其maxConcurrency计数,也总会有一些消息在队列中 .

为什么消息使用者没有突然进入一堆额外的线程来清空队列而不是让队列中有20-30条消息呢?消费者是否有办法继续快速添加线程以便赶上队列中的消息?

以下是代码的相关部分 .

Message Producer

@Component
public class ClientServiceImpl implements ClientService {

    private static final String QUEUE="message.test.queue";

    @Autowired
    private JmsTemplate jmsTemplate;

    @Override
    public void submitMessage(ImportantMessage importantMessage) {

        System.out.println("*** Sending " + importantMessage);
        jmsTemplate.convertAndSend(QUEUE, importantMessage);

    }
}

Message Consumer

@SpringBootApplication
@EnableJms
public class AmqConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(AmqConsumerApplication.class, args);
    }
    @Value("${JMSHost}")
    private String JMS_BROKER_URL;

    @Autowired
    static Command command;

    @Bean
    public ConnectionFactory connectionFactory() {
        ConnectionFactory factory= new ActiveMQConnectionFactory(JMS_BROKER_URL);
        ((ActiveMQConnectionFactory)factory).setTrustAllPackages(true);
        ((ActiveMQConnectionFactory)factory).setOptimizeAcknowledge(true);
        ((ActiveMQConnectionFactory)factory).setAlwaysSessionAsync(false);
        return factory;
    }

}

监听器配置为这样......

@Component
public class TransformationListener {

    private static final String QUEUE="message.test.queue?consumer.prefetchSize=10";

    @JmsListener(destination=QUEUE, concurrency = "100-1000")
    public void handleRequest(ImportantMessage importantMessage) {
        System.out.println("*** Recieved message: " + importantMessage + " on thread" + Thread.currentThread().getId());
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }
}

1 回答

  • 0

    你还在面对这种行为吗?您是否在http://activemq.apache.org/what-is-the-prefetch-limit-for.html上阅读了此建议"Pooled Consumers and prefetch"您是否尝试过prefetchSize = 0或1?我认为1可以解决您的问题 . 如果prefetchSize> 1,则可能需要将AbortSlowAckConsumerStrategy降低到低于默认值30s . 要在您的情况下有超过100个线程消耗消息,您需要超过1000条未消耗且未在队列中预取的消息,因为prefetchSize为10 .

相关问题