IT阔论我爱编程

Quartz任务调度模块分析

2018-04-08  本文已影响35人  七佰

一 问题背景
公司系统中每天有大量的后台任务需要调度执行,如构建索引、统计报表、周期同步数据等等,要求任务调度系统具备高可用性、负载均衡特性,可以管理并监控任务的执行流程,以保证任务的正确执行。

但是最近 一直出现,到达时间节点,多节点几乎同时跑一个任务的情况。并且,在指定节点运行之后,依旧无效。

二 历史选择实践方案
2.1 Crontab+SQL

每天晚上运行定时任务,通过SQL脚本+crontab方式执行,例如,

#crm
0 2 * * * /xxx/mtcrm/shell/mtcrm_daily_stat.sql    //每天凌晨2:00执行统计
30 7 * * * /xxx/mtcrm/shell/mtcrm_data_fix.sql     //每天早上7:30执行数据修复

该方案存在以下问题:
直接访问数据库,各系统业务接口没有重用。
完成复杂业务需求时,会引入过多中间表。
业务逻辑计算完全依赖SQL,增大数据库压力。
任务失败无法自动恢复。

2.1 Python+SQL

采用python脚本(多数据源)+SQL方式执行,例如,

 def connectCRM():
  return MySQLdb.Connection("host1", "uname", "xxx", "crm", 3306, charset="utf8")

 def connectTemp():
  return MySQLdb.Connection("host1", "uname", "xxx", "temp", 3306, charset="utf8")

该方案存在问题:

直接访问数据,需要理解各系统的数据结构,无法满足动态任务问题,各系统业务接口没有重用。
无负载均衡。
任务失败无法恢复。
在JAVA语言开发中出现异构,且很难统一到自动部署系统中。

2.3 Spring+JDK Timer

该方案使用spring+JDK Timer方式,调用接口完成定时任务,在分布式部署环境下,防止多个节点同时运行任务,需要写死host,控制在一台服务器上执行task。

<bean id="accountStatusTaskScanner"  class="xxx.crm.service.impl.AccountStatusTaskScanner" />
    <task:scheduler id="taskScheduler" pool-size="5" />
    <task:scheduled-tasks scheduler="taskScheduler">
    <task:scheduled ref="accountStatusTaskScanner" method="execute" cron="0 0 1 * * ?" />
</task:scheduled-tasks>

该方案较方案1,2有很大改进,但仍存在以下问题:

步骤复杂、分散,任务量增大的情况下,很难扩展
使用写死服务器Host的方式执行task,存在单点风险,负载均衡手动完成。
应用重启,任务无法自动恢复。
CRM系统定时任务走过了很多弯路:定时任务多种实现方式,使配置和代码分散在多处,难以维护和监控;任务执行过程没有保证,没有错误恢复;任务执 行异常没有反馈(邮件);没有集群支持、负载均衡。CRM系统需要分布式的任务调度框架,统一解决问题,Java可以使用的任务调度框架有 Quartz,Jcrontab,cron4j,我们选择了Quartz。

3 使用Quartz
Quart是一个具有丰富功能的,开源的任务管理库,可以与任何Java应用进行组合使用。

运行环境
Quartz can run embedded within another free standing application
Quartz can be instantiated within an application server (or servlet container), and participate in XA transactions
Quartz can run as a stand-alone program (within its own Java Virtual Machine), to be used via RMI
Quartz can be instantiated as a cluster of stand-alone programs (with load-balance and fail-over capabilities) for the execution of jobs

任务调度
当触发器触发时,任务被有规划的允许。触发器可以被创建以一下形式。
a. 一天中的特定时间
b.一周的某些特定天
c.一月的特定天
not on certain days listed within a registered Calendar (such as business holidays)
repeated a specific number of times
repeated until a specific time/date
repeated indefinitely
repeated with a delay interval
Jobs are given names by their creator and can also be organized into named groups. Triggers may also be given names and placed into groups, in order to easily organize them within the scheduler. Jobs can be added to the scheduler once, but registered with multiple Triggers. Within an enterprise Java environment, Jobs can perform their work as part of a distributed (XA) transaction.

任务执行
Jobs can be any Java class that implements the simple Job interface, leaving infinite possibilities for the work your Jobs can perform.
Job class instances can be instantiated by Quartz, or by your application’s framework.
When a Trigger occurs, the scheduler notifies zero or more Java objects implementing the JobListener and TriggerListener interfaces (listeners can be simple Java objects, or EJBs, or JMS publishers, etc.). These listeners are also notified after the Job has executed.
As Jobs are completed, they return a JobCompletionCode which informs the scheduler of success or failure. The JobCompletionCode can also instruct the scheduler of any actions it should take based on the success/fail code - such as immediate re-execution of the Job.

