这是我之前的问题Spring Integration File reading的延续 . 总之,我有一个fileIn通道(一个队列),然后是一个处理文件的ServiceActivator和一个出站适配器来保存文件 .
我想引入并发,以处理多个线程中的消息 . 我使用的是java DSL(但不是Java8) . 我能够用以下方式做到这一点......
@Bean
public MessageChannel fileInChannel() {
return MessageChannels.queue("fileIn").get();
}
@Bean
public IntegrationFlow fileProcessingFlow() {
return IntegrationFlows.from(fileInChannel())
.handle(myFileProcessor, "processFile",
new Consumer<GenericEndpointSpec<ServiceActivatingHandler>>() {
@Override
public void accept(GenericEndpointSpec<ServiceActivatingHandler> t) {
t.poller(Pollers.fixedRate(100).maxMessagesPerPoll(1).taskExecutor(Executors.newCachedThreadPool()));
}
})
.handle(Files.outboundAdapter(new File(outDir)).autoCreateDirectory(true).get())
.get();
}
这有效!我也试过以下
public IntegrationFlow fileProcessingFlow() {
return IntegrationFlows.from(fileInChannel())
.channel(MessageChannels.executor(Executors.newCachedThreadPool()))
.handle(myFileProcessor)
.handle(Files.outboundAdapter(new File(outDir)).autoCreateDirectory(true).get())
.get();
}
这也有效!!我不知道这只是一种风格,还是一种方法比另一种更好 . 如果是这样,哪种方法更好 .
其次,在上述情况下,“文件写入”(即最后一步)是顺序的还是它将在不同的线程中工作 . 如果我也需要并发,我应该在handle(fileProcessor)和handle(outBoundAdapter)之间引入另一个taskExecutor通道吗?最终outboundadapter将是一个远程文件S3适配器,因此问题
1 回答
虽然我倾向于选择第二种,但这只是风格 . 文件适配器是线程安全的 .
通常,写入将并行发生 .
唯一的例外是如果您使用
FileExistsMode.APPEND
编写,在这种情况下,在写入期间保持锁定,如果文件名哈希到同一个锁(有256个锁)作为另一个正在写入的文件,那么它将第一次完成时运行 .