Springboot Quartz 任务调度框架

2020-05-24  本文已影响0人  Swye

1.maven依赖

maven依赖

2. Quartz 核心概念

1.Job 表示一个工作,要执行的具体内容。

2.JobDetail 表示一个具体的可执行的调度程序,Job 是这个可执行程调度程序所要执行的内容,另外 JobDetail 还包含了这个任务调度的方案和策略。

3.Trigger 代表一个调度参数的配置,什么时候去调。

4.Scheduler 代表一个调度容器,一个调度容器中可以注册多个 JobDetail 和 Trigger。当 Trigger 与 JobDetail 组合,就可以被 Scheduler 容器调度了。


jobDetail

Scheduler 通过jobDetail识别将要调用的job的类型,来调用job工作

我们传给scheduler一个JobDetail实例,因为我们在创建JobDetail时,将要执行的job的类名传给了JobDetail,所以scheduler就知道了要执行何种类型的job;每次当scheduler执行job时,在调用其execute(…)方法之前会创建该类的一个新的实例;执行完毕,对该实例的引用就被丢弃了,实例会被垃圾回收;这种执行策略带来的一个后果是,job必须有一个无参的构造函数(当使用默认的JobFactory时);另一个后果是,在job类中,不应该定义有状态的数据属性,因为在job的多次执行中,这些属性的值不会保留。

如果需要有状态的数据属性时,需要用JobDataMap,这个就相当于Map,可以将数据传入job中

JobDataMap

JobDataMap中可以包含不限量的(序列化的)数据对象,在job实例执行的时候,可以使用其中的数据;JobDataMap是Java Map接口的一个实现,额外增加了一些便于存取基本类型的数据的方法。

将job加入到scheduler之前,在构建JobDetail时,可以将数据放入JobDataMap

Trigger 触发器

Job中包含了任务执行的逻辑,Scheduler负责扫描需要执行的Job任务,trigger则定义job何时被执行。

下面对触发器的子类分别进行讲述,我们主要看Trigger的4个可用的派生类,分别是:

org.quartz.SimpleTrigger

org.quartz.CronTrigger (常用)

org.quartz.DateIntervalTrigger

org.quartz.NthIncludedDayTrigger


SimpleTrigger

SimpleTrigger是一种设置和使用简单的触发器,它是在指定日期/时间且可能需要重复执行n次的时机下使用的。这种触发器不适合每天定时执行任务这种场景

它适合的任务类似于:9:00 开始,每隔1小时,每隔几分钟,每隔几秒钟执行一次。


CronTrigger

类似于SimpleTrigger,指定从某一个时间开始,以一定的时间间隔执行的任务。 但是不同的是SimpleTrigger指定的时间间隔为毫秒,没办法指定每隔一个月执行一次(每月的时间间隔不是固定值),而CalendarIntervalTrigger支持的间隔单位有秒,分钟,小时,天,月,年,星期。

它适合的任务类似于:9:00 开始执行,并且以后每周 9:00 执行一次

Cron表达式的格式:秒 分 时 日 月 周 年(可选)


DailyTimeIntervalTrigger

指定每天的某个时间段内,以一定的时间间隔执行任务。并且它可以支持指定星期。

它适合的任务类似于:指定每天9:00 至 18:00 ,每隔70秒执行一次,并且只要周一至周五执行。


NthIncludedDayTrigger

NthIncludedDayTrigger适用于在每一个间隔类型(月或年等)的第N天触发。直接上代码:

它适合的任务类似于:每个月的第10天中午12点整触发



3.代码实现

job 继承QuartzJobBean实现executeInternal方法

4.misfired 策略

对于判定为misfired job,其实有很多条件,目前了解到的有:

1.到执行时间时,上一个任务还未完成;

2.过期时间已超过设置的misfireThreshold;

3.线程池中已没有空闲线程

4.线程池中虽有空闲线程,但有优先级更高的任务

5.服务器停止运行

产生misfire后,会根据设置的misfire策略进行任务的处理。

对CronTrigger来说,有三种misfire策略:

withMisfireHandlingInstructionDoNothing

不触发立即执行,等待下次Cron触发频率到达时刻开始按照Cron频率依次执行

withMisfireHandlingInstructionFireAndProceed

以错过的第一个频率时间立刻开始执行,重做错过的所有频率周期后,当下一次触发频率发生时间大于当前时间后,再按照正常的Cron频率依次执行

withMisfireHandlingInstructionIgnoreMisfires

以当前时间为触发频率立刻触发一次执行,并发执行一次做完,然后按照Cron频率依次执行

5.job、trigger、scheduler监听器

