1、什么是quartz?

  quartz是一个开源的定时任务框架,具备将定时任务持久化至数据库以及分布式环境下多节点调度的能力。当当的elastic-job便是以quartz为基础,结合zookeeper开发出来的一款产品。

2、整合springboot示例

  项目使用springboot提高开发效率,并将定时任务持久化到mysql数据库中。

2.1)引入quartz依赖

 <dependency>
     <groupId>org.quartz-scheduler</groupId>
     <artifactId>quartz</artifactId>
     <version>2.3.0</version>
 </dependency>
 <dependency>
     <groupId>org.quartz-scheduler</groupId>
     <artifactId>quartz-jobs</artifactId>
     <version>2.3.0</version>
 </dependency>

2.2)配置quartz

先引入配置的config代码,在下一节中对每一段配置进行详细解释:

/**
 * 分布式定时任务quartz配置
 * Created by chenjunyi on 2018/6/6.
 */
@Configuration
public class QuartzConfiguration {

    /**
     * quartz的JobFactory,根据注册的JobClass从spring应用上下文中获取job实例
     */
    public static class AutoSpringBeanJobFactory extends AdaptableJobFactory implements SchedulerContextAware {

        /** spring应用上下文 */
        private ApplicationContext applicationContext;

        /** scheduler上下文 */
        private SchedulerContext   schedulerContext;

        /** 需要忽略的属性 */
        private String[]           ignoredUnknownProperties = null;

        private void setApplicationContext(ApplicationContext applicationContext) {
            this.applicationContext = applicationContext;
        }

        @Override
        public void setSchedulerContext(SchedulerContext schedulerContext) {
            this.schedulerContext = schedulerContext;
        }

        private void setIgnoredUnknownProperties(String... ignoredUnknownProperties) {
            this.ignoredUnknownProperties = ignoredUnknownProperties;
        }

        @Override
        protected Object createJobInstance(final TriggerFiredBundle bundle) throws Exception {
            //获取定时任务的clazz,并从spring上下文中获取实例
            Class<? extends Job> clazz = bundle.getJobDetail().getJobClass();
            Job job = applicationContext.getBean(clazz);

            if (isEligibleForPropertyPopulation(job)) {
                //非继承自QuartzJobBean的Job,设置job属性
                BeanWrapper bw = PropertyAccessorFactory.forBeanPropertyAccess(job);
                MutablePropertyValues pvs = new MutablePropertyValues();
                if (this.schedulerContext != null) {
                    pvs.addPropertyValues(this.schedulerContext);
                }
                pvs.addPropertyValues(bundle.getJobDetail().getJobDataMap());
                pvs.addPropertyValues(bundle.getTrigger().getJobDataMap());
                if (this.ignoredUnknownProperties != null) {
                    for (String propName : this.ignoredUnknownProperties) {
                        if (pvs.contains(propName) &amp;&amp; !bw.isWritableProperty(propName)) {
                            pvs.removePropertyValue(propName);
                        }
                    }
                    bw.setPropertyValues(pvs);
                } else {
                    bw.setPropertyValues(pvs, true);
                }
            }
            return job;
        }

        private boolean isEligibleForPropertyPopulation(Object jobObject) {
            return (!(jobObject instanceof QuartzJobBean));
        }

    }

    /**
     * 配置任务工厂实例
     * @param applicationContext spring上下文实例
     * @return 任务工厂实例
     */
    @Bean
    public JobFactory jobFactory(ApplicationContext applicationContext) {
        AutoSpringBeanJobFactory jobFactory = new AutoSpringBeanJobFactory();
        jobFactory.setApplicationContext(applicationContext);
        return jobFactory;
    }

