首页 文章

从Spring Batch分区步骤的Writer启动Runnable

提问于
浏览
0

我有一个Spring批处理作业,包括一个分区步骤和分区步骤正在进行块处理 .

我可以从方法 public void write(List<? extends VO> itemsToWrite) 进一步启动新线程(实现 Runnable )吗?

基本上,编写者在这里使用Lucene编写索引,因为编写者有 Listchunk-size 项,我想将 List 划分为段并将每个段传递给新的 Runnable .

这是一个好方法吗?

我编写了一个样本,它大部分时间都可以工作,但却被卡住了几次 .

有什么我需要担心的吗?或者是否有 Spring 季批量内置的东西来实现这一目标?

我不希望写入由整个块的单个线程发生 . 我希望进一步划分大块 .

Lucene IndexWriter 是线程安全的,列出了一种方法here

示例代码 - Writer获取 List 项目,我从线程池中打开线程?即使我等待游泳池终止一块,也会有任何顾虑,

@Override
    public void write(List<? extends IndexerInputVO> inputItems) throws Exception {


        int docsPerThread = Constants.NUMBER_OF_DOCS_PER_INDEX_WRITER_THREADS;
        int docSize = inputItems.size();
        int remainder = docSize%docsPerThread;
        int poolSize = docSize/docsPerThread;

        ExecutorService executor = Executors.newFixedThreadPool(poolSize+1);


        int fromIndex=0;
        int toIndex = docsPerThread;

        if(docSize < docsPerThread){
            executor.submit(new IndexWriterRunnable(this.luceneObjects,service,inputItems));
        }else{
            for(int i=1;i<=poolSize;i++){
                executor.submit(new IndexWriterRunnable(this.luceneObjects,service,inputItems.subList(fromIndex, toIndex)));
                fromIndex+=docsPerThread;
                toIndex+=docsPerThread;
            }

            if(remainder != 0){
                toIndex=docSize;
                executor.submit(new IndexWriterRunnable(this.luceneObjects,service,inputItems.subList(fromIndex, toIndex)));
            }
        }

        executor.shutdown();

        while(executor.isTerminated()){
            ;
        }

1 回答

  • 0

    我不确定在编写器中启动新的Threads是个好主意 . 这些线程超出了spring批处理框架的范围,因此您需要为上面的实现关闭和取消策略 . 如果一个段的处理失败,则可能导致整个队列失败 .

    作为替代方法,我可以建议将您的自定义列表从作者升级到下一步,如官方文档中所述passingDataToFutureSteps

相关问题