调度执行--quartz核心线程类

2019-02-25  本文已影响0人  丁钰铭

QuartzSchedulerThread

调度器初始化后构造了核心调度线程,quartz整个作业获取执行均由此线程执行

调度过程:
1.等待线程池内的可用线程(此处如果线程池繁忙的话,会造成调度延迟或者失败)
2.查询本次可执行的触发器,通过now+idleWaitTime确定可以获取的Triggers的时间范围,获取的数量由Math.min(availThreadCount,qsRsrcs.getMaxBatchSize())最小值决定(此处可以通过配置优化,默认的batchSize是1)
3.告诉JobStore现在要触发作业了,并且更新了trigger的触发时间
4.循环触发此次作业,作业方法的调用是通过JobRunShell进行的,将构造出的每一个JobRunShell放入线程池等待调度

public void run(){
    boolean lastAcquireFailed=false;

    while(!halted.get()){
        try{
                 //省略代码…检查调度器是否暂停或停止
        //等待可用线程,此方法阻塞直到线程池有可用线程为止
        int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
        if(availThreadCount>0){
            //willalwaysbetrue,duetosemanticsofblockForAvailableThreads...
        
            List<OperableTrigger> triggers=null;
            
            Long now=System.currentTimeMillis();
            
            clearSignaledSchedulingChange();
            
            try{
                //查询本次执行的触发器。
                //参数(1)下一次间隔时间是now+idleWaitTime(此变量用来控制获取每一批触发器的间隔,设置过小导致频繁查询)
                //参数(2)查询数量为 可用线程数或配置的最大批 两者最小
                triggers = qsRsrcs.getJobStore().acquireNextTriggers(
                now+idleWaitTime,Math.min(availThreadCount,qsRsrcs.getMaxBatchSize()),qsRsrcs.getBatchTimeWindow());
                lastAcquireFailed = false;
            }catch(JobPersistenceExceptionjpe){
                if(!lastAcquireFailed){
                    qs.notifySchedulerListenersError(
                    "Anerroroccurredwhilescanningforthenexttriggerstofire.",
                    jpe);
                }
                lastAcquireFailed=true;
                continue;
                }catch(RuntimeExceptione){
                if(!lastAcquireFailed){
                    getLog().error("quartzSchedulerThreadLoop:RuntimeException"
                    +e.getMessage(),e);
                }
                lastAcquireFailed=true;
                continue;
            }
    
        if(triggers!=null&&!triggers.isEmpty()){
        
    //省略代码…此处检查调度器是否到执行时间,与系统时间判断,只有时间差小于2毫秒才可继续执行
        if(goAhead){
        try{
            //告诉JobStore现在要触发作业了,并且更新了trigger的触发时间
            List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
            if(res!=null)
                bndles=res;
        }catch(SchedulerExceptionse){
            qs.notifySchedulerListenersError(
            "Anerroroccurredwhilefiringtriggers'"
            +triggers+"'",se);
            //QTZ-179:aproblemoccurredinteractingwiththetriggersfromthedb
            //wereleasethemandloopagain
            for(inti=0;i<triggers.size();i++){
            qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
            }
            continue;
        }
        
        }
    
        //循环触发此次作业
        for(inti=0;i<bndles.size();i++){
            TriggerFiredResult result=bndles.get(i);
            TriggerFiredBundle bndle=result.getTriggerFiredBundle();
            Exception exception=result.getException();
            
            if(exception instanceof RuntimeException){
                getLog().error("RuntimeExceptionwhilefiringtrigger"+triggers.get(i),exception);
                qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                continue;
            }
        
            //it's possible to get 'null' if the triggers was paused,
            //blocked,or other similar occurrences that prevent it being
            //fired at this time...o rif the scheduler was shutdown(halted)
            if(bndle==null){
                qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                continue;
            }
        
            JobRunShell shell=null;
            try{
                //构造可执行作业,并初始化,真正执行作业是在这个类
                shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
                shell.initialize(qs);
            }catch(SchedulerExceptionse){
                qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i),bndle.getJobDetail(),CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                continue;
            }
            //放入线程池中 执行作业!!
            if(qsRsrcs.getThreadPool().runInThread(shell)==false){
                //thiscaseshouldneverhappen,asitisindicativeofthe
                //schedulerbeingshutdownorabuginthethreadpoolor
                //athreadpoolbeingusedconcurrently-whichthedocs
                //saynottodo...
                getLog().error("ThreadPool.runInThread()returnfalse!");
                qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i),bndle.getJobDetail(),CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
            }
    
        }
        
        continue;//while(!halted)
        }
    }else{//if(availThreadCount>0)
    //shouldneverhappen,ifthreadPool.blockForAvailableThreads()followscontract
    continue;//while(!halted)
    }
    
    longnow=System.currentTimeMillis();
    longwaitTime=now+getRandomizedIdleWaitTime();
    longtimeUntilContinue=waitTime-now;
    synchronized(sigLock){
    try{
    if(!halted.get()){
    //QTZ-336Ajobmighthavebeencompletedinthemeantimeandwemighthave
    //missedthescheduledchangedsignalbynotwaitingforthenotify()yet
    //Checkthatbeforewaitingfortoolongincasethisveryjobneedstobe
    //scheduledverysoon
    if(!isScheduleChanged()){
    sigLock.wait(timeUntilContinue);
    }
    }
    }catch(InterruptedExceptionignore){
    }
    }
    
    }catch(RuntimeExceptionre){
    getLog().error("Runtimeerroroccurredinmaintriggerfiringloop.",re);
    }
    }//while(!halted)
    
    //dropreferencestoschedulerstufftoaidgarbagecollection...
    qs=null;
    qsRsrcs=null;
}
上一篇 下一篇

猜你喜欢

热点阅读