    /**
     * 配置任务调度器,使用项目数据源作为quartz数据源
     * @param jobFactory 自定义配置任务工厂
     * @param dataSource 数据源实例
     * @return 任务调度器
     */
    @Bean
    public SchedulerFactoryBean schedulerFactoryBean(JobFactory jobFactory, DataSource dataSource) {
        SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean();
        //将spring管理job自定义工厂交由调度器维护
        schedulerFactoryBean.setJobFactory(jobFactory);
        //设置覆盖已存在的任务(配置已失效,因为改写了原有的注册方式,JOB注册时便已自动进行替换)
        schedulerFactoryBean.setOverwriteExistingJobs(true);
        //项目启动完成后,等待50秒后开始执行调度器初始化(需要小于JOB的间隔时间)
        schedulerFactoryBean.setStartupDelay(50);
        //设置调度器自动运行
        schedulerFactoryBean.setAutoStartup(true);
        //设置数据源,使用与项目统一数据源
        schedulerFactoryBean.setDataSource(dataSource);
        //设置定时调度器命名空间
        schedulerFactoryBean.setSchedulerName("MY-QUARTZ-SCHEDULER");
        //设置存储在quartz上文中的Spring应用上下文key
        schedulerFactoryBean.setApplicationContextSchedulerContextKey("applicationContext");
        //设置属性
        Properties properties = new Properties();
        //设置调度器实例名
        properties.setProperty("org.quartz.scheduler.instanceName", "SCHEDULER-INSTANCE");
        //设置调度器实例ID,在cluster中使用,AUTO标识自动生成
        properties.setProperty("org.quartz.scheduler.instanceId", "AUTO");
        //禁用rmi配置
        properties.setProperty("org.quartz.scheduler.rmi.export", "false");
        //禁用rmi配置
        properties.setProperty("org.quartz.scheduler.rmi.proxy", "false");
        //quartz线程池实现类
        properties.setProperty("org.quartz.threadPool.class", "org.quartz.simpl.SimpleThreadPool");
        //quartz线程池线程数
        properties.setProperty("org.quartz.threadPool.threadCount", "10");
        //quartz线程池线程优先级
        properties.setProperty("org.quartz.threadPool.threadPriority", "5");
        //quartz线程池是否自动加载数据库内的定时任务
        properties.setProperty("org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread", "true");
        //Job错过执行时间的阈值
        properties.setProperty("org.quartz.jobStore.misfireThreshold", "60000");
        //Job持久化方式配置
        properties.setProperty("org.quartz.jobStore.class", "org.quartz.impl.jdbcjobstore.JobStoreTX");
        //Job的JDBC持久化驱动,此处配置为MySql
        properties.setProperty("org.quartz.jobStore.driverDelegateClass", "org.quartz.impl.jdbcjobstore.StdJDBCDelegate");
        //配置是否使用
        properties.setProperty("org.quartz.jobStore.useProperties", "false");
        //持久化的quartz表结构前缀
        properties.setProperty("org.quartz.jobStore.tablePrefix", "QRTZ_");
        //是否是集群quartz
        properties.setProperty("org.quartz.jobStore.isClustered", "true");
        //集群quartz中节点有效性检查时间间隔
        properties.setProperty("org.quartz.jobStore.clusterCheckinInterval", "20000");
        //错过执行时间的Job最大持有数
        properties.setProperty("org.quartz.jobStore.maxMisfiresToHandleAtATime", "1");
        schedulerFactoryBean.setQuartzProperties(properties);
        //返回结果
        return schedulerFactoryBean;
    }

}

2.3)编写定时任务

为了各个定时任务方便注册,使用模板模式定义一个抽象基类。该抽象基类实现InterruptableJob接口,当然也可以实现其他接口或抽象类(该继承树的顶级接口为org.quartz.Job)。

/**
 * 抽象定时任务,完成向quartz注册的功能
 * Created by chenjunyi on 2018/6/6.
 */
@Slf4j
public abstract class AbstractScheduler implements InterruptableJob {

    @Autowired
    private Scheduler scheduler;

    /**
     * 向定时任务调度器注册
     * @throws SchedulerException 注册时发生异常
     */
    @PostConstruct
    protected void register() throws SchedulerException {
        //任务和触发器名称(若不进行设置,则quartz默认使用UUID,因此每次启动应用都会注册一个新任务)
        String jobName = this.getClass().getSimpleName() + "Job";
        String triggerName = this.getClass().getSimpleName() + "Trigger";
        //设置定时任务
        JobDetail jobDetail = JobBuilder.newJob(this.getClass()).withIdentity(jobName).build();
        //创建任务触发器
        CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(getCron());
        Trigger trigger = TriggerBuilder.newTrigger().withIdentity(triggerName).withSchedule(scheduleBuilder).build();
        //将触发器与任务绑定到调度器内
        Set<Trigger> triggers = new HashSet<>();
        triggers.add(trigger);
        scheduler.scheduleJob(jobDetail, triggers, true);
        log.info(">>>>>注册[Trigger={}, Job={}]的定时任务成功", triggerName, jobName);
    }

