首页 文章

ServiceActivators的Spring Integration并发

提问于
浏览
1

这是我之前的问题Spring Integration File reading的延续 . 总之,我有一个fileIn通道(一个队列),然后是一个处理文件的ServiceActivator和一个出站适配器来保存文件 .

我想引入并发,以处理多个线程中的消息 . 我使用的是java DSL(但不是Java8) . 我能够用以下方式做到这一点......

@Bean
public MessageChannel fileInChannel() {
    return MessageChannels.queue("fileIn").get();
}

@Bean
public IntegrationFlow fileProcessingFlow() {
    return IntegrationFlows.from(fileInChannel())
            .handle(myFileProcessor, "processFile",
                    new Consumer<GenericEndpointSpec<ServiceActivatingHandler>>() {
                        @Override
                        public void accept(GenericEndpointSpec<ServiceActivatingHandler> t) {
                            t.poller(Pollers.fixedRate(100).maxMessagesPerPoll(1).taskExecutor(Executors.newCachedThreadPool()));
                        }
                    })
            .handle(Files.outboundAdapter(new File(outDir)).autoCreateDirectory(true).get())
            .get();
}

这有效!我也试过以下

public IntegrationFlow fileProcessingFlow() {
    return IntegrationFlows.from(fileInChannel())
            .channel(MessageChannels.executor(Executors.newCachedThreadPool()))
            .handle(myFileProcessor)
            .handle(Files.outboundAdapter(new File(outDir)).autoCreateDirectory(true).get())
            .get();
}

这也有效!!我不知道这只是一种风格,还是一种方法比另一种更好 . 如果是这样,哪种方法更好 .

其次,在上述情况下,“文件写入”(即最后一步)是顺序的还是它将在不同的线程中工作 . 如果我也需要并发,我应该在handle(fileProcessor)和handle(outBoundAdapter)之间引入另一个taskExecutor通道吗?最终outboundadapter将是一个远程文件S3适配器,因此问题

1 回答

  • 0

    虽然我倾向于选择第二种,但这只是风格 . 文件适配器是线程安全的 .

    通常,写入将并行发生 .

    唯一的例外是如果您使用 FileExistsMode.APPEND 编写,在这种情况下,在写入期间保持锁定,如果文件名哈希到同一个锁(有256个锁)作为另一个正在写入的文件,那么它将第一次完成时运行 .

相关问题