首页 文章

Spring集成 - AMQP支持消息通道和消息转换

提问于
浏览
1

我试图在我的Spring Integration应用程序中使用AMQP支持的消息通道,但我认为我从根本上误解了某些内容,特别是在 Message<?> 接口以及如何在RabbitMQ队列中写入和读取 GenericMessage<?> 的实例 .

鉴于我有一个包含以下域模型对象的Spring Integration应用程序:

@Immutable
class Foo {
  String name
  long quantity
}

我声明了一个名为 fooChannel 的AMQP支持消息通道,如下所示:

@Bean
public AmqpChannelFactoryBean deliveryPlacementChannel(CachingConnectionFactory connectionFactory) {
  AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true)
  factoryBean.setConnectionFactory(connectionFactory)
  factoryBean.setQueueName("foo")
  factoryBean.beanName = 'fooChannel'
  factoryBean.setPubSub(false)
  factoryBean
}

当我最初尝试向我的 fooChannel 发送消息时,我收到了 java.io.NotSerializableException . 我意识到这是由于我的AMQP支持的 fooChannel 使用的 RabbitTemplate 使用 org.springframework.amqp.support.converter.SimpleMessageConverter 这个事实只能使用字符串,可序列化实例或字节数组,其中我的 Foo 模型不是那些东西 .

因此,我认为我应该使用 org.springframework.amqp.support.converter.Jackson2JsonMessageConverter 来确保我的 Foo 模型正确转换为/来自和AMQP消息 . 但是,似乎正在添加到支持我的 fooChannel 的RabbitMQ队列的消息类型是 org.springframework.messaging.support.GenericMessage 类型 . 这意味着当我的AMQP支持 fooChannel 尝试使用来自RabbitMQ队列的消息时,它会收到以下异常:

Caused by: com.fasterxml.jackson.databind.JsonMappingException: No suitable constructor found for type [simple type, class org.springframework.messaging.support.GenericMessage]: can not instantiate from JSON object (missing default constructor or creator, or perhaps need to add/enable type information?)

从查看 GenericMessage 类我看到它被设计为不可变的,这清楚地解释了为什么 Jackson2JsonMessageConverter 无法从JSON转换为 GenericMessage 类型 . 但是,我不确定我应该做什么才能让我的 fooChannel 得到AMQP的支持,并且我的包含我的 Foo 模型的Spring Integration消息的转换是否正常工作?

就我的应用程序的流程而言,我有以下 Transformer 组件,它从(非AMQP支持的) barChannel 消耗 Bar 模型,并在 fooChannel 上放置 Foo 模型,如下所示:

@Transformer(inputChannel = 'barChannel', outputChannel = 'fooChannel')
public Foo transform(Bar bar) {
  //transform logic removed for brevity
  new Foo(name: 'Foo1', quantity: 1)
}

然后我有一个 ServiceActivator 组件,我希望从我的 fooChannel 消耗如下:

@ServiceActivator(inputChannel = 'fooChannel')
void consumeFoos(Foo foo){
  // Do something with foo
}

我正在使用 spring-integration-core:4.2.5.RELEASEspring-integration-amqp:4.2.5.RELEASE .

任何人都可以建议我的Spring Integration应用程序的配置出错了吗?

如果需要进一步的信息以便更好地澄清我的问题或疑问,请告诉我 . 谢谢

1 回答

  • 1

    是 - amqp支持的通道目前仅限于Java可序列化对象 .

    我们应该提供一个选项来将 Message<?> 映射到Spring AMQP Message (就像通道适配器那样)而不是......

    this.amqpTemplate.convertAndSend(this.getExchangeName(), this.getRoutingKey(), message);
    

    ...转换整个消息 .

    您可以使用一对通道适配器(出站/入站)而不是通道 .

    由于您使用的是Java配置,因此可以将适配器对包装在新的 MessageChannel 实现中 .

    我打开了JIRA Issue for this .

相关问题