Quartz JobRunShell实现

2016-12-13  本文已影响0人  wangqiaoshi
Class Diagram.png

quartz中,Job接口是要实现execute方法的.JobRunShell run方法会实例化job实现类,然后调用execute.run方法中也会有捕获异常,以及捕获异常如何处理的机制.
那么JobRunShell是通过JobExecutionException来判断是要重试执行,还是不在重试.

JobRunShell run实现

qs.addInternalSchedulerListener(this);
try {
    OperableTrigger trigger = (OperableTrigger) jec.getTrigger();
    JobDetail jobDetail = jec.getJobDetail();
    do {
        JobExecutionException jobExEx = null;
        Job job = jec.getJobInstance();
        try {
            begin();
        } catch (SchedulerException se) {
            qs.notifySchedulerListenersError("Error executing Job ("
                    + jec.getJobDetail().getKey()
                    + ": couldn't begin execution.", se);
            break;
        }
        // notify job & trigger listeners...
        try {
            if (!notifyListenersBeginning(jec)) {
                break;
            }
        } catch(VetoedException ve) {
            try {
                CompletedExecutionInstruction instCode = trigger.executionComplete(jec, null);
                qs.notifyJobStoreJobVetoed(trigger, jobDetail, instCode);
                 
                // QTZ-205
                // Even if trigger got vetoed, we still needs to check to see if it's the trigger's finalized run or not.
                if (jec.getTrigger().getNextFireTime() == null) {
                    qs.notifySchedulerListenersFinalized(jec.getTrigger());
                }
 
                complete(true);
            } catch (SchedulerException se) {
                qs.notifySchedulerListenersError("Error during veto of Job ("
                        + jec.getJobDetail().getKey()
                        + ": couldn't finalize execution.", se);
            }
            break;
        }
 
        long startTime = System.currentTimeMillis();
        long endTime = startTime;
 
        // execute the job
        try {
            log.debug("Calling execute on job " + jobDetail.getKey());
            job.execute(jec);//执行jobexecute
            endTime = System.currentTimeMillis();
        } catch (JobExecutionException jee) {//如果execute抛出异常,并且是JobExecutionException,JobExecutionException会保存着是重试,还是结束的信息
            endTime = System.currentTimeMillis();
            jobExEx = jee;
            getLog().info("Job " + jobDetail.getKey() +
                    " threw a JobExecutionException: ", jobExEx);
        } catch (Throwable e) { //execute抛出异常
            endTime = System.currentTimeMillis();
            getLog().error("Job " + jobDetail.getKey() +
                    " threw an unhandled Exception: ", e);
            SchedulerException se = new SchedulerException(
                    "Job threw an unhandled exception.", e);
            qs.notifySchedulerListenersError("Job ("
                    + jec.getJobDetail().getKey()
                    + " threw an exception.", se);
            jobExEx = new JobExecutionException(se, false);
        }
        jec.setJobRunTime(endTime - startTime);
        // notify all job listeners
        if (!notifyJobListenersComplete(jec, jobExEx)) {
            break;
        }
        CompletedExecutionInstruction instCode = CompletedExecutionInstruction.NOOP;
 
        // update the trigger
        try {
            instCode = trigger.executionComplete(jec, jobExEx);
        } catch (Exception e) {
            // If this happens, there's a bug in the trigger...
            SchedulerException se = new SchedulerException(
                    "Trigger threw an unhandled exception.", e);
            qs.notifySchedulerListenersError(
                    "Please report this error to the Quartz developers.",
                    se);
        }
        // notify all trigger listeners
        if (!notifyTriggerListenersComplete(jec, instCode)) {
            break;
        }
        // update job/trigger or re-execute job
        if (instCode == CompletedExecutionInstruction.RE_EXECUTE_JOB) {//如果在execute设置的是重复执行,那么会重新执行run函数
            jec.incrementRefireCount();
            try {
                complete(false);
            } catch (SchedulerException se) {
                qs.notifySchedulerListenersError("Error executing Job ("
                        + jec.getJobDetail().getKey()
                        + ": couldn't finalize execution.", se);
            }
            continue;
        }
        try {
            complete(true);
        } catch (SchedulerException se) {
            qs.notifySchedulerListenersError("Error executing Job ("
                    + jec.getJobDetail().getKey()
                    + ": couldn't finalize execution.", se);
            continue;
        }
        qs.notifyJobStoreJobComplete(trigger, jobDetail, instCode);
        break;
    } while (true);
 
} finally {
    qs.removeInternalSchedulerListener(this);
}

自定义retry次数job实现

class MyRetryJob  extends  Job{
  override def execute(context: JobExecutionContext): Unit = {
    val dataMap = context.getMergedJobDataMap
    val retry = dataMap.getInt("retry")
    var retryCount = dataMap.getOrDefault("retryCount",new Integer(1)).asInstanceOf[Integer]

    try{
        println("exec "+(retryCount))
        retryCount = retryCount+1

        dataMap.put("retryCount",retryCount)
        throw new Exception("just for test exception")
    }catch {
      case e:Exception=>
        val execError = new JobExecutionException(e)
        if(retryCount<=retry){
          execError.setRefireImmediately(true)
        }
        else{
          execError.setUnscheduleAllTriggers(true)
        }
        throw  execError
    }

  }
}
上一篇下一篇

猜你喜欢

热点阅读