Spring AMQP - Sending and receiving messages

I am having a problem receiving messages from RabbitMQ. I am sending a message like this:

HashMap<Object, Object> senderMap=new HashMap<>();
        senderMap.put("STATUS", "SUCCESS");
        senderMap.put("EXECUTION_START_TIME", new Date());

        rabbitTemplate.convertAndSend(Constants.ADAPTOR_OP_QUEUE,senderMap);

If we look at the message in RabbitMQ, it carries a fully qualified type.

In the current scenario, we have n producers for the same consumer. If I use any class mapper, it leads to an exception. How can I send a message so that it does not contain any type id, receive it as a Message object, and later bind it to my custom object in the receiver?

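I am receiving the message as shown below. Could you tell me how to use Jackson2JsonMessageConverter so that the message is bound directly to my Object / HashMap on the receiver side? Also, I have now removed the type id from the sender.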

How the message looks in RabbitMQ

priority: 0
delivery_mode: 2
headers:
    __ContentTypeId__: java.lang.Object
    __KeyTypeId__: java.lang.Object
content_encoding: UTF-8
content_type: application/json

{"Execution_start_time":1473747183636,"status":"SUCCESS"}

@Component
public class AdapterOutputHandler {

    private static Logger logger = Logger.getLogger(AdapterOutputHandler.class);

    @RabbitListener(containerFactory="adapterOPListenerContainerFactory",queues=Constants.ADAPTOR_OP_QUEUE)
    public void handleAdapterQueueMessage(HashMap<String,Object> message){

        System.out.println("Receiver:::::::::::"+message.toString());

    }

}

Configuration

@Bean(name="adapterOPListenerContainerFactory")
public SimpleRabbitListenerContainerFactory adapterOPListenerContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory());
    Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter();
    // No default type is set on the class mapper, so conversion relies on
    // a __TypeId__ header being present on the incoming message.
    DefaultClassMapper classMapper = new DefaultClassMapper();
    messageConverter.setClassMapper(classMapper);
    factory.setMessageConverter(messageConverter);
    return factory;
}

Exception

Caused by: org.springframework.amqp.support.converter.MessageConversionException: failed to convert Message content. Could not resolve __TypeId__ in header and no defaultType provided
    at org.springframework.amqp.support.converter.DefaultClassMapper.toClass(DefaultClassMapper.java:139)

I don't want to use the __TypeId__ header from the sender, because there are multiple senders for the same queue and only one consumer.

2 Answers

  • 3

    "It leads to an exception"

    What exception?

    TypeId:com.diff.approach.JobListenerDTO

    This means you are sending a DTO, not the hash map you describe in the question.

    If you want to remove the typeId header, you can use a message post-processor...

    rabbitTemplate.convertAndSend(Constants.INPUT_QUEUE, dto, m -> {
        // Strip the type header after conversion, before the message is sent.
        m.getMessageProperties().getHeaders().remove("__TypeId__");
        return m;
    });
    

    (or new MessagePostProcessor() {...} if you are not using Java 8).

    EDIT

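    Which version of Spring AMQP are you using? With 1.6 you don't even have to remove the __TypeId__ header: the framework looks at the listener parameter type and passes it to the Jackson converter, which then converts to that type automatically (if it can). As you can see here, it works fine without removing the type id...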

    package com.example;
    
    import java.util.HashMap;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.ConfigurableApplicationContext;
    import org.springframework.context.annotation.Bean;
    
    @SpringBootApplication
    public class So39443850Application {
    
        private static final String QUEUE = "so39443850";
    
        public static void main(String[] args) throws Exception {
            ConfigurableApplicationContext context = SpringApplication.run(So39443850Application.class, args);
            context.getBean(RabbitTemplate.class).convertAndSend(QUEUE, new DTO("baz", "qux"));
            context.getBean(So39443850Application.class).latch.await(10, TimeUnit.SECONDS);
            context.getBean(RabbitAdmin.class).deleteQueue(QUEUE);
            context.close();
        }
    
        private final CountDownLatch latch = new CountDownLatch(1);
    
        @RabbitListener(queues = QUEUE, containerFactory = "adapterOPListenerContainerFactory")
        public void listen(HashMap<String, Object> message) {
            System.out.println(message.getClass() + ":" + message);
            latch.countDown();
        }
    
        @Bean
        public Queue queue() {
            return new Queue(QUEUE);
        }
    
        @Bean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
            RabbitTemplate template = new RabbitTemplate(connectionFactory);
            template.setMessageConverter(new Jackson2JsonMessageConverter());
            return template;
        }
    
        @Bean
        public SimpleRabbitListenerContainerFactory adapterOPListenerContainerFactory(ConnectionFactory connectionFactory) {
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory);
            factory.setMessageConverter(new Jackson2JsonMessageConverter());
            return factory;
        }
    
        public static class DTO {
    
            private String foo;
    
            private String baz;
    
            public DTO(String foo, String baz) {
                this.foo = foo;
                this.baz = baz;
            }
    
            public String getFoo() {
                return this.foo;
            }
    
            public void setFoo(String foo) {
                this.foo = foo;
            }
    
            public String getBaz() {
                return this.baz;
            }
    
            public void setBaz(String baz) {
                this.baz = baz;
            }
    
        }
    
    }
    

    Result:

    class java.util.HashMap:{foo=baz, baz=qux}
    

    This is described in the documentation...

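    In versions prior to 1.6, the type information needed to convert the JSON had to be provided in message headers, or a custom ClassMapper was required. Starting with version 1.6, if there are no type information headers, the type can be inferred from the target method arguments.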

    You can also configure a custom ClassMapper that always returns HashMap.
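
    To illustrate why a mapper that always returns HashMap avoids the exception in the question, here is a minimal JDK-only sketch of header-based type resolution. It is a simplified model, not Spring AMQP code: TypeResolver, headerBased and alwaysHashMap are hypothetical names, and the fallback order is inferred only from the exception message quoted in the question. In Spring AMQP itself, the corresponding hook is the ClassMapper passed to setClassMapper, as in the question's configuration.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified, JDK-only model of header-based type resolution; not Spring AMQP code. */
class TypeResolutionSketch {

    /** Hypothetical stand-in for a class mapper: picks the target type for a message. */
    interface TypeResolver {
        Class<?> toClass(Map<String, Object> headers);
    }

    /** Resolves from the __TypeId__ header, falling back to an optional default type. */
    static TypeResolver headerBased(Class<?> defaultType) {
        return headers -> {
            Object typeId = headers.get("__TypeId__");
            if (typeId == null) {
                if (defaultType == null) {
                    // Mirrors the failure reported in the question.
                    throw new IllegalStateException(
                            "Could not resolve __TypeId__ in header and no defaultType provided");
                }
                return defaultType;
            }
            try {
                return Class.forName(typeId.toString());
            } catch (ClassNotFoundException e) {
                // A sender-side class that does not exist on the consumer.
                throw new IllegalStateException("Unknown type id: " + typeId, e);
            }
        };
    }

    /** Ignores the headers entirely, so every sender's message maps to HashMap. */
    static TypeResolver alwaysHashMap() {
        return headers -> HashMap.class;
    }

    public static void main(String[] args) {
        Map<String, Object> noTypeHeader = new HashMap<>();
        Map<String, Object> foreignType = new HashMap<>();
        foreignType.put("__TypeId__", "com.diff.approach.JobListenerDTO");

        // Both resolve to HashMap, whatever the sender put in the header.
        System.out.println(alwaysHashMap().toClass(noTypeHeader));
        System.out.println(alwaysHashMap().toClass(foreignType));
        // A default type also covers messages that carry no type header.
        System.out.println(headerBased(HashMap.class).toClass(noTypeHeader));
    }
}
```

    With several senders and one consumer, ignoring the header keeps the consumer independent of whatever class names each sender happens to use.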

  • 0
    • Want to use "one" different Java class when receiving a message?

    Configure the Jackson2JsonMessageConverter @Bean with a custom ClassMapper.

    • Want to use "many" different Java classes when receiving a message? For example:
    @MyAmqpMsgListener
    void handlerMsg(
            // Main message class, by MessageConverter
            @Payload MyMsg myMsg, 
    
            // Secondary message class - by MessageConverter->ConversionService
            @Payload Map<String, String> map,
    
            org.springframework.messaging.Message<MyMsg> msg,
            org.springframework.amqp.core.Message amqpMsg
    ) {
        // ...
    }
    

    Provide custom @Bean definitions for Converter, ConversionService, and RabbitListenerAnnotationBeanPostProcessor:

    @Bean
    FormattingConversionServiceFactoryBean rabbitMqCs(
            Set<Converter> converters
    ) {
        FormattingConversionServiceFactoryBean fac = new FormattingConversionServiceFactoryBean();
        fac.setConverters(converters);
        return fac;
    }
    @Bean
    DefaultMessageHandlerMethodFactory messageHandlerMethodFactory(
            @Qualifier("rabbitMqCs")
            FormattingConversionService rabbitMqCs
    ) {
        DefaultMessageHandlerMethodFactory defaultFactory = new DefaultMessageHandlerMethodFactory();
        defaultFactory.setConversionService(rabbitMqCs);
        return defaultFactory;
    }
    
    // copied from RabbitBootstrapConfiguration
    @Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public RabbitListenerAnnotationBeanPostProcessor rabbitListenerAnnotationProcessor(
            MessageHandlerMethodFactory handlerFac
    ) {
        RabbitListenerAnnotationBeanPostProcessor bpp = new RabbitListenerAnnotationBeanPostProcessor();
        bpp.setMessageHandlerMethodFactory(handlerFac);
        return bpp;
    }
    
    @Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
    public RabbitListenerEndpointRegistry defaultRabbitListenerEndpointRegistry() {
        return new RabbitListenerEndpointRegistry();
    }
    