由于quartz没有实现对任务执行情况做记录,所以主要用监听器来实现记录任务的执行情况,记录任务是否被错过,任务具体什么时候被执行,执行过程中是否抛出异常等。

监听器的执行顺序:

SimpleSchedulerListener.java

public class SimpleSchedulerListener extends SchedulerListenerSupport {

    private static Logger logger = LoggerFactory.getLogger(SimpleSchedulerListener.class);

    @Override

    public void jobScheduled(Trigger trigger) {

        String jobName = trigger.getJobKey().getName();

        logger.info(jobName + " has been scheduled");

    }

    @Override

    public void jobUnscheduled(TriggerKey triggerKey) {

        logger.info(triggerKey + " is being unscheduled");

    }

    @Override

    public void triggerFinalized(Trigger trigger) {

        logger.info("Trigger is finished for " + trigger.getJobKey().getName());

    }

    @Override

    public void triggerPaused(TriggerKey triggerKey) {

        logger.info(triggerKey + " is being paused");

    }

    @Override

    public void triggersPaused(String triggerGroup) {

        logger.info("trigger group "+triggerGroup + " is being paused");

    }

    @Override

    public void triggerResumed(TriggerKey triggerKey) {

        logger.info(triggerKey + " is being resumed");

    }

    @Override

    public void triggersResumed(String triggerGroup) {

        logger.info("trigger group "+triggerGroup + " is being resumed");

    }

    @Override

    public void jobAdded(JobDetail jobDetail) {

        logger.info(jobDetail.getKey()+" is added");

    }

    @Override

    public void jobDeleted(JobKey jobKey) {

        logger.info(jobKey+" is deleted");

    }

    @Override

    public void jobPaused(JobKey jobKey) {

        logger.info(jobKey+" is paused");

    }

    @Override

    public void jobsPaused(String jobGroup) {

        logger.info("job group "+jobGroup+" is paused");

    }

    @Override

    public void jobResumed(JobKey jobKey) {

        logger.info(jobKey+" is resumed");

    }

    @Override

    public void jobsResumed(String jobGroup) {

        logger.info("job group "+jobGroup+" is resumed");

    }

    @Override

    public void schedulerError(String msg, SchedulerException cause) {

        logger.error(msg, cause.getUnderlyingException());

    }

    @Override

    public void schedulerInStandbyMode() {

        logger.info("scheduler is in standby mode");

    }

    @Override

    public void schedulerStarted() {

        logger.info("scheduler has been started");

    }

    @Override

    public void schedulerStarting() {

        logger.info("scheduler is being started");

    }

    @Override

    public void schedulerShutdown() {

        logger.info("scheduler has been shutdown");

    }

    @Override

    public void schedulerShuttingdown() {

        logger.info("scheduler is being shutdown");

    }

    @Override

    public void schedulingDataCleared() {

        logger.info("scheduler has cleared all data");

    }

}

MonitorTriggerListener.java

public class MonitorTriggerListener extends TriggerListenerSupport {

    private static Logger log = LoggerFactory.getLogger(MonitorTriggerListener.class);

    @Override

    public String getName() {

        // TODO Auto-generated method stub

        return getClass().getSimpleName();

    }

    @Override

    public void triggerFired(Trigger trigger, JobExecutionContext jobExecutionContext) {

        //Trigger 被触发了,此时job上的execute()方法将要被执行

    }

    @Override

    public boolean vetoJobExecution(Trigger trigger, JobExecutionContext jobExecutionContext) {

        //trigger被触发后,job将要被执行时Scheduler调用该方法,如返回true则job此次将不被执行

        return false;

    }

    @Override

    public void triggerMisfired(Trigger trigger) {

        log.info("当前任务为错过任务" );

        trigger.getJobDataMap().put("isMisfire",true);

    }

    @Override

    public void triggerComplete(Trigger trigger, JobExecutionContext jobExecutionContext, Trigger.CompletedExecutionInstruction completedExecutionInstruction) {

        log.info("Trigger被触发并且完成了job的执行,此方法被调用"+trigger.getJobKey().getName());

    }

}

MonitorJobListener.java

public class MonitorJobListener extends JobListenerSupport {

    private final static Logger log = LoggerFactory.getLogger(MonitorJobListener.class);

    @Override

    public String getName() {

        return getClass().getSimpleName();

    }

    /**

    * (1)

    * 任务执行之前执行

    * Called by the Scheduler when a JobDetail is about to be executed (an associated Trigger has occurred).

    */

    @Override

    public void jobToBeExecuted(JobExecutionContext jobExecutionContext) {

        log.info(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())+"->开始任务:" + jobExecutionContext.getJobDetail().getJobClass().getName());

        log.info(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(System.currentTimeMillis()));

