Springboot Quartz 任务调度框架
1.maven依赖
maven依赖
2. Quartz 核心概念
1.Job 表示一个工作,要执行的具体内容。
2.JobDetail 表示一个具体的可执行的调度程序,Job 是这个可执行程调度程序所要执行的内容,另外 JobDetail 还包含了这个任务调度的方案和策略。
3.Trigger 代表一个调度参数的配置,什么时候去调。
4.Scheduler 代表一个调度容器,一个调度容器中可以注册多个 JobDetail 和 Trigger。当 Trigger 与 JobDetail 组合,就可以被 Scheduler 容器调度了。
jobDetail
Scheduler 通过jobDetail识别将要调用的job的类型,来调用job工作
我们传给scheduler一个JobDetail实例,因为我们在创建JobDetail时,将要执行的job的类名传给了JobDetail,所以scheduler就知道了要执行何种类型的job;每次当scheduler执行job时,在调用其execute(…)方法之前会创建该类的一个新的实例;执行完毕,对该实例的引用就被丢弃了,实例会被垃圾回收;这种执行策略带来的一个后果是,job必须有一个无参的构造函数(当使用默认的JobFactory时);另一个后果是,在job类中,不应该定义有状态的数据属性,因为在job的多次执行中,这些属性的值不会保留。
如果需要有状态的数据属性时,需要用JobDataMap,这个就相当于Map,可以将数据传入job中
JobDataMap
JobDataMap中可以包含不限量的(序列化的)数据对象,在job实例执行的时候,可以使用其中的数据;JobDataMap是Java Map接口的一个实现,额外增加了一些便于存取基本类型的数据的方法。
将job加入到scheduler之前,在构建JobDetail时,可以将数据放入JobDataMap
Trigger 触发器
Job中包含了任务执行的逻辑,Scheduler负责扫描需要执行的Job任务,trigger则定义job何时被执行。
下面对触发器的子类分别进行讲述,我们主要看Trigger的4个可用的派生类,分别是:
org.quartz.SimpleTrigger
org.quartz.CronTrigger (常用)
org.quartz.DateIntervalTrigger
org.quartz.NthIncludedDayTrigger
SimpleTrigger
SimpleTrigger是一种设置和使用简单的触发器,它是在指定日期/时间且可能需要重复执行n次的时机下使用的。这种触发器不适合每天定时执行任务这种场景
它适合的任务类似于:9:00 开始,每隔1小时,每隔几分钟,每隔几秒钟执行一次。
CronTrigger
类似于SimpleTrigger,指定从某一个时间开始,以一定的时间间隔执行的任务。 但是不同的是SimpleTrigger指定的时间间隔为毫秒,没办法指定每隔一个月执行一次(每月的时间间隔不是固定值),而CalendarIntervalTrigger支持的间隔单位有秒,分钟,小时,天,月,年,星期。
它适合的任务类似于:9:00 开始执行,并且以后每周 9:00 执行一次
Cron表达式的格式:秒 分 时 日 月 周 年(可选)
DailyTimeIntervalTrigger
指定每天的某个时间段内,以一定的时间间隔执行任务。并且它可以支持指定星期。
它适合的任务类似于:指定每天9:00 至 18:00 ,每隔70秒执行一次,并且只要周一至周五执行。
NthIncludedDayTrigger
NthIncludedDayTrigger适用于在每一个间隔类型(月或年等)的第N天触发。直接上代码:
它适合的任务类似于:每个月的第10天中午12点整触发
3.代码实现
job 继承QuartzJobBean实现executeInternal方法
4.misfired 策略
对于判定为misfired job,其实有很多条件,目前了解到的有:
1.到执行时间时,上一个任务还未完成;
2.过期时间已超过设置的misfireThreshold;
3.线程池中已没有空闲线程
4.线程池中虽有空闲线程,但有优先级更高的任务
5.服务器停止运行
产生misfire后,会根据设置的misfire策略进行任务的处理。
对CronTrigger来说,有三种misfire策略:
withMisfireHandlingInstructionDoNothing
不触发立即执行,等待下次Cron触发频率到达时刻开始按照Cron频率依次执行
withMisfireHandlingInstructionFireAndProceed
以错过的第一个频率时间立刻开始执行,重做错过的所有频率周期后,当下一次触发频率发生时间大于当前时间后,再按照正常的Cron频率依次执行
withMisfireHandlingInstructionIgnoreMisfires
以当前时间为触发频率立刻触发一次执行,并发执行一次做完,然后按照Cron频率依次执行
5.job、trigger、scheduler监听器
由于quartz没有实现对任务执行情况做记录,所以主要用监听器来实现记录任务的执行情况,记录任务是否被错过,任务具体什么时候被执行,执行过程中是否抛出异常等。
监听器的执行顺序:
SimpleSchedulerListener.java
public class SimpleSchedulerListener extends SchedulerListenerSupport {
private static Logger logger = LoggerFactory.getLogger(SimpleSchedulerListener.class);
@Override
public void jobScheduled(Trigger trigger) {
String jobName = trigger.getJobKey().getName();
logger.info(jobName + " has been scheduled");
}
@Override
public void jobUnscheduled(TriggerKey triggerKey) {
logger.info(triggerKey + " is being unscheduled");
}
@Override
public void triggerFinalized(Trigger trigger) {
logger.info("Trigger is finished for " + trigger.getJobKey().getName());
}
@Override
public void triggerPaused(TriggerKey triggerKey) {
logger.info(triggerKey + " is being paused");
}
@Override
public void triggersPaused(String triggerGroup) {
logger.info("trigger group "+triggerGroup + " is being paused");
}
@Override
public void triggerResumed(TriggerKey triggerKey) {
logger.info(triggerKey + " is being resumed");
}
@Override
public void triggersResumed(String triggerGroup) {
logger.info("trigger group "+triggerGroup + " is being resumed");
}
@Override
public void jobAdded(JobDetail jobDetail) {
logger.info(jobDetail.getKey()+" is added");
}
@Override
public void jobDeleted(JobKey jobKey) {
logger.info(jobKey+" is deleted");
}
@Override
public void jobPaused(JobKey jobKey) {
logger.info(jobKey+" is paused");
}
@Override
public void jobsPaused(String jobGroup) {
logger.info("job group "+jobGroup+" is paused");
}
@Override
public void jobResumed(JobKey jobKey) {
logger.info(jobKey+" is resumed");
}
@Override
public void jobsResumed(String jobGroup) {
logger.info("job group "+jobGroup+" is resumed");
}
@Override
public void schedulerError(String msg, SchedulerException cause) {
logger.error(msg, cause.getUnderlyingException());
}
@Override
public void schedulerInStandbyMode() {
logger.info("scheduler is in standby mode");
}
@Override
public void schedulerStarted() {
logger.info("scheduler has been started");
}
@Override
public void schedulerStarting() {
logger.info("scheduler is being started");
}
@Override
public void schedulerShutdown() {
logger.info("scheduler has been shutdown");
}
@Override
public void schedulerShuttingdown() {
logger.info("scheduler is being shutdown");
}
@Override
public void schedulingDataCleared() {
logger.info("scheduler has cleared all data");
}
}
MonitorTriggerListener.java
public class MonitorTriggerListener extends TriggerListenerSupport {
private static Logger log = LoggerFactory.getLogger(MonitorTriggerListener.class);
@Override
public String getName() {
// TODO Auto-generated method stub
return getClass().getSimpleName();
}
@Override
public void triggerFired(Trigger trigger, JobExecutionContext jobExecutionContext) {
//Trigger 被触发了,此时job上的execute()方法将要被执行
}
@Override
public boolean vetoJobExecution(Trigger trigger, JobExecutionContext jobExecutionContext) {
//trigger被触发后,job将要被执行时Scheduler调用该方法,如返回true则job此次将不被执行
return false;
}
@Override
public void triggerMisfired(Trigger trigger) {
log.info("当前任务为错过任务" );
trigger.getJobDataMap().put("isMisfire",true);
}
@Override
public void triggerComplete(Trigger trigger, JobExecutionContext jobExecutionContext, Trigger.CompletedExecutionInstruction completedExecutionInstruction) {
log.info("Trigger被触发并且完成了job的执行,此方法被调用"+trigger.getJobKey().getName());
}
}
MonitorJobListener.java
public class MonitorJobListener extends JobListenerSupport {
private final static Logger log = LoggerFactory.getLogger(MonitorJobListener.class);
@Override
public String getName() {
return getClass().getSimpleName();
}
/**
* (1)
* 任务执行之前执行
* Called by the Scheduler when a JobDetail is about to be executed (an associated Trigger has occurred).
*/
@Override
public void jobToBeExecuted(JobExecutionContext jobExecutionContext) {
log.info(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())+"->开始任务:" + jobExecutionContext.getJobDetail().getJobClass().getName());
log.info(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(System.currentTimeMillis()));
Long now = System.currentTimeMillis();
Long jobfire = jobExecutionContext.getScheduledFireTime().getTime();
log.info(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(jobExecutionContext.getScheduledFireTime().getTime()));
if (now - 10000L> jobfire){
log.info("当前任务为错过任务" );
jobExecutionContext.getTrigger().getJobDataMap().put("isMisfire",true);
}else {
jobExecutionContext.getTrigger().getJobDataMap().put("isMisfire",false);
}
jobExecutionContext.getTrigger().getJobDataMap().put("now",now);
}
/**
* (2)
* 这个方法正常情况下不执行,但是如果当TriggerListener中的vetoJobExecution方法返回true时,那么执行这个方法.
* 需要注意的是 如果方法(2)执行 那么(1),(3)这个俩个方法不会执行,因为任务被终止了嘛.
* Called by the Scheduler when a JobDetail was about to be executed (an associated Trigger has occurred),
* but a TriggerListener vetoed it's execution.
*/
@Override
public void jobExecutionVetoed(JobExecutionContext jobExecutionContext) {
log.info("========这个方法正常情况下不执行,但是如果当TriggerListener中的vetoJobExecution方法返回true时,那么执行这个方法.==========" + jobExecutionContext.getJobDetail().getJobClass().getName());
}
/**
* (3)
* 任务执行完成后执行,jobException如果它不为空则说明任务在执行过程中出现了异常
* Called by the Scheduler after a JobDetail has been executed, and be for the associated Trigger's triggered(xx) method has been called.
*/
@Override
public void jobWasExecuted(JobExecutionContext jobExecutionContext, JobExecutionException e) {
//log.info("========任务执行完成后执行,jobException如果它不为空则说明任务在执行过程中出现了异常==========" + getName());
log.info(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())+"->结束任务任务:" + getName());
Long now = (Long) jobExecutionContext.getTrigger().getJobDataMap().get("now");
System.currentTimeMillis();
JobBo jobService = SpringContextHelp.getBean(JobBo.class);
Long jobfire = jobExecutionContext.getScheduledFireTime().getTime();
String ip_address = Constants.getApplicationValue("applicationContext","servlet.ip.address");
Calendar cr = Calendar.getInstance();
cr.setTimeInMillis(now);
Date d1= cr.getTime();
cr.setTimeInMillis(jobfire);
Date d2 = cr.getTime();
Boolean isMisfire = (Boolean) jobExecutionContext.getTrigger().getJobDataMap().get("isMisfire") == null? false:(Boolean) jobExecutionContext.getTrigger().getJobDataMap().get("isMisfire");
try {
QrtzTaskDetailVo vo = new QrtzTaskDetailVo(jobExecutionContext.getScheduler().getSchedulerName(),
jobExecutionContext.getJobDetail().getKey().getName(),
jobExecutionContext.getJobDetail().getKey().getGroup(),
jobExecutionContext.getTrigger().getKey().getName(),
jobExecutionContext.getTrigger().getKey().getGroup(),
d2,
d1,isMisfire,ip_address);
if (e != null){
log.info(e.getMessage());
vo.setError_detail(e.getMessage());
vo.setFinish_status(2);
}else {
vo.setFinish_status(1);
}
jobService.saveQrtzTaskDetail(vo);
} catch (SchedulerException s) {
s.printStackTrace();
}
}
}
6.Quartz 集群
注意事项
时间同步问题
Quartz实际并不关心你是在相同还是不同的机器上运行节点。当集群放置在不同的机器上时,称之为水平集群。节点跑在同一台机器上时,称之为垂直集群。对于垂直集群,存在着单点故障的问题。这对高可用性的应用来说是无法接受的,因为一旦机器崩溃了,所有的节点也就被终止了。对于水平集群,存在着时间同步问题。
节点用时间戳来通知其他实例它自己的最后检入时间。假如节点的时钟被设置为将来的时间,那么运行中的Scheduler将再也意识不到那个结点已经宕掉了。另一方面,如果某个节点的时钟被设置为过去的时间,也许另一节点就会认定那个节点已宕掉并试图接过它的Job重运行。最简单的同步计算机时钟的方式是使用某一个Internet时间服务器(Internet Time Server ITS)。
节点争抢Job问题
因为Quartz使用了一个随机的负载均衡算法, Job以随机的方式由不同的实例执行。Quartz官网上提到当前,还不存在一个方法来指派(钉住) 一个 Job 到集群中特定的节点。