首页 文章

如果经纪人崩溃,spring amqp会停止消费者线程

提问于
浏览
1

我试图在代理失败的情况下关闭整个amqp设置而不关闭上下文/应用程序 . 我从一个监听器中尝试了以下内容,我在其中捕获与代理失败的连接错误相关的事件,

connectionFactory.clearConnectionListeners();
 connectionFactory.stop();
 connectionFactory.destroy();
 myContainer.stop();  
 myContainer.shutdown();

但是,我一直在重新尝试连接,

org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused: connect
at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:62)
at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:309)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createConnection(CachingConnectionFactory.java:547)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1387)
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1368)
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1344)
at org.springframework.amqp.rabbit.core.RabbitAdmin.getQueueProperties(RabbitAdmin.java:335)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.redeclareElementsIfNecessary(SimpleMessageListenerContainer.java:1102)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:95)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1278)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.ConnectException: Connection refused: connect
at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method)

看看CachingConnectionFactory,我看到了,

/**
 * Stop the connection factory to prevent its connection from being used.
 * Note: ignored unless the application context is in the process of being stopped.
 */
@Override
public void stop() {
    if (this.contextStopped) {
        this.running = false;
        this.stopped = true;
        this.deferredCloseExecutor.shutdownNow();
    }
    else {
        logger.warn("stop() is ignored unless the application context is being stopped");
    }
}

当我尝试停止connectionFactory时,我在日志中看到的是

WARN --- [onsumerThread_8] o.s.a.r.c.CachingConnectionFactory : stop() is ignored unless the application context is being stopped

这是什么意思,我必须关闭应用程序?如果没有从消费者线程到代理的无休止连接尝试,应用程序是否无法继续运行?

谢谢

EDIT

好的,EventListener按预期工作,我可以关闭listenerContainers,应用程序继续正常工作 . 但这种方法在启动时咬我 . 这在应用程序运行时工作正常,然后我关闭了代理 . 但是应用程序无法启动,因为在接收失败的启动时,EventListener关闭容器并且应用程序上下文拒绝加载 . 这有意义吗?

如何实现这两个目标 - 1.在启动时,2 . 在运行期间,使应用程序能够承受经纪人的损失 .

EDIT 捕获失败事件的方法有助于在运行时绕过代理失败,如@garyrussel和@artembilan所建议的那样 . 只是关闭MessageListenerContainer似乎停止了消费者重新连接尝试 . 事件监听器使我能够更新全局变量,以将代理的故障传达给系统中的其他模块 . 这很重要,因为只使用rabbitTemplate的制作人仍然会失败 . 使用这个全局布尔变量,我可以有条件地进行发送 . 它还为我们提供了将来添加REST endpoints 以重新启动ListenerContainers的选项,只需在bean上调用 start 即可 . 我还没有解决RabbitMQ的启动时失败问题 . 似乎有一个 @Lazy 加载ListenerContainer和ConnectionFactory可能是要走的路 . 但还没试过 . 非常感谢你的支持 .

1 回答

  • 0

    说实话,你的担忧是绝对不清楚的 . 当Rabbit Broker回来时,做 clearConnectionListenersdestroyshutdown 将使您的组件在将来变得无用且无用 .

    您需要的只是 stop() 所有Listener容器,因为只有他们主动尝试连接到代理以获取新消息 .

    请确保您确实停止了应用程序中的所有监听器容器 . 当然,不要停止 ConnectionFactory . 当Broker回来时你将无法恢复 .

    看起来与您的问题完全相关:spring amqp rabbit max consumer connection retries

相关问题