Reading the Quartz Source Code

2019-03-26  紫色红色黑色

Preface

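Quartz is a job scheduling framework implemented in Java. This article analyzes how Quartz executes jobs and does not cover usage. Debugging it means debugging several threads at once, so use the setting shown below and set the breakpoint's Suspend option to Thread. When you want to understand a framework's source, read the documentation and related material first, so that you have a rough idea of how the framework is implemented. Then follow that idea and take your questions to the source to verify them, debugging the same question several times if needed. Read the source top-down: first learn what a method is for, and leave its implementation details for later.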


[Figure: multithreaded debugging, with the breakpoint's Suspend option set to Thread]

The example below uses SimpleTrigger and RAMJobStore.

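import static org.quartz.JobBuilder.newJob;
import static org.quartz.SimpleScheduleBuilder.simpleSchedule;
import static org.quartz.TriggerBuilder.newTrigger;

import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.quartz.impl.StdSchedulerFactory;

// The class name QuartzDemo is illustrative; HelloJob is any class that implements org.quartz.Job (not shown).
public class QuartzDemo {

    public static void main(String[] args) {
        JobDetail job = newJob(HelloJob.class)
                .withIdentity("job1", "group1")
                .build();

        // fire immediately, then every 2 minutes, forever
        Trigger trigger = newTrigger()
                .withIdentity("trigger1", "group1")
                .startNow()
                .withSchedule(simpleSchedule().withIntervalInMinutes(2).repeatForever())
                .build();

        StdSchedulerFactory factory = new StdSchedulerFactory();

        try {
            Scheduler scheduler = factory.getScheduler();
            scheduler.scheduleJob(job, trigger);
            scheduler.start();
        } catch (SchedulerException e) {
            e.printStackTrace();
        }
    }
}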

How Execution Works

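Quartz has two core classes: QuartzSchedulerThread and SimpleThreadPool. The former is the scheduling thread; its main job is to acquire the triggers that are due and hand the resulting tasks to the latter. The latter accepts those tasks and runs them on the worker threads it maintains internally.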

QuartzSchedulerThread

This class is a subclass of Thread and overrides run(). It is created and started in the QuartzScheduler constructor.

public QuartzScheduler(QuartzSchedulerResources resources, long idleWaitTime, @Deprecated long dbRetryInterval)
    throws SchedulerException {
    this.resources = resources;
    if (resources.getJobStore() instanceof JobListener) {
        addInternalJobListener((JobListener)resources.getJobStore());
    }
    // create the scheduler thread and start it
    this.schedThread = new QuartzSchedulerThread(this, resources);
    ThreadExecutor schedThreadExecutor = resources.getThreadExecutor();
    schedThreadExecutor.execute(this.schedThread);
    if (idleWaitTime > 0) {
        this.schedThread.setIdleWaitTime(idleWaitTime);
    }

    jobMgr = new ExecutingJobsManager();
    addInternalJobListener(jobMgr);
    errLogger = new ErrorLogger();
    addInternalSchedulerListener(errLogger);

    signaler = new SchedulerSignalerImpl(this, this.schedThread);
    
    getLog().info("Quartz Scheduler v." + getVersion() + " created.");
}

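The run() method of QuartzSchedulerThread works as follows:
1. When the thread starts, paused defaults to true and halted defaults to false, so it enters loop 1.
2. When QuartzScheduler.start() is called, it delegates to QuartzSchedulerThread.togglePause(false). That method sets paused to false and notifies the threads waiting on the sigLock monitor, so QuartzSchedulerThread leaves loop 1.
3. It calls RAMJobStore.acquireNextTriggers() to get the List of triggers that are due within the next 30s (idleWaitTime).
4. If the List is empty, the thread waits for at most 30s. If the List is not empty and the first trigger's fire time is more than 2ms away, the thread enters loop 2 and stays there until the remaining time is 2ms or less. Inside the loop it only calls wait() when at least 1ms remains.
5. It calls RAMJobStore.triggersFired(). This iterates over the triggers passed in. For each trigger it looks up the store's own copy (call it trigger2) in triggersByKey by the trigger's key, then updates nextFireTime on both trigger and trigger2. Finally trigger is returned to the caller and trigger2 is put back into the TreeSet if its next fire time is not null.
6. It calls JobRunShell.initialize(), which stores the job and trigger in the shell's JobExecutionContextImpl field. The task that actually gets executed is the JobRunShell.
7. It calls SimpleThreadPool.runInThread(), which hands the task to a WorkerThread in the pool. The worker thread executes the task.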
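
Steps 1 and 2 rest on the ordinary wait/notifyAll idiom. The sketch below shows that idiom in isolation, as a rough illustration rather than Quartz code: PauseGate, awaitResume() and halt() are hypothetical names, and the real togglePause() does more than this. It is separate from the Quartz listing that follows.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for the scheduler thread's pause gate; not Quartz code. */
class PauseGate {
    private final Object sigLock = new Object();
    private final AtomicBoolean halted = new AtomicBoolean(false);
    private boolean paused = true; // starts paused, like the scheduler thread

    /** Called by the controlling thread, e.g. from a start() method. */
    void togglePause(boolean pause) {
        synchronized (sigLock) {
            paused = pause;
            sigLock.notifyAll(); // wake any thread blocked in awaitResume()
        }
    }

    void halt() {
        synchronized (sigLock) {
            halted.set(true);
            sigLock.notifyAll();
        }
    }

    /** Blocks while paused; returns true to proceed, false if halted or interrupted. */
    boolean awaitResume() {
        synchronized (sigLock) {
            while (paused && !halted.get()) {
                try {
                    sigLock.wait(1000L); // timed wait; flags are re-checked after every wake-up
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
            return !halted.get();
        }
    }
}

class PauseGateDemo {
    public static void main(String[] args) throws InterruptedException {
        PauseGate gate = new PauseGate();
        Thread scheduler = new Thread(() -> {
            if (gate.awaitResume()) {
                System.out.println("resumed, entering the main loop");
            }
        });
        scheduler.start();
        gate.togglePause(false); // plays the role of start() in this sketch
        scheduler.join();
    }
}
```

The timed wait inside a while loop means a missed notification only delays the thread by up to one second instead of blocking it forever.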

public void run() {
    int acquiresFailed = 0;

    while (!halted.get()) {
        try {
            // loop 1: the thread's initial state
            // paused defaults to true, halted defaults to false
            // check if we're supposed to pause...
            synchronized (sigLock) {
                while (paused && !halted.get()) {
                    try {
                        // wait until togglePause(false) is called...
                        sigLock.wait(1000L);
                    } catch (InterruptedException ignore) {
                    }
                    acquiresFailed = 0;
                }

                if (halted.get()) {
                    break;
                }
            }

            // number of worker threads currently available in the pool
            int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
            if(availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads...

                List<OperableTrigger> triggers;
                long now = System.currentTimeMillis();

                try {
                    // fetch from the TreeSet in RAMJobStore the triggers due within the next 30s
                    triggers = qsRsrcs.getJobStore().acquireNextTriggers(
                            now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
            
                } catch (JobPersistenceException jpe) {
                    continue;
                } catch (RuntimeException e) {
                    continue;
                }

                if (triggers != null && !triggers.isEmpty()) {

                    // time difference between the first trigger's fire time and now
                    now = System.currentTimeMillis();
                    long triggerTime = triggers.get(0).getNextFireTime().getTime();
                    long timeUntilTrigger = triggerTime - now;
                    
                    // loop 2: runs while the trigger is more than 2ms away
                    while(timeUntilTrigger > 2) {
                        synchronized (sigLock) {
                            if (halted.get()) {
                                break;
                            }
                            if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) {
                                try {
                                    // we could have blocked a long while
                                    // on 'synchronize', so we must recompute
                                    now = System.currentTimeMillis();
                                    timeUntilTrigger = triggerTime - now;
                                    // only wait if at least 1ms remains; the loop itself exits once 2ms or less remain
                                    if(timeUntilTrigger >= 1)
                                        sigLock.wait(timeUntilTrigger);
                                } catch (InterruptedException ignore) {
                                }
                            }
                        }

                        now = System.currentTimeMillis();
                        timeUntilTrigger = triggerTime - now;
                    }

                    // set triggers to 'executing'
                    List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>();

                    boolean goAhead = true;
                    synchronized(sigLock) {
                        goAhead = !halted.get();
                    }
                    if(goAhead) {
                        try {
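                            // mark the triggers as fired and wrap each one in an object that binds the job to its trigger
                            // look up each trigger in triggersByKey, compute its next fire time, and put it back into the TreeSet if that time is not null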
                            List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
                            if(res != null) bndles = res;
                        } catch (SchedulerException se) {
                            qs.notifySchedulerListenersError(
                                    "An error occurred while firing triggers '"
                                            + triggers + "'", se);
                            //QTZ-179 : a problem occurred interacting with the triggers from the db
                            //we release them and loop again
                            for (int i = 0; i < triggers.size(); i++) {
                                qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                            }
                            continue;
                        }

                    }

                    for (int i = 0; i < bndles.size(); i++) {
                        TriggerFiredResult result =  bndles.get(i);
                        TriggerFiredBundle bndle =  result.getTriggerFiredBundle();

                        JobRunShell shell = null;
                        try {
                            // create the task shell; JobRunShell implements Runnable
                            shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
                            // initialize the task:
                            // build a JobExecutionContextImpl holding the job, scheduler and trigger
                            shell.initialize(qs);
                        } catch (SchedulerException se) {
                            qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                            continue;
                        }
                        // hand the task to SimpleThreadPool
                        if (qsRsrcs.getThreadPool().runInThread(shell) == false) {}

                    }

                    continue; // while (!halted)
                }
            } else { // if(availThreadCount > 0)
                // should never happen, if threadPool.blockForAvailableThreads() follows contract
                continue; // while (!halted)
            }

            // if no trigger is due, the thread waits for at most 30s
            long now = System.currentTimeMillis();
            long waitTime = now + getRandomizedIdleWaitTime();
            long timeUntilContinue = waitTime - now;
            synchronized(sigLock) {
                try {
                  if(!halted.get()) {
                    // QTZ-336 A job might have been completed in the mean time and we might have
                    // missed the scheduled changed signal by not waiting for the notify() yet
                    // Check that before waiting for too long in case this very job needs to be
                    // scheduled very soon
                    if (!isScheduleChanged()) {
                      sigLock.wait(timeUntilContinue);
                    }
                  }
                } catch (InterruptedException ignore) {}
            }

        } catch(RuntimeException re) {
            getLog().error("Runtime error occurred in main trigger firing loop.", re);
        }
    } // while (!halted)

    // drop references to scheduler stuff to aid garbage collection...
    qs = null;
    qsRsrcs = null;
}
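
The acquire/fire cycle in the listing above can be pictured with a sorted set keyed by next fire time. The following is a minimal sketch under simplifying assumptions, not RAMJobStore's implementation: SketchTrigger, SketchTriggerStore, acquireNext() and fired() are hypothetical names, triggers repeat at a fixed interval, and locking, trigger states and misfire handling are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Hypothetical, simplified trigger: a name, a next fire time and a fixed repeat interval. */
class SketchTrigger {
    final String name;
    final long intervalMillis;
    long nextFireTime; // epoch millis

    SketchTrigger(String name, long nextFireTime, long intervalMillis) {
        this.name = name;
        this.nextFireTime = nextFireTime;
        this.intervalMillis = intervalMillis;
    }
}

/** Hypothetical in-memory store; it only mirrors the idea of a TreeSet ordered by fire time. */
class SketchTriggerStore {
    // earliest fire time first; the name breaks ties so two triggers never compare as equal
    private final TreeSet<SketchTrigger> timeTriggers = new TreeSet<>((a, b) -> {
        int byTime = Long.compare(a.nextFireTime, b.nextFireTime);
        return byTime != 0 ? byTime : a.name.compareTo(b.name);
    });

    void add(SketchTrigger t) {
        timeTriggers.add(t);
    }

    /** Removes and returns up to maxCount triggers that fire at or before noLaterThan. */
    List<SketchTrigger> acquireNext(long noLaterThan, int maxCount) {
        List<SketchTrigger> acquired = new ArrayList<>();
        while (acquired.size() < maxCount && !timeTriggers.isEmpty()
                && timeTriggers.first().nextFireTime <= noLaterThan) {
            acquired.add(timeTriggers.pollFirst());
        }
        return acquired;
    }

    /** Advances each fired trigger to its next fire time and puts it back into the set. */
    void fired(List<SketchTrigger> triggers) {
        for (SketchTrigger t : triggers) {
            t.nextFireTime += t.intervalMillis; // safe: t is outside the TreeSet at this point
            timeTriggers.add(t);
        }
    }
}

class SketchTriggerStoreDemo {
    public static void main(String[] args) {
        SketchTriggerStore store = new SketchTriggerStore();
        store.add(new SketchTrigger("a", 10_000L, 120_000L));
        store.add(new SketchTrigger("b", 45_000L, 120_000L));

        long now = 0L;
        // look 30s ahead, as the scheduler thread does with idleWaitTime
        List<SketchTrigger> due = store.acquireNext(now + 30_000L, 10);
        System.out.println("due: " + due.size()); // prints "due: 1" (only trigger a)
        store.fired(due);
    }
}
```

A trigger is removed from the set before its fire time changes and re-inserted afterwards, because mutating the sort key of an element that is still inside a TreeSet would corrupt the ordering.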

WorkerThread

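This class is an inner class of SimpleThreadPool and a subclass of Thread. Its run() works as follows:
1. When the thread starts it enters loop 1; while it has no task it stays in loop 2.
2. When a task arrives, the thread leaves loop 2 and runs the task.
3. The thread resets its state so that it can be reused next time.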

public void run() {
    boolean ran = false;
    
    // loop 1
    // run is an AtomicBoolean that defaults to true
    while (run.get()) {
        try {
            synchronized(lock) {
                // loop 2: no task yet
                while (runnable == null && run.get()) {
                    lock.wait(500);
                }

                if (runnable != null) {
                    ran = true;
                    // run the task
                    runnable.run();
                }
            }
        } catch (InterruptedException unblock) {

        } catch (Throwable exceptionInRunnable) {

        } finally {
            // the task is done; reset the worker thread so it can be reused
            synchronized(lock) {
                runnable = null;
            }
            // repair the thread in case the runnable mucked it up...
            if(getPriority() != tp.getThreadPriority()) {
                setPriority(tp.getThreadPriority());
            }

            if (runOnce) {
                run.set(false);
                clearFromBusyWorkersList(this);
            } else if(ran) {
                ran = false;
                makeAvailable(this);
            }

        }
    }

    //if (log.isDebugEnabled())
    try {
        getLog().debug("WorkerThread is shut down.");
    } catch(Exception e) {
        // ignore to help with a tomcat glitch
    }
}
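
The hand-off between the scheduling thread and a worker can be sketched with a single task slot guarded by a lock. This is an illustration under assumptions rather than the WorkerThread implementation: SketchWorker, offer() and shutdown() are hypothetical names, and unlike the listing above the task runs outside the lock so that offer() can report a busy worker instead of blocking.

```java
import java.util.concurrent.CountDownLatch;

/** Hypothetical single reusable worker; it mirrors the hand-off idea only. */
class SketchWorker extends Thread {
    private final Object lock = new Object();
    private Runnable runnable;               // the slot the dispatcher fills
    private volatile boolean running = true;

    /** Hands a task to the worker; returns false if it is busy or shut down. */
    boolean offer(Runnable task) {
        synchronized (lock) {
            if (runnable != null || !running) {
                return false;
            }
            runnable = task;
            lock.notifyAll();                // wake the worker waiting in loop 2
            return true;
        }
    }

    void shutdown() {
        synchronized (lock) {
            running = false;
            lock.notifyAll();
        }
    }

    @Override
    public void run() {
        while (running) {                    // loop 1
            Runnable task;
            synchronized (lock) {
                while (runnable == null && running) { // loop 2: no task yet
                    try {
                        lock.wait(500);
                    } catch (InterruptedException e) {
                        return;
                    }
                }
                task = runnable;
            }
            if (task != null) {
                try {
                    task.run();
                } catch (Throwable t) {
                    // swallow so that a failing task does not kill the worker
                } finally {
                    synchronized (lock) {
                        runnable = null;     // reset the slot for the next task
                    }
                }
            }
        }
    }
}

class SketchWorkerDemo {
    public static void main(String[] args) throws InterruptedException {
        SketchWorker worker = new SketchWorker();
        worker.start();
        CountDownLatch done = new CountDownLatch(1);
        worker.offer(() -> {
            System.out.println("task ran on the worker thread");
            done.countDown();
        });
        done.await();
        worker.shutdown();
        worker.join();
    }
}
```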

JobRunShell

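This class implements Runnable and is the task that the scheduling thread hands to a worker thread. Its run() works as follows:
1. Get the job and execute it.
2. Recompute the trigger.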

public void run() {
    qs.addInternalSchedulerListener(this);

    try {
        OperableTrigger trigger = (OperableTrigger) jec.getTrigger();
        JobDetail jobDetail = jec.getJobDetail();

        do {

            JobExecutionException jobExEx = null;
            // get the job
            Job job = jec.getJobInstance();

            try {
                begin();
            } catch (SchedulerException se) {
                qs.notifySchedulerListenersError("Error executing Job ("
                        + jec.getJobDetail().getKey()
                        + ": couldn't begin execution.", se);
                break;
            }


            long startTime = System.currentTimeMillis();
            long endTime = startTime;

            // execute the job
            try {
                log.debug("Calling execute on job " + jobDetail.getKey());
                // execute the job
                job.execute(jec);
                endTime = System.currentTimeMillis();
            } catch (JobExecutionException jee) {
                endTime = System.currentTimeMillis();
                jobExEx = jee;
                getLog().info("Job " + jobDetail.getKey() +
                        " threw a JobExecutionException: ", jobExEx);
            } catch (Throwable e) {
                endTime = System.currentTimeMillis();
                getLog().error("Job " + jobDetail.getKey() +
                        " threw an unhandled Exception: ", e);
                SchedulerException se = new SchedulerException(
                        "Job threw an unhandled exception.", e);
                qs.notifySchedulerListenersError("Job ("
                        + jec.getJobDetail().getKey()
                        + " threw an exception.", se);
                jobExEx = new JobExecutionException(se, false);
            }

            jec.setJobRunTime(endTime - startTime);

            // notify all job listeners
            if (!notifyJobListenersComplete(jec, jobExEx)) {
                break;
            }

            CompletedExecutionInstruction instCode = CompletedExecutionInstruction.NOOP;

            // update the trigger
            try {
                instCode = trigger.executionComplete(jec, jobExEx);
            } catch (Exception e) {
                // If this happens, there's a bug in the trigger...
                SchedulerException se = new SchedulerException(
                        "Trigger threw an unhandled exception.", e);
                qs.notifySchedulerListenersError(
                        "Please report this error to the Quartz developers.",
                        se);
            }

            // notify all trigger listeners
            if (!notifyTriggerListenersComplete(jec, instCode)) {
                break;
            }

            // update job/trigger or re-execute job
            if (instCode == CompletedExecutionInstruction.RE_EXECUTE_JOB) {
                jec.incrementRefireCount();
                try {
                    complete(false);
                } catch (SchedulerException se) {
                    qs.notifySchedulerListenersError("Error executing Job ("
                            + jec.getJobDetail().getKey()
                            + ": couldn't finalize execution.", se);
                }
                continue;
            }

            try {
                complete(true);
            } catch (SchedulerException se) {
                qs.notifySchedulerListenersError("Error executing Job ("
                        + jec.getJobDetail().getKey()
                        + ": couldn't finalize execution.", se);
                continue;
            }
            // update the trigger in the job store
            qs.notifyJobStoreJobComplete(trigger, jobDetail, instCode);
            break;
        } while (true);

    } finally {
        qs.removeInternalSchedulerListener(this);
    }
}
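
The do/while(true) shape of the listing above is easy to misread: continue re-runs the job and break ends the shell. A stripped-down sketch of just that control flow follows. ReExecuteSketch, runShell() and the afterRun callback are hypothetical; in the listing the instruction comes from trigger.executionComplete(jec, jobExEx), and listener notification and complete() are omitted here.

```java
import java.util.function.IntFunction;

class ReExecuteSketch {
    /** Hypothetical subset of the instructions a trigger can return after a run. */
    enum Instruction { NOOP, RE_EXECUTE_JOB }

    /**
     * Runs the job, asks afterRun what to do next (passing the current refire count),
     * and repeats while the answer is RE_EXECUTE_JOB. Returns how many times the job ran.
     */
    static int runShell(Runnable job, IntFunction<Instruction> afterRun) {
        int refireCount = 0;
        do {
            try {
                job.run();
            } catch (RuntimeException e) {
                // the real shell reports this to the scheduler listeners
            }
            Instruction instCode = afterRun.apply(refireCount);
            if (instCode == Instruction.RE_EXECUTE_JOB) {
                refireCount++;
                continue; // jumps to the loop condition, which is always true, so the job runs again
            }
            break;        // normal completion: leave the loop
        } while (true);
        return refireCount + 1;
    }

    public static void main(String[] args) {
        int[] executions = {0};
        // ask for two immediate re-executions, then finish
        int runs = runShell(
                () -> executions[0]++,
                refires -> refires < 2 ? Instruction.RE_EXECUTE_JOB : Instruction.NOOP);
        System.out.println("job ran " + runs + " times"); // prints "job ran 3 times"
    }
}
```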