首页 文章

如何重新启动失败的 spring 批处理作业并让它从中断的地方继续?

提问于
浏览
7

根据Spring Batch文档,开箱即用的重新启动作业但是我无法从它离开的位置开始 . 例如如果我的步骤处理了10个记录,它应该从记录11开始,每当我重新启动它时进行处理 . 在实践中,这不会发生 . 它从beginnen中读取并重新处理所有内容 .

是否有人使用基于Java配置的简单作业配置来读取分隔文件并将内容写入可以从停止点重新启动的db表?

@Configuration
public class BatchConfiguration {

    @Value("${spring-batch.databaseType}")
    private String databaseType;

    @Value("${spring-batch.databaseSchema}")
    private String schemaName;

    @Bean
    public JobBuilderFactory jobBuilderFactory(final JobRepository jobRepository) {
        return new JobBuilderFactory(jobRepository);
    }

    @Bean
    public StepBuilderFactory stepBuilderFactory(final JobRepository jobRepository,
        final PlatformTransactionManager transactionManager) {
        return new StepBuilderFactory(jobRepository, transactionManager);
    }

    @Bean
    public JobRepository jobRepository(final DataSource dataSource, final PlatformTransactionManager transactionManager) {

        final JobRepositoryFactoryBean bean = new JobRepositoryFactoryBean();
        bean.setDatabaseType(databaseType);
        bean.setDataSource(dataSource);
        if (StringUtils.isNotBlank(schemaName)) {
            bean.setTablePrefix(schemaName);
        }
        bean.setTransactionManager(transactionManager);
        try {
            bean.afterPropertiesSet();
            return bean.getObject();
        } catch (final Exception e) {
            throw new BatchConfigurationException("Invalid batch job repository configuration.", e);
        }
    }

    @Bean
    public JobLauncher jobLauncher(final JobRepository jobRepository) {

        final SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository);
        return jobLauncher;
    }

}

@Configuration
@EnableScheduling
@ComponentScan("com.some.package")
public class BatchJobConfiguration {

    @Resource
    private JobBuilderFactory jobBuilderFactory;

    @Resource
    private StepBuilderFactory stepBuilderFactory;

    @Value("${savings-transaction.file}")
    private String savingsTransactionFile;

    @Value("${savings-balance.file}")
    private String savingsBalanceFile;

    @Value("${processed-directory}")
    private String processedDirectory;

    private static final Integer IMPORT_CHUNKSIZE = 10;

    @Bean
    @DependsOn("stepBuilderFactory")
    public Step savingsTransactionStep(final PlatformTransactionManager transactionManager,
            @Qualifier("savingsTransactionItemReader") final ItemReader<SavingsTransactionItem> savingsTransactionItemReader,
            @Qualifier("savingsTransactionProcessor") final ItemProcessor<SavingsTransactionItem, SavingsTransaction> processor,
            @Qualifier("savingsTransactionItemWriter") final ItemWriter<SavingsTransaction> savingsTransactionItemWriter,
            @Qualifier("savingsTransactionStepListener") final SavingsTransactionStepListener listener) {

        return stepBuilderFactory.get("savingsTransactionStep")
                .transactionManager(transactionManager)
                .<SavingsTransactionItem, SavingsTransaction> chunk(IMPORT_CHUNKSIZE)
                .reader(savingsTransactionItemReader)
                .processor(processor)
                .writer(savingsTransactionItemWriter)
                .listener(listener)
                .build();
    }

    @Bean
    public Step savingsTransactionCleanUpStep(final PlatformTransactionManager transactionManager,
            final JobRepository jobRepository) {

        final TaskletStep taskletStep = new TaskletStep("savingsTransactionCleanUpStep");

        final FileMovingTasklet tasklet = new FileMovingTasklet();
        tasklet.setFileNamePattern(savingsTransactionFile);
        tasklet.setProcessedDirectory(processedDirectory);
        taskletStep.setTasklet(tasklet);
        taskletStep.setTransactionManager(transactionManager);
        taskletStep.setJobRepository(jobRepository);
        try {
            taskletStep.afterPropertiesSet();
        } catch (final Exception e) {
            throw new BatchConfigurationException("Failed to configure tasklet!", e);
        }

        return taskletStep;
    }

