Quartz 源码解读-初识Quartz

04 Quartz-初识Quartz-QuartzSchedul

2019-05-20  本文已影响0人  花神子
quartz

我在03 Quartz-初识Quartz-Scheduler初始化 篇中解说了getScheduler() 的instantiate(),简单的介绍了Scheduler的整个初始化过程,接下来一起了解下之后的流程;

Scheduler使用

getScheduler

public Scheduler getScheduler() throws SchedulerException {
    if (cfg == null) {
        initialize();
    }
    //从缓存中查询获取Schedule任务,任务名称从配置中获取,若无指定,则默认指定QuartzScheduler
    SchedulerRepository schedRep = SchedulerRepository.getInstance();
    Scheduler sched = schedRep.lookup(getSchedulerName());
    //判断若存在且已停止运行,则从缓存中移除
    if (sched != null) {
        if (sched.isShutdown()) {
            schedRep.remove(getSchedulerName());
        } else {
            return sched;
        }
    }
    sched = instantiate();
    return sched;
}

start()

之前在03 Quartz-初识Quartz-Scheduler初始化解说Scheduler 其实就是交互API,里面还有一个QuartzScheduler的实例变量,调用start()方法真正调用的是QuartzScheduler的start()方法。

public void start() throws SchedulerException {
    sched.start(); 
}

QuartzScheduler的start()方法。

public void start() throws SchedulerException {

    if (shuttingDown|| closed) {  throw ( "... 省略异常处理过程;"); }

    // QTZ-212 : calling new schedulerStarting() method on the listeners
    // right after entering start()
    notifySchedulerListenersStarting();
    //如果不为空则进行恢复逻辑
    if (initialStart == null) {
        initialStart = new Date();
        this.resources.getJobStore().schedulerStarted();            
        startPlugins();
    } else {
        resources.getJobStore().schedulerResumed();
    }

    schedThread.togglePause(false);

    getLog().info(
            "Scheduler " + resources.getUniqueIdentifier() + " started.");
    
    notifySchedulerListenersStarted();
}

1.1.3 togglePause()

看下这个方法schedThread.togglePause(false);属于QuartzSchedulerThread 这个方法其实全局就两处进行调用,参数决定是否进行暂停切换。初始化这职位false; 其他只有关闭等逻辑才进行设置true

/**
 * <p>
 * Signals the main processing loop to pause at the next possible point.
 * </p>
 */
void togglePause(boolean pause) {
    synchronized (sigLock) {
        paused = pause;

        if (paused) {
            signalSchedulingChange(0);
        } else {
            sigLock.notifyAll();
        }
    }
}

1.1.4 run()

The main processing loop of the <code>QuartzSchedulerThread</code>.
正如上面所说:

run()方法比较长,只能逐一解说了。

int acquiresFailed = 0;//a
//获取可用线程个数
public int blockForAvailableThreads() {
    synchronized (nextRunnableLock) {
        while ((availWorkers.size() < 1 || handoffPending) && !isShutdown) {
            try {
                nextRunnableLock.wait(500);
            } catch (InterruptedException ignore) {
            }
        }
        //返回可用线程集合的大小
        return availWorkers.size();
    }
}
idleWaitTime:
在调度程序处于空闲状态时,调度程序查询的触发器可用之前等待的时间量(以毫秒为单位),默认是30秒;30秒内没有需要执行的任务,则等待一个随机时间。getRandomizedIdleWaitTime产生一个30秒内随机等待时间。
可通过配置属性org.quartz.scheduler.idleWaitTime设置。

batchTriggerAcquisitionMaxCount:
允许调度程序节点一次获取(用于触发)的触发器的最大数量,默认是1;
可通过org.quartz.scheduler.batchTriggerAcquisitionMaxCount改写。

