下面是我创建的批处理作业的读取器,处理器,编写器和步骤的相关代码部分 .
我需要更新表中的 flag column ,从中读取数据(源表)以标记此作业正在处理此数据,以便其他应用程序不会获取该数据 . 然后,一旦读取记录的处理完成,我需要将该列恢复为原始值,以便其他应用程序也可以处理这些记录 .
我想,听众是采取的方法(ItemReadListener?) . 读者监听器似乎只适用于第一步(即更新标志列),但不适用于块结束时的恢复 . 挑战似乎是在处理器末端提供读取数据 .
任何人都可以建议可行的方法吗?
@Bean
public Step step1(StepBuilderFactory stepBuilderFactory,
ItemReader<RemittanceVO> reader, ItemWriter<RemittanceClaimVO> writer,
ItemProcessor<RemittanceVO, RemittanceClaimVO> processor) {
return stepBuilderFactory.get("step1")
.<RemittanceVO, RemittanceClaimVO> chunk(Constants.SPRING_BATCH_CHUNK_SIZE)
.reader(reader)
.processor(processor)
.writer(writer)
.taskExecutor(simpleAsyntaskExecutor)
.throttleLimit(Constants.THROTTLE_LIMIT)
.build();
}
@Bean
public ItemReader<RemittanceVO> reader() {
JdbcPagingItemReader<RemittanceVO> reader = new JdbcPagingItemReader<RemittanceVO>();
reader.setDataSource(dataSource);
reader.setRowMapper(new RemittanceRowMapper());
reader.setQueryProvider(queryProvider);
reader.setPageSize(Constants.SPRING_BATCH_READER_PAGE_SIZE);
return reader;
}
@Bean
public ItemProcessor<RemittanceVO, RemittanceClaimVO> processor() {
return new MatchClaimProcessor();
}
@Bean
public ItemWriter<RemittanceClaimVO> writer(DataSource dataSource) {
return new MatchedClaimWriter();
}
我几天前开始使用Spring Batch,所以不熟悉所有提供的建模和模式 .
1 回答
首先,关于使用asyncTaskExecutor的一个小提示:你必须同步阅读器,否则你将遇到并发问题 . 您可以使用SynchronizedItemStreamReader执行此操作:
其次,一个可能的方法来解决你真正的问题:
我会使用一个简单的tasklet来“标记”你想要处理的条目 . 您可以使用一个简单的UPDATE语句来完成此操作,因为您知道您的选择标准 . 这样,您只需要一个呼叫,因此只需要一个事务 .
在那之后,我将实现与读者,处理器和编写器的正常步骤 . 读者必须只读取标记的条目,使您的select子句也非常简单 .
为了恢复该标志,您可以在第三步中执行该操作,该步骤实现为tasklet并使用适当的UPDATE语句(如第一步) . 要确保在异常情况下还原标志,只需适当配置您的作业流,以便即使步骤2失败也执行步骤3( - >查看我对此问题的回答Spring Batch Java Config: Skip step when exception and go to next steps)
当然,如果使用compositeItemWriter,也可以在编写块时恢复标志 . 但是,如果在步骤2中发生异常,则需要一种策略如何恢复标志 .
IMO,使用Listener不是一个好主意,因为事务处理是不同的 .