首页 文章

Apache驼峰,RabbitMQ如何发送消息/对象

提问于
浏览
5

我希望有人可以就此事提供一些帮助 .

我正在使用camel rabbitmq并且出于测试目的我正在尝试向队列发送消息,我正在尝试在rabbitmq界面中显示该消息,然后再将其读回 .

但是,我不能让这个工作 .

我认为有用的是我在rabbitmq管理界面的交换选项卡中创建了一个新的交换 . 在我的java代码中,我将消息发送到该交换 . 当代码执行时,我可以看到Web界面中的峰值显示已收到某些内容但我看不到已收到的内容 . 当我尝试阅读时,我无法阅读并得到以下错误:<在路线中:路线(路线2)[[来自[rabbitmq://192.168.59.103:5672 / rt ...因为路线路线2没有输出处理器 . 您需要向路径添加输出,例如to(“log:foo”) .

有人能为我提供一个关于如何发送消息的实际例子,在网络交流中看到它并阅读它吗?任何显示此过程的教程也将受到赞赏 .

谢谢

=================第二部分

我现在得到的错误如下:

Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; reason: {#method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - cannot redeclare exchange 'rhSearchExchange' in vhost '/' with different type, durable, internal or autodelete value, class-id=40, method-id=10), null, ""}
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
    ... 47 more

我有以下设置:

我得到这个错误,我相信我做错了URI,我必须定义一些我缺少的额外参数我的交换是直接类型我的队列是持久类型我的uri是:rabbitmq:// 192.168.59.105:5672/rhSearchExchange?username=guest&password=guest&routingKey=rhSearchQueue

对此有何看法?

谢谢

2 回答

  • 1

    所以我昨天能够解决这个问题,我遇到了同样(或至少类似的)问题 .

    RabbitMQ URI中的选项必须与创建交换的选项完全匹配 . 例如,在我的配置中,我有一个名为 tasks 的交换,它是一个直接类型,是持久的,并且没有配置为自动解除 . 请注意,rabbitmq camel组件中autodelete选项的默认值为 true . 另外,我想用路由键 camel 获取消息 . 这意味着我的rabbitmq URI需要看起来像:

    rabbitmq:localhost:5672/tasks?username=guest&password=guest&autoDelete=false&routingKey=camel
    

    另外,我想从一个名为 task_queue 的现有队列中读取,而不是让rabbitmq camel组件声明它自己的队列 . 因此,我还需要添加一个额外的查询参数,所以我的rabbitmq URI是

    rabbitmq:localhost:5672/tasks?username=guest&password=guest&autoDelete=false&routingKey=camel&queue=task_queue
    

    这个配置对我有用 . 下面,我从配置交换和队列的代码中添加了一些Java代码片段,并发送消息和我的Camel Route配置 .

    Exchange和队列配置:

    rabbitConnFactory = new ConnectionFactory();
    rabbitConnFactory.setHost("localhost");
    final Connection conn = rabbitConnFactory.newConnection();
    final Channel channel = conn.createChannel();
    
    // declare a direct, durable, non autodelete exchange named 'tasks'    
    channel.exchangeDeclare("tasks", "direct", true); 
    // declare a durable, non exclusive, non autodelete queue named 'task_queue'
    channel.queueDeclare("task_queue", true, false, false, null); 
    // bind 'task_queue' to the 'tasks' exchange with the routing key 'camel'
    channel.queueBind("task_queue", "tasks", "camel");
    

    发送消息:

    channel.basicPublish("tasks", "camel", MessageProperties.PERSISTENT_TEXT_PLAIN, "hello, world!".getBytes());
    

    骆驼路线:

    @Override
    public void configure() throws Exception {
        from("rabbitmq:localhost:5672/tasks?username=guest&password=guest&autoDelete=false&routingKey=camel&queue=task_queue")
            .to("mock:result");
    }
    

    我希望这有帮助!

  • 9

    因为这是针对rabbitmq / camel集成的谷歌的最高点,我觉得需要为主题添加更多内容 . 缺乏简单的骆驼示例令我惊讶 .

    import org.apache.camel.CamelContext;
    import org.apache.camel.ConsumerTemplate;
    import org.apache.camel.Endpoint;
    import org.apache.camel.Exchange;
    import org.apache.camel.ProducerTemplate;
    import org.apache.camel.impl.DefaultCamelContext;
    import org.junit.Test;
    
    public class CamelTests {
        CamelContext context;
        ProducerTemplate producer;
        ConsumerTemplate consumer;
        Endpoint endpoint;
    
        @Test
        public void camelRabbitMq() throws Exception {
            context = new DefaultCamelContext();
    
            context.start();
    
            endpoint = context.getEndpoint("rabbitmq://192.168.56.11:5672/tasks?username=benchmark&password=benchmark&autoDelete=false&routingKey=camel&queue=task_queue");
    
            producer = context.createProducerTemplate();
    
            producer.setDefaultEndpoint(endpoint);
            producer.sendBody("one");
            producer.sendBody("two");
            producer.sendBody("three");
            producer.sendBody("four");
            producer.sendBody("done");
    
            consumer = context.createConsumerTemplate();
            String body = null;
            while (!"done".equals(body)) {
                Exchange receive = consumer.receive(endpoint);
                body = receive.getIn().getBody(String.class);
                System.out.println(body);
            }
    
            context.stop();
    
        }
    
    }
    

相关问题