我有一个Spring批处理作业,包括一个分区步骤和分区步骤正在进行块处理 .
我可以从方法 public void write(List<? extends VO> itemsToWrite)
进一步启动新线程(实现 Runnable
)吗?
基本上,编写者在这里使用Lucene编写索引,因为编写者有 List
的 chunk-size
项,我想将 List
划分为段并将每个段传递给新的 Runnable
.
这是一个好方法吗?
我编写了一个样本,它大部分时间都可以工作,但却被卡住了几次 .
有什么我需要担心的吗?或者是否有 Spring 季批量内置的东西来实现这一目标?
我不希望写入由整个块的单个线程发生 . 我希望进一步划分大块 .
Lucene IndexWriter
是线程安全的,列出了一种方法here
示例代码 - Writer获取 List
项目,我从线程池中打开线程?即使我等待游泳池终止一块,也会有任何顾虑,
@Override
public void write(List<? extends IndexerInputVO> inputItems) throws Exception {
int docsPerThread = Constants.NUMBER_OF_DOCS_PER_INDEX_WRITER_THREADS;
int docSize = inputItems.size();
int remainder = docSize%docsPerThread;
int poolSize = docSize/docsPerThread;
ExecutorService executor = Executors.newFixedThreadPool(poolSize+1);
int fromIndex=0;
int toIndex = docsPerThread;
if(docSize < docsPerThread){
executor.submit(new IndexWriterRunnable(this.luceneObjects,service,inputItems));
}else{
for(int i=1;i<=poolSize;i++){
executor.submit(new IndexWriterRunnable(this.luceneObjects,service,inputItems.subList(fromIndex, toIndex)));
fromIndex+=docsPerThread;
toIndex+=docsPerThread;
}
if(remainder != 0){
toIndex=docSize;
executor.submit(new IndexWriterRunnable(this.luceneObjects,service,inputItems.subList(fromIndex, toIndex)));
}
}
executor.shutdown();
while(executor.isTerminated()){
;
}
1 回答
我不确定在编写器中启动新的Threads是个好主意 . 这些线程超出了spring批处理框架的范围,因此您需要为上面的实现关闭和取消策略 . 如果一个段的处理失败,则可能导致整个队列失败 .
作为替代方法,我可以建议将您的自定义列表从作者升级到下一步,如官方文档中所述passingDataToFutureSteps