首页 文章

Spring-amqp:使用匿名独占队列时的异常

提问于
浏览
0

我有一系列tomcat / spring应用程序,需要使用几个不同的扇出交换,从几个不同服务器的单个实例向多个客户端发送消息 . 每个服务器实例都有自己的一组交换 . 交换机由服务器声明,并由每个客户端生成并绑定到这些交换机的匿名队列 . 我正在使用匿名队列,因为我事先并不知道会有多少消费者,当然我不能将多个消费者绑定到一个命名队列而不会破坏扇出交换的性质 . 在客户端,我在日志中遇到了大量异常,涉及尝试重新声明或删除这些匿名队列 .

绘制更清晰的图片:想象一下,我有2个服务,服务A和服务B,每个服务都发布到一对扇出交换 - 例如ExchangeA1,ExchangeA2用于服务A,ExchangeB1,ExchangeB2用于服务B.现在想象一下已经有一个客户端 Build 了绑定到这4个交换机中每个交换机的随机命名队列,因此它可以监控来自这两个服务的消息 .

消息从服务器端成功发送到交换机没有问题,所以我将不使用服务器端配置,但如果它变得相关,我可以提供 . 这是客户端spring-config:

<rabbit:queue id="ClientQ1" />
<rabbit:queue id="ClientQ2" />

<!-- declare the exchanges so we can bind to them -->
<rabbit:fanout-exchange id="ExchangeA1.id" name="ExchangeA1">
    <rabbit:bindings>
        <rabbit:binding queue="ClientQ1"/>
    </rabbit:bindings>
</rabbit:fanout-exchange>

<rabbit:fanout-exchange id="ExchangeA2.id" name="ExchangeA2">
    <rabbit:bindings>
        <rabbit:binding queue="ClientQ2"/>
    </rabbit:bindings>
</rabbit:fanout-exchange>

<rabbit:connection-factory id="rabbitConnectionFactory"  <!-- this is a simple singleton bean that establishes an SSL connection to rabbitmq -->
                        connection-factory="rabbitMQConnectionFactoryBean"
                        publisher-confirms="true"
                        channel-cache-size="5" />

<rabbit:admin id="myRabbitAdmin" connection-factory="rabbitConnectionFactory"/>

<rabbit:listener-container id="rabbitListenerContainer"
                           acknowledge="auto"
                           prefetch="1000"
                           connection-factory="rabbitConnectionFactory"
                           message-converter="EventMessageConverterBean" >

    <rabbit:listener ref="Q1ListenerBean"
                     admin="myRabbitAdmin"
                     method="processEvent"
                     queue-names="#{ClientQ1.name}" />

    <rabbit:listener ref="Q2ListenerBean"
                     admin="myRabbitAdmin"
                     method="processEvent"
                     queue-names="#{ClientQ2.name}" />

</rabbit:listener-container>

现在,假设在此客户端的某些其他依赖项中,相同的配置基本上重复绑定到ExchangeB(1,2) .

在启动时,我看到了这种模式之后的大量异常:

ERROR - SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(1158) | Consumer received fatal exception on startup
org.springframework.amqp.rabbit.listener.QueuesNotAvailableException: Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it.
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:481)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1083)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[e4d2b49c-140f-4893-a764-4c84f945d482]
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:554)
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:453)
... 2 more
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:801)
at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:61)
at sun.reflect.GeneratedMethodAccessor56.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:666)
at com.sun.proxy.$Proxy114.queueDeclarePassive(Unknown Source)
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:533)
... 3 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'e4d2b49c-140f-4893-a764-4c84f945d482' in vhost '/', class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 11 more

我的调试引导我进入SimpleMessageListenerContainer(:954)中的redeclareElementsIfNecessary()方法,该方法尝试重新声明所有队列/ exhcnages / bindings(如果有的话) . 这失败是因为rabbitmq不允许重新排列独占队列 . (或者我认为)

