我试图在我的Spring Integration应用程序中使用AMQP支持的消息通道,但我认为我从根本上误解了某些内容,特别是在 Message<?>
接口以及如何在RabbitMQ队列中写入和读取 GenericMessage<?>
的实例 .
鉴于我有一个包含以下域模型对象的Spring Integration应用程序:
@Immutable
class Foo {
String name
long quantity
}
我声明了一个名为 fooChannel
的AMQP支持消息通道,如下所示:
@Bean
public AmqpChannelFactoryBean deliveryPlacementChannel(CachingConnectionFactory connectionFactory) {
AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true)
factoryBean.setConnectionFactory(connectionFactory)
factoryBean.setQueueName("foo")
factoryBean.beanName = 'fooChannel'
factoryBean.setPubSub(false)
factoryBean
}
当我最初尝试向我的 fooChannel
发送消息时,我收到了 java.io.NotSerializableException
. 我意识到这是由于我的AMQP支持的 fooChannel
使用的 RabbitTemplate
使用 org.springframework.amqp.support.converter.SimpleMessageConverter
这个事实只能使用字符串,可序列化实例或字节数组,其中我的 Foo
模型不是那些东西 .
因此,我认为我应该使用 org.springframework.amqp.support.converter.Jackson2JsonMessageConverter
来确保我的 Foo
模型正确转换为/来自和AMQP消息 . 但是,似乎正在添加到支持我的 fooChannel
的RabbitMQ队列的消息类型是 org.springframework.messaging.support.GenericMessage
类型 . 这意味着当我的AMQP支持 fooChannel
尝试使用来自RabbitMQ队列的消息时,它会收到以下异常:
Caused by: com.fasterxml.jackson.databind.JsonMappingException: No suitable constructor found for type [simple type, class org.springframework.messaging.support.GenericMessage]: can not instantiate from JSON object (missing default constructor or creator, or perhaps need to add/enable type information?)
从查看 GenericMessage
类我看到它被设计为不可变的,这清楚地解释了为什么 Jackson2JsonMessageConverter
无法从JSON转换为 GenericMessage
类型 . 但是,我不确定我应该做什么才能让我的 fooChannel
得到AMQP的支持,并且我的包含我的 Foo
模型的Spring Integration消息的转换是否正常工作?
就我的应用程序的流程而言,我有以下 Transformer
组件,它从(非AMQP支持的) barChannel
消耗 Bar
模型,并在 fooChannel
上放置 Foo
模型,如下所示:
@Transformer(inputChannel = 'barChannel', outputChannel = 'fooChannel')
public Foo transform(Bar bar) {
//transform logic removed for brevity
new Foo(name: 'Foo1', quantity: 1)
}
然后我有一个 ServiceActivator
组件,我希望从我的 fooChannel
消耗如下:
@ServiceActivator(inputChannel = 'fooChannel')
void consumeFoos(Foo foo){
// Do something with foo
}
我正在使用 spring-integration-core:4.2.5.RELEASE
和 spring-integration-amqp:4.2.5.RELEASE
.
任何人都可以建议我的Spring Integration应用程序的配置出错了吗?
如果需要进一步的信息以便更好地澄清我的问题或疑问,请告诉我 . 谢谢
1 回答
是 - amqp支持的通道目前仅限于Java可序列化对象 .
我们应该提供一个选项来将
Message<?>
映射到Spring AMQPMessage
(就像通道适配器那样)而不是.........转换整个消息 .
您可以使用一对通道适配器(出站/入站)而不是通道 .
由于您使用的是Java配置,因此可以将适配器对包装在新的
MessageChannel
实现中 .我打开了JIRA Issue for this .