首页 文章

如果正在进行相同参数(但不同的日期/时间)的作业,则Spring批处理跳过作业

提问于
浏览
0

一些作业比我们的调度程序间隔运行的时间更长 . 跳过这些工作是非常安全的 . 如何检测具有相同名称和参数(运行时间除外)的作业当前是否正在运行?

Quartz运行Batch作业:

import org.springframework.batch.core.JobParametersBuilder;

@Override
protected void executeInternal(org.quartz.JobExecutionContext context) {
    Map<String, Object> jobDataMap = context.getMergedJobDataMap();
    String jobName = (String) jobDataMap.get("job.name");

    JobParametersBuilder builder = new JobParametersBuilder();
    builder.addString("company", jobDataMap.get("company"));
    builder.addDate("run.date", new Date());

    try {
        jobLauncher.run(jobLocator.getJob(jobName), jobParametersBuilder.toJobParameters());
    }
    catch (JobExecutionException e) {
        log.error("Could not execute job.", e);
    }
}

我需要查找是否正在运行特定 "job.name" / "company" (无论 "run.date" )的任何执行 .

我可以使用针对 BATCH_ 表运行的纯SQL来实现这一点 .

检查应该在Tasklet中运行,以便JobRepository可以自动连接 . 是否有可能只找到 JobRepository 或其他Spring Batch bean?

2 回答

  • 0

    您可以从识别中排除“run.date” .

    使用公共 JobParameter(Date parameter, boolean identifying)

    来自JobParameter的文档 .

    标识标志用于指示参数是否将用作作业实例标识的一部分 .

    所以只需添加没有 builder.addDate("run.date", new Date()); 的参数并运行作业 .

  • 0

    我的解决方案基于 JobExplorer.findRunningJobExecutions() (找到最新未完成的工作):

    /**
     * If there are jobs with same parameters, that started later then some number we can run new job again.
     *
     * {@link JobExplorer#findRunningJobExecutions(String)} selects jobs with BATCH_JOB_EXECUTION.END_TIME is null.
     *
     * @return  null if there is no jobs with same parameters, otherwise the latest running.
     */
    public JobExecution getLatestRunningJob(JobExplorer jobExplorer, String jobName, String agency, String branch) {
        long now = new Date().getTime();
        long minDiff = Long.MAX_VALUE;
        JobExecution latestExecution = null;
        Set<JobExecution> runningJobs = jobExplorer.findRunningJobExecutions(jobName);
        for (JobExecution execution : Optional.ofNullable(runningJobs).orElse(Collections.emptySet())) {
            JobParameters params = execution.getJobParameters();
            // log.warn("agencyName {}", params.getString("agencyName"));
            // log.warn("branchName {}", params.getString("branchName"));
            if ( ! agency.equals(params.getString("agencyName")))
                continue;
            if ( ! branch.equals(params.getString("branchName")))
                continue;
            // log.warn("create {}", execution.getCreateTime());
            // log.warn("start {}", execution.getStartTime());
    
            long diff = now - execution.getCreateTime().getTime();
            if (diff < 0) {
                log.warn("Impossible, new job executed before old! Old JobExecution id: {}", execution.getJobId());
                continue;
            }
            if (diff < minDiff) {
                minDiff = diff;
                latestExecution = execution;
            }
        }
        return latestExecution;
    }
    

    和决策者:

    /**
     * @param execution  not null
     * @param limitMinutes  how long job should be runnning to ignore
     */
    public boolean canIgnoreLateJob(JobExecution execution, long limitMinutes) {
        long now = new Date().getTime();
        long diffMinutes = TimeUnit.MINUTES.convert(now - execution.getCreateTime().getTime(), TimeUnit.MILLISECONDS);
        if (diffMinutes < limitMinutes) {
            log.warn("Recent JobExecution {} is still not finished, can't run new job, diff (minute): {}",
                    execution.getJobId(), diffMinutes);
            return false;
        } else {
            log.warn("Very old JobExecution {} is still not finished, can run new job, diff (minute): {}",
                    execution.getJobId(), diffMinutes);
            return true;
        }
    }
    

    这个解决方案的缺点是 JobExplorer 解决了JobExecution中的每个字段(包括所有步骤) . 所以表现并不完美 . 但解决方案是可靠的 .

相关问题