首页 文章

spring 批 - 处理器链

提问于
浏览
1

我需要随后执行七个独特的过程(一个接一个) . 数据存储在Mysql中 . 我正在考虑以下选项,如果我错了,请纠正我,或者是否有更好的解决方案 .

质量要求:

  • 从Db读取数据,最后执行七个过程(datavalidation,calculation1,calculation2 ...等),将处理后的数据写入DB .

  • 需要以块的形式处理数据 .

我的解决方案和问题:数据读取:

  • 使用JdbcCursorItemReader读取数据,因为这是性能最佳的数据库读取器 - 但是,SQL非常复杂,所以我可能不得不考虑使用JdbcTemplate的自定义ItemReader?这使我在处理数据方面具有更大的灵活性 .

处理:

  • 定义七个步骤和块,使用databean在步骤之间共享数据 . 但是,这不是一个好主意,因为数据进程在块中以及每个块之后,step1 writer将在databean中创建一组新数据 . 当此databean与其他步骤共享时,数据完整性将成为问题 .

  • 使用StepExecutionContext在步骤之间共享数据 . 但这可能会影响性能,因为这涉及批处理作业存储库 .

  • 只定义一个步骤,一个ItemReader和一个进程链(七个进程),并创建一个ItemWriter,将处理后的数据写入DB . 但是,我将无法管理或监控每个不同的流程,所有流程都将一步到位 .

2 回答

  • 5

    org.springframework.batch.item.support.CompositeItemProcessor是Spring Batch Framework中的开箱即用组件,可以支持您的第二个选项 . 这将允许你做到以下几点; - 在您的设计/解决方案中保持分离以便从数据库中读取(itemreader) - 保持每个处理器的分离'concerns'和配置 - 允许任何单个处理器返回null,而不管先前的进程如何

    CompositeItemProcessor 遍历一个委托循环,所以's '类似' to an action pattern. it'在你所描述的场景中非常有用,并且仍然允许你利用Chunk的好处(异常,重试,提交策略等)

  • 1

    建议:

    1)使用JdbcCursorItemReader读取数据 .

    所有开箱即用的组件都是一个不错的选择,因为它们已经实现了使您的步骤可重新启动的ItemStream接口 . 但是就像你提到的那样,有时候,请求只是为了复杂,或者像我一样,你已经拥有了可以重用的服务或DAO .

    我建议你使用ItemReaderAdapter . 它允许您配置委托服务以调用以获取数据 .

    <bean id="MyReader" class="xxx.adapters.MyItemReaderAdapter">
           <property name="targetObject" ref="AnExistingDao" />
           <property name="targetMethod" value="next" />        
       </bean>
    

    请注意,targetMethod必须遵守ItemReaders的读取协定(当没有更多数据时返回null)

    如果您的工作不需要重新启动,您只需使用该类: org.springframework.batch.item.adapter.ItemReaderAdapter

    但是如果你需要重新启动你的工作,你可以像这样创建自己的ItemReaderAdapter:

    public class MyItemReaderAdapter<T> extends AbstractMethodInvokingDelegator<T> implements ItemReader<T>, ItemStream {
    
    
    
    
    private long currentCount = 0;
    
    private final String CONTEXT_COUNT_KEY = "count"; 
    
    /**
     * @return return value of the target method.
     */
    public T read() throws Exception {
    
        super.setArguments(new Long[]{currentCount++});
        return invokeDelegateMethod();
    }
    @Override
    public void open(ExecutionContext executionContext)
            throws ItemStreamException {
        currentCount = executionContext.getLong(CONTEXT_COUNT_KEY,0);
    
    }
    
    
    @Override
    public void update(ExecutionContext executionContext) throws ItemStreamException {
        executionContext.putLong(CONTEXT_COUNT_KEY, currentCount);
        log.info("Update Stream current count : " + currentCount);
    }
    
    
    @Override
    public void close() throws ItemStreamException {
        // TODO Auto-generated method stub
    
    }
    
    }
    

    因为开箱即用的itemReaderAdapter不可重启,所以您只需创建自己的实现ItemStream

    2)关于7步与1步 .

    我会在这一步上使用compositeProcessor进行一步 . 7步选项只会带来IMO的问题 .

    1)7步databean:所以你的编写器在一个数据库中提交,直到第7步..然后第7步编写器尝试提交真正的数据库和繁荣错误!全部丢失,批次必须从步骤1重新开始!

    2)带有上下文的7个步骤:可能会更好,因为您将在spring批处理元数据中保存状态 . 但是将大数据存储在springBatch的元数据中并不是一个好习惯!

    3)是去IMO的方式 . ;-)

相关问题