        Long now = System.currentTimeMillis();

        Long jobfire = jobExecutionContext.getScheduledFireTime().getTime();

        log.info(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(jobExecutionContext.getScheduledFireTime().getTime()));

        if (now - 10000L> jobfire){

            log.info("当前任务为错过任务" );

            jobExecutionContext.getTrigger().getJobDataMap().put("isMisfire",true);

        }else {

            jobExecutionContext.getTrigger().getJobDataMap().put("isMisfire",false);

        }

        jobExecutionContext.getTrigger().getJobDataMap().put("now",now);

    }

    /**

    * (2)

    * 这个方法正常情况下不执行,但是如果当TriggerListener中的vetoJobExecution方法返回true时,那么执行这个方法.

    * 需要注意的是 如果方法(2)执行 那么(1),(3)这个俩个方法不会执行,因为任务被终止了嘛.

    * Called by the Scheduler when a JobDetail was about to be executed (an associated Trigger has occurred),

    * but a TriggerListener vetoed it's execution.

    */

    @Override

    public void jobExecutionVetoed(JobExecutionContext jobExecutionContext) {

        log.info("========这个方法正常情况下不执行,但是如果当TriggerListener中的vetoJobExecution方法返回true时,那么执行这个方法.==========" + jobExecutionContext.getJobDetail().getJobClass().getName());

    }

    /**

    * (3)

    * 任务执行完成后执行,jobException如果它不为空则说明任务在执行过程中出现了异常

    * Called by the Scheduler after a JobDetail has been executed, and be for the associated Trigger's triggered(xx) method has been called.

    */

    @Override

    public void jobWasExecuted(JobExecutionContext jobExecutionContext, JobExecutionException e) {

        //log.info("========任务执行完成后执行,jobException如果它不为空则说明任务在执行过程中出现了异常==========" + getName());

        log.info(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())+"->结束任务任务:" + getName());

        Long now = (Long) jobExecutionContext.getTrigger().getJobDataMap().get("now");

        System.currentTimeMillis();

        JobBo jobService = SpringContextHelp.getBean(JobBo.class);

        Long jobfire = jobExecutionContext.getScheduledFireTime().getTime();

        String ip_address =  Constants.getApplicationValue("applicationContext","servlet.ip.address");

        Calendar cr = Calendar.getInstance();

        cr.setTimeInMillis(now);

        Date d1= cr.getTime();

        cr.setTimeInMillis(jobfire);

        Date d2 = cr.getTime();

        Boolean isMisfire = (Boolean) jobExecutionContext.getTrigger().getJobDataMap().get("isMisfire") == null? false:(Boolean) jobExecutionContext.getTrigger().getJobDataMap().get("isMisfire");

        try {

            QrtzTaskDetailVo vo = new  QrtzTaskDetailVo(jobExecutionContext.getScheduler().getSchedulerName(),

                    jobExecutionContext.getJobDetail().getKey().getName(),

                    jobExecutionContext.getJobDetail().getKey().getGroup(),

                    jobExecutionContext.getTrigger().getKey().getName(),

                    jobExecutionContext.getTrigger().getKey().getGroup(),

                    d2,

                    d1,isMisfire,ip_address);

            if (e != null){

                log.info(e.getMessage());

              vo.setError_detail(e.getMessage());

              vo.setFinish_status(2);

            }else {

                vo.setFinish_status(1);

            }

            jobService.saveQrtzTaskDetail(vo);

        } catch (SchedulerException s) {

            s.printStackTrace();

        }

    }

}

6.Quartz 集群


注意事项

时间同步问题

  Quartz实际并不关心你是在相同还是不同的机器上运行节点。当集群放置在不同的机器上时,称之为水平集群。节点跑在同一台机器上时,称之为垂直集群。对于垂直集群,存在着单点故障的问题。这对高可用性的应用来说是无法接受的,因为一旦机器崩溃了,所有的节点也就被终止了。对于水平集群,存在着时间同步问题。

  节点用时间戳来通知其他实例它自己的最后检入时间。假如节点的时钟被设置为将来的时间,那么运行中的Scheduler将再也意识不到那个结点已经宕掉了。另一方面,如果某个节点的时钟被设置为过去的时间,也许另一节点就会认定那个节点已宕掉并试图接过它的Job重运行。最简单的同步计算机时钟的方式是使用某一个Internet时间服务器(Internet Time Server ITS)。

节点争抢Job问题

  因为Quartz使用了一个随机的负载均衡算法, Job以随机的方式由不同的实例执行。Quartz官网上提到当前,还不存在一个方法来指派(钉住) 一个 Job 到集群中特定的节点。

上一篇 下一篇

猜你喜欢

热点阅读