Answer
Based on the code in the accepted answer, the following adjustment worked for me:
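import java.util.List;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.job.flow.support.SimpleFlow;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

// helper method to create a split flow out of a List of steps
private static Flow createParallelFlow(List<Step> steps) {
    // allow every step to run at the same time
    SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
    taskExecutor.setConcurrencyLimit(steps.size());

    // wrap each step in its own single-step flow
    Flow[] flows = new Flow[steps.size()];
    for (int i = 0; i < steps.size(); i++) {
        flows[i] = new FlowBuilder<SimpleFlow>(steps.get(i).getName()).start(steps.get(i)).build();
    }

    // run all those flows in parallel as one split
    return new FlowBuilder<SimpleFlow>("parallelStepsFlow")
            .split(taskExecutor)
            .add(flows)
            .build();
}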
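Conceptually, the split built by createParallelFlow submits every per-step flow to the task executor at once and only finishes once all of them have finished. The sketch below models just that fan-out/join behaviour with a plain JDK ExecutorService instead of Spring Batch, so it runs without the framework; ParallelSplitSketch and runAll are illustrative names. One difference worth knowing: SimpleAsyncTaskExecutor does not pool threads. It starts a new thread per task, and setConcurrencyLimit only caps how many run at once, whereas this sketch uses a fixed pool of the same size.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// JDK-only analogue of a split: every branch is submitted at once and the
// split completes only when all branches have completed.
final class ParallelSplitSketch {

    // One thread per branch, mirroring setConcurrencyLimit(steps.size()).
    // Results come back in the order the branches were given.
    static List<String> runAll(List<Callable<String>> branches) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, branches.size()));
        try {
            List<Future<String>> futures = pool.invokeAll(branches); // waits for all branches
            List<String> results = new ArrayList<>();
            for (Future<String> f : futures) {
                results.add(f.get()); // a failed branch surfaces as ExecutionException
            }
            return results;
        } catch (ExecutionException e) {
            throw new IllegalStateException("a branch failed", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for branches", e);
        } finally {
            pool.shutdown();
        }
    }
}
```

With three branches returning "done:q1", "done:q2" and "done:q3", runAll returns them in that order even though they ran concurrently, because invokeAll keeps the futures in submission order.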
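Edit
I have updated the question with a version that loops correctly, but as the application scales, being able to process in parallel is important, and I still don't know how to do this dynamically at runtime with Java config...
Refined question: how do I dynamically create a reader-processor-writer at runtime for, say, 5 different cases (5 queries means the loop is currently configured to run 5 times)?
My LoopDecider looks like this: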
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.job.flow.FlowExecutionStatus;
import org.springframework.batch.core.job.flow.JobExecutionDecider;

public class LoopDecider implements JobExecutionDecider {
    private static final Logger LOG = LoggerFactory.getLogger(LoopDecider.class);
    private static final String COMPLETED = "COMPLETED";
    private static final String CONTINUE = "CONTINUE";
    private static final String ALL = "queries";   // key of the query list in the job ExecutionContext
    private static final String COUNT = "count";   // key under which the current index is published
    private int currentQuery;
    private int limit;

    @SuppressWarnings("unchecked")
    @Override
    public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
        List<String> allQueries = (List<String>) jobExecution.getExecutionContext().get(ALL);
        this.limit = allQueries.size();
        jobExecution.getExecutionContext().put(COUNT, currentQuery);
        if (++currentQuery >= limit) {
            return new FlowExecutionStatus(COMPLETED);
        } else {
            LOG.info("Looping for query: {}", allQueries.get(currentQuery - 1));
            return new FlowExecutionStatus(CONTINUE);
        }
    }
}
Based on a list of queries (HQL queries), I want one reader-processor-writer per query. My current configuration looks like this:
Job
@Bean
public Job subsetJob() throws Exception {
    LoopDecider loopDecider = new LoopDecider();
    FlowBuilder<Flow> flowBuilder = new FlowBuilder<>(FLOW_NAME);
    Flow flow = flowBuilder
            .start(createHQL())
            .next(extractData())
            .next(loopDecider)
            .on("CONTINUE")
            .to(extractData())
            .from(loopDecider)
            .on("COMPLETED")
            .end()
            .build();
    return jobBuilderFactory.get("subsetJob")
            .start(flow)
            .end()
            .build();
}
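To see what the LoopDecider and the CONTINUE/COMPLETED flow above do at runtime, here is a plain-Java model of the two, assuming createHQL has already stored the query list under "queries". A Map stands in for the job ExecutionContext, and LoopDeciderModel is an illustrative name, not Spring Batch API. The model records the "count" value present in the context each time extractData starts.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of LoopDecider plus the extractData -> decider loop.
// The Map replaces the job ExecutionContext; only the counter logic is mirrored.
final class LoopDeciderModel {
    private int currentQuery; // like the real decider, never reset

    // Same order of operations as decide(): publish COUNT, then increment and compare.
    String decide(Map<String, Object> context) {
        @SuppressWarnings("unchecked")
        List<String> allQueries = (List<String>) context.get("queries");
        int limit = allQueries.size();
        context.put("count", currentQuery);
        return (++currentQuery >= limit) ? "COMPLETED" : "CONTINUE";
    }

    // Runs extractData, then the decider, looping back on CONTINUE.
    // Returns the "count" value each extractData run would see (null = not yet set).
    static List<Object> countsSeenByExtractData(List<String> queries) {
        Map<String, Object> context = new HashMap<>();
        context.put("queries", queries);
        LoopDeciderModel decider = new LoopDeciderModel();
        List<Object> seen = new ArrayList<>();
        String status;
        do {
            seen.add(context.get("count")); // what a reader inside extractData would find
            status = decider.decide(context);
        } while (status.equals("CONTINUE"));
        return seen;
    }
}
```

For three queries, extractData runs three times and sees null, 0 and 1 for "count", because the decider writes the index after each run. Whether that matters depends on how the reader uses "count", which the question doesn't show. Note also that currentQuery is never reset: the real decider is created once when the subsetJob bean is built, so a second launch of the same job in the same JVM would start from the previous counter.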
Step
public Step extractData() {
    return stepBuilderFactory.get("extractData")
            .chunk(100_000)
            .reader(reader())
            .processor(processor())
            .writer(writer())
            .build();
}
Reader
public HibernateCursorItemReader reader() {
    CustomHibernateCursorItemReader reader = new CustomHibernateCursorItemReader();
    reader.setSessionFactory(HibernateUtil.getSessionFactory());
    reader.setUseStatelessSession(false);
    return reader;
}
Processor
public DynamicRecordProcessor processor() {
    return new DynamicRecordProcessor();
}
Writer
public FlatFileItemWriter writer() {
    CustomFlatFileItemWriter writer = new CustomFlatFileItemWriter();
    writer.setLineAggregator(new DelimitedLineAggregator() {{
        setDelimiter(TARGET_DELIMITER);
        setFieldExtractor(new PassThroughFieldExtractor());
    }});
    return writer;
}
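For reference, the writer's line aggregator turns each item into one line roughly like this: PassThroughFieldExtractor hands arrays through unchanged, turns a Collection or Map into an array of its elements or values, and wraps anything else in a one-element array; DelimitedLineAggregator then joins those fields with the delimiter. The sketch approximates that in plain Java as I understand the documented behaviour. It skips the FieldSet case and any quoting, and DelimitedLineSketch is a made-up name.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.stream.Collectors;

// Rough approximation of PassThroughFieldExtractor + DelimitedLineAggregator.
final class DelimitedLineSketch {

    // Item -> fields, following the pass-through rules described above.
    static Object[] extract(Object item) {
        if (item instanceof Object[]) {
            return (Object[]) item;
        }
        if (item instanceof Collection) {
            return ((Collection<?>) item).toArray();
        }
        if (item instanceof Map) {
            return ((Map<?, ?>) item).values().toArray();
        }
        return new Object[] { item };
    }

    // Fields -> one output line, joined with the delimiter (TARGET_DELIMITER above).
    static String aggregate(Object item, String delimiter) {
        return Arrays.stream(extract(item))
                .map(String::valueOf)
                .collect(Collectors.joining(delimiter));
    }
}
```

So an Object[] of {"id", 42, "x"} written with "|" as the delimiter becomes the line id|42|x.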
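At the moment, the process works for a single query. However, I actually have a list of queries.
My initial idea was to loop over the step, pass it the list of queries, and read-process-write for each query. This would also be ideal for parallel chunking.
However, when I add the list of queries as a parameter to the extractData step, I create one step per query and return a list of steps instead of the expected single step. The job then complains that it needs a single step rather than a list of steps.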
Another idea was to create a custom MultiHibernateCursorItemReader along the same lines as MultiResourceItemReader, but I'm really looking for a more out-of-the-box solution.
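The MultiHibernateCursorItemReader idea boils down to the delegate switching that MultiResourceItemReader does for files: read one source until it is exhausted, then open the next. A framework-free sketch of that logic, assuming a hypothetical openCursor function that yields the rows for one query (in a real reader this would be one HibernateCursorItemReader per query, which would also have to be opened and closed as an ItemStream):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Sketch of a "multi-query" reader: exhaust one query's cursor, then move on.
// read() returns null when everything is exhausted, matching the ItemReader contract.
final class MultiQueryReaderSketch<T> {
    private final Iterator<String> queries;
    private final Function<String, Iterator<T>> openCursor; // hypothetical per-query cursor factory
    private Iterator<T> current;

    MultiQueryReaderSketch(List<String> queries, Function<String, Iterator<T>> openCursor) {
        this.queries = new ArrayList<>(queries).iterator();
        this.openCursor = openCursor;
    }

    T read() {
        // Skip over finished (or empty) cursors until one has data left.
        while (current == null || !current.hasNext()) {
            if (!queries.hasNext()) {
                return null; // all queries exhausted
            }
            current = openCursor.apply(queries.next());
        }
        return current.next();
    }
}
```

One consequence of this design: all queries feed a single step and a single writer, so the per-query writer(query) output and the per-query parallelism the question asks for are lost. And since read() signals the end with null, the per-query sources must not produce null items.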
@Bean
public List<Step> extractData(@Value("#{jobExecutionContext[HQL]}") List<String> queries) {
    List<Step> steps = new ArrayList<Step>();
    for (String query : queries) {
        steps.add(stepBuilderFactory.get("extractData")
                .chunk(100_000)
                .reader(reader(query))
                .processor(processor())
                .writer(writer(query))
                .build());
    }
    return steps;
}
Question
How do I loop over the steps and integrate them into the job?
1 Answer
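Don't instantiate your steps, readers, processors and writers as Spring beans; there is no need to. Only your job instance has to be a Spring bean.
So just remove the @Bean and @StepScope annotations from the methods that create your steps, readers, writers and processors, and instantiate them wherever you need them.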
There is only one catch: you have to call afterPropertiesSet() manually. For example:
This way your step, reader and writer instances are automatically "step-scoped", because you explicitly instantiate them for each step.
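The point that plain factory methods give each step its own components, at the cost of calling afterPropertiesSet() yourself, can be illustrated without Spring. In this sketch InitializingComponent stands in for Spring's InitializingBean, and QueryReader is a hypothetical reader that validates its query in afterPropertiesSet(); neither is Spring Batch API.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for Spring's InitializingBean: the container calls afterPropertiesSet()
// on beans it manages, but not on objects you create yourself with "new".
interface InitializingComponent {
    void afterPropertiesSet() throws Exception;
}

// Hypothetical reader that checks its configuration, as many readers/writers do.
final class QueryReader implements InitializingComponent {
    private String queryString;

    void setQueryString(String queryString) { this.queryString = queryString; }

    String getQueryString() { return queryString; }

    @Override
    public void afterPropertiesSet() {
        if (queryString == null || queryString.isEmpty()) {
            throw new IllegalStateException("queryString must be set");
        }
    }
}

final class PerQueryFactory {
    // Ordinary method, no @Bean/@StepScope: every call builds a new, independent
    // instance, so each per-query step gets its own reader state.
    static QueryReader reader(String query) {
        QueryReader reader = new QueryReader();
        reader.setQueryString(query);
        reader.afterPropertiesSet(); // no container here, so call it by hand
        return reader;
    }

    static List<QueryReader> readersFor(List<String> queries) {
        List<QueryReader> readers = new ArrayList<>();
        for (String q : queries) {
            readers.add(reader(q));
        }
        return readers;
    }
}
```

Because reader(query) is a normal method rather than a @Bean, each call returns a fresh object and nothing invokes afterPropertiesSet() automatically, which is why the factory does it explicitly.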
Let me know if my answer isn't clear enough, and I'll add a more detailed example.
Edit
A simple example: