首页 文章

Spring Batch Item Reader只执行一次

提问于
浏览
4

试图实现Spring批处理,但遇到一个奇怪的问题,我们的 ItemReader 类只执行一次 .

以下是详细信息 .

如果我们在DB中有1000行 . 我们的Item阅读器从DB中获取1000行,并将列表传递给ItemWriter ItemWriter成功删除所有项目 . 现在,ItemReader再次尝试从DB获取数据,但没有找到,因此返回NULL,因此执行停止 . 但是我们已经配置批处理以使用Quartz调度程序执行,这是每分钟 . 现在,如果我们通过转储导入在DB中插入1000行,那么批处理作业应该在下次执行时选择此数据,但它甚至不执行,尽管JobLauncher正在执行 .

Configuration : -

1.我们有ItemReader,ItemWriter,提交间隔等于1 .

<batch:job id="csrfTokenBatchJob">
    <batch:step id="step1">
      <tasklet>
        <chunk reader="csrfTokenReader" writer="csrfTokenWriter" commit-interval="1"></chunk>
      </tasklet>
    </batch:step>
  </batch:job>

2.Job计划在每分钟触发 .

<bean class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
    <property name="triggers">
      <bean id="cronTrigger" class="org.springframework.scheduling.quartz.CronTriggerBean">
        <property name="jobDetail" ref="jobDetail" />
        <property name="cronExpression" value="0 0/1 * * * ?" />
      </bean>
    </property>
  </bean>

3.Job配置

<bean id="jobDetail" class="org.springframework.scheduling.quartz.JobDetailBean">
    <property name="jobClass" value="com.tavant.oauth.batch.job.CSRFTokenJobLauncher" />
    <property name="jobDataAsMap">
        <map>
            <entry key="jobName" value="csrfTokenCleanUpBatchJob" />
            <entry key="jobLocator" value-ref="jobRegistry" />
            <entry key="jobLauncher" value-ref="jobLauncher" />
        </map>
    </property>
</bean>

第一次它成功执行,但后来它没有执行,但我可以在日志中看到 JobLauncher 正在执行 .

@Component("csrfTokenReader")
@Scope(value="step")
public class CSRFTokenReader implements ItemReader<List<CSRFToken>> {

    private static final Logger logger = LoggerFactory.getLogger(CSRFTokenReader.class);

    @Autowired
    private CleanService cleanService;

    @Override
    public List<CSRFToken> read() {
        List<CSRFToken> csrfTokenList = null;
        try{

            int keepUpto = Integer.valueOf(PropertiesContext.getInstance().getProperties().getProperty("token.keep", "1"));

            Calendar calTime = Calendar.getInstance();
            calTime.add(Calendar.HOUR, -keepUpto);
            Date toKeep = calTime.getTime();

            csrfTokenList = cleanService.getCSRFTokenByTime(toKeep);
        }
        catch(Throwable th){
            logger.error("Exception in running job At " + new Date() + th);
        }
        if(CollectionUtils.isEmpty(csrfTokenList)){
            return null;
        }
        return csrfTokenList;
    }
}

编辑: -

public class CSRFTokenJobLauncher extends QuartzJobBean {
    static final String JOB_NAME = "jobName";
    private JobLocator jobLocator;
    private JobLauncher jobLauncher;
    public void setJobLocator(JobLocator jobLocator) {
        this.jobLocator = jobLocator;
    }
    public void setJobLauncher(JobLauncher jobLauncher) {
        this.jobLauncher = jobLauncher;
    }
    @Override
    protected void executeInternal(JobExecutionContext context) {
        Map<String, Object> jobDataMap = context.getMergedJobDataMap();
        String jobName = (String) jobDataMap.get(JOB_NAME);
        log.info("Quartz trigger firing with Spring Batch jobName="+jobName);
        JobParameters jobParameters = getJobParametersFromJobMap(jobDataMap);
        try {
            jobLauncher.run(jobLocator.getJob(jobName), jobParameters);
        }
        catch (JobExecutionException e) {
            log.error("Could not execute job.", e);
        }
    }
    private JobParameters getJobParametersFromJobMap(Map<String, Object> jobDataMap) {
        JobParametersBuilder builder = new JobParametersBuilder();
        for (Entry<String, Object> entry : jobDataMap.entrySet()) {
            String key = entry.getKey();
            Object value = entry.getValue();
            if (value instanceof String && !key.equals(JOB_NAME)) {
                builder.addString(key, (String) value);
            }
            else if (value instanceof Float || value instanceof Double) {
                builder.addDouble(key, ((Number) value).doubleValue());
            }
            else if (value instanceof Integer || value instanceof Long) {
                builder.addLong(key, ((Number)value).longValue());
            }
            else if (value instanceof Date) {
                builder.addDate(key, (Date) value);
            }
        }
        return builder.toJobParameters();
    }
}

2 回答

  • 4

    经过几个小时的浪费,现在问题似乎已经解决了,我在tasklet中配置了 allow-start-if-complete="true" . 现在,Batch Item Reader按照计划执行 .

    <batch:job id="csrfTokenBatchJob">
        <batch:step id="step1">
          <batch:tasklet allow-start-if-complete="true">
            <batch:chunk reader="csrfTokenReader" writer="csrfTokenWriter" commit-interval="1"></batch:chunk>
          </batch:tasklet>
        </batch:step>
      </batch:job>
    
  • 2

    Spring批处理记录数据库中的每个作业 . 这就是为什么 Spring 季批次需要区分每个工作运行的原因 . 它检查作业是否已在同一天执行,除非任何作业参数与之前的运行不同,否则不会再次启动,如果启用了完整设置,则允许启动 .

    选项1: - 如上所述,我们可以使用allow-start-if-complete =“true”

    选项2: - 始终传递作为当前日期时间戳的作业参数 . 这样,作业参数值始终是唯一的 .

    JobExecution jobExecution = jobLauncher.run(reportJob, new JobParametersBuilder()
                        .addDate("now", new Date())
    

    选项3: - 使用递增器,例如RunIdIncrementer,这样我们就不需要确保每次都传递唯一的作业参数 .

    @Bean
        public Job job1(JobBuilderFactory jobs, Step s1) {
            return jobs.get("job1")
                    .incrementer(new RunIdIncrementer())
                    .flow(s1)
                    .end()
                    .build();
        }
    

相关问题