spring 批 - 处理器链

我需要随后执行七个独特的过程(一个接一个) . 数据存储在Mysql中 . 我正在考虑以下选项,如果我错了,请纠正我,或者是否有更好的解决方案 .

质量要求:

  • 从Db读取数据,最后执行七个过程(datavalidation,calculation1,calculation2 ...等),将处理后的数据写入DB .

  • 需要以块的形式处理数据 .

我的解决方案和问题:数据读取:

  • 使用JdbcCursorItemReader读取数据,因为这是性能最佳的数据库读取器 - 但是,SQL非常复杂,所以我可能不得不考虑使用JdbcTemplate的自定义ItemReader?这使我在处理数据方面具有更大的灵活性 .

处理:

  • 定义七个步骤和块,使用databean在步骤之间共享数据 . 但是,这不是一个好主意,因为数据进程在块中以及每个块之后,step1 writer将在databean中创建一组新数据 . 当此databean与其他步骤共享时,数据完整性将成为问题 .

  • 使用StepExecutionContext在步骤之间共享数据 . 但这可能会影响性能,因为这涉及批处理作业存储库 .

  • 只定义一个步骤,一个ItemReader和一个进程链(七个进程),并创建一个ItemWriter,将处理后的数据写入DB . 但是,我将无法管理或监控每个不同的流程,所有流程都将一步到位 .

回答(2)

2 years ago

org.springframework.batch.item.support.CompositeItemProcessor是Spring Batch Framework中的开箱即用组件,可以支持您的第二个选项 . 这将允许你做到以下几点; - 在您的设计/解决方案中保持分离以便从数据库中读取(itemreader) - 保持每个处理器的分离'concerns'和配置 - 允许任何单个处理器返回null,而不管先前的进程如何

CompositeItemProcessor 遍历一个委托循环,所以's '类似' to an action pattern. it'在你所描述的场景中非常有用,并且仍然允许你利用Chunk的好处(异常,重试,提交策略等)

2 years ago

建议:

1)使用JdbcCursorItemReader读取数据 .

所有开箱即用的组件都是一个不错的选择,因为它们已经实现了使您的步骤可重新启动的ItemStream接口 . 但是就像你提到的那样,有时候,请求只是为了复杂,或者像我一样,你已经拥有了可以重用的服务或DAO .

我建议你使用ItemReaderAdapter . 它允许您配置委托服务以调用以获取数据 .

<bean id="MyReader" class="xxx.adapters.MyItemReaderAdapter">
       <property name="targetObject" ref="AnExistingDao" />
       <property name="targetMethod" value="next" />        
   </bean>

请注意,targetMethod必须遵守ItemReaders的读取协定(当没有更多数据时返回null)

如果您的工作不需要重新启动,您只需使用该类: org.springframework.batch.item.adapter.ItemReaderAdapter

但是如果你需要重新启动你的工作,你可以像这样创建自己的ItemReaderAdapter:

public class MyItemReaderAdapter<T> extends AbstractMethodInvokingDelegator<T> implements ItemReader<T>, ItemStream {




private long currentCount = 0;

private final String CONTEXT_COUNT_KEY = "count"; 

/**
 * @return return value of the target method.
 */
public T read() throws Exception {

    super.setArguments(new Long[]{currentCount++});
    return invokeDelegateMethod();
}
@Override
public void open(ExecutionContext executionContext)
        throws ItemStreamException {
    currentCount = executionContext.getLong(CONTEXT_COUNT_KEY,0);

}


@Override
public void update(ExecutionContext executionContext) throws ItemStreamException {
    executionContext.putLong(CONTEXT_COUNT_KEY, currentCount);
    log.info("Update Stream current count : " + currentCount);
}


@Override
public void close() throws ItemStreamException {
    // TODO Auto-generated method stub

}

}

因为开箱即用的itemReaderAdapter不可重启,所以您只需创建自己的实现ItemStream

2)关于7步与1步 .

我会在这一步上使用compositeProcessor进行一步 . 7步选项只会带来IMO的问题 .

1)7步databean:所以你的编写器在一个数据库中提交,直到第7步..然后第7步编写器尝试提交真正的数据库和繁荣错误!全部丢失,批次必须从步骤1重新开始!

2)带有上下文的7个步骤:可能会更好,因为您将在spring批处理元数据中保存状态 . 但是将大数据存储在springBatch的元数据中并不是一个好习惯!

3)是去IMO的方式 . ;-)