    @Bean
    @DependsOn("jobBuilderFactory")
    public Job job(final Step savingsTransactionStep,
            final Step savingsTransactionCleanUpStep) {

        return jobBuilderFactory.get("job")
                .incrementer(new RunIdIncrementer())
                .start(savingsTransactionStep)  
                .next(savingsTransactionCleanUpStep)                    
                .on("FINISHED")
                .end()
                .build()
                .build();
    }
}

重新启动作业的单元测试代码

final Date now = new Date();
    jobMananger.processRegistrations(now);

    final List<SavingsBalance> savingsBalances = savingsBalanceDao.findAll();
    assertEquals(9, savingsBalances.size());

    FileUtils.moveFile(new File("target/AEA001_20160610.dat"), new File("target/AEA001_20160610_invalid.dat"));
    FileUtils.moveFile(new File("target/AEA001_20160610_valid.dat"), new File("target/AEA001_20160610.dat"));

    jobMananger.processRegistrations(now);

    final List<SavingsBalance> savingsBalances2 = savingsBalanceDao.findAll();
    System.out.println(savingsBalances2.size());
    int found = 0;
    for (final SavingsBalance savingsBalance : savingsBalances2) {

        final String id = savingsBalance.getId();
        if ("12345".equals(id)) {
            found++;
        }

    }

    assertEquals("Invalid number of found balances!", 1, found);

工作经理的实施

public class JobManager {

    @Resource
    private JobLauncher jobLauncher;

    @Resource
    private Job job;

    @Transactional(propagation = Propagation.NOT_SUPPORTED)
    public void processRegistrations(final Date date) {

        try {

            final Map<String, JobParameter> parameters = new HashMap<>();
            parameters.put("START_DATE", new JobParameter(date));

            final JobParameters jobParameters = new JobParameters(parameters);
            final JobExecution execution = jobLauncher.run(job, jobParameters);
            LOG.info("Exit Status : " + execution.getStatus());
        } catch (JobExecutionAlreadyRunningException | JobRestartException | JobInstanceAlreadyCompleteException
                | JobParametersInvalidException e) {
            LOG.error("Failed to process registrations.", e);
        }
    }

}

3 回答

  • 1

    您似乎必须配置以下bean才能重新启动作业 .

    @Bean
    public JobOperator jobOperator(final JobLauncher jobLauncher, final JobRepository jobRepository,
            final JobRegistry jobRegistry, final JobExplorer jobExplorer) {
        final SimpleJobOperator jobOperator = new SimpleJobOperator();
        jobOperator.setJobLauncher(jobLauncher);
        jobOperator.setJobRepository(jobRepository);
        jobOperator.setJobRegistry(jobRegistry);
        jobOperator.setJobExplorer(jobExplorer);
        return jobOperator;
    }
    
    @Bean
    public JobExplorer jobExplorer(final DataSource dataSource) throws Exception {
        final JobExplorerFactoryBean bean = new JobExplorerFactoryBean();
        bean.setDataSource(dataSource);
        bean.setTablePrefix("BATCH_");
        bean.setJdbcOperations(new JdbcTemplate(dataSource));
        bean.afterPropertiesSet();
        return bean.getObject();
    }
    

    然后,您需要从批处理表中检索批处理实例ID,以便能够使用jobOperator重新启动该特定实例 .

    final Long restartId = jobOperator.restart(id);
    final JobExecution restartExecution = jobExplorer.getJobExecution(restartId);
    
  • 5

    在JobManager类中,不使用JobLauncher,而是使用JobOperator.restart()nethod .

    你的工作没有从最后一个失败的步骤重新启动的原因是因为使用JobLauncher你又开始了一个新的工作,因此它从第一步开始工作 .

    请确保“restartable”属性设置为true(默认情况下,它设置为true) .

    这是示例代码 .

    public boolean resumeWorkflow(long executionId)
            throws WorkflowResumeServiceException {
        JobOperator jobOperator = (JobOperator) ApplicationContextProvider.getApplicationContext().getBean("jobOperator");
    
    
        try 
        {
    
            LOGGER.info("SUMMARY AFTER RESTART:" + jobOperator.getSummary(executionId));
            jobOperator.restart(executionId);
        }
    }
    

    您需要获取失败作业的jobExecutionid并将其传递给上述方法 .

    请注意,无法重新启动以“已完成”状态完成的作业 .

    你也可以阅读这篇文章Restarting a job

  • 0

    您使用新的JobParameters开始工作,因此SB不会恢复作业,而是启动新作业 .
    如果你想恢复Job,你应该从Job bean配置中删除增量器 .

相关问题