首页 文章

为什么spring-amqp的消费者表现很慢?

提问于
浏览
1

我同时启动了 生产环境 者和消费者 . 6小时后, 生产环境 者将大约6亿条消息发送到队列中并在6小时后停止 生产环境 者,但消费者正在连续运行,即使在运行18小时后仍然有4条消息正在排队 . 有谁可以让我知道为什么消费者表现很慢?

提前致谢!

@Bean
    public SimpleMessageListenerContainer listenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory());
        container.setQueueNames(this.queueName);
        container.setMessageListener(new MessageListenerAdapter(new TestMessageHandler(), new JsonMessageConverter()));
        return container;
    }
@Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(
                "localhost");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        return connectionFactory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        template.setMessageConverter(new JsonMessageConverter());
        template.setRoutingKey(this.queueName);
        template.setQueue(this.queueName);
        return template;
    }

    public class TestMessageHandler  {
           // receive messages
        public void handleMessage(MessageBeanTest msgBean) {
                   //  Storing bean data into CSV file
             }
    }

2 回答

  • 2

    根据WikiPedia,crore == 10,000,000所以你的意思是6000万 .

    容器只能像监听器一样快地处理消息 - 您需要分析您对每条消息的处理方式 .

    您还需要尝试容器并发设置(concurrentConsumers),预取等,以获得最佳性能,但它仍然最终成为占用大部分处理时间的侦听器;容器上面有很多垃圾 . 如果您的侦听器构造不正确,则增加并发性将无济于事 .

    如果您正在使用交易,那将大大减缓消费 .

    尝试使用对消息不起作用的侦听器 .

    最后,在提出这样的问题时,您应该始终显示配置 .

  • 1

    根据Gary的建议,您可以按如下方式设置它们 . 看看@RabbitListener

    @Bean
    public SimpleRabbitListenerContainerFactory listenerContainer(     {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(baseConfig.connectionFactory());
        factory.setConcurrentConsumers(7); // choose a value
        factory.setPrefetchCount(1); // how many messages per consumer at a time
        factory.setMaxConcurrentConsumers(10); // choose a value
        factory.setDefaultRequeueRejected(false); // if you want to deadletter
        return factory;
    }
    

相关问题