5. Quartz usage summary

2017-12-27  ltjxwxz

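This post summarizes what I learned about Quartz while using it in a project:
1. Classes provided by Spring
(1) SchedulerFactoryBean
(2) Job-related classes: the job logic itself has to be written by hand, but since the project uses Spring it makes sense to build on Spring's job support. There are two options: MethodInvokingJobDetailFactoryBean and QuartzJobBean. MethodInvokingJobDetailFactoryBean cannot be persisted to a database (it fails with java.io.NotSerializableException), so I dropped it in favor of QuartzJobBean.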
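
The NotSerializableException above comes from Java serialization. As far as I understand it, with org.quartz.jobStore.useProperties set to false a JDBC JobStore writes the JobDataMap with standard Java serialization, so every value in the map has to be Serializable. The sketch below reproduces that failure with the JDK alone; SuiteData and SuiteWithService are hypothetical stand-ins, not classes from the project.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

final class JobDataSerializationCheck {

    /** Hypothetical job payload that can be stored: plain serializable data. */
    static final class SuiteData implements Serializable {
        private static final long serialVersionUID = 1L;
        final String name;

        SuiteData(String name) {
            this.name = name;
        }
    }

    /** Hypothetical payload that cannot be stored: it drags a live object along. */
    static final class SuiteWithService implements Serializable {
        private static final long serialVersionUID = 1L;
        final Thread worker = new Thread(); // Thread is not Serializable
    }

    /** Returns true if the whole map survives Java serialization. */
    static boolean isStorable(Map<String, Object> jobData) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(new HashMap<>(jobData));
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> good = new HashMap<>();
        good.put("testSuite", new SuiteData("smoke"));

        Map<String, Object> bad = new HashMap<>();
        bad.put("testSuite", new SuiteWithService());

        System.out.println("plain data storable: " + isStorable(good));
        System.out.println("data holding a Thread storable: " + isStorable(bad));
    }
}
```

A check like this can be run against the objects placed in a JobDataMap before switching from RAMJobStore to a JDBC JobStore.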

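2. Concurrency control
The official documentation provides one way to control concurrency: @DisallowConcurrentExecution
The restriction applies per JobDetail: a given JobDetail never runs more than once at the same time, but different JobDetails built from the same Job class can still run concurrently. So if one Job class is used to build JobDetail1, JobDetail2 and JobDetail3, those three JobDetails still execute in parallel.
The job logic runs on the thread pool configured by org.quartz.threadPool.class, with the number of threads set by org.quartz.threadPool.threadCount.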
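
To make the scope of the annotation concrete, here is a minimal sketch that builds two JobDetails from one annotated Job class. ReportJob, the "reports" group and the job names are hypothetical; only the Quartz builder calls are real API.

```java
import org.quartz.DisallowConcurrentExecution;
import org.quartz.Job;
import org.quartz.JobBuilder;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;

final class ConcurrencyScopeSketch {

    /** Hypothetical job class; the annotation is enforced per JobDetail (per JobKey). */
    @DisallowConcurrentExecution
    public static class ReportJob implements Job {
        @Override
        public void execute(JobExecutionContext context) {
            System.out.println("running " + context.getJobDetail().getKey());
        }
    }

    /** Builds a JobDetail for ReportJob with the key (name, "reports"). */
    static JobDetail detail(String name) {
        return JobBuilder.newJob(ReportJob.class)
                .withIdentity(name, "reports")
                .build();
    }
}
```

With a threadCount of at least 2, detail("daily") and detail("weekly") may run at the same moment because their keys differ, while two overlapping firings of "daily" are serialized.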

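3. Data persistence
Quartz provides two kinds of job store: RAMJobStore and JDBC JobStore.
RAMJobStore keeps everything in memory, so all jobs are lost when the application restarts.
JDBC JobStore persists to a database, so jobs are still there after a restart.
Download quartz-2.2.3-distribution.tar.gz from the official site; quartz\quartz-2.2.3\docs\dbTables contains table-creation scripts for the supported databases. Create the tables, then configure the jobStore class, the driver delegate class and the data source in quartz.properties. The application configuration must also point to the location of quartz.properties.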

4. Managing jobs dynamically
(1) Add:

scheduler.scheduleJob(jobDetail, trigger);

(2) Delete:

scheduler.pauseTrigger(TriggerKey.triggerKey(testSuite.getName(), project.getName()));
scheduler.unscheduleJob(TriggerKey.triggerKey(testSuite.getName(), project.getName()));
scheduler.deleteJob(JobKey.jobKey(testSuite.getName(), project.getName()));
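
Besides adding and deleting, a job's schedule can be changed in place, and a job can be paused and resumed. The sketch below uses the same key convention as the calls above (name = test suite, group = project); the class and method names are hypothetical, while the Scheduler calls are standard Quartz 2.x API.

```java
import org.quartz.CronScheduleBuilder;
import org.quartz.CronTrigger;
import org.quartz.JobKey;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.TriggerBuilder;
import org.quartz.TriggerKey;

final class JobMaintenanceSketch {

    /** Builds a cron trigger whose key is (name, group). */
    static CronTrigger cronTrigger(String name, String group, String cron) {
        return TriggerBuilder.newTrigger()
                .withIdentity(TriggerKey.triggerKey(name, group))
                .withSchedule(CronScheduleBuilder.cronSchedule(cron))
                .build();
    }

    /** Update: swap the stored trigger for one with a new cron expression. */
    static void updateCron(Scheduler scheduler, String name, String group, String cron)
            throws SchedulerException {
        TriggerKey key = TriggerKey.triggerKey(name, group);
        if (scheduler.checkExists(key)) {
            // The replacement trigger stays attached to the same job
            scheduler.rescheduleJob(key, cronTrigger(name, group, cron));
        }
    }

    /** Pause: stops all triggers of the job from firing until resumed. */
    static void pause(Scheduler scheduler, String name, String group) throws SchedulerException {
        scheduler.pauseJob(JobKey.jobKey(name, group));
    }

    /** Resume: lets the job's triggers fire again. */
    static void resume(Scheduler scheduler, String name, String group) throws SchedulerException {
        scheduler.resumeJob(JobKey.jobKey(name, group));
    }
}
```

Note that deleteJob also removes the triggers associated with the job, so the pauseTrigger and unscheduleJob calls before it are a cautious extra rather than a requirement.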

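5. Execution state
The execution state is stored in the trigger_state column of the qrtz_triggers table. The full set of states in the source code is: WAITING, ACQUIRED, EXECUTING, COMPLETE, BLOCKED, ERROR, PAUSED, PAUSED_BLOCKED. The JobStore configured in the properties file is JobStoreTX, but the code that changes the state lives in the JobStoreSupport class, which calls the configured Delegate to build the SQL statements that perform each transition.
The source code shows that ACQUIRED means the trigger has been acquired by the scheduler; this state is set before the job's own logic runs.
A state-transition diagram compiled by another user:

[Figure: image.png, trigger state-transition diagram]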

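6. Using Spring-managed services in a custom Job class
Even though the Job extends Spring's QuartzJobBean, my own services could not be injected into it directly. The reason is that Quartz instantiates each job itself, outside Spring's ApplicationContext, so the instance is never autowired. The workaround I found is to put the beans into a map when building the SchedulerFactoryBean and to read them back from the scheduler context inside the Job.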

@Bean(name = "schedulerFactory")
public SchedulerFactoryBean schedulerFactoryBean() throws IOException {
    SchedulerFactoryBean factory = new SchedulerFactoryBean();
    factory.setQuartzProperties(quartzProperties());
    // Pass in the services, DAOs, etc. that the job classes need; @Autowired does not work inside a job
    Map<String, Object> springBeanMap = new HashMap<String, Object>();
    springBeanMap.put("testngService", testngService);
    springBeanMap.put("quartzService", quartzService);
    springBeanMap.put("triggerDao", triggerDao);
    factory.setSchedulerContextAsMap(springBeanMap);
    factory.setWaitForJobsToCompleteOnShutdown(true);
    return factory;
}

// In the job class, read the bean back from the SchedulerContext
@Override
protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
    JobDataMap dataMap = context.getJobDetail().getJobDataMap();
    try {
        testngService = (TestngService)context.getScheduler().getContext().get("testngService");
    } catch (SchedulerException e) {
        // Log with the stack trace and abort this run
        logger.error(e.getMessage(), e);
        throw new JobExecutionException(e);
    }
}
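
A commonly used alternative to the map-based workaround is a custom job factory that asks Spring to autowire each job instance after Quartz's machinery creates it. This is a sketch rather than what the project did: AutowiringJobFactory is a hypothetical name, and it assumes spring-context-support and Quartz 2.x on the classpath. Newer Spring versions may already autowire jobs through SpringBeanJobFactory, so check the version in use first.

```java
import org.quartz.spi.TriggerFiredBundle;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.scheduling.quartz.SpringBeanJobFactory;

/** Hypothetical job factory: creates the job as usual, then lets Spring inject its fields. */
class AutowiringJobFactory extends SpringBeanJobFactory implements ApplicationContextAware {

    private AutowireCapableBeanFactory beanFactory;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) {
        this.beanFactory = applicationContext.getAutowireCapableBeanFactory();
    }

    @Override
    protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
        Object job = super.createJobInstance(bundle);
        // Populate @Autowired fields on the freshly created job instance
        beanFactory.autowireBean(job);
        return job;
    }
}
```

To use it, declare the factory as a Spring bean and pass it to factory.setJobFactory(...) when building the SchedulerFactoryBean; the job can then declare @Autowired fields instead of reading from the scheduler context.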

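7. Listeners
Quartz provides TriggerListeners, JobListeners and SchedulerListeners; quartz\quartz-2.2.3\examples shows how to use them in detail.
Note: in my tests, a listener registered dynamically at runtime works after that first registration but is gone after a restart. Listeners are registered with the scheduler at run time and are not stored in the JobStore, so they have to be registered again each time the application starts.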
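
One way to deal with the restart behaviour is to register listeners in startup code rather than on demand. A minimal sketch, assuming Quartz 2.x; LoggingJobListener and its listener name are hypothetical.

```java
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.impl.matchers.GroupMatcher;
import org.quartz.listeners.JobListenerSupport;

/** Hypothetical listener that reports the outcome of every job in one group. */
class LoggingJobListener extends JobListenerSupport {

    @Override
    public String getName() {
        return "loggingJobListener";
    }

    @Override
    public void jobWasExecuted(JobExecutionContext context, JobExecutionException jobException) {
        String outcome = (jobException == null) ? "succeeded" : "failed";
        System.out.println(context.getJobDetail().getKey() + " " + outcome);
    }

    /** Call once per application start, for example right after the Scheduler is created. */
    static void register(Scheduler scheduler, String group) throws SchedulerException {
        scheduler.getListenerManager()
                .addJobListener(new LoggingJobListener(), GroupMatcher.jobGroupEquals(group));
    }
}
```

In this project the group would be the project name, since jobs are keyed as (test suite name, project name).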

8. Summary: going from zero to using Quartz in a project, the most helpful resources were the official example code, the source code and the documentation. Once those made sense, I read a number of good blog posts, listed here:
Documentation (Chinese): https://www.w3cschool.cn/quartz_doc/quartz_doc-lwuv2d2a.html
CRUD operations: http://snailxr.iteye.com/blog/2076903#comments
Concurrency: http://blog.csdn.net/will_awoke/article/details/38921273
   https://www.cnblogs.com/Rozdy/p/4220186.html
   http://www.blogjava.net/stevenjohn/archive/2015/07/26/426425.html
Clustering: http://www.importnew.com/22896.html
   http://soulshard.iteye.com/blog/337886
   https://tech.meituan.com/mt-crm-quartz.html
Core concepts: http://blog.csdn.net/guolong1983811/article/details/51501346
     http://blog.csdn.net/beliefer/article/details/51578546
     https://www.cnblogs.com/pzy4447/p/5201674.html
Troubleshooting: http://blog.csdn.net/jackylovesjava/article/details/50044271

9. Some code
9.1. Complete configuration: quartz.properties

# Default Properties file for use by StdSchedulerFactory
# to create a Quartz Scheduler Instance, if a different
# properties file is not explicitly specified.
#
org.quartz.scheduler.instanceName: DefaultQuartzScheduler
org.quartz.scheduler.rmi.export: false
org.quartz.scheduler.rmi.proxy: false
org.quartz.scheduler.wrapJobExecutionInUserTransaction: false

# Thread pool settings
org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount: 10
org.quartz.threadPool.threadPriority: 5
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread: true

# Misfire threshold
#org.quartz.jobStore.misfireThreshold: 60000

# Persist Quartz data to an Oracle database
org.quartz.jobStore.class: org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate
org.quartz.jobStore.useProperties: false
org.quartz.jobStore.dataSource: myDS
org.quartz.jobStore.tablePrefix: QRTZ_
org.quartz.jobStore.isClustered: false

# Database connection parameters
org.quartz.dataSource.myDS.driver: oracle.jdbc.driver.OracleDriver
org.quartz.dataSource.myDS.URL: jdbc:oracle:thin:@10.10.52.14:1521:wxkfdb
org.quartz.dataSource.myDS.user: autotesting
org.quartz.dataSource.myDS.password: test
org.quartz.dataSource.myDS.maxConnections: 5

9.2. Configuration class for integrating Quartz with Spring Boot

@Configuration
public class QuartzCofig {

    @Autowired
    private TestngService testngService;

    @Autowired
    private TriggerDao triggerDao;

    @Autowired
    private QuartzService quartzService;

    @Bean(name = "schedulerFactory")
    public SchedulerFactoryBean schedulerFactoryBean() throws IOException {
        SchedulerFactoryBean factory = new SchedulerFactoryBean();
        factory.setQuartzProperties(quartzProperties());
        // Pass in the services, DAOs, etc. that the job classes need; @Autowired does not work inside a job
        Map<String, Object> springBeanMap = new HashMap<String, Object>();
        springBeanMap.put("testngService", testngService);
        springBeanMap.put("quartzService", quartzService);
        springBeanMap.put("triggerDao", triggerDao);
        factory.setSchedulerContextAsMap(springBeanMap);
        factory.setWaitForJobsToCompleteOnShutdown(true);
        return factory;
    }

    @Bean
    public Properties quartzProperties() throws IOException {
        PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean();
        // Location of the quartz.properties file
        propertiesFactoryBean.setLocation(new ClassPathResource("/quartz.properties"));
        // Load the properties from quartz.properties before the object is requested
        propertiesFactoryBean.afterPropertiesSet();
        return propertiesFactoryBean.getObject();
    }

    /*
     * Obtain the Scheduler instance from the SchedulerFactoryBean.
     * The bean name must not be "scheduler"; otherwise the Scheduler injected into QuartzService is not the one defined here.
     */
    @Bean(name="myScheduler")
    public Scheduler scheduler() throws IOException {
        System.out.println("schedulerFactoryBean().getScheduler():" + schedulerFactoryBean().getScheduler());
        return schedulerFactoryBean().getScheduler();
    }
}

9.3. Custom Job logic

@Configuration
@Component
@PersistJobDataAfterExecution
@DisallowConcurrentExecution
public class ScheduleJob extends QuartzJobBean {

    private Logger logger = LoggerFactory.getLogger(ScheduleJob.class);

    @Override
    protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
        JobDataMap dataMap = context.getJobDetail().getJobDataMap();

        TestSuite testSuite = (TestSuite)dataMap.get("testSuite");
        Project project = (Project) dataMap.get("project");

        TestngService testngService;
        try {
            testngService = (TestngService)context.getScheduler().getContext().get("testngService");
        } catch (SchedulerException e) {
            logger.error(e.getMessage(), e);
            // Abort this run instead of hitting a NullPointerException below
            throw new JobExecutionException(e);
        }
        logger.info("---" + context.getJobDetail().getKey() + " is about to execute---");
        testngService.run(testSuite, project);
    }
}

9.4. Service for adding and deleting jobs

@Service("quartzService")
public class QuartzService {

    @Resource(name = "myScheduler")
    private Scheduler scheduler;

    @Autowired
    private TriggerDao triggerDao;

    @Autowired
    private ProcessDao processDao;

    @Autowired
    private ApplicationContext applicationContext;

    private Logger logger = LoggerFactory.getLogger(QuartzService.class);

    /**
     * Add a job (scheduleJob fails if a job with the same key already exists)
     * @param testSuite
     * @param project
     */
    public void addJob(TestSuite testSuite, Project project) {
        String runTime = testSuite.getRuntime();
        if(!StringUtils.isEmpty(runTime)) {
            // Job and trigger share one key: name = test suite name, group = project name
            String testSuiteName = testSuite.getName();
            String projectName = project.getName();

            // Data handed to the job; with a JDBC JobStore these objects must be Serializable
            JobDataMap jobDataMap = new JobDataMap();
            jobDataMap.put("testSuite", testSuite);
            jobDataMap.put("project", project);

            JobDetail jobDetail = JobBuilder.newJob(ScheduleJob.class)
                    .withIdentity(testSuiteName, projectName)
                    .usingJobData(jobDataMap)
                    .build();
            // Alternative way to pass data to the job:
//            jobDetail.getJobDataMap().put("testSuite", testSuite);
//            jobDetail.getJobDataMap().put("project", project);
            // Misfire handling (currently disabled): run once as soon as the previous execution finishes
            CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(runTime);
//                        .withMisfireHandlingInstructionFireAndProceed();
            CronTrigger trigger = TriggerBuilder.newTrigger()
                    .withIdentity(testSuiteName, projectName)
                    .withSchedule(scheduleBuilder)
                    .build();
            try {
                scheduler.scheduleJob(jobDetail, trigger);
                // Record the trigger only after scheduling has succeeded
                Trigger triggerInsert = new Trigger(project.getName() + "." + testSuite.getName(),
                        TriggerStateConstant.WAITING, null, project.getProjectid());
                triggerDao.insertOne(triggerInsert);
            } catch (SchedulerException e) {
                logger.error("Failed to schedule job " + projectName + "." + testSuiteName, e);
            }
        }
    }

    /**
     * Delete a job
     * @param testSuite
     * @param project
     */
    public void deleteJob(TestSuite testSuite, Project project) {
        try {
            scheduler.pauseTrigger(TriggerKey.triggerKey(testSuite.getName(), project.getName()));
            scheduler.unscheduleJob(TriggerKey.triggerKey(testSuite.getName(), project.getName()));
            scheduler.deleteJob(JobKey.jobKey(testSuite.getName(), project.getName()));
            // Delete the matching record from the sttrigger table
            triggerDao.deleteByTriggerId(project.getName() + "." + testSuite.getName());
        } catch (SchedulerException e) {
            logger.error(e.getMessage(), e);
        }
    }

    /**
     * Check whether any trigger under this project has fired; returns true if so
     * @param projectId
     * @return
     */
    public boolean hasTriggerFired(String projectId) {
        List<Trigger> triggerList = triggerDao.findByProjectId(projectId);
        // A non-empty list means at least one trigger is recorded for the project
        return !CollectionUtils.isEmpty(triggerList);
    }

    /**
     * Get a process that is free to run
     * @param projectId
     * @return
     */
    public Process getAvaliableProcess(String projectId) {
        // All processes under this project
        List<String> processIdList = processDao.findIdByProjectId(projectId);
        // Process ids of the test suites that are currently executing
        List<String> executingProcssIdList = triggerDao.findExecutingProcess(projectId);

        List<String> differentList = new ArrayList<>();
        if(!CollectionUtils.isEmpty(executingProcssIdList)) {
            Set<String> processIdSet = new HashSet<>();
            processIdSet.addAll(processIdList);
            Set<String> executingProcssIdSet = new HashSet<String>();
            executingProcssIdSet.addAll(executingProcssIdList);
            // Set difference: all processes minus the executing ones
            Set<String> differentSet = CommonUtil.getDifferentSet(processIdSet, executingProcssIdSet);
            differentList.addAll(differentSet);
        } else {
            differentList = processIdList;
        }
        // Pick one of the remaining processes at random
        if(differentList.size() != 0) {
            String randomProcessId = CommonUtil.getRandom(differentList);
            return processDao.findByProcessId(randomProcessId);
        } else {
            return null;
        }
    }
}
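
CommonUtil.getDifferentSet and CommonUtil.getRandom are not shown in this post. The following JDK-only sketch shows what such helpers might look like; the class and method names here are hypothetical and the real CommonUtil may differ.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;

final class ProcessSelectionSketch {

    /** Elements of all that are not in busy; the inputs are left untouched. */
    static Set<String> difference(Collection<String> all, Collection<String> busy) {
        Set<String> result = new HashSet<>(all);
        result.removeAll(new HashSet<>(busy));
        return result;
    }

    /** A uniformly random element, or null when the list is empty. */
    static String randomElement(List<String> candidates) {
        if (candidates.isEmpty()) {
            return null;
        }
        return candidates.get(ThreadLocalRandom.current().nextInt(candidates.size()));
    }

    /** Picks a random id that is in allIds but not in executingIds, or null if none is free. */
    static String pickAvailable(List<String> allIds, List<String> executingIds) {
        return randomElement(new ArrayList<>(difference(allIds, executingIds)));
    }
}
```

Because difference copies its inputs, the empty-list special case in getAvaliableProcess would not be needed with helpers like these.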

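9.5. Singleton class for resource scheduling
(1) Why a singleton: every Job must get the same ProcessResource object during execution, and therefore the same map. With several maps, the jobs would no longer be sharing a single pool of resources.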

public class ProcessResource {

    private ProcessDao processDao = (ProcessDao) SpringUtil.getBean("processDao");
    private Map<String, LinkedList<Process>> map = new HashMap<String, LinkedList<Process>>();
    // volatile is required for double-checked locking to be safe
    private static volatile ProcessResource instance = null;
    private final Object lock = new Object();

    private Logger logger = LoggerFactory.getLogger(ProcessResource.class);


    private ProcessResource() {
        // Instances are created only through getInstance()
    }

    public static ProcessResource getInstance() {
        if (instance == null) {
            synchronized (ProcessResource.class) {
                if (instance == null) {
                    // Initialise fully before publishing, so no thread sees an empty map
                    ProcessResource created = new ProcessResource();
                    created.init();
                    instance = created;
                }
            }
        }
        return instance;
    }


    public void init() {
        // Load the processes of every project into the map
        List<String> projectIdList = processDao.findProjectId();
        for(String projectId : projectIdList) {
            LinkedList<Process> list = processDao.findByProjectId(projectId);
            map.put(projectId, list);
        }
    }

    public Process getProcess(String projectId) {
        synchronized (lock) {
            LinkedList<Process> list = instance.getMap().get(projectId);
            // Hand out the first idle process if there is one; otherwise log and return null
            if(!CollectionUtils.isEmpty(list)) {
                return list.removeFirst();
            } else {
                logger.info("No Process left in ProcessResource....");
                return null;
            }
        }
    }

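    /**
     * Release a resource
     * @param process
     * @param projectId
     */
    public void releaseProcess(Process process, String projectId) {
        // Same lock as getProcess, because LinkedList is not thread-safe
        synchronized (lock) {
            LinkedList<Process> list = instance.getMap().get(projectId);
            // Put the process back at the end of the queue
            list.addLast(process);
        }
    }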

    public void setMap(Map<String, LinkedList<Process>> map) {
        this.map = map;
    }

    public Map<String, LinkedList<Process>> getMap() {
        return map;
    }
}
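
For comparison, the same shared-pool idea can be written without explicit locks, using the initialization-on-demand holder idiom for the singleton and concurrent collections for the queues. This is a sketch under simplifying assumptions: processes are represented by String ids, and loadFromDatabase is a hypothetical stand-in for the ProcessDao lookups.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;

final class ProcessPool {

    private final Map<String, ConcurrentLinkedDeque<String>> idle = new ConcurrentHashMap<>();

    ProcessPool(Map<String, List<String>> processIdsByProject) {
        processIdsByProject.forEach((projectId, ids) ->
                idle.put(projectId, new ConcurrentLinkedDeque<>(ids)));
    }

    /** Lazy, thread-safe singleton: the JVM initialises Holder exactly once, on first use. */
    private static final class Holder {
        static final ProcessPool INSTANCE = new ProcessPool(loadFromDatabase());
    }

    static ProcessPool getInstance() {
        return Holder.INSTANCE;
    }

    /** Hypothetical stand-in for the ProcessDao lookups in the original class. */
    private static Map<String, List<String>> loadFromDatabase() {
        return Collections.singletonMap("demo", Arrays.asList("p1", "p2"));
    }

    /** Takes an idle process id, or returns null when none is left. */
    String acquire(String projectId) {
        ConcurrentLinkedDeque<String> queue = idle.get(projectId);
        return queue == null ? null : queue.pollFirst();
    }

    /** Returns a process id to the end of its project's queue. */
    void release(String projectId, String processId) {
        idle.computeIfAbsent(projectId, k -> new ConcurrentLinkedDeque<>()).addLast(processId);
    }
}
```

A job would call acquire before running a test suite and release in a finally block afterwards, so that a failed run does not leak a process.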