Quartz 源码解析(四) —— QuartzSchedule

2018-09-03  本文已影响352人  icameisaw

大概内容

scheduler.scheduleJob()

Scheduler使用

StdScheduler

StdScheduler的方法基本上都代理给QuartzScheduler类来处理。

public class StdScheduler implements Scheduler {
    private QuartzScheduler sched;

    public StdScheduler(QuartzScheduler sched) {
        this.sched = sched;
    }

    public Date scheduleJob(JobDetail jobDetail, Trigger trigger)
        throws SchedulerException {
        return sched.scheduleJob(jobDetail, trigger);
    }

    public void start() throws SchedulerException {
        sched.start();
    }

    /**
     * 只有这个方法没有委托给QuartzScheduler
     * 除了getClass(),其他方法在QuartzScheduler都可以拿到
     */
    public SchedulerMetaData getMetaData() {
        return new SchedulerMetaData(getSchedulerName(),
                getSchedulerInstanceId(), getClass(), false, isStarted(),
                isInStandbyMode(), isShutdown(), sched.runningSince(),
                sched.numJobsExecuted(), sched.getJobStoreClass(),
                sched.supportsPersistence(), sched.isClustered(), sched.getThreadPoolClass(),
                sched.getThreadPoolSize(), sched.getVersion());
    }

    // 其他代码

}

QuartzScheduler

Quartz的小心脏,org.quartz.Scheduler接口的间接实现。

public class QuartzScheduler implements RemotableQuartzScheduler {

    // QuartzSchedulerResources对象是通过构造器放进去的
    public QuartzScheduler(QuartzSchedulerResources resources, long idleWaitTime, @Deprecated long dbRetryInterval)
        throws SchedulerException {
        this.resources = resources;
        if (resources.getJobStore() instanceof JobListener) {
            addInternalJobListener((JobListener)resources.getJobStore());
        }

        this.schedThread = new QuartzSchedulerThread(this, resources);
        ThreadExecutor schedThreadExecutor = resources.getThreadExecutor();
        schedThreadExecutor.execute(this.schedThread);
        if (idleWaitTime > 0) {
            this.schedThread.setIdleWaitTime(idleWaitTime);
        }

        jobMgr = new ExecutingJobsManager();
        addInternalJobListener(jobMgr);
        errLogger = new ErrorLogger();
        addInternalSchedulerListener(errLogger);

        signaler = new SchedulerSignalerImpl(this, this.schedThread);

        getLog().info("Quartz Scheduler v." + getVersion() + " created.");
    }

    public Date scheduleJob(JobDetail jobDetail, Trigger trigger) throws SchedulerException {
        validateState();

        if (jobDetail == null) {
            throw new SchedulerException("JobDetail cannot be null");
        }
        if (trigger == null) {
            throw new SchedulerException("Trigger cannot be null");
        }
        if (jobDetail.getKey() == null) {
            throw new SchedulerException("Job's key cannot be null");
        }
        if (jobDetail.getJobClass() == null) {
            throw new SchedulerException("Job's class cannot be null");
        }
        // TriggerBuilder.build()会生成一个OperableTrigger实例。
        OperableTrigger trig = (OperableTrigger)trigger;

        if (trigger.getJobKey() == null) {
            trig.setJobKey(jobDetail.getKey());
        } else if (!trigger.getJobKey().equals(jobDetail.getKey())) {
            throw new SchedulerException(
                "Trigger does not reference given job!");
        }

        trig.validate();

        Calendar cal = null;
        if (trigger.getCalendarName() != null) {
            cal = resources.getJobStore().retrieveCalendar(trigger.getCalendarName());
        }
        // TODO: 解析各种类型的Trigger
        Date ft = trig.computeFirstFireTime(cal);

        if (ft == null) {
            throw new SchedulerException(
                    "Based on configured schedule, the given trigger '" + trigger.getKey() + "' will never fire.");
        }
            // 关键代码就是下面这一行
        resources.getJobStore().storeJobAndTrigger(jobDetail, trig);
        notifySchedulerListenersJobAdded(jobDetail);
        notifySchedulerThread(trigger.getNextFireTime().getTime());
        notifySchedulerListenersSchduled(trigger);

        return ft;
    }

    // 其他代码

}

RAMJobStore

介绍一下RAMJobStore的属性

class RAMJobStore {
    #HashMap<JobKey,JobWrapper> jobsByKey
    #HashMap<TriggerKey,TriggerWrapper> triggersByKey
    #HashMap<String,HashMap<JobKey,JobWrapper>> jobsByGroup
    #HashMap<String,HashMap<TriggerKey,TriggerWrapper>> triggersByGroup
    #TreeSet<TriggerWrapper> timeTriggers
    #HashMap<String,Calendar> calendarsByName
    #Map<JobKey,List<TriggerWrapper>> triggersByJob
    #Object lock
    #HashSet<String> pausedTriggerGroups
    #HashSet<String> pausedJobGroups
    #HashSet<JobKey> blockedJobs
    #long misfireThreshold
    #SchedulerSignaler signaler
    -Logger log
    -{static}AtomicLong ftrCtr
}

下面是RAMJobStore.storeJob()代码解析,storeTrigger()方法的逻辑类似。