搜索类似的问题导致我:Spring AMQP v1.4.2 - Rabbit reconnection issue on network failure,这是切向相关的,但解决方案不适用于我的问题 . 由于匿名队列本质上是排他性的,我必须使用匿名队列,我似乎陷入了摇滚和硬地之间 . 我的问题是这些:

  • 我是否在用我的听众配置做错了什么?我能以某种方式改变我的配置,我会错过这会缓解这个问题吗?

  • 为什么要调用redeclareElementsIfNecessary?在我看来,在第一次声明之后,所有队列都应该存在 .

  • 这些错误真的是无害的吗?如果是这样,我怎样才能拦截它们以避免出现在我的日志中的所有异常?

任何信息,将不胜感激 .

spring-amqp 1.4.6

rabbitmq 3.6.10

Update 1 与兔子相关的一些相关的spring-debug日志输出:admin:

17:22:14,783 DEBUG RabbitAdmin:382 - Initializing declarations
17:22:14,784 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'serviceAExchange1'
17:22:14,784 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'serviceAExchange2'
17:22:14,784 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'serviceBExchange1'
17:22:14,784 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'serviceBExchange2'
17:22:14,784 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'org.springframework.amqp.core.TopicExchange#0'
17:22:14,784 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'org.springframework.amqp.core.TopicExchange#1'
17:22:14,784 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'org.springframework.amqp.core.TopicExchange#2'
17:22:14,784 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'ServiceAQueue1'
17:22:14,784 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'ServiceAQueue2'
17:22:14,784 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'ServiceBQueue1'
17:22:14,784 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'ServiceBQueue2'
17:22:14,784 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'consumer.index.amqp.consumerQueue'
17:22:14,785 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'org.springframework.amqp.rabbit.config.BindingFactoryBean#0'
17:22:14,785 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'org.springframework.amqp.rabbit.config.BindingFactoryBean#1'
17:22:14,786 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'org.springframework.amqp.rabbit.config.BindingFactoryBean#2'
17:22:14,786 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'org.springframework.amqp.rabbit.config.BindingFactoryBean#3'
17:22:14,786 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'org.springframework.amqp.rabbit.config.BindingFactoryBean#4'
17:22:14,786  INFO RabbitAdmin:399 - Auto-declaring a non-durable, auto-delete, or exclusive Queue (07475112-7be0-4c8d-b6e5-3279e81a1aff) durable:false, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
17:22:14,786  INFO RabbitAdmin:399 - Auto-declaring a non-durable, auto-delete, or exclusive Queue (e4825008-954f-4154-bd61-9b67e0ff582e) durable:false, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
17:22:14,786  INFO RabbitAdmin:399 - Auto-declaring a non-durable, auto-delete, or exclusive Queue (001bd357-6203-49a6-b573-7f1a998ff750) durable:false, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
17:22:14,786  INFO RabbitAdmin:399 - Auto-declaring a non-durable, auto-delete, or exclusive Queue (3880a65f-0104-4c99-96ac-1958ace7e2e0) durable:false, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
17:22:14,786  INFO RabbitAdmin:399 - Auto-declaring a non-durable, auto-delete, or exclusive Queue (3b1bc32a-872e-4fc3-ba2a-de200bc7b758) durable:false, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
17:22:14,791  INFO CachingConnectionFactory:213 - Created new connection: SimpleConnection@3158bf2b [delegate=amqp://admin@172.30.12.59:5671/]
17:22:14,791 DEBUG RabbitAdmin:382 - Initializing declarations
17:22:14,791 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'serviceAExchange1'
17:22:14,791 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'serviceAExchange2'
17:22:14,791 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'serviceBExchange1'
17:22:14,791 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'serviceBExchange2'
17:22:14,791 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'org.springframework.amqp.core.TopicExchange#0'
17:22:14,792 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'org.springframework.amqp.core.TopicExchange#1'
17:22:14,792 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'org.springframework.amqp.core.TopicExchange#2'
17:22:14,792 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'ServiceAQueue1'
17:22:14,792 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'ServiceAQueue2'
17:22:14,792 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'ServiceBQueue1'
17:22:14,792 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'ServiceBQueue2'
17:22:14,792 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'consumer.index.amqp.consumerQueue'
17:22:14,792 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'org.springframework.amqp.rabbit.config.BindingFactoryBean#0'
17:22:14,792 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'org.springframework.amqp.rabbit.config.BindingFactoryBean#1'
17:22:14,792 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'org.springframework.amqp.rabbit.config.BindingFactoryBean#2'
17:22:14,792 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'org.springframework.amqp.rabbit.config.BindingFactoryBean#3'
17:22:14,792 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'org.springframework.amqp.rabbit.config.BindingFactoryBean#4'
17:22:14,792  INFO RabbitAdmin:399 - Auto-declaring a non-durable, auto-delete, or exclusive Queue (07475112-7be0-4c8d-b6e5-3279e81a1aff) durable:false, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
17:22:14,792  INFO RabbitAdmin:399 - Auto-declaring a non-durable, auto-delete, or exclusive Queue (e4825008-954f-4154-bd61-9b67e0ff582e) durable:false, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
17:22:14,792  INFO RabbitAdmin:399 - Auto-declaring a non-durable, auto-delete, or exclusive Queue (001bd357-6203-49a6-b573-7f1a998ff750) durable:false, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
17:22:14,792  INFO RabbitAdmin:399 - Auto-declaring a non-durable, auto-delete, or exclusive Queue (3880a65f-0104-4c99-96ac-1958ace7e2e0) durable:false, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
17:22:14,792  INFO RabbitAdmin:399 - Auto-declaring a non-durable, auto-delete, or exclusive Queue (3b1bc32a-872e-4fc3-ba2a-de200bc7b758) durable:false, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
17:22:14,811 DEBUG CachingConnectionFactory:395 - Creating cached Rabbit Channel from PublisherCallbackChannelImpl: AMQChannel(amqp://admin@172.30.12.59:5671/,1)
17:22:14,811 DEBUG CachingConnectionFactory:395 - Creating cached Rabbit Channel from PublisherCallbackChannelImpl: AMQChannel(amqp://admin@172.30.12.59:5671/,1)
17:22:14,821 DEBUG RabbitTemplate:1045 - Executing callback on RabbitMQ Channel: Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://admin@172.30.12.59:5671/,1)
17:22:14,821 DEBUG RabbitAdmin:444 - declaring Exchange 'serviceAExchange1'
17:22:14,822 DEBUG RabbitTemplate:1045 - Executing callback on RabbitMQ Channel: Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://admin@172.30.12.59:5671/,1)
17:22:14,825 DEBUG RabbitAdmin:444 - declaring Exchange 'serviceAExchange1'
17:22:14,827 DEBUG RabbitAdmin:444 - declaring Exchange 'serviceAExchange2'
17:22:14,828 DEBUG RabbitAdmin:444 - declaring Exchange 'serviceAExchange2'
17:22:14,829 DEBUG RabbitAdmin:444 - declaring Exchange 'serviceBExchange1'
17:22:14,830 DEBUG RabbitAdmin:444 - declaring Exchange 'serviceBExchange1'
17:22:14,832 DEBUG RabbitAdmin:444 - declaring Exchange 'serviceBExchange2'
17:22:14,835 DEBUG RabbitAdmin:444 - declaring Exchange 'serviceBExchange2'
17:22:14,836 DEBUG RabbitAdmin:472 - declaring Queue '07475112-7be0-4c8d-b6e5-3279e81a1aff'
17:22:14,837 DEBUG RabbitAdmin:472 - declaring Queue '07475112-7be0-4c8d-b6e5-3279e81a1aff'
17:22:14,843 DEBUG RabbitAdmin:472 - declaring Queue 'e4825008-954f-4154-bd61-9b67e0ff582e'
17:22:14,845 DEBUG RabbitAdmin:472 - declaring Queue '001bd357-6203-49a6-b573-7f1a998ff750'
17:22:14,848 DEBUG RabbitAdmin:472 - declaring Queue '3880a65f-0104-4c99-96ac-1958ace7e2e0'
17:22:14,853 DEBUG RabbitAdmin:472 - declaring Queue '3b1bc32a-872e-4fc3-ba2a-de200bc7b758'
17:22:14,856 DEBUG PublisherCallbackChannelImpl:654 - PendingConfirms cleared
17:22:14,856 DEBUG RabbitAdmin:511 - Binding destination [07475112-7be0-4c8d-b6e5-3279e81a1aff (QUEUE)] to exchange [serviceAExchange1] with routing key []
17:22:14,858 ERROR CachingConnectionFactory:292 - Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue '07475112-7be0-4c8d-b6e5-3279e81a1aff' in vhost '/', class-id=50, method-id=10)
17:22:14,859 DEBUG CachingConnectionFactory:673 - Detected closed channel on exception.  Re-initializing: PublisherCallbackChannelImpl: AMQChannel(amqp://admin@172.30.12.59:5671/,1)
17:22:14,860 DEBUG RabbitAdmin:511 - Binding destination [e4825008-954f-4154-bd61-9b67e0ff582e (QUEUE)] to exchange [serviceAExchange2] with routing key []
17:22:14,862 DEBUG RabbitAdmin:511 - Binding destination [001bd357-6203-49a6-b573-7f1a998ff750 (QUEUE)] to exchange [serviceBExchange1] with routing key []
17:22:14,864  WARN RabbitAdmin:494 - Failed to declare queue:Queue [name=07475112-7be0-4c8d-b6e5-3279e81a1aff, durable=false, autoDelete=true, exclusive=true, arguments=null], continuing...
java.io.IOException
        at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
        [...]
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue '07475112-7be0-4c8d-b6e5-3279e81a1aff' in vhost '/', class-id=50, method-id=10)
[ More declaration errors follow ]

1 回答

  • 3

    1.4.6已近2岁;你看到当前版本(1.7.4)的相同行为吗?

    例外情况 reply-code=405, reply-text=RESOURCE_LOCKED 表示队列中有两个使用者 . Tomcat(或任何Web容器)上的Spring Apps的一个常见错误是加载应用程序上下文两次 - 一次在根应用程序上下文中,再次在调度程序servlet的上下文中 .

    打开 org.springframework 的DEBUG日志记录并查看所有bean声明应该会有所帮助 .

    我刚刚将你的配置加载到一个简单的app(main)中,即使在通过管理UI强制关闭连接之后我也没有看到任何问题...

    Auto-declaring a non-durable, auto-delete, or exclusive Queue (47c551b8-c290-4ff0-ae42-11f24d576399) durable:false, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
    Auto-declaring a non-durable, auto-delete, or exclusive Queue (c494990e-9069-4561-8d64-1eb0373cf681) durable:false, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
    Started So46496960Application in 1.454 seconds (JVM running for 1.879)
    #method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - Closed via management plugin, class-id=0, method-id=0)
    #method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - Closed via management plugin, class-id=0, method-id=0)
    SimpleConnection@5fa6541 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 58388], acknowledgeMode=AUTO local queue size=0
    SimpleConnection@5fa6541 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 58388], acknowledgeMode=AUTO local queue size=0
    rabbitConnectionFactory#4015e40b:1/SimpleConnection@712becf3 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 58404]
    Auto-declaring a non-durable, auto-delete, or exclusive Queue (47c551b8-c290-4ff0-ae42-11f24d576399) durable:false, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
    Auto-declaring a non-durable, auto-delete, or exclusive Queue (c494990e-9069-4561-8d64-1eb0373cf681) durable:false, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
    

相关问题