首页 文章

Spring Boot AMQP接收器

提问于
浏览
4

我正在创建一个简单的Spring Boot应用程序,我希望接收发送到AMQP(Rabbit)交换机(来自其他应用程序)的消息 .

我收到一个错误,指出我要接收的队列不存在 . 当我看到http://localhost:15672/#/queues时,我可以看到它确实存在;我在开始这个应用程序之前删除它 .

我在另一篇文章中读到,如果队列不存在,则必须声明RabbitAdmin,并使用declareExchange()和declareQueue()来克服这个问题 . 即使有了这个,我仍然会看到同样的错误 . 此外,该错误使b / c混淆:

引起:com.rabbitmq.client.ShutdownSignalException:通道错误;协议方法:#method(reply-code = 404,reply-text = NOT_FOUND - no queue' from-logger-exchange ' in vhost ' /',class-id = 50,method-id = 10)

请注意,“无队列”表示我的交换名称 .

这是配置代码(我意识到Boot创建了connectionFactory和RabbitAdmin,但是当它不起作用时开始添加它们):

'import javax.annotation.PostConstruct;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class Config {
    @Value("${spring.activemq.broker-url}")
    String host;
    @Value("${spring.activemq.user}")
    String userName;
    @Value("${spring.activemq.password}")
    String password;

    @Value("${config.exchangeName}")
    String exchangeName;

    @Value("${config.queueName}")
    String queueName;

    @PostConstruct
    public void printVariables() {
        System.out.println("host " + host);
        System.out.println("userName " + userName);
        System.out.println("password " + password);
        System.out.println("exchangeName " + exchangeName);
        System.out.println("queueName " + queueName);

        System.out.println("queue.name " + queue().getName());
        System.out.println("exchange.name " + topicExchange().getName());
        System.out.println("binding " + binding().toString());

        System.out.println("connectionFactory " + connectionFactory().toString());
        System.out.println("rabbitAdmin " + rabbitAdmin().toString());
    }

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host);
        connectionFactory.setUsername(userName);
        connectionFactory.setPassword(password);
        return connectionFactory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin() {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory());
        rabbitAdmin.declareExchange(topicExchange());
        rabbitAdmin.declareQueue(queue());
        return rabbitAdmin;
    }

    @Bean
    public Queue queue() {
        return new Queue(queueName);
    }

//  @Bean
//  public FanoutExchange fanoutExchange() {
//      return new FanoutExchange(exchangeName);
//  }

    public TopicExchange topicExchange() {
        return new TopicExchange(exchangeName, false, true);
    }

    @Bean
    public Binding binding() {
        //return BindingBuilder.bind(queue()).to(fanoutExchange());
        return BindingBuilder.bind(queue()).to(topicExchange()).with("my.routing.key");
    }

    @Bean
    public Receiver receiver() {
        return new Receiver();
    }
}

这是接收者:

import org.springframework.amqp.rabbit.annotation.RabbitListener;

public class Receiver {

    @RabbitListener(queues="${config.exchangeName}")
    public void receive(String input) {
        System.out.println("   Receiver#receive input: " + input);
    }
}

这是控制台输出的第一位:

.   ____          _            __
/\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v1.3.2.RELEASE)

