如何在 spring 批次中定义另一个流程中的并行子流程?

我想在 Spring 季批次中实现如下的流程结构 .

Job
          /   \
       Flow1  Flow2  
         /      \
      Step1    Step2
        /       /  \
       /    Step3  Flow3
      /                \  
     /                 Step4
     \                  /
      \                /
       \              /    
            Step5

作业配置伪代码如下:

@Configuration
public class JobConfiguration {

......

    @Bean
    public Job Job() {

    Flow flow1 = new FlowBuilder<SimpleFlow>("flow1")
                            .start(step1())
                            .build();

    Flow flow2 = new FlowBuilder<SimpleFlow>("flow2")
           .start(step2())
           .next(step3())
           .split(new SimpleAsyncTaskExecutor()).add(flow3)
           .build();

    Flow flow3 = new FlowBuilder<SimpleFlow>("flow3")
                            .start(step4())
                            .build();

    return jobBuilderFactory.get("job")
            .incrementer(new RunIdIncrementer())
            .start(flow1)
            .split(new SimpleAsyncTaskExecutor()).add(flow2)
            .next(Step5())
            .end()
            .build();
    }
......
}

当我运行批处理时,日志显示执行step1,step2,step3和step5,但不运行step4 .

我想知道如何在另一个流中定义子流,上面的代码是否正确实现呢?

提前致谢!

回答(1)

2 years ago

隔离运行每个流表明 Flow1Flow3 是正确的,但 Flow2 不是 . 仅运行 Flow2

return jobBuilderFactory.get("job")
            .incrementer(new RunIdIncrementer())
            .start(flow2)
            .build()
            .build();

显示执行step2和step3,但不执行step4 . 所以问题在于这个流程的定义 .

您需要像 Flow1Flow2 那样在 Step3Flow3 之间定义并行流 . 这是一个例子:

@Bean
public Job Job() {

    Flow flow1 = new FlowBuilder<SimpleFlow>("flow1")
            .start(step1())
            .build();

    Flow flow3 = new FlowBuilder<SimpleFlow>("flow3")
            .start(step4())
            .build();

    Flow parallelFlow = new FlowBuilder<SimpleFlow>("parallelFlow")
            .start(step3())
            .split(new SimpleAsyncTaskExecutor()).add(flow3)
            .build();

    Flow flow2 = new FlowBuilder<SimpleFlow>("flow2")
            .start(step2())
            .next(parallelFlow)
            .build();

    return jobs.get("job")
            .incrementer(new RunIdIncrementer())
            .start(flow1)
            .split(new SimpleAsyncTaskExecutor()).add(flow2)
            .next(step5())
            .end()
            .build();
}

希望这可以帮助 .