Quartz Source Code 4: Job Execution
2019-01-11
modou1618
1 JTAJobRunShell
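1.1 Initialization
- Fetch the JobDetail from the fired-trigger bundle
- Create the job instance through the job factory
- Initialize the job execution context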
public void initialize(QuartzScheduler sched)
    throws SchedulerException {
    this.qs = sched;
    Job job = null;
    JobDetail jobDetail = firedTriggerBundle.getJobDetail();
    try {
        // Ask the configured job factory for a job instance
        job = sched.getJobFactory().newJob(firedTriggerBundle, scheduler);
    } catch (SchedulerException se) {
        sched.notifySchedulerListenersError(
                "An error occured instantiating job to be executed. job= '"
                        + jobDetail.getKey() + "'", se);
        throw se;
    } catch (Throwable ncdfe) { // such as NoClassDefFoundError
        SchedulerException se = new SchedulerException(
                "Problem instantiating class '"
                        + jobDetail.getJobClass().getName() + "' - ", ncdfe);
        sched.notifySchedulerListenersError(
                "An error occured instantiating job to be executed. job= '"
                        + jobDetail.getKey() + "'", se);
        throw se;
    }
    // Build the execution context that is passed to job.execute()
    this.jec = new JobExecutionContextImpl(scheduler, firedTriggerBundle, job);
}
1.2 Executing run()
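- Listener callbacks are fired at the different stages of execution
- begin(): obtains the configured UserTransaction and starts it
- job.execute(jec): invokes the execute method that the job class implements
- complete(): commits or rolls back the user transaction
- notifyJobStoreJobComplete(): performs the follow-up cleanup work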
// Excerpt from the loop in run(); exception handling omitted
begin();
notifyListenersBeginning(jec);
job.execute(jec);
notifyJobListenersComplete(jec, jobExEx);
trigger.executionComplete(jec, jobExEx);
notifyTriggerListenersComplete(jec, instCode);
if (instCode == CompletedExecutionInstruction.RE_EXECUTE_JOB) {
    // The job asked to be re-executed: count the refire and loop again
    jec.incrementRefireCount();
    complete(false);
    continue;
}
complete(true);
qs.notifyJobStoreJobComplete(trigger, jobDetail, instCode);
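The control flow of the excerpt above can be sketched with a small stand-alone model. This is only an illustration under assumptions: `RunShellSketch`, `SketchJob`, and `Instruction` are hypothetical names, not Quartz classes, and the listener callbacks and transaction handling are reduced to log entries.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the run loop; none of these types are Quartz classes.
class RunShellSketch {
    enum Instruction { NOOP, RE_EXECUTE_JOB }

    interface SketchJob {
        // Returns the instruction that the trigger would report after this execution.
        Instruction execute(int refireCount);
    }

    // Records the order in which the stages run.
    final List<String> log = new ArrayList<>();

    void run(SketchJob job) {
        int refireCount = 0;
        while (true) {
            log.add("begin");                      // stands in for begin()
            Instruction inst = job.execute(refireCount);
            log.add("execute#" + refireCount);     // stands in for job.execute(jec)
            if (inst == Instruction.RE_EXECUTE_JOB) {
                refireCount++;                     // stands in for jec.incrementRefireCount()
                log.add("complete(false)");
                continue;                          // run the same job again
            }
            log.add("complete(true)");
            break;
        }
        log.add("notifyJobStoreJobComplete");      // cleanup happens once, after the loop
    }
}
```

A job that requests one re-execution goes through begin and complete twice, while the job store is notified only once at the end.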
1.2.1 notifyJobStoreJobComplete()
- retryExecuteInNonManagedTXLock keeps retrying for as long as the DB operation fails
protected void notifyJobStoreJobComplete(OperableTrigger trigger, JobDetail detail, CompletedExecutionInstruction instCode) {
    resources.getJobStore().triggeredJobComplete(trigger, detail, instCode);
}

public void triggeredJobComplete(final OperableTrigger trigger,
        final JobDetail jobDetail, final CompletedExecutionInstruction triggerInstCode) {
    // Runs the callback under the trigger-access lock, retrying on failure
    retryExecuteInNonManagedTXLock(
        LOCK_TRIGGER_ACCESS,
        new VoidTransactionCallback() {
            public void executeVoid(Connection conn) throws JobPersistenceException {
                triggeredJobComplete(conn, trigger, jobDetail, triggerInstCode);
            }
        });
}
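The retry-until-success idea can be sketched as follows. This is a minimal illustration, not the Quartz implementation: `RetrySketch`, `retryUntilSuccess`, and the fixed `retryIntervalMillis` pause are assumptions made for the example, and locking and transaction handling are left out.

```java
import java.util.concurrent.Callable;

// Hypothetical helper; it does not reproduce retryExecuteInNonManagedTXLock itself.
class RetrySketch {
    // Calls the action repeatedly until it returns without throwing,
    // sleeping for retryIntervalMillis between failed attempts.
    static <T> T retryUntilSuccess(Callable<T> action, long retryIntervalMillis) {
        while (true) {
            try {
                return action.call();
            } catch (Exception e) {
                try {
                    Thread.sleep(retryIntervalMillis);
                } catch (InterruptedException ie) {
                    // Restore the interrupt flag and stop retrying.
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting to retry", ie);
                }
            }
        }
    }
}
```

Retrying without an upper bound suits this step because the fired-trigger record has to be cleaned up eventually; the cost is that a permanently failing store keeps the calling thread busy.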
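- One branch updates the trigger state to error or complete, or deletes the trigger
- Another branch handles non-concurrent jobs: triggers in a blocked state are restored to their original state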
// BLOCKED -> WAITING
getDelegate().updateTriggerStatesForJobFromOtherState(conn,
        jobDetail.getKey(), STATE_WAITING,
        STATE_BLOCKED);
// PAUSED_BLOCKED -> PAUSED
getDelegate().updateTriggerStatesForJobFromOtherState(conn,
        jobDetail.getKey(), STATE_PAUSED,
        STATE_PAUSED_BLOCKED);
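The effect of these two updates can be modelled in memory. The sketch below is an assumption-laden illustration: `TriggerStateSketch` and its methods are hypothetical, a map replaces the triggers table, and the state names are plain strings rather than the Quartz constants.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for the trigger state column of one job's triggers.
class TriggerStateSketch {
    // trigger name -> current state
    final Map<String, String> states = new LinkedHashMap<>();

    // Moves every trigger that is in oldState to newState; returns how many were changed.
    int updateStatesFromOtherState(String newState, String oldState) {
        int updated = 0;
        for (Map.Entry<String, String> e : states.entrySet()) {
            if (e.getValue().equals(oldState)) {
                e.setValue(newState);
                updated++;
            }
        }
        return updated;
    }

    // Mirrors the two calls above: blocked triggers return to the state they had before blocking.
    void unblockAll() {
        updateStatesFromOtherState("WAITING", "BLOCKED");
        updateStatesFromOtherState("PAUSED", "PAUSED_BLOCKED");
    }
}
```

Triggers in any other state are left untouched, which is why each update names the expected old state explicitly.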
- If the job supports persisting its data, save the modified JobDataMap to the DB
if (jobDetail.getJobDataMap().isDirty()) {
    getDelegate().updateJobData(conn, jobDetail);
}
- Delete the corresponding fired-trigger record
getDelegate().deleteFiredTrigger(conn, trigger.getFireInstanceId());
2 Job factories
2.1 SimpleJobFactory
Instantiates the job object from the configured job implementation class.
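Instantiating a job from its class is essentially a reflective call to the no-argument constructor. A minimal sketch, assuming a hypothetical `SketchJob` interface in place of the Quartz `Job` interface; `SimpleFactorySketch` and `HelloJob` are illustrative names only:

```java
// Hypothetical stand-in for a job factory that creates jobs by reflection.
class SimpleFactorySketch {
    interface SketchJob {
        String execute();
    }

    // Example job with a public no-argument constructor.
    static class HelloJob implements SketchJob {
        public HelloJob() { }

        public String execute() {
            return "hello";
        }
    }

    // Creates a new instance of the given job class through its no-argument constructor.
    static SketchJob newJob(Class<? extends SketchJob> jobClass) {
        try {
            return jobClass.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(
                    "Problem instantiating class '" + jobClass.getName() + "'", e);
        }
    }
}
```

Because a fresh instance is created for every firing, a job written this way cannot keep state in instance fields between executions.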
2.2 PropertySettingJobFactory
After instantiating the job object, injects the configuration data from the job context into the job object.
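Injection of this kind can be done by matching data keys to setter methods. The sketch below is a simplified illustration rather than the Quartz code: `PropertySettingSketch`, `ReportJob`, and `injectProperties` are hypothetical, and only `String` properties are handled.

```java
import java.lang.reflect.Method;
import java.util.Map;

// Hypothetical setter-injection helper; it handles String properties only.
class PropertySettingSketch {
    // Example job bean with one injectable property.
    static class ReportJob {
        private String recipient;

        public void setRecipient(String recipient) {
            this.recipient = recipient;
        }

        public String getRecipient() {
            return recipient;
        }
    }

    // For every public setXxx(String) method, looks up the key "xxx" in data
    // and, if present, calls the setter with that value.
    static void injectProperties(Object job, Map<String, String> data) {
        for (Method m : job.getClass().getMethods()) {
            String name = m.getName();
            if (!name.startsWith("set") || name.length() <= 3
                    || m.getParameterCount() != 1
                    || m.getParameterTypes()[0] != String.class) {
                continue;
            }
            // setRecipient -> recipient
            String key = Character.toLowerCase(name.charAt(3)) + name.substring(4);
            if (data.containsKey(key)) {
                try {
                    m.invoke(job, data.get(key));
                } catch (ReflectiveOperationException e) {
                    throw new IllegalStateException("Could not set property '" + key + "'", e);
                }
            }
        }
    }
}
```

Keys without a matching setter are simply ignored in this sketch, so the same data map can feed jobs with different properties.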