    /**
     * 获取cron表达式
     * @return cron表达式
     */
    protected abstract String getCron();

}

实现一个自定义的demo定时任务,并通过@Service交于Spring-IOC容器进行托管,代码如下:

/**
 * 示例定时任务
 * Created by chenjunyi on 2018/6/5.
 */
@Service
@PersistJobDataAfterExecution
@DisallowConcurrentExecution
public class DemoScheduler extends AbstractScheduler {

    @Value("${env.cron.demoScheduler}")
    private String      cron;

    @Autowired
    private DemoService demoService;

    @Override
    public void interrupt() throws UnableToInterruptJobException {

    }

    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        demoService.sayHello();
    }

    @Override
    protected String getCron() {
        return this.cron;
    }

}

  

3、整合代码详解

3.1)AutoSpringBeanJobFactory

JobFactory,即定时任务Job工厂,在定时任务触发时,通过该factory获取定时任务实例并执行。该接口的继承树比较简单,实现的子类只有2个,如下图所示:
图片描述
不论是AdaptableJobFactory还是SimpleJobFactory,其获取Job实例的方式都比较简单,即通过JobDetail的Class直接newInstance,以AdaptableJobFactory为例,相关源码如下:

@Override
public Job newJob(TriggerFiredBundle bundle, Scheduler scheduler) throws SchedulerException {
    try {
        Object jobObject = createJobInstance(bundle); //直接根据class.newInstance获取实例
        return adaptJob(jobObject); //job适配,判断job类型并进行包装,此处忽略
    } catch (Exception ex) {
        throw new SchedulerException("Job instantiation failed", ex);
    }
}

protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
    return bundle.getJobDetail().getJobClass().newInstance();
}

  但这样带来了一个问题,因为我们的定时任务可能会依赖spring-ioc容器中的其他bean(需要进行注入),直接newInstance创建的实例无法被spring-ioc容器托管,从而在执行时拿不到注入的对象,导致NPE(NullPointException)。为了解决这个问题,在网上搜索了解决办法,给出的思路基本都是通过newInstance拿到Job对象后,再手动将其托管到spring-ioc容器中,如下代码所示:

//参考博客链接:https://www.jianshu.com/p/d52d62fb2ac6;作者:恒宇少年
public static class AutowiringSpringBeanJobFactory extends SpringBeanJobFactory implements ApplicationContextAware {

    /** spring-beanfactory */
    private transient AutowireCapableBeanFactory beanFactory;

    @Override
    public void setApplicationContext(final ApplicationContext context) {
        beanFactory = context.getAutowireCapableBeanFactory();
    }

    @Override
    protected Object createJobInstance(final TriggerFiredBundle bundle) throws Exception {
        final Object job = super.createJobInstance(bundle);
        beanFactory.autowireBean(job); //通过newInstance获取job实例后,将其交付给spring-ioc
        return job;
    }
}

  通过在AutowiringSpringBeanJobFactory中维护一个AutowireCapableBeanFactory,并且在获取Job实例之后使用autowireBean方法将其交给spring-ioc进行托管,从而保证该实例在执行过程中能够拿到注入的对象。
  这样的处理同样有个问题,就是每次触发定时任务时,都要newInstance创建对象,那么能不能让Job在spring-ioc容器启动时便被托管(即使用@Service等注解),然后直接获取到这个托管的对象呢?解决这个问题的思路在于获取Job实例的方式,既然我们可以获取到Job的Class(由Job在向Scheduler注册时设置),那么便可以通过ApplicationContext拿到相应的实例。  因此,在AutoSpringBeanJobFactory中,取代了AutowireCapableBeanFactory,维护一个ApplicationContext,并通过Job的Class获取实例。

Class<? extends Job> clazz = bundle.getJobDetail().getJobClass();
Job job = applicationContext.getBean(clazz);