任务持久层
The design of Quartz includes a JobStore interface that can be implemented to provide various mechanisms for the storage of jobs.
With the use of the included JDBCJobStore, all Jobs and Triggers configured as “non-volatile” are stored in a relational database via JDBC.
With the use of the included RAMJobStore, all Jobs and Triggers are stored in RAM and therefore do not persist between program executions - but this has the advantage of not requiring an external database.
Transactions
Quartz can participate in JTA transactions, via the use of JobStoreCMT (a subclass of JDBCJobStore).
Quartz can manage JTA transactions (begin and commit them) around the execution of a Job, so that the work performed by the Job automatically happens within a JTA transaction.
Clustering
Fail-over.

负载均衡
Quartz’s built-in clustering features rely upon database persistence via JDBCJobStore (described above).
Terracotta extensions to Quartz provide clustering capabilities without the need for a backing database.
Listeners & Plug-Ins
Applications can catch scheduling events to monitor or control job/trigger behavior by implementing one or more listener interfaces.
The Plug-In mechanism can be used add functionality to Quartz, such keeping a history of job executions, or loading job and trigger definitions from a file.
Quartz ships with a number of “factory built” plug-ins and listeners.

Quartz的集群部署


image.png

Quartz集群中的每个节点是一个独立的Quartz应用,它又管理着其他的节点。该集群需要分别对每个节点分别启动或停止,不像应用服务器的集 群,独立的Quartz节点并不与另一个节点或是管理节点通信。Quartz应用是通过数据库表来感知到另一应用。只有使用持久的JobStore才能完 成Quqrtz集群。

Quartz 核心表
Table Name Description
QRTZ_CALENDARS 存储Quartz的Calendar信息
QRTZ_CRON_TRIGGERS 存储CronTrigger,包括Cron表达式和时区信息
QRTZ_FIRED_TRIGGERS 存储与已触发的Trigger相关的状态信息,以及相联Job的执行信息
QRTZ_PAUSED_TRIGGER_GRPS 存储已暂停的Trigger组的信息
QRTZ_SCHEDULER_STATE 存储少量的有关Scheduler的状态信息,和别的Scheduler实例
QRTZ_LOCKS 存储程序的悲观锁的信息
QRTZ_JOB_DETAILS 存储每一个已配置的Job的详细信息
QRTZ_JOB_LISTENERS 存储有关已配置的JobListener的信息
QRTZ_SIMPLE_TRIGGERS 存储简单的Trigger,包括重复次数、间隔、以及已触的次数
QRTZ_BLOG_TRIGGERS Trigger作为Blob类型存储
QRTZ_TRIGGER_LISTENERS 存储已配置的TriggerListener的信息
QRTZ_TRIGGERS 存储已配置的Trigger的信息

集群源码分析
以下是quartz源码的结构图


image.png

我们先从分析QuartzSchedulerThread开始

