
Spring Batch - Looping a reader/processor/writer step


Based on the code from the accepted answer, the following adjustment to it worked for me:

// helper method to create a split flow out of a List of steps
private static Flow createParallelFlow(List<Step> steps) {
    SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
    taskExecutor.setConcurrencyLimit(steps.size());         

    Flow[] flows = new Flow[steps.size()];
    for (int i = 0; i < steps.size(); i++) {
        flows[i] = new FlowBuilder<SimpleFlow>(steps.get(i).getName()).start(steps.get(i)).build();
    }           

    return new FlowBuilder<SimpleFlow>("parallelStepsFlow")
        .split(taskExecutor)                
        .add(flows)
        .build();
}

Edit

I have updated the question to a version that loops correctly. However, as the application scales, being able to process in parallel becomes important, and I still don't know how to set that up dynamically at runtime with javaconfig ...

Refined question: How do I create a reader-processor-writer dynamically at runtime for 5 different situations (5 queries means a loop over 5 as currently configured)?

My LoopDecider looks like this:

public class LoopDecider implements JobExecutionDecider {

    private static final Logger LOG = LoggerFactory.getLogger(LoopDecider.class);
    private static final String COMPLETED = "COMPLETED";
    private static final String CONTINUE = "CONTINUE";
    private static final String ALL = "queries";
    private static final String COUNT = "count";

    private int currentQuery;
    private int limit;

    @SuppressWarnings("unchecked")
    @Override
    public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
        List<String> allQueries = (List<String>) jobExecution.getExecutionContext().get(ALL);
        this.limit = allQueries.size();
        jobExecution.getExecutionContext().put(COUNT, currentQuery);
        if (++currentQuery >= limit) {
            return new FlowExecutionStatus(COMPLETED);
        } else {
            LOG.info("Looping for query: " + allQueries.get(currentQuery - 1));
            return new FlowExecutionStatus(CONTINUE);
        }       
    }

}
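The decider's counting behavior can be checked in isolation. The sketch below mirrors decide() in plain Java, with the Spring Batch types and the execution-context bookkeeping stripped away (LoopCounter is a hypothetical stand-in, not part of the job):

```java
import java.util.List;

// Hypothetical stand-in for the decider's counting logic, without the
// Spring Batch types or the execution-context bookkeeping.
class LoopCounter {
    private int currentQuery;

    // Mirrors decide(): "CONTINUE" while queries remain, "COMPLETED"
    // once the counter reaches the number of queries.
    String decide(List<String> allQueries) {
        int limit = allQueries.size();
        if (++currentQuery >= limit) {
            return "COMPLETED";
        }
        return "CONTINUE";
    }
}
```

With three queries the sequence of results is CONTINUE, CONTINUE, COMPLETED, i.e. the step in front of the decider runs exactly once per query before the flow ends.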

Based on a list of queries (HQL queries), I want one reader-processor-writer per query. My current configuration looks like this:

Job

@Bean
public Job subsetJob() throws Exception {
    LoopDecider loopDecider = new LoopDecider();        
    FlowBuilder<Flow> flowBuilder = new FlowBuilder<>(FLOW_NAME);
    Flow flow = flowBuilder
            .start(createHQL())
            .next(extractData())
            .next(loopDecider)
            .on("CONTINUE")
            .to(extractData())
            .from(loopDecider)
            .on("COMPLETED")                
            .end()
            .build();       

    return jobBuilderFactory.get("subsetJob")               
            .start(flow)                
            .end()
            .build();
}

Step

public Step extractData(){
    return stepBuilderFactory.get("extractData")
            .chunk(100_000)
            .reader(reader())
            .processor(processor())
            .writer(writer())
            .build();
}

Reader

public HibernateCursorItemReader reader(){      
    CustomHibernateCursorItemReader reader = new CustomHibernateCursorItemReader();
    reader.setSessionFactory(HibernateUtil.getSessionFactory());        
    reader.setUseStatelessSession(false);
    return reader;
}

Processor

public DynamicRecordProcessor processor(){
    return new DynamicRecordProcessor();
}

Writer

public FlatFileItemWriter writer(){
    CustomFlatFileItemWriter writer = new CustomFlatFileItemWriter();               
    writer.setLineAggregator(new DelimitedLineAggregator(){{
        setDelimiter(TARGET_DELIMITER);
        setFieldExtractor(new PassThroughFieldExtractor());
        }}
    );
    return writer;
}

Currently the process works for a single query. However, I actually have a list of queries.

My initial idea was to loop over the steps and hand each step one query from the list, so that each query gets its own read-process-write. That would also be ideal for parallel chunking.

However, when I add the list of queries as a parameter to the extractData step and create a step per query, it returns a list of steps instead of the expected single step. The job then complains that it expects a single step rather than a list of steps.
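For reference, one way to make the job accept such a list is to fold it into a single sequential Flow before handing it to the job builder. This is only a sketch against Spring Batch's FlowBuilder API; createSequentialFlow is a hypothetical helper name:

```java
// Hypothetical helper: fold a List<Step> into one sequential Flow so the
// job builder sees a single unit instead of a list of steps.
private static Flow createSequentialFlow(List<Step> steps) {
    FlowBuilder<SimpleFlow> builder = new FlowBuilder<>("sequentialStepsFlow");
    builder.start(steps.get(0));
    for (int i = 1; i < steps.size(); i++) {
        builder.next(steps.get(i));
    }
    return builder.build();
}
```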

Another idea was to create a custom MultiHibernateCursorItemReader, along the same lines as MultiResourceItemReader, but I am really looking for a more out-of-the-box solution.

@Bean
public List<Step> extractData(@Value("#{jobExecutionContext[HQL]}") List<String> queries){
    List<Step> steps = new ArrayList<Step>();
    for (String query : queries) {
        steps.add(stepBuilderFactory.get("extractData")
            .chunk(100_000)
            .reader(reader(query))
            .processor(processor())
            .writer(writer(query))
            .build());
    }
    return steps;
}
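The "multi reader" idea mentioned above can be sketched in plain Java: one reader that drains a list of per-query result lists in sequence, analogous to how MultiResourceItemReader moves from one resource to the next. The SimpleItemReader interface below is a hypothetical stand-in, not the Spring Batch ItemReader:

```java
import java.util.Iterator;
import java.util.List;

// Stand-in for Spring Batch's ItemReader contract: read() returns null
// when the input is exhausted.
interface SimpleItemReader<T> {
    T read();
}

// Hypothetical sketch of a delegating "multi query" reader: drains the
// results of each query in order before reporting end of input.
class MultiQueryReader<T> implements SimpleItemReader<T> {
    private final Iterator<List<T>> queryResults;
    private Iterator<T> current;

    MultiQueryReader(List<List<T>> resultsPerQuery) {
        this.queryResults = resultsPerQuery.iterator();
    }

    @Override
    public T read() {
        // Advance to the next non-empty query result when the current
        // one is exhausted.
        while (current == null || !current.hasNext()) {
            if (!queryResults.hasNext()) {
                return null; // all queries exhausted
            }
            current = queryResults.next().iterator();
        }
        return current.next();
    }
}
```

A real implementation would of course open a Hibernate cursor per query lazily instead of holding all results in memory; the sketch only shows the delegation pattern.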

Question
How do I loop over the steps and integrate them into the job?

1 Answer


    Do not instantiate your steps, readers, processors and writers as Spring beans. There is no need to do so. Only your job instance has to be a Spring bean.

    So just remove the @Bean and @StepScope annotations from your step, reader, writer and processor creation methods and instantiate them where needed.

    There is only one catch: you have to call afterPropertiesSet() manually. E.g.:

    // @Bean -> delete
    // @StepScope -> delete
    public FlatFileItemWriter writer(@Value("#{jobExecutionContext[fileName]}") String fileName){
        FlatFileItemWriter writer = new FlatFileItemWriter();
        writer.setResource(new FileSystemResource(new File(TARGET_LOCATION + fileName + TARGET_FILE_EXTENSION)));       
        writer.setLineAggregator(new DelimitedLineAggregator(){{
            setDelimiter(TARGET_DELIMITER);
            setFieldExtractor(new PassThroughFieldExtractor());
            }}
        );
    
        // ------- ADD!!
        writer.afterPropertiesSet();
    
        return writer;
    }
    

    This way your step, reader and writer instances are automatically "step scoped", because you instantiate them explicitly for every step.

    If my answer is not clear enough, please let me know. I will then add a more detailed example.

    Edit

    A simple example:

    @Configuration
    public class MyJobConfiguration {
    
        @Autowired
        private JobBuilderFactory jobBuilderFactory;
    
        @Autowired
        private StepBuilderFactory stepBuilderFactory;
    
    
        List<String> filenames = Arrays.asList("file1.txt", "file2.txt");
    
        @Bean
        public Job myJob() throws Exception {
    
           List<Step> steps = filenames.stream()
                .map(name -> createStep(name))
                .collect(Collectors.toList());
    
           return jobBuilderFactory.get("subsetJob")
                .start(createParallelFlow(steps))
                .end()
                .build();
        }
    
    
        // helper method to create a step
        private Step createStep(String filename) throws Exception {
            return stepBuilderFactory.get("convertStepFor" + filename) // !!! Stepname has to be unique
                .chunk(100_000)
                .reader(createFileReader(new FileSystemResource(new File(filename)), new YourInputLineMapper()))
                .processor(new YourConversionProcessor())
                .writer(createFileWriter(new FileSystemResource(new File("converted_" + filename)), new YourOutputLineAggregator()))
                .build();
        }
    
    
        // helper method to create a split flow out of a List of steps
        private static Flow createParallelFlow(List<Step> steps) {
            SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
            taskExecutor.setConcurrencyLimit(steps.size());
    
            List<Flow> flows = steps.stream() // we have to convert the steps to flows
                .map(step -> //
                        new FlowBuilder<Flow>("flow_" + step.getName()) //
                        .start(step) //
                        .build()) //
                .collect(Collectors.toList());
    
            return new FlowBuilder<SimpleFlow>("parallelStepsFlow").split(taskExecutor) //
                .add(flows.toArray(new Flow[flows.size()])) //
                .build();
        }
    
    
        // helper methods to create filereader and filewriters
        public static <T> ItemReader<T> createFileReader(Resource source, LineMapper<T> lineMapper) throws Exception {
            FlatFileItemReader<T> reader = new FlatFileItemReader<>();
    
            reader.setEncoding("UTF-8");
            reader.setResource(source);
            reader.setLineMapper(lineMapper);
            reader.afterPropertiesSet();
    
            return reader;
        }
    
        public static <T> ItemWriter<T> createFileWriter(Resource target, LineAggregator<T> aggregator) throws Exception {
            FlatFileItemWriter<T> writer = new FlatFileItemWriter<>();
    
            writer.setEncoding("UTF-8");
            writer.setResource(target);
            writer.setLineAggregator(aggregator);
    
            writer.afterPropertiesSet();
            return writer;
        }
    }
    
