quartz源码2-初始化

2019-01-11  本文已影响0人  modou1618

一 StdSchedulerFactory.getScheduler

1.1 配置获取

key 说明
org.quartz.scheduler.instanceName 实例名称
org.quartz.scheduler.threadName 线程名称
org.quartz.scheduler.instanceId 集群实例id生成方式
org.quartz.scheduler.rmi.* rmi配置
org.quartz.scheduler.jmx.* jmx配置
org.quartz.jobStore.* 持久化配置
org.quartz.threadPool.* 线程池配置
org.quartz.scheduler.jobFactory.* job工厂配置
other 其他调度配置

1.2 实例id生成器InstanceIdGenerator

方式
SimpleInstanceIdGenerator return InetAddress.getLocalHost().getHostName() + System.currentTimeMillis();
HostnameInstanceIdGenerator return InetAddress.getLocalHost().getHostName();
SystemPropertyInstanceIdGenerator prepend前置值+系统配置+postpend后置值

1.3 job工厂JobFactory

1.3.1 SimpleJobFactory

1.3.2 PropertySettingJobFactory

JobDataMap jobDataMap = new JobDataMap();
        jobDataMap.putAll(scheduler.getContext());
        jobDataMap.putAll(bundle.getJobDetail().getJobDataMap());
        jobDataMap.putAll(bundle.getTrigger().getJobDataMap());

1.4 线程池ThreadPool

    private int count = -1;
    private List<WorkerThread> workers;
    private LinkedList<WorkerThread> availWorkers = new LinkedList<WorkerThread>();
    private LinkedList<WorkerThread> busyWorkers = new LinkedList<WorkerThread>();

1.5 持久化JobStore

1.5.1 RAMJobStore

1.5.2 JobStoreCMT

1.5.3 JobStoreTX

1.5.4 访问锁Semaphore

1.5.4.1 SimpleSemaphore

ThreadLocal<HashSet<String>> lockOwners = new ThreadLocal<HashSet<String>>();
HashSet<String> locks = new HashSet<String>();

1.5.4.3 DBSemaphore

1.5.5 db连接池ConnectionProvider

支持用户配置连接池,也可使用quartz默认的连接池

1.5.6 db连接池代理DBConnectionManager

代理多个连接池,根据池名称区分

1.6 线程执行器ThreadExecutor

JobStore中对应线程执行的线程池

1.7 插件SchedulerPlugin

1.8 JobListener和TriggerListener

Method  nameSetter = listener.getClass().getMethod("setName", strArg);
nameSetter.invoke(listener, new Object[] {jobListenerNames[i] } );

1.9 JobRunShellFactory

1.10 QuartzSchedulerResources

1.11 QuartzScheduler

public QuartzScheduler(QuartzSchedulerResources resources, long idleWaitTime, @Deprecated long dbRetryInterval)
        throws SchedulerException {
        this.resources = resources;
        if (resources.getJobStore() instanceof JobListener) {
            addInternalJobListener((JobListener)resources.getJobStore());
        }

        this.schedThread = new QuartzSchedulerThread(this, resources);
        ThreadExecutor schedThreadExecutor = resources.getThreadExecutor();
        schedThreadExecutor.execute(this.schedThread);
        if (idleWaitTime > 0) {
            this.schedThread.setIdleWaitTime(idleWaitTime);
        }

        jobMgr = new ExecutingJobsManager();
        addInternalJobListener(jobMgr);
        errLogger = new ErrorLogger();
        addInternalSchedulerListener(errLogger);

        signaler = new SchedulerSignalerImpl(this, this.schedThread);
        
        getLog().info("Quartz Scheduler v." + getVersion() + " created.");
    }

1.12 StdScheduler

1.13 添加引用,避免gc清除对象

   // prevents the repository from being garbage collected
   qs.addNoGCObject(schedRep);
   // prevents the db manager from being garbage collected
   if (dbMgr != null) {
        qs.addNoGCObject(dbMgr);
   }

二 调度线程初始化

public void start() throws SchedulerException {

        if (shuttingDown|| closed) {
            throw new SchedulerException(
                    "The Scheduler cannot be restarted after shutdown() has been called.");
        }

        // QTZ-212 : calling new schedulerStarting() method on the listeners
        // right after entering start()
        notifySchedulerListenersStarting();

        if (initialStart == null) {
            initialStart = new Date();
            this.resources.getJobStore().schedulerStarted();            
            startPlugins();
        } else {
            resources.getJobStore().schedulerResumed();
        }

        schedThread.togglePause(false);

        getLog().info(
                "Scheduler " + resources.getUniqueIdentifier() + " started.");
        
        notifySchedulerListenersStarted();
    }
private boolean signaled;
private long signaledNextFireTime;
// check if we're supposed to pause...
                synchronized (sigLock) {
                    while (paused && !halted.get()) {
                        try {
                            // wait until togglePause(false) is called...
                            sigLock.wait(1000L);
                        } catch (InterruptedException ignore) {
                        }

                        // reset failure counter when paused, so that we don't
                        // wait again after unpausing
                        acquiresFailed = 0;
                    }

                    if (halted.get()) {
                        break;
                    }
                }

三 集群管理线程初始化

public void run() {
            while (!shutdown) {

                if (!shutdown) {
                    long timeToSleep = getClusterCheckinInterval();
                    long transpiredTime = (System.currentTimeMillis() - lastCheckin);
                    timeToSleep = timeToSleep - transpiredTime;
                    if (timeToSleep <= 0) {
                        timeToSleep = 100L;
                    }

                    if(numFails > 0) {
                        timeToSleep = Math.max(getDbRetryInterval(), timeToSleep);
                    }
                    
                    try {
                        Thread.sleep(timeToSleep);
                    } catch (Exception ignore) {
                    }
                }

                if (!shutdown && this.manage()) {
                    signalSchedulingChangeImmediately(0L);
                }

            }//while !shutdown
        }

四 误点管理线程初始化

public void run() {
            
            while (!shutdown) {

                long sTime = System.currentTimeMillis();

                RecoverMisfiredJobsResult recoverMisfiredJobsResult = manage();

                if (recoverMisfiredJobsResult.getProcessedMisfiredTriggerCount() > 0) {
                    signalSchedulingChangeImmediately(recoverMisfiredJobsResult.getEarliestNewTime());
                }

                if (!shutdown) {
                    long timeToSleep = 50l;  // At least a short pause to help balance threads
                    if (!recoverMisfiredJobsResult.hasMoreMisfiredTriggers()) {
                        timeToSleep = getMisfireThreshold() - (System.currentTimeMillis() - sTime);
                        if (timeToSleep <= 0) {
                            timeToSleep = 50l;
                        }

                        if(numFails > 0) {
                            timeToSleep = Math.max(getDbRetryInterval(), timeToSleep);
                        }
                    }
                    
                    try {
                        Thread.sleep(timeToSleep);
                    } catch (Exception ignore) {
                    }
                }//while !shutdown
            }
        }
上一篇 下一篇

猜你喜欢

热点阅读