/**
     * <p>
     * The main processing loop of the <code>QuartzSchedulerThread</code>.
     * </p>
     */
    @Override
    public void run() {
        int acquiresFailed = 0;

        while (!halted.get()) {
            try {
                // check if we're supposed to pause...
                synchronized (sigLock) {
                    while (paused && !halted.get()) {
                        try {
                            // wait until togglePause(false) is called...
                            sigLock.wait(1000L);
                        } catch (InterruptedException ignore) {
                        }

                        // reset failure counter when paused, so that we don't
                        // wait again after unpausing
                        acquiresFailed = 0;
                    }

                    if (halted.get()) {
                        break;
                    }
                }

                // wait a bit, if reading from job store is consistently
                // failing (e.g. DB is down or restarting)..
                if (acquiresFailed > 1) {
                    try {
                        long delay = computeDelayForRepeatedErrors(qsRsrcs.getJobStore(), acquiresFailed);
                        Thread.sleep(delay);
                    } catch (Exception ignore) {
                    }
                }

                int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
                if(availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads...

                    List<OperableTrigger> triggers;

                    long now = System.currentTimeMillis();

                    clearSignaledSchedulingChange();
                    try {
                        triggers = qsRsrcs.getJobStore().acquireNextTriggers(
                                now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
                        acquiresFailed = 0;
                        if (log.isDebugEnabled())
                            log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers");
                    } catch (JobPersistenceException jpe) {
                        if (acquiresFailed == 0) {
                            qs.notifySchedulerListenersError(
                                "An error occurred while scanning for the next triggers to fire.",
                                jpe);
                        }
                        if (acquiresFailed < Integer.MAX_VALUE)
                            acquiresFailed++;
                        continue;
                    } catch (RuntimeException e) {
                        if (acquiresFailed == 0) {
                            getLog().error("quartzSchedulerThreadLoop: RuntimeException "
                                    +e.getMessage(), e);
                        }
                        if (acquiresFailed < Integer.MAX_VALUE)
                            acquiresFailed++;
                        continue;
                    }

                    if (triggers != null && !triggers.isEmpty()) {

                        now = System.currentTimeMillis();
                        long triggerTime = triggers.get(0).getNextFireTime().getTime();
                        long timeUntilTrigger = triggerTime - now;
                        while(timeUntilTrigger > 2) {
                            synchronized (sigLock) {
                                if (halted.get()) {
                                    break;
                                }
                                if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) {
                                    try {
                                        // we could have blocked a long while
                                        // on 'synchronize', so we must recompute
                                        now = System.currentTimeMillis();
                                        timeUntilTrigger = triggerTime - now;
                                        if(timeUntilTrigger >= 1)
                                            sigLock.wait(timeUntilTrigger);
                                    } catch (InterruptedException ignore) {
                                    }
                                }
                            }
                            if(releaseIfScheduleChangedSignificantly(triggers, triggerTime)) {
                                break;
                            }
                            now = System.currentTimeMillis();
                            timeUntilTrigger = triggerTime - now;
                        }

                        // this happens if releaseIfScheduleChangedSignificantly decided to release triggers
                        if(triggers.isEmpty())
                            continue;

                        // set triggers to 'executing'
                        List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>();

                        boolean goAhead = true;
                        synchronized(sigLock) {
                            goAhead = !halted.get();
                        }
                        if(goAhead) {
                            try {
                                List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
                                if(res != null)
                                    bndles = res;
                            } catch (SchedulerException se) {
                                qs.notifySchedulerListenersError(
                                        "An error occurred while firing triggers '"
                                                + triggers + "'", se);
                                //QTZ-179 : a problem occurred interacting with the triggers from the db
                                //we release them and loop again
                                for (int i = 0; i < triggers.size(); i++) {
                                    qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                                }
                                continue;
                            }

                        }

                        for (int i = 0; i < bndles.size(); i++) {
                            TriggerFiredResult result =  bndles.get(i);
                            TriggerFiredBundle bndle =  result.getTriggerFiredBundle();
                            Exception exception = result.getException();

                            if (exception instanceof RuntimeException) {
                                getLog().error("RuntimeException while firing trigger " + triggers.get(i), exception);
                                qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                                continue;
                            }

                            // it's possible to get 'null' if the triggers was paused,
                            // blocked, or other similar occurrences that prevent it being
                            // fired at this time...  or if the scheduler was shutdown (halted)
                            if (bndle == null) {
                                qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                                continue;
                            }

                            JobRunShell shell = null;
                            try {
                                shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
                                shell.initialize(qs);
                            } catch (SchedulerException se) {
                                qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                                continue;
                            }

                            if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
                                // this case should never happen, as it is indicative of the
                                // scheduler being shutdown or a bug in the thread pool or
                                // a thread pool being used concurrently - which the docs
                                // say not to do...
                                getLog().error("ThreadPool.runInThread() return false!");
                                qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                            }

                        }

                        continue; // while (!halted)
                    }
                } else { // if(availThreadCount > 0)
                    // should never happen, if threadPool.blockForAvailableThreads() follows contract
                    continue; // while (!halted)
                }

                long now = System.currentTimeMillis();
                long waitTime = now + getRandomizedIdleWaitTime();
                long timeUntilContinue = waitTime - now;
                synchronized(sigLock) {
                    try {
                      if(!halted.get()) {
                        // QTZ-336 A job might have been completed in the mean time and we might have
                        // missed the scheduled changed signal by not waiting for the notify() yet
                        // Check that before waiting for too long in case this very job needs to be
                        // scheduled very soon
                        if (!isScheduleChanged()) {
                          sigLock.wait(timeUntilContinue);
                        }
                      }
                    } catch (InterruptedException ignore) {
                    }
                }

            } catch(RuntimeException re) {
                getLog().error("Runtime error occurred in main trigger firing loop.", re);
            }
        } // while (!halted)

        // drop references to scheduler stuff to aid garbage collection...
        qs = null;
        qsRsrcs = null;
    }

参考文献
http://www.icartype.com/?p=140
https://blog.csdn.net/u010648555/article/details/54863144

上一篇下一篇

猜你喜欢

热点阅读