2016-02-23 10:20:47.710  INFO 18804 --- [           main] c.i.logging.receiver.Application         : Starting Application on INT002520 with PID 18804 (C:\Users\matt.aspen\Documents\_logging\log-message-receiver2\target\classes started by matt.aspen in C:\Users\matt.aspen\Documents\_logging\log-message-receiver2)
2016-02-23 10:20:47.710  INFO 18804 --- [           main] c.i.logging.receiver.Application         : No active profile set, falling back to default profiles: default
2016-02-23 10:20:47.710 DEBUG 18804 --- [           main] o.s.boot.SpringApplication               : Loading source class com.intelligrated.logging.receiver.Application
2016-02-23 10:20:47.750 DEBUG 18804 --- [           main] o.s.b.c.c.ConfigFileApplicationListener  : Loaded config file 'classpath:/application.properties'
2016-02-23 10:20:47.750 DEBUG 18804 --- [           main] o.s.b.c.c.ConfigFileApplicationListener  : Skipped (empty) config file 'classpath:/application.properties' for profile default
2016-02-23 10:20:47.750  INFO 18804 --- [           main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@679b62af: startup date [Tue Feb 23 10:20:47 PST 2016]; root of context hierarchy
2016-02-23 10:20:48.482  INFO 18804 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.amqp.rabbit.annotation.RabbitBootstrapConfiguration' of type [class org.springframework.amqp.rabbit.annotation.RabbitBootstrapConfiguration$$EnhancerBySpringCGLIB$$68858653] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
host localhost
userName guest
password guest
exchangeName from-logger-exchange
queueName from-logger-queue
queue.name from-logger-queue
exchange.name from-logger-exchange
binding Binding [destination=from-logger-queue, exchange=from-logger-exchange, routingKey=my.routing.key]
connectionFactory CachingConnectionFactory [channelCacheSize=1, host=localhost, port=5672, active=true connectionFactory]
2016-02-23 10:20:48.642  INFO 18804 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: SimpleConnection@6239aba6 [delegate=amqp://guest@127.0.0.1:5672/]
rabbitAdmin org.springframework.amqp.rabbit.core.RabbitAdmin@5f354bcf
2016-02-23 10:20:48.753 DEBUG 18804 --- [           main] inMXBeanRegistrar$SpringApplicationAdmin : Application Admin MBean registered with name 'org.springframework.boot:type=Admin,name=SpringApplication'
2016-02-23 10:20:48.998  INFO 18804 --- [           main] o.s.j.e.a.AnnotationMBeanExporter        : Registering beans for JMX exposure on startup
2016-02-23 10:20:49.208  INFO 18804 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase -2147482648
2016-02-23 10:20:49.208  INFO 18804 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 2147483647
2016-02-23 10:20:49.218  WARN 18804 --- [cTaskExecutor-1] o.s.a.r.listener.BlockingQueueConsumer   : Failed to declare queue:from-logger-exchange
2016-02-23 10:20:49.228  WARN 18804 --- [cTaskExecutor-1] o.s.a.r.listener.BlockingQueueConsumer   : Queue declaration failed; retries left=3

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[from-logger-exchange]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:571) ~[spring-rabbit-1.5.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:470) ~[spring-rabbit-1.5.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1171) [spring-rabbit-1.5.3.RELEASE.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_40]
Caused by: java.io.IOException: null
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106) ~[amqp-client-3.5.7.jar:na]
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102) ~[amqp-client-3.5.7.jar:na]
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124) ~[amqp-client-3.5.7.jar:na]
    at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:885) ~[amqp-client-3.5.7.jar:na]
    at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:61) ~[amqp-client-3.5.7.jar:na]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_40]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_40]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_40]
    at java.lang.reflect.Method.invoke(Method.java:497) ~[na:1.8.0_40]
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:764) ~[spring-rabbit-1.5.3.RELEASE.jar:na]
    at com.sun.proxy.$Proxy31.queueDeclarePassive(Unknown Source) ~[na:na]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:550) ~[spring-rabbit-1.5.3.RELEASE.jar:na]
    ... 3 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'from-logger-exchange' in vhost '/', class-id=50, method-id=10)
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67) ~[amqp-client-3.5.7.jar:na]
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33) ~[amqp-client-3.5.7.jar:na]
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:361) ~[amqp-client-3.5.7.jar:na]
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:226) ~[amqp-client-3.5.7.jar:na]
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118) ~[amqp-client-3.5.7.jar:na]
    ... 12 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'from-logger-exchange' in vhost '/', class-id=50, method-id=10)
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:484) ~[amqp-client-3.5.7.jar:na]
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:321) ~[amqp-client-3.5.7.jar:na]
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144) ~[amqp-client-3.5.7.jar:na]
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91) ~[amqp-client-3.5.7.jar:na]
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:554) ~[amqp-client-3.5.7.jar:na]
    ... 1 common frames omitted

2016-02-23 10:20:54.226  WARN 18804 --- [cTaskExecutor-1] o.s.a.r.listener.BlockingQueueConsumer   : Failed to declare queue:from-logger-exchange
2016-02-23 10:20:54.226  WARN 18804 --- [cTaskExecutor-1] o.s.a.r.listener.BlockingQueueConsumer   : Queue declaration failed; retries left=2

更令人困惑的是,当我看到http://localhost:15672/#/connections时,我看到了连接,http://localhost:15672/#/channels我看到了 Channels ,http://localhost:15672/#/exchanges我看到了"from-logger-exchange",http://localhost:15672/#/queues我看到"from-logger-queue"

1 回答

  • 0

    一个问题是你的交换不是 @Bean .

    我明白了

    reply-code=404, reply-text=NOT_FOUND - no exchange 'from.logging.exchange' in vhost '/', class-id=50, method-id=20)
    

    用你的配置 . 将 @Bean 添加到交易所会修复它 .

    EDIT

    另外(正如Artem在下面指出的那样),你在监听器上有错误的属性

    @RabbitListener(queues="${config.exchangeName}")
    

    应该

    @RabbitListener(queues="${config.queueName}")
    

相关问题