首页 文章

Spring Integration - JPA入站通道适配器,拆分器,服务激活器并发工作单元

提问于
浏览
2

我有一个非常简单的Spring集成管道设置

  • 将具有状态X(带JPA)的N次资产加载到通道中

  • 将每个JPA实体拆分到新通道

  • 使用服务激活器处理JPA实体,最后将实体的状态更新为Y.

此管道的同步性质意味着JPA入站通道适配器仅触发前一个入站通道适配器中的所有消息,并依次拆分通道已处理,并发送到 nullChannel

这非常有效,但效率低下 .

服务激活器执行一些操作,其中之一是它调用外部REST API,然后更新资产的状态,因此它将被排除在#1之外 .

这里的问题是服务激活器需要大约1秒来处理单个消息(大部分时间是对REST API的调用),因此,250个JPA实体的队列可能需要250秒来处理 .

如果我们同时调用REST API,比如说5次,它仍然需要1秒 .

所以,我想知道我们是否可以对我们的管道进行简单的更改,可能添加 AggregatorTask Executor ,这将允许整个管道作为同步"Unit Of Work"运行,但允许 Service Activator 同时处理 .

这是集成配置

<channel id="newAssetChannel" />
<channel id="splitAssetChannel" />

<int-jpa:inbound-channel-adapter
        id="newAssetChannelAdapter"
        channel="newAssetChannel"
        entity-manager-factory="entityManagerFactory" 
        entity-class="com.foo.domain.Asset" 
        jpa-query="select a from Asset a where (a.status = 'NEW' or a.status = 'UPDATED') and a.health = 'OK' ORDER BY a.priority DESC, a.updatedDate ASC"
        max-results="250">
    <poller fixed-rate="5000" max-messages-per-poll="1" />
</int-jpa:inbound-channel-adapter>

<splitter expression="payload"
    input-channel="newAssetChannel"
    output-channel="splitNewAssetChannel" />

<service-activator
    id="newAssetServiceActivator"
    input-channel="splitNewAssetChannel"
    output-channel="nullChannel"
    ref="assetProcessor"
    method="processNew" />

1 回答

  • 1

    好吧, aggregator 真的是等待所有回复的正确方法,但是在 ExecutorChannel 之后 splitter 你无论如何都会让poller的手自由 . 因此,在并行拆分聚合之前,您应该有一些障碍 .

    您可以使用 <gateway> 执行此操作:

    <gateway id="gateway" default-request-channel="splitterChannel"/>
    
    <service-activator id="gatewayTestService" input-channel="newAssetChannel" output-channel="saveRowsChannel" ref="gateway"/>
    

    分离器的 output-channel 必须是 ExecutorChannel . newAssetServiceActivator 必须输出到 aggregator . 聚合器没有 output-channel 表示对 gateway 的回复 .

相关问题