/**
 * 内部类JobWrapper是一个包括jobKey和jobDetail的类。
 * 克隆一个新的JobDetail来创建一个JobWrapper,然后维护到jobsByKey和jobsByGroup属性中。
 * 维护HashMap系列对象的时候,通过lock的synchronized代码块来做线程同步
 */
public void storeJob(JobDetail newJob,boolean replaceExisting) throws ObjectAlreadyExistsException {
    JobWrapper jw = new JobWrapper((JobDetail)newJob.clone());
    boolean repl = false;

    synchronized (lock) {
        if (jobsByKey.get(jw.key) != null) {
            if (!replaceExisting) {
                throw new ObjectAlreadyExistsException(newJob);
            }
            repl = true;
        }

        if (!repl) {
            // get job group
            HashMap<JobKey, JobWrapper> grpMap = jobsByGroup.get(newJob.getKey().getGroup());
            if (grpMap == null) {
                grpMap = new HashMap<JobKey, JobWrapper>(100);
                jobsByGroup.put(newJob.getKey().getGroup(), grpMap);
            }
            // add to jobs by group
            grpMap.put(newJob.getKey(), jw);
            // add to jobs by FQN map
            jobsByKey.put(jw.key, jw);
        } else {
            // update job detail
            JobWrapper orig = jobsByKey.get(jw.key);
            orig.jobDetail = jw.jobDetail; // already cloned
        }
    }
}

scheduler.start()

QuartzScheduler

案例用的是RAMJobStore,其中的schedulerStarted()和schedulerResumed()是空方法,没有代码立即。对于JobStoreSupport,这两个方法是有很多逻辑的,后面的篇章再做解析。

public class QuartzScheduler implements RemotableQuartzScheduler {

    public void start() throws SchedulerException {
        if (shuttingDown|| closed) {
            throw new SchedulerException(
                    "The Scheduler cannot be restarted after shutdown() has been called.");
        }

        // QTZ-212 : calling new schedulerStarting() method on the listeners
        // right after entering start()
        notifySchedulerListenersStarting();

        if (initialStart == null) {//初始化标识为null,进行初始化操作
            initialStart = new Date();
            // RAMJobStore 啥都不做
            // JobStoreSupport 判断是否集群,恢复Job等
            this.resources.getJobStore().schedulerStarted();           
            startPlugins();
        } else {
            resources.getJobStore().schedulerResumed();// 如果已经初始化过,则恢复jobStore
        }

        schedThread.togglePause(false);// 唤醒所有等待的线程

        getLog().info("Scheduler " + resources.getUniqueIdentifier() + " started.");

        notifySchedulerListenersStarted();
    }

    // 其他代码

}

QuartzSchedulerThread

public class QuartzSchedulerThread extends Thread {

    /**
     * pause为true,发出让主循环暂停的信号,以便线程在下一个可处理的时刻暂停
     * pause为false,唤醒sigLock对象的所有等待队列的线程
     */
    void togglePause(boolean pause) {
        synchronized (sigLock) {
            paused = pause;

            if (paused) {
                signalSchedulingChange(0);
            } else {
                sigLock.notifyAll();
            }
        }
    }

    // 其他代码

}

Listener事件监听

Listener事件监听是观察者模式的一个应用。
QuartzScheduler的scheduleJob()start()方法都有notifyXXX代码逻辑,这些就是JobDetail、Trigger和Scheduler事件监听的代码逻辑。
在《Scheduler的初始化》篇章里面,初始化一个Scheduler,里面有"根据PropertiesParser创建Listeners"的步骤,Listeners就包括JobListener和TriggerListener的List对象。
SchedulerListener不支持配置在quartz.properties里面,初始化Scheduler的过程中没有这一块的代码逻辑。如果要添加一个观察者,那么可以通过StdScheduler.getListenerManager()获取ListenerManager实例,通过它可以拿到所有观察者的引用。

类图

Quartz Listener类图

角色说明

类名 角色
QuartzScheduler Subject
SchedulerListener Observer
JobListener Observer
TriggerListener Observer

代码示例

Subject通知Observer,都是遍历Observer列表,触发相应的通知,实现事件监听的效果。
这里特别说明一下,获取Listeners集合的时候,是通过新建一个不可改变的集合对象来实现。如果是为了避免多线程的读写问题,这和CopyOnWriteList写时复制的做法相反,而且这里读的场景大于写的场景。况且,ListenerManagerImpl的add()方法都做了代码块的synchronized。新建一个不可改变的集合来返回,这么做的目的没有想明白。

public void notifySchedulerListenersJobAdded(JobDetail jobDetail) {
    // build a list of all scheduler listeners that are to be notified...
    List<SchedulerListener> schedListeners = buildSchedulerListenerList();

    // notify all scheduler listeners
    for(SchedulerListener sl: schedListeners) {
        try {
            sl.jobAdded(jobDetail);
        } catch (Exception e) {
            getLog().error(
                    "Error while notifying SchedulerListener of JobAdded.",
                    e);
        }
    }
}

ListenerManagerImpl

public class ListenerManagerImpl implements ListenerManager {
  // 其他代码
  public List<SchedulerListener> getSchedulerListeners() {
      synchronized (schedulerListeners) {
          return java.util.Collections.unmodifiableList(new ArrayList<SchedulerListener>(schedulerListeners));
      }
  }
}

系列文章

上一篇下一篇

猜你喜欢

热点阅读