3.2)AbstractScheduler

  该抽象基类实现了自动注册的逻辑,不需要在config中配置额外的注册。思路是通过@PostConstruct注解,在该类被Spring完成初始化后,执行注册动作,有以下几点需要注意:

  • 需要设置任务和触发器名称。如果不设置,quartz默认使用UUID,因此每次启动应用都会注册一个新任务;
  • 调度器的绑定,要使用含有replace功能的方法(方法参数带有boolean replace的)。由于是将Job持久化到数据库,应用再次启动时,会读取数据库中的任务列表。不含replace功能的方法在进行一次新的注册时,发现任务已存在的话,就会报错;而含replace功能的方法会更新数据库的Job配置信息;
  • 由于我们在Job初始化时便进行了任务注册,且采用的是replace的方式,因此在config中的schedulerFactoryBean.setOverwriteExistingJobs(true)参数配置便失效了(因为SchedulerFactoryBean在afterPropertiesSet这个属于SpringBean的生命周期方法中,调用了自身的registerJobsAndTriggers方法,该方法会根据此参数决定是否调用含replace功能的绑定方法进行更新Job,我们自己的Job注册实现中便完成了此功能);
  • 继续上一条,值得注意的是,SchedulerFactoryBean.registerJobsAndTriggers的方法中,会根据是否设置了TransactionManager来决定是否将所有Job和Trigger的更新放在同一个事务中,由于目前的应用没有需要使用事务来更新Job的需求,并且若更新失败,启动应用时会抛出异常,因此该问题放置待解决(解决办法也很简单,在register方法中添加事务控制即可);

3.3)quartz持久化的表

  是的,这些表需要手工创建,建表语句如下:

DROP TABLE IF EXISTS QRTZ_FIRED_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_PAUSED_TRIGGER_GRPS;
DROP TABLE IF EXISTS QRTZ_SCHEDULER_STATE;
DROP TABLE IF EXISTS QRTZ_LOCKS;
DROP TABLE IF EXISTS QRTZ_SIMPLE_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_SIMPROP_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_CRON_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_BLOB_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_JOB_DETAILS;
DROP TABLE IF EXISTS QRTZ_CALENDARS;


CREATE TABLE QRTZ_JOB_DETAILS
  (
    SCHED_NAME VARCHAR(120) NOT NULL,
    JOB_NAME  VARCHAR(200) NOT NULL,
    JOB_GROUP VARCHAR(200) NOT NULL,
    DESCRIPTION VARCHAR(250) NULL,
    JOB_CLASS_NAME   VARCHAR(250) NOT NULL,
    IS_DURABLE VARCHAR(1) NOT NULL,
    IS_NONCONCURRENT VARCHAR(1) NOT NULL,
    IS_UPDATE_DATA VARCHAR(1) NOT NULL,
    REQUESTS_RECOVERY VARCHAR(1) NOT NULL,
    JOB_DATA BLOB NULL,
    PRIMARY KEY (SCHED_NAME,JOB_NAME,JOB_GROUP)
);

CREATE TABLE QRTZ_TRIGGERS
  (
    SCHED_NAME VARCHAR(120) NOT NULL,
    TRIGGER_NAME VARCHAR(200) NOT NULL,
    TRIGGER_GROUP VARCHAR(200) NOT NULL,
    JOB_NAME  VARCHAR(200) NOT NULL,
    JOB_GROUP VARCHAR(200) NOT NULL,
    DESCRIPTION VARCHAR(250) NULL,
    NEXT_FIRE_TIME BIGINT(13) NULL,
    PREV_FIRE_TIME BIGINT(13) NULL,
    PRIORITY INTEGER NULL,
    TRIGGER_STATE VARCHAR(16) NOT NULL,
    TRIGGER_TYPE VARCHAR(8) NOT NULL,
    START_TIME BIGINT(13) NOT NULL,
    END_TIME BIGINT(13) NULL,
    CALENDAR_NAME VARCHAR(200) NULL,
    MISFIRE_INSTR SMALLINT(2) NULL,
    JOB_DATA BLOB NULL,
    PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
    FOREIGN KEY (SCHED_NAME,JOB_NAME,JOB_GROUP)
        REFERENCES QRTZ_JOB_DETAILS(SCHED_NAME,JOB_NAME,JOB_GROUP)
);

CREATE TABLE QRTZ_SIMPLE_TRIGGERS
  (
    SCHED_NAME VARCHAR(120) NOT NULL,
    TRIGGER_NAME VARCHAR(200) NOT NULL,
    TRIGGER_GROUP VARCHAR(200) NOT NULL,
    REPEAT_COUNT BIGINT(7) NOT NULL,
    REPEAT_INTERVAL BIGINT(12) NOT NULL,
    TIMES_TRIGGERED BIGINT(10) NOT NULL,
    PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
    FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
        REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
);

