首页 文章

Spring AMQP AcknowledgeMode.AUTO运行缓慢

提问于
浏览
2

我有一个 生产环境 者每秒向RabbitMQ发送20条消息,而且我有一个消费者,它应该以与生成它们相同的速度接收消息 .

我必须实施一些条件:

  • 每秒生成和消耗20条消息 .

  • 保存 生产环境 订单 .

  • 消息不应该't be lost (that'为什么我使用AcknowledgeMode.AUTO) .

当我使用Spring AMQP实现(org.springframework.amqp.rabbit)时,我的消费者每秒处理最多6条消息 . 但是,如果我使用本机AMQP库(com.rabbitmq.client),它每秒都会执行所有20条消息,包括ack - auto和manual .

The question is:

为什么消费者案例中的Spring实现工作如此缓慢,我该如何解决这个问题呢?

如果我设置prefetchCount(20)它可以根据需要工作,但我不能使用预取,因为它可以在拒绝情况下破坏顺序 .

Spring 天amqp:

@Bean
public ConnectionFactory connectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory(rabbitMqServer);
    connectionFactory.setUsername(rabbitMqUsername);
    connectionFactory.setPassword(rabbitMqPassword);
    return connectionFactory;
}

...

private SimpleMessageListenerContainer createContainer(Queue queue, Receiver receiver, AcknowledgeMode acknowledgeMode) {
    SimpleMessageListenerContainer persistentListenerContainer = new SimpleMessageListenerContainer();
    persistentListenerContainer.setConnectionFactory(connectionFactory());
    persistentListenerContainer.setQueues(queue);
    persistentListenerContainer.setMessageListener(receiver);
    persistentListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
    return persistentListenerContainer;
}

...

@Override
public void onMessage(Message message) {saveToDb}

1 回答

  • 1

    Spring AMQP(2.0之前)默认预取为1,正如您所说,即使在拒绝之后也可以保证顺序 .

    本机客户端默认不应用 basicQos() ,这实际上意味着它具有无限预取 .

    所以你不是在比较苹果和苹果 .

    尝试使用本机客户端 channel.basicQos(1) ,您应该看到与默认的spring amqp设置类似的结果 .

    EDIT

    当比较苹果和苹果时,无论是否有框架,我都会得到类似的结果......

    @SpringBootApplication
    public class So47995535Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So47995535Application.class, args).close();
        }
    
        private final CountDownLatch latch = new CountDownLatch(100);
    
        private int nativeCount;
    
        private int rlCount;
    
        @Bean
        public ApplicationRunner runner(ConnectionFactory factory, RabbitTemplate template,
                SimpleMessageListenerContainer container) {
            return args -> {
                for (int i = 0; i < 100; i++) {
                    template.convertAndSend("foo", "foo" + i);
                }
                container.start();
                Connection conn = factory.createConnection();
                Channel channel = conn.createChannel(false);
                channel.basicQos(1);
                channel.basicConsume("foo", new DefaultConsumer(channel) {
    
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                            byte[] body) throws IOException {
                        System.out.println("native " + new String(body));
                        channel.basicAck(envelope.getDeliveryTag(), false);
                        nativeCount++;
                        latch.countDown();
                    }
    
                });
                latch.await(60, TimeUnit.SECONDS);
                System.out.println("Native: " + this.nativeCount + " LC: " + this.rlCount);
                channel.close();
                conn.close();
                container.stop();
            };
        }
    
        @Bean
        public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
            container.setQueueNames("foo");
            container.setPrefetchCount(1);
            container.setAutoStartup(false);
            container.setMessageListener((MessageListener) m -> {
                System.out.println("LC " + new String(m.getBody()));
                this.rlCount++;
                this.latch.countDown();
            });
            return container;
        }
    
    }
    

    Native: 50 LC: 50
    

相关问题