Quartz JobRunShell实现
2016-12-13 本文已影响0人
wangqiaoshi
Class Diagram.png
quartz中,Job接口是要实现execute方法的.JobRunShell run方法会实例化job实现类,然后调用execute.run方法中也会有捕获异常,以及捕获异常如何处理的机制.
那么JobRunShell是通过JobExecutionException来判断是要重试执行,还是不在重试.
JobRunShell run实现
qs.addInternalSchedulerListener(this);
try {
OperableTrigger trigger = (OperableTrigger) jec.getTrigger();
JobDetail jobDetail = jec.getJobDetail();
do {
JobExecutionException jobExEx = null;
Job job = jec.getJobInstance();
try {
begin();
} catch (SchedulerException se) {
qs.notifySchedulerListenersError("Error executing Job ("
+ jec.getJobDetail().getKey()
+ ": couldn't begin execution.", se);
break;
}
// notify job & trigger listeners...
try {
if (!notifyListenersBeginning(jec)) {
break;
}
} catch(VetoedException ve) {
try {
CompletedExecutionInstruction instCode = trigger.executionComplete(jec, null);
qs.notifyJobStoreJobVetoed(trigger, jobDetail, instCode);
// QTZ-205
// Even if trigger got vetoed, we still needs to check to see if it's the trigger's finalized run or not.
if (jec.getTrigger().getNextFireTime() == null) {
qs.notifySchedulerListenersFinalized(jec.getTrigger());
}
complete(true);
} catch (SchedulerException se) {
qs.notifySchedulerListenersError("Error during veto of Job ("
+ jec.getJobDetail().getKey()
+ ": couldn't finalize execution.", se);
}
break;
}
long startTime = System.currentTimeMillis();
long endTime = startTime;
// execute the job
try {
log.debug("Calling execute on job " + jobDetail.getKey());
job.execute(jec);//执行jobexecute
endTime = System.currentTimeMillis();
} catch (JobExecutionException jee) {//如果execute抛出异常,并且是JobExecutionException,JobExecutionException会保存着是重试,还是结束的信息
endTime = System.currentTimeMillis();
jobExEx = jee;
getLog().info("Job " + jobDetail.getKey() +
" threw a JobExecutionException: ", jobExEx);
} catch (Throwable e) { //execute抛出异常
endTime = System.currentTimeMillis();
getLog().error("Job " + jobDetail.getKey() +
" threw an unhandled Exception: ", e);
SchedulerException se = new SchedulerException(
"Job threw an unhandled exception.", e);
qs.notifySchedulerListenersError("Job ("
+ jec.getJobDetail().getKey()
+ " threw an exception.", se);
jobExEx = new JobExecutionException(se, false);
}
jec.setJobRunTime(endTime - startTime);
// notify all job listeners
if (!notifyJobListenersComplete(jec, jobExEx)) {
break;
}
CompletedExecutionInstruction instCode = CompletedExecutionInstruction.NOOP;
// update the trigger
try {
instCode = trigger.executionComplete(jec, jobExEx);
} catch (Exception e) {
// If this happens, there's a bug in the trigger...
SchedulerException se = new SchedulerException(
"Trigger threw an unhandled exception.", e);
qs.notifySchedulerListenersError(
"Please report this error to the Quartz developers.",
se);
}
// notify all trigger listeners
if (!notifyTriggerListenersComplete(jec, instCode)) {
break;
}
// update job/trigger or re-execute job
if (instCode == CompletedExecutionInstruction.RE_EXECUTE_JOB) {//如果在execute设置的是重复执行,那么会重新执行run函数
jec.incrementRefireCount();
try {
complete(false);
} catch (SchedulerException se) {
qs.notifySchedulerListenersError("Error executing Job ("
+ jec.getJobDetail().getKey()
+ ": couldn't finalize execution.", se);
}
continue;
}
try {
complete(true);
} catch (SchedulerException se) {
qs.notifySchedulerListenersError("Error executing Job ("
+ jec.getJobDetail().getKey()
+ ": couldn't finalize execution.", se);
continue;
}
qs.notifyJobStoreJobComplete(trigger, jobDetail, instCode);
break;
} while (true);
} finally {
qs.removeInternalSchedulerListener(this);
}
自定义retry次数job实现
class MyRetryJob extends Job{
override def execute(context: JobExecutionContext): Unit = {
val dataMap = context.getMergedJobDataMap
val retry = dataMap.getInt("retry")
var retryCount = dataMap.getOrDefault("retryCount",new Integer(1)).asInstanceOf[Integer]
try{
println("exec "+(retryCount))
retryCount = retryCount+1
dataMap.put("retryCount",retryCount)
throw new Exception("just for test exception")
}catch {
case e:Exception=>
val execError = new JobExecutionException(e)
if(retryCount<=retry){
execError.setRefireImmediately(true)
}
else{
execError.setUnscheduleAllTriggers(true)
}
throw execError
}
}
}