我有一个非常简单的Spring集成管道设置
-
将具有状态X(带JPA)的N次资产加载到通道中
-
将每个JPA实体拆分到新通道
-
使用服务激活器处理JPA实体,最后将实体的状态更新为Y.
此管道的同步性质意味着JPA入站通道适配器仅触发前一个入站通道适配器中的所有消息,并依次拆分通道已处理,并发送到 nullChannel
这非常有效,但效率低下 .
服务激活器执行一些操作,其中之一是它调用外部REST API,然后更新资产的状态,因此它将被排除在#1之外 .
这里的问题是服务激活器需要大约1秒来处理单个消息(大部分时间是对REST API的调用),因此,250个JPA实体的队列可能需要250秒来处理 .
如果我们同时调用REST API,比如说5次,它仍然需要1秒 .
所以,我想知道我们是否可以对我们的管道进行简单的更改,可能添加 Aggregator
和 Task Executor
,这将允许整个管道作为同步"Unit Of Work"运行,但允许 Service Activator
同时处理 .
这是集成配置
<channel id="newAssetChannel" />
<channel id="splitAssetChannel" />
<int-jpa:inbound-channel-adapter
id="newAssetChannelAdapter"
channel="newAssetChannel"
entity-manager-factory="entityManagerFactory"
entity-class="com.foo.domain.Asset"
jpa-query="select a from Asset a where (a.status = 'NEW' or a.status = 'UPDATED') and a.health = 'OK' ORDER BY a.priority DESC, a.updatedDate ASC"
max-results="250">
<poller fixed-rate="5000" max-messages-per-poll="1" />
</int-jpa:inbound-channel-adapter>
<splitter expression="payload"
input-channel="newAssetChannel"
output-channel="splitNewAssetChannel" />
<service-activator
id="newAssetServiceActivator"
input-channel="splitNewAssetChannel"
output-channel="nullChannel"
ref="assetProcessor"
method="processNew" />
1 回答
好吧,
aggregator
真的是等待所有回复的正确方法,但是在ExecutorChannel
之后splitter
你无论如何都会让poller的手自由 . 因此,在并行拆分聚合之前,您应该有一些障碍 .您可以使用
<gateway>
执行此操作:分离器的
output-channel
必须是ExecutorChannel
.newAssetServiceActivator
必须输出到aggregator
. 聚合器没有output-channel
表示对gateway
的回复 .