quartz源码1-spring配置
2019-01-08 本文已影响0人
modou1618
一 任务job
1.1 jobDetail相关
1.1.1 JobDetailFactoryBean
指定继承job接口的执行任务类的方式配置jobdetail,任务执行时通过job接口调用执行任务
属性 | 说明 |
---|---|
String name; | jobDetail名称 |
String group; | jobDetail分组 |
Class<?> jobClass; | jobDetail执行的任务类 |
JobDataMap jobDataMap | jobDetail的持久化数据 |
boolean durability | 是否持久化 |
boolean requestsRecovery | jobDetail是否可恢复 |
String description | jobDetail描述 |
String beanName | jobDetail的bean名称 |
ApplicationContext applicationContext; | bean容器 |
String applicationContextJobDataKey; | job调用参数中bean容器的key值 |
JobDetail jobDetail; | 前述属性初始化的JobDetailImpl实际对象 |
1.1.2 MethodInvokingJobDetailFactoryBean
- 指定目标类,任务执行方法的方式配置jobdetail,任务执行时通过反射调用执行任务
-
prepare()
根据目标类,方法,参数初始化反射方法对象。 - 初始化任务执行的job接口实现类jobClass
-
jdi.getJobDataMap().put("methodInvoker", this);
任务执行时,初始化jobClass的methodInvoker属性
public void afterPropertiesSet() throws ClassNotFoundException, NoSuchMethodException {
this.prepare();
String name = this.name != null?this.name:this.beanName;
Class<?> jobClass = this.concurrent?MethodInvokingJobDetailFactoryBean.MethodInvokingJob.class:MethodInvokingJobDetailFactoryBean.StatefulMethodInvokingJob.class;
JobDetailImpl jdi = new JobDetailImpl();
jdi.setName(name);
jdi.setGroup(this.group);
jdi.setJobClass(jobClass);
jdi.setDurability(true);
jdi.getJobDataMap().put("methodInvoker", this);
this.jobDetail = jdi;
this.postProcessJobDetail(this.jobDetail);
}
public void prepare() throws ClassNotFoundException, NoSuchMethodException {
String targetMethod;
if(this.staticMethod != null) {
int lastDotIndex = this.staticMethod.lastIndexOf(46);
if(lastDotIndex == -1 || lastDotIndex == this.staticMethod.length()) {
throw new IllegalArgumentException("staticMethod must be a fully qualified class plus method name: e.g. 'example.MyExampleClass.myExampleMethod'");
}
targetMethod = this.staticMethod.substring(0, lastDotIndex);
String methodName = this.staticMethod.substring(lastDotIndex + 1);
this.targetClass = this.resolveClassName(targetMethod);
this.targetMethod = methodName;
}
Class<?> targetClass = this.getTargetClass();
targetMethod = this.getTargetMethod();
Assert.notNull(targetClass, "Either 'targetClass' or 'targetObject' is required");
Assert.notNull(targetMethod, "Property 'targetMethod' is required");
Object[] arguments = this.getArguments();
Class<?>[] argTypes = new Class[arguments.length];
for(int i = 0; i < arguments.length; ++i) {
argTypes[i] = arguments[i] != null?arguments[i].getClass():Object.class;
}
try {
this.methodObject = targetClass.getMethod(targetMethod, argTypes);
} catch (NoSuchMethodException var6) {
this.methodObject = this.findMatchingMethod();
if(this.methodObject == null) {
throw var6;
}
}
}
1.1.2.1 任务执行
-
public static class MethodInvokingJob extends QuartzJobBean
任务继承自QuartzJobBean - QuartzJobBean负责属性注入
private MethodInvoker methodInvoker;
- 任务执行,反射调用目标方法对象
protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
context.setResult(this.methodInvoker.invoke());
}
public Object invoke() throws InvocationTargetException, IllegalAccessException {
Object targetObject = this.getTargetObject();
Method preparedMethod = this.getPreparedMethod();
if(targetObject == null && !Modifier.isStatic(preparedMethod.getModifiers())) {
throw new IllegalArgumentException("Target method must not be non-static without a target");
} else {
ReflectionUtils.makeAccessible(preparedMethod);
return preparedMethod.invoke(targetObject, this.getArguments());
}
}
1.2 jobFactory
1.2.1 AdaptableJobFactory
- 配置的job类,如果为runnable则创建代理。
- 配置的job类,job接口实现类则直接返回
- 其他类型不符合要求,抛异常
protected Job adaptJob(Object jobObject) throws Exception {
if(jobObject instanceof Job) {
return (Job)jobObject;
} else if(jobObject instanceof Runnable) {
return new DelegatingJob((Runnable)jobObject);
} else {
throw new IllegalArgumentException("Unable to execute job class [" + jobObject.getClass().getName() + "]: only [org.quartz.Job] and [java.lang.Runnable] supported.");
}
}
1.2.2 SpringBeanJobFactory extends AdaptableJobFactory
- 非QuartzJobBean类则做一些属性注入的处理
- ignoredUnknownProperties配置表示指定job类中不可写则不需注入的属性。
protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
Object job = super.createJobInstance(bundle);
if(this.isEligibleForPropertyPopulation(job)) {
BeanWrapper bw = PropertyAccessorFactory.forBeanPropertyAccess(job);
MutablePropertyValues pvs = new MutablePropertyValues();
if(this.schedulerContext != null) {
pvs.addPropertyValues(this.schedulerContext);
}
pvs.addPropertyValues(bundle.getJobDetail().getJobDataMap());
pvs.addPropertyValues(bundle.getTrigger().getJobDataMap());
if(this.ignoredUnknownProperties != null) {
String[] var5 = this.ignoredUnknownProperties;
int var6 = var5.length;
for(int var7 = 0; var7 < var6; ++var7) {
String propName = var5[var7];
if(pvs.contains(propName) && !bw.isWritableProperty(propName)) {
pvs.removePropertyValue(propName);
}
}
bw.setPropertyValues(pvs);
} else {
bw.setPropertyValues(pvs, true);
}
}
return job;
}
1.3 job
1.3.1 DelegatingJob
代理runnable的任务类
1.3.2 QuartzJobBean
- 做属性注入的工作
public final void execute(JobExecutionContext context) throws JobExecutionException {
try {
BeanWrapper bw = PropertyAccessorFactory.forBeanPropertyAccess(this);
MutablePropertyValues pvs = new MutablePropertyValues();
pvs.addPropertyValues(context.getScheduler().getContext());
pvs.addPropertyValues(context.getMergedJobDataMap());
bw.setPropertyValues(pvs, true);
} catch (SchedulerException var4) {
throw new JobExecutionException(var4);
}
this.executeInternal(context);
}
二 触发器trigger
2.1 SimpleTriggerFactoryBean
- 周期触发器,可指定周期时间和触发次数。
2.1.1 属性
属性 | 说明 |
---|---|
String name; | trigger名称 |
String group; | trigger分组 |
JobDetail jobDetail; | trigger所属job信息 |
JobDataMap jobDataMap | job持久化数据 |
Date startTime; | 开始时间 |
long startDelay; | 延迟多久开始触发 |
long repeatInterval; | 重复触发的周期间隔 |
int repeatCount = -1; | 重复次数 |
int priority; | 调度优先级 |
int misfireInstruction; | 误点策略 |
String description | 描述信息 |
String beanName; | 当前bean名称 |
SimpleTrigger simpleTrigger; | 使用前述属性初始化SimpleTriggerImpl |
2.1.2 接口说明
-
public void afterPropertiesSet()
bean初始化完成后调用,初始化SimpleTrigger对象,作为当前bean实际返回的对象。
public SimpleTrigger getObject() {
return this.simpleTrigger;
}
public Class<?> getObjectType() {
return SimpleTrigger.class;
}
2.2 CronTriggerFactoryBean
- 定时触发器,根据cron规则定时触发。其他配置和周期触发器相同
属性 | 说明 |
---|---|
String cronExpression | 定时规则 |
CronTrigger cronTrigger; | 定时触发器,CronTriggerImpl实现类 |
三 调度类SchedulerFactoryBean
-
FactoryBean<Scheduler>
接口表示工厂bean -
InitializingBean
接口表示属性初始化完成后调用afterPropertiesSet()
-
SmartLifecycle
接口表示所有bean 初始化完成后调用start()
3.1 afterPropertiesSet()
3.1.1 初始化调度器工厂
SchedulerFactory schedulerFactory = (SchedulerFactory)BeanUtils.instantiateClass(this.schedulerFactoryClass);
this.initSchedulerFactory(schedulerFactory);
- 根据用户配置初始化调度器工厂
((StdSchedulerFactory)schedulerFactory).initialize(mergedProps);
3.1.2 创建调度器
- 初始化线程的类加载器
-
SchedulerRepository repository
缓存创建调度器 - synchronized锁避免并发创建调度器
protected Scheduler createScheduler(SchedulerFactory schedulerFactory, String schedulerName) throws SchedulerException {
Thread currentThread = Thread.currentThread();
ClassLoader threadContextClassLoader = currentThread.getContextClassLoader();
boolean overrideClassLoader = this.resourceLoader != null && !this.resourceLoader.getClassLoader().equals(threadContextClassLoader);
if(overrideClassLoader) {
currentThread.setContextClassLoader(this.resourceLoader.getClassLoader());
}
Scheduler var10;
try {
SchedulerRepository repository = SchedulerRepository.getInstance();
synchronized(repository) {
Scheduler existingScheduler = schedulerName != null?repository.lookup(schedulerName):null;
Scheduler newScheduler = schedulerFactory.getScheduler();
if(newScheduler == existingScheduler) {
throw new IllegalStateException("Active Scheduler of name '" + schedulerName + "' already registered in Quartz SchedulerRepository. Cannot create a new Spring-managed Scheduler of the same name!");
}
if(!this.exposeSchedulerInRepository) {
SchedulerRepository.getInstance().remove(newScheduler.getSchedulerName());
}
var10 = newScheduler;
}
} finally {
if(overrideClassLoader) {
currentThread.setContextClassLoader(threadContextClassLoader);
}
}
return var10;
}
- 获取或创建调度器
public Scheduler getScheduler() throws SchedulerException {
if (cfg == null) {
initialize();
}
SchedulerRepository schedRep = SchedulerRepository.getInstance();
Scheduler sched = schedRep.lookup(getSchedulerName());
if (sched != null) {
if (sched.isShutdown()) {
schedRep.remove(getSchedulerName());
} else {
return sched;
}
}
sched = instantiate();
return sched;
}
3.1.2.1 StdSchedulerFactory.instantiate()
- 用户配置转换
- 远端调度器代理初始化
RemoteScheduler
- 自定义类加载器实例化
loadHelper = (ClassLoadHelper) loadClass(classLoadHelperClass) .newInstance();
- JMX远端调度器初始化
- job工厂类实例化
jobFactory = (JobFactory) loadHelper.loadClass(jobFactoryClass) .newInstance();
- id生成器实例化
instanceIdGenerator = (InstanceIdGenerator) loadHelper.loadClass(instanceIdGeneratorClass) .newInstance();
- 线程池实例化
ThreadPool
- 数据持久化实例化
JobStore
- 数据库连接池实例化
- 插件初始化
- listener初始化
- job线程执行器初始化
- 执行job容器工厂初始化
JobRunShellFactory jrsf = null; // Create correct run-shell factory...
if (userTXLocation != null) {
UserTransactionHelper.setUserTxLocation(userTXLocation);
}
if (wrapJobInTx) {
jrsf = new JTAJobRunShellFactory();
} else {
jrsf = new JTAAnnotationAwareJobRunShellFactory();
}
- 组装上述调度器相关资源
QuartzSchedulerResources
QuartzSchedulerResources rsrcs = new QuartzSchedulerResources();
rsrcs.setName(schedName);
rsrcs.setThreadName(threadName);
rsrcs.setInstanceId(schedInstId);
rsrcs.setJobRunShellFactory(jrsf);
rsrcs.setMakeSchedulerThreadDaemon(makeSchedulerThreadDaemon);
rsrcs.setThreadsInheritInitializersClassLoadContext(threadsInheritInitalizersClassLoader);
rsrcs.setBatchTimeWindow(batchTimeWindow);
rsrcs.setMaxBatchSize(maxBatchSize);
rsrcs.setInterruptJobsOnShutdown(interruptJobsOnShutdown);
rsrcs.setInterruptJobsOnShutdownWithWait(interruptJobsOnShutdownWithWait);
rsrcs.setJMXExport(jmxExport);
rsrcs.setJMXObjectName(jmxObjectName);
if (managementRESTServiceEnabled) {
ManagementRESTServiceConfiguration managementRESTServiceConfiguration = new ManagementRESTServiceConfiguration();
managementRESTServiceConfiguration.setBind(managementRESTServiceHostAndPort);
managementRESTServiceConfiguration.setEnabled(managementRESTServiceEnabled);
rsrcs.setManagementRESTServiceConfiguration(managementRESTServiceConfiguration);
}
if (rmiExport) {
rsrcs.setRMIRegistryHost(rmiHost);
rsrcs.setRMIRegistryPort(rmiPort);
rsrcs.setRMIServerPort(rmiServerPort);
rsrcs.setRMICreateRegistryStrategy(rmiCreateRegistry);
rsrcs.setRMIBindName(rmiBindName);
}
SchedulerDetailsSetter.setDetails(tp, schedName, schedInstId);
rsrcs.setThreadExecutor(threadExecutor);
threadExecutor.initialize();
rsrcs.setThreadPool(tp);
if(tp instanceof SimpleThreadPool) {
if(threadsInheritInitalizersClassLoader)
((SimpleThreadPool)tp).setThreadsInheritContextClassLoaderOfInitializingThread(threadsInheritInitalizersClassLoader);
}
tp.initialize();
tpInited = true;
rsrcs.setJobStore(js);
// add plugins
for (int i = 0; i < plugins.length; i++) {
rsrcs.addSchedulerPlugin(plugins[i]);
}
- 初始化调度器
qs = new QuartzScheduler(rsrcs, idleWaitTime, dbFailureRetry);
- 初始化调度器代理
Scheduler scheduler = new StdScheduler(qs);
- 各资源初始化
- 调度初始化
qs.initialize();
具体细节见后续文章
3.1.3 初始化调度器上下文
- bean容器
- 用户配置的schedulerContextMap
- 更新调度器的job工厂
3.1.4 注册用户配置的listener
- 调度流程监听回调
- job状态监听回调
- trigger状态监听回调
protected void registerListeners() throws SchedulerException {
ListenerManager listenerManager = this.getScheduler().getListenerManager();
int var3;
int var4;
if(this.schedulerListeners != null) {
SchedulerListener[] var2 = this.schedulerListeners;
var3 = var2.length;
for(var4 = 0; var4 < var3; ++var4) {
SchedulerListener listener = var2[var4];
listenerManager.addSchedulerListener(listener);
}
}
if(this.globalJobListeners != null) {
JobListener[] var6 = this.globalJobListeners;
var3 = var6.length;
for(var4 = 0; var4 < var3; ++var4) {
JobListener listener = var6[var4];
listenerManager.addJobListener(listener);
}
}
if(this.globalTriggerListeners != null) {
TriggerListener[] var7 = this.globalTriggerListeners;
var3 = var7.length;
for(var4 = 0; var4 < var3; ++var4) {
TriggerListener listener = var7[var4];
listenerManager.addTriggerListener(listener);
}
}
}
3.1.5 注册用户配置的job和trigger
- 事务方式统一添加配置的job和trigger
- 单独的xml配置job和trigger的注册
- 添加job
this.getScheduler().addJob(jobDetail, true);
- 添加trigger
this.getScheduler().rescheduleJob(trigger.getKey(), trigger);
this.getScheduler().scheduleJob(trigger);
this.getScheduler().scheduleJob(jobDetail, trigger);
protected void registerJobsAndTriggers() throws SchedulerException {
TransactionStatus transactionStatus = null;
if(this.transactionManager != null) {
transactionStatus = this.transactionManager.getTransaction(new DefaultTransactionDefinition());
}
try {
if(this.jobSchedulingDataLocations != null) {
ClassLoadHelper clh = new ResourceLoaderClassLoadHelper(this.resourceLoader);
clh.initialize();
XMLSchedulingDataProcessor dataProcessor = new XMLSchedulingDataProcessor(clh);
String[] var4 = this.jobSchedulingDataLocations;
int var5 = var4.length;
for(int var6 = 0; var6 < var5; ++var6) {
String location = var4[var6];
dataProcessor.processFileAndScheduleJobs(location, this.getScheduler());
}
}
Iterator var10;
if(this.jobDetails != null) {
var10 = this.jobDetails.iterator();
while(var10.hasNext()) {
JobDetail jobDetail = (JobDetail)var10.next();
this.addJobToScheduler(jobDetail);
}
} else {
this.jobDetails = new LinkedList();
}
if(this.calendars != null) {
var10 = this.calendars.keySet().iterator();
while(var10.hasNext()) {
String calendarName = (String)var10.next();
Calendar calendar = (Calendar)this.calendars.get(calendarName);
this.getScheduler().addCalendar(calendarName, calendar, true, true);
}
}
if(this.triggers != null) {
var10 = this.triggers.iterator();
while(var10.hasNext()) {
Trigger trigger = (Trigger)var10.next();
this.addTriggerToScheduler(trigger);
}
}
} catch (Throwable var9) {
if(transactionStatus != null) {
try {
this.transactionManager.rollback(transactionStatus);
} catch (TransactionException var8) {
this.logger.error("Job registration exception overridden by rollback exception", var9);
throw var8;
}
}
if(var9 instanceof SchedulerException) {
throw (SchedulerException)var9;
}
if(var9 instanceof Exception) {
throw new SchedulerException("Registration of jobs and triggers failed: " + var9.getMessage(), var9);
}
throw new SchedulerException("Registration of jobs and triggers failed: " + var9.getMessage());
}
if(transactionStatus != null) {
this.transactionManager.commit(transactionStatus);
}
}
3.2 start()
- 支持延迟启动,通过调度线程延迟调度
- 调度器start接口调用
scheduler.start();
protected void startScheduler(final Scheduler scheduler, final int startupDelay) throws SchedulerException {
if(startupDelay <= 0) {
this.logger.info("Starting Quartz Scheduler now");
scheduler.start();
} else {
if(this.logger.isInfoEnabled()) {
this.logger.info("Will start Quartz Scheduler [" + scheduler.getSchedulerName() + "] in " + startupDelay + " seconds");
}
Thread schedulerThread = new Thread() {
public void run() {
try {
Thread.sleep((long)(startupDelay * 1000));
} catch (InterruptedException var3) {
;
}
if(SchedulerFactoryBean.this.logger.isInfoEnabled()) {
SchedulerFactoryBean.this.logger.info("Starting Quartz Scheduler now, after delay of " + startupDelay + " seconds");
}
try {
scheduler.start();
} catch (SchedulerException var2) {
throw new SchedulingException("Could not start Quartz Scheduler after delay", var2);
}
}
};
schedulerThread.setName("Quartz Scheduler [" + scheduler.getSchedulerName() + "]");
schedulerThread.setDaemon(true);
schedulerThread.start();
}
}