CREATE TABLE QRTZ_CRON_TRIGGERS
  (
    SCHED_NAME VARCHAR(120) NOT NULL,
    TRIGGER_NAME VARCHAR(200) NOT NULL,
    TRIGGER_GROUP VARCHAR(200) NOT NULL,
    CRON_EXPRESSION VARCHAR(200) NOT NULL,
    TIME_ZONE_ID VARCHAR(80),
    PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
    FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
        REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
);

CREATE TABLE QRTZ_SIMPROP_TRIGGERS
  (          
    SCHED_NAME VARCHAR(120) NOT NULL,
    TRIGGER_NAME VARCHAR(200) NOT NULL,
    TRIGGER_GROUP VARCHAR(200) NOT NULL,
    STR_PROP_1 VARCHAR(512) NULL,
    STR_PROP_2 VARCHAR(512) NULL,
    STR_PROP_3 VARCHAR(512) NULL,
    INT_PROP_1 INT NULL,
    INT_PROP_2 INT NULL,
    LONG_PROP_1 BIGINT NULL,
    LONG_PROP_2 BIGINT NULL,
    DEC_PROP_1 NUMERIC(13,4) NULL,
    DEC_PROP_2 NUMERIC(13,4) NULL,
    BOOL_PROP_1 VARCHAR(1) NULL,
    BOOL_PROP_2 VARCHAR(1) NULL,
    PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
    FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP) 
    REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
);

CREATE TABLE QRTZ_BLOB_TRIGGERS
  (
    SCHED_NAME VARCHAR(120) NOT NULL,
    TRIGGER_NAME VARCHAR(200) NOT NULL,
    TRIGGER_GROUP VARCHAR(200) NOT NULL,
    BLOB_DATA BLOB NULL,
    PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
    FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
        REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
);

CREATE TABLE QRTZ_CALENDARS
  (
    SCHED_NAME VARCHAR(120) NOT NULL,
    CALENDAR_NAME  VARCHAR(200) NOT NULL,
    CALENDAR BLOB NOT NULL,
    PRIMARY KEY (SCHED_NAME,CALENDAR_NAME)
);

CREATE TABLE QRTZ_PAUSED_TRIGGER_GRPS
  (
    SCHED_NAME VARCHAR(120) NOT NULL,
    TRIGGER_GROUP  VARCHAR(200) NOT NULL, 
    PRIMARY KEY (SCHED_NAME,TRIGGER_GROUP)
);

CREATE TABLE QRTZ_FIRED_TRIGGERS
  (
    SCHED_NAME VARCHAR(120) NOT NULL,
    ENTRY_ID VARCHAR(95) NOT NULL,
    TRIGGER_NAME VARCHAR(200) NOT NULL,
    TRIGGER_GROUP VARCHAR(200) NOT NULL,
    INSTANCE_NAME VARCHAR(200) NOT NULL,
    FIRED_TIME BIGINT(13) NOT NULL,
    SCHED_TIME BIGINT(13) NOT NULL,
    PRIORITY INTEGER NOT NULL,
    STATE VARCHAR(16) NOT NULL,
    JOB_NAME VARCHAR(200) NULL,
    JOB_GROUP VARCHAR(200) NULL,
    IS_NONCONCURRENT VARCHAR(1) NULL,
    REQUESTS_RECOVERY VARCHAR(1) NULL,
    PRIMARY KEY (SCHED_NAME,ENTRY_ID)
);

CREATE TABLE QRTZ_SCHEDULER_STATE
  (
    SCHED_NAME VARCHAR(120) NOT NULL,
    INSTANCE_NAME VARCHAR(200) NOT NULL,
    LAST_CHECKIN_TIME BIGINT(13) NOT NULL,
    CHECKIN_INTERVAL BIGINT(13) NOT NULL,
    PRIMARY KEY (SCHED_NAME,INSTANCE_NAME)
);

CREATE TABLE QRTZ_LOCKS
  (
    SCHED_NAME VARCHAR(120) NOT NULL,
    LOCK_NAME  VARCHAR(40) NOT NULL, 
    PRIMARY KEY (SCHED_NAME,LOCK_NAME)
);

commit;