我已经使用AMQP(RabbitMQ)实现了 Remote Chunking
. 现在我需要从Web容器中运行并行作业 .
我的简单控制器( testJob
使用远程分块):
@Controller
public class JobController {
@Autowired
private JobLauncher jobLauncher;
@Autowired
private Job testJob;
@RequestMapping("/job/test")
public void test() {
JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
jobParametersBuilder.addDate("date",new Date());
try {
jobLauncher.run(personJob,jobParametersBuilder.toJobParameters());
} catch (JobExecutionAlreadyRunningException | JobRestartException | JobParametersInvalidException | JobInstanceAlreadyCompleteException e) {
e.printStackTrace();
}
}
}
testJob
从文件系统(主块)读取数据并将其发送到远程块(从块) . 问题是 ItemReader
不是线程安全的 .
对于一些常见的批处理用例,使用多线程步骤存在一些实际限制 . 步骤中的许多参与者(例如读者和作者)是有状态的,并且如果状态没有被线程隔离,那么这些组件在多线程步骤中不可用 . 特别是Spring Batch的大多数现成的读者和作者都不是为多线程使用而设计的 . 但是,可以使用无状态或线程安全的读取器和编写器,并且Spring Batch Samples中有一个示例(parallelJob),它显示了使用过程指示器(参见第6.12节“防止状态持久性”)跟踪已在数据库输入表中处理的项目 .
我在spring批处理github存储库https://github.com/spring-projects/spring-batch/blob/master/spring-batch-samples/src/main/java/org/springframework/batch/sample/common/StagingItemReader.java上考虑了parallelJob示例
我对过程指标模式有点困惑 . 在哪里可以找到有关此模式的更多详细信息?
1 回答
如果您只关心
ItemReader
实例将在作业调用之间共享,则可以将ItemReader
声明为步骤范围,并且每次调用都会获得一个新实例,这将消除线程问题 .但是要回答关于过程指示器模式的直接问题,我不确定它本身的好文档在哪里 . 在Spring Batch Samples中有一个它的实现示例(并行作业使用它) .
它背后的想法是您为要处理的记录提供状态 . 在作业/步骤开始时,您将这些记录标记为正在处理中 . 记录提交后,您将其标记为已处理 . 这样就无需跟踪阅读器中的状态,因为您的状态实际上位于数据库中(您的查询只查找标记为进程中的记录) .