首页 文章

Spring Amqp Consumer在运行一段时间后暂停

提问于
浏览
1

我们有一个带有Ha-all策略的2节点RabbitMQ集群 . 我们在应用程序中使用Spring AMQP与RabbitMQ交谈 . 制作人部分工作正常,但消费者工作了一段时间并暂停 . 生产环境 者和消费者作为不同的应用程序运行 . 有关消费者部分的更多信息

  • 我们使用 SimpleMessageListenerContainerChannelAwareMessageListener ,使用手动 ack 模式并默认 prefetch(1)

  • 在我们的应用程序中,我们创建队列(按需)并将其添加到侦听器

  • 当我们开始使用10 ConcurrentConsumers 和20 MaxConcurrentConsumers 时,消耗大约需要15个小时并暂停 . 当我们将 MaxConcurrentConsumers 增加到75时,这种情况会在1小时内发生 .

在RabbitMQ UI上,当出现这种情况时,我们会在 Channels 选项卡上看到包含3/4 un ack ed消息的 Channels ,直到那时它只有1个un ack ed消息 .

我们的线程转储类似于this . 但心跳设定为60并没有帮助改善这种情况 .

大多数线程转储都有以下消息 . 如果需要,我将附加整个线程转储 . 如果我错过任何可能导致消费者暂停的设置,请告诉我?

"pool-6-thread-16" #86 prio=5 os_prio=0 tid=0x00007f4db09cb000 nid=0x3b33 waiting on condition [0x00007f4ebebec000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x00000007b9930b68> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:350)
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$InternalConsumer.handleDelivery(BlockingQueueConsumer.java:660)
    at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:144)
    at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:99)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

更多信息我们动态地向SimpleMessageListenerContainer添加和删除队列,我们怀疑这会导致一些问题,因为每次我们从侦听器添加或删除队列时,所有BlockingQueueConsumer都会被删除并再次创建 . 你认为这是否会导致这个问题?

2 回答

  • 0

    AMQP-621现已合并为主人;我们将在接下来的几天内发布1.6.1.RELEASE .

  • 0

    您的问题是目标侦听器中的下游 .

    看, prefetch(1) 原因:

    this.queue = new LinkedBlockingQueue<Delivery>(prefetchCount);
    

    如果我们不在这个队列中轮询我们这里有什么?

    BlockingQueueConsumer.this.queue.put(new Delivery(consumerTag, envelope, properties, body));
    

    对,停在锁上 .

相关问题