batchTriggerAcquisitionFireAheadTimeWindow:
时间窗口调节参数
允许触发器在其预定的火灾时间之前被获取和触发的时间(毫秒)的时间量,默认是0;
可通过org.quartz.scheduler.batchTriggerAcquisitionFireAheadTimeWindow改写
if (availThreadCount > 0) { // will always be true, due to  semantics of blockForAvailableThreads...
List<OperableTrigger> triggers = null;
long now = System.currentTimeMillis();

//清除调度改变的信号
clearSignaledSchedulingChange();
try {
    //到JobStore中获取下次被触发的触发器
    triggers = qsRsrcs.getJobStore().acquireNextTriggers(now + idleWaitTime,
            Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
    lastAcquireFailed = false;
    if (log.isDebugEnabled())
        log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers");
} catch (JobPersistenceException jpe) {
    if (!lastAcquireFailed) {
        qs.notifySchedulerListenersError(
                "An error occurred while scanning for the next triggers to fire.", jpe);
    }
    lastAcquireFailed = true;
    continue;
} catch (RuntimeException e) {
    if (!lastAcquireFailed) {
        getLog().error("quartzSchedulerThreadLoop: RuntimeException " + e.getMessage(), e);
    }
    lastAcquireFailed = true;
    continue;
}

if (triggers != null && !triggers.isEmpty()) {
    now = System.currentTimeMillis();
    //这里为什么triggers的第一个对象就是最早需要被执行的? 
    long triggerTime = triggers.get(0).getNextFireTime().getTime();
    long timeUntilTrigger = triggerTime - now;
    //如果第一条下次触发时间大于当前时间则进入等待
    while (timeUntilTrigger > 2) {
        synchronized (sigLock) {
            if (halted.get()) {
                break;
            }
            if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) {
                try {
                    now = System.currentTimeMillis();
                    timeUntilTrigger = triggerTime - now;
                    if (timeUntilTrigger >= 1)
                        sigLock.wait(timeUntilTrigger);
                } catch (InterruptedException ignore) {
                }
            }
        }
        //等待的过程中看看有没有收到调度信号
        if (releaseIfScheduleChangedSignificantly(triggers, triggerTime)) {
            break;
        }
        now = System.currentTimeMillis();
        timeUntilTrigger = triggerTime - now;
    }

    // this happens if releaseIfScheduleChangedSignificantly
    // decided to release triggers
    //这个可能在前面等待的时候被清理掉了
    if (triggers.isEmpty())
        continue;
//TriggerFiredResult-->TriggerFiredBundle-->(job, trigger, 一堆time)
List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>();

boolean goAhead = true;
synchronized (sigLock) {
    goAhead = !halted.get();
}
if (goAhead) {
    try {
        // 通知jobStore开始执行了,根据trigger获取其对应的JobDetail, 封装成TriggerFiredResult
        List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
        if (res != null)
            bndles = res;
    } catch (SchedulerException se) {
        qs.notifySchedulerListenersError(
                "An error occurred while firing triggers '" + triggers + "'", se);
        // QTZ-179 : a problem occurred interacting with
        // the triggers from the db
        // we release them and loop again
        for (int i = 0; i < triggers.size(); i++) {
            qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
        }
        continue;
    }

}

for (int i = 0; i < bndles.size(); i++) {
    TriggerFiredResult result = bndles.get(i);
    TriggerFiredBundle bndle = result.getTriggerFiredBundle();
    Exception exception = result.getException();

    if (exception instanceof RuntimeException) {
        getLog().error("RuntimeException while firing trigger " + triggers.get(i), exception);
        qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
        continue;
    }

    // it's possible to get 'null' if the triggers was
    // paused,
    // blocked, or other similar occurrences that
    // prevent it being
    // fired at this time... or if the scheduler was
    // shutdown (halted)
    if (bndle == null) {
        qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
        continue;
    }

    // 下面是开始执行任务
    JobRunShell shell = null;
    try {
        //构造执行对象,JobRunShell实现了Runnable
        shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
        //这个里面会用我们自定义的Job来new一个对象,并把相关执行Job是需要的数据传给JobExecutionContextImpl(这是我们自定义job的execute方法参数)
        shell.initialize(qs);
    } catch (SchedulerException se) {
        qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(),
                CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
        continue;
    }
// 这里是把任务放入到线程池中
if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
    // this case should never happen, as it is
    // indicative of the
    // scheduler being shutdown or a bug in the
    // thread pool or
    // a thread pool being used concurrently - which
    // the docs
    // say not to do...
    getLog().error("ThreadPool.runInThread() return false!");
        //放到线程池失败后,通知jobStore完成
    qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(),
            CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
}
private List<WorkerThread> workers;
private LinkedList<WorkerThread> availWorkers = new LinkedList<WorkerThread>();
private LinkedList<WorkerThread> busyWorkers = new LinkedList<WorkerThread>();

public boolean runInThread(Runnable runnable) {
    if (runnable == null) {
        return false;
    }
    synchronized (nextRunnableLock) {
        handoffPending = true;
        // Wait until a worker thread is available
        while ((availWorkers.size() < 1) && !isShutdown) {
            try {
                nextRunnableLock.wait(500);
            } catch (InterruptedException ignore) {
            }
        }
        if (!isShutdown) {
            //到可用线程集合里取出第一个
            WorkerThread wt = (WorkerThread) availWorkers.removeFirst();
            //放到该正在使用线程池中
            busyWorkers.add(wt);
            //运行任务
            wt.run(runnable);
        } else {
            // If the thread pool is going down, execute the Runnable
            // within a new additional worker thread (no thread from the
            // pool).
            //如果线程池关闭了则创建一个额外的线程
            WorkerThread wt = new WorkerThread(this, threadGroup, "WorkerThread-LastJob", prio,
                    isMakeThreadsDaemons(), runnable);
            busyWorkers.add(wt);
            workers.add(wt);
            wt.start();
        }
        nextRunnableLock.notifyAll();
        handoffPending = false;
    }

    return true;
}

执行任务的是WorkerThread线程,把任务传给WorkerThread,它会调用任务的run方法,这里即是调用JobRunShell的run方法, JobRunShell 的run方法最终就是调自定义Job的execute方法。

//添加任务
public void run(Runnable newRunnable) {
    synchronized (lock) {
        if (runnable != null) {
            throw new IllegalStateException("Already running a Runnable!");
        }

        runnable = newRunnable;
        lock.notifyAll();
    }
}
/**
 * <p>
 * Loop, executing targets as they are received.
 * </p>
 */
@Override
public void run() {
    boolean ran = false;
    while (run.get()) {
        try {
            synchronized (lock) {
                while (runnable == null && run.get()) {
                    lock.wait(500);
                }

                if (runnable != null) {
                    ran = true;
                    //调用任务的run方法
                    runnable.run();
                }
            }
        } 
        .....
    }
}
上一篇 下一篇

猜你喜欢

热点阅读