Parellel处理Spring批处理StaxEventItemReader

我有一个如下定义的 spring 批处理作业 .

<batch:step id="convert">
    <batch:tasklet >
        <batch:chunk reader="contentItemReader" writer="contentItemWriter"
                             processor="processor" commit-interval="10000" >
        </batch:chunk>
     </batch:tasklet>
</batch:step>

contentItemReader如下 .

@Bean
 public StaxEventItemReader contentItemReader() {
        StaxEventItemReader reader = new StaxEventItemReader();
        reader.setFragmentRootElementName("ContentItem");
        reader.setResource(new FileSystemResource(baseDirectory.concat(inputFile)));
        reader.setUnmarshaller(contentItemUnmarshaller());
        return reader;
 }

一切都很好,除了它比我想要的慢一点 . 我知道这个读者不是线程安全的 . 所以我认为我不能将taskExecutor添加到tasklet中 . ContentItems彼此不依赖,所以我想并行地将数据提供给处理器 . ItemProcessing可能相当耗时 . 因此,虽然我知道我不能拥有多线程阅读器,但我应该能够处理多线程项目 .

因为我使用的是flatFile ItemWriter,所以ItemWriters也需要是单线程的 .

完成此任务的最佳方法是什么?

回答(1)

2 years ago

只需将读者包装成以下内容:

public class SynchronizedWrapperReader<T> implements ItemStreamReader<T> {

  private ItemReader<T> itemReader;
  private boolean isStream = false;

  public void setItemReader(ItemReader<T> itemReader) {
    this.itemReader = itemReader;
    if (itemReader instanceof ItemStream) {
      isStream = true;
    }
  }

  @Override
  public void close() {
    if (isStream) {
      ((ItemStream) itemReader).close();
    }
  }

  @Override
  public void open(ExecutionContext executionContext) {
    if (isStream) {
      ((ItemStream) itemReader).open(new ExecutionContext());
    }
  }

  @Override
  public void update(ExecutionContext executionContext) {
  }

  @Override
  public synchronized T read() throws Exception {
    return itemReader.read();
  }
}

你的作家也一样 .

请注意,订单不再保证 .

已编辑:

有关如何在config.xml中使用它的评论 . 所以,这是一个如何将Wrapper与FlatFileItemReader一起使用的简单示例:

<batch:step id="convert">
    <batch:tasklet >
        <batch:chunk reader="wrappedReader" writer="..."
                             processor="..." commit-interval="10000" >
        </batch:chunk>
     </batch:tasklet>
</batch:step>

<bean id="wrappedReader" class=[package].SynchronizedWrapperReader">
   <property name="itemReader">
      <bean class="org.springframework.batch.item.file.FlatFileItemReader">
          <property .../>
          <property .../>
      </bean>
   </property>
</bean>