首页 文章

Spring AMQP RabbitMq中的预定/延迟消息传递

提问于
浏览
5

我很难在Spring AMQP / Rabbit MQ中找到预定/延迟消息的方法 .
经过大量的搜索仍然无法在Spring AMQP中做到这一点 . 有人可以告诉我如何在Spring AMQP中做 x-delay .
如果消费者方面发生了一些异常,我想延迟消息 . RabbitMQ说要添加x-delay并安装我已经完成的插件,但仍然会立即收到消息而没有任何延迟

我收到了消息
收到<(正文:'[B@60a4ae5f(byte[26])' MessageProperties [headers =

@Bean
ConnectionFactory connectionFactory(){

    CachingConnectionFactory connectionFactory=new CachingConnectionFactory("127.0.0.1");
    connectionFactory.setUsername("guest");
    connectionFactory.setPassword("guest");
    connectionFactory.setPort(1500);
    connectionFactory.setPublisherReturns(true);
    return connectionFactory;

}

@Bean
Binding binding(@Qualifier("queue")Queue queue, DirectExchange exchange) {
    return new Binding(queue.getName(), Binding.DestinationType.QUEUE, exchange.getName(), queue.getName(), null);
    //return BindingBuilder.bind(queue).to(exchange).with(queueName);   
}

@Bean
DirectExchange exchange() {
    DirectExchange exchange=new DirectExchange("delay-exchange");
    return exchange;
}

消费者 - -
@覆盖

public void onMessage(Message message, Channel channel) throws Exception {

    System.out.println("Received <" + message+ ">" +rabbitTemplate);

    if(i==1){
        AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder();
        Map<String,Object> headers = message.getMessageProperties().getHeaders();
        headers.put("x-delay", 15000);
        props.headers(headers);
        i++;
        channel.basicPublish(message.getMessageProperties().getReceivedExchange(), message.getMessageProperties().getReceivedRoutingKey(),
                props.build(), message.getBody());
    }
    }

1 回答

  • 6

    首先看起来你不遵循Scheduling Messages with RabbitMQ文章:

    要使用延迟消息交换,您只需要声明一个提供“x-delayed-message”交换类型的交换,如下所示:

    Map<String, Object> args = new HashMap<String, Object>();
    args.put("x-delayed-type", "direct");
    channel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args);
    

    我想说使用Spring AMQP可以实现同样的目的:

    @Bean
    CustomExchange delayExchange() {
        Map<String, Object> args = new HashMap<String, Object>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange("my-exchange", "x-delayed-message", true, false, args);
    }
    

    另一个问题是你真的应该将消息发布到 delay-exchange ,而不是任何其他消息 . 再说一遍:无论如何,那个文档都提到了 .

    UPDATE

    自Spring AMQP 1.6起,延迟消息支持作为开箱即用的功能:https://spring.io/blog/2016/02/16/spring-amqp-1-6-0-milestone-1-and-1-5-4-available .

相关问题